Gérer les pipelines

Une fois que vous avez créé et testé un pipeline, vous pouvez le contrôler en le démarrant, en l'arrêtant ou en le supprimant. Vous pouvez également réinitialiser un pipeline.

Démarrer un pipeline

Une fois que vous avez créé un pipeline, vous pouvez le démarrer.

Lorsqu'un pipeline est démarré, il est exécuté en continu dans un travail programmé. Le travail programmé du pipeline se répète, par défaut toutes les 15 minutes ou à l'intervalle que vous définissez avec l'attribut interval.

  1. Démarrez un pipeline.

     BEGIN
       DBMS_CLOUD_PIPELINE.START_PIPELINE(
         pipeline_name => 'EMPLOYEE_PIPELINE'
       );
     END;
     /
    

    Par défaut, un travail de pipeline démarre immédiatement dès que le pipeline est démarré. Pour démarrer un travail de pipeline ultérieurement, indiquez une date future ou un horodatage valide à l'aide du paramètre start_date.

    Pour plus d'informations, reportez-vous à la section START_PIPELINE Procedure.

  2. Vérifiez que le pipeline est démarré.

    Exemple :

     SELECT pipeline_name, status from USER_CLOUD_PIPELINES
      WHERE pipeline_name = 'EMPLOYEE_PIPELINE';
    
     PIPELINE_NAME            STATUS
    
     ------------------------ -------
     EMPLOYEE_PIPELINE        STARTED
    

Arrêt d'un pipeline

Utilisez STOP_PIPELINE pour arrêter un pipeline. Lorsqu'un pipeline est arrêté, aucun travail futur n'est programmé pour le pipeline.

Par défaut, les travaux en cours d'exécution sont terminés lorsque vous arrêtez un pipeline. Définissez le paramètre force sur TRUE pour mettre fin aux travaux en cours d'exécution et arrêter immédiatement le pipeline.

  1. Arrêtez un pipeline.

     BEGIN
       DBMS_CLOUD_PIPELINE.STOP_PIPELINE(
         pipeline_name => 'EMPLOYEE_PIPELINE'
       );
     END;
     /
    

    Pour plus d'informations, reportez-vous à Procédure STOP_PIPELINE.

  2. Vérifiez que le pipeline est arrêté.

     SELECT pipeline_name, status from USER_CLOUD_PIPELINES
      WHERE pipeline_name = 'EMPLOYEE_PIPELINE';
    
     PIPELINE_NAME            STATUS
    
     ------------------------ -------
     EMPLOYEE_PIPELINE        STOPPED
    

Pour plus d'informations, reportez-vous à Procédure STOP_PIPELINE.

Suppression d'un pipeline

La procédure DROP_PIPELINE supprime un pipeline existant.

Si un pipeline a été démarré, il doit être arrêté avant de pouvoir le supprimer. Pour plus d'informations, reportez-vous à Procédure STOP_PIPELINE.

Afin de supprimer un pipeline démarré, définissez le paramètre force sur TRUE pour mettre fin aux travaux en cours d'exécution et supprimer immédiatement le pipeline

  1. Supprimer un pipeline.

     BEGIN
       DBMS_CLOUD_PIPELINE.DROP_PIPELINE(
         pipeline_name => 'EMPLOYEE_PIPELINE'
       );
     END;
     /
    
  2. Vérifiez que le pipeline est supprimé.

     SELECT pipeline_name, status from USER_CLOUD_PIPELINES
      WHERE pipeline_name = 'EMPLOYEE_PIPELINE';
    
     No rows selected
    

Pour plus d'informations, reportez-vous à Procédure DROP_PIPELINE.

Réinitialisation d'un pipeline

Utilisez l'opération de réinitialisation de pipeline pour faire passer l'enregistrement du pipeline à l'état initial.

Remarque : vous pouvez éventuellement utiliser le pipeline de réinitialisation pour purger les données de la table de base de données associée à un pipeline de chargement ou pour enlever les fichiers de la banque d'objets d'un pipeline d'export. En général, cette option est utilisée lorsque vous testez un pipeline pendant son développement.

Le pipeline de réinitialisation fonctionne comme suit :

Pour réinitialiser un pipeline, procédez comme suit :

  1. Arrêtez le pipeline à réinitialiser.

    Un pipeline de données doit être à l'état Arrêté pour pouvoir le réinitialiser. Pour plus d'informations, reportez-vous à Procédure STOP_PIPELINE.

  2. Réinitialisez le pipeline.

     BEGIN
       DBMS_CLOUD_PIPELINE.RESET_PIPELINE(
         pipeline_name => 'EMPLOYEE_PIPELINE',
         purge_data => TRUE);
     END;
     /
    

    Utilisez uniquement le paramètre purge_data avec la valeur TRUE si vous voulez effacer des données dans la table de base de données, pour un pipeline de chargement ou pour effacer des fichiers dans la banque d'objets pour un pipeline d'export. En général, cette option est utilisée lorsque vous testez un pipeline pendant son développement.

    Pour plus d'informations, reportez-vous à Procédure RESET_PIPELINE.

