新增資料流

Oracle GoldenGate 26ai 推出資料串流,可高度簡化即時分析的資料路徑。

注意:本文僅適用於 OracleGoldenGate 26ai 部署。

關於資料串流

Oracle GoldenGate Data Streams 使用 AsyncAPI 規格來定義非同步 API。此方法可讓應用程式使用「發佈」或「訂閱」模型,有效率地訂閱資料串流。在來源資料庫中確認變更、將延遲降到最低並簡化應用程式開發之後,就會立即收到更新。此外,Oracle GoldenGate Data Streams 可讓使用者指定其偏好的資料格式 (例如 JSON),以便與開發環境內的現有工具和架構緊密整合。

對開發人員和資料科學家的優勢

Oracle GoldenGate 資料串流的元件

Oracle GoldenGate 資料串流的元件包括:

非同步 API

Oracle GoldenGate Data Streams 提供程式設計語言支援,因此能與以任何程式設計語言撰寫的用戶端互動。即使從屬端程式通常很簡單而且很小,使用者仍然需要手動實行從屬端程式碼,才能與資料串流服務互動。

在 Oracle GoldenGate 資料串流中採用 AsyncAPI 規格具有下列優點:

透過 AsyncAPI 支援,Oracle GoldenGate Data Streams 會自動產生用戶端程式碼,藉此簡化資料串流。它遵循發佈者和訂閱者模型,並支援各種協定,包括 Websocket、kafka、mqtt、hms 以及許多 IOT 通訊協定。描述事件導向 API 時,會使用 YAML 模型設計語言,並遵循 OpenAPI 規格的類似語法。例如,以下是 AsyncAPI yaml 文件的程式碼片段,說明 Data Streaming AsyncAPI 定義:

asyncapi: '3.0.0'
info:
  title: Data Streaming API
  version: '1.0.0'
  description: \| allows clients to subscribe to a data stream
  license:
    name: Apache 2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0'

servers:
  <deployment-url>:
    protocol: ws
url: <deployment-url>:<port-number>

defaultContentType: application/json

channels:
/services/v2/stream/mystream1:
...

建立資料串流資源時,HTTP 回應中會傳回描述如何存取此資料串流端點之自訂非同步 API 規格文件的 URL 連結。接著,此 YAML 文件可用來使用 @asyncapi/generator 產生從屬端程式碼。

請注意,若要支援 @asyncapi/generator 中的 Websocket 通訊協定,您也必須導入 / 維護 GitHub 中 @asyncapi/generator 的 Websocket 用戶端範本。如需有關 websocket-client-template 的詳細資訊,請參閱 GitHub 儲存區域:

https://github.com/oracle-samples/websocket-client-template

資料串流協定

使用 Oracle GoldenGate Data Streams 時,會透過遵循簡單串流協定的專用 Webocket 通道,啟用使用者指定格式的資料直接存取。

「資料串流」協定會使用植入模式將資料傳送給從屬端。用戶端會先透過 HTTP RESTful 要求在伺服器上建立串流資源。建立串流資源之後,從屬端會透過串流處理資源端點建立 WebSocket 連線。建立 WebSocket 通道之後,「資料串流」便會開始立即且連續地發送資料,無須等待從屬端的回應或認可。

資料串流協定

下列範例 python 用戶端說明用戶端與資料串流服務之間的互動:

import asyncio
import requests
import websockets
import json

async def client():
    ###create the streaming resource
    payload = {"source":{"trail":"a1"}}
    response = requests.post(
  'https://name:pswd@<oci_godengate_console_url>:443/services/distsrvr/v2/stream/s1', json=payload)

###establish websocket connection and receive data continuously
uri = "wss://name:pswd@<oci_godengate_console_url>:443/services/v2/distsrvr/stream/s1?begin=earliest"
async with websockets.connect(uri) as websocket:
        while True:
            resp = await websocket.recv()
            records = json.loads(resp)
            for rec in records:
                print(rec)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
