Créer et configurer des pipelines

Vous pouvez créer un ou plusieurs pipelines de chargement ou d'exportation. Lorsque vous créez un pipeline, vous utilisez des paramètres et définissez des attributs de pipeline pour le configurer.

Les options pour créer et configurer un pipeline sont les suivantes :

Créer et configurer un pipeline pour le chargement de données

Vous pouvez créer un pipeline pour charger des données à partir de fichiers externes dans le magasin d'objets ou les répertoires vers des tables dans Autonomous Database.

Un pipeline de chargement consomme les données placées sur le magasin d'objets ou dans les répertoires et les charge dans une table d'Autonomous Database. Lorsque vous créez un pipeline de chargement, celui-ci s'exécute à intervalles réguliers pour consommer les données placées à l'emplacement source, lorsque de nouveaux fichiers de données arrivent, le pipeline charge les nouvelles données. Vous pouvez également utiliser un pipeline pour copier de manière fiable des fichiers, avec des capacités de reprise et de nouvelle tentative, depuis l'emplacement de messagerie vers une table de votre base de données.

Avec un pipeline de chargement, l'ensemble de pipeline utilise DBMS_CLOUD.COPY_DATA pour charger des données.

Dans Autonomous Database, utilisez une table existante ou créez la table de base de données dans laquelle vous chargez des données. Par exemple :

CREATE TABLE EMPLOYEE
            (name     VARCHAR2(128),
             age      NUMBER,
             salary   NUMBER);
  1. Créez un pipeline pour charger des données à partir d'un magasin d'objets ou d'objets de répertoire.
    BEGIN
         DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
            pipeline_name => 'MY_PIPE1',
            pipeline_type => 'LOAD',
            description   => 'Load metrics from object store into a table'
      );
    END;
    /

    Voir Procédure CREATE_PIPELINE pour plus d'informations.

  2. Créez un objet de données d'identification pour accéder au magasin d'objets contenant les fichiers que vous chargez.

    Vous spécifiez les données d'identification pour l'emplacement source du pipeline avec l'attribut credential_name. Si vous ne spécifiez pas credential_name à l'étape suivante, la valeur de credential_name est réglée à NULL. Vous pouvez utiliser la valeur NULL par défaut lorsque l'attribut location est une URL publique ou préauthentifiée.

    Pour plus d'informations, voir Procédure CREATE_CREDENTIAL.

  3. Définissez les attributs du pipeline, y compris les attributs requis : location, table_name et format.
    Cas 1 : Créer un pipeline pour charger des données à partir du magasin d'objets.
    BEGIN
         DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
           pipeline_name => 'MY_PIPE1',
           attributes    => JSON_OBJECT(
                'credential_name' VALUE 'OBJECT_STORE_CRED',
                'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
                'table_name' VALUE 'employee',
                'format' VALUE '{"type":"json", "columnpath":["$.NAME", "$.AGE", "$.SALARY"]}',
                'priority' VALUE 'HIGH',
                'interval' VALUE '20')
      );
    END;
    /
    Cas 2 : Créer un pipeline pour charger des données à partir d'objets de répertoire.
    
    BEGIN
         DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
           pipeline_name => 'MY_PIPE1',
           pipeline_type => 'LOAD',
           attributes    => JSON_OBJECT(
                'location' VALUE 'MY_DIR:*.csv',
                'table_name' VALUE 'employee',
                'format' VALUE '{"type":"csv"}',
                'priority' VALUE 'HIGH',
                'interval' VALUE '20')
      );
    END;
    /
    

    Les attributs suivants doivent être définis pour exécuter un pipeline de chargement dans les deux cas :

    • location : Spécifie l'emplacement du fichier source dans le magasin d'objets ou l'objet de répertoire.

    • table_name : Spécifie la table de la base de données dans laquelle vous chargez des données. La valeur location que vous spécifiez correspond à une valeur table_name par pipeline.

    • format : Décrit le format des données que vous chargez.

      Pour plus d'informations, voir Options de format de l'ensemble DBMS_CLOUD.

    credential_name est les données d'identification que vous avez créées à l'étape précédente.

    La valeur priority détermine le nombre de fichiers chargés en parallèle. Un pipeline avec une priorité supérieure consomme plus de ressources de base de données et termine chaque exécution plus rapidement, par rapport à une priorité inférieure.

    La valeur interval spécifie l'intervalle de temps en minutes entre les exécutions consécutives d'une tâche de pipeline. La valeur par défaut de interval est de 15 minutes.

    Voir DBMS_CLOUD_PIPELINE Attributs pour plus de détails sur les attributs de pipeline.

  4. Après avoir créé un pipeline, vous pouvez tester le pipeline ou le démarrer :

