Datenstreams hinzufügen

Oracle GoldenGate 23ai führt Datenstreams ein, die Datenpfade für Echtzeitanalysen in hohem Maße vereinfachen können.

Hinweis:

Dieser Artikel gilt nur für Oracle GoldenGate 23ai-Deployments.

Datenstreams

Oracle GoldenGate Data Streams verwendet die AsyncAPI-Spezifikation zur Definition asynchroner APIs. Mit diesem Ansatz können Anwendungen Datenstreams effizient mit einem Publish- oder Subscribe-Modell abonnieren. Updates werden empfangen, sobald Änderungen in der Quelldatenbank festgeschrieben werden, wodurch die Latenz minimiert und die Anwendungsentwicklung vereinfacht wird. Darüber hinaus können Benutzer mit Oracle GoldenGate Data Streams ihr bevorzugtes Datenformat angeben, z. B. JSON, um nahtlose Integration mit vorhandenen Tools und Frameworks in ihrer Entwicklungsumgebung zu ermöglichen.

Vorteile für Entwickler und Data Scientists
  • Verbesserte Datenaufnahme: Mit dem Modell "Veröffentlichen" oder "Abonnieren" auf Basis von AsyncAPI können Anwendungen Datenaktualisierungen in Echtzeit effizient empfangen.

  • Flexible Datenformatierung: Benutzer können ihr bevorzugtes Format für eine nahtlose Integration mit vorhandenen Tools auswählen.

  • Optimierte Integration: AsyncAPI fördert die reibungslose Interaktion mit verschiedenen Anwendungen und Tools, die häufig von Entwicklern und Data Scientists verwendet werden.

  • Garantierte Datenintegrität: Durch die Übernahme der Kernstärke von Oracle GoldenGate stellen Data Streams die Datendauerhaftigkeit sicher, indem Änderungen repliziert werden, die in der Quelldatenbank festgeschrieben werden.

Komponenten von Oracle GoldenGate-Datenstreams

Komponenten von Oracle GoldenGate Data Streams:
  • Asynchrone API

  • Data Streams-Protokoll

  • Start-/Neustartposition für Datenstreams

  • Schemadatensätze

  • CloudEvents-Format

Asynchrone API

Oracle GoldenGate Data Streams ist programmiersprachenunabhängig, sodass es mit einem Client interagieren kann, der in einer beliebigen Programmiersprache geschrieben ist. Obwohl die Clientprogramme in der Regel einfach und klein sind, müssen Benutzer den Clientcode weiterhin manuell implementieren, um mit dem Datenstreamingdienst zu interagieren.

Die Übernahme der AsyncAPI-Spezifikation in Oracle GoldenGate Data Streams bietet die folgenden Vorteile:

  • Fähigkeit, die API des Datenstreamservice in einer branchenüblichen API-Spezifikation zu beschreiben und automatisch API-Dokumentation zu generieren.

  • Generieren Sie automatisch clientseitigen Code über @asyncapi/generator.

Mit der Unterstützung von AsyncAPI vereinfacht Oracle GoldenGate Data Streams das Daten-Streaming, indem der Clientcode automatisch generiert wird. Es folgt dem Publisher- und Subscriber-Modell und unterstützt eine Vielzahl von Protokollen, einschließlich Websocket, Kafka, mqtt, hms und vielen IOT-Protokollen. Beim Beschreiben einer ereignisgesteuerten API verwendet sie die YAML-Modelliersprache und befolgt eine ähnliche Syntax für die OpenAPI-Spezifikation. Beispiel: Im Folgenden finden Sie ein Snippet des AsyncAPI-YAML-Dokuments, das die Definitionen von Data Streaming AsyncAPI beschreibt:

asyncapi: '3.0.0'
info:
  title: Data Streaming API
  version: '1.0.0'
  description: | allows clients to subscribe to a data stream
  license:
    name: Apache 2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0'

servers:
  <deployment-url>:
    protocol: ws
url: <deployment-url>:<port-number>

defaultContentType: application/json

channels:
/services/v2/stream/mystream1:
...

Wenn eine Datenstreamressource erstellt wird, wird in der HTTP-Antwort ein URL-Link zu einem benutzerdefinierten Async-API-Spezifikationsdokument zurückgegeben, in dem beschrieben wird, wie auf diesen Datenstreamendpunkt zugegriffen wird. Dieses YAML-Dokument kann dann verwendet werden, um den clientseitigen Code mit @asyncapi/generator zu generieren.

