将数据复制到 Stream Analytics

了解如何将数据从 OCI GoldenGate 复制到 Stream Analytics。

GoldenGate Stream Analytics 始于一个复杂的事件处理引擎,它通过 Apache Spark 和 Apache Kafka 发展为在运行时框架之上运行。Stream Analytics 可以从任何源(例如数据库、GoldenGate、Kafka、JMS、REST,甚至文件系统文件)摄取数据流。摄取数据后,您可以对实时数据运行分析。

开始之前

要成功完成此快速入门,您需要:

  • 已加载示例数据的源 Autonomous Database,并启用了补充日志记录。

    提示:

    如果需要使用示例数据,可以下载 OCI GoldenGate 示例数据
    • 首先,编辑 SETUP_USERS_ATP.sql 并修改 SRC_OCIGGLL 用户的密码以删除特殊字符。
    • 使用 Autonomous Database 的 Database Actions SQL 工具运行两个脚本来创建用户方案和表。
    • 使用 SQL 工具启用补充日志记录。
    有关更多详细信息,请执行练习 1、任务 3:加载 ATP 方案中的步骤。
  • 解锁源 Autonomous Database 实例上的 GGADMIN 用户
    1. 在“Autonomous Database 详细信息”页面上,从数据库操作菜单中选择数据库用户

      提示:

      使用在创建要登录的实例时提供的 Autonomous Database 管理员身份证明(如果出现提示)。
    2. 找到 GGADMIN 用户,然后从省略号(三个点)菜单中选择编辑
    3. 在 "Edit User"(编辑用户)面板中,输入密码,确认该密码,然后取消选择 Account is Locked
    4. 单击应用更改

任务 1:创建 OCI GoldenGate 资源

  1. 为数据复制创建 OCI GoldenGate 部署
  2. 创建源数据库的连接
  3. 分配到部署的连接
  4. 创建并运行提取

任务 2:创建流分析资源

  1. 创建 Stream Analytics 部署
  2. 使用 Kafka 实例的公共 IP 创建 Kafka 连接,并为安全协议选择纯文本
  3. 创建 GoldenGate 连接
  4. 分配到 Stream Analytics 部署的连接

任务 3:创建和运行管道

  1. 启动 Stream Analytics 部署控制台
  2. 查看 Stream Analytics 部署控制台中的连接。
    1. 在 Stream Analytics 部署控制台中,单击目录
    2. 在“目录”页面上,查看连接列表。您应该会看到 GoldenGate 连接、Autonomous Database 连接和 Kafka 连接。
  3. 启动 GoldenGate 大数据集群:
    1. 在 OCI GoldenGate Stream Analytics 部署控制台中,从 ossaadmin 用户菜单中选择系统设置
    2. 在 "System Setting" 对话框中,单击 Manage Clusters ,然后展开 GGDB Cluster
    3. 单击启动集群。等待群集状态为 Running(正在运行),然后关闭对话框窗口。
  4. 更新 GoldenGate 连接身份证明:

    虽然 GoldenGate 连接在 Stream Analytics 部署控制台中可用,但 GoldenGate 身份证明不会结转。更新密码并测试连接。

    1. 单击目录,然后单击 GoldenGate 连接。
    2. 编辑连接对话框中,单击下一步
    3. 对于 GG 用户名,输入 oggadmin
    4. 对于 GG 密码,单击更改密码,然后输入在任务 1 中为数据复制创建 OCI GoldenGate 部署时提供的密码。
    5. 单击“测试连接”。如果成功,请单击保存
  5. 使用 GoldenGate 提取可创建并启动 GoldenGate 更改数据

    确保使用 GG 更改数据详细信息页上的任务 1 中提供的提取详细信息。

  6. 更新 Autonomous Database 用户名。

    数据库连接是使用默认用户 ggadmin 创建的。将用户名更新为 SRC_OCIGGLL(如果使用提供的示例数据),以访问其方案和表。

    1. 单击目录,然后单击 Autonomous Database 连接
    2. 在“编辑连接”对话框中,单击下一步
    3. 对于用户名,输入 SRC_OCIGGLL
    4. 对于口令,请输入在此快速入门开始时在“开始之前”步骤中修改的 SRC_OCIGGLL 口令。
    5. 单击“测试连接”。如果成功,请单击保存
  7. 使用 Autonomous Database 查找表为客户和订单创建参考
  8. 使用 Kafka 连接为客户和订单创建 Kafka 流
  9. 使用 Autonomous Database SQL 工具对源数据库执行插入。
    例如,可以运行以下插入:
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (11,'COM',101,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (12,'COM',102,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (13,'COM',103,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (14,'COM',104,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (15,'COM',105,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (16,'COM',106,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (17,'COM',107,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (18,'COM',201,to_date('16-AUG-2023','DD-MON-YYYY'),null);
    Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (19,'COM',202,to_date('16-AUG-2023','DD-MON-YYYY'),null);
  10. 创建管道,该管道使用在步骤 8 中创建的 Kafka 流。
  11. 添加查询阶段,然后添加筛选器,以仅返回订单流的 CUST_ID 与客户流的 CUSTID 匹配的订单。
  12. 添加目标阶段
  13. 发布管道