Ajouter des flux de données

Oracle GoldenGate 23ai introduit des flux de données qui peuvent grandement simplifier les chemins de données pour les analyses en temps réel.

Remarque

Cet article s'applique uniquement aux déploiements Oracle GoldenGate 23ai.

A propos des flux de données

Les flux de données Oracle GoldenGate utilisent la spécification AsyncAPI pour définir des API asynchrones. Cette approche permet aux applications de s'abonner efficacement aux flux de données à l'aide d'un modèle de publication ou d'abonnement. Les mises à jour sont reçues dès que les modifications sont validées dans la base de données source, ce qui réduit la latence et simplifie le développement d'applications. En outre, Oracle GoldenGate Data Streams permet aux utilisateurs de spécifier leur format de données préféré, tel que JSON, pour une intégration transparente avec les outils et structures existants au sein de leur environnement de développement.

Avantages pour les développeurs et les analystes de données
  • Ingestion de données améliorée : le modèle de publication ou d'abonnement optimisé par AsyncAPI permet aux applications de recevoir efficacement des mises à jour de données en temps réel.

  • Formatage flexible des données : les utilisateurs peuvent choisir leur format préféré pour une intégration transparente avec les outils existants.

  • Intégration simplifiée : AsyncAPI favorise une interaction fluide avec diverses applications et outils couramment utilisés par les développeurs et les data scientists.

  • Intégrité garantie des données : en héritant de la force principale d'Oracle GoldenGate, Data Streams garantit la durabilité des données en répliquant les modifications au fur et à mesure qu'elles sont validées dans la base de données source.

Composants des flux de données Oracle GoldenGate

Les composants des flux de données Oracle GoldenGate sont les suivants :
  • API asynchrone

  • Protocole Data Streams

  • Position de démarrage/redémarrage des flux de données

  • Enregistrements de schéma

  • Format CloudEvents

API asynchrone

Oracle GoldenGate Data Streams fonctionne avec tous les langages de programmation de sorte qu'il puisse interagir avec un client écrit dans n'importe quel langage de programmation. Même si les programmes client sont généralement simples et petits, les utilisateurs doivent toujours implémenter manuellement le code client pour interagir avec le service de transmission en continu de données.

L'adoption de la spécification AsyncAPI dans Oracle GoldenGate Data Streams présente les avantages suivants :

  • Permet de décrire l'API de service de flux de données dans la spécification d'API standard du secteur et de générer automatiquement la documentation d'API.

  • Générez automatiquement du code côté client via @asyncapi/generator.

Avec la prise en charge de AsyncAPI, Oracle GoldenGate Data Streams simplifie la transmission en continu des données en générant automatiquement le code client. Il suit le modèle de l'éditeur et de l'abonné et prend en charge une grande variété de protocoles, y compris websocket, kafka, mqtt, hms et de nombreux protocoles IOT. Lors de la description d'une API orientée événements, elle utilise le langage de modélisation YAML et suit une syntaxe similaire pour la spécification OpenAPI. Par exemple, voici un fragment de code du document AsyncAPI yaml qui décrit les définitions AsyncAPI de Data Streaming :

asyncapi: '3.0.0'
info:
  title: Data Streaming API
  version: '1.0.0'
  description: | allows clients to subscribe to a data stream
  license:
    name: Apache 2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0'

servers:
  <deployment-url>:
    protocol: ws
url: <deployment-url>:<port-number>

defaultContentType: application/json

channels:
/services/v2/stream/mystream1:
...

Lorsqu'une ressource de flux de données est créée, un lien URL vers un document de spécification d'API asynchrone personnalisé décrivant comment accéder à cette adresse de flux de données est renvoyé dans la réponse HTTP. Ce document YAML peut ensuite être utilisé pour générer le code côté client à l'aide de @asyncapi/generator.

Notez que pour prendre en charge le protocole de socket Web dans @asyncapi/generator, vous devez également implémenter/maintenir le modèle de client de socket Web pour @asyncapi/generator dans GitHub. Pour plus d'informations sur le modèle client-client websocket, reportez-vous au référentiel GitHub :

https://github.com/oracle-samples/websocket-client-template

Protocole Data Streams

Avec Oracle GoldenGate Data Streams, l'accès direct aux données au format indiqué par l'utilisateur est activé via un canal de socket Web dédié qui suit un protocole de transmission en continu simple.

Le protocole Data Streams utilise le mode Push pour envoyer des données au client. Le client crée d'abord une ressource de transmission en continu sur le serveur via une demande HTTP RESTful. Une fois la ressource de transmission en continu créée, le client établit une connexion WebSocket via l'adresse de ressource de transmission en continu. Une fois le canal WebSocket établi, Data Streams commence à transmettre les données immédiatement et en continu sans attendre la réponse ou l'accusé de réception du client.


