データ・ストリームの追加
Oracle GoldenGate 23aiでは、リアルタイム分析のためのデータ・パスを大幅に簡素化できるデータ・ストリームが導入されています。
ノート:
この記事は、Oracle GoldenGate 23aiデプロイメントにのみ適用されます。データ・ストリームについて
Oracle GoldenGateのデータ・ストリームでは、非同期APIの定義にAsyncAPI仕様が使用されます。このアプローチにより、アプリケーションはパブリッシュ・またはサブスクライブ・モデルを使用してデータ・ストリームを効率的にサブスクライブできるようになります。変更がソース・データベースでコミットされるとすぐに更新を受信し、レイテンシを最小限に抑え、アプリケーション開発を簡素化します。また、Oracle GoldenGateのデータ・ストリームを使用すると、ユーザーはJSONなどの優先データ形式を指定して、開発環境内の既存のツールおよびフレームワークとシームレスに統合することができます。
-
拡張データ収集: AsyncAPIを搭載したパブリッシュ・モデルまたはサブスクライブ・モデルは、アプリケーションでリアルタイム・データ更新を効率的に受信できるようにします。
-
柔軟なデータ書式設定: ユーザーは、既存のツールとシームレスに統合するために、希望する形式を選択できます。
-
合理化された統合: AsyncAPIは、開発者およびデータ・サイエンティストによって一般的に使用される様々なアプリケーションおよびツールとのスムーズな対話を促進します。
-
保証されたデータ整合性: Oracle GoldenGateのコア強度を継承すると、データ・ストリームでは、ソース・データベースでコミットされた変更をレプリケートすることで、データの耐久性を確保します。
Oracle GoldenGateのデータ・ストリームのコンポーネント
-
非同期API
-
データ・ストリーム・プロトコル
-
データ・ストリーム開始/再開位置
-
スキーマ・レコード
-
CloudEvents形式
非同期API
Oracle GoldenGateのデータ・ストリームは、プログラミング言語に依存しないため、任意のプログラミング言語で記述されたクライアントと対話できます。通常、クライアント・プログラムは単純で小規模であるが、ユーザーは、データ・ストリーミング・サービスと対話するためにクライアント・コードを手動で実装する必要があります。
AsyncAPI仕様をOracle GoldenGateのデータ・ストリームに採用すると、次の利点があります。
-
業界標準のAPI仕様でデータ・ストリームservice APIを記述し、APIドキュメントを自動的に生成する機能。
-
@asyncapi/generatorを使用して、クライアント側のコードを自動的に生成します。
AsyncAPIのサポートにより、Oracle GoldenGateのデータ・ストリームはクライアント・コードを自動的に生成することでデータ・ストリーミングを簡素化します。パブリッシャおよびサブスクライバ・モデルに従い、websocket、kafka、mqtt、hms、および多くのIOTプロトコルを含む様々なプロトコルをサポートします。イベント駆動型APIを記述する場合、YAMLモデリング言語を使用し、OpenAPI仕様の同様の構文に従います。たとえば、データ・ストリーミングのAsyncAPI定義を説明するAsyncAPI yamlドキュメントのスニペットを次に示します。
asyncapi: '3.0.0'
info:
title: Data Streaming API
version: '1.0.0'
description: | allows clients to subscribe to a data stream
license:
name: Apache 2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0'
servers:
<deployment-url>:
protocol: ws
url: <deployment-url>:<port-number>
defaultContentType: application/json
channels:
/services/v2/stream/mystream1:
...
データ・ストリーム・リソースが作成されると、このデータ・ストリーム・エンドポイントへのアクセス方法を記述したカスタマイズされた非同期API仕様ドキュメントへのURLリンクがHTTPレスポンスで返されます。このYAMLドキュメントを使用すると、@asyncapi/generatorを使用してクライアント側のコードを生成できます。
@asyncapi/generatorでWebSocketプロトコルをサポートするには、GitHubの@asyncapi/generatorのWebSocketクライアント・テンプレートも実装/メンテナンスする必要があります。websocket-client-templateの詳細は、GitHubリポジトリを参照してください。
https://github.com/tianshu-orcl/websocket-client-template.git
データ・ストリーム・プロトコル
Oracle GoldenGateのデータ・ストリームでは、ユーザー指定形式のデータへの直接アクセスは、単純なストリーミング・プロトコルに従った専用のWebSocketチャネルを介して有効になります。
データ・ストリーム・プロトコルは、プッシュ・モードを使用してクライアントにデータを送信します。クライアントは、最初にHTTP RESTfulリクエストを介してサーバー上にストリーミング・リソースを作成します。ストリーミング・リソースが作成されると、クライアントはストリーミング・リソース・エンドポイントを介してWebSocket接続を確立します。WebSocketチャネルが確立された後、データ・ストリームは、クライアントからのレスポンスまたは確認を待たずに、ただちに継続的にデータのプッシュを開始します。

