Agregar flujos de datos

Oracle GoldenGate 23ai presenta los flujos de datos que pueden simplificar enormemente las rutas de datos para el análisis en tiempo real.

Note:

Este artículo solo se aplica a los despliegues de Oracle GoldenGate 23ai.

Acerca de los flujos de datos

Los flujos de datos de Oracle GoldenGate utilizan la especificación AsyncAPI para definir API asíncronas. Este enfoque permite a las aplicaciones suscribirse de forma eficaz a los flujos de datos mediante un modelo de publicación o suscripción. Las actualizaciones se reciben tan pronto como se confirman los cambios en la base de datos de origen, lo que minimiza la latencia y simplifica el desarrollo de aplicaciones. Además, los flujos de datos de Oracle GoldenGate permiten a los usuarios especificar su formato de datos preferido, como JSON, para una integración perfecta con las herramientas y los marcos existentes en su entorno de desarrollo.

Beneficios para desarrolladores y científicos de datos
  • Ingesta de datos mejorada: el modelo de publicación o suscripción con tecnología de AsyncAPI permite a las aplicaciones recibir actualizaciones de datos en tiempo real de forma eficaz.

  • Formato de datos flexible: los usuarios pueden elegir el formato que prefieran para una integración perfecta con las herramientas existentes.

  • Integración optimizada: AsyncAPI fomenta una interacción fluida con varias aplicaciones y herramientas que suelen utilizar los desarrolladores y científicos de datos.

  • Integridad de datos garantizada: al heredar la solidez principal de Oracle GoldenGate, los flujos de datos garantizan la durabilidad de los datos replicando los cambios a medida que se confirman en la base de datos de origen.

Componentes de los flujos de datos de Oracle GoldenGate

Los componentes de los flujos de datos de Oracle GoldenGate incluyen:
  • API asíncrona

  • Protocolo de flujos de datos

  • Posición de inicio/reinicio de flujos de datos

  • Registros de esquema

  • Formato de CloudEvents

API asíncrona

Oracle GoldenGate Data Streams no tiene en cuenta el lenguaje de programación para que pueda interactuar con un cliente escrito en cualquier lenguaje de programación. Aunque los programas cliente suelen ser simples y pequeños, los usuarios aún necesitan implementar manualmente el código cliente para interactuar con el servicio de transmisión de datos.

La adopción de la especificación AsyncAPI en los flujos de datos de Oracle GoldenGate tiene las siguientes ventajas:

  • Capacidad para describir la API de servicio de flujos de datos en la especificación de API estándar del sector y generar automáticamente documentación de API.

  • Genere automáticamente el código del cliente a través de @asyncapi/generator.

Con el soporte AsyncAPI, los flujos de datos de Oracle GoldenGate simplifican el flujo de datos generando el código de cliente automáticamente. Sigue el modelo de editor y suscriptor y soporta una amplia variedad de protocolos que incluyen websocket, kafka, mqtt, hms y muchos protocolos IOT. Al describir una API controlada por eventos, utiliza el lenguaje de modelado YAML y sigue una sintaxis similar para la especificación OpenAPI. Por ejemplo, a continuación se muestra un fragmento del documento AsyncAPI yaml que describe las definiciones AsyncAPI de Data Streaming:

asyncapi: '3.0.0'
info:
  title: Data Streaming API
  version: '1.0.0'
  description: | allows clients to subscribe to a data stream
  license:
    name: Apache 2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0'

servers:
  <deployment-url>:
    protocol: ws
url: <deployment-url>:<port-number>

defaultContentType: application/json

channels:
/services/v2/stream/mystream1:
...

Cuando se crea un recurso de flujos de datos, se devuelve un enlace URL a un documento de especificación de API asíncrona personalizado que describe cómo acceder a este punto final de flujo de datos en la respuesta HTTP. Este documento YAML se puede utilizar para generar el código del lado del cliente mediante @asyncapi/generator.

Tenga en cuenta que para admitir el protocolo WebSocket en @asyncapi/generator, también debe implementar/mantener la plantilla de cliente WebSocket para @asyncapi/generator en GitHub. Consulte el repositorio GitHub para obtener más información sobre la plantilla websocket-client:

https://github.com/tianshu-orcl/websocket-client-template.git

Protocolo de flujos de datos

Con los flujos de datos de Oracle GoldenGate, el acceso directo a los datos en el formato especificado por el usuario se activa a través de un canal websocket dedicado que sigue un protocolo de flujo simple.

El protocolo de flujos de datos utiliza el modo push para enviar datos al cliente. El cliente primero crea un recurso de flujo en el servidor mediante la solicitud HTTP RESTful. Después de crear el recurso de flujo, el cliente establece una conexión WebSocket a través del punto final del recurso de flujo. Una vez establecido el canal WebSocket, los flujos de datos comienzan a transferir los datos de forma inmediata y continua sin esperar la respuesta o el reconocimiento del cliente.