Protocole Data Streams

L'exemple de client python suivant illustre l'interaction entre le client et le service de transmission en continu de données :

import asyncio
import requests
import websockets
import json

async def client():
    ### create the streaming resource
    payload = {"source":{"trail":"a1"}}
    response = requests.post(
  'http://name:pswd@localhost:9002/services/v2/stream/s1', json=payload)

### establish websocket connection and receive data continuously
uri = "ws://name:pswd@localhost:9002/services/v2/stream/s1?begin=earliest"
async with websockets.connect(uri) as websocket:
        while True:
            resp = await websocket.recv()
            records = json.loads(resp)
            for rec in records:
                print(rec)
asyncio.get_event_loop().run_until_complete(client())

Dans le programme client donné, une charge utile de flux de données simple indiquant le nom de la trace de données source est fournie lors de la création de l'adresse de ressource de flux de données s1. Dans une application du monde réel, des charges utiles de flux de données très complexes peuvent être utilisées pendant la phase d'établissement de liaison du protocole de transmission en continu pour configurer le comportement de transmission en continu des données.

Par exemple, la charge utile de demande de flux de données suivante indique les règles de filtrage, le format d'encodage et bufferSize, ainsi que le nom de trace de source de données requis.

{   
    "$schema"    : "ogg:dataStream",
    "source"     : {"trail":"a1"},
    "rules"      : [{
        "action" : "exclude",
        "filter" : {
            "objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
        }
    }],
    "encoding"   : “json",
    "bufferSize" : 2097152
}
Position de démarrage/redémarrage des flux de données

Lors de l'établissement de la connexion websocket, côté client indique la position begin (en tant que paramètre de requête dans l'URL de connexion websocket) pour démarrer la transmission en continu des données. La position de début peut avoir l'une des valeurs suivantes :

  • Mot-clé spécial "now"

  • Mot-clé spécial "earliest"

  • Chaîne d'horodatage au format ISO 8601

  • Dernière position LCR traitée

