ストリーム・アナリティクスへのデータのレプリケート

OCI GoldenGateからStream Analyticsにデータをレプリケートする方法を学習します。

GoldenGate Stream Analyticsは、Apache SparkおよびApache Kafkaを使用してランタイム・フレームワーク上で動作するように進化した複雑なイベント処理エンジンとして始まりました。Stream Analyticsは、データベース、GoldenGate、Kafka、JMS、REST、さらにはファイル・システム・ファイルなどの任意のソースからデータのストリームを取り込むことができます。データの収集後、ライブ・データに対して分析を実行できます。

開始する前に

このクイックスタートが正常に完了するには、次が必要です。

  • サンプル・データがロードされ、サプリメンタル・ロギングが有効になっているソースAutonomous Database。

    ヒント :

    使用するサンプル・データが必要な場合は、OCI GoldenGateサンプル・データをダウンロードできます。
    • まず、SETUP_USERS_ATP.sqlを編集し、SRC_OCIGGLLユーザーのパスワードを変更して特殊文字を削除します。
    • Autonomous Databaseのデータベース・アクションSQLツールを使用して、2つのスクリプトを実行してユーザー・スキーマおよび表を作成します。
    • SQLツールを使用して、サプリメンタル・ロギングを有効にします。
    詳細は、ラボ1、タスク3: ATPスキーマのロードのステップに従ってください。
  • ソースAutonomous DatabaseインスタンスでGGADMINユーザーをロック解除します
    1. 「Autonomous Databaseの詳細」ページで、「データベース・アクション」メニューから「データベース・ユーザー」を選択します。

      ヒント :

      ログインするインスタンスを作成したときに指定されたAutonomous Database管理者資格証明(プロンプトが表示されたら)を使用します。
    2. GGADMINユーザーを見つけて、その省略記号(3つのドット)メニューから「編集」を選択します。
    3. 「ユーザーの編集」パネルに、パスワードを入力し、パスワードを確認して、「アカウントがロックされています」の選択を解除します。
    4. 「変更の適用」をクリックします。

タスク1: OCI GoldenGateリソースの作成

  1. データ・レプリケーション用のOCI GoldenGateデプロイメントを作成します。
  2. ソース・データベースの接続を作成します。
  3. デプロイメントに接続を割り当てます
  4. Extractを作成して実行します

タスク2: ストリーム分析リソースの作成

  1. Stream Analyticsデプロイメントを作成します。
  2. KafkaインスタンスのパブリックIPを使用してKafka接続を作成し、セキュリティ・プロトコルの「プレーンテキスト」を選択します。
  3. GoldenGate接続を作成します。
  4. Stream Analyticsデプロイメントに接続を割り当てます

タスク3: パイプラインの作成と実行

  1. Stream Analyticsデプロイメント・コンソールを起動します。
  2. Stream Analyticsデプロイメント・コンソールで接続を確認します。
    1. Stream Analyticsデプロイメント・コンソールで、「カタログ」をクリックします。
    2. 「カタログ」ページで、接続のリストを確認します。GoldenGate接続、Autonomous Database接続およびKafka接続が表示されます。
  3. GoldenGateビッグ・データ・クラスタを起動します。
    1. OCI GoldenGate Stream Analyticsデプロイメント・コンソールで、ossaadminユーザー・メニューから「システム設定」を選択します。
    2. 「システム設定」ダイアログで、「クラスタの管理」をクリックし、「GGDBクラスタ」を展開します。
    3. 「クラスタの起動」をクリックします。クラスタ・ステータスが「実行中」になるまで待ってから、ダイアログ・ウィンドウを閉じます。
  4. GoldenGate接続資格証明を更新します。

    GoldenGate接続はStream Analyticsデプロイメント・コンソールで使用できますが、GoldenGate資格証明は引き継がれません。パスワードを更新し、接続をテストします。

    1. 「カタログ」をクリックし、GoldenGate接続をクリックします。
    2. 「接続の編集」ダイアログで、「次へ」をクリックします。
    3. 「GGユーザー名」に、oggadminと入力します。
    4. 「GGパスワード」で、「パスワードの変更」をクリックし、タスク1でデータ・レプリケーション用のOCI GoldenGateデプロイメントを作成したときに指定したパスワードを入力します。
    5. 「接続のテスト」をクリックします。成功した場合は、「保存」をクリックします。
  5. GoldenGate Extractを使用して、GoldenGate Change Dataを作成および起動します。

    「GG変更データ詳細」ページのタスク1で提供されるExtract詳細を使用していることを確認します。

  6. Autonomous Databaseのユーザー名を更新します。

    データベース接続は、デフォルト・ユーザーggadminを使用して作成されます。ユーザー名をSRC_OCIGGLL (提供されているサンプル・データを使用した場合)に更新して、そのスキーマおよび表にアクセスします。

    1. 「カタログ」をクリックし、「Autonomous Database接続」をクリックします。
    2. 「接続の編集」ダイアログで、「次へ」をクリックします。
    3. 「ユーザー名」に、SRC_OCIGGLLと入力します。
    4. 「パスワード」に、このクイックスタートの開始ステップの開始前に変更したSRC_OCIGGLLパスワードを入力します。
    5. 「接続のテスト」をクリックします。成功した場合は、「保存」をクリックします。
  7. Autonomous Database参照表を使用して、顧客およびオーダーの参照を作成します。
  8. Kafka接続を使用して、顧客およびオーダーのKafkaストリームを作成します。
  9. Autonomous Database SQLツールを使用して、ソース・データベースで挿入を実行します。
    たとえば、次の挿入を実行できます。
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (11,'COM',101,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (12,'COM',102,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (13,'COM',103,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (14,'COM',104,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (15,'COM',105,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (16,'COM',106,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (17,'COM',107,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (18,'COM',201,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (19,'COM',202,to_date('16-AUG-2023','DD-MON-YYYY'),null);
  10. ステップ8で作成したKafkaストリームを使用するパイプラインを作成します。
  11. OrdersストリームのCUST_IDがCustomersストリームのCUSTIDと一致するオーダーのみを返すには、問合せステージを追加してからフィルタを追加します。
  12. ターゲット・ステージの追加
  13. パイプラインの公開