21 Kafka REST Proxyハンドラの使用

Kafka REST Proxyハンドラは、Confluentによって配布されたKafka REST Proxyにメッセージをストリーミングします。

この章では、Kafka REST Proxyハンドラの使用方法について説明します。

21.1 概要

Kafka REST Proxyハンドラを使用すると、KafkaメッセージをHTTPSプロトコルを使用してストリーミングできます。この機能のユースケースとしては、KafkaメッセージをOracle GoldenGate On Premisesインストールからクラウドに、またはクラウドからクラウドにストリーミングします。

Kafka REST Proxyは、KafkaクラスタへのRESTfulインタフェースを提供します。これにより、次のことを簡単に実行できます。

  • メッセージの生成および消費。

  • クラスタの状態の表示。

  • ネイティブKafkaプロトコルまたはクライアントを使用しない管理アクションの実行。

Kafka REST Proxyは、Confluent Open SourceおよびConfluent Enterprise配布の一部です。これは、Apache Kafka配布では使用できません。REST Proxy経由でKafkaにアクセスするには、Confluent Kafkaバージョンをインストールする必要があります。https://docs.confluent.io/current/kafka-rest/docs/index.htmlを参照してください。

21.2 Kafka REST Proxyハンドラ・サービスの設定および開始

ZIP、tarアーカイブ、Docker、パッケージなど、複数のインストール形式から選択できます。

21.2.1 Kafka REST Proxyハンドラの使用

Kafka REST Proxyは、Apache、ClouderaまたはHortonworksに含まれていないため、Confluent Open SourceまたはConfluent Enterprise配布をダウンロードしてインストールする必要があります。ZIP、TARアーカイブ、Docker、パッケージなど、複数のインストール形式から選択できます。

Kafka REST Proxyは、ZooKeeper、KafkaおよびSchema Registryに依存しています

21.2.2 依存性のダウンロード

次の場所から、Javaクライアント依存性のJersey RESTful Webサービスを確認およびダウンロードできます。

https://eclipse-ee4j.github.io/jersey/

Jersey Apache Connectorの依存性は、Mavenリポジトリhttps://mvnrepository.com/artifact/org.glassfish.jersey.connectors/jersey-apache-connectorから確認およびダウンロードできます。

21.2.3 クラスパス構成

Kafka RESTプロキシ・ハンドラは、Jerseyプロジェクトjersey-clientバージョン2.27およびjersey-connectors-apacheバージョン2.27を使用してKafkaに接続します。Oracle GoldenGate for Big Dataに必要な依存性が含まれていないため、それらを取得する必要があります。「依存性のダウンロード」を参照してください。

Javaアダプタ・プロパティ・ファイルでgg.classpathプロパティを使用して、これらの依存性を構成する必要があります。Kafka RESTプロキシ・ハンドラの正しく構成されたクラスパスの例を次に示します。

