8.11 Oracle GoldenGate for Distributed Applications and AnalyticsによるApache Kafkaへのリアルタイム・メッセージ取込み
概要
このクイックスタートでは、Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)を使用してリアルタイムでApache Kafkaにメッセージを取り込む方法を手順を追って説明します。
Apache Kafkaは、リアルタイムのデータ・ストリームを処理するために設計されたオープンソース・プラットフォームです。これにより、アプリケーションで継続的なデータ・フローをパブリッシュおよびサブスクライブできるようになります。高パフォーマンスのデータ・パイプラインとストリーミング・アプリケーションの構築に最適です。
GG for DAAでは、KafkaハンドラとKafka ConnectハンドラでApache Kafkaに接続します。GG for DAAにより、証跡ファイルからソース操作が読み取られ、それらがフォーマットされ、Kafkaトピックにマップされ、配信されます。
- 前提条件
- 依存性ファイルのインストール
- Kafkaプロデューサのプロパティ・ファイルの作成
- Oracle GoldenGate for Distributed Applications and AnalyticsでのReplicatの作成
親トピック: クイックスタート
8.11.1 前提条件
このクイックスタートを正常に完了するには、次のものが必要です:
- 稼働しているApache Kafkaノード。
このクイックスタートでは、GG for DAAに付属のサンプル証跡ファイル(tr
という名前)を使用します。サンプル証跡ファイルを使用して続行する場合、それはGG for DAAインスタンスのGG_HOME/opt/AdapterExamples/trail/
にあります。
8.11.2 依存性ファイルのインストール
GG for DAAでは、スノーフレークによって提供されているJava SDKが使用されます。それらのSDKは、GG for DAAに付属の依存性ダウンローダ・ユーティリティを使用してダウンロードできます。依存性ダウンローダは、Mavenおよび他のリポジトリから依存性jarファイルをダウンロードするシェル・スクリプトのセットです。
- GG for DAA VMで、依存性ダウンローダ・ユーティリティに移動します。それは
GG_HOME/opt/DependencyDownloader/
にあります。kafka.sh
を見つけてください。 - 必要なバージョンを指定して
kafka.sh
を実行します。バージョンおよび報告されている脆弱性は、Maven Centralで確認できます。このドキュメントでは、このクイックスタートの公開時の最新バージョンである3.7.0を使用しています。図8-75 必要なバージョンを指定したkafka.shの実行
- 新しいディレクトリが、
GG_HOME/opt/DependencyDownloader/dependencies
に、<kafka_version>
という名前で作成されます。たとえば、/u01/app/ogg/opt/DependencyDownloader/dependencies/kafka_3.7.0
です。
8.11.3 Kafkaプロデューサのプロパティ・ファイルの作成
GG for DAAインスタンスで、producer.properties
ファイルを作成し構成します。
bootstrap.servers=localhost:9092 acks = 1 compression.type = gzip reconnect.backoff.ms = 1000 value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
8.11.4 Oracle GoldenGate for Distributed Applications and AnalyticsでのReplicatの作成
Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)でReplicatを作成するには:
- GG for DAA UIの「管理サービス」タブで、「+」記号をクリックしてReplicatを追加します。
図8-76 「管理サービス」タブの「+」をクリックします。
- 「クラシックReplicat」のReplicatのタイプを選択し、「次へ」をクリックします。クラシックと調整済という2つの異なるReplicatタイプがあります。クラシックReplicatは単一のスレッド・プロセスですが、調整済Replicatは、トランザクションをパラレルに適用するマルチスレッド・プロセスです。
図8-77 Replicatの追加
- 基本情報を入力し、「次へ」をクリックします:
- ターゲット: Kafka
図8-78 Replicatオプション
- 「管理対象オプション」はそのままにして、「次」をクリックします。
図8-79 管理対象オプション
- 「パラメータ・ファイル」の詳細を入力し、「次へ」をクリックします。「パラメータ・ファイル」では、ソースからターゲットへのマッピングを指定するか、ワイルドカード選択のままにしておくことができます。
図8-80 パラメータ・ファイル
- 「プロパティ・ファイル」で、TODOとマークされているプロパティを更新し、「作成および実行」をクリックします。
# Properties file for Replicat KFK #Kafka Handler Template gg.handlerlist=kafkahandler gg.handler.kafkahandler.type=kafka #TODO: Set the name of the Kafka producer properties file. gg.handler.kafkahandler.kafkaProducerConfigFile=/path_to/producer.properties #TODO: Set the template for resolving the topic name. gg.handler.kafkahandler.topicMappingTemplate=<target_topic_name> gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys} gg.handler.kafkahandler.mode=op gg.handler.kafkahandler.format=json gg.handler.kafkahandler.format.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]} #TODO: Set the location of the Kafka client libraries. gg.classpath= path_to/dependencies/kafka_3.7.0/* jvm.bootoptions=-Xmx512m -Xms32m
GG for DAAでは、テンプレート・キーワードによる動的トピック・マッピングがサポートされています。たとえば、topicMappingTemplateを${tablename}として
割り当てた場合は、GG for DAAにより、ソース表ごとに、ソース表名でトピックが作成され、イベントがこれらのトピックにマップされます。keyMappingTemplate=${primaryKeys}
を使用することをお薦めします。この場合は、GG for DAAにより、pk
が同一のソース操作が、同じパーティションに送信されます。これにより、Apache Kafkaへの配信中にソース操作の順序を維持できるようになります。 - Replicatが正常に起動すると、実行状態になります。「アクション」/「詳細」/「統計」に移動して、レプリケーション統計を確認できます。
図8-81 Replicatの統計
- Kafkaトピックに移動してメッセージを確認できます。詳細は、「Apache Kafka」を参照してください。
ノート:
- ターゲットのkafkaトピックが存在しない場合は、Kafkaクラスタで自動トピック作成が有効になっていると、GG for DAAによってそれが自動的に作成されます。テンプレート・キーワードを使用してトピック名を動的に割り当てることができます。
- Apache Kafkaレプリケーションのパフォーマンスを高めるには、このブログを参照してください。