Apache Kafkaの使用

Apache Kafkaは、分散ストリーミング、パイプライン化、データフィードのリプレイのためのリアルタイムストリーミングデータを処理する目的で構築されたオープンソースの分散パブリッシュ/サブスクライブメッセージングプラットフォームで、高速でスケーラブルな操作を実現します。Kafkaは、データのストリームをサーバーのクラスタ内のレコードとして維持することで動作するブローカベースのソリューションです。

ビッグ・データ・サービス・クラスタでは、Kafkaは次の方法で使用できます。

  1. Kafkaプロファイル・クラスタを作成します:
    1. Create a cluster.
    2. 「クラスタ・プロファイル」フィールドで、「Kafka」を選択します。
  2. Hadoop_Extendedプロファイル・クラスタを作成し、クラスタにKafkaを追加します:
    1. Create a cluster.
    2. 「クラスタ・プロファイル」フィールドで、Hadoop_Extendedを選択します。
    3. クラスタにKafkaを追加します

Kafka構成プロパティ

ビッグ・データ・サービス3.1.1以降に含まれるKafka構成プロパティ。

設定 プロパティ 説明
kafka-env kafka_opts Kafkaオプション
kerberos_param Kafka Kerberosパラメータ
kafka_jmx_opts Kafka JMXオプション
kafka_classpath Kafkaクラスパス

Apache Kafkaのベスト・プラクティス

ハードウェア要件

Apache Kafkaは、通常機能するため、特に一部の構成チューニングでは少量のリソースが必要です。デフォルトでは、Kafkaは1コアおよび1 GBのメモリーで実行でき、データ保持の要件に基づいてストレージがスケーリングされます。

KafkaはI/Oが多いため、CPUがボトルネックになることはほとんどありません。ただし、十分なスレッドがある中程度のサイズのCPUは、同時接続およびバックグラウンド・タスクを処理するために重要です。

  • Kafkaブローカ・ノード: 8コア、64 GBから128 GBのRAM、2つ以上の2 TBディスク(標準2.8以上、DenseIO以上または同等)
  • 少なくとも3つのKafkaブローカ・ノード
  • ハードウェアプロファイル: より多くのRAMと高速ディスクが優れている
  • Kafkaブローカをワーカー・ノードにインストールします(サイズが水平に増加する可能性があるため)。

推奨されるKafkaクラスタ・トポロジ

  1. ワーカー・ノードからノード・マネージャを削除
  2. Kafkaブローカではレプリケーション/HAに少なくとも3つのノードが必要であるため、Kafkaに追加のワーカー・ノードをプロビジョニングすることを検討できます。
  3. 追加のHDFSワーカー・ノードをプロビジョニングし、データ・ノードとノード・マネージャの両方をデコミッションします。
    ノート

    現在のワーカー・ノードは、Kafkaブローカ・ノード用に再利用されるHDFSワーカー・ノードの後にモデル化されます。したがって、Kafkaブローカ・ノードをHDFSデータ・ノードとともに実行すると、HDFSは有効なストレージを失います。

Kafkaの設定時にチューニングする一般的なパラメータは次のとおりです。

チューニングする機能 調整するパラメータ
メッセージの保存 ディスク・サイズ
クライアント・スループット(プロデューサおよびコンシューマ) ネットワーク容量
プロデューサのスループット ディスクI/O
コンシューマ・スループット メモリー

これらのパラメータはケースによって異なり、パフォーマンスを向上させるために慎重に設定する必要があります。すべてのユースケースに適合する単一の設定はありません。

ZooKeeper

ZooKeeperは、分散調整サービスとして機能するKafkaクラスタの重要なコンポーネントです。ZooKeeperは、クラスタのメタデータの監視と保存、多数のノードの操作の調整、およびKafkaクラスタの一般的な安定性と一貫性の保証を担当します。

ビッグ・データ・サービスのHAクラスタには、3つのZookeeperホストが含まれます。ただし、大規模な本番ユース・ケースでは、Zookeeperホストはビッグ・データ・サービス・クラスタで他のサービス間で共有されるため、水平方向にスケーリングすることをお薦めします。

