20 Kafka Connectハンドラの使用
Kafka Connectハンドラは、標準的なKafkaのメッセージング機能を拡張したものです。
この章では、Kafka Connectハンドラの使用方法について説明します。
- 概要
Oracle GoldenGate Kafka Connectは、標準的なKafkaのメッセージング機能を拡張したものです。Kafka Connectは、標準的なKafkaのプロデューサおよびコンシューマのインタフェースの上にある機能レイヤーです。新規のソースおよびターゲット・システムをトポロジに簡単に追加できる、メッセージングの標準化を提供します。 - 詳細な機能
- Kafka Connectハンドラの設定および実行
- Kafka Connectハンドラのパフォーマンスに関する考慮事項
- Kafkaインターセプタのサポート
Kafkaプロデューサ・クライアント・フレームワークでは、プロデューサ・インターセプタの使用がサポートされています。プロデューサ・インターセプタは、単にKafkaプロデューサ・クライアントからのユーザー・イグジットで、これにより、インターセプタ・オブジェクトがインスタンス化され、Kafkaメッセージ送信コールとKafkaメッセージ送信確認コールの通知を受信します。 - Kafkaパーティションの選択
Kafkaトピックは1つ以上のパーティションで構成されます。Kafkaクライアントは異なるトピック/パーティションの組合せへのメッセージ送信をパラレル化するため、複数のパーティションへの分散はKafkaの収集パフォーマンスを向上させるのに適した方法です。パーティションの選択は、Kafkaクライアントでの次の計算によって制御されます。 - Kafka Connectハンドラのトラブルシューティング
20.1 概要
Oracle GoldenGate Kafka Connectは、標準的なKafkaのメッセージング機能を拡張したものです。Kafka Connectは、標準的なKafkaのプロデューサおよびコンシューマのインタフェースの上にある機能レイヤーです。新規のソースおよびターゲット・システムをトポロジに簡単に追加できる、メッセージングの標準化を提供します。
ConfluentはKafka Connectの主要な採用先であり、そのConfluent Platform製品には標準的なKafka Connectの機能拡張が含まれています。これには、AvroのシリアライズとデシリアライズおよびAvroスキーマ・レジストリが含まれています。Kafka Connectの機能の多くは、Apache Kafkaで利用できます。様々なオープン・ソースのKafka Connect統合は、次の場所を参照してください。
https://www.confluent.io/product/connectors/
Kafka Connectハンドラは、Kafka Connectのソース・コネクタです。Oracle GoldenGateでサポートされているデータベースから、データベースの変更を取得し、その変更データをKafka Connectレイヤー経由でKafkaにストリーミングできます。このハンドラを使用して、Oracle Event Hub Cloud Service (EHCS)に接続することもできます。
Kafka Connectでは、プロプライエタリなオブジェクトを使用してスキーマ(org.apache.kafka.connect.data.Schema
)およびメッセージ(org.apache.kafka.connect.data.Struct
)が定義されます。Kafka Connectハンドラは、公開済データおよび公開済データの構造を管理するように構成できます。
Kafka Connectハンドラは、Kafkaハンドラでサポートされているどのプラガブル・フォーマッタもサポートしていません。
親トピック: Kafka Connectハンドラの使用
20.2 詳細な機能
JSONコンバータ
Kafka Connectフレームワークにより、インメモリーのKafka Connectメッセージを、ネットワーク経由での伝送に適したシリアライズされた形式に変換するコンバータが提供されます。このコンバータは、Kafkaプロデューサのプロパティ・ファイルの構成を使用して選択されます。
Kafka ConnectおよびJSONコンバータは、Apache Kafkaダウンロードの一部として使用可能です。JSONコンバータによりKafkaのキーおよび値がJSONに変換され、それがKafkaトピックに送信されます。Kafkaプロデューサのプロパティ・ファイルで、次の構成を使用してJSONコンバータを識別します。
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
メッセージのフォーマットは、ペイロード情報が後に続くメッセージ・スキーマ情報です。JSONは自己記述型のフォーマットであるため、Kafkaに公開される各メッセージにはスキーマ情報を含めないでください。
メッセージからJSONのスキーマ情報を省略するには、次のように設定します。
key.converter.schemas.enable=false
value.converter.schemas.enable=false
Avroコンバータ
Confluentにより、Kafkaのインストール、Kafkaのサポート、Kafka上に構築された拡張機能が提供され、Kafkaの潜在的な可能性の実現が支援されます。Confluentは、オープン・ソース版のKafka (Confluent Open Source)と、購入可能なエンタープライズ版(Confluent Enterprise)の両方を提供しています。
一般的なKafkaのユースケースは、Kafka上でAvroメッセージを送信することです。これは、AvroメッセージをデシリアライズするためにAvroスキーマに依存するため、受信側で問題が発生する可能性があります。受信メッセージは、プロデューサ側でメッセージを生成するために使用されたAvroスキーマと完全に一致させる必要があるため、スキーマの進化によって問題が増加する可能性があります。間違ったAvroスキーマでAvroメッセージをデシリアライズすると、ランタイム・エラー、不完全なデータまたは不正なデータが発生することがあります。Confluentでは、スキーマ・レジストリおよびConfluentスキーマ・コンバータを使用してこの問題を解決しました。
Kafkaプロデューサのプロパティ・ファイルの構成を次に示します。
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
メッセージがKafkaに公開されると、Avroスキーマが登録され、スキーマ・レジストリに格納されます。Kafkaからメッセージが消費されるとき、メッセージの作成に使用された正確なAvroスキーマをスキーマ・レジストリから取得してAvroメッセージをデシリアライズできます。Avroメッセージとそれに対応する受信側のAvroスキーマの照合を作成することで、この問題を解決しています。
Avroコンバータを使用するための要件は次のとおりです。
-
この機能は、Confluent Kafkaの両バージョン(オープン・ソースまたはエンタープライズ)で使用できます。
-
Confluentスキーマ・レジストリ・サービスが実行されている必要があります。
-
ソース・データベース表には、関連付けられたAvroスキーマが必要です。異なるAvroスキーマに関連付けられたメッセージは、異なるKafkaトピックに送信する必要があります。
-
ConfluentのAvroコンバータおよびスキーマ・レジストリ・クライアントがクラスパスで使用できる必要があります。
スキーマ・レジストリは、トピックごとにAvroスキーマを追跡します。メッセージは、同じスキーマまたは同じスキーマの進化バージョンを持つトピックに送信する必要があります。ソース・メッセージにはソース・データベース表スキーマに基づくAvroスキーマがあり、Avroスキーマはソース表ごとに一意です。複数のソース表で単一のトピックにメッセージを公開すると、ソース表から送信されたメッセージが以前のメッセージと異なるたびに、スキーマが進化していることがスキーマ・レジストリに示されます。
親トピック: Kafka Connectハンドラの使用
20.3 Kafka Connectハンドラの設定および実行
ここでは、Kafka Connectハンドラのコンポーネントの構成およびハンドラの実行について説明します。
クラスパス構成
Kafka ConnectハンドラをKafkaに接続して実行できるように、gg.classpath
構成変数に2つのものを含める必要があります。必要なのは、Kafkaプロデューサ・プロパティ・ファイルと、KafkaクライアントJARです。KafkaクライアントのJARは、Kafka Connectハンドラが接続するKafkaのバージョンと一致する必要があります。必要なクライアントJARファイルのバージョン別リストは、Kafkaハンドラ・クライアント依存性(「Kafka Connectハンドラ・クライアント依存性」)を参照してください。Kafkaプロデューサのプロパティ・ファイルの格納場所として推奨されるのは、Oracle GoldenGateのdirprm
ディレクトリです。
Kafka ConnectクライアントJARのデフォルトの場所は、Kafka_Home/libs/*
ディレクトリです。
gg.classpath
変数は、正確に構成する必要があります。Kafkaプロデューサのプロパティ・ファイルのパス指定に含めるパスには、ワイルドカードを使用しないでください。Kafkaプロデューサのプロパティ・ファイルへのパスにアスタリスク(*)のワイルドカードを含めると、それが破棄されます。依存関係JARのパス指定には、そのディレクトリにあるJARファイルがすべて関連するクラスパスに含まれるように、ワイルドカード(*)を含める必要があります。*.jar
は使用しないでください
正しく構成されたApache Kafkaのクラスパスの例:
gg.classpath=dirprm:{kafka_install_dir}/libs/*
正しく構成されたConfluent Kafkaのクラスパスの例を次に示します。
gg.classpath={confluent_install_dir}/share/java/kafka-serde-tools/*:{confluent_install_dir}/share/java/kafka/*:{confluent_install_dir}/share/java/confluent-common/*
親トピック: Kafka Connectハンドラの使用
20.3.1 Kafka Connectハンドラの構成
メタ列フィールドは次のプロパティとして構成できます。
gg.handler.name.metaColumnsTemplate
メタ列を以前のバージョンと同様に出力するには、次のように構成します。
gg.handler.name.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]}
主キー列およびトークンも含めるには、次のように構成します。
gg.handler.name.format.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]},${primarykeycolumns[primary_keys]},${alltokens[tokens]}
詳細は、構成プロパティを参照してください
gg.handler.name.metaColumnsTemplate
表20-1 Kafka Connectハンドラの構成プロパティ
プロパティ | 必須/オプション | 有効な値 | デフォルト | 説明 |
---|---|---|---|---|
gg.handler.name.type |
必須 |
|
なし |
Kafka Connectハンドラを選択するための構成。 |
gg.handler.name.kafkaProducerConfigFile |
必須 |
string |
なし |
KafkaのプロパティおよびKafka Connect構成プロパティを含むプロパティ・ファイルへのパス。 |
gg.handler.name.topicMappingTemplate |
必須 |
実行時にKafkaトピック名を解決するためのテンプレート文字列の値。 |
なし |
詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。 |
gg.handler.name.keyMappingTemplate |
必須 |
実行時にKafkaメッセージ・キーを解決するためのテンプレート文字列の値。 |
なし |
詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。 |
gg.handler.name.includeTokens |
オプション |
|
|
このフィールドを抑止する場合は、 |
gg.handler.name.messageFormatting |
オプション |
|
|
出力メッセージのモデル化方法を制御します。行を選択すると、出力メッセージは行としてモデル化されます。opに設定すると、出力メッセージは操作メッセージとしてモデル化されます。 |
gg.handler.name.insertOpKey |
オプション |
任意の文字列 |
|
挿入操作を示すフィールド |
gg.handler.name.updateOpKey |
オプション |
任意の文字列 |
|
挿入操作を示すフィールド |
gg.handler.name.deleteOpKey |
オプション |
任意の文字列 |
|
削除操作を示すフィールド |
gg.handler.name.truncateOpKey |
オプション |
任意の文字列 |
|
切捨て操作を示すフィールド |
gg.handler.name.treatAllColumnsAsStrings |
オプション |
|
|
trueに設定すると、すべての出力フィールドが文字列として扱われます。falseに設定すると、ハンドラによって、ソース証跡ファイルからの対応するフィールド・タイプがそれに最も適したKafka Connectのデータ型にマップされます。 |
gg.handler.name.mapLargeNumbersAsStrings |
オプション |
|
|
大きい数値は、数値フィールドにDoubleとしてマッピングされます。特定のシナリオでは、精度を失う可能性があります。
|
|
オプション |
abend | update | delete-insert |
|
モデリング行メッセージが |
|
オプション |
任意のメタ列キーワード。 |
なし |
テンプレートを表す1つ以上のテンプレート値で構成されるカンマ区切り文字列。「メタ列出力の設定」を参照してください。 |
|
オプション |
|
|
このプロパティを各列に設定して、null値がソース証跡ファイルで実際にnullであるか、ソース証跡ファイルにないかを、ダウンストリーム・アプリケーションが区別できるようにします。 |
gg.handler.name.enableDecimalLogicalType |
オプション | true|false |
false |
Kafka Connectで小数論理型を有効にするには、true に設定します。小数論理型では、64ビット・データ型に収まらない数値を表すことができます。
|
gg.handler.name.oracleNumberScale |
オプション | 正の整数 | 38 | gg.handler.name.enableDecimalLogicalType=true の場合にのみ適用されます。一部のソース・データ型には、固定スケールが関連付けられていません。Kafka Connect小数論理型にスケールを設定する必要があります。メタデータにスケールがないソース型の場合は、このパラメータの値を使用してスケールを設定します。
|
gg.handler.name.EnableTimestampLogicalType |
オプション | true|false |
false |
Kafka Connectタイムスタンプ論理型を有効にするには、true に設定します。Kafka Connectタイムスタンプの論理時間は、Javaのエポックからのミリ秒の整数測定値です。これは、タイムスタンプ論理型が使用された場合、ミリ秒より高い精度が使用できないことを意味します。このプロパティを使用するには、gg.format.timestamp プロパティを設定する必要があります。このプロパティは、文字列形式のタイムスタンプの出力を決定するために使用されるタイムスタンプ書式設定文字列です。たとえば、gg.format.timestamp=yyyy-MM-dd HH:mm:ss.SSS です。goldengate.userexit.timestamp プロパティが構成ファイルに設定されていないことを確認します。このプロパティを設定すると、入力タイムスタンプが、論理タイムスタンプに必要なJavaオブジェクトに解析されなくなります。
|
gg.handler.name.metaHeadersTemplate |
オプション | メタ列キーワードのカンマ区切りリスト。 | なし | メタ列を選択し、メタ列キーワード構文を使用してコンテキスト・ベースのキー値ペアをKafkaメッセージ・ヘッダーに挿入できます。 |
gg.handler.name.schemaNamespace
|
オプション | KafkaコネクタAvroスキーマのネーミング要件に違反する文字を含まない任意の文字列。 | なし | 生成されるKafka Connectスキーマ名の制御に使用されます。設定されていない場合、スキーマ名は修飾されたソース表名と同じになります。たとえば、ソース表がQASOURCE.TCUSTMER である場合、Kafka Connectスキーマ名は同じになります。
このプロパティを使用して、生成されるスキーマ名を制御できます。たとえば、このプロパティが |
詳細は、「テンプレートを使用したストリーム名とパーティション名の解決」を参照してください。
サンプル構成の確認
gg.handlerlist=kafkaconnect #The handler properties gg.handler.kafkaconnect.type=kafkaconnect gg.handler.kafkaconnect.kafkaProducerConfigFile=kafkaconnect.properties gg.handler.kafkaconnect.mode=op #The following selects the topic name based on the fully qualified table name gg.handler.kafkaconnect.topicMappingTemplate=${fullyQualifiedTableName} #The following selects the message key using the concatenated primary keys gg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys} #The formatter properties gg.handler.kafkaconnect.messageFormatting=row gg.handler.kafkaconnect.insertOpKey=I gg.handler.kafkaconnect.updateOpKey=U gg.handler.kafkaconnect.deleteOpKey=D gg.handler.kafkaconnect.truncateOpKey=T gg.handler.kafkaconnect.treatAllColumnsAsStrings=false gg.handler.kafkaconnect.pkUpdateHandling=abend
親トピック: Kafka Connectハンドラの設定および実行
20.3.2 テンプレートを使用したトピック名とメッセージ・キーの解決
Kafka Connectハンドラでは、テンプレートの構成値を使用して実行時にトピック名およびメッセージ・キーを解決する機能が提供されます。テンプレートを使用すると、静的な値およびキーワードを構成できます。キーワードは、そのときの処理コンテキストに応じてキーワードを動的に置き換えるために使用されます。テンプレートは、次の構成パラメータに適用されます。
gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate
テンプレートのモード
Kafka Connectハンドラでは、操作メッセージのみを送信できます。Kafka Connectハンドラでは、操作メッセージをより大きなトランザクション・メッセージにグループ化できません。
テンプレートのキーワード
キーワード | 説明 |
---|---|
|
カタログ、スキーマおよび表名の間にピリオド(.)のデリミタを含む、完全修飾表名に解決されます。 たとえば、 |
|
カタログ名に解決されます。 |
|
スキーマ名に解決されます。 |
|
短い表名に解決されます。 |
|
操作の種類( |
|
アンダースコア(_)で区切られた連結主キー値に解決されます。 |
|
ソース証跡ファイルのシーケンス番号の後にオフセット(RBA)が続きます。 |
|
ソース証跡ファイルからの操作タイムスタンプ。 |
|
""に解決されます。 |
|
Replicatプロセスの名前に解決されます。調整された配信を使用している場合は、レプリケートのスレッド番号が付加されたReplicatプロセスの名前に解決されます。 |
|
キーが完全修飾表名である静的な値に解決されます。キーおよび値は、大カッコの内部に次の形式で指定します。
|
|
キーが完全修飾表名であり、値が解決される列名である列の値に解決されます。たとえば:
|
または
|
現在のタイムスタンプに解決されます。 例:
|
|
NULL文字列に解決されます。 |
|
カスタム値リゾルバを記述することが可能です。必要に応じて、Oracleサポートに連絡してください。 |
テンプレートの例
テンプレートの構成値の例と解決される値を次に示します。
テンプレートの例 | 解決される値 |
---|---|
|
|
|
|
|
|
親トピック: Kafka Connectハンドラの設定および実行
20.3.3 Kafka Connectハンドラでのセキュリティの構成
Kafkaバージョン0.9.0.0にSSL/TLSまたはKerberosを介したセキュリティが導入されました。Kafka Connectハンドラは、SSL/TLSまたはKerberos使用して保護できます。Kafkaプロデューサ・クライアント・ライブラリは、それらのライブラリを利用する統合からのセキュリティ機能の抽象化を提供します。Kafka Connectハンドラは、セキュリティ機能から効率的に抽象化されます。セキュリティを有効にするには、Kafkaクラスタのセキュリティを設定し、マシンを接続して、Kafkaプロデューサのプロパティ・ファイル(Kafkaハンドラで処理のために使用する)を必要なセキュリティ・プロパティを使用して構成する必要があります。
keytab
ファイルからKerberosパスワードを復号化できない場合があります。これによって、Kerberos認証が対話型モードにフォール・バックされ、プログラムで呼び出されているため機能しなくなります。この問題の原因は、Java Cryptography Extension (JCE)が、Java Runtime Environment (JRE)にインストールされていないことです。JCEがJREにロードされていることを確認します。http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.htmlを参照してください。
親トピック: Kafka Connectハンドラの設定および実行
20.4 Kafka Connectハンドラのパフォーマンスに関する考慮事項
Oracle GoldenGate for Big Dataの構成とKafkaプロデューサのパフォーマンスの両方に影響を与える複数の構成設定があります。
Oracle GoldenGateのパラメータでパフォーマンスに最も大きな影響を与えるのはReplicatのGROUPTRANSOPS
パラメータです。GROUPTRANSOPS
パラメータを使用すると、Replicatで複数のソース・トランザクションを単一のターゲット・トランザクションにグループ化できます。トランザクションのコミット時、Kafka Connectハンドラは、書込み永続性のためにKafkaプロデューサ上でフラッシュをコールしてメッセージをKafkaにプッシュし、その後チェックポイントを送信します。フラッシュ・コールはコストの大きなコールであり、ReplicatのGROUPTRANSOPS
設定を大きく設定すると、Replicatのフラッシュ・コールの頻度が少なくてすみ、パフォーマンスが向上します。
GROUPTRANSOPS
のデフォルト設定は1000
で、値を2500、5000、さらには10000に増やすことでパフォーマンスの向上が得られます。
opモードのgg.handler.kafkaconnect.mode=op
パラメータもまた、txモードのgg.handler.kafkaconnect.mode=tx
よりもパフォーマンスを向上させることができます。
Kafkaプロデューサのいくつかのプロパティがパフォーマンスに影響する可能性があります。重要な影響を与えるパラメータは次のとおりです。
-
linger.ms
-
batch.size
-
acks
-
buffer.memory
-
compression.type
これらのパラメータのデフォルト値から開始し、パフォーマンスのベース・ラインを取得するためのパフォーマンス・テストを実行することをお薦めします。これらの各パラメータのKafkaドキュメントを参照して、その役割を理解し、パラメータを調整し、各パラメータのパフォーマンス効果を確認するための追加のパフォーマンス・テストを実行します。
親トピック: Kafka Connectハンドラの使用
20.5 Kafkaインターセプタのサポート
Kafkaプロデューサ・クライアント・フレームワークでは、プロデューサ・インターセプタの使用がサポートされています。プロデューサ・インターセプタは、単にKafkaプロデューサ・クライアントからのユーザー・イグジットで、これにより、インターセプタ・オブジェクトがインスタンス化され、Kafkaメッセージ送信コールとKafkaメッセージ送信確認コールの通知を受信します。
インターセプタの一般的な使用例は、監視です。Kafkaプロデューサ・インターセプタは、インタフェース org.apache.kafka.clients.producer.ProducerInterceptor
に準拠している必要があります。Kafka Connectハンドラは、プロデューサ・インターセプタの使用をサポートしています。
ハンドラでインターセプタを使用する場合の要件は次のとおりです。
- Kafkaプロデューサ構成プロパティ"
interceptor.classes
"は、起動するインターセプタのクラス名で構成する必要があります。 - インターセプタを呼び出すためには、jarファイルとすべての依存関係jarをJVMで使用できる必要があります。したがって、インターセプタを含むjarファイルとすべての依存関係jarを、ハンドラ構成ファイルの
gg.classpath
に追加する必要があります。詳細は、Kafkaのドキュメントを参照してください。
親トピック: Kafka Connectハンドラの使用
20.6 Kafkaパーティションの選択
Kafkaトピックは1つ以上のパーティションで構成されます。Kafkaクライアントは異なるトピック/パーティションの組合せへのメッセージ送信をパラレル化するため、複数のパーティションへの分散はKafkaの収集パフォーマンスを向上させるのに適した方法です。パーティションの選択は、Kafkaクライアントでの次の計算によって制御されます。
(Kafkaメッセージ・キーのハッシュ)係数(パーティションの数) = 選択したパーティション番号
Kafkaメッセージ・キーは、次の構成値によって選択されます。
gg.handler.{your handler name}.keyMappingTemplate=
このパラメータが静的キーを生成する値に設定されている場合、すべてのメッセージは同じパーティションに送信されます。静的キーの例を次に示します。
gg.handler.{your handler name}.keyMappingTemplate=StaticValue
このパラメータが、頻繁に変更されないキーを生成する値に設定されている場合、パーティション選択の変更頻度は低くなります。次の例では、表名をメッセージ・キーとして使用します。特定のソース表に対するすべての操作は同じキーを持つため、同じパーティションにルーティングされます。
gg.handler.{your handler name}.keyMappingTemplate=${tableName}
gg.handler.{your handler name}.keyMappingTemplate=${null}
マッピング・キーの構成に推奨される設定は次のとおりです。
gg.handler.{your handler name}.keyMappingTemplate=${primaryKeys}
これにより、連結および区切られた主キー値であるKafkaメッセージ・キーが生成されます。
各行の操作には一意の主キーが必要であるため、各行に一意のKafkaメッセージ・キーが生成されます。別の重要な考慮事項として、異なるパーティションに送信されたKafkaメッセージは、送信された元の順序でKafkaコンシューマに配信される保証はありません。これはKafkaの仕様の一部です。順序はパーティション内でのみ維持されます。Kafkaメッセージ・キーとして主キーを使用すると、同じ主キーを持つ同じ行に対する操作で同じKafkaメッセージ・キーが生成されるため、同じKafkaパーティションに送信されます。このようにして、同じ行に対する操作の順序が維持されます。
DEBUG
ログ・レベルでは、Kafkaメッセージの座標(トピック、パーティションおよびオフセット)は、正常に送信されたメッセージの.log
ファイルに記録されます。
親トピック: Kafka Connectハンドラの使用
20.7 Kafka Connectハンドラのトラブルシューティング
親トピック: Kafka Connectハンドラの使用
20.7.1 Kafka Connectハンドラ用のJavaクラスパス
Javaクラスパスは、非常によくある問題の1つです。クラスパスに問題があることを示すのは、Oracle GoldenGate Javaのlog4j
ログ・ファイルにあるClassNotFoundException
で、gg.classpath
変数に入力ミスがある場合は、クラスパスを解決する際にエラーが発生します。
Kafkaクライアント・ライブラリは、Oracle GoldenGate for Big Data製品に付属しません。「Kafka Connectハンドラの設定および実行」で説明されているように、JavaとKafkaクライアント・ライブラリを適切に解決するために、適切なバージョンのKafkaクライアント・ライブラリを取得し、Javaアダプタのプロパティ・ファイルでgg.classpath
プロパティを適切に構成する作業は、ユーザーが行う必要があります。
20.7.2 無効なKafkaバージョン
Kafka Connectは、Kafka 0.9.0.0バージョンで導入されました。Kafka Connectハンドラは、0.8.2.2以前のバージョンのKafkaでは動作しません。Kafka 0.8.2.2でKafka Connectを使用しようとすると、実行時にClassNotFoundException
エラーが発生します。
20.7.3 Kafkaプロデューサ・プロパティ・ファイルが見つからない
通常、次の例外メッセージが表示されます。
ERROR 2015-11-11 11:49:08,482 [main] Error loading the kafka producer properties
Kafkaプロデューサの構成ファイル名のgg.handler.kafkahandler.KafkaProducerConfigFile
構成プロパティが正しく設定されていることを確認してください。
gg.classpath
変数にKafkaプロデューサのプロパティ・ファイルのパスが含まれていること、プロパティ・ファイルのパスの末尾にワイルドカード(*)が含まれていないことを確認してください。
20.7.4 Kafkaの接続の問題
通常、次の例外メッセージが表示されます。
WARN 2015-11-11 11:25:50,784 [kafka-producer-network-thread | producer-1] WARN (Selector.java:276) - Error in I/O with localhost/127.0.0.1 java.net.ConnectException: Connection refused
これが発生すると、接続の再試行時間が経過し、Kafka接続ハンドラ・プロセスが異常終了します。Kafkaブローカが稼働しており、Kafkaプロデューサのプロパティ・ファイルで指定されているホストとポートが正しいことを確認してください。
Kafkaブローカをホストしているマシンでネットワーク・シェル・コマンド(netstat -l
など)を使用すると、Kafkaが所定のポートでリスニングしていることを確認できます。