19 Kafkaハンドラの使用

Kafkaハンドラは、Oracle GoldenGate証跡からKafkaトピックに変更キャプチャ・データをストリーミングするように設計されています。

この章では、Kafkaハンドラの使用方法について説明します。

19.1 概要

Oracle GoldenGate for Big Data Kafka Handlerは、Oracle GoldenGate証跡からKafkaトピックに変更キャプチャ・データをストリーミングします。また、Kafkaハンドラには、メッセージを別のスキーマ・トピックに公開する機能もあります。AvroおよびJSONのスキーマの公開がサポートされています。

Apache Kafkaは、オープン・ソースで、パーティション化および複製された分散型のメッセージング・サービスです。http://kafka.apache.org/を参照してください。

Kafkaは、単一インスタンスとしても、複数サーバー上のクラスタとしても実行できます。Kafkaの各サーバー・インスタンスは、ブローカと呼ばれます。Kafkaにおけるトピックは、メッセージがプロデューサによって公開され、コンシューマによって取得されるときのカテゴリ、またはフィード名です。

Kafkaでは、トピック名が完全修飾のソース表名に対応する場合、KafkaハンドラがKafkaプロデューサを実装します。Kafkaプロデューサは、シリアライズされたチェンジ・データ・キャプチャを、複数のソース表から1つの構成されたトピックまたは分離したソース操作のいずれか、別のKafkaトピックに書き込みます。

19.2 詳細な機能

トランザクション・モードと操作モード

Kafkaハンドラは、Kafka ProducerRecordクラスのインスタンスをKafkaプロデューサAPIに送信し、次にそれがProducerRecordをKafkaトピックに公開します。Kafka ProducerRecordは、実質的に1つのKafkaメッセージの実装です。ProducerRecordには、キーと値の2つのコンポーネントがあります。キーと値のどちらも、Kafkaハンドラによってバイト配列として表されます。この項では、Kafkaハンドラがデータを公開する方法について説明します。

トランザクション・モード

次の構成は、Kafkaハンドラをトランザクション・モードに設定します。

gg.handler.name.Mode=tx

トランザクション・モードでは、ソースOracle GoldenGate証跡ファイルからのトランザクションにおける各操作のシリアライズ・データが連結されます。連結された操作データの内容は、KafkaのProducerRecordオブジェクトの値です。Kafka ProducerRecordオブジェクトのキーはNULLです。結果として、Kafkaメッセージには1からNまでの操作のデータが含まれます。Nは、トランザクションにおける操作の数です。

グループ化されたトランザクションの場合、すべての操作のすべてのデータが1つのKafkaメッセージに連結されます。そのため、グループ化されたトランザクションは、大量の操作のデータを含む非常に大きいKafkaメッセージになる可能性があります。

操作モード

次の構成は、Kafkaハンドラを操作モードに設定します。

gg.handler.name.Mode=op

操作モードでは、各操作のシリアライズ・データは個々のProducerRecordオブジェクトに値として配置されます。ProducerRecordキーは、ソース操作の完全修飾表名です。ProducerRecordは、KafkaプロデューサAPIを使用してすぐに送信されます。つまり、着信操作と、生成されるKafkaメッセージの数との間には1対1の関係があります。

ブロック・モードと非ブロック・モード

Kafkaハンドラは、ブロック・モード(同期)または非ブロック・モード(非同期)で、Kafkaにメッセージを送信できます。

Blocking Mode

次の構成プロパティは、Kafkaハンドラをブロック・モードに設定します。

gg.handler.name.BlockingSend=true

メッセージは同期ベースでKafkaに配信されます。Kafkaハンドラは、現在のメッセージが目的のトピックに書き込まれて確認が受信されるまで、次のメッセージを送信しません。ブロック・モードでは、メッセージ配信が最も確実に保証されますが、パフォーマンスは下がります。

ブロック・モードのときは、Kafkaプロデューサにlinger.ms変数を設定しないでください。設定すると、KafkaプロデューサはKafkaブローカにメッセージを送信する前にタイムアウト期間の間ずっと待機します。この場合、Kafkaブローカに送信されるメッセージをKafkaプロデューサがバッファすると同時に、メッセージが送信されたという確認を、Kafkaハンドラは待機しています。

Non-Blocking Mode

次の構成プロパティは、Kafkaハンドラを非ブロック・モードに設定します。

gg.handler.name.BlockingSend=false

メッセージは非同期的にKafkaに配信されます。Kafkaメッセージは、確認を待たずに順次送信されます。Kafkaプロデューサ・クライアントは、スループットを高めるために着信メッセージをバッファできます。

トランザクションがコミットされるたびに、すべての未処理メッセージがKafkaクラスタに確実に転送されるように、Kafkaプロデューサに対するフラッシュ・コールを呼び出します。そのため、Kafkaハンドラは安全なチェックポイントでデータの損失をゼロにします。Kafkaプロデューサに対するフラッシュ・コールの呼出しは、linger.msの持続期間による影響を受けません。そのため、Kafkaハンドラは安全なチェックポイントでデータの損失をゼロにします。

