21 Kafka REST Proxyハンドラの使用
Kafka REST Proxyハンドラは、Confluentによって配布されたKafka REST Proxyにメッセージをストリーミングします。
この章では、Kafka REST Proxyハンドラの使用方法について説明します。
21.1 概要
Kafka REST Proxyハンドラを使用すると、KafkaメッセージをHTTPSプロトコルを使用してストリーミングできます。この機能のユースケースとしては、KafkaメッセージをOracle GoldenGate On Premisesインストールからクラウドに、またはクラウドからクラウドにストリーミングします。
Kafka REST Proxyは、KafkaクラスタへのRESTfulインタフェースを提供します。これにより、次のことを簡単に実行できます。
-
メッセージの生成および消費。
-
クラスタの状態の表示。
-
ネイティブKafkaプロトコルまたはクライアントを使用しない管理アクションの実行。
Kafka REST Proxyは、Confluent Open SourceおよびConfluent Enterprise配布の一部です。これは、Apache Kafka配布では使用できません。REST Proxy経由でKafkaにアクセスするには、Confluent Kafkaバージョンをインストールする必要があります。https://docs.confluent.io/current/kafka-rest/docs/index.htmlを参照してください。
親トピック: Kafka REST Proxyハンドラの使用
21.2 Kafka REST Proxyハンドラ・サービスの設定および開始
ZIP、tarアーカイブ、Docker、パッケージなど、複数のインストール形式から選択できます。
- Kafka REST Proxyハンドラの使用
- 依存性のダウンロード
- クラスパス構成
- Kafka REST Proxyハンドラの構成
- サンプル構成の確認
- セキュリティ
- キーストアまたはトラストストアの生成
- テンプレートを使用したトピック名とメッセージ・キーの解決
- Kafka REST Proxyハンドラのフォーマッタ・プロパティ
親トピック: Kafka REST Proxyハンドラの使用
21.2.1 Kafka REST Proxyハンドラの使用
Kafka REST Proxyは、Apache、ClouderaまたはHortonworksに含まれていないため、Confluent Open SourceまたはConfluent Enterprise配布をダウンロードしてインストールする必要があります。ZIP、TARアーカイブ、Docker、パッケージなど、複数のインストール形式から選択できます。
Kafka REST Proxyは、ZooKeeper、KafkaおよびSchema Registryに依存しています
21.2.2 依存性のダウンロード
次の場所から、Javaクライアント依存性のJersey RESTful Webサービスを確認およびダウンロードできます。
https://eclipse-ee4j.github.io/jersey/
Jersey Apache Connectorの依存性は、Mavenリポジトリhttps://mvnrepository.com/artifact/org.glassfish.jersey.connectors/jersey-apache-connectorから確認およびダウンロードできます。
21.2.3 クラスパス構成
Kafka RESTプロキシ・ハンドラは、Jerseyプロジェクトjersey-client
バージョン2.27およびjersey-connectors-apache
バージョン2.27を使用してKafkaに接続します。Oracle GoldenGate for Big Dataに必要な依存性が含まれていないため、それらを取得する必要があります。「依存性のダウンロード」を参照してください。
Javaアダプタ・プロパティ・ファイルでgg.classpath
プロパティを使用して、これらの依存性を構成する必要があります。Kafka RESTプロキシ・ハンドラの正しく構成されたクラスパスの例を次に示します。
gg.classpath=dirprm:
{path_to_jersey_client_jars}/jaxrs-ri/lib/*:{path_to_jersey_client_jars}
/jaxrs-ri/api/*
:{path_to_jersey_client_jars}/jaxrs-ri/ext/*:{path_to_jersey_client_jars}
/connector/*
21.2.4 Kafka REST Proxyハンドラの構成
Kafka REST Proxyハンドラの構成可能な値は次のとおりです。Kafka REST Proxyプロパティ・ファイルは、Oracle GoldenGate dirprm
ディレクトリに保存することをお薦めします。
Kafka REST Proxyハンドラの選択を有効にするには、まずgg.handler.name.type=kafkarestproxy
を指定してハンドラ・タイプを構成してから、次に示す他のKafka REST Proxyハンドラ・プロパティを構成する必要があります。
プロパティ | 必須/オプション | 有効な値 | デフォルト | 説明 |
---|---|---|---|---|
|
必須 |
|
なし |
Kafka REST Proxyハンドラを選択するための構成。 |
|
必須 |
実行時にKafkaトピック名を解決するためのテンプレート文字列の値。 |
なし |
詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。 |
|
必須 |
実行時にKafkaメッセージ・キーを解決するためのテンプレート文字列の値。 |
なし |
詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。 |
|
必須 |
Rest Proxyのリスナー・アドレス。 |
なし |
Kafka REST ProxyのURLに設定します。 |
|
必須 |
|
なし |
RESTプロキシ・ペイロード・データ形式に設定します |
|
オプション |
ペイロード・サイズ(MB)を表す値。 |
|
HTTPメッセージのペイロードの最大サイズに設定します。 |
|
オプション |
|
|
使用するAPIのバージョンを設定します。 |
|
オプション |
|
|
操作の処理方法を設定します。 |
|
オプション |
トラストストアへのパス。 |
なし |
信頼できる認証局(CA)の証明書を保持するトラストストア・ファイルへのパス。これらのCAは、SSL接続中にサーバーによって提示された証明書を検証するために使用されます。「キーストアまたはトラストストアの生成」を参照してください。 |
|
オプション |
トラストストアのパスワード。 |
なし |
トラストストア・パスワード。 |
|
オプション |
キーストアへのパス。 |
なし |
アイデンティティの検証のために他のパーティ(サーバーまたはクライアント)に提示する秘密キーおよびアイデンティティ証明書が含まれるキーストア・ファイルへのパス。「キーストアまたはトラストストアの生成」を参照してください。 |
|
オプション |
キーストアのパスワード。 |
なし |
キーストア・パスワード。 |
|
オプション |
|
なし |
プロキシURLは次の形式です。 |
|
オプション |
任意の文字列。 |
なし |
プロキシ・ユーザー名 |
|
オプション |
任意の文字列。 |
なし |
プロキシ・パスワード。 |
|
オプション |
整数値。 |
なし |
サーバーが応答できる時間。 |
|
オプション |
整数値。 |
なし |
ホストへの接続の確立を待機する時間。 |
詳細は、「テンプレートを使用したストリーム名とパーティション名の解決」を参照してください。
21.2.5 サンプル構成の確認
Javaアダプタ・プロパティ・ファイルからのKafka RESTプロキシ・ハンドラの構成例を次に示します。
gg.handlerlist=kafkarestproxy #The handler properties gg.handler.kafkarestproxy.type=kafkarestproxy #The following selects the topic name based on the fully qualified table name gg.handler.kafkarestproxy.topicMappingTemplate=${fullyQualifiedTableName} #The following selects the message key using the concatenated primary keys gg.handler.kafkarestproxy.keyMappingTemplate=${primaryKeys} gg.handler.kafkarestproxy.postDataUrl=http://localhost:8083 gg.handler.kafkarestproxy.apiVersion=v1 gg.handler.kafkarestproxy.format=json gg.handler.kafkarestproxy.payloadsize=1 gg.handler.kafkarestproxy.mode=tx #Server auth properties #gg.handler.kafkarestproxy.trustStore=/keys/truststore.jks #gg.handler.kafkarestproxy.trustStorePassword=test1234 #Client auth properites #gg.handler.kafkarestproxy.keyStore=/keys/keystore.jks #gg.handler.kafkarestproxy.keyStorePassword=test1234 #Proxy properties #gg.handler.kafkarestproxy.proxy=http://proxyurl:80 #gg.handler.kafkarestproxy.proxyUserName=username #gg.handler.kafkarestproxy.proxyPassword=password #The MetaColumnTemplate formatter properties gg.handler.kafkarestproxy.format.metaColumnsTemplate=${optype},${timestampmicro},${currenttimestampmicro}
21.2.6 セキュリティ
次の間でセキュリティを使用できます。
-
Kafka RESTプロキシ・クライアントとKafka RESTプロキシ・サーバー。Oracle GoldenGate RESTプロキシ・ハンドラはKafka RESTプロキシ・クライアントです。
-
Kafka RESTプロキシ・サーバーとKafkaブローカ。Kafka RESTプロキシ・サーバーのセキュリティ・ドキュメントおよび構成を十分に確認することをお薦めします。https://docs.confluent.io/current/kafka-rest/docs/index.htmlを参照してください。
REST Proxyは、クライアントとKafka REST Proxyハンドラの間の通信を保護するためにSSLをサポートしています。SSLを構成するには、次の手順を実行します。
-
スクリプトを使用してキーストアを生成します。「キーストアまたはトラストストアの生成」を参照してください。
-
次のプロパティを使用して、
kafka-rest.properties
ファイルのKafka REST Proxyサーバー構成を更新します。listeners=https://hostname:8083 confluent.rest.auth.propagate.method=SSL Configuration Options for HTTPS ssl.client.auth=true ssl.keystore.location={keystore_file_path}/server.keystore.jks ssl.keystore.password=test1234 ssl.key.password=test1234 ssl.truststore.location={keystore_file_path}/server.truststore.jks ssl.truststore.password=test1234 ssl.keystore.type=JKS ssl.truststore.type=JKS ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
-
サーバーを再起動します。
相互認証を無効にするには、ssl.client.auth=
プロパティをtrue
からfalse
に更新します。
21.2.7 キーストアまたはトラストストアの生成
トラストストアの生成
このスクリプトを実行して、ca-cert
、ca-key
およびtruststore.jks
トラストストア・ファイルを生成します。
#!/bin/bash PASSWORD=password CLIENT_PASSWORD=password VALIDITY=365
次に、この例のようにCAを生成します。
openssl req -new -x509 -keyout ca-key -out ca-cert -days $VALIDITY -passin pass:$PASSWORD -passout pass:$PASSWORD -subj "/C=US/ST=CA/L=San Jose/O=Company/OU=Org/CN=FQDN" -nodes
最後に、keytool
を使用して、CAをサーバーのトラストストアに追加します。
keytool -keystore truststore.jks -alias CARoot -import -file ca-cert -storepass $PASSWORD -keypass $PASSWORD
キーストアの生成
このスクリプトを実行し、引数としてfqdn
を渡して、ca-cert.srl
、cert-file
、cert-signed
、およびkeystore.jks
キーストア・ファイルを生成します。
#!/bin/bash PASSWORD=password VALIDITY=365 if [ $# -lt 1 ]; then echo "`basename $0` host fqdn|user_name|app_name" exit 1 fi CNAME=$1 ALIAS=`echo $CNAME|cut -f1 -d"."`
次に、この例のようにkeytool
を使用してキーストアを生成します。
keytool -noprompt ¿keystore keystore.jks -alias $ALIAS -keyalg RSA -validity $VALIDITY -genkey -dname "CN=$CNAME,OU=BDP,O=Company,L=San Jose,S=CA,C=US" -storepass $PASSWORD -keypass $PASSWORD
次に、キーストア内のすべての証明書をCAで署名します。
keytool -keystore keystore.jks -alias $ALIAS -certreq -file cert-file -storepass $PASSWORDopenssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
最後に、CAと署名付き証明書の両方をキーストアにインポートします。
keytool -keystore keystore.jks -alias CARoot -import -file ca-cert -storepass $PASSWORDkeytool -keystore keystore.jks -alias $ALIAS -import -file cert-signed -storepass $PASSWORD
21.2.7.1 メタ列出力の設定
メタ列出力を制御するKafka REST Proxyハンドラのメタ列テンプレート・プロパティの構成可能な値は次のとおりです。
表21-1 メタ列テンプレート・プロパティ
プロパティ | 必須/オプション | 有効な値 | デフォルト | 説明 |
---|---|---|---|---|
gg.handler.name.format.metaColumnsTemplate |
オプション |
| |
なし |
現在のメタ列情報を簡単な方法で構成でき、使用する明示的な必要性が排除されます。 insertOpKey | updateOpKey | deleteOpKey | truncateOpKey | includeTableName | includeOpTimestamp | includeOpType | includePosition | includeCurrentTimestamp, useIso8601Format これは、テンプレートを表す1つ以上のテンプレート化された値で構成されているカンマ区切りの文字列です。 |
${optype}, ${token.ROWID}, ${sys.username}, ${currenttimestamp}
メタ列のキーワードの説明
メタ列機能を使用すると、生成された出力メッセージに表示するメタデータ・フィールドを選択できます。メタ列構文の形式は次のとおりです。
-
${keyword[fieldName].argument}
-
キーワードは、メタ列構文に基づいて固定されます。オプションで、大カッコの間にフィールド名を指定できます。フィールド名を指定しない場合は、デフォルトのフィールド名が使用されます。
メタ列の値を解決するには、引数が必要です。
-
${alltokens}
-
すべてのOracle GoldenGateトークン。
-
${token}
-
特定のOracle GoldenGateトークンの値。トークン・キーは、ピリオド(
.
)演算子を使用してトークンの後に続ける必要があります。たとえば:${token.MYTOKEN}
-
${token.MYTOKEN}
-
${sys}
システム環境変数。変数名は、ピリオド(.)演算子を使用してsysの後に続ける必要があります。
-
${sys.MYVAR}
-
${sys.MYVAR}
Oracle GoldenGate環境変数。変数名は、ピリオド(.)演算子を使用して
env
の後に続ける必要があります。 -
${env}
-
Oracle GoldenGate環境変数。変数名は、ピリオド(
.
)演算子を使用してenv
の後に続ける必要があります。たとえば:${env.someVariable}
-
${javaprop}
-
Java JVM変数。変数名は、ピリオド(
.
)演算子を使用してjavaprop
の後に続ける必要があります。たとえば:${javaprop.MYVAR}
-
${optype}
-
操作タイプ。
-
${position}
-
レコードの位置。
-
${timestamp}
-
レコードのタイムスタンプ。
-
${catalog}
-
カタログ名。
-
${schema}
-
スキーマ名。
-
${table}
-
表名。
-
${objectname}
-
完全修飾の表名。
-
${csn}
-
ソースのコミット順序番号
-
${xid}
-
ソース・トランザクションID。
-
${currenttimestamp}
-
現在のタイムスタンプ。
-
${currenttimestampiso8601}
-
ISO 8601形式の現在のタイムスタンプ。
-
${opseqno}
-
トランザクション内のレコード順序番号。
-
${timestampmicro}
-
レコードのタイムスタンプ(マイクロ秒/エポック後)。
-
${currenttimestampmicro}
-
現在のタイムスタンプ(マイクロ秒/エポック後)。
-
${txind}
-
ソース証跡ファイルのトランザクション・インジケータです。トランザクションの値は、最初の操作の場合は
B
、中間の操作の場合はM
、最後の操作の場合はE
、操作1つのみの場合はW
です。フィルタリング操作または調整適用を使用すると、このフィールドの有用性が無効になります。 -
${primarykeycolumns}
-
主キー列名のリストを含むフィールドを挿入する場合に使用します。
-
${static}
-
静的値を含むフィールドを出力に挿入する場合に使用します。必要な値は引数です。必要な値が
abc
である場合の構文は、${static.abc}
または${static[FieldName].abc}
です。
サンプル構成:
gg.handlerlist=kafkarestproxy
#The handler properties
gg.handler.kafkarestproxy.type=kafkarestproxy
#The following selects the topic name based on the fully qualified table name
gg.handler.kafkarestproxy.topicMappingTemplate=${fullyQualifiedTableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkarestproxy.keyMappingTemplate=${primaryKeys}
gg.handler.kafkarestproxy.postDataUrl=http://localhost:8083
gg.handler.kafkarestproxy.apiVersion=v1
gg.handler.kafkarestproxy.format=json
gg.handler.kafkarestproxy.payloadsize=1
gg.handler.kafkarestproxy.mode=tx
#Server auth properties
#gg.handler.kafkarestproxy.trustStore=/keys/truststore.jks
#gg.handler.kafkarestproxy.trustStorePassword=test1234
#Client auth properites
#gg.handler.kafkarestproxy.keyStore=/keys/keystore.jks
#gg.handler.kafkarestproxy.keyStorePassword=test1234
#Proxy properties
#gg.handler.kafkarestproxy.proxy=http://proxyurl:80
#gg.handler.kafkarestproxy.proxyUserName=username
#gg.handler.kafkarestproxy.proxyPassword=password
#The MetaColumnTemplate formatter properties
gg.handler.kafkarestproxy.format.metaColumnsTemplate=${optype},${timestampmicro},${currenttimestampmicro}
親トピック: キーストアまたはトラストストアの生成
21.2.8 テンプレートを使用したトピック名とメッセージ・キーの解決
Kafka REST Proxyハンドラでは、テンプレートの構成値を使用して実行時にトピック名およびメッセージ・キーを解決する機能が提供されます。テンプレートを使用すると、静的な値およびキーワードを構成できます。キーワードは、そのときの処理コンテキストに応じてキーワードを動的に置き換えるために使用されます。テンプレートは、次の構成プロパティを使用します。
gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate
テンプレートのモード
Kafka REST Proxyハンドラは、操作(挿入、更新、削除)ごとに1つのメッセージを送信するように構成できます。または、トランザクション・レベルで操作をメッセージにグループ化するように構成できます。
テンプレートのキーワード
この表には、トランザクション・レベルのメッセージでそのキーワードがサポートされているかどうかを示す列が含まれています。
キーワード | 説明 | トランザクション・メッセージのサポート |
---|---|---|
|
カタログ、スキーマおよび表名の間にピリオド(.)のデリミタを含む、完全修飾表名に解決されます。 たとえば、 |
いいえ |
|
カタログ名に解決されます。 |
いいえ |
|
スキーマ名に解決されます。 |
いいえ |
|
短い表名に解決されます。 |
いいえ |
|
操作の種類( |
いいえ |
|
アンダースコア(_)で区切られた連結主キー値に解決されます。 |
いいえ |
|
ソース証跡ファイルのシーケンス番号の後にオフセット(RBA)が続きます。 |
はい |
|
ソース証跡ファイルからの操作タイムスタンプ。 |
はい |
|
""に解決されます。 |
はい |
|
Replicatプロセスの名前に解決されます。調整された配信を使用している場合は、レプリケートのスレッド番号が付加されたReplicatプロセスの名前に解決されます。 |
はい |
|
キーが完全修飾表名である静的な値に解決されます。キーおよび値は、大カッコの内部に次の形式で指定します。
|
いいえ |
|
キーが完全修飾表名であり、値が解決される列名である列の値に解決されます。たとえば:
|
いいえ |
または
|
現在のタイムスタンプに解決されます。 例:
|
はい |
|
NULL文字列に解決されます。 |
はい |
|
カスタム値リゾルバを記述することが可能です。必要に応じて、Oracleサポートに連絡してください。 |
実装依存 |
テンプレートの例
テンプレートの構成値の例と解決される値を次に示します。
テンプレートの例 | 解決される値 |
---|---|
|
|
|
|
|
|
21.2.9 Kafka REST Proxyハンドラのフォーマッタ・プロパティ
Kafka REST Proxyハンドラ・フォーマッタの構成可能な値は次のとおりです。
表21-2 Kafka REST Proxyハンドラ・フォーマッタ・プロパティ
プロパティ | オプション/オプション | 有効な値 | デフォルト | 説明 |
---|---|---|---|---|
gg.handler.name.format.includeOpType |
オプション |
|
|
出力でこのフィールドを省略する場合は、 |
gg.handler.name.format.includeOpTimestamp |
オプション |
|
|
出力でこのフィールドを省略する場合は、 |
gg.handler.name.format.includeCurrentTimestamp |
オプション |
|
|
出力でこのフィールドを省略する場合は、 |
gg.handler.name.format.includePosition |
オプション |
|
|
出力でこのフィールドを省略する場合は、 |
gg.handler.name.format.includePrimaryKeys |
オプション |
|
|
出力でこのフィールドを省略する場合は、 |
gg.handler.name.format.includeTokens |
オプション |
|
|
このフィールドを抑止する場合は、 |
gg.handler.name.format.insertOpKey |
オプション |
任意の文字列。 |
|
挿入操作を示すフィールド |
gg.handler.name.format.updateOpKey |
オプション |
任意の文字列。 |
|
更新操作を示すフィールド |
gg.handler.name.format.deleteOpKey |
オプション |
任意の文字列。 |
|
削除操作を示すフィールド |
gg.handler.name.format.truncateOpKey |
オプション |
任意の文字列。 |
|
切捨て操作を示すフィールド |
gg.handler.name.format.treatAllColumnsAsStrings |
オプション |
|
|
|
gg.handler.name.format.mapLargeNumbersAsStrings |
オプション |
|
|
|
gg.handler.name.format.iso8601Format |
オプション |
|
|
|
gg.handler.name.format.pkUpdateHandling |
オプション |
|
|
これは、 |
21.3 レコードの消費
Kafka REST Proxyハンドラを使用して、Kafkaトピックからデータを消費する簡単な方法はカールです。
JSONデータの消費
-
JSONデータのコンシューマを作成します。
curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" https://localhost:8082/consumers/my_json_consumer
-
トピックをサブスクライブします。
curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["topicname"]}' \ https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
-
レコードを消費します。
curl –k -X GET -H "Accept: application/vnd.kafka.json.v2+json" \ https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
Avroデータの消費
-
Avroデータのコンシューマを作成します。
curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" \ --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \ https://localhost:8082/consumers/my_avro_consumer
-
トピックをサブスクライブします。
curl –k -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["topicname"]}' \ https://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/subscription
-
レコードを消費します。
curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \ https://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/records
ノート:
RESTプロキシをホストしているマシンからcurl
を使用している場合は、メッセージを使用する前に、http_proxy
環境変数の設定を解除します。ローカル・マシンからcurl
を使用してKafka RESTプロキシからメッセージを取得する場合は、http_proxy
環境変数の設定が必要になります。
親トピック: Kafka REST Proxyハンドラの使用
21.4 パフォーマンスに関する考慮事項
Oracle GoldenGate for Big Dataの構成とKafkaプロデューサのパフォーマンスの両方に影響を与える複数の構成設定があります。
Oracle GoldenGateのパラメータでパフォーマンスに最も大きな影響を与えるのはReplicatのGROUPTRANSOPS
パラメータです。これにより、Replicatは複数のソース・トランザクションを1つのターゲット・トランザクションにグループ化できます。トランザクションのコミット時に、Kafka REST Proxyハンドラは、KafkaプロデューサにデータをPOST
します。
ReplicatのGROUPTRANSOPS
を大きい値に設定すると、ReplicatがPOST
を呼び出す頻度が少なくなり、パフォーマンスが向上します。GROUPTRANSOPS
のデフォルト値は1000で、値を2500、5000、さらには10000に増やすことでパフォーマンスが向上します。
親トピック: Kafka REST Proxyハンドラの使用
21.5 Kafka REST Proxyハンドラのメタ列テンプレート・プロパティ
Kafka REST Proxyサーバーを起動する際の問題
Kafka RESTプロキシ・サーバーを起動するスクリプトは、そのCLASSPATH
を環境CLASSPATH
変数に追加します。これが設定され、環境CLASSPATH
にKafka RESTプロキシ・サーバーの正常な実行と競合するJARファイルが含まれていた場合、起動できなくなります。Kafka RESTプロキシ・サーバーを起動する前に、CLASSPATH
環境変数の設定を解除することをお薦めします。CLASSPATH
を""
に再設定すると、この問題を解決できます。
親トピック: Kafka REST Proxyハンドラの使用