Kafka Transactions: Part 2 - Spring Boot Demo


Introduction

In the first part of this article, the use of Kafka Transactions to achieve exactly-once messaging semantics was explored. In this second part, that behaviour is demonstrated via a Spring Boot application.

Demo Application

The accompanying Spring Boot application demonstrates Kafka's exactly-once message processing. An integration test uses a test Kafka Producer to send messages, via the embedded in-memory Kafka broker, that are consumed by the application. Test consumers then poll for the resulting messages that the application produces to the outbound topics, both with and without Kafka transactions.

The integration test class source is located here.

Test Flow

The following diagram shows the components in place for the Spring Boot Demo application and integration tests.

Figure 6: Spring Boot demo application and integration test components

The application consumes a message and performs three resulting actions:

Step 1: Emits an outbound message to Topic 1.

Step 2: Makes a REST call to a third party service.

Step 3: Emits an outbound message to Topic 2.

As described in the Wiremock section below, the call to the REST service that is sandwiched between the two outbound writes allows the tests to demonstrate failure scenario behaviour.

Application Kafka Configuration

These actions take place within a Kafka transaction. To enable this, the application Producer is configured with a transactionalId, and idempotence is enabled:

config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-id");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
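
For illustration, the transactional ProducerFactory that backs this Producer might be declared along the following lines. This is a minimal sketch, and the bean name and property values shown are assumptions rather than the demo's actual source:

@Bean
public ProducerFactory<String, String> producerFactoryTransactional() {
   Map<String, Object> config = new HashMap<>();
   config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
   config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
   // Transaction settings as above.
   config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-id");
   config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
   return new DefaultKafkaProducerFactory<>(config);
}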

The KafkaTransactionManager is wired into the Spring context:

@Bean
public KafkaTransactionManager<String, String> kafkaTransactionManager(final ProducerFactory<String, String> producerFactoryTransactional) {
   return new KafkaTransactionManager<>(producerFactoryTransactional);
}

The full Kafka configuration source for the application is here.

The service method is annotated @Transactional:

@Transactional
public void processWithTransaction(String key, DemoInboundEvent event) {
   kafkaClient.sendMessageWithTransaction(key, event.getData(), properties.getOutboundTopic1());
   callThirdparty(key);
   kafkaClient.sendMessageWithTransaction(key, event.getData(), properties.getOutboundTopic2());
}
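
The KafkaClient itself is not shown here; a plausible sketch is a thin wrapper over a KafkaTemplate backed by the transactional ProducerFactory (the method and field names are illustrative, not the demo's actual source):

public void sendMessageWithTransaction(String key, String data, String topic) {
   // Runs within the transaction begun for the @Transactional service method,
   // as the template's Producer comes from the transactional ProducerFactory.
   kafkaTemplate.send(topic, key, data);
}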

The consumer is configured to disable auto commit of consumer offsets, as it is the Producer's responsibility to send the consumer offsets to the Consumer Coordinator for committing as part of the transaction:

config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
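
For illustration, the listener container factory used by the consumer might be wired with the KafkaTransactionManager so that the consumed offsets are committed as part of the transaction. This is a sketch under those assumptions; the exact wiring in the demo source may differ:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
      final ConsumerFactory<String, String> consumerFactory,
      final KafkaTransactionManager<String, String> kafkaTransactionManager) {
   final ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
   factory.setConsumerFactory(consumerFactory);
   // The container sends the consumer offsets to the transaction rather than committing them itself.
   factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
   return factory;
}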

The consumer listener method itself is annotated with @KafkaListener, and told which topic to listen to. In this case the topic is named demo-transactional-inbound-topic to clearly indicate this topic as the source for the flow that will use Kafka transactions:

@KafkaListener(topics = "demo-transactional-inbound-topic",
	groupId = "kafkaConsumerGroup",
	containerFactory = "kafkaListenerContainerFactory")
public void listen(@Header(KafkaClient.EVENT_ID_HEADER_KEY) String eventId,
	@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
	@Payload final String payload) {

To contrast the behaviour between transactional and non-transactional flows, the service also has a consumer that consumes and processes a message, producing outbound messages using a non-transactional Producer. This consumer listens to the inbound topic demo-non-transactional-inbound-topic, again to clearly indicate that processing messages from this topic will not utilise a transaction.
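
The Producer for this non-transactional flow simply omits the transaction related settings; an illustrative snippet (not the demo's actual configuration) might be:

// No TRANSACTIONAL_ID_CONFIG is set, so this Producer cannot begin transactions.
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);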

Wiremock

The REST call at Step 2 is of course not included in the atomic transaction, but it is a useful way of forcing a retryable exception via Wiremock stubbing. On the first call to the Wiremock, following the first outbound event write to Topic 1, it returns a 500 Internal Server Error. Subsequent calls to the Wiremock are stubbed to return a 200 Success.

stubWiremock("/api/kafkatransactionsdemo/" + key, 500, "Unavailable", "failOnce", STARTED, "succeedNextTime");
stubWiremock("/api/kafkatransactionsdemo/" + key, 200, "success", "failOnce", "succeedNextTime", "succeedNextTime");
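
The stubWiremock helper is not shown here; it might wrap WireMock's scenario support roughly as follows (an illustrative sketch, assuming the parameter order used in the calls above):

private void stubWiremock(String url, int status, String body, String scenario, String initialState, String nextState) {
   stubFor(get(urlEqualTo(url))
      .inScenario(scenario)
      .whenScenarioStateIs(initialState)
      .willReturn(aResponse().withStatus(status).withBody(body))
      .willSetStateTo(nextState));
}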

This 500 exception percolates up and the poll ends unsuccessfully. This means the same event is re-consumed on the next poll. The first event is re-emitted to Topic 1, and this time the REST call returns successfully with a 200, and then the second event is emitted to Topic 2. The consumer offsets, sent by the Producer to the Consumer Coordinator, are written to the consumer offsets topic, marking the original event as successfully consumed.

The transaction completes, with Spring taking care of the boilerplate code, resulting in the Transaction Coordinator writing the commit markers to the two topics along with the consumer offsets topic. Processing completes, and the messages are committed to the topics.

Spring Boot Test Configuration

The Spring Boot integration test uses an embedded (in-memory) Kafka broker. The broker is configured via the following Spring Kafka annotation in the Spring Boot test, ensuring that three broker instances are created to cater for the required replication factor of 3, providing resilience for the transaction state topic.

@EmbeddedKafka(controlledShutdown = true, count=3,
    topics = { "demo-transactional-inbound-topic", "demo-non-transactional-inbound-topic" })

The test then defines its own Kafka configuration to ensure it does not use that configured for the application itself. The test has four downstream consumers configured, two for each of the two outbound topics. (Different consumer groups are used for the consumers on each topic, to ensure both consume any messages written.) One of each pair is configured with isolation.level READ_COMMITTED:

config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));

...and the other READ_UNCOMMITTED:

config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT));
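
For illustration, the two flavours of test consumer might be created by a shared helper that differs only in the isolation level passed in (a sketch; the names used here are assumptions):

private KafkaConsumer<String, String> createTestConsumer(String groupId, IsolationLevel isolationLevel) {
   Map<String, Object> config = new HashMap<>();
   config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString());
   config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
   config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
   config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
   config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel.toString().toLowerCase(Locale.ROOT));
   return new KafkaConsumer<>(config);
}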

With this test setup in place, a message is produced to the application inbound topic (as shown in the diagram above), which the application consumes and processes. The tests then assert on the counts of the number of events received by each downstream consumer.
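
Those assertions might look something like the following, using Awaitility to allow for the asynchronous processing (receivedMessageCount() is a hypothetical helper on the test consumer, not the demo's actual API):

// The transaction-aware consumer should see the Topic 1 message exactly once.
Awaitility.await().atMost(10, TimeUnit.SECONDS)
   .until(() -> readCommittedTopic1Consumer.receivedMessageCount() == 1);

// The READ_UNCOMMITTED consumer also sees the aborted first write, so it receives two messages.
Awaitility.await().atMost(10, TimeUnit.SECONDS)
   .until(() -> readUncommittedTopic1Consumer.receivedMessageCount() == 2);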

Results

The READ_COMMITTED downstream consumer for Topic 1 receives the message once. The READ_UNCOMMITTED consumer for Topic 1 receives two messages, reflecting the fact that the message was originally written to Topic 1 (but not committed) before the processing failed with the retryable exception.

The two consumers for Topic 2 both receive the single message, as in this test flow the message was only written to that topic once, following the retry after the failure, and only then was it successfully committed.

So the READ_COMMITTED downstream consumer for Topic 1 demonstrates Kafka's exactly-once messaging semantics. Although a message within the consume-process-produce flow was processed twice, it has only been consumed once by the downstream transaction-aware consumer.

The REST call to the third party service also highlights that although the exactly-once delivery to the downstream consumer can be guaranteed, any further actions occurring during the processing of the original message may happen more than once. The ramifications of those duplicate calls (and writes in the case of a database) will need their own consideration.

Source Code

The source code for the accompanying demo application is available here:

https://github.com/lydtechconsulting/kafka-transactions

Previous...

For the detailed exploration of how Kafka Transactions work to achieve exactly-once messaging, see the first part of this article.

