Datenstreams hinzufügen

Oracle GoldenGate 26ai führt Data Streams ein, mit denen Datenpfade für Echtzeitanalysen in hohem Maße vereinfacht werden können.

Hinweis: Dieser Artikel gilt nur für OracleGoldenGate 26ai-Deployments.

Datenstreams

Oracle GoldenGate-Datenstreams verwenden die AsyncAPI-Spezifikation zur Definition asynchroner APIs. Mit diesem Ansatz können Anwendungen Datenstreams effizient mit einem Publish- oder Subscribe-Modell abonnieren. Updates werden empfangen, sobald Änderungen in der Quelldatenbank festgeschrieben werden, wodurch die Latenz minimiert und die Anwendungsentwicklung vereinfacht wird. Darüber hinaus können Benutzer mit Oracle GoldenGate Data Streams ihr bevorzugtes Datenformat angeben, z. B. JSON, um nahtlose Integration mit vorhandenen Tools und Frameworks in ihrer Entwicklungsumgebung zu ermöglichen.

Vorteile für Entwickler und Data Scientists

Komponenten von Oracle GoldenGate-Datenstreams

Komponenten von Oracle GoldenGate Data Streams:

Asynchrone API

Oracle GoldenGate Data Streams ist programmiersprachenunabhängig, sodass es mit einem Client interagieren kann, der in einer beliebigen Programmiersprache geschrieben ist. Obwohl die Clientprogramme in der Regel einfach und klein sind, müssen Benutzer den Clientcode weiterhin manuell implementieren, um mit dem Datenstreamingdienst zu interagieren.

Die Übernahme der AsyncAPI-Spezifikation in Oracle GoldenGate Data Streams bietet die folgenden Vorteile:

Mit AsyncAPI-Unterstützung vereinfacht Oracle GoldenGate Data Streams das Daten-Streaming, indem der Clientcode automatisch generiert wird. Es folgt dem Publisher- und Subscriber-Modell und unterstützt eine Vielzahl von Protokollen, einschließlich Websocket, Kafka, mqtt, hms und vielen IOT-Protokollen. Bei der Beschreibung einer ereignisgesteuerten API verwendet sie die YAML-Modelliersprache und befolgt eine ähnliche Syntax für die OpenAPI-Spezifikation. Beispiel: Im Folgenden finden Sie ein Snippet des AsyncAPI-YAML-Dokuments, das die AsyncAPI-Definitionen für Data Streaming beschreibt:

asyncapi: '3.0.0'
info:
  title: Data Streaming API
  version: '1.0.0'
  description: \| allows clients to subscribe to a data stream
  license:
    name: Apache 2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0'

servers:
  <deployment-url>:
    protocol: ws
url: <deployment-url>:<port-number>

defaultContentType: application/json

channels:
/services/v2/stream/mystream1:
...

Wenn eine Datenstreamressource erstellt wird, wird in der HTTP-Antwort ein URL-Link zu einem benutzerdefinierten Async-API-Spezifikationsdokument zurückgegeben, in dem beschrieben wird, wie auf diesen Datenstreamendpunkt zugegriffen wird. Dieses YAML-Dokument kann dann verwendet werden, um den clientseitigen Code mit @asyncapi/generator zu generieren.

Um das WebSocket-Protokoll in @asyncapi/generator zu unterstützen, müssen Sie auch die WebSocket-Clientvorlage für @asyncapi/generator in GitHub implementieren/verwalten. Weitere Informationen zur WebSocket-Client-Vorlage finden Sie im GitHub-Repository:

https://github.com/oracle-samples/websocket-client-template

Data Streams-Protokoll

Bei Oracle GoldenGate Data Streams wird der direkte Zugriff auf die Daten im vom Benutzer angegebenen Format über einen dedizierten Websocket-Kanal aktiviert, der einem einfachen Streamingprotokoll folgt.

