Kafka Connect: Transactional Outbox With Debezium: Spring Boot Demo

Lydtech

Introduction

Kafka Connect provides an API for streaming data between Kafka and an external system. Source connectors stream data from datastores into Kafka, and sink connectors stream data from Kafka into datastores. Typically the datastore is a database, although it could equally be a file, S3, or Elasticsearch, for example. This article covers streaming data from a Postgres database into Kafka using a source connector provided by Debezium.

A Spring Boot application is used to demonstrate Kafka Connect in action, making use of the Transactional Outbox pattern to broadcast events based on database writes. The source code for the application is linked in the Source Code section at the end of this article.

Kafka Connect

Kafka Connect is an API for streaming data between a datastore and Kafka, in either direction, using connectors. It is an independent, distributed part of the architecture, running as a cluster of processes decoupled from any application. Kafka Connect is built on the Kafka Consumer and Producer APIs, and so retains the benefits these provide, including high scalability, throughput, availability, and resiliency. In Production, Kafka Connect would typically run on its own servers, separate from the Kafka cluster itself.

As well as streaming the data between Kafka and a datastore, Kafka Connect provides a mechanism for transforming the data. This may be in the form of decorating the events with more data, adding or removing fields, or altering the structure of the events.

Debezium

While Kafka Connect provides a framework and runtime for implementing and operating connectors, Debezium provides open source connector implementations for change data capture (CDC). CDC refers to capturing all committed changes to a database, whether inserts, updates, or deletes, by reading the database transaction log and writing these changes as new events to Kafka. It is efficient and operates in near real time. This process can be described as extract, transform, and load (ETL): Debezium extracts the data from the database, transforms it based on configuration, and loads it into Kafka.

Transactional Outbox

Using an Outbox is a common pattern when developing event driven systems. Records that should be streamed from the database to the messaging broker are written to a dedicated outbox database table. Giving this table a single purpose keeps the architecture clean and easy to understand. The alternative, which is also viable, is to stream records directly from the required tables rather than from a single dedicated table. However, this can quickly become complex and harder to follow, as different tables may have different streaming requirements.

The Transactional Outbox pattern ensures that records written to the outbox table are written within the same transaction as the other related writes made by the application. For example, when a new Item record is persisted to the database, the corresponding Outbox record is written within the same transaction. The writes are atomic: they either succeed together or are rolled back together, ensuring the data is consistent.
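The all-or-nothing behaviour can be illustrated with a minimal, in-memory Java sketch. It is not taken from the demo application: the class OutboxSketch, its records, and the failBeforeCommit flag are hypothetical names, and two lists stand in for the item and outbox tables. In a real application the database transaction provides the atomicity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** In-memory stand-in for a database with an item table and an outbox table (illustrative only). */
final class OutboxSketch {

    record Item(UUID id, String name) {}
    record OutboxEvent(UUID id, String destination, String payload) {}

    // The committed "tables".
    final List<Item> items = new ArrayList<>();
    final List<OutboxEvent> outbox = new ArrayList<>();

    /**
     * Prepares the Item and its Outbox record, then commits both together.
     * If a failure occurs before the commit step, neither table is modified.
     */
    UUID createItem(String name, boolean failBeforeCommit) {
        Item item = new Item(UUID.randomUUID(), name);
        // Naive JSON building for brevity; the name is assumed to need no escaping.
        String payload = "{\"id\":\"" + item.id() + "\",\"name\":\"" + name + "\"}";
        OutboxEvent event = new OutboxEvent(UUID.randomUUID(), "item", payload);

        if (failBeforeCommit) {
            // Simulates an error inside the transaction: nothing has been committed.
            throw new IllegalStateException("simulated failure, nothing committed");
        }

        // "Commit": both writes become visible together.
        items.add(item);
        outbox.add(event);
        return item.id();
    }

    public static void main(String[] args) {
        OutboxSketch db = new OutboxSketch();
        db.createItem("test-item", false);
        try {
            db.createItem("broken-item", true);
        } catch (IllegalStateException e) {
            // Expected: the failed call left both tables untouched.
        }
        System.out.println(db.items.size() + " item(s), " + db.outbox.size() + " outbox record(s)");
    }
}
```

The point of the sketch is that no code path leaves an Item without its Outbox record, or an Outbox record without its Item.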

Figure 1: Transactional Outbox with Debezium

The Transactional Outbox pattern can be combined with the Idempotent Consumer pattern to provide exactly-once messaging semantics. New events are deduplicated via a dedicated event Id table, with these writes happening in the same transaction as the writes to the Outbox table. This design is covered in detail in the article Kafka Idempotent Consumer & Transactional Outbox.

Spring Boot Demo

Overview

The accompanying Spring Boot application demonstrates the Transactional Outbox pattern, with the Debezium Postgres Kafka Connect source connector streaming outbox events to the Kafka broker. The application exposes a REST endpoint which a client can call to submit a request to create a new Item. The Item and its Outbox record are persisted in the same transaction.

Figure 2: Spring Boot and Debezium demo flow

Debezium scans the database transaction log for changes and picks up the INSERT into the Outbox table. It transforms the change into the required event format and writes it to the configured outbox event topic.

It is important to note here that while database writes are resulting in events being written to Kafka, the application itself has no knowledge that Kafka is in use and has no dependencies on any Kafka or Kafka Connect libraries. The application and Kafka are fully decoupled. Changing the streaming behaviour in any way would therefore have no impact on the application itself.

Connector Configuration

The Kafka Connect Debezium Postgres connector is defined in the connector folder in the root of the project: debezium-postgres-source-connector.json. There are many options for configuring a connector; see the Debezium documentation for details. This connector streams changes from the Outbox table:

"table.include.list": "debezium_postgres_demo.outbox",

It routes the events to the topic based on the value written to the destination field of the Outbox table:

"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.by.field": "destination",

By default, Debezium names the topic outbox.event. followed by the value in the column named by route.by.field above. If route.by.field is not set, the aggregatetype column is used. In this case, as the item is written to the database with a destination value of item, the topic name resolves to outbox.event.item.
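The default naming rule described above amounts to a simple string concatenation. The following Java sketch mirrors it for illustration only: OutboxTopicResolver and resolveTopic are hypothetical names, not part of Debezium or the demo application.

```java
/** Illustrative helper mirroring the default outbox topic naming described above. */
final class OutboxTopicResolver {

    // Prefix applied by default to the routing value.
    private static final String TOPIC_PREFIX = "outbox.event.";

    /** Returns the topic an outbox row is routed to, given the value of its routing column. */
    static String resolveTopic(String routedByValue) {
        return TOPIC_PREFIX + routedByValue;
    }

    public static void main(String[] args) {
        // The demo writes "item" to the destination column.
        System.out.println(resolveTopic("item")); // prints "outbox.event.item"
    }
}
```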

Events are transformed to use the value of the Outbox id field as the event key, and to use the value of the payload field as the event payload:

"transforms.outbox.table.field.event.key": "id",
"transforms.outbox.table.field.event.payload": "payload",

Demo Execution

The following are the steps to run the demo.

The Docker containers (Kafka, Zookeeper, Postgres, and Debezium) are started using docker-compose:

docker-compose up -d

The Debezium source connector is registered with Kafka Connect using curl:

curl -X POST localhost:8083/connectors -H "Content-Type: application/json" -d @./connector/debezium-postgres-source-connector.json

The Spring Boot demo application is built (using Java 17) and started:

mvn clean install
java -jar target/kafka-connect-debezium-postgres-demo-1.0.0.jar

The Kafka command-line interface tool console-consumer is started, listening on the outbox topic:

docker exec -ti kafka bash
kafka-console-consumer --topic outbox.event.item --bootstrap-server kafka:29092

A REST request is POSTed to the application to create an item:

curl -i -X POST localhost:9001/v1/item -H "Content-Type: application/json" -d '{"name": "test-item"}'
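The same request could also be sent from Java using the JDK's built-in HTTP client. This is a sketch rather than code from the demo project: CreateItemClient and buildCreateItemRequest are hypothetical names, and the URL and body are copied from the curl command above. It assumes the application is running locally on port 9001.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Illustrative client for the demo's create-item endpoint. */
final class CreateItemClient {

    /** Builds a POST request equivalent to the curl command. */
    static HttpRequest buildCreateItemRequest(String itemName) {
        // Naive JSON building for brevity; the name is assumed to need no escaping.
        String body = "{\"name\": \"" + itemName + "\"}";
        return HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9001/v1/item"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Requires the demo application to be running.
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response =
                client.send(buildCreateItemRequest("test-item"), HttpResponse.BodyHandlers.ofString());
        System.out.println("Status: " + response.statusCode());
        System.out.println("Location: " + response.headers().firstValue("Location").orElse("(none)"));
    }
}
```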

When the application receives the REST request, the following occurs:

The application creates the item entity and writes to the outbox table. It logs these writes:

Item created with id 3e97d918-85cf-47ce-b58f-e13be187f080 - and Outbox entity created with Id: 687e5def-2c87-4d5b-ade1-27f8b4ed41b1

It then responds to the REST call with a 201 Created:

HTTP/1.1 201
Location: 3e97d918-85cf-47ce-b58f-e13be187f080

Debezium writes an event to the outbound topic via Change Data Capture (CDC).

The console-consumer consumes the outbox event and prints it:

{"schema":{"type":"string","optional":true},"payload":"{\"id\":\"3e97d918-85cf-47ce-b58f-e13be187f080\",\"name\":\"test-item\"}"}

Testing

Because Kafka Connect is fully decoupled from the application, the complete end to end flow cannot be covered in a Spring Boot integration test. The Spring Boot test in the demo application, EndToEndIntegrationTest.java, covers the end to end flow from the perspective of the application. That is, a REST request is submitted to create an item, a successful response is expected, and the test asserts that the Item and Outbox entities have been written to the database.

However, in order to prove that the outbox event is captured by the Debezium connector and written to the outbound topic in Kafka, a component test is required. The example provided, EndToEndCT, uses the Testcontainers library to spin up the required resources in Docker containers, orchestrated by Lydtech’s open source Component Test Framework. Containers are started for the Kafka broker, Zookeeper, the Debezium Kafka Connect instance, and Postgres. It also runs the Spring Boot application in a Docker container. The test provides a Kafka listener on the outbound topic, which asserts that the expected outbound events have been emitted by the Debezium connector.

The following diagram contrasts the respective scopes of integration and component tests in this architecture:

Figure 3: Integration and component test scopes

The ReadMe file details how to run the integration and component tests in the project.

Conclusion

Kafka Connect provides an API to stream records between a datastore and Kafka. It is scalable, performant, and resilient to failure. There are many connectors available to use, including the popular Debezium open source connectors. The Spring Boot application demonstrated how to use such a connector with Postgres to create an end to end flow, with the application being fully decoupled from Kafka. Combined with the Transactional Outbox pattern, Kafka Connect provides a powerful tool for implementing event driven architectures.

Source Code

The source code for the accompanying Spring Boot demo application is available here:

https://github.com/lydtechconsulting/kafka-connect-debezium-postgres/tree/v1.0.0

More On Kafka Connect...

Kafka Idempotent Consumer & Transactional Outbox: Combining the Transactional Outbox pattern with the Idempotent Consumer pattern to achieve exactly-once messaging semantics.

