Kafka Streams: Spring Boot Demo

Introduction

The Kafka Streams: Introduction article provided an introduction to the Kafka Streams API and its architecture, benefits and usage. This article details the accompanying Spring Boot application that demonstrates stateless and stateful message processing using the API.

This is one of a series of articles covering different aspects of Kafka Streams. Jump to the 'More On Kafka Streams...' section below to explore.

Spring Boot Application

For this demo a Spring Boot application has been created that simulates receiving payment events from Kafka and processes these payments, including currency conversion, using stateless KStream processors. The application tracks account balances by aggregating the payment amounts using a stateful KTable processor, with RocksDB as the state store. The payments are then emitted to the relevant outbound rails topic. The rails topics relate to the bank rails that would then pick up the payments to make the actual money transfers. The code repository is here.
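
The payment events flowing through the topology are represented by a PaymentEvent POJO. The exact class lives in the repository; based on the fields used in the topology code below, a minimal sketch (the Lombok annotations and field types here are assumptions) might look like the following:

import lombok.Builder;
import lombok.Data;

// Illustrative sketch of the payment event - the real class is in the demo repository.
@Data
@Builder
public class PaymentEvent {
    private String paymentId;   // unique id for the payment
    private Long amount;        // payment amount
    private String currency;    // e.g. "GBP" or "USD"
    private String fromAccount; // account the payment is debited from
    private String toAccount;   // account the payment is credited to
    private String rails;       // bank rails the payment is routed over, e.g. "BANK_RAILS_FOO"
}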

Topology

The Kafka Streams topology is as follows:

Figure 1: Payment Kafka Streams topology

Configuration

Spring handles the wiring of the Kafka Streams components, with the following Kafka Streams configuration, defined in KafkaStreamsDemoConfiguration, being wired in:

@Configuration
@EnableKafkaStreams
public class KafkaStreamsDemoConfiguration {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfig(@Value("${kafka.bootstrap-servers}") final String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put(APPLICATION_ID_CONFIG, "kafka-streams-demo");
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
        return new KafkaStreamsConfiguration(props);
    }
}
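
The topology code in the next section also references a properties bean for the topic names (for example properties.getPaymentInboundTopic()). A minimal sketch of such a class, assuming Spring's @ConfigurationProperties binding, with the prefix and comments invented for illustration, could be:

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;

// Illustrative sketch only - the actual properties class and prefix are in the demo repository.
@Getter
@Setter
@ConfigurationProperties(prefix = "demo")
public class KafkaDemoProperties {
    private String paymentInboundTopic;    // e.g. payment-topic
    private String railsFooOutboundTopic;  // outbound topic for BANK_RAILS_FOO payments
    private String railsBarOutboundTopic;  // outbound topic for BANK_RAILS_BAR payments
}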

Processing Flow

The processors are defined in PaymentTopology. This Spring component is wired with a StreamsBuilder:

@Autowired
public void buildPipeline(StreamsBuilder streamsBuilder) {

The PaymentTopology then defines the following processing flow that consumed events will pass through:

  • Payment events are received from the inbound payment-topic.

KStream<String, PaymentEvent> messageStream = streamsBuilder
    .stream(properties.getPaymentInboundTopic(), Consumed.with(STRING_SERDE, PaymentSerdes.serdes()))
  • A custom Serdes class, PaymentSerdes (for serializing/deserializing), is provided here, as the inbound JSON must be deserialized into a PaymentEvent POJO.

public static Serde<PaymentEvent> serdes() {
    JsonSerializer<PaymentEvent> serializer = new JsonSerializer<>();
    JsonDeserializer<PaymentEvent> deserializer = new JsonDeserializer<>(PaymentEvent.class);
    return Serdes.serdeFrom(serializer, deserializer);
}
  • Any payments that use unsupported rails are filtered out.

.filter((key, payment) -> SUPPORTED_RAILS.contains(payment.getRails()))
  • The payments are branched into two streams, one for GBP payments, and one for USD payments.

KStream<String, PaymentEvent>[] currenciesBranches = messageStream.branch(
   (key, payment) -> payment.getCurrency().equals(Currency.GBP.name()),
   (key, payment) -> payment.getCurrency().equals(Currency.USD.name())
);
  • The USD payments are transformed to GBP payments by applying the FX rate.

KStream<String, PaymentEvent> fxStream = currenciesBranches[1].mapValues(
   // Use mapValues() as we are transforming the payment, but not changing the key.
   (payment) -> {
       // Perform FX conversion.
       double usdToGbpRate = 0.8;
       PaymentEvent transformedPayment = PaymentEvent.builder()
               .paymentId(payment.getPaymentId())
               .amount(Math.round(payment.getAmount() * usdToGbpRate))
               .currency(Currency.GBP.name())
               .fromAccount(payment.getFromAccount())
               .toAccount(payment.getToAccount())
               .rails(payment.getRails())
               .build();
       return transformedPayment;
   });
  • The branched streams are merged back into one.

KStream<String, PaymentEvent> mergedStreams = currenciesBranches[0].merge(fxStream);

The merged stream is now used in two ways:

    • The stream is branched by the payments’ bank rails.

    KStream<String, PaymentEvent>[] railsBranches = mergedStreams.branch(
       (key, payment) -> payment.getRails().equals(Rails.BANK_RAILS_FOO.name()),
       (key, payment) -> payment.getRails().equals(Rails.BANK_RAILS_BAR.name()));
    • The payment events are serialized and emitted to their respective outbound bank rails topics.

    railsBranches[0].to(properties.getRailsFooOutboundTopic(), Produced.with(STRING_SERDE, PaymentSerdes.serdes()));
    railsBranches[1].to(properties.getRailsBarOutboundTopic(), Produced.with(STRING_SERDE, PaymentSerdes.serdes()));
    • A KTable is used to aggregate the payment balances by the fromAccount name (for example, two payments of 100 and 50 from the same account would result in a balance of 150).

    mergedStreams
        .map((key, payment) -> new KeyValue<>(payment.getFromAccount(), payment.getAmount()))
        .groupByKey(Grouped.with(STRING_SERDE, LONG_SERDE))
        .aggregate(new Initializer<Long>() {
            @Override
            public Long apply() {
                return 0L;
            }
        }, new Aggregator<String, Long, Long>() {
            @Override
            public Long apply(final String key, final Long value, final Long aggregate) {
                return aggregate + value;
            }
        }, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("balance")
                .withKeySerde(STRING_SERDE)
                .withValueSerde(LONG_SERDE));

State Store Query

A REST endpoint in BalanceController is exposed to allow clients to query the current balance by account. The injected StreamsBuilderFactoryBean provides access to the underlying KafkaStreams instance, which is used to query the 'balance' state store:

@GetMapping("/balance/{account}")
public ResponseEntity<Long> getAccountBalance(@PathVariable String account) {
    KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
    ReadOnlyKeyValueStore<String, Long> balances = kafkaStreams.store(
            StoreQueryParameters.fromNameAndType("balance", QueryableStoreTypes.<String, Long>keyValueStore())
    );
    return ResponseEntity.ok(balances.get(account));
}
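
Note that the state store is only queryable once the Kafka Streams instance is up and running. The demo keeps the endpoint simple, but a variant that guards against the store not yet being available could look like the following (the exception handling and response codes here are illustrative assumptions, not part of the demo):

// Illustrative variant - returns 503 while the 'balance' state store is not yet queryable.
@GetMapping("/balance/{account}")
public ResponseEntity<Long> getAccountBalance(@PathVariable String account) {
    try {
        ReadOnlyKeyValueStore<String, Long> balances = factoryBean.getKafkaStreams().store(
                StoreQueryParameters.fromNameAndType("balance", QueryableStoreTypes.<String, Long>keyValueStore()));
        Long balance = balances.get(account);
        if (balance == null) {
            return ResponseEntity.notFound().build();
        }
        return ResponseEntity.ok(balance);
    } catch (InvalidStateStoreException e) {
        // Thrown while the application is still starting up or rebalancing.
        return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
    }
}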

Testing

The Kafka Streams API provides the org.apache.kafka.streams.TopologyTestDriver, which is used to test the topology in a unit test. An example test utilising the driver is PaymentTopologyTest. This and other means to fully test a Kafka Streams application are explored in the Kafka Streams: Testing article.
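
As an illustration of the approach, a test using the driver typically builds the topology with a StreamsBuilder, pipes events into a TestInputTopic and asserts on the resulting TestOutputTopic and state store. The sketch below is an assumption of how such a test could look for this topology (the outbound topic name, the wiring of PaymentTopology and the AssertJ assertions are illustrative, not taken from the repository):

// Illustrative sketch of a TopologyTestDriver based test - see PaymentTopologyTest in the
// repository for the real test.
StreamsBuilder streamsBuilder = new StreamsBuilder();
paymentTopology.buildPipeline(streamsBuilder); // paymentTopology constructed with test properties

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

try (TopologyTestDriver testDriver = new TopologyTestDriver(streamsBuilder.build(), config)) {
    TestInputTopic<String, PaymentEvent> inputTopic = testDriver.createInputTopic(
            "payment-topic", new StringSerializer(), PaymentSerdes.serdes().serializer());
    TestOutputTopic<String, PaymentEvent> fooRailsTopic = testDriver.createOutputTopic(
            "rails-foo-topic", new StringDeserializer(), PaymentSerdes.serdes().deserializer());

    // Pipe in a GBP payment routed over the FOO rails.
    PaymentEvent payment = PaymentEvent.builder()
            .paymentId("pay-1").amount(100L).currency("GBP")
            .fromAccount("acc-1").toAccount("acc-2").rails("BANK_RAILS_FOO").build();
    inputTopic.pipeInput("pay-1", payment);

    // The payment should be emitted to the FOO rails topic unchanged (no FX conversion for GBP).
    assertThat(fooRailsTopic.readValue().getAmount()).isEqualTo(100L);

    // The stateful aggregation can be asserted via the named state store.
    KeyValueStore<String, Long> balances = testDriver.getKeyValueStore("balance");
    assertThat(balances.get("acc-1")).isEqualTo(100L);
}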

Topology Description

Spring’s StreamsBuilderFactoryBean provides access to the topology, allowing it to be described. In TopologyController this has been exposed as a REST endpoint to query:

@GetMapping("/topology")
public ResponseEntity<String> getTopology() {
    return ResponseEntity.ok(factoryBean.getTopology().describe().toString());
}

The topology description String in the REST response can be plugged into the tool at https://zz85.github.io/kafka-streams-viz/ to give a visual representation of the topology. Here is a part of this topology:

Figure 2: Generated Streams topology

Source Code

The source code for the accompanying Kafka Streams application is available here:

https://github.com/lydtechconsulting/kafka-streams/tree/v1.3.0

More On Kafka Streams...

Resources


View this article on our Medium Publication.