この章では、Kafka Connectハンドラについて説明し、その機能を理解できるように例を示します。
トピック:
Oracle GoldenGate Kafka Connectは、標準的なKafkaのメッセージング機能を拡張したものです。Kafka Connectは、標準的なKafkaのプロデューサおよびコンシューマのインタフェースの上にある機能レイヤーです。新規のソースおよびターゲット・システムをトポロジに簡単に追加できる、メッセージングの標準化を提供します。
Confluent IOはKafka Connectを採用した主要な企業であり、そのKafka製品には、Avroのシリアライズとデシリアライズ、Avroスキーマ・レジストリなどの標準的なKafka Connectの機能拡張が含まれています。Kafka Connectの機能の多くは、Apache Kafkaで利用できます。https://www.confluent.io/product/connectors/で、いくつかのオープン・ソースのKafka Connect統合を参照できます。
Kafka Connectハンドラは、Kafka Connectのソース・コネクタです。Oracle GoldenGateでサポートされているデータベースから、データベースの変更を取得し、その変更データをKafka Connectレイヤー経由でKafkaにストリーミングできます。
Kafka Connectでは、プロプライエタリなオブジェクトを使用してスキーマ(org.apache.kafka.connect.data.Schema
)およびメッセージ(org.apache.kafka.connect.data.Struct
)が定義されます。Kafka Connectハンドラは、公開済データおよび公開済データの構造を管理するように構成できます。
Kafka Connectハンドラは、Kafkaハンドラでサポートされているどのプラガブル・フォーマッタもサポートしていません。
トピック:
JSONコンバータ
Kafka Connectフレームワークにより、インメモリーのKafka Connectメッセージを、ネットワーク経由での伝送に適したシリアライズされた形式に変換するコンバータが提供されます。このコンバータは、Kafkaプロデューサのプロパティ・ファイルの構成を使用して選択されます。
Kafka ConnectおよびJSONコンバータは、Apache Kafkaダウンロードの一部として使用可能です。JSONコンバータによりKafkaのキーおよび値がJSONに変換され、それがKafkaトピックに送信されます。Kafkaプロデューサのプロパティ・ファイルで、次の構成を使用してJSONコンバータを識別します。
key.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter=org.apache.kafka.connect.json.JsonConverter value.converter.schemas.enable=true
メッセージのフォーマットは、ペイロード情報が後に続くメッセージ・スキーマ情報です。JSONは自己記述型のフォーマットであるため、Kafkaに公開される各メッセージにはスキーマ情報を含めないでください。
メッセージからJSONのスキーマ情報を省略するには、次のように設定します。
key.converter.schemas.enable=false value.converter.schemas.enable=false
Avroコンバータ
Confluent IOにより、Kafkaのインストール、Kafkaのサポート、Kafka上に構築された拡張機能が提供され、Kafkaの潜在的な可能性の実現が支援されます。Confluent IOは、オープン・ソース版のKafka(Confluent Open Source)と、購入可能なエンタープライズ版(Confluent Enterprise)の両方を提供しています。
一般的なKafkaのユースケースは、Kafka上でAvroメッセージを送信することです。これは、AvroメッセージをデシリアライズするためにAvroスキーマに依存するため、受信側で問題が発生する可能性があります。受信メッセージは、プロデューサ側でメッセージを生成するために使用されたAvroスキーマと完全に一致させる必要があるため、スキーマの進化によって問題が増加する可能性があります。間違ったAvroスキーマでAvroメッセージをデシリアライズすると、ランタイム・エラー、不完全なデータまたは不正なデータが発生することがあります。Confluent IOでは、スキーマ・レジストリおよびConfluent IOスキーマ・コンバータを使用してこの問題を解決しました。
Kafkaプロデューサのプロパティ・ファイルの構成を次に示します。
key.converter=io.confluent.connect.avro.AvroConverter value.converter=io.confluent.connect.avro.AvroConverter key.converter.schema.registry.url=http://localhost:8081 value.converter.schema.registry.url=http://localhost:8081
メッセージがKafkaに公開されると、Avroスキーマが登録され、スキーマ・レジストリに格納されます。Kafkaからメッセージが消費されるとき、メッセージの作成に使用された正確なAvroスキーマをスキーマ・レジストリから取得してAvroメッセージをデシリアライズできます。Avroメッセージとそれに対応する受信側のAvroスキーマの照合を作成することで、この問題を解決しています。
Avroコンバータを使用するための要件は次のとおりです。
この機能は現在、Confluent IOのKafkaバージョン(オープン・ソースまたはエンタープライズ)で利用できます。
Confluentスキーマ・レジストリ・サービスが実行されている必要があります。
ソース・データベース表には、関連付けられたAvroスキーマが必要です。異なるAvroスキーマに関連付けられたメッセージは、異なるKafkaトピックに送信する必要があります。
Confluent IOのAvroコンバータおよびスキーマ・レジストリ・クライアントがクラスパスで使用できる必要があります。
スキーマ・レジストリは、トピックごとにAvroスキーマを追跡します。メッセージは、同じスキーマまたは同じスキーマの進化バージョンを持つトピックに送信する必要があります。ソース・メッセージにはソース・データベース表スキーマに基づくAvroスキーマがあり、Avroスキーマはソース表ごとに一意です。複数のソース表で単一のトピックにメッセージを公開すると、ソース表から送信されたメッセージが以前のメッセージと異なるたびに、スキーマが進化していることがスキーマ・レジストリに示されます。
ここでは、Kafka Connectハンドラのコンポーネントの構成およびハンドラの実行について説明します。
クラスパス構成
Kafka ConnectハンドラをKafkaに接続して実行できるように、gg.classpath
構成変数に2つのものを含める必要があります。必要なのは、Kafkaプロデューサ・プロパティ・ファイルと、KafkaクライアントJARです。KafkaクライアントのJARは、Kafka Connectハンドラが接続するKafkaのバージョンと一致する必要があります。必要なクライアントJARファイルのバージョン別リストは、Kafkaハンドラ・クライアント依存性(「Kafka Connectハンドラ・クライアント依存性」)を参照してください。Kafkaプロデューサのプロパティ・ファイルの格納場所として推奨されるのは、Oracle GoldenGateのdirprm
ディレクトリです。
Kafka ConnectクライアントJARのデフォルトの場所は、Kafka_Home/libs/*
ディレクトリです。
gg.classpath
変数は、正確に構成する必要があります。Kafkaプロデューサのプロパティ・ファイルのパス指定に含めるパスには、ワイルドカードを使用しないでください。Kafkaプロデューサのプロパティ・ファイルへのパスにアスタリスク(*)のワイルドカードを含めると、それが破棄されます。依存関係JARのパス指定には、そのディレクトリにあるJARファイルがすべて関連するクラスパスに含まれるように、ワイルドカード(*)を含める必要があります。*.jar
は使用しないでください
正しく構成されたApache Kafkaのクラスパスの例:
gg.classpath=dirprm:{kafka_install_dir}/libs/*
正しく構成されたConfluent IOのKafkaのクラスパスの例:
gg.classpath={confluent_install_dir}/share/java/kafka-serde-tools/*:{confluent_install_dir}/share/java/kafka/*:{confluent_install_dir}/share/java/confluent-common/*
トピック:
Kafka Connectハンドラの構成可能な値は次のとおりです。
表9-1 Kafka Connectハンドラの構成プロパティ
プロパティ | 必須/オプション | 有効な値 | デフォルト | 説明 |
---|---|---|---|---|
gg.handler.name.type |
必須 |
|
なし |
Kafka Connectハンドラを選択するための構成。 |
gg.handler.name.kafkaProducerConfigFile |
必須 |
string |
なし |
KafkaのプロパティおよびKafka Connect構成プロパティを含むプロパティ・ファイルへのパス。 |
gg.handler.name.topicMappingTemplate |
必須 |
実行時にKafkaトピック名を解決するためのテンプレート文字列の値。 |
なし |
詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。 |
gg.handler.name.keyMappingTemplate |
必須 |
実行時にKafkaメッセージ・キーを解決するためのテンプレート文字列の値。 |
なし |
詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。 |
gg.handler.name.includeTableName |
オプション |
|
|
出力でこのフィールドを省略する場合は、 |
gg.handler.name.includeOpType |
オプション |
|
|
|
gg.handler.name.includeOpTimestamp |
オプション |
|
|
出力でこのフィールドを省略する場合は、 |
gg.handler.name.includeCurrentTimestamp |
オプション |
|
|
出力でこのフィールドを省略する場合は、 |
gg.handler.name.includePosition |
オプション |
|
|
出力でこのフィールドを省略する場合は、 |
gg.handler.name.includePrimaryKeys |
オプション |
|
|
このフィールドを抑止する場合は、 |
gg.handler.name.includeTokens |
オプション |
|
|
このフィールドを抑止する場合は、 |
gg.handler.name.messageFormatting |
オプション |
|
|
出力メッセージのモデル化方法を制御します。行を選択すると、出力メッセージは行としてモデル化されます。opに設定すると、出力メッセージは操作メッセージとしてモデル化されます。 |
gg.handler.name.insertOpKey |
オプション |
任意の文字列 |
|
挿入操作を示すフィールド |
gg.handler.name.updateOpKey |
オプション |
任意の文字列 |
|
挿入操作を示すフィールド |
gg.handler.name.deleteOpKey |
オプション |
任意の文字列 |
|
削除操作を示すフィールド |
gg.handler.name.truncateOpKey |
オプション |
任意の文字列 |
|
切捨て操作を示すフィールド |
gg.handler.name.treatAllColumnsAsStrings |
オプション |
|
|
trueに設定すると、すべての出力フィールドが文字列として扱われます。falseに設定すると、ハンドラによって、ソース証跡ファイルからの対応するフィールド・タイプがそれに最も適したKafka Connectのデータ型にマップされます。 |
gg.handler.name.mapLargeNumbersAsStrings |
オプション |
|
|
大きい数値は、数値フィールドにDoubleとしてマッピングされます。特定のシナリオでは、精度を失う可能性があります。
|
gg.handler.name.iso8601Format |
オプション |
|
|
trueに設定すると、現在の日付がISO8601形式で出力されます。 |
gg.handler.name.pkUpdateHandling |
オプション |
insert | abend | update | delete |
|
モデリング行メッセージが |
詳細は、「テンプレートを使用したストリーム名とパーティション名の解決」を参照してください。
Kafka Connectハンドラでは、テンプレートの構成値を使用して実行時にトピック名およびメッセージ・キーを解決する機能が提供されます。テンプレートを使用すると、静的な値およびキーワードを構成できます。キーワードは、そのときの処理コンテキストに応じてキーワードを動的に置き換えるために使用されます。テンプレートは、次の構成パラメータに適用されます。
gg.handler.name.topicMappingTemplate gg.handler.name.keyMappingTemplate
テンプレートのモード
Kafka Connectハンドラでは、操作メッセージのみを送信できます。Kafka Connectハンドラでは、操作メッセージをより大きなトランザクション・メッセージにグループ化できません。
テンプレートのキーワード
キーワード | 説明 |
---|---|
|
カタログ、スキーマおよび表名の間にピリオド(.)のデリミタを含む、完全修飾表名に解決されます。 たとえば、 |
|
カタログ名に解決されます。 |
|
スキーマ名に解決されます。 |
|
短い表名に解決されます。 |
|
操作の種類( |
|
アンダースコア(_)で区切られた連結主キー値に解決されます。 |
|
ソース証跡ファイルのシーケンス番号の後にオフセット(RBA)が続きます。 |
|
ソース証跡ファイルからの操作タイムスタンプ。 |
|
""に解決されます。 |
|
Replicatプロセスの名前に解決されます。調整された配信を使用している場合は、レプリケートのスレッド番号が付加されたReplicatプロセスの名前に解決されます。 |
|
キーが完全修飾表名である静的な値に解決されます。キーおよび値は、大カッコの内部に次の形式で指定します。 ${staticMap[dbo.table1=value1,dbo.table2=value2]} |
|
キーが完全修飾表名であり、値が解決される列名である列の値に解決されます。次に例を示します。 ${staticMap[dbo.table1=col1,dbo.table2=col2]} |
または
|
現在のタイムスタンプに解決されます。 例: ${currentDate} ${currentDate[yyyy-mm-dd hh:MM:ss.SSS]} |
|
NULL文字列に解決されます。 |
|
カスタム値リゾルバを記述することが可能です。必要に応じて、Oracleサポートに連絡してください。 |
テンプレートの例
テンプレートの構成値の例と解決される値を次に示します。
テンプレートの例 | 解決される値 |
---|---|
|
|
|
|
|
|
Kafkaバージョン0.9.0.0にSSL/TLSまたはKerberosを介したセキュリティが導入されました。Kafka Connectハンドラは、SSL/TLSまたはKerberos使用して保護できます。Kafkaプロデューサ・クライアント・ライブラリは、それらのライブラリを利用する統合からのセキュリティ機能の抽象化を提供します。Kafka Connectハンドラは、セキュリティ機能から効率的に抽象化されます。セキュリティを有効にするには、Kafkaクラスタのセキュリティを設定し、マシンを接続して、Kafkaプロデューサのプロパティ・ファイル(Kafkaハンドラで処理のために使用する)を必要なセキュリティ・プロパティを使用して構成する必要があります。
詳細は、http://kafka.apache.org/documentation.html#securityを参照してください。
Oracle GoldenGate for Big Dataの構成とKafkaプロデューサのパフォーマンスの両方に影響を与える複数の構成設定があります。
Oracle GoldenGateのパラメータでパフォーマンスに最も大きな影響を与えるのはReplicatのGROUPTRANSOPS
パラメータです。GROUPTRANSOPS
パラメータを使用すると、Replicatで複数のソース・トランザクションを単一のターゲット・トランザクションにグループ化できます。トランザクションのコミット時、Kafka Connectハンドラは、書込み永続性のためにKafkaプロデューサ上でフラッシュをコールしてメッセージをKafkaにプッシュし、その後チェックポイントを送信します。フラッシュ・コールはコストの大きなコールであり、ReplicatのGROUPTRANSOPS
設定を大きく設定すると、Replicatのフラッシュ・コールの頻度が少なくてすみ、パフォーマンスが向上します。
GROUPTRANSOPS
のデフォルト設定は1000
で、値を2500、5000、さらには10000に増やすことでパフォーマンスの向上が得られます。
opモードのgg.handler.kafkaconnect.mode=op
パラメータもまた、txモードのgg.handler.kafkaconnect.mode=tx
よりもパフォーマンスを向上させることができます。
Kafkaプロデューサのいくつかのプロパティがパフォーマンスに影響する可能性があります。重要な影響を与えるパラメータは次のとおりです。
linger.ms
batch.size
acks
buffer.memory
compression.type
これらのパラメータのデフォルト値から開始し、パフォーマンスのベース・ラインを取得するためのパフォーマンス・テストを実行することをお薦めします。これらの各パラメータのKafkaドキュメントを参照して、その役割を理解し、パラメータを調整し、各パラメータのパフォーマンス効果を確認するための追加のパフォーマンス・テストを実行します。
トピック:
Javaクラスパスは、非常によくある問題の1つです。クラスパスに問題があることを示すのは、Oracle GoldenGate Javaのlog4j
ログ・ファイルにあるClassNotFoundException
で、gg.classpath
変数に入力ミスがある場合は、クラスパスを解決する際にエラーが発生します。
Kafkaクライアント・ライブラリは、Oracle GoldenGate for Big Data製品に付属しません。「Kafka Connectハンドラの設定および実行」で説明されているように、JavaとKafkaクライアント・ライブラリを適切に解決するために、適切なバージョンのKafkaクライアント・ライブラリを取得し、Javaアダプタのプロパティ・ファイルでgg.classpath
プロパティを適切に構成する作業は、ユーザーが行う必要があります。
Kafka Connectは、Kafka 0.9.0.0バージョンで導入されました。Kafka Connectハンドラは、0.8.2.2以前のバージョンのKafkaでは動作しません。Kafka 0.8.2.2でKafka Connectを使用しようとすると、実行時にClassNotFoundException
エラーが発生します。
通常、次の例外メッセージが表示されます。
ERROR 2015-11-11 11:49:08,482 [main] Error loading the kafka producer properties
Kafkaプロデューサの構成ファイル名のgg.handler.kafkahandler.KafkaProducerConfigFile
構成プロパティが正しく設定されていることを確認してください。
gg.classpath
変数にKafkaプロデューサのプロパティ・ファイルのパスが含まれていること、プロパティ・ファイルのパスの末尾にワイルドカード(*)が含まれていないことを確認してください。
通常、次の例外メッセージが表示されます。
WARN 2015-11-11 11:25:50,784 [kafka-producer-network-thread | producer-1] WARN (Selector.java:276) - Error in I/O with localhost/127.0.0.1 java.net.ConnectException: Connection refused
これが発生すると、接続の再試行時間が経過し、Kafka接続ハンドラ・プロセスが異常終了します。Kafkaブローカが稼働しており、Kafkaプロデューサのプロパティ・ファイルで指定されているホストとポートが正しいことを確認してください。
Kafkaブローカをホストしているマシンでネットワーク・シェル・コマンド(netstat -l
など)を使用すると、Kafkaが所定のポートでリスニングしていることを確認できます。