Gérer les pipelines

Après avoir créé et testé un pipeline, vous contrôlez ce dernier en le démarrant, en l'arrêtant ou en le supprimant. Vous pouvez également réinitialiser un pipeline.

Démarrer un pipeline

Après avoir créé un pipeline, vous pouvez le démarrer.

Lorsqu'un pipeline est démarré, il s'exécute en continu dans une tâche programmée. La tâche programmée du pipeline se répète, soit par défaut toutes les 15 minutes, soit à l'intervalle défini avec l'attribut interval.

  1. Démarrez un pipeline.

     BEGIN
       DBMS_CLOUD_PIPELINE.START_PIPELINE(
         pipeline_name => 'EMPLOYEE_PIPELINE'
       );
     END;
     /
    

    Par défaut, une tâche de pipeline commence immédiatement, dès que le pipeline est démarré. Pour démarrer une tâche de pipeline ultérieurement, spécifiez une date ou un horodatage futur valide à l'aide du paramètre start_date.

    Pour plus d'informations, voir Procédure START_PIPELINE.

  2. Vérifiez que le pipeline est démarré.

    Par exemple :

     SELECT pipeline_name, status from USER_CLOUD_PIPELINES
      WHERE pipeline_name = 'EMPLOYEE_PIPELINE';
    
     PIPELINE_NAME            STATUS
    
     ------------------------ -------
     EMPLOYEE_PIPELINE        STARTED
    

Arrêter un pipeline

Utilisez STOP_PIPELINE pour arrêter un pipeline. Lorsqu'un pipeline est arrêté, aucune tâche future n'est programmée pour le pipeline.

Par défaut, les tâches en cours d'exécution sont terminées lorsque vous arrêtez un pipeline. Réglez le paramètre force à TRUE pour mettre fin aux tâches en cours d'exécution et arrêter immédiatement le pipeline.

  1. Arrêtez un pipeline.

     BEGIN
       DBMS_CLOUD_PIPELINE.STOP_PIPELINE(
         pipeline_name => 'EMPLOYEE_PIPELINE'
       );
     END;
     /
    

    Pour plus d'informations, voir Procédure STOP_PIPELINE.

  2. Vérifiez que le pipeline est arrêté.

     SELECT pipeline_name, status from USER_CLOUD_PIPELINES
      WHERE pipeline_name = 'EMPLOYEE_PIPELINE';
    
     PIPELINE_NAME            STATUS
    
     ------------------------ -------
     EMPLOYEE_PIPELINE        STOPPED
    

Pour plus d'informations, voir Procédure STOP_PIPELINE.

Supprimer un pipeline

La procédure DROP_PIPELINE supprime un pipeline existant.

Si un pipeline a été démarré, il doit être arrêté pour pouvoir être abandonné. Pour plus d'informations, voir Procédure STOP_PIPELINE.

Pour supprimer un pipeline démarré, réglez le paramètre force à TRUE pour mettre fin aux tâches en cours d'exécution et supprimer le pipeline immédiatement

  1. Supprimez un pipeline.

     BEGIN
       DBMS_CLOUD_PIPELINE.DROP_PIPELINE(
         pipeline_name => 'EMPLOYEE_PIPELINE'
       );
     END;
     /
    
  2. Vérifiez que le pipeline a été abandonné.

     SELECT pipeline_name, status from USER_CLOUD_PIPELINES
      WHERE pipeline_name = 'EMPLOYEE_PIPELINE';
    
     No rows selected
    

Pour plus d'informations, voir Procédure DROP_PIPELINE.

Réinitialiser un pipeline

Utilisez l'opération de réinitialisation de pipeline pour effacer l'enregistrement du pipeline à l'état initial.

Note : Vous pouvez éventuellement utiliser le pipeline de réinitialisation pour épurer les données de la table de base de données associée à un pipeline de chargement ou pour supprimer des fichiers dans le magasin d'objets pour un pipeline d'exportation. Cette option est généralement utilisée lorsque vous testez un pipeline lors du développement d'un pipeline.

La réinitialisation du pipeline fonctionne comme suit :

Pour réinitialiser un pipeline :

  1. Arrêtez le pipeline à réinitialiser.

    Un pipeline de données doit être à l'état Arrêté pour être réinitialisé. Pour plus d'informations, voir Procédure STOP_PIPELINE.

  2. Réinitialisez le pipeline.

     BEGIN
       DBMS_CLOUD_PIPELINE.RESET_PIPELINE(
         pipeline_name => 'EMPLOYEE_PIPELINE',
         purge_data => TRUE);
     END;
     /
    

    N'utilisez le paramètre purge_data avec la valeur TRUE que si vous voulez effacer des données dans la table de base de données, pour un pipeline de chargement ou effacer des fichiers dans le magasin d'objets pour un pipeline d'exportation. Cette option est généralement utilisée lorsque vous testez un pipeline lors du développement d'un pipeline.

    Pour plus d'informations, voir Procédure RESET_PIPELINE.