Das Data Streams-Protokoll verwendet den Push-Modus, um Daten an den Client zu senden. Der Client erstellt zunächst eine Streaming-Ressource auf dem Server über HTTP RESTful-Anforderung. Nachdem die Streaming-Ressource erstellt wurde, stellt der Client eine WebSocket-Verbindung über den Streaming-Ressourcenendpunkt her. Nachdem der WebSocket-Kanal eingerichtet wurde, beginnt Data Streams, die Daten sofort und kontinuierlich zu übertragen, ohne auf eine Antwort oder Bestätigung vom Client zu warten.

Data Streams-Protokoll

Der folgende python-Beispielclient veranschaulicht die Interaktion zwischen dem Client und dem Datenstreaming-Service:

import asyncio
import requests
import websockets
import json

async def client():
    ###create the streaming resource
    payload = {"source":{"trail":"a1"}}
    response = requests.post(
  'https://name:pswd@<oci_godengate_console_url>:443/services/distsrvr/v2/stream/s1', json=payload)

###establish websocket connection and receive data continuously
uri = "wss://name:pswd@<oci_godengate_console_url>:443/services/v2/distsrvr/stream/s1?begin=earliest"
async with websockets.connect(uri) as websocket:
        while True:
            resp = await websocket.recv()
            records = json.loads(resp)
            for rec in records:
                print(rec)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
localhost_pem = pathlib.Path(__file__).with_name("/path_to/client_key.pem")
ssl_context.load_verify_locations(localhost_pem)
ssl_context.load_cert_chain(localhost_pem)
asyncio.get_event_loop().run_until_complete(client())

Im angegebenen Clientprogramm wird beim Erstellen des Datenstreamressourcenendpunkts s1 eine einfache Data Stream-Payload angegeben, die den Namen des Quelldatentrails angibt. In einer realen Anwendung können während der Handshake-Phase des Streamingprotokolls viel komplizierte Data Stream-Payloads verwendet werden, um das Datenstreamingverhalten zu konfigurieren.

Beispiel: Die folgende Data Stream-Anforderungs-Payload gibt die Filterregeln, das Codierungsformat und bufferSize zusammen mit dem erforderlichen Datenquelltrailnamen an.

{
    "$schema"    : "ogg:dataStream",
    "source"     : {"trail":"a1"},
    "rules"      : [{
        "action" : "exclude",
        "filter" : {
            "objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
        }
    }],
    "encoding"   : "json",
    "bufferSize" : 2097152
}

Start-/Neustartposition für Datenstreams

Beim Aufbau der WebSocket-Verbindung gibt die Clientseite die Begin-Position (als Abfrageparameter in der WebSocket-Verbindungs-URL) an, um das Streaming der Daten zu starten. Die Anfangsposition kann einen der folgenden Werte aufweisen:

Jeder Nicht-Metadaten-LCR-Datensatz enthält eine undurchsichtige Position (einschließlich CSN, XID, Datensatznummer in der Transaktion). Die Kundenseite ist für die Aufrechterhaltung der Position des zuletzt verarbeiteten LCR-Datensatzes verantwortlich. Der Datenstreamservice ist dafür verantwortlich, den richtigen Start-/Neustartpunkt basierend auf der angegebenen Begin-Position zu finden.

Wenn dies das erste Mal ist, dass ein Client eine Verbindung zum Datenstreamservice herstellt, muss der Client einen Zeitstempel für den Start von Streamingdaten bereitstellen. Das Schlüsselwort now wird in den aktuellen Zeitstempel konvertiert und das Schlüsselwort earliest in den Zeitstempel 0 konvertiert.

Alternativ kann eine ISO 8601-Zeitstempelzeichenfolge für die Begin-Position verwendet werden. In allen Fällen führt der Datenstreams-Service einen zeitstempelbasierten Lookup im Quelltrail aus, um die Startposition zu bestimmen.

Wenn dies der Fall ist, muss der Client während des Handshakes die zuletzt verarbeitete gespeicherte Position für den Datenstreams-Service bereitstellen. Der Datenstreamservice führt eine positionsbasierte Suche im Quelltrail durch, um die Startposition zu bestimmen. Das Verhalten der Daten-Streaming-Wiederherstellung hängt auch von der QoS-Ebene ab, die im Datenstrom angegeben ist.

