20 Kafka Connectハンドラの使用

Kafka Connectハンドラは、標準的なKafkaのメッセージング機能を拡張したものです。

この章では、Kafka Connectハンドラの使用方法について説明します。

20.1 概要

Oracle GoldenGate Kafka Connectは、標準的なKafkaのメッセージング機能を拡張したものです。Kafka Connectは、標準的なKafkaのプロデューサおよびコンシューマのインタフェースの上にある機能レイヤーです。新規のソースおよびターゲット・システムをトポロジに簡単に追加できる、メッセージングの標準化を提供します。

ConfluentはKafka Connectの主要な採用先であり、そのConfluent Platform製品には標準的なKafka Connectの機能拡張が含まれています。これには、AvroのシリアライズとデシリアライズおよびAvroスキーマ・レジストリが含まれています。Kafka Connectの機能の多くは、Apache Kafkaで利用できます。様々なオープン・ソースのKafka Connect統合は、次の場所を参照してください。

https://www.confluent.io/product/connectors/

Kafka Connectハンドラは、Kafka Connectのソース・コネクタです。Oracle GoldenGateでサポートされているデータベースから、データベースの変更を取得し、その変更データをKafka Connectレイヤー経由でKafkaにストリーミングできます。このハンドラを使用して、Oracle Event Hub Cloud Service (EHCS)に接続することもできます。

Kafka Connectでは、プロプライエタリなオブジェクトを使用してスキーマ(org.apache.kafka.connect.data.Schema)およびメッセージ(org.apache.kafka.connect.data.Struct)が定義されます。Kafka Connectハンドラは、公開済データおよび公開済データの構造を管理するように構成できます。

Kafka Connectハンドラは、Kafkaハンドラでサポートされているどのプラガブル・フォーマッタもサポートしていません

20.2 詳細な機能

JSONコンバータ

Kafka Connectフレームワークにより、インメモリーのKafka Connectメッセージを、ネットワーク経由での伝送に適したシリアライズされた形式に変換するコンバータが提供されます。このコンバータは、Kafkaプロデューサのプロパティ・ファイルの構成を使用して選択されます。

Kafka ConnectおよびJSONコンバータは、Apache Kafkaダウンロードの一部として使用可能です。JSONコンバータによりKafkaのキーおよび値がJSONに変換され、それがKafkaトピックに送信されます。Kafkaプロデューサのプロパティ・ファイルで、次の構成を使用してJSONコンバータを識別します。

key.converter=org.apache.kafka.connect.json.JsonConverter 
key.converter.schemas.enable=true 
value.converter=org.apache.kafka.connect.json.JsonConverter 
value.converter.schemas.enable=true

メッセージのフォーマットは、ペイロード情報が後に続くメッセージ・スキーマ情報です。JSONは自己記述型のフォーマットであるため、Kafkaに公開される各メッセージにはスキーマ情報を含めないでください。

メッセージからJSONのスキーマ情報を省略するには、次のように設定します。

key.converter.schemas.enable=false
value.converter.schemas.enable=false

Avroコンバータ

Confluentにより、Kafkaのインストール、Kafkaのサポート、Kafka上に構築された拡張機能が提供され、Kafkaの潜在的な可能性の実現が支援されます。Confluentは、オープン・ソース版のKafka (Confluent Open Source)と、購入可能なエンタープライズ版(Confluent Enterprise)の両方を提供しています。

一般的なKafkaのユースケースは、Kafka上でAvroメッセージを送信することです。これは、AvroメッセージをデシリアライズするためにAvroスキーマに依存するため、受信側で問題が発生する可能性があります。受信メッセージは、プロデューサ側でメッセージを生成するために使用されたAvroスキーマと完全に一致させる必要があるため、スキーマの進化によって問題が増加する可能性があります。間違ったAvroスキーマでAvroメッセージをデシリアライズすると、ランタイム・エラー、不完全なデータまたは不正なデータが発生することがあります。Confluentでは、スキーマ・レジストリおよびConfluentスキーマ・コンバータを使用してこの問題を解決しました。