KafkaプロデューサがデータをKafkaブローカにフラッシュするタイミングは、Kafkaプロデューサ構成ファイルにある変更可能な多数の構成プロパティによって制御できます。Kafkaプロデューサによるメッセージの一括送信を有効にするには、batch.sizelinger.msの両方のKafkaプロデューサ・プロパティを設定する必要があります。Kafkaへの送信前にバッファする最大バイト数はbatch.sizeによって制御され、データ送信前に待機する最大時間(ミリ秒)はlinger.ms変数によって制御されます。batch.sizeに達するか、linger.msの時間が経過するか、どちらか早いほうの条件が満たされると、データはKafkaに送信されます。batch.size変数のみを設定すると、メッセージはただちにKafkaに送信されます。

トピック名の選択

トピックは、この構成パラメータを使用して実行時に解決されます。

gg.handler.topicMappingTemplate 

静的な文字列、キーワード、または静的な文字列とキーワードの組合せを構成し、実行時にそのときの操作コンテキストに基づいてトピック名を動的に解決できます(「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照)。

Kafkaブローカ設定

トピックを自動的に作成するように構成するには、auto.create.topics.enableプロパティをtrueに設定します。これがデフォルトの設定です。

auto.create.topics.enableプロパティがfalseに設定されている場合は、Replicatプロセスを開始する前にトピックを手動で作成する必要があります。

スキーマの伝播

すべての表のスキーマ・データが、schemaTopicNameプロパティで構成されたスキーマ・トピックに提供されます。詳細は、「スキーマの伝播」を参照してください。

19.3 Kafkaハンドラの設定および実行

ここでは、Kafkaハンドラのコンポーネントの構成およびハンドラの実行について説明します。

Kafkaをインストールし、単一ノードまたはクラスタ化されたインスタンスとして正しく構成する必要があります。http://kafka.apache.org/documentation.htmlを参照してください。

Apache Kafka以外のKafka配布を使用する場合、インストールおよび構成の詳細は、そのKafka配布に関するドキュメントを参照してください。

KafkaおよびKafkaブローカ(単数または複数)の前提条件となるコンポーネントであるZookeeperが稼働している必要があります。

データ・トピックとスキーマ・トピック(該当する場合)は、実行中のKafkaブローカで事前に構成されていることを、ベスト・プラクティスとしてお薦めします。Kafkaトピックを動的に作成できます。ただし、これはKafkaブローカが動的トピックを許可するように構成されている必要があります。

Kafkaブローカが、Kafkaハンドラ・プロセスと同じ場所に配置されていない場合は、Kafkaハンドラを実行しているマシンから、リモートのホスト・ポートにアクセスできる必要があります。

19.3.1 クラスパス構成

KafkaハンドラをKafkaに接続して実行するには、Kafkaプロデューサ・プロパティ・ファイルおよびKafkaクライアントJARが、gg.classpath構成変数で構成されている必要があります。KafkaクライアントJARは、Kafkaハンドラが接続するKafkaのバージョンと一致する必要があります。必要なクライアントJARファイルのバージョン別リストは、「Kafkaハンドラ・クライアント依存性」を参照してください。

Kafkaプロデューサのプロパティ・ファイルの格納場所として推奨されるのは、Oracle GoldenGateのdirprmディレクトリです。

