KafkaJS: Integration Testing With Testcontainers

Lydtech

Introduction

KafkaJS is a popular NodeJS Kafka client library used by applications to integrate with the Kafka messaging broker. It provides an API for the developer to use to consume and produce events. To facilitate integration testing of the application, and to prove that events are successfully sent to and received from Kafka, the Testcontainers test library can be used. Testcontainers spins up Kafka (and other resources) in docker containers so the application under test is able to connect to these real instances.

This article and the accompanying application demonstrate running a TypeScript application that uses KafkaJS to integrate with Kafka, as well as persisting records to a Postgres database using the Prisma ORM framework. It demonstrates how to utilise Testcontainers in the integration test phase to test the application against dockerised Kafka and Postgres instances. The accompanying source code is linked in the Source Code section at the end of the article.

KafkaJS

KafkaJS is a Kafka client library for NodeJS, written in JavaScript. As well as providing the API for consuming and producing messages, a Kafka client library is a thick client: it shares responsibility with the broker for more than simply sending and receiving messages. This is necessary in order to provide aspects like high availability, message ordering and managing duplicate messages and message loss.

For more detail on KafkaJS, and a comparison with the Java Apache Kafka client library, see the article: Kafka Client: Apache Kafka vs KafkaJS

Integration Testing

Unit tests require mocking resources such as the broker, so this level of testing does not verify that the actual integration and resource API usage is implemented correctly. Integration tests on the other hand are used to prove that the application under test can connect to and integrate with the external resources. If mature in-memory versions of the resources are available then these should be used by the integration tests. However, in the case of Kafka, when using the NodeJS client library KafkaJS there is no in-memory broker available. Therefore a real instance of Kafka should be spun up for the purposes of this level of testing, which Testcontainers facilitates. As with unit tests, these integration tests can be run locally (providing fast feedback on failures) and automated in the CI pipeline.

Testcontainers

Testcontainers is an open source test framework that is used to spin up an application's external resources in lightweight docker containers. These dependencies typically include databases and messaging brokers. The Testcontainers framework manages the lifecycle of the containers, being responsible for starting them up at the beginning of the test run and tearing them down cleanly at the end.

Demo Application

Overview

A TypeScript application has been developed that provides a REST API allowing a client to create and retrieve item records. When an item is created it is persisted in a Postgres database, and an outbound event is emitted to the item-created topic. The application also consumes events from an inbound topic, create-item, which similarly results in an item being persisted to the database and an outbound event being emitted.


Figure 1: Demo application


Build & Run Demo

The following commands, run from the root of the project, build the docker image containing the demo application, and spin up docker containers for Kafka, Zookeeper, Postgres, and the application itself, based on the docker-compose.yml file. The commands in the Dockerfile, called from the docker-compose.yml file, result in the Prisma ORM schema being generated and applied to the database, and the seed data being created:

docker-compose build --no-cache
docker-compose up -d

With the docker containers running, an item can now be created by sending a POST request to the REST endpoint:

curl -i -X POST localhost:3000/items -H "Content-Type: application/json" -d '{"name": "test-item"}'

The response location header contains the newly created item Id, so the item can be retrieved with the following call:

curl localhost:3000/items/{itemId}

The create item call also results in the application writing an event to the item-created topic. This can be consumed by running the Kafka console consumer command line tool, available in the Kafka docker container:

docker exec -ti kafka kafka-console-consumer \
--topic item-created \
--bootstrap-server localhost:9092 \
--from-beginning

The output shows the created item:

{"id":1,"name":"test-item"}

Alternatively, an event can be sent to the create-item topic using the console producer. The application consumes this event, persists a new item and emits an outbound event:

docker exec -ti kafka kafka-console-producer \
--topic create-item \
--broker-list localhost:9092

Now enter the message:

{"name": "test-item"}

The console consumer can again be used to output the resulting item-created event.

Demo Integration Tests

An example integration test is provided: endtoend.integration.test. This runs the above end to end flows, asserting the expected responses are received and outbound events emitted.

Figure 2: Integration testing using Testcontainers


Step 1: Send a POST request to the application to create an item.

Step 2: Retrieve the created item. (Also assert the outbound event from the item-created topic).

Step 3: Produce an event to the create-item topic.

Step 4: Consume the outbound event from the item-created topic.

The tests can either be run in conjunction with the unit tests, or by themselves, via one of the following two commands:

npm run test
npm run test:integration

The integration test target is defined in package.json, and it specifies the config file to use via the flag:

--config jest.config.integration.ts
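
As a rough illustration of how such a config file can wire things together, the sketch below shows a minimal Jest configuration. Only the globalSetup file name comes from this article; the ts-jest preset, the testMatch pattern and the timeout value are assumptions made for the example and may differ from the demo project.

```typescript
// Sketch of a jest.config.integration.ts. Every value here is an assumption
// except the globalSetup file name, which the article mentions.
const config = {
  // Assumes ts-jest is installed to compile the TypeScript tests.
  preset: "ts-jest",
  testEnvironment: "node",
  // Hypothetical pattern that would match endtoend.integration.test.ts.
  testMatch: ["**/*.integration.test.ts"],
  // Runs once before all test suites; this is where the containers are started.
  globalSetup: "<rootDir>/jest.setup.integration.ts",
  // Tests that wait on Kafka can be slow, so allow more than Jest's 5 second default.
  testTimeout: 60000,
};

export default config;
```

Keeping this in a separate file from the unit test configuration means the unit tests never pay the cost of starting containers.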

This jest.config.integration.ts file in turn specifies the global setup file jest.setup.integration.ts which houses the code to start the required docker containers using the Testcontainers API. For example, the Kafka container is configured and started as follows:

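// Configure the Kafka container; the image name is read from the environment
let kafkaContainer = new KafkaContainer(process.env.KAFKA_CONTAINER_IMAGE)
    .withName('kafka')
    .withNetworkMode(network.getName())
    .withNetworkAliases('kafka')
    .withEnvironment({
        'KAFKA_BROKER_ID': '1'
    })
    .withExposedPorts(9092, DEFAULT_KAFKA_PORT)
// Optionally keep the container running so it can be reused by the next test run
if (process.env.TESTCONTAINERS_REUSE_ENABLE === 'true') {
    kafkaContainer = kafkaContainer.withReuse()
}
// Start the container and wait for it to be ready
const startedKafkaContainer = await kafkaContainer.start()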

The Postgres container is started in the same way. The connection URLs are then set as environment variables, which the tests and application use for connecting to these resources.
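
The following sketch shows one way the connection URLs could be derived from the started containers' host and mapped port. The buildConnectionEnv helper and the KAFKA_BROKER_URL variable name are hypothetical and not taken from the demo source. DATABASE_URL is the variable name Prisma conventionally reads, but whether the demo uses it is also an assumption.

```typescript
// Hypothetical helper, not part of the demo application.
interface Endpoint {
  host: string;
  port: number;
}

interface PostgresSettings extends Endpoint {
  user: string;
  password: string;
  database: string;
}

interface ConnectionEnv {
  KAFKA_BROKER_URL: string; // assumed variable name
  DATABASE_URL: string; // conventional Prisma variable name
}

function buildConnectionEnv(kafka: Endpoint, postgres: PostgresSettings): ConnectionEnv {
  // Encode the credentials so characters such as '@' do not break the URL.
  const user = encodeURIComponent(postgres.user);
  const password = encodeURIComponent(postgres.password);
  return {
    KAFKA_BROKER_URL: `${kafka.host}:${kafka.port}`,
    DATABASE_URL: `postgresql://${user}:${password}@${postgres.host}:${postgres.port}/${postgres.database}`,
  };
}
```

In a setup file the host and port would come from the started containers (which map the container port to a free port on the host), and the resulting values would be copied onto the process environment before the tests start.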

With the setup complete, the tests now execute. The TypeScript application is started, using the supertest library to start up an HTTP server. The tests then send requests to the application REST endpoints, and assert on the responses.

const createItemResponse = await request('http://localhost:3000').post("/items")
           .send({name: itemName})
           .set('Content-type', 'application/json')
expect(createItemResponse.statusCode).toBe(201)
expect(createItemResponse.header.location).toBeDefined()
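
Step 2 of the test retrieves the created item using the id from the Location header. The sketch below shows one way to extract that id. It assumes the header holds either the bare id or a path such as /items/1, since the exact format is not shown in this article, and itemIdFromLocation is a hypothetical helper rather than code from the demo.

```typescript
// Hypothetical helper, not part of the demo application.
// Assumes the Location header is either the bare item id or a path/URL
// whose last segment is the id (for example "/items/1").
function itemIdFromLocation(location: string): string {
  // Drop empty segments produced by leading, trailing or doubled slashes.
  const segments = location.split("/").filter((segment) => segment.length > 0);
  const id = segments[segments.length - 1];
  if (id === undefined) {
    throw new Error(`No item id found in Location header: "${location}"`);
  }
  return id;
}
```

The returned id can then be used in a follow-up GET request to /items/{itemId}.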

The tests also configure a test producer to send events to Kafka that the application consumes, and a test consumer to consume the outbound events emitted to Kafka by the application.

// Produce
await producer.send({
    topic: 'create-item',
    messages: [{value: JSON.stringify(payload)}]
})

// Consume
await consumer.run({
    eachMessage: async ({ message }) => {
        const parsedMessage = JSON.parse(message.value?.toString() || '{}')
        if(parsedMessage['name'] === itemName) {
            consumedMessages.push(parsedMessage);
        }
    }
})

// Assert
await waitForExpect(async () => {
    expect(consumedMessages.length).toEqual(1)
})
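
Because the consumer receives messages asynchronously, the assertion cannot simply run once straight after the event is produced. It has to be retried until it passes or a timeout expires. The sketch below illustrates that polling idea with a hypothetical waitUntil helper. It is not the implementation of the waitForExpect function used above, and the default timeout and interval are arbitrary assumptions.

```typescript
// Hypothetical helper illustrating polling; not the waitForExpect implementation.
async function waitUntil(
  assertion: () => void | Promise<void>,
  timeoutMs = 5000, // assumed default
  intervalMs = 50, // assumed default
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    try {
      await assertion();
      return; // the assertion passed, so stop polling
    } catch (error) {
      if (Date.now() >= deadline) {
        throw error; // out of time: surface the most recent failure
      }
      // Wait a little before trying the assertion again.
      await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
    }
  }
}
```

The timeout bounds how long a failing test can run, and because the last assertion error is rethrown, the failure message still describes what was missing.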

At the completion of the test run the application is stopped, and by default Testcontainers cleans up the running containers. (It does this using a helper container of its own, Ryuk, which runs alongside them.) It is possible to override this Testcontainers behaviour to instead leave the containers up at the end of the test run, and reuse them for the next run. To achieve this, set the following flag in the .env file in the root of the project to true:

TESTCONTAINERS_REUSE_ENABLE=true

This allows the tester to develop and run their tests without having to stop and restart the containers for each test run, making test development quicker. To facilitate this, and for good practice, it is important to remember that tests should be repeatable. Running the same test twice against the same database and broker should not result in it failing the second time (due to a data conflict or duplicate ID clash for example).
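
One way to keep tests repeatable when containers are reused is to generate a fresh item name for every test, so that data left behind by earlier runs cannot collide with the current one. The sketch below assumes item names have no format rules beyond being strings. uniqueItemName is a hypothetical helper, not taken from the demo source.

```typescript
// Hypothetical helper, not part of the demo application.
// The counter guarantees uniqueness within one test process; the timestamp
// and random suffix make clashes across separate runs very unlikely.
let itemNameSequence = 0;

function uniqueItemName(prefix = "test-item"): string {
  itemNameSequence += 1;
  const timestamp = Date.now().toString(36);
  const random = Math.random().toString(36).slice(2, 8);
  return `${prefix}-${timestamp}-${itemNameSequence}-${random}`;
}
```

Filtering consumed events on this name, as the test consumer above does with itemName, means that item-created events left on the topic by previous runs are ignored.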

Summary

Integration testing provides the ability to prove that the application being developed is able to connect to and integrate with external resources such as databases and message brokers. The Testcontainers library facilitates this kind of testing by spinning up these resources in docker containers at the outset of the test run. The tests can then be run locally to provide fast feedback to the developer as they implement features on the application. The tests can also be automated in the CI pipeline, ensuring that the end to end flows are not broken as other changes are merged into the codebase.

Source Code

The source code for the accompanying TypeScript demo application is available here:

https://github.com/lydtechconsulting/kafkajs-consume-produce/tree/v1.0.0