次のサンプルpythonクライアントは、クライアントとデータ・ストリーミング・サービス間の相互作用を示しています。
import asyncio
import requests
import websockets
import json
async def client():
### create the streaming resource
payload = {"source":{"trail":"a1"}}
response = requests.post(
'https://name:pswd@<oci_godengate_console_url>:443/services/distsrvr/v2/stream/s1', json=payload)
### establish websocket connection and receive data continuously
uri = "wss://name:pswd@<oci_godengate_console_url>:443/services/v2/distsrvr/stream/s1?begin=earliest"
async with websockets.connect(uri) as websocket:
while True:
resp = await websocket.recv()
records = json.loads(resp)
for rec in records:
print(rec)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
localhost_pem = pathlib.Path(__file__).with_name("/path_to/client_key.pem")
ssl_context.load_verify_locations(localhost_pem)
ssl_context.load_cert_chain(localhost_pem)
asyncio.get_event_loop().run_until_complete(client())
指定されたクライアント・プログラムでは、データ・ストリーム・リソース・エンドポイントs1の作成時に、ソース・データ証跡名を指定する単純なデータ・ストリーム・ペイロードが提供されます。実際のアプリケーションでは、ストリーミング・プロトコルのハンドシェイク・フェーズで非常に複雑なデータ・ストリーム・ペイロードを使用して、データ・ストリーミング動作を構成できます。
たとえば、次のデータ・ストリーム・リクエスト・ペイロードでは、必要なデータ・ソース証跡名とともにフィルタリング・ルール、エンコーディング形式およびbufferSizeを指定します。
{
"$schema" : "ogg:dataStream",
"source" : {"trail":"a1"},
"rules" : [{
"action" : "exclude",
"filter" : {
"objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
}
}],
"encoding" : “json",
"bufferSize" : 2097152
}
データ・ストリーム開始/再開位置
WebSocket接続の確立中に、クライアント側は、データのストリーミングを開始するための開始位置を(WebSocket接続URLの問合せパラメータとして)指定します。開始位置には、次のいずれかの値を指定できます。
-
特殊キーワード"now"
-
特殊キーワード"earliest"
-
ISO 8601形式のタイムスタンプ文字列
-
最後に処理されたLCRの位置
各非メタデータLCRレコードには、不透明な位置(トランザクション内のCSN、XID、レコード番号を含む)が含まれます。クライアント側は、最後に処理されたLCRレコードの位置を維持します。データ・ストリーム・サービスは、指定された開始位置に基づいて正しい開始/再開ポイントを特定します。
クライアントがデータ・ストリーム・サービスに初めて接続する場合、クライアントはストリーミング・データの開始場所のタイムスタンプを指定する必要があります。キーワードnowは現在のタイムスタンプに変換され、キーワードearliestはタイムスタンプ0に変換されます。
または、begin位置にはISO 8601タイムスタンプ文字列を使用できます。いずれの場合も、データ・ストリーム・サービスは、開始位置を決定するために、ソース証跡に対してタイムスタンプベースの参照を実行します。
これがリカバリ/再開の場合、クライアントはハンドシェイク中に、最後に処理された保存済位置をデータ・ストリーム・サービスに提供する必要があります。データ・ストリーム・サービスは、開始位置を決定するために、ソース証跡に対して位置ベースの参照を実行します。データ・ストリーミング・リカバリの動作は、データ・ストリームで指定されたQoSレベルにも依存します。
CloudEvents
CloudEventsは、サービス、プラットフォームおよびシステム間の相互運用性を提供するために、共通の形式でイベント・データを記述するための仕様である。Oracle GoldenGateのデータ・ストリームは現在JSONデータ・エンコーディングのみをサポートしているため、CloudEvents形式のサポートはJSONイベント形式に制限されます。CloudEventsのJSONイベント形式の完全な仕様は、次の各ページを参照してください。
https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json
data
フィールドには、Oracle GoldenGateのDML/DDL/メタデータ/スキーマ・レコードである元のデータ・レコードが含まれます。{
"specversion" : "1.0",
"type" : "com.example.someevent",
"source" : "/mycontext",
"id" : "A234-1234-1234",
"datacontenttype" : "application/json",
"data: {…}
}
データ・ストリームの追加
-
分散サービスのホーム・ページから、「データ・ストリームの追加」(プラス・アイコン)を使用して「データ・ストリームの追加」ウィザードを開きます。
-
「データ・ストリーム情報」ページで、「名前」ボックスにデータ・ストリーム・プロセス名を入力し、その説明を追加します。「次へ」をクリックします。
-
「ソース・オプション」ページで、次のイメージに表示されているオプションの値を指定します。
-
証跡名: ソース証跡ファイルの名前。
-
トレイルのサブディレクトリ: 証跡ファイルが格納されるサブディレクトリのパス。
-
エンコーディング: このオプションは、データ・ストリームのレコードのエンコーディングを制御します。現在、JSONエンコーディングがサポートされています。
-
バッファ・サイズ: この値は、データ・ストリーミング・サービスで使用されるメモリー・バッファ・サイズを制御します。データ・ストリーミング・サービスでは、合計バイト・サイズが指定された値を超えるため、インメモリーのメッセージ・キューがフラッシュされ、レコードがクライアントに配信されます。
-
サービス品質: このオプションは、データ・ストリームの重複抑制およびリカバリ動作を定義します。データ・ストリーミング・サービスでは、3つのレベルのサービス品質がサポートされています。
-
必ず1回: このモードは最も制限の厳しいモードであり、リカバリ中またはソース証跡の
RESTART OK/ABEND
レコードの表示中は、重複レコードが除去されます。クライアントには重複レコードは表示されません。指定された最後の処理位置のレコードが見つからない場合は、エラーが発生します。 -
少なくとも1回: このモードでは、リカバリ中またはソース証跡の
RESTART OK/ABEND
レコードの表示中に、重複記録は抑制されません。クライアントには、データ・ストリームの重複レコードが表示される場合があります。また、指定された最後の処理位置のレコードが見つからない場合は、エラーが発生します。 -
多くても1回: このモードでは、リカバリ中またはソース証跡の
RESTART OK/ABEND
レコードの表示中は、重複レコードが抑制されます。クライアントには、データ・ストリームの重複レコードは表示されません。指定された最後の処理位置のレコードが見つからない場合は、次の使用可能なレコードを見つけ、先に進みます。
-
-
CloudEvents形式: データ・ストリーミング・サービスでは、CloudEvents形式でのデータ・レコードの転送がサポートされています。デフォルトでは、この形式は無効になっていますが、データ・ストリーミング・チャネルの作成時にトグル・スイッチを使用してプロパティを有効にすることで制御できます。
-
-
「フィルタリング・オプション」ページでは、フィルタリング・ルールを含めるオプションと除外するオプションを選択します。
フィルタリング・ルール・オプションを次のように指定します。-
ルール・アクション: 「除外」または「含める」オプションを選択します。
-
フィルタ・タイプ: フィルタ・タイプには次のものがあります。
オブジェクト・タイプ: 「DML」、「DDL」、「INSERT」、「UPDATE」、「UPSERT」、「DELETE」など、ドロップダウン・リストから複数のオプションを選択できます。
オブジェクト名: 以前に作成したフィルタリング・オブジェクトの名前。
「追加」をクリックして、データ・ストリーム・プロセスにフィルタリング・ルールを追加します。
-
-
「データ・ストリームの作成」をクリックします。データ・ストリームがリストされている分散サービスのホームページが表示されます。
-
作成したデータ・ストリームをクリックして、ストリームのYAMLドキュメントのデータ・ストリーミングAsyncAPI定義を表示します。
データ・ストリーム構成の編集
-
分散サービスの左側のナビゲーション・ペインで、「データ・ストリーム」をクリックします。
-
変更する必要があるデータ・ストリームの名前を削除します。
-
「データ・ストリーム」ページで、「アクション」列を使用して、データ・ストリームの詳細の表示、データ・ストリームの削除、およびフィルタリングの変更を行います。
-
「アクション」メニューから「フィルタの変更」オプションをクリックすると、「データ・ストリーム・フィルタリング・ルールの編集」ダイアログ・ボックスが表示されます。このボックスから、ルール・アクション(包含、除外)およびフィルタ・タイプ(オブジェクト名またはオブジェクト・タイプ)を変更できます。
-
「追加」をクリックしてフィルタを適用します。
-
「発行」をクリックして、「データ・ストリーム」ページに戻ります。
「名前」列からデータ・ストリームをクリックして、データ・ストリームをさらに編集できます。これにより、完全なデータ・ストリーム構成が表示されます。構成設定を変更するには、各構成設定の横にある鉛筆アイコンを使用します。データ・ストリームで使用されるソース証跡ファイル、フィルタリング・ルールおよびサービス品質を変更できます。YAMLエディタを使用して、データ・ストリーム構成を変更し、YAMLエディタの横にある変更のアップロードアイコンを使用して変更をアップロードすることもできます。