11 Kafka Connectハンドラの使用

Kafka Connectハンドラの使用方法について説明します。このハンドラは、標準的なKafkaのメッセージング機能を拡張したものです。

トピック:

11.1 概要

Oracle GoldenGate Kafka Connectは、標準的なKafkaのメッセージング機能を拡張したものです。Kafka Connectは、標準的なKafkaのプロデューサおよびコンシューマのインタフェースの上にある機能レイヤーです。新規のソースおよびターゲット・システムをトポロジに簡単に追加できる、メッセージングの標準化を提供します。

ConfluentはKafka Connectの主要な採用先であり、そのConfluent Platform製品には標準的なKafka Connectの機能拡張が含まれています。これには、AvroのシリアライズとデシリアライズおよびAvroスキーマ・レジストリが含まれています。Kafka Connectの機能の多くは、Apache Kafkaで利用できます。様々なオープン・ソースのKafka Connect統合は、次の場所を参照してください。

https://www.confluent.io/product/connectors/

Kafka Connectハンドラは、Kafka Connectのソース・コネクタです。Oracle GoldenGateでサポートされているデータベースから、データベースの変更を取得し、その変更データをKafka Connectレイヤー経由でKafkaにストリーミングできます。このハンドラを使用して、Oracle Event Hub Cloud Service (EHCS)に接続することもできます。

Kafka Connectでは、プロプライエタリなオブジェクトを使用してスキーマ(org.apache.kafka.connect.data.Schema)およびメッセージ(org.apache.kafka.connect.data.Struct)が定義されます。Kafka Connectハンドラは、公開済データおよび公開済データの構造を管理するように構成できます。

Kafka Connectハンドラは、Kafkaハンドラでサポートされているどのプラガブル・フォーマッタもサポートしていません

トピック:

11.2 詳細な機能

JSONコンバータ

Kafka Connectフレームワークにより、インメモリーのKafka Connectメッセージを、ネットワーク経由での伝送に適したシリアライズされた形式に変換するコンバータが提供されます。このコンバータは、Kafkaプロデューサのプロパティ・ファイルの構成を使用して選択されます。

Kafka ConnectおよびJSONコンバータは、Apache Kafkaダウンロードの一部として使用可能です。JSONコンバータによりKafkaのキーおよび値がJSONに変換され、それがKafkaトピックに送信されます。Kafkaプロデューサのプロパティ・ファイルで、次の構成を使用してJSONコンバータを識別します。

key.converter=org.apache.kafka.connect.json.JsonConverter 
key.converter.schemas.enable=true 
value.converter=org.apache.kafka.connect.json.JsonConverter 
value.converter.schemas.enable=true

メッセージのフォーマットは、ペイロード情報が後に続くメッセージ・スキーマ情報です。JSONは自己記述型のフォーマットであるため、Kafkaに公開される各メッセージにはスキーマ情報を含めないでください。

メッセージからJSONのスキーマ情報を省略するには、次のように設定します。

key.converter.schemas.enable=false
value.converter.schemas.enable=false

Avroコンバータ

Confluentにより、Kafkaのインストール、Kafkaのサポート、Kafka上に構築された拡張機能が提供され、Kafkaの潜在的な可能性の実現が支援されます。Confluentは、オープン・ソース版のKafka (Confluent Open Source)と、購入可能なエンタープライズ版(Confluent Enterprise)の両方を提供しています。

一般的なKafkaのユースケースは、Kafka上でAvroメッセージを送信することです。これは、AvroメッセージをデシリアライズするためにAvroスキーマに依存するため、受信側で問題が発生する可能性があります。受信メッセージは、プロデューサ側でメッセージを生成するために使用されたAvroスキーマと完全に一致させる必要があるため、スキーマの進化によって問題が増加する可能性があります。間違ったAvroスキーマでAvroメッセージをデシリアライズすると、ランタイム・エラー、不完全なデータまたは不正なデータが発生することがあります。Confluentでは、スキーマ・レジストリおよびConfluentスキーマ・コンバータを使用してこの問題を解決しました。

