Agregar flujos de datos
Oracle GoldenGate 26ai presenta los flujos de datos que pueden simplificar enormemente las rutas de datos para el análisis en tiempo real.
Nota: Este artículo solo se aplica a los despliegues de OracleGoldenGate 26ai.
Acerca de los flujos de datos
Los flujos de datos de Oracle GoldenGate utilizan la especificación AsyncAPI para definir API asíncronas. Este enfoque permite a las aplicaciones suscribirse de forma eficaz a los flujos de datos mediante un modelo de publicación o suscripción. Las actualizaciones se reciben tan pronto como se confirman los cambios en la base de datos de origen, lo que minimiza la latencia y simplifica el desarrollo de aplicaciones. Además, los flujos de datos de Oracle GoldenGate permiten a los usuarios especificar el formato de datos que prefieran, como JSON, para una integración perfecta con las herramientas y los marcos existentes en su entorno de desarrollo.
Beneficios para desarrolladores y científicos de datos
-
Ingesta de datos mejorada: el modelo de publicación o suscripción con tecnología de AsyncAPI permite a las aplicaciones recibir actualizaciones de datos en tiempo real de forma eficiente.
-
Formato de datos flexible: los usuarios pueden elegir el formato que prefieran para una integración perfecta con las herramientas existentes.
-
Integración optimizada: AsyncAPI fomenta una interacción fluida con varias aplicaciones y herramientas que suelen utilizar desarrolladores y científicos de datos.
-
Integridad de datos garantizada: al heredar la solidez principal de Oracle GoldenGate, los flujos de datos garantizan la durabilidad de los datos replicando los cambios a medida que se confirman en la base de datos de origen.
Componentes de los flujos de datos de Oracle GoldenGate
Los componentes de los flujos de datos de Oracle GoldenGate incluyen:
-
API asíncrona
-
Protocolo de flujos de datos
-
Posición de inicio/reinicio de flujos de datos
-
Registros de esquema
-
Formato de CloudEvents
API asíncrona
Oracle GoldenGate Data Streams no tiene en cuenta el lenguaje de programación para que pueda interactuar con un cliente escrito en cualquier lenguaje de programación. Aunque los programas cliente suelen ser simples y pequeños, los usuarios aún necesitan implementar manualmente el código cliente para interactuar con el servicio de transmisión de datos.
La adopción de la especificación AsyncAPI en los flujos de datos de Oracle GoldenGate tiene las siguientes ventajas:
-
Capacidad para describir la API de servicio de flujos de datos en la especificación de API estándar del sector y generar automáticamente documentación de API.
-
Generar automáticamente código del lado del cliente a través de @asyncapi/generator.
Con el soporte AsyncAPI, Oracle GoldenGate Data Streams simplifica el flujo de datos al generar el código de cliente automáticamente. Sigue el modelo de editor y suscriptor y soporta una amplia variedad de protocolos, incluidos websocket, kafka, mqtt, hms y muchos protocolos IOT. Al describir una API controlada por eventos, utiliza el lenguaje de modelado YAML y sigue una sintaxis similar para la especificación OpenAPI. Por ejemplo, a continuación se muestra un fragmento del documento asyncAPI yaml que describe las definiciones AsyncAPI de Data Streaming:
asyncapi: '3.0.0'
info:
title: Data Streaming API
version: '1.0.0'
description: \| allows clients to subscribe to a data stream
license:
name: Apache 2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0'
servers:
<deployment-url>:
protocol: ws
url: <deployment-url>:<port-number>
defaultContentType: application/json
channels:
/services/v2/stream/mystream1:
...
Cuando se crea un recurso de flujos de datos, se devuelve un enlace URL a un documento de especificación de API asíncrona personalizado que describe cómo acceder a este punto final de flujo de datos en la respuesta HTTP. Este documento YAML se puede utilizar para generar el código del lado del cliente mediante @asyncapi/generator.
Tenga en cuenta que para soportar el protocolo de websocket en @asyncapi/generator, también debe implantar/mantener la plantilla de cliente de websocket para @asyncapi/generator en GitHub. Consulte el repositorio de GitHub para obtener más información sobre la plantilla websocket-client:
https://github.com/oracle-samples/websocket-client-template
Protocolo de flujos de datos
Con los flujos de datos de Oracle GoldenGate, el acceso directo a los datos en el formato especificado por el usuario se activa a través de un canal websocket dedicado que sigue un protocolo de flujo simple.
El protocolo de flujos de datos utiliza el modo push para enviar datos al cliente. El cliente primero crea un recurso de transmisión en el servidor mediante la solicitud RESTful de HTTP. Después de crear el recurso de flujo, el cliente establece una conexión WebSocket mediante el punto final del recurso de flujo. Una vez establecido el canal de WebSocket, Data Streams comienza a transferir los datos de forma inmediata y continua sin esperar la respuesta o el reconocimiento del cliente.

