データ・ストリームの追加
Oracle GoldenGate 23aiでは、リアルタイム分析のためのデータ・パスを大幅に簡素化できるデータ・ストリームが導入されています。
この記事は、Oracle GoldenGate 23aiデプロイメントにのみ適用されます。
データ・ストリームについて
Oracle GoldenGateデータ・ストリームでは、非同期APIの定義にAsyncAPI仕様が使用されます。このアプローチにより、アプリケーションはパブリッシュ・モデルまたはサブスクライブ・モデルを使用してデータ・ストリームに効率的にサブスクライブできます。変更がソース・データベースでコミットされるとすぐに更新を受信し、レイテンシを最小限に抑え、アプリケーション開発を簡素化します。また、Oracle GoldenGate Data Streamsを使用すると、ユーザーはJSONなどの優先データ形式を指定して、開発環境内の既存のツールおよびフレームワークとシームレスに統合できます。
-
拡張データ収集: AsyncAPIを搭載したパブリッシュ・モデルまたはサブスクライブ・モデルを使用すると、アプリケーションはリアルタイムのデータ更新を効率的に受信できます。
-
柔軟なデータ書式設定: ユーザーは、既存のツールとシームレスに統合するために、希望する形式を選択できます。
-
合理化された統合: AsyncAPIは、開発者およびデータ・サイエンティストによって一般的に使用される様々なアプリケーションおよびツールとのスムーズな対話を促進します。
-
保証されたデータ整合性: データ・ストリームは、Oracle GoldenGateのコア強度を継承し、ソース・データベースでコミットされた変更をレプリケートすることで、データの耐久性を確保します。
Oracle GoldenGateデータ・ストリームのコンポーネント
-
非同期API
-
データストリームプロトコル
-
データ・ストリーム開始/再開位置
-
スキーマ・レコード
-
CloudEvents形式
非同期API
Oracle GoldenGateデータ・ストリームは、プログラミング言語に依存しないため、任意のプログラミング言語で記述されたクライアントと対話できます。通常、クライアント・プログラムは単純で小規模ですが、ユーザーは、データ・ストリーミング・サービスと対話するためにクライアント・コードを手動で実装する必要があります。
AsyncAPI仕様をOracle GoldenGateデータ・ストリームに採用すると、次の利点があります。
-
業界標準のAPI仕様でデータ・ストリーム・サービスAPIを記述し、APIドキュメントを自動的に生成する機能。
-
@asyncapi/generatorを介してクライアント側のコードを自動的に生成します。
AsyncAPIのサポートにより、Oracle GoldenGate Data Streamsはクライアント・コードを自動的に生成することでデータ・ストリーミングを簡素化します。パブリッシャおよびサブスクライバ・モデルに従い、websocket、kafka、mqtt、hms、および多くのIOTプロトコルを含む様々なプロトコルをサポートします。イベント駆動型APIを記述する場合、YAMLモデリング言語を使用し、OpenAPI仕様の同様の構文に従います。たとえば、データ・ストリーミングのAsyncAPI定義を説明するAsyncAPI yamlドキュメントのスニペットを次に示します。
asyncapi: '3.0.0'
info:
title: Data Streaming API
version: '1.0.0'
description: | allows clients to subscribe to a data stream
license:
name: Apache 2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0'
servers:
<deployment-url>:
protocol: ws
url: <deployment-url>:<port-number>
defaultContentType: application/json
channels:
/services/v2/stream/mystream1:
...
データ・ストリーム・リソースが作成されると、このデータ・ストリーム・エンドポイントへのアクセス方法を記述したカスタマイズされた非同期API仕様ドキュメントへのURLリンクがHTTPレスポンスで返されます。このYAMLドキュメントを使用すると、@asyncapi/generatorを使用してクライアント側のコードを生成できます。
@asyncapi/generatorでWebSocketプロトコルをサポートするには、GitHubの@asyncapi/generatorのWebSocketクライアント・テンプレートを実装/メンテナンスする必要もあります。websocket-client-templateの詳細は、GitHubリポジトリを参照してください。
データストリームプロトコル
Oracle GoldenGateデータ・ストリームでは、ユーザー指定形式のデータへの直接アクセスは、単純なストリーミング・プロトコルに従った専用のWebソケット・チャネルを介して有効になります。
データ・ストリーム・プロトコルは、プッシュ・モードを使用してクライアントにデータを送信します。クライアントは、最初にHTTP RESTfulリクエストを介してサーバー上にストリーミング・リソースを作成します。ストリーミング・リソースが作成されると、クライアントはストリーミング・リソース・エンドポイントを介してWebSocket接続を確立します。WebSocketチャネルが確立された後、データ・ストリームは、クライアントからのレスポンスまたは確認を待たずに、ただちに継続的にデータのプッシュを開始します。

