新增資料流
關於資料串流
Oracle GoldenGate Data Streams 使用 AsyncAPI 規格來定義非同步 API。此方法可讓應用程式使用「發佈」或「訂閱」模型,有效率地訂閱資料串流。在來源資料庫中確認變更、將延遲降到最低並簡化應用程式開發之後,就會立即收到更新。此外,Oracle GoldenGate Data Streams 可讓使用者指定其偏好的資料格式 (例如 JSON),以便與開發環境內的現有工具和架構緊密整合。
-
增強型資料擷取:由 AsyncAPI 提供的發布或訂閱模型可讓應用程式有效率地接收即時資料更新。
-
彈性資料格式:使用者可以選擇偏好的格式,與現有工具緊密整合。
-
簡化整合:AsyncAPI 可促進開發人員和資料科學家經常使用的各種應用程式和工具的順暢互動。
-
保證的資料完整性:繼承 Oracle GoldenGate 的核心優勢,Data Streams 透過複製來源資料庫中承諾的變更,確保資料持久性。
Oracle GoldenGate 資料串流的元件
-
非同步 API
-
資料串流協定
-
資料流開始 / 重新啟動位置
-
綱要記錄
-
CloudEvents 格式
非同步 API
Oracle GoldenGate Data Streams 提供程式設計語言支援,因此能與以任何程式設計語言撰寫的用戶端互動。即使從屬端程式通常很簡單而且很小,使用者仍然需要手動實行從屬端程式碼,才能與資料串流服務互動。
在 Oracle GoldenGate 資料串流中採用 AsyncAPI 規格有下列優點:
-
能夠以業界標準 API 規格描述資料串流服務 API,並自動產生 API 文件。
-
透過 @asyncapi/generator 自動產生用戶端程式碼。
透過 AsyncAPI 支援,Oracle GoldenGate Data Streams 可自動產生用戶端程式碼,藉此簡化資料串流。它依循發佈者與訂閱者模型,並支援各種協定,包括 Websocket、kafka、mqtt、hms 以及許多 IOT 通訊協定。描述事件導向 API 時,會使用 YAML 模型設計語言,並遵循 OpenAPI 規格的類似語法。例如,以下是描述 Data Streaming AsyncAPI 定義的 AsyncAPI yaml 文件片段:
asyncapi: '3.0.0'
info:
title: Data Streaming API
version: '1.0.0'
description: | allows clients to subscribe to a data stream
license:
name: Apache 2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0'
servers:
<deployment-url>:
protocol: ws
url: <deployment-url>:<port-number>
defaultContentType: application/json
channels:
/services/v2/stream/mystream1:
...
建立資料串流資源時,HTTP 回應中會傳回描述如何存取此資料串流端點之自訂非同步 API 規格文件的 URL 連結。接著,此 YAML 文件可用來使用 @asyncapi/generator 產生從屬端程式碼。
請注意,若要支援 @asyncapi/generator 中的 Websocket 協定,您也必須實作 / 維護 GitHub 中 @asyncapi/generator 的 Websocket 用戶端範本。如需有關 websocket-client-template 的詳細資訊,請參閱 GitHub 儲存區域:
https://github.com/tianshu-orcl/websocket-client-template.git
資料串流協定
使用 Oracle GoldenGate Data Streams 時,會透過遵循簡單串流協定的專用 Webocket 通道,啟用使用者指定格式的資料直接存取。
「資料串流」協定會使用植入模式將資料傳送給從屬端。用戶端會先透過 HTTP RESTful 要求,在伺服器上建立串流資源。串流資源建立後,從屬端會透過串流資源端點建立 WebSocket 連線。建立 WebSocket 通道之後,「資料串流」便會開始立即且持續地發送資料,而不會等待從屬端的回應或認可。

