Oracle GoldenGateのデータ・ストリームのコンポーネント

Oracle GoldenGateのデータ・ストリームのコンポーネントは次のとおりです。
  • 非同期API

  • データ・ストリーム・プロトコル

  • データ・ストリーム開始/再開位置

  • スキーマ・レコード

  • CloudEvents形式

非同期API

Oracle GoldenGateのデータ・ストリームは、プログラミング言語に依存しないため、任意のプログラミング言語で記述されたクライアントと対話できます。通常、クライアント・プログラムは単純で小規模ですが、ユーザーは、データ・ストリーミング・サービスと対話するためにクライアント・コードを手動で実装する必要があります。

AsyncAPI仕様をOracle GoldenGateのデータ・ストリームに採用すると、次の利点があります。

  • 業界標準のAPI仕様でデータ・ストリーム・サービスAPIを記述し、APIドキュメントを自動的に生成できます。

  • @asyncapi/generatorを使用してクライアント側のコードを自動的に生成します。

AsyncAPIのサポートにより、Oracle GoldenGateのデータ・ストリームはクライアント・コードを自動的に生成することでデータ・ストリーミングを簡素化します。パブリッシャおよびサブスクライバ・モデルに従い、websocket、kafka、mqtt、hms、および多くのIOTプロトコルを含む様々なプロトコルをサポートします。イベント駆動型APIを記述する場合、YAMLモデリング言語を使用し、OpenAPI仕様の同様の構文に従います。

たとえば、次のAsyncAPI YAMLドキュメントのスニペットは、データ・ストリーミングのAsyncAPI定義を記述しています。

asyncapi: '3.0.0'
info:
  title: Data Streaming API
  version: '1.0.0'
  description: | allows clients to subscribe to a data stream
  license:
    name: Apache 2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0'

servers:
  EAST:
    protocol: ws
url: east.oraclevcn.com:9002

defaultContentType: application/json

channels:
/services/v2/stream/mystream1:
...

データ・ストリーム・リソースが作成されると、このデータ・ストリーム・エンドポイントへのアクセス方法を記述したカスタマイズされた非同期API仕様ドキュメントへのURLリンクがHTTPレスポンスで返されます。このYAMLドキュメントを使用すると、@asyncapi/generatorを使用してクライアント側のコードを生成できます。

@asyncapi/generatorでWebSocketプロトコルをサポートするには、GitHubの@asyncapi/generatorのWebSocketクライアント・テンプレートも実装/メンテナンスする必要があります。

websocket-client-templateの詳細は、GitHubリポジトリを参照してください。

https://github.com/oracle-samples/websocket-client-template

データ・ストリーム・プロトコル

Oracle GoldenGateのデータ・ストリームでは、ユーザー指定形式のデータへの直接アクセスは、単純なストリーミング・プロトコルに従った専用のWebSocketチャネルを介して有効になります。

データ・ストリーム・プロトコルは、プッシュ・モードを使用してクライアントにデータを送信します。クライアントは、最初にHTTP RESTfulリクエストを介してサーバー上にストリーミング・リソースを作成します。ストリーミング・リソースが作成されると、クライアントはストリーミング・リソース・エンドポイントを介してWebSocket接続を確立します。WebSocketチャネルが確立された後、データ・ストリームは、クライアントからのレスポンスまたは確認を待たずに、ただちに継続的にデータのプッシュを開始します。


データ・ストリーム・プロトコル

次のサンプルpythonクライアントは、クライアントとデータ・ストリーミング・サービス間の相互作用を示しています。

import asyncio
import requests
import websockets
import json

async def client():
    ### create the streaming resource
    payload = {"source":{"trail":"a1"}}
    response = requests.post(
  'http://name:pswd@localhost:9002/services/v2/stream/s1', json=payload)

### establish websocket connection and receive data continuously
uri = "ws://name:pswd@localhost:9002/services/v2/stream/s1?begin=earliest"
async with websockets.connect(uri) as websocket:
        while True:
            resp = await websocket.recv()
            records = json.loads(resp)
            for rec in records:
                print(rec)
asyncio.get_event_loop().run_until_complete(client())

