Gérer les pipelines

Après avoir créé et testé un pipeline, vous contrôlez un pipeline en le démarrant, en l'arrêtant ou en le supprimant. Vous pouvez également réinitialiser un pipeline.

Démarrer un pipeline

Après avoir créé un pipeline, vous pouvez le démarrer.

Lorsqu'un pipeline est démarré, il s'exécute en continu dans une tâche programmée. La tâche programmée du pipeline se répète, par défaut toutes les 15 minutes ou à l'intervalle que vous avez défini avec l'attribut interval.

  1. Démarrer un pipeline.
    BEGIN
      DBMS_CLOUD_PIPELINE.START_PIPELINE(
          pipeline_name => 'EMPLOYEE_PIPELINE'
      );
    END;
    /

    Par défaut, une tâche de pipeline commence immédiatement, dès le démarrage du pipeline. Pour démarrer une tâche de pipeline plus tard, spécifiez une date ou un horodatage futur valide à l'aide du paramètre start_date.

    Voir Procédure START_PIPELINE pour plus d'informations.

  2. Vérifiez que le pipeline est démarré.

    Par exemple :

    
    SELECT pipeline_name, status from USER_CLOUD_PIPELINES
       WHERE pipeline_name = 'EMPLOYEE_PIPELINE';
    
    PIPELINE_NAME            STATUS  
    ------------------------ ------- 
    EMPLOYEE_PIPELINE        STARTED

Arrêter un pipeline

Utilisez STOP_PIPELINE pour arrêter un pipeline. Lorsqu'un pipeline est arrêté, aucune tâche future n'est programmée pour le pipeline.

Par défaut, les tâches en cours d'exécution sont terminées lorsque vous arrêtez un pipeline. Réglez le paramètre force à TRUE pour mettre fin aux tâches en cours d'exécution et arrêter immédiatement le pipeline.

  1. Arrêtez un pipeline.
    BEGIN
      DBMS_CLOUD_PIPELINE.STOP_PIPELINE(
          pipeline_name => 'EMPLOYEE_PIPELINE'
      );
    END;
    /

    Voir Procédure STOP_PIPELINE pour plus d'informations.

  2. Vérifiez que le pipeline est arrêté.
    
    SELECT pipeline_name, status from USER_CLOUD_PIPELINES
       WHERE pipeline_name = 'EMPLOYEE_PIPELINE';
    
    PIPELINE_NAME            STATUS  
    ------------------------ ------- 
    EMPLOYEE_PIPELINE        STOPPED

Voir Procédure STOP_PIPELINE pour plus d'informations.

Supprimer un pipeline

La procédure DROP_PIPELINE supprime un pipeline existant.

Si un pipeline a été démarré, il doit être arrêté avant de pouvoir être supprimé. Voir Procédure STOP_PIPELINE pour plus d'informations.

Pour supprimer un pipeline qui est démarré, réglez le paramètre force à TRUE pour mettre fin aux tâches en cours d'exécution et supprimer immédiatement le pipeline

  1. Déposer un pipeline.
    BEGIN
      DBMS_CLOUD_PIPELINE.DROP_PIPELINE(
          pipeline_name => 'EMPLOYEE_PIPELINE'
      );
    END;
    /
  2. Vérifiez que le pipeline est supprimé.
    SELECT pipeline_name, status from USER_CLOUD_PIPELINES
       WHERE pipeline_name = 'EMPLOYEE_PIPELINE';
    
    No rows selected

Voir Procédure DROP_PIPELINE pour plus d'informations.

Réinitialiser un pipeline

Utilisez l'opération de réinitialisation de pipeline pour effacer l'enregistrement du pipeline à l'état initial.

Note :

Vous pouvez éventuellement utiliser le pipeline de réinitialisation pour épurer les données dans la table de base de données associée à un pipeline de chargement ou pour supprimer des fichiers dans le magasin d'objets pour un pipeline d'exportation. Habituellement, cette option est utilisée lorsque vous testez un pipeline pendant le développement du pipeline.

