8.2.8 Apache Kafka

Kafkaハンドラは、Oracle GoldenGate証跡からKafkaトピックに変更キャプチャ・データをストリーミングするように設計されています。

この章では、Kafkaハンドラの使用方法について説明します。

8.2.8.1 Apache Kafka

Kafkaハンドラは、Oracle GoldenGate証跡からKafkaトピックに変更キャプチャ・データをストリーミングするように設計されています。

この章では、Kafkaハンドラの使用方法について説明します。

8.2.8.1.1 概要

Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)のKafkaハンドラでは、Oracle GoldenGate証跡からKafkaトピックに変更キャプチャ・データがストリーミングされます。また、Kafkaハンドラには、メッセージを別のスキーマ・トピックに公開する機能もあります。AvroおよびJSONのスキーマの公開がサポートされています。

Apache Kafkaは、オープン・ソースで、パーティション化および複製された分散型のメッセージング・サービスです。http://kafka.apache.org/を参照してください。

Kafkaは、単一インスタンスとしても、複数サーバー上のクラスタとしても実行できます。Kafkaの各サーバー・インスタンスは、ブローカと呼ばれます。Kafkaにおけるトピックは、メッセージがプロデューサによって公開され、コンシューマによって取得されるときのカテゴリ、またはフィード名です。

Kafkaでは、トピック名が完全修飾のソース表名に対応する場合、KafkaハンドラがKafkaプロデューサを実装します。Kafkaプロデューサは、シリアライズされたチェンジ・データ・キャプチャを、複数のソース表から1つの構成されたトピックまたは分離したソース操作のいずれか、別のKafkaトピックに書き込みます。

8.2.8.1.2 詳細な機能

トランザクション・モードと操作モード

Kafkaハンドラは、Kafka ProducerRecordクラスのインスタンスをKafkaプロデューサAPIに送信し、次にそれがProducerRecordをKafkaトピックに公開します。Kafka ProducerRecordは、実質的に1つのKafkaメッセージの実装です。ProducerRecordには、キーと値の2つのコンポーネントがあります。キーと値のどちらも、Kafkaハンドラによってバイト配列として表されます。この項では、Kafkaハンドラがデータを公開する方法について説明します。

トランザクション・モード

次の構成は、Kafkaハンドラをトランザクション・モードに設定します。

gg.handler.name.Mode=tx

トランザクション・モードでは、ソースOracle GoldenGate証跡ファイルからのトランザクションにおける各操作のシリアライズ・データが連結されます。連結された操作データの内容は、KafkaのProducerRecordオブジェクトの値です。Kafka ProducerRecordオブジェクトのキーはNULLです。結果として、Kafkaメッセージには1からNまでの操作のデータが含まれます。Nは、トランザクションにおける操作の数です。

グループ化されたトランザクションの場合、すべての操作のすべてのデータが1つのKafkaメッセージに連結されます。そのため、グループ化されたトランザクションは、大量の操作のデータを含む非常に大きいKafkaメッセージになる可能性があります。

操作モード

次の構成は、Kafkaハンドラを操作モードに設定します。

gg.handler.name.Mode=op

操作モードでは、各操作のシリアライズ・データは個々のProducerRecordオブジェクトに値として配置されます。ProducerRecordキーは、ソース操作の完全修飾表名です。ProducerRecordは、KafkaプロデューサAPIを使用してすぐに送信されます。つまり、着信操作と、生成されるKafkaメッセージの数との間には1対1の関係があります。

トピック名の選択

トピックは、この構成パラメータを使用して実行時に解決されます。

gg.handler.topicMappingTemplate 

静的な文字列、キーワード、または静的な文字列とキーワードの組合せを構成し、実行時にそのときの操作コンテキストに基づいてトピック名を動的に解決できます(「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照)。

Kafkaブローカ設定

トピックを自動的に作成するように構成するには、auto.create.topics.enableプロパティをtrueに設定します。これがデフォルトの設定です。

auto.create.topics.enableプロパティがfalseに設定されている場合は、Replicatプロセスを開始する前にトピックを手動で作成する必要があります。

スキーマの伝播

すべての表のスキーマ・データが、schemaTopicNameプロパティで構成されたスキーマ・トピックに提供されます。詳細は、「スキーマの伝播」を参照してください。

8.2.8.1.3 Kafkaハンドラの設定および実行

ここでは、Kafkaハンドラのコンポーネントの構成およびハンドラの実行について説明します。

Kafkaをインストールし、単一ノードまたはクラスタ化されたインスタンスとして正しく構成する必要があります。http://kafka.apache.org/documentation.htmlを参照してください。

Apache Kafka以外のKafka配布を使用する場合、インストールおよび構成の詳細は、そのKafka配布に関するドキュメントを参照してください。

KafkaおよびKafkaブローカ(単数または複数)の前提条件となるコンポーネントであるZookeeperが稼働している必要があります。

データ・トピックとスキーマ・トピック(該当する場合)は、実行中のKafkaブローカで事前に構成されていることを、ベスト・プラクティスとしてお薦めします。Kafkaトピックを動的に作成できます。ただし、これはKafkaブローカが動的トピックを許可するように構成されている必要があります。

Kafkaブローカが、Kafkaハンドラ・プロセスと同じ場所に配置されていない場合は、Kafkaハンドラを実行しているマシンから、リモートのホスト・ポートにアクセスできる必要があります。

8.2.8.1.3.1 クラスパス構成

KafkaハンドラをKafkaに接続して実行するには、Kafkaプロデューサ・プロパティ・ファイルおよびKafkaクライアントJARが、gg.classpath構成変数で構成されている必要があります。KafkaクライアントJARは、Kafkaハンドラが接続するKafkaのバージョンと一致する必要があります。必要なクライアントJARファイルのバージョン別リストは、「Kafkaハンドラ・クライアント依存性」を参照してください。

Kafkaプロデューサのプロパティ・ファイルの格納場所として推奨されるのは、Oracle GoldenGateのdirprmディレクトリです。

