Ajouter des flux de données
Oracle GoldenGate 23ai introduit les flux de données qui peuvent très simplifier les chemins de données pour les analyses en temps réel.
Note :
Cet article s'applique uniquement aux déploiements d'Oracle GoldenGate 23ai.À propos des flux de données
Oracle GoldenGate Data Streams utilise la spécification AsyncAPI pour définir des API asynchrones. Cette approche permet aux applications de s'abonner efficacement aux flux de données à l'aide d'un modèle de publication ou d'abonnement. Les mises à jour sont reçues dès que les modifications sont validées dans la base de données source, ce qui réduit la latence et simplifie le développement d'applications. En outre, les flux de données Oracle GoldenGate permettent aux utilisateurs de spécifier leur format de données préféré, tel que JSON, pour une intégration transparente aux outils et aux cadres existants dans leur environnement de développement.
-
Ingestion améliorée des données : Le modèle Publier ou s'abonner optimisé par AsyncAPI permet aux applications de recevoir efficacement les mises à jour de données en temps réel.
-
Formatage de données flexible : Les utilisateurs peuvent choisir leur format privilégié pour une intégration transparente aux outils existants.
-
Intégration simplifiée : AsyncAPI favorise une interaction fluide avec diverses applications et outils couramment utilisés par les développeurs et les experts en science des données.
-
Intégrité garantie des données : Héritant de la solidité de base d'Oracle GoldenGate, les flux de données assurent la durabilité des données en répliquant les modifications lors de leur validation dans la base de données source.
Composants des flux de données Oracle GoldenGate
-
API asynchrone
-
Protocole de flux de données
-
Position de début/redémarrage des flux de données
-
Enregistrements de schéma
-
Format CloudEvents
API asynchrone
Oracle GoldenGate Data Streams est indépendant du langage de programmation afin qu'il puisse interagir avec un client écrit dans n'importe quel langage de programmation. Même si les programmes client sont généralement simples et petits, les utilisateurs doivent toujours implémenter manuellement le code client pour interagir avec le service de diffusion de données.
L'adoption de la spécification AsyncAPI dans les flux de données Oracle GoldenGate présente les avantages suivants :
-
Permet de décrire l'API du service de flux de données dans les spécifications d'API standard de l'industrie et de générer automatiquement de la documentation sur l'API.
-
Générez automatiquement du code côté client via @asyncapi/generator.
Avec la prise en charge de AsyncAPI, Oracle GoldenGate Data Streams simplifie la diffusion en continu des données en générant automatiquement le code client. Il suit le modèle de l'éditeur et de l'abonné et prend en charge une grande variété de protocoles, notamment websocket, kafka, mqtt, hms et de nombreux protocoles IOT. Lors de la description d'une API basée sur les événements, elle utilise le langage de modélisation YAML et suit une syntaxe similaire pour la spécification OpenAPI. Par exemple, ci-dessous se trouve un extrait de document AsyncAPI yaml qui décrit les définitions AsyncAPI du service de flux de données :
asyncapi: '3.0.0'
info:
title: Data Streaming API
version: '1.0.0'
description: | allows clients to subscribe to a data stream
license:
name: Apache 2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0'
servers:
<deployment-url>:
protocol: ws
url: <deployment-url>:<port-number>
defaultContentType: application/json
channels:
/services/v2/stream/mystream1:
...
Lorsqu'une ressource de flux de données est créée, un lien URL vers un document de spécification d'API asynchrone personnalisé décrivant comment accéder à ce point d'extrémité de flux de données est retourné dans la réponse HTTP. Ce document YAML peut ensuite être utilisé pour générer le code côté client à l'aide de @asyncapi/generator.
Notez que pour prendre en charge le protocole Websocket dans @asyncapi/generator, vous devez également mettre en oeuvre/maintenir le modèle de client Websocket pour @asyncapi/generator dans GitHub. Pour plus d'informations sur le modèle websocket-client, reportez-vous au référentiel GitHub :
https://github.com/tianshu-orcl/websocket-client-template.git
Protocole de flux de données
Avec Oracle GoldenGate Data Streams, l'accès direct aux données dans le format spécifié par l'utilisateur est activé au moyen d'un canal Websocket dédié qui suit un protocole de diffusion en continu simple.
Le protocole Data Streams utilise le mode push pour envoyer des données au client. Le client crée d'abord une ressource de diffusion en continu sur le serveur au moyen d'une demande HTTP RESTful. Une fois la ressource de diffusion en continu créée, le client établit une connexion WebSocket au moyen du point d'extrémité de la ressource de diffusion en continu. Une fois le canal WebSocket établi, les flux de données commencent à pousser les données immédiatement et en continu sans attendre la réponse ou l'accusé de réception du client.

