Ajouter des flux de données

Oracle GoldenGate 23ai introduit des flux de données qui peuvent simplifier considérablement les chemins de données pour les analyses en temps réel.

Remarques :

Cet article s'applique uniquement aux déploiements Oracle GoldenGate 23ai.

A propos des flux de données

Oracle GoldenGate Data Streams utilise la spécification AsyncAPI pour définir des API asynchrones. Cette approche permet aux applications de s'abonner efficacement aux flux de données à l'aide d'un modèle de publication ou d'abonnement. Les mises à jour sont reçues dès que les modifications sont validées dans la base de données source, ce qui réduit la latence et simplifie le développement des applications. En outre, les flux de données Oracle GoldenGate permettent aux utilisateurs d'indiquer leur format de données préféré, tel que JSON, pour une intégration transparente avec les outils et structures existants dans leur environnement de développement.

Avantages pour les développeurs et les data scientists
  • Ingestion améliorée de données : le modèle de publication ou d'abonnement optimisé par AsyncAPI permet aux applications de recevoir efficacement des mises à jour de données en temps réel.

  • Formatage flexible des données : les utilisateurs peuvent choisir leur format préféré pour une intégration transparente avec les outils existants.

  • Intégration simplifiée : AsyncAPI favorise une interaction fluide avec diverses applications et outils couramment utilisés par les développeurs et les data scientists.

  • Intégrité des données garantie : héritant de la force principale d'Oracle GoldenGate, Data Streams assure la durabilité des données en répliquant les modifications au fur et à mesure de leur validation dans la base de données source.

Composants des flux de données Oracle GoldenGate

Les composants d'Oracle GoldenGate Data Streams sont les suivants :
  • API asynchrone

  • Protocole Data Streams

  • Position de démarrage/redémarrage des flux de données

  • Enregistrements de schéma

  • Format CloudEvents

API asynchrone

Oracle GoldenGate Data Streams fonctionne avec tous les langages de programmation afin de pouvoir interagir avec un client écrit dans n'importe quel langage de programmation. Même si les programmes client sont généralement simples et petits, les utilisateurs doivent toujours implémenter manuellement le code client pour interagir avec le service de transmission en continu de données.

L'adoption de la spécification AsyncAPI dans Oracle GoldenGate Data Streams présente les avantages suivants :

  • Permet de décrire l'API du service de flux de données dans la spécification d'API standard et de générer automatiquement la documentation de l'API.

  • Générer automatiquement du code côté client via @asyncapi/generator.

Avec la prise en charge de AsyncAPI, Oracle GoldenGate Data Streams simplifie la transmission en continu des données en générant automatiquement le code client. Il suit le modèle d'éditeur et d'abonné et prend en charge une grande variété de protocoles, y compris websocket, kafka, mqtt, hms et de nombreux protocoles IOT. Lors de la description d'une API orientée événement, elle utilise le langage de modélisation YAML et suit une syntaxe similaire pour la spécification OpenAPI. Par exemple, voici un fragment de code du document AsyncAPI yaml qui décrit les définitions AsyncAPI de Data Streaming :

asyncapi: '3.0.0'
info:
  title: Data Streaming API
  version: '1.0.0'
  description: | allows clients to subscribe to a data stream
  license:
    name: Apache 2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0'

servers:
  <deployment-url>:
    protocol: ws
url: <deployment-url>:<port-number>

defaultContentType: application/json

channels:
/services/v2/stream/mystream1:
...

Lorsqu'une ressource de flux de données est créée, un lien URL vers un document de spécification d'API asynchrone personnalisé décrivant comment accéder à cette adresse de flux de données est renvoyé dans la réponse HTTP. Ce document YAML peut ensuite être utilisé pour générer le code côté client à l'aide de @asyncapi/generator.

Notez que pour prendre en charge le protocole websocket dans @asyncapi/generator, vous devez également implémenter/gérer le modèle client websocket pour @asyncapi/generator dans GitHub. Reportez-vous au référentiel GitHub pour plus d'informations sur le modèle client websocket :

https://github.com/tianshu-orcl/websocket-client-template.git

Protocole Data Streams

Avec Oracle GoldenGate Data Streams, l'accès direct aux données au format indiqué par l'utilisateur est activé via un canal de socket Web dédié qui suit un protocole de transmission en continu simple.

Le protocole Data Streams utilise le mode Push pour envoyer des données au client. Le client crée d'abord une ressource de transmission en continu sur le serveur via la demande RESTful HTTP. Une fois la ressource de transmission en continu créée, le client établit une connexion WebSocket via l'adresse de ressource de transmission en continu. Une fois le canal WebSocket établi, Data Streams commence à transmettre les données immédiatement et en continu sans attendre la réponse ou l'accusé de réception du client.


Protocole Data Streams

L'exemple de client python suivant illustre l'interaction entre le client et le service de transmission en continu de données :

