Kafkaクライアント

Kafkaクライアントは、主にKafkaエコシステム内のコンポーネントで、Kafkaトピックにデータを読み書きします。

ノート

Kafkaクライアントの構成中、モジュールcom.sun.security.auth.module.JndiLoginModuleはサーバーおよびクライアントのJAAS構成には使用できません。Kerberosを使用した認証には、デフォルト・モジュールcom.sun.security.auth.module.Krb5LoginModuleを使用することをお薦めします。

一般的なコマンド

ノート

  • Kafkaサービスは、いずれかのノードにインストールして起動する必要があります。
  • 共通コマンドを実行するには、KafkaサービスがインストールされているノードにSSHを実行します。
  • イベントを公開するトピックを作成するには:
    1. 次の詳細を含むファイルclient.propertiesを作成し、Kafkaユーザーへの適切なアクセスを提供します
      sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true
      keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
                                  
      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      bootstrap.servers=<broker_hostname>:6667
      compression.type=none
      sasl.mechanism=GSSAPI
                              
    2. 実行:
      sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-topics.sh --bootstrap-server <hostname>:6667 --create --topic Test_Topic --partitions 3 --replication-factor 1 --command-config /usr/odh/current/kafka-broker/config/client.properties
                                      
  • イベントをトピックに公開するには:
    1. 次の詳細を含むファイルproducer.propertiesを作成し、Kafkaユーザーへの適切なアクセスを提供します。
      sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
                                  
      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      bootstrap.servers=<broker_hostname>:6667
      compression.type=none
      sasl.mechanism=GSSAPI
    2. 実行:
      sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-console-producer.sh --broker-list <hostname>>:6667 --topic Test_Topic --producer.config /usr/odh/current/kafka-broker/config/producer.properties
                                      
  • トピックから公開されたデータを読み取るには:
    1. 次の詳細を含むファイルconsumer.propertiesを作成し、Kafkaユーザーへの適切なアクセスを提供します。
      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      bootstrap.servers=<broker_hostname>>:6667
      compression.type=none
      sasl.mechanism=GSSAPI
      group.id=test-consumer-group   
      sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule
      required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
    2. 実行:
      sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server <hostname>:6667 --topic Test_Topic --consumer.config /usr/odh/current/kafka-broker/config/consumer.properties --from-beginning

Kafkaプロデューサ

Kafkaプロデューサは、トピックへのレコードの書込みを担当するパブリッシャです。通常、これはKafkaProducer APIを使用してプログラムを記述することを意味します。

プロデューサをインスタンス化するには、次を実行します。

KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);

このコンストラクタで使用されるプロデューサ設定の詳細は、次を参照してください。

キーと値のシリアライズ

プロデューサごとに、2つのシリアライズ・プロパティkey.serializer(キー用)およびvalue.serializer(値用)を設定する必要があります。シリアライズ用のカスタム・コードを記述するか、Kafkaによって提供されるオプションの1つを使用します。例:

  • ByteArraySerializer: バイナリ・データ
  • StringSerializer: 文字列表現

承認

プロデューサからのレコードの完全書込みパスは、リーダー・パーティション、続いてすべてのフォロワ・レプリカです。プロデューサは、パス内のどのポイントが確認をトリガーするかを制御できます。acks設定に応じて、プロデューサは書込みがシステム全体に伝播されるまで待機するか、最も早い成功ポイントのみを待機します。

有効なacks値は次のとおりです。

  • 0: パーティションからの確認を待機しないでください(最速のスループット)。
  • 1: リーダー・パーティションのレスポンスのみを待機します。
  • all: フォロワがレスポンスをパーティション化し、最小(スループットが最も遅い)になるまで待機します。

Kafkaコンシューマ

Kafkaコンシューマは、1つ以上のトピックおよびトピックの1つ以上のパーティションからレコードを読み取るためのサブスクライバです。トピックをサブスクライブするコンシューマは、手動または自動で実行されます。通常、これはKafkaConsumer APIを使用してプログラムを記述することを意味します。

コンシューマを作成するには、次を実行します:

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);

KafkaConsumerクラスには、2つの汎用型パラメータがあります。プロデューサはキーを使用してデータ(値)を送信できるため、コンシューマはキーによってデータを読み取ることができます。この例では、キーと値はどちらも文字列です。異なるタイプを定義する場合は、代替タイプに対応するようにデシリアライザを定義する必要があります。デシリアライザの場合は、org.apache.kafka.common.serialization.Deserializerインタフェースを実装する必要があります。

指定する最も重要な構成パラメータは次のとおりです。

  • bootstrap.servers: 最初に接続するブローカのリスト。2人から3人のブローカを一覧表示します。クラスタ全体をリストする必要はありません。
  • group.id: すべてのコンシューマは、トピックのパーティションを共有するグループに属します。
  • key.deserializer/value.deserializer: Java表現をバイトのシーケンスに変換して、Kafkaプロトコルを介してデータを送信する方法を指定します。

トピックへのサブスクライブ

subscribe()メソッド・コールを使用したトピックのサブスクライブ:

kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener)

使用するトピックのリストとリバランス・リスナーを指定します。リバランスは消費者の生活の重要な部分です。クラスタまたはコンシューマの状態が変更されるたびに、リバランスが発行されます。これにより、すべてのパーティションがコンシューマに割り当てられます。

トピックをサブスクライブした後、コンシューマはポーリングして新しいレコードを検索します。

while (true) {
  data = kafkaConsumer.poll();
  // do something with 'data'
}

ポーリングは、クライアントで処理できるレコードを返します。レコードの処理後、クライアントはオフセットを同期的にコミットするため、処理が完了するまで待機してからポーリングを続行します。

必ず進捗を保存してください。commitSync()およびcommitAsync()メソッドを実行します。

自動コミットはお薦めしません。手動コミットは、ほとんどのユースケースで適切です。

クラスタ外からのビッグ・データ・サービスKafkaへのアクセス

ローカルからユーザーをkinitする手順:

  1. Kerberosクライアントが環境にインストールされていることを確認します。
  2. Kafkaキータブをクラスタから/etc/security/keytabs/kafka.service.keytabの場所から環境にコピーします。
  3. /etc/krb5.confの内容をクラスタ・ブローカ・ホストから環境の/etc/krb5.conf fileにコピーします(これは、krb5 confファイルのデフォルトの場所です)。
  4. /etc/hostsファイルに、すべてのクラスタ・ノードのホスト・エントリを作成します。
  5. コマンドsudo kinit -kt <keytab path in local> <principal name> を使用して、キータブおよびプリンシパルを使用して環境からkinitを実行します。
  6. コマンドklistを使用してチケットが発行されていることを確認します。

kinitが完了したら、次のステップを実行します。プロデューサ/コンシューマを実行します。

  1. /usr/odh/current/kafka-broker/config/kafka_jaas.confをローカルにコピーします。
  2. /usr/odh/current/kafka-broker/config/localserver.propertiesにコピーします。
  3. 環境変数のエクスポートKAFKA_OPTS="-Djava.security.auth.login.config=/local/path/to/kafka_jaas.conf"を設定します。
  4. kinitが完了し、有効なチケット(期限切れでない) kinit -kt /local/keytab/path/kafka.service.keytab kafka/<principal>が利用可能であることを確認してください。

前述の設定では、一般的なコマンドに従って、ビッグ・データ・サービス・クラスタの外部からKafkaクライアントを実行するか、Java/pythonクライアントを実行できます。