Kafka Consumer Group Rebalance - The Next-Gen Protocol With Kafka 4.0

Lydtech

Introduction

A new generation of the consumer group protocol has been designed and is due for release in Kafka 4.0, which is likely to land in late 2024 or early 2025. This article covers the changes and improvements that the new protocol brings, along with the problems in the previous (current) design that it fixes.

Consumer Groups

Consumer groups are a core part of Kafka's distributed message processing, and are what allow applications to scale out. Consumer instances that share the same group id form a consumer group, and the partitions of the topics they subscribe to are assigned across those instances. Only the instance assigned to a partition consumes messages from it. Increasing the number of partitions in a topic, along with the number of consumer instances in the group, therefore increases the throughput of the application.
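
To make the idea of spreading partitions across a group concrete, the sketch below deals a topic's partitions out to the group's consumers in turn. It is a minimal illustration assuming a simple round-robin strategy; `assign_round_robin` is a hypothetical helper, not one of Kafka's built-in assignors. It also shows why adding consumers beyond the partition count does not increase throughput: the extra consumers are left idle.

```python
from typing import Dict, List, Sequence


def assign_round_robin(partitions: Sequence[int], consumers: Sequence[str]) -> Dict[str, List[int]]:
    """Give each partition exactly one owner, dealing partitions out in turn.

    Simplified round-robin illustration; not Kafka's actual assignor.
    """
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    for i, partition in enumerate(sorted(partitions)):
        owner = consumers[i % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Six partitions over three consumers: each consumer gets two.
print(assign_round_robin(range(6), ["A", "B", "C"]))
# {'A': [0, 3], 'B': [1, 4], 'C': [2, 5]}

# Two partitions over three consumers: consumer C has nothing to consume.
print(assign_round_robin(range(2), ["A", "B", "C"]))
# {'A': [0], 'B': [1], 'C': []}
```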

Consumer Group Rebalance

A consumer group rebalance refers to the revocation and reassignment of partitions across the consumer instances within the group. If a new consumer joins a consumer group, it can be assigned partitions that were previously assigned to other instances, to better spread the load between them. Likewise, if a consumer leaves a consumer group (whether it is cleanly stopped or the broker believes it has failed following a timeout), the partitions that were assigned to it can be reassigned to the remaining members.

The Last-Gen Protocol

The last generation protocol is covered in detail in the two-part article 'Kafka Consumer Group Rebalance'. The first part provides a detailed breakdown of what consumer groups are, what triggers rebalances, and the configuration options that can be used to alter the impact of rebalances on the application processing.

The second part details the eager and incremental rebalance strategies in the last-gen protocol, along with the static group membership protocol that was introduced as an option to help mitigate some of the impact of these strategies.

The Next-Gen Protocol

Key Goals

The next-gen protocol aims to deliver more stable and scalable consumer groups. Its key goals are that:

  • The complexity moves from the client (consumer) to the server (group coordinator).
  • A consumer should not be impacted by a rebalance if its assignments are not changed.
  • At-least-once delivery is guaranteed at worst, and exactly-once delivery is achievable in a clean rebalance.
  • The rebalance should be incremental and should not rely on a global synchronisation barrier (unlike the last-gen incremental rebalance mentioned above, where every member must still rejoin the group).
  • Consumers can be upgraded without downtime.

Moving from Client to Server

To solve the problems inherent in the last-gen protocol, a major redesign was undertaken under Kafka Improvement Proposal KIP-848. Crucially, the rebalance processing has moved from the client to the server. Previously, the Kafka client library was responsible for managing topic partition assignments within consumer groups, and for tracking the metadata needed to determine when a rebalance should be triggered. Now this is the responsibility of the group coordinator, a component of the Kafka broker on the server side. This change has a number of important consequences.

Previously, whenever a new Kafka client library was written (the client code that an application uses to connect to the broker and to send and receive messages), the consumer group behaviour had to be implemented too. The consumer was therefore a thick client, requiring complex processing logic to cater for rebalances. This logic was often error prone, with edge cases resulting in hard-to-find bugs, and every new Kafka client library, whether for NodeJS, Go, Python, Rust or any other language, had to implement it. With the next-gen protocol this logic moves to the server, removing the responsibility from the consumer. This greatly simplifies the consumer, reduces the risk of bugs being introduced, and makes issues easier to track down and fix.

Assignment logic can still run on the client side, allowing Kafka Streams, for example, to remain independent of the broker.

Performant Rebalance

Like the incremental and static group membership rebalance protocols, the new protocol ensures that a rebalance does not result in all partitions being revoked and reassigned ("stop the world" processing), as was the case with the initial protocol, eager rebalance. Only those partitions that need to be reassigned in order to spread the load between the consumer instances in the consumer group take part in the rebalance, minimising the overall impact. However, the new design achieves this far more efficiently, which is now possible because responsibility for the rebalance lies with the group coordinator on the server. The incremental and static group membership protocols could not reach a similar level of performance, as they were constrained by having to be built on the existing consumer group design.
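
The difference between revoking everything and moving only what is needed can be sketched as follows. This is a minimal sketch assuming a single topic and a simple "sticky" balancing heuristic chosen for illustration (the hypothetical `sticky_target` helper): each member keeps as many of its current partitions as its fair share allows, and only the surplus, plus any partitions left behind by a departed member, is handed out. It is not the assignor the group coordinator actually uses.

```python
from typing import Dict, Iterable, List, Set


def sticky_target(current: Dict[str, Set[int]], members: List[str],
                  partitions: Iterable[int]) -> Dict[str, Set[int]]:
    """Balanced target assignment that keeps existing ownership where possible."""
    partitions = set(partitions)
    members = sorted(members)
    base, extra = divmod(len(partitions), len(members))
    # Members that already own the most partitions get the "+1" slots,
    # so fewer partitions have to move.
    by_load = sorted(members, key=lambda m: -len(current.get(m, ())))
    quota = {m: base + (1 if i < extra else 0) for i, m in enumerate(by_load)}

    target: Dict[str, Set[int]] = {}
    unassigned = set(partitions)
    for m in members:
        kept = sorted(p for p in current.get(m, ()) if p in unassigned)[: quota[m]]
        target[m] = set(kept)
        unassigned -= target[m]
    # Hand the remaining partitions to members that are still under quota.
    for p in sorted(unassigned):
        m = next(m for m in members if len(target[m]) < quota[m])
        target[m].add(p)
    return target


def incremental_revocations(current, target):
    """Only partitions whose owner changes are revoked."""
    return {m: owned - target.get(m, set()) for m, owned in current.items()}


def eager_revocations(current):
    """Eager rebalance: every member gives up every partition."""
    return {m: set(owned) for m, owned in current.items()}


# Consumer C joins a group where A and B share six partitions.
current = {"A": {0, 1, 2}, "B": {3, 4, 5}}
target = sticky_target(current, ["A", "B", "C"], range(6))
print(target)                                    # {'A': {0, 1}, 'B': {3, 4}, 'C': {2, 5}}
print(incremental_revocations(current, target))  # {'A': {2}, 'B': {5}}
print(sum(len(p) for p in eager_revocations(current).values()))  # 6

# Consumer C leaves: only its partitions move.
current = {"A": {0, 1}, "B": {2, 3}, "C": {4, 5}}
target = sticky_target(current, ["A", "B"], range(6))
print(incremental_revocations(current, target))  # {'A': set(), 'B': set(), 'C': {4, 5}}
```

In the join case only two of the six partitions change owner, whereas an eager rebalance would revoke all six before reassigning them.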

Note that static membership, introduced in KIP-345, is still supported by the new protocol.

ConsumerGroupHeartbeat API

The next-gen rebalance protocol introduces a new core API, the ConsumerGroupHeartbeat API, which consumers use to form a consumer group. It allows members to propagate their topic subscriptions, their state, and their assignors to the server. The group coordinator uses the API to assign partitions to, and revoke partitions from, its members. The API also provides a liveness check on the consumers, allowing the coordinator to determine whether they are up and healthy.
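
The kind of information exchanged over the ConsumerGroupHeartbeat API can be sketched as plain data types. This is a simplified sketch loosely modelled on the request and response fields described in KIP-848, not the actual wire format: the class and field names are illustrative, and the group name and the 45 second session timeout are example values. The epoch conventions for joining (0) and leaving (-1) follow the KIP's description. The hypothetical `LivenessTracker` shows the idea behind the liveness check: a member that stops heartbeating for longer than a session timeout is treated as failed.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional

JOIN_EPOCH = 0    # KIP-848: a member epoch of 0 asks to join the group
LEAVE_EPOCH = -1  # KIP-848: a member epoch of -1 asks to leave the group


@dataclass
class HeartbeatRequest:
    """Illustrative request fields, loosely modelled on KIP-848."""
    group_id: str
    member_id: str
    member_epoch: int
    subscribed_topic_names: List[str]
    # Partitions the member currently owns, keyed by topic.
    owned_partitions: Dict[str, List[int]] = field(default_factory=dict)
    server_assignor: Optional[str] = None


@dataclass
class HeartbeatResponse:
    """Illustrative response fields: the epoch and assignment the member should move to."""
    member_epoch: int
    heartbeat_interval_ms: int
    assigned_partitions: Dict[str, List[int]] = field(default_factory=dict)


class LivenessTracker:
    """Hypothetical coordinator-side check: members that stop heartbeating expire."""

    def __init__(self, session_timeout_ms: int) -> None:
        self.session_timeout_ms = session_timeout_ms
        self.last_seen: Dict[str, int] = {}

    def record(self, member_id: str, now_ms: int) -> None:
        self.last_seen[member_id] = now_ms

    def expired(self, now_ms: int) -> List[str]:
        return sorted(m for m, seen in self.last_seen.items()
                      if now_ms - seen > self.session_timeout_ms)


join = HeartbeatRequest(group_id="my-group", member_id="B", member_epoch=JOIN_EPOCH,
                        subscribed_topic_names=["Foo"])
tracker = LivenessTracker(session_timeout_ms=45_000)
tracker.record("A", now_ms=0)
tracker.record(join.member_id, now_ms=40_000)
print(tracker.expired(now_ms=46_000))  # ['A']
```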

Rebalance Walk Through

A consumer group rebalance entails working through three stages:

  1. Revoke partitions, ensuring that current offsets are committed cleanly before other consumers begin consuming from them.
  2. Consumers transition to the next epoch, that is the next iteration for the target assignment.
  3. Assign partitions that have been revoked to other members in the group based on the target assignment.

Each heartbeat that a consumer sends to the group coordinator via the ConsumerGroupHeartbeat API specifies its current assignment epoch and the partitions it is assigned. The response from the group coordinator tells the consumer the epoch of the target assignment (which will have incremented in a rebalance) and which partitions it has been assigned. The consumer then processes messages from this set of assigned partitions (which may have changed), and reports this metadata back to the group coordinator in its next heartbeat, confirming its assignment.
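
The consumer's side of this exchange can be sketched as a small reconciliation step: compare what it owns with what the latest response assigns, finish and commit work on partitions that are going away, start on new ones, and report the result in the next heartbeat. This is a minimal sketch assuming a single topic; `ToyConsumer` and its `commit` hook are hypothetical stand-ins for the real client, not Kafka client APIs.

```python
from typing import Callable, Set, Tuple


def reconcile(owned: Set[int], assigned: Set[int]) -> Tuple[Set[int], Set[int]]:
    """Split a heartbeat response into partitions to give up and partitions to start."""
    return owned - assigned, assigned - owned


class ToyConsumer:
    """Hypothetical consumer-side handling of heartbeat responses (single topic)."""

    def __init__(self, commit: Callable[[int], None]) -> None:
        self.epoch = 0
        self.owned: Set[int] = set()
        self.commit = commit  # hypothetical hook: finish in-flight work, commit offsets

    def on_heartbeat_response(self, epoch: int, assigned: Set[int]) -> Tuple[int, Set[int]]:
        to_revoke, to_start = reconcile(self.owned, assigned)
        for partition in sorted(to_revoke):
            self.commit(partition)  # commit before the partition is handed to another member
        # After revoking and starting, the consumer owns exactly what it was assigned.
        self.owned = (self.owned - to_revoke) | to_start
        self.epoch = epoch
        # The epoch and owned partitions reported in the next heartbeat request.
        return self.epoch, set(self.owned)


committed = []
consumer_a = ToyConsumer(commit=committed.append)
consumer_a.on_heartbeat_response(1, {0, 1, 2})
print(consumer_a.on_heartbeat_response(1, {0, 1}))  # (1, {0, 1})
print(committed)                                    # [2]
```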

As a simple example, consider a consumer group with a single consumer, A, which is subscribed to topic Foo. The topic has three partitions. In the group coordinator the group is at its current epoch, with both the current and the target assignment set to these three partitions.

Figure 1: Initial consumer group state

A second instance of the application starts, and its consumer B (client) notifies the group coordinator (server) that it is joining the consumer group. With an additional consumer available to share the topic load, a rebalance is triggered. The coordinator generates a new target assignment with a new epoch, and begins reconciling its members with this new target assignment. The initial response to consumer B's heartbeat request contains the new epoch, but no partitions to consume from at this point.

Figure 2: Consumer B joins the consumer group

On consumer A's next heartbeat, the coordinator responds with the updated set of partitions it should be consuming from: in this example partition 2 is being revoked, as it will be assigned to consumer B.

Figure 3: Consumer A is notified of its reduced assigned partitions

Consumer A completes the revocation of partition 2, finishing any in-flight processing and committing offsets. On its next heartbeat it acknowledges that the revocation has completed, by confirming that it is only consuming from partitions 0 and 1. The coordinator sees that consumer A now owns exactly its partitions in the target assignment, so it transitions consumer A to the new epoch and includes this in its response.

Figure 4: Consumer A confirms new partition assignment

Partition 2 is now free, so when consumer B next heartbeats, the coordinator again responds with the new epoch, but this time includes partition 2. Consumer B fetches the committed offset for this partition and starts consuming messages. The rebalance is complete, with no interruption to the consumption of messages from partitions 0 and 1.

Figure 5: Consumer group rebalance completes
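
The whole walk-through can be replayed with a toy coordinator. This is a simplified sketch assuming a single topic, with each new target assignment supplied directly (standing in for the assignor) rather than computed; `ToyCoordinator`, `set_target` and `heartbeat` are hypothetical names, not broker internals. It applies the three stages in turn: a member that still owns partitions outside its target is asked to revoke them, a member whose owned partitions all lie within its target moves to the new epoch, and a partition is only handed to its new owner once no other member reports owning it.

```python
from dataclasses import dataclass, field
from typing import Dict, Set, Tuple


@dataclass
class MemberState:
    epoch: int = 0
    owned: Set[int] = field(default_factory=set)  # as last reported by the member


class ToyCoordinator:
    """Hypothetical single-topic group coordinator that reconciles members via heartbeats."""

    def __init__(self) -> None:
        self.group_epoch = 0
        self.target: Dict[str, Set[int]] = {}
        self.members: Dict[str, MemberState] = {}

    def set_target(self, target: Dict[str, Set[int]]) -> None:
        # A membership change produces a new target assignment with a new epoch.
        self.group_epoch += 1
        self.target = {m: set(ps) for m, ps in target.items()}
        for m in self.target:
            self.members.setdefault(m, MemberState())

    def heartbeat(self, member_id: str, owned: Set[int]) -> Tuple[int, Set[int]]:
        member = self.members[member_id]
        member.owned = set(owned)
        desired = self.target[member_id]
        if not member.owned <= desired:
            # Stage 1: revoke partitions outside the target; the epoch is unchanged.
            return member.epoch, member.owned & desired
        # Stage 2: nothing left to revoke, so the member moves to the new epoch.
        member.epoch = self.group_epoch
        # Stage 3: assign target partitions that no other member still owns.
        held_elsewhere = set().union(
            *(m.owned for mid, m in self.members.items() if mid != member_id))
        return member.epoch, desired - held_elsewhere


coordinator = ToyCoordinator()
coordinator.set_target({"A": {0, 1, 2}})                # Figure 1: epoch 1
print(coordinator.heartbeat("A", {0, 1, 2}))             # (1, {0, 1, 2})

coordinator.set_target({"A": {0, 1}, "B": {2}})          # B joins: epoch 2
print(coordinator.heartbeat("B", set()))                 # Figure 2: (2, set())
print(coordinator.heartbeat("A", {0, 1, 2}))             # Figure 3: (1, {0, 1})
print(coordinator.heartbeat("A", {0, 1}))                # Figure 4: (2, {0, 1})
print(coordinator.heartbeat("B", set()))                 # Figure 5: (2, {2})
```

In this sketch the coordinator never offers partition 2 to consumer B while consumer A still reports owning it, which is what gives A the chance to commit its offsets before B starts consuming.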

Summary

The next generation consumer group rebalance protocol is a significant step forward in the stability and scalability of consumer groups. Its incremental rebalance design minimises the impact of rebalances on consumer message processing, and moving the logic from the client to the server makes this far more performant than in the last-gen protocol. It also results in a simpler, leaner consumer implementation: the rebalance logic no longer needs to be reimplemented in every client library, which reduces the risk of edge-case bugs occurring during rebalances.

More On Kafka Consumer Group Rebalance

The original two-part series on consumer group rebalances covers their role and impact, and the original rebalance strategies.

View this article on our Medium Publication.