Kafka Streams Windowing - Sliding Windows

Lydtech
Sliding Window

In the Kafka Streams Windows Overview blog, we called out that the defining feature of the Sliding window is that window creation is tied to events entering and leaving the window duration, rather than happening at a fixed interval.

In this section we’ll take a closer look at the behaviour of the Sliding window using the TopologyTestDriver. The test setup and the topology have a lot in common with the Tumbling window, so if you’re not familiar with that example, it might be worth reading it before continuing: Kafka Streams - Tumbling Windows.

Sliding window code example via tests

As with the previous window examples, we’ll reduce the outputs to make the test easier to read.

All code for this example can be found in the GitHub repo: Kafka Streams Windowing

Our event flow is very basic:

Figure 0: Event flow

Events flow in from the link.status topic and are passed into the process() method within Sliding, and any output events are sent to the link.sliding topic. For the windowing, it’s the topology processing that we are interested in, and that’s defined in the Sliding class. Let’s take a look.

The Topology


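import java.time.Duration;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.state.WindowStore;

    // Illustrative method name; the serdes, windowDuration, windowGrace, linkThreshold, log
    // and aggregate(...) are defined elsewhere in the Sliding class.
    public void buildPipeline(StreamsBuilder streamsBuilder) {
        streamsBuilder.stream("link.status", Consumed.with(stringSerde, linkSerde))
                .peek((k, v) -> log.info("Mapped event: {} : {}", k, v))
                .groupByKey()
                // sliding windows of windowDuration ms, accepting late events for windowGrace ms
                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(windowDuration), Duration.ofMillis(windowGrace)))
                .aggregate(() -> new LinkMonitor(),
                        this::aggregate,
                        Materialized.<String, LinkMonitor, WindowStore<Bytes, byte[]>>as("sliding-window-link-store")
                                .withKeySerde(stringSerde)
                                .withValueSerde(linkMonitorSerde)
                )
                // hold each window's result until the window closes (window end + grace)
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                // unwrap the Windowed<String> key back to the plain link key
                .map((key, value) -> KeyValue.pair(key.key(), value))
                .filter((key, linkMonitor) -> linkMonitor.getDownCount() >= linkThreshold)
                .peek((k, v) -> log.info("sliding peeky: {} : {}", k, v))
                .to("link.sliding", Produced.with(stringSerde, linkMonitorSerde));
    }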

The Aggregator

Apart from performing some logic based on UP/DOWN counts to determine the number of consecutive DOWNs, there is little to learn from the aggregator that was not covered in the Tumbling window section.
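The aggregator isn't reproduced here. For readers who haven't seen the Tumbling example, the following is a minimal, stdlib-only sketch of that kind of logic. It assumes a hypothetical, simplified Link, made up of a code and an UP/DOWN status. It also assumes a hypothetical LinkMonitor that holds two things: the concatenated codes, which is what getCodes() appears to return in the assertions further down, and a count of consecutive DOWNs that resets on an UP. These shapes are our assumptions, not the code in the repo.

```java
import java.util.List;

// Hypothetical, simplified stand-ins for the repo's Link / LinkMonitor types.
public class LinkAggregatorSketch {

    enum LinkStatusEnum { UP, DOWN }

    // Assumed shape of an incoming event: a code (e.g. "a") and a status.
    record Link(String code, LinkStatusEnum status) {}

    // Assumed aggregate state: concatenated codes plus a consecutive-DOWN counter.
    static class LinkMonitor {
        private String codes = "";
        private int downCount = 0;

        String getCodes() { return codes; }
        int getDownCount() { return downCount; }
    }

    // Same shape as a Kafka Streams Aggregator: (key, new value, current aggregate) -> aggregate.
    static LinkMonitor aggregate(String key, Link link, LinkMonitor monitor) {
        monitor.codes += link.code();       // record which events landed in this window
        if (link.status() == LinkStatusEnum.DOWN) {
            monitor.downCount++;            // another consecutive DOWN
        } else {
            monitor.downCount = 0;          // an UP breaks the run of DOWNs
        }
        return monitor;
    }

    public static void main(String[] args) {
        LinkMonitor m = new LinkMonitor();
        for (Link l : List.of(new Link("a", LinkStatusEnum.DOWN), new Link("b", LinkStatusEnum.DOWN))) {
            m = aggregate("testLink1", l, m);
        }
        System.out.println(m.getCodes() + " " + m.getDownCount()); // prints "ab 2"
    }
}
```

The parameter order mirrors a Kafka Streams Aggregator, so an instance method with this shape could be passed as this::aggregate, as in the topology above.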

The Test

Let’s look at the behaviour of the Sliding window using a test. There’s plenty to explore with the Sliding window, as its behaviour is significantly different from that of the Tumbling and Hopping windows.

The full code for this test can be found on GitHub.

Setup

The initialisation of this test is almost identical to that of the Tumbling window, so we'll avoid repeating those explanations.

Sliding Inputs & Assertions

For the sliding test, we send in 5 events. The number of events we produce for this test has an impact on the number of windows created.


linkStatusInput.pipeInput("testLink 1", createLink("testLink1", "a", LinkStatusEnum.DOWN), 31L);
linkStatusInput.pipeInput("testLink 1", createLink("testLink1", "b", LinkStatusEnum.DOWN), 40L);
linkStatusInput.pipeInput("testLink 1", createLink("testLink1", "c", LinkStatusEnum.DOWN), 65L);
linkStatusInput.pipeInput("testLink 1", createLink("testLink1", "d", LinkStatusEnum.DOWN), 75L);

linkStatusInput.pipeInput("testLink 1", createLink("testLink1", "z", LinkStatusEnum.DOWN), 300L);

Event Z is our flushing event, but its behaviour differs from that of the fixed windows. We will examine that after a quick look at the more obvious, simpler stuff. First, let's understand why there are many more final events in the output...


List<KeyValue<String, LinkMonitor>> list = slidingOutput.readKeyValuesToList();

assert list.size() == 8;

// The following assertions demonstrate the window grouping
assert list.get(0).value.getCodes().equals("a");
assert list.get(1).value.getCodes().equals("ab");
assert list.get(2).value.getCodes().equals("b");
assert list.get(3).value.getCodes().equals("bc");
assert list.get(4).value.getCodes().equals("c");
assert list.get(5).value.getCodes().equals("cd");
assert list.get(6).value.getCodes().equals("d");
assert list.get(7).value.getCodes().equals("z");

We can visualise these final events as:

Figure 1: sliding window test visualization

That’s a lot of windows! The Sliding window has a fixed duration, but window creation differs from that of the Hopping and Tumbling windows. A window is created when an event is processed, or when a processed event becomes older than the window duration. Let’s look at how those windows came into being.

w1 is created when event A is processed. It effectively looks backwards from A, and A is the only event within that window duration, so w1 contains only event A.

w2 is created when event B is processed. The events within the window duration are A & B.

w3: event A becomes older than the window duration, so a new window is created. This window contains only event B.

... etc...

w8 contains only event Z.
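To make the window list easier to follow, here is a small model of how the sliding windows get formed. It is a simplified sketch, not Kafka Streams' implementation. Each record gets a window ending at its own timestamp, [t - d, t]. It also gets a window starting 1 ms after it, [t + 1, t + 1 + d], which is only kept if another record falls inside it. The model makes three assumptions:
- records arrive in timestamp order;
- no timestamp is below the window duration (see the t0 gotcha further down);
- the window duration d is 30 ms. This value is our assumption, since the test's actual value isn't shown here.

Under those assumptions the model reproduces the eight groupings asserted in the test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Simplified model of sliding-window formation (not Kafka Streams' actual code).
// Assumes timestamps are sorted and every timestamp is >= the window duration d.
public class SlidingWindowSketch {

    // Returns the contents (concatenated codes) of each distinct window, ordered by window end.
    static List<String> windows(String[] codes, long[] times, long d) {
        // window end -> (window start -> contents); TreeMaps keep both levels sorted
        TreeMap<Long, TreeMap<Long, String>> byEnd = new TreeMap<>();
        for (long t : times) {
            addIfNonEmpty(byEnd, t - d, t, codes, times);         // "left" window ending at the record
            addIfNonEmpty(byEnd, t + 1, t + 1 + d, codes, times); // "right" window starting just after it
        }
        List<String> out = new ArrayList<>();
        byEnd.values().forEach(starts -> out.addAll(starts.values()));
        return out;
    }

    // Collects every record in [start, end] (both ends inclusive); empty windows are skipped.
    private static void addIfNonEmpty(TreeMap<Long, TreeMap<Long, String>> byEnd, long start, long end,
                                      String[] codes, long[] times) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < times.length; i++) {
            if (times[i] >= start && times[i] <= end) {
                sb.append(codes[i]);
            }
        }
        if (sb.length() > 0) {
            // keyed by (end, start), so a window produced twice is only stored once
            byEnd.computeIfAbsent(end, k -> new TreeMap<>()).put(start, sb.toString());
        }
    }

    public static void main(String[] args) {
        String[] codes = {"a", "b", "c", "d", "z"};
        long[] times = {31, 40, 65, 75, 300};
        // 30 ms is an assumed window duration; the test's real value isn't shown here.
        System.out.println(windows(codes, times, 30)); // [a, ab, b, bc, c, cd, d, z]
    }
}
```

The windows are listed in order of their end time, which lines up with the order of the test's assertions.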

Cripes, why do we have an event Z this time?

If you recall the Tumbling window test, we explored some not entirely obvious behaviour when testing windows with suppression, which showed up as the latest events not being included in any emitted window.

As the Sliding window looks backwards, the window containing event Z ends at Z's own timestamp. With no grace period, processing event Z therefore immediately closes that window and emits its result. This differs from the Tumbling and Hopping windows, which are forward-looking windows.
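To illustrate the forward/backward difference for event Z (timestamp 300), the sketch below computes two windows. The first is the epoch-aligned tumbling window Z would fall into. The second is the sliding window that ends at Z. The 30 ms size is an assumption for illustration. The tumbling window ends after Z, so it is still open when Z is the last event. The sliding window ends at Z itself, so with no grace period it can close straight away.

```java
// Compares where event Z (t = 300) lands in a forward-looking tumbling window
// versus a backward-looking sliding window. The 30 ms size is an assumption.
public class WindowDirectionSketch {

    // Tumbling windows are aligned to the epoch: [start, start + size), end exclusive.
    static long[] tumblingWindow(long t, long size) {
        long start = t - (t % size);
        return new long[] {start, start + size};
    }

    // The sliding window that ends at the record itself: [t - d, t], end inclusive.
    static long[] slidingLeftWindow(long t, long d) {
        return new long[] {t - d, t};
    }

    public static void main(String[] args) {
        long z = 300;
        long size = 30;          // assumed window size / time difference
        long streamTime = 300;   // Z is the last event, so stream time stops at 300
        long[] tumbling = tumblingWindow(z, size);
        long[] sliding = slidingLeftWindow(z, size);
        // With zero grace, a window can close once stream time has reached its end.
        System.out.println("tumbling [" + tumbling[0] + ", " + tumbling[1] + ") closed? " + (tumbling[1] <= streamTime));
        System.out.println("sliding  [" + sliding[0] + ", " + sliding[1] + "] closed? " + (sliding[1] <= streamTime));
    }
}
```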

Grace Period and Suppression with the Sliding Window

There’s some interesting experimentation to be had with this test. The results above are based on a grace period of 0.

If you apply a grace period, e.g. set it to 1 (ms), the test will fail because fewer events are emitted. So what’s occurring here? Without a grace period, the window containing event Z closes as soon as Z is processed, so its final result is emitted immediately. With a grace period, the final result is held back to wait for any out-of-order events. The window only closes once stream time passes the window end plus the grace period. No event arrives after event Z, so stream time never gets that far. The test therefore finishes without that final event being emitted.
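The closing rule can be sketched as follows. This is a simplified model of our understanding of Suppressed.untilWindowCloses: a window's final result is released once stream time reaches the window end plus the grace period. The window end times are the ones the test's eight windows would have under an assumed 30 ms window duration. The real value isn't shown here.

```java
import java.util.Arrays;

// Simplified model of Suppressed.untilWindowCloses: a buffered window result is
// released once stream time reaches the window end plus the grace period.
public class SuppressionSketch {

    // Assumed closing rule: windowEnd + grace <= streamTime.
    static boolean isClosed(long windowEnd, long grace, long streamTime) {
        return windowEnd + grace <= streamTime;
    }

    // Counts how many windows would have been emitted once stream time reaches streamTime.
    static long emittedCount(long[] windowEnds, long grace, long streamTime) {
        return Arrays.stream(windowEnds).filter(end -> isClosed(end, grace, streamTime)).count();
    }

    public static void main(String[] args) {
        // Window end times for a, ab, b, bc, c, cd, d, z, assuming a 30 ms window duration.
        long[] ends = {31, 40, 62, 65, 71, 75, 96, 300};
        long streamTime = 300; // event Z is the last record, so stream time stops at 300
        System.out.println(emittedCount(ends, 0, streamTime)); // 8
        System.out.println(emittedCount(ends, 1, streamTime)); // 7: Z's window [270, 300] stays buffered
    }
}
```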

Test Gotcha - Events with a Timestamp less than window duration

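If the first event is near 0 (within one window duration of t0), the scenario above plays out differently. w1 contains events A & B, and there is no window holding event A on its own, as that window would need to start before 0.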

Figure 2: sliding window test t0 oddity visualization

As we see in the diagram, if we start from t0, there is no window containing only event A; window w1 contains events A & B. If we add any further events with a timestamp less than the window duration, they will all end up in window w1.
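A minimal sketch of this t0 behaviour follows. It assumes, as described above, that a record whose timestamp t is less than the window duration d is placed in a window spanning [0, d] rather than [t - d, t], since windows can't start before 0. The timestamps 5 and 10 for events A and B and the 30 ms duration are hypothetical values chosen for illustration.

```java
import java.util.Arrays;

// Simplified model of the window that ends at a record when that record is "early"
// (timestamp below the window duration d). Windows can't start before 0, so such
// records are assumed to share the window [0, d] instead of [t - d, t].
public class EarlyRecordSketch {

    static long[] leftWindow(long t, long d) {
        if (t < d) {
            return new long[] {0, d};   // early record: assumed to fall into [0, d]
        }
        return new long[] {t - d, t};   // normal case: window ends at the record itself
    }

    public static void main(String[] args) {
        long d = 30;                      // assumed window duration
        long[] a = leftWindow(5, d);      // hypothetical timestamp for event A
        long[] b = leftWindow(10, d);     // hypothetical timestamp for event B
        // Both are [0, 30], so A and B share a window and no A-only window exists.
        System.out.println(Arrays.toString(a) + " " + Arrays.toString(b));
    }
}
```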

Summing Up

The sliding window is a useful tool and can provide efficiencies, but at the cost of some cognitive challenges. We have seen notable differences in the window model, such as looking backwards. We have also seen a couple of gotchas in the test setup: the oddities of starting from t0, and the inclusion of the last event (understandable, but easy to get caught out by). Again, we saw how all this works via the TopologyTestDriver, exercising the logic of the window via unit tests.

For more Kafka Streams windows insights, look no further than:

Credits Sliding (guy in the window) by Eric Benoit from Noun Project (CC BY 3.0)