次のサンプルpythonクライアントは、クライアントとデータ・ストリーミング・サービス間の相互作用を示しています:
import asyncio
import requests
import websockets
import json
async def client():
### create the streaming resource
payload = {"source":{"trail":"a1"}}
response = requests.post(
'http://name:pswd@localhost:9002/services/v2/stream/s1', json=payload)
### establish websocket connection and receive data continuously
uri = "ws://name:pswd@localhost:9002/services/v2/stream/s1?begin=earliest"
async with websockets.connect(uri) as websocket:
while True:
resp = await websocket.recv()
records = json.loads(resp)
for rec in records:
print(rec)
asyncio.get_event_loop().run_until_complete(client())
指定されたクライアント・プログラムでは、データ・ストリーム・リソース・エンドポイントs1の作成時に、ソース・データ証跡名を指定する単純なデータ・ストリーム・ペイロードが提供されます。実際のアプリケーションでは、ストリーミング・プロトコルのハンドシェイク・フェーズで非常に複雑なデータ・ストリーム・ペイロードを使用して、データ・ストリーミング動作を構成できます。
たとえば、次のデータ・ストリーム・リクエスト・ペイロードでは、必要なデータ・ソース証跡名とともにフィルタリング・ルール、エンコーディング形式およびbufferSizeを指定します。
{
"$schema" : "ogg:dataStream",
"source" : {"trail":"a1"},
"rules" : [{
"action" : "exclude",
"filter" : {
"objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
}
}],
"encoding" : “json",
"bufferSize" : 2097152
}
データ・ストリーム開始/再開位置
Webソケット接続の確立中に、クライアント側は、データのストリーミングを開始するための開始位置を(Webソケット接続URLの問合せパラメータとして)指定します。開始位置には、次のいずれかの値を指定できます。
-
特殊キーワード"now"
-
特殊キーワード"earliest"
-
ISO 8601形式のタイムスタンプ文字列
-
最後に処理されたLCRの位置
各非メタデータLCRレコードには、不透明な位置(トランザクション内のCSN、XID、レコード番号を含む)が含まれます。クライアント側は、最後に処理されたLCRレコードの位置を維持します。データ・ストリーム・サービスは、指定された開始位置に基づいて正しい開始/再起動ポイントを特定します。
クライアントがデータ・ストリーム・サービスに初めて接続する場合、クライアントはストリーミング・データの開始場所のタイムスタンプを指定する必要があります。キーワードnowは現在のタイムスタンプに変換され、キーワードearliestはタイムスタンプ0に変換されます。
または、begin位置にはISO 8601タイムスタンプ文字列を使用できます。いずれの場合も、データ・ストリーム・サービスは、開始位置を決定するために、ソース証跡に対してタイムスタンプ・ベースの参照を実行します。
これがリカバリ/再起動の場合、クライアントはハンドシェイク中に、最後に処理された保存済位置をデータ・ストリーム・サービスに提供する必要があります。データ・ストリーム・サービスは、開始位置を決定するために、ソース証跡に対して位置ベースの参照を実行します。データ・ストリーミング・リカバリの動作は、データ・ストリームで指定されたQoSレベルにも依存します。
CloudEvents
CloudEventsは、サービス、プラットフォームおよびシステム間の相互運用性を提供するために、共通の形式でイベント・データを記述するための仕様です。Oracle GoldenGateデータ・ストリームは現在JSONデータ・エンコーディングのみをサポートしているため、CloudEvents形式のサポートはJSONイベント形式に制限されます。CloudEventsのJSONイベント形式の完全な仕様は、次の場所にあります。
https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json
dataフィールドには、Oracle GoldenGate DML/DDL/メタデータ/スキーマ・レコードである元のデータ・レコードが含まれます。{
"specversion" : "1.0",
"type" : "com.example.someevent",
"source" : "/mycontext",
"id" : "A234-1234-1234",
"datacontenttype" : "application/json",
"data: {…}
}データ・ストリームの追加
-
ディストリビューション・サービスのホームページで、「データ・ストリームの追加」(プラス・アイコン)をクリックして「データ・ストリームの追加」ウィザードを開きます。

-
「データ・ストリーム情報」ページで、「名前」ボックスにデータ・ストリーム・プロセス名を入力し、その説明を追加します。「次へ」をクリックします

