9.1.4 Apache Kafka

Kafka用のOracle GoldenGateキャプチャ(Extract)は、Kafkaトピックまたはトピックからメッセージを読み取り、GoldenGate証跡ファイルに書き込まれた論理変更レコードにデータを変換するために使用されます。この項では、Kafka用のOracle GoldenGateキャプチャを使用する方法について説明します。

9.1.4.1 概要

Kafkaはここ数年で市場の牽引力を獲得し、エンタープライズ・メッセージング分野のリーダーになりました。Kafkaは、高可用性、フェイルオーバー、冗長性によるデータの整合性および高パフォーマンスを提供するクラスタベースのメッセージング・システムです。Kafkaは現在、Enterprise Service Busアーキテクチャの実装のための主要なアプリケーションです。Kafka Capture抽出プロセスは、Kafkaからメッセージを読み取り、それらのメッセージをOracle GoldenGate証跡ファイルに書き込まれる論理変更レコードに変換します。生成された証跡ファイルを使用して、証跡ファイル内のデータをOracle GoldenGateのReplicatプロセスでサポートされる様々なRDBMS実装または他の統合に伝播できます。

9.1.4.2 前提条件

9.1.4.2.1 ソース・タイプを検出するための資格証明ストア・エントリの設定

キャプチャのデータベース・タイプは、データベース資格証明useridの接頭辞に基づいています。useridの一般的な形式は次のとおりです: <dbtype>://<db-user>@<comma separated list of server addresses>:<port>

Kafkaキャプチャのuserid値は、接頭辞kafka://を持つ任意の値である必要があります。資格証明ストア・エントリは、「管理サービス」の「DB接続」で設定できます。

alter credentialstore add user kafka:// password somepass alias kafka

ノート:

資格証明の設定中にKafkaにダミーのパスワードを指定できます。

9.1.4.3 Kafka Captureの一般的な用語および機能

9.1.4.3.1 Kafkaストリーム

Kafkaコンシューマとして、1つ以上のトピックから読み取ることができます。さらに、各トピックを1つ以上のパーティションに分割できます。個々のトピック/パーティションの組合せが、Kafkaストリームです。このトピックでは、Kafkaストリームについて幅広く説明します。ここでは用語を明確に定義することが重要です。

5つのKafkaストリームの例を次に示します。
  • トピック: TEST1 パーティション: 0
  • トピック: TEST1 パーティション: 1
  • トピック: TEST2 パーティション: 0
  • トピック: TEST2 パーティション: 1
  • トピック: TEST2 パーティション: 2

9.1.4.3.2 Kafkaメッセージの順序

個々のストリームについてKafkaコンシューマから受信したメッセージは、Kafkaコミット・ログに格納されている順序である必要があります。ただし、Kafkaストリームは相互に独立して移動し、様々なストリームからメッセージを受信する順序は非決定的です。

たとえば、Kafka Captureが次の2つのストリームからのメッセージを消費しています。
  • ストリーム1: トピックTEST1、パーティション0
  • ストリーム2: トピックTEST1、パーティション1
1をトピック|パーティション|オフセット|タイムスタンプ形式の合計5つのメッセージでストリーミングします。
TEST1|0|0|1588888086210
TEST1|0|1|1588888086220
TEST1|0|2|1588888086230
TEST1|0|3|1588888086240
TEST1|0|4|1588888086250
2をトピック|パーティション|オフセット|タイムスタンプ形式の合計5つのメッセージにストリーミングします。
TEST1|1|0|1588888086215
TEST1|1|1|1588888086225
TEST1|1|2|1588888086235
TEST1|1|3|1588888086245
TEST1|1|4|1588888086255
Kafka Consumerは、実行1で次の順序でメッセージを配信した可能性があります。
TEST1|1|0|1588888086215
TEST1|1|1|1588888086225
TEST1|0|0|1588888086210
TEST1|0|1|1588888086220
TEST1|0|2|1588888086230
TEST1|0|3|1588888086240
TEST1|0|4|1588888086250
TEST1|1|2|1588888086235
TEST1|1|3|1588888086245
TEST1|1|4|1588888086255
2次実行では、メッセージが次の順序で配信された可能性があります。
TEST1|0|0|1588888086210
TEST1|0|1|1588888086220
TEST1|1|0|1588888086215
TEST1|1|1|1588888086225
TEST1|0|2|1588888086230
TEST1|0|3|1588888086240
TEST1|0|4|1588888086250
TEST1|1|2|1588888086235
TEST1|1|3|1588888086245
TEST1|1|4|1588888086255