Kafkaプロデューサのプロパティ・ファイルの構成を次に示します。

key.converter=io.confluent.connect.avro.AvroConverter 
value.converter=io.confluent.connect.avro.AvroConverter 
key.converter.schema.registry.url=http://localhost:8081 
value.converter.schema.registry.url=http://localhost:8081 

メッセージがKafkaに公開されると、Avroスキーマが登録され、スキーマ・レジストリに格納されます。Kafkaからメッセージが消費されるとき、メッセージの作成に使用された正確なAvroスキーマをスキーマ・レジストリから取得してAvroメッセージをデシリアライズできます。Avroメッセージとそれに対応する受信側のAvroスキーマの照合を作成することで、この問題を解決しています。

Avroコンバータを使用するための要件は次のとおりです。

  • この機能は、Confluent Kafkaの両バージョン(オープン・ソースまたはエンタープライズ)で使用できます。

  • Confluentスキーマ・レジストリ・サービスが実行されている必要があります。

  • ソース・データベース表には、関連付けられたAvroスキーマが必要です。異なるAvroスキーマに関連付けられたメッセージは、異なるKafkaトピックに送信する必要があります。

  • ConfluentのAvroコンバータおよびスキーマ・レジストリ・クライアントがクラスパスで使用できる必要があります。

スキーマ・レジストリは、トピックごとにAvroスキーマを追跡します。メッセージは、同じスキーマまたは同じスキーマの進化バージョンを持つトピックに送信する必要があります。ソース・メッセージにはソース・データベース表スキーマに基づくAvroスキーマがあり、Avroスキーマはソース表ごとに一意です。複数のソース表で単一のトピックにメッセージを公開すると、ソース表から送信されたメッセージが以前のメッセージと異なるたびに、スキーマが進化していることがスキーマ・レジストリに示されます。

11.3 Kafka Connectハンドラの設定および実行

ここでは、Kafka Connectハンドラのコンポーネントの構成およびハンドラの実行について説明します。

クラスパス構成

Kafka ConnectハンドラをKafkaに接続して実行できるように、gg.classpath構成変数に2つのものを含める必要があります。必要なのは、Kafkaプロデューサ・プロパティ・ファイルと、KafkaクライアントJARです。KafkaクライアントのJARは、Kafka Connectハンドラが接続するKafkaのバージョンと一致する必要があります。必要なクライアントJARファイルのバージョン別リストは、Kafkaハンドラ・クライアント依存性(「Kafka Connectハンドラ・クライアント依存性」)を参照してください。Kafkaプロデューサのプロパティ・ファイルの格納場所として推奨されるのは、Oracle GoldenGateのdirprmディレクトリです。

