Gérer les pipelines

Après avoir créé et testé un pipeline, vous le contrôlez en le démarrant, l'arrêt ou la suppression. Vous pouvez également réinitialiser un pipeline.

Démarrer un pipeline

Une fois que vous avez créé un pipeline, vous pouvez le démarrer.

Lorsqu'un pipeline est démarré, il est exécuté en continu dans un travail programmé. Le travail programmé du pipeline se répète, par défaut toutes les 15 minutes ou à l'intervalle que vous définissez avec l'attribut interval.

  1. Démarrez un pipeline.
    BEGIN
      DBMS_CLOUD_PIPELINE.START_PIPELINE(
          pipeline_name => 'EMPLOYEE_PIPELINE'
      );
    END;
    /

    Par défaut, un travail de pipeline commence immédiatement, dès que le pipeline est démarré. Pour démarrer un travail de pipeline ultérieurement, indiquez une date ou un horodatage futur valide à l'aide du paramètre start_date.

    Pour plus d'informations, reportez-vous à Procédure START_PIPELINE.

  2. Vérifiez que le pipeline est démarré.

    Exemple :

    
    SELECT pipeline_name, status from USER_CLOUD_PIPELINES
       WHERE pipeline_name = 'EMPLOYEE_PIPELINE';
    
    PIPELINE_NAME            STATUS  
    ------------------------ ------- 
    EMPLOYEE_PIPELINE        STARTED

Arrêt d'un pipeline

Utilisez STOP_PIPELINE pour arrêter un pipeline. Lorsqu'un pipeline est arrêté, aucun travail futur n'est programmé pour le pipeline.

Par défaut, les travaux en cours d'exécution sont terminés lorsque vous arrêtez un pipeline. Définissez le paramètre force sur TRUE pour mettre fin aux travaux en cours d'exécution et arrêter immédiatement le pipeline.

  1. Arrêtez un pipeline.
    BEGIN
      DBMS_CLOUD_PIPELINE.STOP_PIPELINE(
          pipeline_name => 'EMPLOYEE_PIPELINE'
      );
    END;
    /

    Pour plus d'informations, reportez-vous à Procédure STOP_PIPELINE.

  2. Vérifiez que le pipeline est arrêté.
    
    SELECT pipeline_name, status from USER_CLOUD_PIPELINES
       WHERE pipeline_name = 'EMPLOYEE_PIPELINE';
    
    PIPELINE_NAME            STATUS  
    ------------------------ ------- 
    EMPLOYEE_PIPELINE        STOPPED

Pour plus d'informations, reportez-vous à Procédure STOP_PIPELINE.

Supprimer un pipeline

La procédure DROP_PIPELINE supprime un pipeline existant.

Si un pipeline a été démarré, il doit être arrêté pour pouvoir être supprimé. Pour plus d'informations, reportez-vous à Procédure STOP_PIPELINE.

Pour supprimer un pipeline démarré, définissez le paramètre force sur TRUE afin de mettre fin aux travaux en cours d'exécution et de supprimer immédiatement le pipeline.

  1. Supprimez un pipeline.
    BEGIN
      DBMS_CLOUD_PIPELINE.DROP_PIPELINE(
          pipeline_name => 'EMPLOYEE_PIPELINE'
      );
    END;
    /
  2. Vérifiez que le pipeline est supprimé.
    SELECT pipeline_name, status from USER_CLOUD_PIPELINES
       WHERE pipeline_name = 'EMPLOYEE_PIPELINE';
    
    No rows selected

Pour plus d'informations, reportez-vous à Procédure DROP_PIPELINE.

Réinitialisation d'un pipeline

Utilisez l'opération de réinitialisation du pipeline pour rétablir l'état initial de l'enregistrement du pipeline.

Remarques :

Vous pouvez éventuellement utiliser le pipeline de réinitialisation pour purger les données de la table de base de données associée à un pipeline de chargement ou pour enlever des fichiers dans la banque d'objets pour un pipeline d'export. Cette option est généralement utilisée lorsque vous testez un pipeline pendant le développement du pipeline.

