ノート:
- このチュートリアルでは、Oracle Cloudへのアクセスが必要です。無料アカウントにサインアップするには、Oracle Cloud Infrastructure Free Tierの開始を参照してください。
- Oracle Cloud Infrastructureの資格証明、テナンシおよびコンパートメントに例の値を使用します。演習を終える際は、これらの値をクラウド環境に固有の値に置き換えてください。
ダウンタイムなしでKafkaおよびZookeeperクラスタを移行
イントロダクション
多くのサービス(クラウドネイティブまたはオンプレミス)は、適切にスケーリングするメッセージング・テクノロジであるため、非同期メッセージング・ニーズにKafkaクラスタを使用します。ビジネス・ニーズのために、一連のノードまたはクラスタ全体を置き換える必要がある場合があります。このチュートリアルでは、KafkaおよびZookeeperクラスタを、停止時間なしで新しいノード・セットに移行する方法について説明します。
問題の内容
クラスタ内のノードを別のノードに置き換えることが困難なのはなぜですか。ロード・バランサの背後にある一連のノードに対してノードを追加または削除することは、ユビキタスです。新しいノードを追加し、削除する既存のノードをドレインします。進行中のすべての接続が終了したら、停止し、ロード・バランサ構成から削除します。
本当の課題は、アプリケーションによる状態管理です。通常、アプリケーションはステートレスであり、状態永続性をデータ管理システム(リレーショナル・データベース管理システム(RDBMS)など)に委任します。このようなアプリケーションとは対照的に、KafkaとZookeeperの両方がローカルで状態を管理します。つまり、状態永続性はローカル・ディスクを使用して行われます。つまり、既存のノードのローカル状態を処理せずに、単にノードを別のノードに置き換えることはできません。
また、Kafkaの場合、単にノードを追加しても、ワークロードの再バランシング(ブローカ間でのトピック・パーティションの再分散)は自動的に開始されません。新しいノードは、この時点以降に作成された新しいトピックのワークロード共有にのみ参加します。
定足数ベースのシステム(この場合は Zookeeper)の動作上の問題の1つは、クラスタサイズを事前に把握する必要があることです。つまり、各ノードはクラスタ内のほかのすべてのノードを認識している必要があります。さらに複雑にするために、複数のノード(正確な数は既存のクラスタ・サイズによって異なります)を1回で追加する場合は、新しく追加されたノードの1つがリーダーにならないようにする必要があります。そうしないと、データが失われます。
目的
- KafkaおよびZookeeperクラスタをゼロ・ダウンタイムで別のノード・セットに移行します。
前提条件
- KafkaおよびZookeeperアーキテクチャの理解。
Kafkaクラスタの移行
Kafkaクラスタのノードの置換は比較的簡単です。新しいブローカを追加し、トピック・パーティションの移行を呼び出します。移行が完了したら、古いブローカを廃止できます。Kafkaには、パーティション移行を実行するための管理APIが用意されています。alterPartitionReassignments
は、トピックにパーティション・レプリカを追加し、ISRのメンバーになるまで待機してから、パーティション・リーダーをスワップすることで機能します。
古いブローカへの新しいトピック・パーティションの割当てを無効にする必要があります(アプリケーション・ロジックが実行時にトピックを作成する場合)。そうしないと、これは無限の移行ループになります。これを実現するために、トピック作成APIのバリアントを使用して、パーティション割当てを自分で指定できます。次に、従うことができるアルゴリズムの例を示します。
-
作成するトピックと、パーティションまたはレプリカを割り当てないブローカIDのセットがある場合: トピック・パーティションとそのレプリカは、ラウンドロビン方式で残りのブローカに割り当てられます。ブローカIDは、ADおよびFDで順次作成されるものとします。たとえば、3つのAD(AD-1、AD-2、AD-3)があり、各ADに2つのFD(FD-1、FD-2)があり、ID(1-6)を持つブローカが6つあるとすると、次の順序で配置されます。
broker ID 1 -> AD-1, FD-1 broker ID 2 -> AD-2, FD-1 broker ID 3 -> AD-3, FD-1 broker ID 4 -> AD-1, FD-2 broker ID 5 -> AD-2, FD-2 broker ID 6 -> AD-3, FD-2
-
これは、パーティションとそのレプリカを異なる障害ドメインに配置するのに役立ちます。このアルゴリズムは、ブローカIDをソートし、配置のためにランダムに選択されたIDで順番に開始します。たとえば、3つのパーティションおよびレプリケーション係数3のトピックで、ランダムに選択されたブローカIDが3の場合、パーティションまたはレプリカの配置順序は次のようになります。
partition-0, replica-0 -> 3 partition-0, replica-1 -> 4 partition-0, replica-2 -> 5 partition-1, replica-0 -> 4 partition-1, replica-1 -> 5 partition-1, replica-2 -> 6 partition-2, replica-0 -> 5 partition-2, replica-1 -> 6 partition-2, replica-2 -> 1
経理で従うことができる移行の高度なステップ(移行プロセスが途中で停止または失敗した場合に再起動する場合)。
-
新しいブローカIDへの古いマッピングが指定され、完了していることをアサートします。
-
AdminClient
を使用して、新しいブローカが使用可能かどうかを確認します。これらすべてを使用できない場合は、エラーを発生させて終了します。 -
AdminClient
を使用して、トピック<kafka_topic_migration>
がクラスタに存在するかどうかを確認します。- トピック
<kafka_topic_migration>
が存在しない場合は、新しいブローカにレプリカを手動で割り当てて作成します。このトピックは、すでに再割当された手段が移行されたトピックの簿記に使用されます。
- トピック
-
コンシューマを使用して、最初からトピック
<kafka_topic_migration>
を読み取ります。各メッセージは、新しいブローカに再割当てされたトピック名です。 -
AdminClient
を使用して、クラスタで使用可能なすべてのトピックをリストします。このリストから、トピック<kafka_topic_migration>
を削除します。 -
前述の2つのステップから、新しいブローカに再割当てするトピックを見つけます。
-
各トピックについて:
-
マップ
Map<TopicPartition,Optional<NewPartitionReassignment>> reassignment = new HashMap<>();
を作成します。 -
AdminClient
を使用して、パーティションを検索するためのトピックを説明します。 -
パーティションごと:
-
古い各レプリカID (古いブローカID)を対応する新しいブローカIDに置き換えて、
NewPartitionReassignment
オブジェクトを準備します。ブローカIDマッピングにレプリカIDに対応するキーが含まれていない場合は、警告をログに記録し、現在のIDを使用します(このトピックがすでに移行されており、簿記を逃している可能性があります)。 -
このエントリをマップに追加します。
-
-
AdminClient
を使用して、alterPartitionReassignments
メソッドをコールして再割当てを開始します。 -
ループを実行して、再割当ての実行時にリストし(
listPartitionReassignments
)、そのサイズがゼロであることを確認することで、この再割当てが完了しているかどうかを確認します。各ループ間でN秒のスリープを実行します。 -
各パーティションの新しいレプリカが目的のレプリカと一致することを検証します。
-
このトピック名をメッセージとして
<kafka_topic_migration>
トピックにプッシュします。
-
-
再割当および終了のトピックがこれ以上ない場合は、ステップ4から6を繰り返します。
移行が完了したら、古いブローカを廃止し、トピック作成のカスタム・ロジックを無効にできます。
ノート:古いブローカを廃止する前に、このクラスタのすべてのクライアントが
bootstrap.servers
構成を新しいブローカで更新していることを確認してください。
Zookeeperクラスタの移行
定足数ベースのシステム(操作をコミットするには、少なくとも定足数の投票が必要です)であり、リーダーの再選により書き込み操作が停止され、Zookeeperの移行が最も困難です。ノードを交換するには、いくつかの方法があります。1つの簡単な方法は、クラスタ全体のローリング再起動を複数ラウンド実行し、各ラウンドに新しいノードを追加することです。このアプローチは、リーダー・ノードが複数ラウンド再起動され、移行全体の実行が遅延するため、許容できない場合があります。
ノート:すべてのデータを同期しない(データ損失の原因となる)新しいノードがリーダーになるリスクがあるため、すべての新規ノードを(参加者として)1回に追加することはできません。
参加者と同じ初期構成で複数の参加サーバーを指定しないでください。現在、結合サーバーは、既存のアンサンブルに参加していることを認識していません。複数のジョイナが参加者としてリストされている場合、独立した定足数を形成して、メイン・アンサンブルとは独立して処理操作などのスプリットブレイン状況が発生する可能性があります。初期構成で複数のジョイナをオブザーバとしてリストすることは問題ありません。
3.5.0
を超えるZookeeperバージョンでは、Zookeeperアンサンブルの動的再構成がサポートされています。動的再構成が機能するには、reconfigEnabled
フラグをtrue
に設定する必要があります。詳細は、「ZooKeeperアンサンブルの動的再構成」を参照してください。
ロールObserver
でアンサンブルする新しいノードを追加でき、最後にコミットされた構成のサーバーで構成されるジョイナの初期構成を使用できます。たとえば、サーバー4と5が(1、2、3)に同時に追加された場合、4の初期構成は(1、2、3、4、5)になります。ここで、4と5はオブザーバとしてリストされます。同様に、5の構成は(1、2、3、4、5)となり、4および5はオブザーバとしてリストされます。
ノート:オブザーバとしてジョイナをリストしても、実際にはオブザーバは作成されません。他のジョイナとの定足数が誤って形成されないようにするだけです。かわりに、現在の構成のサーバーに接続し、最後にコミットされた構成(1、2、3)を採用します。ここで、ジョイナはありません。現在のリーダーに接続すると、参加者は、システムが再構成され、必要に応じて(参加者またはオブザーバとして)アンサンブルに追加されるまで、非投票フォロワになります。
すべてのノードが起動して実行されると、reconfig APIが呼び出され、新しいノードが参加者として動的に追加され、古いノードがアンサンブルから削除されます。たとえば、CLIを使用します。
reconfig -remove 1,2,3 -add
server.5=hostA:2111:2112;2113,6=hostB:2111:2112:participant;2113,7=
hostC:2111:2112:participant;2113*
このステップの後、古いノードを廃止できます。
ノート:
古いノードを廃止する前に、このZookeeperクラスタのすべてのクライアントが新しいノードで接続文字列を更新していることを確認してください。
Zookeeperを最初に移行する必要があります(使用されているKafkaバージョンはZookeeperに依存するとします)。新しいKafkaブローカ構成に新しいZookeeperノードを追加することで、古いZookeeperノードを事前に削除する準備が整います。
Zookeeperの新規ノードの
myID
は単調に増加する必要があります。そうしないと、ノードの起動に失敗します。新しく追加したZookeeperノードは、
reconfig
がコールされるまで再起動しないでください。そうしないと、動的構成ファイルで自身の構成が欠落しているため起動しません。ノードの初期構成は、最後にコミットされた構成を持つ動的構成でオーバーライドされます。エコシステムに統合するオープンソース・コンポーネントを選択する際には、その管理性と運用面を考慮する必要があります。
トピック・パーティションの移行中に、ネットワーク経由で転送されるデータの量を認識する必要があります。
関連リンク
承認
- 作成者 - Shailesh Mishra
その他の学習リソース
docs.oracle.com/learnの他のラボをご覧いただくか、Oracle Learning YouTubeチャネルで無料のラーニング・コンテンツにアクセスしてください。また、education.oracle.com/learning-explorerにアクセスして、Oracle Learning Explorerになります。
製品ドキュメントは、Oracle Help Centerを参照してください。
Migrate Kafka and Zookeeper Cluster in Zero Downtime
F96321-01
April 2024