將資料複寫至串流處理分析

瞭解如何將資料從 OCI GoldenGate 複製到串流分析。

GoldenGate Stream Analytics 開始成為「複雜事件處理」引擎,該引擎已發展成可在程式實際執行架構中使用 Apache Spark 和 Apache Kafka 執行。Stream Analytics 可以從任何來源 (例如資料庫、GoldenGate、Kafka、JMS、REST 或甚至檔案系統檔案) 擷取串流資料。擷取資料後,您可以對即時資料執行分析。

開始之前

若要順利完成此快速啟動,您需要:

作業 1:建立 OCI GoldenGate 資源

  1. 建立用於資料複製的 OCI GoldenGate 部署

  2. 為來源資料庫建立連線

  3. 將連線指派給部署

  4. 建立並執行 Extract

工作 2:建立串流分析資源

  1. 建立 Stream Analytics 部署

  2. 使用 Kafka 執行處理的公用 IP 建立 Kafka 連線,然後針對安全協定選取純文字

  3. 建立 GoldenGate 連線

  4. 指定串流分析部署的連線

作業 3:建立及執行管線

  1. 啟動串流分析部署主控台

  2. 複查 Stream Analytics 部署主控台中的連線。

    1. 在 Stream Analytics 部署主控台中,選取目錄

    2. 在「目錄」頁面上,複查連線清單。您應該會看到 GoldenGate 連線、自治式 AI 資料庫連線以及 Kafka 連線。

  3. 啟動 GoldenGate 大數據叢集:

    1. 在 OCI GoldenGate Stream Analytics 部署主控台中,從 ossaadmin 使用者功能表中選取系統設定值

    2. 在「系統設定」對話方塊中,選取管理叢集,然後展開 GGDB 叢集

    3. 選取啟動叢集。等待叢集狀態為執行中,然後關閉對話方塊視窗。

  4. 更新 GoldenGate 連線證明資料:

    雖然 GoldenGate 連線可在 Stream Analytics 部署主控台中使用,但 GoldenGate 證明資料不會結轉。更新密碼並測試連線。

    1. 選取目錄,然後選取 GoldenGate 連線。

    2. 編輯連線對話方塊中,選取下一步

    3. GG 使用者名稱中,輸入 oggadmin

    4. 若為 GG 密碼,請選取變更密碼,然後輸入在任務 1 中為資料複製建立 OCI GoldenGate 部署時所提供的密碼。

    5. 選取測試連線 (Test connection) 。如果成功,請選取儲存

  5. 使用 GoldenGate Extract 建立並啟動 GoldenGate 變更資料

    請確定您在 GG 變更資料明細頁面上,使用任務 1 中提供的擷取明細。

  6. 更新自治式 AI 資料庫使用者名稱。

    使用預設使用者 ggadmin 建立資料庫連線。將使用者名稱更新為 SRC_OCIGGLL (如果您使用提供的範例資料) 以存取其綱要和表格。

    1. 選取目錄,然後選取自治式 AI 資料庫連線

    2. 在「編輯連線」對話方塊中,選取下一步

    3. 使用者名稱中,輸入 SRC_OCIGGLL

    4. 對於密碼,請輸入您在開始此快速啟動步驟之前,於開始步驟中修改的 SRC_OCIGGLL 密碼。

    5. 選取測試連線 (Test connection) 。如果成功,請選取儲存

  7. 使用 Autonomous AI Database 查詢表格來建立客戶和訂單的參考

  8. 使用 Kafka 連線建立客戶和訂單的 Kafka 串流

  9. 使用自治式 AI 資料庫 SQL 工具在來源資料庫執行插入。

    例如,您可以執行下列插入:

    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (11,'COM',101,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (12,'COM',102,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (13,'COM',103,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (14,'COM',104,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (15,'COM',105,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (16,'COM',106,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (17,'COM',107,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (18,'COM',201,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (19,'COM',202,to_date('16-AUG-2023','DD-MON-YYYY'),null);
  10. 建立管線,此管線使用步驟 8 中建立的 Kafka 串流。

  11. 新增查詢階段,然後新增篩選,僅傳回「訂單」串流的 CUST_ID 與「客戶」串流的 CUSTID 相符的訂單。

  12. 新增目標階段

  13. 發布管線