Comme alternative, pour définir le format pour JSON, vous pouvez utiliser le format suivant :

BEGIN
    DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
        pipeline_name   => 'MY_PIPE1',
        attribute_name  => 'format',
        attribute_value => JSON_OBJECT('type' value 'json', 'columnpath' value '["$.NAME", "$.AGE", "$.SALARY"]')
    );
END;
/

Créer et configurer un pipeline pour l'exportation avec une colonne d'horodatage

Vous pouvez créer un pipeline d'exportation pour exporter automatiquement des données de série chronologique de votre base de données Autonomous Database vers le magasin d'objets.

À l'aide de cette option de pipeline d'exportation, vous spécifiez une table ou une interrogation SQL et une colonne avec un horodatage que le pipeline utilise pour suivre l'heure du dernier chargement. Vous pouvez utiliser un pipeline d'exportation pour partager des données pour consommation par d'autres applications ou pour enregistrer des données dans le magasin d'objets.

Avec un pipeline d'exportation, l'ensemble de pipeline utilise DBMS_CLOUD.EXPORT_DATA pour exporter des données.

Un pipeline d'exportation exporte les données de votre base de données Autonomous Database vers le magasin d'objets. Lorsque vous créez un pipeline d'exportation, il s'exécute à intervalles réguliers et place des données dans le magasin d'objets.

  1. Créez un pipeline pour exporter des données vers le magasin d'objets.
    BEGIN
         DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
            pipeline_name=>'EXP_PIPE1',
            pipeline_type=>'EXPORT',
            description=>'Export time series metrics to object store');
    END;
    /

    Voir Procédure CREATE_PIPELINE pour plus d'informations.

  2. Créez un objet de données d'identification pour accéder à l'emplacement du magasin d'objets de destination où vous exportez des fichiers de données.

    Vous spécifiez les données d'identification pour l'emplacement de destination du pipeline avec l'attribut credential_name. Si vous ne spécifiez pas credential_name à l'étape suivante, la valeur de credential_name est réglée à NULL. Vous pouvez utiliser la valeur NULL par défaut lorsque l'attribut location est une URL publique ou préauthentifiée.

    Pour plus d'informations, voir Procédure CREATE_CREDENTIAL.

  3. Définissez les attributs du pipeline d'exportation.

    Lorsque vous spécifiez un paramètre table_name, les rangées de table sont exportées vers le magasin d'objets. Lorsque vous spécifiez un paramètre query, l'interrogation spécifie un énoncé SELECT de sorte que seules les données requises soient exportées vers le magasin d'objets.

    • Utilisation d'un paramètre table_name :

      BEGIN
           DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
             pipeline_name => 'EXP_PIPE1',
             attributes    => JSON_OBJECT('credential_name' VALUE 'OBJECT_STORE_CRED',
                'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
                'table_name' VALUE 'metric_table',
                'key_column' VALUE 'metric_time',
                'format' VALUE '{"type": "json"}',
                'priority' VALUE 'MEDIUM',
                'interval' VALUE '20')
        );
      END;
      /
    • À l'aide d'un paramètre query :

      BEGIN
           DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
             pipeline_name => 'EXP_PIPE1',
             attributes    => JSON_OBJECT('credential_name' VALUE 'OBJECT_STORE_CRED',
                 'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
                 'query' VALUE 'SELECT * from metrics_table',
                 'key_column' VALUE 'metric_time',
                 'format' VALUE '{"type": "json"}',
                 'priority' VALUE 'MEDIUM',
                 'interval' VALUE '20')
        );
      END;
      /

    credential_name est les données d'identification que vous avez créées à l'étape précédente.

    Les attributs suivants doivent être définis pour exécuter un pipeline d'exportation :

    • location : Spécifie l'emplacement du magasin d'objets de destination. La valeur location que vous spécifiez correspond à une valeur table_name par pipeline.

    • table_name : Spécifie la table de votre base de données qui contient les données que vous exportez (le paramètre table_name ou le paramètre query est requis).

    • query : Spécifie l'interrogation à exécuter dans la base de données qui fournit les données que vous exportez (le paramètre table_name ou le paramètre query est requis).

    • format : Décrit le format des données que vous exportez.

      Pour plus d'informations, voir DBMS_CLOUD Options de format de l'ensemble pour EXPORT_DATA.

    La valeur priority détermine le degré de parallélisme pour l'extraction des données de la base de données.

    La valeur interval spécifie l'intervalle de temps en minutes entre les exécutions consécutives d'une tâche de pipeline. La valeur par défaut de interval est de 15 minutes.

    Voir DBMS_CLOUD_PIPELINE Attributs pour plus de détails sur les attributs de pipeline.

    Après avoir créé un pipeline, vous pouvez le tester ou le démarrer :