Surveiller et dépanner les pipelines

Tous les pipelines créés sont journalisés dans les vues DBMS_CLOUD_PIPELINE.

Voir les informations sur le statut de pipeline

Vérifiez le statut du pipeline et obtenez d'autres informations à l'aide des vues USER_CLOUD_PIPELINES ou DBA_CLOUD_PIPELINES. Par exemple, l'énoncé SELECT suivant avec un prédicat de clause WHERE sur pipeline_name montre que MY_TREE_DATA est un pipeline de chargement et que le pipeline est démarré :

SELECT pipeline_name, pipeline_type, status 
  FROM USER_CLOUD_PIPELINES
 WHERE pipeline_name = 'MY_TREE_DATA';
PIPELINE_NAME PIPELINE_TYPE STATUS
------------- ------------- -------
MY_TREE_DATA  LOAD          STARTED

Pour plus d'informations, voir Vues DBMS_CLOUD_PIPELINE.

Voir les attributs de pipeline

Les attributs de pipeline peuvent être surveillés en interrogeant les vues USER_CLOUD_PIPELINE_ATTRIBUTES ou DBA_CLOUD_PIPELINE_ATTRIBUTES. Interrogez ces vues pour voir les informations sur les attributs de pipeline.

Par exemple :

SELECT pipeline_name, attribute_name, attribute_value FROM user_cloud_pipeline_attributes
     WHERE pipeline_name = 'MY_TREE_DATA';
PIPELINE_NAME ATTRIBUTE_NAME  ATTRIBUTE_VALUE
------------- --------------- ---------------------------------------------------------------------------------
MY_TREE_DATA  credential_name DEF_CRED_OBJ_STORE
MY_TREE_DATA  format          {"type": "csv"}
MY_TREE_DATA  interval        20
MY_TREE_DATA  location        https://objectstorage.us-ashburn-1.oraclecloud.com/n/namespace/b/treetypes/o/
MY_TREE_DATA  priority        high
MY_TREE_DATA  table_name      TREES

Pour plus d'informations, voir Vues DBMS_CLOUD_PIPELINE.

Voir l'historique de pipeline

Les vues USER_CLOUD_PIPELINE_HISTORY et DBA_CLOUD_PIPELINE_HISTORY affichent l'état des tâches en cours d'exécution. Utilisez les vues d'historique de pipeline pour surveiller l'état d'un pipeline et détecter les défaillances dans un pipeline en cours d'exécution.

Par exemple :

SELECT pipeline_id, pipeline_name, status, error_message  FROM user_cloud_pipeline_history
     WHERE pipeline_name = 'MY_TREE_DATA';
PIPELINE_ID PIPELINE_NAME STATUS    ERROR_MESSAGE
----------- ------------- --------- -------------
          7  MY_TREE_DATA SUCCEEDED

Pour plus d'informations, voir Vues DBMS_CLOUD_PIPELINE.

Table de statut de pipeline : Surveillance supplémentaire pour les pipelines de chargement

La table de statut de pipeline affiche chaque nom de fichier et son statut pour un pipeline de chargement. La colonne STATUS_TABLE dans DBA_CLOUD_PIPELINES et USER_CLOUD_PIPELINES affiche le nom de la table de statut.

Par exemple, l'énoncé SELECT suivant avec un prédicat de clause WHERE sur pipeline_name affiche le nom de la table de statut d'un pipeline :

SELECT pipeline_name, status_table FROM user_cloud_pipelines
   WHERE pipeline_name = 'MY_TREE_DATA';
PIPELINE_NAME STATUS_TABLE
------------- --------------------
MY_TREE_DATA  PIPELINE$9$41_STATUS

Consultez la table de statut pour voir les informations sur le pipeline, notamment :

Par exemple, ce qui suit montre que l'opération de chargement de deux fichiers a échoué et qu'une opération est terminée :

SELECT id, name, status, error_code, error_message, sid FROM PIPELINE$9$41_STATUS;
ID NAME       STATUS    ERROR_CODE ERROR_MESSAGE                      SID
-- ---------- --------- ---------- -------------------------------- -----
 1 trees1.txt FAILED         30653 ORA-30653: reject limit reached  18070
 2 trees2.txt FAILED         30653 ORA-30653: reject limit reached  18070
 3 trees3.txt COMPLETED                                             18070

Pipelines pour le chargement des données, où pipeline_type est 'LOAD', réservez une valeur ID affichée dans USER_LOAD_OPERATIONS et DBA_LOAD_OPERATIONS. La valeur ID dans ces vues est mappée à la valeur OPERATION_ID du pipeline dans USER_CLOUD_PIPELINES et DBA_CLOUD_PIPELINES.

Pour obtenir plus d'informations sur un pipeline de chargement, interrogez la valeur OPERATION_ID du pipeline :