gg.classpath=dirprm:
{path_to_jersey_client_jars}/jaxrs-ri/lib/*:{path_to_jersey_client_jars}
/jaxrs-ri/api/*
:{path_to_jersey_client_jars}/jaxrs-ri/ext/*:{path_to_jersey_client_jars}
/connector/*

21.2.4 Kafka REST Proxyハンドラの構成

Kafka REST Proxyハンドラの構成可能な値は次のとおりです。Kafka REST Proxyプロパティ・ファイルは、Oracle GoldenGate dirprmディレクトリに保存することをお薦めします。

Kafka REST Proxyハンドラの選択を有効にするには、まずgg.handler.name.type=kafkarestproxyを指定してハンドラ・タイプを構成してから、次に示す他のKafka REST Proxyハンドラ・プロパティを構成する必要があります。

プロパティ 必須/オプション 有効な値 デフォルト 説明

gg.handler.name.type

必須

kafkarestproxy

なし

Kafka REST Proxyハンドラを選択するための構成。

gg.handler.name.topicMappingTemplate

必須

実行時にKafkaトピック名を解決するためのテンプレート文字列の値。

なし

詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。

gg.handler.name.keyMappingTemplate

必須

実行時にKafkaメッセージ・キーを解決するためのテンプレート文字列の値。

なし

詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。

gg.handler.name.postDataUrl

必須

Rest Proxyのリスナー・アドレス。

なし

Kafka REST ProxyのURLに設定します。

gg.handler.name.format

必須

avro | json

なし

RESTプロキシ・ペイロード・データ形式に設定します

gg.handler.name.payloadsize

オプション

ペイロード・サイズ(MB)を表す値。

5MB

HTTPメッセージのペイロードの最大サイズに設定します。

gg.handler.name.apiVersion

オプション

v1 | v2

v2

使用するAPIのバージョンを設定します。

gg.handler.name.mode

オプション

op | tx

op

操作の処理方法を設定します。opモードでは、操作は受信したとおりに処理されます。txモードでは、操作はトランザクションのコミット時にキャッシュされ、処理されます。

gg.handler.name.trustStore

オプション

トラストストアへのパス。

なし

信頼できる認証局(CA)の証明書を保持するトラストストア・ファイルへのパス。これらのCAは、SSL接続中にサーバーによって提示された証明書を検証するために使用されます。「キーストアまたはトラストストアの生成」を参照してください。

gg.handler.name.trustStorePassword

オプション

トラストストアのパスワード。

なし

トラストストア・パスワード。

gg.handler.name.keyStore

オプション

キーストアへのパス。

なし

アイデンティティの検証のために他のパーティ(サーバーまたはクライアント)に提示する秘密キーおよびアイデンティティ証明書が含まれるキーストア・ファイルへのパス。「キーストアまたはトラストストアの生成」を参照してください。

gg.handler.name.keyStorePassword

オプション

キーストアのパスワード。

なし

キーストア・パスワード。

gg.handler.name.proxy

オプション

http://host:port

なし

プロキシURLは次の形式です。http://host:port

gg.handler.name.proxyUserName

オプション

任意の文字列。

なし

プロキシ・ユーザー名

gg.handler.name.proxyPassword

オプション

任意の文字列。

なし

プロキシ・パスワード。

gg.handler.name.readTimeout

オプション

整数値。

なし

サーバーが応答できる時間。

gg.handler.name.connectionTimeout

オプション

整数値。

なし

ホストへの接続の確立を待機する時間。

詳細は、「テンプレートを使用したストリーム名とパーティション名の解決」を参照してください。

21.2.5 サンプル構成の確認

Javaアダプタ・プロパティ・ファイルからのKafka RESTプロキシ・ハンドラの構成例を次に示します。

gg.handlerlist=kafkarestproxy

#The handler properties
gg.handler.kafkarestproxy.type=kafkarestproxy
#The following selects the topic name based on the fully qualified table name
gg.handler.kafkarestproxy.topicMappingTemplate=${fullyQualifiedTableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkarestproxy.keyMappingTemplate=${primaryKeys}
gg.handler.kafkarestproxy.postDataUrl=http://localhost:8083
gg.handler.kafkarestproxy.apiVersion=v1
gg.handler.kafkarestproxy.format=json
gg.handler.kafkarestproxy.payloadsize=1
gg.handler.kafkarestproxy.mode=tx

#Server auth properties
#gg.handler.kafkarestproxy.trustStore=/keys/truststore.jks
#gg.handler.kafkarestproxy.trustStorePassword=test1234
#Client auth properites
#gg.handler.kafkarestproxy.keyStore=/keys/keystore.jks
#gg.handler.kafkarestproxy.keyStorePassword=test1234

#Proxy properties
#gg.handler.kafkarestproxy.proxy=http://proxyurl:80
#gg.handler.kafkarestproxy.proxyUserName=username
#gg.handler.kafkarestproxy.proxyPassword=password

#The MetaColumnTemplate formatter properties
gg.handler.kafkarestproxy.format.metaColumnsTemplate=${optype},${timestampmicro},${currenttimestampmicro}

21.2.6 セキュリティ

次の間でセキュリティを使用できます。

  • Kafka RESTプロキシ・クライアントとKafka RESTプロキシ・サーバー。Oracle GoldenGate RESTプロキシ・ハンドラはKafka RESTプロキシ・クライアントです。

  • Kafka RESTプロキシ・サーバーとKafkaブローカ。Kafka RESTプロキシ・サーバーのセキュリティ・ドキュメントおよび構成を十分に確認することをお薦めします。https://docs.confluent.io/current/kafka-rest/docs/index.htmlを参照してください。

REST Proxyは、クライアントとKafka REST Proxyハンドラの間の通信を保護するためにSSLをサポートしています。SSLを構成するには、次の手順を実行します。

  1. スクリプトを使用してキーストアを生成します。「キーストアまたはトラストストアの生成」を参照してください。

  2. 次のプロパティを使用して、kafka-rest.propertiesファイルのKafka REST Proxyサーバー構成を更新します。

    listeners=https://hostname:8083
    confluent.rest.auth.propagate.method=SSL
    
    Configuration Options for HTTPS
    ssl.client.auth=true
    ssl.keystore.location={keystore_file_path}/server.keystore.jks
    ssl.keystore.password=test1234
    ssl.key.password=test1234
    ssl.truststore.location={keystore_file_path}/server.truststore.jks
    ssl.truststore.password=test1234
    ssl.keystore.type=JKS
    ssl.truststore.type=JKS
    ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
  3. サーバーを再起動します。

相互認証を無効にするには、ssl.client.auth=プロパティをtrueからfalseに更新します。

21.2.7 キーストアまたはトラストストアの生成

トラストストアの生成

このスクリプトを実行して、ca-certca-keyおよびtruststore.jksトラストストア・ファイルを生成します。

#!/bin/bash
PASSWORD=password
CLIENT_PASSWORD=password
VALIDITY=365

次に、この例のようにCAを生成します。

openssl req -new -x509 -keyout ca-key -out ca-cert -days $VALIDITY -passin pass:$PASSWORD
      -passout pass:$PASSWORD -subj "/C=US/ST=CA/L=San Jose/O=Company/OU=Org/CN=FQDN"
      -nodes

最後に、keytoolを使用して、CAをサーバーのトラストストアに追加します。

keytool -keystore truststore.jks -alias CARoot -import -file ca-cert -storepass $PASSWORD
      -keypass $PASSWORD

キーストアの生成

このスクリプトを実行し、引数としてfqdnを渡して、ca-cert.srlcert-filecert-signed、およびkeystore.jksキーストア・ファイルを生成します。

#!/bin/bash
PASSWORD=password
VALIDITY=365

if [ $# -lt 1 ];
then
echo "`basename $0` host fqdn|user_name|app_name"
exit 1
fi

CNAME=$1
ALIAS=`echo $CNAME|cut -f1 -d"."`

次に、この例のようにkeytoolを使用してキーストアを生成します。

keytool -noprompt ¿keystore keystore.jks -alias $ALIAS -keyalg RSA -validity $VALIDITY
      -genkey -dname "CN=$CNAME,OU=BDP,O=Company,L=San Jose,S=CA,C=US" -storepass $PASSWORD
      -keypass $PASSWORD

次に、キーストア内のすべての証明書をCAで署名します。

keytool -keystore keystore.jks -alias $ALIAS -certreq -file cert-file -storepass
      $PASSWORDopenssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days $VALIDITY
      -CAcreateserial -passin pass:$PASSWORD

最後に、CAと署名付き証明書の両方をキーストアにインポートします。

keytool -keystore keystore.jks -alias CARoot -import -file ca-cert -storepass
      $PASSWORDkeytool -keystore keystore.jks -alias $ALIAS -import -file cert-signed -storepass
      $PASSWORD
21.2.7.1 メタ列出力の設定

メタ列出力を制御するKafka REST Proxyハンドラのメタ列テンプレート・プロパティの構成可能な値は次のとおりです。

表21-1 メタ列テンプレート・プロパティ

プロパティ 必須/オプション 有効な値 デフォルト 説明
gg.handler.name.format.metaColumnsTemplate

オプション

${alltokens} | ${token} | ${env} | ${sys} | ${javaprop} | ${optype} | ${position} | ${timestamp} | ${catalog} | ${schema} | ${table} | ${objectname} | ${csn} | ${xid} | ${currenttimestamp} | ${opseqno} | ${timestampmicro} | ${currenttimestampmicro} |

${txind}

| ${primarykeycolumns}|${currenttimestampiso8601}${static}${segno} | ${rba}

なし

現在のメタ列情報を簡単な方法で構成でき、使用する明示的な必要性が排除されます。

insertOpKey | updateOpKey | deleteOpKey | truncateOpKey | includeTableName | includeOpTimestamp | includeOpType | includePosition | includeCurrentTimestamp, useIso8601Format

これは、テンプレートを表す1つ以上のテンプレート化された値で構成されているカンマ区切りの文字列です。

次に、メタ列のリストを生成する例を示します。
${optype}, ${token.ROWID}, ${sys.username}, ${currenttimestamp}

メタ列のキーワードの説明

メタ列機能を使用すると、生成された出力メッセージに表示するメタデータ・フィールドを選択できます。メタ列構文の形式は次のとおりです。

${keyword[fieldName].argument}

キーワードは、メタ列構文に基づいて固定されます。オプションで、大カッコの間にフィールド名を指定できます。フィールド名を指定しない場合は、デフォルトのフィールド名が使用されます。

メタ列の値を解決するには、引数が必要です。

${alltokens}

すべてのOracle GoldenGateトークン。

${token}

特定のOracle GoldenGateトークンの値。トークン・キーは、ピリオド(.)演算子を使用してトークンの後に続ける必要があります。たとえば: ${token.MYTOKEN}

${token.MYTOKEN}

${sys}

システム環境変数。変数名は、ピリオド(.)演算子を使用してsysの後に続ける必要があります。

${sys.MYVAR}

${sys.MYVAR}

Oracle GoldenGate環境変数。変数名は、ピリオド(.)演算子を使用してenvの後に続ける必要があります。

${env}

Oracle GoldenGate環境変数。変数名は、ピリオド(.)演算子を使用してenvの後に続ける必要があります。たとえば: ${env.someVariable}

${javaprop}

Java JVM変数。変数名は、ピリオド(.)演算子を使用してjavapropの後に続ける必要があります。たとえば: ${javaprop.MYVAR}

${optype}

操作タイプ。

${position}

レコードの位置。

${timestamp}

レコードのタイムスタンプ。

${catalog}

カタログ名。

${schema}

スキーマ名。

${table}

表名。

${objectname}

完全修飾の表名。

${csn}

ソースのコミット順序番号

${xid}

ソース・トランザクションID。

${currenttimestamp}

現在のタイムスタンプ。

${currenttimestampiso8601}

ISO 8601形式の現在のタイムスタンプ。

${opseqno}

トランザクション内のレコード順序番号。

${timestampmicro}

レコードのタイムスタンプ(マイクロ秒/エポック後)。

${currenttimestampmicro}

現在のタイムスタンプ(マイクロ秒/エポック後)。

${txind}

ソース証跡ファイルのトランザクション・インジケータです。トランザクションの値は、最初の操作の場合はB、中間の操作の場合はM、最後の操作の場合はE、操作1つのみの場合はWです。フィルタリング操作または調整適用を使用すると、このフィールドの有用性が無効になります。

${primarykeycolumns}

主キー列名のリストを含むフィールドを挿入する場合に使用します。

${static}

静的値を含むフィールドを出力に挿入する場合に使用します。必要な値は引数です。必要な値がabcである場合の構文は、${static.abc}または${static[FieldName].abc}です。

${seqno}
証跡ファイル・シーケンス含むフィールドを出力に挿入する場合に使用します。
${rba}
操作のrbaを含むフィールドを出力に挿入する場合に使用します。

サンプル構成:

gg.handlerlist=kafkarestproxy 

#The handler properties
gg.handler.kafkarestproxy.type=kafkarestproxy
#The following selects the topic name based on the fully qualified table name
gg.handler.kafkarestproxy.topicMappingTemplate=${fullyQualifiedTableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkarestproxy.keyMappingTemplate=${primaryKeys}

gg.handler.kafkarestproxy.postDataUrl=http://localhost:8083
gg.handler.kafkarestproxy.apiVersion=v1
gg.handler.kafkarestproxy.format=json
gg.handler.kafkarestproxy.payloadsize=1
gg.handler.kafkarestproxy.mode=tx

#Server auth properties
#gg.handler.kafkarestproxy.trustStore=/keys/truststore.jks
#gg.handler.kafkarestproxy.trustStorePassword=test1234
#Client auth properites
#gg.handler.kafkarestproxy.keyStore=/keys/keystore.jks
#gg.handler.kafkarestproxy.keyStorePassword=test1234

#Proxy properties
#gg.handler.kafkarestproxy.proxy=http://proxyurl:80
#gg.handler.kafkarestproxy.proxyUserName=username
#gg.handler.kafkarestproxy.proxyPassword=password

#The MetaColumnTemplate formatter properties
gg.handler.kafkarestproxy.format.metaColumnsTemplate=${optype},${timestampmicro},${currenttimestampmicro}

21.2.8 テンプレートを使用したトピック名とメッセージ・キーの解決

Kafka REST Proxyハンドラでは、テンプレートの構成値を使用して実行時にトピック名およびメッセージ・キーを解決する機能が提供されます。テンプレートを使用すると、静的な値およびキーワードを構成できます。キーワードは、そのときの処理コンテキストに応じてキーワードを動的に置き換えるために使用されます。テンプレートは、次の構成プロパティを使用します。

gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate

テンプレートのモード

Kafka REST Proxyハンドラは、操作(挿入、更新、削除)ごとに1つのメッセージを送信するように構成できます。または、トランザクション・レベルで操作をメッセージにグループ化するように構成できます。

テンプレートのキーワード

この表には、トランザクション・レベルのメッセージでそのキーワードがサポートされているかどうかを示す列が含まれています。

キーワード 説明 トランザクション・メッセージのサポート

${fullyQualifiedTableName}

カタログ、スキーマおよび表名の間にピリオド(.)のデリミタを含む、完全修飾表名に解決されます。

たとえば、test.dbo.table1です。

いいえ

${catalogName}

カタログ名に解決されます。

いいえ

${schemaName}

スキーマ名に解決されます。

いいえ

${tableName}

短い表名に解決されます。

いいえ

${opType}

操作の種類(INSERTUPDATEDELETEまたはTRUNCATE)に解決されます

いいえ

${primaryKeys}

アンダースコア(_)で区切られた連結主キー値に解決されます。

いいえ

${position}

ソース証跡ファイルのシーケンス番号の後にオフセット(RBA)が続きます。

はい

${opTimestamp}

ソース証跡ファイルからの操作タイムスタンプ。

はい

${emptyString}

""に解決されます。

はい

${groupName}

Replicatプロセスの名前に解決されます。調整された配信を使用している場合は、レプリケートのスレッド番号が付加されたReplicatプロセスの名前に解決されます。

はい

${staticMap[]}

キーが完全修飾表名である静的な値に解決されます。キーおよび値は、大カッコの内部に次の形式で指定します。

${staticMap[dbo.table1=value1,dbo.table2=value2]}

いいえ

${columnValue[]}

キーが完全修飾表名であり、値が解決される列名である列の値に解決されます。たとえば:

${staticMap[dbo.table1=col1,dbo.table2=col2]}

いいえ

${currentTimestamp}

または

${currentTimestamp[]}

現在のタイムスタンプに解決されます。SimpleDateFormatクラスで説明されているJavaベースの書式設定を使用して、現在のタイムスタンプの書式を制御できます(https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.htmlを参照)。

例:

${currentDate}
${currentDate[yyyy-mm-dd hh:MM:ss.SSS]}

はい

${null}

NULL文字列に解決されます。

はい

${custom[]}

カスタム値リゾルバを記述することが可能です。必要に応じて、Oracleサポートに連絡してください。

実装依存

テンプレートの例

テンプレートの構成値の例と解決される値を次に示します。

テンプレートの例 解決される値

${groupName}_${fullyQualfiedTableName}

KAFKA001_dbo.table1

prefix_${schemaName}_${tableName}_suffix

prefix_dbo_table1_suffix

${currentDate[yyyy-mm-dd hh:MM:ss.SSS]}

2017-05-17 11:45:34.254

21.2.9 Kafka REST Proxyハンドラのフォーマッタ・プロパティ

Kafka REST Proxyハンドラ・フォーマッタの構成可能な値は次のとおりです。

表21-2 Kafka REST Proxyハンドラ・フォーマッタ・プロパティ

プロパティ オプション/オプション 有効な値 デフォルト 説明
gg.handler.name.format.includeOpType

オプション

true | false

true

trueに設定すると、出力メッセージにop_tsというフィールドが作成されます。この値は、ソース・データベースの操作タイプのインジケータ(たとえば、挿入の場合はI、更新の場合はU、削除の場合はD)になります。

出力でこのフィールドを省略する場合は、falseに設定します。

gg.handler.name.format.includeOpTimestamp

オプション

true | false

true

trueに設定すると、出力メッセージにop_typeというフィールドが作成されます。この値は、ソース証跡ファイルの操作タイムスタンプ(コミット・タイムスタンプ)になります。

出力でこのフィールドを省略する場合は、falseに設定します。

gg.handler.name.format.includeCurrentTimestamp

オプション

true | false

true

trueに設定すると、出力メッセージにcurrent_tsというフィールドが作成されます。この値は、ハンドラが操作を処理するときの現在のタイムスタンプになります。

出力でこのフィールドを省略する場合は、falseに設定します。

gg.handler.name.format.includePosition

オプション

true | false

true

trueに設定すると、出力メッセージにposというフィールドが作成されます。この値は、ソース証跡ファイルからの操作の位置(順序番号+オフセット)になります。

出力でこのフィールドを省略する場合は、falseに設定します。

gg.handler.name.format.includePrimaryKeys

オプション

true | false

true

trueに設定すると、出力メッセージにprimary_keysというフィールドが作成されます。この値は、主キー列の列名の配列になります。

出力でこのフィールドを省略する場合は、falseに設定します。

gg.handler.name.format.includeTokens

オプション

true | false

true

trueに設定すると、出力メッセージにマップ・フィールドが含まれます。キーはトークンで、その値は、キーと値がOracle GoldenGateソース証跡ファイルからのトークン・キーと値であるマップです。

このフィールドを抑止する場合は、falseに設定します。

gg.handler.name.format.insertOpKey

オプション

任意の文字列。

I

挿入操作を示すフィールドop_typeの値。

gg.handler.name.format.updateOpKey

オプション

任意の文字列。

U

更新操作を示すフィールドop_typeの値。

gg.handler.name.format.deleteOpKey

オプション

任意の文字列。

D

削除操作を示すフィールドop_typeの値。

gg.handler.name.format.truncateOpKey

オプション

任意の文字列。

T

切捨て操作を示すフィールドop_typeの値。

gg.handler.name.format.treatAllColumnsAsStrings

オプション

true | false

false

trueに設定すると、すべての出力フィールドが文字列として扱われます。

falseに設定すると、ハンドラによって、ソース証跡ファイルの対応するフィールド・タイプがそれに最も適したKafkaのデータ型にマップされます。

gg.handler.name.format.mapLargeNumbersAsStrings

オプション

true | false

false

trueに設定すると、精度を保つため、これらのフィールドが文字列としてマップされます。このプロパティは、Avroフォーマッタに固有であり、他のフォーマッタと組み合せて使用することはできません。

gg.handler.name.format.iso8601Format

オプション

true | false

false

trueに設定すると、現在の日付がISO8601形式で出力されます。

gg.handler.name.format.pkUpdateHandling

オプション

abend | update | delete-insert

abend

これは、.(gg.handler.name.format.messageFormatting=rowプロパティの行メッセージをモデリングしている場合にのみ適用されます。これは、更新時に操作前および操作後イメージがメッセージに伝播される操作メッセージをモデリングしている場合は適用されません。

21.3 レコードの消費

Kafka REST Proxyハンドラを使用して、Kafkaトピックからデータを消費する簡単な方法はカールです。

JSONデータの消費

  1. JSONデータのコンシューマを作成します。

    curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json"  
    
    https://localhost:8082/consumers/my_json_consumer
  2. トピックをサブスクライブします。

    curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json"    --data '{"topics":["topicname"]}' \
    
    https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
  3. レコードを消費します。

    curl –k -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
    
    https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
    

Avroデータの消費

  1. Avroデータのコンシューマを作成します。

    curl -k -X POST  -H "Content-Type: application/vnd.kafka.v2+json" \
     --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \
    
    https://localhost:8082/consumers/my_avro_consumer
  2. トピックをサブスクライブします。

    curl –k -X POST -H "Content-Type: application/vnd.kafka.v2+json"      --data '{"topics":["topicname"]}' \
    
    https://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/subscription
  3. レコードを消費します。

    curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
    
    https://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/records

ノート:

RESTプロキシをホストしているマシンからcurlを使用している場合は、メッセージを使用する前に、http_proxy環境変数の設定を解除します。ローカル・マシンからcurlを使用してKafka RESTプロキシからメッセージを取得する場合は、http_proxy環境変数の設定が必要になります。

21.4 パフォーマンスに関する考慮事項

Oracle GoldenGate for Big Dataの構成とKafkaプロデューサのパフォーマンスの両方に影響を与える複数の構成設定があります。

Oracle GoldenGateのパラメータでパフォーマンスに最も大きな影響を与えるのはReplicatのGROUPTRANSOPSパラメータです。これにより、Replicatは複数のソース・トランザクションを1つのターゲット・トランザクションにグループ化できます。トランザクションのコミット時に、Kafka REST Proxyハンドラは、KafkaプロデューサにデータをPOSTします。

ReplicatのGROUPTRANSOPSを大きい値に設定すると、ReplicatがPOSTを呼び出す頻度が少なくなり、パフォーマンスが向上します。GROUPTRANSOPSのデフォルト値は1000で、値を2500、5000、さらには10000に増やすことでパフォーマンスが向上します。

21.5 Kafka REST Proxyハンドラのメタ列テンプレート・プロパティ

Kafka REST Proxyサーバーを起動する際の問題

Kafka RESTプロキシ・サーバーを起動するスクリプトは、そのCLASSPATHを環境CLASSPATH変数に追加します。これが設定され、環境CLASSPATHにKafka RESTプロキシ・サーバーの正常な実行と競合するJARファイルが含まれていた場合、起動できなくなります。Kafka RESTプロキシ・サーバーを起動する前に、CLASSPATH環境変数の設定を解除することをお薦めします。CLASSPATH""に再設定すると、この問題を解決できます。