Le pipeline de réinitialisation fonctionne comme suit :

  • Pipeline de chargement : pour un pipeline de chargement, la réinitialisation du pipeline efface l'enregistrement des fichiers en cours de chargement par le pipeline. Lorsque vous appelez START_PIPELINE ou RUN_PIPELINE_ONCE après la réinitialisation d'un pipeline de chargement, le pipeline répète le chargement des données et inclut tous les fichiers présents dans l'emplacement de banque d'objets.

    Lorsque purge_data est défini sur TRUE, la procédure tronque les données de la table de base de données.

  • Pipeline d'export : pour un pipeline d'export, la réinitialisation du pipeline efface les dernières données suivies dans la table de base de données. Lorsque vous appelez START_PIPELINE ou RUN_PIPELINE_ONCE après la réinitialisation d'un pipeline d'export, le pipeline répète l'export des données de la table ou de la requête.

    Lorsque purge_data est défini sur TRUE, la procédure supprime les fichiers existants dans l'emplacement de banque d'objets indiqué avec l'attribut location.

Pour réinitialiser un pipeline, procédez comme suit :

  1. Arrêtez le pipeline à réinitialiser.

    Un pipeline de données doit être à l'état Arrêté pour pouvoir être réinitialisé. Pour plus d'informations, reportez-vous à Procédure STOP_PIPELINE.

  2. Réinitialisez le pipeline.
    BEGIN
         DBMS_CLOUD_PIPELINE.RESET_PIPELINE(
            pipeline_name => 'EMPLOYEE_PIPELINE',
            purge_data => TRUE);
    END;
    /

    N'utilisez le paramètre purge_data avec la valeur TRUE que si vous voulez effacer des données dans la table de base de données, pour un pipeline de chargement ou des fichiers dans la banque d'objets pour un pipeline d'export. Cette option est généralement utilisée lorsque vous testez un pipeline pendant le développement du pipeline.

    Pour plus d'informations, reportez-vous à Procédure RESET_PIPELINE.

Surveillance et dépannage des pipelines

Tous les pipelines créés sont journalisés dans les vues DBMS_CLOUD_PIPELINE.

Afficher les informations sur le statut du pipeline

Vérifiez le statut du pipeline et obtenez d'autres informations sur le pipeline à l'aide des vues USER_CLOUD_PIPELINES ou DBA_CLOUD_PIPELINES. Par exemple, l'instruction SELECT suivante avec un prédicat de clause WHERE sur pipeline_name indique que MY_TREE_DATA est un pipeline de chargement et que le pipeline est démarré :

SELECT pipeline_name, pipeline_type, status FROM USER_CLOUD_PIPELINES
   WHERE pipeline_name = 'MY_TREE_DATA';


PIPELINE_NAME PIPELINE_TYPE STATUS  
------------- ------------- ------- 
MY_TREE_DATA  LOAD          STARTED

Pour plus d'informations, reportez-vous à la section DBMS_CLOUD_PIPELINE Views.

Visualiser les attributs de pipeline

Vous pouvez surveiller les attributs de pipeline en interrogeant les vues USER_CLOUD_PIPELINE_ATTRIBUTES ou DBA_CLOUD_PIPELINE_ATTRIBUTES. Interrogez ces vues pour afficher les informations d'attribut de pipeline.

Exemple :

SELECT pipeline_name, attribute_name, attribute_value FROM user_cloud_pipeline_attributes
     WHERE pipeline_name = 'MY_TREE_DATA';

PIPELINE_NAME ATTRIBUTE_NAME  ATTRIBUTE_VALUE                                                                   
------------- --------------- --------------------------------------------------------------------------------- 
MY_TREE_DATA  credential_name DEF_CRED_OBJ_STORE                                                                
MY_TREE_DATA  format          {"type": "csv"}                                                                   
MY_TREE_DATA  interval        20                                                                                
MY_TREE_DATA  location        https://objectstorage.us-ashburn-1.oraclecloud.com/n/namespace/b/treetypes/o/ 
MY_TREE_DATA  priority        high                                                                              
MY_TREE_DATA  table_name      TREES

