添加数据流的

Oracle GoldenGate 26ai 引入了数据流,可以高度简化数据路径以进行实时分析。

注:本文仅适用于 OracleGoldenGate 26ai 部署。

关于数据流

Oracle GoldenGate 数据流使用 AsyncAPI 规范来定义异步 API。此方法使应用程序能够使用“发布”或“订阅”模型高效地订阅数据流。在源数据库中提交更改后,将立即收到更新,从而尽可能减少延迟并简化应用开发。此外,Oracle GoldenGate 数据流还支持用户指定自己的首选数据格式(例如 JSON),以便与开发环境中的现有工具和框架无缝集成。

对开发人员和数据科学家的好处

Oracle GoldenGate 数据流的组件

Oracle GoldenGate 数据流的组件包括:

异步 API

Oracle GoldenGate 数据流不限定编程语言,因此可以与使用任何编程语言编写的客户端进行交互。尽管客户端程序通常既简单又小,但用户仍需要手动实施客户端代码以与数据流服务进行交互。

在 Oracle GoldenGate 数据流中采用 AsyncAPI 规范具有以下优势:

借助 AsyncAPI 支持,Oracle GoldenGate Data Streams 可以自动生成客户端代码,从而简化数据流处理。它遵循发布者和用户模型,并支持各种协议,包括 websocket,kafka,mqtt,hms 和许多物联网协议。在描述事件驱动的 API 时,它使用 YAML 建模语言,并遵循 OpenAPI 规范的类似语法。例如,下面是描述数据流 AsyncAPI 定义的 AsyncAPI yaml 文档片段:

asyncapi: '3.0.0'
info:
  title: Data Streaming API
  version: '1.0.0'
  description: \| allows clients to subscribe to a data stream
  license:
    name: Apache 2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0'

servers:
  <deployment-url>:
    protocol: ws
url: <deployment-url>:<port-number>

defaultContentType: application/json

channels:
/services/v2/stream/mystream1:
...

在创建数据流资源时,将在 HTTP 响应中返回指向定制异步 API 规范文档的 URL 链接,该文档描述如何访问此数据流端点。然后,可以使用此 YAML 文档使用 @asyncapi/generator 生成客户端代码。

请注意,要支持 @asyncapi/generator 中的 Websocket 协议,还需要为 GitHub 中的 @asyncapi/generator 实施/维护 Websocket 客户机模板。有关 websocket-client-template 的更多信息,请参阅 GitHub 系统信息库:

https://github.com/oracle-samples/websocket-client-template

数据流协议

借助 Oracle GoldenGate 数据流,您可以通过遵循简单流处理协议的专用 Websocket 通道,以用户指定的格式直接访问数据。

数据流协议使用推送模式将数据发送到客户端。客户机首先通过 HTTP RESTful 请求在服务器上创建流处理资源。创建流处理资源后,客户机通过流处理资源端点建立 WebSocket 连接。建立 WebSocket 通道后,Data Streams 开始立即、连续地推送数据,而无需等待客户端的响应或确认。

数据流协议

以下示例 python 客户机说明了客户机与数据流服务之间的交互:

import asyncio
import requests
import websockets
import json

async def client():
    ###create the streaming resource
    payload = {"source":{"trail":"a1"}}
    response = requests.post(
  'https://name:pswd@<oci_godengate_console_url>:443/services/distsrvr/v2/stream/s1', json=payload)

###establish websocket connection and receive data continuously
uri = "wss://name:pswd@<oci_godengate_console_url>:443/services/v2/distsrvr/stream/s1?begin=earliest"
async with websockets.connect(uri) as websocket:
        while True:
            resp = await websocket.recv()
            records = json.loads(resp)
            for rec in records:
                print(rec)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
localhost_pem = pathlib.Path(__file__).with_name("/path_to/client_key.pem")
ssl_context.load_verify_locations(localhost_pem)
ssl_context.load_cert_chain(localhost_pem)
asyncio.get_event_loop().run_until_complete(client())

在给定的客户机程序中,在创建数据流资源端点 s1 时会提供一个指定源数据线索名称的简单数据流有效负载。在现实世界中的应用程序中,在流协议握手阶段可以使用非常复杂的数据流有效负载来配置数据流行为。

例如,以下数据流请求有效负载指定筛选规则、编码格式和 bufferSize 以及所需的数据源线索名称。

{
    "$schema"    : "ogg:dataStream",
    "source"     : {"trail":"a1"},
    "rules"      : [{
        "action" : "exclude",
        "filter" : {
            "objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
        }
    }],
    "encoding"   : "json",
    "bufferSize" : 2097152
}

数据流启动/重新启动位置

在建立 Websocket 连接期间,客户机端指定 begin 位置(作为 Websocket 连接 URL 中的查询参数),以开始流式传输数据。起始位置可以是以下值之一:

每个非元数据 LCR 记录都包含一个不透明的位置(包括事务处理中的 CSN、XID、记录编号)。客户机端负责维护上次处理的 LCR 记录的位置。数据流服务负责根据给定的 begin 位置定位正确的启动/重新启动点。

如果这是客户机首次连接到数据流服务,则客户机应提供用于开始流数据的时间戳。关键字 now 将转换为当前时间戳,关键字 earliest 将转换为时间戳 0。

或者,ISO 8601 时间戳字符串可用于 begin 位置。在所有情况下,数据流服务都会对源线索执行基于时间戳的查找,以确定开始位置。