Kafkaプロデューサのプロパティ・ファイルの構成を次に示します。

key.converter=io.confluent.connect.avro.AvroConverter 
value.converter=io.confluent.connect.avro.AvroConverter 
key.converter.schema.registry.url=http://localhost:8081 
value.converter.schema.registry.url=http://localhost:8081 

メッセージがKafkaに公開されると、Avroスキーマが登録され、スキーマ・レジストリに格納されます。Kafkaからメッセージが消費されるとき、メッセージの作成に使用された正確なAvroスキーマをスキーマ・レジストリから取得してAvroメッセージをデシリアライズできます。Avroメッセージとそれに対応する受信側のAvroスキーマの照合を作成することで、この問題を解決しています。

Avroコンバータを使用するための要件は次のとおりです。

  • この機能は、Confluent Kafkaの両バージョン(オープン・ソースまたはエンタープライズ)で使用できます。

  • Confluentスキーマ・レジストリ・サービスが実行されている必要があります。

  • ソース・データベース表には、関連付けられたAvroスキーマが必要です。異なるAvroスキーマに関連付けられたメッセージは、異なるKafkaトピックに送信する必要があります。

  • ConfluentのAvroコンバータおよびスキーマ・レジストリ・クライアントがクラスパスで使用できる必要があります。

スキーマ・レジストリは、トピックごとにAvroスキーマを追跡します。メッセージは、同じスキーマまたは同じスキーマの進化バージョンを持つトピックに送信する必要があります。ソース・メッセージにはソース・データベース表スキーマに基づくAvroスキーマがあり、Avroスキーマはソース表ごとに一意です。複数のソース表で単一のトピックにメッセージを公開すると、ソース表から送信されたメッセージが以前のメッセージと異なるたびに、スキーマが進化していることがスキーマ・レジストリに示されます。

20.3 Kafka Connectハンドラの設定および実行

ここでは、Kafka Connectハンドラのコンポーネントの構成およびハンドラの実行について説明します。

クラスパス構成

Kafka ConnectハンドラをKafkaに接続して実行できるように、gg.classpath構成変数に2つのものを含める必要があります。必要なのは、Kafkaプロデューサ・プロパティ・ファイルと、KafkaクライアントJARです。KafkaクライアントのJARは、Kafka Connectハンドラが接続するKafkaのバージョンと一致する必要があります。必要なクライアントJARファイルのバージョン別リストは、Kafkaハンドラ・クライアント依存性(「Kafka Connectハンドラ・クライアント依存性」)を参照してください。Kafkaプロデューサのプロパティ・ファイルの格納場所として推奨されるのは、Oracle GoldenGateのdirprmディレクトリです。

