Kafka Streams Windowing - Tumbling Windows

Lydtech
Tumbling Window

Introduction

In the article Kafka Streams Windows Overview we defined a Tumbling window as a sequence of consecutive timeframes which do not overlap.

In this section we’ll take a closer look at the behaviour of a Tumbling window and explore the window’s behaviour via a simple test suite.

Tumbling Window code example and Test

For the windowing operations we will be using the suppress() functionality as we’re only interested in the result of the window aggregations and not the individual events flowing through the window. The use of suppress() is not mandatory when using windows.

All code for this example can be found on github: Kafka Streams Windowing

Our event flow is very basic….

Figure 0: Event flow

Events flow in from the link.status topic, are passed into the process() method within Tumbling, and any output events are sent to the link.tumbling topic. For the windowing, it’s the topology processing that we are interested in, and that’s defined in the Tumbling class. Let’s take a look.

The Topology

Windowing, aggregation, suppression and much more are defined in the topology. The topology IS the heart of our app, so let’s look at the topology for our Tumbling processor.


streamsBuilder.stream("link.status", Consumed.with(stringSerde, linkSerde))
                .peek((k, v) -> log.info("Mapped event: {} : {}", k, v))
                .groupByKey()
                .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMillis(windowDuration), Duration.ofMillis(windowGrace)))
                .aggregate(() -> new LinkSummary(),
                        this::aggregate,
                        Materialized.<String, LinkSummary, WindowStore<Bytes, byte[]>>as("tumbling-window-link-store")
                                .withKeySerde(stringSerde)
                                .withValueSerde(linkSummarySerde)
                )
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                .map((key,value) -> KeyValue.pair(key.key(), value))
                .peek((k, v) -> log.info("tumbling peeky: {} : {}", k, v))
                .to("link.tumbling", Produced.with(stringSerde, linkSummarySerde));

A summary of the data types flow would be…. Link(s) -> process -> LinkSummary

The input to the flow is a Link event, and the output is a LinkSummary event.

stream - Identifies the source topic for our topology. We also identify the serializers and deserializers required for the key and value of the events on the topic.

groupByKey - Because we’re going to run an aggregate on the events within a window we must first group the data. We’re using groupByKey because the alternative, groupBy, can change the key and so results in repartitioning.

windowedBy - As we’re looking at a tumbling window, we only need to specify the window duration and the grace period.

aggregate - Aggregate allows us to perform calculations on events during the window timeframe. Unlike reduce (which combines multiple values into a single value of the same type), aggregate allows us to return a different type from the input type.
For the aggregation we provide:

  • an initializer, in our case this is an ‘empty’ LinkSummary object.
  • The aggregator method (specified below)
  • A store

For the store we’ll default to using RocksDB as the window store, for which we specify the serializers and deserializers for both key and value.
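To make the difference between reduce and aggregate concrete, here is a minimal sketch in plain Java. It does not use the Kafka Streams API: the class and method names are hypothetical, and the functional interfaces are JDK stand-ins for the Kafka Streams Reducer, Initializer and Aggregator.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

// Hypothetical model of the two operations; not Kafka Streams code.
class ReduceVersusAggregate {

    // reduce: the result always has the same type as the input values.
    static <V> V reduce(List<V> values, BinaryOperator<V> reducer) {
        V result = null;
        for (V value : values) {
            result = (result == null) ? value : reducer.apply(result, value);
        }
        return result;
    }

    // aggregate: the initializer supplies a starting result, which may be a different type.
    static <V, A> A aggregate(List<V> values, Supplier<A> initializer, BiFunction<V, A, A> aggregator) {
        A result = initializer.get();
        for (V value : values) {
            result = aggregator.apply(value, result);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> codes = Arrays.asList("a", "b", "c");
        // Strings in, String out.
        System.out.println(reduce(codes, String::concat));
        // Strings in, Integer out: counting the events.
        System.out.println(ReduceVersusAggregate.<String, Integer>aggregate(codes, () -> 0, (code, count) -> count + 1));
    }
}
```

In the topology, the same idea lets Link events go in and a LinkSummary come out.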

suppress - We use suppression with the windowing as this allows us to only see the final event of a window aggregation, greatly reducing the noise when we look at the events during test.

toStream - Both aggregate and suppress work with a KTable, so we need to convert this back to a KStream for the subsequent event emission.

map - used to marry up the key and the summary value for our final event. The windowed key is unwrapped with key.key() so that the output key is the original String key.

to - specifies the output topic for the final event. We provide the serdes for the output key and value.

Taking a peek at runtime

One useful method to help with the development of your streams app is the peek method. It is included in the flow for example purposes, but has no impact on the data passing through. Simply put, peek allows you to view the event without terminating the flow. Peek can be a useful tool during development.


.peek((k, v) -> log.info("quick peek: {} : {}", k, v))

Aggregator

Our aggregator summarises the events that have passed through the window. It records:

  • Counts for LinkUp and LinkDown events
  • A toggle count to show how unstable the link has been
  • And, perhaps most useful for the demonstration, the codes for each event in the window, concatenated in the summary


    private LinkSummary aggregate(String key, Link link, LinkSummary linkSummary) {
        Long upCount = linkSummary.getUpCount();
        Long downCount = linkSummary.getDownCount();
        Long toggleCount = linkSummary.getToggleCount();
        String codes = linkSummary.getCodes();
        LinkStatusEnum status = linkSummary.getStatus();

        if (codes == null) codes = "";

        codes=codes.concat(link.getCode());

        if (link.getStatus() == LinkStatusEnum.DOWN) downCount++; else upCount++;

        // first window status will be null so do not increment toggleCount
        // if the status has changed increment toggleCount
        if (status != null && link.getStatus() != status) {
            toggleCount++;
        }

        return LinkSummary.builder()
                .name(link.getName())
                .downCount(downCount)
                .upCount(upCount)
                .codes(codes)
                .toggleCount(toggleCount)
                .status(link.getStatus())
                .build();
    }
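To see how the counts evolve, the following sketch replays the same logic over four events. It is a simplified model rather than the project's code: Status and Summary are hypothetical stand-ins for LinkStatusEnum and LinkSummary, and the link name is left out.

```java
// Simplified, hypothetical stand-ins for the article's Link and LinkSummary types.
class AggregatorWalkthrough {

    enum Status { UP, DOWN }

    static final class Summary {
        final long upCount;
        final long downCount;
        final long toggleCount;
        final String codes;
        final Status status; // status of the most recent event, null for an empty summary

        Summary(long upCount, long downCount, long toggleCount, String codes, Status status) {
            this.upCount = upCount;
            this.downCount = downCount;
            this.toggleCount = toggleCount;
            this.codes = codes;
            this.status = status;
        }
    }

    // Plays the role of the initializer: an 'empty' summary.
    static Summary empty() {
        return new Summary(0, 0, 0, "", null);
    }

    // Mirrors the aggregate logic for a single incoming event.
    static Summary aggregate(Status incoming, String code, Summary current) {
        long up = current.upCount + (incoming == Status.DOWN ? 0 : 1);
        long down = current.downCount + (incoming == Status.DOWN ? 1 : 0);
        // The first event in a window has no previous status, so it cannot be a toggle.
        boolean toggled = current.status != null && current.status != incoming;
        long toggles = current.toggleCount + (toggled ? 1 : 0);
        return new Summary(up, down, toggles, current.codes + code, incoming);
    }

    public static void main(String[] args) {
        Summary s = empty();
        s = aggregate(Status.UP, "a", s);
        s = aggregate(Status.DOWN, "b", s);
        s = aggregate(Status.DOWN, "c", s);
        s = aggregate(Status.UP, "d", s);
        System.out.println(s.codes + " up=" + s.upCount + " down=" + s.downCount + " toggles=" + s.toggleCount);
        // prints: abcd up=2 down=2 toggles=2
    }
}
```

The sequence UP, DOWN, DOWN, UP changes status twice, so the toggle count ends at 2 even though four events were seen.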

The Test

There's a little bit of leg work required before we can exercise the test, so we'll look at the Test Setup, before looking at the actual test itself.

The full code for this test can be found on github: Tumbling Test

Test Setup

The best way to examine the behaviour of any software is via testing, so let’s take a look at how we’ve set up a test to explain the window behaviour.


    StreamsBuilder streamsBuilder;

    Serde<Link> linkSerde = new LinkSerde();
    Serde<LinkSummary> linkSummarySerde = new LinkSummarySerde();
    Serde<String> stringSerde = new Serdes.StringSerde();

    TopologyTestDriver topologyTestDriver;
    private TestInputTopic<String, Link> linkStatusInput;
    private TestOutputTopic<String, LinkSummary> tumblingOutput;

    @BeforeEach
    public void setup(){

        streamsBuilder = new StreamsBuilder();

        Tumbling tumbling = new Tumbling(10L, 0L);
        tumbling.process(streamsBuilder);
        final Topology topology = streamsBuilder.build();

        topologyTestDriver = new TopologyTestDriver(topology);
        linkStatusInput = topologyTestDriver.createInputTopic("link.status", stringSerde.serializer(),
                linkSerde.serializer());
        tumblingOutput = topologyTestDriver.createOutputTopic("link.tumbling", stringSerde.deserializer(),
                linkSummarySerde.deserializer());
    }

    @AfterEach
    public void tearDown() {
         if (topologyTestDriver != null) {
             topologyTestDriver.close();
         }
    }

A few key points from the setup...

Tumbling - we first create a Tumbling instance, with a window duration of 10, and a grace period of 0

topology - is created by building the streamsBuilder which we primed on the previous line with the Tumbling topology. This topology now contains the processing logic we defined in the Tumbling class.

topologyTestDriver - we create an instance of the TopologyTestDriver, and pass in the topology derived from the streamsBuilder. It's from this test driver that we can create entry and exit points for the topology. We map the input and output topics, which in this case we have named linkStatusInput and tumblingOutput. The topic names must match the topic names specified in the topology.

Inputs & Assertions

Using the input from the topologyTestDriver we can pump in a number of events with specific timestamps. We provide the Key, the Value (in this case a Link type), and the timestamp. We’re using timestamps in milliseconds here, the test starts from zero. This means that we can specify exactly when the event is received, and needn’t worry about any grace period.


linkStatusInput.pipeInput("Link 1", createLink("Link1", "a"), 2L);
linkStatusInput.pipeInput("Link 1", createLink("Link1", "b"), 6L);
linkStatusInput.pipeInput("Link 1", createLink("Link1", "c"), 10L);
linkStatusInput.pipeInput("Link 1", createLink("Link1", "d"), 16L);
linkStatusInput.pipeInput("Link 1", createLink("Link1", "e"), 32L);
linkStatusInput.pipeInput("Link 1", createLink("Link1", "f"), 35L);
linkStatusInput.pipeInput("Link 1", createLink("Link1", "z"), 40L);

Once the events are input we can assert on the outputs. We are simply confirming which summary events are produced for each window.


List<KeyValue<String, LinkSummary>> list = tumblingOutput.readKeyValuesToList();

assert list.size() == 3;

// The following assertions demonstrate the window grouping
assert list.get(0).value.getCodes().equals("ab");
assert list.get(1).value.getCodes().equals("cd");
assert list.get(2).value.getCodes().equals("ef");

A diagram gives us a clearer view as to how a tumbling window works.

Figure 1: tumbling window test visualization

6 events are captured by the processor.
Technically 7, but the 7th event just closes off the test. (This is a distraction for the moment - we will cover this at the end of the explanation - see: So what happened to event Z)

The window duration is 10, the grace period is 0

The results of the test confirm that 3 LinkSummary aggregations are emitted. The events contained within each window are identified via the ‘code’ field:

  • W1 - events A & B
  • W2 - events C & D
  • W4 - events E & F

Let’s add some explanation to those results. Event A lands at t2 and event B lands at t6. Window w1 starts at the epoch and lasts for 10. Tumbling windows are inclusive of the start time and exclusive of the end time, so here we see that w1 contains only events A and B. Event C occurs at t10 and is therefore captured in window w2 only.

With tumbling windows it is not possible for an event to occupy more than 1 window.

Event D occurs at t16 and is also captured by window w2.

We see that during window w3 no events occur. Despite this the window is still created, but we will see no LinkSummary event emitted. Not even an event suggesting that nothing has happened.

Finally, we see that events E and F occur within window w4, these effectively highlight that life continues after the uneventful w3 window.
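The grouping above follows from simple arithmetic on the event timestamp. The sketch below is a plain Java model of that assignment, not Kafka Streams internals: the class and method names are hypothetical, and it assumes non-negative timestamps and windows aligned to the epoch. Event Z is left out because it only exists to flush the earlier windows.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: models tumbling-window assignment for non-negative timestamps.
class TumblingWindowAssignment {

    // Start of the single window [start, start + size) that contains the timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Concatenates the event codes per window start, in arrival order.
    static Map<Long, String> codesByWindow(long[] timestamps, String[] codes, long sizeMs) {
        Map<Long, String> result = new LinkedHashMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            result.merge(windowStart(timestamps[i], sizeMs), codes[i], String::concat);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] timestamps = {2L, 6L, 10L, 16L, 32L, 35L};
        String[] codes = {"a", "b", "c", "d", "e", "f"};
        System.out.println(codesByWindow(timestamps, codes, 10L));
        // prints: {0=ab, 10=cd, 30=ef}
    }
}
```

Each timestamp maps to exactly one window start, which is why an event can never appear in two tumbling windows. There is no entry for the window starting at 20, matching the empty w3.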

So what happened to event Z? - Suppression and Testing

As you recall, we sent in 7 events but capture and report on only 6. Why is this?

We are using suppress to only produce a final event per window; specifically, we are suppressing the emission of this event until the window closes. In this test each event has a timestamp, which gives us fine-grained control over the windowing of each event. However, because we are using suppression we need to send in a later event to flush the windows. This is effectively what event Z is doing. Event Z is the final event processed in the test, and because nothing arrives after it to close its own window, no final event is created for event Z.

Event Z must occur after the final windowing period that you wish to observe, otherwise no final event for that window will be emitted. For example, if event Z was in the same window as w4, e.g. if event Z occurred at t38, then there would be no final event for w4. The test would only output 2 final events.

Grace Period & Suppression

There’s more fun to be had when we take the grace period into account. For the basic test we had a grace period of 0. This keeps things simple to build a basic understanding of windowing. But how does the grace period affect our test?

If we set the grace period to 1, we will see that the test fails as only 2 final events are emitted.
This is because a window now waits for its grace period to pass before it creates its final event: w4 ends at t40, so it closes at t41. Event Z at t40 is the last event processed, so stream time never reaches t41 and window w4 never creates its final event.

Experiment with the test, and see how this impacts the relationship between the time of event Z and the grace period.
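One way to reason about these experiments before running them is to model when a suppressed window emits. The sketch below is a plain Java approximation, not the Kafka Streams implementation. It assumes timestamps arrive in non-decreasing order, and that a window's final result is released once the highest timestamp seen so far reaches the window end plus the grace period. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Illustrative model of suppress-until-window-closes; assumes in-order timestamps.
class SuppressedWindowEmission {

    // Returns the start times of the windows whose final result would have been emitted.
    static List<Long> emittedWindowStarts(long[] timestamps, long sizeMs, long graceMs) {
        TreeSet<Long> pending = new TreeSet<>(); // windows holding a buffered result
        List<Long> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;        // highest timestamp seen so far
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            pending.add(ts - (ts % sizeMs));
            // A window [start, start + size) closes once stream time reaches start + size + grace.
            while (!pending.isEmpty() && pending.first() + sizeMs + graceMs <= streamTime) {
                emitted.add(pending.pollFirst());
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] original = {2L, 6L, 10L, 16L, 32L, 35L, 40L};
        System.out.println(emittedWindowStarts(original, 10L, 0L)); // prints: [0, 10, 30]
        System.out.println(emittedWindowStarts(original, 10L, 1L)); // prints: [0, 10]
    }
}
```

With a grace period of 1, moving event Z from t40 to t41 in this model brings the third window back, which is a useful prediction to check against the real test.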

Caution, that’s not all... There’s a variation on this theme when we get to Sliding windows, but for Tumbling windows, this could help explain any perceived anomalies you may encounter during testing.

Multiple keys

The previous test shows how windows work with single keys. Let’s expand this example a little and introduce a second key.

linkStatusInput.pipeInput("Link 1", createLink("Link1", "a"), 0L);
linkStatusInput.pipeInput("Link 1", createLink("Link1", "b"), 6L);
linkStatusInput.pipeInput("Link 2", createLink("Link2", "a"), 4L);
linkStatusInput.pipeInput("Link 2", createLink("Link2", "b"), 6L);

Test demonstrateTumblingWindowMultipleKeys uses multiple keys. We can visualise it as follows….

Figure 2: tumbling window with multiple keys

The test has 2 keys, in the diagram, these are represented as Green and Red, in the code they are Link1 and Link2

When we run this test, we see that we get 3 linkSummary events generated. W1 provides two events, one for each key encountered during the window.
W2 only has events from 1 key so only produces a single aggregation event.
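The per-key behaviour can be modelled by grouping on the pair of key and window start. The sketch below is plain Java with hypothetical names. The first four events are the ones shown above for W1; the fifth event (code "c" at t12 for "Link 1") is an assumption added purely to populate W2, as the second-window inputs are not listed here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: one aggregation per (key, window start) pair.
class KeyedWindowGrouping {

    // Results are labelled "key@windowStart" and hold the concatenated event codes.
    static Map<String, String> codesByKeyAndWindow(String[] keys, String[] codes, long[] timestamps, long sizeMs) {
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            long start = timestamps[i] - (timestamps[i] % sizeMs);
            result.merge(keys[i] + "@" + start, codes[i], String::concat);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] keys = {"Link 1", "Link 1", "Link 2", "Link 2", "Link 1"};
        String[] codes = {"a", "b", "a", "b", "c"}; // the final "c" is an assumed W2 event
        long[] timestamps = {0L, 6L, 4L, 6L, 12L};
        System.out.println(codesByKeyAndWindow(keys, codes, timestamps, 10L));
        // prints: {Link 1@0=ab, Link 2@0=ab, Link 1@10=c}
    }
}
```

Two keys in the first window give two separate results, and the single key in the second window gives one, for three in total.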

Summing Up

The tumbling window is perhaps the simplest of all the window types to understand, and shares a lot of the same concepts as the other window types. We've seen that for windowing we need to group our inputs, ideally by key to prevent repartitioning. We've also worked with aggregators that, unlike reduce, allow us to output a different type from the input events. And we controlled the outputs using suppress, which means only the final result for each window is emitted rather than an update for every input event. We saw that Tumbling windows do not share events: you will never find the same event being captured in 2 different windows. Finally, we saw how all this worked via the TopologyTestDriver and exercised the logic of the window via unit tests.


Credits cartwheel icon by Andrew Doane from Noun Project (CC BY 3.0)