Surveillance et dépannage des pipelines

Tous les pipelines créés sont journalisés dans les vues DBMS_CLOUD_PIPELINE.

Afficher les informations sur le statut du pipeline

Vérifiez le statut du pipeline et obtenez d'autres informations sur le pipeline à l'aide des vues USER_CLOUD_PIPELINES ou DBA_CLOUD_PIPELINES. Par exemple, l'instruction SELECT suivante avec un prédicat de clause WHERE sur pipeline_name indique que MY_TREE_DATA est un pipeline de charge et que le pipeline est démarré :

SELECT pipeline_name, pipeline_type, status 
  FROM USER_CLOUD_PIPELINES
 WHERE pipeline_name = 'MY_TREE_DATA';
PIPELINE_NAME PIPELINE_TYPE STATUS
------------- ------------- -------
MY_TREE_DATA  LOAD          STARTED

Pour plus d'informations, reportez-vous à Vues DBMS_CLOUD_PIPELINE.

Afficher les attributs de pipeline

Vous pouvez surveiller les attributs de pipeline en interrogeant les vues USER_CLOUD_PIPELINE_ATTRIBUTES ou DBA_CLOUD_PIPELINE_ATTRIBUTES. Interrogez ces vues pour afficher les informations sur les attributs de pipeline.

Exemple :

SELECT pipeline_name, attribute_name, attribute_value FROM user_cloud_pipeline_attributes
     WHERE pipeline_name = 'MY_TREE_DATA';
PIPELINE_NAME ATTRIBUTE_NAME  ATTRIBUTE_VALUE
------------- --------------- ---------------------------------------------------------------------------------
MY_TREE_DATA  credential_name DEF_CRED_OBJ_STORE
MY_TREE_DATA  format          {"type": "csv"}
MY_TREE_DATA  interval        20
MY_TREE_DATA  location        https://objectstorage.us-ashburn-1.oraclecloud.com/n/namespace/b/treetypes/o/
MY_TREE_DATA  priority        high
MY_TREE_DATA  table_name      TREES

Pour plus d'informations, reportez-vous à Vues DBMS_CLOUD_PIPELINE.

Afficher l'historique du pipeline

Les vues USER_CLOUD_PIPELINE_HISTORY et DBA_CLOUD_PIPELINE_HISTORY indiquent l'état des travaux en cours d'exécution. Utilisez les vues d'historique de pipeline pour surveiller l'état d'un pipeline et détecter les échecs dans un pipeline en cours d'exécution.

Exemple :

SELECT pipeline_id, pipeline_name, status, error_message  FROM user_cloud_pipeline_history
     WHERE pipeline_name = 'MY_TREE_DATA';
PIPELINE_ID PIPELINE_NAME STATUS    ERROR_MESSAGE
----------- ------------- --------- -------------
          7  MY_TREE_DATA SUCCEEDED

Pour plus d'informations, reportez-vous à Vues DBMS_CLOUD_PIPELINE.

Table Statut du pipeline : Surveillance supplémentaire pour les pipelines de chargement

La table des statuts de pipeline affiche chaque nom de fichier et son statut pour un pipeline de chargement. La colonne STATUS_TABLE dans DBA_CLOUD_PIPELINES et USER_CLOUD_PIPELINES indique le nom de la table de statut.

Par exemple, l'instruction SELECT suivante avec un prédicat de clause WHERE sur pipeline_name affiche le nom de la table de statut d'un pipeline :

SELECT pipeline_name, status_table FROM user_cloud_pipelines
   WHERE pipeline_name = 'MY_TREE_DATA';
PIPELINE_NAME STATUS_TABLE
------------- --------------------
MY_TREE_DATA  PIPELINE$9$41_STATUS

Consultez la table des statuts pour obtenir des informations sur le pipeline, notamment :

Par exemple, l'opération de chargement de deux fichiers a échoué et l'une est terminée :

SELECT id, name, status, error_code, error_message, sid FROM PIPELINE$9$41_STATUS;
ID NAME       STATUS    ERROR_CODE ERROR_MESSAGE                      SID
-- ---------- --------- ---------- -------------------------------- -----
 1 trees1.txt FAILED         30653 ORA-30653: reject limit reached  18070
 2 trees2.txt FAILED         30653 ORA-30653: reject limit reached  18070
 3 trees3.txt COMPLETED                                             18070

Pipelines pour le chargement de données, où pipeline_type est 'LOAD', réservez un élément ID affiché dans USER_LOAD_OPERATIONS et dans DBA_LOAD_OPERATIONS. La valeur ID dans ces vues correspond à la valeur OPERATION_ID du pipeline dans USER_CLOUD_PIPELINES et DBA_CLOUD_PIPELINES.

