添加数据流的
关于数据流
Oracle GoldenGate 数据流使用 AsyncAPI 规范来定义异步 API。此方法使应用程序能够使用“发布”或“订阅”模型高效地订阅数据流。在源数据库中提交更改后,将立即收到更新,从而尽可能减少延迟并简化应用开发。此外,Oracle GoldenGate 数据流还支持用户指定自己的首选数据格式(例如 JSON),以便与开发环境中的现有工具和框架无缝集成。
-
增强的数据摄取:通过 AsyncAPI 提供的发布或订阅模型,应用可以高效地接收实时数据更新。
-
灵活的数据格式:用户可以选择其首选格式,以便与现有工具无缝集成。
-
简化集成:AsyncAPI 可促进与开发人员和数据科学家常用的各种应用和工具的顺畅交互。
-
保证数据完整性:数据流继承 Oracle GoldenGate 的核心优势,可在更改提交到源数据库中时复制更改,从而确保数据持久性。
异步 API
Oracle GoldenGate 数据流不限定编程语言,因此可以与使用任何编程语言编写的客户端进行交互。尽管客户端程序通常既简单又小,但用户仍需要手动实施客户端代码以与数据流服务进行交互。
在 Oracle GoldenGate 数据流中采用 AsyncAPI 规范具有以下优势:
-
能够在行业标准 API 规范中描述数据流服务 API 并自动生成 API 文档。
-
通过 @asyncapi/generator 自动生成客户端代码。
借助 AsyncAPI 支持,Oracle GoldenGate Data Streams 可以自动生成客户端代码,从而简化数据流处理。它遵循发布者和用户模型,并支持各种协议,包括 websocket,kafka,mqtt,hms 和许多物联网协议。在描述事件驱动的 API 时,它使用 YAML 建模语言,并遵循 OpenAPI 规范的类似语法。例如,下面是描述数据流 AsyncAPI 定义的 AsyncAPI yaml 文档的片段:
asyncapi: '3.0.0'
info:
title: Data Streaming API
version: '1.0.0'
description: | allows clients to subscribe to a data stream
license:
name: Apache 2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0'
servers:
<deployment-url>:
protocol: ws
url: <deployment-url>:<port-number>
defaultContentType: application/json
channels:
/services/v2/stream/mystream1:
...
在创建数据流资源时,将在 HTTP 响应中返回指向定制异步 API 规范文档的 URL 链接,该文档描述如何访问此数据流端点。然后,可以使用此 YAML 文档使用 @asyncapi/generator 生成客户端代码。
请注意,要支持 @asyncapi/generator 中的 Websocket 协议,还需要为 GitHub 中的 @asyncapi/generator 实施/维护 Websocket 客户机模板。有关 websocket-client-template 的更多信息,请参阅 GitHub 系统信息库:
https://github.com/tianshu-orcl/websocket-client-template.git
数据流协议
借助 Oracle GoldenGate 数据流,您可以通过遵循简单流处理协议的专用 Websocket 通道,以用户指定的格式直接访问数据。
数据流协议使用推送模式将数据发送到客户端。客户机首先通过 HTTP RESTful 请求在服务器上创建流处理资源。创建流处理资源后,客户机通过流处理资源端点建立 WebSocket 连接。建立 WebSocket 通道后,数据流将开始立即连续地推送数据,而无需等待客户端的响应或确认。

