Introduction
The Kafka Streams: Introduction article introduced the Kafka Streams API, covering its architecture, benefits and usage. This article details the accompanying Spring Boot application that demonstrates stateless and stateful message processing using the API.
This is one of a series of articles covering different aspects of Kafka Streams. Jump to the 'More On Kafka Streams...' section below to explore.
Spring Boot Application
For this demo a Spring Boot application has been created that simulates receiving payment events from Kafka and processes these payments, including currency conversion, using stateless KStream processors. The application tracks account balances by aggregating the payment amounts with a stateful KTable processor, backed by RocksDB as the state store. The payments are then emitted to the relevant outbound rails topic. The rails topics relate to the bank rails that would then pick up the payments to make the actual money transfers. The code repository is here.
Topology
The Kafka Streams topology is as follows:
Figure 1: Payment Kafka Streams topology
Configuration
Spring handles the wiring of the Kafka Streams components and their configuration, with the following Kafka Streams configuration defined in KafkaStreamsDemoConfiguration being wired in:
@Configuration
@EnableKafkaStreams
public class KafkaStreamsDemoConfiguration {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfig(@Value("${kafka.bootstrap-servers}") final String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put(APPLICATION_ID_CONFIG, "kafka-streams-demo");
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
        return new KafkaStreamsConfiguration(props);
    }
}
Processing Flow
The processors are defined in PaymentTopology. This Spring component is wired with a StreamsBuilder:
@Autowired
public void buildPipeline(StreamsBuilder streamsBuilder) {
The PaymentTopology then defines the following processing flow that consumed events will pass through:
Payment events are received from the inbound payment-topic.
KStream<String, PaymentEvent> messageStream = streamsBuilder
        .stream(properties.getPaymentInboundTopic(), Consumed.with(STRING_SERDE, PaymentSerdes.serdes()))
A custom Serde class PaymentSerdes (for serializing/deserializing) is provided here, as the inbound JSON must be deserialized into a PaymentEvent POJO.
public static Serde<PaymentEvent> serdes() {
    JsonSerializer<PaymentEvent> serializer = new JsonSerializer<>();
    JsonDeserializer<PaymentEvent> deserializer = new JsonDeserializer<>(PaymentEvent.class);
    return Serdes.serdeFrom(serializer, deserializer);
}
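The PaymentEvent POJO itself is not listed in the article, but based on the builder and accessor calls used in the topology it would look something like the following sketch (assuming Lombok is used to generate the builder, getters and constructors):
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PaymentEvent {
    // Field names are taken from the builder calls in the topology below; the real class may differ.
    private String paymentId;
    private Long amount;
    private String currency;
    private String fromAccount;
    private String toAccount;
    private String rails;
}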
Any payments that use unsupported rails are filtered out.
.filter((key, payment) -> SUPPORTED_RAILS.contains(payment.getRails()))
The payments are branched into two streams, one for GBP payments, and one for USD payments.
KStream<String, PaymentEvent>[] currenciesBranches = messageStream.branch(
        (key, payment) -> payment.getCurrency().equals(Currency.GBP.name()),
        (key, payment) -> payment.getCurrency().equals(Currency.USD.name())
);
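Note that in more recent Kafka Streams versions branch() is deprecated in favour of split(). A functionally equivalent sketch using that API (assuming Kafka Streams 2.8 or later) is shown below; it is not how the demo application is written, but illustrates the newer style:
// split() returns a Map of named branches rather than an array.
// Named and Branched are from org.apache.kafka.streams.kstream.
Map<String, KStream<String, PaymentEvent>> currencyBranches = messageStream
        .split(Named.as("currency-"))
        .branch((key, payment) -> Currency.GBP.name().equals(payment.getCurrency()), Branched.as("gbp"))
        .branch((key, payment) -> Currency.USD.name().equals(payment.getCurrency()), Branched.as("usd"))
        .noDefaultBranch();

KStream<String, PaymentEvent> gbpStream = currencyBranches.get("currency-gbp");
KStream<String, PaymentEvent> usdStream = currencyBranches.get("currency-usd");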
The USD payments are transformed to GBP payments by applying the FX rate.
KStream<String, PaymentEvent> fxStream = currenciesBranches[1].mapValues(
        // Use mapValues() as we are transforming the payment, but not changing the key.
        (payment) -> {
            // Perform FX conversion.
            double usdToGbpRate = 0.8;
            PaymentEvent transformedPayment = PaymentEvent.builder()
                    .paymentId(payment.getPaymentId())
                    .amount(Math.round(payment.getAmount() * usdToGbpRate))
                    .currency(Currency.GBP.name())
                    .fromAccount(payment.getFromAccount())
                    .toAccount(payment.getToAccount())
                    .rails(payment.getRails())
                    .build();
            return transformedPayment;
        });
The branched streams are merged back into one.
KStream<String, PaymentEvent> mergedStreams = currenciesBranches[0].merge(fxStream);
Two things now occur:
The stream is branched by the payments’ bank rails.
KStream<String, PaymentEvent>[] railsBranches = mergedStreams.branch(
        (key, payment) -> payment.getRails().equals(Rails.BANK_RAILS_FOO.name()),
        (key, payment) -> payment.getRails().equals(Rails.BANK_RAILS_BAR.name()));
The payment events are serialized and emitted to their respective outbound bank rails topics.
railsBranches[0].to(properties.getRailsFooOutboundTopic(), Produced.with(STRING_SERDE, PaymentSerdes.serdes()));
railsBranches[1].to(properties.getRailsBarOutboundTopic(), Produced.with(STRING_SERDE, PaymentSerdes.serdes()));
A KTable is used to aggregate the payment balances by the fromAccount name.
mergedStreams
        .map((key, payment) -> new KeyValue<>(payment.getFromAccount(), payment.getAmount()))
        .groupByKey(Grouped.with(STRING_SERDE, LONG_SERDE))
        .aggregate(new Initializer<Long>() {
            @Override
            public Long apply() {
                return 0L;
            }
        }, new Aggregator<String, Long, Long>() {
            @Override
            public Long apply(final String key, final Long value, final Long aggregate) {
                return aggregate + value;
            }
        }, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("balance")
                .withKeySerde(STRING_SERDE)
                .withValueSerde(LONG_SERDE));
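The Initializer and Aggregator are shown here as anonymous classes; the same aggregation can be expressed more concisely with lambdas. A functionally equivalent sketch:
mergedStreams
        .map((key, payment) -> new KeyValue<>(payment.getFromAccount(), payment.getAmount()))
        .groupByKey(Grouped.with(STRING_SERDE, LONG_SERDE))
        // Start each account balance at zero and add each payment amount to the running total,
        // materializing the result in the queryable "balance" state store.
        .aggregate(() -> 0L,
                (account, amount, balance) -> balance + amount,
                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("balance")
                        .withKeySerde(STRING_SERDE)
                        .withValueSerde(LONG_SERDE));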
State Store Query
A REST endpoint in BalanceController is exposed to allow clients to query the current balance by account:
@GetMapping("/balance/{account}")
public ResponseEntity getAccountBalance(@PathVariable String account) {
KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
ReadOnlyKeyValueStore balances = kafkaStreams.store(
StoreQueryParameters.fromNameAndType("balance", QueryableStoreTypes.keyValueStore())
);
return ResponseEntity.ok(balances.get(account));
}
Testing
The Kafka Streams API provides the org.apache.kafka.streams.TopologyTestDriver, which is used to test the topology in a unit test. An example test utilising the driver is PaymentTopologyTest. This and other means to fully test a Kafka Streams application are explored in the Kafka Streams: Testing article.
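As a flavour of what such a test looks like, the following sketch drives the topology with the TopologyTestDriver. The topic names, the construction of the PaymentTopology under test and the sample payment are assumptions for illustration; see PaymentTopologyTest in the repository for the actual test:
@Test
public void testPaymentRoutingAndBalanceAggregation() {
    // Minimal Streams config for the test driver - no broker is required.
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

    // Build the topology under test (paymentTopology is assumed to be configured with the topic names used below).
    StreamsBuilder streamsBuilder = new StreamsBuilder();
    paymentTopology.buildPipeline(streamsBuilder);

    try (TopologyTestDriver testDriver = new TopologyTestDriver(streamsBuilder.build(), props)) {
        TestInputTopic<String, PaymentEvent> inputTopic = testDriver.createInputTopic(
                "payment-topic", new StringSerializer(), PaymentSerdes.serdes().serializer());
        TestOutputTopic<String, PaymentEvent> railsFooTopic = testDriver.createOutputTopic(
                "rails-foo-topic", new StringDeserializer(), PaymentSerdes.serdes().deserializer());

        PaymentEvent payment = PaymentEvent.builder()
                .paymentId("pay-1")
                .amount(100L)
                .currency(Currency.GBP.name())
                .fromAccount("from-account")
                .toAccount("to-account")
                .rails(Rails.BANK_RAILS_FOO.name())
                .build();

        // Pipe the payment through the topology and assert it is routed to the expected rails topic.
        inputTopic.pipeInput(payment.getPaymentId(), payment);
        assertEquals(payment.getPaymentId(), railsFooTopic.readValue().getPaymentId());

        // The balance state store should now hold the aggregated amount for the sending account.
        assertEquals(Long.valueOf(100L), testDriver.<String, Long>getKeyValueStore("balance").get("from-account"));
    }
}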
Topology Description
Spring’s StreamsBuilderFactoryBean provides an option to describe the topology. In TopologyController this has been exposed as a REST endpoint that can be queried:
@GetMapping("/topology")
public ResponseEntity getTopology() {
return ResponseEntity.ok(factoryBean.getTopology().describe().toString());
}
The topology description String in the REST response can be plugged into the tool at https://zz85.github.io/kafka-streams-viz/ to give a visual representation of the topology. Part of the resulting topology is shown below:
Figure 2: Generated Streams topology
Source Code
The source code for the accompanying Kafka Streams application is available here:
https://github.com/lydtechconsulting/kafka-streams/tree/v1.3.0
More On Kafka Streams...
The Kafka Streams: Introduction article delves into the API and covers its benefits and characteristics.
The Kafka Streams: Testing article covers the tools and approaches available to comprehensively test a Kafka Streams application.
The Kafka Streams: State Store article looks at the role and usage of the state store in a stateful Kafka Streams application.
The Kafka Streams: Transactions & Exactly-Once Messaging article details how to achieve exactly-once messaging using Kafka Transactions with a Kafka Streams application.
Resources
Kafka Streams Topology Visualizer by Joshua Koo, Than Hedman, Marc Löhe
View this article on our Medium Publication.