Pour obtenir plus d'informations sur un pipeline de chargement, interrogez le fichier OPERATION_ID du pipeline :

SELECT PIPELINE_NAME, OPERATION_ID FROM USER_CLOUD_PIPELINES
     WHERE PIPELINE_NAME = 'MY_TREE_DATA';
PIPELINE_NAME OPERATION_ID
------------- ------------
MY_TREE_DATA            41

Interrogez ensuite USER_LOAD_OPERATIONS ou DBA_LOAD_OPERATIONS avec un prédicat de clause WHERE sur la colonne ID (en utilisant la valeur OPERATION_ID).

Exemple :

SELECT ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE, STATUS_TABLE FROM USER_LOAD_OPERATIONS
     WHERE ID = 41;
ID TYPE     LOGFILE_TABLE     BADFILE_TABLE     STATUS_TABLE
-- -------- ----------------- ----------------- --------------------
41 PIPELINE PIPELINE$9$41_LOG PIPELINE$9$41_BAD PIPELINE$9$41_STATUS

Cette requête affiche ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE si elle existe et STATUS_TABLE. Vous pouvez consulter ces tables pour obtenir des informations supplémentaires sur le chargement du pipeline.

Détails de la table des statuts de pipeline

Colonne Type de données Description
ID NUMBER Numéro unique affecté au pipeline.
NAME VARCHAR2(4000) Nom du pipeline.
BYTES NUMBER Octets
CHECKSUM VARCHAR2(128) Checksum
LAST_MODIFIED TIMESTAMP(6) WITH TIME ZONE Heure de la dernière modification du pipeline.
STATUS VARCHAR2(30) La valeur STATUS est l'une des suivantes :
  • COMPLETED : l'opération de fichier s'est terminée.
  • FAILED : échec de l'opération sur le fichier. Vous pouvez tenter une nouvelle tentative deux fois.
  • PENDING : l'opération de fichier n'a pas encore démarré.
  • RUNNING : l'opération de fichier est en cours.
  • SKIPPED : opération de fichier ignorée.
ERROR_CODE NUMBER Code d'erreur
ERROR_MESSAGE VARCHAR2(4000) Message d'erreur
START_TIME TIMESTAMP(6) WITH TIME ZONE Heure de début du pipeline.
END_TIME TIMESTAMP(6) WITH TIME ZONE Heure de fin du pipeline.
SID NUMBER La session SID etSERIAL# indiquent la session de travail qui exécutait l'opération de chargement de pipeline.
SERIAL# NUMBER La session SID et SERIAL# indiquent la session de travail qui exécutait l'opération de chargement de pipeline.
ROWS_LOADED NUMBER Nombre de lignes chargées
OPERATION_ID NUMBER Réservé à une utilisation ultérieure.

Tables de fichiers journaux de pipeline et de fichiers incorrects

Afin d'obtenir le fichier journal et les noms de fichier erronés pour un pipeline de chargement, interrogez le fichier OPERATION_ID du pipeline. Exemple :

SELECT PIPELINE_NAME, OPERATION_ID FROM USER_CLOUD_PIPELINES
     WHERE PIPELINE_NAME = 'MY_TREE_DATA';
PIPELINE_NAME OPERATION_ID
------------- ------------
MY_TREE_DATA            41

Ensuite, interrogez USER_LOAD_OPERATIONS ou DBA_LOAD_OPERATIONS avec un prédicat de clause WHERE sur la colonne ID (en utilisant la valeur OPERATION_ID).

Exemple :

SELECT ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE, STATUS_TABLE FROM USER_LOAD_OPERATIONS
     WHERE ID = 41;
ID TYPE     LOGFILE_TABLE     BADFILE_TABLE     STATUS_TABLE
-- -------- ----------------- ----------------- --------------------
41 PIPELINE PIPELINE$9$41_LOG PIPELINE$9$41_BAD PIPELINE$9$41_STATUS

Cette requête affiche ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE si elle existe et STATUS_TABLE. Vous pouvez consulter ces tables pour obtenir des informations supplémentaires sur le chargement du pipeline.

Affichez la table des fichiers journaux du pipeline pour afficher un journal complet des opérations de chargement du pipeline.

Exemple :

SELECT * FROM PIPELINE$9$41_LOG;

Consultez la table des fichiers de pipeline erronés pour voir les détails des enregistrements de format d'entrée comportant des erreurs. La table des fichiers incorrects affiche les informations relatives aux lignes signalant des erreurs lors du chargement. En fonction des erreurs affichées dans la table des fichiers journaux et des lignes affichées dans la table des fichiers incorrects du pipeline, vous pouvez corriger les erreurs en modifiant les options d'attribut format du pipeline ou en modifiant les données du fichier que vous chargez.

Exemple :

SELECT * FROM PIPELINE$9$41_BAD;

Pour plus d'informations, reportez-vous à Surveillance et dépannage du chargement de données.