Datenstreams hinzufügen
Oracle GoldenGate 23ai führt Datenstreams ein, die Datenpfade für Echtzeitanalysen in hohem Maße vereinfachen können.
Hinweis:
Dieser Artikel gilt nur für Oracle GoldenGate 23ai-Deployments.Verwandte Themen
Datenstreams
Oracle GoldenGate Data Streams verwendet die AsyncAPI-Spezifikation zur Definition asynchroner APIs. Mit diesem Ansatz können Anwendungen Datenstreams effizient mit einem Publish- oder Subscribe-Modell abonnieren. Updates werden empfangen, sobald Änderungen in der Quelldatenbank festgeschrieben werden, wodurch die Latenz minimiert und die Anwendungsentwicklung vereinfacht wird. Darüber hinaus können Benutzer mit Oracle GoldenGate Data Streams ihr bevorzugtes Datenformat angeben, z. B. JSON, um nahtlose Integration mit vorhandenen Tools und Frameworks in ihrer Entwicklungsumgebung zu ermöglichen.
-
Verbesserte Datenaufnahme: Mit dem Modell "Veröffentlichen" oder "Abonnieren" auf Basis von AsyncAPI können Anwendungen Datenaktualisierungen in Echtzeit effizient empfangen.
-
Flexible Datenformatierung: Benutzer können ihr bevorzugtes Format für eine nahtlose Integration mit vorhandenen Tools auswählen.
-
Optimierte Integration: AsyncAPI fördert die reibungslose Interaktion mit verschiedenen Anwendungen und Tools, die häufig von Entwicklern und Data Scientists verwendet werden.
-
Garantierte Datenintegrität: Durch die Übernahme der Kernstärke von Oracle GoldenGate stellen Data Streams die Datendauerhaftigkeit sicher, indem Änderungen repliziert werden, die in der Quelldatenbank festgeschrieben werden.
Komponenten von Oracle GoldenGate-Datenstreams
-
Asynchrone API
-
Data Streams-Protokoll
-
Start-/Neustartposition für Datenstreams
-
Schemadatensätze
-
CloudEvents-Format
Asynchrone API
Oracle GoldenGate Data Streams ist programmiersprachenunabhängig, sodass es mit einem Client interagieren kann, der in einer beliebigen Programmiersprache geschrieben ist. Obwohl die Clientprogramme in der Regel einfach und klein sind, müssen Benutzer den Clientcode weiterhin manuell implementieren, um mit dem Datenstreamingdienst zu interagieren.
Die Übernahme der AsyncAPI-Spezifikation in Oracle GoldenGate Data Streams bietet die folgenden Vorteile:
-
Fähigkeit, die API des Datenstreamservice in einer branchenüblichen API-Spezifikation zu beschreiben und automatisch API-Dokumentation zu generieren.
-
Generieren Sie automatisch clientseitigen Code über @asyncapi/generator.
Mit der Unterstützung von AsyncAPI vereinfacht Oracle GoldenGate Data Streams das Daten-Streaming, indem der Clientcode automatisch generiert wird. Es folgt dem Publisher- und Subscriber-Modell und unterstützt eine Vielzahl von Protokollen, einschließlich Websocket, Kafka, mqtt, hms und vielen IOT-Protokollen. Beim Beschreiben einer ereignisgesteuerten API verwendet sie die YAML-Modelliersprache und befolgt eine ähnliche Syntax für die OpenAPI-Spezifikation. Beispiel: Im Folgenden finden Sie ein Snippet des AsyncAPI-YAML-Dokuments, das die Definitionen von Data Streaming AsyncAPI beschreibt:
asyncapi: '3.0.0'
info:
title: Data Streaming API
version: '1.0.0'
description: | allows clients to subscribe to a data stream
license:
name: Apache 2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0'
servers:
<deployment-url>:
protocol: ws
url: <deployment-url>:<port-number>
defaultContentType: application/json
channels:
/services/v2/stream/mystream1:
...
Wenn eine Datenstreamressource erstellt wird, wird in der HTTP-Antwort ein URL-Link zu einem benutzerdefinierten Async-API-Spezifikationsdokument zurückgegeben, in dem beschrieben wird, wie auf diesen Datenstreamendpunkt zugegriffen wird. Dieses YAML-Dokument kann dann verwendet werden, um den clientseitigen Code mit @asyncapi/generator zu generieren.
Beachten Sie, dass Sie zur Unterstützung des Websocket-Protokolls in @asyncapi/generator auch die Websocket-Clientvorlage für @asyncapi/generator in GitHub implementieren/verwalten müssen. Weitere Informationen zur WebSocket-Client-Vorlage finden Sie im Repository GitHub:
https://github.com/tianshu-orcl/websocket-client-template.git
Data Streams-Protokoll
Bei Oracle GoldenGate Data Streams wird der direkte Zugriff auf die Daten im vom Benutzer angegebenen Format über einen dedizierten Websocket-Kanal aktiviert, der einem einfachen Streamingprotokoll folgt.
Das Data Streams-Protokoll verwendet den Push-Modus, um Daten an den Client zu senden. Der Client erstellt zunächst über die HTTP-RESTful-Anforderung eine Streamingressource auf dem Server. Nachdem die Streamingressource erstellt wurde, stellt der Client eine WebSocket-Verbindung über den Streamingressourcenendpunkt her. Nachdem der WebSocket-Kanal eingerichtet wurde, beginnt Data Streams, die Daten sofort und kontinuierlich zu pushen, ohne auf eine Antwort oder Bestätigung vom Client zu warten.

