9.1.4 Apache Kafka
Kafka用のOracle GoldenGateキャプチャ(Extract)は、Kafkaトピックまたはトピックからメッセージを読み取り、GoldenGate証跡ファイルに書き込まれた論理変更レコードにデータを変換するために使用されます。この項では、Kafka用のOracle GoldenGateキャプチャを使用する方法について説明します。
9.1.4.1 概要
Kafkaはここ数年で市場の牽引力を獲得し、エンタープライズ・メッセージング分野のリーダーになりました。Kafkaは、高可用性、フェイルオーバー、冗長性によるデータの整合性および高パフォーマンスを提供するクラスタベースのメッセージング・システムです。Kafkaは現在、Enterprise Service Busアーキテクチャの実装のための主要なアプリケーションです。Kafka Capture抽出プロセスは、Kafkaからメッセージを読み取り、それらのメッセージをOracle GoldenGate証跡ファイルに書き込まれる論理変更レコードに変換します。生成された証跡ファイルを使用して、証跡ファイル内のデータをOracle GoldenGateのReplicatプロセスでサポートされる様々なRDBMS実装または他の統合に伝播できます。
親トピック: Apache Kafka
9.1.4.2 前提条件
9.1.4.2.1 ソース・タイプを検出するための資格証明ストア・エントリの設定
userid
の接頭辞に基づいています。userid
の一般的な形式は次のとおりです: <dbtype>://<db-user>@<comma separated list of server addresses>:<port>
Kafkaキャプチャのuserid
値は、接頭辞kafka://
を持つ任意の値である必要があります。資格証明ストア・エントリは、「管理サービス」の「DB接続」で設定できます。
alter credentialstore add user kafka:// password somepass alias kafka
ノート:
資格証明の設定中にKafkaにダミーのパスワードを指定できます。親トピック: 前提条件
9.1.4.3 Kafka Captureの一般的な用語および機能
- Kafkaストリーム
- Kafkaメッセージの順序
- Kafkaメッセージのタイムスタンプ
- Kafkaメッセージの座標
- Extractモードの開始
- 一般的な構成の概要
- OGGSOURCEパラメータ
- 抽出パラメータ・ファイル
- Kafkaコンシューマ・プロパティ・ファイル
親トピック: Apache Kafka
9.1.4.3.1 Kafkaストリーム
Kafkaコンシューマとして、1つ以上のトピックから読み取ることができます。さらに、各トピックを1つ以上のパーティションに分割できます。個々のトピック/パーティションの組合せが、Kafkaストリームです。このトピックでは、Kafkaストリームについて幅広く説明します。ここでは用語を明確に定義することが重要です。
- トピック: TEST1 パーティション: 0
- トピック: TEST1 パーティション: 1
- トピック: TEST2 パーティション: 0
- トピック: TEST2 パーティション: 1
- トピック: TEST2 パーティション: 2
親トピック: Kafka Captureの一般的な用語および機能
9.1.4.3.2 Kafkaメッセージの順序
個々のストリームについてKafkaコンシューマから受信したメッセージは、Kafkaコミット・ログに格納されている順序である必要があります。ただし、Kafkaストリームは相互に独立して移動し、様々なストリームからメッセージを受信する順序は非決定的です。
- ストリーム1: トピックTEST1、パーティション0
- ストリーム2: トピックTEST1、パーティション1
TEST1|0|0|1588888086210 TEST1|0|1|1588888086220 TEST1|0|2|1588888086230 TEST1|0|3|1588888086240 TEST1|0|4|1588888086250
TEST1|1|0|1588888086215 TEST1|1|1|1588888086225 TEST1|1|2|1588888086235 TEST1|1|3|1588888086245 TEST1|1|4|1588888086255
TEST1|1|0|1588888086215 TEST1|1|1|1588888086225 TEST1|0|0|1588888086210 TEST1|0|1|1588888086220 TEST1|0|2|1588888086230 TEST1|0|3|1588888086240 TEST1|0|4|1588888086250 TEST1|1|2|1588888086235 TEST1|1|3|1588888086245 TEST1|1|4|1588888086255
TEST1|0|0|1588888086210 TEST1|0|1|1588888086220 TEST1|1|0|1588888086215 TEST1|1|1|1588888086225 TEST1|0|2|1588888086230 TEST1|0|3|1588888086240 TEST1|0|4|1588888086250 TEST1|1|2|1588888086235 TEST1|1|3|1588888086245 TEST1|1|4|1588888086255
ノート:
2回の実行で、同じKafkaストリームに属するメッセージは、そのストリームで発生する順序で配信されます。ただし、異なるストリームからのメッセージは、非決定的な方法でインタレースされます。親トピック: Kafka Captureの一般的な用語および機能
9.1.4.3.3 Kafkaメッセージのタイムスタンプ
各Kafkaメッセージにはタイムスタンプが関連付けられています。Kafkaメッセージのタイムスタンプは、生成された証跡ファイル内のレコードの操作タイムスタンプにマップされます。Kafkaメッセージのタイムスタンプは、抽出が1つのストリーム(単一のトピックおよびパーティション)からのみ読み取っている場合でも、単調に増加することを保証されません。Kafkaには、ストリーム内でもKafkaメッセージのタイムスタンプが単調に増加するという要件はありません。Kafkaプロデューサは、メッセージのタイムスタンプをメッセージに明示的に設定できるAPIを提供しています。これは、KafkaプロデューサがKafkaメッセージのタイムスタンプを任意の値に設定できることを意味します。
複数のトピックまたは複数のパーティションを含むトピック(あるいはその両方)から読み取る場合、Kafkaキャプチャにより生成された証跡ファイルに、単調に増加する操作タイムスタンプがないことはほぼ確実です。Kafkaストリームは相互に独立して移動し、様々なストリームから受信したメッセージの配信順序の保証はありません。KafkaコンシューマがKafkaクラスタから読み取る場合、異なるストリームからのメッセージがランダムな順序でインタレースする可能性があります。
親トピック: Kafka Captureの一般的な用語および機能
9.1.4.3.4 Kafkaメッセージの座標
Kafka Captureはメッセージ・ギャップ・チェックを実行して、メッセージ・ストリームのコンテキストとメッセージの一貫性を確保します。Kafkaキャプチャでメッセージを消費しているすべてのKafkaストリームについて、Kafkaメッセージ・オフセットの順序にギャップはありません。
メッセージ・オフセットの順序にギャップが見つかった場合、Kafkaキャプチャはエラーを記録し、Kafka Captureの抽出プロセスは異常終了します。
メッセージ・ギャップ・チェックは、パラメータ・ファイル内で次のことを設定すると無効にできます。
SETENV (PERFORMMESSAGEGAPCHECK = "false")
.
親トピック: Kafka Captureの一般的な用語および機能
9.1.4.3.5 Extractモードの開始
Extractは、2つの異なるポイントからレプリケーションを開始するように構成できます。UIの「抽出オプション」ステップの「開始」セクションで、Extractの開始位置を選択できます。「今すぐ」をクリックするか、「カスタム時間」を定義することができます。
9.1.4.3.5.1 最早を開始
ggsci> ADD EXTRACT kafka, TRANLOG ggsci> ADD EXTRAIL dirdat/kc, extract kafka ggsci> START EXTRACT kafka
親トピック: 抽出モードの開始
9.1.4.3.5.2 タイムスタンプを開始
ggsci> ADD EXTRACT kafka, TRANLOG BEGIN 2019-03-27 23:05:05.123456 ggsci> ADD EXTRAIL dirdat/kc, extract kafka ggsci> START EXTRACT kafka
ggsci> ADD EXTRACT kafka, TRANLOG BEGIN NOW ggsci> ADD EXTRAIL dirdat/kc, extract kafka ggsci> START EXTRACT kafka
ノート:
特定の時点から開始することに注意してください。Kafka Captureは、基準(構成した時間以上)に適合するストリーム内の最初の使用可能なレコードから開始します。Replicatは、後続のメッセージのタイムスタンプに関係なく、最初のメッセージから続行します。前述のとおり、Kafkaメッセージのタイムスタンプが単調に増加する保証や要件はありません。Extractの変更
ggsci> STOP EXTRACT kafka ggsci> ALTER EXTRACT kafka BEGIN {Timestamp} ggsci> START EXTRACT kafka
現在の変更
ggsci> STOP EXTRACT kafka ggsci> ALTER EXTRACT kafka BEGIN NOW ggsci> START EXTRACT kafka
親トピック: 抽出モードの開始
9.1.4.3.7 OGGSOURCEパラメータ
OGGSOURCE KAFKA JVMCLASSPATH ggjava/ggjava.jar:/kafka/client/path/*:dirprm JVMBOOTOPTIONS -Xmx512m -Dlog4j.configurationFile=log4j-default.properties -Dgg.log.level=INFO
OGGSOURCE KAFKA
: 最初の行は、レプリケーションのソースがKafkaであることを示します。
JVMCLASSPATH ggjava/ggjava.jar:/kafka/client/path/*:dirprm
: 2行目は、Java JVMクラスパスを設定します。このJavaクラスパスにより、必要なすべてのOracle GoldenGate for Distributed Applications and Analytics (GG for DAA)およびKafkaクライアント・ライブラリをロードするためのパスを指定します。Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)ライブラリは、このリストの先頭にある必要があります(ggjava.jar
)。Kafkaクライアント・ライブラリ、Kafka ConnectフレームワークおよびKafka Connectコンバータは、Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)インストールには含まれていません。これらのライブラリは個別に取得する必要があります。接続先のKafkaブローカのバージョンと同じバージョンのKafkaクライアントを使用することをお薦めします。依存性ダウンロード・ツールを使用して、依存性ライブラリをダウンロードできます。または、パスをKafkaインストールに設定することもできます。依存関係ダウンロードの詳細は、依存関係ダウンロードを参照してください。
JVMBOOTOPTIONS -Xmx512m -Dlog4j.configurationFile=log4j-default.properties -Dgg.log.level=INFO
: 3行目はJVMブート・オプションです。これを使用して、Javaヒープの最大サイズ(-Xmx512m)およびlog4jロギング・パラメータを構成し、.log
ファイル(-Dlog4j.configurationFile=log4j-default.properties -Dgg.log.level=INFO
)を生成します。
ノート:
Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)リリース23c以降では、このパラメータは非推奨になります。親トピック: Kafka Captureの一般的な用語および機能
9.1.4.3.8 抽出パラメータ・ファイル
<extract name>.prm
です。たとえば、抽出プロセスkcの抽出パラメータ・ファイルはkc.prm
です。EXTRACT KC -- alter credentialstore add user kafka:// password <somepass> alias kafka SOURCEDB USERIDALIAS kafka JVMOPTIONS CLASSPATH ggjava/ggjava.jar:/kafka/client/path/* JVMOPTIONS BOOTOPTIONS -Xmx512m -Dlog4j.configurationFile=log4j-default.properties -Dgg.log.level=INFO TRANLOGOPTIONS GETMETADATAFROMVAM TRANLOGOPTIONS KAFKACONSUMERPROPERTIES kafka_consumer.properties EXTTRAIL dirdat/kc TABLE QASOURCE.TOPIC1;
EXTRACT KC
: 最初の行は、抽出プロセスの名前を設定します。
TRANLOGOPTIONS KAFKACONSUMERPROPERTIES kafka_consumer.properties
: この行は、Kafkaコンシューマ・プロパティ・ファイルの名前と場所を設定します。Kafkaコンシューマ・プロパティは、Kafkaクラスタへの接続およびセキュリティを構成するKafka固有の構成を含むファイルです。Kafkaコンシューマ・プロパティに関するドキュメントは、Kafkaドキュメントにあります。
EXTTRAIL dirdat/kc
: 4行目は、生成される証跡ファイルの場所と接頭辞を設定します。
TABLE QASOURCE.TOPIC1;
: 5行目は、extract TABLE
文です。1つ以上のTABLE文を指定できます。例のスキーマ名はQASOURCE
です。スキーマ名はOGGアーティファクトであり、必須です。任意の有効な文字列に設定できます。スキーマ名にワイルドカードは使用できません。各extactプロセスでは、1つのスキーマ名のみがサポートされます。構成された表名は、Kafkaトピック名にマップされます。表の構成では、ワイルドカードがサポートされています。有効なKafkaトピック名には次の文字を含めることができます。
- a-z (小文字のaからz)
- A-Z (大文字のAからZ)
- 0-9 (0から9の数字)
- . (ピリオド)
- _ (アンダースコア)
- - (ハイフン)
MYTOPIC1
およびMyTopic1
は異なるKafkaトピックです。
TABLE TESTSCHEMA.TEST*; TABLE TESTSCHEMA.MyTopic1; TABLE TESTSCHEMA.”My.Topic1”;
TABLE QASOURCE.TEST*; TABLE TESTSCHEMA.MYTOPIC1;
TABLE QASOURE.My.Topic1;
TABLE *.*;
オプションの.prm
構成。
SETENV (PERFORMMESSAGEGAPCHECK = "false")
親トピック: Kafka Captureの一般的な用語および機能
9.1.4.3.9 Kafkaコンシューマ・プロパティ・ファイル
Kafkaコンシューマ・プロパティ・ファイルには、Kafkaクラスタへの接続方法やセキュリティ・パラメータなど、Kafkaコンシューマを構成するためのプロパティが含まれています。
#Kafka Properties bootstrap.servers=den02box:9092 group.id=mygroupid key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
9.1.4.3.9.1 Kafkaプロデューサ・プロパティの暗号化
資格証明ストアの使用方法の詳細は、「Oracle GoldenGate資格証明ストアでの識別子の使用」を参照してください。
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice" password="alice";
次のように置き換えることができます:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required
username=ORACLEWALLETUSERNAME[alias domain_name] password=ORACLEWALLETPASSWORD[alias
domain_name];
親トピック: Kafkaコンシューマ・プロパティ・ファイル
9.1.4.4 汎用ミューテーション・ビルダー
-
id: これは表の主キーです。文字列として入力されます。値は、トピック名:パーティション番号:オフセットという形式でのKafka内のメッセージの座標です。たとえば、トピックTEST、パーティション1およびオフセット245の値はTEST:1:245になります。
-
key: これは、ソースKafkaメッセージのメッセージ・キー・フィールドです。フィールドはバイナリとして入力されます。フィールドの値は、バイトとして伝播されたソースKafkaメッセージのキーです。
-
payload: これは、ソースKafkaメッセージのメッセージ・ペイロードまたは値です。フィールドはバイナリとして入力されます。フィールドの値は、バイトとして伝播されたソースKafkaメッセージのペイロードです。
- すべてのレコードは、挿入操作として伝播されます。
- 各Kafkaメッセージによって、独自のトランザクションに操作が作成されます。
Logdump 2666 >n ___________________________________________________________________ Hdr-Ind : E (x45) Partition : . (x00) UndoFlag : . (x00) BeforeAfter: A (x41) RecLength : 196 (x00c4) IO Time : 2021/07/22 14:57:25.085.436 IOType : 170 (xaa) OrigNode : 2 (x02) TransInd : . (x03) FormatType : R (x52) SyskeyLen : 0 (x00) Incomplete : . (x00) DDR/TDR index: (001, 001) AuditPos : 0 Continued : N (x00) RecCount : 1 (x01) 2021/07/22 14:57:25.085.436 Metadata Len 196 RBA 1335 Table Name: QASOURCE.TOPIC1 * 1)Name 2)Data Type 3)External Length 4)Fetch Offset 5)Scale 6)Level 7)Null 8)Bump if Odd 9)Internal Length 10)Binary Length 11)Table Length 12)Most Sig DT 13)Least Sig DT 14)High Precision 15)Low Precision 16)Elementary Item 17)Occurs 18)Key Column 19)Sub DataType 20)Native DataType 21)Character Set 22)Character Length 23)LOB Type 24)Partial Type 25)Remarks * TDR version: 11 Definition for table QASOURCE.TOPIC1 Record Length: 20016 Columns: 3 id 64 8000 0 0 0 0 0 8000 8000 0 0 0 0 0 1 0 1 0 12 -1 0 0 0 key 64 16000 8005 0 0 1 0 8000 8000 0 0 0 0 0 1 0 0 4 -3 -1 0 0 0 payload 64 8000 16010 0 0 1 0 4000 4000 0 0 0 0 0 1 0 0 4 -4 -1 0 1 0 End of definition
親トピック: Apache Kafka
9.1.4.5 Kafka Connectミューテーション・ビルダー
Kafka Connectミューテーション・ビルダーは、Kafka Connectメッセージを論理変更レコードに解析し、Oracle GoldenGateの証跡ファイルに書き込みます。
- Kafka Connectミューテーション・ビルダーの機能および制限
- 主キー
- Kafkaメッセージ・キー
- Kafka Connectでサポートされているタイプ
- Kafka Connectミューテーション・ビルダーを有効にする方法
親トピック: Apache Kafka
9.1.4.5.1 Kafka Connectミューテーション・ビルダーの機能および制限
- すべてのレコードは、挿入操作として伝播されます。
- 各Kafkaメッセージによって、独自のトランザクションに操作が作成されます。
- Kafkaメッセージ・キーは、Kafka Connectプリミティブ型または論理型である必要があります。
- Kafkaメッセージ値は、プリミティブ型/論理型、またはプリミティブ型、論理型およびコンテナ型のみを含むレコードのいずれかである必要があります。ネストされたレコードは現在サポートされていないため、レコードに別のレコードを含めることはできません。
- Kafka Connect配列データ型は、バイナリ・フィールドにマップされます。バイナリ・フィールドの内容は、シリアライズされたJSON配列に変換されたソース配列になります。
- Kafka Connectマップ・データ型は、バイナリ・フィールドにマップされます。バイナリ・フィールドの内容は、シリアライズされたJSONに変換されたソース・マップになります。
- ソースKafkaメッセージはKafka Connectメッセージである必要があります。
- Kafka Connect Protobufメッセージは現在サポートされていません。(現在のKafka Capture機能は、Kafkaメッセージ・キーのプリミティブ型または論理型のみをサポートします。Kafka Connect Protobuf Converterは、スタンド・オンリー・プリミティブ型または論理型をサポートしていません。)
- 各ソース・トピックには、同じスキーマに準拠するメッセージが含まれている必要があります。異なるKafka Connectスキーマに準拠する同じKafkaトピック内のメッセージのインタレースは現在サポートされていません。
- スキーマ変更は現在サポートされていません。
親トピック: Kafka Connectミューテーション・ビルダー
9.1.4.5.2 主キー
主キー・フィールドは、出力でgg_id
という名前の列として作成されます。このフィールドの値は、:
文字で区切られた、連結されたトピック名、パーティションおよびオフセットです。たとえば、TOPIC1:0:1001
です。
親トピック: Kafka Connectミューテーション・ビルダー
9.1.4.5.4 Kafka Connectでサポートされているタイプ
- 文字列
- 8ビットの整数
- 16ビットの整数
- 32ビットの整数
- 64ビットの整数
- ブール
- 32ビット浮動小数点数
- 64ビット浮動小数点数
- バイト(バイナリ)
- 小数
- タイムスタンプ
- 日付
- 時刻
サポートされているコンテナ型
- 配列 - プリミティブ型または論理型の配列のみがサポートされています。データは、ソース配列の内容を含むJSON配列ドキュメントを値として持つバイナリ・フィールドとしてマップされます。
- リスト - プリミティブ型または論理型のリストのみがサポートされています。データは、ソース・リストの内容を含むJSONドキュメントを値として持つバイナリ・フィールドとしてマップされます。
親トピック: Kafka Connectミューテーション・ビルダー
9.1.4.5.5 Kafka Connectミューテーション・ビルダーを有効にする方法
Kafka Connectエミュテーション・ビルダーは、Kafkaプロデューサ・プロパティ・ファイル内のKafka Connectキーおよび値コンバータの構成によって有効化されます。
key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter
Kafka Connect Avro Converterの場合
key.converter=io.confluent.connect.avro.AvroConverter value.converter=io.confluent.connect.avro.AvroConverter key.converter.schema.registry.url=http://localhost:8081 value.converter.schema.registry.url=http://localhost:8081
Kafka Capture機能は、Kafkaプロデューサ・プロパティ・ファイルを読み取ります。Kafka Connectコンバータが構成されている場合、Kafka Connectエミュテーション・ビルダーが起動されます。
logdumpを使用した証跡ファイルからのサンプル・メタデータ
2021/08/03 09:06:05.243.881 Metadata Len 1951 RBA 1335 Table Name: TEST.KC * 1)Name 2)Data Type 3)External Length 4)Fetch Offset 5)Scale 6)Level 7)Null 8)Bump if Odd 9)Internal Length 10)Binary Length 11)Table Length 12)Most Sig DT 13)Least Sig DT 14)High Precision 15)Low Precision 16)Elementary Item 17)Occurs 18)Key Column 19)Sub DataType 20)Native DataType 21)Character Set 22)Character Length 23)LOB Type 24)Partial Type 25)Remarks * TDR version: 11 Definition for table TEST.KC Record Length: 36422 Columns: 30 gg_id 64 8000 0 0 0 0 0 8000 8000 0 0 0 0 0 1 0 1 0 12 -1 0 0 0 gg_key 64 4000 8005 0 0 1 0 4000 4000 0 0 0 0 0 1 0 0 0 -1 -1 0 1 0 string_required 64 4000 12010 0 0 1 0 4000 4000 0 0 0 0 0 1 0 0 0 -1 -1 0 1 0 string_optional 64 4000 16015 0 0 1 0 4000 4000 0 0 0 0 0 1 0 0 0 -1 -1 0 1 0 byte_required 134 23 20020 0 0 1 0 8 8 8 0 0 0 0 1 0 0 0 4 -1 0 0 0 byte_optional 134 23 20031 0 0 1 0 8 8 8 0 0 0 0 1 0 0 0 4 -1 0 0 0 short_required 134 23 20042 0 0 1 0 8 8 8 0 0 0 0 1 0 0 0 4 -1 0 0 0 short_optional 134 23 20053 0 0 1 0 8 8 8 0 0 0 0 1 0 0 0 4 -1 0 0 0 integer_required 134 23 20064 0 0 1 0 8 8 8 0 0 0 0 1 0 0 0 4 -1 0 0 0 integer_optional 134 23 20075 0 0 1 0 8 8 8 0 0 0 0 1 0 0 0 4 -1 0 0 0 long_required 134 23 20086 0 0 1 0 8 8 8 0 0 0 0 1 0 0 0 -5 -1 0 0 0 long_optional 134 23 20097 0 0 1 0 8 8 8 0 0 0 0 1 0 0 0 -5 -1 0 0 0 boolean_required 0 2 20108 0 0 1 0 1 1 0 0 0 0 0 1 0 0 4 -2 -1 0 0 0 boolean_optional 0 2 20112 0 0 1 0 1 1 0 0 0 0 0 1 0 0 4 -2 -1 0 0 0 float_required 141 50 20116 0 0 1 0 8 8 8 0 0 0 0 1 0 0 0 6 -1 0 0 0 float_optional 141 50 20127 0 0 1 0 8 8 8 0 0 0 0 1 0 0 0 6 -1 0 0 0 double_required 141 50 20138 0 0 1 0 8 8 8 0 0 0 0 1 0 0 0 8 -1 0 0 0 double_optional 141 50 20149 0 0 1 0 8 8 8 0 0 0 0 1 0 0 0 8 -1 0 0 0 bytes_required 64 8000 20160 0 0 1 0 4000 4000 0 0 0 0 0 1 0 0 4 -4 -1 0 1 0 bytes_optional 64 8000 24165 0 0 1 0 4000 4000 0 0 0 0 0 1 0 0 4 -4 -1 0 1 0 decimal_required 64 50 28170 0 0 1 0 50 50 0 0 0 0 0 1 0 0 0 12 -1 0 0 0 decimal_optional 64 50 28225 0 0 1 0 50 50 0 0 0 0 0 1 0 0 0 12 -1 0 0 0 timestamp_required 192 29 28280 0 0 1 0 29 29 29 0 6 0 0 1 0 0 0 11 -1 0 0 0 timestamp_optional 192 29 28312 0 0 1 0 29 29 29 0 6 0 0 1 0 0 0 11 -1 0 0 0 date_required 192 10 28344 0 0 1 0 10 10 10 0 2 0 0 1 0 0 0 9 -1 0 0 0 date_optional 192 10 28357 0 0 1 0 10 10 10 0 2 0 0 1 0 0 0 9 -1 0 0 0 time_required 192 18 28370 0 0 1 0 18 18 18 3 6 0 0 1 0 0 0 10 -1 0 0 0 time_optional 192 18 28391 0 0 1 0 18 18 18 3 6 0 0 1 0 0 0 10 -1 0 0 0 array_optional 64 8000 28412 0 0 1 0 4000 4000 0 0 0 0 0 1 0 0 4 -4 -1 0 1 0 map_optional 64 8000 32417 0 0 1 0 4000 4000 0 0 0 0 0 1 0 0 4 -4 -1 0 1 0 End of definition
親トピック: Kafka Connectミューテーション・ビルダー
9.1.4.6 構成ファイルの例
9.1.4.6.1 kc.prmファイルの例
EXTRACT KC OGGSOURCE KAFKA JVMOOPTIONS CLASSPATH ggjava/ggjava.jar:/path/to/kafka/libs/* TRANLOGOPTIONS GETMETADATAFROMVAM --Uncomment the following line to disable Kafka message gap checking. --SETENV (PERFORMMESSAGEGAPCHECK = "false") TRANLOGOPTIONS KAFKACONSUMERPROPERTIES kafka_consumer.properties EXTTRAIL dirdat/kc TABLE TEST.KC;
親トピック: 構成ファイルの例
9.1.4.6.2 Kafkaコンシューマ・プロパティ・ファイルの例
#Kafka Properties bootstrap.servers=localhost:9092 group.id=someuniquevalue key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer #JSON Converter Settings #Uncomment to use the Kafka Connect Mutation Builder with JSON Kafka Connect Messages #key.converter=org.apache.kafka.connect.json.JsonConverter #value.converter=org.apache.kafka.connect.json.JsonConverter #Avro Converter Settings #Uncomment to use the Kafka Connect Mutation Builder with Avro Kafka Connect Messages #key.converter=io.confluent.connect.avro.AvroConverter #value.converter=io.confluent.connect.avro.AvroConverter #key.converter.schema.registry.url=http://localhost:8081 #value.converter.schema.registry.url=http://localhost:8081
親トピック: 構成ファイルの例