Kafka ConnectクライアントJARのデフォルトの場所は、Kafka_Home/libs/*ディレクトリです。

gg.classpath変数は、正確に構成する必要があります。Kafkaプロデューサのプロパティ・ファイルのパス指定に含めるパスには、ワイルドカードを使用しないでください。Kafkaプロデューサのプロパティ・ファイルへのパスにアスタリスク(*)のワイルドカードを含めると、それが破棄されます。依存関係JARのパス指定には、そのディレクトリにあるJARファイルがすべて関連するクラスパスに含まれるように、ワイルドカード(*)を含める必要があります。*.jarは使用しないでください

正しく構成されたApache Kafkaのクラスパスの例:

gg.classpath=dirprm:{kafka_install_dir}/libs/*

正しく構成されたConfluent Kafkaのクラスパスの例を次に示します。

gg.classpath={confluent_install_dir}/share/java/kafka-serde-tools/*:{confluent_install_dir}/share/java/kafka/*:{confluent_install_dir}/share/java/confluent-common/*

20.3.1 Kafka Connectハンドラの構成

メタ列フィールドは次のプロパティとして構成できます。

gg.handler.name.metaColumnsTemplate

メタ列を以前のバージョンと同様に出力するには、次のように構成します。

gg.handler.name.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]}

主キー列およびトークンも含めるには、次のように構成します。

gg.handler.name.format.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]},${primarykeycolumns[primary_keys]},${alltokens[tokens]}

詳細は、構成プロパティを参照してください

gg.handler.name.metaColumnsTemplate

表20-1 Kafka Connectハンドラの構成プロパティ

プロパティ 必須/オプション 有効な値 デフォルト 説明
gg.handler.name.type

必須

kafkaconnect

なし

Kafka Connectハンドラを選択するための構成。

gg.handler.name.kafkaProducerConfigFile

必須

string

なし

KafkaのプロパティおよびKafka Connect構成プロパティを含むプロパティ・ファイルへのパス。

gg.handler.name.topicMappingTemplate

必須

実行時にKafkaトピック名を解決するためのテンプレート文字列の値。

なし

詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。

gg.handler.name.keyMappingTemplate

必須

実行時にKafkaメッセージ・キーを解決するためのテンプレート文字列の値。

なし

詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。

gg.handler.name.includeTokens

オプション

true | false

false

trueに設定すると、出力メッセージにマップ・フィールドが含まれます。キーはトークンで、その値は、キーと値がOracle GoldenGateソース証跡ファイルからのトークン・キーと値であるマップです。

このフィールドを抑止する場合は、falseに設定します。

gg.handler.name.messageFormatting

オプション

row | op

row

出力メッセージのモデル化方法を制御します。行を選択すると、出力メッセージは行としてモデル化されます。opに設定すると、出力メッセージは操作メッセージとしてモデル化されます。

gg.handler.name.insertOpKey

オプション

任意の文字列

I

挿入操作を示すフィールドop_typeの値。

gg.handler.name.updateOpKey

オプション

任意の文字列

U

挿入操作を示すフィールドop_typeの値。

gg.handler.name.deleteOpKey

オプション

任意の文字列

D

削除操作を示すフィールドop_typeの値。

gg.handler.name.truncateOpKey

オプション

任意の文字列

T

切捨て操作を示すフィールドop_typeの値。

gg.handler.name.treatAllColumnsAsStrings

オプション

true | false

false

trueに設定すると、すべての出力フィールドが文字列として扱われます。falseに設定すると、ハンドラによって、ソース証跡ファイルからの対応するフィールド・タイプがそれに最も適したKafka Connectのデータ型にマップされます。

gg.handler.name.mapLargeNumbersAsStrings

オプション

true | false

false

大きい数値は、数値フィールドにDoubleとしてマッピングされます。特定のシナリオでは、精度を失う可能性があります。

trueに設定すると、精度を保つため、これらのフィールドが文字列としてマップされます。

gg.handler.name.pkUpdateHandling

オプション

abend | update | delete-insert

abend

モデリング行メッセージがgg.handler.name.messageFormatting=rowの場合にのみ適用されます。更新の場合に、ビフォアおよびアフター・イメージとしてのモデリング操作メッセージがそのメッセージに伝播される場合は適用されません。

gg.handler.name.metaColumnsTemplate

オプション

任意のメタ列キーワード。

なし

テンプレートを表す1つ以上のテンプレート値で構成されるカンマ区切り文字列。「メタ列出力の設定」を参照してください。

gg.handler.name.includeIsMissingFields

オプション

true|false

true

extract{column_name}を含めるにはtrueに設定します。

このプロパティを各列に設定して、null値がソース証跡ファイルで実際にnullであるか、ソース証跡ファイルにないかを、ダウンストリーム・アプリケーションが区別できるようにします。

gg.handler.name.enableDecimalLogicalType オプション true|false false Kafka Connectで小数論理型を有効にするには、trueに設定します。小数論理型では、64ビット・データ型に収まらない数値を表すことができます。
gg.handler.name.oracleNumberScale オプション 正の整数 38 gg.handler.name.enableDecimalLogicalType=trueの場合にのみ適用されます。一部のソース・データ型には、固定スケールが関連付けられていません。Kafka Connect小数論理型にスケールを設定する必要があります。メタデータにスケールがないソース型の場合は、このパラメータの値を使用してスケールを設定します。
gg.handler.name.EnableTimestampLogicalType オプション true|false false Kafka Connectタイムスタンプ論理型を有効にするには、trueに設定します。Kafka Connectタイムスタンプの論理時間は、Javaのエポックからのミリ秒の整数測定値です。これは、タイムスタンプ論理型が使用された場合、ミリ秒より高い精度が使用できないことを意味します。このプロパティを使用するには、gg.format.timestampプロパティを設定する必要があります。このプロパティは、文字列形式のタイムスタンプの出力を決定するために使用されるタイムスタンプ書式設定文字列です。たとえば、gg.format.timestamp=yyyy-MM-dd HH:mm:ss.SSSです。goldengate.userexit.timestampプロパティが構成ファイルに設定されていないことを確認します。このプロパティを設定すると、入力タイムスタンプが、論理タイムスタンプに必要なJavaオブジェクトに解析されなくなります。
gg.handler.name.metaHeadersTemplate オプション メタ列キーワードのカンマ区切りリスト。 なし メタ列を選択し、メタ列キーワード構文を使用してコンテキスト・ベースのキー値ペアをKafkaメッセージ・ヘッダーに挿入できます。
gg.handler.name.schemaNamespace

オプション KafkaコネクタAvroスキーマのネーミング要件に違反する文字を含まない任意の文字列。 なし 生成されるKafka Connectスキーマ名の制御に使用されます。設定されていない場合、スキーマ名は修飾されたソース表名と同じになります。たとえば、ソース表がQASOURCE.TCUSTMERである場合、Kafka Connectスキーマ名は同じになります。

このプロパティを使用して、生成されるスキーマ名を制御できます。たとえば、このプロパティがcom.example.companyに設定されている場合、表QASOURCE.TCUSTMERに対して生成されるKafka Connectスキーマ名はcom.example.company.TCUSTMERです。

詳細は、「テンプレートを使用したストリーム名とパーティション名の解決」を参照してください。

サンプル構成の確認

gg.handlerlist=kafkaconnect
#The handler properties
gg.handler.kafkaconnect.type=kafkaconnect
gg.handler.kafkaconnect.kafkaProducerConfigFile=kafkaconnect.properties
gg.handler.kafkaconnect.mode=op
#The following selects the topic name based on the fully qualified table name
gg.handler.kafkaconnect.topicMappingTemplate=${fullyQualifiedTableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys}
#The formatter properties
gg.handler.kafkaconnect.messageFormatting=row
gg.handler.kafkaconnect.insertOpKey=I
gg.handler.kafkaconnect.updateOpKey=U
gg.handler.kafkaconnect.deleteOpKey=D
gg.handler.kafkaconnect.truncateOpKey=T
gg.handler.kafkaconnect.treatAllColumnsAsStrings=false
gg.handler.kafkaconnect.pkUpdateHandling=abend

20.3.2 テンプレートを使用したトピック名とメッセージ・キーの解決

Kafka Connectハンドラでは、テンプレートの構成値を使用して実行時にトピック名およびメッセージ・キーを解決する機能が提供されます。テンプレートを使用すると、静的な値およびキーワードを構成できます。キーワードは、そのときの処理コンテキストに応じてキーワードを動的に置き換えるために使用されます。テンプレートは、次の構成パラメータに適用されます。

gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate

テンプレートのモード

Kafka Connectハンドラでは、操作メッセージのみを送信できます。Kafka Connectハンドラでは、操作メッセージをより大きなトランザクション・メッセージにグループ化できません。

テンプレートのキーワード

キーワード 説明

${fullyQualifiedTableName}

カタログ、スキーマおよび表名の間にピリオド(.)のデリミタを含む、完全修飾表名に解決されます。

たとえば、test.dbo.table1です。

${catalogName}

カタログ名に解決されます。

${schemaName}

スキーマ名に解決されます。

${tableName}

短い表名に解決されます。

${opType}

操作の種類(INSERTUPDATEDELETEまたはTRUNCATE)に解決されます

${primaryKeys}

アンダースコア(_)で区切られた連結主キー値に解決されます。

${position}

ソース証跡ファイルのシーケンス番号の後にオフセット(RBA)が続きます。

${opTimestamp}

ソース証跡ファイルからの操作タイムスタンプ。

${emptyString}

""に解決されます。

${groupName}

Replicatプロセスの名前に解決されます。調整された配信を使用している場合は、レプリケートのスレッド番号が付加されたReplicatプロセスの名前に解決されます。

${staticMap[]}

キーが完全修飾表名である静的な値に解決されます。キーおよび値は、大カッコの内部に次の形式で指定します。

${staticMap[dbo.table1=value1,dbo.table2=value2]}

${columnValue[]}

キーが完全修飾表名であり、値が解決される列名である列の値に解決されます。たとえば:

${staticMap[dbo.table1=col1,dbo.table2=col2]}

${currentTimestamp}

または

${currentTimestamp[]}

現在のタイムスタンプに解決されます。SimpleDateFormatクラスで説明されているJavaベースの書式設定を使用して、現在のタイムスタンプの書式を制御できます(https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.htmlを参照)。

例:

${currentDate}
${currentDate[yyyy-mm-dd hh:MM:ss.SSS]}

${null}

NULL文字列に解決されます。

${custom[]}

カスタム値リゾルバを記述することが可能です。必要に応じて、Oracleサポートに連絡してください。

テンプレートの例

テンプレートの構成値の例と解決される値を次に示します。

テンプレートの例 解決される値

${groupName}_{fullyQualfiedTableName}

KAFKA001_dbo.table1

prefix_${schemaName}_${tableName}_suffix

prefix_dbo_table1_suffix

${currentDate[yyyy-mm-dd hh:MM:ss.SSS]}

2017-05-17 11:45:34.254

20.3.3 Kafka Connectハンドラでのセキュリティの構成

Kafkaバージョン0.9.0.0にSSL/TLSまたはKerberosを介したセキュリティが導入されました。Kafka Connectハンドラは、SSL/TLSまたはKerberos使用して保護できます。Kafkaプロデューサ・クライアント・ライブラリは、それらのライブラリを利用する統合からのセキュリティ機能の抽象化を提供します。Kafka Connectハンドラは、セキュリティ機能から効率的に抽象化されます。セキュリティを有効にするには、Kafkaクラスタのセキュリティを設定し、マシンを接続して、Kafkaプロデューサのプロパティ・ファイル(Kafkaハンドラで処理のために使用する)を必要なセキュリティ・プロパティを使用して構成する必要があります。

keytabファイルからKerberosパスワードを復号化できない場合があります。これによって、Kerberos認証が対話型モードにフォール・バックされ、プログラムで呼び出されているため機能しなくなります。この問題の原因は、Java Cryptography Extension (JCE)が、Java Runtime Environment (JRE)にインストールされていないことです。JCEがJREにロードされていることを確認します。http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.htmlを参照してください。

20.4 Kafka Connectハンドラのパフォーマンスに関する考慮事項

Oracle GoldenGate for Big Dataの構成とKafkaプロデューサのパフォーマンスの両方に影響を与える複数の構成設定があります。

Oracle GoldenGateのパラメータでパフォーマンスに最も大きな影響を与えるのはReplicatのGROUPTRANSOPSパラメータです。GROUPTRANSOPSパラメータを使用すると、Replicatで複数のソース・トランザクションを単一のターゲット・トランザクションにグループ化できます。トランザクションのコミット時、Kafka Connectハンドラは、書込み永続性のためにKafkaプロデューサ上でフラッシュをコールしてメッセージをKafkaにプッシュし、その後チェックポイントを送信します。フラッシュ・コールはコストの大きなコールであり、ReplicatのGROUPTRANSOPS設定を大きく設定すると、Replicatのフラッシュ・コールの頻度が少なくてすみ、パフォーマンスが向上します。

GROUPTRANSOPSのデフォルト設定は1000で、値を2500、5000、さらには10000に増やすことでパフォーマンスの向上が得られます。

opモードのgg.handler.kafkaconnect.mode=opパラメータもまた、txモードのgg.handler.kafkaconnect.mode=txよりもパフォーマンスを向上させることができます。

Kafkaプロデューサのいくつかのプロパティがパフォーマンスに影響する可能性があります。重要な影響を与えるパラメータは次のとおりです。

  • linger.ms

  • batch.size

  • acks

  • buffer.memory

  • compression.type

これらのパラメータのデフォルト値から開始し、パフォーマンスのベース・ラインを取得するためのパフォーマンス・テストを実行することをお薦めします。これらの各パラメータのKafkaドキュメントを参照して、その役割を理解し、パラメータを調整し、各パラメータのパフォーマンス効果を確認するための追加のパフォーマンス・テストを実行します。

20.5 Kafkaインターセプタのサポート

Kafkaプロデューサ・クライアント・フレームワークでは、プロデューサ・インターセプタの使用がサポートされています。プロデューサ・インターセプタは、単にKafkaプロデューサ・クライアントからのユーザー・イグジットで、これにより、インターセプタ・オブジェクトがインスタンス化され、Kafkaメッセージ送信コールとKafkaメッセージ送信確認コールの通知を受信します。

インターセプタの一般的な使用例は、監視です。Kafkaプロデューサ・インターセプタは、インタフェース org.apache.kafka.clients.producer.ProducerInterceptorに準拠している必要があります。Kafka Connectハンドラは、プロデューサ・インターセプタの使用をサポートしています。

ハンドラでインターセプタを使用する場合の要件は次のとおりです。

  • Kafkaプロデューサ構成プロパティ"interceptor.classes"は、起動するインターセプタのクラス名で構成する必要があります。
  • インターセプタを呼び出すためには、jarファイルとすべての依存関係jarをJVMで使用できる必要があります。したがって、インターセプタを含むjarファイルとすべての依存関係jarを、ハンドラ構成ファイルのgg.classpathに追加する必要があります。

    詳細は、Kafkaのドキュメントを参照してください。

20.6 Kafkaパーティションの選択

Kafkaトピックは1つ以上のパーティションで構成されます。Kafkaクライアントは異なるトピック/パーティションの組合せへのメッセージ送信をパラレル化するため、複数のパーティションへの分散はKafkaの収集パフォーマンスを向上させるのに適した方法です。パーティションの選択は、Kafkaクライアントでの次の計算によって制御されます。

(Kafkaメッセージ・キーのハッシュ)係数(パーティションの数) = 選択したパーティション番号

Kafkaメッセージ・キーは、次の構成値によって選択されます。

gg.handler.{your handler name}.keyMappingTemplate=

このパラメータが静的キーを生成する値に設定されている場合、すべてのメッセージは同じパーティションに送信されます。静的キーの例を次に示します。

gg.handler.{your handler name}.keyMappingTemplate=StaticValue

このパラメータが、頻繁に変更されないキーを生成する値に設定されている場合、パーティション選択の変更頻度は低くなります。次の例では、表名をメッセージ・キーとして使用します。特定のソース表に対するすべての操作は同じキーを持つため、同じパーティションにルーティングされます。

gg.handler.{your handler name}.keyMappingTemplate=${tableName}
nullのKafkaメッセージ・キーはラウンドロビン・ベースでパーティションに分散されます。これを行うには、次のように設定します。
gg.handler.{your handler name}.keyMappingTemplate=${null}

マッピング・キーの構成に推奨される設定は次のとおりです。

gg.handler.{your handler name}.keyMappingTemplate=${primaryKeys}

これにより、連結および区切られた主キー値であるKafkaメッセージ・キーが生成されます。

各行の操作には一意の主キーが必要であるため、各行に一意のKafkaメッセージ・キーが生成されます。別の重要な考慮事項として、異なるパーティションに送信されたKafkaメッセージは、送信された元の順序でKafkaコンシューマに配信される保証はありません。これはKafkaの仕様の一部です。順序はパーティション内でのみ維持されます。Kafkaメッセージ・キーとして主キーを使用すると、同じ主キーを持つ同じ行に対する操作で同じKafkaメッセージ・キーが生成されるため、同じKafkaパーティションに送信されます。このようにして、同じ行に対する操作の順序が維持されます。

DEBUGログ・レベルでは、Kafkaメッセージの座標(トピック、パーティションおよびオフセット)は、正常に送信されたメッセージの.logファイルに記録されます。

20.7 Kafka Connectハンドラのトラブルシューティング

20.7.1 Kafka Connectハンドラ用のJavaクラスパス

Javaクラスパスは、非常によくある問題の1つです。クラスパスに問題があることを示すのは、Oracle GoldenGate Javaのlog4jログ・ファイルにあるClassNotFoundExceptionで、gg.classpath変数に入力ミスがある場合は、クラスパスを解決する際にエラーが発生します。

Kafkaクライアント・ライブラリは、Oracle GoldenGate for Big Data製品に付属しません。「Kafka Connectハンドラの設定および実行」で説明されているように、JavaとKafkaクライアント・ライブラリを適切に解決するために、適切なバージョンのKafkaクライアント・ライブラリを取得し、Javaアダプタのプロパティ・ファイルでgg.classpathプロパティを適切に構成する作業は、ユーザーが行う必要があります。

20.7.2 無効なKafkaバージョン

Kafka Connectは、Kafka 0.9.0.0バージョンで導入されました。Kafka Connectハンドラは、0.8.2.2以前のバージョンのKafkaでは動作しません。Kafka 0.8.2.2でKafka Connectを使用しようとすると、実行時にClassNotFoundExceptionエラーが発生します。

20.7.3 Kafkaプロデューサ・プロパティ・ファイルが見つからない

通常、次の例外メッセージが表示されます。

ERROR 2015-11-11 11:49:08,482 [main] Error loading the kafka producer properties

Kafkaプロデューサの構成ファイル名のgg.handler.kafkahandler.KafkaProducerConfigFile構成プロパティが正しく設定されていることを確認してください。

gg.classpath変数にKafkaプロデューサのプロパティ・ファイルのパスが含まれていること、プロパティ・ファイルのパスの末尾にワイルドカード(*)が含まれていないことを確認してください。

20.7.4 Kafkaの接続の問題

通常、次の例外メッセージが表示されます。

WARN 2015-11-11 11:25:50,784 [kafka-producer-network-thread | producer-1] 

WARN  (Selector.java:276) - Error in I/O with localhost/127.0.0.1  java.net.ConnectException: Connection refused

これが発生すると、接続の再試行時間が経過し、Kafka接続ハンドラ・プロセスが異常終了します。Kafkaブローカが稼働しており、Kafkaプロデューサのプロパティ・ファイルで指定されているホストとポートが正しいことを確認してください。

Kafkaブローカをホストしているマシンでネットワーク・シェル・コマンド(netstat -lなど)を使用すると、Kafkaが所定のポートでリスニングしていることを確認できます。