
Migrate Kafka and Zookeeper Clusters with Zero Downtime

Introduction

Many services (cloud-native or on-premises) use Kafka clusters for their asynchronous messaging needs because it is a messaging technology that scales well. There can be situations where you have to replace a set of nodes or the whole cluster due to business needs. This tutorial describes how to migrate a Kafka and Zookeeper cluster to a new set of nodes without any downtime.

The Problem Statement

Why can replacing a node in a cluster with another be a challenge? Adding or removing a node from a set of nodes behind a load balancer is routine: add a new node, drain the node to be removed, shut it down once all in-flight connections are terminated, remove it from the load-balancer configuration, and so on.

The real challenge lies in how an application manages state. Usually, applications are stateless and delegate state persistence to a data management system, for example, a relational database management system (RDBMS). In contrast, both Kafka and Zookeeper manage state locally, that is, state is persisted on local disks. This means we cannot simply replace a node with another without taking care of the local state on the existing node.

Also, in the case of Kafka, simply adding a node does not automatically trigger any re-balancing of the workload, that is, re-distribution of topic partitions across brokers. A new node only takes part in workload sharing for new topics created from that point onward.

One operational issue with quorum-based systems (Zookeeper in this case) is that they need to know the cluster size upfront, which means each node needs to know about every other node in the cluster. To complicate things further, if you add more than one node (the exact number depends on the existing cluster size) in one go, you must ensure that none of the newly added nodes becomes the leader, otherwise data is lost.

Objectives

Prerequisites

Migrate Kafka Cluster

Replacing nodes in a Kafka cluster is relatively easy: add new brokers, trigger migration of topic partitions, and once the migration is complete, decommission the old brokers. Kafka provides an admin API for performing the partition migration. The alterPartitionReassignments API works by adding additional partition replicas to a topic, waiting for them to become members of the in-sync replica set (ISR), and then switching the partition leader.
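
For illustration, a minimal sketch of a single reassignment call might look like the following. The broker IDs, topic name, and bootstrap address are placeholders, not values from this tutorial.

import java.util.*;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;

public class ReassignSinglePartition {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder bootstrap address; point this at the existing cluster.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Move partition 0 of the example topic "orders" onto new brokers 4, 5 and 6.
            Map<TopicPartition, Optional<NewPartitionReassignment>> reassignment = new HashMap<>();
            reassignment.put(new TopicPartition("orders", 0),
                    Optional.of(new NewPartitionReassignment(Arrays.asList(4, 5, 6))));

            // Kafka adds the new replicas, waits for them to join the ISR,
            // then drops the old replicas and moves the partition leader if needed.
            admin.alterPartitionReassignments(reassignment).all().get();
        }
    }
}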

You have to prevent new topic partitions from being assigned to old brokers (if application logic creates topics at runtime), otherwise the migration becomes an endless loop. To achieve this, you can use a variant of the topic creation API where the partition assignment is specified explicitly, as in the sketch below. A sample algorithm that can be followed is listed after that.
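
For example, a minimal sketch of creating a topic with an explicit replica assignment, assuming the new broker IDs are 4, 5, and 6 (the topic name and bootstrap address are placeholders):

import java.util.*;
import org.apache.kafka.clients.admin.*;

public class CreateTopicOnNewBrokers {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // Explicit partition-to-replica assignment: every replica lives on a new broker.
            Map<Integer, List<Integer>> assignment = new HashMap<>();
            assignment.put(0, Arrays.asList(4, 5, 6)); // partition 0 -> new brokers only
            assignment.put(1, Arrays.asList(5, 6, 4)); // partition 1 -> new brokers only

            // This NewTopic constructor takes the manual assignment instead of
            // numPartitions/replicationFactor, so no replica lands on an old broker.
            NewTopic topic = new NewTopic("my-new-topic", assignment);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}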

The high-level steps for migration, with bookkeeping (so the process can be restarted if it is stopped or fails midway), are as follows.

  1. Assert that the old-to-new broker ID mapping is provided and complete.

  2. Use AdminClient to check whether the new brokers are available. If not all of them are available, raise an error and terminate.

  3. Use AdminClient to check if topic <kafka_topic_migration> exists in the cluster.

    • If topic <kafka_topic_migration> does not exist, create it by manually assigning its replicas to new brokers. This topic is used for bookkeeping of topics that have already been reassigned, that is, migrated.
  4. Read topic <kafka_topic_migration> from the beginning using a consumer. Each message is a topic name that has been reassigned to new brokers.

  5. Use AdminClient to list all available topics in the cluster. From this list, remove the topic <kafka_topic_migration>.

  6. From the above two steps, find the topics to be reassigned to new brokers.

  7. For each topic:

    1. Create a map: Map<TopicPartition, Optional<NewPartitionReassignment>> reassignment = new HashMap<>();

    2. Using AdminClient, describe the topic to find its partitions.

    3. For each partition:

      1. Prepare a NewPartitionReassignment object by replacing each old replica ID (old broker ID) with the corresponding new broker ID. If the broker ID mapping does not contain a key for a replica ID, log a warning and keep the current ID (the most likely reason is that this topic has already been migrated but was missed in the bookkeeping).

      2. Add this entry to the map.

    4. Use AdminClient to initiate the reassignment by calling the alterPartitionReassignments method.

    5. Run a loop to check whether the reassignment is complete by listing the ongoing reassignments (listPartitionReassignments) and checking that the result is empty. Sleep for N seconds between iterations (see the sketch after this list).

    6. Validate that the new replicas of each partition match the desired replicas.

    7. Push this topic name as a message to the <kafka_topic_migration> topic.

  8. Repeat steps 4 to 6; if there are no more topics left for reassignment, terminate.
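
As an illustration of the wait-and-validate part of step 7 (sub-steps 5 and 6), a sketch of the helper methods might look like the following. The poll interval and the desired replica lists are assumptions supplied by the caller.

import java.util.*;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.Node;

public class ReassignmentWatcher {

    // Step 7.5: loop until no reassignment is in progress, sleeping N seconds between checks.
    static void waitForReassignments(AdminClient admin, long pollSeconds) throws Exception {
        while (!admin.listPartitionReassignments().reassignments().get().isEmpty()) {
            Thread.sleep(pollSeconds * 1000);
        }
    }

    // Step 7.6: validate that every partition of the topic has exactly the desired replica set.
    static void validateReplicas(AdminClient admin, String topic,
                                 Map<Integer, List<Integer>> desiredReplicas) throws Exception {
        TopicDescription description =
                admin.describeTopics(Collections.singleton(topic)).all().get().get(topic);
        for (TopicPartitionInfo partition : description.partitions()) {
            List<Integer> actual = partition.replicas().stream()
                    .map(Node::id).collect(Collectors.toList());
            List<Integer> expected = desiredReplicas.get(partition.partition());
            if (!actual.equals(expected)) {
                throw new IllegalStateException("Partition " + partition.partition() + " of topic "
                        + topic + " has replicas " + actual + ", expected " + expected);
            }
        }
    }
}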

Once the migration is complete, we can decommission the old brokers and disable the custom topic-creation logic.

Note: Before decommissioning the old brokers, ensure that all clients of this cluster have updated the bootstrap.servers configuration with the new broker(s).

Migrate Zookeeper Cluster

Zookeeper migration is the hardest part because Zookeeper is a quorum-based system (committing any operation requires at least a quorum of votes) and because leader re-election halts write operations. There are a couple of approaches for replacing nodes. One simple approach is to perform multiple rounds of rolling restarts of the whole cluster, adding a new node in each round. This approach may not be acceptable because the leader node is restarted multiple times and the overall migration is delayed.

Note: We cannot add all new nodes (as participants) in one go due to the risk of a new node becoming the leader before it has synced all the data (causing data loss).

Never specify more than one joining server in the same initial configuration as participants. Currently, the joining servers do not know that they are joining an existing ensemble; if multiple joiners are listed as participants, they may form an independent quorum, creating a split-brain situation such as processing operations independently from your main ensemble. It is OK to list multiple joiners as observers in an initial config.

Zookeeper versions 3.5.0 and later support dynamic reconfiguration of the Zookeeper ensemble. The reconfigEnabled flag must be set to true for dynamic reconfiguration to work. For more information, see Dynamic Reconfiguration of the ZooKeeper Ensemble.

You can add the new nodes to the ensemble with the observer role, with each joiner's initial configuration comprising the servers in the last committed configuration plus the joiners themselves. For example, if servers 4 and 5 are added at the same time to (1, 2, 3), the initial configuration of 4 will be (1, 2, 3, 4, 5), where 4 and 5 are listed as observers. Similarly, the configuration of 5 will be (1, 2, 3, 4, 5), where 4 and 5 are listed as observers.
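
As an illustration, the initial configuration of joiner 4 might look like the following excerpt (hostnames and ports are placeholders; reconfigEnabled=true must be set for dynamic reconfiguration):

# zoo.cfg excerpt on server 4 (placeholder hosts and ports)
reconfigEnabled=true
server.1=host1:2888:3888:participant;2181
server.2=host2:2888:3888:participant;2181
server.3=host3:2888:3888:participant;2181
server.4=host4:2888:3888:observer;2181
server.5=host5:2888:3888:observer;2181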

Note: Listing the joiners as observers will not actually make them observers - it will only prevent them from accidentally forming a quorum with other joiners. Instead, they will contact the servers in the current configuration and adopt the last committed configuration (1, 2, 3), where the joiners are absent. After connecting to the current leader, joiners become non-voting followers until the system is reconfigured and they are added to the ensemble (as participant or observer, as desired).

Once all nodes are up and running, call the reconfig API to dynamically add the new nodes as participants and remove the old nodes from the ensemble. For example, using the CLI:

reconfig -remove 1,2,3 -add server.5=hostA:2111:2112;2113,6=hostB:2111:2112:participant;2113,7=hostC:2111:2112:participant;2113
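
The same reconfiguration can also be issued programmatically. A minimal sketch using the ZooKeeperAdmin client is shown below; the connection string, hostnames, and ports are placeholders, and depending on the ensemble's security settings the session may need superuser or appropriate ACL permissions to run reconfig.

import org.apache.zookeeper.admin.ZooKeeperAdmin;
import org.apache.zookeeper.data.Stat;

public class ReconfigureEnsemble {
    public static void main(String[] args) throws Exception {
        // Connect to the current ensemble (placeholder address).
        ZooKeeperAdmin admin = new ZooKeeperAdmin("host1:2181", 30000, event -> { });

        // New participants to add and the IDs of the old servers to remove.
        String joining = "server.5=hostA:2111:2112:participant;2113,"
                + "server.6=hostB:2111:2112:participant;2113,"
                + "server.7=hostC:2111:2112:participant;2113";
        String leaving = "1,2,3";

        // fromConfig = -1 applies the change regardless of the current config version.
        byte[] newConfig = admin.reconfigure(joining, leaving, null, -1, new Stat());
        System.out.println(new String(newConfig));
        admin.close();
    }
}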

After this step, the old nodes can be decommissioned.


Acknowledgments

More Learning Resources

Explore other labs on docs.oracle.com/learn or access more free learning content on the Oracle Learning YouTube channel. Additionally, visit education.oracle.com/learning-explorer to become an Oracle Learning Explorer.

For product documentation, visit Oracle Help Center.