以下示例 python 客户机说明了客户机与数据流服务之间的交互:
import asyncio
import requests
import websockets
import json
async def client():
### create the streaming resource
payload = {"source":{"trail":"a1"}}
response = requests.post(
'https://name:pswd@<oci_godengate_console_url>:443/services/distsrvr/v2/stream/s1', json=payload)
### establish websocket connection and receive data continuously
uri = "wss://name:pswd@<oci_godengate_console_url>:443/services/v2/distsrvr/stream/s1?begin=earliest"
async with websockets.connect(uri) as websocket:
while True:
resp = await websocket.recv()
records = json.loads(resp)
for rec in records:
print(rec)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
localhost_pem = pathlib.Path(__file__).with_name("/path_to/client_key.pem")
ssl_context.load_verify_locations(localhost_pem)
ssl_context.load_cert_chain(localhost_pem)
asyncio.get_event_loop().run_until_complete(client())
在给定的客户机程序中,在创建数据流资源端点 s1 时会提供一个用于指定源数据线索名称的简单数据流有效负载。在现实世界中的应用程序中,在流协议握手阶段可以使用非常复杂的数据流有效负载来配置数据流行为。
例如,以下数据流请求有效负载指定筛选规则、编码格式和 bufferSize 以及所需的数据源线索名称。
{
"$schema" : "ogg:dataStream",
"source" : {"trail":"a1"},
"rules" : [{
"action" : "exclude",
"filter" : {
"objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
}
}],
"encoding" : “json",
"bufferSize" : 2097152
}
数据流启动/重新启动位置
在建立 Websocket 连接期间,客户机端指定 begin 位置(作为 Websocket 连接 URL 中的查询参数),以开始流式传输数据。起始位置可以是以下值之一:
-
特殊关键字 " now "
-
特殊关键字“最早”
-
ISO 8601 格式时间戳字符串
-
上次处理的 LCR 位置
每个非元数据 LCR 记录都包含一个不透明的位置(包括事务处理中的 CSN、XID、记录编号)。客户机端负责维护上次处理的 LCR 记录的位置。数据流服务负责根据给定的 begin 位置定位正确的启动/重新启动点。
如果这是客户机首次连接到数据流服务,则客户机应提供用于开始流数据的时间戳。关键字 now 将转换为当前时间戳,关键字 earliest 将转换为时间戳 0。
或者,ISO 8601 时间戳字符串可用于 begin 位置。在所有情况下,数据流服务都会对源线索执行基于时间戳的查找,以确定开始位置。
如果是恢复/重新启动情况,则客户端应在握手期间向数据流服务提供上次处理的位置。数据流服务将在源线索上执行基于职位的查找,以确定开始位置。数据流恢复的行为也取决于数据流中指定的 QoS 级别。
CloudEvents
CloudEvents 是一种以通用格式描述事件数据的规范,用于提供跨服务、平台和系统的互操作性。由于 Oracle GoldenGate 数据流当前仅支持 JSON 数据编码,因此对 CloudEvents 格式的支持仅限于 JSON 事件格式。可以在以下位置找到 CloudEvents 的 JSON 事件格式的完整规范:
https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json
data
字段包含原始数据记录,即 Oracle GoldenGate DML/DDL/元数据/方案记录。{
"specversion" : "1.0",
"type" : "com.example.someevent",
"source" : "/mycontext",
"id" : "A234-1234-1234",
"datacontenttype" : "application/json",
"data: {…}
}
添加数据流的
-
从分发服务主页中,添加数据流(加号图标)以打开添加数据流向导。
-
在数据流信息页上,在名称框中输入数据流进程名称并为其添加说明。单击下一步。
-
在源选项页上,为下图中显示的选项提供值:
-
线索名称:源线索文件的名称。
-
线索子目录:存储线索文件的子目录的路径。
-
编码:此选项控制数据流的记录编码。目前,支持 JSON 编码。
-
缓冲区大小:此值控制数据流服务中使用的内存缓冲区大小。数据流处理服务将在内存中消息队列的总字节大小超过指定值后刷新该队列,并将记录传送给客户机。
-
服务质量:此选项定义数据流重复禁止和恢复行为。数据流服务支持三个级别的服务质量。
-
仅限于一次:此模式是最严格的模式,在恢复在源跟踪中查看
RESTART OK/ABEND
记录的过程中,服务会筛选出任何重复记录。客户不会看到重复的记录。如果服务无法找到具有给定上次处理位置的记录,则会出现错误。 -
至少一次:此模式不会在恢复期间隐藏重复记录,也不会在源线索中看到
RESTART OK/ABEND
记录。客户机可能会在数据流中看到重复的记录。此外,如果服务无法找到具有给定上次处理位置的记录,则会发生错误。 -
最多一次:此模式在恢复期间隐藏重复记录或在源线索中看到
RESTART OK/ABEND
记录。客户机在数据流中看不到重复的记录。如果服务无法找到具有给定上次处理位置的记录,它将查找下一个可用记录并前进。
-
-
CloudEvents 格式:数据流处理服务支持以 CloudEvents 格式传输数据记录。默认情况下,此格式处于禁用状态,可以在创建数据流通道时通过使用切换开关启用属性来控制。
-
-
在 Filtering Options(筛选选项)页面上,包含和排除筛选规则的选项可用:
指定筛选规则选项,如下所示:-
规则操作:选择“排除”或“包括”选项。
-
筛选器类型:筛选器类型包括以下内容:
对象类型:可以从下拉列表中选择多个选项,包括 DML 、 DDL 、 INSERT 、 UPDATE 、 UPSERT 和 DELETE 。
对象名称:以前创建的过滤对象的名称。
单击添加将筛选规则添加到数据流流程中。
-
-
单击创建数据流。您将返回到列出数据流的分发服务主页。
-
单击新创建的数据流以查看流的 YAML 文档数据流 AsyncAPI 定义。
编辑数据流的配置
-
在“分发服务”左侧导航窗格中,单击“数据流”。
-
删除需要修改的数据流的名称。
-
在“数据流”页中,使用操作列查看数据流详细信息、删除数据流并更改其筛选。
-
如果从操作菜单中单击更改筛选选项,则会显示编辑数据流筛选规则对话框。在此框中,您可以更改规则操作(包括、排除)和筛选器类型(对象名称或对象类型)。
-
单击添加以应用筛选。
-
单击提交以返回“数据流”页。
通过从名称列中单击数据流,可以进一步编辑数据流。这将显示完整的数据流配置。使用每个配置设置旁边的铅笔图标进行更改。您可以更改数据流使用的源线索文件、其筛选规则和服务质量。您还可以使用 YAML 编辑器更改数据流配置,并使用 YAML 编辑器旁边的上载更改图标上载更改。