ストリーム・アナリティクスへのデータのレプリケート

OCI GoldenGateからStream Analyticsにデータをレプリケートする方法について学習します。

GoldenGate Stream Analyticsは、Apache SparkおよびApache Kafkaを使用してランタイム・フレームワーク上で実行されるように進化した複合イベント処理エンジンとして始まりました。Stream Analyticsでは、データベース、GoldenGate、Kafka、JMS、REST、さらにはファイル・システム・ファイルなどの任意のソースからデータのストリームを取り込むことができます。データの収集後、ライブ・データに対して分析を実行できます。

開始する前に

このクイックスタートを正常に完了するには、次が必要です。

タスク1: OCI GoldenGateリソースの作成

  1. データ・レプリケーション用のOCI GoldenGateデプロイメントを作成します。

  2. ソース・データベースの接続を作成します。

  3. デプロイメントに接続を割り当てます

  4. Extractを作成して実行します

タスク2: Stream Analyticsリソースの作成

  1. Stream Analyticsデプロイメントを作成します。

  2. KafkaインスタンスのパブリックIPを使用してKafka接続を作成し、セキュリティ・プロトコルの場合は「プレーンテキスト」を選択します。

  3. GoldenGate接続を作成します。

  4. Stream Analyticsデプロイメントに接続を割り当てます

タスク3: パイプラインの作成および実行

  1. Stream Analyticsデプロイメント・コンソールを起動します。

  2. Stream Analyticsデプロイメント・コンソールで接続を確認します。

    1. Stream Analyticsデプロイメント・コンソールで、「カタログ」を選択します。

    2. 「カタログ」ページで、接続のリストを確認します。GoldenGate接続、Autonomous AI Database接続およびKafka接続が表示されます。

  3. GoldenGateビッグ・データ・クラスタを起動します。

    1. OCI GoldenGate Stream Analyticsデプロイメント・コンソールで、ossaadminユーザー・メニューから「システム設定」を選択します。

    2. 「システム設定」ダイアログで「クラスタの管理」を選択し、「GGDBクラスタ」を展開します。

    3. 「クラスタの起動」を選択します。クラスタ・ステータスが「実行中」になるまで待ってから、ダイアログ・ウィンドウを閉じます。

  4. GoldenGate接続資格証明を更新します。

    GoldenGate接続はStream Analyticsデプロイメント・コンソールで使用できますが、GoldenGate資格証明は引き継がれません。パスワードを更新し、接続をテストします。

    1. 「カタログ」を選択し、「GoldenGate」接続を選択します。

    2. 「接続の編集」ダイアログで、「次へ」を選択します。

    3. 「GGユーザー名」に、oggadminと入力します。

    4. 「GGパスワード」で、「パスワードの変更」を選択し、タスク1でデータ・レプリケーション用のOCI GoldenGateデプロイメントを作成したときに指定したパスワードを入力します。

    5. 「接続テスト」を選択します。成功した場合は、「保存」を選択します。

  5. GoldenGate Extractを使用して、GoldenGate変更データを作成して起動します。

    GG変更データの詳細ページのタスク1に示されているExtractの詳細を使用していることを確認してください。

  6. Autonomous AI Databaseユーザー名を更新します。

    データベース接続は、デフォルト・ユーザーggadminを使用して作成されます。ユーザー名をSRC_OCIGGLLに更新して(提供されているサンプル・データを使用した場合)、そのスキーマおよび表にアクセスします。

    1. 「カタログ」を選択し、「Autonomous AI Database connection」を選択します。

    2. 「接続の編集」ダイアログで、「次へ」を選択します。

    3. 「ユーザー名」に、SRC_OCIGGLLと入力します。

    4. 「パスワード」に、このクイックスタートの開始時に開始する前のステップで変更したSRC_OCIGGLLパスワードを入力します。

    5. 「接続テスト」を選択します。成功した場合は、「保存」を選択します。

  7. Autonomous AI Database参照表を使用して、顧客およびオーダーの参照を作成します。

  8. Kafka接続を使用して、顧客およびオーダーのKafkaストリームを作成します。

  9. Autonomous AI Database SQLツールを使用して、ソース・データベースへの挿入を実行します。

    たとえば、次の挿入を実行できます。

    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (11,'COM',101,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (12,'COM',102,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (13,'COM',103,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (14,'COM',104,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (15,'COM',105,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (16,'COM',106,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (17,'COM',107,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (18,'COM',201,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (19,'COM',202,to_date('16-AUG-2023','DD-MON-YYYY'),null);
  10. ステップ8で作成したKafkaストリームを使用するパイプラインを作成します。

  11. 「問合せステージの追加」を選択し、フィルタを追加して、OrdersストリームのCUST_IDがCustomersストリームのCUSTIDと一致するオーダーのみを返します。

  12. ターゲット・ステージの追加

  13. パイプラインのパブリッシュ