REST-APIs für Oracle GoldenGate-Datenstreams

Mit den folgenden Rest-APIs können Sie GoldenGate-Datenstreams verwalten.

  1. Neue Oracle GoldenGate-Datenstreamkonfiguration erstellen

  2. Vorhandene Oracle GoldenGate-Datenstreamkonfiguration abrufen

  3. Ruft eine Liste der Datenstreamressourcen ab

  4. Vorhandene Oracle GoldenGate-Datenstreamkonfiguration aktualisieren

  5. AsyncAPI-YAML-Spezifikation abrufen

  6. AsyncAPI YAML-Spezifikation aktualisieren

  7. Vorhandene Oracle GoldenGate-Datenstreamkonfiguration löschen

Cloud-Events

CloudEvents ist eine Spezifikation für die Beschreibung von Ereignisdaten in gängigen Formaten, um die Interoperabilität über Services, Plattformen und Systeme hinweg bereitzustellen. Da Oracle GoldenGate Data Streams derzeit nur die JSON-Datencodierung unterstützt, ist die Unterstützung für das CloudEvents-Format auf das JSON-Ereignisformat beschränkt. Die vollständige Spezifikation für das JSON-Ereignisformat für CloudEvents finden Sie unter:

https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md

https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md

https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json

Das CloudEvents-Format definiert die Liste der Attribute zur Beschreibung des Ereignisses, im Wesentlichen einen Envelope mit einem Set obligatorischer und optionaler Attribute. Wenn das CloudEvents-Format in Oracle GoldenGate Data Streams aktiviert ist, sehen die endgültigen JSON-Datensätze wie folgt aus, wobei das Feld Daten die ursprünglichen Datensätze enthält, d.h. Oracle GoldenGate DML/DDL/metadata/schema-Datensätze.

{
    "specversion" : "1.0",
    "type" : "com.example.someevent",
    "source" : "/mycontext",
    "id" : "A234-1234-1234",
    "datacontenttype" : "application/json",
    "data: {...}
}

Datenstreams hinzufügen

