Kafka Schema Registry & Avro: Spring Boot Demo (2 of 2)


Introduction

This is the second part of the in-depth look at the Kafka Schema Registry & Avro Spring Boot demo application. Part one provided an overview of the application and covered the process flow as an Avro serialized message is consumed and deserialized by the application, and a resulting outbound message is serialized using Avro and produced to Kafka. In this part the project structure and implementation are covered in detail.

The source code for the Spring Boot demo is available here:

https://github.com/lydtechconsulting/kafka-schema-registry-avro/tree/v1.0.0

Project Structure

The project is structured as a multi-module Maven project, with the following modules:

  • avro-schema: contains the Avro message schema definitions
  • schema-registry-demo-service: the Spring Boot application
  • component-test: the component tests

Separating these into their own modules ensures a clean separation of concerns. The Avro generated classes are built into a jar and pulled in as a dependency by the service. The component tests meanwhile treat the application as a black box, also pulling in the Avro generated classes in order to construct and send the events (and consume the responses), but they do not use any artifacts from the service itself. If the component tests lived within the service module there would be a risk that a service class could accidentally be used, breaking the clean separation.

The parent pom declares these modules, so building the project from the root builds each module in turn.

Avro Event Schemas

The Avro event schemas are defined as JSON in .avsc files, located under the module's resources directory.

For example, the send-payment.avsc schema looks like this:

{
     "type": "record",
     "namespace": "demo.kafka.event",
     "name": "SendPayment",
     "version": "1",
     "fields": [
        { "name": "payment_id", "type": "string" },
        { "name": "amount", "type": "double" },
        { "name": "currency", "type": "string" },
        { "name": "to_account", "type": "string" },
        { "name": "from_account", "type": "string" }
     ]
}

Note that the schema definition does not include a schema Id. This Id is generated and managed by the Schema Registry, with a new unique Id being assigned when a new schema, or a new version of an existing schema, is registered.
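
For illustration, the Id assigned to the latest registered version of a schema can be looked up with the Confluent Schema Registry client. The following is a hypothetical sketch, assuming the default TopicNameStrategy subject naming of topic name plus "-value":

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class SchemaIdLookup {

    // Look up the Id that the Schema Registry assigned to the latest version of the
    // send-payment value schema. The registry URL and subject name are illustrative.
    public static int lookupLatestSchemaId() throws Exception {
        SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 10);
        SchemaMetadata metadata = client.getLatestSchemaMetadata("send-payment-value");
        return metadata.getId();
    }
}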

The pom.xml in the avro-schema module defines the avro-maven-plugin, which generates the Java classes from these schemas during the build:

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>${avro.version}</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                <outputDirectory>${project.build.directory}/generated/avro</outputDirectory>
                <createSetters>false</createSetters>
                <enableDecimalLogicalType>true</enableDecimalLogicalType>
                <fieldVisibility>private</fieldVisibility>
            </configuration>
        </execution>
    </executions>
</plugin>

To generate the Java sources for these schemas, run:

mvn generate-sources

To generate the jar file avro-schema-1.0.0.jar containing these classes, which can then be pulled into other modules and projects, run:

mvn clean install

The schema-registry-demo-service module that contains the Spring Boot code then defines the following dependency in its pom.xml:

<dependency>
    <groupId>demo.kafka</groupId>
    <artifactId>avro-schema</artifactId>
    <version>${project.version}</version>
</dependency>
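
With that dependency in place the generated classes can be used like any other Java types. As a sketch with illustrative values, an event can be constructed via the generated builder (setters are not generated, since createSetters is false):

import demo.kafka.event.SendPayment;

public class SendPaymentBuilderExample {

    // Construct a SendPayment event using the builder generated by the avro-maven-plugin.
    // The field values here are illustrative only.
    public static SendPayment buildSendPayment() {
        return SendPayment.newBuilder()
                .setPaymentId("fd13ab39-76d2-4b6f-b4ba-d3e20fbd8d4c")
                .setAmount(100.00)
                .setCurrency("GBP")
                .setToAccount("destination-account")
                .setFromAccount("source-account")
                .build();
    }
}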

Consumer Configuration

The demo application defines a standard Spring Kafka consumer, the PaymentConsumer, with the following listener method:

@KafkaListener(topics = "send-payment", groupId = "demo-consumer-group", containerFactory = "kafkaListenerContainerFactory")
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, @Payload final SendPayment command) {
    // hand the deserialized SendPayment command off to the PaymentService for processing
}

There is no direct coupling of Avro to the consumer, as the spring-kafka library is responsible for handing the consumer the deserialized payload, in this case SendPayment. Instead the listener names the container factory to use, and it is the container factory that is configured with the required deserializer. The container factory Spring bean and its associated factory configuration are defined in the KafkaDemoConfiguration class, a Spring @Configuration class. The factory bean is wired with the following consumer configuration:

@Bean
public ConsumerFactory<String, Object> consumerFactory(@Value("${kafka.bootstrap-servers}") final String bootstrapServers,
                                                       @Value("${kafka.schema.registry.url}") final String schemaRegistryUrl) {
    final Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-consumer-group");
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
    config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
    config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    config.put(KafkaAvroDeserializerConfig.AUTO_REGISTER_SCHEMAS, false);
    config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    return new DefaultKafkaConsumerFactory<>(config);
}
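
The kafkaListenerContainerFactory named in the @KafkaListener annotation is in turn built from this consumer factory. A minimal sketch of such a bean is shown below; the demo's actual bean may add further configuration, such as error handlers:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(final ConsumerFactory<String, Object> consumerFactory) {
    // Delegate all deserialization concerns to the consumer factory configured above.
    final ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    return factory;
}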

The following table summarises the consumer deserialization related configuration parameters and their usage:

Configuration Parameter         | Value                                                             | Usage
VALUE_DESERIALIZER_CLASS_CONFIG | ErrorHandlingDeserializer.class                                   | Spring class to cleanly handle deserialization errors
KEY_DESERIALIZER_CLASS_CONFIG   | ErrorHandlingDeserializer.class                                   | Spring class to cleanly handle deserialization errors
VALUE_DESERIALIZER_CLASS        | KafkaAvroDeserializer.class                                       | The Confluent class to deserialize messages in the Avro format
KEY_DESERIALIZER_CLASS          | StringDeserializer.class                                          | The key is defined in String format, so use the String deserializer
SCHEMA_REGISTRY_URL_CONFIG      | Wired in from application properties, e.g. http://localhost:8081 | The URL to the Confluent Schema Registry
AUTO_REGISTER_SCHEMAS           | false                                                             | Whether schemas that do not yet exist should be registered
SPECIFIC_AVRO_READER_CONFIG     | true                                                              | Deserialize to the generated Avro class rather than a GenericRecord type

This configuration highlights that message keys can also be serialized using Avro, with a key schema that is separate from the value schema. In this demo application the key is serialized as a String, and the value is serialized using Avro.
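
If the key were Avro serialized too, only the delegate key deserializer in the consumer configuration above would need to change. A hypothetical sketch of that single setting (not how the demo is configured) follows:

// Hypothetical: have the ErrorHandlingDeserializer delegate key deserialization to the
// Confluent Avro deserializer. The Schema Registry would then manage a separate
// subject, and therefore a separate schema, for the message key.
config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);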

AUTO_REGISTER_SCHEMAS would typically be disabled in Production, as schemas should be registered via a managed process. It should not be left to the application to automatically register new schemas when it receives messages for which a schema does not yet exist. Automatic schema registration might, however, be enabled in other environments, for example to facilitate testing. For this demo application it is disabled, and the included integration and component tests take care of the necessary schema registration or mocking themselves.
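
As a sketch of what such a managed (or test driven) registration could look like using the Confluent Schema Registry client, with the subject name again assuming the default TopicNameStrategy:

import demo.kafka.event.SendPayment;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class SchemaRegistration {

    // Register the SendPayment schema against the subject used for the send-payment topic's
    // message values, returning the Id assigned by the registry. This is a hypothetical
    // sketch; the demo's tests perform their own registration or mocking.
    public static int registerSendPaymentSchema() throws Exception {
        SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 10);
        return client.register("send-payment-value", new AvroSchema(SendPayment.getClassSchema()));
    }
}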

Event Processing

Once the PaymentConsumer has been called by the spring-kafka framework with the SendPayment POJO, it hands it off to the PaymentService to process. This processing includes a sendPayment() call that is a no-op in this demo, but the intention is that it would, for example, make a POST request to a bank rails service to instruct the actual payment. The PaymentService then builds a PaymentSent message, a class that was generated from the Avro schema, and passes it to the Kafka producer to emit the Avro serialized outbound payment-sent message.
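
A condensed sketch of that flow is shown below. The method names, and the assumption that PaymentSent mirrors the SendPayment fields, are illustrative rather than taken from the demo code:

public PaymentSent process(final SendPayment command) {
    // No-op in the demo; in a real system this might POST to a bank rails service.
    sendPayment(command);

    // Build the outbound event from the Avro generated PaymentSent class
    // (fields assumed, for illustration, to mirror those of SendPayment).
    return PaymentSent.newBuilder()
            .setPaymentId(command.getPaymentId())
            .setAmount(command.getAmount())
            .setCurrency(command.getCurrency())
            .setToAccount(command.getToAccount())
            .setFromAccount(command.getFromAccount())
            .build();
}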

Producer Configuration

As with the consumer, there is no Avro specific code for the developer to add to the producer in order to send a message to Kafka. This is handled by configuration, which the Spring framework wires in as required. The standard producer code is in the KafkaClient class:

final ProducerRecord<String, Object> record = new ProducerRecord<>(properties.getOutboundTopic(), key, outboundEvent);
final SendResult<String, Object> result = (SendResult<String, Object>) kafkaTemplate.send(record).get();

No matter what changes are made to the serialization strategy in the configuration, no code change is required here. The two are decoupled. The serialization configuration is also defined in the KafkaDemoConfiguration class, and is passed in to the ProducerFactory Spring bean:

@Bean
public ProducerFactory<String, Object> producerFactory(@Value("${kafka.bootstrap-servers}") final String bootstrapServers,
                                                       @Value("${kafka.schema.registry.url}") final String schemaRegistryUrl) {
    final Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    config.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    config.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
    return new DefaultKafkaProducerFactory<>(config);
}
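
The KafkaTemplate used by the KafkaClient is then built from this producer factory, along the lines of the following minimal sketch:

@Bean
public KafkaTemplate<String, Object> kafkaTemplate(final ProducerFactory<String, Object> producerFactory) {
    // The template picks up the Avro value serializer and Schema Registry settings from the producer factory.
    return new KafkaTemplate<>(producerFactory);
}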

The following table summarises the producer serialization related configuration parameters and their usage:

Configuration Parameter        | Value                                                             | Usage
VALUE_SERIALIZER_CLASS_CONFIG  | KafkaAvroSerializer.class                                         | The Confluent class to serialize messages in the Avro format
KEY_SERIALIZER_CLASS_CONFIG    | StringSerializer.class                                            | The key is defined in String format, so use the String serializer
SCHEMA_REGISTRY_URL_CONFIG     | Wired in from application properties, e.g. http://localhost:8081 | The URL to the Confluent Schema Registry
AUTO_REGISTER_SCHEMAS          | false                                                             | Whether schemas that do not yet exist should be registered

Testing

The application relies on a Kafka message broker and the Schema Registry being available, so these must be catered for in the integration and component tests. For the integration tests an embedded Kafka broker can be used, with the Schema Registry wiremocked. For the component tests these resources can be spun up in Docker containers for the test run. These test stages are covered in separate articles; see the 'More On Kafka Schema Registry & Avro' section below.

Source Code

The source code for the accompanying Spring Boot demo application is available here:

https://github.com/lydtechconsulting/kafka-schema-registry-avro/tree/v1.0.0

More On Kafka Schema Registry & Avro

The following accompanying articles cover the Schema Registry and Avro:

Kafka Schema Registry & Avro: Introduction: provides an introduction to the schema registry and Avro, and details the serialization and deserialization flows.

Kafka Schema Registry & Avro: Spring Boot Demo (1 of 2): provides an overview of the companion Spring Boot application and steps to run the demo.

Kafka Schema Registry & Avro: Integration Testing: looks at integration testing the application using Spring Boot test, with the embedded Kafka broker and a wiremocked Schema Registry.

Kafka Schema Registry & Avro: Component Testing: looks at component testing the application using Testcontainers and the component-test-framework to bring up the application, Kafka, and the Schema Registry in Docker containers.

