Kafka Streams Windowing - Hopping Windows

Lydtech
Hopping Window

In the Kafka Streams Windows Overview blog we learned that Hopping windows are similar to Tumbling windows, but a new window is created before the previous window closes.

Here we will take a closer look at the behaviour of the Hopping window using the TopologyTestDriver. The test setup and the topology for this Hopping window example share a lot in common with the Tumbling Window. If you’re not familiar with that example, then I would recommend reading through that before continuing Kafka Streams - Tumbling Windows

Hopping window code example via tests

For the windowing operations we will be using the suppress functionality to add clarity to the outputs as we’re only interested in the result of the window aggregations and not the individual events flowing through the window.

All code can for this example can be found on github in the Kafka Streams Windowing repo.

Our event flow for this example is very basic….

Figure 0: Event flow

Events flow in from the link.status topic, they are passed into the process() method within Hopping, and any output events are sent to the link.hopping topic.

For the windowing, it’s the topology processing that we are interested in, and that’s defined in the Hopping class. let’s take a look.

The Topology

Windowing, aggregation, suppression all form part of the topology. There is a lot that is familiar from the Tumbling topology referenced earlier, so we’ll skip over the duplication and focus on the window specifics…


streamsBuilder.stream("link.status", Consumed.with(stringSerde, linkSerde))
                .peek((k, v) -> log.info("Mapped event: {} : {}", k, v))
                .groupByKey()
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(windowDuration)).advanceBy(Duration.ofMillis(windowAdvance)))
                .aggregate(() -> new LinkSummary(),
                        this::aggregate,
                        Materialized.>as("hopping-window-link-store")
                                .withKeySerde(stringSerde)
                                .withValueSerde(linkSummarySerde)
                )
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                .map((key, value) -> KeyValue.pair(key.key(), value))
                .peek((k, v) -> log.info("hopping peeky: {} : {}", k, v))
                .to("link.hopping", Produced.with(stringSerde, linkSummarySerde));
    }

The only significant difference between the tumbling topology and our hopping topology is the windowedBy usage.
As we’re looking at using a hopping window, we need to specify both the window duration and the advance. We can provide grace period if we so wanted, but we are explicity not doing so in this example by using the method ofSizeWithNoGrace

advance - The advance defines the period between window creation, we’ll see this in action during the test, but we can also see this in the following illustration

Figure 1: hopping window advance

The above hopping window setup has an advance of 5. when window w1 is created at t20, window w2 will be created t5 later at t25

The Aggregator

Our aggregator summarises the events that have passed through the window.
The aggregator does a number of calculations but perhaps most useful for the demonstration is that the codes for each event (in the test these are a,b,c,d) in the window are concatenated in the summary.

If you want more detail on the aggregator, it is the same aggregator as used in the Tumbling topology and more details can be found there.

The Test

Setup

The initialisation of this test is pretty much identical to that of the Tumbling window, and we'll try to avoid any duplicate explanations

The full code for this test can be found on github

Hopping Inputs & Assertions

Using the input from the topologyTestDriver we can pump in a number of events with specific timestamps. We provide the Key, the Value (in this case a link), and the timestamp. We’re using timestamps in milliseconds here, the test starts from 0.


linkStatusInput.pipeInput("testLink 1", createLink("testLink1", "a"), 0L);
linkStatusInput.pipeInput("testLink 1", createLink("testLink1", "b"), 7L);
linkStatusInput.pipeInput("testLink 1", createLink("testLink1", "c"), 10L);
linkStatusInput.pipeInput("testLink 1", createLink("testLink1", "d"), 15L);

Once the events are input we can assert on the outputs. We are simply confirming which summary events are produced for each window. The concatenation of the event code fields is confirmed in the assertions.


List> list = hoppingOutput.readKeyValuesToList();

assert list.size() == 2;

// The following assertions demonstrate the window grouping
assert list.get(0).value.getCodes().equals("ab");
assert list.get(1).value.getCodes().equals("bc");

A diagram gives us a clearer view as to how a hopping window works.

Figure 2: hopping window test visualization
  • 2 events are captured by the processor.
  • The window duration is 10, the advance is 5

The results of the test confirm that 2 LinkSummary aggregations are emitted, the events contained within the windows are identified via the ‘code’ field. We can see that window w1 contains events A & B, whereas window w2 contains events B & C

Note that event C which occurs at t10 is not included in window w1, this is because hopping windows - like tumbling windows - are exclusive of end time.

It’s also worth noting that event B is in both windows w1 & w2. This demonstrates that with hopping windows it is possible for an event to be present in more than 1 window.

Summing Up

The hopping window builds on the Tumbling window and adds that extra bit of complexity with the overlapping windows, but still shares the rest of the window behaviours We specify advance to determine the window increment. Overlapping windows means that an event can appear in more multiple windows.

Again, we saw how all this worked via the topologyTestDriver and exercised the logic of the window via unit tests.

For more kafka streams windows insights look no further than:

Credits Jumping Businessman (hopping guy in the window) by Gan Khoon Lay from Noun Project (CC BY 3.0)