Micronaut (6 of 6): Kafka Integration


Introduction

In this final article in the series the demo Micronaut application is adapted to showcase integrating with a Kafka messaging broker. The Micronaut-Kafka add-on module is used to provide the integration between the Micronaut application and Kafka. The unit, integration and component tests are updated to test the application flows and its integration with the broker.

The companion application, written in both Kotlin and Java, has been adapted to demonstrate the integration with Kafka. The source code is available here: [Kotlin version | Java version].

This is the final article in a six part series on the Micronaut framework.

  1. Framework Features
  2. Demo Application
  3. Testing
  4. Native Builds
  5. Postgres Integration
  6. Kafka Integration (this article)

Demo Application

The demo application consumes inbound messages from the Kafka broker, from the demo-inbound topic. It processes each message, and emits a corresponding message to the demo-outbound topic.

Figure 1: Consuming and producing messages with Kafka

Kafka Client Library

To enable the application to talk to Kafka, Micronaut's micronaut-kafka client library is used. This enhances the Micronaut framework with Kafka integration capabilities. Developers can create Kafka consumers and producers with minimal configuration and boilerplate code. Annotations are used to define the consumers and producers, making it straightforward to send messages to, and receive messages from, Kafka.

High-level abstractions handle the complexities around message processing. Error handling strategies and retry mechanisms are provided that can be configured and customised. Micronaut's configuration management provides seamless configuration integration, so Kafka clients are straightforward to configure: key/value serializers and deserializers, timeouts, security settings, and much more can all be defined through application properties.
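
For example, an error handling strategy can be declared directly on the listener annotation. The following is a minimal sketch of a listener that retries a failed record a number of times before moving on; it is not part of the demo application, and the exact options available depend on the micronaut-kafka version in use:

import io.micronaut.configuration.kafka.annotation.ErrorStrategy
import io.micronaut.configuration.kafka.annotation.ErrorStrategyValue
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic

// Sketch: retry a failed record up to three times before skipping past it.
@KafkaListener(
    groupId = "retrying-group-id",
    offsetReset = OffsetReset.EARLIEST,
    errorStrategy = ErrorStrategy(value = ErrorStrategyValue.RETRY_ON_ERROR, retryCount = 3)
)
class RetryingConsumer {

    @Topic("demo-inbound-topic")
    fun receive(message: String) {
        // Business logic goes here; an exception thrown from this function
        // triggers the configured retry behaviour.
    }
}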

Asynchronous message processing is supported, aligning with Micronaut's reactive architecture. It supports Kafka transactions, so the production and consumption of messages can be guaranteed to be atomic. It integrates with Micronaut's metrics capabilities to provide monitoring of the Kafka clients, surfacing metrics like message throughput and consumer lag.

Application Changes

Overview

The application is now in its third incarnation, following the previous demonstrations of Micronaut's support for REST in the second article in the series, Micronaut: Demo Application, and integration with Postgres in the fifth article, Micronaut: Postgres Integration. To focus on Micronaut's Kafka integration capabilities, the REST API and Postgres integration have been removed, and in their place the application now consumes messages from, and produces messages to, Kafka.

Consumer

A consumer, DemoConsumer [Kotlin | Java], has been introduced that is responsible for handling the messages received from Kafka. The micronaut-kafka library does the heavy lifting in terms of polling Kafka for batches of messages, and it hands these off to this handler. The class is annotated with @KafkaListener, and a method (Java) or function (Kotlin) is declared that is annotated with @Topic, specifying which topic it should consume from. The method/function takes the message as an argument, and in this case delegates to a service class for the actual message processing. This is the Kotlin version of the consumer:

@Singleton
@KafkaListener(groupId = "demo-group-id")
class DemoConsumer(private val demoService: DemoService) {
   companion object {
       private val log = LoggerFactory.getLogger(this::class.java)
   }

   @Topic("demo-inbound-topic")
   fun receive(demoInboundEvent: DemoInboundEvent) {
       log.info("Received message with id: " + demoInboundEvent.id)
       try {
           demoService.process(demoInboundEvent)
       } catch (e: Exception) {
           log.error("Error processing message: " + e.message)
       }
   }
}
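
The DemoInboundEvent consumed above, and the DemoOutboundEvent emitted later, are simple event payloads carrying an id and a data field. A minimal sketch of how they could be defined is shown below, assuming Micronaut Serde is used for JSON (de)serialisation; see the repositories for the actual definitions:

import io.micronaut.serde.annotation.Serdeable

// Sketch of the event payloads, annotated so that Micronaut Serde can
// serialise and deserialise them as JSON on the Kafka topics.
@Serdeable
data class DemoInboundEvent(val id: String, val data: String)

@Serdeable
data class DemoOutboundEvent(val id: String, val data: String)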

Service

The service, DemoService [Kotlin | Java], creates an outbound event for each inbound event it is passed. It then passes this to the producer, which emits it to Kafka.

fun process(demoInboundEvent: DemoInboundEvent) {
   val demoOutboundEvent = DemoOutboundEvent(randomUUID().toString(), "Processed data: " + demoInboundEvent.data)
   demoProducer.sendOutbound(demoOutboundEvent)
   log.info("Sent outbound event for consumed event with id: {}", demoInboundEvent.id)
}

Producer

The producer, DemoProducer [Kotlin | Java], is defined as an interface, and only the function/method is declared. It is annotated with @KafkaClient to tell Micronaut that this interface should be treated as a Kafka client, and the functions/methods are intended to produce messages to Kafka topics. Micronaut handles the configuration and instantiation of the Kafka producer based on this annotation.

@KafkaClient
interface DemoProducer {

   @Topic("demo-outbound-topic")
   fun sendOutbound(demoOutboundEvent: DemoOutboundEvent)
}

The @Topic annotation specifies the topic to which the messages will be sent. The function/method itself takes a parameter of type DemoOutboundEvent which Micronaut will serialise and send to the topic. No further coding is required by the developer, as Micronaut handles this.
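
The demo producer sends unkeyed records, but micronaut-kafka also allows a record key to be supplied by adding a parameter annotated with @KafkaKey. A minimal sketch of what a keyed variant could look like (not part of the demo application):

import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic

// Sketch only: the @KafkaKey parameter becomes the Kafka record key, which
// determines the partition the record is written to.
@KafkaClient
interface KeyedDemoProducer {

    @Topic("demo-outbound-topic")
    fun sendOutbound(@KafkaKey key: String, demoOutboundEvent: DemoOutboundEvent)
}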

Kafka Configuration

The application properties [Kotlin | Java] are updated with the configuration for the bootstrap servers. This is all that is required for the Kafka consumer and producer to connect to the Kafka broker. The micronaut-kafka library then takes care of creating the connection.

kafka:
   bootstrap:
       servers: localhost:9092

Further configurations can be added here, which micronaut-kafka will apply.
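
For example, micronaut-kafka passes properties defined under kafka.producers.default and kafka.consumers.default through to the underlying producer and consumer clients. The values below are purely illustrative assumptions rather than settings used by the demo application:

kafka:
    bootstrap:
        servers: localhost:9092
    producers:
        default:
            retries: 3
    consumers:
        default:
            session:
                timeout:
                    ms: 30000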

Test Changes

Overview

The unit, integration, and component tests for the original demo Micronaut application are covered in detail in the third article in the series, Micronaut: Testing. This section covers the tests that are required now that the application integrates with Kafka.

Unit Tests

The DemoConsumerTest [Kotlin | Java] tests the DemoConsumer [Kotlin | Java] to ensure that it handles the message it consumes and delegates processing to the service. The Kotlin version uses the mockk mocking library, and the Java version uses Mockito to mock the service.

class DemoConsumerTest {
   private val serviceMock = mockk<DemoService>()
   private lateinit var consumer: DemoConsumer

   @BeforeEach
   fun setUp() {
       clearAllMocks()
       consumer = DemoConsumer(serviceMock)
   }

The receive function/method is then tested, which verifies that the service mock is called with the consumed event:

@Test
fun testReceive() {
   val demoInboundEvent = DemoInboundEvent("1", "Test Data")
   every { serviceMock.process(any()) } just Runs
   consumer.receive(demoInboundEvent)
   verify(exactly = 1) { serviceMock.process(demoInboundEvent) }
}

The service class is similarly tested in DemoServiceTest [Kotlin | Java] to ensure the producer is called (using a mock) to emit an outbound event.
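
A minimal sketch of what that test could look like with mockk is shown below; it assumes the producer is constructor-injected into the service, and the actual test in the repository may differ in detail:

import io.mockk.mockk
import io.mockk.verify
import org.junit.jupiter.api.Test

class DemoServiceTest {
    // Relaxed mock so the Unit-returning sendOutbound call needs no stubbing.
    private val producerMock = mockk<DemoProducer>(relaxed = true)
    private val service = DemoService(producerMock)

    @Test
    fun testProcess() {
        val demoInboundEvent = DemoInboundEvent("1", "Test Data")

        service.process(demoInboundEvent)

        // Exactly one outbound event should be emitted via the producer.
        verify(exactly = 1) { producerMock.sendOutbound(any()) }
    }
}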

The DemoProducer [Kotlin | Java] is an interface, so the DemoProducerTest [Kotlin | Java] can only verify that the function/method is present and callable:

@Test
fun testSendOutbound() {
   val demoProducer: DemoProducer = object : DemoProducer {
       override fun sendOutbound(demoOutboundEvent: DemoOutboundEvent) {}
   }
   val demoOutboundEvent = DemoOutboundEvent("1", "Test Data")
   demoProducer.sendOutbound(demoOutboundEvent)
}

Integration Tests

The integration tests are responsible for verifying that the application code correctly integrates with the Kafka broker, so a Kafka instance is required. It is important that these tests run quickly, are repeatable, and can equally run locally or in a CI pipeline, which makes an in-memory Kafka instance that starts up quickly ideal. However, support for an embedded Kafka broker was removed in version 4.0.0 of micronaut-kafka, in favour of using Testcontainers. Testcontainers can be used to spin up an actual instance of Kafka in a Docker container. This is a more heavyweight approach than using an in-memory broker, and is in fact the strategy used for component testing, which is covered in the next section.

A third party library that provides an in-memory broker has therefore been selected. Two strong candidates are embedded-kafka and kafka-junit. Both libraries are mature with a strong user base, good documentation, and are straightforward to use. The embedded-kafka library is implemented in Scala and is a particularly good fit for Scala based applications, whereas kafka-junit may be considered the better choice for a Kotlin or Java Micronaut application in terms of ease of integration. The kafka-junit library is therefore used in this demo.
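
For illustration, adding kafka-junit to the test classpath in the Gradle Kotlin DSL looks along the lines of the following; the version shown is an assumption, so check the repository's build file for the version actually used:

// build.gradle.kts: kafka-junit provides the in-memory broker for the integration tests.
dependencies {
    testImplementation("net.mguenther.kafka:kafka-junit:3.6.0")
}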

The integration test EndToEndIntegrationTest [Kotlin | Java], annotated with @MicronautTest to ensure the application context is started at the outset of the test, begins by starting up the embedded broker:

@MicronautTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class EndToEndIntegrationTest : TestPropertyProvider {

   companion object {
       private const val DEMO_INBOUND_TEST_TOPIC = "demo-inbound-topic"
       private const val DEMO_OUTBOUND_TEST_TOPIC = "demo-outbound-topic"
       private var kafka: EmbeddedKafkaCluster

       init {
           kafka = EmbeddedKafkaCluster.provisionWith(EmbeddedKafkaClusterConfig.defaultClusterConfig())
           kafka.start()
       }
   }

   @NonNull
   override fun getProperties(): Map<String, String> {
       return mapOf(
           "kafka.bootstrap.servers" to kafka.brokerList
       )
   }

   @BeforeAll
   fun setupOnce() {
       KafkaTestUtils.initialize(kafka.brokerList).waitForApplicationConsumer(DEMO_INBOUND_TEST_TOPIC)
   }

The test implements Micronaut Test's TestPropertyProvider, which sets a property named kafka.bootstrap.servers to the bootstrap URL of the in-memory broker. The application-test.yml [Kotlin | Java] then references this value, which is only known once the in-memory broker has started, enabling the application consumer and producer to connect to it during the test execution.

kafka:
 bootstrap:
   servers: ${kafka.bootstrap.servers}

A utility class KafkaTestUtils [Kotlin | Java] is provided that defines a function that waits until the application consumer has subscribed to the inbound topic. This ensures that the test does not start sending messages to Kafka before the application consumer is ready to consume them, which would risk those messages being missed.
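
A minimal sketch of how such a wait could be implemented is shown below, using the Kafka AdminClient with the Awaitility library to poll until the application's consumer group has a member assigned to the inbound topic. The function shape, the use of Awaitility, and the default group id are assumptions for illustration; the utility in the repository may be implemented differently:

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.awaitility.Awaitility.await
import java.util.concurrent.TimeUnit

// Sketch: block until the consumer group has at least one member with a
// partition assignment for the given topic, so that test messages are not
// sent before the application consumer is ready to receive them.
fun waitForApplicationConsumer(brokerList: String, topic: String, groupId: String = "demo-group-id") {
    AdminClient.create(mapOf<String, Any>(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to brokerList)).use { admin ->
        await().atMost(30, TimeUnit.SECONDS)
            .ignoreExceptions() // the group may not exist until the application consumer joins
            .until {
                val group = admin.describeConsumerGroups(listOf(groupId))
                    .describedGroups()
                    .getValue(groupId)
                    .get()
                group.members().any { member ->
                    member.assignment().topicPartitions().any { it.topic() == topic }
                }
            }
    }
}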

The integration test can now send a number of events to the embedded broker using the send function on the EmbeddedKafkaCluster class. It then asserts that the application emits a corresponding event for each by consuming from the outbound topic using the observe function.

@Test
fun testSuccess() {
   val totalMessages = 5
   for (i in 0 until totalMessages) {
       val id = UUID.randomUUID().toString()
       val payload = "payload-" + UUID.randomUUID()
       val event = JsonMapper.writeToJson(DemoInboundEvent(id, payload))
       kafka.send(SendValues.to(DEMO_INBOUND_TEST_TOPIC, event))
   }
   kafka.observe(ObserveKeyValues.on(DEMO_OUTBOUND_TEST_TOPIC, totalMessages))
}

Component Tests

As with the integration tests, the component tests treat the system as a black box, but instead of an in-memory broker they use the Component Test Framework with the Testcontainers library to spin up an actual instance of Kafka in a Docker container. The Micronaut application itself is also spun up in a Docker container, and the component test sends events to Kafka for the application to consume, and consumes the events from the topic that the application produces to.

Figure 2: Component testing a Micronaut application with Kafka

As detailed in the third article in the series, Micronaut: Testing, the component test EndToEndCT [Kotlin | Java] is annotated with @ExtendWith(ComponentTestExtension::class), which hooks into the Component Test Framework and spins up the required Docker containers. It also uses this framework to initialise the Kafka client that is used to receive events.

@Slf4j
@ExtendWith(ComponentTestExtension::class)
class EndToEndCT {
   private lateinit var consumer: Consumer<String, String>

   @BeforeEach
   fun setup() {
       consumer = KafkaClient.getInstance().initConsumer("EndToEndCT", "demo-outbound-topic", 3L)
   }

This is all that is required for the test set up. The test itself sends events to the demo-inbound topic, then consumes and verifies the corresponding events from the demo-outbound topic.

@Test
fun testFlow() {
   val totalMessages = 100
   for (i in 0 until totalMessages) {
       val id = UUID.randomUUID().toString()
       KafkaClient.getInstance().sendMessage("demo-inbound-topic", null, JsonMapper.writeToJson(DemoInboundEvent(id, "Test data")))
   }
   val outboundEvents = KafkaClient.getInstance().consumeAndAssert<String>("testFlow", consumer, totalMessages, 3)
   outboundEvents.stream().forEach { outboundEvent: ConsumerRecord<String, String> ->
       assertThat(outboundEvent.value(), containsString("Test data"))
   }
}

The gradle.properties [Kotlin | Java] defines any configuration overrides required for the Component Test Framework. In this case it enables Kafka, so that the framework spins up Kafka in a Docker container at the outset of the test run:

systemProp.kafka.enabled=true

The application-test-component.yml [Kotlin | Java] defines the configuration used by the application during the component test run. In this case the bootstrap server URL is set to that of the Dockerised Kafka instance:

---
kafka:
   bootstrap:
       servers: kafka:9092

To run the component tests, the application and Docker image are first built, either as a standard application jar:

./gradlew clean build
docker build -t ct/micronaut-kafka-kotlin:latest .

Or as a native executable:

./gradlew clean nativeCompile
./gradlew dockerBuildNative

And the component tests are then run via the command:

./gradlew componentTest --rerun-tasks

The component tests prove that a real running instance of the application successfully integrates with a real instance of the Kafka broker. This same test can be run both locally by the developer as they develop the code, and automated in the CI/CD pipeline to prove that future changes do not break this flow.

Running The Application

A docker-compose.yml file [Kotlin | Java] is provided that spins up Kafka and Zookeeper instances in Docker. This is run with the following command:

docker-compose up -d

The Micronaut application is then built and run, as previously described in Micronaut: Demo Application for the standard application and Micronaut: Native Builds for the native build of the application. For example, to run the native build:

./gradlew nativeCompile
./gradlew nativeRun

An event can now be sent to the Kafka broker, to the inbound topic of the application. The kafka-console-producer that is available in the Kafka Docker container can be used to achieve this with the following command:

docker exec -ti kafka kafka-console-producer \
--topic demo-inbound-topic \
--broker-list kafka:29092

Now enter the JSON event to send (which needs to match the expected schema):

{"id": "2c48eabf-35b0-4f43-8ff4-d123471ecdfb", "data": "my-data"}

This is consumed by the Micronaut application, which emits a corresponding event to the outbound topic. This can then be consumed using the kafka-console-consumer tool with the following command:

docker exec -ti kafka kafka-console-consumer \
--topic demo-outbound-topic \
--bootstrap-server kafka:29092 \
--from-beginning

The output shows the outbound event:

{"id":"368df778-473b-461c-9cc0-943f18c7b07e","data":"Processed data: my-data"}

Summary

The Micronaut framework with the Kafka module provides excellent support for integrating with Kafka. By applying annotations, the developer can easily build the necessary consumers and producers with little boilerplate, concentrating their time instead on writing the business logic. The application can be built and executed as either a standard or native build, with the choice coming down to the best fit for the environment it will be deployed in. By utilising an in-memory Kafka broker for integration testing and the Component Test Framework with Testcontainers for component testing, a comprehensive suite of tests can be built to verify that the application integrates correctly with Kafka.

Source Code

There are two flavours of the Micronaut application, one in Kotlin and one in Java, and the source code is available here:

Kotlin version: https://github.com/lydtechconsulting/micronaut-kafka-kotlin/tree/v1.0.0
Java version: https://github.com/lydtechconsulting/micronaut-kafka-java/tree/v1.0.0