Kafka ConnectクライアントJARのデフォルトの場所は、Kafka_Home/libs/*ディレクトリです。

gg.classpath変数は、正確に構成する必要があります。Kafkaプロデューサのプロパティ・ファイルのパス指定に含めるパスには、ワイルドカードを使用しないでください。Kafkaプロデューサのプロパティ・ファイルへのパスにアスタリスク(*)のワイルドカードを含めると、それが破棄されます。依存関係JARのパス指定には、そのディレクトリにあるJARファイルがすべて関連するクラスパスに含まれるように、ワイルドカード(*)を含める必要があります。*.jarは使用しないでください

正しく構成されたApache Kafkaのクラスパスの例:

gg.classpath=dirprm:{kafka_install_dir}/libs/*

正しく構成されたConfluent Kafkaのクラスパスの例を次に示します。

gg.classpath={confluent_install_dir}/share/java/kafka-serde-tools/*:{confluent_install_dir}/share/java/kafka/*:{confluent_install_dir}/share/java/confluent-common/*

トピック:

11.3.1 Kafka Connectハンドラの構成

Kafka Connectハンドラの構成可能な値は次のとおりです。これらのプロパティは、Javaアダプタ・プロパティ・ファイルにあります(Replicatプロパティ・ファイルにはありません)。

Kafka Connectハンドラの選択を有効にするには、まずgg.handler.jdbc.type=kafkaconnectを指定してハンドラ・タイプを構成してから、次に示す他のKafka Connectプロパティを構成する必要があります。

表11-1 Kafka Connectハンドラの構成プロパティ

プロパティ 必須/オプション 有効な値 デフォルト 説明
gg.handler.name.type

必須

kafkaconnect

なし

Kafka Connectハンドラを選択するための構成。

gg.handler.name.kafkaProducerConfigFile

必須

string

なし

KafkaのプロパティおよびKafka Connect構成プロパティを含むプロパティ・ファイルへのパス。

gg.handler.name.topicMappingTemplate

必須

実行時にKafkaトピック名を解決するためのテンプレート文字列の値。

なし

詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。

gg.handler.name.keyMappingTemplate

必須

実行時にKafkaメッセージ・キーを解決するためのテンプレート文字列の値。

なし

詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。

gg.handler.name.includeTableName

オプション

true | false

true

trueに設定すると、出力メッセージに「table」というフィールドが作成され、その値は完全修飾表名になります。

出力でこのフィールドを省略する場合は、falseに設定します。

gg.handler.name.includeOpType

オプション

true | false

true

trueに設定すると、出力メッセージにop_typeというフィールドが作成され、その値はソース・データベースの操作タイプのインジケータ(たとえば、挿入の場合はI、更新の場合はU、削除の場合はD)になります。出力でこのフィールドを省略する場合は、falseに設定します。

gg.handler.name.includeOpTimestamp

オプション

true | false

true

trueに設定すると、出力メッセージにop_tsというフィールドが作成され、その値はソース証跡ファイルの操作タイムスタンプ(コミット・タイムスタンプ)になります。

出力でこのフィールドを省略する場合は、falseに設定します。

gg.handler.name.includeCurrentTimestamp

オプション

true | false

true

trueに設定すると、出力メッセージにcurrent_tsというフィールドが作成され、その値はハンドラが操作を処理するときのタイムスタンプになります。

出力でこのフィールドを省略する場合は、falseに設定します。

gg.handler.name.includePosition

オプション

true | false

true

trueに設定すると、出力メッセージにposというフィールドが作成され、その値はソース証跡ファイルからの操作の位置(順序番号 + オフセット)になります。

出力でこのフィールドを省略する場合は、falseに設定します。

gg.handler.name.includePrimaryKeys

オプション

true | false

false

trueに設定すると、そのメッセージにprimary_keysというフィールドが含まれ、その値は主キー列の列名の配列になります。

このフィールドを抑止する場合は、falseに設定します。

gg.handler.name.includeTokens

オプション

true | false

false

trueに設定すると、出力メッセージにマップ・フィールドが含まれます。キーはトークンで、その値は、キーと値がOracle GoldenGateソース証跡ファイルからのトークン・キーと値であるマップです。

このフィールドを抑止する場合は、falseに設定します。

gg.handler.name.messageFormatting

オプション

row | op

row

出力メッセージのモデル化方法を制御します。行を選択すると、出力メッセージは行としてモデル化されます。opに設定すると、出力メッセージは操作メッセージとしてモデル化されます。

gg.handler.name.insertOpKey

オプション

任意の文字列

I

挿入操作を示すフィールドop_typeの値。

gg.handler.name.updateOpKey

オプション

任意の文字列

U

挿入操作を示すフィールドop_typeの値。

gg.handler.name.deleteOpKey

オプション

任意の文字列

D

削除操作を示すフィールドop_typeの値。

gg.handler.name.truncateOpKey

オプション

任意の文字列

T

切捨て操作を示すフィールドop_typeの値。

gg.handler.name.treatAllColumnsAsStrings

オプション

true | false

false

trueに設定すると、すべての出力フィールドが文字列として扱われます。falseに設定すると、ハンドラによって、ソース証跡ファイルからの対応するフィールド・タイプがそれに最も適したKafka Connectのデータ型にマップされます。

gg.handler.name.mapLargeNumbersAsStrings

オプション

true | false

false

大きい数値は、数値フィールドにDoubleとしてマッピングされます。特定のシナリオでは、精度を失う可能性があります。

trueに設定すると、精度を保つため、これらのフィールドが文字列としてマップされます。

gg.handler.name.iso8601Format

オプション

True | False

false

trueに設定すると、現在の日付がISO8601形式で出力されます。

gg.handler.name.pkUpdateHandling

オプション

abend | update | delete-insert

abend

モデリング行メッセージがgg.handler.name.messageFormatting=rowの場合にのみ適用されます。更新の場合に、ビフォアおよびアフター・イメージとしてのモデリング操作メッセージがそのメッセージに伝播される場合は適用されません。

gg.handler.name.pkUpdateHandling

オプション

任意のメタ列キーワード。

なし

テンプレートを表す1つ以上のテンプレート値で構成されるカンマ区切り文字列。「メタ列出力の設定」を参照してください。

gg.handler.name.includeIsMissingFields

オプション

true | false

true

extract{column_name}を含めるにはtrueに設定します。

このプロパティを各列に設定して、null値がソース証跡ファイルで実際にnullであるか、ソース証跡ファイルにないかを、ダウンストリーム・アプリケーションが区別できるようにします。

詳細は、「テンプレートを使用したストリーム名とパーティション名の解決」を参照してください。

サンプル構成の確認

gg.handlerlist=kafkaconnect

#The handler properties
gg.handler.kafkaconnect.type=kafkaconnect
gg.handler.kafkaconnect.kafkaProducerConfigFile=kafkaconnect.properties
gg.handler.kafkaconnect.mode=op
#The following selects the topic name based on the fully qualified table name
gg.handler.kafkaconnect.topicMappingTemplate=$

{fullyQualifiedTableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkaconnect.keyMappingTemplate=$

{primaryKeys}
#The formatter properties
gg.handler.kafkaconnect.messageFormatting=row
gg.handler.kafkaconnect.insertOpKey=I
gg.handler.kafkaconnect.updateOpKey=U
gg.handler.kafkaconnect.deleteOpKey=D
gg.handler.kafkaconnect.truncateOpKey=T
gg.handler.kafkaconnect.treatAllColumnsAsStrings=false
gg.handler.kafkaconnect.iso8601Format=false
gg.handler.kafkaconnect.pkUpdateHandling=abend
gg.handler.kafkaconnect.includeTableName=true
gg.handler.kafkaconnect.includeOpType=true
gg.handler.kafkaconnect.includeOpTimestamp=true
gg.handler.kafkaconnect.includeCurrentTimestamp=true
gg.handler.kafkaconnect.includePosition=true
gg.handler.kafkaconnect.includePrimaryKeys=false
gg.handler.kafkaconnect.includeTokens=false

11.3.2 テンプレートを使用したトピック名とメッセージ・キーの解決

Kafka Connectハンドラでは、テンプレートの構成値を使用して実行時にトピック名およびメッセージ・キーを解決する機能が提供されます。テンプレートを使用すると、静的な値およびキーワードを構成できます。キーワードは、そのときの処理コンテキストに応じてキーワードを動的に置き換えるために使用されます。テンプレートは、次の構成パラメータに適用されます。

gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate

テンプレートのモード

Kafka Connectハンドラでは、操作メッセージのみを送信できます。Kafka Connectハンドラでは、操作メッセージをより大きなトランザクション・メッセージにグループ化できません。

テンプレートのキーワード

キーワード 説明

${fullyQualifiedTableName}

カタログ、スキーマおよび表名の間にピリオド(.)のデリミタを含む、完全修飾表名に解決されます。

たとえば、test.dbo.table1です。

${catalogName}

カタログ名に解決されます。

${schemaName}

スキーマ名に解決されます。

${tableName}

短い表名に解決されます。

${opType}

操作の種類(INSERTUPDATEDELETEまたはTRUNCATE)に解決されます

${primaryKeys}

アンダースコア(_)で区切られた連結主キー値に解決されます。

${position}

ソース証跡ファイルのシーケンス番号の後にオフセット(RBA)が続きます。

${opTimestamp}

ソース証跡ファイルからの操作タイムスタンプ。

${emptyString}

""に解決されます。

${groupName}

Replicatプロセスの名前に解決されます。調整された配信を使用している場合は、レプリケートのスレッド番号が付加されたReplicatプロセスの名前に解決されます。

${staticMap[]}

キーが完全修飾表名である静的な値に解決されます。キーおよび値は、大カッコの内部に次の形式で指定します。

${staticMap[dbo.table1=value1,dbo.table2=value2]}

${columnValue[]}

キーが完全修飾表名であり、値が解決される列名である列の値に解決されます。次に例を示します。

${staticMap[dbo.table1=col1,dbo.table2=col2]}

${currentTimestamp}

または

${currentTimestamp[]}

現在のタイムスタンプに解決されます。SimpleDateFormatクラスで説明されているJavaベースの書式設定を使用して、現在のタイムスタンプの書式を制御できます(https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.htmlを参照)。

例:

${currentDate}
${currentDate[yyyy-mm-dd hh:MM:ss.SSS]}

${null}

NULL文字列に解決されます。

${custom[]}

カスタム値リゾルバを記述することが可能です。必要に応じて、Oracleサポートに連絡してください。

テンプレートの例

テンプレートの構成値の例と解決される値を次に示します。

テンプレートの例 解決される値

${groupName}_{fullyQualfiedTableName}

KAFKA001_dbo.table1

prefix_${schemaName}_${tableName}_suffix

prefix_dbo_table1_suffix

${currentDate[yyyy-mm-dd hh:MM:ss.SSS]}

2017-05-17 11:45:34.254

11.3.3 Kafka Connectハンドラでのセキュリティの構成

Kafkaバージョン0.9.0.0にSSL/TLSまたはKerberosを介したセキュリティが導入されました。Kafka Connectハンドラは、SSL/TLSまたはKerberos使用して保護できます。Kafkaプロデューサ・クライアント・ライブラリは、それらのライブラリを利用する統合からのセキュリティ機能の抽象化を提供します。Kafka Connectハンドラは、セキュリティ機能から効率的に抽象化されます。セキュリティを有効にするには、Kafkaクラスタのセキュリティを設定し、マシンを接続して、Kafkaプロデューサのプロパティ・ファイル(Kafkaハンドラで処理のために使用する)を必要なセキュリティ・プロパティを使用して構成する必要があります。

keytabファイルからKerberosパスワードを復号化できない場合があります。これによって、Kerberos認証が対話型モードにフォール・バックされ、プログラムで呼び出されているため機能しなくなります。この問題の原因は、Java Cryptography Extension (JCE)が、Java Runtime Environment (JRE)にインストールされていないことです。JCEがJREにロードされていることを確認します。http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.htmlを参照してください。

11.4 Kafka Connectハンドラのパフォーマンスに関する考慮事項

Oracle GoldenGate for Big Dataの構成とKafkaプロデューサのパフォーマンスの両方に影響を与える複数の構成設定があります。

Oracle GoldenGateのパラメータでパフォーマンスに最も大きな影響を与えるのはReplicatのGROUPTRANSOPSパラメータです。GROUPTRANSOPSパラメータを使用すると、Replicatで複数のソース・トランザクションを単一のターゲット・トランザクションにグループ化できます。トランザクションのコミット時、Kafka Connectハンドラは、書込み永続性のためにKafkaプロデューサ上でフラッシュをコールしてメッセージをKafkaにプッシュし、その後チェックポイントを送信します。フラッシュ・コールはコストの大きなコールであり、ReplicatのGROUPTRANSOPS設定を大きく設定すると、Replicatのフラッシュ・コールの頻度が少なくてすみ、パフォーマンスが向上します。

GROUPTRANSOPSのデフォルト設定は1000で、値を2500、5000、さらには10000に増やすことでパフォーマンスの向上が得られます。

opモードのgg.handler.kafkaconnect.mode=opパラメータもまた、txモードのgg.handler.kafkaconnect.mode=txよりもパフォーマンスを向上させることができます。

Kafkaプロデューサのいくつかのプロパティがパフォーマンスに影響する可能性があります。重要な影響を与えるパラメータは次のとおりです。

  • linger.ms

  • batch.size

  • acks

  • buffer.memory

  • compression.type

これらのパラメータのデフォルト値から開始し、パフォーマンスのベース・ラインを取得するためのパフォーマンス・テストを実行することをお薦めします。これらの各パラメータのKafkaドキュメントを参照して、その役割を理解し、パラメータを調整し、各パラメータのパフォーマンス効果を確認するための追加のパフォーマンス・テストを実行します。

11.5 Kafka Connectハンドラのトラブルシューティング

トピック:

11.5.1 Kafka Connectハンドラ用のJavaクラスパス

Javaクラスパスは、非常によくある問題の1つです。クラスパスに問題があることを示すのは、Oracle GoldenGate Javaのlog4jログ・ファイルにあるClassNotFoundExceptionで、gg.classpath変数に入力ミスがある場合は、クラスパスを解決する際にエラーが発生します。

Kafkaクライアント・ライブラリは、Oracle GoldenGate for Big Data製品に付属しません。「Kafka Connectハンドラの設定および実行」で説明されているように、JavaとKafkaクライアント・ライブラリを適切に解決するために、適切なバージョンのKafkaクライアント・ライブラリを取得し、Javaアダプタのプロパティ・ファイルでgg.classpathプロパティを適切に構成する作業は、ユーザーが行う必要があります。

11.5.2 無効なKafkaバージョン

Kafka Connectは、Kafka 0.9.0.0バージョンで導入されました。Kafka Connectハンドラは、0.8.2.2以前のバージョンのKafkaでは動作しません。Kafka 0.8.2.2でKafka Connectを使用しようとすると、実行時にClassNotFoundExceptionエラーが発生します。

11.5.3 Kafkaプロデューサ・プロパティ・ファイルが見つからない

通常、次の例外メッセージが表示されます。

ERROR 2015-11-11 11:49:08,482 [main] Error loading the kafka producer properties

Kafkaプロデューサの構成ファイル名のgg.handler.kafkahandler.KafkaProducerConfigFile構成プロパティが正しく設定されていることを確認してください。

gg.classpath変数にKafkaプロデューサのプロパティ・ファイルのパスが含まれていること、プロパティ・ファイルのパスの末尾にワイルドカード(*)が含まれていないことを確認してください。

11.5.4 Kafkaの接続の問題

通常、次の例外メッセージが表示されます。

WARN 2015-11-11 11:25:50,784 [kafka-producer-network-thread | producer-1] 

WARN  (Selector.java:276) - Error in I/O with localhost/127.0.0.1  java.net.ConnectException: Connection refused

これが発生すると、接続の再試行時間が経過し、Kafka接続ハンドラ・プロセスが異常終了します。Kafkaブローカが稼働しており、Kafkaプロデューサのプロパティ・ファイルで指定されているホストとポートが正しいことを確認してください。

Kafkaブローカをホストしているマシンでネットワーク・シェル・コマンド(netstat -lなど)を使用すると、Kafkaが所定のポートでリスニングしていることを確認できます。