In the Kafka Streams Windows Overview blog we learned that Hopping windows are similar to Tumbling windows, but a new window is created before the previous window closes.
Here we will take a closer look at the behaviour of the Hopping window using the TopologyTestDriver. The test setup and the topology for this Hopping window example share a lot in common with the Tumbling window example. If you’re not familiar with that example, I would recommend reading through Kafka Streams - Tumbling Windows before continuing.
For the windowing operations we will be using the suppress functionality to add clarity to the outputs as we’re only interested in the result of the window aggregations and not the individual events flowing through the window.
All code for this example can be found on GitHub in the Kafka Streams Windowing repo.
Our event flow for this example is very basic.
Events flow in from the link.status topic and are passed into the process() method within Hopping; any output events are sent to the link.hopping topic.
For the windowing, it’s the topology processing that we are interested in, and that’s defined in the Hopping class. Let’s take a look.
Windowing, aggregation, suppression all form part of the topology. There is a lot that is familiar from the Tumbling topology referenced earlier, so we’ll skip over the duplication and focus on the window specifics…
streamsBuilder.stream("link.status", Consumed.with(stringSerde, linkSerde))
.peek((k, v) -> log.info("Mapped event: {} : {}", k, v))
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(windowDuration)).advanceBy(Duration.ofMillis(windowAdvance)))
.aggregate(() -> new LinkSummary(),
this::aggregate,
Materialized.<String, LinkSummary, WindowStore<Bytes, byte[]>>as("hopping-window-link-store")
.withKeySerde(stringSerde)
.withValueSerde(linkSummarySerde)
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.map((key, value) -> KeyValue.pair(key.key(), value))
.peek((k, v) -> log.info("hopping peeky: {} : {}", k, v))
.to("link.hopping", Produced.with(stringSerde, linkSummarySerde));
}
The only significant difference between the tumbling topology and our hopping topology is the windowedBy usage.
As we’re using a hopping window, we need to specify both the window duration and the advance. We could provide a grace period if we wanted to, but we are explicitly not doing so in this example by using the method ofSizeWithNoGrace.
advance: the advance defines the period between the start of one window and the start of the next. We’ll see this in action during the test, but we can also see it in the following illustration.
The above hopping window setup has an advance of 5. When window w1 is created at t20, window w2 will be created t5 later, at t25.
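To make the effect of the advance concrete, here is a minimal sketch in plain Java of how a timestamp maps onto overlapping windows. It is an illustration only, not the Kafka Streams implementation: the class and method names are hypothetical, and it assumes windows are aligned to multiples of the advance, starting from 0. The size of 10 and advance of 5 are assumptions inferred from the test results later in this post.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: hypothetical helper, not part of Kafka Streams or the example repo.
class HoppingWindowSketch {

    // Returns the start time of every window [start, start + size) that contains the timestamp,
    // assuming window starts are aligned to multiples of the advance.
    static List<Long> windowStartsFor(long timestamp, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp.
        long first = Math.max(0, timestamp - size + advance) / advance * advance;
        for (long start = first; start <= timestamp; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 10;    // assumed window duration
        long advance = 5;  // assumed advance
        for (long ts : new long[] {0, 7, 10, 15}) {
            System.out.println("t" + ts + " -> windows starting at " + windowStartsFor(ts, size, advance));
        }
    }
}
```

With these assumed values, a timestamp of 7 falls into the windows starting at 0 and 5, while a timestamp of 10 falls into the windows starting at 5 and 10, because the window starting at 0 ends (exclusively) at 10.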
Our aggregator summarises the events that have passed through the window.
The aggregator does a number of calculations, but perhaps most useful for the demonstration is that the codes for each event (in the test these are a, b, c, d) in the window are concatenated in the summary.
If you want more detail on the aggregator, it is the same aggregator as used in the Tumbling topology and more details can be found there.
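As a rough idea of the concatenation behaviour, the following is a minimal sketch of an aggregator with the same (key, value, aggregate) shape that Kafka Streams expects. The Link and LinkSummary classes here are simplified, hypothetical stand-ins; the real classes in the repo carry more fields and calculations.

```java
// Illustrative sketch only: simplified, hypothetical stand-ins for the classes in the repo.
class AggregatorSketch {

    static class Link {
        final String name;
        final String code;

        Link(String name, String code) {
            this.name = name;
            this.code = code;
        }
    }

    static class LinkSummary {
        String name;
        String codes = "";
        long count = 0;

        String getCodes() {
            return codes;
        }
    }

    // Called once per event in a window: folds the incoming link into the running summary.
    static LinkSummary aggregate(String key, Link link, LinkSummary summary) {
        summary.name = link.name;
        summary.codes = summary.codes + link.code; // concatenate the event codes
        summary.count++;
        return summary;
    }

    public static void main(String[] args) {
        LinkSummary summary = new LinkSummary();
        summary = aggregate("testLink1", new Link("testLink1", "a"), summary);
        summary = aggregate("testLink1", new Link("testLink1", "b"), summary);
        System.out.println(summary.getCodes());
    }
}
```

Each window gets its own summary instance from the initializer, which is why the same event can contribute its code to more than one summary.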
The initialisation of this test is pretty much identical to that of the Tumbling window, so we'll try to avoid any duplicate explanations.
The full code for this test can be found on GitHub.
Using the input from the topologyTestDriver we can pump in a number of events with specific timestamps. We provide the key, the value (in this case a link), and the timestamp. We’re using timestamps in milliseconds here, and the test starts from 0.
linkStatusInput.pipeInput("testLink 1", createLink("testLink1", "a"), 0L);
linkStatusInput.pipeInput("testLink 1", createLink("testLink1", "b"), 7L);
linkStatusInput.pipeInput("testLink 1", createLink("testLink1", "c"), 10L);
linkStatusInput.pipeInput("testLink 1", createLink("testLink1", "d"), 15L);
Once the events are input we can assert on the outputs. We are simply confirming which summary events are produced for each window. The concatenation of the event code fields is confirmed in the assertions.
List<KeyValue<String, LinkSummary>> list = hoppingOutput.readKeyValuesToList();
assert list.size() == 2;
// The following assertions demonstrate the window grouping
assert list.get(0).value.getCodes().equals("ab");
assert list.get(1).value.getCodes().equals("bc");
A diagram gives us a clearer view as to how a hopping window works.
The results of the test confirm that 2 LinkSummary aggregations are emitted, and the events contained within the windows are identified via the ‘code’ field. We can see that window w1 contains events A & B, whereas window w2 contains events B & C.
Note that event C, which occurs at t10, is not included in window w1. This is because hopping windows, like tumbling windows, are exclusive of end time.
It’s also worth noting that event B is in both windows w1 & w2. This demonstrates that with hopping windows it is possible for an event to be present in more than 1 window.
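The following plain Java sketch simulates why exactly two summaries come out of the test. It is a simplified model, not Kafka Streams code: the class and method names are hypothetical, it assumes a window size of 10 and an advance of 5, and it assumes a window's result is released once the highest timestamp seen so far reaches the window's end (no grace period).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only: a simplified model of hopping windows with results held until the window closes.
class SuppressSketch {

    static List<String> closedWindowResults(long[] timestamps, String[] codes, long size, long advance) {
        TreeMap<Long, StringBuilder> openWindows = new TreeMap<>(); // window start -> concatenated codes
        List<String> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE; // highest timestamp seen so far

        for (int i = 0; i < timestamps.length; i++) {
            long ts = timestamps[i];
            streamTime = Math.max(streamTime, ts);

            // Add the event to every window [start, start + size) that contains it.
            long first = Math.max(0, ts - size + advance) / advance * advance;
            for (long start = first; start <= ts; start += advance) {
                if (start + size <= streamTime) {
                    continue; // window already closed, so a late event is dropped
                }
                openWindows.computeIfAbsent(start, s -> new StringBuilder()).append(codes[i]);
            }

            // Release every window whose (exclusive) end has been reached, oldest first.
            Iterator<Map.Entry<Long, StringBuilder>> it = openWindows.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, StringBuilder> entry = it.next();
                if (entry.getKey() + size > streamTime) {
                    break; // remaining windows are still open
                }
                emitted.add(entry.getValue().toString());
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] timestamps = {0, 7, 10, 15};
        String[] codes = {"a", "b", "c", "d"};
        System.out.println(closedWindowResults(timestamps, codes, 10, 5));
    }
}
```

In this model, the event at t10 closes the window starting at 0 and the event at t15 closes the window starting at 5. The windows starting at 10 and 15 are still open when the input ends, so they produce no output.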
The hopping window builds on the Tumbling window and adds that extra bit of complexity with the overlapping windows, but still shares the rest of the window behaviours.
We specify advance to determine the window increment.
Overlapping windows mean that an event can appear in multiple windows.
Again, we saw how all this worked via the topologyTestDriver and exercised the logic of the window via unit tests.
Credits: Jumping Businessman (hopping guy in the window) by Gan Khoon Lay from Noun Project (CC BY 3.0)