Kafka Connect: CDC with Debezium and MongoDB

Introduction

When data is created, updated, or deleted on a database, a common design with event driven architectures is to emit events to Kafka topics detailing each relevant data change. This allows other interested parties to discover what data has changed by listening to these topics and consuming the change events. These downstream services can then act upon this new information, whether it be to trigger an action or to update a local copy of entities.

This article demonstrates using Change Data Capture (CDC) with Debezium to stream events from the NoSQL MongoDB database to the Kafka messaging broker. This complements the article covering CDC with Postgres: Kafka Connect: Transactional Outbox With Debezium: Spring Boot Demo.

The source code for the accompanying demo application is available here.

CDC & Debezium

Data written to MongoDB can be streamed directly to Kafka using CDC. This is achieved with no additional code, but instead by configuring a Kafka Connect connector. The connector is responsible for tracking changes in the database by reading its log, and then generating events that it writes to Kafka. Debezium provides a connector implementation that can be configured for use with MongoDB.

Figure 1: Change Data Capture with MongoDB and Kafka Connect

The diagram illustrates how these two concerns, the application itself and the connector, are kept totally separate.

MongoDB uses an operations log: a special collection called the oplog that records all operations that change the data in the databases. The Kafka Connect Debezium source connector tracks the oplog, and translates the recorded operations into events that it writes to Kafka.
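
To see the kind of entry the connector consumes, the most recent oplog entry can be inspected directly (a quick check, assuming the mongodb container from the demo below is running; the oplog lives in the local database as the capped collection oplog.rs):

docker exec -it mongodb mongosh local --eval "db.oplog.rs.find().sort({\$natural: -1}).limit(1)"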

As the demo application demonstrates, a replica set must be enabled on the database, otherwise MongoDB runs in standalone mode and does not use the oplog.

Spring Boot Demo

The accompanying Spring Boot demo application provides a CRUD REST API enabling a client to create, retrieve, update, and delete an item. The item is persisted in MongoDB in a collection named items. A Debezium connector is configured that when registered with an instance of Kafka Connect will begin streaming changes from the database to Kafka.
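
For illustration, a minimal sketch of what such a persisted entity might look like (the package and field names are inferred from the CDC event payload shown later; the actual demo class may differ):

package demo.domain;

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

// Sketch inferred from the CDC event payload; persisted to the items collection.
@Document("items")
public class Item {

    @Id
    private String id; // MongoDB assigns an ObjectId hex string, e.g. 653d06f08faa89580090466e

    private String name;

    // constructors, getters and setters omitted for brevity
}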

Figure 2: Spring Boot demo with CDC

The steps to run up the demo in Docker containers are described first, before looking at an approach to automate the testing of the flow using Testcontainers.

Running The Demo

The demo walks through starting up the containers, configuring Kafka Connect, and sending a request to create an item using the REST API. The MongoDB database write will be observed by the connector, which will write a corresponding event to Kafka. The Kafka command line tools are used to consume the event to complete the round trip.

Step 1: Docker Container Start Up

First, start the Docker containers for Kafka, Zookeeper (which the version of Kafka being used requires), Kafka Connect, and MongoDB. These are defined in the docker-compose.yml file.

docker-compose up -d

The MongoDB replica set, required for CDC with Debezium, must be initiated on the MongoDB container:

docker exec -it mongodb mongosh --eval "rs.initiate({_id:'docker-rs', members: [{_id:0, host: 'mongodb'}]})"
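
Before continuing, the health of the replica set can be confirmed with the standard mongosh helper rs.status(); the single member should report a stateStr of PRIMARY:

docker exec -it mongodb mongosh --eval "rs.status().members"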

Step 2: Register Connector with Kafka Connect

Kafka Connect provides a REST API to manage connectors. The connector is defined in the file debezium-mongodb-source-connector.json in the connector directory. Its contents are shown here:

{
   "name": "debezium-mongodb-source-connector",
   "config": {
       "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
       "mongodb.connection.string": "mongodb://mongodb:27017/?replicaSet=docker-rs",
       "mongodb.connection.mode": "replica_set",
       "topic.prefix": "mongodb",
       "database.include.list": "demo",
       "collection.include.list": "demo.items",
       "capture.mode": "change_streams_update_full",
       "tombstones.on.delete": "false"
   }
}

Along with the connector class which is responsible for processing the MongoDB changes, the connection string is provided enabling the connector to connect to the database. As previously mentioned, this must connect to a replica set for the CDC to work, which in this case has been named docker-rs. (This matches the hardcoded replica set name in the MongoDB Testcontainer, referred to below). The mongodb.connection.mode is set to replica_set.

The topic.prefix specifies the prefix to use for the topics that events are written to. With a topic prefix of mongodb, and a change in the database demo and collection items, the CDC events are written to the topic mongodb.demo.items.

The next configurations detail the databases and collections to monitor. The amount of detail to include in each data change event can be altered via the capture.mode configuration. Finally, the tombstones.on.delete configuration tells the connector whether to write an event with a null value to the topic when a record has been deleted. Such a tombstone denotes a logical delete of a message on a Kafka compacted topic.

The complete set of configuration options for this connector is detailed in the Debezium documentation.

The connector is registered with Kafka Connect:

curl -X POST localhost:8083/connectors -H "Content-Type: application/json" -d @./connector/debezium-mongodb-source-connector.json
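
Kafka Connect's REST API can then be used to confirm that the connector and its task are in the RUNNING state:

curl localhost:8083/connectors/debezium-mongodb-source-connector/status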

Step 3: Build and Run the Spring Boot Application

The application jar file is built, and then started:

mvn clean install
java -jar target/kafka-connect-debezium-mongodb-1.0.0.jar

Step 4: Start the Kafka Consumer

The Kafka Docker container contains the Kafka command line tools. The consumer is started to listen for the CDC events that are written to the mongodb.demo.items topic:

docker exec -ti kafka kafka-console-consumer --topic mongodb.demo.items --bootstrap-server kafka:29092

Step 5: POST a Request to Create an Item

The request to create an item takes a name field in the request body:

curl -i -X POST localhost:9001/v1/items -H "Content-Type: application/json" -d '{"name": "test-item"}'

A response is returned with the 201 CREATED status code, with the new item id in the Location header:

HTTP/1.1 201
Location: 653d06f08faa89580090466e
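
The new item can then be fetched using the returned id (a hedged example, assuming the CRUD API exposes a standard GET endpoint at /v1/items/{id}):

curl localhost:9001/v1/items/653d06f08faa89580090466e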

The Kafka consumer consumes and displays the CDC event for the persisted record:

{"schema":{"type":"struct","fields": [...] "after":"{\"_id\": \"654cecdc4356b26c4bac68af\",\"name\": \"test-item\",\"_class\": \"demo.domain.Item\"}"

The same steps can be followed to update and delete the item, observing the CDC events being emitted for each. These are detailed in the demo application ReadMe.

Automating Testing

It is important to be able to automate testing this flow end to end, such that it can be tested both locally by the developer and in the CI pipeline. Kafka Connect is a separate component from the application, so while the application itself could be tested with an integration test using @SpringBootTest, an embedded Kafka broker, and an embedded MongoDB, this would not exercise the flow end to end. The CDC aspect of the flow would not be part of such a test.

The demo application includes a component test, EndToEndCT. This automates the above flow, using Testcontainers, a testing library used to bring up resources in Docker containers. This is all orchestrated by Lydtech’s open source testing library, the Component Test Framework. The test sends requests via REST to the Spring Boot application which writes to MongoDB, resulting in CDC events being streamed to Kafka. The test then listens for these events, consumes them, and asserts they are as expected.

Figure 3: Component testing the CDC flow

Test Configuration

The pom.xml contains a component profile containing the maven-surefire-plugin definition. This specifies the system properties to override, including MongoDB and Debezium connection properties (for the test itself). Importantly these resources are enabled, ensuring they are spun up in Docker containers:

<mongodb.enabled>true</mongodb.enabled>
<kafka.enabled>true</kafka.enabled>
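
For context, a trimmed sketch of how such a profile is typically structured in the pom.xml (the actual profile in the demo contains further property overrides):

<profile>
    <id>component</id>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <systemPropertyVariables>
                        <mongodb.enabled>true</mongodb.enabled>
                        <kafka.enabled>true</kafka.enabled>
                        <!-- further connection and override properties elided -->
                    </systemPropertyVariables>
                </configuration>
            </plugin>
        </plugins>
    </build>
</profile>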

Conduktor can also be optionally enabled. This provides a view over the broker and topics, so the CDC events written can be viewed.

Test Execution

Build the application and a Docker image containing the application:

mvn clean install
docker build -t ct/kafka-connect-debezium-mongodb:latest .

The component tests can now be run (using the component profile):

mvn test -Pcomponent

The component-test-framework spins up a Docker container for each of Kafka, Zookeeper, Kafka Connect, MongoDB and the Spring Boot application. Once the containers are running and healthy the test execution continues to completion.

Component Test Overview

The test is configured to hook into the component-test-framework by adding a JUnit extension annotation to the test class:

@ExtendWith(ComponentTestExtension.class)
@ActiveProfiles("component-test")
public class EndToEndCT {

The active profiles annotation results in the application-component-test.yml properties in src/test/resources/ being applied. This includes the connection details to the MongoDB instance that will be brought up using Testcontainers.

The test setup performs a couple of important steps using the component-test-framework provided resource client classes. It creates the test Kafka consumer with the KafkaClient:

consumer = KafkaClient.getInstance().createConsumer(GROUP_ID, "mongodb.demo.items");

It registers the Debezium connector with Kafka Connect with the DebeziumClient:

DebeziumClient.getInstance().createConnector("connector/debezium-mongodb-source-connector.json");

The test itself then uses the application REST API to create, retrieve, update and delete an item. The create, update and delete requests all result in a data change in MongoDB, each of which triggers a CDC event to be written to Kafka. The test consumer consumes each event and asserts it contains the expected data.

Create the item request:

Response createItemResponse = sendCreateItemRequest(createRequest);

Consume events:

List<ConsumerRecord<String, String>> outboundEvents = KafkaClient.getInstance().consumeAndAssert("testChangeDataCapture", consumer, 3, 3);

Assert the create event is as expected:

assertThat(outboundEvents.get(0).key(), containsString(itemId));
assertThat(outboundEvents.get(0).value(), containsString(createRequest.getName()));
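
The remaining two events can be asserted in a similar fashion (a hypothetical sketch; updateRequest is assumed to be the request body used for the update step, and a Debezium delete event carries the op code d in its payload):

// Hypothetical follow-on assertions: updateRequest is assumed to be the body sent in the
// update step, and a Debezium delete event carries "op":"d" in its payload.
assertThat(outboundEvents.get(1).value(), containsString(updateRequest.getName()));
assertThat(outboundEvents.get(2).value(), containsString("\"op\":\"d\""));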

If Conduktor is also enabled in the pom.xml, the events resulting from the test can be observed on the CDC mongodb.demo.items topic. Start the test with the parameter -Dcontainers.stayup to ensure the containers are not stopped at the end of the test run, so that Conduktor can be opened and the topic inspected. Log in to the Conduktor UI at http://localhost:8088 with username / password of admin@conduktor.io / admin. Navigate to the Console and view the events on the mongodb.demo.items topic. The screenshot shows the CDC event representing the create operation (as denoted by the "op": "c" value):

Figure 4: Viewing the CDC events in Conduktor

Summary

Applying the Kafka Connect Debezium MongoDB connector enables the streaming of events to Kafka for every data change made to the MongoDB database. This means that changes made to the data in one service can be easily broadcast to any other service that is interested. This implementation of an event-driven architecture allows services to communicate with one another while remaining fully decoupled. Kafka Connect itself is also decoupled from the application meaning these concerns remain separate.

Source Code

The source code for the accompanying Spring Boot demo application is available here:

https://github.com/lydtechconsulting/kafka-connect-debezium-mongodb/tree/v1.0.0

More On Kafka Connect...

Kafka Connect: Transactional Outbox With Debezium: Spring Boot Demo: Using Debezium to stream records from Postgres to Kafka using the Transactional Outbox pattern.

Kafka Idempotent Consumer & Transactional Outbox: Combining the Transactional Outbox pattern with the Idempotent Consumer pattern to achieve exactly-once messaging semantics.