如果是恢复/重新启动情况,则客户端应在握手期间向数据流服务提供上次处理的位置。数据流服务将在源线索上执行基于职位的查找,以确定开始位置。数据流恢复的行为也取决于数据流中指定的 QoS 级别。

Oracle GoldenGate 数据流 REST API

您可以使用以下 Rest API 管理 GoldenGate 数据流。

  1. 创建新的 Oracle GoldenGate 数据流配置

  2. 检索现有的 Oracle GoldenGate 数据流配置

  3. 获取数据流资源的列表

  4. 更新现有的 Oracle GoldenGate 数据流配置

  5. 检索 AsyncAPI YAML 规范

  6. 更新 AsyncAPI YAML 规范

  7. 删除现有的 Oracle GoldenGate 数据流配置

云事件

CloudEvents 是一种以通用格式描述事件数据的规范,用于提供跨服务、平台和系统的互操作性。由于 Oracle GoldenGate 数据流当前仅支持 JSON 数据编码,因此对 CloudEvents 格式的支持仅限于 JSON 事件格式。可以在以下位置找到 CloudEvents 的 JSON 事件格式的完整规范:

https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md

https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md

https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json

CloudEvents 格式定义用于描述事件的属性列表,本质上是一个包含一组必需和可选属性的信封。在 Oracle GoldenGate 数据流中启用 CloudEvents 格式时,最终 JSON 记录将与以下内容类似,其中 data 字段包含原始数据记录,即 Oracle GoldenGate DML/DDL/metadata/schema 记录。

{
    "specversion" : "1.0",
    "type" : "com.example.someevent",
    "source" : "/mycontext",
    "id" : "A234-1234-1234",
    "datacontenttype" : "application/json",
    "data: {...}
}

添加数据流的

数据流是从分发服务创建的。登录到分发服务以开始创建数据流进程。下面是创建数据流的步骤:

  1. 从分发服务主页中,添加数据流(加号图标)以打开添加数据流向导。

    数据流页面

  2. 数据流信息页上,在名称框中输入数据流进程名称并为其添加说明。选择 Next

    “添加数据流”对话框:“数据流信息”页

  3. 源选项页上,为下图中显示的选项提供值:

    “数据流”对话框:“源选项”页

    • 线索名称:源线索文件的名称。

    • 线索子目录:存储线索文件的子目录的路径。

    • 编码:此选项控制数据流的记录编码。目前,支持 JSON 编码。

    • 缓冲区大小:此值控制数据流服务中使用的内存缓冲区大小。数据流处理服务将在内存中消息队列的总字节大小超过指定值后刷新该队列,并将记录传送给客户机。

    • 服务质量:此选项定义数据流重复禁止和恢复行为。数据流处理服务支持三个级别的服务质量。

      • 仅限于一次:此模式是限制性最强的模式,在恢复在源跟踪中查看 RESTART OK/ABEND 记录的过程中,服务会筛选出任何重复记录。客户不会看到重复的记录。如果服务无法找到具有给定上次处理位置的记录,则会出现错误。

      • 至少一次:此模式不会在恢复期间隐藏重复记录,也不会在源线索中看到 RESTART OK/ABEND 记录。客户机可能会在数据流中看到重复的记录。此外,如果服务无法找到具有给定上次处理位置的记录,则会发生错误。

      • 最多一次:此模式在恢复期间隐藏重复记录或在源线索中看到 RESTART OK/ABEND 记录。客户机在数据流中看不到重复的记录。如果服务无法找到具有给定上次处理位置的记录,它将查找下一个可用记录并前进。

    • CloudEvents 格式:数据流处理服务支持以 CloudEvents 格式传输数据记录。默认情况下,此格式处于禁用状态,可以通过在创建数据流通道时使用切换开关启用属性来控制。

  4. Filtering Options(筛选选项)页面上,包含和排除筛选规则的选项可用:

    “数据流”对话框和筛选选项可用。

    指定筛选规则选项,如下所示:

    • 规则操作:选择“排除”或“包括”选项。

    • 筛选器类型:筛选器类型包括以下内容:

      对象类型:可以从下拉列表中选择多个选项,包括 DMLDDLINSERTUPDATEUPSERTDELETE

      对象名称:以前创建的过滤对象的名称。

      选择添加可将筛选规则添加到“数据流”流程。

  5. 选择创建数据流。您将返回到列出数据流的分发服务主页。

  6. 选择新创建的数据流以查看流的 YAML 文档数据流异步 API 定义。

编辑数据流的配置

要编辑数据流配置,请执行以下操作:1。从左侧导航窗格中选择“Data Streams(数据流)”。

  1. 选择需要修改的数据流的名称。

  2. 在“数据流”页中,使用操作列查看数据流详细信息、删除数据流并更改其筛选。

    修改数据流筛选。

  3. 如果从操作菜单中选择更改筛选选项,则会显示编辑数据流筛选规则对话框。在此框中,您可以更改规则操作(包括、排除)和筛选器类型(对象名称或对象类型)。

  4. 选择添加以应用筛选。

  5. 选择提交以返回“数据流”页。

您可以通过从名称列中选择数据流来进一步编辑数据流。这将显示完整的数据流配置。使用每个配置设置旁边的铅笔图标进行更改。您可以更改数据流使用的源线索文件、过滤规则和服务质量。您还可以使用 YAML 编辑器更改数据流配置,并使用 YAML 编辑器旁边的上载更改图标上载更改。