指定されたクライアント・プログラムでは、データ・ストリーム・リソース・エンドポイントs1の作成時に、ソース・データ証跡名を指定する単純なデータ・ストリーム・ペイロードが提供されます。実際のアプリケーションでは、ストリーミング・プロトコルのハンドシェイク・フェーズで非常に複雑なデータ・ストリーム・ペイロードを使用して、データ・ストリーミング動作を構成できます。

たとえば、次のデータ・ストリーム・リクエスト・ペイロードでは、必要なデータ・ソース証跡名とともにフィルタリング・ルール、エンコーディング形式およびbufferSizeを指定します。

{   
    "$schema"    : "ogg:dataStream",
    "source"     : {"trail":"a1"},
    "rules"      : [{
        "action" : "exclude",
        "filter" : {
            "objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
        }
    }],
    "encoding"   : “json",
    "bufferSize" : 2097152
}

データ・ストリーム開始/再開位置

WebSocket接続の確立中に、クライアント側は、データのストリーミングを開始するための開始位置を(WebSocket接続URLの問合せパラメータとして)指定します。開始位置には、次のいずれかの値を指定できます。

  • 特殊キーワード"now"

  • 特殊キーワード"earliest"

  • ISO 8601形式のタイムスタンプ文字列

  • 最後に処理されたLCRの位置

各非メタデータLCRレコードには、不透明な位置(トランザクション内のCSN、XID、レコード番号を含む)が含まれます。クライアント側は、最後に処理されたLCRレコードの位置を維持します。データ・ストリーム・サービスは、指定された開始位置に基づいて正しい開始/再開ポイントを特定します。

クライアントがデータ・ストリーム・サービスに初めて接続する場合、クライアントはストリーミング・データの開始場所のタイムスタンプを指定する必要があります。キーワードnowは現在のタイムスタンプに変換され、キーワードearliestはタイムスタンプ0に変換されます。

または、begin位置にはISO 8601タイムスタンプ文字列を使用できます。いずれの場合も、データ・ストリーム・サービスは、開始位置を決定するために、ソース証跡に対してタイムスタンプベースの参照を実行します。

これがリカバリ/再開の場合、クライアントはハンドシェイク中に、最後に処理された保存済位置をデータ・ストリーム・サービスに提供する必要があります。データ・ストリーム・サービスは、開始位置を決定するために、ソース証跡に対して位置ベースの参照を実行します。データ・ストリーミング・リカバリの動作は、データ・ストリームで指定されたQoSレベルにも依存します。

スキーマ・レコード

Oracle GoldenGateのデータ・ストリームは、クライアントがデータ・ストリーミング・サービスのレコードを解釈できるように、次のJSONスキーマ・レコードを送信します。データ・ストリーミング・サービスには、次の4つのタイプのレコードがあります。
  • DDL操作レコード

  • DML操作レコード

  • オブジェクト・メタデータ・レコード

  • データ・ストリーム・メタデータ・レコード

Oracle GoldenGateのデータ・ストリームは、任意のタイプのデータまたはメタデータ・レコードを送信する前に、対応するスキーマ・レコードを送信します。

CloudEvents

CloudEventsは、サービス、プラットフォームおよびシステム間の相互運用性を提供するために、共通の形式でイベント・データを記述するための仕様です。Oracle GoldenGateのデータ・ストリームは現在JSONデータ・エンコーディングのみをサポートしているため、CloudEvents形式のサポートはJSONイベント形式に制限されます。CloudEventsのJSONイベント形式の完全な仕様は、次の各ページを参照してください。

https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md

https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md

https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json

CloudEvents形式では、イベントを説明する属性のリストを定義します。基本的に、必須属性とオプション属性のセットを含むエンベロープです。Oracle GoldenGateのデータ・ストリームでCloudEvents形式が有効になっている場合、最終的なJSONレコードは次のようになります。dataフィールドには、Oracle GoldenGateのDML/DDL/メタデータ/スキーマ・レコードである元のデータ・レコードが含まれます。
{
    "specversion" : "1.0",
    "type" : "com.example.someevent",
    "source" : "/mycontext",
    "id" : "A234-1234-1234",
    "datacontenttype" : "application/json",
    "data: {…}
}