下列範例 python 用戶端說明用戶端與資料串流服務之間的互動:
import asyncio
import requests
import websockets
import json
async def client():
### create the streaming resource
payload = {"source":{"trail":"a1"}}
response = requests.post(
'https://name:pswd@<oci_godengate_console_url>:443/services/distsrvr/v2/stream/s1', json=payload)
### establish websocket connection and receive data continuously
uri = "wss://name:pswd@<oci_godengate_console_url>:443/services/v2/distsrvr/stream/s1?begin=earliest"
async with websockets.connect(uri) as websocket:
while True:
resp = await websocket.recv()
records = json.loads(resp)
for rec in records:
print(rec)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
localhost_pem = pathlib.Path(__file__).with_name("/path_to/client_key.pem")
ssl_context.load_verify_locations(localhost_pem)
ssl_context.load_cert_chain(localhost_pem)
asyncio.get_event_loop().run_until_complete(client())
在指定的用戶端程式中,建立資料串流資源端點 s1 時,會提供指定來源資料歷程檔名稱的簡單「資料串流」有效負載。在實際的應用程式中,串流通訊協定的交握階段會使用許多複雜的資料串流有效負載來設定資料串流行為。
例如,下列「資料串流」要求有效負載會指定篩選規則、編碼格式和 bufferSize 以及必要的資料來源歷程檔名稱。
{
"$schema" : "ogg:dataStream",
"source" : {"trail":"a1"},
"rules" : [{
"action" : "exclude",
"filter" : {
"objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
}
}],
"encoding" : “json",
"bufferSize" : 2097152
}
資料流開始 / 重新啟動位置
在建立 Websocket 連線時,從屬端會指定 begin 位置 (作為 Websocket 連線 URL 中的查詢參數),以開始串流處理資料。開始位置可以是下列其中一個值:
-
特殊關鍵字 " now "
-
特殊關鍵字 " earliest "
-
ISO 8601 格式時戳字串
-
上次處理的 LCR 位置
每個非中繼資料 LCR 記錄都包含不透明位置 (包括交易內的 CSN、XID、記錄編號)。客戶端負責維護上次處理 LCR 記錄的位置。資料串流服務負責根據指定的 begin 位置來尋找正確的開始 / 重新啟動點。
如果這是從屬端第一次連線至資料串流服務時,從屬端應該提供開始串流資料時所在的時戳。關鍵字 now 將會轉換成目前的時間戳記,並將關鍵字 earliest 轉換為時間戳記 0。
或者,您也可以將 ISO 8601 時間戳記字串用於 begin 位置。在所有情況下,資料串流服務都會對來源軌跡執行以時間戳記為基礎的查尋,以決定開始位置。
如果這是復原 / 重新啟動情況,從屬端就應該在交握式確認期間,將上次處理的位置提供給資料串流服務。資料流服務會在來源軌跡上執行以職位為基礎的查尋,以決定開始位置。資料串流復原的行為也取決於資料串流中指定的 QoS 層次。
CloudEvents
CloudEvents 是以通用格式描述事件資料的規格,以提供跨服務、平台和系統的互通性。由於 Oracle GoldenGate Data Streams 目前僅支援 JSON 資料編碼,因此 CloudEvents 格式的支援僅限於 JSON 事件格式。您可以在下列位置找到 CloudEvents 的 JSON 事件格式完整規格:
https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json
data
欄位包含原始資料記錄,即 Oracle GoldenGate DML/DDL/ 描述資料 / 綱要記錄。{
"specversion" : "1.0",
"type" : "com.example.someevent",
"source" : "/mycontext",
"id" : "A234-1234-1234",
"datacontenttype" : "application/json",
"data: {…}
}
新增資料流
-
從「分送服務」首頁的新增資料串流 (加上圖示),以開啟新增資料串流精靈。
-
在資料流資訊頁面上,在名稱方塊中輸入資料流處理名稱,然後為其新增描述。按一下下一步。
-
在來源選項頁面上,提供下列影像中所顯示之選項的值:
-
歷程檔名稱:來源歷程檔的名稱。
-
歷程檔子目錄:儲存歷程檔的子目錄路徑。
-
編碼:此選項會控制資料串流的記錄編碼。目前支援 JSON 編碼。
-
緩衝區大小:此值控制資料串流服務中使用的記憶體緩衝區大小。資料串流處理服務將在記憶體內訊息佇列的總位元組大小超過指定的值後清除佇列,並將記錄傳遞給從屬端。
-
服務品質:此選項會定義資料流重複隱藏與復原行為。資料串流服務支援三種等級的服務品質。
-
精確一次:此模式是最具限制性的模式,服務會在復原來源歷程檔中的
RESTART OK/ABEND
記錄時篩選出任何重複的記錄。客戶看不到重複的記錄。如果服務找不到指定最後處理職位的記錄,就會發生錯誤。 -
至少一次:此模式不會在復原期間隱藏重複的記錄,或是在來源歷程檔中查看
RESTART OK/ABEND
記錄。客戶可能會在資料串流中看到重複的記錄。此外,如果服務找不到指定最後處理職位的記錄,就會發生錯誤。 -
最多一次:此模式會在復原期間隱藏重複的記錄,或查看來源歷程檔中的
RESTART OK/ABEND
記錄。從屬端在資料串流中看不到重複的記錄。如果服務找不到具有指定最後處理職位的記錄,就會找到下一個可用記錄並向前移動。
-
-
CloudEvents 格式:資料串流服務支援以 CloudEvents 格式傳輸資料記錄。建立資料串流通道時,預設會停用此格式,而且可以使用切換開關啟用特性來控制此格式。
-
-
在篩選選項頁面上,可以使用包含和排除篩選規則的選項:
指定篩選規則選項,如下所示:-
規則動作:選取「排除」或「包含」選項。
-
篩選類型:篩選類型包含下列項目:
物件類型:您可以從下拉式清單中選取多個選項,包括 DML 、 DDL 、 INSERT 、 UPDATE 、 UPSERT 、 DELETE 。
物件名稱:先前建立之篩選物件的名稱。
按一下新增,即可將篩選規則新增至「資料串流」處理作業。
-
-
按一下建立資料串流。您將會返回列出「資料流」的「分送服務」首頁。
-
按一下新建立的資料流,以檢視串流的 YAML 文件資料流 AsyncAPI 定義。
編輯資料流組態
-
從「分送服務」左側導覽窗格中,按一下「資料串流」。
-
刪除需要修改的資料串流名稱。
-
在「資料串流」頁面中,使用動作資料欄來檢視資料串流詳細資訊、刪除資料串流,以及變更其篩選。
-
如果您按一下動作功能表中的變更篩選選項,就會顯示編輯資料串流篩選規則對話方塊。您可以從此方塊變更規則動作 (包括、排除) 和篩選類型 (物件名稱或物件類型)。
-
按一下新增以套用篩選。
-
按一下送出,即可傳回「資料串流」頁面。
您可以從名稱資料欄按一下資料串流,進一步編輯資料串流。這會顯示完整的資料串流組態。使用每個組態設定旁邊的鉛筆圖示來變更它。您可以變更資料串流所使用的來源歷程檔、篩選規則以及服務品質。您也可以使用 YAML 編輯器,使用「YAML 編輯器」旁邊的上傳變更圖示來變更資料串流組態及上傳變更。