Kafka StreamsデータをOracle Autonomous Databaseに接続

コンピュート・インスタンスを作成し、Kafkaをインストールしてプロデューサとしてメッセージを送信するか、OCIコンソールで次のステップを使用できます。

  1. OCIコンソールにログインし、Marketplaceに移動します。
  2. 「すべてのアプリケーション」で、GoldenGateストリーム分析を検索します。
  3. 「スタックの起動」をクリックします。
  4. VCNの詳細を指定し、後で秘密キーを使用してGoldenGateストリーム分析インスタンスにログインするためのSSH公開キーを追加します。

    ノート:

    このGoldenGateストリーム分析インスタンスには、組込みのサンプルKafkaストリーム(プロデューサ)がインストールされています。独自のKafkaストリームがある場合は、これも使用できます。

Autonomous Databaseの作成

  1. 左上のメニューから「Oracle Database」をクリックし、「Autonomous Database」をクリックします。
  2. Autonomous Databasesホームページで、「Autonomous Databaseの作成」をクリックします。
  3. データベースを作成するコンパートメントを選択します。
  4. 表示名: Autonomous Databaseを簡単に識別できるようにするユーザーフレンドリな説明またはその他の情報を入力します。
  5. データベース名: 新しいAutonomous Databaseの名前を指定します。
  6. 「ワークロード・タイプ」に「データ・ウェアハウス」を選択します。
  7. 「サーバーレス」として「デプロイメント・タイプ」を選択します。
  8. 「データベース・バージョン」に19c、ECPUに2、ストレージ(TB)に1を選択します。
  9. 管理者資格証明を作成し、パスワードを入力します。パスワードは、Oracle Cloudセキュリティ標準に基づく強力なパスワード複雑性基準を満たしている必要があります。パスワード複雑性ルールの詳細は、データベース・ユーザーの作成を参照してください。
  10. 「すべての場所からのセキュア・アクセス」としてネットワーク・アクセスを選択します。
  11. 残りのパラメータをデフォルトのままにして、「Autonomous Databaseの作成」をクリックします。
新しいAutonomous Databaseが使用可能になるまで、画面に「プロビジョニング中」が表示されます。

Kafkaメッセージを格納する表の作成

  1. SQLDeveloperを開き、「+」アイコンをクリックして「Oracleデータベース接続の作成」をクリックします。
  2. 接続名を入力し、データベースのユーザー名とパスワードを指定します。
  3. 「接続タイプ」に「Cloud Wallet」を選択し、wallet.zipファイルを参照して「接続」をクリックします。接続は正常に作成されます。
  4. 次のスクリプトを使用して、サンプル表を作成します。
    CREATE TABLE "TEST"."SAMPLE"
            (    "FRUIT" VARCHAR2(50 BYTE)",
                 "TOTAL" VARCHAR2(50 BYTE)",
                 "COLOR" VARCHAR2(50 BYTE)"
            )

Oracle Integration 3インスタンスの作成

  1. OCIコンソールで「開発者サービス」をクリックし、「アプリケーション統合」「統合」をクリックします。
  2. 統合インスタンスのホームページで、「インスタンスの作成」をクリックします。
  3. 「名前」を入力し、「バージョン」として「Oracle Integration 3」「エディション」として「エンタープライズ」「シェイプ」として「本番」「ライセンス・タイプ」として「新規Oracle Integrationインスタンスのサブスクライブ」を選択します。
  4. 「Create」をクリックします。
インスタンスが作成されたら、サービス・コンソールをクリックすると、新しいウィンドウが表示されます。

Oracle Integration 3でのOracle Autonomous Data Warehouse接続の作成

  1. Oracle Integrationインスタンスで、ハンバーガー・メニューをクリックし、「デザイン」「接続」を選択します。
  2. 「作成」をクリックし、Oracle ADWを検索して選択します。
  3. この接続に関する「名前」「識別子」「ロール」「トリガーと呼出し」「キーワード」および「説明」を入力します。
  4. 「プロパティ」で、「オプションのプロパティ」を展開し、「サービス名」を入力します。
  5. 「セキュリティ」で、「JDBC Over SSL」を選択し、Wallet.zipファイルをアップロードします。
  6. データベースの作成時に指定されたWalletパスワードを入力します。
  7. データベース・サービス・ユーザー名をadminとして入力します。
  8. 管理ユーザーのデータベース・サービス・パスワードを入力します。
  9. 「アクセス・タイプ」「パブリック・ゲートウェイ」を選択します。
  10. 右上の「テスト」をクリックします。
  11. 正常なメッセージが表示されたら、「保存」をクリックします。

Kafkaインスタンスでのエージェント・グループの関連付け

Kafka Streamsがインストールされているインスタンスで実行する必要があるエージェントが必要です。

  1. Oracle Integration 3インスタンスで、ハンバーガー・メニューをクリックし、「デザイン」「エージェント」を選択します。
  2. 「エージェント」ページで、「作成」をクリックします。
  3. 「名前」「識別子」「説明」を入力して、「作成」をクリックします。
  4. エージェントが作成されたら、「ダウンロード」をクリックし、「接続エージェント」をクリックします。
  5. ダウンロードに成功したら、oic_conn_agent_installer.zipをKafkaトピックが実行されているKafkaインスタンスにコピーし、メッセージを生成します。このソリューション・プレイブックの例では、Oracle GoldenGate Stream Analyticsインスタンスです。
  6. oic_conn_agent_installer.zipを解凍します。
  7. 「ステータス」列で、「...」アイコンをクリックし、「構成のダウンロード」をクリックします。
  8. InstallerProfile.cfgファイルをコピーし、このファイルをKafkaインスタンスにコピーします。
  9. KafkaインスタンスのInstallerProfile.cfgファイルを置き換えます。
  10. InstallerProfile.cfgを置換した後、次の文を実行して、このコマンドを使用してOracle GoldenGate Stream AnalyticsインスタンスまたはKafkaオンプレミス・インスタンスでエージェントを起動します:
    $ java -jar connectivityagent.jar