-
「ソース・オプション」ページで、次のイメージに示すオプションの値を指定します:

-
トレイル名: ソース・トレイル・ファイルの名前。
-
Trailサブディレクトリ: 証跡ファイルが格納されるサブディレクトリのパス。
-
エンコーディング: このオプションは、データ・ストリームのレコードのエンコーディングを制御します。現在、JSONエンコーディングがサポートされています。
-
バッファ・サイズ: この値は、データ・ストリーミング・サービスで使用されるメモリー・バッファ・サイズを制御します。データ・ストリーミング・サービスは、合計バイト・サイズが指定された値を超えた後、インメモリー・メッセージ・キューをフラッシュし、レコードをクライアントに配信します。
-
サービスの品質: このオプションは、データ・ストリームの重複抑制およびリカバリ動作を定義します。データ・ストリーミング・サービスでは、3つのレベルのサービス品質がサポートされています。
-
1回のみ: このモードは、ソース証跡に
RESTART OK/ABENDレコードが表示されるリカバリ時にサービスが重複レコードを除外する最も制限の多いモードです。クライアントに重複レコードは表示されません。最後に処理されたポジションを持つレコードが見つからない場合は、エラーが発生します。 -
少なくとも1回: このモードでは、リカバリ中またはソース証跡の
RESTART OK/ABENDレコードの表示中に重複レコードは抑制されません。クライアントでは、データ・ストリームに重複レコードが表示される場合があります。また、最後に処理されたポジションを持つレコードが見つからない場合は、エラーが発生します。 -
最大1回: このモードでは、リカバリ中またはソース証跡の
RESTART OK/ABENDレコードの表示中に重複レコードが抑制されます。クライアントでは、データ・ストリームに重複するレコードは表示されません。サービスが、最後に処理されたポジションを持つレコードを見つけられない場合、次の使用可能なレコードを見つけ、先に進みます。
-
-
CloudEvents形式: データ・ストリーミング・サービスでは、CloudEvents形式のデータ・レコードの転送がサポートされています。デフォルトでは、この形式は無効になっており、データ・ストリーミング・チャネルの作成時にトグル・スイッチを使用してプロパティを有効にすることで制御できます。
-
-
「フィルタリング・オプション」ページでは、フィルタリング・ルールを含めるオプションと除外するオプションを使用できます:

フィルタリング・ルール・オプションを次のように指定します。-
ルール・アクション: 「除外」または「含む」オプションを選択します。
-
フィルタ・タイプ: フィルタ・タイプには、次のものが含まれます:
Object Type: You can select multiple options from the drop-down list including DML, DDL, INSERT, UPDATE, UPSERT, DELETE.
オブジェクト名: 以前に作成したフィルタリング・オブジェクトの名前。
「追加」をクリックして、データ・ストリーム・プロセスにフィルタリング・ルールを追加します。
-
-
「データ・ストリームの作成」をクリックします。データ・ストリームがリストされているDistribution Serviceホーム・ページに戻ります。
-
新しく作成したデータ・ストリームをクリックして、ストリームのYAMLドキュメントのデータ・ストリーミングAsyncAPI定義を表示します。
データ・ストリーム構成の編集
-
配布サービスの左側のナビゲーション・ペインで、「データ・ストリーム」をクリックします。
-
変更する必要があるデータ・ストリームの名前を選択します。
-
「データ・ストリーム」ページで、「アクション」列を使用して、データ・ストリームの詳細の表示、データ・ストリームの削除、およびフィルタリングの変更を行います。

-
「アクション」メニューから「フィルタの変更」オプションをクリックすると、「データ・ストリーム・フィルタリング・ルールの編集」ダイアログ・ボックスが表示されます。このボックスでは、ルール・アクション(包含、除外)およびフィルタ・タイプ(オブジェクト名またはオブジェクト・タイプ)を変更できます。
-
「追加」をクリックしてフィルタを適用します。
-
「送信」をクリックして、「データ・ストリーム」ページに戻ります。
「名前」列からデータ・ストリームをクリックして、データ・ストリームをさらに編集できます。これにより、完全なデータ・ストリーム構成が表示されます。各構成設定の横にある鉛筆アイコンを使用して、変更します。データ・ストリームで使用されるソース証跡ファイル、フィルタリング・ルールおよびサービス品質を変更できます。YAMLエディタを使用して、データ・ストリーム構成を変更し、YAMLエディタの横にある「変更のアップロード」アイコンを使用して変更をアップロードすることもできます。