Créer et configurer un pipeline pour exporter des résultats d'interrogation (sans horodatage)

Vous pouvez créer un pipeline d'exportation pour exporter automatiquement des données de votre base de données Autonomous Database vers le magasin d'objets. À l'aide de cette option de pipeline d'exportation, vous spécifiez une interrogation SQL que le pipeline exécute périodiquement pour exporter des données vers le magasin d'objets. Vous pouvez utiliser cette option d'exportation pour partager les données les plus récentes de votre base de données Autonomous Database vers le magasin d'objets pour que d'autres applications consomment les données.

Un pipeline d'exportation exporte les données de votre base de données Autonomous Database vers le magasin d'objets. Lorsque vous créez un pipeline d'exportation, il s'exécute à intervalles réguliers et place des données dans le magasin d'objets.

  1. Créez un pipeline pour exporter des données vers le magasin d'objets.
    BEGIN
         DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
            pipeline_name=>'EXP_PIPE2',
            pipeline_type=>'EXPORT',
            description=>'Export query results to object store.');
    END;
    /

    Voir Procédure CREATE_PIPELINE pour plus d'informations.

  2. Créez un objet de données d'identification pour accéder à l'emplacement du magasin d'objets de destination où vous exportez des fichiers de données.

    Vous spécifiez les données d'identification pour l'emplacement de destination du pipeline avec l'attribut credential_name. Si vous ne spécifiez pas credential_name à l'étape suivante, la valeur de credential_name est réglée à NULL. Vous pouvez utiliser la valeur NULL par défaut lorsque l'attribut location est une URL publique ou préauthentifiée.

    Pour plus d'informations, voir Procédure CREATE_CREDENTIAL.

  3. Définissez les attributs du pipeline d'exportation.
    BEGIN
         DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
           pipeline_name => 'EXP_PIPE2',
           attributes    => JSON_OBJECT(
              'credential_name' VALUE 'OBJECT_STORE_CRED',
              'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
              'query' VALUE 'SELECT * FROM table_name',
              'format' VALUE '{"type": "json"}',
              'priority' VALUE 'MEDIUM',
              'interval' VALUE '20')
      );
    END;
    /

    credential_name est les données d'identification que vous avez créées à l'étape précédente.

    Les attributs suivants doivent être définis pour exécuter un pipeline d'exportation :

    • location : Spécifie l'emplacement du magasin d'objets de destination.

    • query : Spécifie l'interrogation à exécuter dans la base de données qui fournit les données que vous exportez.

    • format : Décrit le format des données que vous exportez.

      Pour plus d'informations, voir DBMS_CLOUD Options de format de l'ensemble pour EXPORT_DATA.

    La valeur priority détermine le degré de parallélisme pour l'extraction des données de la base de données.

    La valeur interval spécifie l'intervalle de temps en minutes entre les exécutions consécutives d'une tâche de pipeline. La valeur par défaut de interval est de 15 minutes.

    Voir DBMS_CLOUD_PIPELINE Attributs pour plus de détails sur les attributs de pipeline.

    Après avoir créé un pipeline, vous pouvez le tester ou le démarrer :

Tester les pipelines

Utilisez RUN_PIPELINE_ONCE pour exécuter un pipeline une fois sur demande sans créer de tâche programmée.

RUN_PIPELINE_ONCE est utile pour tester un pipeline avant de le démarrer. Après avoir exécuté un pipeline une fois pour le tester et vérifier qu'il fonctionne comme prévu, utilisez RESET_PIPELINE pour réinitialiser l'état du pipeline (à l'état avant l'exécution de RUN_PIPELINE_ONCE).

  1. Créer un pipeline.
  2. Exécutez un pipeline une fois pour tester le pipeline.
    BEGIN
        DBMS_CLOUD_PIPELINE.RUN_PIPELINE_ONCE(
            pipeline_name => 'MY_PIPE1'
    );
    END;
    /

    Voir Procédure RUN_PIPELINE_ONCE pour plus d'informations.

  3. Effectuez les vérifications nécessaires pour vérifier que le pipeline fonctionne comme prévu.

    Pour plus d'informations, voir Surveiller et dépanner les pipelines.

  4. Réinitialisez le pipeline.
    BEGIN  
       DBMS_CLOUD_PIPELINE.RESET_PIPELINE(
         pipeline_name => 'MY_PIPE1',
         purge_data => TRUE
    );
    END;
    /

    Voir Procédure RESET_PIPELINE pour plus d'informations.