エージェントは正常に起動し、実行したままにして、停止しないでください。詳細は、Oracle Integration Generation 2での統合の使用を参照してください

Oracle Integration 3でのKafka接続の作成

  1. Oracle Integration 3インスタンスで、ハンバーガー・メニューをクリックし、「デザイン」「接続」を選択します。
  2. 「作成」をクリックし、「Apache Kafka」を検索して選択します。
  3. この接続に関する「名前」「識別子」「ロール」「トリガーと呼出し」「キーワード」および「説明」を入力します。
  4. 「プロパティ」で、ブートストラップ・サーバー: instancename:9092を入力します。

    ノート:

    Kafkaインスタンスによって使用されているVCNの「セキュリティ・リスト」で、トラフィックを許可するためにポート9092を追加する必要があります。
  5. 「セキュリティ」「セキュリティ・ポリシーなし」を選択します。
  6. 「アクセス・タイプ」「接続性エージェント」を選択し、「エージェント・グループの関連付け」をクリックします。
  7. エージェントを選択して「使用」をクリックします。
  8. 「テスト」をクリックします。成功するメッセージが表示されます。
  9. 「保存」をクリックします。

Oracle Integration 3でのKafkaとOracle Autonomous Data Warehouseの統合の作成

  1. Oracle Integration 3インスタンスで、ハンバーガー・メニューをクリックし、「設計」「統合」を選択します。
  2. 「作成」をクリックし、「統合の作成」ダイアログ・ボックスで「アプリケーション」を選択します。
  3. 統合名KafkaToADWを入力し、「作成」をクリックします。

    ノート:

    Kafka用に作成した2つの接続と、トリガーとしてOracle Autonomous Data Warehouseがあります。
  4. Kafkaを選択すると、トリガーの名前を入力するように求められ、「メッセージ・タイプ」「コンシューマ」を選択し、「続行」をクリックします。
  5. ドロップダウンからKafkaトピックを選択し、コンシューマ名を指定し、フィールドをデフォルトのままにして「続行」をクリックします。
  6. 「メッセージ構造」「サンプルJSONドキュメント」を選択し、同じJSONファイルをドラッグ・アンド・ドロップします。
  7. 「サマリー」ウィンドウで、すべての詳細を確認し、「終了」をクリックします。
  8. 作成された「Trigger」の下に下矢印が表示されます。
  9. 「+」アイコンをクリックし、「起動」でADWを選択すると、「マップ」という名前の追加ボックスが表示されます。
  10. 「起動」にマウスを移動し、「...」をクリックして「編集」を選択します。
  11. 名前を指定し、「実行する操作」フィールドの「表に対する操作の実行」を選択します。「挿入」を選択し、「続行」をクリックします。
  12. 表を選択して「続行」をクリックし、「サマリー」ページで「終了」をクリックします。
  13. 次に、「マップ」ボックスにマウスを移動し、「...」をクリックして「編集」を選択します。
  14. 「マッピング」ページで、「ソース」および「ターゲット」フィールドに接続します。
  15. 「検証」をクリックすると、正常に検証されます。
  16. 「統合」ページで「保存」をクリックすると、統合が正常に構成されます。
  17. 統合ホームページで、「ステータス」フィールドにマウスを移動し、「電源」アイコンをクリックしてアクティブ化します。
  18. 「統合のアクティブ化」という名前のウィンドウが表示され、「本番」を選択して「アクティブ化」をクリックします。これにより、Kafkaメッセージを消費するために統合が正常にアクティブ化されます。
  19. Kafkaトピックを起動すると、データベースに格納されるメッセージが表示されます。SQLDeveloperを開き、表を開き、「データ」タブをクリックしてメッセージを表示します。
  20. 独自のKafkaストリームがある場合は、同じ形式でメッセージの作成を開始できます。
  21. Oracle GoldenGate Stream Analyticsを使用している場合は、Oracle GoldenGate Stream AnalyticsインスタンスにSSH接続し、/u01/app/osa/utilities/kafka-utilsフォルダに移動します。
  22. 受信データとしてsample.jsonを使用できます。次のjson形式を使用します。
    {"fruit": "Apple","total": "Large","color": "Red"}
  23. 次のコマンドを実行して、Kafkaトピックとしてデータ・フィードをループします:
    opc@ggsanew kafka-utils]$ ./loop-file.sh ./sample.json | ./sampler.sh 1 1 | ./kafka.sh feed complex

データのチェック

  1. Oracle Integrationコンソールを開き、「監視」をクリックして「統合」を選択します。
  2. 受信済、処理済および成功のデータの詳細が表示されます。
  3. Oracle SQL Developerを開き、表をチェックして、データが使用可能かどうかを確認します。
    1. 「Oracle接続」をクリックし、Oracle Autonomous Data Warehouseデータベースを選択します。
    2. 「表」を展開し、「サンプル」表を選択すると、データが表示されます。