Datenstreams werden aus dem Verteilungsservice erstellt. Melden Sie sich beim Distribution Service an, um mit der Erstellung eines Data Stream-Prozesses zu beginnen. So erstellen Sie einen Datenstream:

  1. Klicken Sie auf der Homepage des Verteilungsservice auf Datenstreams hinzufügen (Plussymbol), um den Assistenten Datenstream hinzufügen zu öffnen.

    Daten-Streams

  2. Geben Sie auf der Seite Datenstreaminformationen einen Datenstreamprozessnamen in das Feld Name ein, und fügen Sie eine Beschreibung hinzu. Wählen Sie Weiter.

    Dialogfeld {\b Add Data Streams}: Seite {\b Data Stream Information}

  3. Geben Sie auf der Seite Quelloptionen die Werte für Optionen an, die in der folgenden Abbildung angezeigt werden:

    Dialogfeld "Data Stream": Seite "Quelloptionen"

    • Trailname: Name der Quelltraildatei.

    • Trailunterverzeichnis: Der Pfad des Unterverzeichnisses, in dem Traildateien gespeichert werden.

    • Codierung: Diese Option steuert die Codierung von Datensätzen für den Datenstream. Derzeit wird die JSON-Codierung unterstützt.

    • Puffergröße: Dieser Wert steuert die Speicherpuffergröße, die im Daten-Streaming-Service verwendet wird. Der Datenstreamingdienst leert die speicherresidente Nachrichtenwarteschlange, nachdem die Gesamtbytegröße den angegebenen Wert überschreitet und die Datensätze an den Client übermittelt.

    • Servicequalität: Diese Option definiert das Unterdrückungs- und Recovery-Verhalten doppelter Datenstreams. Im Datenstreaming-Service werden drei Quality of Service-Ebenen unterstützt.

      • Exakt einmal: Dieser Modus ist der restriktivste Modus, bei dem der Service doppelte Datensätze beim Recovery herausfiltert, wenn RESTART OK/ABEND-Datensätze in den Quelltrails angezeigt werden. Kunden sehen keine doppelten Datensätze. Wenn der Service den Datensatz mit der angegebenen zuletzt verarbeiteten Position nicht finden kann, tritt ein Fehler auf.

      • Mindestens einmal: Dieser Modus unterdrückt keine doppelten Datensätze während des Recoverys oder zeigt keine RESTART OK/ABEND-Datensätze im Quelltrail an. Clients können doppelte Datensätze im Datenstream sehen. Wenn der Service den Datensatz mit der angegebenen zuletzt verarbeiteten Position nicht finden kann, tritt außerdem ein Fehler auf.

      • Höchstens einmal: Dieser Modus unterdrückt doppelte Datensätze beim Recovery oder beim Anzeigen von RESTART OK/ABEND-Datensätzen im Quelltrail. Clients sehen keine doppelten Datensätze im Datenstream. Wenn der Dienst den Datensatz mit der angegebenen zuletzt verarbeiteten Position nicht finden kann, sucht er den nächsten verfügbaren Datensatz und geht weiter.

    • CloudEvents-Format: Der Datenstreaming-Service unterstützt die Übertragung der Datensätze im CloudEvents-Format. Standardmäßig ist dieses Format deaktiviert und kann gesteuert werden, indem die Eigenschaft beim Erstellen des Datenstreamingkanals mit dem Umschalter aktiviert wird.

  4. Auf der Seite Filteroptionen sind die Optionen zum Ein- und Ausschließen von Filterregeln verfügbar:

    Dialogfeld "Datenstream" und Filteroptionen verfügbar.

    Geben Sie die Filterregeloptionen wie folgt an:

    • Regelaktion: Wählen Sie die Optionen "Ausschließen" oder "Einschließen".

    • Filtertyp: Der Filtertyp umfasst Folgendes:

      Objekttyp: Sie können mehrere Optionen aus der Dropdown-Liste auswählen, einschließlich DML, DDL, INSERT, UPDATE, UPSERT, DELETE.

      Objektnamen: Name des zuvor erstellten Filterobjekts.

      Wählen Sie Hinzufügen, um die Filterregel zum Datenstreamprozess hinzuzufügen.

  5. Wählen Sie Datenstream erstellen aus. Sie kehren zur Homepage des Verteilungsservice zurück, auf der der Datenstream aufgeführt ist.

  6. Wählen Sie den neu erstellten Datenstream aus, um die YAML-Dokumentdatenstreaming-AsyncAPI-Definitionen für den Stream anzuzeigen.

Datenstreamkonfiguration bearbeiten

So bearbeiten Sie die Data Streams-Konfiguration: 1. Wählen Sie im linken Navigationsbereich "Data Streams".

  1. Wählen Sie den Namen des Datenstreams aus, der geändert werden muss.

  2. Verwenden Sie auf der Seite "Datenstreams" die Spalte Aktion, um Datenstreamdetails anzuzeigen, einen Datenstream zu löschen und seine Filterung zu ändern.

    Ändern Sie die Datenstromfilterung.

  3. Wenn Sie die Option Filterung ändern im Menü Aktion auswählen, wird das Dialogfeld Datenstreamfilterregeln bearbeiten angezeigt. In diesem Feld können Sie die Regelaktion (Einschließen, Ausschließen) und den Filtertyp (Objektname oder Objekttyp) ändern.

  4. Wählen Sie Hinzufügen aus, um die Filterung anzuwenden.

  5. Wählen Sie Weiterleiten, um die Seite "Datenstreams" zurückzugeben.

Sie können den Datenstream weiter bearbeiten, indem Sie den Datenstream in der Spalte Name auswählen. Damit wird die vollständige Datenstreamkonfiguration angezeigt. Verwenden Sie das Stiftsymbol neben jeder Konfigurationseinstellung, um es zu ändern. Sie können die Quelltraildatei ändern, die von einem Datenstream verwendet wird, Filterregeln und Quality of Service. Mit dem YAML-Editor können Sie auch die Datenstreamkonfiguration ändern und Änderungen hochladen, indem Sie das Symbol Änderungen hochladen neben dem YAML-Editor verwenden.