Beachten Sie, dass Sie zur Unterstützung des Websocket-Protokolls in @asyncapi/generator auch die Websocket-Clientvorlage für @asyncapi/generator in GitHub implementieren/verwalten müssen. Weitere Informationen zur WebSocket-Client-Vorlage finden Sie im Repository GitHub:

https://github.com/tianshu-orcl/websocket-client-template.git

Data Streams-Protokoll

Bei Oracle GoldenGate Data Streams wird der direkte Zugriff auf die Daten im vom Benutzer angegebenen Format über einen dedizierten Websocket-Kanal aktiviert, der einem einfachen Streamingprotokoll folgt.

Das Data Streams-Protokoll verwendet den Push-Modus, um Daten an den Client zu senden. Der Client erstellt zunächst über die HTTP-RESTful-Anforderung eine Streamingressource auf dem Server. Nachdem die Streamingressource erstellt wurde, stellt der Client eine WebSocket-Verbindung über den Streamingressourcenendpunkt her. Nachdem der WebSocket-Kanal eingerichtet wurde, beginnt Data Streams, die Daten sofort und kontinuierlich zu pushen, ohne auf eine Antwort oder Bestätigung vom Client zu warten.


Data Streams-Protokoll

Der folgende python-Beispielclient veranschaulicht die Interaktion zwischen dem Client und dem Datenstreaming-Service:

import asyncio
import requests
import websockets
import json

async def client():
    ### create the streaming resource
    payload = {"source":{"trail":"a1"}}
    response = requests.post(
  'https://name:pswd@<oci_godengate_console_url>:443/services/distsrvr/v2/stream/s1', json=payload)

### establish websocket connection and receive data continuously
uri = "wss://name:pswd@<oci_godengate_console_url>:443/services/v2/distsrvr/stream/s1?begin=earliest"
async with websockets.connect(uri) as websocket:
        while True:
            resp = await websocket.recv()
            records = json.loads(resp)
            for rec in records:
                print(rec)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
localhost_pem = pathlib.Path(__file__).with_name("/path_to/client_key.pem")
ssl_context.load_verify_locations(localhost_pem)
ssl_context.load_cert_chain(localhost_pem)
asyncio.get_event_loop().run_until_complete(client())

Im angegebenen Clientprogramm wird beim Erstellen des Datenstreamressourcenendpunkts s1 eine einfache Data Stream-Payload angegeben, die den Namen des Quelldatentrails angibt. In einer realen Anwendung können während der Handshake-Phase des Streamingprotokolls viel komplizierte Data Stream-Payloads verwendet werden, um das Datenstreamingverhalten zu konfigurieren.

Beispiel: Die folgende Data Stream-Anforderungs-Payload gibt die Filterregeln, das Codierungsformat und bufferSize zusammen mit dem erforderlichen Datenquelltrailnamen an.

{   
    "$schema"    : "ogg:dataStream",
    "source"     : {"trail":"a1"},
    "rules"      : [{
        "action" : "exclude",
        "filter" : {
            "objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
        }
    }],
    "encoding"   : “json",
    "bufferSize" : 2097152
}
Start-/Neustartposition für Datenstreams

Beim Aufbau der Websocket-Verbindung gibt die Clientseite die Begin-Position (als Abfrageparameter in der Websocket-Verbindungs-URL) an, um das Streaming der Daten zu starten. Die Anfangsposition kann einen der folgenden Werte aufweisen:

  • Sonderschlüsselwort "now"

  • Sonderschlüsselwort "earliest"

  • Zeitstempelzeichenfolge im ISO 8601-Format

  • Zuletzt verarbeitete LCR-Position

Jeder Nicht-Metadaten-LCR-Datensatz enthält eine undurchsichtige Position (einschließlich CSN, XID, Datensatznummer in der Transaktion). Die Kundenseite ist für die Aufrechterhaltung der Position des zuletzt verarbeiteten LCR-Datensatzes verantwortlich. Der Datenstreamservice ist dafür verantwortlich, den richtigen Start-/Neustartpunkt basierend auf der angegebenen Begin-Position zu finden.