El siguiente ejemplo de cliente python ilustra la interacción entre el cliente y el servicio de transmisión de datos:
import asyncio
import requests
import websockets
import json
async def client():
###create the streaming resource
payload = {"source":{"trail":"a1"}}
response = requests.post(
'https://name:pswd@<oci_godengate_console_url>:443/services/distsrvr/v2/stream/s1', json=payload)
###establish websocket connection and receive data continuously
uri = "wss://name:pswd@<oci_godengate_console_url>:443/services/v2/distsrvr/stream/s1?begin=earliest"
async with websockets.connect(uri) as websocket:
while True:
resp = await websocket.recv()
records = json.loads(resp)
for rec in records:
print(rec)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
localhost_pem = pathlib.Path(__file__).with_name("/path_to/client_key.pem")
ssl_context.load_verify_locations(localhost_pem)
ssl_context.load_cert_chain(localhost_pem)
asyncio.get_event_loop().run_until_complete(client())
En el programa de cliente proporcionado, se proporciona una carga útil de flujo de datos simple que especifica el nombre de la pista de datos de origen al crear el punto final de recurso de flujo de datos s1. En una aplicación del mundo real, se pueden utilizar cargas útiles muy complicadas de Data Stream durante la fase de establecimiento de comunicación del protocolo de transmisión para configurar el comportamiento de transmisión de datos.
Por ejemplo, la siguiente carga útil de solicitud de Data Stream especifica las reglas de filtrado, el formato de codificación y el tamaño de buffer junto con el nombre de pista de origen de datos necesario.
{
"$schema" : "ogg:dataStream",
"source" : {"trail":"a1"},
"rules" : [{
"action" : "exclude",
"filter" : {
"objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
}
}],
"encoding" : "json",
"bufferSize" : 2097152
}
Posición de inicio/reinicio de flujos de datos
Durante el establecimiento de la conexión de websocket, el cliente especifica la posición begin (como parámetro de consulta en la URL de conexión de websocket) para iniciar el flujo de datos. La posición de inicio puede ser uno de los siguientes valores:
-
Palabra clave especial "now"
-
Palabra clave especial "earliest"
-
Cadena de registro de hora de formato ISO 8601
-
Última posición de ratio de cobertura de liquidez procesada
Cada registro de LCR que no es de metadatos contiene una posición opaca (incluye CSN, XID, número de registro dentro de la transacción). El cliente es responsable de mantener la posición del último registro de LCR procesado. El servicio de flujos de datos es responsable de localizar el punto de inicio/reinicio correcto en función de la posición de inicio determinada.
Si es la primera vez que un cliente se conecta al servicio de flujos de datos, el cliente debe proporcionar un registro de hora de dónde iniciar los datos de transmisión. La palabra clave now se convertirá en el registro de hora actual y la palabra clave earliest se convertirá en el registro de hora 0.
Como alternativa, se puede utilizar una cadena de registro de hora ISO 8601 para la posición begin. En todos los casos, el servicio de flujos de datos realiza una consulta basada en registro de hora en la pista de origen para determinar la posición de inicio.
Si este es el caso de recuperación/reinicio, el cliente debe proporcionar la última posición procesada guardada al servicio de flujos de datos durante el establecimiento de comunicación. El servicio de flujos de datos realizará una consulta basada en posición en la pista de origen para determinar la posición de inicio. El comportamiento de la recuperación de transmisión de datos también depende del nivel de QoS especificado en el flujo de datos.
API de REST de flujos de datos de Oracle GoldenGate
Puede utilizar las siguientes API de Rest para gestionar flujos de datos de GoldenGate.
-
Crear una nueva configuración de flujo de datos de Oracle GoldenGate
-
Recuperar una configuración de flujo de datos de Oracle GoldenGate existente
-
Actualizar una configuración de flujo de datos de Oracle GoldenGate existente
-
Suprimir una configuración de flujo de datos de Oracle GoldenGate existente
CloudEvents
CloudEvents es una especificación para describir los datos de eventos en formatos comunes para proporcionar interoperabilidad entre servicios, plataformas y sistemas. Dado que actualmente Oracle GoldenGate Data Streams solo soporta la codificación de datos JSON, el soporte para el formato CloudEvents está limitado al formato de evento JSON. La especificación completa para el formato de evento JSON para CloudEvents se puede encontrar en:
https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json
El formato CloudEvents define la lista de atributos para describir el evento, esencialmente un sobre con un juego de atributos obligatorios y opcionales. Cuando el formato CloudEvents está activado en Oracle GoldenGate Data Streams, los registros JSON finales tendrán un aspecto similar al siguiente, donde el campo datos contiene los registros de datos originales, que son los registros DML/DDL/metadatos/esquema de Oracle GoldenGate.
{
"specversion" : "1.0",
"type" : "com.example.someevent",
"source" : "/mycontext",
"id" : "A234-1234-1234",
"datacontenttype" : "application/json",
"data: {...}
}
Agregar flujos de datos
Los flujos de datos se crean desde el servicio de distribución. Conéctese al servicio de distribución para empezar a crear un proceso de flujo de datos. Estos son los pasos para crear un flujo de datos:
-
En la página de inicio del servicio de distribución, Agregar flujos de datos (icono más) para abrir el asistente Agregar flujo de datos.