import asyncio
import requests
import websockets
import json

async def client():
    ### create the streaming resource
    payload = {"source":{"trail":"a1"}}
    response = requests.post(
  'https://name:pswd@<oci_godengate_console_url>:443/services/distsrvr/v2/stream/s1', json=payload)

### establish websocket connection and receive data continuously
uri = "wss://name:pswd@<oci_godengate_console_url>:443/services/v2/distsrvr/stream/s1?begin=earliest"
async with websockets.connect(uri) as websocket:
        while True:
            resp = await websocket.recv()
            records = json.loads(resp)
            for rec in records:
                print(rec)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
localhost_pem = pathlib.Path(__file__).with_name("/path_to/client_key.pem")
ssl_context.load_verify_locations(localhost_pem)
ssl_context.load_cert_chain(localhost_pem)
asyncio.get_event_loop().run_until_complete(client())

Dans le programme client indiqué, une charge utile de flux de données simple indiquant le nom de trace de données source est fournie lors de la création de l'adresse de ressource de flux de données s1. Dans une application du monde réel, des charges utiles de flux de données très complexes peuvent être utilisées pendant la phase d'établissement de liaison du protocole de transmission en continu pour configurer le comportement de transmission en continu de données.

Par exemple, la charge utile de demande de flux de données suivante indique les règles de filtrage, le format d'encodage et bufferSize, ainsi que le nom de trace de source de données requis.

{   
    "$schema"    : "ogg:dataStream",
    "source"     : {"trail":"a1"},
    "rules"      : [{
        "action" : "exclude",
        "filter" : {
            "objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
        }
    }],
    "encoding"   : “json",
    "bufferSize" : 2097152
}
Position de démarrage/redémarrage des flux de données

Pendant l'établissement de la connexion websocket, le côté client spécifie la position begin (en tant que paramètre de requête dans l'URL de connexion websocket) pour commencer à diffuser les données. La position de début peut être l'une des valeurs suivantes :

  • Mot-clé spécial "now"

  • Mot-clé spécial "earliest"

  • Chaîne d'horodatage au format ISO 8601

  • Dernier poste LCR traité