ノート:

2回の実行で、同じKafkaストリームに属するメッセージは、そのストリームで発生する順序で配信されます。ただし、異なるストリームからのメッセージは、非決定的な方法でインタレースされます。

9.1.4.3.3 Kafkaメッセージのタイムスタンプ

各Kafkaメッセージにはタイムスタンプが関連付けられています。Kafkaメッセージのタイムスタンプは、生成された証跡ファイル内のレコードの操作タイムスタンプにマップされます。Kafkaメッセージのタイムスタンプは、抽出が1つのストリーム(単一のトピックおよびパーティション)からのみ読み取っている場合でも、単調に増加することを保証されません。Kafkaには、ストリーム内でもKafkaメッセージのタイムスタンプが単調に増加するという要件はありません。Kafkaプロデューサは、メッセージのタイムスタンプをメッセージに明示的に設定できるAPIを提供しています。これは、KafkaプロデューサがKafkaメッセージのタイムスタンプを任意の値に設定できることを意味します。

複数のトピックまたは複数のパーティションを含むトピック(あるいはその両方)から読み取る場合、Kafkaキャプチャにより生成された証跡ファイルに、単調に増加する操作タイムスタンプがないことはほぼ確実です。Kafkaストリームは相互に独立して移動し、様々なストリームから受信したメッセージの配信順序の保証はありません。KafkaコンシューマがKafkaクラスタから読み取る場合、異なるストリームからのメッセージがランダムな順序でインタレースする可能性があります。

9.1.4.3.4 Kafkaメッセージの座標

Kafka Captureはメッセージ・ギャップ・チェックを実行して、メッセージ・ストリームのコンテキストとメッセージの一貫性を確保します。Kafkaキャプチャでメッセージを消費しているすべてのKafkaストリームについて、Kafkaメッセージ・オフセットの順序にギャップはありません。

メッセージ・オフセットの順序にギャップが見つかった場合、Kafkaキャプチャはエラーを記録し、Kafka Captureの抽出プロセスは異常終了します。

メッセージ・ギャップ・チェックは、パラメータ・ファイル内で次のことを設定すると無効にできます。

SETENV (PERFORMMESSAGEGAPCHECK = "false").

9.1.4.3.5 Extractモードの開始

Extractは、2つの異なるポイントからレプリケーションを開始するように構成できます。UIの「抽出オプション」ステップの「開始」セクションで、Extractの開始位置を選択できます。「今すぐ」をクリックするか、「カスタム時間」を定義することができます。

9.1.4.3.5.1 最早を開始
Kafkaの使用可能な最も古いメッセージからKafka Captureを開始します。
ggsci> ADD EXTRACT kafka, TRANLOG
ggsci> ADD EXTRAIL dirdat/kc, extract kafka
ggsci> START EXTRACT kafka
9.1.4.3.5.2 タイムスタンプを開始
Kafkaの使用可能な最も古いメッセージからKafka Captureを開始します。
ggsci> ADD EXTRACT kafka, TRANLOG BEGIN 2019-03-27 23:05:05.123456
ggsci> ADD EXTRAIL dirdat/kc, extract kafka
ggsci> START EXTRACT kafka
または、現在が特定の時点であるため今すぐ開始します。
ggsci> ADD EXTRACT kafka, TRANLOG BEGIN NOW
ggsci> ADD EXTRAIL dirdat/kc, extract kafka
ggsci> START EXTRACT kafka

ノート:

