Add Data Streams

Oracle GoldenGate 23ai introduces Data Streams, which can significantly simplify data paths for real-time analytics.

Note

This article applies only to Oracle GoldenGate 23ai deployments.

About Data Streams

Oracle GoldenGate Data Streams utilizes the AsyncAPI specification for defining asynchronous APIs. This approach enables applications to efficiently subscribe to data streams using a publish/subscribe model. Updates are received as soon as changes are committed in the source database, minimizing latency and simplifying application development. Additionally, Oracle GoldenGate Data Streams enables users to specify their preferred data format, such as JSON, for seamless integration with existing tools and frameworks within their development environment.

Benefits for Developers and Data Scientists
  • Enhanced Data Ingestion: The publish/subscribe model powered by AsyncAPI enables applications to efficiently receive real-time data updates.

  • Flexible Data Formatting: Users can choose their preferred format for seamless integration with existing tools.

  • Streamlined Integration: AsyncAPI fosters smooth interaction with various applications and tools commonly used by developers and data scientists.

  • Guaranteed Data Integrity: Inheriting the core strength of Oracle GoldenGate, Data Streams ensure data durability by replicating changes as they are committed in the source database.

Components of Oracle GoldenGate Data Streams

Components of Oracle GoldenGate Data Streams include:
  • AsyncAPI

  • Data Streams Protocol

  • Data Streams Start/Restart Position

  • Schema Records

  • CloudEvents Format

AsyncAPI

Oracle GoldenGate Data Streams is programming-language agnostic, so it can interact with a client written in any programming language. Even though client programs are typically simple and small, users would still need to implement the client code manually to interact with the data streaming service.

Adopting the AsyncAPI specification into Oracle GoldenGate Data Streams has the following advantages:

  • Ability to describe the data streams service API in an industry-standard API specification and automatically generate API documentation.

  • Ability to automatically generate client-side code via @asyncapi/generator.

With AsyncAPI support, Oracle GoldenGate Data Streams simplifies data streaming by generating the client code automatically. It follows the publish/subscribe model and supports a wide variety of protocols, including WebSocket, Kafka, MQTT, HMS, and many IoT protocols. When describing an event-driven API, it uses the YAML modeling language and follows a syntax similar to the OpenAPI specification. For example, the following snippet of an AsyncAPI YAML document describes the Data Streaming AsyncAPI definitions:

asyncapi: '3.0.0'
info:
  title: Data Streaming API
  version: '1.0.0'
  description: |
    allows clients to subscribe to a data stream
  license:
    name: Apache 2.0
    url: 'https://www.apache.org/licenses/LICENSE-2.0'

servers:
  <deployment-url>:
    protocol: ws
    url: <deployment-url>:<port-number>

defaultContentType: application/json

channels:
  /services/v2/stream/mystream1:
    ...

When a data streams resource is created, a URL link to a customized AsyncAPI specification document, which describes how to access this data stream endpoint, is returned in the HTTP response. This YAML document can then be used to generate the client-side code using @asyncapi/generator.

Note that to support the WebSocket protocol in @asyncapi/generator, you also need to implement and maintain the WebSocket client template for @asyncapi/generator in GitHub. Refer to the GitHub repository for more information about the websocket-client-template:

https://github.com/tianshu-orcl/websocket-client-template.git
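
As an illustration, the following sketch (reusing the placeholder host, credentials, stream name, and trail from the protocol example later in this article) creates a stream and then retrieves its customized AsyncAPI YAML document so it can be passed to @asyncapi/generator. Where exactly the document link appears in the creation response, and whether the YAML can be fetched from the stream endpoint itself, are assumptions to verify against your deployment.

import requests

ENDPOINT = 'http://name:pswd@localhost:9002/services/v2/stream/s1'

# Create the stream resource; the HTTP response contains a link to the
# stream's customized AsyncAPI specification document.
response = requests.post(ENDPOINT, json={"source": {"trail": "a1"}})
print(response.json())  # inspect the response for the AsyncAPI document link

# Assumption: the YAML document is served from the stream endpoint itself.
spec = requests.get(ENDPOINT, headers={'Accept': 'application/yaml'})
with open('s1-asyncapi.yaml', 'wb') as f:
    f.write(spec.content)

The saved YAML file can then be fed to @asyncapi/generator, together with the websocket-client-template above, to produce the client-side code.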

Data Streams Protocol

With Oracle GoldenGate Data Streams, direct access to the data in a user-specified format is enabled through a dedicated WebSocket channel that follows a simple streaming protocol.

The Data Streams protocol uses push mode to send data to the client. The client first creates a streaming resource on the server through an HTTP RESTful request. After the streaming resource is created, the client establishes a WebSocket connection through the streaming resource endpoint. Once the WebSocket channel is established, Data Streams starts to push data immediately and continuously, without waiting for a response or acknowledgement from the client.



The following sample Python client illustrates the interaction between the client and the data streaming service:

import asyncio
import requests
import websockets
import json

async def client():
    ### create the streaming resource
    payload = {"source": {"trail": "a1"}}
    response = requests.post(
        'http://name:pswd@localhost:9002/services/v2/stream/s1', json=payload)

    ### establish websocket connection and receive data continuously
    uri = "ws://name:pswd@localhost:9002/services/v2/stream/s1?begin=earliest"
    async with websockets.connect(uri) as websocket:
        while True:
            resp = await websocket.recv()
            records = json.loads(resp)
            for rec in records:
                print(rec)

asyncio.run(client())

In this client program, a simple Data Stream payload specifying the source trail name is provided when creating the data stream resource endpoint s1. In a real-world application, much more complicated Data Stream payloads can be used during the handshake phase of the streaming protocol to configure the data streaming behavior.

For example, the following Data Stream request payload specifies the filtering rules, encoding format, and bufferSize along with the required data source trail name.

{   
    "$schema"    : "ogg:dataStream",
    "source"     : {"trail":"a1"},
    "rules"      : [{
        "action" : "exclude",
        "filter" : {
            "objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
        }
    }],
    "encoding"   : “json",
    "bufferSize" : 2097152
}
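
As a sketch, this richer payload can be supplied in the same POST request used in the earlier sample client; the host, credentials, and stream name below are the same placeholders used in that example.

import requests

payload = {
    "$schema"    : "ogg:dataStream",
    "source"     : {"trail": "a1"},
    "rules"      : [{
        "action" : "exclude",
        "filter" : {"objectNames": ["PDB2.AR.*", "PDB1.U1.*"]}
    }],
    "encoding"   : "json",
    "bufferSize" : 2097152
}

# Create the stream resource with filtering, encoding, and buffer size options.
response = requests.post(
    'http://name:pswd@localhost:9002/services/v2/stream/s1', json=payload)
response.raise_for_status()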

Data Streams Start/Restart Position

During WebSocket connection establishment, the client specifies the begin position (as a query parameter in the WebSocket connection URL) to start streaming the data. The begin position can be one of the following values:

  • Special keyword "now"

  • Special keyword "earliest"

  • ISO 8601 format timestamp string

  • Last processed LCR position

Each non-metadata LCR record contains an opaque position (which includes the CSN, XID, and record number inside the transaction). The client is responsible for maintaining the position of the last processed LCR record. The data streams service is responsible for locating the correct start/restart point based on the given begin position.

If this is the first time a client connects to the data streams service, the client should provide a timestamp indicating where to start streaming data. The keyword now is converted to the current timestamp, and the keyword earliest is converted to timestamp 0.

Alternatively, an ISO 8601 timestamp string can be used as the begin position. In all cases, the data streams service performs a timestamp-based lookup on the source trail to determine the start position.

If this is the recovery/restart case, the client should provide the saved last processed position to the data streams service during the handshake. The data streams service performs a position-based lookup on the source trail to determine the start position. The behavior of data streaming recovery also depends on the QoS level specified in the data stream.
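
The sketch below shows one way a client might persist the last processed position and supply it as the begin value on restart. The field name pos used to read the position out of each record is hypothetical; the position is opaque, and where it appears depends on the record format in your stream.

import asyncio
import json
import websockets

POSITION_FILE = 'last_position.txt'

async def resume():
    # On the first run, start from the "earliest" keyword; on restart,
    # use the saved last processed LCR position instead.
    try:
        with open(POSITION_FILE) as f:
            begin = f.read().strip()
    except FileNotFoundError:
        begin = 'earliest'

    uri = f"ws://name:pswd@localhost:9002/services/v2/stream/s1?begin={begin}"
    async with websockets.connect(uri) as websocket:
        while True:
            records = json.loads(await websocket.recv())
            for rec in records:
                # 'pos' is a hypothetical field name for the opaque position
                # (CSN, XID, record number) carried by non-metadata LCR records.
                if isinstance(rec, dict) and 'pos' in rec:
                    with open(POSITION_FILE, 'w') as f:
                        f.write(str(rec['pos']))

asyncio.run(resume())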

CloudEvents Format

CloudEvents is a specification for describing event data in common formats to provide interoperability across services, platforms and systems. As Oracle GoldenGate Data Streams currently only supports JSON data encoding, the support for CloudEvents format is limited to the JSON event format. The complete specification for JSON Event Format for CloudEvents can be found at:

https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md

https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md

https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json

The CloudEvents format defines a list of attributes to describe the event; essentially, it is an envelope with a set of mandatory and optional attributes. When the CloudEvents format is enabled in Oracle GoldenGate Data Streams, the final JSON records look similar to the following, where the data field contains the original data record, that is, an Oracle GoldenGate DML/DDL/metadata/schema record.

{
    "specversion" : "1.0",
    "type" : "com.example.someevent",
    "source" : "/mycontext",
    "id" : "A234-1234-1234",
    "datacontenttype" : "application/json",
    "data: {…}
}
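
When the CloudEvents format is enabled, a client can unwrap the envelope before processing the payload. The following is a minimal sketch using only the attributes shown in the example above; whether envelopes arrive singly or in batches may vary, so the sketch accepts both.

import json

def unwrap(message):
    """Yield the GoldenGate records carried inside CloudEvents envelopes."""
    events = json.loads(message)
    if isinstance(events, dict):
        events = [events]
    for event in events:
        # Envelope attributes as in the example above.
        if event.get("specversion") == "1.0":
            yield event["data"]  # the original DML/DDL/metadata/schema record

for record in unwrap('{"specversion": "1.0", "type": "com.example.someevent", '
                     '"source": "/mycontext", "id": "A234-1234-1234", '
                     '"datacontenttype": "application/json", "data": {"op": "INSERT"}}'):
    print(record)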

Add Data Streams

Data streams are created from the Distribution Service. Log in to the Distribution Service to begin creating a Data Stream process. Here are the steps to create a Data Stream:
  1. From the Distribution Service home page, click Add Data Streams (plus icon) to open the Add Data Stream wizard.


    Distribution Service home page with the Data Streams section

  2. On the Data Stream Information page, enter a Data Stream process name in the Name box and add a description for it. Click Next.


    Add Data Streams dialog box: Data Stream Information page

  3. On the Source Options page, provide the values for options shown in the following image:


    Data Stream dialog box: Source Options page

    • Trail Name: Name of the source trail file.

    • Trail Subdirectory: The path of the subdirectory where trail files are stored.

    • Encoding: This option controls the encoding of records for the data stream. Currently, only JSON encoding is supported.

    • Buffer Size: This value controls the size of the memory buffer used in the data streaming service. The data streaming service flushes the in-memory message queue and delivers the records to the client once the queue's total byte size exceeds the specified value.

    • Quality of Service: This option defines the data stream duplicate suppression and recovery behavior. Three levels of quality of service are supported in the data streaming service.
      • Exactly Once: This mode is the most restrictive mode, where the service filters out any duplicate records during recovery or after seeing RESTART OK/ABEND records in the source trail. Clients will not see duplicate records. If the service cannot locate the record with the given last processed position, an error occurs.

      • At Least Once: This mode does not suppress duplicate records during recovery or after seeing RESTART OK/ABEND records in the source trail. Clients may see duplicate records in the data stream. Also, if the service cannot locate the record with the given last processed position, an error occurs.

      • At Most Once: This mode suppresses duplicate records during recovery or after seeing RESTART OK/ABEND records in the source trail. Clients will not see duplicate records in the data stream. If the service cannot locate the record with the given last processed position, it finds the next available record and moves forward.

    • CloudEvents Format: The data streaming service supports transmitting data records in CloudEvents format. This format is disabled by default and can be enabled using the toggle switch when creating the data streaming channel.

  4. On the Filtering Options page, the options to include and exclude filtering rules are available:
    Data Stream dialog box and filtering options available.

    Specify the filtering rule options, as follows:
    • Rule Action: Select the Include or Exclude option.

    • Filter Type: The filter type includes the following:

      Object Type: You can select multiple options from the drop-down list including DML, DDL, INSERT, UPDATE, UPSERT, DELETE.

      Object Names: Name of the previously created filtering object.

      Click Add to add the filtering rule to the Data Stream process.

  5. Click Create Data Stream. You will be returned to the Distribution Service home page where the Data Stream is listed.

  6. Click the newly created Data Stream to view the YAML document containing the Data Streaming AsyncAPI definitions for the stream.
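
The wizard options above correspond to fields in the REST payload shown earlier in this article. As a rough sketch, an equivalent stream could be created programmatically as follows; only source, rules, encoding, and bufferSize are confirmed by the earlier payload example, so the commented-out keys for quality of service and CloudEvents format are assumptions to verify against the generated AsyncAPI document.

import requests

payload = {
    "$schema"    : "ogg:dataStream",
    "source"     : {"trail": "a1"},       # Trail Name
    "encoding"   : "json",                # Encoding (currently JSON only)
    "bufferSize" : 2097152,               # Buffer Size in bytes
    "rules"      : [{                     # Filtering Options
        "action" : "include",
        "filter" : {"objectNames": ["PDB1.U1.*"]}
    }],
    # Hypothetical keys -- confirm against the generated AsyncAPI document:
    # "qualityOfService": "exactlyOnce",
    # "cloudEventsFormat": True,
}

response = requests.post(
    'http://name:pswd@localhost:9002/services/v2/stream/s1', json=payload)
print(response.status_code)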

Edit Data Streams Configuration

To edit the Data Streams configuration:
  1. From the Distribution Service left navigation pane, click Data Streams.

  2. Select the name of the data stream that needs to be modified.

  3. From the Data Streams page, use the Action column to view data stream details, delete a data stream, or change its filtering.
    Edit the Data Stream Configuration

  4. If you click the Change Filtering option in the Action menu, the Edit Data Stream Filtering Rules dialog box is displayed. From this dialog box, you can change the Rule Action (Include, Exclude) and the Filter Type (Object Name or Object Type).

  5. Click Add to apply the filtering.

  6. Click Submit to return to the Data Streams page.

You can edit the data stream further by clicking the data stream name in the Name column. This displays the complete data stream configuration. Use the pencil icon next to each configuration setting to change it. You can change the source trail file used by a data stream, its filtering rules, and its quality of service. You can also use the YAML editor to change the data stream configuration and upload the changes using the Upload Changes icon next to the YAML Editor.