Adicionar Streams de Dados
O Oracle GoldenGate 23ai apresenta fluxos de dados que podem simplificar altamente os caminhos de dados para análise em tempo real.
Observação:
Este artigo só se aplica a implantações do Oracle GoldenGate 23ai.Sobre Fluxos de Dados
O Oracle GoldenGate Data Streams utiliza a especificação AsyncAPI para definir APIs assíncronas. Essa abordagem permite que os aplicativos se inscrevam com eficiência em fluxos de dados usando um modelo Publicar ou Inscrever. As atualizações são recebidas assim que as alterações são confirmadas no banco de dados de origem, minimizando a latência e simplificando o desenvolvimento de aplicativos. Além disso, o Oracle GoldenGate Data Streams permite que os usuários especifiquem seu formato de dados preferido, como JSON, para integração perfeita com ferramentas e estruturas existentes em seu ambiente de desenvolvimento.
-
Ingestão Aprimorada de Dados: O modelo Publicar ou Inscrever com a tecnologia AsyncAPI permite que os aplicativos recebam atualizações de dados em tempo real com eficiência.
-
Formatação Flexível de Dados: os usuários podem escolher seu formato preferido para integração perfeita com as ferramentas existentes.
-
Integração Simplificada: AsyncAPI promove uma interação suave com vários aplicativos e ferramentas comumente usados por desenvolvedores e cientistas de dados.
-
Integridade de Dados Garantida: Herdando a força central do Oracle GoldenGate, o serviço Data Streams garante a durabilidade dos dados replicando as alterações à medida que elas são confirmadas no banco de dados de origem.
Componentes do Oracle GoldenGate Data Streams
-
API assíncrona
-
Protocolo do Serviço Data Streams
-
Posição de Início/Reinício dos Fluxos de Dados
-
Registros de Esquemas
-
Formato CloudEvents
API assíncrona
O Oracle GoldenGate Data Streams é independente da linguagem de programação para que possa interagir com um cliente gravado em qualquer linguagem de programação. Mesmo que os programas clientes sejam geralmente simples e pequenos, os usuários ainda precisam implementar manualmente o código do cliente para interagir com o serviço de streaming de dados.
A adoção da especificação AsyncAPI no Oracle GoldenGate Data Streams tem as seguintes vantagens:
-
Capacidade de descrever a API do serviço de fluxos de dados na especificação de API padrão do setor e gerar automaticamente a documentação da API.
-
Gere automaticamente o código do lado do cliente via @asyncapi/generator.
Com suporte ao AsyncAPI, o Oracle GoldenGate Data Streams simplifica o streaming de dados gerando o código do cliente automaticamente. Ele segue o modelo de editor e assinante e suporta uma ampla variedade de protocolos, incluindo websocket, kafka, mqtt, hms e muitos protocolos IOT. Ao descrever uma API orientada a eventos, ela usa a linguagem de modelagem YAML e segue uma sintaxe semelhante para a especificação OpenAPI. Por exemplo, abaixo há um trecho de código do documento yaml AsyncAPI que descreve as definições do serviço Data Streaming AsyncAPI:
asyncapi: '3.0.0'
info:
title: Data Streaming API
version: '1.0.0'
description: | allows clients to subscribe to a data stream
license:
name: Apache 2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0'
servers:
<deployment-url>:
protocol: ws
url: <deployment-url>:<port-number>
defaultContentType: application/json
channels:
/services/v2/stream/mystream1:
...
Quando um recurso de fluxos de dados é criado, um link de URL para um documento de especificação de API Assíncrona personalizado que descreve como acessar esse ponto final de fluxo de dados é retornado na resposta HTTP. Esse documento YAML pode ser usado para gerar o código do lado do cliente usando @asyncapi/generator.
Observe que, para suportar o protocolo websocket no @asyncapi/generator, você também precisa implementar/manter o modelo do cliente websocket para o @asyncapi/generator no GitHub. Consulte o repositório GitHub para obter mais informações sobre o modelo websocket-client-template:
https://github.com/tianshu-orcl/websocket-client-template.git
Protocolo do Serviço Data Streams
Com o Oracle GoldenGate Data Streams, o acesso direto aos dados no formato especificado pelo usuário é ativado por meio de um canal de websocket dedicado que segue um protocolo de streaming simples.
O protocolo do serviço Data Streams usa o modo de envio para enviar dados ao cliente. O cliente primeiro cria um recurso de streaming no servidor por meio da solicitação HTTP RESTful. Após a criação do recurso de streaming, o cliente estabelece uma conexão WebSocket por meio do ponto final do recurso de streaming. Após o canal WebSocket ser estabelecido, o serviço Data Streams começa a enviar os dados de forma imediata e contínua, sem aguardar resposta ou confirmação do cliente.

