Aggiungi flussi di dati
Oracle GoldenGate 23ai introduce flussi di dati in grado di semplificare notevolmente i percorsi di dati per gli analytics in tempo reale.
Nota
Questo articolo si applica solo alle distribuzioni di Oracle GoldenGate 23ai.Argomenti correlati
Informazioni sui flussi di dati
Oracle GoldenGate Data Streams utilizza la specifica AsyncAPI per definire le API asincrone. Questo approccio consente alle applicazioni di eseguire in modo efficiente la sottoscrizione ai flussi di dati utilizzando un modello di pubblicazione o sottoscrizione. Gli aggiornamenti vengono ricevuti non appena viene eseguito il commit delle modifiche nel database di origine, riducendo al minimo la latenza e semplificando lo sviluppo delle applicazioni. Inoltre, i flussi di dati di Oracle GoldenGate consentono agli utenti di specificare il formato di dati preferito, ad esempio JSON, per una perfetta integrazione con gli strumenti e i framework esistenti all'interno del proprio ambiente di sviluppo.
-
Maggiore inclusione dei dati: il modello di pubblicazione o sottoscrizione basato su AsyncAPI consente alle applicazioni di ricevere in modo efficiente aggiornamenti dei dati in tempo reale.
-
Formattazione flessibile dei dati: gli utenti possono scegliere il formato preferito per una perfetta integrazione con gli strumenti esistenti.
-
Integrazione semplificata: AsyncAPI favorisce un'interazione fluida con varie applicazioni e strumenti comunemente utilizzati da sviluppatori e data scientist.
-
Integrità garantita dei dati: ereditando la forza di base di Oracle GoldenGate, i flussi di dati garantiscono la durabilità dei dati replicando le modifiche durante il commit nel database di origine.
Componenti dei flussi di dati di Oracle GoldenGate
-
API asincrona
-
Protocollo flussi di dati
-
Posizione inizio/riavvio flussi dati
-
Schema - Record
-
Formato CloudEvents
API asincrona
Oracle GoldenGate Data Streams è un linguaggio di programmazione indipendente in modo che possa interagire con un client scritto in qualsiasi linguaggio di programmazione. Anche se i programmi client in genere sono semplici e piccoli, gli utenti devono ancora implementare manualmente il codice client per interagire con il servizio di streaming dati.
L'adozione della specifica AsyncAPI nei flussi di dati di Oracle GoldenGate presenta i vantaggi riportati di seguito.
-
Possibilità di descrivere l'API del servizio dei flussi di dati nelle specifiche API standard del settore e di generare automaticamente la documentazione API.
-
Genera automaticamente codice lato client tramite @asyncapi/generator.
Con il supporto di AsyncAPI, Oracle GoldenGate Data Streams semplifica lo streaming dei dati generando automaticamente il codice client. Segue il modello dell'editore e dell'abbonato e supporta un'ampia varietà di protocolli, tra cui websocket, kafka, mqtt, hms e molti protocolli IOT. Quando descrive un'API basata sugli eventi, utilizza il linguaggio di modellazione YAML e segue una sintassi simile per la specifica OpenAPI. Ad esempio, di seguito è riportato uno snippet di documento yaml AsyncAPI che descrive le definizioni AsyncAPI di Data Streaming:
asyncapi: '3.0.0'
info:
title: Data Streaming API
version: '1.0.0'
description: | allows clients to subscribe to a data stream
license:
name: Apache 2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0'
servers:
<deployment-url>:
protocol: ws
url: <deployment-url>:<port-number>
defaultContentType: application/json
channels:
/services/v2/stream/mystream1:
...
Quando viene creata una risorsa dei flussi di dati, viene restituito un collegamento URL a un documento di specifica API asincrona personalizzato che descrive come accedere a questo endpoint del flusso di dati nella risposta HTTP. Questo documento YAML può quindi essere utilizzato per generare il codice lato client utilizzando @asyncapi/generator.
Tenere presente che per supportare il protocollo Websocket in @asyncapi/generator, è inoltre necessario implementare/gestire il modello client Websocket per @asyncapi/generator in GitHub. Per ulteriori informazioni sul modello Websocket-client, consultare il repository GitHub:
https://github.com/tianshu-orcl/websocket-client-template.git
Protocollo flussi di dati
Con Oracle GoldenGate Data Streams, l'accesso diretto ai dati nel formato specificato dall'utente viene abilitato tramite un canale WebSocket dedicato che segue un protocollo di streaming semplice.
Il protocollo Data Streams utilizza la modalità push per inviare i dati al client. Il client crea prima una risorsa di streaming sul server tramite una richiesta HTTP RESTful. Dopo la creazione della risorsa di streaming, il client stabilisce una connessione WebSocket tramite l'endpoint della risorsa di streaming. Una volta stabilito il canale WebSocket, Data Streams inizia a inviare i dati immediatamente e continuamente senza attendere la risposta o il riconoscimento da parte del client.