Chaque enregistrement LCR sans métadonnées contient une position opaque (inclut CSN, XID, no d'enregistrement dans la transaction). Le côté client est responsable de la maintenance de la position du dernier enregistrement LCR traité. Le service de flux de données est chargé de localiser le point de démarrage/redémarrage correct en fonction de la position de début donnée.

S'il s'agit de la première connexion d'un client au service de flux de données, le client doit fournir un horodatage indiquant où démarrer la transmission en continu des données. Le mot-clé now sera converti en horodatage actuel et le mot-clé earliest sera converti en horodatage 0.

Vous pouvez également utiliser une chaîne d'horodatage au format ISO 8601 pour la position begin. Dans tous les cas, le service de flux de données effectue une recherche basée sur l'horodatage sur la trace source pour déterminer la position de début.

S'il s'agit du cas de récupération/redémarrage, le client doit fournir la dernière position enregistrée traitée au service de flux de données lors de l'établissement de liaison. Le service de flux de données effectue une recherche basée sur le profil sur la trace source pour déterminer la position de début. Le comportement de la récupération de transmission en continu de données dépend également du niveau QoS indiqué dans le flux de données.

CloudEvents

CloudEvents est une spécification pour décrire les données d'événement dans des formats communs afin de fournir une interopérabilité entre les services, les plates-formes et les systèmes. Les flux de données Oracle GoldenGate ne prenant actuellement en charge que l'encodage de données JSON, la prise en charge du format CloudEvents est limitée au format d'événement JSON. La spécification complète pour le format d'événement JSON pour CloudEvents est disponible à l'adresse suivante :

https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md

https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md

https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json

Le format CloudEvents définit la liste des attributs pour décrire l'événement, essentiellement une enveloppe avec un ensemble d'attributs obligatoires et facultatifs. Lorsque le format CloudEvents est activé dans les flux de données Oracle GoldenGate, les enregistrements JSON finaux ressemblent à ce qui suit, où le champ data contient les enregistrements de données d'origine, à savoir les enregistrements DML/DDL/métadonnées/schéma Oracle GoldenGate.
{
    "specversion" : "1.0",
    "type" : "com.example.someevent",
    "source" : "/mycontext",
    "id" : "A234-1234-1234",
    "datacontenttype" : "application/json",
    "data: {…}
}

Ajouter des flux de données

Les flux de données sont créés à partir du service de distribution. Connectez-vous au service de distribution pour commencer à créer un processus de flux de données. Pour créer un flux de données, procédez comme suit :
  1. Sur la page d'accueil du service de distribution, ajoutez des flux de données (icône Plus) pour ouvrir l'assistant Ajouter un flux de données.


    Page d'accueil du service de distribution avec la section Flux de données

  2. Sur la page Informations sur le flux de données, entrez un nom de processus de flux de données dans la zone Nom et ajoutez-en une description. Cliquez Suivant.


    Boîte de dialogue Ajouter des flux de données : page Informations sur le flux de données

  3. Sur la page Options source, indiquez les valeurs des options affichées dans l'image suivante :


    Boîte de dialogue Flux de données : page Options source

    • Nom de trace : nom du fichier de trace source.

    • Sous-répertoire de trace : chemin du sous-répertoire dans lequel les fichiers de trace sont stockés.

    • Encodage : cette option contrôle l'encodage des enregistrements pour le flux de données. Actuellement, l'encodage JSON est pris en charge.

    • Taille du tampon : cette valeur contrôle la taille du tampon de mémoire utilisée dans le service de transmission en continu de données. Le service de transmission en continu de données vide la file d'attente de messages en mémoire une fois que sa taille d'octet totale dépasse la valeur spécifiée et transmet les enregistrements au client.

    • Qualité de service : cette option définit le comportement de suppression et de récupération des doublons de flux de données. Trois niveaux de qualité de service sont pris en charge dans le service de transmission en continu de données.
      • Une seule fois : ce mode est le mode le plus restrictif dans lequel le service filtre les enregistrements en double lors de la récupération des enregistrements RESTART OK/ABEND affichés dans les traces source. Les clients ne verront pas les enregistrements en double. Si le service ne parvient pas à localiser l'enregistrement avec la dernière position traitée donnée, une erreur se produit.

      • Au moins une fois : ce mode ne supprime pas les enregistrements en double lors de la récupération ni ne voit un enregistrement RESTART OK/ABEND dans la trace source. Les clients peuvent voir des enregistrements en double dans le flux de données. En outre, si le service ne parvient pas à localiser l'enregistrement avec la dernière position traitée donnée, une erreur se produit.

      • Au plus une fois : ce mode supprime les enregistrements en double lors de la récupération ou affiche un enregistrement RESTART OK/ABEND dans la trace source. Les clients ne verront pas d'enregistrements en double dans le flux de données. Si le service ne parvient pas à localiser l'enregistrement ayant le dernier profil traité donné, il recherche l'enregistrement disponible suivant et avance.

    • Format CloudEvents : le service de transmission en continu de données prend en charge la transmission des enregistrements de données au format CloudEvents. Par défaut, ce format est désactivé et peut être contrôlé en activant la propriété à l'aide du commutateur à bascule, lors de la création du canal de transmission en continu de données.

  4. Sur la page Options de filtrage, les options permettant d'inclure et d'exclure des règles de filtrage sont disponibles :
    Boîte de dialogue Flux de données et options de filtrage disponibles.

    Spécifiez les options de règle de filtrage, comme suit :
    • Action sur la règle : sélectionnez les options Exclure ou Inclure.

    • Type de filtre : le type de filtre inclut les éléments suivants :

      Object Type: You can select multiple options from the drop-down list including DML, DDL, INSERT, UPDATE, UPSERT, DELETE.

      Noms d'objet : nom de l'objet de filtrage précédemment créé.

      Cliquez sur Ajouter pour ajouter la règle de filtrage au processus de flux de données.

  5. Cliquez sur Créer un flux de données. Vous serez renvoyé à la page d'accueil du service de distribution où le flux de données est répertorié.

  6. Cliquez sur le flux de données que vous venez de créer pour visualiser les définitions AsyncAPI de flux de données du document YAML.

Modifier la configuration de flux de données

Pour modifier la configuration de Data Streams :
  1. Dans le volet de navigation de gauche, cliquez sur Data Streams.

  2. Sélectionnez le nom du flux de données à modifier.

  3. Sur la page Flux de données, utilisez la colonne Action pour visualiser les détails du flux de données, supprimer un flux de données et modifier son filtrage.


    Modifiez le filtrage de flux de données.

  4. Si vous cliquez sur l'option Modifier le filtrage dans le menu Action, la boîte de dialogue Modifier les règles de filtrage de flux de données apparaît. Dans cette zone, vous pouvez modifier l'action de règle (Inclure, Exclure) et le type de filtre (nom d'objet ou type d'objet).

  5. Cliquez sur Ajouter pour appliquer le filtrage.

  6. Cliquez sur Soumettre pour revenir à la page Flux de données.

Vous pouvez modifier davantage le flux de données en cliquant dessus dans la colonne Nom. Affiche la configuration complète du flux de données. Utilisez l'icône en forme de crayon en regard de chaque paramètre de configuration pour le modifier. Vous pouvez modifier le fichier trace source utilisé par un flux de données, ses règles de filtrage et sa qualité de service. Vous pouvez également utiliser l'éditeur YAML pour modifier la configuration du flux de données et télécharger les modifications à l'aide de l'icône Télécharger les modifications en regard de l'éditeur YAML.