8.11 Oracle GoldenGate for Distributed Applications and AnalyticsによるOCIストリーミングへのリアルタイム・メッセージ取込み
概要
このクイックスタートでは、Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)を使用してリアルタイムでOCIストリーミングにメッセージを取り込む方法を手順を追って説明します。
Oracle Cloud Infrastructure (OCI)ストリーミングでは、スケーラブルで耐久性に優れた完全管理型のメッセージング・ソリューションが提供されて、連続する大量のデータ・ストリームを取り込んでリアルタイムで消費および処理できます。OCIストリーミングでは、Kafka APIがサポートされています。
GG for DAAでは、KafkaハンドラでOCIストリーミングに接続します。GG for DAAにより、証跡ファイルからソース操作が読み取られ、それらがフォーマットされ、OCIストリーミングのストリームにマップされ、配信されます。
8.11.1 前提条件
このクイックスタートを正常に完了するには、次のものが必要です:
- OCIストリーミングのストリーム・プール
- OCI認証トークン
GG_HOME/opt/AdapterExamples/trail/
にあります。
8.11.2 依存性ファイルのインストール
GG for DAAでは、OCIストリーミングによって提供されているJava SDKが使用されます。それらのSDKは、GG for DAAに付属の依存性ダウンローダ・ユーティリティを使用してダウンロードできます。依存性ダウンローダは、Mavenおよび他のリポジトリから依存性jarファイルをダウンロードするシェル・スクリプトのセットです。
- GG for DAA VMで、依存性ダウンローダ・ユーティリティに移動します。それは
GG_HOME/opt/DependencyDownloader/
にあります。kafka.sh
を見つけてください。 - 必要なバージョンを指定して
kafka.sh
を実行します。バージョンおよび報告されている脆弱性は、Maven Centralで確認できます。このドキュメントでは、2.7.0を使用します。図8-75 kafka.shの実行
- ディレクトリは
GG_HOME/opt/DependencyDownloader/dependencies
に作成されます。これらのディレクトリを書き留めます。次に例を示します:/u01/app/ogg/opt/DependencyDownloader/dependencies/kafka_2.7.0
8.11.3 Kafkaプロデューサのプロパティの作成
サンプル構成ファイル
bootstrap.servers=cell-1.streaming.us-phoenix-1.oci.oraclecloud.com:9092 security.protocol=SASL_SSL sasl.mechanism=PLAIN value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="paasdevgg/oracleidentitycloudservice/user.name@oracle.com/ocid1.streampool.oc1.phx.amaaaaaa3p5c3vqa4hfyl7uv465pay4audmoajughhxlsgj7afc2an5u3xaq" password="YOUR-AUTH-TOKEN";
8.11.4 GG for DAAでのReplicatの作成
GG for DAAでReplicatを作成するには:
- Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) UIの「管理サービス」タブで、「+」記号をクリックしてReplicatを追加します。
図8-76 「管理サービス」タブをクリック
- 「クラシックReplicat」のReplicatのタイプを選択し、「次へ」をクリックします。クラシックと調整済という2つの異なるReplicatタイプがあります。クラシックReplicatは単一のスレッド・プロセスですが、調整済Replicatは、トランザクションをパラレルに適用するマルチスレッド・プロセスです。
図8-77 Replicatの追加
- Replicat情報を入力し、「次」をクリックします:
- Replicatトレイル: 必要な証跡ファイルの名前。サンプル証跡の場合は、
tr
を指定します。 - サブディレクトリ: サンプル証跡を使用する場合は、
GG_HOME/opt/AdapterExamples/trail/
と入力します。 - ターゲット: Kafka
図8-78 Replicatオプション
- Replicatトレイル: 必要な証跡ファイルの名前。サンプル証跡の場合は、
- 「管理対象オプション」はそのままにして、「次」をクリックします。
図8-79 管理対象オプション
- 「パラメータ・ファイル」の詳細を入力し、「次へ」をクリックします。「パラメータ・ファイル」では、ソースからターゲットへのマッピングを指定するか、ワイルドカード選択でそのままにしておくことができます。
図8-80 パラメータ・ファイル
- 「プロパティ・ファイル」で、TODOとマークされているプロパティを更新し、「作成および実行」をクリックします。
#Kafka Handler Template gg.handlerlist=kafkahandler gg.handler.kafkahandler.type=kafka #TODO: Set the name of the Kafka producer properties file. gg.handler.kafkahandler.kafkaProducerConfigFile=/path_to/producer.properties #TODO: Set the template for resolving the topic name. gg.handler.kafkahandler.topicMappingTemplate=<target_stream_name> gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys} gg.handler.kafkahandler.mode=op gg.handler.kafkahandler.format=json gg.handler.kafkahandler.format.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]} #TODO: Set the location of the Kafka client libraries. gg.classpath=path_to/dependencies/kafka_2.7.0/* jvm.bootoptions=-Xmx512m -Xms32m
- Replicatが正常に起動されると、それが実行状態になります。「Replicat」/「統計」に移動してレプリケーション統計を確認できます。
図8-81 Replicatの統計
- OCIコンソールに移動して統計を確認します。
図8-82 OCIストリーミング・コンソール
OCIストリーミング・レプリケーションの詳細は、「OCIストリーミング」を参照してください。
ノート:
- ターゲットのkafkaトピックが存在しない場合は、OCIストリーミングのKafka接続設定で自動トピック作成が選択されていると、GG for DAAによってそれが自動作成されます。トピック名を動的に割り当てるには、「テンプレートのキーワード」を参照してください。
- OCIストリーミング・レプリケーションのパフォーマンスを高めるには、このブログを参照してください。