-
En la página Información de flujo de datos, introduzca un nombre de proceso de flujo de datos en el cuadro Nombre y agregue una descripción para él. Seleccione Next (Siguiente).

-
En la página Opciones de origen, proporcione los valores de las opciones que se muestran en la siguiente imagen:

-
Nombre de pista: nombre del archivo de pista de origen.
-
Subdirectorio de pista: ruta del subdirectorio donde se almacenan los archivos de pista.
-
Codificación: esta opción controla la codificación de registros para el flujo de datos. Actualmente, se admite la codificación JSON.
-
Tamaño de buffer: este valor controla el tamaño del buffer de memoria utilizado en el servicio de transmisión de datos. El servicio de transmisión de datos vaciará la cola de mensajes en memoria después de que el tamaño total de bytes supere el valor especificado y entregue los registros al cliente.
-
Calidad del servicio: esta opción define el comportamiento de supresión y recuperación de duplicados del flujo de datos. El servicio de transmisión de datos admite tres niveles de calidad de servicio.
-
Exactamente una vez: este modo es el modo más restrictivo en el que el servicio filtra cualquier registro duplicado durante la recuperación de ver un registro
RESTART OK/ABENDen las pistas de origen. Los clientes no verán registros duplicados. Si el servicio no puede localizar el registro con la última posición procesada proporcionada, se produce un error. -
Al menos una vez: este modo no suprime los registros duplicados durante la recuperación ni ve un registro
RESTART OK/ABENDen la pista de origen. Los clientes pueden ver registros duplicados en el flujo de datos. Además, si el servicio no puede localizar el registro con la última posición procesada proporcionada, se produce un error. -
Como máximo una vez: este modo suprime los registros duplicados durante la recuperación o ve un registro
RESTART OK/ABENDen la pista de origen. Los clientes no verán registros duplicados en el flujo de datos. Si el servicio no puede localizar el registro con la última posición procesada especificada, encontrará el siguiente registro disponible y avanzará.
-
-
Formato de CloudEvents: el servicio de transmisión de datos soporta la transmisión de registros de datos en formato CloudEvents. Por defecto, este formato está desactivado y se puede controlar activando la propiedad mediante el conmutador al crear el canal de transmisión de datos.
-
-
En la página Opciones de filtrado, están disponibles las opciones para incluir y excluir reglas de filtrado:

Especifique las opciones de la regla de filtrado de la siguiente forma:
-
Acción de regla: seleccione las opciones Excluir o Incluir.
-
Tipo de filtro: el tipo de filtro incluye lo siguiente:
Tipo de objeto: puede seleccionar varias opciones de la lista desplegable, incluidas DML, DDL, INSERT, UPDATE, UPSERT, DELETE.
Nombres de objeto: nombre del objeto de filtrado creado anteriormente.
Seleccione Agregar para agregar la regla de filtrado al proceso de flujo de datos.
-
-
Seleccione Crear flujo de datos. Volverá a la página inicial del servicio de distribución donde se muestra el flujo de datos.
-
Seleccione el flujo de datos recién creado para ver las definiciones AsyncAPI del documento YAML Data Streaming para el flujo.
Editar Configuración de Flujos de Datos
Para editar la configuración de Data Streams: 1. En el panel de navegación de la izquierda, seleccione Data Streams.
-
Seleccione el nombre del flujo de datos que se debe modificar.
-
En la página Flujos de Datos, utilice la columna Acción para ver los detalles del flujo de datos, suprimir un flujo de datos y cambiar su filtrado.

-
Si selecciona la opción Cambiar filtrado en el menú Acción, se muestra el cuadro de diálogo Editar reglas de filtrado de flujo de datos. En este cuadro, puede cambiar las opciones Acción de regla (Incluir, Excluir) y Tipo de filtro (Nombre de objeto o Tipo de objeto).
-
Seleccione Agregar para aplicar el filtrado.
-
Seleccione Enviar para devolver la página Flujos de Datos.
Puede editar aún más el flujo de datos seleccionando el flujo de datos en la columna Nombre. Muestra la configuración completa del flujo de datos. Utilice el icono de lápiz junto a cada configuración para cambiarlo. Puede cambiar el archivo de pista de origen que utiliza un flujo de datos, sus reglas de filtrado y la calidad del servicio. También puede utilizar el editor YAML para cambiar la configuración del flujo de datos y cargar los cambios mediante el icono Cargar Cambios situado junto al editor YAML.