KafkaクライアントJARのデフォルトの場所は、Kafka_Home/libs/*です。

gg.classpathは、正確に構成する必要があります。Kafkaプロデューサ・プロパティ・ファイルのパスには、ワイルドカードが付加されていないパスを使用してください。Kafkaプロデューサ・プロパティ・ファイルのパスにワイルドカード(*)を含めると、このファイルが選択されません。逆に、依存関係JARのパスには、そのディレクトリにあるJARファイルがすべて関連するクラスパスに含まれるように、ワイルドカード(*)を含める必要があります。*.jarは使用しないでください。正しく構成されたクラスパスの例を次に示します。

gg.classpath={kafka install dir}/libs/*

8.2.8.1.3.2 Kafkaハンドラ構成

Kafkaハンドラの設定可能な値は次のとおりです。これらのプロパティは、Javaアダプタ・プロパティ・ファイルにあります(Replicatプロパティ・ファイルにはありません)

Kafkaハンドラの選択を有効にするには、まずgg.handler.namr.type=kafkaおよびその他のKafkaプロパティを次のように指定してハンドラ・タイプを構成する必要があります。

表8-9 Kafkaハンドラの構成プロパティ

プロパティ名 必須/オプション プロパティ値 デフォルト 説明

gg.handlerlist

必須

name (任意の名前を選択)

なし

使用するハンドラのリスト。

gg.handler.name.type

必須

kafka

なし

使用するハンドラのタイプ。

gg.handler.name.KafkaProducerConfigFile

オプション

任意のカスタム・ファイル名

kafka-producer-default.properties

クラスパスにあるファイル名は、Apache Kafkaプロデューサを構成するApache Kafkaプロパティを保持します。

gg.handler.name.Format

オプション

フォーマッタ・クラスまたはショート・コード

delimitedtext

ペイロードのフォーマットに使用するフォーマッタ。xmldelimitedtextjsonjson_rowavro_rowavro_opのいずれかです

gg.handler.name.SchemaTopicName

スキーマ配信が必要な場合に必須。

スキーマ・トピックの名前

なし

スキーマ・データが配信されるトピック名。このプロパティを設定する場合、スキーマは伝播されません。スキーマが伝播されるのは、Avroフォーマッタの場合のみです。

gg.handler.name.SchemaPrClassName

オプション

Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) KafkaハンドラのCreateProducerRecord Javaインタフェースを実装するカスタム・クラスの完全修飾クラス名。

この実装クラスを提供: oracle.goldengate.handler.kafka

ProducerRecord

スキーマはProducerRecordとして伝播もされます。デフォルトのキーは、完全修飾の表名です。これをスキーマ・レコードから変更する必要がある場合は、CreateProducerRecordインタフェースのカスタム実装を作成する必要があり、このプロパティは新しいクラスの完全修飾名を示すように設定する必要があります。

gg.handler.name.mode

オプション

tx/op

tx

Kafkaハンドラが操作モードの場合、変更キャプチャ・データ・レコード(Insert、Update、Deleteなど)の各ペイロードはKafkaプロデューサ・レコードとして表され、一度に1つずつフラッシュされます。Kafkaハンドラがトランザクション・モードの場合、ソース・トランザクション内の操作はすべて、1つのKafkaプロデューサ・レコードとして表されます。組み合されたこのバイト・ペイロードが、トランザクション・コミット・イベント時にフラッシュされます。

gg.handler.name.topicMappingTemplate

必須

実行時にKafkaトピック名を解決するためのテンプレート文字列の値。

なし

詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。

gg.handler.name.keyMappingTemplate

必須

実行時にKafkaメッセージ・キーを解決するためのテンプレート文字列の値。

なし

詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。

gg.hander.name.logSuccessfullySentMessages

オプション

true | false

true

trueに設定すると、Kafkaハンドラは、Kafkaに正常に送信されたメッセージをINFOレベルで記録します。このプロパティを有効にすると、パフォーマンスに悪影響を与えます。

gg.handler.name.metaHeadersTemplate オプション メタ列キーワードのカンマ区切りリスト。 なし メタ列を選択し、メタ列キーワード構文を使用してコンテキスト・ベースのキー値ペアをKafkaメッセージ・ヘッダーに挿入できます。
8.2.8.1.3.3 Javaアダプタ・プロパティ・ファイル

アダプタ・プロパティ・ファイルからのKafkaハンドラのサンプル構成を次に示します。

gg.handlerlist = kafkahandler
gg.handler.kafkahandler.Type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile = custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=oggtopic
gg.handler.kafkahandler.keyMappingTemplate=${currentTimestamp}
gg.handler.kafkahandler.Format = avro_op
gg.handler.kafkahandler.SchemaTopicName = oggSchemaTopic
gg.handler.kafkahandler.SchemaPrClassName = com.company.kafkaProdRec.SchemaRecord
gg.handler.kafkahandler.Mode = tx

Kafka統合のサンプルのReplicat構成とJavaアダプタ・プロパティ・ファイルの例は、次のディレクトリにあります。

GoldenGate_install_directory/AdapterExamples/big-data/kafka

8.2.8.1.3.4 Kafkaプロデューサ構成ファイル

Kafkaハンドラは、メッセージをKafkaに公開するために、Kafkaプロデューサ構成ファイルにアクセスする必要があります。Kafkaプロデューサ構成ファイルのファイル名は、Kafkaハンドラ・プロパティの次の構成によって制御されます。

gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties

Kafkaハンドラは、Javaのクラスパスを使用してKafkaプロデューサ構成ファイルを検索およびロードしようとします。したがって、Javaのクラスパスに、Kafkaプロデューサ構成ファイルがあるディレクトリが含まれている必要があります。

Kafkaプロデューサ構成ファイルには、Kafka専有のプロパティがあります。0.8.2.0のKafkaプロデューサ・インタフェース・プロパティについては、Kafkaドキュメントに構成情報が記載されています。Kafkaハンドラは、これらのプロパティを使用してKafkaブローカのホストおよびポートを解決し、Kafkaプロデューサ・クライアントとKafkaブローカの間の対話の動作は、Kafkaプロデューサ構成ファイルにあるプロパティによって制御されます。

Kafkaプロデューサのサンプル構成ファイルは、次のようになります。

bootstrap.servers=localhost:9092
acks = 1
compression.type = gzip
reconnect.backoff.ms = 1000
 
value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size = 102400
linger.ms = 0
max.request.size = 1048576 
send.buffer.bytes = 131072
8.2.8.1.3.4.1 Kafkaプロデューサ・プロパティの暗号化
Kafkaプロデューサ構成ファイル内の機密プロパティは、Oracle GoldenGate資格証明ストアを使用して暗号化できます。

資格証明ストアの使用方法の詳細は、「Oracle GoldenGate資格証明ストアでの識別子の使用」を参照してください。

たとえば、次のkafkaプロパティがあるとします:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule  required
username="alice" password="alice"; 
次のように置き換えることができます:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule  required
username=ORACLEWALLETUSERNAME[alias domain_name]  password=ORACLEWALLETPASSWORD[alias
domain_name];
8.2.8.1.3.5 テンプレートを使用したトピック名とメッセージ・キーの解決

Kafkaハンドラでは、テンプレートの構成値を使用して実行時にトピック名およびメッセージ・キーを解決する機能が提供されます。テンプレートを使用すると、静的な値およびキーワードを構成できます。キーワードは、実行時にコンテンツを動的に解決し、その解決された値を解決された文字列に挿入するために使用します。

テンプレートは、次の構成プロパティを使用します。

gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate

テンプレートのモード

ソース・データベース・トランザクションは、それぞれが挿入、更新および削除である、1つ以上の個別の操作から構成されます。Kafkaハンドラは、操作ごとに1つのメッセージ(挿入、更新、削除)を送信するように構成することも、トランザクション・レベルで操作をメッセージにグループ化するように構成することもできます。テンプレート・キーワードの多くは、個々のソース・データベース操作のコンテキストに基づいてデータを解決します。したがって、トランザクション・レベルでメッセージを送信する場合には、キーワードの多くは機能しません。たとえば、トランザクション・レベルでメッセージを送信する場合は、${fullyQualifiedTableName}は機能せずに、操作のための修飾されたソース表名に解決されます。ただし、トランザクションには、多くのソース表に対する複数の操作を含めることができます。トランザクション・レベルでのメッセージの完全修飾表名の解決は、非決定的であるため、実行時に異常終了します。

テンプレート・キーワードの詳細は、テンプレート・キーワードを参照してください。
テンプレートの例」を参照してください。
8.2.8.1.3.6 HadoopプラットフォームでのKerberosを使用したKafkaの構成

次の手順で、Kerberosを使用してKafkaハンドラのReplicatを構成して、ClouderaインスタンスでKafkaトピックに対するOracle GoldenGate for Distributed Applications and Analytics (GG for DAA)証跡を処理できるようにします。

  1. GGSCIで、Kafka Replicatを追加します。
    GGSCI> add replicat kafka, exttrail dirdat/gg
  2. 次のプロパティを使用してprmファイルを構成します。
    replicat kafka
    discardfile ./dirrpt/kafkax.dsc, purge
    SETENV (TZ=PST8PDT)
    GETTRUNCATES
    GETUPDATEBEFORES
    ReportCount Every 1000 Records, Rate
    MAP qasource.*, target qatarget.*;
  3. 次のようにReplicatプロパティ・ファイルを構成します。
    ###KAFKA Properties file ###
    gg.log=log4j
    gg.log.level=info
    gg.report.time=30sec
    
    ###Kafka Classpath settings ###
    gg.classpath=/opt/cloudera/parcels/KAFKA-2.1.0-1.2.1.0.p0.115/lib/kafka/libs/*
    jvm.bootoptions=-Xmx64m -Xms64m -Djava.class.path=./ggjava/ggjava.jar -Dlog4j.configuration=log4j.properties -Djava.security.auth.login.config=/scratch/ydama/ogg/v123211/dirprm/jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf
    
    ### Kafka handler properties ###
    gg.handlerlist = kafkahandler
    gg.handler.kafkahandler.type=kafka
    gg.handler.kafkahandler.KafkaProducerConfigFile=kafka-producer.properties
    gg.handler.kafkahandler.format=delimitedtext
    gg.handler.kafkahandler.format.PkUpdateHandling=update
    gg.handler.kafkahandler.mode=op
    gg.handler.kafkahandler.format.includeCurrentTimestamp=false
    gg.handler.kafkahandler.format.fieldDelimiter=|
    gg.handler.kafkahandler.format.lineDelimiter=CDATA[\n]
    gg.handler.kafkahandler.topicMappingTemplate=myoggtopic
    gg.handler.kafkahandler.keyMappingTemplate=${position}
  4. 次のプロパティを使用してKafka Producerファイルを構成します。
    bootstrap.servers=10.245.172.52:9092
    acks=1
    #compression.type=snappy
    reconnect.backoff.ms=1000
    value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    batch.size=1024
    linger.ms=2000
    
    security.protocol=SASL_PLAINTEXT
    
    sasl.kerberos.service.name=kafka
    sasl.mechanism=GSSAPI
  5. 次のプロパティを使用してjaas.confファイルを構成します。

    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/scratch/ydama/ogg/v123211/dirtmp/keytabs/slc06unm/kafka.keytab"
    principal="kafka/slc06unm.us.oracle.com@HADOOPTEST.ORACLE.COM";
    };
  6. 保護されたKafkaトピックにClouderaインスタンスから接続するための最新のkey.tabファイルがあることを確認します。

  7. GGSCIからReplicatを起動し、INFO ALLで実行されていることを確認します。

  8. Replicatレポートで、処理されたレコードの合計数を確認します。レポートは次のようになります。

    Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)
    
    Copyright (c) 2007, 2018. Oracle and/or its affiliates. All rights reserved
    
    Built with Java 1.8.0_161 (class version: 52.0)
    
    2018-08-05 22:15:28 INFO OGG-01815 Virtual Memory Facilities for: COM
    anon alloc: mmap(MAP_ANON) anon free: munmap
    file alloc: mmap(MAP_SHARED) file free: munmap
    target directories:
    /scratch/ydama/ogg/v123211/dirtmp.
    
    Database Version:
    
    Database Language and Character Set:
    
    ***********************************************************************
    ** Run Time Messages **
    ***********************************************************************
    
    
    2018-08-05 22:15:28 INFO OGG-02243 Opened trail file /scratch/ydama/ogg/v123211/dirdat/kfkCustR/gg000000 at 2018-08-05 22:15:28.258810.
    
    2018-08-05 22:15:28 INFO OGG-03506 The source database character set, as determined from the trail file, is UTF-8.
    
    2018-08-05 22:15:28 INFO OGG-06506 Wildcard MAP resolved (entry qasource.*): MAP "QASOURCE"."BDCUSTMER1", target qatarget."BDCUSTMER1".
    
    2018-08-05 22:15:28 INFO OGG-02756 The definition for table QASOURCE.BDCUSTMER1 is obtained from the trail file.
    
    2018-08-05 22:15:28 INFO OGG-06511 Using following columns in default map by name: CUST_CODE, NAME, CITY, STATE.
    
    2018-08-05 22:15:28 INFO OGG-06510 Using the following key columns for target table qatarget.BDCUSTMER1: CUST_CODE.
    
    2018-08-05 22:15:29 INFO OGG-06506 Wildcard MAP resolved (entry qasource.*): MAP "QASOURCE"."BDCUSTORD1", target qatarget."BDCUSTORD1".
    
    2018-08-05 22:15:29 INFO OGG-02756 The definition for table QASOURCE.BDCUSTORD1 is obtained from the trail file.
    
    2018-08-05 22:15:29 INFO OGG-06511 Using following columns in default map by name: CUST_CODE, ORDER_DATE, PRODUCT_CODE, ORDER_ID, PRODUCT_PRICE, PRODUCT_AMOUNT, TRANSACTION_ID.
    
    2018-08-05 22:15:29 INFO OGG-06510 Using the following key columns for target table qatarget.BDCUSTORD1: CUST_CODE, ORDER_DATE, PRODUCT_CODE, ORDER_ID.
    
    2018-08-05 22:15:33 INFO OGG-01021 Command received from GGSCI: STATS.
    
    2018-08-05 22:16:03 INFO OGG-01971 The previous message, 'INFO OGG-01021', repeated 1 times.
    
    2018-08-05 22:43:27 INFO OGG-01021 Command received from GGSCI: STOP.
    
    ***********************************************************************
    * ** Run Time Statistics ** *
    ***********************************************************************
    
    Last record for the last committed transaction is the following:
    ___________________________________________________________________
    Trail name : /scratch/ydama/ogg/v123211/dirdat/kfkCustR/gg000000
    Hdr-Ind : E (x45) Partition : . (x0c)
    UndoFlag : . (x00) BeforeAfter: A (x41)
    RecLength : 0 (x0000) IO Time : 2015-08-14 12:02:20.022027
    IOType : 100 (x64) OrigNode : 255 (xff)
    TransInd : . (x03) FormatType : R (x52)
    SyskeyLen : 0 (x00) Incomplete : . (x00)
    AuditRBA : 78233 AuditPos : 23968384
    Continued : N (x00) RecCount : 1 (x01)
    
    2015-08-14 12:02:20.022027 GGSPurgedata Len 0 RBA 6473
    TDR Index: 2
    ___________________________________________________________________
    
    Reading /scratch/ydama/ogg/v123211/dirdat/kfkCustR/gg000000, current RBA 6556, 20 records, m_file_seqno = 0, m_file_rba = 6556
    
    Report at 2018-08-05 22:43:27 (activity since 2018-08-05 22:15:28)
    
    From Table QASOURCE.BDCUSTMER1 to qatarget.BDCUSTMER1:
    # inserts: 5
    # updates: 1
    # deletes: 0
    # discards: 0
    From Table QASOURCE.BDCUSTORD1 to qatarget.BDCUSTORD1:
    # inserts: 5
    # updates: 3
    # deletes: 5
    # truncates: 1
    # discards: 0
    
    
  9. セキュアKafkaトピックが作成されていることを確認します。

    /kafka/bin/kafka-topics.sh --zookeeper slc06unm:2181 --list  
    myoggtopic
  10. セキュアKafkaピックの内容を確認します。

    1. 次の内容のconsumer.propertiesファイルを作成します。

      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
    2. 次の環境変数を設定します。

      export KAFKA_OPTS="-Djava.security.auth.login.config="/scratch/ogg/v123211/dirprm/jaas.conf"
      
    3. コンシューマ・ユーティリティを実行してレコードをチェックします。

      /kafka/bin/kafka-console-consumer.sh --bootstrap-server sys06:9092 --topic myoggtopic --new-consumer --consumer.config consumer.properties
8.2.8.1.3.7 KafkaのSSLのサポート

Kafkaは、KafkaクライアントとKafkaクラスタの間のSSL接続をサポートしています。SSL接続により、クライアントとサーバーの間で転送されるメッセージの認証と暗号化の両方が可能になります。

SSLはサーバー認証用(クライアントがサーバーを認証します)に構成できますが、通常は相互認証用(クライアントとサーバーの両方が相互に認証します)に構成されます。SSL相互認証では、接続の各片側がキーストアから証明書を取得し、それを接続のもう一方の側に渡します。これにより、証明書がトラストストア内の証明書に対して検証されます。
SSLを設定する場合、実行している特定のKafkaバージョンの詳細は、Kafkaのドキュメントを参照してください。Kafkaのドキュメントには、次の実行方法に関する情報も記載されています。
  • SSL用のKafkaクラスタの設定
  • キーストア/トラストストア・ファイルでの自己署名証明書の作成
  • SSL用のKafkaクライアントの構成
Kafkaプロデューサおよびコンシューマ・コマンドライン・ユーティリティを使用してSSL接続を実装してから、それをOracle GoldenGate for Distributed Applications and Analytics (GG for DAA)で使用することをお薦めします。GG for DAAをホストするマシンとKafkaクラスタの間のSSL接続を確認する必要があります。このアクションにより、GG for DAAを導入する前に、SSL接続が正しく設定され機能していることを確認できます。
SSL相互認証を使用したKafkaプロデューサ構成の例を次に示します。
bootstrap.servers=localhost:9092
acks=1
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
security.protocol=SSL
ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=test1234

8.2.8.1.4 スキーマの伝播

Kafkaハンドラには、スキーマをスキーマ・トピックに公開する機能があります。現在、スキーマ公開が有効なのは、Avro行フォーマッタとAvro操作フォーマッタのみです。KafkaハンドラのschemaTopicNameプロパティが設定されている場合、スキーマは次のイベントで公開されます。

  • 特定の表のAvroスキーマは、その表に対する最初の操作が発生するときに公開されます。

  • Kafkaハンドラがメタデータ変更イベントを受信すると、スキーマはフラッシュされます。特定の表について再生成されるAvroスキーマは、その表に対する次の操作が発生するときに公開されます。

  • Avroのラッピング機能を有効にしている場合、汎用のラッパーAvroスキーマは、任意の操作が最初に発生するときに公開されます。汎用のラッパーを有効にする(Avroフォーマッタの構成でAvroスキーマの機能を有効にする)には、「Avro行フォーマッタ」および「Avro操作フォーマッタ」を参照してください。

KafkaのProducerRecord値はスキーマになり、キーは完全修飾の表名になります。

AvroメッセージはAvroスキーマに直接依存しているため、Kafka上のAvroのユーザーには問題が発生することがあります。Avroメッセージはバイナリなので、人間が読める形式ではありません。Avroメッセージをデシリアライズするには、まず受信者に正しいAvroスキーマが必要ですが、ソース・データベースからの各表が個々のAvroスキーマになるため、困難な場合があります。ソースOracle GoldenGate証跡ファイルに複数の表からの操作が含まれる場合、Kafkaメッセージの受信者が、個々のメッセージをデシリアライズするためにどのAvroスキーマを使用するかを判断することはできません。この問題を解決するには、特殊なAvroメッセージを汎用のAvroメッセージ・ラッパーでラップできます。この汎用のAvroラッパーを使用すれば、完全修飾の表名、スキーマ文字列のハッシュコード、ラップされたAvroメッセージを確認することができます。受信者は、完全修飾の表名と、スキーマ文字列のハッシュコードを使用して、ラップされたメッセージに関連付けられたスキーマを解決し、そのスキーマを使用して、ラップされたメッセージをデシリアライズできます。

8.2.8.1.5 パフォーマンスに関する考慮事項

最高のパフォーマンスを得るために、Kafkaハンドラを操作モードで動作するように送信することをお薦めします。

gg.handler.name.mode = op

また、Kafkaプロデューサ・プロパティ・ファイルでbatch.sizeとlinger.msの値も設定することをお薦めします。これらの値は、ユースケース別の状況によって大きく異なります。通常、値が大きいほどスループットは向上しますが、レイテンシが大きくなります。これらのプロパティの値を小さくすると、レイテンシは小さくなりますが、全体的なスループットは低下します。

Replicat変数GROUPTRANSOPSを使用しても、パフォーマンスは向上します。推奨される設定は10000です。

ソース証跡ファイルからシリアライズした操作を個々のKafkaメッセージで配信する必要がある場合、Kafkaハンドラは操作モードに設定されている必要があります。

gg.handler.name.mode = op

8.2.8.1.6 セキュリティについて

Kafkaバージョン0.9.0.0で、SSL/TLSおよびSASL(Kerberos)を介したセキュリティが導入されました。SSL/TLSおよびSASLセキュリティ製品の一方または両方を使用してKafkaハンドラを保護できます。Kafkaプロデューサ・クライアント・ライブラリは、それらのライブラリを使用する統合からのセキュリティ機能の抽象化を提供します。Kafkaハンドラは、セキュリティ機能から効率的に抽象化されます。セキュリティを有効にするには、Kafkaクラスタのセキュリティを設定し、マシンを接続して、Kafkaプロデューサ・プロパティ・ファイルを必要なセキュリティ・プロパティを使用して構成する必要があります。Kafkaクラスタの保護に関する詳細は、次の場所にあるKafkaドキュメントを参照してください。

keytabファイルからKerberosパスワードを復号化できない場合があります。これによって、Kerberos認証が対話型モードにフォール・バックされ、プログラムで呼び出されているため機能しなくなります。この問題の原因は、Java Cryptography Extension (JCE)が、Java Runtime Environment (JRE)にインストールされていないことです。JCEがJREにロードされていることを確認します。http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.htmlを参照してください。

8.2.8.1.7 メタデータ変更イベント

メタデータ変更イベントが、Kafkaハンドラによって処理されるようになりました。これが関係するのは、スキーマ・トピックを構成しており、使用するフォーマッタがスキーマ伝播をサポートしている場合のみです(現在は、Avro行フォーマッタとAvro操作フォーマッタ)。スキーマが変更されている表で次に操作が発生するとき、更新されたスキーマがスキーマ・トピックに公開されます。

メタデータ変更イベントをサポートするには、ソース・データベースで変更を取得するOracle GoldenGateプロセスが、証跡機能でのOracle GoldenGateメタデータ(Oracle GoldenGate 12c (12.2)で導入)をサポートする必要があります。

8.2.8.1.8 Snappyに関する考慮事項

Kafkaプロデューサ構成ファイルは、圧縮の使用をサポートしています。構成可能なオプションの1つにSnappyがあり、これは、オープン・ソースの圧縮および圧縮解除(codec)ライブラリで、他のcodecライブラリよりパフォーマンスが優れています。Snappy JARはすべてのプラットフォームで実行されるわけではありません。Snappyは、Linuxシステムでは動作するようですが、他のUNIXおよびWindowsの実装では動作する場合としない場合があります。Snappyの圧縮を使用する場合は、Snappyを使用する圧縮を実装する前に、必要なシステムすべてでSnappyをテストします。必要なシステムの一部にSnappyが対応しない場合は、別のcodecライブラリを使用することをお薦めします。

8.2.8.1.9 Kafkaインターセプタのサポート

Kafkaプロデューサ・クライアント・フレームワークでは、プロデューサ・インターセプタの使用がサポートされています。プロデューサ・インターセプタは、単にKafkaプロデューサ・クライアントからのユーザー・イグジットで、これにより、インターセプタ・オブジェクトがインスタンス化され、Kafkaメッセージ送信コールとKafkaメッセージ送信確認コールの通知を受信します。

インターセプタの一般的な使用例は、監視です。Kafkaプロデューサ・インターセプタは、インタフェース org.apache.kafka.clients.producer.ProducerInterceptorに準拠している必要があります。Kafkaハンドラは、プロデューサ・インターセプタの使用をサポートしています。

ハンドラでインターセプタを使用する場合の要件は次のとおりです。

  • Kafkaプロデューサ構成プロパティ"interceptor.classes"は、起動するインターセプタのクラス名で構成する必要があります。
  • インターセプタを呼び出すためには、jarファイルとすべての依存関係jarをJVMで使用できる必要があります。したがって、インターセプタを含むjarファイルとすべての依存関係jarを、ハンドラ構成ファイルのgg.classpathに追加する必要があります。

    詳細は、Kafkaのドキュメントを参照してください。

8.2.8.1.10 Kafkaパーティションの選択

Kafkaトピックは1つ以上のパーティションで構成されます。Kafkaクライアントは異なるトピック/パーティションの組合せへのメッセージ送信をパラレル化するため、複数のパーティションへの分散はKafkaの収集パフォーマンスを向上させるのに適した方法です。パーティションの選択は、Kafkaクライアントでの次の計算によって制御されます。

(Kafkaメッセージ・キーのハッシュ)係数(パーティションの数) = 選択したパーティション番号

Kafkaメッセージ・キーは、次の構成値によって選択されます。

gg.handler.{your handler name}.keyMappingTemplate=

このパラメータが静的キーを生成する値に設定されている場合、すべてのメッセージは同じパーティションに送信されます。静的キーの例を次に示します。

gg.handler.{your handler name}.keyMappingTemplate=StaticValue

このパラメータが、頻繁に変更されないキーを生成する値に設定されている場合、パーティション選択の変更頻度は低くなります。次の例では、表名をメッセージ・キーとして使用します。特定のソース表に対するすべての操作は同じキーを持つため、同じパーティションにルーティングされます。

gg.handler.{your handler name}.keyMappingTemplate=${tableName}
nullのKafkaメッセージ・キーはラウンドロビン・ベースでパーティションに分散されます。これを行うには、次のように設定します。
gg.handler.{your handler name}.keyMappingTemplate=${null}

マッピング・キーの構成に推奨される設定は次のとおりです。

gg.handler.{your handler name}.keyMappingTemplate=${primaryKeys}

これにより、連結および区切られた主キー値であるKafkaメッセージ・キーが生成されます。

各行の操作には一意の主キーが必要であるため、各行に一意のKafkaメッセージ・キーが生成されます。別の重要な考慮事項として、異なるパーティションに送信されたKafkaメッセージは、送信された元の順序でKafkaコンシューマに配信される保証はありません。これはKafkaの仕様の一部です。順序はパーティション内でのみ維持されます。Kafkaメッセージ・キーとして主キーを使用すると、同じ主キーを持つ同じ行に対する操作で同じKafkaメッセージ・キーが生成されるため、同じKafkaパーティションに送信されます。このようにして、同じ行に対する操作の順序が維持されます。

DEBUGログ・レベルでは、Kafkaメッセージの座標(トピック、パーティションおよびオフセット)は、正常に送信されたメッセージの.logファイルに記録されます。

8.2.8.1.11 トラブルシューティング

8.2.8.1.11.1 Kafka設定の検証

コマンドラインのKafkaプロデューサを使用すると、ダミー・データをKafkaトピックに書き込むことができ、Kafkaコンシューマを使用してこのデータをKafkaトピックから読み取ることができます。この方法を使用して、ディスク上でのKafkaトピックの設定および読取り/書込み権限を確認します。http://kafka.apache.org/documentation.html#quickstartを参照してください。

8.2.8.1.11.2 クラスパスの問題

Javaクラスパスの問題はよくある問題です。そのような問題には、log4jログ・ファイルのClassNotFoundExceptionの問題や、gg.classpath変数の入力ミスによるクラスパスの解決エラーなどがあります。Kafkaクライアント・ライブラリは、Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)製品には付属していません「クラスパス構成」で説明されているように、JavaとKafkaクライアント・ライブラリを適切に解決するために、適切なバージョンのKafkaクライアント・ライブラリを取得し、Javaアダプタ・プロパティ・ファイルでgg.classpathプロパティを適切に構成する必要があります。

8.2.8.1.11.3 無効なKafkaバージョン

Kafkaハンドラでは、Kafka 0.8.2.2以前のバージョンはサポートされていません。サポートされていないバージョンのKafkaを実行すると、実行時Java例外java.lang.NoSuchMethodErrorが発生します。これは、org.apache.kafka.clients.producer.KafkaProducer.flush()メソッドが見つからないことを意味します。このエラーが発生した場合は、Kafka 0.9.0.0以降のバージョンに移行します。

8.2.8.1.11.4 Kafkaプロデューサ・プロパティ・ファイルが見つからない

この問題は一般的に、次の例外として発生します。

ERROR 2015-11-11 11:49:08,482 [main] Error loading the kafka producer properties

gg.handler.kafkahandler.KafkaProducerConfigFile構成変数をチェックし、Kafkaプロデューサ構成ファイルの名前が正しく設定されていることを確認します。gg.classpath変数をチェックし、クラスパスにKafkaプロデューサ・プロパティ・ファイルのパスが含まれていることと、プロパティ・ファイルのパスの末尾にワイルドカード(*)が含まれていないことを確認します。

8.2.8.1.11.5 Kafkaの接続の問題

この問題は、KafkaハンドラがKafkaに接続できない場合に発生します。次の警告が表示されます。

WARN 2015-11-11 11:25:50,784 [kafka-producer-network-thread | producer-1] WARN  (Selector.java:276) - Error in I/O with localhost/127.0.0.1 
java.net.ConnectException: Connection refused

接続の再試行間隔が経過し、Kafkaハンドラ・プロセスは異常終了します。Kafkaブローカが稼働しており、Kafkaプロデューサ・プロパティ・ファイルで指定されているホストとポートが正しいことを確認してください。Kafkaブローカをホストしているマシンでネットワーク・シェル・コマンド(netstat -lなど)を使用すると、想定するポートをKafkaがリスニングしていることを確認できます。

8.2.8.1.12 Kafkaハンドラ・クライアント依存性

Apache Kafkaデータベースに接続するためのKafkaハンドラの依存性とはどのようなものでしょう。

Maven Central RepositoryのKafkaデータベースのアーティファクトは次のとおりです。

Maven groupId: org.apache.kafka

Maven atifactId: kafka-clients

Maven version: 各セクションでリストされるKafkaのバージョン番号

8.2.8.1.12.1 Kafka 2.8.0
kafka-clients-2.8.0.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.8.1.jar
zstd-jni-1.4.9-1.jar
8.2.8.1.12.2 Kafka 2.7.0
kafka-clients-2.7.0.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.7.7.jar
zstd-jni-1.4.5-6.jar
8.2.8.1.12.3 Kafka 2.6.0
kafka-clients-2.6.0.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.7.3.jar
zstd-jni-1.4.4-7.jar
8.2.8.1.12.4 Kafka 2.5.1
kafka-clients-2.5.1.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.7.3.jar
zstd-jni-1.4.4-7.jar
8.2.8.1.12.5 Kafka 2.4.1
kafka-clients-2.4.1.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.28.jar
snappy-java-1.1.7.3.jar
zstd-jni-1.4.3-1.jarr
8.2.8.1.12.6 Kafka 2.3.1
kafka-clients-2.3.1.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.26.jar
snappy-java-1.1.7.3.jar
zstd-jni-1.4.0-1.jar

8.2.8.2 Apache Kafka Connectハンドラ

Kafka Connectハンドラは、標準的なKafkaのメッセージング機能を拡張したものです。

この章では、Kafka Connectハンドラの使用方法について説明します。

8.2.8.2.1 概要

Oracle GoldenGate Kafka Connectは、標準的なKafkaのメッセージング機能を拡張したものです。Kafka Connectは、標準的なKafkaのプロデューサおよびコンシューマのインタフェースの上にある機能レイヤーです。新規のソースおよびターゲット・システムをトポロジに簡単に追加できる、メッセージングの標準化を提供します。

ConfluentはKafka Connectの主要な採用先であり、そのConfluent Platform製品には標準的なKafka Connectの機能拡張が含まれています。これには、AvroのシリアライズとデシリアライズおよびAvroスキーマ・レジストリが含まれています。Kafka Connectの機能の多くは、Apache Kafkaで利用できます。様々なオープン・ソースのKafka Connect統合は、次の場所を参照してください。

https://www.confluent.io/product/connectors/

Kafka Connectハンドラは、Kafka Connectのソース・コネクタです。Oracle GoldenGateでサポートされているデータベースから、データベースの変更を取得し、その変更データをKafka Connectレイヤー経由でKafkaにストリーミングできます。このハンドラを使用して、Oracle Event Hub Cloud Service (EHCS)に接続することもできます。

Kafka Connectでは、プロプライエタリなオブジェクトを使用してスキーマ(org.apache.kafka.connect.data.Schema)およびメッセージ(org.apache.kafka.connect.data.Struct)が定義されます。Kafka Connectハンドラは、公開済データおよび公開済データの構造を管理するように構成できます。

Kafka Connectハンドラは、Kafkaハンドラでサポートされているどのプラガブル・フォーマッタもサポートしていません

8.2.8.2.2 詳細な機能

JSONコンバータ

Kafka Connectフレームワークにより、インメモリーのKafka Connectメッセージを、ネットワーク経由での伝送に適したシリアライズされた形式に変換するコンバータが提供されます。このコンバータは、Kafkaプロデューサのプロパティ・ファイルの構成を使用して選択されます。

Kafka ConnectおよびJSONコンバータは、Apache Kafkaダウンロードの一部として使用可能です。JSONコンバータによりKafkaのキーおよび値がJSONに変換され、それがKafkaトピックに送信されます。Kafkaプロデューサのプロパティ・ファイルで、次の構成を使用してJSONコンバータを識別します。

key.converter=org.apache.kafka.connect.json.JsonConverter 
key.converter.schemas.enable=true 
value.converter=org.apache.kafka.connect.json.JsonConverter 
value.converter.schemas.enable=true

メッセージのフォーマットは、ペイロード情報が後に続くメッセージ・スキーマ情報です。JSONは自己記述型のフォーマットであるため、Kafkaに公開される各メッセージにはスキーマ情報を含めないでください。

メッセージからJSONのスキーマ情報を省略するには、次のように設定します。

key.converter.schemas.enable=false
value.converter.schemas.enable=false

Avroコンバータ

Confluentにより、Kafkaのインストール、Kafkaのサポート、Kafka上に構築された拡張機能が提供され、Kafkaの潜在的な可能性の実現が支援されます。Confluentは、オープン・ソース版のKafka (Confluent Open Source)と、購入可能なエンタープライズ版(Confluent Enterprise)の両方を提供しています。

一般的なKafkaのユースケースは、Kafka上でAvroメッセージを送信することです。これは、AvroメッセージをデシリアライズするためにAvroスキーマに依存するため、受信側で問題が発生する可能性があります。受信メッセージは、プロデューサ側でメッセージを生成するために使用されたAvroスキーマと完全に一致させる必要があるため、スキーマの進化によって問題が増加する可能性があります。間違ったAvroスキーマでAvroメッセージをデシリアライズすると、ランタイム・エラー、不完全なデータまたは不正なデータが発生することがあります。Confluentでは、スキーマ・レジストリおよびConfluentスキーマ・コンバータを使用してこの問題を解決しました。

Kafkaプロデューサのプロパティ・ファイルの構成を次に示します。

key.converter=io.confluent.connect.avro.AvroConverter 
value.converter=io.confluent.connect.avro.AvroConverter 
key.converter.schema.registry.url=http://localhost:8081 
value.converter.schema.registry.url=http://localhost:8081 

メッセージがKafkaに公開されると、Avroスキーマが登録され、スキーマ・レジストリに格納されます。Kafkaからメッセージが消費されるとき、メッセージの作成に使用された正確なAvroスキーマをスキーマ・レジストリから取得してAvroメッセージをデシリアライズできます。Avroメッセージとそれに対応する受信側のAvroスキーマの照合を作成することで、この問題を解決しています。

Avroコンバータを使用するための要件は次のとおりです。

  • この機能は、Confluent Kafkaの両バージョン(オープン・ソースまたはエンタープライズ)で使用できます。
  • Confluentスキーマ・レジストリ・サービスが実行されている必要があります。
  • ソース・データベース表には、関連付けられたAvroスキーマが必要です。異なるAvroスキーマに関連付けられたメッセージは、異なるKafkaトピックに送信する必要があります。
  • ConfluentのAvroコンバータおよびスキーマ・レジストリ・クライアントがクラスパスで使用できる必要があります。

スキーマ・レジストリは、トピックごとにAvroスキーマを追跡します。メッセージは、同じスキーマまたは同じスキーマの進化バージョンを持つトピックに送信する必要があります。ソース・メッセージにはソース・データベース表スキーマに基づくAvroスキーマがあり、Avroスキーマはソース表ごとに一意です。複数のソース表で単一のトピックにメッセージを公開すると、ソース表から送信されたメッセージが以前のメッセージと異なるたびに、スキーマが進化していることがスキーマ・レジストリに示されます。

Protobuf Converter

Protobuf Converterでは、Kafka ConnectメッセージをGoogle Protocol Buffers形式としてフォーマットできます。Protobuf ConverterはConfluentスキーマ・レジストリと統合されており、この機能は、Confluentのオープン・ソース・バージョンとエンタープライズ・バージョンの両方で使用できます。Confluentは、Confluentバージョン5.5.0以降でProtobuf Converterを追加しました。

Kafkaプロデューサ・プロパティ・ファイルでProtobuf Converterを選択するための構成を次に示します。
key.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081

Protobuf Converterを使用するための要件は次のとおりです。

  • この機能は、5.5.0以降、Confluent Kafkaの両バージョン(オープン・ソースまたはエンタープライズ)で使用できます。
  • Confluentスキーマ・レジストリ・サービスが実行されている必要があります。
  • スキーマ(ソース表)が異なるメッセージは、異なるKafkaトピックに送信する必要があります。
  • Confluent Protobufコンバータおよびスキーマ・レジストリ・クライアントがクラスパスで使用できる必要があります。

スキーマ・レジストリは、トピックごとにProtobufスキーマを追跡します。メッセージは、同じスキーマまたは同じスキーマの進化バージョンを持つトピックに送信する必要があります。ソース・メッセージにはソース・データベース表スキーマに基づくProtobufスキーマがあり、Protobufスキーマはソース表ごとに一意です。複数のソース表で単一のトピックにメッセージを公開すると、ソース表から送信されたメッセージが以前のメッセージと異なるたびに、スキーマが進化していることがスキーマ・レジストリに示されます。

8.2.8.2.3 Kafka Connectハンドラの設定および実行

ここでは、Kafka Connectハンドラのコンポーネントの構成およびハンドラの実行について説明します。

クラスパス構成

Kafka ConnectハンドラをKafkaに接続して実行できるように、gg.classpath構成変数に2つのものを含める必要があります。必要なのは、Kafkaプロデューサ・プロパティ・ファイルと、KafkaクライアントJARです。KafkaクライアントのJARは、Kafka Connectハンドラが接続するKafkaのバージョンと一致する必要があります。必要なクライアントJARファイルのバージョン別リストは、Kafkaハンドラ・クライアント依存性(「Kafka Connectハンドラ・クライアント依存性」)を参照してください。Kafkaプロデューサのプロパティ・ファイルの格納場所として推奨されるのは、Oracle GoldenGateのdirprmディレクトリです。

Kafka ConnectクライアントJARのデフォルトの場所は、Kafka_Home/libs/*ディレクトリです。

gg.classpath変数は、正確に構成する必要があります。Kafkaプロデューサのプロパティ・ファイルのパス指定に含めるパスには、ワイルドカードを使用しないでください。Kafkaプロデューサのプロパティ・ファイルへのパスにアスタリスク(*)のワイルドカードを含めると、それが破棄されます。依存関係JARのパス指定には、そのディレクトリにあるJARファイルがすべて関連するクラスパスに含まれるように、ワイルドカード(*)を含める必要があります。*.jarは使用しないでください

正しく構成されたApache Kafkaのクラスパスの例:

gg.classpath=dirprm:{kafka_install_dir}/libs/*

正しく構成されたConfluent Kafkaのクラスパスの例を次に示します。

gg.classpath={confluent_install_dir}/share/java/kafka-serde-tools/*:{confluent_install_dir}/share/java/kafka/*:{confluent_install_dir}/share/java/confluent-common/*
8.2.8.2.3.1 Kafka Connectハンドラの構成

生成されたKafka Connectメッセージ内のメタ列フィールドの自動出力は、Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) 21cリリース以降では削除されています。

メタ列フィールドは次のプロパティとして構成できます。

gg.handler.name.metaColumnsTemplate

メタ列を以前のバージョンと同様に出力するには、次のように構成します。

gg.handler.name.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]}

主キー列およびトークンも含めるには、次のように構成します。

gg.handler.name.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]},${primarykeycolumns[primary_keys]},${alltokens[tokens]}

詳細は、構成プロパティを参照してください:

gg.handler.name.metaColumnsTemplate

表8-10 Kafka Connectハンドラの構成プロパティ

プロパティ 必須/オプション 有効な値 デフォルト 説明
gg.handler.name.type

必須

kafkaconnect

なし

Kafka Connectハンドラを選択するための構成。

gg.handler.name.kafkaProducerConfigFile

必須

string

なし

KafkaのプロパティおよびKafka Connect構成プロパティを含むプロパティ・ファイルの名前。このファイルは、gg.classpathプロパティによって構成されたクラスパスの一部である必要があります。

gg.handler.name.topicMappingTemplate

必須

実行時にKafkaトピック名を解決するためのテンプレート文字列の値。

なし

詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。

gg.handler.name.keyMappingTemplate

必須

実行時にKafkaメッセージ・キーを解決するためのテンプレート文字列の値。

なし

詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。

gg.handler.name.includeTokens

オプション

true | false

false

trueに設定すると、出力メッセージにマップ・フィールドが含まれます。キーはトークンで、その値は、キーと値がOracle GoldenGateソース証跡ファイルからのトークン・キーと値であるマップです。

このフィールドを抑止する場合は、falseに設定します。

gg.handler.name.messageFormatting

オプション

row | op

row

出力メッセージのモデル化方法を制御します。行を選択すると、出力メッセージは行としてモデル化されます。opに設定すると、出力メッセージは操作メッセージとしてモデル化されます。

gg.handler.name.insertOpKey

オプション

任意の文字列

I

挿入操作を示すフィールドop_typeの値。

gg.handler.name.updateOpKey

オプション

任意の文字列

U

挿入操作を示すフィールドop_typeの値。

gg.handler.name.deleteOpKey

オプション

任意の文字列

D

削除操作を示すフィールドop_typeの値。

gg.handler.name.truncateOpKey

オプション

任意の文字列

T

切捨て操作を示すフィールドop_typeの値。

gg.handler.name.treatAllColumnsAsStrings

オプション

true | false

false

trueに設定すると、すべての出力フィールドが文字列として扱われます。falseに設定すると、ハンドラによって、ソース証跡ファイルからの対応するフィールド・タイプがそれに最も適したKafka Connectのデータ型にマップされます。

gg.handler.name.mapLargeNumbersAsStrings

オプション

true | false

false

大きい数値は、数値フィールドにDoubleとしてマッピングされます。特定のシナリオでは、精度を失う可能性があります。

trueに設定すると、精度を保つため、これらのフィールドが文字列としてマップされます。

gg.handler.name.pkUpdateHandling

オプション

abend | update | delete-insert

abend

モデリング行メッセージがgg.handler.name.messageFormatting=rowの場合にのみ適用されます。更新の場合に、ビフォアおよびアフター・イメージとしてのモデリング操作メッセージがそのメッセージに伝播される場合は適用されません。

gg.handler.name.metaColumnsTemplate

オプション

任意のメタ列キーワード。

なし

テンプレートを表す1つ以上のテンプレート値で構成されるカンマ区切り文字列。メタ列のキーワードを参照してください。

gg.handler.name.includeIsMissingFields

オプション

true|false

true

extract{column_name}を含めるにはtrueに設定します。

このプロパティを各列に設定して、null値がソース証跡ファイルで実際にnullであるか、ソース証跡ファイルにないかを、ダウンストリーム・アプリケーションが区別できるようにします。

gg.handler.name.enableDecimalLogicalType オプション true|false false Kafka Connectで小数論理型を有効にするには、trueに設定します。小数論理型では、64ビット・データ型に収まらない数値を表すことができます。
gg.handler.name.oracleNumberScale オプション 正の整数 38 gg.handler.name.enableDecimalLogicalType=trueの場合にのみ適用されます。一部のソース・データ型には、固定スケールが関連付けられていません。Kafka Connect小数論理型にスケールを設定する必要があります。メタデータにスケールがないソース型の場合は、このパラメータの値を使用してスケールを設定します。
gg.handler.name.EnableTimestampLogicalType オプション true|false false Kafka Connectタイムスタンプ論理型を有効にするには、trueに設定します。Kafka Connectタイムスタンプの論理時間は、Javaのエポックからのミリ秒の整数測定値です。これは、タイムスタンプ論理型が使用された場合、ミリ秒より高い精度が使用できないことを意味します。このプロパティを使用するには、gg.format.timestampプロパティを設定する必要があります。このプロパティは、文字列形式のタイムスタンプの出力を決定するために使用されるタイムスタンプ書式設定文字列です。たとえば、gg.format.timestamp=yyyy-MM-dd HH:mm:ss.SSSです。goldengate.userexit.timestampプロパティが構成ファイルに設定されていないことを確認します。このプロパティを設定すると、入力タイムスタンプが、論理タイムスタンプに必要なJavaオブジェクトに解析されなくなります。
gg.handler.name.metaHeadersTemplate オプション メタ列キーワードのカンマ区切りリスト。 なし メタ列を選択し、メタ列キーワード構文を使用してコンテキスト・ベースのキー値ペアをKafkaメッセージ・ヘッダーに挿入できます。「メタ列のキーワード」を参照してください。
gg.handler.name.schemaNamespace

オプション KafkaコネクタAvroスキーマのネーミング要件に違反する文字を含まない任意の文字列。 なし 生成されるKafka Connectスキーマ名の制御に使用されます。設定されていない場合、スキーマ名は修飾されたソース表名と同じになります。たとえば、ソース表がQASOURCE.TCUSTMERである場合、Kafka Connectスキーマ名は同じになります。

このプロパティを使用して、生成されるスキーマ名を制御できます。たとえば、このプロパティがcom.example.companyに設定されている場合、表QASOURCE.TCUSTMERに対して生成されるKafka Connectスキーマ名はcom.example.company.TCUSTMERです。

gg.handler.name.enableNonnullable オプション true|false false デフォルトの動作では、生成されたKafka接続スキーマですべてのフィールドをnull許容として設定します。このパラメータをtrueに設定すると、メタデータ・プロバイダによって提供されるターゲット・メタデータで、構成されたnull許容値を使用できます。このプロパティをtrueに設定すると、有害な副作用が発生する可能性があります。
  1. フィールドをnull不可に設定すると、フィールドが有効であるためには値が必要であることを意味します。フィールドがnull不可として設定され、ソース証跡ファイルの値がnullまたは欠落している場合、ランタイム・エラーが発生します。
  2. フィールドをnull不可に設定すると、切捨て操作は伝播できません。切捨て操作にはフィールド値がありません。その結果、Kafka Connectコンバータのシリアライゼーションはフィールドに値がないためにフィールド化されます。
  3. スキーマを変更してnull不可フィールドが追加されると、Confluentスキーマ・レジストリでスキーマの下位互換性の例外が発生します。これが発生した場合、ユーザーはConfluentスキーマ・レジストリの互換性構成を調整または無効化する必要があります。

詳細は、「テンプレートを使用したストリーム名とパーティション名の解決」を参照してください。

サンプル構成の確認

gg.handlerlist=kafkaconnect
#The handler properties
gg.handler.kafkaconnect.type=kafkaconnect
gg.handler.kafkaconnect.kafkaProducerConfigFile=kafkaconnect.properties
gg.handler.kafkaconnect.mode=op
#The following selects the topic name based on the fully qualified table name
gg.handler.kafkaconnect.topicMappingTemplate=${fullyQualifiedTableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys}
#The formatter properties
gg.handler.kafkaconnect.messageFormatting=row
gg.handler.kafkaconnect.insertOpKey=I
gg.handler.kafkaconnect.updateOpKey=U
gg.handler.kafkaconnect.deleteOpKey=D
gg.handler.kafkaconnect.truncateOpKey=T
gg.handler.kafkaconnect.treatAllColumnsAsStrings=false
gg.handler.kafkaconnect.pkUpdateHandling=abend
8.2.8.2.3.2 テンプレートを使用したトピック名とメッセージ・キーの解決

Kafka Connectハンドラでは、テンプレートの構成値を使用して実行時にトピック名およびメッセージ・キーを解決する機能が提供されます。テンプレートを使用すると、静的な値およびキーワードを構成できます。キーワードは、そのときの処理コンテキストに応じてキーワードを動的に置き換えるために使用されます。テンプレートは、次の構成パラメータに適用されます。

gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate

テンプレートのモード

Kafka Connectハンドラでは、操作メッセージのみを送信できます。Kafka Connectハンドラでは、操作メッセージをより大きなトランザクション・メッセージにグループ化できません。

テンプレート・キーワードの詳細は、テンプレート・キーワードを参照してください。
テンプレートの例は、テンプレートの例を参照してください。
8.2.8.2.3.3 Kafka Connectハンドラでのセキュリティの構成

Kafkaバージョン0.9.0.0にSSL/TLSまたはKerberosを介したセキュリティが導入されました。Kafka Connectハンドラは、SSL/TLSまたはKerberos使用して保護できます。Kafkaプロデューサ・クライアント・ライブラリは、それらのライブラリを利用する統合からのセキュリティ機能の抽象化を提供します。Kafka Connectハンドラは、セキュリティ機能から効率的に抽象化されます。セキュリティを有効にするには、Kafkaクラスタのセキュリティを設定し、マシンを接続して、Kafkaプロデューサのプロパティ・ファイル(Kafkaハンドラで処理のために使用する)を必要なセキュリティ・プロパティを使用して構成する必要があります。

keytabファイルからKerberosパスワードを復号化できない場合があります。これによって、Kerberos認証が対話型モードにフォール・バックされ、プログラムで呼び出されているため機能しなくなります。この問題の原因は、Java Cryptography Extension (JCE)が、Java Runtime Environment (JRE)にインストールされていないことです。JCEがJREにロードされていることを確認します。http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.htmlを参照してください。

8.2.8.2.4 セキュア・スキーマ・レジストリへの接続

Kafka Connectの顧客トポロジには、保護されているスキーマ・レジストリが含まれる場合があります。このトピックでは、保護されたスキーマ・レジストリへの接続用に構成されたKafkaプロデューサ・プロパティを設定する方法について説明します。

SSL相互認証
key.converter.schema.registry.ssl.truststore.location=
key.converter.schema.registry.ssl.truststore.password=
key.converter.schema.registry.ssl.keystore.location=
key.converter.schema.registry.ssl.keystore.password=
key.converter.schema.registry.ssl.key.password=
value.converter.schema.registry.ssl.truststore.location=
value.converter.schema.registry.ssl.truststore.password=
value.converter.schema.registry.ssl.keystore.location=
value.converter.schema.registry.ssl.keystore.password=
value.converter.schema.registry.ssl.key.password=

SSL基本認証

key.converter.basic.auth.credentials.source=USER_INFO
key.converter.basic.auth.user.info=username:password
key.converter.schema.registry.ssl.truststore.location=
key.converter.schema.registry.ssl.truststore.password=
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info=username:password
value.converter.schema.registry.ssl.truststore.location=
value.converter.schema.registry.ssl.truststore.password=

8.2.8.2.5 Kafka Connectハンドラのパフォーマンスに関する考慮事項

Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)構成とKafkaプロデューサでは、パフォーマンスに影響を与える複数の構成設定があります。

Oracle GoldenGateのパラメータでパフォーマンスに最も大きな影響を与えるのはReplicatのGROUPTRANSOPSパラメータです。GROUPTRANSOPSパラメータを使用すると、Replicatで複数のソース・トランザクションを単一のターゲット・トランザクションにグループ化できます。トランザクションのコミット時、Kafka Connectハンドラは、書込み永続性のためにKafkaプロデューサ上でフラッシュをコールしてメッセージをKafkaにプッシュし、その後チェックポイントを送信します。フラッシュ・コールはコストの大きなコールであり、ReplicatのGROUPTRANSOPS設定を大きく設定すると、Replicatのフラッシュ・コールの頻度が少なくてすみ、パフォーマンスが向上します。

GROUPTRANSOPSのデフォルト設定は1000で、値を2500、5000、さらには10000に増やすことでパフォーマンスの向上が得られます。

opモードのgg.handler.kafkaconnect.mode=opパラメータもまた、txモードのgg.handler.kafkaconnect.mode=txよりもパフォーマンスを向上させることができます。

Kafkaプロデューサのいくつかのプロパティがパフォーマンスに影響する可能性があります。重要な影響を与えるパラメータは次のとおりです。

  • linger.ms

  • batch.size

  • acks

  • buffer.memory

  • compression.type

これらのパラメータのデフォルト値から開始し、パフォーマンスのベース・ラインを取得するためのパフォーマンス・テストを実行することをお薦めします。これらの各パラメータのKafkaドキュメントを参照して、その役割を理解し、パラメータを調整し、各パラメータのパフォーマンス効果を確認するための追加のパフォーマンス・テストを実行します。

8.2.8.2.6 Kafkaインターセプタのサポート

Kafkaプロデューサ・クライアント・フレームワークでは、プロデューサ・インターセプタの使用がサポートされています。プロデューサ・インターセプタは、単にKafkaプロデューサ・クライアントからのユーザー・イグジットで、これにより、インターセプタ・オブジェクトがインスタンス化され、Kafkaメッセージ送信コールとKafkaメッセージ送信確認コールの通知を受信します。

インターセプタの一般的な使用例は、監視です。Kafkaプロデューサ・インターセプタは、インタフェース org.apache.kafka.clients.producer.ProducerInterceptorに準拠している必要があります。Kafka Connectハンドラは、プロデューサ・インターセプタの使用をサポートしています。

ハンドラでインターセプタを使用する場合の要件は次のとおりです。

  • Kafkaプロデューサ構成プロパティ"interceptor.classes"は、起動するインターセプタのクラス名で構成する必要があります。
  • インターセプタを呼び出すためには、jarファイルとすべての依存関係jarをJVMで使用できる必要があります。したがって、インターセプタを含むjarファイルとすべての依存関係jarを、ハンドラ構成ファイルのgg.classpathに追加する必要があります。

    詳細は、Kafkaのドキュメントを参照してください。

8.2.8.2.7 Kafkaパーティションの選択

Kafkaトピックは1つ以上のパーティションで構成されます。Kafkaクライアントは異なるトピック/パーティションの組合せへのメッセージ送信をパラレル化するため、複数のパーティションへの分散はKafkaの収集パフォーマンスを向上させるのに適した方法です。パーティションの選択は、Kafkaクライアントでの次の計算によって制御されます。

(Kafkaメッセージ・キーのハッシュ)係数(パーティションの数) = 選択したパーティション番号

Kafkaメッセージ・キーは、次の構成値によって選択されます。

gg.handler.{your handler name}.keyMappingTemplate=

このパラメータが静的キーを生成する値に設定されている場合、すべてのメッセージは同じパーティションに送信されます。静的キーの例を次に示します。

gg.handler.{your handler name}.keyMappingTemplate=StaticValue

このパラメータが、頻繁に変更されないキーを生成する値に設定されている場合、パーティション選択の変更頻度は低くなります。次の例では、表名をメッセージ・キーとして使用します。特定のソース表に対するすべての操作は同じキーを持つため、同じパーティションにルーティングされます。

gg.handler.{your handler name}.keyMappingTemplate=${tableName}
nullのKafkaメッセージ・キーはラウンドロビン・ベースでパーティションに分散されます。これを行うには、次のように設定します。
gg.handler.{your handler name}.keyMappingTemplate=${null}

マッピング・キーの構成に推奨される設定は次のとおりです。

gg.handler.{your handler name}.keyMappingTemplate=${primaryKeys}

これにより、連結および区切られた主キー値であるKafkaメッセージ・キーが生成されます。

各行の操作には一意の主キーが必要であるため、各行に一意のKafkaメッセージ・キーが生成されます。別の重要な考慮事項として、異なるパーティションに送信されたKafkaメッセージは、送信された元の順序でKafkaコンシューマに配信される保証はありません。これはKafkaの仕様の一部です。順序はパーティション内でのみ維持されます。Kafkaメッセージ・キーとして主キーを使用すると、同じ主キーを持つ同じ行に対する操作で同じKafkaメッセージ・キーが生成されるため、同じKafkaパーティションに送信されます。このようにして、同じ行に対する操作の順序が維持されます。

DEBUGログ・レベルでは、Kafkaメッセージの座標(トピック、パーティションおよびオフセット)は、正常に送信されたメッセージの.logファイルに記録されます。

8.2.8.2.8 Kafka Connectハンドラのトラブルシューティング

8.2.8.2.8.1 Kafka Connectハンドラ用のJavaクラスパス

Javaクラスパスは、非常によくある問題の1つです。クラスパスに問題があることを示すのは、Oracle GoldenGate Javaのlog4jログ・ファイルにあるClassNotFoundExceptionで、gg.classpath変数に入力ミスがある場合は、クラスパスを解決する際にエラーが発生します。

Kafkaクライアント・ライブラリは、Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)製品には付属していません。「Kafka Connectハンドラの設定および実行」で説明されているように、JavaとKafkaクライアント・ライブラリを適切に解決するために、適切なバージョンのKafkaクライアント・ライブラリを取得し、Javaアダプタのプロパティ・ファイルでgg.classpathプロパティを適切に構成する作業は、ユーザーが行う必要があります。

8.2.8.2.8.2 無効なKafkaバージョン

Kafka Connectは、Kafka 0.9.0.0バージョンで導入されました。Kafka Connectハンドラは、0.8.2.2以前のバージョンのKafkaでは動作しません。Kafka 0.8.2.2でKafka Connectを使用しようとすると、実行時にClassNotFoundExceptionエラーが発生します。

8.2.8.2.8.3 Kafkaプロデューサ・プロパティ・ファイルが見つからない

通常、次の例外メッセージが表示されます。

ERROR 2015-11-11 11:49:08,482 [main] Error loading the kafka producer properties

Kafkaプロデューサの構成ファイル名のgg.handler.kafkahandler.KafkaProducerConfigFile構成プロパティが正しく設定されていることを確認してください。

gg.classpath変数にKafkaプロデューサのプロパティ・ファイルのパスが含まれていること、プロパティ・ファイルのパスの末尾にワイルドカード(*)が含まれていないことを確認してください。

8.2.8.2.8.4 Kafkaの接続の問題

通常、次の例外メッセージが表示されます。

WARN 2015-11-11 11:25:50,784 [kafka-producer-network-thread | producer-1] 

WARN  (Selector.java:276) - Error in I/O with localhost/127.0.0.1  java.net.ConnectException: Connection refused

これが発生すると、接続の再試行時間が経過し、Kafka接続ハンドラ・プロセスが異常終了します。Kafkaブローカが稼働しており、Kafkaプロデューサのプロパティ・ファイルで指定されているホストとポートが正しいことを確認してください。

Kafkaブローカをホストしているマシンでネットワーク・シェル・コマンド(netstat -lなど)を使用すると、Kafkaが所定のポートでリスニングしていることを確認できます。

8.2.8.2.9 Kafka Connectハンドラ・クライアント依存性

Apache Kafka Connectデータベースに接続するためのKafka Connectハンドラの依存性とはどのようなものでしょう。

Maven Central RepositoryのKafka Connectデータベースのアーティファクトは次のとおりです。

Maven groupId: org.apache.kafka

Maven artifactId: kafka-clients & connect-json

Maven version: 各セクションでリストされるKafka Connectのバージョン番号

8.2.8.2.9.1 Kafka 2.8.0
connect-api-2.8.0.jar
connect-json-2.8.0.jar
jackson-annotations-2.10.5.jar
jackson-core-2.10.5.jar
jackson-databind-2.10.5.1.jar
jackson-datatype-jdk8-2.10.5.jar
javax.ws.rs-api-2.1.1.jar
kafka-clients-2.8.0.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.8.1.jar
zstd-jni-1.4.9-1.jar
8.2.8.2.9.2 Kafka 2.7.1
connect-api-2.7.1.jar
connect-json-2.7.1.jar
jackson-annotations-2.10.5.jar
jackson-core-2.10.5.jar
jackson-databind-2.10.5.1.jar
jackson-datatype-jdk8-2.10.5.jar
javax.ws.rs-api-2.1.1.jar
kafka-clients-2.7.1.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.7.7.jar
zstd-jni-1.4.5-6.jar
8.2.8.2.9.3 Kafka 2.6.0
connect-api-2.6.0.jar
connect-json-2.6.0.jar
jackson-annotations-2.10.2.jar
jackson-core-2.10.2.jar
jackson-databind-2.10.2.jar
jackson-datatype-jdk8-2.10.2.jar
javax.ws.rs-api-2.1.1.jar
kafka-clients-2.6.0.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.7.3.jar
zstd-jni-1.4.4-7.jar
8.2.8.2.9.4 Kafka 2.5.1
connect-api-2.5.1.jar
connect-json-2.5.1.jar
jackson-annotations-2.10.2.jar
jackson-core-2.10.2.jar
jackson-databind-2.10.2.jar
jackson-datatype-jdk8-2.10.2.jar
javax.ws.rs-api-2.1.1.jar
kafka-clients-2.5.1.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.7.3.jar
zstd-jni-1.4.4-7.jar
8.2.8.2.9.5 Kafka 2.4.1
kafka-clients-2.4.1.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.28.jar
snappy-java-1.1.7.3.jar
zstd-jni-1.4.3-1.jarr
8.2.8.2.9.6 Kafka 2.3.1
connect-api-2.3.1.jar
connect-json-2.3.1.jar
jackson-annotations-2.10.0.jar
jackson-core-2.10.0.jar
jackson-databind-2.10.0.jar
jackson-datatype-jdk8-2.10.0.jar
javax.ws.rs-api-2.1.1.jar
kafka-clients-2.3.1.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.26.jar
snappy-java-1.1.7.3.jar
zstd-jni-1.4.0-1.jar
8.2.8.2.9.7 Kafka 2.2.1
kafka-clients-2.2.1.jar
lz4-java-1.5.0.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.7.2.jar
zstd-jni-1.3.8-1.jar
8.2.8.2.9.8 Kafka 2.1.1
audience-annotations-0.5.0.jar
connect-api-2.1.1.jar
connect-json-2.1.1.jar
jackson-annotations-2.9.0.jar
jackson-core-2.9.8.jar
jackson-databind-2.9.8.jar
javax.ws.rs-api-2.1.1.jar
jopt-simple-5.0.4.jar
kafka_2.12-2.1.1.jar
kafka-clients-2.1.1.jar
lz4-java-1.5.0.jar
metrics-core-2.2.0.jar
scala-library-2.12.7.jar
scala-logging_2.12-3.9.0.jar
scala-reflect-2.12.7.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.7.2.jar
zkclient-0.11.jar
zookeeper-3.4.13.jar
zstd-jni-1.3.7-1.jar
8.2.8.2.9.9 Kafka 2.0.1
audience-annotations-0.5.0.jar
connect-api-2.0.1.jar
connect-json-2.0.1.jar
jackson-annotations-2.9.0.jar
jackson-core-2.9.7.jar
jackson-databind-2.9.7.jar
javax.ws.rs-api-2.1.jar
jopt-simple-5.0.4.jar
kafka_2.12-2.0.1.jar
kafka-clients-2.0.1.jar
lz4-java-1.4.1.jar
metrics-core-2.2.0.jar
scala-library-2.12.6.jar
scala-logging_2.12-3.9.0.jar
scala-reflect-2.12.6.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.7.1.jar
zkclient-0.10.jar
zookeeper-3.4.13.jar
8.2.8.2.9.10 Kafka 1.1.1
kafka-clients-1.1.1.jar
lz4-java-1.4.1.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.7.1.jar
8.2.8.2.9.11 Kafka 1.0.2
kafka-clients-1.0.2.jar
lz4-java-1.4.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.4.jar
8.2.8.2.9.12 Kafka  0.11.0.0
connect-api-0.11.0.0.jar
connect-json-0.11.0.0.jar
jackson-annotations-2.8.0.jar
jackson-core-2.8.5.jar
jackson-databind-2.8.5.jar
jopt-simple-5.0.3.jar
kafka_2.11-0.11.0.0.jar
kafka-clients-0.11.0.0.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
metrics-core-2.2.0.jar
scala-library-2.11.11.jar
scala-parser-combinators_2.11-1.0.4.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.7.25.jar
snappy-java-1.1.2.6.jar
zkclient-0.10.jar
zookeeper-3.4.10.jar
8.2.8.2.9.13 Kafka 0.10.2.0
connect-api-0.10.2.0.jar 
connect-json-0.10.2.0.jar 
jackson-annotations-2.8.0.jar 
jackson-core-2.8.5.jar 
jackson-databind-2.8.5.jar 
jopt-simple-5.0.3.jar 
kafka_2.11-0.10.2.0.jar 
kafka-clients-0.10.2.0.jar 
log4j-1.2.17.jar 
lz4-1.3.0.jar 
metrics-core-2.2.0.jar 
scala-library-2.11.8.jar 
scala-parser-combinators_2.11-1.0.4.jar 
slf4j-api-1.7.21.jar 
slf4j-log4j12-1.7.21.jar 
snappy-java-1.1.2.6.jar 
zkclient-0.10.jar 
zookeeper-3.4.9.jar
8.2.8.2.9.14 Kafka 0.10.2.0
connect-api-0.10.1.1.jar 
connect-json-0.10.1.1.jar 
jackson-annotations-2.6.0.jar 
jackson-core-2.6.3.jar 
jackson-databind-2.6.3.jar 
jline-0.9.94.jar 
jopt-simple-4.9.jar 
kafka_2.11-0.10.1.1.jar 
kafka-clients-0.10.1.1.jar 
log4j-1.2.17.jar 
lz4-1.3.0.jar 
metrics-core-2.2.0.jar 
netty-3.7.0.Final.jar 
scala-library-2.11.8.jar 
scala-parser-combinators_2.11-1.0.4.jar 
slf4j-api-1.7.21.jar 
slf4j-log4j12-1.7.21.jar 
snappy-java-1.1.2.6.jar 
zkclient-0.9.jar 
zookeeper-3.4.8.jar
8.2.8.2.9.15 Kafka 0.10.0.0
activation-1.1.jar 
connect-api-0.10.0.0.jar 
connect-json-0.10.0.0.jar 
jackson-annotations-2.6.0.jar 
jackson-core-2.6.3.jar 
jackson-databind-2.6.3.jar 
jline-0.9.94.jar 
jopt-simple-4.9.jar 
junit-3.8.1.jar 
kafka_2.11-0.10.0.0.jar 
kafka-clients-0.10.0.0.jar 
log4j-1.2.15.jar 
lz4-1.3.0.jar 
mail-1.4.jar 
metrics-core-2.2.0.jar 
netty-3.7.0.Final.jar 
scala-library-2.11.8.jar 
scala-parser-combinators_2.11-1.0.4.jar 
slf4j-api-1.7.21.jar 
slf4j-log4j12-1.7.21.jar 
snappy-java-1.1.2.4.jar 
zkclient-0.8.jar 
zookeeper-3.4.6.jar
8.2.8.2.9.16 Kafka 0.9.0.1
activation-1.1.jar 
connect-api-0.9.0.1.jar 
connect-json-0.9.0.1.jar 
jackson-annotations-2.5.0.jar 
jackson-core-2.5.4.jar 
jackson-databind-2.5.4.jar 
jline-0.9.94.jar 
jopt-simple-3.2.jar 
junit-3.8.1.jar 
kafka_2.11-0.9.0.1.jar 
kafka-clients-0.9.0.1.jar 
log4j-1.2.15.jar 
lz4-1.2.0.jar 
mail-1.4.jar 
metrics-core-2.2.0.jar 
netty-3.7.0.Final.jar 
scala-library-2.11.7.jar 
scala-parser-combinators_2.11-1.0.4.jar 
scala-xml_2.11-1.0.4.jar 
slf4j-api-1.7.6.jar 
slf4j-log4j12-1.7.6.jar 
snappy-java-1.1.1.7.jar 
zkclient-0.7.jar 
zookeeper-3.4.6.jar
8.2.8.2.9.16.1 Confluentの依存性

ノート:

次にリストするConfluentの依存性は、Kafka Connect Avroコンバータおよび関連付けられたAvroスキーマ・レジストリ・クライアントに関するものです。Confluent Kafka Connectと統合されている場合、前の項にリストされている対応するKafkaバージョンのKafka Connect依存性に加えて、次の依存性が必要です。

8.2.8.2.9.16.1.1 Confluent 6.2.0
avro-1.10.1.jar
commons-compress-1.20.jar
common-utils-6.2.0.jar
connect-api-6.2.0-ccs.jar
connect-json-6.2.0-ccs.jar
jackson-annotations-2.10.5.jar
jackson-core-2.11.3.jar
jackson-databind-2.10.5.1.jar
jackson-datatype-jdk8-2.10.5.jar
jakarta.annotation-api-1.3.5.jar
jakarta.inject-2.6.1.jar
jakarta.ws.rs-api-2.1.6.jar
javax.ws.rs-api-2.1.1.jar
jersey-common-2.34.jar
kafka-avro-serializer-6.2.0.jar
kafka-clients-6.2.0-ccs.jar
kafka-connect-avro-converter-6.2.0.jar
kafka-connect-avro-data-6.2.0.jar
kafka-schema-registry-client-6.2.0.jar
kafka-schema-serializer-6.2.0.jar
lz4-java-1.7.1.jar
osgi-resource-locator-1.0.3.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.8.1.jar
swagger-annotations-1.6.2.jar
zstd-jni-1.4.9-1.jar
8.2.8.2.9.16.1.2 Confluent 6.1.0
avro-1.9.2.jar
commons-compress-1.19.jar
common-utils-6.1.0.jar
connect-api-6.1.0-ccs.jar
connect-json-6.1.0-ccs.jar
jackson-annotations-2.10.5.jar
jackson-core-2.10.2.jar
jackson-databind-2.10.5.1.jar
jackson-datatype-jdk8-2.10.5.jar
jakarta.annotation-api-1.3.5.jar
jakarta.inject-2.6.1.jar
jakarta.ws.rs-api-2.1.6.jar
javax.ws.rs-api-2.1.1.jar
jersey-common-2.31.jar
kafka-avro-serializer-6.1.0.jar
kafka-clients-6.1.0-ccs.jar
kafka-connect-avro-converter-6.1.0.jar
kafka-connect-avro-data-6.1.0.jar
kafka-schema-registry-client-6.1.0.jar
kafka-schema-serializer-6.1.0.jar
lz4-java-1.7.1.jar
osgi-resource-locator-1.0.3.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.7.7.jar
swagger-annotations-1.6.2.jar
zstd-jni-1.4.5-6.jar
8.2.8.2.9.16.1.3 Confluent 6.0.0
avro-1.9.2.jar
commons-compress-1.19.jar
common-utils-6.0.0.jar
connect-api-6.0.0-ccs.jar
connect-json-6.0.0-ccs.jar
jackson-annotations-2.10.5.jar
jackson-core-2.10.2.jar
jackson-databind-2.10.5.jar
jackson-datatype-jdk8-2.10.5.jar
jakarta.annotation-api-1.3.5.jar
jakarta.inject-2.6.1.jar
jakarta.ws.rs-api-2.1.6.jar
javax.ws.rs-api-2.1.1.jar
jersey-common-2.30.jar
kafka-avro-serializer-6.0.0.jar
kafka-clients-6.0.0-ccs.jar
kafka-connect-avro-converter-6.0.0.jar
kafka-connect-avro-data-6.0.0.jar
kafka-schema-registry-client-6.0.0.jar
kafka-schema-serializer-6.0.0.jar
lz4-java-1.7.1.jar
osgi-resource-locator-1.0.3.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.7.3.jar
swagger-annotations-1.6.2.jar
zstd-jni-1.4.4-7.jar
8.2.8.2.9.16.1.4 Confluent 5.5.0
avro-1.9.2.jar
classmate-1.3.4.jar
common-config-5.5.0.jar
commons-compress-1.19.jar
commons-lang3-3.2.1.jar
common-utils-5.5.0.jar
connect-api-5.5.0-ccs.jar
connect-json-5.5.0-ccs.jar
guava-18.0.jar
hibernate-validator-6.0.17.Final.jar
jackson-annotations-2.10.2.jar
jackson-core-2.10.2.jar
jackson-databind-2.10.2.jar
jackson-dataformat-yaml-2.4.5.jar
jackson-datatype-jdk8-2.10.2.jar
jackson-datatype-joda-2.4.5.jar
jakarta.annotation-api-1.3.5.jar
jakarta.el-3.0.2.jar
jakarta.el-api-3.0.3.jar
jakarta.inject-2.6.1.jar
jakarta.validation-api-2.0.2.jar
jakarta.ws.rs-api-2.1.6.jar
javax.ws.rs-api-2.1.1.jar
jboss-logging-3.3.2.Final.jar
jersey-bean-validation-2.30.jar
jersey-client-2.30.jar
jersey-common-2.30.jar
jersey-media-jaxb-2.30.jar
jersey-server-2.30.jar
joda-time-2.2.jar
kafka-avro-serializer-5.5.0.jar
kafka-clients-5.5.0-ccs.jar
kafka-connect-avro-converter-5.5.0.jar
kafka-connect-avro-data-5.5.0.jar
kafka-schema-registry-client-5.5.0.jar
kafka-schema-serializer-5.5.0.jar
lz4-java-1.7.1.jar
osgi-resource-locator-1.0.3.jar
slf4j-api-1.7.30.jar
snakeyaml-1.12.jar
snappy-java-1.1.7.3.jar
swagger-annotations-1.5.22.jar
swagger-core-1.5.3.jar
swagger-models-1.5.3.jar
zstd-jni-1.4.4-7.jar
8.2.8.2.9.16.1.5 Confluent 5.4.0
avro-1.9.1.jar
common-config-5.4.0.jar
commons-compress-1.19.jar
commons-lang3-3.2.1.jar
common-utils-5.4.0.jar
connect-api-5.4.0-ccs.jar
connect-json-5.4.0-ccs.jar
guava-18.0.jar
jackson-annotations-2.9.10.jar
jackson-core-2.9.9.jar
jackson-databind-2.9.10.1.jar
jackson-dataformat-yaml-2.4.5.jar
jackson-datatype-jdk8-2.9.10.jar
jackson-datatype-joda-2.4.5.jar
javax.ws.rs-api-2.1.1.jar
joda-time-2.2.jar
kafka-avro-serializer-5.4.0.jar
kafka-clients-5.4.0-ccs.jar
kafka-connect-avro-converter-5.4.0.jar
kafka-schema-registry-client-5.4.0.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.28.jar
snakeyaml-1.12.jar
snappy-java-1.1.7.3.jar
swagger-annotations-1.5.22.jar
swagger-core-1.5.3.jar
swagger-models-1.5.3.jar
zstd-jni-1.4.3-1.jar
8.2.8.2.9.16.1.6 Confluent 5.3.0
audience-annotations-0.5.0.jar
avro-1.8.1.jar
common-config-5.3.0.jar
commons-compress-1.8.1.jar
common-utils-5.3.0.jar
connect-api-5.3.0-ccs.jar
connect-json-5.3.0-ccs.jar
jackson-annotations-2.9.0.jar
jackson-core-2.9.9.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.9.9.jar
jackson-datatype-jdk8-2.9.9.jar
jackson-mapper-asl-1.9.13.jar
javax.ws.rs-api-2.1.1.jar
jline-0.9.94.jar
jsr305-3.0.2.jar
kafka-avro-serializer-5.3.0.jar
kafka-clients-5.3.0-ccs.jar
kafka-connect-avro-converter-5.3.0.jar
kafka-schema-registry-client-5.3.0.jar
lz4-java-1.6.0.jar
netty-3.10.6.Final.jar
paranamer-2.7.jar
slf4j-api-1.7.26.jar
snappy-java-1.1.1.3.jar
spotbugs-annotations-3.1.9.jar
xz-1.5.jar
zkclient-0.10.jar
zookeeper-3.4.14.jar
zstd-jni-1.4.0-1.jar
8.2.8.2.9.16.1.7 Confluent 5.2.1
audience-annotations-0.5.0.jar
avro-1.8.1.jar
common-config-5.2.1.jar
commons-compress-1.8.1.jar
common-utils-5.2.1.jar
connect-api-2.2.0-cp2.jar
connect-json-2.2.0-cp2.jar
jackson-annotations-2.9.0.jar
jackson-core-2.9.8.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.9.8.jar
jackson-datatype-jdk8-2.9.8.jar
jackson-mapper-asl-1.9.13.jar
javax.ws.rs-api-2.1.1.jar
jline-0.9.94.jar
kafka-avro-serializer-5.2.1.jar
kafka-clients-2.2.0-cp2.jar
kafka-connect-avro-converter-5.2.1.jar
kafka-schema-registry-client-5.2.1.jar
lz4-java-1.5.0.jar
netty-3.10.6.Final.jar
paranamer-2.7.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.1.3.jar
xz-1.5.jar
zkclient-0.10.jar
zookeeper-3.4.13.jar
zstd-jni-1.3.8-1.jar
8.2.8.2.9.16.1.8 Confluent 5.1.3
audience-annotations-0.5.0.jar
avro-1.8.1.jar
common-config-5.1.3.jar
commons-compress-1.8.1.jar
common-utils-5.1.3.jar
connect-api-2.1.1-cp3.jar
connect-json-2.1.1-cp3.jar
jackson-annotations-2.9.0.jar
jackson-core-2.9.8.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.9.8.jar
jackson-mapper-asl-1.9.13.jar
javax.ws.rs-api-2.1.1.jar
jline-0.9.94.jar
kafka-avro-serializer-5.1.3.jar
kafka-clients-2.1.1-cp3.jar
kafka-connect-avro-converter-5.1.3.jar
kafka-schema-registry-client-5.1.3.jar
lz4-java-1.5.0.jar
netty-3.10.6.Final.jar
paranamer-2.7.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.1.3.jar
xz-1.5.jar
zkclient-0.10.jar
zookeeper-3.4.13.jar
zstd-jni-1.3.7-1.jar
8.2.8.2.9.16.1.9 Confluent 5.0.3
audience-annotations-0.5.0.jar
avro-1.8.1.jar
common-config-5.0.3.jar
commons-compress-1.8.1.jar
common-utils-5.0.3.jar
connect-api-2.0.1-cp4.jar
connect-json-2.0.1-cp4.jar
jackson-annotations-2.9.0.jar
jackson-core-2.9.7.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.9.7.jar
jackson-mapper-asl-1.9.13.jar
javax.ws.rs-api-2.1.jar
jline-0.9.94.jar
kafka-avro-serializer-5.0.3.jar
kafka-clients-2.0.1-cp4.jar
kafka-connect-avro-converter-5.0.3.jar
kafka-schema-registry-client-5.0.3.jar
lz4-java-1.4.1.jar
netty-3.10.6.Final.jar
paranamer-2.7.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.1.3.jar
xz-1.5.jar
zkclient-0.10.jar
zookeeper-3.4.13.jar
8.2.8.2.9.16.1.10 Confluent 4.1.2
avro-1.8.1.jar
common-config-4.1.2.jar
commons-compress-1.8.1.jar
common-utils-4.1.2.jar
connect-api-1.1.1-cp1.jar
connect-json-1.1.1-cp1.jar
jackson-annotations-2.9.0.jar
jackson-core-2.9.6.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.9.6.jar
jackson-mapper-asl-1.9.13.jar
jline-0.9.94.jar
kafka-avro-serializer-4.1.2.jar
kafka-clients-1.1.1-cp1.jar
kafka-connect-avro-converter-4.1.2.jar
kafka-schema-registry-client-4.1.2.jar
log4j-1.2.16.jar
lz4-java-1.4.1.jar
netty-3.10.5.Final.jar
paranamer-2.7.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.6.1.jar
snappy-java-1.1.1.3.jar
xz-1.5.jar
zkclient-0.10.jar
zookeeper-3.4.10.jar

8.2.8.3 Apache Kafka RESTプロキシ

Kafka REST Proxyハンドラは、Confluentによって配布されたKafka REST Proxyにメッセージをストリーミングします。

この章では、Kafka REST Proxyハンドラの使用方法について説明します。

8.2.8.3.1 概要

Kafka REST Proxyハンドラを使用すると、KafkaメッセージをHTTPSプロトコルを使用してストリーミングできます。この機能のユースケースとしては、KafkaメッセージをOracle GoldenGate On Premisesインストールからクラウドに、またはクラウドからクラウドにストリーミングします。

Kafka REST Proxyは、KafkaクラスタへのRESTfulインタフェースを提供します。これにより、次のことを簡単に実行できます。

  • メッセージの生成および消費。

  • クラスタの状態の表示。

  • ネイティブKafkaプロトコルまたはクライアントを使用しない管理アクションの実行。

Kafka REST Proxyは、Confluent Open SourceおよびConfluent Enterprise配布の一部です。これは、Apache Kafka配布では使用できません。REST Proxy経由でKafkaにアクセスするには、Confluent Kafkaバージョンをインストールする必要があります。https://docs.confluent.io/current/kafka-rest/docs/index.htmlを参照してください。

8.2.8.3.2 Kafka REST Proxyハンドラ・サービスの設定および開始

ZIP、tarアーカイブ、Docker、パッケージなど、複数のインストール形式から選択できます。

8.2.8.3.2.1 Kafka REST Proxyハンドラの使用

Kafka REST Proxyは、Apache、ClouderaまたはHortonworksに含まれていないため、Confluent Open SourceまたはConfluent Enterprise配布をダウンロードしてインストールする必要があります。ZIP、TARアーカイブ、Docker、パッケージなど、複数のインストール形式から選択できます。

Kafka REST Proxyは、ZooKeeper、KafkaおよびSchema Registryに依存しています

8.2.8.3.2.2 依存性のダウンロード

次の場所から、Javaクライアント依存性のJersey RESTful Webサービスを確認およびダウンロードできます。

https://eclipse-ee4j.github.io/jersey/

Jersey Apache Connectorの依存性は、Mavenリポジトリhttps://mvnrepository.com/artifact/org.glassfish.jersey.connectors/jersey-apache-connectorから確認およびダウンロードできます

8.2.8.3.2.3 クラスパス構成

Kafka RESTプロキシ・ハンドラは、Jerseyプロジェクトjersey-clientバージョン2.27およびjersey-connectors-apacheバージョン2.27を使用してKafkaに接続します。Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)には必要な依存性が含まれていないため、それらを取得する必要があります。「依存性のダウンロード」を参照してください。

Javaアダプタ・プロパティ・ファイルでgg.classpathプロパティを使用して、これらの依存性を構成する必要があります。Kafka RESTプロキシ・ハンドラの正しく構成されたクラスパスの例を次に示します。

gg.classpath=dirprm:
{path_to_jersey_client_jars}/jaxrs-ri/lib/*:{path_to_jersey_client_jars}
/jaxrs-ri/api/*
:{path_to_jersey_client_jars}/jaxrs-ri/ext/*:{path_to_jersey_client_jars}
/connector/*
8.2.8.3.2.4 Kafka REST Proxyハンドラの構成

Kafka REST Proxyハンドラの構成可能な値は次のとおりです。Kafka REST Proxyプロパティ・ファイルは、Oracle GoldenGate dirprmディレクトリに保存することをお薦めします。

Kafka REST Proxyハンドラの選択を有効にするには、まずgg.handler.name.type=kafkarestproxyを指定してハンドラ・タイプを構成してから、次に示す他のKafka REST Proxyハンドラ・プロパティを構成する必要があります。

プロパティ 必須/オプション 有効な値 デフォルト 説明

gg.handler.name.type

必須

kafkarestproxy

なし

Kafka REST Proxyハンドラを選択するための構成。

gg.handler.name.topicMappingTemplate

必須

実行時にKafkaトピック名を解決するためのテンプレート文字列の値。

なし

詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。

gg.handler.name.keyMappingTemplate

必須

実行時にKafkaメッセージ・キーを解決するためのテンプレート文字列の値。

なし

詳細は、「テンプレートを使用したトピック名とメッセージ・キーの解決」を参照してください。

gg.handler.name.postDataUrl

必須

Rest Proxyのリスナー・アドレス。

なし

Kafka REST ProxyのURLに設定します。

gg.handler.name.format

必須

avro | json

なし

RESTプロキシ・ペイロード・データ形式に設定します

gg.handler.name.payloadsize

オプション

ペイロード・サイズ(MB)を表す値。

5MB

HTTPメッセージのペイロードの最大サイズに設定します。

gg.handler.name.apiVersion

オプション

v1 | v2

v2

使用するAPIのバージョンを設定します。

gg.handler.name.mode

オプション

op | tx

op

操作の処理方法を設定します。opモードでは、操作は受信したとおりに処理されます。txモードでは、操作はトランザクションのコミット時にキャッシュされ、処理されます。

gg.handler.name.trustStore

オプション

トラストストアへのパス。

なし

信頼できる認証局(CA)の証明書を保持するトラストストア・ファイルへのパス。これらのCAは、SSL接続中にサーバーによって提示された証明書を検証するために使用されます。「キーストアまたはトラストストアの生成」を参照してください。

gg.handler.name.trustStorePassword

オプション

トラストストアのパスワード。

なし

トラストストア・パスワード。

gg.handler.name.keyStore

オプション

キーストアへのパス。

なし

アイデンティティの検証のために他のパーティ(サーバーまたはクライアント)に提示する秘密鍵およびアイデンティティ証明書が含まれるキーストア・ファイルへのパス。「キーストアまたはトラストストアの生成」を参照してください。

gg.handler.name.keyStorePassword

オプション

キーストアのパスワード。

なし

キーストア・パスワード。

gg.handler.name.proxy

オプション

http://host:port

なし

プロキシURLは次の形式です。http://host:port

gg.handler.name.proxyUserName

オプション

任意の文字列。

なし

プロキシ・ユーザー名

gg.handler.name.proxyPassword

オプション

任意の文字列。

なし

プロキシ・パスワード。

gg.handler.name.readTimeout

オプション

整数値。

なし

サーバーが応答できる時間。

gg.handler.name.connectionTimeout

オプション

整数値。

なし

ホストへの接続の確立を待機する時間。

gg.handler.name.format.metaColumnsTemplate オプション

${alltokens} | ${token} | ${env} | ${sys} | ${javaprop} | ${optype} | ${position} | ${timestamp} | ${catalog} | ${schema} | ${table} | ${objectname} | ${csn} | ${xid} | ${currenttimestamp} | ${opseqno} | ${timestampmicro} | ${currenttimestampmicro} |

${txind}

| ${primarykeycolumns}|${currenttimestampiso8601}${static}${segno} | ${rba}

なし

${alltokens} | ${token} | ${env} | ${sys} | ${javaprop} | ${optype} | ${position} | ${timestamp} | ${catalog} | ${schema} | ${table} | ${objectname} | ${csn} | ${xid} | ${currenttimestamp} | ${opseqno} | ${timestampmicro} | ${currenttimestampmicro} |

${txind}

| ${primarykeycolumns}|${currenttimestampiso8601}${static}${segno} | ${rba}

これは、テンプレートを表す1つ以上のテンプレート化された値で構成されているカンマ区切りの文字列です。メタ列のキーワードの詳細は、メタ列のキーワードを参照してください。次に、メタ列のリストを生成する例を示します。
${optype}, ${token.ROWID}, ${sys.username}, ${currenttimestamp}

詳細は、「テンプレートを使用したストリーム名とパーティション名の解決」を参照してください。

8.2.8.3.2.5 サンプル構成の確認

Javaアダプタ・プロパティ・ファイルからのKafka RESTプロキシ・ハンドラの構成例を次に示します。

gg.handlerlist=kafkarestproxy

#The handler properties
gg.handler.kafkarestproxy.type=kafkarestproxy
#The following selects the topic name based on the fully qualified table name
gg.handler.kafkarestproxy.topicMappingTemplate=${fullyQualifiedTableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkarestproxy.keyMappingTemplate=${primaryKeys}
gg.handler.kafkarestproxy.postDataUrl=http://localhost:8083
gg.handler.kafkarestproxy.apiVersion=v1
gg.handler.kafkarestproxy.format=json
gg.handler.kafkarestproxy.payloadsize=1
gg.handler.kafkarestproxy.mode=tx

#Server auth properties
#gg.handler.kafkarestproxy.trustStore=/keys/truststore.jks
#gg.handler.kafkarestproxy.trustStorePassword=test1234
#Client auth properites
#gg.handler.kafkarestproxy.keyStore=/keys/keystore.jks
#gg.handler.kafkarestproxy.keyStorePassword=test1234

#Proxy properties
#gg.handler.kafkarestproxy.proxy=http://proxyurl:80
#gg.handler.kafkarestproxy.proxyUserName=username
#gg.handler.kafkarestproxy.proxyPassword=password

#The MetaColumnTemplate formatter properties
gg.handler.kafkarestproxy.format.metaColumnsTemplate=${optype},${timestampmicro},${currenttimestampmicro}
8.2.8.3.2.6 セキュリティ

次の間でセキュリティを使用できます。

  • Kafka RESTプロキシ・クライアントとKafka RESTプロキシ・サーバー。Oracle GoldenGate RESTプロキシ・ハンドラはKafka RESTプロキシ・クライアントです。

  • Kafka RESTプロキシ・サーバーとKafkaブローカ。Kafka RESTプロキシ・サーバーのセキュリティ・ドキュメントおよび構成を十分に確認することをお薦めします。https://docs.confluent.io/current/kafka-rest/docs/index.htmlを参照してください。

REST Proxyは、クライアントとKafka REST Proxyハンドラの間の通信を保護するためにSSLをサポートしています。SSLを構成するには:

  1. スクリプトを使用してキーストアを生成します。「キーストアまたはトラストストアの生成」を参照してください。

  2. 次のプロパティを使用して、kafka-rest.propertiesファイルのKafka REST Proxyサーバー構成を更新します。

    listeners=https://hostname:8083
    confluent.rest.auth.propagate.method=SSL
    
    Configuration Options for HTTPS
    ssl.client.auth=true
    ssl.keystore.location={keystore_file_path}/server.keystore.jks
    ssl.keystore.password=test1234
    ssl.key.password=test1234
    ssl.truststore.location={keystore_file_path}/server.truststore.jks
    ssl.truststore.password=test1234
    ssl.keystore.type=JKS
    ssl.truststore.type=JKS
    ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
  3. サーバーを再起動します。

相互認証を無効にするには、ssl.client.auth=プロパティをtrueからfalseに更新します。

8.2.8.3.2.7 キーストアまたはトラストストアの生成

トラストストアの生成

このスクリプトを実行して、ca-certca-keyおよびtruststore.jksトラストストア・ファイルを生成します。

#!/bin/bash
PASSWORD=password
CLIENT_PASSWORD=password
VALIDITY=365

次に、この例のようにCAを生成します。

openssl req -new -x509 -keyout ca-key -out ca-cert -days $VALIDITY -passin pass:$PASSWORD
      -passout pass:$PASSWORD -subj "/C=US/ST=CA/L=San Jose/O=Company/OU=Org/CN=FQDN"
      -nodes

最後に、keytoolを使用して、CAをサーバーのトラストストアに追加します。

keytool -keystore truststore.jks -alias CARoot -import -file ca-cert -storepass $PASSWORD
      -keypass $PASSWORD

キーストアの生成

このスクリプトを実行し、引数としてfqdnを渡して、ca-cert.srlcert-filecert-signed、およびkeystore.jksキーストア・ファイルを生成します。

#!/bin/bash
PASSWORD=password
VALIDITY=365

if [ $# -lt 1 ];
then
echo "`basename $0` host fqdn|user_name|app_name"
exit 1
fi

CNAME=$1
ALIAS=`echo $CNAME|cut -f1 -d"."`

次に、この例のようにkeytoolを使用してキーストアを生成します。

keytool -noprompt ¿keystore keystore.jks -alias $ALIAS -keyalg RSA -validity $VALIDITY
      -genkey -dname "CN=$CNAME,OU=BDP,O=Company,L=San Jose,S=CA,C=US" -storepass $PASSWORD
      -keypass $PASSWORD

次に、キーストア内のすべての証明書をCAで署名します。

keytool -keystore keystore.jks -alias $ALIAS -certreq -file cert-file -storepass
      $PASSWORDopenssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days $VALIDITY
      -CAcreateserial -passin pass:$PASSWORD

最後に、CAと署名付き証明書の両方をキーストアにインポートします。

keytool -keystore keystore.jks -alias CARoot -import -file ca-cert -storepass
      $PASSWORDkeytool -keystore keystore.jks -alias $ALIAS -import -file cert-signed -storepass
      $PASSWORD
8.2.8.3.2.8 テンプレートを使用したトピック名とメッセージ・キーの解決

Kafka REST Proxyハンドラでは、テンプレートの構成値を使用して実行時にトピック名およびメッセージ・キーを解決する機能が提供されます。テンプレートを使用すると、静的な値およびキーワードを構成できます。キーワードは、そのときの処理コンテキストに応じてキーワードを動的に置き換えるために使用されます。テンプレートは、次の構成プロパティを使用します。

gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate

テンプレートのモード

Kafka REST Proxyハンドラは、操作(挿入、更新、削除)ごとに1つのメッセージを送信するように構成できます。または、トランザクション・レベルで操作をメッセージにグループ化するように構成できます。

テンプレート・キーワードの詳細は、テンプレート・キーワードを参照してください。

テンプレートの例

テンプレートの構成値の例と解決される値を次に示します。

テンプレートの例 解決される値

${groupName}_${fullyQualfiedTableName}

KAFKA001_dbo.table1

prefix_${schemaName}_${tableName}_suffix

prefix_dbo_table1_suffix

${currentDate[yyyy-mm-dd hh:MM:ss.SSS]}

2017-05-17 11:45:34.254

8.2.8.3.2.9 Kafka REST Proxyハンドラのフォーマッタ・プロパティ

Kafka REST Proxyハンドラ・フォーマッタの構成可能な値は次のとおりです。

表8-11 Kafka REST Proxyハンドラ・フォーマッタ・プロパティ

プロパティ オプション/オプション 有効な値 デフォルト 説明
gg.handler.name.format.includeOpType

オプション

true | false

true

trueに設定すると、出力メッセージにop_tsというフィールドが作成されます。この値は、ソース・データベースの操作タイプのインジケータ(たとえば、挿入の場合はI、更新の場合はU、削除の場合はD)になります。

出力でこのフィールドを省略する場合は、falseに設定します。

gg.handler.name.format.includeOpTimestamp

オプション

true | false

true

trueに設定すると、出力メッセージにop_typeというフィールドが作成されます。この値は、ソース証跡ファイルの操作タイムスタンプ(コミット・タイムスタンプ)になります。

出力でこのフィールドを省略する場合は、falseに設定します。

gg.handler.name.format.includeCurrentTimestamp

オプション

true | false

true

trueに設定すると、出力メッセージにcurrent_tsというフィールドが作成されます。この値は、ハンドラが操作を処理するときの現在のタイムスタンプになります。

出力でこのフィールドを省略する場合は、falseに設定します。

gg.handler.name.format.includePosition

オプション

true | false

true

trueに設定すると、出力メッセージにposというフィールドが作成されます。この値は、ソース証跡ファイルからの操作の位置(順序番号+オフセット)になります。

出力でこのフィールドを省略する場合は、falseに設定します。

gg.handler.name.format.includePrimaryKeys

オプション

true | false

true

trueに設定すると、出力メッセージにprimary_keysというフィールドが作成されます。この値は、主キー列の列名の配列になります。

出力でこのフィールドを省略する場合は、falseに設定します。

gg.handler.name.format.includeTokens

オプション

true | false

true

trueに設定すると、出力メッセージにマップ・フィールドが含まれます。キーはトークンで、その値は、キーと値がOracle GoldenGateソース証跡ファイルからのトークン・キーと値であるマップです。

このフィールドを抑止する場合は、falseに設定します。

gg.handler.name.format.insertOpKey

オプション

任意の文字列。

I

挿入操作を示すフィールドop_typeの値。

gg.handler.name.format.updateOpKey

オプション

任意の文字列。

U

更新操作を示すフィールドop_typeの値。

gg.handler.name.format.deleteOpKey

オプション

任意の文字列。

D

削除操作を示すフィールドop_typeの値。

gg.handler.name.format.truncateOpKey

オプション

任意の文字列。

T

切捨て操作を示すフィールドop_typeの値。

gg.handler.name.format.treatAllColumnsAsStrings

オプション

true | false

false

trueに設定すると、すべての出力フィールドが文字列として扱われます。

falseに設定すると、ハンドラによって、ソース証跡ファイルの対応するフィールド・タイプがそれに最も適したKafkaのデータ型にマップされます。

gg.handler.name.format.mapLargeNumbersAsStrings

オプション

true | false

false

trueに設定すると、精度を保つため、これらのフィールドが文字列としてマップされます。このプロパティは、Avroフォーマッタに固有であり、他のフォーマッタと組み合せて使用することはできません。

gg.handler.name.format.iso8601Format

オプション

true | false

false

trueに設定すると、現在の日付がISO8601形式で出力されます。

gg.handler.name.format.pkUpdateHandling

オプション

abend | update | delete-insert

abend

これは、.(gg.handler.name.format.messageFormatting=rowプロパティの行メッセージをモデリングしている場合にのみ適用されます。これは、更新時に操作前および操作後イメージがメッセージに伝播される操作メッセージをモデリングしている場合は適用されません。

8.2.8.3.3 レコードの消費

Kafka REST Proxyハンドラを使用して、Kafkaトピックからデータを消費する簡単な方法はCurlです。

JSONデータの消費

  1. JSONデータのコンシューマを作成します。

    curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json"  
    
    https://localhost:8082/consumers/my_json_consumer
  2. トピックをサブスクライブします。

    curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json"    --data '{"topics":["topicname"]}' \
    
    https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
  3. レコードを消費します。

    curl –k -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
    
    https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
    

Avroデータの消費

  1. Avroデータのコンシューマを作成します。

    curl -k -X POST  -H "Content-Type: application/vnd.kafka.v2+json" \
     --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \
    
    https://localhost:8082/consumers/my_avro_consumer
  2. トピックをサブスクライブします。

    curl –k -X POST -H "Content-Type: application/vnd.kafka.v2+json"      --data '{"topics":["topicname"]}' \
    
    https://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/subscription
  3. レコードを消費します。

    curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
    
    https://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/records

ノート:

RESTプロキシをホストしているマシンからcurlを使用している場合は、メッセージを使用する前に、http_proxy環境変数の設定を解除します。ローカル・マシンからcurlを使用してKafka RESTプロキシからメッセージを取得する場合は、http_proxy環境変数の設定が必要になります。

8.2.8.3.4 パフォーマンスに関する考慮事項

Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)構成とKafkaプロデューサでは、パフォーマンスに影響を与える複数の構成設定があります。

Oracle GoldenGateのパラメータでパフォーマンスに最も大きな影響を与えるのはReplicatのGROUPTRANSOPSパラメータです。これにより、Replicatは複数のソース・トランザクションを1つのターゲット・トランザクションにグループ化できます。トランザクションのコミット時に、Kafka REST Proxyハンドラは、KafkaプロデューサにデータをPOSTします。

ReplicatのGROUPTRANSOPSを大きい値に設定すると、ReplicatがPOSTを呼び出す頻度が少なくなり、パフォーマンスが向上します。GROUPTRANSOPSのデフォルト値は1000で、値を2500、5000、さらには10000に増やすことでパフォーマンスが向上します。

8.2.8.3.5 Kafka REST Proxyハンドラのメタ列テンプレート・プロパティ

Kafka REST Proxyサーバーを起動する際の問題

Kafka RESTプロキシ・サーバーを起動するスクリプトは、そのCLASSPATHを環境CLASSPATH変数に追加します。これが設定され、環境CLASSPATHにKafka RESTプロキシ・サーバーの正常な実行と競合するJARファイルが含まれていた場合、起動できなくなります。Kafka RESTプロキシ・サーバーを起動する前に、CLASSPATH環境変数の設定を解除することをお薦めします。CLASSPATH""に再設定すると、この問題を解決できます。