Kafkaプラットフォームからのデータの取得

OCI GoldenGateを使用して、Kafkaプラットフォーム・ストリーミング・ソースからメッセージを抽出します。

概要

OCI GoldenGateを使用して、次のストリーミング・ソースからメッセージを取得できます:

  • Apache Kafka
  • OCIストリーミング
  • Confluent Kafka、スキーマ・レジストリの有無にかかわらず
  • Azure Event Hubs
  • Amazon MSK

OCI GoldenGateは、1つ以上のKafkaトピックからメッセージを読み取り、GoldenGateトレイル・ファイルに書き込まれる論理変更レコードにデータを変換します。GoldenGate Replicatプロセスは、生成トレイル・ファイルを使用して、RDBMS実装をサポートするためのデータを伝播できます。

タスク1: コンシューマ・プロパティの構成

  1. 次のデシリアライザまたはコンバータのいずれかを使用して、Kafkaコンシューマ・プロパティ・ファイルを作成します。ソースがスキーマ・レジストリを持つConfluent Kafkaのトピックである場合、Avroコンバータを使用できます。その他のソースの場合は、必要に応じてJSONコンバータまたはデシリアライザを使用します。
    • JSONデシリアライザ用のKafkaコンシューマ・プロパティ:
      key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
      value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
    • JSONコンバータのKafkaコンシューマ・プロパティ:
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
    • AvroコンバータのKafkaコンシューマ・プロパティ:
      key.converter=io.confluent.connect.avro.AvroConverter
      value.converter=io.confluent.connect.avro.AvroConverter
  2. プロパティ・ファイルを保存し、その場所をメモします。

タスク2: OCI GoldenGateリソースの作成

このタスクでは、リソースがまだ存在しない場合に新しいリソースを作成する方法について説明します。使用しているビッグ・データ・デプロイメントが使用可能な最新バージョンにアップグレードされていることを確認します。

  1. ビッグ・データ用のOCI GoldenGateデプロイメントを作成します
  2. 接続を作成します。

    ノート:

    次の接続を作成する場合は、必ず「拡張オプションの表示」をクリックし、コンシューマ・プロパティ・ファイルをアップロードします。
  3. ビッグ・データ・デプロイメントに接続を割り当てます

タスク3: 資格証明の作成

Extractを作成するには、まず資格証明を作成する必要があります。
資格証明を作成するには:
  1. 「デプロイメント」ページで、ビッグ・データ・デプロイメントを選択します。
  2. 「デプロイメントの詳細」ページで、「コンソールの起動」をクリックします。
  3. タスク2のステップ1でデプロイメントを作成したときに指定したユーザー名とパスワードで、ビッグ・データ・デプロイメントにログインします。
  4. ナビゲーション・メニューから、「DB接続」を選択します。
  5. 「構成」ページの「データベース」タブで、「DB接続の追加」(プラス・アイコン)をクリックし、次のようにフォームに入力します:
    1. 「資格証明ドメイン」に、OracleGoldenGateと入力します。
    2. 「資格証明別名」に、kafkaと入力します。
    3. 「ユーザーID」に、kafka://と入力します
    4. 「パスワード」および「パスワードの検証」に、パスワードを入力します。
    5. 「送信」をクリックします。

タスク4: Extractの作成

  1. 管理サービスの「概要」ページで、Extractの追加」(プラス・アイコン)をクリックします。
  2. 次の値を持つExtractの追加:
    1. Extractの「情報」ページで、次のようにフィールドに入力し、「次」をクリックします:
      • Extractタイプ」で、Integrated Extractを選択します。
      • 「プロセス名」に、Extractの名前を入力します。
    2. Extractオプション・ページで、次のようにフィールドに入力し、「次」をクリックします:
      • 「ドメイン」で、ドメインを選択します。
      • 「別名」で、デプロイメントに割り当てられている接続を選択します。
      • 「名前」に、2文字の名前を入力します。
    3. 「管理対象オプション」ページで、フィールドをそのままにして、「次へ」をクリックします。
    4. 「パラメータ・ファイル」ページで、次の手順を実行します。
      • 指定されたブートストラップ・サーバー内のすべてのトピックをリスニングするには、表マッピングをTABLE TESTSCHEMA.*;のままにします。表マッピングをTABLE TESTSCHEMA.<topic-name>;として設定して、指定されたトピックから取得することもできます。
      • SOURCEDB USERIDALIASSOURCEDB USERIDALIAS kafka DOMAIN OracleGoldenGateに更新します。
    5. 「作成および実行」をクリックします。
Administration Serviceの「概要」ページに戻り、Extractプロセスの起動を監視し、イベント・メッセージを確認できます。