パフォーマンスの考慮事項

Kafkaは、すぐに最適化されます。ただし、クラスタのパフォーマンスを向上させるには、いくつかのチューニングが必要です。2つの主なメトリックについて考えてみます。

  • スループット: 特定の時間内に到着したメッセージの数。
  • 待機時間: 各メッセージの処理にかかる時間。

チューニング・ブローカ

トピック内のパーティションの数を制御します。パーティション数およびクラスタ内のブローカ数を増やすと、メッセージ消費の並列性が向上し、Kafkaクラスタのスループットが向上します。ただし、レプリカ・セット間でデータをレプリケートするために必要な時間も長くなります。

チューニング・プロデューサ

プロデューサは、同期と非同期の2つの異なるモードで実行できます。同期モードでは、メッセージがパブリッシュされるとすぐに、プロデューサはブローカにリクエストを送信します。したがって、1秒間に100メッセージを生成する場合、プロデューサはブローカに1秒間に100リクエストを送信します。これによりスループットが低下し、ブロック操作として機能します。そのため、大量のメッセージをパブリッシュする場合は、プロデューサを非同期モードで実行することをお薦めします。

非同期モードでは、パフォーマンスを最大にするために2つのパラメータをチューニングする必要があります: batch.sizeおよびlinger.ms (リンガ時間)。バッチ・サイズは、1つのバッチで送信されるデータのサイズで、バイト単位で測定されます。たとえば、バッチ・サイズを100に設定すると、プロデューサはブローカをコールする前に、メッセージの合計が最大100バイトになるまで待機します。メッセージ生成が低く、バッチ・サイズが高い場合、プロデューサは長時間待機してから最終的にメッセージを生成します。これにより、スループットが低下し、メッセージ配信の待機時間が長くなります。したがって、生成されるメッセージの数に応じて、この値を最適化する必要があります。デフォルトのバッチ・サイズは16,384です。

遅延時間は、プロデューサがブローカにリクエストを送信することを決定したときに基づく別のメトリックです。前述の例を使用して、バッチ・サイズが100バイトに設定され、1秒当たり50バイトしか生成しない場合、プロデューサは2秒待ってからこれらのメッセージを公開する必要があります。この遅延を回避するために、メッセージの送信前にプロデューサが長時間待機しないように、遅延時間(ミリ秒単位で測定)をチューニングできます。この例では、遅延時間を500ミリ秒に設定すると、プロデューサは最大で半秒待機します。

圧縮によってレイテンシも改善できます。デフォルトでは、Kafkaメッセージは圧縮されませんが、それらを圧縮するようにプロデューサを構成できます。その後、ブローカーとコンシューマはメッセージの解凍のオーバーヘッドを増やしますが、ネットワーク経由で転送されるデータの物理サイズが小さいため、全体的な待機時間を短縮する必要があります。

コンシューマのチューニング

コンシューマは、プロデューサがバッチで公開する方法と同様に、メッセージをバッチで受信します。多数のメッセージをプルし、それぞれの処理に多くの時間がかかる場合、スループットが低下します。同様に、ブローカを毎回1つのメッセージに対してポーリングする場合、ブローカへのリクエストの数がスループットを低下させる可能性があります。

コンシューマ・グループ内にパーティションとコンシューマが増えると、スループットの向上に役立ちます。ただし、コンシューマの数が増加すると、ブローカへのオフセット・コミット・リクエストの数も増加することに注意してください。オフセットをコミットすると、内部トピックにKafkaメッセージが送信されるため、ブローカへの負荷が間接的に増加します。そのため、最適な数の消費者がいることが重要です。

MirrorMakerパフォーマンスの最適化

Kafka MirrorMakerは、あるデータ・センターまたはクラスタから別のデータ・センターにKafkaメッセージをミラー化するために使用されるツールです。これは内部的にKafkaにメッセージを生成するため、すでに説明されているほとんどの最適化手法もここで当てはまります。これには長距離でのメッセージの送信も含まれるため、パフォーマンス向上のためにチューニングできる構成パラメータがさらにいくつかあります。

