Aggiungi flussi di dati
Oracle GoldenGate 26ai introduce flussi di dati che possono semplificare notevolmente i percorsi di dati per gli analytics in tempo reale.
Nota: questo articolo si applica solo alle distribuzioni OracleGoldenGate 26ai.
Informazioni sui flussi di dati
I flussi di dati di Oracle GoldenGate utilizzano la specifica AsyncAPI per definire le API asincrone. Questo approccio consente alle applicazioni di eseguire in modo efficiente la sottoscrizione ai flussi di dati utilizzando un modello di pubblicazione o sottoscrizione. Gli aggiornamenti vengono ricevuti non appena viene eseguito il commit delle modifiche nel database di origine, riducendo al minimo la latenza e semplificando lo sviluppo delle applicazioni. Inoltre, i flussi di dati di Oracle GoldenGate consentono agli utenti di specificare il formato di dati preferito, ad esempio JSON, per una perfetta integrazione con gli strumenti e i framework esistenti all'interno del proprio ambiente di sviluppo.
Vantaggi per sviluppatori e data scientist
-
Maggiore inclusione dei dati: il modello Pubblica o Sottoscrivi basato su AsyncAPI consente alle applicazioni di ricevere in modo efficiente aggiornamenti dei dati in tempo reale.
-
Formattazione flessibile dei dati: gli utenti possono scegliere il formato preferito per una perfetta integrazione con gli strumenti esistenti.
-
Integrazione semplificata: AsyncAPI favorisce un'interazione fluida con varie applicazioni e strumenti comunemente utilizzati da sviluppatori e data scientist.
-
Integrità garantita dei dati: ereditando la forza di base di Oracle GoldenGate, i flussi di dati garantiscono la durabilità dei dati replicando le modifiche di cui viene eseguito il commit nel database di origine.
Componenti dei flussi di dati di Oracle GoldenGate
I componenti dei flussi di dati di Oracle GoldenGate includono:
-
API asincrona
-
Protocollo flussi di dati
-
Posizione inizio/riavvio flussi dati
-
Schema - Record
-
Formato CloudEvents
API asincrona
Oracle GoldenGate Data Streams è un linguaggio di programmazione indipendente in modo che possa interagire con un client scritto in qualsiasi linguaggio di programmazione. Anche se i programmi client sono in genere semplici e piccoli, gli utenti devono ancora implementare manualmente il codice client per interagire con il servizio di streaming dati.
L'adozione della specifica AsyncAPI nei flussi di dati di Oracle GoldenGate presenta i vantaggi riportati di seguito.
-
Possibilità di descrivere l'API del servizio dei flussi di dati nelle specifiche API standard del settore e di generare automaticamente la documentazione API.
-
Genera automaticamente codice lato client tramite @asyncapi/generator.
Grazie al supporto AsyncAPI, Oracle GoldenGate Data Streams semplifica lo streaming dei dati generando automaticamente il codice client. Segue il modello dell'editore e dell'abbonato e supporta un'ampia varietà di protocolli, tra cui websocket, kafka, mqtt, hms e molti protocolli IOT. Quando descrive un'API basata su eventi, utilizza il linguaggio di modellazione YAML e segue una sintassi simile per le specifiche OpenAPI. Di seguito, ad esempio, è riportato uno snippet del documento yaml AsyncAPI che descrive le definizioni AsyncAPI di Data Streaming:
asyncapi: '3.0.0'
info:
title: Data Streaming API
version: '1.0.0'
description: \| allows clients to subscribe to a data stream
license:
name: Apache 2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0'
servers:
<deployment-url>:
protocol: ws
url: <deployment-url>:<port-number>
defaultContentType: application/json
channels:
/services/v2/stream/mystream1:
...
Quando viene creata una risorsa dei flussi di dati, viene restituito un collegamento URL a un documento di specifica API asincrona personalizzato che descrive come accedere a questo endpoint del flusso di dati nella risposta HTTP. Questo documento YAML può quindi essere utilizzato per generare il codice lato client utilizzando @asyncapi/generator.
Tenere presente che per supportare il protocollo Websocket in @asyncapi/generator, è inoltre necessario implementare/gestire il modello client Websocket per @asyncapi/generator in GitHub. Consultare il repository GitHub per ulteriori informazioni sul modello Websocket-client:
https://github.com/oracle-samples/websocket-client-template
Protocollo flussi di dati
Con Oracle GoldenGate Data Streams, l'accesso diretto ai dati nel formato specificato dall'utente viene abilitato tramite un canale WebSocket dedicato che segue un protocollo di streaming semplice.
Il protocollo Data Streams utilizza la modalità push per inviare i dati al client. Il client crea prima una risorsa di streaming sul server tramite richiesta HTTP RESTful. Dopo la creazione della risorsa di streaming, il client stabilisce una connessione WebSocket tramite l'endpoint della risorsa di streaming. Una volta stabilito il canale WebSocket, Data Streams inizia a inviare i dati immediatamente e continuamente senza attendere la risposta o il riconoscimento da parte del client.