L'exemple de client python suivant illustre l'interaction entre le client et le service de diffusion de données en continu :
import asyncio
import requests
import websockets
import json
async def client():
### create the streaming resource
payload = {"source":{"trail":"a1"}}
response = requests.post(
'https://name:pswd@<oci_godengate_console_url>:443/services/distsrvr/v2/stream/s1', json=payload)
### establish websocket connection and receive data continuously
uri = "wss://name:pswd@<oci_godengate_console_url>:443/services/v2/distsrvr/stream/s1?begin=earliest"
async with websockets.connect(uri) as websocket:
while True:
resp = await websocket.recv()
records = json.loads(resp)
for rec in records:
print(rec)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
localhost_pem = pathlib.Path(__file__).with_name("/path_to/client_key.pem")
ssl_context.load_verify_locations(localhost_pem)
ssl_context.load_cert_chain(localhost_pem)
asyncio.get_event_loop().run_until_complete(client())
Dans le programme client indiqué, des données utiles de flux de données simples spécifiant le nom de la piste de données source sont fournies lors de la création du point d'extrémité de ressource de flux de données s1. Dans une application du monde réel, des données utiles de flux de données complexes peuvent être utilisées pendant la phase d'établissement de liaison du protocole de diffusion en continu pour configurer le comportement de diffusion de données.
Par exemple, les données utiles de demande de flux de données suivantes spécifient les règles de filtrage, le format d'encodage et bufferSize, ainsi que le nom de piste de source de données requis.
{
"$schema" : "ogg:dataStream",
"source" : {"trail":"a1"},
"rules" : [{
"action" : "exclude",
"filter" : {
"objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
}
}],
"encoding" : “json",
"bufferSize" : 2097152
}
Position de début/redémarrage des flux de données
Lors de l'établissement de la connexion au connecteur logiciel, le côté client indique la position de début (en tant que paramètre d'interrogation dans l'URL de connexion au connecteur logiciel) pour commencer à diffuser les données en continu. La position de début peut être l'une des valeurs suivantes :
-
Mot clé spécial "maintenant"
-
Mot clé spécial "le plus ancien"
-
Chaîne d'horodatage de format ISO 8601
-
Position du ratio de liquidité à court terme la plus récente traitée
Chaque enregistrement LCR non lié aux métadonnées contient une position opaque (comprend CSN, XID, numéro d'enregistrement dans la transaction). Le client est responsable du maintien de la position du dernier enregistrement LCR traité. Le service de flux de données est responsable de localiser le bon point de début/redémarrage en fonction de la position de début indiquée.
Si c'est la première fois qu'un client se connecte au service de flux de données, le client doit fournir un horodatage de l'endroit où commencer la diffusion des données en continu. Le mot clé maintenant sera converti en horodatage courant et le mot clé le plus ancien sera converti en horodatage 0.
Vous pouvez également utiliser une chaîne d'horodatage ISO 8601 pour la position de début. Dans tous les cas, le service de flux de données effectue une consultation basée sur un horodatage sur la piste source pour déterminer la position de début.
S'il s'agit du cas de récupération/redémarrage, le client doit fournir la dernière position traitée enregistrée au service de flux de données lors de l'établissement d'une liaison. Le service de flux de données effectuera une consultation basée sur les postes sur la piste source pour déterminer la position de début. Le comportement de la récupération en continu des données dépend également du niveau QoS spécifié dans le flux de données.
API REST du service de flux de données pour Oracle GoldenGate
Vous pouvez utiliser les API Rest suivantes pour gérer les flux de données GoldenGate.
CloudEvents
CloudEvents est une spécification permettant de décrire les données d'événement dans des formats communs afin d'assurer l'interopérabilité entre les services, les plateformes et les systèmes. Comme les flux de données Oracle GoldenGate ne prennent actuellement en charge que l'encodage de données JSON, la prise en charge du format CloudEvents est limitée au format d'événement JSON. La spécification complète du format d'événement JSON pour CloudEvents est disponible à l'adresse suivante :
https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json
data
contient les enregistrements de données initiaux, c'est-à-dire les enregistrements LMD/DL/métadonnées/schéma d'Oracle GoldenGate.{
"specversion" : "1.0",
"type" : "com.example.someevent",
"source" : "/mycontext",
"id" : "A234-1234-1234",
"datacontenttype" : "application/json",
"data: {…}
}
Ajouter des flux de données
-
Dans la page d'accueil du service de distribution, ajoutez des flux de données (icône plus) pour ouvrir l'Assistant Ajouter un flux de données.
-
Dans la page Informations sur le flux de données, entrez un nom de processus de flux de données dans la zone Nom et ajoutez une description. Cliquez sur Suivant.
-
Dans la page Source Options (Options sources), indiquez les valeurs des options affichées dans l'image suivante :
-
Nom de la piste : Nom du fichier de piste source.
-
Sous-répertoire de piste : Chemin du sous-répertoire dans lequel les fichiers de piste sont stockés.
-
Encodage : Cette option contrôle l'encodage des enregistrements pour le flux de données. Actuellement, l'encodage JSON est pris en charge.
-
Taille de la mémoire tampon : Cette valeur contrôle la taille de la mémoire tampon utilisée dans le service de diffusion de données en continu. Le service de diffusion de données en continu videra la file d'attente des messages en mémoire une fois que sa taille totale en octets dépassera la valeur spécifiée et remettra les enregistrements au client.
-
Qualité de service : Cette option définit le comportement de suppression et de récupération des doubles du flux de données. Trois niveaux de qualité de service sont pris en charge dans le service de diffusion de données en continu.
-
Exactement une fois : Ce mode est le plus restrictif dans lequel le service filtre les enregistrements en double lors de la récupération des enregistrements
RESTART OK/ABEND
dans les pistes sources. Les clients ne verront pas les enregistrements en double. Si le service ne parvient pas à localiser l'enregistrement avec le dernier poste traité indiqué, une erreur se produit. -
Au moins une fois : Ce mode ne supprime pas les enregistrements en double lors de la récupération ni les enregistrements
RESTART OK/ABEND
visibles dans la piste source. Les clients peuvent voir des enregistrements en double dans le flux de données. De plus, si le service ne peut pas localiser l'enregistrement avec le dernier poste traité indiqué, une erreur se produit. -
Au plus une fois : Ce mode supprime les enregistrements en double lors de la récupération ou la consultation d'enregistrements
RESTART OK/ABEND
dans la piste source. Les clients ne verront pas les enregistrements en double dans le flux de données. Si le service ne parvient pas à localiser l'enregistrement avec le dernier poste traité indiqué, il trouvera l'enregistrement disponible suivant et avancera.
-
-
Format CloudEvents : Le service de diffusion en continu des données prend en charge la transmission des enregistrements de données au format CloudEvents. Par défaut, ce format est désactivé et peut être contrôlé en activant la propriété à l'aide du commutateur, lors de la création du canal de diffusion de données.
-
-
Dans la page Options de filtrage, les options permettant d'inclure et d'exclure des règles de filtrage sont disponibles :
Spécifiez les options de règle de filtrage comme suit :-
Action associée à la règle : Sélectionnez les options Exclure ou Inclure.
-
Type de filtre : Le type de filtre inclut les éléments suivants :
Type d'objet : Vous pouvez sélectionner plusieurs options dans la liste déroulante, notamment LMD, DL, INSÉRER, MISE À jour, UPSERT, DELETE.
Noms d'objet : Nom de l'objet de filtrage créé précédemment.
Cliquez sur Ajouter pour ajouter la règle de filtrage au processus de flux de données.
-
-
Cliquez sur Créer un flux de données. Vous retournerez à la page d'accueil du service de distribution dans laquelle le flux de données est répertorié.
-
Cliquez sur le flux de données nouvellement créé pour voir les définitions AsyncAPI du flux de données du document YAML.
Modifier la configuration des flux de données
-
Dans le volet de navigation de gauche Distribution Service, cliquez sur Data Streams.
-
Supprimer le nom du flux de données à modifier.
-
Dans la page Flux de données, utilisez la colonne Action pour voir les détails du flux de données, supprimer un flux de données et modifier son filtrage.
-
Si vous cliquez sur l'option Modifier le filtrage dans le menu Action, la boîte de dialogue Modifier les règles de filtrage de flux de données s'affiche. À partir de cette case, vous pouvez modifier l'action de règle (inclure, exclure) et le type de filtre (nom d'objet ou type d'objet).
-
Cliquez sur Ajouter pour appliquer le filtrage.
-
Cliquez sur Soumettre pour retourner à la page Flux de données.
Vous pouvez modifier davantage le flux de données en cliquant sur celui-ci dans la colonne Nom. Affiche la configuration complète du flux de données. Utilisez l'icône en forme de crayon en regard de chaque paramètre de configuration pour le modifier. Vous pouvez modifier le fichier de piste source utilisé par un flux de données, ses règles de filtrage et sa qualité de service. Vous pouvez également utiliser l'éditeur YAML pour modifier la configuration du flux de données et charger les modifications à l'aide de l'icône Charger les modifications située à côté de l'éditeur YAML.