ノート

チューニングするときは、ビジネス・ユース・ケースのニーズに基づいてアクションを実行してください。
  • MirrorMaker2場所: MirrorMakerは、ソースまたは宛先にインストールできます。ただし、長距離でメッセージを生成すると転送中にデータが失われる可能性が高くなるため、宛先にインストールすることをお薦めします。
  • 圧縮: デフォルトでは、Kafkaプロデューサでのメッセージ圧縮はnoneに設定されます。ただし、ソースから宛先に送信されるメッセージを圧縮するには、圧縮をgzipに変更します。これにより、バッチ・サイズが大きくなります。
  • バッチ・サイズ: メッセージのバッチ・サイズを大きくすると、スループットが向上します。これを圧縮と組み合せると、大量のメッセージがすぐに送信されます。ターゲット・バッチ・サイズが構成済の遅延時間よりも時間がかかっている場合、送信中のバッチは完全には満たされません。これにより、圧縮効率が低下し、帯域幅が無駄になります。そのため、バッチ・サイズのチューニングと、圧縮およびチューニングの遅延時間を有効にすることが重要です。
  • 遅延時間: バッチを完全に入力できるようにする遅延時間を増やすことが重要です。これにより、待機時間が長くなる可能性がありますが、全体的なスループットは向上します。ビジネス・ユース・ケースにおけるレイテンシの重要性を考慮する必要があります。
  • 並列性の向上: スループットをさらに高めるために、同じコンシューマ・グループにMirrorMakerの複数のインスタンスをデプロイできます。これにより、複数のMirrorMakerコンシューマが同じソースから受信し、宛先に並行して生成することが容易になります。

Kafkaチューニングでの本番サーバー構成

本番環境でのKafkaのパフォーマンスを向上させるために、他のいくつかの構成パラメータをチューニングできます。次のパラメータにより、パーティション内のメッセージのレプリケーション・パフォーマンスが向上します。

  • num.replica.fetchers: リーダーからフォロワにメッセージをレプリケートするために使用するスレッドの数を指定します。多数のレプリカ・フェッチ機能を使用すると、レプリケーションの並列性が向上します。
  • replica.fetch.max.bytes: リーダーからフェッチするデータのバイト数を示します。数値が大きいほどフェッチされるデータのチャンクが大きくなり、レプリケーションのスループットが向上します。
  • num.partitions: トピックが特定のユーザー・グループに保持できるコンシューマの最大数を指定します。これは、そのトピックで使用可能なパーティション数と同じです。パーティションを増やすと、パラレル化およびスループットが向上します。ただし、多くのパーティションでもより多くのリソースを消費するため、リソースもスケール・アップする必要があります。

Apache Kafkaクラスタのバランシング

新しいブローカがKafkaクラスタに追加されるたびに、既存のパーティションは新しいブローカを介して分散されません。つまり、新しいブローカーはビジー状態ではなく、1つ以上の古いブローカーが停止すると、レプリケーションと潜在的なリーダーが減少します。これはリーダー・スキューと呼ばれます。これを回避するには、新しく追加されたブローカがパーティションのシェアを取得するようにします。クラスタのリバランスは重要です。同様に、ブローカがトピックの平均パーティション数(ブローカ・スキュー)を超える場合、パフォーマンスの問題が発生する可能性があります。

Kafkaパフォーマンスの最適化

Kafkaをクラスタとして実行する場合、そのパフォーマンスを最適化するには複数の方法があります。様々な構成パラメータをチューニングして、スループットと待機時間のバランスをとることができます。エンジニアリングでは、これらのパラメータの中には、遅延時間、バッチ・サイズ、パーティション数などの最良の値を計算する必要があります。ユース・ケースによっては、スループットがレイテンシよりも重要であること、レイテンシがスループットよりも重要であること、または2つの間のバランスが最適であると判断する場合があります。