Introduction
Writing comprehensive tests for a Kafka Streams application is essential, and there are multiple types of test that the developer should consider before the application even reaches QA. These tests, covering unit, local integration, and component testing, are explored and demonstrated in this article.
The accompanying Kafka Streams Spring Boot application source code is available here.
This is one of a series of articles covering different aspects of Kafka Streams. Jump to the 'More On Kafka Streams...' section below to explore.
Test Goals
Tests should always be shifted left where possible, such that they live and run close to the code, following the principles of the Test Pyramid. The largest number of tests should be unit tests: fine-grained tests written before (via Test Driven Development) or during the development of the application classes. Next are local integration tests, which are fewer in number and coarser grained, but ensure that end-to-end flows run as expected. Component tests then run the application in a container, with a smaller number of coarse-grained tests that treat it as a black box, verifying the configuration and deployment locally on top of the end-to-end flows.
As well as being runnable locally on the developer’s machine, these tests are automated in the pipeline. The pipeline will of course fail if code is committed that causes any test to fail. However, because each developer runs these comprehensive tests, which exercise different characteristics of the application, they more often than not get immediate feedback when something has broken, and do not have to wait for the slow feedback loop of a failed pipeline. Further to this, the developer can have high confidence that there should not be deployment or configuration issues, as this aspect is replicated locally in the component tests.
Example Application
As detailed in the Kafka Streams: Spring Boot Demo article, the Spring Boot application that is used here to demonstrate testing techniques simulates receiving payment events from Kafka, processing them and emitting the results to outbound rails topics. KStreams are used to filter, transform, branch and merge the payments. A KTable is used to perform aggregations on the account balances as the payments flow through the application.
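As a rough illustration of such a topology (this is a simplified sketch rather than the demo application's actual code, and the getRails() and getAccount() accessors are assumed for the example), the flow could look something like:
KStream<String, PaymentEvent> payments = streamsBuilder.stream("payment-topic",
        Consumed.with(Serdes.String(), PaymentSerdes.serdes()));

// Filter out invalid payments and branch by rails, routing to the outbound rails topics.
KStream<String, PaymentEvent>[] branches = payments
        .filter((paymentId, payment) -> payment.getAmount() > 0)
        .branch((paymentId, payment) -> "FOO".equals(payment.getRails()),
                (paymentId, payment) -> "BAR".equals(payment.getRails()));
branches[0].to("rails-foo-topic", Produced.with(Serdes.String(), PaymentSerdes.serdes()));
branches[1].to("rails-bar-topic", Produced.with(Serdes.String(), PaymentSerdes.serdes()));

// Aggregate account balances into a KTable backed by the "balance" state store.
payments.groupBy((paymentId, payment) -> payment.getAccount(),
                Grouped.with(Serdes.String(), PaymentSerdes.serdes()))
        .aggregate(() -> 0L,
                (account, payment, balance) -> balance + payment.getAmount(),
                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("balance")
                        .withValueSerde(Serdes.Long()));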
Unit Testing With TopologyTestDriver
Test Overview
The Kafka Streams API provides an org.apache.kafka.streams.TopologyTestDriver to easily exercise the topology. This simulates the library running, ingesting, processing and emitting events continuously. These tests can be developed before or during the development of the Streams application, providing immediate feedback to the developer of any bugs.
Figure 1: Unit Testing
Test Example
Test Setup
The test configures the driver with the payment topology:
@Test
void testPaymentTopology() {
    StreamsBuilder streamsBuilder = new StreamsBuilder();
    paymentTopology.buildPipeline(streamsBuilder);
    Topology topology = streamsBuilder.build();

    Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());

    TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration);
The properties required to simulate the configuration of the real streams application are set here, in this case providing the default serde classes for serialization and deserialization.
The test then provides the driver with the inbound and outbound test topics. This shows the definition of the inbound topic, which is provided with the necessary serializers to use for the keys and values:
TestInputTopic<String, PaymentEvent> inputTopic = topologyTestDriver
        .createInputTopic(PAYMENT_INBOUND_TOPIC, new StringSerializer(), PaymentSerdes.serdes().serializer());
As the inbound event is unmarshalled to a PaymentEvent POJO, a custom serializer is used. The outbound topics are likewise provided with the necessary deserializers.
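For example, the output topic for the FOO rails could be created along the following lines (the topic constant name here is illustrative):
TestOutputTopic<String, PaymentEvent> railsFooOutputTopic = topologyTestDriver
        .createOutputTopic(RAILS_FOO_OUTBOUND_TOPIC, new StringDeserializer(), PaymentSerdes.serdes().deserializer());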
Test Execution
The tests are run as part of the standard maven 'test' phase:
mvn clean test
Test Flow
The payment events are sent into the input topic, and the test then queries the resulting outbound events to assert that they are as expected.
PaymentEvent payment1 = buildPaymentEvent(...);
inputTopic.pipeInput(payment1.getPaymentId(), payment1);
[...]
assertThat(railsFooOutputTopic.readKeyValuesToList(), hasItems(...));
The test also uses the driver to query the balance state store to assert the aggregation using the KTable has worked as expected.
KeyValueStore<String, Long> balanceStore = topologyTestDriver.getKeyValueStore("balance");
assertThat(balanceStore.get(ACCOUNT_GBP_ABC), equalTo(210L));
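As the driver builds up local state (such as the balance store above), it should be closed once each test completes. Assuming the driver is held as a field on the test class, a minimal teardown could be:
@AfterEach
void tearDown() {
    // Releases the driver's state stores and local state directory.
    topologyTestDriver.close();
}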
The full topology test is here.
Integration Testing With SpringBootTest
Test Overview
While the TopologyTestDriver should be the main port of call for testing the topology, a Spring Boot integration test can be used to verify that events sent to Kafka are consumed and processed successfully by the application, and that the expected outbound events are written to Kafka. This proves that the Spring components are correctly wired based on the application configuration, and that the application is able to connect to and interact with a running Kafka instance.
The spring-kafka-test module provides an in-memory broker to use in these tests, org.springframework.kafka.test.EmbeddedKafkaBroker. This is highly configurable in line with a real Kafka broker instance, allowing many different scenarios to be tested.
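For illustration, the partition count and broker-level properties can be overridden via the annotation; the values below are arbitrary and not taken from the demo application:
@EmbeddedKafka(partitions = 3,
        brokerProperties = { "log.segment.bytes=1048576", "transaction.state.log.replication.factor=1" },
        topics = { "payment-topic", "rails-foo-topic", "rails-bar-topic" })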
Figure 2: Integration Testing
Test Example
Test Setup
EmbeddedBroker
To enable the embedded broker, simply annotate the Spring Boot test with the @EmbeddedKafka annotation:
@Slf4j
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = { KafkaStreamsDemoConfiguration.class } )
@EmbeddedKafka(controlledShutdown = true, topics = { "payment-topic", "rails-foo-topic", "rails-bar-topic" })
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@ActiveProfiles("test")
public class KafkaStreamsPaymentIntegrationTest {
As can be seen, the topics used by the test are defined as an annotation parameter. The topics must be created upfront for Kafka Streams: unlike with the consumer/producer Kafka APIs, topics are never automatically created for a Streams application, even when the broker is configured with auto.create.topics.enable: true (which is the default). If they are not present the application throws the following error:
MissingSourceTopicException: One or more source topics were missing during rebalance
This is a good example of a potential issue that a Spring Boot integration test would uncover. The value in this kind of test is that it exercises the actual broker behaviour; this configuration issue would not be uncovered by the TopologyTestDriver unit test. The integration test is still close to the code, so the developer feedback is quick.
When the test is executed the embedded broker starts. To ensure it is ready before the individual tests get underway, the setUp() method waits for the topic partitions to be assigned:
registry.getListenerContainers().stream().forEach(container ->
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic()));
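A minimal sketch of the surrounding setUp() method, assuming the KafkaListenerEndpointRegistry and EmbeddedKafkaBroker are autowired into the test, could be:
@BeforeEach
public void setUp() {
    // Block until every listener container has been assigned its topic partitions.
    registry.getListenerContainers().forEach(container ->
            ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic()));
}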
Test Consumers
Kafka Consumers are defined to simulate the bank rails applications that would be receiving the payment events published to the rails topics. For example, this consumer receives payments sent to the FOO rails topic:
public static class KafkaFooRailsListener {

    AtomicInteger counter = new AtomicInteger(0);
    AtomicLong total = new AtomicLong(0);

    @KafkaListener(groupId = "KafkaStreamsIntegrationTest", topics = "rails-foo-topic", autoStartup = "true")
    void receive(@Payload final String payload, @Headers final MessageHeaders headers) {
        log.debug("KafkaFooRailsListener - Received message: " + payload);
        PaymentEvent payment = MarshallingUtil.readValue(payload, PaymentEvent.class);
        total.addAndGet(payment.getAmount());
        counter.incrementAndGet();
    }
}
The consumers track the number of events received and their totals, so they can be asserted in the test.
Test Execution
The tests are run via the maven-surefire-plugin defined in the pom, as part of the 'test' phase:
mvn clean test
Test Flow
The test sends a number of payment events to the inbound payment-topic:
PaymentEvent payment1 = buildPaymentEvent(...);
sendMessage(PAYMENT_TEST_TOPIC, payment1);
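The sendMessage() helper is not shown here; a minimal sketch, assuming the test autowires a KafkaTemplate<String, String> and uses a Jackson ObjectMapper to marshal the payment to JSON, could be:
private void sendMessage(String topic, PaymentEvent payment) throws Exception {
    // Key the record by payment id so that related events land on the same partition.
    final String payload = objectMapper.writeValueAsString(payment);
    kafkaTemplate.send(topic, payment.getPaymentId(), payload).get();
}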
The Awaitility library is used to check and wait as necessary until the expected number of events have been received by the consumers.
Awaitility.await().atMost(10, TimeUnit.SECONDS).pollDelay(100, TimeUnit.MILLISECONDS)
.until(fooRailsReceiver.counter::get, equalTo(3));
Then the payment amount totals that have been tracked in the test consumers can be asserted.
assertThat(fooRailsReceiver.total.get(), equalTo(210L));
The application exposes a REST endpoint that allows a client to query for the balance of a given account. The test calls this endpoint to assert that the aggregated balances are correct.
ResponseEntity<String> responseAbc = restTemplate.getForEntity("/v1/kafka-streams/balance/"+ACCOUNT_GBP_ABC, String.class);
assertThat(responseAbc.getStatusCode(), equalTo(HttpStatus.OK));
assertThat(responseAbc.getBody(), equalTo("210")); // Payments: 100 + 60 + 50.
The full integration test is here.
Component Testing With TestContainers
Test Overview
Component tests are the coarsest grained tests in the developer’s toolset demonstrated here. They treat the application as a black box, similar to the integration test, but typically run against real components such as databases and brokers, with mocks and simulators standing in for third party services. The accompanying application demonstrates running one (or optionally more) instances of the application in a Docker container along with a Dockerised Kafka/Zookeeper instance. The TestContainers library is used to spin up and manage the Docker containers, and on completion of the test run it takes care of cleaning them up.
Figure 3: Component Testing
An important element of these tests is that they prove the deployment works, and everything wires up correctly to the external resources. Using Docker containers on the developer’s machine means this all happens close to the code. There is no need to wait until the application is deployed into a QA or other remote environment before discovering these kinds of issues.
Test Example
Test Setup
A Dockerfile is used to build the Docker image for the application:
FROM openjdk:11.0.10-slim
ARG JAR_FILE=target/*.jar
COPY ${JAR_FILE} app.jar
ENTRYPOINT ["sh", "-c", "java ${JAVA_OPTS} -jar /app.jar"]
Lydtech’s open source library component-test-framework is responsible for starting and managing the TestContainers instances. Usage of the library is detailed in the library’s README. The dependency is declared in the kafka-streams pom:
<dependency>
    <groupId>dev.lydtech</groupId>
    <artifactId>component-test-framework</artifactId>
    <version>1.3.0</version>
    <scope>test</scope>
</dependency>
The component test is annotated with the following annotation from this library:
@ExtendWith(TestContainersSetupExtension.class)
public class KafkaStreamsCT {
The TestContainersSetupExtension is the JUnit5 extension that is used to provide a hook into the component test framework to bring up the Docker containers before the tests themselves run.
Test Execution
The tests are run via the maven-surefire-plugin defined in the 'component' profile of the pom:
mvn test -Pcomponent
Test Flow
Similar to the integration test, the component test produces payment events to the inbound payment-topic, which this time resides in the Dockerised Kafka broker.
KafkaClient.getInstance().sendMessage(PAYMENT_TOPIC, payment.getPaymentId(), payment);
This topic is subscribed to by the application consumer, and the events are consumed and processed by the Kafka Streams flow.
The test then uses Kafka consumers to consume the events emitted to the outbound topics by the application and asserts for correctness.
fooRailsConsumer = KafkaClient.getInstance().createConsumer(RAILS_FOO_TOPIC);
KafkaClient.getInstance().consumeAndAssert("MultiplePayments", fooRailsConsumer, fooRailsCount.get(), 3);
The tests use the RestAssured HTTP client library to call the REST endpoint on the Dockerised application to query the account balances.
get("/v1/kafka-streams/balance/"+ACCOUNT_XXX).then().assertThat()
.statusCode(200)
.and()
.body(equalTo("100"));
One further facet of the component tests is the ability to test against multiple instances of the application. This can uncover different or unexpected behaviour and bugs that would not be found with the unit or integration tests. The component-test-framework is configured with the number of application instances to start, and again TestContainers takes care of starting and managing these instances.
If two instances of the application are started, the component test uncovers the fact that using the REST endpoint to query the account balances is flawed. The endpoint only returns the balance captured in the state store of the instance servicing the REST request, which may or may not hold the correct balance, depending on whether it was that instance’s consumer that consumed and processed the payments for the account being queried.
The full component test is here.
Conclusion
The combination of developer tests at the different levels of the test pyramid ensures that a Kafka Streams application is fully tested and behaving as it should before it reaches QA. The goal of these tests is to achieve quality, along with fast feedback to the developer, by writing the tests as close to the code as possible.
The Kafka Streams test library enables unit tests to be written that assert the topology works as it should. The Spring Boot test module, together with the embedded Kafka broker provided by the Spring Kafka test module, enables local integration tests that load the application context and prove the stream processing can be triggered by events sent to and from Kafka. TestContainers spins up the application and Kafka in Docker containers, enabling the application to be tested as a black box, proving both the end-to-end flows and that the deployment is correct locally.
Source Code
The source code for the accompanying Kafka Streams application is available here:
https://github.com/lydtechconsulting/kafka-streams/tree/v1.3.0
More On Kafka Streams...
The Kafka Streams: Introduction article delves into the API and covers its benefits and characteristics.
The Kafka Streams: Spring Boot Demo article details the application that is under test.
The Kafka Streams: State Store article looks at the role and usage of the state store in a stateful Kafka Streams application.
The Kafka Streams: Transactions & Exactly-Once Messaging article detailing how to achieve exactly-once messaging using Kafka Transactions with a Kafka Streams application.
View this article on our Medium Publication.