Note:
- This tutorial requires access to Oracle Cloud. To sign up for a free account, see Get started with Oracle Cloud Infrastructure Free Tier.
- It uses example values for Oracle Cloud Infrastructure credentials, tenancy, and compartments. When completing your lab, substitute these values with ones specific to your cloud environment.
Migrate Kafka and Zookeeper Cluster in Zero Downtime
Introduction
Many services (cloud-native or on-premises) use Kafka clusters for their asynchronous messaging needs because it is a messaging technology that scales well. There can be situations where you have to replace a set of nodes or the whole cluster due to business needs. This tutorial describes how to migrate a Kafka and Zookeeper cluster to a new set of nodes without any downtime.
The Problem Statement
Why can replacing a node in a cluster with another be a challenge? Adding or removing a node from a set of nodes behind a load balancer is routine: add a new node, drain the node to be removed, shut it down once all in-flight connections have terminated, remove it from the load balancer configuration, and so on.
The real challenge lies in how an application manages state. Usually, applications are stateless and delegate state persistence to a data management system, for example, a relational database management system (RDBMS). In contrast, both Kafka and Zookeeper manage state locally, that is, state is persisted on local disks. This means we cannot simply replace a node with another without taking care of the local state of the existing node.
Also, in the case of Kafka, simply adding a node does not automatically trigger any rebalancing of the workload, that is, redistribution of topic partitions across brokers. A new node only takes part in workload sharing for topics created from that point onward.
One operational issue with quorum-based systems (Zookeeper in this case) is that they need to know the cluster size upfront, which means each node needs to have knowledge of all other nodes in the cluster. To complicate things further, if you are adding more than one node in one go (the exact number depends on the existing cluster size), you must ensure that none of the newly added nodes becomes the leader, otherwise data is lost.
Objectives
- Migrate a Kafka and Zookeeper cluster to another set of nodes with zero downtime.
Prerequisites
- Understanding of Kafka and Zookeeper architecture.
Migrate Kafka Cluster
Replacing nodes in a Kafka cluster is relatively easy. Add new brokers, trigger topic partition migration, and once the migration is complete, decommission the old brokers. Kafka provides an admin API for performing the partition migration. The alterPartitionReassignments method works by adding additional partition replicas to a topic, waiting for them to become members of the ISR, and then swapping the partition leader.
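As a minimal sketch (the topic name, broker IDs, and bootstrap address below are made up for illustration), a single reassignment call through the Java admin client could look like this:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

public class ReassignOnePartition {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "new-broker-1:9092"); // illustrative address

        try (AdminClient admin = AdminClient.create(props)) {
            // Ask Kafka to move partition 0 of topic "orders" onto brokers 4, 5 and 6.
            // Kafka adds the new replicas, waits for them to join the ISR, switches
            // leadership, and then removes the replicas that are no longer needed.
            Map<TopicPartition, Optional<NewPartitionReassignment>> reassignment = Map.of(
                    new TopicPartition("orders", 0),
                    Optional.of(new NewPartitionReassignment(List.of(4, 5, 6))));
            admin.alterPartitionReassignments(reassignment).all().get();
        }
    }
}
```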
You have to disable new topic partition assignments to the old brokers (if application logic creates topics at runtime), otherwise migration becomes an endless loop. To achieve this, you can use a variant of the topic creation API that lets you specify the partition assignment yourself. A sample algorithm is listed below.
- Given a topic to be created and a set of broker IDs that should not be assigned any partition or replica, topic partitions and their replicas are assigned to the remaining brokers in a round-robin fashion. We assume that broker IDs are created sequentially across ADs first and then across FDs. For example, assuming there are 3 ADs (AD-1, AD-2, AD-3), each AD has 2 FDs (FD-1, FD-2), and there are 6 brokers with IDs 1 to 6, then they are placed in the following order:

  broker ID 1 -> AD-1, FD-1
  broker ID 2 -> AD-2, FD-1
  broker ID 3 -> AD-3, FD-1
  broker ID 4 -> AD-1, FD-2
  broker ID 5 -> AD-2, FD-2
  broker ID 6 -> AD-3, FD-2

- This helps in placing a partition and its replicas in different failure domains. The algorithm is as follows: sort the broker IDs and start with a randomly selected ID, placing replicas sequentially from there. For example, for a topic with 3 partitions and replication factor 3, where the randomly selected broker ID is 3, the placement order for partitions and replicas will be as follows (a sketch of this placement logic follows the list):

  partition-0, replica-0 -> 3
  partition-0, replica-1 -> 4
  partition-0, replica-2 -> 5
  partition-1, replica-0 -> 4
  partition-1, replica-1 -> 5
  partition-1, replica-2 -> 6
  partition-2, replica-0 -> 5
  partition-2, replica-1 -> 6
  partition-2, replica-2 -> 1
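The following is a sketch of this placement logic and of creating the topic with an explicit replica assignment through AdminClient. The class and method names are illustrative, not part of any Kafka API, and the caller is assumed to pass only the broker IDs that may receive partitions (that is, with the old brokers already filtered out).

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.*;
import java.util.concurrent.ExecutionException;

public class ManualTopicCreation {

    // Round-robin assignment over the allowed brokers only: sort the IDs,
    // pick a random starting position, then place partition p, replica r
    // on the broker at offset (start + p + r).
    static Map<Integer, List<Integer>> assignReplicas(int partitions, int replicationFactor,
                                                      Collection<Integer> allowedBrokerIds) {
        List<Integer> brokers = new ArrayList<>(allowedBrokerIds);
        Collections.sort(brokers);
        int start = new Random().nextInt(brokers.size());

        Map<Integer, List<Integer>> assignment = new HashMap<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add(brokers.get((start + p + r) % brokers.size()));
            }
            assignment.put(p, replicas);
        }
        return assignment;
    }

    // Create the topic with the explicit assignment so that Kafka never
    // places new partitions on the old brokers.
    static void createTopic(AdminClient admin, String name,
                            Map<Integer, List<Integer>> assignment)
            throws ExecutionException, InterruptedException {
        admin.createTopics(List.of(new NewTopic(name, assignment))).all().get();
    }
}
```

With the example above (brokers 1 to 6, start at broker 3, 3 partitions, replication factor 3), assignReplicas produces exactly the placement listed before this sketch.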
The high-level steps for migration, with bookkeeping (so that the process can be restarted if it stops or fails mid-way), are as follows.
1. Assert that the old-to-new broker ID mapping is given and complete.
2. Use AdminClient to check if the new brokers are available. If not all of them are available, raise an error and terminate.
3. Use AdminClient to check if the topic <kafka_topic_migration> exists in the cluster.
   - If the topic <kafka_topic_migration> does not exist, create it by manually assigning replicas on the new brokers. This topic is used for bookkeeping of topics that have already been reassigned, that is, migrated.
4. Read the topic <kafka_topic_migration> from the beginning using a consumer. Each message is the name of a topic that has already been reassigned to the new brokers.
5. Use AdminClient to list all available topics in the cluster. From this list, remove the topic <kafka_topic_migration>.
6. From the above two steps, find the topics to be reassigned to the new brokers.
7. For each topic (a sketch of steps 4 to 7 follows this list):
   - Create a map: Map<TopicPartition,Optional<NewPartitionReassignment>> reassignment = new HashMap<>();
   - Using AdminClient, describe the topic to find its partitions.
   - For each partition:
     - Prepare a NewPartitionReassignment object by replacing each old replica ID (old broker ID) with the corresponding new broker ID. If the broker ID mapping does not contain a key for a replica ID, log a warning and use the current ID (the most likely reason is that this topic has already been migrated and bookkeeping was missed).
     - Add this entry to the map.
   - Use AdminClient to initiate the reassignment by calling the alterPartitionReassignments method.
   - Run a loop that checks whether the reassignment is complete by listing the ongoing reassignments (listPartitionReassignments) and checking that the result is empty. Sleep for N seconds between iterations.
   - Validate that the new replicas of each partition match the desired replicas.
   - Push the topic name as a message to the <kafka_topic_migration> topic.
8. Repeat steps 4 to 6; if there are no more topics left for reassignment, terminate.
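The sketch below illustrates steps 4 to 7 with the Java AdminClient. The bookkeeping topic name, the oldToNewBrokerId map, and the alreadyMigrated set (assumed to have been read from the bookkeeping topic by a plain consumer, as in step 4) are placeholders specific to this illustration; error handling, the warning log, and the final produce to the bookkeeping topic are omitted.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.*;
import java.util.concurrent.ExecutionException;

public class TopicMigrator {

    // Hypothetical bookkeeping topic name used throughout this sketch.
    static final String BOOKKEEPING_TOPIC = "kafka_topic_migration";

    // Steps 5 and 6: list all topics, drop the bookkeeping topic itself,
    // and keep only the ones not yet recorded as migrated.
    static Set<String> pendingTopics(AdminClient admin, Set<String> alreadyMigrated)
            throws ExecutionException, InterruptedException {
        Set<String> topics = new HashSet<>(admin.listTopics().names().get());
        topics.remove(BOOKKEEPING_TOPIC);
        topics.removeAll(alreadyMigrated);
        return topics;
    }

    // Step 7: move every replica of one topic from old brokers to new ones.
    static void reassignTopic(AdminClient admin, String topic,
                              Map<Integer, Integer> oldToNewBrokerId)
            throws ExecutionException, InterruptedException {
        Map<TopicPartition, Optional<NewPartitionReassignment>> reassignment = new HashMap<>();

        // allTopicNames() requires recent Kafka clients (3.1+); older clients expose all().
        TopicDescription description =
                admin.describeTopics(List.of(topic)).allTopicNames().get().get(topic);
        for (TopicPartitionInfo partition : description.partitions()) {
            List<Integer> targetReplicas = new ArrayList<>();
            for (Node replica : partition.replicas()) {
                // Replace each old broker ID with its new counterpart; if the mapping
                // has no entry, keep the current ID (the topic has probably already moved).
                targetReplicas.add(oldToNewBrokerId.getOrDefault(replica.id(), replica.id()));
            }
            reassignment.put(new TopicPartition(topic, partition.partition()),
                    Optional.of(new NewPartitionReassignment(targetReplicas)));
        }

        // Initiate the reassignment, then poll until nothing is in flight any more.
        admin.alterPartitionReassignments(reassignment).all().get();
        while (!admin.listPartitionReassignments().reassignments().get().isEmpty()) {
            Thread.sleep(10_000); // N seconds between checks
        }
        // A producer would now append this topic name to BOOKKEEPING_TOPIC (bookkeeping).
    }
}
```

The validation sub-step of step 7 can be done with another describeTopics call, comparing the replicas of each partition against the target replica list.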
Once the migration is complete, we can decommission the old brokers and disable the custom topic creation logic.
Note: Before decommissioning the old brokers, ensure that all clients of this cluster have updated the bootstrap.servers configuration with the new broker(s).
Migrate Zookeeper Cluster
Being a quorum-based system (to commit any operation, at least a quorum of votes is needed), and given that leader re-election halts write operations, Zookeeper migration is the hardest part. There are a couple of approaches for replacing nodes. One simple approach is to perform multiple rounds of rolling restarts of the whole cluster, adding a new node in each round. This approach may not be acceptable due to the multiple restarts of the leader node and the delay in performing the overall migration.
Note: We cannot add all new nodes (as participants) in one go due to the risk of a new node becoming a leader without syncing all the data (causing data loss).
Never specify more than one joining server in the same initial configuration as participants. Currently, the joining servers do not know that they are joining an existing ensemble, if multiple joiners are listed as participants they may form an independent quorum creating a split-brain situation such as processing operations independently from your main ensemble. It is ok to list multiple joiners as observers in an initial config.
In Zookeeper versions above 3.5.0, there is support for dynamic reconfiguration of the Zookeeper ensemble. The reconfigEnabled flag should be set to true for dynamic reconfiguration to work. For more information, see Dynamic Reconfiguration of the ZooKeeper Ensemble.
You can add the new nodes to the ensemble with the role Observer, with the initial configuration of each joiner comprised of the servers in the last committed configuration plus the joiners themselves. For example, if servers 4 and 5 are added at the same time to (1, 2, 3), the initial configuration of server 4 will be (1, 2, 3, 4, 5), where 4 and 5 are listed as observers. Similarly, the configuration of server 5 will be (1, 2, 3, 4, 5), where 4 and 5 are listed as observers.
Note: Listing the joiners as observers will not actually make them observers - it will only prevent them from accidentally forming a quorum with other joiners. Instead, they will contact the servers in the current configuration and adopt the last committed configuration (1, 2, 3), where the joiners are absent. After connecting to the current leader, joiners become non-voting followers until the system is reconfigured and they are added to the ensemble (as participant or observer, as desired).
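For illustration only (hostnames and ports are assumptions, not prescriptions), the initial dynamic configuration on joiners 4 and 5 could look like the following, with the joiners listed as observers:

```
server.1=zk1:2111:2112:participant;2113
server.2=zk2:2111:2112:participant;2113
server.3=zk3:2111:2112:participant;2113
server.4=zk4:2111:2112:observer;2113
server.5=zk5:2111:2112:observer;2113
```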
Once all nodes are up and running, the reconfig API is called to dynamically add the new nodes as participants and remove the old nodes from the ensemble. For example, using the CLI:
reconfig -remove 1,2,3 -add server.5=hostA:2111:2112;2113,6=hostB:2111:2112:participant;2113,7=hostC:2111:2112:participant;2113*
After this step, old nodes can be decommissioned.
Note: Before decommissioning the old nodes, ensure that all clients of this Zookeeper cluster have updated their connection string with the new nodes.
Zookeeper should be migrated first (assuming the Kafka version being used depends on Zookeeper). By adding the new Zookeeper nodes to the new Kafka brokers' configuration upfront, we are prepared for the removal of the old Zookeeper nodes.
Additional points to keep in mind:
- A new Zookeeper node's myID should be monotonically increasing, otherwise the node will fail to start.
- Do not restart a newly added Zookeeper node until reconfig has been called, otherwise it will not start due to the missing entry for itself in the dynamic configuration file. The node's initial configuration is overridden with the dynamic configuration holding the last committed configuration.
- While selecting an open-source component to be integrated into your ecosystem, you must consider its manageability and operational aspects.
- While migrating topic partitions, be aware of the amount of data that will be transferred over the network.
Related Links
Acknowledgments
- Author - Shailesh Mishra
More Learning Resources
Explore other labs on docs.oracle.com/learn or access more free learning content on the Oracle Learning YouTube channel. Additionally, visit education.oracle.com/learning-explorer to become an Oracle Learning Explorer.
For product documentation, visit Oracle Help Center.
Migrate Kafka and Zookeeper Cluster in Zero Downtime
F96278-01
April 2024