Der folgende python-Beispielclient veranschaulicht die Interaktion zwischen dem Client und dem Datenstreaming-Service:
import asyncio
import requests
import websockets
import json
async def client():
### create the streaming resource
payload = {"source":{"trail":"a1"}}
response = requests.post(
'https://name:pswd@<oci_godengate_console_url>:443/services/distsrvr/v2/stream/s1', json=payload)
### establish websocket connection and receive data continuously
uri = "wss://name:pswd@<oci_godengate_console_url>:443/services/v2/distsrvr/stream/s1?begin=earliest"
async with websockets.connect(uri) as websocket:
while True:
resp = await websocket.recv()
records = json.loads(resp)
for rec in records:
print(rec)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
localhost_pem = pathlib.Path(__file__).with_name("/path_to/client_key.pem")
ssl_context.load_verify_locations(localhost_pem)
ssl_context.load_cert_chain(localhost_pem)
asyncio.get_event_loop().run_until_complete(client())
Im angegebenen Clientprogramm wird beim Erstellen des Datenstreamressourcenendpunkts s1 eine einfache Data Stream-Payload angegeben, die den Namen des Quelldatentrails angibt. In einer realen Anwendung können während der Handshake-Phase des Streamingprotokolls viel komplizierte Data Stream-Payloads verwendet werden, um das Datenstreamingverhalten zu konfigurieren.
Beispiel: Die folgende Data Stream-Anforderungs-Payload gibt die Filterregeln, das Codierungsformat und bufferSize zusammen mit dem erforderlichen Datenquelltrailnamen an.
{
"$schema" : "ogg:dataStream",
"source" : {"trail":"a1"},
"rules" : [{
"action" : "exclude",
"filter" : {
"objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
}
}],
"encoding" : “json",
"bufferSize" : 2097152
}
Start-/Neustartposition für Datenstreams
Beim Aufbau der Websocket-Verbindung gibt die Clientseite die Begin-Position (als Abfrageparameter in der Websocket-Verbindungs-URL) an, um das Streaming der Daten zu starten. Die Anfangsposition kann einen der folgenden Werte aufweisen:
-
Sonderschlüsselwort "now"
-
Sonderschlüsselwort "earliest"
-
Zeitstempelzeichenfolge im ISO 8601-Format
-
Zuletzt verarbeitete LCR-Position
Jeder Nicht-Metadaten-LCR-Datensatz enthält eine undurchsichtige Position (einschließlich CSN, XID, Datensatznummer in der Transaktion). Die Kundenseite ist für die Aufrechterhaltung der Position des zuletzt verarbeiteten LCR-Datensatzes verantwortlich. Der Datenstreamservice ist dafür verantwortlich, den richtigen Start-/Neustartpunkt basierend auf der angegebenen Begin-Position zu finden.
Wenn dies das erste Mal ist, dass ein Client eine Verbindung zum Datenstreamservice herstellt, muss der Client einen Zeitstempel für den Start von Streamingdaten bereitstellen. Das Schlüsselwort now wird in den aktuellen Zeitstempel konvertiert und das Schlüsselwort earliest in den Zeitstempel 0 konvertiert.
Alternativ kann eine ISO 8601-Zeitstempelzeichenfolge für die Begin-Position verwendet werden. In allen Fällen führt der Datenstreams-Service einen zeitstempelbasierten Lookup im Quelltrail aus, um die Startposition zu bestimmen.
Wenn dies der Fall ist, muss der Client während des Handshakes die zuletzt verarbeitete gespeicherte Position für den Datenstreams-Service bereitstellen. Der Datenstreamservice führt eine positionsbasierte Suche im Quelltrail durch, um die Startposition zu bestimmen. Das Verhalten des Datenstreaming-Recoverys hängt auch von der QoS-Ebene ab, die im Datenstream angegeben ist.
REST-APIs für Oracle GoldenGate-Datenstreams
Sie können die folgenden Rest-APIs verwenden, um GoldenGate-Datenstreams zu verwalten.
CloudEvents
CloudEvents ist eine Spezifikation für die Beschreibung von Ereignisdaten in gängigen Formaten, um die Interoperabilität über Services, Plattformen und Systeme hinweg bereitzustellen. Da Oracle GoldenGate Data Streams derzeit nur die JSON-Datencodierung unterstützt, ist die Unterstützung für das CloudEvents-Format auf das JSON-Ereignisformat beschränkt. Die vollständige Spezifikation für das JSON-Ereignisformat für CloudEvents finden Sie unter:
https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json
data
die ursprünglichen Datensätze enthält, d.h. Oracle GoldenGate DML/DDL/metadata/schema-Datensätze.{
"specversion" : "1.0",
"type" : "com.example.someevent",
"source" : "/mycontext",
"id" : "A234-1234-1234",
"datacontenttype" : "application/json",
"data: {…}
}
Datenstreams hinzufügen
-
Klicken Sie auf der Homepage des Verteilungsservice auf Datenstreams hinzufügen (Plussymbol), um den Assistenten Datenstream hinzufügen zu öffnen.
-
Geben Sie auf der Seite Datenstreaminformationen einen Datenstreamprozessnamen in das Feld Name ein, und fügen Sie eine Beschreibung hinzu. Klicken Sie auf Weiter.
-
Geben Sie auf der Seite Quelloptionen die Werte für Optionen an, die in der folgenden Abbildung angezeigt werden:
-
Trailname: Name der Quelltraildatei.
-
Trailunterverzeichnis: Der Pfad des Unterverzeichnisses, in dem Traildateien gespeichert werden.
-
Codierung: Diese Option steuert die Codierung von Datensätzen für den Datenstream. Derzeit wird die JSON-Codierung unterstützt.
-
Puffergröße: Dieser Wert steuert die Speicherpuffergröße, die im Datenstreaming-Service verwendet wird. Der Datenstreamingdienst leert die speicherresidente Nachrichtenwarteschlange, nachdem die Gesamtbytegröße den angegebenen Wert überschreitet und die Datensätze an den Client übermittelt.
-
Servicequalität: Diese Option definiert das Unterdrückungs- und Recovery-Verhalten doppelter Datenstreams. Im Datenstreaming-Service werden drei Quality of Service-Ebenen unterstützt.
-
Exakt einmal: Dieser Modus ist der restriktivste Modus, bei dem der Service doppelte Datensätze beim Recovery herausfiltert, wenn
RESTART OK/ABEND
-Datensätze in den Quelltrails angezeigt werden. Kunden sehen keine doppelten Datensätze. Wenn der Service den Datensatz mit der angegebenen zuletzt verarbeiteten Position nicht finden kann, tritt ein Fehler auf. -
Mindestens einmal: Dieser Modus unterdrückt keine doppelten Datensätze während des Recoverys oder zeigt keine
RESTART OK/ABEND
-Datensätze im Quelltrail an. Clients können doppelte Datensätze im Datenstream sehen. Wenn der Service den Datensatz mit der angegebenen zuletzt verarbeiteten Position nicht finden kann, tritt ein Fehler auf. -
Höchstens einmal: Dieser Modus unterdrückt doppelte Datensätze beim Recovery oder beim Anzeigen von
RESTART OK/ABEND
-Datensätzen im Quelltrail. Clients sehen keine doppelten Datensätze im Datenstream. Wenn der Dienst den Datensatz mit der angegebenen zuletzt verarbeiteten Position nicht finden kann, sucht er den nächsten verfügbaren Datensatz und geht weiter.
-
-
CloudEvents-Format: Der Datenstreaming-Service unterstützt die Übertragung der Datensätze im CloudEvents-Format. Standardmäßig ist dieses Format deaktiviert und kann gesteuert werden, indem die Eigenschaft beim Erstellen des Datenstreamingkanals mit dem Umschalter aktiviert wird.
-
-
Auf der Seite Filteroptionen sind die Optionen zum Ein- und Ausschließen von Filterregeln verfügbar:
Geben Sie die Filterregeloptionen wie folgt an:-
Regelaktion: Wählen Sie die Optionen "Ausschließen" oder "Einschließen".
-
Filtertyp: Der Filtertyp umfasst Folgendes:
Objekttyp: Sie können mehrere Optionen aus der Dropdown-Liste auswählen, einschließlich DML, DDL, INSERT, UPDATE, UPSERT, DELETE.
Objektnamen: Name des zuvor erstellten Filterobjekts.
Klicken Sie auf Hinzufügen, um die Filterregel zum Data Stream-Prozess hinzuzufügen.
-
-
Klicken Sie auf Datenstream erstellen. Sie kehren zur Homepage des Verteilungsservice zurück, auf der der Datenstream aufgeführt ist.
-
Klicken Sie auf den neu erstellten Datenstream, um die Definitionen für das YAML-Dokument Data Streaming AsyncAPI für den Stream anzuzeigen.
Datenstreamkonfiguration bearbeiten
-
Klicken Sie im linken Navigationsbereich von Distribution Service auf {\b Data Streams}.
-
Löschen Sie den Namen des Datenstreams, der geändert werden muss.
-
Verwenden Sie auf der Seite "Datenstreams" die Spalte Aktion, um Datenstreamdetails anzuzeigen, einen Datenstream zu löschen und seine Filterung zu ändern.
-
Wenn Sie im Menü Aktion auf die Option Filterung ändern klicken, wird das Dialogfeld Datenstream-Bedingungsregeln bearbeiten angezeigt. In diesem Feld können Sie die Regelaktion (Einschließen, Ausschließen) und den Filtertyp (Objektname oder Objekttyp) ändern.
-
Klicken Sie auf Hinzufügen, um die Filterung anzuwenden.
-
Klicken Sie auf Weiterleiten, um die Seite "Datenstreams" zurückzugeben.
Sie können den Datenstream weiter bearbeiten, indem Sie in der Spalte Name auf den Datenstream klicken. Dadurch wird die vollständige Datenstreamkonfiguration angezeigt. Verwenden Sie das Stiftsymbol neben jeder Konfigurationseinstellung, um es zu ändern. Sie können die Quelltraildatei ändern, die von einem Datenstream verwendet wird, Filterregeln und Quality of Service. Mit dem YAML-Editor können Sie auch die Datenstreamkonfiguration ändern und Änderungen hochladen, indem Sie das Symbol Änderungen hochladen neben dem YAML-Editor verwenden.