localhost_pem = pathlib.Path(__file__).with_name("/path_to/client_key.pem")
ssl_context.load_verify_locations(localhost_pem)
ssl_context.load_cert_chain(localhost_pem)
asyncio.get_event_loop().run_until_complete(client())

在指定的從屬端程式中,建立資料串流資源端點 s1 時,會提供指定來源資料歷程檔名稱的簡單「資料串流」有效負載。在實際的應用程式中,串流通訊協定的交握階段會使用許多複雜的資料串流有效負載來設定資料串流行為。

例如,下列「資料串流」要求有效負載會指定篩選規則、編碼格式和 bufferSize,以及必要的資料來源歷程檔名稱。

{
    "$schema"    : "ogg:dataStream",
    "source"     : {"trail":"a1"},
    "rules"      : [{
        "action" : "exclude",
        "filter" : {
            "objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
        }
    }],
    "encoding"   : "json",
    "bufferSize" : 2097152
}

資料流開始 / 重新啟動位置

在建立 Websocket 連線時,從屬端會指定 begin 位置 (作為 Websocket 連線 URL 中的查詢參數),以開始串流處理資料。開始位置可以是下列其中一個值:

每個非中繼資料 LCR 記錄都包含不透明位置 (包括交易內的 CSN、XID、記錄編號)。客戶端負責維護上次處理 LCR 記錄的位置。資料串流服務負責根據指定的 begin 位置來尋找正確的開始 / 重新啟動點。

如果這是從屬端第一次連線至資料串流服務時,從屬端應該提供開始串流資料時所在的時戳。關鍵字 now 將會轉換成目前的時間戳記,並將關鍵字 earliest 轉換為時間戳記 0。

或者,您也可以將 ISO 8601 時間戳記字串用於 begin 位置。在所有情況下,資料串流服務都會對來源軌跡執行以時間戳記為基礎的查尋,以決定開始位置。

如果這是復原 / 重新啟動情況,從屬端就應該在交握式確認期間,將上次處理的位置提供給資料串流服務。資料流服務會在來源軌跡上執行以職位為基礎的查尋,以決定開始位置。資料串流復原的行為也取決於資料串流中指定的 QoS 層次。

Oracle GoldenGate 資料串流 REST API

您可以使用下列 Rest API 管理 GoldenGate 資料串流。

  1. 建立新的 Oracle GoldenGate 資料串流組態

  2. 擷取現有的 Oracle GoldenGate 資料串流組態

  3. 取得資料流資源清單

  4. 更新現有的 Oracle GoldenGate 資料串流組態

  5. 擷取 AsyncAPI YAML 規格

  6. 更新 AsyncAPI YAML 規格

  7. 刪除現有的 Oracle GoldenGate 資料串流組態

雲端事件

CloudEvents 是描述以通用格式顯示事件資料的規格,以提供跨服務、平台和系統的互通性。由於 Oracle GoldenGate Data Streams 目前僅支援 JSON 資料編碼,因此 CloudEvents 格式的支援僅限於 JSON 事件格式。您可以在下列位置找到 CloudEvents 之 JSON 事件格式的完整規格:

https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md

https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md

https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json

CloudEvents 格式會定義描述事件的屬性清單,基本上是包含一組必要與選擇性屬性的信封。在 Oracle GoldenGate Data Streams 中啟用 CloudEvents 格式時,最終 JSON 記錄的外觀將與下列類似,其中資料欄位包含原始資料記錄,即 Oracle GoldenGate DML/DDL/ 描述資料 / 綱要記錄。

{
    "specversion" : "1.0",
    "type" : "com.example.someevent",
    "source" : "/mycontext",
    "id" : "A234-1234-1234",
    "datacontenttype" : "application/json",
    "data: {...}
}

新增資料流