Pour plus d'informations, reportez-vous à la section DBMS_CLOUD_PIPELINE Views.

Visualiser l'historique du pipeline

Les vues USER_CLOUD_PIPELINE_HISTORY et DBA_CLOUD_PIPELINE_HISTORY indiquent l'état des travaux en cours d'exécution. Utilisez les vues d'historique de pipeline pour surveiller l'état d'un pipeline et détecter les défaillances dans un pipeline en cours d'exécution.

Exemple :

SELECT pipeline_id, pipeline_name, status, error_message  FROM user_cloud_pipeline_history      
     WHERE pipeline_name = 'MY_TREE_DATA';

PIPELINE_ID PIPELINE_NAME STATUS    ERROR_MESSAGE 
----------- ------------- --------- ------------- 
          7  MY_TREE_DATA SUCCEEDED

Pour plus d'informations, reportez-vous à la section DBMS_CLOUD_PIPELINE Views.

Table de statut de pipeline : surveillance supplémentaire pour les pipelines de chargement

La table des statuts de pipeline indique le nom de chaque fichier et son statut pour un pipeline de chargement. La colonne STATUS_TABLE dans DBA_CLOUD_PIPELINES et USER_CLOUD_PIPELINES affiche le nom de la table de statut.

Par exemple, l'instruction SELECT suivante avec un prédicat de clause WHERE sur pipeline_name affiche le nom de table de statut d'un pipeline :

SELECT pipeline_name, status_table FROM user_cloud_pipelines
   WHERE pipeline_name = 'MY_TREE_DATA';

PIPELINE_NAME STATUS_TABLE
------------- --------------------
MY_TREE_DATA  PIPELINE$9$41_STATUS

Consultez la table de statut pour afficher des informations sur le pipeline, notamment les informations suivantes :

  • Le numéro d'erreur et le message d'erreur pertinents sont enregistrés dans la table des statuts en cas d'échec d'une opération sur un fichier spécifique.

  • Pour les opérations de pipeline terminées, le temps nécessaire à chaque opération peut être calculé à l'aide des éléments START_TIME et END_TIME signalés.

Par exemple, l'opération de chargement pour deux fichiers a échoué et une est terminée :

SELECT id, name, status, error_code, error_message, sid FROM PIPELINE$9$41_STATUS;

ID NAME       STATUS    ERROR_CODE ERROR_MESSAGE                      SID 
-- ---------- --------- ---------- -------------------------------- ----- 
 1 trees1.txt FAILED         30653 ORA-30653: reject limit reached  18070 
 2 trees2.txt FAILED         30653 ORA-30653: reject limit reached  18070 
 3 trees3.txt COMPLETED                                             18070 

Les pipelines pour le chargement de données, où pipeline_type est 'LOAD', réservent un ID affiché dans USER_LOAD_OPERATIONS et dans DBA_LOAD_OPERATIONS. La valeur ID de ces vues est mise en correspondance avec la valeur OPERATION_ID du pipeline dans USER_CLOUD_PIPELINES et DBA_CLOUD_PIPELINES.

Pour obtenir plus d'informations sur un pipeline de chargement, interrogez OPERATION_ID du pipeline :

SELECT PIPELINE_NAME, OPERATION_ID FROM USER_CLOUD_PIPELINES
     WHERE PIPELINE_NAME = 'MY_TREE_DATA';

PIPELINE_NAME OPERATION_ID 
------------- ------------ 
MY_TREE_DATA            41