La réinitialisation du pipeline fonctionne comme suit :

  • Pipeline de chargement : Pour un pipeline de chargement, la réinitialisation du pipeline efface l'enregistrement des fichiers chargés par le pipeline. Lorsque vous appelez START_PIPELINE ou RUN_PIPELINE_ONCE après avoir réinitialisé un pipeline de chargement, celui-ci répète le chargement de données et inclut tous les fichiers présents dans l'emplacement du magasin d'objets.

    Lorsque purge_data est réglé à TRUE, la procédure tronque les données de la table de base de données.

  • Pipeline d'exportation : Pour un pipeline d'exportation, la réinitialisation du pipeline efface les dernières données suivies dans la table de base de données. Lorsque vous appelez START_PIPELINE ou RUN_PIPELINE_ONCE après la réinitialisation d'un pipeline d'exportation, le pipeline répète l'exportation des données de la table ou de l'interrogation.

    Lorsque purge_data est réglé à TRUE, la procédure supprime les fichiers existants dans l'emplacement du magasin d'objets spécifié avec l'attribut location.

Pour réinitialiser un pipeline :

  1. Arrêtez le pipeline à réinitialiser.

    Un pipeline de données doit être à l'état arrêté pour pouvoir le réinitialiser. Voir Procédure STOP_PIPELINE pour plus d'informations.

  2. Réinitialisez le pipeline.
    BEGIN
         DBMS_CLOUD_PIPELINE.RESET_PIPELINE(
            pipeline_name => 'EMPLOYEE_PIPELINE',
            purge_data => TRUE);
    END;
    /

    Utilisez uniquement le paramètre purge_data avec la valeur TRUE si vous voulez effacer des données dans la table de base de données, pour un pipeline de chargement ou effacer des fichiers dans le magasin d'objets pour un pipeline d'exportation. Habituellement, cette option est utilisée lorsque vous testez un pipeline pendant le développement du pipeline.

    Voir Procédure RESET_PIPELINE pour plus d'informations.

Surveiller et dépanner les pipelines

Tous les pipelines créés sont enregistrés dans les vues DBMS_CLOUD_PIPELINE.

Voir les informations sur le statut du pipeline

Vérifiez le statut du pipeline et obtenez d'autres informations sur le pipeline à l'aide des vues USER_CLOUD_PIPELINES ou DBA_CLOUD_PIPELINES. Par exemple, l'énoncé SELECT suivant avec un prédicat de clause WHERE sur pipeline_name montre que MY_TREE_DATA est un pipeline de chargement et que le pipeline est démarré :

SELECT pipeline_name, pipeline_type, status FROM USER_CLOUD_PIPELINES
   WHERE pipeline_name = 'MY_TREE_DATA';


PIPELINE_NAME PIPELINE_TYPE STATUS  
------------- ------------- ------- 
MY_TREE_DATA  LOAD          STARTED

Voir Vues DBMS_CLOUD_PIPELINE pour plus d'informations.

Voir les attributs de pipeline

Les attributs de pipeline peuvent être surveillés en interrogeant les vues USER_CLOUD_PIPELINE_ATTRIBUTES ou DBA_CLOUD_PIPELINE_ATTRIBUTES. Interrogez ces vues pour voir les informations sur les attributs de pipeline.

Par exemple :

SELECT pipeline_name, attribute_name, attribute_value FROM user_cloud_pipeline_attributes
     WHERE pipeline_name = 'MY_TREE_DATA';

PIPELINE_NAME ATTRIBUTE_NAME  ATTRIBUTE_VALUE                                                                   
------------- --------------- --------------------------------------------------------------------------------- 
MY_TREE_DATA  credential_name DEF_CRED_OBJ_STORE                                                                
MY_TREE_DATA  format          {"type": "csv"}                                                                   
MY_TREE_DATA  interval        20                                                                                
MY_TREE_DATA  location        https://objectstorage.us-ashburn-1.oraclecloud.com/n/namespace/b/treetypes/o/ 
MY_TREE_DATA  priority        high                                                                              
MY_TREE_DATA  table_name      TREES

