8.2.8 Apache Kafka
Kafkaハンドラは、Oracle GoldenGate証跡からKafkaトピックに変更キャプチャ・データをストリーミングするように設計されています。
この章では、Kafkaハンドラの使用方法について説明します。
- Apache Kafka
Kafkaハンドラは、Oracle GoldenGate証跡からKafkaトピックに変更キャプチャ・データをストリーミングするように設計されています。 - Apache Kafka Connectハンドラ
Kafka Connectハンドラは、標準的なKafkaのメッセージング機能を拡張したものです。 - Apache Kafka RESTプロキシ
Kafka RESTプロキシ・ハンドラは、Confluentによって配布されたKafka RESTプロキシにメッセージをストリーミングします。
親トピック: ターゲット
8.2.8.1 Apache Kafka
Kafkaハンドラは、Oracle GoldenGate証跡からKafkaトピックに変更キャプチャ・データをストリーミングするように設計されています。
この章では、Kafkaハンドラの使用方法について説明します。
- 概要
- 詳細な機能
- Kafkaハンドラの設定および実行
- スキーマの伝播
- パフォーマンスに関する考慮事項
- セキュリティについて
- メタデータ変更イベント
- Snappyに関する考慮事項
- Kafkaインターセプタのサポート
Kafkaプロデューサ・クライアント・フレームワークでは、プロデューサ・インターセプタの使用がサポートされています。プロデューサ・インターセプタは、単にKafkaプロデューサ・クライアントからのユーザー・イグジットで、これにより、インターセプタ・オブジェクトがインスタンス化され、Kafkaメッセージ送信コールとKafkaメッセージ送信確認コールの通知を受信します。 - Kafkaパーティションの選択
Kafkaトピックは1つ以上のパーティションで構成されます。Kafkaクライアントは異なるトピック/パーティションの組合せへのメッセージ送信をパラレル化するため、複数のパーティションへの分散はKafkaの収集パフォーマンスを向上させるのに適した方法です。パーティションの選択は、Kafkaクライアントでの次の計算によって制御されます。 - トラブルシューティング
- Kafkaハンドラ・クライアント依存性
Apache Kafkaデータベースに接続するためのKafkaハンドラの依存性とはどのようなものでしょう。
親トピック: Apache Kafka
8.2.8.1.1 概要
Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)のKafkaハンドラでは、Oracle GoldenGate証跡からKafkaトピックに変更キャプチャ・データがストリーミングされます。また、Kafkaハンドラには、メッセージを別のスキーマ・トピックに公開する機能もあります。AvroおよびJSONのスキーマの公開がサポートされています。
Apache Kafkaは、オープン・ソースで、パーティション化および複製された分散型のメッセージング・サービスです。http://kafka.apache.org/を参照してください。
Kafkaは、単一インスタンスとしても、複数サーバー上のクラスタとしても実行できます。Kafkaの各サーバー・インスタンスは、ブローカと呼ばれます。Kafkaにおけるトピックは、メッセージがプロデューサによって公開され、コンシューマによって取得されるときのカテゴリ、またはフィード名です。
Kafkaでは、トピック名が完全修飾のソース表名に対応する場合、KafkaハンドラがKafkaプロデューサを実装します。Kafkaプロデューサは、シリアライズされたチェンジ・データ・キャプチャを、複数のソース表から1つの構成されたトピックまたは分離したソース操作のいずれか、別のKafkaトピックに書き込みます。
親トピック: Apache Kafka
8.2.8.1.2 詳細な機能
トランザクション・モードと操作モード
Kafkaハンドラは、Kafka ProducerRecord
クラスのインスタンスをKafkaプロデューサAPIに送信し、次にそれがProducerRecord
をKafkaトピックに公開します。Kafka ProducerRecord
は、実質的に1つのKafkaメッセージの実装です。ProducerRecord
には、キーと値の2つのコンポーネントがあります。キーと値のどちらも、Kafkaハンドラによってバイト配列として表されます。この項では、Kafkaハンドラがデータを公開する方法について説明します。
トランザクション・モード
次の構成は、Kafkaハンドラをトランザクション・モードに設定します。
gg.handler.name.Mode=tx
トランザクション・モードでは、ソースOracle GoldenGate証跡ファイルからのトランザクションにおける各操作のシリアライズ・データが連結されます。連結された操作データの内容は、KafkaのProducerRecord
オブジェクトの値です。Kafka ProducerRecord
オブジェクトのキーはNULLです。結果として、Kafkaメッセージには1からNまでの操作のデータが含まれます。Nは、トランザクションにおける操作の数です。
グループ化されたトランザクションの場合、すべての操作のすべてのデータが1つのKafkaメッセージに連結されます。そのため、グループ化されたトランザクションは、大量の操作のデータを含む非常に大きいKafkaメッセージになる可能性があります。
操作モード
次の構成は、Kafkaハンドラを操作モードに設定します。
gg.handler.name.Mode=op
操作モードでは、各操作のシリアライズ・データは個々のProducerRecord
オブジェクトに値として配置されます。ProducerRecord
キーは、ソース操作の完全修飾表名です。ProducerRecord
は、KafkaプロデューサAPIを使用してすぐに送信されます。つまり、着信操作と、生成されるKafkaメッセージの数との間には1対1の関係があります。
トピック名の選択
トピックは、この構成パラメータを使用して実行時に解決されます。
gg.handler.topicMappingTemplate
静的な文字列、キーワード、または静的な文字列とキーワードの組合せを構成し、実行時にそのときの操作コンテキストに基づいてトピック名を動的に解決できます(「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照)。
Kafkaブローカ設定
トピックを自動的に作成するように構成するには、auto.create.topics.enable
プロパティをtrue
に設定します。これがデフォルトの設定です。
auto.create.topics.enable
プロパティがfalse
に設定されている場合は、Replicatプロセスを開始する前にトピックを手動で作成する必要があります。
スキーマの伝播
すべての表のスキーマ・データが、schemaTopicName
プロパティで構成されたスキーマ・トピックに提供されます。詳細は、「スキーマの伝播」を参照してください。
親トピック: Apache Kafka
8.2.8.1.3 Kafkaハンドラの設定および実行
ここでは、Kafkaハンドラのコンポーネントの構成およびハンドラの実行について説明します。
Kafkaをインストールし、単一ノードまたはクラスタ化されたインスタンスとして正しく構成する必要があります。http://kafka.apache.org/documentation.htmlを参照してください。
Apache Kafka以外のKafka配布を使用する場合、インストールおよび構成の詳細は、そのKafka配布に関するドキュメントを参照してください。
KafkaおよびKafkaブローカ(単数または複数)の前提条件となるコンポーネントであるZookeeperが稼働している必要があります。
データ・トピックとスキーマ・トピック(該当する場合)は、実行中のKafkaブローカで事前に構成されていることを、ベスト・プラクティスとしてお薦めします。Kafkaトピックを動的に作成できます。ただし、これはKafkaブローカが動的トピックを許可するように構成されている必要があります。
Kafkaブローカが、Kafkaハンドラ・プロセスと同じ場所に配置されていない場合は、Kafkaハンドラを実行しているマシンから、リモートのホスト・ポートにアクセスできる必要があります。
- クラスパス構成
- Kafkaハンドラ構成
- Javaアダプタ・プロパティ・ファイル
- Kafkaプロデューサ構成ファイル
- テンプレートを使用したトピック名とメッセージ・キーの解決
Kafkaハンドラでは、テンプレートの構成値を使用して実行時にトピック名およびメッセージ・キーを解決する機能が提供されます。テンプレートを使用すると、静的な値およびキーワードを構成できます。キーワードは、実行時にコンテンツを動的に解決し、その解決された値を解決された文字列に挿入するために使用します。 - HadoopプラットフォームでのKerberosを使用したKafkaの構成
- KafkaのSSLのサポート
Kafkaは、KafkaクライアントとKafkaクラスタの間のSSL接続をサポートしています。SSL接続により、クライアントとサーバーの間で転送されるメッセージの認証と暗号化の両方が可能になります。
親トピック: Apache Kafka
8.2.8.1.3.1 クラスパス構成
KafkaハンドラをKafkaに接続して実行するには、Kafkaプロデューサ・プロパティ・ファイルおよびKafkaクライアントJARが、gg.classpath
構成変数で構成されている必要があります。KafkaクライアントJARは、Kafkaハンドラが接続するKafkaのバージョンと一致する必要があります。必要なクライアントJARファイルのバージョン別リストは、「Kafkaハンドラ・クライアント依存性」を参照してください。
Kafkaプロデューサのプロパティ・ファイルの格納場所として推奨されるのは、Oracle GoldenGateのdirprm
ディレクトリです。
KafkaクライアントJARのデフォルトの場所は、Kafka_Home
/libs/*
です。
gg.classpath
は、正確に構成する必要があります。Kafkaプロデューサ・プロパティ・ファイルのパスには、ワイルドカードが付加されていないパスを使用してください。Kafkaプロデューサ・プロパティ・ファイルのパスにワイルドカード(*
)を含めると、このファイルが選択されません。逆に、依存関係JARのパスには、そのディレクトリにあるJARファイルがすべて関連するクラスパスに含まれるように、ワイルドカード(*
)を含める必要があります。*.jar
は使用しないでください。正しく構成されたクラスパスの例を次に示します。
gg.classpath={kafka install dir}/libs/*
親トピック: Kafkaハンドラの設定および実行
8.2.8.1.3.2 Kafkaハンドラ構成
Kafkaハンドラの設定可能な値は次のとおりです。これらのプロパティは、Javaアダプタ・プロパティ・ファイルにあります(Replicatプロパティ・ファイルにはありません)
Kafkaハンドラの選択を有効にするには、まずgg.handler.namr.type=kafka
およびその他のKafkaプロパティを次のように指定してハンドラ・タイプを構成する必要があります。
表8-9 Kafkaハンドラの構成プロパティ
プロパティ名 | 必須/オプション | プロパティ値 | デフォルト | 説明 |
---|---|---|---|---|
|
必須 |
|
なし |
使用するハンドラのリスト。 |
|
必須 |
|
なし |
使用するハンドラのタイプ。 |
|
オプション |
任意のカスタム・ファイル名 |
|
クラスパスにあるファイル名は、Apache Kafkaプロデューサを構成するApache Kafkaプロパティを保持します。 |
|
オプション |
フォーマッタ・クラスまたはショート・コード |
|
ペイロードのフォーマットに使用するフォーマッタ。 |
|
スキーマ配信が必要な場合に必須。 |
スキーマ・トピックの名前 |
なし |
スキーマ・データが配信されるトピック名。このプロパティを設定する場合、スキーマは伝播されません。スキーマが伝播されるのは、Avroフォーマッタの場合のみです。 |
|
オプション |
Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) Kafkaハンドラの |
この実装クラスを提供:
|
スキーマは |
|
オプション |
|
|
Kafkaハンドラが操作モードの場合、変更キャプチャ・データ・レコード(Insert、Update、Deleteなど)の各ペイロードはKafkaプロデューサ・レコードとして表され、一度に1つずつフラッシュされます。Kafkaハンドラがトランザクション・モードの場合、ソース・トランザクション内の操作はすべて、1つのKafkaプロデューサ・レコードとして表されます。組み合されたこのバイト・ペイロードが、トランザクション・コミット・イベント時にフラッシュされます。 |
|
必須 |
実行時にKafkaトピック名を解決するためのテンプレート文字列の値。 |
なし |
詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。 |
|
必須 |
実行時にKafkaメッセージ・キーを解決するためのテンプレート文字列の値。 |
なし |
詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。 |
|
オプション |
|
|
|
gg.handler.name.metaHeadersTemplate |
オプション | メタ列キーワードのカンマ区切りリスト。 | なし | メタ列を選択し、メタ列キーワード構文を使用してコンテキスト・ベースのキー値ペアをKafkaメッセージ・ヘッダーに挿入できます。 |
親トピック: Kafkaハンドラの設定および実行
8.2.8.1.3.3 Javaアダプタ・プロパティ・ファイル
アダプタ・プロパティ・ファイルからのKafkaハンドラのサンプル構成を次に示します。
gg.handlerlist = kafkahandler gg.handler.kafkahandler.Type = kafka gg.handler.kafkahandler.KafkaProducerConfigFile = custom_kafka_producer.properties gg.handler.kafkahandler.topicMappingTemplate=oggtopic gg.handler.kafkahandler.keyMappingTemplate=${currentTimestamp} gg.handler.kafkahandler.Format = avro_op gg.handler.kafkahandler.SchemaTopicName = oggSchemaTopic gg.handler.kafkahandler.SchemaPrClassName = com.company.kafkaProdRec.SchemaRecord gg.handler.kafkahandler.Mode = tx
Kafka統合のサンプルのReplicat構成とJavaアダプタ・プロパティ・ファイルの例は、次のディレクトリにあります。
GoldenGate_install_directory
/AdapterExamples/big-data/kafka
親トピック: Kafkaハンドラの設定および実行
8.2.8.1.3.4 Kafkaプロデューサ構成ファイル
Kafkaハンドラは、メッセージをKafkaに公開するために、Kafkaプロデューサ構成ファイルにアクセスする必要があります。Kafkaプロデューサ構成ファイルのファイル名は、Kafkaハンドラ・プロパティの次の構成によって制御されます。
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
Kafkaハンドラは、Javaのクラスパスを使用してKafkaプロデューサ構成ファイルを検索およびロードしようとします。したがって、Javaのクラスパスに、Kafkaプロデューサ構成ファイルがあるディレクトリが含まれている必要があります。
Kafkaプロデューサ構成ファイルには、Kafka専有のプロパティがあります。0.8.2.0のKafkaプロデューサ・インタフェース・プロパティについては、Kafkaドキュメントに構成情報が記載されています。Kafkaハンドラは、これらのプロパティを使用してKafkaブローカのホストおよびポートを解決し、Kafkaプロデューサ・クライアントとKafkaブローカの間の対話の動作は、Kafkaプロデューサ構成ファイルにあるプロパティによって制御されます。
Kafkaプロデューサのサンプル構成ファイルは、次のようになります。
bootstrap.servers=localhost:9092 acks = 1 compression.type = gzip reconnect.backoff.ms = 1000 value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer # 100KB per partition batch.size = 102400 linger.ms = 0 max.request.size = 1048576 send.buffer.bytes = 131072
8.2.8.1.3.4.1 Kafkaプロデューサ・プロパティの暗号化
資格証明ストアの使用方法の詳細は、「Oracle GoldenGate資格証明ストアでの識別子の使用」を参照してください。
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice" password="alice";
次のように置き換えることができます:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required
username=ORACLEWALLETUSERNAME[alias domain_name] password=ORACLEWALLETPASSWORD[alias
domain_name];
親トピック: Kafkaプロデューサ構成ファイル
8.2.8.1.3.5 テンプレートを使用したトピック名とメッセージ・キーの解決
Kafkaハンドラでは、テンプレートの構成値を使用して実行時にトピック名およびメッセージ・キーを解決する機能が提供されます。テンプレートを使用すると、静的な値およびキーワードを構成できます。キーワードは、実行時にコンテンツを動的に解決し、その解決された値を解決された文字列に挿入するために使用します。
テンプレートは、次の構成プロパティを使用します。
gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate
テンプレートのモード
ソース・データベース・トランザクションは、それぞれが挿入、更新および削除である、1つ以上の個別の操作から構成されます。Kafkaハンドラは、操作ごとに1つのメッセージ(挿入、更新、削除)を送信するように構成することも、トランザクション・レベルで操作をメッセージにグループ化するように構成することもできます。テンプレート・キーワードの多くは、個々のソース・データベース操作のコンテキストに基づいてデータを解決します。したがって、トランザクション・レベルでメッセージを送信する場合には、キーワードの多くは機能しません。たとえば、トランザクション・レベルでメッセージを送信する場合は、${fullyQualifiedTableName}
は機能せずに、操作のための修飾されたソース表名に解決されます。ただし、トランザクションには、多くのソース表に対する複数の操作を含めることができます。トランザクション・レベルでのメッセージの完全修飾表名の解決は、非決定的であるため、実行時に異常終了します。
親トピック: Kafkaハンドラの設定および実行
8.2.8.1.3.6 HadoopプラットフォームでのKerberosを使用したKafkaの構成
次の手順で、Kerberosを使用してKafkaハンドラのReplicatを構成して、ClouderaインスタンスでKafkaトピックに対するOracle GoldenGate for Distributed Applications and Analytics (GG for DAA)証跡を処理できるようにします。
- GGSCIで、Kafka Replicatを追加します。
GGSCI> add replicat kafka, exttrail dirdat/gg
- 次のプロパティを使用して
prm
ファイルを構成します。replicat kafka discardfile ./dirrpt/kafkax.dsc, purge SETENV (TZ=PST8PDT) GETTRUNCATES GETUPDATEBEFORES ReportCount Every 1000 Records, Rate MAP qasource.*, target qatarget.*;
- 次のようにReplicatプロパティ・ファイルを構成します。
###KAFKA Properties file ### gg.log=log4j gg.log.level=info gg.report.time=30sec ###Kafka Classpath settings ### gg.classpath=/opt/cloudera/parcels/KAFKA-2.1.0-1.2.1.0.p0.115/lib/kafka/libs/* jvm.bootoptions=-Xmx64m -Xms64m -Djava.class.path=./ggjava/ggjava.jar -Dlog4j.configuration=log4j.properties -Djava.security.auth.login.config=/scratch/ydama/ogg/v123211/dirprm/jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf ### Kafka handler properties ### gg.handlerlist = kafkahandler gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.KafkaProducerConfigFile=kafka-producer.properties gg.handler.kafkahandler.format=delimitedtext gg.handler.kafkahandler.format.PkUpdateHandling=update gg.handler.kafkahandler.mode=op gg.handler.kafkahandler.format.includeCurrentTimestamp=false gg.handler.kafkahandler.format.fieldDelimiter=| gg.handler.kafkahandler.format.lineDelimiter=CDATA[\n] gg.handler.kafkahandler.topicMappingTemplate=myoggtopic gg.handler.kafkahandler.keyMappingTemplate=${position}
- 次のプロパティを使用してKafka Producerファイルを構成します。
bootstrap.servers=10.245.172.52:9092 acks=1 #compression.type=snappy reconnect.backoff.ms=1000 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer batch.size=1024 linger.ms=2000 security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka sasl.mechanism=GSSAPI
-
次のプロパティを使用して
jaas.conf
ファイルを構成します。KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/scratch/ydama/ogg/v123211/dirtmp/keytabs/slc06unm/kafka.keytab" principal="kafka/slc06unm.us.oracle.com@HADOOPTEST.ORACLE.COM"; };
-
保護されたKafkaトピックにClouderaインスタンスから接続するための最新の
key.tab
ファイルがあることを確認します。 -
GGSCIからReplicatを起動し、
INFO ALL
で実行されていることを確認します。 -
Replicatレポートで、処理されたレコードの合計数を確認します。レポートは次のようになります。
Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) Copyright (c) 2007, 2018. Oracle and/or its affiliates. All rights reserved Built with Java 1.8.0_161 (class version: 52.0) 2018-08-05 22:15:28 INFO OGG-01815 Virtual Memory Facilities for: COM anon alloc: mmap(MAP_ANON) anon free: munmap file alloc: mmap(MAP_SHARED) file free: munmap target directories: /scratch/ydama/ogg/v123211/dirtmp. Database Version: Database Language and Character Set: *********************************************************************** ** Run Time Messages ** *********************************************************************** 2018-08-05 22:15:28 INFO OGG-02243 Opened trail file /scratch/ydama/ogg/v123211/dirdat/kfkCustR/gg000000 at 2018-08-05 22:15:28.258810. 2018-08-05 22:15:28 INFO OGG-03506 The source database character set, as determined from the trail file, is UTF-8. 2018-08-05 22:15:28 INFO OGG-06506 Wildcard MAP resolved (entry qasource.*): MAP "QASOURCE"."BDCUSTMER1", target qatarget."BDCUSTMER1". 2018-08-05 22:15:28 INFO OGG-02756 The definition for table QASOURCE.BDCUSTMER1 is obtained from the trail file. 2018-08-05 22:15:28 INFO OGG-06511 Using following columns in default map by name: CUST_CODE, NAME, CITY, STATE. 2018-08-05 22:15:28 INFO OGG-06510 Using the following key columns for target table qatarget.BDCUSTMER1: CUST_CODE. 2018-08-05 22:15:29 INFO OGG-06506 Wildcard MAP resolved (entry qasource.*): MAP "QASOURCE"."BDCUSTORD1", target qatarget."BDCUSTORD1". 2018-08-05 22:15:29 INFO OGG-02756 The definition for table QASOURCE.BDCUSTORD1 is obtained from the trail file. 2018-08-05 22:15:29 INFO OGG-06511 Using following columns in default map by name: CUST_CODE, ORDER_DATE, PRODUCT_CODE, ORDER_ID, PRODUCT_PRICE, PRODUCT_AMOUNT, TRANSACTION_ID. 2018-08-05 22:15:29 INFO OGG-06510 Using the following key columns for target table qatarget.BDCUSTORD1: CUST_CODE, ORDER_DATE, PRODUCT_CODE, ORDER_ID. 2018-08-05 22:15:33 INFO OGG-01021 Command received from GGSCI: STATS. 2018-08-05 22:16:03 INFO OGG-01971 The previous message, 'INFO OGG-01021', repeated 1 times. 2018-08-05 22:43:27 INFO OGG-01021 Command received from GGSCI: STOP. *********************************************************************** * ** Run Time Statistics ** * *********************************************************************** Last record for the last committed transaction is the following: ___________________________________________________________________ Trail name : /scratch/ydama/ogg/v123211/dirdat/kfkCustR/gg000000 Hdr-Ind : E (x45) Partition : . (x0c) UndoFlag : . (x00) BeforeAfter: A (x41) RecLength : 0 (x0000) IO Time : 2015-08-14 12:02:20.022027 IOType : 100 (x64) OrigNode : 255 (xff) TransInd : . (x03) FormatType : R (x52) SyskeyLen : 0 (x00) Incomplete : . (x00) AuditRBA : 78233 AuditPos : 23968384 Continued : N (x00) RecCount : 1 (x01) 2015-08-14 12:02:20.022027 GGSPurgedata Len 0 RBA 6473 TDR Index: 2 ___________________________________________________________________ Reading /scratch/ydama/ogg/v123211/dirdat/kfkCustR/gg000000, current RBA 6556, 20 records, m_file_seqno = 0, m_file_rba = 6556 Report at 2018-08-05 22:43:27 (activity since 2018-08-05 22:15:28) From Table QASOURCE.BDCUSTMER1 to qatarget.BDCUSTMER1: # inserts: 5 # updates: 1 # deletes: 0 # discards: 0 From Table QASOURCE.BDCUSTORD1 to qatarget.BDCUSTORD1: # inserts: 5 # updates: 3 # deletes: 5 # truncates: 1 # discards: 0
-
セキュアKafkaトピックが作成されていることを確認します。
/kafka/bin/kafka-topics.sh --zookeeper slc06unm:2181 --list myoggtopic
-
セキュアKafkaピックの内容を確認します。
-
次の内容の
consumer.properties
ファイルを作成します。security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka
-
次の環境変数を設定します。
export KAFKA_OPTS="-Djava.security.auth.login.config="/scratch/ogg/v123211/dirprm/jaas.conf"
-
コンシューマ・ユーティリティを実行してレコードをチェックします。
/kafka/bin/kafka-console-consumer.sh --bootstrap-server sys06:9092 --topic myoggtopic --new-consumer --consumer.config consumer.properties
-
親トピック: Kafkaハンドラの設定および実行
8.2.8.1.3.7 KafkaのSSLのサポート
Kafkaは、KafkaクライアントとKafkaクラスタの間のSSL接続をサポートしています。SSL接続により、クライアントとサーバーの間で転送されるメッセージの認証と暗号化の両方が可能になります。
- SSL用のKafkaクラスタの設定
- キーストア/トラストストア・ファイルでの自己署名証明書の作成
- SSL用のKafkaクライアントの構成
bootstrap.servers=localhost:9092 acks=1 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer security.protocol=SSL ssl.keystore.location=/var/private/ssl/server.keystore.jks ssl.keystore.password=test1234 ssl.key.password=test1234 ssl.truststore.location=/var/private/ssl/server.truststore.jks ssl.truststore.password=test1234
親トピック: Kafkaハンドラの設定および実行
8.2.8.1.4 スキーマの伝播
Kafkaハンドラには、スキーマをスキーマ・トピックに公開する機能があります。現在、スキーマ公開が有効なのは、Avro行フォーマッタとAvro操作フォーマッタのみです。KafkaハンドラのschemaTopicName
プロパティが設定されている場合、スキーマは次のイベントで公開されます。
-
特定の表のAvroスキーマは、その表に対する最初の操作が発生するときに公開されます。
-
Kafkaハンドラがメタデータ変更イベントを受信すると、スキーマはフラッシュされます。特定の表について再生成されるAvroスキーマは、その表に対する次の操作が発生するときに公開されます。
-
Avroのラッピング機能を有効にしている場合、汎用のラッパーAvroスキーマは、任意の操作が最初に発生するときに公開されます。汎用のラッパーを有効にする(Avroフォーマッタの構成でAvroスキーマの機能を有効にする)には、「Avro行フォーマッタ」および「Avro操作フォーマッタ」を参照してください。
KafkaのProducerRecord
値はスキーマになり、キーは完全修飾の表名になります。
AvroメッセージはAvroスキーマに直接依存しているため、Kafka上のAvroのユーザーには問題が発生することがあります。Avroメッセージはバイナリなので、人間が読める形式ではありません。Avroメッセージをデシリアライズするには、まず受信者に正しいAvroスキーマが必要ですが、ソース・データベースからの各表が個々のAvroスキーマになるため、困難な場合があります。ソースOracle GoldenGate証跡ファイルに複数の表からの操作が含まれる場合、Kafkaメッセージの受信者が、個々のメッセージをデシリアライズするためにどのAvroスキーマを使用するかを判断することはできません。この問題を解決するには、特殊なAvroメッセージを汎用のAvroメッセージ・ラッパーでラップできます。この汎用のAvroラッパーを使用すれば、完全修飾の表名、スキーマ文字列のハッシュコード、ラップされたAvroメッセージを確認することができます。受信者は、完全修飾の表名と、スキーマ文字列のハッシュコードを使用して、ラップされたメッセージに関連付けられたスキーマを解決し、そのスキーマを使用して、ラップされたメッセージをデシリアライズできます。
親トピック: Apache Kafka
8.2.8.1.5 パフォーマンスに関する考慮事項
最高のパフォーマンスを得るために、Kafkaハンドラを操作モードで動作するように送信することをお薦めします。
gg.handler.name.mode = op
また、Kafkaプロデューサ・プロパティ・ファイルでbatch.size
とlinger.msの値も設定することをお薦めします。これらの値は、ユースケース別の状況によって大きく異なります。通常、値が大きいほどスループットは向上しますが、レイテンシが大きくなります。これらのプロパティの値を小さくすると、レイテンシは小さくなりますが、全体的なスループットは低下します。
Replicat変数GROUPTRANSOPS
を使用しても、パフォーマンスは向上します。推奨される設定は10000
です。
ソース証跡ファイルからシリアライズした操作を個々のKafkaメッセージで配信する必要がある場合、Kafkaハンドラは操作モードに設定されている必要があります。
gg.handler.name.mode = op
親トピック: Apache Kafka
8.2.8.1.6 セキュリティについて
Kafkaバージョン0.9.0.0で、SSL/TLSおよびSASL(Kerberos)を介したセキュリティが導入されました。SSL/TLSおよびSASLセキュリティ製品の一方または両方を使用してKafkaハンドラを保護できます。Kafkaプロデューサ・クライアント・ライブラリは、それらのライブラリを使用する統合からのセキュリティ機能の抽象化を提供します。Kafkaハンドラは、セキュリティ機能から効率的に抽象化されます。セキュリティを有効にするには、Kafkaクラスタのセキュリティを設定し、マシンを接続して、Kafkaプロデューサ・プロパティ・ファイルを必要なセキュリティ・プロパティを使用して構成する必要があります。Kafkaクラスタの保護に関する詳細は、次の場所にあるKafkaドキュメントを参照してください。
keytab
ファイルからKerberosパスワードを復号化できない場合があります。これによって、Kerberos認証が対話型モードにフォール・バックされ、プログラムで呼び出されているため機能しなくなります。この問題の原因は、Java Cryptography Extension (JCE)が、Java Runtime Environment (JRE)にインストールされていないことです。JCEがJREにロードされていることを確認します。http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.htmlを参照してください。
親トピック: Apache Kafka
8.2.8.1.7 メタデータ変更イベント
メタデータ変更イベントが、Kafkaハンドラによって処理されるようになりました。これが関係するのは、スキーマ・トピックを構成しており、使用するフォーマッタがスキーマ伝播をサポートしている場合のみです(現在は、Avro行フォーマッタとAvro操作フォーマッタ)。スキーマが変更されている表で次に操作が発生するとき、更新されたスキーマがスキーマ・トピックに公開されます。
メタデータ変更イベントをサポートするには、ソース・データベースで変更を取得するOracle GoldenGateプロセスが、証跡機能でのOracle GoldenGateメタデータ(Oracle GoldenGate 12c (12.2)で導入)をサポートする必要があります。
親トピック: Apache Kafka
8.2.8.1.8 Snappyに関する考慮事項
Kafkaプロデューサ構成ファイルは、圧縮の使用をサポートしています。構成可能なオプションの1つにSnappyがあり、これは、オープン・ソースの圧縮および圧縮解除(codec
)ライブラリで、他のcodec
ライブラリよりパフォーマンスが優れています。Snappy JARはすべてのプラットフォームで実行されるわけではありません。Snappyは、Linuxシステムでは動作するようですが、他のUNIXおよびWindowsの実装では動作する場合としない場合があります。Snappyの圧縮を使用する場合は、Snappyを使用する圧縮を実装する前に、必要なシステムすべてでSnappyをテストします。必要なシステムの一部にSnappyが対応しない場合は、別のcodec
ライブラリを使用することをお薦めします。
親トピック: Apache Kafka
8.2.8.1.9 Kafkaインターセプタのサポート
Kafkaプロデューサ・クライアント・フレームワークでは、プロデューサ・インターセプタの使用がサポートされています。プロデューサ・インターセプタは、単にKafkaプロデューサ・クライアントからのユーザー・イグジットで、これにより、インターセプタ・オブジェクトがインスタンス化され、Kafkaメッセージ送信コールとKafkaメッセージ送信確認コールの通知を受信します。
インターセプタの一般的な使用例は、監視です。Kafkaプロデューサ・インターセプタは、インタフェース org.apache.kafka.clients.producer.ProducerInterceptor
に準拠している必要があります。Kafkaハンドラは、プロデューサ・インターセプタの使用をサポートしています。
ハンドラでインターセプタを使用する場合の要件は次のとおりです。
- Kafkaプロデューサ構成プロパティ"
interceptor.classes
"は、起動するインターセプタのクラス名で構成する必要があります。 - インターセプタを呼び出すためには、jarファイルとすべての依存関係jarをJVMで使用できる必要があります。したがって、インターセプタを含むjarファイルとすべての依存関係jarを、ハンドラ構成ファイルの
gg.classpath
に追加する必要があります。詳細は、Kafkaのドキュメントを参照してください。
親トピック: Apache Kafka
8.2.8.1.10 Kafkaパーティションの選択
Kafkaトピックは1つ以上のパーティションで構成されます。Kafkaクライアントは異なるトピック/パーティションの組合せへのメッセージ送信をパラレル化するため、複数のパーティションへの分散はKafkaの収集パフォーマンスを向上させるのに適した方法です。パーティションの選択は、Kafkaクライアントでの次の計算によって制御されます。
(Kafkaメッセージ・キーのハッシュ)係数(パーティションの数) = 選択したパーティション番号
Kafkaメッセージ・キーは、次の構成値によって選択されます。
gg.handler.{your handler name}.keyMappingTemplate=
このパラメータが静的キーを生成する値に設定されている場合、すべてのメッセージは同じパーティションに送信されます。静的キーの例を次に示します。
gg.handler.{your handler name}.keyMappingTemplate=StaticValue
このパラメータが、頻繁に変更されないキーを生成する値に設定されている場合、パーティション選択の変更頻度は低くなります。次の例では、表名をメッセージ・キーとして使用します。特定のソース表に対するすべての操作は同じキーを持つため、同じパーティションにルーティングされます。
gg.handler.{your handler name}.keyMappingTemplate=${tableName}
gg.handler.{your handler name}.keyMappingTemplate=${null}
マッピング・キーの構成に推奨される設定は次のとおりです。
gg.handler.{your handler name}.keyMappingTemplate=${primaryKeys}
これにより、連結および区切られた主キー値であるKafkaメッセージ・キーが生成されます。
各行の操作には一意の主キーが必要であるため、各行に一意のKafkaメッセージ・キーが生成されます。別の重要な考慮事項として、異なるパーティションに送信されたKafkaメッセージは、送信された元の順序でKafkaコンシューマに配信される保証はありません。これはKafkaの仕様の一部です。順序はパーティション内でのみ維持されます。Kafkaメッセージ・キーとして主キーを使用すると、同じ主キーを持つ同じ行に対する操作で同じKafkaメッセージ・キーが生成されるため、同じKafkaパーティションに送信されます。このようにして、同じ行に対する操作の順序が維持されます。
DEBUG
ログ・レベルでは、Kafkaメッセージの座標(トピック、パーティションおよびオフセット)は、正常に送信されたメッセージの.log
ファイルに記録されます。
親トピック: Apache Kafka
8.2.8.1.11 トラブルシューティング
8.2.8.1.11.1 Kafka設定の検証
コマンドラインのKafkaプロデューサを使用すると、ダミー・データをKafkaトピックに書き込むことができ、Kafkaコンシューマを使用してこのデータをKafkaトピックから読み取ることができます。この方法を使用して、ディスク上でのKafkaトピックの設定および読取り/書込み権限を確認します。http://kafka.apache.org/documentation.html#quickstartを参照してください。
親トピック: トラブルシューティング
8.2.8.1.11.2 クラスパスの問題
Javaクラスパスの問題はよくある問題です。そのような問題には、log4j
ログ・ファイルのClassNotFoundException
の問題や、gg.classpath
変数の入力ミスによるクラスパスの解決エラーなどがあります。Kafkaクライアント・ライブラリは、Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)製品には付属していません。「クラスパス構成」で説明されているように、JavaとKafkaクライアント・ライブラリを適切に解決するために、適切なバージョンのKafkaクライアント・ライブラリを取得し、Javaアダプタ・プロパティ・ファイルでgg.classpath
プロパティを適切に構成する必要があります。
親トピック: トラブルシューティング
8.2.8.1.11.3 無効なKafkaバージョン
Kafkaハンドラでは、Kafka 0.8.2.2以前のバージョンはサポートされていません。サポートされていないバージョンのKafkaを実行すると、実行時Java例外java.lang.NoSuchMethodError
が発生します。これは、org.apache.kafka.clients.producer.KafkaProducer.flush()
メソッドが見つからないことを意味します。このエラーが発生した場合は、Kafka 0.9.0.0以降のバージョンに移行します。
親トピック: トラブルシューティング
8.2.8.1.11.4 Kafkaプロデューサ・プロパティ・ファイルが見つからない
この問題は一般的に、次の例外として発生します。
ERROR 2015-11-11 11:49:08,482 [main] Error loading the kafka producer properties
gg.handler.kafkahandler.KafkaProducerConfigFile
構成変数をチェックし、Kafkaプロデューサ構成ファイルの名前が正しく設定されていることを確認します。gg.classpath
変数をチェックし、クラスパスにKafkaプロデューサ・プロパティ・ファイルのパスが含まれていることと、プロパティ・ファイルのパスの末尾にワイルドカード(*
)が含まれていないことを確認します。
親トピック: トラブルシューティング
8.2.8.1.11.5 Kafkaの接続の問題
この問題は、KafkaハンドラがKafkaに接続できない場合に発生します。次の警告が表示されます。
WARN 2015-11-11 11:25:50,784 [kafka-producer-network-thread | producer-1] WARN (Selector.java:276) - Error in I/O with localhost/127.0.0.1 java.net.ConnectException: Connection refused
接続の再試行間隔が経過し、Kafkaハンドラ・プロセスは異常終了します。Kafkaブローカが稼働しており、Kafkaプロデューサ・プロパティ・ファイルで指定されているホストとポートが正しいことを確認してください。Kafkaブローカをホストしているマシンでネットワーク・シェル・コマンド(netstat -l
など)を使用すると、想定するポートをKafkaがリスニングしていることを確認できます。
親トピック: トラブルシューティング
8.2.8.1.12 Kafkaハンドラ・クライアント依存性
Apache Kafkaデータベースに接続するためのKafkaハンドラの依存性とはどのようなものでしょう。
Maven Central RepositoryのKafkaデータベースのアーティファクトは次のとおりです。
Maven groupId: org.apache.kafka
Maven atifactId: kafka-clients
Maven version: 各セクションでリストされるKafkaのバージョン番号
8.2.8.1.12.1 Kafka 2.8.0
kafka-clients-2.8.0.jar lz4-java-1.7.1.jar slf4j-api-1.7.30.jar snappy-java-1.1.8.1.jar zstd-jni-1.4.9-1.jar
親トピック: Kafkaハンドラ・クライアント依存性
8.2.8.1.12.2 Kafka 2.7.0
kafka-clients-2.7.0.jar lz4-java-1.7.1.jar slf4j-api-1.7.30.jar snappy-java-1.1.7.7.jar zstd-jni-1.4.5-6.jar
親トピック: Kafkaハンドラ・クライアント依存性
8.2.8.1.12.3 Kafka 2.6.0
kafka-clients-2.6.0.jar lz4-java-1.7.1.jar slf4j-api-1.7.30.jar snappy-java-1.1.7.3.jar zstd-jni-1.4.4-7.jar
親トピック: Kafkaハンドラ・クライアント依存性
8.2.8.1.12.4 Kafka 2.5.1
kafka-clients-2.5.1.jar lz4-java-1.7.1.jar slf4j-api-1.7.30.jar snappy-java-1.1.7.3.jar zstd-jni-1.4.4-7.jar
親トピック: Kafkaハンドラ・クライアント依存性
8.2.8.1.12.5 Kafka 2.4.1
kafka-clients-2.4.1.jar lz4-java-1.6.0.jar slf4j-api-1.7.28.jar snappy-java-1.1.7.3.jar zstd-jni-1.4.3-1.jarr
親トピック: Kafkaハンドラ・クライアント依存性
8.2.8.1.12.6 Kafka 2.3.1
kafka-clients-2.3.1.jar lz4-java-1.6.0.jar slf4j-api-1.7.26.jar snappy-java-1.1.7.3.jar zstd-jni-1.4.0-1.jar
親トピック: Kafkaハンドラ・クライアント依存性
8.2.8.2 Apache Kafka Connectハンドラ
Kafka Connectハンドラは、標準的なKafkaのメッセージング機能を拡張したものです。
この章では、Kafka Connectハンドラの使用方法について説明します。
- 概要
Oracle GoldenGate Kafka Connectは、標準的なKafkaのメッセージング機能を拡張したものです。Kafka Connectは、標準的なKafkaのプロデューサおよびコンシューマのインタフェースの上にある機能レイヤーです。新規のソースおよびターゲット・システムをトポロジに簡単に追加できる、メッセージングの標準化を提供します。 - 詳細な機能
- Kafka Connectハンドラの設定および実行
- セキュア・スキーマ・レジストリへの接続
- Kafka Connectハンドラのパフォーマンスに関する考慮事項
- Kafkaインターセプタのサポート
Kafkaプロデューサ・クライアント・フレームワークでは、プロデューサ・インターセプタの使用がサポートされています。プロデューサ・インターセプタは、単にKafkaプロデューサ・クライアントからのユーザー・イグジットで、これにより、インターセプタ・オブジェクトがインスタンス化され、Kafkaメッセージ送信コールとKafkaメッセージ送信確認コールの通知を受信します。 - Kafkaパーティションの選択
Kafkaトピックは1つ以上のパーティションで構成されます。Kafkaクライアントは異なるトピック/パーティションの組合せへのメッセージ送信をパラレル化するため、複数のパーティションへの分散はKafkaの収集パフォーマンスを向上させるのに適した方法です。パーティションの選択は、Kafkaクライアントでの次の計算によって制御されます。 - Kafka Connectハンドラのトラブルシューティング
- Kafka Connectハンドラ・クライアント依存性
Apache Kafka Connectデータベースに接続するためのKafka Connectハンドラの依存性とはどのようなものでしょう。
親トピック: Apache Kafka
8.2.8.2.1 概要
Oracle GoldenGate Kafka Connectは、標準的なKafkaのメッセージング機能を拡張したものです。Kafka Connectは、標準的なKafkaのプロデューサおよびコンシューマのインタフェースの上にある機能レイヤーです。新規のソースおよびターゲット・システムをトポロジに簡単に追加できる、メッセージングの標準化を提供します。
ConfluentはKafka Connectの主要な採用先であり、そのConfluent Platform製品には標準的なKafka Connectの機能拡張が含まれています。これには、AvroのシリアライズとデシリアライズおよびAvroスキーマ・レジストリが含まれています。Kafka Connectの機能の多くは、Apache Kafkaで利用できます。様々なオープン・ソースのKafka Connect統合は、次の場所を参照してください。
https://www.confluent.io/product/connectors/
Kafka Connectハンドラは、Kafka Connectのソース・コネクタです。Oracle GoldenGateでサポートされているデータベースから、データベースの変更を取得し、その変更データをKafka Connectレイヤー経由でKafkaにストリーミングできます。このハンドラを使用して、Oracle Event Hub Cloud Service (EHCS)に接続することもできます。
Kafka Connectでは、プロプライエタリなオブジェクトを使用してスキーマ(org.apache.kafka.connect.data.Schema
)およびメッセージ(org.apache.kafka.connect.data.Struct
)が定義されます。Kafka Connectハンドラは、公開済データおよび公開済データの構造を管理するように構成できます。
Kafka Connectハンドラは、Kafkaハンドラでサポートされているどのプラガブル・フォーマッタもサポートしていません。
親トピック: Apache Kafka Connectハンドラ
8.2.8.2.2 詳細な機能
JSONコンバータ
Kafka Connectフレームワークにより、インメモリーのKafka Connectメッセージを、ネットワーク経由での伝送に適したシリアライズされた形式に変換するコンバータが提供されます。このコンバータは、Kafkaプロデューサのプロパティ・ファイルの構成を使用して選択されます。
Kafka ConnectおよびJSONコンバータは、Apache Kafkaダウンロードの一部として使用可能です。JSONコンバータによりKafkaのキーおよび値がJSONに変換され、それがKafkaトピックに送信されます。Kafkaプロデューサのプロパティ・ファイルで、次の構成を使用してJSONコンバータを識別します。
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
メッセージのフォーマットは、ペイロード情報が後に続くメッセージ・スキーマ情報です。JSONは自己記述型のフォーマットであるため、Kafkaに公開される各メッセージにはスキーマ情報を含めないでください。
メッセージからJSONのスキーマ情報を省略するには、次のように設定します。
key.converter.schemas.enable=false
value.converter.schemas.enable=false
Avroコンバータ
Confluentにより、Kafkaのインストール、Kafkaのサポート、Kafka上に構築された拡張機能が提供され、Kafkaの潜在的な可能性の実現が支援されます。Confluentは、オープン・ソース版のKafka (Confluent Open Source)と、購入可能なエンタープライズ版(Confluent Enterprise)の両方を提供しています。
一般的なKafkaのユースケースは、Kafka上でAvroメッセージを送信することです。これは、AvroメッセージをデシリアライズするためにAvroスキーマに依存するため、受信側で問題が発生する可能性があります。受信メッセージは、プロデューサ側でメッセージを生成するために使用されたAvroスキーマと完全に一致させる必要があるため、スキーマの進化によって問題が増加する可能性があります。間違ったAvroスキーマでAvroメッセージをデシリアライズすると、ランタイム・エラー、不完全なデータまたは不正なデータが発生することがあります。Confluentでは、スキーマ・レジストリおよびConfluentスキーマ・コンバータを使用してこの問題を解決しました。
Kafkaプロデューサのプロパティ・ファイルの構成を次に示します。
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
メッセージがKafkaに公開されると、Avroスキーマが登録され、スキーマ・レジストリに格納されます。Kafkaからメッセージが消費されるとき、メッセージの作成に使用された正確なAvroスキーマをスキーマ・レジストリから取得してAvroメッセージをデシリアライズできます。Avroメッセージとそれに対応する受信側のAvroスキーマの照合を作成することで、この問題を解決しています。
Avroコンバータを使用するための要件は次のとおりです。
- この機能は、Confluent Kafkaの両バージョン(オープン・ソースまたはエンタープライズ)で使用できます。
- Confluentスキーマ・レジストリ・サービスが実行されている必要があります。
- ソース・データベース表には、関連付けられたAvroスキーマが必要です。異なるAvroスキーマに関連付けられたメッセージは、異なるKafkaトピックに送信する必要があります。
- ConfluentのAvroコンバータおよびスキーマ・レジストリ・クライアントがクラスパスで使用できる必要があります。
スキーマ・レジストリは、トピックごとにAvroスキーマを追跡します。メッセージは、同じスキーマまたは同じスキーマの進化バージョンを持つトピックに送信する必要があります。ソース・メッセージにはソース・データベース表スキーマに基づくAvroスキーマがあり、Avroスキーマはソース表ごとに一意です。複数のソース表で単一のトピックにメッセージを公開すると、ソース表から送信されたメッセージが以前のメッセージと異なるたびに、スキーマが進化していることがスキーマ・レジストリに示されます。
Protobuf Converter
Protobuf Converterでは、Kafka ConnectメッセージをGoogle Protocol Buffers形式としてフォーマットできます。Protobuf ConverterはConfluentスキーマ・レジストリと統合されており、この機能は、Confluentのオープン・ソース・バージョンとエンタープライズ・バージョンの両方で使用できます。Confluentは、Confluentバージョン5.5.0以降でProtobuf Converterを追加しました。
key.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
Protobuf Converterを使用するための要件は次のとおりです。
- この機能は、5.5.0以降、Confluent Kafkaの両バージョン(オープン・ソースまたはエンタープライズ)で使用できます。
- Confluentスキーマ・レジストリ・サービスが実行されている必要があります。
- スキーマ(ソース表)が異なるメッセージは、異なるKafkaトピックに送信する必要があります。
- Confluent Protobufコンバータおよびスキーマ・レジストリ・クライアントがクラスパスで使用できる必要があります。
スキーマ・レジストリは、トピックごとにProtobufスキーマを追跡します。メッセージは、同じスキーマまたは同じスキーマの進化バージョンを持つトピックに送信する必要があります。ソース・メッセージにはソース・データベース表スキーマに基づくProtobufスキーマがあり、Protobufスキーマはソース表ごとに一意です。複数のソース表で単一のトピックにメッセージを公開すると、ソース表から送信されたメッセージが以前のメッセージと異なるたびに、スキーマが進化していることがスキーマ・レジストリに示されます。
親トピック: Apache Kafka Connectハンドラ
8.2.8.2.3 Kafka Connectハンドラの設定および実行
ここでは、Kafka Connectハンドラのコンポーネントの構成およびハンドラの実行について説明します。
クラスパス構成
Kafka ConnectハンドラをKafkaに接続して実行できるように、gg.classpath
構成変数に2つのものを含める必要があります。必要なのは、Kafkaプロデューサ・プロパティ・ファイルと、KafkaクライアントJARです。KafkaクライアントのJARは、Kafka Connectハンドラが接続するKafkaのバージョンと一致する必要があります。必要なクライアントJARファイルのバージョン別リストは、Kafkaハンドラ・クライアント依存性(「Kafka Connectハンドラ・クライアント依存性」)を参照してください。Kafkaプロデューサのプロパティ・ファイルの格納場所として推奨されるのは、Oracle GoldenGateのdirprm
ディレクトリです。
Kafka ConnectクライアントJARのデフォルトの場所は、Kafka_Home/libs/*
ディレクトリです。
gg.classpath
変数は、正確に構成する必要があります。Kafkaプロデューサのプロパティ・ファイルのパス指定に含めるパスには、ワイルドカードを使用しないでください。Kafkaプロデューサのプロパティ・ファイルへのパスにアスタリスク(*)のワイルドカードを含めると、それが破棄されます。依存関係JARのパス指定には、そのディレクトリにあるJARファイルがすべて関連するクラスパスに含まれるように、ワイルドカード(*)を含める必要があります。*.jar
は使用しないでください
正しく構成されたApache Kafkaのクラスパスの例:
gg.classpath=dirprm:{kafka_install_dir}/libs/*
正しく構成されたConfluent Kafkaのクラスパスの例を次に示します。
gg.classpath={confluent_install_dir}/share/java/kafka-serde-tools/*:{confluent_install_dir}/share/java/kafka/*:{confluent_install_dir}/share/java/confluent-common/*
- Kafka Connectハンドラの構成
生成されたKafka Connectメッセージ内のメタ列フィールドの自動出力は、Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) 21cリリース以降では削除されています。 - テンプレートを使用したトピック名とメッセージ・キーの解決
- Kafka Connectハンドラでのセキュリティの構成
親トピック: Apache Kafka Connectハンドラ
8.2.8.2.3.1 Kafka Connectハンドラの構成
生成されたKafka Connectメッセージ内のメタ列フィールドの自動出力は、Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) 21cリリース以降では削除されています。
メタ列フィールドは次のプロパティとして構成できます。
gg.handler.name.metaColumnsTemplate
メタ列を以前のバージョンと同様に出力するには、次のように構成します。
gg.handler.name.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]}
主キー列およびトークンも含めるには、次のように構成します。
gg.handler.name.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]},${primarykeycolumns[primary_keys]},${alltokens[tokens]}
詳細は、構成プロパティを参照してください:
gg.handler.name.metaColumnsTemplate
表8-10 Kafka Connectハンドラの構成プロパティ
プロパティ | 必須/オプション | 有効な値 | デフォルト | 説明 |
---|---|---|---|---|
gg.handler.name.type |
必須 |
|
なし |
Kafka Connectハンドラを選択するための構成。 |
gg.handler.name.kafkaProducerConfigFile |
必須 |
string |
なし |
KafkaのプロパティおよびKafka Connect構成プロパティを含むプロパティ・ファイルの名前。このファイルは、 |
gg.handler.name.topicMappingTemplate |
必須 |
実行時にKafkaトピック名を解決するためのテンプレート文字列の値。 |
なし |
詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。 |
gg.handler.name.keyMappingTemplate |
必須 |
実行時にKafkaメッセージ・キーを解決するためのテンプレート文字列の値。 |
なし |
詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。 |
gg.handler.name.includeTokens |
オプション |
|
|
このフィールドを抑止する場合は、 |
gg.handler.name.messageFormatting |
オプション |
|
|
出力メッセージのモデル化方法を制御します。行を選択すると、出力メッセージは行としてモデル化されます。opに設定すると、出力メッセージは操作メッセージとしてモデル化されます。 |
gg.handler.name.insertOpKey |
オプション |
任意の文字列 |
|
挿入操作を示すフィールド |
gg.handler.name.updateOpKey |
オプション |
任意の文字列 |
|
挿入操作を示すフィールド |
gg.handler.name.deleteOpKey |
オプション |
任意の文字列 |
|
削除操作を示すフィールド |
gg.handler.name.truncateOpKey |
オプション |
任意の文字列 |
|
切捨て操作を示すフィールド |
gg.handler.name.treatAllColumnsAsStrings |
オプション |
|
|
trueに設定すると、すべての出力フィールドが文字列として扱われます。falseに設定すると、ハンドラによって、ソース証跡ファイルからの対応するフィールド・タイプがそれに最も適したKafka Connectのデータ型にマップされます。 |
gg.handler.name.mapLargeNumbersAsStrings |
オプション |
|
|
大きい数値は、数値フィールドにDoubleとしてマッピングされます。特定のシナリオでは、精度を失う可能性があります。
|
|
オプション |
abend | update | delete-insert |
|
モデリング行メッセージが |
|
オプション |
任意のメタ列キーワード。 |
なし |
テンプレートを表す1つ以上のテンプレート値で構成されるカンマ区切り文字列。メタ列のキーワードを参照してください。 |
|
オプション |
|
|
このプロパティを各列に設定して、null値がソース証跡ファイルで実際にnullであるか、ソース証跡ファイルにないかを、ダウンストリーム・アプリケーションが区別できるようにします。 |
gg.handler.name.enableDecimalLogicalType |
オプション | true|false |
false |
Kafka Connectで小数論理型を有効にするには、true に設定します。小数論理型では、64ビット・データ型に収まらない数値を表すことができます。
|
gg.handler.name.oracleNumberScale |
オプション | 正の整数 | 38 | gg.handler.name.enableDecimalLogicalType=true の場合にのみ適用されます。一部のソース・データ型には、固定スケールが関連付けられていません。Kafka Connect小数論理型にスケールを設定する必要があります。メタデータにスケールがないソース型の場合は、このパラメータの値を使用してスケールを設定します。
|
gg.handler.name.EnableTimestampLogicalType |
オプション | true|false |
false |
Kafka Connectタイムスタンプ論理型を有効にするには、true に設定します。Kafka Connectタイムスタンプの論理時間は、Javaのエポックからのミリ秒の整数測定値です。これは、タイムスタンプ論理型が使用された場合、ミリ秒より高い精度が使用できないことを意味します。このプロパティを使用するには、gg.format.timestamp プロパティを設定する必要があります。このプロパティは、文字列形式のタイムスタンプの出力を決定するために使用されるタイムスタンプ書式設定文字列です。たとえば、gg.format.timestamp=yyyy-MM-dd HH:mm:ss.SSS です。goldengate.userexit.timestamp プロパティが構成ファイルに設定されていないことを確認します。このプロパティを設定すると、入力タイムスタンプが、論理タイムスタンプに必要なJavaオブジェクトに解析されなくなります。
|
gg.handler.name.metaHeadersTemplate |
オプション | メタ列キーワードのカンマ区切りリスト。 | なし | メタ列を選択し、メタ列キーワード構文を使用してコンテキスト・ベースのキー値ペアをKafkaメッセージ・ヘッダーに挿入できます。「メタ列のキーワード」を参照してください。 |
gg.handler.name.schemaNamespace
|
オプション | KafkaコネクタAvroスキーマのネーミング要件に違反する文字を含まない任意の文字列。 | なし | 生成されるKafka Connectスキーマ名の制御に使用されます。設定されていない場合、スキーマ名は修飾されたソース表名と同じになります。たとえば、ソース表がQASOURCE.TCUSTMER である場合、Kafka Connectスキーマ名は同じになります。
このプロパティを使用して、生成されるスキーマ名を制御できます。たとえば、このプロパティが |
gg.handler.name.enableNonnullable |
オプション | true|false |
false |
デフォルトの動作では、生成されたKafka接続スキーマですべてのフィールドをnull許容として設定します。このパラメータをtrue に設定すると、メタデータ・プロバイダによって提供されるターゲット・メタデータで、構成されたnull許容値を使用できます。このプロパティをtrue に設定すると、有害な副作用が発生する可能性があります。
|
詳細は、「テンプレートを使用したストリーム名とパーティション名の解決」を参照してください。
サンプル構成の確認
gg.handlerlist=kafkaconnect #The handler properties gg.handler.kafkaconnect.type=kafkaconnect gg.handler.kafkaconnect.kafkaProducerConfigFile=kafkaconnect.properties gg.handler.kafkaconnect.mode=op #The following selects the topic name based on the fully qualified table name gg.handler.kafkaconnect.topicMappingTemplate=${fullyQualifiedTableName} #The following selects the message key using the concatenated primary keys gg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys} #The formatter properties gg.handler.kafkaconnect.messageFormatting=row gg.handler.kafkaconnect.insertOpKey=I gg.handler.kafkaconnect.updateOpKey=U gg.handler.kafkaconnect.deleteOpKey=D gg.handler.kafkaconnect.truncateOpKey=T gg.handler.kafkaconnect.treatAllColumnsAsStrings=false gg.handler.kafkaconnect.pkUpdateHandling=abend
親トピック: Kafka Connectハンドラの設定および実行
8.2.8.2.3.2 テンプレートを使用したトピック名とメッセージ・キーの解決
Kafka Connectハンドラでは、テンプレートの構成値を使用して実行時にトピック名およびメッセージ・キーを解決する機能が提供されます。テンプレートを使用すると、静的な値およびキーワードを構成できます。キーワードは、そのときの処理コンテキストに応じてキーワードを動的に置き換えるために使用されます。テンプレートは、次の構成パラメータに適用されます。
gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate
テンプレートのモード
Kafka Connectハンドラでは、操作メッセージのみを送信できます。Kafka Connectハンドラでは、操作メッセージをより大きなトランザクション・メッセージにグループ化できません。
親トピック: Kafka Connectハンドラの設定および実行
8.2.8.2.3.3 Kafka Connectハンドラでのセキュリティの構成
Kafkaバージョン0.9.0.0にSSL/TLSまたはKerberosを介したセキュリティが導入されました。Kafka Connectハンドラは、SSL/TLSまたはKerberos使用して保護できます。Kafkaプロデューサ・クライアント・ライブラリは、それらのライブラリを利用する統合からのセキュリティ機能の抽象化を提供します。Kafka Connectハンドラは、セキュリティ機能から効率的に抽象化されます。セキュリティを有効にするには、Kafkaクラスタのセキュリティを設定し、マシンを接続して、Kafkaプロデューサのプロパティ・ファイル(Kafkaハンドラで処理のために使用する)を必要なセキュリティ・プロパティを使用して構成する必要があります。
keytab
ファイルからKerberosパスワードを復号化できない場合があります。これによって、Kerberos認証が対話型モードにフォール・バックされ、プログラムで呼び出されているため機能しなくなります。この問題の原因は、Java Cryptography Extension (JCE)が、Java Runtime Environment (JRE)にインストールされていないことです。JCEがJREにロードされていることを確認します。http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.htmlを参照してください。
親トピック: Kafka Connectハンドラの設定および実行
8.2.8.2.4 セキュア・スキーマ・レジストリへの接続
Kafka Connectの顧客トポロジには、保護されているスキーマ・レジストリが含まれる場合があります。このトピックでは、保護されたスキーマ・レジストリへの接続用に構成されたKafkaプロデューサ・プロパティを設定する方法について説明します。
SSL相互認証key.converter.schema.registry.ssl.truststore.location= key.converter.schema.registry.ssl.truststore.password= key.converter.schema.registry.ssl.keystore.location= key.converter.schema.registry.ssl.keystore.password= key.converter.schema.registry.ssl.key.password= value.converter.schema.registry.ssl.truststore.location= value.converter.schema.registry.ssl.truststore.password= value.converter.schema.registry.ssl.keystore.location= value.converter.schema.registry.ssl.keystore.password= value.converter.schema.registry.ssl.key.password=
SSL基本認証
key.converter.basic.auth.credentials.source=USER_INFO key.converter.basic.auth.user.info=username:password key.converter.schema.registry.ssl.truststore.location= key.converter.schema.registry.ssl.truststore.password= value.converter.basic.auth.credentials.source=USER_INFO value.converter.basic.auth.user.info=username:password value.converter.schema.registry.ssl.truststore.location= value.converter.schema.registry.ssl.truststore.password=
親トピック: Apache Kafka Connectハンドラ
8.2.8.2.5 Kafka Connectハンドラのパフォーマンスに関する考慮事項
Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)構成とKafkaプロデューサでは、パフォーマンスに影響を与える複数の構成設定があります。
Oracle GoldenGateのパラメータでパフォーマンスに最も大きな影響を与えるのはReplicatのGROUPTRANSOPS
パラメータです。GROUPTRANSOPS
パラメータを使用すると、Replicatで複数のソース・トランザクションを単一のターゲット・トランザクションにグループ化できます。トランザクションのコミット時、Kafka Connectハンドラは、書込み永続性のためにKafkaプロデューサ上でフラッシュをコールしてメッセージをKafkaにプッシュし、その後チェックポイントを送信します。フラッシュ・コールはコストの大きなコールであり、ReplicatのGROUPTRANSOPS
設定を大きく設定すると、Replicatのフラッシュ・コールの頻度が少なくてすみ、パフォーマンスが向上します。
GROUPTRANSOPS
のデフォルト設定は1000
で、値を2500、5000、さらには10000に増やすことでパフォーマンスの向上が得られます。
opモードのgg.handler.kafkaconnect.mode=op
パラメータもまた、txモードのgg.handler.kafkaconnect.mode=tx
よりもパフォーマンスを向上させることができます。
Kafkaプロデューサのいくつかのプロパティがパフォーマンスに影響する可能性があります。重要な影響を与えるパラメータは次のとおりです。
-
linger.ms
-
batch.size
-
acks
-
buffer.memory
-
compression.type
これらのパラメータのデフォルト値から開始し、パフォーマンスのベース・ラインを取得するためのパフォーマンス・テストを実行することをお薦めします。これらの各パラメータのKafkaドキュメントを参照して、その役割を理解し、パラメータを調整し、各パラメータのパフォーマンス効果を確認するための追加のパフォーマンス・テストを実行します。
親トピック: Apache Kafka Connectハンドラ
8.2.8.2.6 Kafkaインターセプタのサポート
Kafkaプロデューサ・クライアント・フレームワークでは、プロデューサ・インターセプタの使用がサポートされています。プロデューサ・インターセプタは、単にKafkaプロデューサ・クライアントからのユーザー・イグジットで、これにより、インターセプタ・オブジェクトがインスタンス化され、Kafkaメッセージ送信コールとKafkaメッセージ送信確認コールの通知を受信します。
インターセプタの一般的な使用例は、監視です。Kafkaプロデューサ・インターセプタは、インタフェース org.apache.kafka.clients.producer.ProducerInterceptor
に準拠している必要があります。Kafka Connectハンドラは、プロデューサ・インターセプタの使用をサポートしています。
ハンドラでインターセプタを使用する場合の要件は次のとおりです。
- Kafkaプロデューサ構成プロパティ"
interceptor.classes
"は、起動するインターセプタのクラス名で構成する必要があります。 - インターセプタを呼び出すためには、jarファイルとすべての依存関係jarをJVMで使用できる必要があります。したがって、インターセプタを含むjarファイルとすべての依存関係jarを、ハンドラ構成ファイルの
gg.classpath
に追加する必要があります。詳細は、Kafkaのドキュメントを参照してください。
親トピック: Apache Kafka Connectハンドラ
8.2.8.2.7 Kafkaパーティションの選択
Kafkaトピックは1つ以上のパーティションで構成されます。Kafkaクライアントは異なるトピック/パーティションの組合せへのメッセージ送信をパラレル化するため、複数のパーティションへの分散はKafkaの収集パフォーマンスを向上させるのに適した方法です。パーティションの選択は、Kafkaクライアントでの次の計算によって制御されます。
(Kafkaメッセージ・キーのハッシュ)係数(パーティションの数) = 選択したパーティション番号
Kafkaメッセージ・キーは、次の構成値によって選択されます。
gg.handler.{your handler name}.keyMappingTemplate=
このパラメータが静的キーを生成する値に設定されている場合、すべてのメッセージは同じパーティションに送信されます。静的キーの例を次に示します。
gg.handler.{your handler name}.keyMappingTemplate=StaticValue
このパラメータが、頻繁に変更されないキーを生成する値に設定されている場合、パーティション選択の変更頻度は低くなります。次の例では、表名をメッセージ・キーとして使用します。特定のソース表に対するすべての操作は同じキーを持つため、同じパーティションにルーティングされます。
gg.handler.{your handler name}.keyMappingTemplate=${tableName}
gg.handler.{your handler name}.keyMappingTemplate=${null}
マッピング・キーの構成に推奨される設定は次のとおりです。
gg.handler.{your handler name}.keyMappingTemplate=${primaryKeys}
これにより、連結および区切られた主キー値であるKafkaメッセージ・キーが生成されます。
各行の操作には一意の主キーが必要であるため、各行に一意のKafkaメッセージ・キーが生成されます。別の重要な考慮事項として、異なるパーティションに送信されたKafkaメッセージは、送信された元の順序でKafkaコンシューマに配信される保証はありません。これはKafkaの仕様の一部です。順序はパーティション内でのみ維持されます。Kafkaメッセージ・キーとして主キーを使用すると、同じ主キーを持つ同じ行に対する操作で同じKafkaメッセージ・キーが生成されるため、同じKafkaパーティションに送信されます。このようにして、同じ行に対する操作の順序が維持されます。
DEBUG
ログ・レベルでは、Kafkaメッセージの座標(トピック、パーティションおよびオフセット)は、正常に送信されたメッセージの.log
ファイルに記録されます。
親トピック: Apache Kafka Connectハンドラ
8.2.8.2.8 Kafka Connectハンドラのトラブルシューティング
親トピック: Apache Kafka Connectハンドラ
8.2.8.2.8.1 Kafka Connectハンドラ用のJavaクラスパス
Javaクラスパスは、非常によくある問題の1つです。クラスパスに問題があることを示すのは、Oracle GoldenGate Javaのlog4j
ログ・ファイルにあるClassNotFoundException
で、gg.classpath
変数に入力ミスがある場合は、クラスパスを解決する際にエラーが発生します。
Kafkaクライアント・ライブラリは、Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)製品には付属していません。「Kafka Connectハンドラの設定および実行」で説明されているように、JavaとKafkaクライアント・ライブラリを適切に解決するために、適切なバージョンのKafkaクライアント・ライブラリを取得し、Javaアダプタのプロパティ・ファイルでgg.classpath
プロパティを適切に構成する作業は、ユーザーが行う必要があります。
8.2.8.2.8.2 無効なKafkaバージョン
Kafka Connectは、Kafka 0.9.0.0バージョンで導入されました。Kafka Connectハンドラは、0.8.2.2以前のバージョンのKafkaでは動作しません。Kafka 0.8.2.2でKafka Connectを使用しようとすると、実行時にClassNotFoundException
エラーが発生します。
8.2.8.2.8.3 Kafkaプロデューサ・プロパティ・ファイルが見つからない
通常、次の例外メッセージが表示されます。
ERROR 2015-11-11 11:49:08,482 [main] Error loading the kafka producer properties
Kafkaプロデューサの構成ファイル名のgg.handler.kafkahandler.KafkaProducerConfigFile
構成プロパティが正しく設定されていることを確認してください。
gg.classpath
変数にKafkaプロデューサのプロパティ・ファイルのパスが含まれていること、プロパティ・ファイルのパスの末尾にワイルドカード(*)が含まれていないことを確認してください。
8.2.8.2.8.4 Kafkaの接続の問題
通常、次の例外メッセージが表示されます。
WARN 2015-11-11 11:25:50,784 [kafka-producer-network-thread | producer-1] WARN (Selector.java:276) - Error in I/O with localhost/127.0.0.1 java.net.ConnectException: Connection refused
これが発生すると、接続の再試行時間が経過し、Kafka接続ハンドラ・プロセスが異常終了します。Kafkaブローカが稼働しており、Kafkaプロデューサのプロパティ・ファイルで指定されているホストとポートが正しいことを確認してください。
Kafkaブローカをホストしているマシンでネットワーク・シェル・コマンド(netstat -l
など)を使用すると、Kafkaが所定のポートでリスニングしていることを確認できます。
8.2.8.2.9 Kafka Connectハンドラ・クライアント依存性
Apache Kafka Connectデータベースに接続するためのKafka Connectハンドラの依存性とはどのようなものでしょう。
Maven Central RepositoryのKafka Connectデータベースのアーティファクトは次のとおりです。
Maven groupId: org.apache.kafka
Maven artifactId: kafka-clients & connect-json
Maven version: 各セクションでリストされるKafka Connectのバージョン番号
- Kafka 2.8.0
- Kafka 2.7.1
- Kafka 2.6.0
- Kafka 2.5.1
- Kafka 2.4.1
- Kafka 2.3.1
- Kafka 2.2.1
- Kafka 2.1.1
- Kafka 2.0.1
- Kafka 1.1.1
- Kafka 1.0.2
- Kafka 0.11.0.0
- Kafka 0.10.2.0
- Kafka 0.10.2.0
- Kafka 0.10.0.0
- Kafka 0.9.0.1
親トピック: Apache Kafka Connectハンドラ
8.2.8.2.9.1 Kafka 2.8.0
connect-api-2.8.0.jar connect-json-2.8.0.jar jackson-annotations-2.10.5.jar jackson-core-2.10.5.jar jackson-databind-2.10.5.1.jar jackson-datatype-jdk8-2.10.5.jar javax.ws.rs-api-2.1.1.jar kafka-clients-2.8.0.jar lz4-java-1.7.1.jar slf4j-api-1.7.30.jar snappy-java-1.1.8.1.jar zstd-jni-1.4.9-1.jar
親トピック: Kafka Connectハンドラ・クライアント依存性
8.2.8.2.9.2 Kafka 2.7.1
connect-api-2.7.1.jar connect-json-2.7.1.jar jackson-annotations-2.10.5.jar jackson-core-2.10.5.jar jackson-databind-2.10.5.1.jar jackson-datatype-jdk8-2.10.5.jar javax.ws.rs-api-2.1.1.jar kafka-clients-2.7.1.jar lz4-java-1.7.1.jar slf4j-api-1.7.30.jar snappy-java-1.1.7.7.jar zstd-jni-1.4.5-6.jar
親トピック: Kafka Connectハンドラ・クライアント依存性
8.2.8.2.9.3 Kafka 2.6.0
connect-api-2.6.0.jar connect-json-2.6.0.jar jackson-annotations-2.10.2.jar jackson-core-2.10.2.jar jackson-databind-2.10.2.jar jackson-datatype-jdk8-2.10.2.jar javax.ws.rs-api-2.1.1.jar kafka-clients-2.6.0.jar lz4-java-1.7.1.jar slf4j-api-1.7.30.jar snappy-java-1.1.7.3.jar zstd-jni-1.4.4-7.jar
親トピック: Kafka Connectハンドラ・クライアント依存性
8.2.8.2.9.4 Kafka 2.5.1
connect-api-2.5.1.jar connect-json-2.5.1.jar jackson-annotations-2.10.2.jar jackson-core-2.10.2.jar jackson-databind-2.10.2.jar jackson-datatype-jdk8-2.10.2.jar javax.ws.rs-api-2.1.1.jar kafka-clients-2.5.1.jar lz4-java-1.7.1.jar slf4j-api-1.7.30.jar snappy-java-1.1.7.3.jar zstd-jni-1.4.4-7.jar
親トピック: Kafka Connectハンドラ・クライアント依存性
8.2.8.2.9.5 Kafka 2.4.1
kafka-clients-2.4.1.jar lz4-java-1.6.0.jar slf4j-api-1.7.28.jar snappy-java-1.1.7.3.jar zstd-jni-1.4.3-1.jarr
親トピック: Kafka Connectハンドラ・クライアント依存性
8.2.8.2.9.6 Kafka 2.3.1
connect-api-2.3.1.jar connect-json-2.3.1.jar jackson-annotations-2.10.0.jar jackson-core-2.10.0.jar jackson-databind-2.10.0.jar jackson-datatype-jdk8-2.10.0.jar javax.ws.rs-api-2.1.1.jar kafka-clients-2.3.1.jar lz4-java-1.6.0.jar slf4j-api-1.7.26.jar snappy-java-1.1.7.3.jar zstd-jni-1.4.0-1.jar
親トピック: Kafka Connectハンドラ・クライアント依存性
8.2.8.2.9.7 Kafka 2.2.1
kafka-clients-2.2.1.jar lz4-java-1.5.0.jar slf4j-api-1.7.25.jar snappy-java-1.1.7.2.jar zstd-jni-1.3.8-1.jar
親トピック: Kafka Connectハンドラ・クライアント依存性
8.2.8.2.9.8 Kafka 2.1.1
audience-annotations-0.5.0.jar connect-api-2.1.1.jar connect-json-2.1.1.jar jackson-annotations-2.9.0.jar jackson-core-2.9.8.jar jackson-databind-2.9.8.jar javax.ws.rs-api-2.1.1.jar jopt-simple-5.0.4.jar kafka_2.12-2.1.1.jar kafka-clients-2.1.1.jar lz4-java-1.5.0.jar metrics-core-2.2.0.jar scala-library-2.12.7.jar scala-logging_2.12-3.9.0.jar scala-reflect-2.12.7.jar slf4j-api-1.7.25.jar snappy-java-1.1.7.2.jar zkclient-0.11.jar zookeeper-3.4.13.jar zstd-jni-1.3.7-1.jar
親トピック: Kafka Connectハンドラ・クライアント依存性
8.2.8.2.9.9 Kafka 2.0.1
audience-annotations-0.5.0.jar connect-api-2.0.1.jar connect-json-2.0.1.jar jackson-annotations-2.9.0.jar jackson-core-2.9.7.jar jackson-databind-2.9.7.jar javax.ws.rs-api-2.1.jar jopt-simple-5.0.4.jar kafka_2.12-2.0.1.jar kafka-clients-2.0.1.jar lz4-java-1.4.1.jar metrics-core-2.2.0.jar scala-library-2.12.6.jar scala-logging_2.12-3.9.0.jar scala-reflect-2.12.6.jar slf4j-api-1.7.25.jar snappy-java-1.1.7.1.jar zkclient-0.10.jar zookeeper-3.4.13.jar
親トピック: Kafka Connectハンドラ・クライアント依存性
8.2.8.2.9.10 Kafka 1.1.1
kafka-clients-1.1.1.jar lz4-java-1.4.1.jar slf4j-api-1.7.25.jar snappy-java-1.1.7.1.jar
親トピック: Kafka Connectハンドラ・クライアント依存性
8.2.8.2.9.11 Kafka 1.0.2
kafka-clients-1.0.2.jar lz4-java-1.4.jar slf4j-api-1.7.25.jar snappy-java-1.1.4.jar
親トピック: Kafka Connectハンドラ・クライアント依存性
8.2.8.2.9.12 Kafka 0.11.0.0
connect-api-0.11.0.0.jar connect-json-0.11.0.0.jar jackson-annotations-2.8.0.jar jackson-core-2.8.5.jar jackson-databind-2.8.5.jar jopt-simple-5.0.3.jar kafka_2.11-0.11.0.0.jar kafka-clients-0.11.0.0.jar log4j-1.2.17.jar lz4-1.3.0.jar metrics-core-2.2.0.jar scala-library-2.11.11.jar scala-parser-combinators_2.11-1.0.4.jar slf4j-api-1.7.25.jar slf4j-log4j12-1.7.25.jar snappy-java-1.1.2.6.jar zkclient-0.10.jar zookeeper-3.4.10.jar
親トピック: Kafka Connectハンドラ・クライアント依存性
8.2.8.2.9.13 Kafka 0.10.2.0
connect-api-0.10.2.0.jar connect-json-0.10.2.0.jar jackson-annotations-2.8.0.jar jackson-core-2.8.5.jar jackson-databind-2.8.5.jar jopt-simple-5.0.3.jar kafka_2.11-0.10.2.0.jar kafka-clients-0.10.2.0.jar log4j-1.2.17.jar lz4-1.3.0.jar metrics-core-2.2.0.jar scala-library-2.11.8.jar scala-parser-combinators_2.11-1.0.4.jar slf4j-api-1.7.21.jar slf4j-log4j12-1.7.21.jar snappy-java-1.1.2.6.jar zkclient-0.10.jar zookeeper-3.4.9.jar
親トピック: Kafka Connectハンドラ・クライアント依存性
8.2.8.2.9.14 Kafka 0.10.2.0
connect-api-0.10.1.1.jar connect-json-0.10.1.1.jar jackson-annotations-2.6.0.jar jackson-core-2.6.3.jar jackson-databind-2.6.3.jar jline-0.9.94.jar jopt-simple-4.9.jar kafka_2.11-0.10.1.1.jar kafka-clients-0.10.1.1.jar log4j-1.2.17.jar lz4-1.3.0.jar metrics-core-2.2.0.jar netty-3.7.0.Final.jar scala-library-2.11.8.jar scala-parser-combinators_2.11-1.0.4.jar slf4j-api-1.7.21.jar slf4j-log4j12-1.7.21.jar snappy-java-1.1.2.6.jar zkclient-0.9.jar zookeeper-3.4.8.jar
親トピック: Kafka Connectハンドラ・クライアント依存性
8.2.8.2.9.15 Kafka 0.10.0.0
activation-1.1.jar connect-api-0.10.0.0.jar connect-json-0.10.0.0.jar jackson-annotations-2.6.0.jar jackson-core-2.6.3.jar jackson-databind-2.6.3.jar jline-0.9.94.jar jopt-simple-4.9.jar junit-3.8.1.jar kafka_2.11-0.10.0.0.jar kafka-clients-0.10.0.0.jar log4j-1.2.15.jar lz4-1.3.0.jar mail-1.4.jar metrics-core-2.2.0.jar netty-3.7.0.Final.jar scala-library-2.11.8.jar scala-parser-combinators_2.11-1.0.4.jar slf4j-api-1.7.21.jar slf4j-log4j12-1.7.21.jar snappy-java-1.1.2.4.jar zkclient-0.8.jar zookeeper-3.4.6.jar
親トピック: Kafka Connectハンドラ・クライアント依存性
8.2.8.2.9.16 Kafka 0.9.0.1
activation-1.1.jar connect-api-0.9.0.1.jar connect-json-0.9.0.1.jar jackson-annotations-2.5.0.jar jackson-core-2.5.4.jar jackson-databind-2.5.4.jar jline-0.9.94.jar jopt-simple-3.2.jar junit-3.8.1.jar kafka_2.11-0.9.0.1.jar kafka-clients-0.9.0.1.jar log4j-1.2.15.jar lz4-1.2.0.jar mail-1.4.jar metrics-core-2.2.0.jar netty-3.7.0.Final.jar scala-library-2.11.7.jar scala-parser-combinators_2.11-1.0.4.jar scala-xml_2.11-1.0.4.jar slf4j-api-1.7.6.jar slf4j-log4j12-1.7.6.jar snappy-java-1.1.1.7.jar zkclient-0.7.jar zookeeper-3.4.6.jar
8.2.8.2.9.16.1 Confluentの依存性
ノート:
次にリストするConfluentの依存性は、Kafka Connect Avroコンバータおよび関連付けられたAvroスキーマ・レジストリ・クライアントに関するものです。Confluent Kafka Connectと統合されている場合、前の項にリストされている対応するKafkaバージョンのKafka Connect依存性に加えて、次の依存性が必要です。
- Confluent 6.2.0
- Confluent 6.1.0
- Confluent 6.0.0
- Confluent 5.5.0
- Confluent 5.4.0
- Confluent 5.3.0
- Confluent 5.2.1
- Confluent 5.1.3
- Confluent 5.0.3
- Confluent 4.1.2
親トピック: Kafka 0.9.0.1
8.2.8.2.9.16.1.1 Confluent 6.2.0
avro-1.10.1.jar commons-compress-1.20.jar common-utils-6.2.0.jar connect-api-6.2.0-ccs.jar connect-json-6.2.0-ccs.jar jackson-annotations-2.10.5.jar jackson-core-2.11.3.jar jackson-databind-2.10.5.1.jar jackson-datatype-jdk8-2.10.5.jar jakarta.annotation-api-1.3.5.jar jakarta.inject-2.6.1.jar jakarta.ws.rs-api-2.1.6.jar javax.ws.rs-api-2.1.1.jar jersey-common-2.34.jar kafka-avro-serializer-6.2.0.jar kafka-clients-6.2.0-ccs.jar kafka-connect-avro-converter-6.2.0.jar kafka-connect-avro-data-6.2.0.jar kafka-schema-registry-client-6.2.0.jar kafka-schema-serializer-6.2.0.jar lz4-java-1.7.1.jar osgi-resource-locator-1.0.3.jar slf4j-api-1.7.30.jar snappy-java-1.1.8.1.jar swagger-annotations-1.6.2.jar zstd-jni-1.4.9-1.jar
親トピック: Confluentの依存性
8.2.8.2.9.16.1.2 Confluent 6.1.0
avro-1.9.2.jar commons-compress-1.19.jar common-utils-6.1.0.jar connect-api-6.1.0-ccs.jar connect-json-6.1.0-ccs.jar jackson-annotations-2.10.5.jar jackson-core-2.10.2.jar jackson-databind-2.10.5.1.jar jackson-datatype-jdk8-2.10.5.jar jakarta.annotation-api-1.3.5.jar jakarta.inject-2.6.1.jar jakarta.ws.rs-api-2.1.6.jar javax.ws.rs-api-2.1.1.jar jersey-common-2.31.jar kafka-avro-serializer-6.1.0.jar kafka-clients-6.1.0-ccs.jar kafka-connect-avro-converter-6.1.0.jar kafka-connect-avro-data-6.1.0.jar kafka-schema-registry-client-6.1.0.jar kafka-schema-serializer-6.1.0.jar lz4-java-1.7.1.jar osgi-resource-locator-1.0.3.jar slf4j-api-1.7.30.jar snappy-java-1.1.7.7.jar swagger-annotations-1.6.2.jar zstd-jni-1.4.5-6.jar
親トピック: Confluentの依存性
8.2.8.2.9.16.1.3 Confluent 6.0.0
avro-1.9.2.jar commons-compress-1.19.jar common-utils-6.0.0.jar connect-api-6.0.0-ccs.jar connect-json-6.0.0-ccs.jar jackson-annotations-2.10.5.jar jackson-core-2.10.2.jar jackson-databind-2.10.5.jar jackson-datatype-jdk8-2.10.5.jar jakarta.annotation-api-1.3.5.jar jakarta.inject-2.6.1.jar jakarta.ws.rs-api-2.1.6.jar javax.ws.rs-api-2.1.1.jar jersey-common-2.30.jar kafka-avro-serializer-6.0.0.jar kafka-clients-6.0.0-ccs.jar kafka-connect-avro-converter-6.0.0.jar kafka-connect-avro-data-6.0.0.jar kafka-schema-registry-client-6.0.0.jar kafka-schema-serializer-6.0.0.jar lz4-java-1.7.1.jar osgi-resource-locator-1.0.3.jar slf4j-api-1.7.30.jar snappy-java-1.1.7.3.jar swagger-annotations-1.6.2.jar zstd-jni-1.4.4-7.jar
親トピック: Confluentの依存性
8.2.8.2.9.16.1.4 Confluent 5.5.0
avro-1.9.2.jar classmate-1.3.4.jar common-config-5.5.0.jar commons-compress-1.19.jar commons-lang3-3.2.1.jar common-utils-5.5.0.jar connect-api-5.5.0-ccs.jar connect-json-5.5.0-ccs.jar guava-18.0.jar hibernate-validator-6.0.17.Final.jar jackson-annotations-2.10.2.jar jackson-core-2.10.2.jar jackson-databind-2.10.2.jar jackson-dataformat-yaml-2.4.5.jar jackson-datatype-jdk8-2.10.2.jar jackson-datatype-joda-2.4.5.jar jakarta.annotation-api-1.3.5.jar jakarta.el-3.0.2.jar jakarta.el-api-3.0.3.jar jakarta.inject-2.6.1.jar jakarta.validation-api-2.0.2.jar jakarta.ws.rs-api-2.1.6.jar javax.ws.rs-api-2.1.1.jar jboss-logging-3.3.2.Final.jar jersey-bean-validation-2.30.jar jersey-client-2.30.jar jersey-common-2.30.jar jersey-media-jaxb-2.30.jar jersey-server-2.30.jar joda-time-2.2.jar kafka-avro-serializer-5.5.0.jar kafka-clients-5.5.0-ccs.jar kafka-connect-avro-converter-5.5.0.jar kafka-connect-avro-data-5.5.0.jar kafka-schema-registry-client-5.5.0.jar kafka-schema-serializer-5.5.0.jar lz4-java-1.7.1.jar osgi-resource-locator-1.0.3.jar slf4j-api-1.7.30.jar snakeyaml-1.12.jar snappy-java-1.1.7.3.jar swagger-annotations-1.5.22.jar swagger-core-1.5.3.jar swagger-models-1.5.3.jar zstd-jni-1.4.4-7.jar
親トピック: Confluentの依存性
8.2.8.2.9.16.1.5 Confluent 5.4.0
avro-1.9.1.jar common-config-5.4.0.jar commons-compress-1.19.jar commons-lang3-3.2.1.jar common-utils-5.4.0.jar connect-api-5.4.0-ccs.jar connect-json-5.4.0-ccs.jar guava-18.0.jar jackson-annotations-2.9.10.jar jackson-core-2.9.9.jar jackson-databind-2.9.10.1.jar jackson-dataformat-yaml-2.4.5.jar jackson-datatype-jdk8-2.9.10.jar jackson-datatype-joda-2.4.5.jar javax.ws.rs-api-2.1.1.jar joda-time-2.2.jar kafka-avro-serializer-5.4.0.jar kafka-clients-5.4.0-ccs.jar kafka-connect-avro-converter-5.4.0.jar kafka-schema-registry-client-5.4.0.jar lz4-java-1.6.0.jar slf4j-api-1.7.28.jar snakeyaml-1.12.jar snappy-java-1.1.7.3.jar swagger-annotations-1.5.22.jar swagger-core-1.5.3.jar swagger-models-1.5.3.jar zstd-jni-1.4.3-1.jar
親トピック: Confluentの依存性
8.2.8.2.9.16.1.6 Confluent 5.3.0
audience-annotations-0.5.0.jar avro-1.8.1.jar common-config-5.3.0.jar commons-compress-1.8.1.jar common-utils-5.3.0.jar connect-api-5.3.0-ccs.jar connect-json-5.3.0-ccs.jar jackson-annotations-2.9.0.jar jackson-core-2.9.9.jar jackson-core-asl-1.9.13.jar jackson-databind-2.9.9.jar jackson-datatype-jdk8-2.9.9.jar jackson-mapper-asl-1.9.13.jar javax.ws.rs-api-2.1.1.jar jline-0.9.94.jar jsr305-3.0.2.jar kafka-avro-serializer-5.3.0.jar kafka-clients-5.3.0-ccs.jar kafka-connect-avro-converter-5.3.0.jar kafka-schema-registry-client-5.3.0.jar lz4-java-1.6.0.jar netty-3.10.6.Final.jar paranamer-2.7.jar slf4j-api-1.7.26.jar snappy-java-1.1.1.3.jar spotbugs-annotations-3.1.9.jar xz-1.5.jar zkclient-0.10.jar zookeeper-3.4.14.jar zstd-jni-1.4.0-1.jar
親トピック: Confluentの依存性
8.2.8.2.9.16.1.7 Confluent 5.2.1
audience-annotations-0.5.0.jar avro-1.8.1.jar common-config-5.2.1.jar commons-compress-1.8.1.jar common-utils-5.2.1.jar connect-api-2.2.0-cp2.jar connect-json-2.2.0-cp2.jar jackson-annotations-2.9.0.jar jackson-core-2.9.8.jar jackson-core-asl-1.9.13.jar jackson-databind-2.9.8.jar jackson-datatype-jdk8-2.9.8.jar jackson-mapper-asl-1.9.13.jar javax.ws.rs-api-2.1.1.jar jline-0.9.94.jar kafka-avro-serializer-5.2.1.jar kafka-clients-2.2.0-cp2.jar kafka-connect-avro-converter-5.2.1.jar kafka-schema-registry-client-5.2.1.jar lz4-java-1.5.0.jar netty-3.10.6.Final.jar paranamer-2.7.jar slf4j-api-1.7.25.jar snappy-java-1.1.1.3.jar xz-1.5.jar zkclient-0.10.jar zookeeper-3.4.13.jar zstd-jni-1.3.8-1.jar
親トピック: Confluentの依存性
8.2.8.2.9.16.1.8 Confluent 5.1.3
audience-annotations-0.5.0.jar avro-1.8.1.jar common-config-5.1.3.jar commons-compress-1.8.1.jar common-utils-5.1.3.jar connect-api-2.1.1-cp3.jar connect-json-2.1.1-cp3.jar jackson-annotations-2.9.0.jar jackson-core-2.9.8.jar jackson-core-asl-1.9.13.jar jackson-databind-2.9.8.jar jackson-mapper-asl-1.9.13.jar javax.ws.rs-api-2.1.1.jar jline-0.9.94.jar kafka-avro-serializer-5.1.3.jar kafka-clients-2.1.1-cp3.jar kafka-connect-avro-converter-5.1.3.jar kafka-schema-registry-client-5.1.3.jar lz4-java-1.5.0.jar netty-3.10.6.Final.jar paranamer-2.7.jar slf4j-api-1.7.25.jar snappy-java-1.1.1.3.jar xz-1.5.jar zkclient-0.10.jar zookeeper-3.4.13.jar zstd-jni-1.3.7-1.jar
親トピック: Confluentの依存性
8.2.8.2.9.16.1.9 Confluent 5.0.3
audience-annotations-0.5.0.jar avro-1.8.1.jar common-config-5.0.3.jar commons-compress-1.8.1.jar common-utils-5.0.3.jar connect-api-2.0.1-cp4.jar connect-json-2.0.1-cp4.jar jackson-annotations-2.9.0.jar jackson-core-2.9.7.jar jackson-core-asl-1.9.13.jar jackson-databind-2.9.7.jar jackson-mapper-asl-1.9.13.jar javax.ws.rs-api-2.1.jar jline-0.9.94.jar kafka-avro-serializer-5.0.3.jar kafka-clients-2.0.1-cp4.jar kafka-connect-avro-converter-5.0.3.jar kafka-schema-registry-client-5.0.3.jar lz4-java-1.4.1.jar netty-3.10.6.Final.jar paranamer-2.7.jar slf4j-api-1.7.25.jar snappy-java-1.1.1.3.jar xz-1.5.jar zkclient-0.10.jar zookeeper-3.4.13.jar
親トピック: Confluentの依存性
8.2.8.2.9.16.1.10 Confluent 4.1.2
avro-1.8.1.jar common-config-4.1.2.jar commons-compress-1.8.1.jar common-utils-4.1.2.jar connect-api-1.1.1-cp1.jar connect-json-1.1.1-cp1.jar jackson-annotations-2.9.0.jar jackson-core-2.9.6.jar jackson-core-asl-1.9.13.jar jackson-databind-2.9.6.jar jackson-mapper-asl-1.9.13.jar jline-0.9.94.jar kafka-avro-serializer-4.1.2.jar kafka-clients-1.1.1-cp1.jar kafka-connect-avro-converter-4.1.2.jar kafka-schema-registry-client-4.1.2.jar log4j-1.2.16.jar lz4-java-1.4.1.jar netty-3.10.5.Final.jar paranamer-2.7.jar slf4j-api-1.7.25.jar slf4j-log4j12-1.6.1.jar snappy-java-1.1.1.3.jar xz-1.5.jar zkclient-0.10.jar zookeeper-3.4.10.jar
親トピック: Confluentの依存性
8.2.8.3 Apache Kafka RESTプロキシ
Kafka REST Proxyハンドラは、Confluentによって配布されたKafka REST Proxyにメッセージをストリーミングします。
この章では、Kafka REST Proxyハンドラの使用方法について説明します。
親トピック: Apache Kafka
8.2.8.3.1 概要
Kafka REST Proxyハンドラを使用すると、KafkaメッセージをHTTPSプロトコルを使用してストリーミングできます。この機能のユースケースとしては、KafkaメッセージをOracle GoldenGate On Premisesインストールからクラウドに、またはクラウドからクラウドにストリーミングします。
Kafka REST Proxyは、KafkaクラスタへのRESTfulインタフェースを提供します。これにより、次のことを簡単に実行できます。
-
メッセージの生成および消費。
-
クラスタの状態の表示。
-
ネイティブKafkaプロトコルまたはクライアントを使用しない管理アクションの実行。
Kafka REST Proxyは、Confluent Open SourceおよびConfluent Enterprise配布の一部です。これは、Apache Kafka配布では使用できません。REST Proxy経由でKafkaにアクセスするには、Confluent Kafkaバージョンをインストールする必要があります。https://docs.confluent.io/current/kafka-rest/docs/index.htmlを参照してください。
親トピック: Apache Kafka RESTプロキシ
8.2.8.3.2 Kafka REST Proxyハンドラ・サービスの設定および開始
ZIP、tarアーカイブ、Docker、パッケージなど、複数のインストール形式から選択できます。
- Kafka REST Proxyハンドラの使用
- 依存性のダウンロード
- クラスパス構成
- Kafka REST Proxyハンドラの構成
- サンプル構成の確認
- セキュリティ
- キーストアまたはトラストストアの生成
- テンプレートを使用したトピック名とメッセージ・キーの解決
- Kafka REST Proxyハンドラのフォーマッタ・プロパティ
親トピック: Apache Kafka RESTプロキシ
8.2.8.3.2.1 Kafka REST Proxyハンドラの使用
Kafka REST Proxyは、Apache、ClouderaまたはHortonworksに含まれていないため、Confluent Open SourceまたはConfluent Enterprise配布をダウンロードしてインストールする必要があります。ZIP、TARアーカイブ、Docker、パッケージなど、複数のインストール形式から選択できます。
Kafka REST Proxyは、ZooKeeper、KafkaおよびSchema Registryに依存しています
8.2.8.3.2.2 依存性のダウンロード
次の場所から、Javaクライアント依存性のJersey RESTful Webサービスを確認およびダウンロードできます。
https://eclipse-ee4j.github.io/jersey/
Jersey Apache Connectorの依存性は、Mavenリポジトリhttps://mvnrepository.com/artifact/org.glassfish.jersey.connectors/jersey-apache-connectorから確認およびダウンロードできます
8.2.8.3.2.3 クラスパス構成
Kafka RESTプロキシ・ハンドラは、Jerseyプロジェクトjersey-client
バージョン2.27およびjersey-connectors-apache
バージョン2.27を使用してKafkaに接続します。Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)には必要な依存性が含まれていないため、それらを取得する必要があります。「依存性のダウンロード」を参照してください。
Javaアダプタ・プロパティ・ファイルでgg.classpath
プロパティを使用して、これらの依存性を構成する必要があります。Kafka RESTプロキシ・ハンドラの正しく構成されたクラスパスの例を次に示します。
gg.classpath=dirprm:
{path_to_jersey_client_jars}/jaxrs-ri/lib/*:{path_to_jersey_client_jars}
/jaxrs-ri/api/*
:{path_to_jersey_client_jars}/jaxrs-ri/ext/*:{path_to_jersey_client_jars}
/connector/*
8.2.8.3.2.4 Kafka REST Proxyハンドラの構成
Kafka REST Proxyハンドラの構成可能な値は次のとおりです。Kafka REST Proxyプロパティ・ファイルは、Oracle GoldenGate dirprm
ディレクトリに保存することをお薦めします。
Kafka REST Proxyハンドラの選択を有効にするには、まずgg.handler.name.type=kafkarestproxy
を指定してハンドラ・タイプを構成してから、次に示す他のKafka REST Proxyハンドラ・プロパティを構成する必要があります。
プロパティ | 必須/オプション | 有効な値 | デフォルト | 説明 |
---|---|---|---|---|
|
必須 |
|
なし |
Kafka REST Proxyハンドラを選択するための構成。 |
|
必須 |
実行時にKafkaトピック名を解決するためのテンプレート文字列の値。 |
なし |
詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。 |
|
必須 |
実行時にKafkaメッセージ・キーを解決するためのテンプレート文字列の値。 |
なし |
詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。 |
|
必須 |
Rest Proxyのリスナー・アドレス。 |
なし |
Kafka REST ProxyのURLに設定します。 |
|
必須 |
|
なし |
RESTプロキシ・ペイロード・データ形式に設定します |
|
オプション |
ペイロード・サイズ(MB)を表す値。 |
|
HTTPメッセージのペイロードの最大サイズに設定します。 |
|
オプション |
|
|
使用するAPIのバージョンを設定します。 |
|
オプション |
|
|
操作の処理方法を設定します。 |
|
オプション |
トラストストアへのパス。 |
なし |
信頼できる認証局(CA)の証明書を保持するトラストストア・ファイルへのパス。これらのCAは、SSL接続中にサーバーによって提示された証明書を検証するために使用されます。「キーストアまたはトラストストアの生成」を参照してください。 |
|
オプション |
トラストストアのパスワード。 |
なし |
トラストストア・パスワード。 |
|
オプション |
キーストアへのパス。 |
なし |
アイデンティティの検証のために他のパーティ(サーバーまたはクライアント)に提示する秘密鍵およびアイデンティティ証明書が含まれるキーストア・ファイルへのパス。「キーストアまたはトラストストアの生成」を参照してください。 |
|
オプション |
キーストアのパスワード。 |
なし |
キーストア・パスワード。 |
|
オプション |
|
なし |
プロキシURLは次の形式です。 |
|
オプション |
任意の文字列。 |
なし |
プロキシ・ユーザー名 |
|
オプション |
任意の文字列。 |
なし |
プロキシ・パスワード。 |
|
オプション |
整数値。 |
なし |
サーバーが応答できる時間。 |
|
オプション |
整数値。 |
なし |
ホストへの接続の確立を待機する時間。 |
gg.handler.name.format.metaColumnsTemplate |
オプション |
| |
なし |
| ${optype}, ${token.ROWID}, ${sys.username}, ${currenttimestamp} |
詳細は、「テンプレートを使用したストリーム名とパーティション名の解決」を参照してください。
8.2.8.3.2.5 サンプル構成の確認
Javaアダプタ・プロパティ・ファイルからのKafka RESTプロキシ・ハンドラの構成例を次に示します。
gg.handlerlist=kafkarestproxy #The handler properties gg.handler.kafkarestproxy.type=kafkarestproxy #The following selects the topic name based on the fully qualified table name gg.handler.kafkarestproxy.topicMappingTemplate=${fullyQualifiedTableName} #The following selects the message key using the concatenated primary keys gg.handler.kafkarestproxy.keyMappingTemplate=${primaryKeys} gg.handler.kafkarestproxy.postDataUrl=http://localhost:8083 gg.handler.kafkarestproxy.apiVersion=v1 gg.handler.kafkarestproxy.format=json gg.handler.kafkarestproxy.payloadsize=1 gg.handler.kafkarestproxy.mode=tx #Server auth properties #gg.handler.kafkarestproxy.trustStore=/keys/truststore.jks #gg.handler.kafkarestproxy.trustStorePassword=test1234 #Client auth properites #gg.handler.kafkarestproxy.keyStore=/keys/keystore.jks #gg.handler.kafkarestproxy.keyStorePassword=test1234 #Proxy properties #gg.handler.kafkarestproxy.proxy=http://proxyurl:80 #gg.handler.kafkarestproxy.proxyUserName=username #gg.handler.kafkarestproxy.proxyPassword=password #The MetaColumnTemplate formatter properties gg.handler.kafkarestproxy.format.metaColumnsTemplate=${optype},${timestampmicro},${currenttimestampmicro}
8.2.8.3.2.6 セキュリティ
次の間でセキュリティを使用できます。
-
Kafka RESTプロキシ・クライアントとKafka RESTプロキシ・サーバー。Oracle GoldenGate RESTプロキシ・ハンドラはKafka RESTプロキシ・クライアントです。
-
Kafka RESTプロキシ・サーバーとKafkaブローカ。Kafka RESTプロキシ・サーバーのセキュリティ・ドキュメントおよび構成を十分に確認することをお薦めします。https://docs.confluent.io/current/kafka-rest/docs/index.htmlを参照してください。
REST Proxyは、クライアントとKafka REST Proxyハンドラの間の通信を保護するためにSSLをサポートしています。SSLを構成するには:
-
スクリプトを使用してキーストアを生成します。「キーストアまたはトラストストアの生成」を参照してください。
-
次のプロパティを使用して、
kafka-rest.properties
ファイルのKafka REST Proxyサーバー構成を更新します。listeners=https://hostname:8083 confluent.rest.auth.propagate.method=SSL Configuration Options for HTTPS ssl.client.auth=true ssl.keystore.location={keystore_file_path}/server.keystore.jks ssl.keystore.password=test1234 ssl.key.password=test1234 ssl.truststore.location={keystore_file_path}/server.truststore.jks ssl.truststore.password=test1234 ssl.keystore.type=JKS ssl.truststore.type=JKS ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
-
サーバーを再起動します。
相互認証を無効にするには、ssl.client.auth=
プロパティをtrue
からfalse
に更新します。
8.2.8.3.2.7 キーストアまたはトラストストアの生成
トラストストアの生成
このスクリプトを実行して、ca-cert
、ca-key
およびtruststore.jks
トラストストア・ファイルを生成します。
#!/bin/bash PASSWORD=password CLIENT_PASSWORD=password VALIDITY=365
次に、この例のようにCAを生成します。
openssl req -new -x509 -keyout ca-key -out ca-cert -days $VALIDITY -passin pass:$PASSWORD -passout pass:$PASSWORD -subj "/C=US/ST=CA/L=San Jose/O=Company/OU=Org/CN=FQDN" -nodes
最後に、keytool
を使用して、CAをサーバーのトラストストアに追加します。
keytool -keystore truststore.jks -alias CARoot -import -file ca-cert -storepass $PASSWORD -keypass $PASSWORD
キーストアの生成
このスクリプトを実行し、引数としてfqdn
を渡して、ca-cert.srl
、cert-file
、cert-signed
、およびkeystore.jks
キーストア・ファイルを生成します。
#!/bin/bash PASSWORD=password VALIDITY=365 if [ $# -lt 1 ]; then echo "`basename $0` host fqdn|user_name|app_name" exit 1 fi CNAME=$1 ALIAS=`echo $CNAME|cut -f1 -d"."`
次に、この例のようにkeytool
を使用してキーストアを生成します。
keytool -noprompt ¿keystore keystore.jks -alias $ALIAS -keyalg RSA -validity $VALIDITY -genkey -dname "CN=$CNAME,OU=BDP,O=Company,L=San Jose,S=CA,C=US" -storepass $PASSWORD -keypass $PASSWORD
次に、キーストア内のすべての証明書をCAで署名します。
keytool -keystore keystore.jks -alias $ALIAS -certreq -file cert-file -storepass $PASSWORDopenssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
最後に、CAと署名付き証明書の両方をキーストアにインポートします。
keytool -keystore keystore.jks -alias CARoot -import -file ca-cert -storepass $PASSWORDkeytool -keystore keystore.jks -alias $ALIAS -import -file cert-signed -storepass $PASSWORD
8.2.8.3.2.8 テンプレートを使用したトピック名とメッセージ・キーの解決
Kafka REST Proxyハンドラでは、テンプレートの構成値を使用して実行時にトピック名およびメッセージ・キーを解決する機能が提供されます。テンプレートを使用すると、静的な値およびキーワードを構成できます。キーワードは、そのときの処理コンテキストに応じてキーワードを動的に置き換えるために使用されます。テンプレートは、次の構成プロパティを使用します。
gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate
テンプレートのモード
Kafka REST Proxyハンドラは、操作(挿入、更新、削除)ごとに1つのメッセージを送信するように構成できます。または、トランザクション・レベルで操作をメッセージにグループ化するように構成できます。
テンプレートの例
テンプレートの構成値の例と解決される値を次に示します。
テンプレートの例 | 解決される値 |
---|---|
|
|
|
|
|
|
8.2.8.3.2.9 Kafka REST Proxyハンドラのフォーマッタ・プロパティ
Kafka REST Proxyハンドラ・フォーマッタの構成可能な値は次のとおりです。
表8-11 Kafka REST Proxyハンドラ・フォーマッタ・プロパティ
プロパティ | オプション/オプション | 有効な値 | デフォルト | 説明 |
---|---|---|---|---|
gg.handler.name.format.includeOpType |
オプション |
|
|
出力でこのフィールドを省略する場合は、 |
gg.handler.name.format.includeOpTimestamp |
オプション |
|
|
出力でこのフィールドを省略する場合は、 |
gg.handler.name.format.includeCurrentTimestamp |
オプション |
|
|
出力でこのフィールドを省略する場合は、 |
gg.handler.name.format.includePosition |
オプション |
|
|
出力でこのフィールドを省略する場合は、 |
gg.handler.name.format.includePrimaryKeys |
オプション |
|
|
出力でこのフィールドを省略する場合は、 |
gg.handler.name.format.includeTokens |
オプション |
|
|
このフィールドを抑止する場合は、 |
gg.handler.name.format.insertOpKey |
オプション |
任意の文字列。 |
|
挿入操作を示すフィールド |
gg.handler.name.format.updateOpKey |
オプション |
任意の文字列。 |
|
更新操作を示すフィールド |
gg.handler.name.format.deleteOpKey |
オプション |
任意の文字列。 |
|
削除操作を示すフィールド |
gg.handler.name.format.truncateOpKey |
オプション |
任意の文字列。 |
|
切捨て操作を示すフィールド |
gg.handler.name.format.treatAllColumnsAsStrings |
オプション |
|
|
|
gg.handler.name.format.mapLargeNumbersAsStrings |
オプション |
|
|
|
gg.handler.name.format.iso8601Format |
オプション |
|
|
|
gg.handler.name.format.pkUpdateHandling |
オプション |
|
|
これは、 |
8.2.8.3.3 レコードの消費
Kafka REST Proxyハンドラを使用して、Kafkaトピックからデータを消費する簡単な方法はCurlです。
JSONデータの消費
-
JSONデータのコンシューマを作成します。
curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" https://localhost:8082/consumers/my_json_consumer
-
トピックをサブスクライブします。
curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["topicname"]}' \ https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
-
レコードを消費します。
curl –k -X GET -H "Accept: application/vnd.kafka.json.v2+json" \ https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
Avroデータの消費
-
Avroデータのコンシューマを作成します。
curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" \ --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \ https://localhost:8082/consumers/my_avro_consumer
-
トピックをサブスクライブします。
curl –k -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["topicname"]}' \ https://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/subscription
-
レコードを消費します。
curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \ https://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/records
ノート:
RESTプロキシをホストしているマシンからcurl
を使用している場合は、メッセージを使用する前に、http_proxy
環境変数の設定を解除します。ローカル・マシンからcurl
を使用してKafka RESTプロキシからメッセージを取得する場合は、http_proxy
環境変数の設定が必要になります。
親トピック: Apache Kafka RESTプロキシ
8.2.8.3.4 パフォーマンスに関する考慮事項
Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)構成とKafkaプロデューサでは、パフォーマンスに影響を与える複数の構成設定があります。
Oracle GoldenGateのパラメータでパフォーマンスに最も大きな影響を与えるのはReplicatのGROUPTRANSOPS
パラメータです。これにより、Replicatは複数のソース・トランザクションを1つのターゲット・トランザクションにグループ化できます。トランザクションのコミット時に、Kafka REST Proxyハンドラは、KafkaプロデューサにデータをPOST
します。
ReplicatのGROUPTRANSOPS
を大きい値に設定すると、ReplicatがPOST
を呼び出す頻度が少なくなり、パフォーマンスが向上します。GROUPTRANSOPS
のデフォルト値は1000で、値を2500、5000、さらには10000に増やすことでパフォーマンスが向上します。
親トピック: Apache Kafka RESTプロキシ
8.2.8.3.5 Kafka REST Proxyハンドラのメタ列テンプレート・プロパティ
Kafka REST Proxyサーバーを起動する際の問題
Kafka RESTプロキシ・サーバーを起動するスクリプトは、そのCLASSPATH
を環境CLASSPATH
変数に追加します。これが設定され、環境CLASSPATH
にKafka RESTプロキシ・サーバーの正常な実行と競合するJARファイルが含まれていた場合、起動できなくなります。Kafka RESTプロキシ・サーバーを起動する前に、CLASSPATH
環境変数の設定を解除することをお薦めします。CLASSPATH
を""
に再設定すると、この問題を解決できます。
親トピック: Apache Kafka RESTプロキシ