将数据复制到 Stream Analytics

了解如何将数据从 OCI GoldenGate 复制到 Stream Analytics。

GoldenGate Stream Analytics 始于一个复杂的事件处理引擎,该引擎通过 Apache Spark 和 Apache Kafka 发展为运行于运行时框架之上。Stream Analytics 可以从任何源(例如数据库、GoldenGate、Kafka、JMS、REST,甚至文件系统文件)摄取数据流。摄取数据后,您可以对实时数据运行分析。

开始之前

要成功完成此快速入门,您需要:

任务 1:创建 OCI GoldenGate 资源

  1. 为数据复制创建 OCI GoldenGate 部署

  2. 创建源数据库的连接

  3. 分配到部署的连接

  4. 创建并运行提取

任务 2:创建流分析资源

  1. 创建 Stream Analytics 部署

  2. 使用 Kafka 实例的公共 IP 创建 Kafka 连接,并为安全协议选择 Plaintext

  3. 创建 GoldenGate 连接

  4. 分配到 Stream Analytics 部署的连接

任务 3:创建和运行管道

  1. 启动 Stream Analytics 部署控制台

  2. 查看 Stream Analytics 部署控制台中的连接。

    1. 在 Stream Analytics 部署控制台中,选择目录

    2. 在“目录”页面上,查看连接列表。您应该会看到 GoldenGate 连接、自治 AI 数据库连接和 Kafka 连接。

  3. 启动 GoldenGate 大数据集群:

    1. 在 OCI GoldenGate Stream Analytics 部署控制台中,从 ossaadmin 用户菜单中选择系统设置

    2. 在 "System Setting" 对话框中,选择 Manage Clusters ,然后展开 GGDB Cluster

    3. 选择 Start Cluster 。等待群集状态为 Running(正在运行),然后关闭对话框窗口。

  4. 更新 GoldenGate 连接身份证明:

    尽管 GoldenGate 连接在 Stream Analytics 部署控制台中可用,但 GoldenGate 身份证明不会结转。更新密码并测试连接。

    1. 选择目录,然后选择 GoldenGate 连接。

    2. 编辑连接对话框中,选择下一步

    3. GG 用户名中,输入 oggadmin

    4. 对于 GG 密码,选择更改密码,然后输入在任务 1 中为数据复制创建 OCI GoldenGate 部署时提供的密码。

    5. 选择测试连接。如果成功,请选择保存

  5. 使用 GoldenGate 提取可创建并启动 GoldenGate 更改数据

    确保使用 GG 更改数据详细信息页上任务 1 中提供的提取详细信息。

  6. 更新自治 AI 数据库用户名。

    使用默认用户 ggadmin 创建数据库连接。将用户名更新为 SRC_OCIGGLL(如果使用提供的示例数据),以访问其方案和表。

    1. 选择 Catalog(目录),然后选择 Autonomous AI Database 连接。

    2. 在“编辑连接”对话框中,选择下一步

    3. 对于用户名,输入 SRC_OCIGGLL

    4. 对于 Password(密码),输入在此快速入门开始时在“开始之前”步骤中修改的 SRC_OCIGGLL 密码。

    5. 选择测试连接。如果成功,请选择保存

  7. 使用自治 AI 数据库查找表为客户和订单创建引用

  8. 使用 Kafka 连接为客户和订单创建 Kafka 流

  9. 使用自治 AI 数据库 SQL 工具对源数据库执行插入。

    例如,可以运行以下插入:

    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (11,'COM',101,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (12,'COM',102,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (13,'COM',103,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (14,'COM',104,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (15,'COM',105,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (16,'COM',106,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (17,'COM',107,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (18,'COM',201,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (19,'COM',202,to_date('16-AUG-2023','DD-MON-YYYY'),null);
  10. 创建管道,使用在步骤 8 中创建的 Kafka 流。

  11. 添加查询阶段,然后添加筛选器,以仅返回“订单”流的 CUST_ID 与“客户”流的 CUSTID 匹配的订单。

  12. 添加目标阶段

  13. 发布管道