Voir Vues DBMS_CLOUD_PIPELINE pour plus d'informations.

Voir l'historique du pipeline

Les vues USER_CLOUD_PIPELINE_HISTORY et DBA_CLOUD_PIPELINE_HISTORY affichent l'état des tâches en cours d'exécution. Utilisez les vues d'historique de pipeline pour surveiller l'état d'un pipeline et détecter les échecs dans un pipeline en cours d'exécution.

Par exemple :

SELECT pipeline_id, pipeline_name, status, error_message  FROM user_cloud_pipeline_history      
     WHERE pipeline_name = 'MY_TREE_DATA';

PIPELINE_ID PIPELINE_NAME STATUS    ERROR_MESSAGE 
----------- ------------- --------- ------------- 
          7  MY_TREE_DATA SUCCEEDED

Voir Vues DBMS_CLOUD_PIPELINE pour plus d'informations.

Table de statut de pipeline : Surveillance supplémentaire pour les pipelines de chargement

La table de statut du pipeline affiche chaque nom de fichier et son statut pour un pipeline de chargement. La colonne STATUS_TABLE dans DBA_CLOUD_PIPELINES et USER_CLOUD_PIPELINES affiche le nom de la table de statut.

Par exemple, l'énoncé SELECT suivant avec un prédicat de clause WHERE sur pipeline_name affiche le nom de la table de statut d'un pipeline :

SELECT pipeline_name, status_table FROM user_cloud_pipelines
   WHERE pipeline_name = 'MY_TREE_DATA';

PIPELINE_NAME STATUS_TABLE
------------- --------------------
MY_TREE_DATA  PIPELINE$9$41_STATUS

Consultez la table d'état pour voir les informations sur le pipeline, notamment :

  • Le numéro d'erreur et le message d'erreur pertinents sont enregistrés dans la table d'état en cas d'échec d'une opération sur un fichier spécifique.

  • Pour les opérations de pipeline terminées, le temps nécessaire pour chaque opération peut être calculé à l'aide des valeurs START_TIME et END_TIME indiquées.

Par exemple, l'opération de chargement de deux fichiers a échoué et l'autre est terminée :

SELECT id, name, status, error_code, error_message, sid FROM PIPELINE$9$41_STATUS;

ID NAME       STATUS    ERROR_CODE ERROR_MESSAGE                      SID 
-- ---------- --------- ---------- -------------------------------- ----- 
 1 trees1.txt FAILED         30653 ORA-30653: reject limit reached  18070 
 2 trees2.txt FAILED         30653 ORA-30653: reject limit reached  18070 
 3 trees3.txt COMPLETED                                             18070 

Les pipelines pour le chargement de données, où pipeline_type est 'LOAD', réservent une valeur ID affichée dans USER_LOAD_OPERATIONS et dans DBA_LOAD_OPERATIONS. La valeur ID dans ces vues correspond à OPERATION_ID du pipeline dans USER_CLOUD_PIPELINES et DBA_CLOUD_PIPELINES.

Pour obtenir plus d'informations sur un pipeline de chargement, interrogez OPERATION_ID du pipeline :

SELECT PIPELINE_NAME, OPERATION_ID FROM USER_CLOUD_PIPELINES
     WHERE PIPELINE_NAME = 'MY_TREE_DATA';

PIPELINE_NAME OPERATION_ID 
------------- ------------ 
MY_TREE_DATA            41

