この章では、Kafkaハンドラについて説明し、その機能を理解できるように例を示します。
トピック:
Oracle GoldenGate for Big Data Kafka Handlerは、Oracle GoldenGate証跡からKafkaトピックに変更取得データをストリーミングするために設計されています。またKafkaハンドラには、メッセージに対応するスキーマを別のスキーマ・トピックに公開するオプションの機能もあります。AvroおよびJSONのスキーマの公開がサポートされています。
Apache Kafkaは、オープン・ソースで、パーティション化および複製された分散型のメッセージング・サービスです。Kafkaとその関連ドキュメントは、http://kafka.apache.org/で入手できます。
Kafkaは、単一インスタンスとしても、複数サーバー上のクラスタとしても実行できます。Kafkaの各サーバー・インスタンスは、ブローカと呼ばれます。Kafkaにおけるトピックは、メッセージがプロデューサによって公開され、コンシューマによって取得されるときのカテゴリ、またはフィード名です。
Kafkaハンドラは、シリアライズされた変更データ取得を複数のソース表から、1つの構成されたトピック、またはKafkaの別のKafkaトピックへの分離したソース操作(トピック名が完全修飾のソース表名に対応する場合)に書き込むKafkaプロデューサを実装します。
トランザクション・モードと操作モード
Kafkaハンドラは、Kafka ProducerRecord
クラスのインスタンスをKafkaプロデューサAPIに送信し、次にそれがProducerRecord
をKafkaトピックに公開します。Kafka ProducerRecord
は、実質的に1つのKafkaメッセージの実装です。ProducerRecord
には、キーと値の2つのコンポーネントがあります。キーと値のどちらも、Kafkaハンドラによってバイト配列として表されます。この項では、Kafkaハンドラがデータを公開する方法について説明します。
トランザクション・モード
トランザクション・モードは、Kafkaハンドラの次の構成によって示されます。
gg.handler.name.Mode=tx
トランザクション・モードでは、ソースOracle GoldenGate証跡ファイルからのトランザクションにおける各操作のシリアライズ・データが連結されます。連結された操作データの内容は、KafkaのProducerRecord
オブジェクトの値です。Kafka ProducerRecord
オブジェクトのキーはNULLです。結果として、Kafkaメッセージはには1からNまでの操作のデータが含まれます。Nは、トランザクションにおける操作の数です。グループ化されたトランザクションの場合、グループ化されたトランザクションの全操作の全データが1つのKafkaメッセージとして連結されます。そのため、Kafkaメッセージは大量の操作のデータを含んで非常に大きいメッセージになる可能性があります。
操作モード
操作モードは、Kafkaハンドラの次の構成によって示されます。
gg.handler.name.Mode=op
操作モードでは、各操作のシリアライズ・データは個々のProducerRecord
オブジェクトに値として配置されます。ProducerRecord
キーは、ソース操作の完全修飾表名です。ProducerRecord
は、KafkaプロデューサAPIを使用してすぐに送信されます。つまり、着信する操作と、生成されるKafkaメッセージの数との間には1対1の関係があります。
ブロック・モードと非ブロック・モード
Kafkaハンドラは、ブロック・モード(同期)または非ブロック・モード(非同期)で、Kafkaにメッセージを送信できます。
Blocking Mode
ブロック・モードは、Kafkaハンドラの次の構成プロパティによって設定されます。
gg.handler.name.BlockingSend=true
メッセージは同期ベースでKafkaに配信されます。Kafkaハンドラは、現在のメッセージが目的のトピックに書き込まれて確認が受信されるまで、次のメッセージを送信しません。ブロック・モードでは、負荷が高いためパフォーマンスは下がりますが、メッセージ配信が最も確実に保証されます。
ブロック・モードでは、Kafkaプロデューサにlinger.ms
変数は設定しないでください。そうすると、KafkaプロデューサはKafkaブローカにメッセージを送信する前にタイムアウト期間の間ずっと待機します。この場合、Kafkaブローカに送信されるメッセージをKafkaプロデューサがバッファすると同時に、メッセージが送信されたという確認を、Kafkaハンドラは待機しています。
Non-Blocking Mode
非ブロック・モードは、Kafkaハンドラの次の構成プロパティによって設定されます。
gg.handler.name.BlockingSend=false
メッセージは非同期ベースでKafkaに配信されます。Kafkaメッセージは、確認を待たずに順次送信されます。Kafkaプロデューサ・クライアントは、スループットを高めるために着信メッセージをバッファできます。
トランザクションがコミットされるたびに、すべての未処理メッセージがKafkaクラスタに確実に転送されるように、Kafkaプロデューサに対するフラッシュ・コールを呼び出します。そのため、Kafkaハンドラは安全なチェックポイントでデータの損失をゼロにします。Kafkaプロデューサに対するフラッシュ・コールの呼出しは、linger.ms
の持続期間による影響を受けません。そのため、Kafkaハンドラは安全なチェックポイントでデータの損失をゼロにします。
KafkaプロデューサがデータをKafkaブローカにフラッシュするタイミングは、Kafkaプロデューサ構成ファイルにある変更可能な多数の構成プロパティによって制御できます。Kafkaプロデューサによるメッセージの一括送信を有効にするには、Kafkaプロデューサ構成ファイルでbatch.size
とlinger.ms
の両方のKafkaプロデューサ・プロパティを設定する必要があります。Kafkaへの送信前にバッファする最大バイト数はbatch.size
によって制御され、データ送信前に待機する最大時間(ミリ秒)はlinger.ms
変数によって制御されます。batch.size
に達するか、linger.ms
の時間が経過するか、どちらか早いほうの条件が満たされると、データはKafkaに送信されます。batch.size
変数のみを設定すると、メッセージはただちにKafkaに送信されます。
トピック名の選択
トピックは、この構成パラメータを使用して実行時に解決されます。
gg.handler.topicMappingTemplate
静的な文字列、キーワード、または静的な文字列とキーワードの組合せを構成し、実行時にそのときの操作コンテキストに基づいてトピック名を動的に解決できます(「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照)。
Kafkaブローカ設定
トピックの自動作成を有効にするには、Kafkaブローカ構成でauto.create.topics.enable
プロパティをtrue
に設定します。このプロパティのデフォルト値はtrueです。
Kafkaブローカ構成でauto.create.topics.enable
プロパティがfalse
に設定されている場合、Replicatプロセスを開始する前に必要なすべてのトピックを手動で作成する必要があります。
スキーマの伝播
すべての表のスキーマ・データが、schemaTopicName
プロパティで構成されたスキーマ・トピックに提供されます。詳細は、「スキーマの伝播」を参照してください。
ここでは、Kafkaハンドラのコンポーネントの構成およびハンドラの実行について説明します。
Kafkaをインストールし、単一ノードまたはクラスタ化されたインスタンスとして正しく構成する必要があります。Apache Kafkaのインストールおよび構成方法に関する情報は、次の場所で入手できます。
http://kafka.apache.org/documentation.html
Apache Kafka以外のKafka配布を使用する場合、インストールおよび構成については、個々のKafka配布に関するドキュメントを参照してください。
KafkaおよびKafkaブローカ(単数または複数)の前提条件となるコンポーネントであるZookeeperが稼働している必要があります。
データ・トピックとスキーマ・トピック(該当する場合)は、実行中のKafkaブローカで事前に構成されていることを、ベスト・プラクティスとしてお薦めします。Kafkaトピックを動的に作成することはできますが、そのためにはKafkaブローカが動的トピックを許可するように構成されている必要があります。
Kafkaブローカが、Kafkaハンドラ・プロセスと同じ場所に配置されていない場合は、Kafkaハンドラを実行しているマシンから、リモートのホスト・ポートにアクセスできる必要があります。
トピック:
KafkaハンドラをKafkaに接続して実行できるように、gg.classpath
構成変数に2つのものを含める必要があります。必要なのは、Kafkaプロデューサ・プロパティ・ファイルと、KafkaクライアントJARです。KafkaクライアントJARは、Kafkaハンドラが接続するKafkaのバージョンと一致する必要があります。必要なクライアントJARファイルのバージョン別リストは、「Kafkaハンドラ・クライアント依存性」を参照してください。
Kafkaプロデューサ・プロパティ・ファイルの格納場所として推奨されるのは、Oracle GoldenGateのdirprm
ディレクトリです。
KafkaクライアントJARのデフォルトの場所は、Kafka_Home
/libs/*
です。
gg.classpath
は、正確に構成する必要があります。Kafkaプロデューサ・プロパティ・ファイルのパス指定に含めるパスには、ワイルドカードを使用しないでください。Kafkaプロデューサ・プロパティ・ファイルのパスにワイルドカード(*
)を含めると、選択されなくなります。逆に、依存関係JARのパス指定には、そのディレクトリにあるJARファイルがすべて関連するクラスパスに含まれるように、ワイルドカード(*
)を含める必要があります。*.jar
は使用しないでください。正しく構成されたクラスパスの例を次に示します。
gg.classpath={kafka install dir}/libs/*
表8-1 Kafkaハンドラの構成プロパティ
プロパティ名 | 必須 | プロパティ値 | デフォルト | 説明 |
---|---|---|---|---|
|
はい |
|
使用するハンドラのリスト。 |
|
|
はい |
|
使用するハンドラのタイプ。Kafka、Flume、HDFSなど。 |
|
|
いいえ。デフォルトは |
任意のカスタム・ファイル名 |
クラスパスにあるファイル名は、Apache Kafkaプロデューサを構成するApache Kafkaプロパティを保持します。 |
|
|
いいえ。デフォルトは |
フォーマッタ・クラスまたはショート・コード |
ペイロードのフォーマットに使用するフォーマッタ。 |
|
|
はい。スキーマ配信が必要な場合。 |
スキーマ・トピックの名前 |
スキーマ・データが配信されるトピック名。このプロパティを設定する場合、スキーマは伝播されません。スキーマが伝播されるのは、Avroフォーマッタの場合のみです。 |
|
|
いいえ。デフォルトの実装クラス |
Oracle GoldenGate for Big Data Kafka Handlerの |
スキーマは |
|
|
いいえ。デフォルトは |
|
このプロパティをtrueに設定すると、Kafkaへの配信は完全に非同期のモデルで動作します。次のペイロードは、現在のペイロードが目的のトピックに出力され、確認が受信されてから送信されます。そのための、トランザクション・モードでは厳密に1回のセマンティックが提供されます。このプロパティをfalseに設定すると、Kafkaへの配信は非同期モデルで動作します。ペイロードは、確認を待たずに順次送信されます。Kafkaの内部キューでコンテンツをバッファすると、スループットを増やすことができます。チェックポイントは、Javaコールバックを使用してKafkaブローカから確認が受信されたときにのみ作成されます。 |
|
|
いいえ。デフォルトは |
|
Kafkaハンドラが操作モードの場合、変更取得データレコード(Insert、Update、Deleteなど)の各ペイロードはKafkaプロデューサ・レコードとして表され、一度に1つずつフラッシュされます。Kafkaハンドラがトランザクション・モードの場合、ソース・トランザクション内の操作はすべて、1つのKafkaプロデューサ・レコードとして表されます。組み合されたこのバイト・ペイロードが、トランザクション・コミット・イベント時にフラッシュされます。 |
|
|
必須 |
実行時にKafkaトピック名を解決するためのテンプレート文字列の値。 |
なし |
詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。 |
|
必須 |
実行時にKafkaメッセージ・キーを解決するためのテンプレート文字列の値。 |
なし |
詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。 |
アダプタ・プロパティ・ファイルからのKafkaハンドラの構成例を次に示します。
gg.handlerlist = kafkahandler gg.handler.kafkahandler.Type = kafka gg.handler.kafkahandler.KafkaProducerConfigFile = custom_kafka_producer.properties gg.handler.kafkahandler.topicMappingTemplate=oggtopic gg.handler.kafkahandler.keyMappingTemplate=${currentTimestamp} gg.handler.kafkahandler.Format = avro_op gg.handler.kafkahandler.SchemaTopicName = oggSchemaTopic gg.handler.kafkahandler.SchemaPrClassName = com.company.kafkaProdRec.SchemaRecord gg.handler.kafkahandler.Mode = tx gg.handler.kafkahandler.BlockingSend = true
Kafka統合のレプリケーション構成とJavaアダプタ・プロパティ・ファイルの例は、次のディレクトリにあります。
GoldenGate_install_directory
/AdapterExamples/big-data/kafka
Kafkaハンドラは、メッセージをKafkaに公開するために、Kafkaプロデューサ構成ファイルにアクセスする必要があります。Kafkaプロデューサ構成ファイルのファイル名は、Kafkaハンドラ・プロパティの次の構成によって制御されます。
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
Kafkaハンドラは、Javaのクラスパスを使用してKafkaプロデューサ構成ファイルを検索およびロードしようとします。したがって、Javaのクラスパスに、Kafkaプロデューサ構成ファイルがあるディレクトリが含まれている必要があります。
Kafkaプロデューサ構成ファイルには、Kafka専有のプロパティがあります。0.8.2.0のKafkaプロデューサ・インタフェース・プロパティについては、Kafkaドキュメントに構成情報が記載されています。Kafkaハンドラは、これらのプロパティを使用してKafkaブローカのホストおよびポートを解決し、Kafkaプロデューサ・クライアントとKafkaブローカとの間のインタフェースの動作は、Kafkaプロデューサ構成ファイルにあるプロパティによって制御されます。
Kafkaプロデューサのサンプル構成ファイルは、次のようになります。
bootstrap.servers=localhost:9092 acks = 1 compression.type = gzip reconnect.backoff.ms = 1000 value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer # 100KB per partition batch.size = 102400 linger.ms = 0 max.request.size = 1048576 send.buffer.bytes = 131072
Kafkaハンドラでは、テンプレートの構成値を使用して実行時にトピック名およびメッセージ・キーを解決する機能が提供されます。テンプレートを使用すると、静的な値およびキーワードを構成できます。キーワードは、そのときの処理コンテキストに応じてキーワードを動的に置き換えるために使用されます。テンプレートは、次の構成プロパティを使用します。
gg.handler.name.topicMappingTemplate gg.handler.name.keyMappingTemplate
テンプレートのモード
ソース・データベース・トランザクションは、それぞれが挿入、更新および削除である、1つ以上の個別の操作から構成されます。Kafkaハンドラは、操作ごとに1つのメッセージ(挿入、更新、削除)を送信するように構成することも、トランザクション・レベルで操作をメッセージにグループ化するように構成することもできます。テンプレート・キーワードの多くは、個々のソース・データベース操作のコンテキストに基づいてデータを解決します。したがって、トランザクション・レベルでメッセージを送信する場合には、キーワードの多くは機能しません。たとえば、トランザクション・レベルでメッセージを送信する場合は、${fullyQualifiedTableName}
は機能せずに、操作のための修飾されたソース表名に解決されます。ただし、トランザクションには、多くのソース表に対する複数の操作を含めることができます。トランザクション・レベルでのメッセージの完全修飾表名の解決は、非決定的であるため、実行時に異常終了します。
テンプレートのキーワード
この表には、トランザクション・レベルのメッセージでそのキーワードがサポートされているかどうかを示す列が含まれています。
キーワード | 説明 | トランザクション・メッセージのサポート |
---|---|---|
|
カタログ、スキーマおよび表名の間にピリオド(.)のデリミタを含む、完全修飾表名に解決されます。 たとえば、 |
いいえ |
|
カタログ名に解決されます。 |
いいえ |
|
スキーマ名に解決されます。 |
いいえ |
|
短い表名に解決されます。 |
いいえ |
|
操作の種類( |
いいえ |
|
アンダースコア(_)で区切られた連結主キー値に解決されます。 |
いいえ |
|
ソース証跡ファイルのシーケンス番号の後にオフセット(RBA)が続きます。 |
はい |
|
ソース証跡ファイルからの操作タイムスタンプ。 |
はい |
|
""に解決されます。 |
はい |
|
Replicatプロセスの名前に解決されます。調整された配信を使用している場合は、レプリケートのスレッド番号が付加されたReplicatプロセスの名前に解決されます。 |
はい |
|
キーが完全修飾表名である静的な値に解決されます。キーおよび値は、大カッコの内部に次の形式で指定します。 ${staticMap[dbo.table1=value1,dbo.table2=value2]} |
いいえ |
|
キーが完全修飾表名であり、値が解決される列名である列の値に解決されます。次に例を示します。 ${staticMap[dbo.table1=col1,dbo.table2=col2]} |
いいえ |
または
|
現在のタイムスタンプに解決されます。 例: ${currentDate} ${currentDate[yyyy-mm-dd hh:MM:ss.SSS]} |
はい |
|
NULL文字列に解決されます。 |
はい |
|
カスタム値リゾルバを記述することが可能です。必要に応じて、Oracleサポートに連絡してください。 |
実装依存 |
テンプレートの例
テンプレートの構成値の例と解決される値を次に示します。
テンプレートの例 | 解決される値 |
---|---|
|
|
|
|
|
|
Kafkaハンドラには、スキーマをスキーマ・トピックに公開する機能があります。現在、スキーマ公開が有効なのは、Avro行フォーマッタとAvro操作フォーマッタのみです。KafkaハンドラのschemaTopicName
プロパティが設定されている場合、スキーマは次のイベントで公開されます。
特定の表のAvroスキーマは、その表に対する最初の操作が発生するときに公開されます。
Kafkaハンドラがメタデータ変更イベントを受信すると、スキーマはフラッシュされます。特定の表について再生成されるAvroスキーマは、その表に対する次の操作が発生するときに公開されます。
Avroのラッピング機能を有効にしている場合、汎用のラッパーAvroスキーマは、任意の操作が最初に発生するときに公開されます。汎用のラッパーAvroスキーマの機能は、Avroフォーマッタの構成で有効にできます。「Avro行フォーマッタ」および「Avro操作フォーマッタ」を参照してください。
KafkaのProducerRecord
値はスキーマになり、キーは完全修飾の表名になります。
Kafka上でAvroを使用すると、Avroスキーマに対するAvroメッセージの直接依存性が原因で問題が発生する可能性があります。Avroメッセージはバイナリなので、人間が読める形式ではありません。Avroメッセージをデシリアライズするには、まず受信者に正しいAvroスキーマが必要です。ソース・データベースからの各表が個々のAvroスキーマになるため、問題になる可能性があります。ソースOracle GoldenGate証跡ファイルに複数の表からの操作が含まれる場合、Kafkaメッセージの受信者が、個々のメッセージをデシリアライズするためにどのAvroスキーマを使用するかを判断することはできません。この問題を解決するには、特殊なAvroメッセージを汎用のAvroメッセージ・ラッパーでラップできます。この汎用のAvroラッパーを使用すれば、完全修飾の表名、スキーマ文字列のハッシュコード、ラップされたAvroメッセージを確認することができます。受信者は、完全修飾の表名と、スキーマ文字列のハッシュコードを使用して、ラップされたメッセージに関連付けられたスキーマを解決し、そのスキーマを使用して、ラップされたメッセージをデシリアライズできます。
gg.handler.name.BlockingSend=true
の場合、Kafkaプロデューサconfig
ファイルでlinger.ms
設定は使用しないことをお薦めします。これにより、Kafkaハンドラ構成とKafkaプロデューサ構成が競合するため、ブロックへの送信のたびに、少なくともlinger.ms
の時間が発生し、パフォーマンスに大きく影響します。このような構成の結果、Kafkaハンドラが送信確認を待つ一方で、Kafkaプロデューサは送信前に次のメッセージを待機することになり、一時的なデッドロックが発生します。このデッドロックは、linger.ms
の時間が過ぎると解決されます。この動作が、送信されるメッセージのたびに繰り返されます。
最高のパフォーマンスを得るためには、Javaアダプタ・プロパティ・ファイルで次のように構成して、Kafkaプロデューサへの非ブロック(非同期)コールを使用し、操作モードで動作するようにKafkaハンドラを設定することをお薦めします。
gg.handler.name.mode = op gg.handler.name.BlockingSend = false
また、Kafkaプロデューサ・プロパティ・ファイルでbatch.size
とlinger.msの値も設定することを推奨します。batch.size
とlinger.ms
に設定する値は、ユースケース別の状況によって大きく異なります。通常、値が大きいほどスループットは向上しますが、レイテンシが大きくなります。これらのプロパティの値を小さくすると、レイテンシは小さくなりますが、全体的なスループットは低下します。ソース証跡ファイルから大量の入力データを伴う場合、batch.size
とlinger.ms
のサイズはできるだけ大きく設定するようにしてください。
Replicat
変数GROUPTRANSOPS
を使用しても、パフォーマンスは向上します。その推奨設定は10000
です。
ソース証跡ファイルからシリアライズした操作を個々のKafkaメッセージで配信する必要がある場合、そのときKafkaハンドラは操作モードに設定されている必要があります。
gg.handler.name.mode = op
そのため、Kafkaメッセージの数が増え、パフォーマンスに悪影響が生じます。
Kafkaバージョン0.9.0.0で、SSL/TLSおよびSASL(Kerberos)を介したセキュリティが導入されました。SSL/TLSおよびSASL(Kerberos)セキュリティ製品の一方または両方を使用してKafkaハンドラを保護できます。Kafkaプロデューサ・クライアント・ライブラリは、それらのライブラリを利用する統合からのセキュリティ機能の抽象化を提供します。Kafkaハンドラは、セキュリティ機能から効率的に抽象化されます。セキュリティを有効にするには、Kafkaクラスタのセキュリティを設定し、マシンを接続して、Kafkaプロデューサ・プロパティ・ファイル(Kafkaハンドラで処理のために使用する)を必要なセキュリティ・プロパティを使用して構成する必要があります。Kafkaクラスタの保護に関する詳細は、次の場所にあるKafkaドキュメントを参照してください。
http://kafka.apache.org/documentation.html#security_configclients
メタデータ変更イベントが、Kafkaハンドラによって処理されるようになりました。これが関係するのは、スキーマ・トピックを構成しており、使用するフォーマッタがスキーマ伝播をサポートしている場合のみです(現在は、Avro行フォーマッタとAvro操作フォーマッタ)。スキーマが変更されている表で次に操作が発生するとき、更新されたスキーマがスキーマ・トピックに公開されます。
メタデータ変更イベントをサポートするには、ソース・データベースで変更を取得するOracle GoldenGateプロセスが、証跡機能でのOracle GoldenGateメタデータ(Oracle GoldenGate 12c (12.2)で導入)をサポートする必要があります。
Kafkaプロデューサ構成ファイルは、圧縮の使用をサポートしています。構成可能なオプションの1つにSnappyがあり、これは、オープン・ソースの圧縮および圧縮解除(codec
)ライブラリで、他のcodec
ライブラリよりパフォーマンスが高い傾向があります。Snappy JARはすべてのプラットフォームで実行されるわけではありません。Snappyは、Linuxシステムでは動作するようですが、他のUNIXおよびWindowsの実装では動作する場合としない場合があります。Snappyの圧縮を使用する場合は、Snappyを使用する圧縮を実装する前に、必要なシステムすべてでSnappyをテストする必要があります。必要なシステムの一部にSnappyが対応しない場合は、別のcodec
ライブラリを使用することをお薦めします。
トピック:
コマンドラインのKafkaプロデューサを使用すると、ダミー・データをKafkaトピックに書き込むことができ、Kafkaコンシューマを使用してこのデータをKafkaトピックから読み取ることができます。これを利用して、ディスク上でのKafkaトピックの設定および読取り/書込み権限を検証します。詳細は、次のサイトでオンラインのKafkaドキュメントを参照してください
非常に一般的なのが、Javaクラスパスに関する問題です。通常、これはlog4j
ログ・ファイルのClassNotFoundException
の問題ですが、gg.classpath
変数に入力ミスがある場合は、クラスパスの解決エラーになることもあります。Kafkaクライアント・ライブラリは、Oracle GoldenGate for Big Data製品に付属しません。「クラスパス構成」で説明されているように、JavaとKafkaクライアント・ライブラリを適切に解決するために、適切なバージョンのKafkaクライアント・ライブラリを取得し、Javaアダプタ・プロパティ・ファイルでgg.classpath
プロパティを適切に構成する作業は、ユーザーが行います。
Kafkaハンドラでは、Kafka 0.8.2.2以前のバージョンはサポートされていません。サポートされていないバージョンのKafkaで実行したときの通常の結果は、ランタイムJava例外java.lang.NoSuchMethodError
で、これは org.apache.kafka.clients.producer.KafkaProducer.flush()
メソッドが見つからないことを示します。このエラーが発生した場合、Kafka 0.9.0.0以降のバージョンに移行する必要があります。
一般的に、この問題は次の例外として発生します。
ERROR 2015-11-11 11:49:08,482 [main] Error loading the kafka producer properties
gg.handler.kafkahandler.KafkaProducerConfigFile
構成変数で、Kafkaプロデューサ構成ファイルの名前が正しく設定されていることを検証する必要があります。gg.classpath
変数をチェックし、クラスパスにKafkaプロデューサ・プロパティ・ファイルのパスが含まれていることと、プロパティ・ファイルのパスの末尾にワイルドカード(*
)が含まれていないことを確認してください。
この問題は、KafkaハンドラがKafkaに接続できない場合に、次の警告とともに発生します。
WARN 2015-11-11 11:25:50,784 [kafka-producer-network-thread | producer-1] WARN (Selector.java:276) - Error in I/O with localhost/127.0.0.1 java.net.ConnectException: Connection refused
接続の再試行時間が経過し、Kafkaハンドラ・プロセスは異常終了します。Kafkaブローカが稼働しており、Kafkaプロデューサ・プロパティ・ファイルで指定されているホストとポートが正しいことを確認してください。Kafkaブローカをホストしているマシンでネットワーク・シェル・コマンド(netstat -l
など)を使用すると、Kafkaが所定のポートでリスニングしていることを検証できます。