Protocolo de flujos de datos

El siguiente ejemplo de cliente python ilustra la interacción entre el cliente y el servicio de transmisión de datos:

import asyncio
import requests
import websockets
import json

async def client():
    ### create the streaming resource
    payload = {"source":{"trail":"a1"}}
    response = requests.post(
  'https://name:pswd@<oci_godengate_console_url>:443/services/distsrvr/v2/stream/s1', json=payload)

### establish websocket connection and receive data continuously
uri = "wss://name:pswd@<oci_godengate_console_url>:443/services/v2/distsrvr/stream/s1?begin=earliest"
async with websockets.connect(uri) as websocket:
        while True:
            resp = await websocket.recv()
            records = json.loads(resp)
            for rec in records:
                print(rec)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
localhost_pem = pathlib.Path(__file__).with_name("/path_to/client_key.pem")
ssl_context.load_verify_locations(localhost_pem)
ssl_context.load_cert_chain(localhost_pem)
asyncio.get_event_loop().run_until_complete(client())

En el programa de cliente proporcionado, se proporciona una carga útil de flujo de datos simple que especifica el nombre de la pista de datos de origen al crear el punto final de recurso de flujo de datos s1. En una aplicación del mundo real, se pueden utilizar cargas útiles muy complicadas de Data Stream durante la fase de establecimiento de comunicación del protocolo de transmisión para configurar el comportamiento de transmisión de datos.

Por ejemplo, la siguiente carga útil de solicitud de flujo de datos especifica las reglas de filtrado, el formato de codificación y bufferSize junto con el nombre de pista de origen de datos necesario.

{   
    "$schema"    : "ogg:dataStream",
    "source"     : {"trail":"a1"},
    "rules"      : [{
        "action" : "exclude",
        "filter" : {
            "objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
        }
    }],
    "encoding"   : “json",
    "bufferSize" : 2097152
}
Posición de inicio/reinicio de flujos de datos

Durante el establecimiento de la conexión de websocket, el cliente especifica la posición begin (como parámetro de consulta en la URL de conexión de websocket) para iniciar el flujo de datos. La posición de inicio puede ser uno de los siguientes valores:

  • Palabra clave especial "now"

  • Palabra clave especial "earliest"

  • Cadena de registro de hora de formato ISO 8601

  • Última posición de ratio de cobertura de liquidez procesada

Cada registro de LCR que no es de metadatos contiene una posición opaca (incluye CSN, XID, número de registro dentro de la transacción). El cliente es responsable de mantener la posición del último registro de LCR procesado. El servicio de flujos de datos es responsable de localizar el punto de inicio/reinicio correcto en función de la posición de inicio determinada.

Si es la primera vez que un cliente se conecta al servicio de flujos de datos, el cliente debe proporcionar un registro de hora de dónde iniciar los datos de transmisión. La palabra clave now se convertirá en el registro de hora actual y la palabra clave earliest se convertirá en el registro de hora 0.

Como alternativa, se puede utilizar una cadena de registro de hora ISO 8601 para la posición begin. En todos los casos, el servicio de flujos de datos realiza una consulta basada en registro de hora en la pista de origen para determinar la posición de inicio.

Si este es el caso de recuperación/reinicio, el cliente debe proporcionar la última posición procesada guardada al servicio de flujos de datos durante el establecimiento de comunicación. El servicio de flujos de datos realizará una consulta basada en posición en la pista de origen para determinar la posición de inicio. El comportamiento de la recuperación de flujo de datos también depende del nivel QoS especificado en el flujo de datos.

CloudEvents

CloudEvents es una especificación para describir los datos de eventos en formatos comunes para proporcionar interoperabilidad entre servicios, plataformas y sistemas. Dado que actualmente Oracle GoldenGate Data Streams solo soporta la codificación de datos JSON, el soporte para el formato CloudEvents está limitado al formato de evento JSON. La especificación completa para el formato de evento JSON para CloudEvents se puede encontrar en:

https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md

https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md

https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json

El formato CloudEvents define la lista de atributos para describir el evento, esencialmente un sobre con un juego de atributos obligatorios y opcionales. Cuando el formato CloudEvents está activado en Oracle GoldenGate Data Streams, los registros JSON finales tendrán un aspecto similar al siguiente, donde el campo data contiene los registros de datos originales, que son los registros DML/DDL/metadata/schema de Oracle GoldenGate.
{
    "specversion" : "1.0",
    "type" : "com.example.someevent",
    "source" : "/mycontext",
    "id" : "A234-1234-1234",
    "datacontenttype" : "application/json",
    "data: {…}
}

Agregar flujos de datos