KafkaクライアントJARのデフォルトの場所は、Kafka_Home/libs/*です。

gg.classpathは、正確に構成する必要があります。Kafkaプロデューサ・プロパティ・ファイルのパスには、ワイルドカードが付加されていないパスを使用してください。Kafkaプロデューサ・プロパティ・ファイルのパスにワイルドカード(*)を含めると、このファイルが選択されません。逆に、依存関係JARのパスには、そのディレクトリにあるJARファイルがすべて関連するクラスパスに含まれるように、ワイルドカード(*)を含める必要があります。*.jarは使用しないでください。正しく構成されたクラスパスの例を次に示します。

gg.classpath={kafka install dir}/libs/*

19.3.2 Kafkaハンドラ構成

Kafkaハンドラの設定可能な値は次のとおりです。これらのプロパティは、Javaアダプタ・プロパティ・ファイルにあります(Replicatプロパティ・ファイルにはありません)。

Kafkaハンドラの選択を有効にするには、まずgg.handler.namr.type=kafkaおよびその他のKafkaプロパティを次のように指定してハンドラ・タイプを構成する必要があります。

表19-1 Kafkaハンドラの構成プロパティ

プロパティ名 必須/オプション プロパティ値 デフォルト 説明

gg.handlerlist

必須

name (任意の名前を選択)

なし

使用するハンドラのリスト。

gg.handler.name.type

必須

kafka

なし

使用するハンドラのタイプ。

gg.handler.name.KafkaProducerConfigFile

オプション

任意のカスタム・ファイル名

kafka-producer-default.properties

クラスパスにあるファイル名は、Apache Kafkaプロデューサを構成するApache Kafkaプロパティを保持します。

gg.handler.name.Format

オプション

フォーマッタ・クラスまたはショート・コード

delimitedtext

ペイロードのフォーマットに使用するフォーマッタ。xmldelimitedtextjsonjson_rowavro_rowavro_opのいずれかです

gg.handler.name.SchemaTopicName

スキーマ配信が必要な場合に必須。

スキーマ・トピックの名前

なし

スキーマ・データが配信されるトピック名。このプロパティを設定する場合、スキーマは伝播されません。スキーマが伝播されるのは、Avroフォーマッタの場合のみです。

gg.handler.name.SchemaPrClassName

オプション

Oracle GoldenGate for Big Data Kafka HandlerのCreateProducerRecord Javaインタフェースを実装するカスタム・クラスの完全修飾クラス名

この実装クラスを提供: oracle.goldengate.handler.kafka

ProducerRecord

スキーマはProducerRecordとして伝播もされます。デフォルトのキーは、完全修飾の表名です。これをスキーマ・レコードから変更する必要がある場合は、CreateProducerRecordインタフェースのカスタム実装を作成する必要があり、このプロパティは新しいクラスの完全修飾名を示すように設定する必要があります。

gg.handler.name.BlockingSend

オプション

true | false

false

このプロパティをtrueに設定すると、Kafkaへの配信は完全に同期モデルで動作します。次のペイロードは、現在のペイロードが目的のトピックに書き込まれ、確認が受信されてから送信されます。そのための、トランザクション・モードでは厳密に1回のセマンティックが提供されます。このプロパティをfalseに設定すると、Kafkaへの配信は非同期モデルで動作します。ペイロードは、確認を待たずに順次送信されます。Kafkaの内部キューでコンテンツをバッファすると、スループットを増やすことができます。チェックポイントは、Javaコールバックを使用してKafkaブローカから確認が受信されたときにのみ作成されます。

gg.handler.name.mode

オプション

tx/op

tx

Kafkaハンドラが操作モードの場合、変更キャプチャ・データ・レコード(Insert、Update、Deleteなど)の各ペイロードはKafkaプロデューサ・レコードとして表され、一度に1つずつフラッシュされます。Kafkaハンドラがトランザクション・モードの場合、ソース・トランザクション内の操作はすべて、1つのKafkaプロデューサ・レコードとして表されます。組み合されたこのバイト・ペイロードが、トランザクション・コミット・イベント時にフラッシュされます。

gg.handler.name.topicMappingTemplate

必須

実行時にKafkaトピック名を解決するためのテンプレート文字列の値。

なし

詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。

gg.handler.name.keyMappingTemplate

必須

実行時にKafkaメッセージ・キーを解決するためのテンプレート文字列の値。

なし

詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。

gg.hander.name.logSuccessfullySentMessages

オプション

true | false

true

trueに設定すると、Kafkaハンドラは、Kafkaに正常に送信されたメッセージをINFOレベルで記録します。このプロパティを有効にすると、パフォーマンスに悪影響を与えます。

gg.handler.name.metaHeadersTemplate オプション メタ列キーワードのカンマ区切りリスト。 なし メタ列を選択し、メタ列キーワード構文を使用してコンテキスト・ベースのキー値ペアをKafkaメッセージ・ヘッダーに挿入できます。

19.3.3 Javaアダプタ・プロパティ・ファイル

アダプタ・プロパティ・ファイルからのKafkaハンドラのサンプル構成を次に示します。

gg.handlerlist = kafkahandler
gg.handler.kafkahandler.Type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile = custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=oggtopic
gg.handler.kafkahandler.keyMappingTemplate=${currentTimestamp}
gg.handler.kafkahandler.Format = avro_op
gg.handler.kafkahandler.SchemaTopicName = oggSchemaTopic
gg.handler.kafkahandler.SchemaPrClassName = com.company.kafkaProdRec.SchemaRecord
gg.handler.kafkahandler.Mode = tx
gg.handler.kafkahandler.BlockingSend = true

Kafka統合のサンプルのReplicat構成とJavaアダプタ・プロパティ・ファイルの例は、次のディレクトリにあります。

GoldenGate_install_directory/AdapterExamples/big-data/kafka

19.3.4 Kafkaプロデューサ構成ファイル

Kafkaハンドラは、メッセージをKafkaに公開するために、Kafkaプロデューサ構成ファイルにアクセスする必要があります。Kafkaプロデューサ構成ファイルのファイル名は、Kafkaハンドラ・プロパティの次の構成によって制御されます。

gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties

Kafkaハンドラは、Javaのクラスパスを使用してKafkaプロデューサ構成ファイルを検索およびロードしようとします。したがって、Javaのクラスパスに、Kafkaプロデューサ構成ファイルがあるディレクトリが含まれている必要があります。

Kafkaプロデューサ構成ファイルには、Kafka専有のプロパティがあります。0.8.2.0のKafkaプロデューサ・インタフェース・プロパティについては、Kafkaドキュメントに構成情報が記載されています。Kafkaハンドラは、これらのプロパティを使用してKafkaブローカのホストおよびポートを解決し、Kafkaプロデューサ・クライアントとKafkaブローカの間の対話の動作は、Kafkaプロデューサ構成ファイルにあるプロパティによって制御されます。

Kafkaプロデューサのサンプル構成ファイルは、次のようになります。

bootstrap.servers=localhost:9092
acks = 1
compression.type = gzip
reconnect.backoff.ms = 1000
 
value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size = 102400
linger.ms = 0
max.request.size = 1048576 
send.buffer.bytes = 131072

19.3.5 テンプレートを使用したトピック名とメッセージ・キーの解決

Kafkaハンドラでは、テンプレートの構成値を使用して実行時にトピック名およびメッセージ・キーを解決する機能が提供されます。テンプレートを使用すると、静的な値およびキーワードを構成できます。キーワードは、そのときの処理コンテキストに応じてキーワードを動的に置き換えるために使用されます。テンプレートは、次の構成プロパティを使用します。

gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate

テンプレートのモード

ソース・データベース・トランザクションは、それぞれが挿入、更新および削除である、1つ以上の個別の操作から構成されます。Kafkaハンドラは、操作ごとに1つのメッセージ(挿入、更新、削除)を送信するように構成することも、トランザクション・レベルで操作をメッセージにグループ化するように構成することもできます。テンプレート・キーワードの多くは、個々のソース・データベース操作のコンテキストに基づいてデータを解決します。したがって、トランザクション・レベルでメッセージを送信する場合には、キーワードの多くは機能しません。たとえば、トランザクション・レベルでメッセージを送信する場合は、${fullyQualifiedTableName}は機能せずに、操作のための修飾されたソース表名に解決されます。ただし、トランザクションには、多くのソース表に対する複数の操作を含めることができます。トランザクション・レベルでのメッセージの完全修飾表名の解決は、非決定的であるため、実行時に異常終了します。

テンプレートのキーワード

この表には、トランザクション・レベルのメッセージでそのキーワードがサポートされているかどうかを示す列が含まれています。

キーワード 説明 トランザクション・メッセージのサポート

${fullyQualifiedTableName}

カタログ、スキーマおよび表名の間にピリオド(.)のデリミタを含む、完全修飾表名に解決されます。

たとえば、test.dbo.table1です。

いいえ

${catalogName}

カタログ名に解決されます。

いいえ

${schemaName}

スキーマ名に解決されます。

いいえ

${tableName}

短い表名に解決されます。

いいえ

${opType}

操作の種類(INSERTUPDATEDELETEまたはTRUNCATE)に解決されます

いいえ

${primaryKeys}

アンダースコア(_)で区切られた連結主キー値に解決されます。

いいえ

${position}

ソース証跡ファイルのシーケンス番号の後にオフセット(RBA)が続きます。

はい

${opTimestamp}

ソース証跡ファイルからの操作タイムスタンプ。

はい

${emptyString}

""に解決されます。

はい

${groupName}

Replicatプロセスの名前に解決されます。調整された配信を使用している場合は、レプリケートのスレッド番号が付加されたReplicatプロセスの名前に解決されます。

はい

${staticMap[]}

キーが完全修飾表名である静的な値に解決されます。キーおよび値は、大カッコの内部に次の形式で指定します。

${staticMap[dbo.table1=value1,dbo.table2=value2]}

いいえ

${columnValue[]}

キーが完全修飾表名であり、値が解決される列名である列の値に解決されます。たとえば:

${staticMap[dbo.table1=col1,dbo.table2=col2]}

いいえ

${currentTimestamp}

または

${currentTimestamp[]}

現在のタイムスタンプに解決されます。SimpleDateFormatクラスで説明されているJavaベースの書式設定を使用して、現在のタイムスタンプの書式を制御できます。https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.htmlを参照してください

例:

${currentDate}
${currentDate[yyyy-mm-dd hh:MM:ss.SSS]}

はい

${null}

NULL文字列に解決されます。

はい

${custom[]}

カスタム値リゾルバを記述することが可能です。必要に応じて、Oracleサポートに連絡してください。

実装依存

${token[]} トークン値を解決します。 いいえ

テンプレートの例

テンプレートの構成値の例と解決される値を次に示します。

テンプレートの例 解決される値

${groupName}_{fullyQualfiedTableName}

KAFKA001_dbo.table1

prefix_${schemaName}_${tableName}_suffix

prefix_dbo_table1_suffix

${currentDate[yyyy-mm-dd hh:MM:ss.SSS]}

2017-05-17 11:45:34.254

19.3.6 KerberosでのKafkaの構成

次のステップに従って、Kerberosを使用してKafkaハンドラReplicatを構成し、ClouderaインスタンスがKafkaトピックに対するOracle GoldenGate for Big Dataの証跡を処理できるようにします。

  1. GGSCIで、Kafka Replicatを追加します。

    GGSCI> add replicat kafka, exttrail dirdat/gg
  2. 次のプロパティを使用してprmファイルを構成します。

    replicat kafka
    discardfile ./dirrpt/kafkax.dsc, purge
    SETENV (TZ=PST8PDT)
    GETTRUNCATES
    GETUPDATEBEFORES
    ReportCount Every 1000 Records, Rate
    MAP qasource.*, target qatarget.*;
  3. 次のようにReplicatプロパティ・ファイルを構成します。

    ###KAFKA Properties file ###
    gg.log=log4j
    gg.log.level=info
    gg.report.time=30sec
    
    ###Kafka Classpath settings ###
    gg.classpath=/opt/cloudera/parcels/KAFKA-2.1.0-1.2.1.0.p0.115/lib/kafka/libs/*
    jvm.bootoptions=-Xmx64m -Xms64m -Djava.class.path=./ggjava/ggjava.jar -Dlog4j.configuration=log4j.properties -Djava.security.auth.login.config=/scratch/ydama/ogg/v123211/dirprm/jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf
    
    javawriter.stats.full=TRUE
    javawriter.stats.display=TRUE
    
    ### native library config ###
    goldengate.userexit.nochkpt=TRUE
    goldengate.userexit.timestamp=utc
    
    ### Kafka handler properties ###
    gg.handlerlist = kafkahandler
    gg.handler.kafkahandler.type=kafka
    gg.handler.kafkahandler.KafkaProducerConfigFile=kafka-producer.properties
    gg.handler.kafkahandler.format=delimitedtext
    gg.handler.kafkahandler.format.PkUpdateHandling=update
    gg.handler.kafkahandler.mode=tx
    gg.handler.kafkahandler.format.includeCurrentTimestamp=false
    #gg.handler.kafkahandler.maxGroupSize=100
    #gg.handler.kafkahandler.minGroupSize=50
    gg.handler.kafkahandler.format.fieldDelimiter=|
    gg.handler.kafkahandler.format.lineDelimiter=CDATA[\n]
    gg.handler.kafkahandler.topicMappingTemplate=myoggtopic
    gg.handler.kafkahandler.keyMappingTemplate=${position}
  4. 次のプロパティを使用してKafka Producerファイルを構成します。

    bootstrap.servers=10.245.172.52:9092
    acks=1
    #compression.type=snappy
    reconnect.backoff.ms=1000
    value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    batch.size=1024
    linger.ms=2000
    
    security.protocol=SASL_PLAINTEXT
    
    sasl.kerberos.service.name=kafka
    sasl.mechanism=GSSAPI
  5. 次のプロパティを使用してjaas.confファイルを構成します。

    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/scratch/ydama/ogg/v123211/dirtmp/keytabs/slc06unm/kafka.keytab"
    principal="kafka/slc06unm.us.oracle.com@HADOOPTEST.ORACLE.COM";
    };
  6. 保護されたKafkaトピックにClouderaインスタンスから接続するための最新のkey.tabファイルがあることを確認します。

  7. GGSCIからReplicatを起動し、INFO ALLで実行されていることを確認します。

  8. Replicatレポートで、処理されたレコードの合計数を確認します。レポートは次のようになります。

    Oracle GoldenGate for Big Data, 12.3.2.1.1.005
    
    Copyright (c) 2007, 2018. Oracle and/or its affiliates. All rights reserved
    
    Built with Java 1.8.0_161 (class version: 52.0)
    
    2018-08-05 22:15:28 INFO OGG-01815 Virtual Memory Facilities for: COM
    anon alloc: mmap(MAP_ANON) anon free: munmap
    file alloc: mmap(MAP_SHARED) file free: munmap
    target directories:
    /scratch/ydama/ogg/v123211/dirtmp.
    
    Database Version:
    
    Database Language and Character Set:
    
    ***********************************************************************
    ** Run Time Messages **
    ***********************************************************************
    
    
    2018-08-05 22:15:28 INFO OGG-02243 Opened trail file /scratch/ydama/ogg/v123211/dirdat/kfkCustR/gg000000 at 2018-08-05 22:15:28.258810.
    
    2018-08-05 22:15:28 INFO OGG-03506 The source database character set, as determined from the trail file, is UTF-8.
    
    2018-08-05 22:15:28 INFO OGG-06506 Wildcard MAP resolved (entry qasource.*): MAP "QASOURCE"."BDCUSTMER1", target qatarget."BDCUSTMER1".
    
    2018-08-05 22:15:28 INFO OGG-02756 The definition for table QASOURCE.BDCUSTMER1 is obtained from the trail file.
    
    2018-08-05 22:15:28 INFO OGG-06511 Using following columns in default map by name: CUST_CODE, NAME, CITY, STATE.
    
    2018-08-05 22:15:28 INFO OGG-06510 Using the following key columns for target table qatarget.BDCUSTMER1: CUST_CODE.
    
    2018-08-05 22:15:29 INFO OGG-06506 Wildcard MAP resolved (entry qasource.*): MAP "QASOURCE"."BDCUSTORD1", target qatarget."BDCUSTORD1".
    
    2018-08-05 22:15:29 INFO OGG-02756 The definition for table QASOURCE.BDCUSTORD1 is obtained from the trail file.
    
    2018-08-05 22:15:29 INFO OGG-06511 Using following columns in default map by name: CUST_CODE, ORDER_DATE, PRODUCT_CODE, ORDER_ID, PRODUCT_PRICE, PRODUCT_AMOUNT, TRANSACTION_ID.
    
    2018-08-05 22:15:29 INFO OGG-06510 Using the following key columns for target table qatarget.BDCUSTORD1: CUST_CODE, ORDER_DATE, PRODUCT_CODE, ORDER_ID.
    
    2018-08-05 22:15:33 INFO OGG-01021 Command received from GGSCI: STATS.
    
    2018-08-05 22:16:03 INFO OGG-01971 The previous message, 'INFO OGG-01021', repeated 1 times.
    
    2018-08-05 22:43:27 INFO OGG-01021 Command received from GGSCI: STOP.
    
    ***********************************************************************
    * ** Run Time Statistics ** *
    ***********************************************************************
    
    Last record for the last committed transaction is the following:
    ___________________________________________________________________
    Trail name : /scratch/ydama/ogg/v123211/dirdat/kfkCustR/gg000000
    Hdr-Ind : E (x45) Partition : . (x0c)
    UndoFlag : . (x00) BeforeAfter: A (x41)
    RecLength : 0 (x0000) IO Time : 2015-08-14 12:02:20.022027
    IOType : 100 (x64) OrigNode : 255 (xff)
    TransInd : . (x03) FormatType : R (x52)
    SyskeyLen : 0 (x00) Incomplete : . (x00)
    AuditRBA : 78233 AuditPos : 23968384
    Continued : N (x00) RecCount : 1 (x01)
    
    2015-08-14 12:02:20.022027 GGSPurgedata Len 0 RBA 6473
    TDR Index: 2
    ___________________________________________________________________
    
    Reading /scratch/ydama/ogg/v123211/dirdat/kfkCustR/gg000000, current RBA 6556, 20 records, m_file_seqno = 0, m_file_rba = 6556
    
    Report at 2018-08-05 22:43:27 (activity since 2018-08-05 22:15:28)
    
    From Table QASOURCE.BDCUSTMER1 to qatarget.BDCUSTMER1:
    # inserts: 5
    # updates: 1
    # deletes: 0
    # discards: 0
    From Table QASOURCE.BDCUSTORD1 to qatarget.BDCUSTORD1:
    # inserts: 5
    # updates: 3
    # deletes: 5
    # truncates: 1
    # discards: 0
    
    
  9. セキュアKafkaトピックが作成されていることを確認します。

    /kafka/bin/kafka-topics.sh --zookeeper slc06unm:2181 --list  
    myoggtopic
  10. セキュアKafkaピックの内容を確認します。

    1. 次の内容のconsumer.propertiesファイルを作成します。

      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
    2. 次の環境変数を設定します。

      export KAFKA_OPTS="-Djava.security.auth.login.config="/scratch/ogg/v123211/dirprm/jaas.conf"
      
    3. コンシューマ・ユーティリティを実行してレコードをチェックします。

      /kafka/bin/kafka-console-consumer.sh --bootstrap-server sys06:9092 --topic myoggtopic --new-consumer --consumer.config consumer.properties

19.3.7 KafkaのSSLのサポート

Kafkaは、KafkaクライアントとKafkaクラスタの間のSSL接続をサポートしています。SSL接続により、クライアントとサーバーの間で転送されるメッセージの認証と暗号化の両方が可能になります。

SSLはサーバー認証用(クライアントがサーバーを認証します)に構成できますが、通常は相互認証用(クライアントとサーバーの両方が相互に認証します)に構成されます。SSL相互認証では、接続の各片側がキーストアから証明書を取得し、それを接続のもう一方の側に渡します。これにより、証明書がトラストストア内の証明書に対して検証されます。
SSLを設定する場合、実行している特定のKafkaバージョンの詳細は、Kafkaのドキュメントを参照してください。Kafkaのドキュメントには、次の実行方法に関する情報も記載されています。
  • SSL用のKafkaクラスタの設定
  • キーストア/トラストストア・ファイルでの自己署名証明書の作成
  • SSL用のKafkaクライアントの構成
Kafkaプロデューサおよびコンシューマ・コマンドライン・ユーティリティを使用してSSL接続を実装してから、Oracle GoldenGate for Big Dataで使用することをお薦めします。Oracle GoldenGate for Big DataをホストするマシンとKafkaクラスタの間のSSL接続を確認する必要があります。このアクションにより、Oracle GoldenGate for Big Dataを導入する前に、SSL接続が正しく設定され、機能していることを確認できます。
SSL相互認証を使用したKafkaプロデューサ構成の例を次に示します。
bootstrap.servers=localhost:9092
acks=1
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
security.protocol=ssl
ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=test1234

19.4 スキーマの伝播

Kafkaハンドラには、スキーマをスキーマ・トピックに公開する機能があります。現在、スキーマ公開が有効なのは、Avro行フォーマッタとAvro操作フォーマッタのみです。KafkaハンドラのschemaTopicNameプロパティが設定されている場合、スキーマは次のイベントで公開されます。

  • 特定の表のAvroスキーマは、その表に対する最初の操作が発生するときに公開されます。

  • Kafkaハンドラがメタデータ変更イベントを受信すると、スキーマはフラッシュされます。特定の表について再生成されるAvroスキーマは、その表に対する次の操作が発生するときに公開されます。

  • Avroのラッピング機能を有効にしている場合、汎用のラッパーAvroスキーマは、任意の操作が最初に発生するときに公開されます。汎用のラッパーを有効にする(Avroフォーマッタの構成でAvroスキーマの機能を有効にする)には、「Avro行フォーマッタ」および「Avro操作フォーマッタ」を参照してください。

KafkaのProducerRecord値はスキーマになり、キーは完全修飾の表名になります。

AvroメッセージはAvroスキーマに直接依存しているため、Kafka上のAvroのユーザーには問題が発生することがあります。Avroメッセージはバイナリなので、人間が読める形式ではありません。Avroメッセージをデシリアライズするには、まず受信者に正しいAvroスキーマが必要ですが、ソース・データベースからの各表が個々のAvroスキーマになるため、困難な場合があります。ソースOracle GoldenGate証跡ファイルに複数の表からの操作が含まれる場合、Kafkaメッセージの受信者が、個々のメッセージをデシリアライズするためにどのAvroスキーマを使用するかを判断することはできません。この問題を解決するには、特殊なAvroメッセージを汎用のAvroメッセージ・ラッパーでラップできます。この汎用のAvroラッパーを使用すれば、完全修飾の表名、スキーマ文字列のハッシュコード、ラップされたAvroメッセージを確認することができます。受信者は、完全修飾の表名と、スキーマ文字列のハッシュコードを使用して、ラップされたメッセージに関連付けられたスキーマを解決し、そのスキーマを使用して、ラップされたメッセージをデシリアライズできます。

19.5 パフォーマンスに関する考慮事項

gg.handler.name.BlockingSendtrueに設定されている場合、Kafkaプロデューサconfigファイルでlinger.ms設定を使用しないことをお薦めします。これにより、Kafkaハンドラ構成とKafkaプロデューサ構成が競合するため、ブロックへの送信のたびに、少なくともlinger.msの値が発生し、パフォーマンスに大きく影響します。このような構成の結果、Kafkaハンドラが送信確認の受信を待機する一方で、Kafkaプロデューサは送信前に次のメッセージを待機することになり、一時的なデッドロックが発生します。このデッドロックは、linger.msの時間が経過すると解決されます。この動作が、送信されるメッセージのたびに繰り返されます。

最高のパフォーマンスを得るためには、Kafkaプロデューサへの非ブロック(非同期)コールを使用して、操作モードで動作するようにKafkaハンドラを設定することをお薦めします。Javaアダプタ・プロパティ・ファイルで次の構成を使用します。

gg.handler.name.mode = op
gg.handler.name.BlockingSend = false

また、Kafkaプロデューサ・プロパティ・ファイルでbatch.sizeとlinger.msの値も設定することをお薦めします。これらの値は、ユースケース別の状況によって大きく異なります。通常、値が大きいほどスループットは向上しますが、レイテンシが大きくなります。これらのプロパティの値を小さくすると、レイテンシは小さくなりますが、全体的なスループットは低下します。

Replicat変数GROUPTRANSOPSを使用しても、パフォーマンスは向上します。推奨される設定は10000です。

ソース証跡ファイルからシリアライズした操作を個々のKafkaメッセージで配信する必要がある場合、Kafkaハンドラは操作モードに設定されている必要があります。

gg.handler.name.mode = op

19.6 セキュリティについて

Kafkaバージョン0.9.0.0で、SSL/TLSおよびSASL(Kerberos)を介したセキュリティが導入されました。SSL/TLSおよびSASLセキュリティ製品の一方または両方を使用してKafkaハンドラを保護できます。Kafkaプロデューサ・クライアント・ライブラリは、それらのライブラリを使用する統合からのセキュリティ機能の抽象化を提供します。Kafkaハンドラは、セキュリティ機能から効率的に抽象化されます。セキュリティを有効にするには、Kafkaクラスタのセキュリティを設定し、マシンを接続して、Kafkaプロデューサ・プロパティ・ファイルを必要なセキュリティ・プロパティを使用して構成する必要があります。Kafkaクラスタの保護に関する詳細は、次の場所にあるKafkaドキュメントを参照してください。

keytabファイルからKerberosパスワードを復号化できない場合があります。これによって、Kerberos認証が対話型モードにフォール・バックされ、プログラムで呼び出されているため機能しなくなります。この問題の原因は、Java Cryptography Extension (JCE)が、Java Runtime Environment (JRE)にインストールされていないことです。JCEがJREにロードされていることを確認します。http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.htmlを参照してください。

19.7 メタデータ変更イベント

メタデータ変更イベントが、Kafkaハンドラによって処理されるようになりました。これが関係するのは、スキーマ・トピックを構成しており、使用するフォーマッタがスキーマ伝播をサポートしている場合のみです(現在は、Avro行フォーマッタとAvro操作フォーマッタ)。スキーマが変更されている表で次に操作が発生するとき、更新されたスキーマがスキーマ・トピックに公開されます。

メタデータ変更イベントをサポートするには、ソース・データベースで変更を取得するOracle GoldenGateプロセスが、証跡機能でのOracle GoldenGateメタデータ(Oracle GoldenGate 12c (12.2)で導入)をサポートする必要があります。

19.8 Snappyに関する考慮事項

Kafkaプロデューサ構成ファイルは、圧縮の使用をサポートしています。構成可能なオプションの1つにSnappyがあり、これは、オープン・ソースの圧縮および圧縮解除(codec)ライブラリで、他のcodecライブラリよりパフォーマンスが優れています。Snappy JARはすべてのプラットフォームで実行されるわけではありません。Snappyは、Linuxシステムでは動作するようですが、他のUNIXおよびWindowsの実装では動作する場合としない場合があります。Snappyの圧縮を使用する場合は、Snappyを使用する圧縮を実装する前に、必要なシステムすべてでSnappyをテストします。必要なシステムの一部にSnappyが対応しない場合は、別のcodecライブラリを使用することをお薦めします。

19.9 Kafkaインターセプタのサポート

Kafkaプロデューサ・クライアント・フレームワークでは、プロデューサ・インターセプタの使用がサポートされています。プロデューサ・インターセプタは、単にKafkaプロデューサ・クライアントからのユーザー・イグジットで、これにより、インターセプタ・オブジェクトがインスタンス化され、Kafkaメッセージ送信コールとKafkaメッセージ送信確認コールの通知を受信します。

インターセプタの一般的な使用例は、監視です。Kafkaプロデューサ・インターセプタは、インタフェース org.apache.kafka.clients.producer.ProducerInterceptorに準拠している必要があります。Kafkaハンドラは、プロデューサ・インターセプタの使用をサポートしています。

ハンドラでインターセプタを使用する場合の要件は次のとおりです。

  • Kafkaプロデューサ構成プロパティ"interceptor.classes"は、起動するインターセプタのクラス名で構成する必要があります。
  • インターセプタを呼び出すためには、jarファイルとすべての依存関係jarをJVMで使用できる必要があります。したがって、インターセプタを含むjarファイルとすべての依存関係jarを、ハンドラ構成ファイルのgg.classpathに追加する必要があります。

    詳細は、Kafkaのドキュメントを参照してください。

19.10 Kafkaパーティションの選択

Kafkaトピックは1つ以上のパーティションで構成されます。Kafkaクライアントは異なるトピック/パーティションの組合せへのメッセージ送信をパラレル化するため、複数のパーティションへの分散はKafkaの収集パフォーマンスを向上させるのに適した方法です。パーティションの選択は、Kafkaクライアントでの次の計算によって制御されます。

(Kafkaメッセージ・キーのハッシュ)係数(パーティションの数) = 選択したパーティション番号

Kafkaメッセージ・キーは、次の構成値によって選択されます。

gg.handler.{your handler name}.keyMappingTemplate=

このパラメータが静的キーを生成する値に設定されている場合、すべてのメッセージは同じパーティションに送信されます。静的キーの例を次に示します。

gg.handler.{your handler name}.keyMappingTemplate=StaticValue

このパラメータが、頻繁に変更されないキーを生成する値に設定されている場合、パーティション選択の変更頻度は低くなります。次の例では、表名をメッセージ・キーとして使用します。特定のソース表に対するすべての操作は同じキーを持つため、同じパーティションにルーティングされます。

gg.handler.{your handler name}.keyMappingTemplate=${tableName}
nullのKafkaメッセージ・キーはラウンドロビン・ベースでパーティションに分散されます。これを行うには、次のように設定します。
gg.handler.{your handler name}.keyMappingTemplate=${null}

マッピング・キーの構成に推奨される設定は次のとおりです。

gg.handler.{your handler name}.keyMappingTemplate=${primaryKeys}

これにより、連結および区切られた主キー値であるKafkaメッセージ・キーが生成されます。

各行の操作には一意の主キーが必要であるため、各行に一意のKafkaメッセージ・キーが生成されます。別の重要な考慮事項として、異なるパーティションに送信されたKafkaメッセージは、送信された元の順序でKafkaコンシューマに配信される保証はありません。これはKafkaの仕様の一部です。順序はパーティション内でのみ維持されます。Kafkaメッセージ・キーとして主キーを使用すると、同じ主キーを持つ同じ行に対する操作で同じKafkaメッセージ・キーが生成されるため、同じKafkaパーティションに送信されます。このようにして、同じ行に対する操作の順序が維持されます。

DEBUGログ・レベルでは、Kafkaメッセージの座標(トピック、パーティションおよびオフセット)は、正常に送信されたメッセージの.logファイルに記録されます。

19.11 トラブルシューティング

19.11.1 Kafka設定の検証

コマンドラインのKafkaプロデューサを使用すると、ダミー・データをKafkaトピックに書き込むことができ、Kafkaコンシューマを使用してこのデータをKafkaトピックから読み取ることができます。この方法を使用して、ディスク上でのKafkaトピックの設定および読取り/書込み権限を確認します。http://kafka.apache.org/documentation.html#quickstartを参照してください。

19.11.2 クラスパスの問題

Javaクラスパスの問題はよくある問題です。そのような問題には、log4jログ・ファイルのClassNotFoundExceptionの問題や、gg.classpath変数の入力ミスによるクラスパスの解決エラーなどがあります。Kafkaクライアント・ライブラリは、Oracle GoldenGate for Big Data製品に付属しません「クラスパス構成」で説明されているように、JavaとKafkaクライアント・ライブラリを適切に解決するために、適切なバージョンのKafkaクライアント・ライブラリを取得し、Javaアダプタ・プロパティ・ファイルでgg.classpathプロパティを適切に構成する必要があります。

19.11.3 無効なKafkaバージョン

Kafkaハンドラでは、Kafka 0.8.2.2以前のバージョンはサポートされていません。サポートされていないバージョンのKafkaを実行すると、実行時Java例外java.lang.NoSuchMethodErrorが発生します。これは、org.apache.kafka.clients.producer.KafkaProducer.flush()メソッドが見つからないことを意味します。このエラーが発生した場合は、Kafka 0.9.0.0以降のバージョンに移行します。

19.11.4 Kafkaプロデューサ・プロパティ・ファイルが見つからない

この問題は一般的に、次の例外として発生します。

ERROR 2015-11-11 11:49:08,482 [main] Error loading the kafka producer properties

gg.handler.kafkahandler.KafkaProducerConfigFile構成変数をチェックし、Kafkaプロデューサ構成ファイルの名前が正しく設定されていることを確認します。gg.classpath変数をチェックし、クラスパスにKafkaプロデューサ・プロパティ・ファイルのパスが含まれていることと、プロパティ・ファイルのパスの末尾にワイルドカード(*)が含まれていないことを確認します。

19.11.5 Kafkaの接続の問題

この問題は、KafkaハンドラがKafkaに接続できない場合に発生します。次の警告が表示されます。

WARN 2015-11-11 11:25:50,784 [kafka-producer-network-thread | producer-1] WARN  (Selector.java:276) - Error in I/O with localhost/127.0.0.1 
java.net.ConnectException: Connection refused

接続の再試行間隔が経過し、Kafkaハンドラ・プロセスは異常終了します。Kafkaブローカが稼働しており、Kafkaプロデューサ・プロパティ・ファイルで指定されているホストとポートが正しいことを確認してください。Kafkaブローカをホストしているマシンでネットワーク・シェル・コマンド(netstat -lなど)を使用すると、想定するポートをKafkaがリスニングしていることを確認できます。