Chaque enregistrement LCR sans métadonnées contient une position opaque (comprend CSN, XID, numéro d'enregistrement dans la transaction). Le côté client est responsable de la gestion de la position du dernier enregistrement LCR traité. Le service de flux de données est responsable de la localisation du point de démarrage/redémarrage correct en fonction de la position de début donnée.

Si c'est la première fois qu'un client se connecte au service de flux de données, le client doit fournir un horodatage indiquant l'endroit où commencer la transmission en continu des données. Le mot-clé now est converti en horodatage en cours et le mot-clé earliest est converti en horodatage 0.

Une chaîne d'horodatage ISO 8601 peut également être utilisée pour la position begin. Dans tous les cas, le service de flux de données effectue une recherche basée sur l'horodatage sur la trace source pour déterminer la position de début.

S'il s'agit du cas de récupération/redémarrage, le client doit fournir la dernière position traitée enregistrée au service de flux de données lors de l'établissement de liaison. Le service de flux de données effectue une recherche basée sur la position sur la trace source pour déterminer la position de début. Le comportement de la récupération de transmission en continu de données dépend également du niveau QoS indiqué dans le flux de données.

CloudEvents

CloudEvents est une spécification pour décrire les données d'événement dans des formats courants afin de fournir une interopérabilité entre les services, les plates-formes et les systèmes. Comme Oracle GoldenGate Data Streams ne prend actuellement en charge que l'encodage de données JSON, la prise en charge du format CloudEvents est limitée au format d'événement JSON. La spécification complète pour le format d'événement JSON pour CloudEvents est disponible à l'adresse suivante :

https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md

https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md

https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json

Le format CloudEvents définit la liste des attributs pour décrire l'événement. Il s'agit essentiellement d'une enveloppe avec un ensemble d'attributs obligatoires et facultatifs. Lorsque le format CloudEvents est activé dans les flux de données Oracle GoldenGate, les enregistrements JSON finaux se présentent comme suit, où le champ data contient les enregistrements de données d'origine, à savoir les enregistrements LMD/DLD/métadonnées/schéma Oracle GoldenGate.
{
    "specversion" : "1.0",
    "type" : "com.example.someevent",
    "source" : "/mycontext",
    "id" : "A234-1234-1234",
    "datacontenttype" : "application/json",
    "data: {…}
}

Ajouter des flux de données

Les flux de données sont créés à partir du service de distribution. Connectez-vous au service de distribution pour commencer à créer un processus de flux de données. Pour créer un flux de données, procédez comme suit :
  1. Sur la page d'accueil du service de distribution, Ajoutez des flux de données (icône Plus) pour ouvrir l'assistant Ajouter un flux de données.


    Page d'accueil du service de distribution avec la section Data Streams

  2. Sur la page Informations sur le flux de données, entrez un nom de processus de flux de données dans la zone Nom et ajoutez une description pour ce processus. Cliquez sur Suivant.


    Boîte de dialogue Ajouter des flux de données : page Informations sur le flux de données

  3. Sur la page Options source, indiquez les valeurs des options affichées dans l'image suivante :


    Boîte de dialogue Flux de données : page Options source

    • Nom de trace : nom du fichier trace source.

    • Sous-répertoire de trace : chemin du sous-répertoire dans lequel les fichiers trace sont stockés.

    • Encodage : Cette option contrôle le codage des enregistrements pour le flux de données. Actuellement, l'encodage JSON est pris en charge.

    • Taille de tampon : cette valeur contrôle la taille de tampon de mémoire utilisée dans le service de transmission en continu de données. Le service Data Streaming vide la file d'attente des messages en mémoire une fois que sa taille totale d'octets dépasse la valeur indiquée et transmet les enregistrements au client.

    • Qualité de service : Cette option définit le comportement de suppression et de récupération des doublons de flux de données. Trois niveaux de qualité de service sont pris en charge dans le service de transmission en continu de données.
      • Une seule fois : ce mode est le mode le plus restrictif dans lequel le service filtre les enregistrements en double lors de la récupération de l'affichage d'enregistrements RESTART OK/ABEND dans les traces source. Les clients ne verront pas les enregistrements en double. Si le service ne parvient pas à localiser l'enregistrement avec la dernière position traitée donnée, une erreur se produit.

      • Au moins une fois : ce mode ne supprime pas les enregistrements en double lors de la récupération ni n'affiche les enregistrements RESTART OK/ABEND dans la trace source. Les clients peuvent voir des enregistrements en double dans le flux de données. En outre, si le service ne parvient pas à localiser l'enregistrement avec la dernière position traitée donnée, une erreur se produit.

      • Au plus une fois : ce mode supprime les enregistrements en double lors de la récupération ou affiche un enregistrement RESTART OK/ABEND dans la trace source. Les clients ne verront pas les enregistrements en double dans le flux de données. Si le service ne parvient pas à localiser l'enregistrement avec la dernière position traitée donnée, il trouvera l'enregistrement disponible suivant et avancera.

    • Format CloudEvents : le service de transmission en continu de données prend en charge la transmission des enregistrements de données au format CloudEvents. Par défaut, ce format est désactivé et peut être contrôlé en activant la propriété à l'aide du commutateur, lors de la création du canal de transmission en continu de données.

  4. Sur la page Options de filtrage, les options permettant d'inclure et d'exclure des règles de filtrage sont disponibles :

    Boîte de dialogue Flux de données et options de filtrage disponibles.

    Indiquez les options de règle de filtrage, comme suit :
    • Action sur règle : sélectionnez les options Exclure ou Inclure.

    • Type de filtre : le type de filtre inclut les éléments suivants :

      Type d'objet : vous pouvez sélectionner plusieurs options dans la liste déroulante, notamment LMD, DDL, INSERT, UPDATE, UPSERT, DELETE.

      Noms d'objet : nom de l'objet de filtrage précédemment créé.

      Cliquez sur Ajouter pour ajouter la règle de filtrage au processus de flux de données.

  5. Cliquez sur Créer un flux de données. Vous revenez à la page d'accueil du service de distribution où le flux de données est répertorié.

  6. Cliquez sur le flux de données que vous venez de créer pour visualiser les définitions AsyncAPI du flux de données du document YAML.

Modifier la configuration de Data Streams

Pour modifier la configuration Data Streams, procédez comme suit :
  1. Dans le volet de navigation de gauche du service de distribution, cliquez sur Data Streams.

  2. Renseignez le nom du flux de données à modifier.

  3. Sur la page Flux de données, utilisez la colonne Action pour visualiser les détails du flux de données, supprimer un flux de données et modifier son filtrage.

    Modification de la configuration du flux de données

  4. Si vous cliquez sur l'option Modifier le filtrage dans le menu Action, la boîte de dialogue Modifier les règles de filtrage de flux de données apparaît. Dans cette zone, vous pouvez modifier l'action de règle (Inclure, Exclure) et le type de filtre (nom d'objet ou type d'objet).

  5. Cliquez sur Ajouter pour appliquer le filtrage.

  6. Cliquez sur Soumettre pour renvoyer la page Flux de données.

Pour modifier davantage le flux de données, cliquez sur le flux de données dans la colonne Nom. La configuration complète du flux de données s'affiche. Utilisez l'icône en forme de crayon en regard de chaque paramètre de configuration pour le modifier. Vous pouvez modifier le fichier trace source utilisé par un flux de données, ses règles de filtrage et sa qualité de service. Vous pouvez également utiliser l'éditeur YAML pour modifier la configuration du flux de données et télécharger les modifications à l'aide de l'icône Télécharger les modifications en regard de l'éditeur YAML.