Wenn dies das erste Mal ist, dass ein Client eine Verbindung zum Datenstreamservice herstellt, muss der Client einen Zeitstempel für den Start von Streamingdaten bereitstellen. Das Schlüsselwort now wird in den aktuellen Zeitstempel konvertiert und das Schlüsselwort earliest in den Zeitstempel 0 konvertiert.

Alternativ kann eine ISO 8601-Zeitstempelzeichenfolge für die Begin-Position verwendet werden. In allen Fällen führt der Datenstreams-Service einen zeitstempelbasierten Lookup im Quelltrail aus, um die Startposition zu bestimmen.

Wenn dies der Fall ist, muss der Client während des Handshakes die zuletzt verarbeitete gespeicherte Position für den Datenstreams-Service bereitstellen. Der Datenstreamservice führt eine positionsbasierte Suche im Quelltrail durch, um die Startposition zu bestimmen. Das Verhalten des Datenstreaming-Recoverys hängt auch von der QoS-Ebene ab, die im Datenstream angegeben ist.

CloudEvents

CloudEvents ist eine Spezifikation für die Beschreibung von Ereignisdaten in gängigen Formaten, um die Interoperabilität über Services, Plattformen und Systeme hinweg bereitzustellen. Da Oracle GoldenGate Data Streams derzeit nur die JSON-Datencodierung unterstützt, ist die Unterstützung für das CloudEvents-Format auf das JSON-Ereignisformat beschränkt. Die vollständige Spezifikation für das JSON-Ereignisformat für CloudEvents finden Sie unter:

https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md

https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md

https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json

Das CloudEvents-Format definiert die Liste der Attribute zur Beschreibung des Ereignisses, im Wesentlichen ein Envelope mit einem Set obligatorischer und optionaler Attribute. Wenn das Format CloudEvents in Oracle GoldenGate Data Streams aktiviert ist, sehen die endgültigen JSON-Datensätze wie folgt aus, wobei das Feld data die ursprünglichen Datensätze enthält, d.h. Oracle GoldenGate DML/DDL/metadata/schema-Datensätze.
{
    "specversion" : "1.0",
    "type" : "com.example.someevent",
    "source" : "/mycontext",
    "id" : "A234-1234-1234",
    "datacontenttype" : "application/json",
    "data: {…}
}

Datenstreams hinzufügen

