将数据复制到 Stream Analytics
了解如何将数据从 OCI GoldenGate 复制到 Stream Analytics。
GoldenGate Stream Analytics 始于一个复杂的事件处理引擎,该引擎通过 Apache Spark 和 Apache Kafka 发展为运行于运行时框架之上。Stream Analytics 可以从任何源(例如数据库、GoldenGate、Kafka、JMS、REST,甚至文件系统文件)摄取数据流。摄取数据后,您可以对实时数据运行分析。
开始之前
要成功完成此快速入门,您需要:
-
加载了示例数据的源自治 AI 数据库,并启用了补充日志记录。
提示:
如果需要使用示例数据,可以下载 OCI GoldenGate 示例数据。
-
首先,编辑
SETUP_USERS_ATP.sql并修改SRC_OCIGGLL用户的密码以删除特殊字符。 -
使用自治 AI 数据库的数据库操作 SQL 工具运行两个脚本来创建用户方案和表。
-
使用 SQL 工具启用补充日志记录。
有关更多详细信息,请执行练习 1、任务 3:加载 ATP 方案中的步骤。
-
-
解锁源自治 AI 数据库实例上的 GGADMIN 用户
-
在“Autonomous AI Database Details(自治 AI 数据库详细信息)”页面上,从 Database actions(数据库操作)菜单中选择 Database Users(数据库用户)。
提示:使用在创建要登录的实例时提供的自治 AI 数据库管理员身份证明(如果出现提示)。
-
找到 GGADMIN 用户,然后从省略号(三个点)菜单中选择编辑。
-
在 "Edit User"(编辑用户)面板中,输入密码,确认该密码,然后取消选择 Account is Locked 。
-
选择 Apply Changes(应用更改)。
-
任务 1:创建 OCI GoldenGate 资源
-
为数据复制创建 OCI GoldenGate 部署。
-
创建源数据库的连接。
任务 2:创建流分析资源
-
使用 Kafka 实例的公共 IP 创建 Kafka 连接,并为安全协议选择 Plaintext 。
-
创建 GoldenGate 连接。
任务 3:创建和运行管道
-
查看 Stream Analytics 部署控制台中的连接。
-
在 Stream Analytics 部署控制台中,选择目录。
-
在“目录”页面上,查看连接列表。您应该会看到 GoldenGate 连接、自治 AI 数据库连接和 Kafka 连接。
-
-
启动 GoldenGate 大数据集群:
-
在 OCI GoldenGate Stream Analytics 部署控制台中,从 ossaadmin 用户菜单中选择系统设置。
-
在 "System Setting" 对话框中,选择 Manage Clusters ,然后展开 GGDB Cluster 。
-
选择 Start Cluster 。等待群集状态为 Running(正在运行),然后关闭对话框窗口。
-
-
更新 GoldenGate 连接身份证明:
尽管 GoldenGate 连接在 Stream Analytics 部署控制台中可用,但 GoldenGate 身份证明不会结转。更新密码并测试连接。
-
选择目录,然后选择 GoldenGate 连接。
-
在编辑连接对话框中,选择下一步。
-
在 GG 用户名中,输入
oggadmin。 -
对于 GG 密码,选择更改密码,然后输入在任务 1 中为数据复制创建 OCI GoldenGate 部署时提供的密码。
-
选择测试连接。如果成功,请选择保存。
-
-
使用 GoldenGate 提取可创建并启动 GoldenGate 更改数据。
确保使用 GG 更改数据详细信息页上任务 1 中提供的提取详细信息。
-
更新自治 AI 数据库用户名。
使用默认用户
ggadmin创建数据库连接。将用户名更新为SRC_OCIGGLL(如果使用提供的示例数据),以访问其方案和表。-
选择 Catalog(目录),然后选择 Autonomous AI Database 连接。
-
在“编辑连接”对话框中,选择下一步。
-
对于用户名,输入
SRC_OCIGGLL。 -
对于 Password(密码),输入在此快速入门开始时在“开始之前”步骤中修改的
SRC_OCIGGLL密码。 -
选择测试连接。如果成功,请选择保存。
-
-
使用自治 AI 数据库查找表为客户和订单创建引用。
-
使用 Kafka 连接为客户和订单创建 Kafka 流。
-
使用自治 AI 数据库 SQL 工具对源数据库执行插入。
例如,可以运行以下插入:
Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (11,'COM',101,to_date('16-AUG-2023','DD-MON-YYYY'),null); Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (12,'COM',102,to_date('16-AUG-2023','DD-MON-YYYY'),null); Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (13,'COM',103,to_date('16-AUG-2023','DD-MON-YYYY'),null); Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (14,'COM',104,to_date('16-AUG-2023','DD-MON-YYYY'),null); Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (15,'COM',105,to_date('16-AUG-2023','DD-MON-YYYY'),null); Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (16,'COM',106,to_date('16-AUG-2023','DD-MON-YYYY'),null); Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (17,'COM',107,to_date('16-AUG-2023','DD-MON-YYYY'),null); Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (18,'COM',201,to_date('16-AUG-2023','DD-MON-YYYY'),null); Insert into SRC_OCIGGLL.SRC_ORDERS (ORDER_ID,STATUS,CUST_ID,ORDER_DATE,CUSTOMER) values (19,'COM',202,to_date('16-AUG-2023','DD-MON-YYYY'),null); -
创建管道,使用在步骤 8 中创建的 Kafka 流。
-
添加查询阶段,然后添加筛选器,以仅返回“订单”流的 CUST_ID 与“客户”流的 CUSTID 匹配的订单。
-
发布管道。