Kafka Schema Registry & Avro: Spring Boot Demo (1 of 2)

Lydtech

Introduction

This is the first of a two-part article detailing a Spring Boot demo application that provides an end-to-end example of an application using Kafka as the message broker, along with Confluent Schema Registry and Avro serialization. This first part provides an overview of the application along with the steps to run the demo. The second part covers the implementation in detail.

An overview of the benefits of using a message schema, choosing Avro for serialization, and adding a schema registry to the architecture is provided in the article Kafka Schema Registry & Avro: Introduction.

The source code for the Spring Boot demo is available here:

https://github.com/lydtechconsulting/kafka-schema-registry-avro/tree/v1.0.0

Spring Boot Application Overview

The application simulates a basic payments service, which receives a command event from the inbound Kafka topic send-payment that triggers a payment to be made. The intention would be that a third party banking service would be called via REST to instruct the payment. This call is shown in the diagram below for illustrative purposes, but is just implemented as a no-op call in the code. To complete the processing the application then produces an event to the payment-sent topic, informing interested parties that the payment was successfully made.
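The flow described above can be sketched in plain Java. This is an illustrative stand-in rather than the demo's code: the record types, the BankClient interface and the PaymentService class are hypothetical names, and the fields are assumed from the example events used when running the demo. The real application works with Avro-generated classes and Kafka clients, which part two covers. Records require Java 16 or later.

```java
import java.util.function.Consumer;

// Hypothetical plain-Java stand-ins for the Avro-generated event classes.
record SendPayment(String paymentId, double amount, String currency,
                   String toAccount, String fromAccount) {}

record PaymentSent(String paymentId, double amount, String currency,
                   String toAccount, String fromAccount) {}

// Hypothetical abstraction over the third party banking service (a no-op in the demo).
interface BankClient {
    void instructPayment(SendPayment command);
}

// Sketch of the processing: consume the command, instruct the payment, emit the event.
final class PaymentService {
    private final BankClient bankClient;
    private final Consumer<PaymentSent> outbound; // stands in for the payment-sent producer

    PaymentService(BankClient bankClient, Consumer<PaymentSent> outbound) {
        this.bankClient = bankClient;
        this.outbound = outbound;
    }

    void process(SendPayment command) {
        bankClient.instructPayment(command);
        outbound.accept(new PaymentSent(command.paymentId(), command.amount(),
                command.currency(), command.toAccount(), command.fromAccount()));
    }
}
```

Passing the outbound side in as a Consumer keeps the sketch free of Kafka dependencies, so the processing logic can be exercised on its own.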

The SendPayment and PaymentSent events are defined via an Avro schema. The payments service uses the Schema Registry to retrieve the schema information in order to serialize and deserialize the Avro messages.

The following diagram shows the processing flow as a send-payment command event is consumed. The prerequisite for this is for the message schemas to have been registered with the Schema Registry.

Figure: Demo Application

Step 1: A send-payment command event is consumed by the demo application from the inbound topic.

Step 2: The schema Id is extracted from the bytes at the beginning of the message and used to query the Schema Registry for the message schema, which is then cached.

Step 3: The message is deserialized using the schema.

Step 4: The processing results in a POST REST call to the third party Bank Rails service. (This is excluded from the demo application, but included here for illustrative purposes.)

Step 5: The Schema Registry is queried to acquire the schema Id for the schema of the outbound message, which is then cached. The schema is acquired by reflection on the message class.

Step 6: The message is serialized, with the schema Id included at the beginning of the message.

Step 7: The message is written to the outbound payment-sent Kafka topic.
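To make the schema Id handling in Steps 2 and 6 more concrete, the following is a minimal sketch of the framing that Confluent's serializers apply: a magic byte of 0, then the schema Id as a 4-byte big-endian integer, then the Avro-encoded payload. The WireFormat and SchemaCache classes are hypothetical names used for illustration only. In the demo this work is done by the Confluent Avro serializer and deserializer, and the registry lookup here is just a function passed in by the caller.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;

// Framing: [magic byte 0][4-byte schema Id][Avro payload].
final class WireFormat {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5;

    // Step 6: prefix the serialized payload with the magic byte and schema Id.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_LENGTH + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer writes big-endian by default
                .put(avroPayload)
                .array();
    }

    // Step 2: read the schema Id from the start of a consumed message.
    static int schemaId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (message.length < HEADER_LENGTH || buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Message does not start with the expected header");
        }
        return buffer.getInt();
    }

    // Input to Step 3: the Avro-encoded bytes that follow the header.
    static byte[] payload(byte[] message) {
        schemaId(message); // validates the header before slicing
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }
}

// Caches schemas by Id so the registry is only queried on the first lookup.
final class SchemaCache {
    private final Map<Integer, String> schemasById = new ConcurrentHashMap<>();
    private final IntFunction<String> registryLookup; // hypothetical registry call

    SchemaCache(IntFunction<String> registryLookup) {
        this.registryLookup = registryLookup;
    }

    String schemaFor(int schemaId) {
        return schemasById.computeIfAbsent(schemaId, registryLookup::apply);
    }
}
```

Because the schema Id travels with every message, a consumer can resolve the writer's schema without any out-of-band agreement, and the cache means the registry is consulted only once per schema Id.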

Running The Spring Boot Application

The Spring Boot application relies on a running Kafka broker instance (along with Zookeeper) and a running schema registry. This demo uses Confluent’s Schema Registry. The Schema Registry command line tools are used to send a send-payment command event to the inbound topic, and to consume the resulting outbound payment-sent message.

Start Docker Containers

The first step is to spin up these components in Docker containers. A docker-compose.yml is provided in the root directory, so run:

docker-compose up -d

The logs can be followed for any of the containers. The container names can be listed using:

docker ps

Then, for example, to view the Kafka logs, run:

docker logs -f kafka

Start Application

The Spring Boot application itself is built as a jar file, via:

mvn clean install

To start the demo Spring Boot application, change to the schema-registry-demo-service directory and execute the jar:

java -jar target/schema-registry-demo-service-1.0.0.jar

Register Schemas

The application is now running, and the application’s consumer and producer are connected to the Kafka broker. The next step is to register the schemas for the Avro messages with the schema registry. For this, Confluent’s kafka-schema-registry-maven-plugin is used. It is configured with the location of the Avro schema files, which it will POST to the Confluent Schema Registry:

<plugin>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry-maven-plugin</artifactId>
  <version>${confluent.version}</version>
  <configuration>
     <schemaRegistryUrls>
        <param>http://localhost:8081</param>
     </schemaRegistryUrls>
     <subjects>
        <send-payment-value>${project.basedir}/src/main/resources/avro/send_payment.avsc</send-payment-value>
        <payment-sent-value>${project.basedir}/src/main/resources/avro/payment_sent.avsc</payment-sent-value>
     </subjects>
  </configuration>
  <goals>
     <goal>register</goal>
  </goals>
</plugin>

Run the plugin from the avro-schemas directory with:

mvn schema-registry:register

The following output confirms that the schemas have been registered, along with the schema Ids that have been assigned to each:

[INFO] --- kafka-schema-registry-maven-plugin:5.5.5:register (default-cli) @ avro-schema ---
[INFO] Registered subject(payment-sent-value) with id 1 version 1
[INFO] Registered subject(send-payment-value) with id 2 version 1
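The subject names in the plugin configuration and in the output above follow a topic-name-plus-suffix pattern, which matches the default topic-based subject naming used by the Confluent serializers. A minimal sketch of that convention follows; the SubjectNames helper is a hypothetical name and is not part of the demo or of any Confluent library.

```java
// Hypothetical helper mirroring the default topic-based subject naming.
final class SubjectNames {
    private SubjectNames() {}

    // Subject under which the schema for a topic's message values is registered.
    static String forValue(String topic) {
        return topic + "-value";
    }

    // Subject under which the schema for a topic's message keys is registered.
    static String forKey(String topic) {
        return topic + "-key";
    }
}
```

For example, SubjectNames.forValue("send-payment") yields send-payment-value, the subject registered for the inbound topic.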

Schema Registry REST API

Along with the API to register schemas, the Confluent Schema Registry provides a REST API for querying the registry. The new schemas just registered can be queried with curl:

List subjects:

curl -X GET http://localhost:8081/subjects

Get the registered schemas for the given Ids:

curl -X GET http://localhost:8081/schemas/ids/1
curl -X GET http://localhost:8081/schemas/ids/2
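The same queries can be issued from Java. The following is a minimal sketch using the JDK's built-in HTTP client (Java 11 or later), assuming the registry is reachable at the URL used in the curl commands. The SchemaRegistryQueries class is a hypothetical name, and the response is returned as raw JSON text rather than parsed.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical minimal client for the two queries shown above.
final class SchemaRegistryQueries {
    private final String baseUrl;
    private final HttpClient client = HttpClient.newHttpClient();

    SchemaRegistryQueries(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    // GET /subjects
    HttpRequest listSubjects() {
        return get("/subjects");
    }

    // GET /schemas/ids/{id}
    HttpRequest schemaById(int id) {
        return get("/schemas/ids/" + id);
    }

    // Sends the request and returns the raw JSON response body.
    String execute(HttpRequest request) throws IOException, InterruptedException {
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    private HttpRequest get(String path) {
        return HttpRequest.newBuilder(URI.create(baseUrl + path))
                .header("Accept", "application/vnd.schemaregistry.v1+json")
                .GET()
                .build();
    }
}
```

With the Docker containers running, calling execute(queries.schemaById(2)) on an instance created with http://localhost:8081 should return the same JSON as the corresponding curl command.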

Send Inbound Avro Message

At this point it is time to write a send-payment command event to the inbound topic for the application to consume. This is done using the command line tools. Jump onto the Schema Registry Docker container via:

docker exec -ti schema-registry bash

The command line tools from this Schema Registry container must be used rather than those from the Kafka container, as the Avro serializing producer and consumer tools are required.

Produce the send-payment command event. The consumer requires a message key to be present. As the key is only serialized to a String (and not using Avro), the key schema is defined accordingly:

kafka-avro-console-producer \
--topic send-payment \
--broker-list kafka:29092 \
--property schema.registry.url=http://localhost:8081 \
--property value.schema.id=2 \
--property key.schema='{"type":"string"}' \
--property "key.separator=:" \
--property parse.key=true
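With parse.key=true and key.separator set to a colon, each line typed into the producer carries the key, the separator, and then the value, with both key and value written as JSON. As a rough sketch, a line in that shape could be generated as follows. The SendPaymentLine helper is a hypothetical name, the field names are assumed to match the send-payment schema, and no JSON escaping is applied, so the values are assumed to contain no quotes or backslashes.

```java
import java.util.Locale;
import java.util.UUID;

// Hypothetical helper that builds one line of input for the Avro console producer.
final class SendPaymentLine {
    private SendPaymentLine() {}

    static String build(UUID paymentId, double amount, String currency,
                        String toAccount, String fromAccount) {
        // The key is a JSON string (matching the {"type":"string"} key schema),
        // followed by the ':' separator and the JSON encoded value.
        return String.format(Locale.ROOT,
                "\"%s\":{\"payment_id\": \"%s\", \"amount\": %s, \"currency\": \"%s\", "
                        + "\"to_account\": \"%s\", \"from_account\": \"%s\"}",
                paymentId, paymentId, amount, currency, toAccount, fromAccount);
    }
}
```

Calling SendPaymentLine.build(UUID.randomUUID(), 3.0, "USD", "toAcc", "fromAcc") produces a fresh line for each payment, using the payment Id as the message key.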

Now enter the event to send (including a key prefix):

"0e8a9a5f-1d4f-46bc-be95-efc6af8fb308":{"payment_id": "0e8a9a5f-1d4f-46bc-be95-efc6af8fb308", "amount": 3.0, "currency": "USD", "to_account": "toAcc", "from_account": "fromAcc"}

Consume Outbound Avro Message

This send-payment command event is consumed by the application, which emits a resulting payment-sent event at the end of its processing. The Avro consumer can be used to consume this message:

kafka-avro-console-consumer \
--topic payment-sent \
--property schema.registry.url=http://localhost:8081 \
--bootstrap-server kafka:29092 \
--from-beginning

Output:

{"payment_id":"0e8a9a5f-1d4f-46bc-be95-efc6af8fb308","amount":3.0,"currency":"USD","to_account":"toAcc","from_account":"fromAcc"}

The end-to-end round trip has completed successfully.

Source Code

The source code for the accompanying Spring Boot demo application is available here:

https://github.com/lydtechconsulting/kafka-schema-registry-avro/tree/v1.0.0

More On Kafka Schema Registry & Avro

The following accompanying articles cover the Schema Registry and Avro:

Kafka Schema Registry & Avro: Introduction: provides an introduction to the schema registry and Avro, and details the serialization and deserialization flows.

Kafka Schema Registry & Avro: Spring Boot Demo (2 of 2): details the Spring Boot application project structure, implementation and configuration which enables it to utilise the Schema Registry and Avro.

Kafka Schema Registry & Avro: Integration Testing: looks at integration testing the application using Spring Boot test, with the embedded Kafka broker and a wiremocked Schema Registry.

Kafka Schema Registry & Avro: Component Testing: looks at component testing the application using Testcontainers and the component-test-framework to bring up the application, Kafka, and the Schema Registry in Docker containers.

