8.11 Oracle GoldenGate for Distributed Applications and AnalyticsによるApache Kafkaへのリアルタイム・メッセージ取込み

概要

このクイックスタートでは、Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)を使用してリアルタイムでApache Kafkaにメッセージを取り込む方法を手順を追って説明します。

Apache Kafkaは、リアルタイムのデータ・ストリームを処理するために設計されたオープンソース・プラットフォームです。これにより、アプリケーションで継続的なデータ・フローをパブリッシュおよびサブスクライブできるようになります。高パフォーマンスのデータ・パイプラインとストリーミング・アプリケーションの構築に最適です。

GG for DAAでは、KafkaハンドラKafka ConnectハンドラでApache Kafkaに接続します。GG for DAAにより、証跡ファイルからソース操作が読み取られ、それらがフォーマットされ、Kafkaトピックにマップされ、配信されます。

8.11.1 前提条件

このクイックスタートを正常に完了するには、次のものが必要です:

  • 稼働しているApache Kafkaノード。

このクイックスタートでは、GG for DAAに付属のサンプル証跡ファイル(trという名前)を使用します。サンプル証跡ファイルを使用して続行する場合、それはGG for DAAインスタンスのGG_HOME/opt/AdapterExamples/trail/にあります。

8.11.2 依存性ファイルのインストール

GG for DAAでは、スノーフレークによって提供されているJava SDKが使用されます。それらのSDKは、GG for DAAに付属の依存性ダウンローダ・ユーティリティを使用してダウンロードできます。依存性ダウンローダは、Mavenおよび他のリポジトリから依存性jarファイルをダウンロードするシェル・スクリプトのセットです。

  1. GG for DAA VMで、依存性ダウンローダ・ユーティリティに移動します。それはGG_HOME/opt/DependencyDownloader/にあります。kafka.shを見つけてください。
  2. 必要なバージョンを指定してkafka.shを実行します。バージョンおよび報告されている脆弱性は、Maven Centralで確認できます。このドキュメントでは、このクイックスタートの公開時の最新バージョンである3.7.0を使用しています。

    図8-75 必要なバージョンを指定したkafka.shの実行

    必要なバージョンを指定してkafka.shを実行します。
  3. 新しいディレクトリが、GG_HOME/opt/DependencyDownloader/dependenciesに、<kafka_version>という名前で作成されます。たとえば、 /u01/app/ogg/opt/DependencyDownloader/dependencies/kafka_3.7.0です。

8.11.3 Kafkaプロデューサのプロパティ・ファイルの作成

GG for DAAインスタンスで、producer.propertiesファイルを作成し構成します。

たとえば:
bootstrap.servers=localhost:9092
acks = 1
compression.type = gzip
reconnect.backoff.ms = 1000
 
value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer

8.11.4 Oracle GoldenGate for Distributed Applications and AnalyticsでのReplicatの作成

Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)でReplicatを作成するには:

  1. GG for DAA UIの「管理サービス」タブで、「+」記号をクリックしてReplicatを追加します。

    図8-76 「管理サービス」タブの「+」をクリックします。

    「管理サービス」タブの「+」をクリックします。
  2. 「クラシックReplicat」のReplicatのタイプを選択し、「次へ」をクリックします。クラシックと調整済という2つの異なるReplicatタイプがあります。クラシックReplicatは単一のスレッド・プロセスですが、調整済Replicatは、トランザクションをパラレルに適用するマルチスレッド・プロセスです。

    図8-77 Replicatの追加

    Replicatの追加
  3. 基本情報を入力し、「次へ」をクリックします:
    1. ターゲット: Kafka

    図8-78 Replicatオプション

    Replicatオプション
  4. 「管理対象オプション」はそのままにして、「次」をクリックします。

    図8-79 管理対象オプション

    管理対象オプション
  5. 「パラメータ・ファイル」の詳細を入力し、「次へ」をクリックします。「パラメータ・ファイル」では、ソースからターゲットへのマッピングを指定するか、ワイルドカード選択のままにしておくことができます。

    図8-80 パラメータ・ファイル

    パラメータ・ファイル
  6. 「プロパティ・ファイル」で、TODOとマークされているプロパティを更新し、「作成および実行」をクリックします。
    # Properties file for Replicat KFK
    #Kafka Handler Template
    gg.handlerlist=kafkahandler
    gg.handler.kafkahandler.type=kafka
    #TODO: Set the name of the Kafka producer properties file.
    gg.handler.kafkahandler.kafkaProducerConfigFile=/path_to/producer.properties
    #TODO: Set the template for resolving the topic name.
    gg.handler.kafkahandler.topicMappingTemplate=<target_topic_name>
    gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
    gg.handler.kafkahandler.mode=op
    gg.handler.kafkahandler.format=json
    gg.handler.kafkahandler.format.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]}
    #TODO: Set the location of the Kafka client libraries.
    gg.classpath= path_to/dependencies/kafka_3.7.0/*
    jvm.bootoptions=-Xmx512m -Xms32m
    
    GG for DAAでは、テンプレート・キーワードによる動的トピック・マッピングがサポートされています。たとえば、topicMappingTemplateを${tablename}として割り当てた場合は、GG for DAAにより、ソース表ごとに、ソース表名でトピックが作成され、イベントがこれらのトピックにマップされます。keyMappingTemplate=${primaryKeys}を使用することをお薦めします。この場合は、GG for DAAにより、pkが同一のソース操作が、同じパーティションに送信されます。これにより、Apache Kafkaへの配信中にソース操作の順序を維持できるようになります。
  7. Replicatが正常に起動すると、実行状態になります。「アクション」/「詳細」/「統計」に移動して、レプリケーション統計を確認できます。

    図8-81 Replicatの統計

    Replicatの統計
  8. Kafkaトピックに移動してメッセージを確認できます。詳細は、「Apache Kafka」を参照してください。

ノート:

  • ターゲットのkafkaトピックが存在しない場合は、Kafkaクラスタで自動トピック作成が有効になっていると、GG for DAAによってそれが自動的に作成されます。テンプレート・キーワードを使用してトピック名を動的に割り当てることができます。
  • Apache Kafkaレプリケーションのパフォーマンスを高めるには、このブログを参照してください。