Il seguente client python di esempio illustra l'interazione tra il client e il servizio di streaming dati:
import asyncio
import requests
import websockets
import json
async def client():
###create the streaming resource
payload = {"source":{"trail":"a1"}}
response = requests.post(
'https://name:pswd@<oci_godengate_console_url>:443/services/distsrvr/v2/stream/s1', json=payload)
###establish websocket connection and receive data continuously
uri = "wss://name:pswd@<oci_godengate_console_url>:443/services/v2/distsrvr/stream/s1?begin=earliest"
async with websockets.connect(uri) as websocket:
while True:
resp = await websocket.recv()
records = json.loads(resp)
for rec in records:
print(rec)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
localhost_pem = pathlib.Path(__file__).with_name("/path_to/client_key.pem")
ssl_context.load_verify_locations(localhost_pem)
ssl_context.load_cert_chain(localhost_pem)
asyncio.get_event_loop().run_until_complete(client())
Nel programma client specificato, viene fornito un payload Data Stream semplice che specifica il nome dello storico dati di origine durante la creazione dell'endpoint risorsa flusso dati s1. In un'applicazione del mondo reale, i payload Data Stream possono essere utilizzati durante la fase di stretta di mano del protocollo di streaming per configurare il comportamento di streaming dei dati.
Ad esempio, il payload di richiesta Data Stream riportato di seguito specifica le regole di filtro, il formato di codifica e la dimensione buffer insieme al nome trail dell'origine dati richiesto.
{
"$schema" : "ogg:dataStream",
"source" : {"trail":"a1"},
"rules" : [{
"action" : "exclude",
"filter" : {
"objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
}
}],
"encoding" : "json",
"bufferSize" : 2097152
}
Posizione inizio/riavvio flussi dati
Durante l'istituzione della connessione WebSocket, il lato client specifica la posizione di inizio (come parametro di query nell'URL della connessione WebSocket) per avviare lo streaming dei dati. La posizione iniziale può essere uno dei seguenti valori:
-
Parola chiave speciale "now"
-
Parola chiave speciale "earliest"
-
Stringa indicatore orario formato ISO 8601
-
Ultima posizione LCR elaborata
Ogni record LCR non metadati contiene una posizione opaca (include CSN, XID, numero di record all'interno della transazione). Il lato cliente è responsabile della gestione della posizione dell'ultimo record LCR elaborato. Il servizio di flussi di dati è responsabile dell'individuazione del punto di inizio/riavvio corretto in base alla posizione iniziale specificata.
Se questa è la prima volta che un client si connette al servizio di flussi di dati, il client deve fornire un indicatore orario di dove avviare lo streaming dei dati. La parola chiave now verrà convertita nell'indicatore orario corrente e la parola chiave earliest verrà convertita nell'indicatore orario 0.
In alternativa, è possibile utilizzare una stringa con indicatore orario ISO 8601 per la posizione inizio. In tutti i casi, il servizio di flussi di dati esegue una ricerca basata sull'indicatore orario nel trail di origine per determinare la posizione iniziale.
Se questo è il caso di recupero/riavvio, il client dovrebbe fornire l'ultima posizione elaborata salvata al servizio dei flussi di dati durante l'handshake. Il servizio di flussi di dati eseguirà una ricerca basata sulla posizione nel trail di origine per determinare la posizione iniziale. Il comportamento del recupero dello streaming dei dati dipende anche dal livello QoS specificato nel flusso di dati.
API REST dei flussi di dati di Oracle GoldenGate
È possibile utilizzare le API Rest riportate di seguito per gestire i flussi di dati GoldenGate.
-
Creare una nuova configurazione di Data Stream di Oracle GoldenGate
-
Recuperare una configurazione del flusso di dati Oracle GoldenGate esistente
-
Aggiornare una configurazione del flusso di dati Oracle GoldenGate esistente
-
Eliminare una configurazione del flusso di dati Oracle GoldenGate esistente
Eventi cloud
CloudEvents è una specifica per descrivere i dati degli eventi in formati comuni per fornire interoperabilità tra servizi, piattaforme e sistemi. Poiché Oracle GoldenGate Data Streams attualmente supporta solo la codifica dei dati JSON, il supporto per il formato CloudEvents è limitato al formato evento JSON. La specifica completa per il formato evento JSON per CloudEvents è disponibile all'indirizzo:
https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json
Il formato CloudEvents definisce l'elenco degli attributi per descrivere l'evento, ovvero un involucro con un set di attributi obbligatori e facoltativi. Quando il formato CloudEvents è abilitato nei flussi di dati di Oracle GoldenGate, i record JSON finali avranno un aspetto simile al seguente, dove il campo data contiene i record di dati originali, ovvero i record DML/DDL/metadati/schema di Oracle GoldenGate.
{
"specversion" : "1.0",
"type" : "com.example.someevent",
"source" : "/mycontext",
"id" : "A234-1234-1234",
"datacontenttype" : "application/json",
"data: {...}
}
Aggiungi flussi di dati
I flussi di dati vengono creati dal servizio di distribuzione. Eseguire il login al servizio di distribuzione per iniziare a creare un processo Flusso di dati. Di seguito sono riportati i passi per creare un flusso di dati.
-
Nella home page del servizio di distribuzione, Aggiungi flussi di dati (icona più) per aprire la procedura guidata Aggiungi flusso di dati.

-
Nella pagina Informazioni flusso dati immettere un nome di processo Flusso dati nella casella Nome e aggiungere una descrizione. Selezionare Next.

-
Nella pagina Opzioni di origine fornire i valori per le opzioni mostrate nell'immagine seguente:

-
Nome percorso: il nome del file trail di origine.
-
Sottodirectory trail: il percorso della sottodirectory in cui vengono memorizzati i file trail.
-
Codifica: questa opzione controlla la codifica dei record per il flusso di dati. Al momento, la codifica JSON è supportata.
-
Dimensione buffer: questo valore controlla la dimensione del buffer di memoria utilizzata nel servizio di streaming dati. Il servizio di streaming dati scaricherà la coda dei messaggi in-memory dopo che la dimensione totale in byte supera il valore specificato e consegna i record al client.
-
Quality of Service: questa opzione definisce il funzionamento di eliminazione e ripristino dei duplicati del flusso di dati. Nel servizio di streaming dati sono supportati tre livelli di qualità del servizio.
-
Esattamente una volta: questa modalità è la modalità più restrittiva in cui il servizio filtra i record duplicati durante il recupero della visualizzazione di un record
RESTART OK/ABENDnegli trail di origine. I client non visualizzeranno record duplicati. Se il servizio non riesce a individuare il record con l'ultima posizione elaborata specificata, si verifica un errore. -
Almeno una volta: questa modalità non elimina i record duplicati durante il recupero o visualizza un record
RESTART OK/ABENDnello storico origini. I client possono visualizzare record duplicati nel flusso di dati. Inoltre, se il servizio non è in grado di individuare il record con l'ultima posizione elaborata specificata, si verifica un errore. -
Al massimo una volta: questa modalità elimina i record duplicati durante il recupero o visualizza un record
RESTART OK/ABENDnello storico origini. I client non visualizzeranno record duplicati nel flusso di dati. Se il servizio non riesce a individuare il record con l'ultima posizione elaborata specificata, troverà il successivo record disponibile e andrà avanti.
-
-
Formato CloudEvents: il servizio di streaming dati supporta la trasmissione dei record di dati in formato CloudEvents. Per impostazione predefinita, questo formato è disabilitato e può essere controllato abilitando la proprietà mediante lo switch di attivazione/disattivazione durante la creazione del canale di streaming dati.
-
-
Nella pagina Opzioni filtro sono disponibili le opzioni per includere ed escludere le regole di filtro riportate di seguito.

Specificare le opzioni della regola di filtro come indicato di seguito.
-
Azione regola: selezionare le opzioni Escludi o Includi.
-
Tipo di filtro: il tipo di filtro include quanto segue:
Tipo di oggetto: è possibile selezionare più opzioni dall'elenco a discesa, tra cui DML, DDL, INSERT, UPDATE, UPSERT, DELETE.
Nomi oggetto: il nome dell'oggetto filtro creato in precedenza.
Selezionare Aggiungi per aggiungere la regola di filtro al processo Flusso di dati.
-
-
Selezionare Crea flusso dati. Verrà visualizzata di nuovo la home page del servizio di distribuzione in cui è elencato il flusso dati.
-
Selezionare il flusso di dati appena creato per visualizzare le definizioni AsyncAPI di streaming dati del documento YAML per il flusso.
Modifica configurazione flussi dati
Per modificare la configurazione di Data Streams: 1. Dal riquadro di navigazione a sinistra, selezionare Flussi dati.
-
Selezionare il nome del flusso di dati da modificare.
-
Nella pagina Flussi di dati, utilizzare la colonna Azione per visualizzare i dettagli del flusso di dati, eliminare un flusso di dati e modificarne il filtro.

-
Se si seleziona l'opzione Modifica filtro dal menu Azione, viene visualizzata la finestra di dialogo Modifica regole filtro flusso di dati. Da questa casella è possibile modificare l'azione regola (Includi, Escludi) e il tipo di filtro (nome oggetto o tipo di oggetto).
-
Selezionare Aggiungi per applicare il filtro.
-
Selezionare Sottometti per restituire la pagina Flussi di dati.
È possibile modificare ulteriormente il flusso di dati selezionando il flusso di dati dalla colonna Nome. Viene visualizzata la configurazione completa del flusso dati. Utilizzare l'icona a forma di matita accanto a ciascuna impostazione di configurazione per modificarla. È possibile modificare il file trail di origine utilizzato da un flusso di dati, le relative regole di filtro e la qualità del servizio. È inoltre possibile utilizzare l'editor YAML per modificare la configurazione del flusso di dati e caricare le modifiche utilizzando l'icona Carica modifiche accanto all'editor YAML.