Datenstreams werden aus dem Verteilungsservice erstellt. Melden Sie sich beim Distribution Service an, um mit der Erstellung eines Data Stream-Prozesses zu beginnen. So erstellen Sie einen Datenstream:
  1. Klicken Sie auf der Homepage des Verteilungsservice auf Datenstreams hinzufügen (Plussymbol), um den Assistenten Datenstream hinzufügen zu öffnen.


    Distribution Service-Homepage mit dem Bereich {\b Data Streams}

  2. Geben Sie auf der Seite Datenstreaminformationen einen Datenstreamprozessnamen in das Feld Name ein, und fügen Sie eine Beschreibung hinzu. Klicken Sie auf Weiter.


    Dialogfeld {\b Add Data Streams}: Seite {\b Data Stream Information}

  3. Geben Sie auf der Seite Quelloptionen die Werte für Optionen an, die in der folgenden Abbildung angezeigt werden:


    Dialogfeld "Data Stream": Seite "Quelloptionen"

    • Trailname: Name der Quelltraildatei.

    • Trailunterverzeichnis: Der Pfad des Unterverzeichnisses, in dem Traildateien gespeichert werden.

    • Codierung: Diese Option steuert die Codierung von Datensätzen für den Datenstream. Derzeit wird die JSON-Codierung unterstützt.

    • Puffergröße: Dieser Wert steuert die Speicherpuffergröße, die im Datenstreaming-Service verwendet wird. Der Datenstreamingdienst leert die speicherresidente Nachrichtenwarteschlange, nachdem die Gesamtbytegröße den angegebenen Wert überschreitet und die Datensätze an den Client übermittelt.

    • Servicequalität: Diese Option definiert das Unterdrückungs- und Recovery-Verhalten doppelter Datenstreams. Im Datenstreaming-Service werden drei Quality of Service-Ebenen unterstützt.
      • Exakt einmal: Dieser Modus ist der restriktivste Modus, bei dem der Service doppelte Datensätze beim Recovery herausfiltert, wenn RESTART OK/ABEND-Datensätze in den Quelltrails angezeigt werden. Kunden sehen keine doppelten Datensätze. Wenn der Service den Datensatz mit der angegebenen zuletzt verarbeiteten Position nicht finden kann, tritt ein Fehler auf.

      • Mindestens einmal: Dieser Modus unterdrückt keine doppelten Datensätze während des Recoverys oder zeigt keine RESTART OK/ABEND-Datensätze im Quelltrail an. Clients können doppelte Datensätze im Datenstream sehen. Wenn der Service den Datensatz mit der angegebenen zuletzt verarbeiteten Position nicht finden kann, tritt ein Fehler auf.

      • Höchstens einmal: Dieser Modus unterdrückt doppelte Datensätze beim Recovery oder beim Anzeigen von RESTART OK/ABEND-Datensätzen im Quelltrail. Clients sehen keine doppelten Datensätze im Datenstream. Wenn der Dienst den Datensatz mit der angegebenen zuletzt verarbeiteten Position nicht finden kann, sucht er den nächsten verfügbaren Datensatz und geht weiter.

    • CloudEvents-Format: Der Datenstreaming-Service unterstützt die Übertragung der Datensätze im CloudEvents-Format. Standardmäßig ist dieses Format deaktiviert und kann gesteuert werden, indem die Eigenschaft beim Erstellen des Datenstreamingkanals mit dem Umschalter aktiviert wird.

  4. Auf der Seite Filteroptionen sind die Optionen zum Ein- und Ausschließen von Filterregeln verfügbar:

    Dialogfeld "Datenstream" und Filteroptionen verfügbar.

    Geben Sie die Filterregeloptionen wie folgt an:
    • Regelaktion: Wählen Sie die Optionen "Ausschließen" oder "Einschließen".

    • Filtertyp: Der Filtertyp umfasst Folgendes:

      Objekttyp: Sie können mehrere Optionen aus der Dropdown-Liste auswählen, einschließlich DML, DDL, INSERT, UPDATE, UPSERT, DELETE.

      Objektnamen: Name des zuvor erstellten Filterobjekts.

      Klicken Sie auf Hinzufügen, um die Filterregel zum Data Stream-Prozess hinzuzufügen.

  5. Klicken Sie auf Datenstream erstellen. Sie kehren zur Homepage des Verteilungsservice zurück, auf der der Datenstream aufgeführt ist.

  6. Klicken Sie auf den neu erstellten Datenstream, um die Definitionen für das YAML-Dokument Data Streaming AsyncAPI für den Stream anzuzeigen.

Datenstreamkonfiguration bearbeiten

So bearbeiten Sie die Data Streams-Konfiguration:
  1. Klicken Sie im linken Navigationsbereich von Distribution Service auf {\b Data Streams}.

  2. Löschen Sie den Namen des Datenstreams, der geändert werden muss.

  3. Verwenden Sie auf der Seite "Datenstreams" die Spalte Aktion, um Datenstreamdetails anzuzeigen, einen Datenstream zu löschen und seine Filterung zu ändern.

    Datenstreamkonfiguration bearbeiten

  4. Wenn Sie im Menü Aktion auf die Option Filterung ändern klicken, wird das Dialogfeld Datenstream-Bedingungsregeln bearbeiten angezeigt. In diesem Feld können Sie die Regelaktion (Einschließen, Ausschließen) und den Filtertyp (Objektname oder Objekttyp) ändern.

  5. Klicken Sie auf Hinzufügen, um die Filterung anzuwenden.

  6. Klicken Sie auf Weiterleiten, um die Seite "Datenstreams" zurückzugeben.

Sie können den Datenstream weiter bearbeiten, indem Sie in der Spalte Name auf den Datenstream klicken. Dadurch wird die vollständige Datenstreamkonfiguration angezeigt. Verwenden Sie das Stiftsymbol neben jeder Konfigurationseinstellung, um es zu ändern. Sie können die Quelltraildatei ändern, die von einem Datenstream verwendet wird, Filterregeln und Quality of Service. Mit dem YAML-Editor können Sie auch die Datenstreamkonfiguration ändern und Änderungen hochladen, indem Sie das Symbol Änderungen hochladen neben dem YAML-Editor verwenden.