Ensuite, interrogez USER_LOAD_OPERATIONS ou DBA_LOAD_OPERATIONS avec un prédicat de clause WHERE sur la colonne ID (à l'aide de la valeur OPERATION_ID).

Par exemple :

SELECT ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE, STATUS_TABLE FROM USER_LOAD_OPERATIONS
     WHERE ID = 41;

ID TYPE     LOGFILE_TABLE     BADFILE_TABLE     STATUS_TABLE
-- -------- ----------------- ----------------- --------------------
41 PIPELINE PIPELINE$9$41_LOG PIPELINE$9$41_BAD PIPELINE$9$41_STATUS

Cette interrogation affiche ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE s'il existe, et STATUS_TABLE. Vous pouvez consulter ces tables pour obtenir des informations supplémentaires sur le chargement de pipeline.

Détails de la table des statuts de pipeline

Colonne Type de données Description
ID NUMBER Numéro unique affecté au pipeline.
NAME VARCHAR2(4000) Nom du pipeline.
BYTES NUMBER Octets
CHECKSUM VARCHAR2(128) Total de contrôle
LAST_MODIFIED TIMESTAMP(6) WITH TIME ZONE Heure de la dernière modification du pipeline.
STATUS VARCHAR2(30) La valeur STATUS est l'une des suivantes :
  • COMPLETED : Opération de fichier terminée.
  • FAILED : Échec de l'opération de fichier. Une nouvelle tentative peut être tentée deux fois.
  • PENDING : L'opération de fichier n'a pas encore démarré.
  • RUNNING : L'opération de fichier est en cours.
  • SKIPPED : Opération de fichier ignorée.
ERROR_CODE NUMBER Code d'erreur
ERROR_MESSAGE VARCHAR2(4000) Message d'erreur
START_TIME TIMESTAMP(6) WITH TIME ZONE Heure de début du pipeline.
END_TIME TIMESTAMP(6) WITH TIME ZONE Heure de fin du pipeline.
SID NUMBER

Les sessions SID et SERIAL# indiquent la session de travail qui exécutait l'opération de chargement de pipeline.

SERIAL# NUMBER

Les sessions SID et SERIAL# indiquent la session de travail qui exécutait l'opération de chargement de pipeline.

ROWS_LOADED NUMBER Nombre de rangées chargées.
OPERATION_ID NUMBER

Réservé pour une utilisation future.

Fichier journal de pipeline et tables de fichiers incorrectes

Pour obtenir le fichier journal et les noms de fichier incorrects pour un pipeline de chargement, interrogez OPERATION_ID du pipeline. Par exemple :

SELECT PIPELINE_NAME, OPERATION_ID FROM USER_CLOUD_PIPELINES
     WHERE PIPELINE_NAME = 'MY_TREE_DATA';

PIPELINE_NAME OPERATION_ID 
------------- ------------ 
MY_TREE_DATA            41

Ensuite, interrogez USER_LOAD_OPERATIONS ou DBA_LOAD_OPERATIONS avec un prédicat de clause WHERE sur la colonne ID (à l'aide de la valeur OPERATION_ID).

Par exemple :

SELECT ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE, STATUS_TABLE FROM USER_LOAD_OPERATIONS
     WHERE ID = 41;

ID TYPE     LOGFILE_TABLE     BADFILE_TABLE     STATUS_TABLE
-- -------- ----------------- ----------------- --------------------
41 PIPELINE PIPELINE$9$41_LOG PIPELINE$9$41_BAD PIPELINE$9$41_STATUS

Cette interrogation affiche ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE s'il existe, et STATUS_TABLE. Vous pouvez consulter ces tables pour obtenir des informations supplémentaires sur le chargement de pipeline.

Consultez la table des fichiers journaux du pipeline pour voir un journal complet des opérations de chargement du pipeline.

Par exemple :

SELECT * FROM PIPELINE$9$41_LOG;

Consultez la table des fichiers incorrects du pipeline pour voir les détails des enregistrements de format d'entrée comportant des erreurs. La table de fichiers erronée affiche des informations sur les rangées signalant des erreurs lors du chargement. Selon les erreurs affichées dans la table des fichiers journaux et les rangées affichées dans la table des fichiers incorrects du pipeline, vous pouvez corriger les erreurs soit en modifiant les options d'attribut format du pipeline, soit en modifiant les données du fichier que vous chargez.

Par exemple :

SELECT * FROM PIPELINE$9$41_BAD;

Pour plus d'informations, voir Surveiller et résoudre le chargement de données.