資料流是從「分配服務」建立。請登入「分配服務」以開始建立「資料串流」處理作業。以下是建立資料串流的步驟:

  1. 從「分送服務」首頁的新增資料串流 (加上圖示),以開啟新增資料串流精靈。

    「資料流」頁面

  2. 資料流資訊頁面上,在名稱方塊中輸入資料流處理名稱,然後為其新增描述。選取下一步

    「新增資料流」對話方塊:「資料流資訊」頁面

  3. 來源選項頁面上,提供下列影像中所顯示之選項的值:

    「資料流」對話方塊:「來源選項」頁面

    • 歷程檔名稱:來源歷程檔的名稱。

    • 歷程檔子目錄:儲存歷程檔的子目錄路徑。

    • 編碼:此選項會控制資料串流的記錄編碼。目前支援 JSON 編碼。

    • 緩衝區大小:此值控制資料串流服務中使用的記憶體緩衝區大小。資料串流處理服務會在記憶體內訊息佇列的總位元組大小超過指定的值後清除佇列,並將記錄傳遞給從屬端。

    • 服務品質:此選項會定義資料流重複隱藏與復原行為。資料串流服務支援三種等級的服務品質。

      • 精確一次:此模式是最具限制性的模式,服務會在復原來源歷程檔中查看 RESTART OK/ABEND 記錄時篩選出任何重複的記錄。客戶看不到重複的記錄。如果服務找不到指定最後處理職位的記錄,就會發生錯誤。

      • 至少一次:此模式不會在復原期間隱藏重複的記錄,或是在來源歷程檔中查看 RESTART OK/ABEND 記錄。客戶可能會在資料串流中看到重複的記錄。此外,如果服務找不到指定最後處理職位的記錄,就會發生錯誤。

      • 最多一次:此模式會在復原期間隱藏重複的記錄,或查看來源歷程檔中的 RESTART OK/ABEND 記錄。從屬端在資料串流中看不到重複的記錄。如果服務找不到具有指定最後處理職位的記錄,就會找到下一個可用記錄並向前移動。

    • CloudEvents 格式:資料串流服務支援以 CloudEvents 格式傳輸資料記錄。建立資料串流通道時,預設會停用此格式,而且可以使用切換開關啟用特性來控制此格式。

  4. 篩選選項頁面上,可以使用包含和排除篩選規則的選項:

    「資料流」對話方塊和可用的篩選選項。

    指定篩選規則選項,如下所示:

    • 規則動作:選取「排除」或「包含」選項。

    • 篩選類型:篩選類型包含下列項目:

      物件類型:您可以從下拉式清單中選取多個選項,包括 DMLDDLINSERTUPDATEUPSERTDELETE

      物件名稱:先前建立之篩選物件的名稱。

      選取新增,將篩選規則新增至「資料串流」處理作業。

  5. 選取建立資料串流。您將會返回列出「資料流」的「分送服務」首頁。

  6. 選取新建立的資料流,以檢視串流的 YAML 文件資料流 AsyncAPI 定義。

編輯資料流組態

若要編輯「資料串流」組態,請執行下列動作:1。從左側導覽窗格中選取「資料串流」。

  1. 選取需要修改的資料串流名稱。

  2. 在「資料串流」頁面中,使用動作資料欄來檢視資料串流詳細資訊、刪除資料串流,以及變更其篩選。

    修改資料流篩選。

  3. 如果您從動作功能表中選取變更篩選選項,就會顯示編輯資料串流篩選規則對話方塊。您可以從此方塊變更規則動作 (包括、排除) 和篩選類型 (物件名稱或物件類型)。

  4. 選取新增以套用篩選。

  5. 選取送出,即可傳回「資料串流」頁面。

您可以從名稱資料欄選取資料串流,進一步編輯資料串流。這會顯示完整的資料串流組態。使用每個組態設定旁邊的鉛筆圖示來變更它。您可以變更資料串流所使用的來源歷程檔、篩選規則以及服務品質。您也可以使用 YAML 編輯器,使用「YAML 編輯器」旁邊的上傳變更圖示來變更資料串流組態及上傳變更。