Interrogez ensuite USER_LOAD_OPERATIONS ou DBA_LOAD_OPERATIONS avec un prédicat de clause WHERE sur la colonne ID (à l'aide de la valeur OPERATION_ID).

Exemple :

SELECT ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE, STATUS_TABLE FROM USER_LOAD_OPERATIONS
     WHERE ID = 41;

ID TYPE     LOGFILE_TABLE     BADFILE_TABLE     STATUS_TABLE
-- -------- ----------------- ----------------- --------------------
41 PIPELINE PIPELINE$9$41_LOG PIPELINE$9$41_BAD PIPELINE$9$41_STATUS

Cette requête affiche ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE s'il existe et STATUS_TABLE. Vous pouvez visualiser ces tables pour obtenir des informations de chargement de pipeline supplémentaires.

Détails de la table des statuts de pipeline

Colonne Type de données Description
ID NUMBER Numéro unique affecté au pipeline.
NAME VARCHAR2(4000) Nom du pipeline.
BYTES NUMBER Octets
CHECKSUM VARCHAR2(128) Checksum
LAST_MODIFIED TIMESTAMP(6) WITH TIME ZONE Dernière heure de modification du pipeline.
STATUS VARCHAR2(30) La valeur STATUS est l'une des suivantes :
  • COMPLETED : opération de fichier terminée.
  • FAILED : échec de l'opération de fichier. Une nouvelle tentative peut être tentée deux fois.
  • PENDING : l'opération de fichier n'a pas encore démarré.
  • RUNNING : l'opération de fichier est en cours.
  • SKIPPED : opération de fichier ignorée.
ERROR_CODE NUMBER Code d'erreur
ERROR_MESSAGE VARCHAR2(4000) Message d'erreur
START_TIME TIMESTAMP(6) WITH TIME ZONE Heure de début du pipeline.
END_TIME TIMESTAMP(6) WITH TIME ZONE Heure de fin du pipeline.
SID NUMBER

Les sessions SID et SERIAL# indiquent la session de travail qui exécutait l'opération de chargement de pipeline.

SERIAL# NUMBER

Les sessions SID et SERIAL# indiquent la session de travail qui exécutait l'opération de chargement de pipeline.

ROWS_LOADED NUMBER Nombre de lignes chargées.
OPERATION_ID NUMBER

Réservé à une utilisation ultérieure.

Fichier journal de pipeline et tables de fichiers incorrects

Afin d'obtenir le fichier journal et les noms de fichier incorrects pour un pipeline de chargement, interrogez OPERATION_ID du pipeline. Exemple :

SELECT PIPELINE_NAME, OPERATION_ID FROM USER_CLOUD_PIPELINES
     WHERE PIPELINE_NAME = 'MY_TREE_DATA';

PIPELINE_NAME OPERATION_ID 
------------- ------------ 
MY_TREE_DATA            41

Interrogez ensuite USER_LOAD_OPERATIONS ou DBA_LOAD_OPERATIONS avec un prédicat de clause WHERE sur la colonne ID (à l'aide de la valeur OPERATION_ID).

Exemple :

SELECT ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE, STATUS_TABLE FROM USER_LOAD_OPERATIONS
     WHERE ID = 41;

ID TYPE     LOGFILE_TABLE     BADFILE_TABLE     STATUS_TABLE
-- -------- ----------------- ----------------- --------------------
41 PIPELINE PIPELINE$9$41_LOG PIPELINE$9$41_BAD PIPELINE$9$41_STATUS

Cette requête affiche ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE s'il existe et STATUS_TABLE. Vous pouvez visualiser ces tables pour obtenir des informations de chargement de pipeline supplémentaires.

Consultez la table du fichier journal du pipeline pour consulter le journal complet des opérations de chargement du pipeline.

Exemple :

SELECT * FROM PIPELINE$9$41_LOG;

Consultez la table des fichiers incorrects du pipeline pour obtenir des détails sur les enregistrements de format d'entrée comportant des erreurs. La table des fichiers erronés affiche les informations relatives aux lignes signalant des erreurs lors du chargement. Selon les erreurs affichées dans la table de fichiers journaux et les lignes affichées dans la table de fichiers incorrects du pipeline, vous pouvez corriger les erreurs en modifiant les options d'attribut format du pipeline ou en modifiant les données du fichier que vous chargez.

Exemple :

SELECT * FROM PIPELINE$9$41_BAD;

Pour plus d'informations, reportez-vous à Surveillance et dépannage du chargement de données.