SELECT PIPELINE_NAME, OPERATION_ID FROM USER_CLOUD_PIPELINES
     WHERE PIPELINE_NAME = 'MY_TREE_DATA';
PIPELINE_NAME OPERATION_ID
------------- ------------
MY_TREE_DATA            41

Ensuite, interrogez USER_LOAD_OPERATIONS ou DBA_LOAD_OPERATIONS avec un prédicat de clause WHERE sur la colonne ID (à l'aide de la valeur OPERATION_ID).

Par exemple :

SELECT ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE, STATUS_TABLE FROM USER_LOAD_OPERATIONS
     WHERE ID = 41;
ID TYPE     LOGFILE_TABLE     BADFILE_TABLE     STATUS_TABLE
-- -------- ----------------- ----------------- --------------------
41 PIPELINE PIPELINE$9$41_LOG PIPELINE$9$41_BAD PIPELINE$9$41_STATUS

Cette interrogation affiche ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE s'il existe et STATUS_TABLE. Vous pouvez consulter ces tables pour obtenir des informations supplémentaires sur le chargement du pipeline.

Détails de la table de statuts de pipeline

Colonne Type de données Description
ID NUMBER Numéro unique affecté au pipeline.
NAME VARCHAR2(4000) Nom du pipeline.
BYTES NUMBER Octets
CHECKSUM VARCHAR2(128) Somme de contrôle
LAST_MODIFIED TIMESTAMP(6) WITH TIME ZONE Heure de la dernière modification pour le pipeline.
STATUS VARCHAR2(30) La valeur STATUS est l'une des suivantes :
  • COMPLETED : L'opération de fichier s'est terminée avec succès.
  • FAILED : Échec de l'opération de fichier. Une nouvelle tentative peut être effectuée deux fois.
  • PENDING : L'opération de fichier n'a pas encore démarré.
  • RUNNING : L'opération de fichier est en cours.
  • SKIPPED : Opération de fichier ignorée.
ERROR_CODE NUMBER Code d'erreur
ERROR_MESSAGE VARCHAR2(4000) Message d'erreur
START_TIME TIMESTAMP(6) WITH TIME ZONE Heure de début du pipeline.
END_TIME TIMESTAMP(6) WITH TIME ZONE Heure de fin du pipeline.
SID NUMBER Les sessions SID etSERIAL# indiquent la session de tâche qui exécutait l'opération de chargement de pipeline.
SERIAL# NUMBER Les sessions SID etSERIAL# indiquent la session de tâche qui exécutait l'opération de chargement de pipeline.
ROWS_LOADED NUMBER Nombre de rangées chargées.
OPERATION_ID NUMBER Réservé pour une utilisation future.

Fichier journal de pipeline et tables de fichiers incorrects

Pour obtenir les noms de fichier journal et de fichier incorrect pour un pipeline de chargement, interrogez la valeur OPERATION_ID du pipeline. Par exemple :

SELECT PIPELINE_NAME, OPERATION_ID FROM USER_CLOUD_PIPELINES
     WHERE PIPELINE_NAME = 'MY_TREE_DATA';
PIPELINE_NAME OPERATION_ID
------------- ------------
MY_TREE_DATA            41

Ensuite, interrogez USER_LOAD_OPERATIONS ou DBA_LOAD_OPERATIONS avec un prédicat de clause WHERE sur la colonne ID (à l'aide de la valeur OPERATION_ID).

Par exemple :

SELECT ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE, STATUS_TABLE FROM USER_LOAD_OPERATIONS
     WHERE ID = 41;
ID TYPE     LOGFILE_TABLE     BADFILE_TABLE     STATUS_TABLE
-- -------- ----------------- ----------------- --------------------
41 PIPELINE PIPELINE$9$41_LOG PIPELINE$9$41_BAD PIPELINE$9$41_STATUS

Cette interrogation affiche ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE s'il existe et STATUS_TABLE. Vous pouvez consulter ces tables pour obtenir des informations supplémentaires sur le chargement du pipeline.

Consultez la table du fichier journal du pipeline pour voir un journal complet des opérations de chargement du pipeline.

Par exemple :

SELECT * FROM PIPELINE$9$41_LOG;

Consultez la table des fichiers incorrects du pipeline pour voir les détails des enregistrements de format d'entrée comportant des erreurs. La table des fichiers incorrects affiche des informations sur les lignes signalant des erreurs lors du chargement. Selon les erreurs affichées dans la table du fichier journal et les rangées affichées dans la table des fichiers incorrects du pipeline, vous pouvez peut-être corriger les erreurs soit en modifiant les options d'attribut format du pipeline, soit en modifiant les données du fichier que vous chargez.

Par exemple :

SELECT * FROM PIPELINE$9$41_BAD;

Pour plus d'informations, voir Surveiller et dépanner le chargement de données.