Il seguente client python di esempio illustra l'interazione tra il client e il servizio di streaming dati:
import asyncio
import requests
import websockets
import json
async def client():
### create the streaming resource
payload = {"source":{"trail":"a1"}}
response = requests.post(
'https://name:pswd@<oci_godengate_console_url>:443/services/distsrvr/v2/stream/s1', json=payload)
### establish websocket connection and receive data continuously
uri = "wss://name:pswd@<oci_godengate_console_url>:443/services/v2/distsrvr/stream/s1?begin=earliest"
async with websockets.connect(uri) as websocket:
while True:
resp = await websocket.recv()
records = json.loads(resp)
for rec in records:
print(rec)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
localhost_pem = pathlib.Path(__file__).with_name("/path_to/client_key.pem")
ssl_context.load_verify_locations(localhost_pem)
ssl_context.load_cert_chain(localhost_pem)
asyncio.get_event_loop().run_until_complete(client())
Nel programma client specificato, viene fornito un payload Data Stream semplice che specifica il nome dello storico dati di origine durante la creazione dell'endpoint della risorsa del flusso dati s1. In un'applicazione del mondo reale, i payload Data Stream possono essere utilizzati durante la fase di stretta di mano del protocollo di streaming per configurare il comportamento di streaming dei dati.
Ad esempio, il payload di richiesta Data Stream riportato di seguito specifica le regole di filtro, il formato di codifica e bufferSize insieme al nome trail dell'origine dati richiesto.
{
"$schema" : "ogg:dataStream",
"source" : {"trail":"a1"},
"rules" : [{
"action" : "exclude",
"filter" : {
"objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
}
}],
"encoding" : “json",
"bufferSize" : 2097152
}
Posizione inizio/riavvio flussi dati
Durante l'istituzione della connessione websocket, il lato client specifica la posizione begin (come parametro di query nell'URL della connessione websocket) per avviare lo streaming dei dati. La posizione iniziale può essere uno dei seguenti valori:
-
Parola chiave speciale "now"
-
Parola chiave speciale "earliest"
-
Stringa indicatore orario formato ISO 8601
-
Ultima posizione LCR elaborata
Ogni record LCR non metadati contiene una posizione opaca (include CSN, XID, numero di record all'interno della transazione). Il lato cliente è responsabile della gestione della posizione dell'ultimo record LCR elaborato. Il servizio di flussi di dati è responsabile dell'individuazione del punto di inizio/riavvio corretto in base alla posizione iniziale specificata.
Se questa è la prima volta che un client si connette al servizio di flussi di dati, il client deve fornire un indicatore orario di dove avviare lo streaming dei dati. La parola chiave now verrà convertita nell'indicatore orario corrente e la parola chiave earliest verrà convertita nell'indicatore orario 0.
In alternativa, è possibile utilizzare una stringa con indicatore orario ISO 8601 per la posizione inizio. In tutti i casi, il servizio di flussi di dati esegue una ricerca basata sull'indicatore orario nel trail di origine per determinare la posizione iniziale.
Se questo è il caso di recupero / riavvio, il client dovrebbe fornire l'ultima posizione elaborata salvata al servizio flussi di dati durante l'handshake. Il servizio di flussi di dati eseguirà una ricerca basata sulla posizione nel trail di origine per determinare la posizione iniziale. Il funzionamento del recupero dello streaming dei dati dipende anche dal livello QoS specificato nel flusso di dati.
API REST dei flussi di dati di Oracle GoldenGate
È possibile utilizzare le seguenti API Rest per gestire i flussi di dati GoldenGate.
CloudEvents
CloudEvents è una specifica per descrivere i dati degli eventi in formati comuni per fornire interoperabilità tra servizi, piattaforme e sistemi. Poiché Oracle GoldenGate Data Streams attualmente supporta solo la codifica dei dati JSON, il supporto per il formato CloudEvents è limitato al formato evento JSON. La specifica completa per il formato evento JSON per CloudEvents è disponibile all'indirizzo:
https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json
data
contiene i record di dati originali, ovvero i record DML/DDL/metadati/schema Oracle GoldenGate.{
"specversion" : "1.0",
"type" : "com.example.someevent",
"source" : "/mycontext",
"id" : "A234-1234-1234",
"datacontenttype" : "application/json",
"data: {…}
}
Aggiungi flussi di dati
-
Nella home page del servizio di distribuzione, Aggiungi flussi di dati (icona più) per aprire la procedura guidata Aggiungi flusso di dati.
-
Nella pagina Informazioni flusso dati immettere un nome di processo Flusso dati nella casella Nome e aggiungere una descrizione. Fare clic su Avanti.
-
Nella pagina Opzioni di origine fornire i valori per le opzioni mostrate nell'immagine seguente:
-
Nome percorso: il nome del file trail di origine.
-
Sottodirectory trail: il percorso della sottodirectory in cui sono memorizzati i file trail.
-
Codifica: questa opzione controlla la codifica dei record per il flusso di dati. Al momento, la codifica JSON è supportata.
-
Dimensione buffer: questo valore controlla la dimensione del buffer di memoria utilizzata nel servizio di streaming dati. Il servizio di streaming dati scaricherà la coda dei messaggi in-memory dopo che la dimensione totale in byte supera il valore specificato e consegna i record al client.
-
Quality of Service: questa opzione definisce il funzionamento di eliminazione e ripristino dei duplicati del flusso di dati. Nel servizio di streaming dati sono supportati tre livelli di qualità del servizio.
-
Esattamente una volta: questa modalità è la modalità più restrittiva in cui il servizio filtra i record duplicati durante il recupero della visualizzazione di un record
RESTART OK/ABEND
negli trail di origine. I client non visualizzeranno record duplicati. Se il servizio non riesce a individuare il record con l'ultima posizione elaborata specificata, si verifica un errore. -
Almeno una volta: questa modalità non elimina i record duplicati durante il recupero né visualizza i record
RESTART OK/ABEND
nello storico di origine. I client possono visualizzare record duplicati nel flusso di dati. Inoltre, se il servizio non riesce a individuare il record con l'ultima posizione elaborata specificata, si verifica un errore. -
Al massimo una volta: questa modalità elimina i record duplicati durante il recupero o visualizza un record
RESTART OK/ABEND
nel trail di origine. I client non visualizzeranno record duplicati nel flusso di dati. Se il servizio non riesce a individuare il record con l'ultima posizione elaborata specificata, troverà il successivo record disponibile e andrà avanti.
-
-
Formato CloudEvents: il servizio di streaming dati supporta la trasmissione dei record di dati in formato CloudEvents. Per impostazione predefinita, questo formato è disabilitato e può essere controllato abilitando la proprietà mediante lo switch di attivazione/disattivazione durante la creazione del canale di streaming dati.
-
-
Nella pagina Opzioni filtro sono disponibili le opzioni per includere ed escludere le regole di filtro riportate di seguito.
Specificare le opzioni della regola di filtro come indicato di seguito.-
Azione regola: selezionare le opzioni Escludi o Includi.
-
Tipo di filtro: il tipo di filtro include quanto segue:
Tipo di oggetto: è possibile selezionare più opzioni dall'elenco a discesa, tra cui DML, DDL, INSERT, UPDATE, UPSERT, DELETE.
Nomi oggetto: il nome dell'oggetto filtro creato in precedenza.
Fare clic su Aggiungi per aggiungere la regola all'istruzione di filtraggio dati.
-
-
Fare clic su Crea flusso dati. Verrà visualizzata di nuovo la home page del servizio di distribuzione in cui è elencato il flusso dati.
-
Fare clic sul flusso di dati appena creato per visualizzare le definizioni di Data Streaming AsyncAPI del documento YAML per il flusso.
Modifica configurazione flussi dati
-
Nel riquadro di navigazione a sinistra del servizio di distribuzione, fare clic su Flussi di dati.
-
Consente di eliminare il nome del flusso di dati da modificare.
-
Nella pagina Flussi di dati, utilizzare la colonna Azione per visualizzare i dettagli del flusso di dati, eliminare un flusso di dati e modificarne il filtro.
-
Se si fa clic sull'opzione Modifica filtro nel menu Azione, viene visualizzata la finestra di dialogo Modifica regole filterning flusso dati. Da questa casella è possibile modificare l'azione regola (Includi, Escludi) e il tipo di filtro (nome oggetto o tipo di oggetto).
-
Fare clic su Aggiungi per applicare il filtro.
-
Fare clic su Sottometti per restituire la pagina Flussi di dati.
È possibile modificare ulteriormente il flusso di dati facendo clic sul flusso di dati nella colonna Nome. Viene visualizzata la configurazione completa del flusso dati. Utilizzare l'icona a forma di matita accanto a ciascuna impostazione di configurazione per modificarla. È possibile modificare il file trail di origine utilizzato da un flusso di dati, le relative regole di filtro e la qualità del servizio. È inoltre possibile utilizzare l'editor YAML per modificare la configurazione del flusso di dati e caricare le modifiche utilizzando l'icona Carica modifiche accanto all'editor YAML.