O seguinte exemplo de cliente python ilustra a interação entre o cliente e o serviço de streaming de dados:
import asyncio
import requests
import websockets
import json
async def client():
### create the streaming resource
payload = {"source":{"trail":"a1"}}
response = requests.post(
'https://name:pswd@<oci_godengate_console_url>:443/services/distsrvr/v2/stream/s1', json=payload)
### establish websocket connection and receive data continuously
uri = "wss://name:pswd@<oci_godengate_console_url>:443/services/v2/distsrvr/stream/s1?begin=earliest"
async with websockets.connect(uri) as websocket:
while True:
resp = await websocket.recv()
records = json.loads(resp)
for rec in records:
print(rec)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
localhost_pem = pathlib.Path(__file__).with_name("/path_to/client_key.pem")
ssl_context.load_verify_locations(localhost_pem)
ssl_context.load_cert_chain(localhost_pem)
asyncio.get_event_loop().run_until_complete(client())
No programa cliente fornecido, um payload simples do Data Stream que especifica o nome da trilha de dados de origem é fornecido ao criar o ponto final do recurso do fluxo de dados s1. Em um aplicativo do mundo real, cargas úteis muito complicadas do Data Stream podem ser usadas durante a fase de handshake do protocolo de streaming para configurar o comportamento de streaming de dados.
Por exemplo, o payload de solicitação do Fluxo de Dados a seguir especifica as regras de filtragem, o formato de codificação e o bufferSize, juntamente com o nome da trilha da origem de dados necessária.
{
"$schema" : "ogg:dataStream",
"source" : {"trail":"a1"},
"rules" : [{
"action" : "exclude",
"filter" : {
"objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
}
}],
"encoding" : “json",
"bufferSize" : 2097152
}
Posição de Início/Reinício dos Fluxos de Dados
Durante o estabelecimento da conexão do websocket, o cliente especifica a posição inicial (como um parâmetro de consulta no URL da conexão do websocket) para iniciar o streaming dos dados. A posição inicial pode ser um dos seguintes valores:
-
Palavra-chave especial "now"
-
Palavra-chave especial "primeiro"
-
String de timestamp do formato ISO 8601
-
Última posição LCR processada
Cada registro LCR não de metadados contém uma posição opaca (inclui CSN, XID, número de registro dentro da transação). O cliente é responsável por manter a posição do último registro LCR processado. O serviço de fluxos de dados é responsável por localizar o ponto de início/reinício correto com base na posição inicial fornecida.
Se esta for a primeira vez que um cliente se conecta ao serviço de fluxos de dados, o cliente deverá fornecer um timestamp de onde iniciar o streaming de dados. A palavra-chave now será convertida para o carimbo de data/hora atual e a palavra-chave earliest será convertida para o carimbo de data/hora 0.
Como alternativa, uma string de timestamp ISO 8601 pode ser usada para a posição begin. Em todos os casos, o serviço de fluxos de dados executa uma pesquisa baseada em timestamp na trilha de origem para determinar a posição inicial.
Se este for o caso de recuperação/reinicialização, o cliente deverá fornecer a última posição processada salva ao serviço de fluxos de dados durante o handshake. O serviço de fluxos de dados executará uma pesquisa baseada em posição na trilha de origem para determinar a posição inicial. O comportamento da recuperação de streaming de dados também depende do nível QoS especificado no fluxo de dados.
APIs REST do Oracle GoldenGate Data Streams
Você pode usar as APIs Rest a seguir para gerenciar o Fluxo de Dados GoldenGate.
CloudEvents
CloudEvents é uma especificação para descrever dados de eventos em formatos comuns a fim de fornecer interoperabilidade entre serviços, plataformas e sistemas. Como no momento o Oracle GoldenGate Data Streams só suporta codificação de dados JSON, o suporte para o formato CloudEvents é limitado ao formato de evento JSON. A especificação completa para o Formato de Evento JSON para CloudEvents pode ser encontrada em:
https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json
data
contém os registros de dados originais, que são registros DML/DDL/metadados/esquema do Oracle GoldenGate.{
"specversion" : "1.0",
"type" : "com.example.someevent",
"source" : "/mycontext",
"id" : "A234-1234-1234",
"datacontenttype" : "application/json",
"data: {…}
}
Adicionar Streams de Dados
-
Na home page do Serviço de Distribuição, Adicionar Streams de Dados (ícone de mais) para abrir o assistente Adicionar Stream de Dados.
-
Na página Informações do Fluxo de Dados, informe um nome de processo do Fluxo de Dados na caixa Nome e adicione uma descrição para ele. Clique emPróximo.
-
Na página Opções de Origem, forneça os valores das opções mostradas na seguinte imagem:
-
Nome da Trilha: Nome do arquivo de trilha de origem.
-
Subdiretório de Trilha: O caminho do subdiretório em que os arquivos de trilha são armazenados.
-
Codificação: Esta opção controla a codificação de registros para o fluxo de dados. Atualmente, a codificação JSON é suportada.
-
Tamanho do Buffer: Esse valor controla o tamanho do buffer de memória usado no serviço de streaming de dados. O serviço de streaming de dados descarregará a fila de mensagens na memória depois que o tamanho total do byte exceder o valor especificado e entregar os registros ao cliente.
-
Qualidade de Serviço: Esta opção define o comportamento duplicado de supressão e recuperação do fluxo de dados. Três níveis de qualidade de serviço são suportados no serviço de streaming de dados.
-
Exatamente Uma Vez: esse modo é o mais restritivo em que o serviço filtra todos os registros duplicados durante a recuperação de ver registros
RESTART OK/ABEND
nas trilhas de origem. Os clientes não verão registros duplicados. Se o serviço não puder localizar o registro com a última posição processada fornecida, ocorrerá um erro. -
Pelo Menos uma Vez: Esse modo não suprime registros duplicados durante a recuperação nem vê um registro
RESTART OK/ABEND
na trilha de origem. Os clientes podem ver registros duplicados no fluxo de dados. Além disso, se o serviço não puder localizar o registro com a última posição processada fornecida, ocorrerá um erro. -
No Máximo Uma Vez: Esse modo suprime registros duplicados durante a recuperação ou vê um registro
RESTART OK/ABEND
na trilha de origem. Os clientes não verão registros duplicados no fluxo de dados. Se o serviço não conseguir localizar o registro com a última posição processada fornecida, ele encontrará o próximo registro disponível e avançará.
-
-
Formato CloudEvents: O serviço de streaming de dados suporta a transmissão dos registros de dados no formato CloudEvents. Por padrão, esse formato é desativado e pode ser controlado ativando a propriedade usando o switch de alternância ao criar o canal de streaming de dados.
-
-
Na página Opções de Filtro, as opções para incluir e excluir regras de filtragem estão disponíveis:
Especifique as opções da regra de filtragem da seguinte forma:-
Ação da Regra: Selecione as opções Excluir ou Incluir.
-
Tipo de Filtro: O tipo de filtro inclui o seguinte:
Tipo de Objeto: você pode selecionar várias opções na lista drop-down, incluindo DML, DDL, INSERT, UPDATE, UPSERT, DELETE.
Nomes de Objeto: Nome do objeto de filtragem criado anteriormente.
Clique em Adicionar para adicionar a regra de filtragem ao processo do Fluxo de Dados.
-
-
Clique em Criar Fluxo de Dados. Você retornará à home page do Serviço de Distribuição na qual o Fluxo de Dados está listado.
-
Clique no Fluxo de Dados recém-criado para exibir as definições AsyncAPI do documento YAML Data Streaming para o fluxo.
Editar Configuração do Serviço Data Streams
-
No painel de navegação esquerdo do Distribution Service, clique em Data Streams.
-
Selecione o nome do fluxo de dados que precisa ser modificado.
-
Na página Fluxos de Dados, use a coluna Ação para exibir detalhes do fluxo de dados, excluir um fluxo de dados e alterar sua filtragem.
-
Se você clicar na opção Alterar Filtragem no menu Ação, a caixa de diálogo Editar Regras de Filtragem do Fluxo de Dados será exibida. Nessa caixa, você pode alterar a Ação da Regra (Incluir, Excluir) e o Tipo de Filtro (Nome do Objeto ou Tipo de Objeto).
-
Clique em Adicionar para aplicar a filtragem.
-
Clique em Submeter para retornar a página Fluxos de Dados.
Você pode editar ainda mais o fluxo de dados clicando no fluxo de dados na coluna Nome. Isso exibe a configuração completa do fluxo de dados. Use o ícone de lápis ao lado de cada definição de configuração para alterá-la. Você pode alterar o arquivo de trilha de origem usado por um fluxo de dados, suas regras de filtragem e a qualidade do serviço. Você também pode usar o editor YAML para alterar a configuração do fluxo de dados e fazer upload de alterações usando o ícone Fazer Upload de Alterações ao lado do Editor YAML.