Los flujos de datos se crean desde el servicio de distribución. Conéctese al servicio de distribución para empezar a crear un proceso de flujo de datos. Estos son los pasos para crear un flujo de datos:
  1. En la página de inicio del servicio de distribución, Agregar flujos de datos (icono más) para abrir el asistente Agregar flujo de datos.


    Página inicial del servicio de distribución con la sección Data Streams

  2. En la página Información de flujo de datos, introduzca un nombre de proceso de flujo de datos en el cuadro Nombre y agregue una descripción para él. Haga clic en Siguiente.


    Cuadro de diálogo Add Data Streams: página Data Stream Information

  3. En la página Opciones de origen, proporcione los valores de las opciones que se muestran en la siguiente imagen:


    Cuadro de diálogo Data Stream: página Source Options

    • Nombre de pista: nombre del archivo de pista de origen.

    • Subdirectorio de pista: ruta del subdirectorio donde se almacenan los archivos de pista.

    • Codificación: esta opción controla la codificación de registros para el flujo de datos. Actualmente, se admite la codificación JSON.

    • Tamaño de buffer: este valor controla el tamaño del buffer de memoria utilizado en el servicio de transmisión de datos. El servicio de transmisión de datos vaciará la cola de mensajes en memoria después de que el tamaño total de bytes supere el valor especificado y entregue los registros al cliente.

    • Calidad del servicio: esta opción define el comportamiento de supresión y recuperación de duplicados del flujo de datos. El servicio de transmisión de datos admite tres niveles de calidad de servicio.
      • Exactamente una vez: este modo es el modo más restrictivo en el que el servicio filtra cualquier registro duplicado durante la recuperación de ver un registro RESTART OK/ABEND en las pistas de origen. Los clientes no verán registros duplicados. Si el servicio no puede localizar el registro con la última posición procesada proporcionada, se produce un error.

      • Al menos una vez: este modo no suprime los registros duplicados durante la recuperación ni ve un registro RESTART OK/ABEND en la pista de origen. Los clientes pueden ver registros duplicados en el flujo de datos. Además, si el servicio no puede localizar el registro con la última posición procesada proporcionada, se produce un error.

      • Como máximo una vez: este modo suprime los registros duplicados durante la recuperación o ve un registro RESTART OK/ABEND en la pista de origen. Los clientes no verán registros duplicados en el flujo de datos. Si el servicio no puede localizar el registro con la última posición procesada indicada, encontrará el siguiente registro disponible y avanzará.

    • Formato CloudEvents: el servicio de transmisión de datos admite la transmisión de los registros de datos en formato CloudEvents. Por defecto, este formato está desactivado y se puede controlar activando la propiedad mediante el conmutador al crear el canal de transmisión de datos.

  4. En la página Opciones de filtrado, están disponibles las opciones para incluir y excluir reglas de filtrado:

    Cuadro de diálogo Data Stream y opciones de filtrado disponibles.

    Especifique las opciones de la regla de filtrado de la siguiente forma:
    • Acción de regla: seleccione las opciones Excluir o Incluir.

    • Tipo de filtro: el tipo de filtro incluye lo siguiente:

      Tipo de objeto: puede seleccionar varias opciones de la lista desplegable, incluidas DML, DDL, INSERT, UPDATE, UPSERT, DELETE.

      Nombres de objeto: nombre del objeto de filtrado creado anteriormente.

      Haga clic en Agregar para agregar la regla de filtrado al proceso de flujo de datos.

  5. Haga clic en Crear flujo de datos. Volverá a la página inicial del servicio de distribución donde se muestra el flujo de datos.

  6. Haga clic en el flujo de datos recién creado para ver las definiciones AsyncAPI del flujo de datos del documento de YAML para el flujo.

Editar Configuración de Flujos de Datos

Para editar la configuración de Data Streams:
  1. En el panel de navegación de la izquierda Distribution Service, haga clic en Data Streams.

  2. Suprima el nombre del flujo de datos que se debe modificar.

  3. En la página Flujos de Datos, utilice la columna Acción para ver los detalles del flujo de datos, suprimir un flujo de datos y cambiar su filtrado.

    Edición de la configuración del flujo de datos

  4. Si hace clic en la opción Cambiar filtrado del menú Acción, se muestra el cuadro de diálogo Editar reglas de filtrado de flujo de datos. En este cuadro, puede cambiar las opciones Acción de regla (Incluir, Excluir) y Tipo de filtro (Nombre de objeto o Tipo de objeto).

  5. Haga clic en Agregar para aplicar el filtrado.

  6. Haga clic en Enviar para devolver la página Flujos de Datos.

Puede editar más el flujo de datos haciendo clic en el flujo de datos de la columna Nombre. Muestra la configuración completa del flujo de datos. Utilice el icono de lápiz situado junto a cada configuración para cambiarlo. Puede cambiar el archivo de pista de origen que utiliza un flujo de datos, sus reglas de filtrado y la calidad del servicio. También puede utilizar el editor YAML para cambiar la configuración del flujo de datos y cargar los cambios mediante el icono Cargar Cambios situado junto al editor YAML.