特定の時点から開始することに注意してください。Kafka Captureは、基準(構成した時間以上)に適合するストリーム内の最初の使用可能なレコードから開始します。Replicatは、後続のメッセージのタイムスタンプに関係なく、最初のメッセージから続行します。前述のとおり、Kafkaメッセージのタイムスタンプが単調に増加する保証や要件はありません。

Extractの変更

タイムスタンプの変更
ggsci> STOP EXTRACT kafka
ggsci> ALTER EXTRACT kafka BEGIN {Timestamp}
ggsci> START EXTRACT kafka 

現在の変更

ggsci> STOP EXTRACT kafka
ggsci> ALTER EXTRACT kafka BEGIN NOW
ggsci> START EXTRACT kafka 

9.1.4.3.6 一般的な構成の概要

9.1.4.3.7 OGGSOURCEパラメータ

Kafka抽出レプリケーションを有効にするには、GLOBALSパラメータ・ファイルを次のように構成する必要があります:
OGGSOURCE KAFKA
JVMCLASSPATH ggjava/ggjava.jar:/kafka/client/path/*:dirprm
JVMBOOTOPTIONS -Xmx512m -Dlog4j.configurationFile=log4j-default.properties -Dgg.log.level=INFO

OGGSOURCE KAFKA: 最初の行は、レプリケーションのソースがKafkaであることを示します。

JVMCLASSPATH ggjava/ggjava.jar:/kafka/client/path/*:dirprm: 2行目は、Java JVMクラスパスを設定します。このJavaクラスパスにより、必要なすべてのOracle GoldenGate for Distributed Applications and Analytics (GG for DAA)およびKafkaクライアント・ライブラリをロードするためのパスを指定します。Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)ライブラリは、このリストの先頭にある必要があります(ggjava.jar)。Kafkaクライアント・ライブラリ、Kafka ConnectフレームワークおよびKafka Connectコンバータは、Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)インストールには含まれていません。これらのライブラリは個別に取得する必要があります。接続先のKafkaブローカのバージョンと同じバージョンのKafkaクライアントを使用することをお薦めします。依存性ダウンロード・ツールを使用して、依存性ライブラリをダウンロードできます。または、パスをKafkaインストールに設定することもできます。依存関係ダウンロードの詳細は、依存関係ダウンロードを参照してください。

JVMBOOTOPTIONS -Xmx512m -Dlog4j.configurationFile=log4j-default.properties -Dgg.log.level=INFO: 3行目はJVMブート・オプションです。これを使用して、Javaヒープの最大サイズ(-Xmx512m)およびlog4jロギング・パラメータを構成し、.logファイル(-Dlog4j.configurationFile=log4j-default.properties -Dgg.log.level=INFO)を生成します。

ノート:

Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)リリース23c以降では、このパラメータは非推奨になります。

9.1.4.3.8 抽出パラメータ・ファイル

構成された抽出プロセスは、.prmファイルを使用して構成されます。パラメータ・ファイルの命名の形式は<extract name>.prmです。たとえば、抽出プロセスkcの抽出パラメータ・ファイルはkc.prmです。
EXTRACT KC
-- alter credentialstore add user kafka:// password <somepass> alias kafka
SOURCEDB USERIDALIAS kafka
JVMOPTIONS CLASSPATH ggjava/ggjava.jar:/kafka/client/path/*
JVMOPTIONS BOOTOPTIONS -Xmx512m -Dlog4j.configurationFile=log4j-default.properties -Dgg.log.level=INFO
TRANLOGOPTIONS GETMETADATAFROMVAM
TRANLOGOPTIONS KAFKACONSUMERPROPERTIES kafka_consumer.properties
EXTTRAIL dirdat/kc
TABLE QASOURCE.TOPIC1;

EXTRACT KC: 最初の行は、抽出プロセスの名前を設定します。

TRANLOGOPTIONS KAFKACONSUMERPROPERTIES kafka_consumer.properties: この行は、Kafkaコンシューマ・プロパティ・ファイルの名前と場所を設定します。Kafkaコンシューマ・プロパティは、Kafkaクラスタへの接続およびセキュリティを構成するKafka固有の構成を含むファイルです。Kafkaコンシューマ・プロパティに関するドキュメントは、Kafkaドキュメントにあります。

EXTTRAIL dirdat/kc: 4行目は、生成される証跡ファイルの場所と接頭辞を設定します。
TABLE QASOURCE.TOPIC1;: 5行目は、extract TABLE文です。1つ以上のTABLE文を指定できます。例のスキーマ名はQASOURCEです。スキーマ名はOGGアーティファクトであり、必須です。任意の有効な文字列に設定できます。スキーマ名にワイルドカードは使用できません。各extactプロセスでは、1つのスキーマ名のみがサポートされます。構成された表名は、Kafkaトピック名にマップされます。表の構成では、ワイルドカードがサポートされています。有効なKafkaトピック名には次の文字を含めることができます。
  • a-z (小文字のaからz)
  • A-Z (大文字のAからZ)
  • 0-9 (0から9の数字)
  • . (ピリオド)
  • _ (アンダースコア)
  • - (ハイフン)
トピック名にピリオド、アンダースコアまたはハイフンが含まれている場合は、引用符で囲んだ表名を構成に含めてください。トピック名は大/小文字が区別されるため、トピックMYTOPIC1およびMyTopic1は異なるKafkaトピックです。
有効なextract table文の例:
TABLE TESTSCHEMA.TEST*;
TABLE TESTSCHEMA.MyTopic1;
TABLE TESTSCHEMA.”My.Topic1”;
不正な構成の例 - 複数のスキーマ名が使用されています。
TABLE QASOURCE.TEST*;
TABLE TESTSCHEMA.MYTOPIC1;
不正な構成の例 - 引用符で囲まれていない特殊文字を含む表。
TABLE QASOURE.My.Topic1;
不正な構成の例 - スキーマ名がワイルドカードです。
TABLE *.*;

オプションの.prm構成。

Kafka Captureは、メッセージ・ギャップ・チェックを実行してメッセージの継続性を保証します。メッセージ・ギャップ・チェック・セットを無効にするには:
SETENV (PERFORMMESSAGEGAPCHECK = "false")

9.1.4.3.9 Kafkaコンシューマ・プロパティ・ファイル

Kafkaコンシューマ・プロパティ・ファイルには、Kafkaクラスタへの接続方法やセキュリティ・パラメータなど、Kafkaコンシューマを構成するためのプロパティが含まれています。

例:
#Kafka Properties
bootstrap.servers=den02box:9092
group.id=mygroupid
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
9.1.4.3.9.1 Kafkaプロデューサ・プロパティの暗号化
Kafkaプロデューサ構成ファイル内の機密プロパティは、Oracle GoldenGate資格証明ストアを使用して暗号化できます。

資格証明ストアの使用方法の詳細は、「Oracle GoldenGate資格証明ストアでの識別子の使用」を参照してください。

たとえば、次のkafkaプロパティがあるとします:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule  required
username="alice" password="alice"; 
次のように置き換えることができます:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule  required
username=ORACLEWALLETUSERNAME[alias domain_name]  password=ORACLEWALLETPASSWORD[alias
domain_name];

9.1.4.4 汎用ミューテーション・ビルダー

デフォルト・モードでは、汎用ミューテーション・ビルダーを使用して、Kafkaメッセージを証跡ファイル操作に変換します。Kafkaメッセージは、任意のフォーマットのデータで構成されます。Kafkaメッセージは、区切りテキスト、JSON、Avro、XML、テキストなどにできます。これにより、Kafkaメッセージから論理変更レコードへのデータのマッピングが困難になります。ただし、Kafkaメッセージ・キーおよびペイロード値は、基本的な形式では単なるバイト配列です。汎用のKafkaレプリケーションは、Kafkaメッセージ・キーとKafkaメッセージ値をバイト配列として単純に伝播します。汎用のKafkaレプリケーションは、データを3つのフィールドの操作に変換します。3つのフィールドは次のとおりです。
  • id: これは表の主キーです。文字列として入力されます。値は、トピック名:パーティション番号:オフセットという形式でのKafka内のメッセージの座標です。たとえば、トピックTEST、パーティション1およびオフセット245の値はTEST:1:245になります。

  • key: これは、ソースKafkaメッセージのメッセージ・キー・フィールドです。フィールドはバイナリとして入力されます。フィールドの値は、バイトとして伝播されたソースKafkaメッセージのキーです。

  • payload: これは、ソースKafkaメッセージのメッセージ・ペイロードまたは値です。フィールドはバイナリとして入力されます。フィールドの値は、バイトとして伝播されたソースKafkaメッセージのペイロードです。

汎用ミューテーション・ビルダーの機能
  • すべてのレコードは、挿入操作として伝播されます。
  • 各Kafkaメッセージによって、独自のトランザクションに操作が作成されます。
Logdump 2666 >n
___________________________________________________________________ 
Hdr-Ind    :     E  (x45)     Partition  :     .  (x00)  
UndoFlag   :     .  (x00)     BeforeAfter:     A  (x41)  
RecLength  :   196  (x00c4)   IO Time    : 2021/07/22 14:57:25.085.436   
IOType     :   170  (xaa)     OrigNode   :     2  (x02) 
TransInd   :     .  (x03)     FormatType :     R  (x52) 
SyskeyLen  :     0  (x00)     Incomplete :     .  (x00) 
DDR/TDR index:   (001, 001)     AuditPos   : 0 
Continued  :     N  (x00)     RecCount   :     1  (x01) 

2021/07/22 14:57:25.085.436 Metadata             Len 196 RBA 1335 
Table Name:  QASOURCE.TOPIC1 
*
 1)Name          2)Data Type        3)External Length  4)Fetch Offset      5)Scale         6)Level
 7)Null          8)Bump if Odd      9)Internal Length 10)Binary Length    11)Table Length 12)Most Sig DT
13)Least Sig DT 14)High Precision  15)Low Precision   16)Elementary Item  17)Occurs       18)Key Column
19)Sub DataType 20)Native DataType 21)Character Set   22)Character Length 23)LOB Type     24)Partial Type
25)Remarks
*
TDR version: 11
Definition for table QASOURCE.TOPIC1
Record Length: 20016
Columns: 3
id        64   8000        0  0  0 0 0   8000   8000      0 0 0 0 0 1    0 1   0   12       -1      0 0 0  
key       64  16000     8005  0  0 1 0   8000   8000      0 0 0 0 0 1    0 0   4   -3       -1      0 0 0  
payload   64   8000    16010  0  0 1 0   4000   4000      0 0 0 0 0 1    0 0   4   -4       -1      0 1 0  
End of definition

9.1.4.5 Kafka Connectミューテーション・ビルダー

Kafka Connectミューテーション・ビルダーは、Kafka Connectメッセージを論理変更レコードに解析し、Oracle GoldenGateの証跡ファイルに書き込みます。

9.1.4.5.1 Kafka Connectミューテーション・ビルダーの機能および制限

  • すべてのレコードは、挿入操作として伝播されます。
  • 各Kafkaメッセージによって、独自のトランザクションに操作が作成されます。
  • Kafkaメッセージ・キーは、Kafka Connectプリミティブ型または論理型である必要があります。
  • Kafkaメッセージ値は、プリミティブ型/論理型、またはプリミティブ型、論理型およびコンテナ型のみを含むレコードのいずれかである必要があります。ネストされたレコードは現在サポートされていないため、レコードに別のレコードを含めることはできません。
  • Kafka Connect配列データ型は、バイナリ・フィールドにマップされます。バイナリ・フィールドの内容は、シリアライズされたJSON配列に変換されたソース配列になります。
  • Kafka Connectマップ・データ型は、バイナリ・フィールドにマップされます。バイナリ・フィールドの内容は、シリアライズされたJSONに変換されたソース・マップになります。
  • ソースKafkaメッセージはKafka Connectメッセージである必要があります。
  • Kafka Connect Protobufメッセージは現在サポートされていません。(現在のKafka Capture機能は、Kafkaメッセージ・キーのプリミティブ型または論理型のみをサポートします。Kafka Connect Protobuf Converterは、スタンド・オンリー・プリミティブ型または論理型をサポートしていません。)
  • 各ソース・トピックには、同じスキーマに準拠するメッセージが含まれている必要があります。異なるKafka Connectスキーマに準拠する同じKafkaトピック内のメッセージのインタレースは現在サポートされていません。
  • スキーマ変更は現在サポートされていません。

9.1.4.5.2 主キー

主キー・フィールドは、出力でgg_idという名前の列として作成されます。このフィールドの値は、:文字で区切られた、連結されたトピック名、パーティションおよびオフセットです。たとえば、TOPIC1:0:1001です。

9.1.4.5.3 Kafkaメッセージ・キー

メッセージ・キーは、コールされた名前付きのgg_keyにマップされます。

9.1.4.5.4 Kafka Connectでサポートされているタイプ

サポートされているプリミティブ型
  • 文字列
  • 8ビットの整数
  • 16ビットの整数
  • 32ビットの整数
  • 64ビットの整数
  • ブール
  • 32ビット浮動小数点数
  • 64ビット浮動小数点数
  • バイト(バイナリ)
サポートされている論理型
  • 小数
  • タイムスタンプ
  • 日付
  • 時刻

サポートされているコンテナ型

  • 配列 - プリミティブ型または論理型の配列のみがサポートされています。データは、ソース配列の内容を含むJSON配列ドキュメントを値として持つバイナリ・フィールドとしてマップされます。
  • リスト - プリミティブ型または論理型のリストのみがサポートされています。データは、ソース・リストの内容を含むJSONドキュメントを値として持つバイナリ・フィールドとしてマップされます。

9.1.4.5.5 Kafka Connectミューテーション・ビルダーを有効にする方法

Kafka Connectエミュテーション・ビルダーは、Kafkaプロデューサ・プロパティ・ファイル内のKafka Connectキーおよび値コンバータの構成によって有効化されます。

Kafka Connect JSON Converterの場合
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

Kafka Connect Avro Converterの場合

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081

Kafka Capture機能は、Kafkaプロデューサ・プロパティ・ファイルを読み取ります。Kafka Connectコンバータが構成されている場合、Kafka Connectエミュテーション・ビルダーが起動されます。

logdumpを使用した証跡ファイルからのサンプル・メタデータ

2021/08/03 09:06:05.243.881 Metadata             Len 1951 RBA 1335 
Table Name: TEST.KC 
*
 1)Name          2)Data Type        3)External Length  4)Fetch Offset      5)Scale         6)Level
 7)Null          8)Bump if Odd      9)Internal Length 10)Binary Length    11)Table Length 12)Most Sig DT
13)Least Sig DT 14)High Precision  15)Low Precision   16)Elementary Item  17)Occurs       18)Key Column
19)Sub DataType 20)Native DataType 21)Character Set   22)Character Length 23)LOB Type     24)Partial Type
25)Remarks
*
TDR version: 11
Definition for table TEST.KC
Record Length: 36422
Columns: 30
gg_id                64   8000        0  0  0 0 0   8000   8000      0 0 0 0 0 1    0 1   0   12       -1      0 0 0  
gg_key               64   4000     8005  0  0 1 0   4000   4000      0 0 0 0 0 1    0 0   0   -1       -1      0 1 0  
string_required      64   4000    12010  0  0 1 0   4000   4000      0 0 0 0 0 1    0 0   0   -1       -1      0 1 0  
string_optional      64   4000    16015  0  0 1 0   4000   4000      0 0 0 0 0 1    0 0   0   -1       -1      0 1 0  
byte_required       134     23    20020  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0    4       -1      0 0 0  
byte_optional       134     23    20031  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0    4       -1      0 0 0  
short_required      134     23    20042  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0    4       -1      0 0 0  
short_optional      134     23    20053  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0    4       -1      0 0 0  
integer_required    134     23    20064  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0    4       -1      0 0 0  
integer_optional    134     23    20075  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0    4       -1      0 0 0  
long_required       134     23    20086  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0   -5       -1      0 0 0  
long_optional       134     23    20097  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0   -5       -1      0 0 0  
boolean_required      0      2    20108  0  0 1 0      1      1      0 0 0 0 0 1    0 0   4   -2       -1      0 0 0  
boolean_optional      0      2    20112  0  0 1 0      1      1      0 0 0 0 0 1    0 0   4   -2       -1      0 0 0  
float_required      141     50    20116  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0    6       -1      0 0 0  
float_optional      141     50    20127  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0    6       -1      0 0 0  
double_required     141     50    20138  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0    8       -1      0 0 0  
double_optional     141     50    20149  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0    8       -1      0 0 0  
bytes_required       64   8000    20160  0  0 1 0   4000   4000      0 0 0 0 0 1    0 0   4   -4       -1      0 1 0  
bytes_optional       64   8000    24165  0  0 1 0   4000   4000      0 0 0 0 0 1    0 0   4   -4       -1      0 1 0  
decimal_required     64     50    28170  0  0 1 0     50     50      0 0 0 0 0 1    0 0   0   12       -1      0 0 0  
decimal_optional     64     50    28225  0  0 1 0     50     50      0 0 0 0 0 1    0 0   0   12       -1      0 0 0  
timestamp_required  192     29    28280  0  0 1 0     29     29     29 0 6 0 0 1    0 0   0   11       -1      0 0 0  
timestamp_optional  192     29    28312  0  0 1 0     29     29     29 0 6 0 0 1    0 0   0   11       -1      0 0 0  
date_required       192     10    28344  0  0 1 0     10     10     10 0 2 0 0 1    0 0   0    9       -1      0 0 0  
date_optional       192     10    28357  0  0 1 0     10     10     10 0 2 0 0 1    0 0   0    9       -1      0 0 0  
time_required       192     18    28370  0  0 1 0     18     18     18 3 6 0 0 1    0 0   0   10       -1      0 0 0  
time_optional       192     18    28391  0  0 1 0     18     18     18 3 6 0 0 1    0 0   0   10       -1      0 0 0  
array_optional       64   8000    28412  0  0 1 0   4000   4000      0 0 0 0 0 1    0 0   4   -4       -1      0 1 0  
map_optional         64   8000    32417  0  0 1 0   4000   4000      0 0 0 0 0 1    0 0   4   -4       -1      0 1 0  
End of definition

9.1.4.6 構成ファイルの例

9.1.4.6.1 kc.prmファイルの例

EXTRACT KC
OGGSOURCE KAFKA
JVMOOPTIONS CLASSPATH ggjava/ggjava.jar:/path/to/kafka/libs/*
TRANLOGOPTIONS GETMETADATAFROMVAM
--Uncomment the following line to disable Kafka message gap checking.
--SETENV (PERFORMMESSAGEGAPCHECK = "false")
TRANLOGOPTIONS KAFKACONSUMERPROPERTIES kafka_consumer.properties
EXTTRAIL dirdat/kc
TABLE TEST.KC;

9.1.4.6.2 Kafkaコンシューマ・プロパティ・ファイルの例

#Kafka Properties
bootstrap.servers=localhost:9092
group.id=someuniquevalue
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

#JSON Converter Settings
#Uncomment to use the Kafka Connect Mutation Builder with JSON Kafka Connect Messages
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter

#Avro Converter Settings
#Uncomment to use the Kafka Connect Mutation Builder with Avro Kafka Connect Messages
#key.converter=io.confluent.connect.avro.AvroConverter
#value.converter=io.confluent.connect.avro.AvroConverter
#key.converter.schema.registry.url=http://localhost:8081
#value.converter.schema.registry.url=http://localhost:8081