Data Streams Protocol

Oracle GoldenGate Data Streams provides direct access to data in a user-specified format through a dedicated WebSocket channel that follows a simple streaming protocol.

The Data Streams protocol uses push mode to send data to the client. The client first creates a streaming resource on the server through an HTTP RESTful request. After the streaming resource is created, the client establishes a WebSocket connection through the streaming resource endpoint. Once the WebSocket channel is established, Data Streams immediately starts pushing data continuously, without waiting for a response or acknowledgement from the client.
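
Both steps address the same streaming resource: the REST request and the WebSocket connection use the same resource path, differing only in scheme and query string. The following minimal sketch derives both endpoints from a stream name. The host, port, `/services/v2/stream/` path, and `begin` query parameter are taken from the sample client in this section; the helper name `stream_endpoints` is hypothetical, and credentials are left out of the URLs for brevity.

```python
from urllib.parse import quote, urlencode


def stream_endpoints(host: str, port: int, name: str, begin: str = "earliest"):
    """Return (rest_url, ws_url) for one Data Streams resource.

    Hypothetical helper: the path layout follows the sample client, and
    `begin` is added only to the WebSocket URL, as in that sample.
    """
    path = f"/services/v2/stream/{quote(name)}"
    # URL for the HTTP POST that creates the streaming resource
    rest_url = f"http://{host}:{port}{path}"
    # URL for the WebSocket channel that receives the pushed data
    ws_url = f"ws://{host}:{port}{path}?" + urlencode({"begin": begin})
    return rest_url, ws_url


rest_url, ws_url = stream_endpoints("localhost", 9002, "s1")
print(rest_url)  # http://localhost:9002/services/v2/stream/s1
print(ws_url)    # ws://localhost:9002/services/v2/stream/s1?begin=earliest
```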


Figure: Data Streams Protocol

The following sample Python client illustrates the interaction between the client and the Data Streams service:

import asyncio
import json

import requests
import websockets

async def client():
    ### create the streaming resource
    payload = {"source": {"trail": "a1"}}
    response = requests.post(
        'http://name:pswd@localhost:9002/services/v2/stream/s1', json=payload)
    response.raise_for_status()  # stop here if the resource was not created

    ### establish websocket connection and receive data continuously
    uri = "ws://name:pswd@localhost:9002/services/v2/stream/s1?begin=earliest"
    async with websockets.connect(uri) as websocket:
        while True:
            resp = await websocket.recv()
            records = json.loads(resp)  # decode the pushed message
            for rec in records:
                print(rec)

asyncio.run(client())
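
Because Data Streams keeps pushing data without waiting for an acknowledgement, a client whose per-record processing is slow may fall behind the stream. One possible design, sketched below, separates receiving from processing with an `asyncio.Queue`. The `recv` argument stands in for `websocket.recv` from the sample client; the `None` end-of-stream marker, the `fake_source` helper, and the `{"id": ...}` records are hypothetical and exist only so the sketch runs without a server.

```python
import asyncio
import json


async def receive_loop(recv, queue: asyncio.Queue):
    # Read each pushed message as soon as it arrives; nothing is sent back.
    while True:
        msg = await recv()
        await queue.put(msg)
        if msg is None:  # end-of-stream marker used only by this sketch
            return


async def process_loop(queue: asyncio.Queue, handle):
    # Decode and handle records independently of the receive loop.
    while True:
        msg = await queue.get()
        if msg is None:
            return
        for rec in json.loads(msg):
            handle(rec)


async def run(recv, handle):
    queue = asyncio.Queue()  # unbounded: buffering moves into the client
    await asyncio.gather(receive_loop(recv, queue), process_loop(queue, handle))


def fake_source(messages):
    # Hypothetical stand-in for websocket.recv that replays fixed messages.
    it = iter(messages + [None])

    async def recv():
        return next(it)

    return recv


seen = []
asyncio.run(run(fake_source(['[{"id": 1}]', '[{"id": 2}, {"id": 3}]']), seen.append))
print(seen)  # [{'id': 1}, {'id': 2}, {'id': 3}]
```

This does not make processing faster; it only moves buffering into the client's own queue, so an unbounded queue can keep growing if processing never catches up.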

This client program supplies a minimal Data Stream payload, specifying only the source trail name, when it creates the data stream resource endpoint s1. Real-world applications can use much more complex Data Stream payloads during the handshake phase of the streaming protocol to configure the data streaming behavior.

For example, the following Data Stream request payload specifies filtering rules, the encoding format, and the buffer size (bufferSize), along with the required source trail name.

{   
    "$schema"    : "ogg:dataStream",
    "source"     : {"trail":"a1"},
    "rules"      : [{
        "action" : "exclude",
        "filter" : {
            "objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
        }
    }],
    "encoding"   : "json",
    "bufferSize" : 2097152
}
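
In a Python client, the same payload can be assembled as a dictionary before the POST request. The following sketch uses only the keys shown in the payload above; the helper name `build_stream_payload` and its parameters are hypothetical, and optional keys are emitted only when a value is supplied, so the sketch makes no assumption about server-side defaults.

```python
import json


def build_stream_payload(trail, exclude=None, encoding=None, buffer_size=None):
    """Build a Data Stream request payload from the keys in the example above.

    Hypothetical helper: emits "$schema" and "source" always, and "rules",
    "encoding", and "bufferSize" only when a value is given.
    """
    payload = {"$schema": "ogg:dataStream", "source": {"trail": trail}}
    if exclude:
        # A single "exclude" rule whose filter lists object name patterns.
        payload["rules"] = [{"action": "exclude",
                             "filter": {"objectNames": list(exclude)}}]
    if encoding is not None:
        payload["encoding"] = encoding
    if buffer_size is not None:
        payload["bufferSize"] = buffer_size
    return payload


payload = build_stream_payload("a1", exclude=["PDB2.AR.*", "PDB1.U1.*"],
                               encoding="json", buffer_size=2097152)
print(json.dumps(payload, indent=4))
```

The resulting dictionary could be passed as `json=payload` to `requests.post` in the sample client, in place of the minimal `{"source": {"trail": "a1"}}` payload.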