Kafka StreamsデータをOracle Autonomous Databaseに接続

Oracle GoldenGate Stream Analyticsを使用すると、ユーザーはスケーラブルなデータ変換および分析パイプラインを構築して、俊敏なリアルタイムのビジネス・インサイトを得ることができます。

この項では、「開始する前に」セクションからOracle Autonomous Databaseがすでにインストールされていることを前提としています。ここでは、次の操作について学習します。

  • GGSAコンソールにアクセスします。
  • データを取り込むようにKafkaプロデューサを構成します。
  • KafkaのGGSAをAutonomous Databaseに接続します。

GGSAコンソールへのアクセスの取得

  1. OCIコンソールの「コンピュート」で、「インスタンス」を選択します。GGSA Marketplaceインスタンスが稼働している必要があります。
  2. パブリックIPアドレスをコピーします。
  3. 秘密キーを使用してインスタンスにログインし、/home/opcの下のREADME.txtを確認します。
  4. OSA UIパスワードをコピーします。
  5. ブラウザを開き、https://<Public IP Address>/osaと入力します。
  6. 「ユーザー名」osaadminと入力し、README.txtからコピーしたパスワードを使用します。

データを取り込むためのKafkaプロデューサの構成

Kafkaストリームを作成するには、次のステップに従います。

  1. 「カタログ」ページで、「新規アイテムの作成」をクリックします。
  2. マウスを「接続」の上に置き、サブメニューから「Kafka」を選択します。
  3. 「タイプ・プロパティ」画面で、「名前」を入力し、「接続タイプ」として「Kafka」を選択します。この例では、GGSAインスタンスにインストールされているKafkaを使用します。
  4. 「接続の詳細」画面で、「Zookeepers」フィールドにlocalhost:2181と入力します。
  5. 「接続のテスト」をクリックします。「成功」というメッセージが表示されます。
  6. 保存」をクリックします。

    ノート:

    ポート2181がイングレスで開いていることを確認します。

データを取り込むためにKafkaトピックを開始するには、次のステップに従います。

  1. GGSAインスタンスにSSHで接続し、/u01/app/osa/utilities/kafka-utilsフォルダに移動します。
  2. 受信データとしてcomplex.jsonを使用します。
  3. 次のコマンドを実行して、データ・フィードをKafkaトピックとしてループします:
    opc@ggsanew kafka-utils]$ ./loop-file.sh ./complex.json | ./sampler.sh 1 1 | ./kafka.sh feed complex
    複合Kafkaトピックは、データの生成を開始し、取込みの準備が整います。
  4. 「カタログ」ページで、「新規アイテムの作成」をクリックし、Kafka接続を使用してストリームを作成します。
  5. マウスをStreamの上に置き、サブメニューからKafkaを選択します。
  6. 「タイプ・プロパティ」画面で、名前を入力し、「ストリーム・タイプ」として「Kafka」を選択します。
  7. 「次へ」をクリックします。
  8. 「ソースの詳細」画面で、「接続」で作成したKafka接続を選択します。
    1. 「トピック名」「複合」を選択します。
    2. 「データ書式」JSONを選択します。
  9. 「次へ」をクリックします。
  10. 「データ・フォーマット」画面で、デフォルト値のままにします。
  11. 「次へ」をクリックします。
  12. 「シェイプ」画面で、受信したJSONシェイプがStreamから推測されます。
  13. 保存」をクリックします。Kafkaストリームは正常に作成されました。

KafkaのGGSAをAutonomous Databaseに接続します

GGSAからOracle Autonomous Databaseへの接続を作成するには、次のステップに従います。

  1. 「カタログ」ページで、「新規アイテムの作成」をクリックします。
  2. マウスを「接続」の上に置き、サブメニューから「Oracle Database」を選択します。
  3. 「タイプ・プロパティ」画面で、「名前」を入力し、「接続タイプ」「Oracle Database」を選択します。
  4. 「次へ」をクリックします。
  5. 「接続の詳細」「タイプ: Oracle Database」画面で、Autonomous Database接続の詳細を入力します。
    • 「接続方法」「Wallet」を選択し、ウォレット・ファイルをアップロードします。
    • ドロップダウン・リストから「サービス名/SID」を選択します。
    • 「ユーザー名」adminと入力します。
    • データベース管理パスワードを入力します。
  6. 保存」をクリックします。Autonomous Database接続は正常に作成されます。
  7. Autonomous Databaseのスキーマにログインし、データを受信する表を作成します。
    CREATE TABLE COMPLEX
            (  BOOLEANFIELD VARCHAR2(20) , NUMBERFIELD NUMBER
            , STRINGFIELD VARCHAR2(20)
            , OBJECTFIELD_A_KEY NUMBER
            , OBJECTFIELD_A_VALUE NUMBER
            , OBJECTFIELD_C VARCHAR2(20)
            , OBJECTFIELD_E VARCHAR2(20)
            , ARRAYFIELD_0 NUMBER
            , ARRAYFIELD_1 NUMBER
            
        ); 

次のステップに従って、GGSAにパイプラインを作成し、ソースとターゲットを設定します:

  1. 「カタログ」ページで、「新規アイテムの作成」をクリックし、ドロップダウン・リストから「パイプライン」を選択します。
  2. マウスを「ターゲット」の上に置き、サブメニューから「データベース表」を選択します。
  3. 「タイプ・プロパティ」画面で、「ターゲット」「名前」を入力し、「データベース表」として「ターゲット・タイプ」を選択します。
  4. 「次へ」をクリックします。
  5. 「ターゲットの詳細」画面で、以前に作成したAutonomous Database表をドロップダウン・リストから選択します。
  6. 「次へ」をクリックします。
  7. 「形状」画面で、「表名」をドロップダウン・リストから「複合」として選択します。
  8. 「次へ」をクリックします。
  9. 「シェイプ」を推測し、「保存」をクリックします。

次のステップを実行して、ターゲットを設定します。

  1. 「カタログ」ページで、「新規アイテムの作成」をクリックし、ドロップダウン・リストから「パイプライン」を選択します。
  2. マウスを「ターゲット」の上に置き、サブメニューから「Kafkaストリーム」を選択します。
  3. 保存」をクリックします。
  4. パイプラインで、ストリームを右クリックし、「ステージの追加」「ターゲット」の順に選択します。
  5. 「ターゲット・ステージの作成」ウィンドウで、名前を入力して「保存」をクリックします。
  6. 以前に作成した「ターゲット」表を選択します。
  7. 「公開」をクリックしてパイプラインを公開し、ターゲット表のデータを使用可能にします。
  8. データベースにログインして、データがAutonomous Database複合表にロードされていることを確認します。