Utilisation d'un pipeline

Un pipeline vous permet de connecter un ensemble de tâches dans une séquence ou en parallèle pour orchestrer le traitement des données.

En créant un pipeline, vous pouvez créer un graphique de dépendance de tâche complexe et automatiser l'ensemble d'une charge globale de tâches. Les tâches doivent être publiées et vous pouvez ajouter des tâches publiées à partir de n'importe quelle application se trouvant dans l'espace de travail en cours ou à partir d'un autre espace de travail.

Dans ce tutoriel, vous effectuez les opérations suivantes :

  • Créez deux tâches de programme de chargement de données à exécuter en parallèle dans un pipeline.
  • Créez une tâche REST pour utiliser le service de notification afin d'envoyer des notifications par courriel.
  • Créez un pipeline et ajoutez des opérateurs pour les tâches de programme de chargement de données, la fusion, la tâche d'intégration et la tâche REST.
  • Créez une tâche de pipeline afin de configurer un contexte d'exécution pour un pipeline.
  • Publiez une tâche de pipeline et exécutez un pipeline.
  • Surveiller une exécution de pipeline.

1. Création d'une tâche de programme de chargement de données pour les données de revenu

Dupliquez la tâche Load Revenue Data into Data Warehouse pour créer une tâche qui charge et écrase les données de revenu.

  1. Sur la page du détail du projet DI Lab, sélectionnez Tâches dans le sous-menu.
  2. Dans la liste Tâches, recherchez Load Revenue Data into Data Warehouse.
  3. Sélectionnez le menu Actions (Menu Actions), puis sélectionnez Dupliquer.
  4. Dans la boîte de dialogue Dupliquer la tâche, entrez Revenue Data Load pour le nouveau nom, puis sélectionnez Dupliquer.

    La valeur Identificateur est générée automatiquement en fonction du nom que vous indiquez. Vous pouvez modifier la valeur générée, mais après avoir enregistré la nouvelle tâche, vous n'êtes pas autorisé à mettre à jour l'identifiant.

  5. Dans la liste Tâches, sélectionnez Chargement des données de produit.

    La page de tâche du programme de chargement de données en double s'ouvre dans un nouvel onglet.

  6. Sélectionnez l'icône d'étape Cible.
  7. Sous Paramètres de chargement des entités de données cible, sélectionnez Utiliser des entités de données existantes.
  8. Dans le menu Stratégie d'intégration, sélectionnez Ecraser.
  9. Sous Entités de données disponibles, cochez la case REVENUE_TARGET, puis sélectionnez Définir en tant que cible.

    Le nom REVENUE_TARGET apparaît en regard de Entité de données sélectionnée.

  10. Sélectionnez Enregistrer pour enregistrer la tâche et poursuivre les modifications.
  11. Sélectionnez Suivant pour passer à l'étape Transformation.

    Ne supprimez pas la transformation de remplissage NULL précédemment appliquée à SRC_ORDER_NUMBER.

  12. Sélectionnez Suivant pour accéder à l'étape Mise en correspondance d'attributs.

    Tous les attributs source et cible sont automatiquement mis en correspondance.

  13. Sélectionnez Suivant pour accéder à l'étape Vérifier et valider.

    La validation de la tâche commence automatiquement.

    Un récapitulatif des détails de configuration de chaque étape est présenté dans un bloc. Si vous modifiez la configuration d'une étape, accédez à l'étape Vérifier et valider pour valider à nouveau la tâche.

    Le résultat de la validation de tâche est affiché dans le dernier bloc, Validation.

  14. Lorsque la validation aboutit, sélectionnez Enregistrer et fermer.

2. Création d'une tâche de programme de chargement de données pour les données client

Créez une tâche de programme de chargement de données pour charger les données client dans Data Warehouse en créant une entité de données cible.

  1. Sur la page du détail du projet DI Lab, sélectionnez Tâches dans le sous-menu.
  2. Sélectionnez Créer une tâche, puis Programme de chargement de données.

    La page Créer un programme de chargement de données s'ouvre dans un nouvel onglet. Les étapes numérotées et nommées en haut vous guident tout au long de la configuration. Une coche apparaît sur l'icône d'une étape une fois cette dernière configurée. Pour passer d'une étape à l'autre, sélectionnez Suivant ou Précédent. Vous pouvez également accéder directement à une étape configurée en sélectionnant son icône.

  3. Sur la page Créer une tâche de programme de chargement de données, à l'étape Informations de base, sélectionnez les éléments suivants :
    Pour cet élémentSélectionnez
    Type de source File Storage
    Type de cible Base de données
    Type de chargement Entité de données unique
  4. Pour le nom de la tâche, entrez Customer Data Load. Sélectionnez ensuite Suivant pour passer à l'étape suivante.

    Une case à cocher apparaît sur l'icône de l'étape Informations de base une fois celle-ci configurée.

  5. A l'étape Source, sélectionnez les éléments suivants :
    Pour cet élémentSélectionnez
    Ressource de données Data_Lake
    Connexion Connexion par défaut
    Compartiment Compartiment contenant le bucket dans lequel vous avez téléchargé le fichier de données échantillon CUSTOMERS.JSON
    Bucket bucket Object Storage qui contient le fichier JSON échantillon
  6. Sous Paramètres de fichier, sélectionnez les éléments suivants :
    Pour cet élémentSélectionnez
    Type de fichier JSON
    Type de compression Automatique (valeur par défaut)
    Encodage UTF-8

    Vous pouvez conserver les paramètres par défaut dans les autres champs.

  7. Sous Entités de données disponibles, cochez la case CUSTOMERS.JSON, puis sélectionnez Définir comme source.

    Le nom CUSTOMERS.JSON apparaît en regard de Entité de données sélectionnée.

  8. Sélectionnez Créer pour enregistrer la tâche et poursuivre les modifications.
  9. Sélectionnez Suivant pour passer à l'étape Cible, puis sélectionnez les éléments suivants :
    Pour cet élémentSélectionnez
    Ressource de données Data_Warehouse
    Connexion Connexion par défaut
    Schéma BETA
  10. Sous Emplacement de préparation, vous pouvez utiliser l'emplacement de préparation par défaut configuré lors de la création de la ressource de données cible.

    Vous pouvez également désélectionner la case pour sélectionner un autre bucket Object Storage.

  11. Sous Paramètres de chargement des entités à données cible, sélectionnez Créer de nouvelles entités à données.
  12. Sous Options de nom d'entité de données cible, sélectionnez Indiquer un nom d'entité. Ensuite, dans le champ Nom d'entité, entrez CUSTOMER_JSON_TARGET.
  13. Sélectionnez Enregistrer pour enregistrer la tâche et poursuivre les modifications.
  14. Sélectionnez l'étape Vérifier et valider, en ignorant l'étape de transformation facultative.

    La validation de la tâche commence automatiquement.

    Un récapitulatif des détails de configuration de chaque étape est présenté dans un bloc. Si vous modifiez la configuration d'une étape, accédez à l'étape Vérifier et valider pour valider à nouveau la tâche.

    Le résultat de la validation de tâche est affiché dans le dernier bloc, Validation.

  15. Lorsque la validation aboutit, sélectionnez Enregistrer et fermer.

3. Création d'une tâche REST pour l'envoi de notifications

Vous pouvez utiliser une tâche REST pour exécuter une adresse d'API REST dans un pipeline. Dans ce tutoriel, vous utilisez l'API du service Notifications dans une tâche REST Data Integration pour publier un courriel à partir d'un pipeline.

Pour créer une tâche REST à cette étape, vous devez déjà disposer des éléments suivants :
  • Sujet et abonnement par courriel créés dans le service Notifications.

  • OCID du sujet que vous avez créé. L'OCID est disponible dans la section Informations sur la rubrique de la page de détails de la rubrique du service Notifications.

  • L'instruction de stratégie suivante qui vous permet d'exécuter des tâches Data Integration appelant l'API REST Notifications :

    allow any-user to use notification-family in tenancy where ALL {request.principal.type='disworkspace'}

Ensuite, dans Data Integration, créez une tâche REST qui utilise l'API du service Notifications pour publier un courriel.

  1. Sur la page du détail du projet DI Lab, sélectionnez Tâches dans le sous-menu.
  2. Sélectionnez Créer la tâche, puis REST.

    La page Créer une tâche REST apparaît dans un nouvel onglet.

  3. Dans Nom, entrez Notify by Email.

    La valeur Identificateur est générée automatiquement en fonction du nom que vous indiquez. Vous pouvez modifier la valeur générée, mais après avoir enregistré la nouvelle tâche, vous n'êtes pas autorisé à mettre à jour l'identifiant.

  4. Dans la section Détails de l'API REST, sélectionnez Configurer.

    La page Configurer les détails de l'API REST apparaît. Les étapes numérotées et nommées en haut vous guident tout au long de la configuration. Une coche apparaît sur l'icône d'une étape une fois cette dernière configurée. Pour passer d'une étape à l'autre, sélectionnez Suivant ou Précédent. Vous pouvez également accéder directement à une étape configurée en sélectionnant son icône.

  5. Pour Méthode HTTP, sélectionnez POST.
  6. Dans le champ URL, entrez les informations suivantes et appuyez sur Entrée.
    https://notification.us-ashburn-1.oci.oraclecloud.com/20181201/topics/${TOPICID}/messages
    Remarque

    Veillez à utiliser l'identificateur de région approprié pour le service Notifications.

    Lorsque vous appuyez sur Entrée après avoir saisi l'URL, Data Integration convertit la syntaxe du paramètre ${} en paramètre d'URL de chaîne.

  7. Dans la ligne de table correspondant au nouveau paramètre d'URL ajouté TOPICID, sélectionnez Modifier dans le menu Actions (menu Actions).
  8. Dans le champ Valeur, entrez l'OCID du sujet Notifications que vous avez créé et sélectionnez Enregistrer.
  9. Ensuite, ajoutez un en-tête en procédant comme suit :
    1. Sélectionnez En-tête.
    2. Sélectionnez Ajouter un en-tête.
    3. Dans le champ Clé, entrez con et sélectionnez Type de contenu dans la liste.
    4. Dans le champ Valeur, entrez app et sélectionnez application/json dans la liste.
    5. Choisissez Ajouter.
  10. Ajoutez un corps de demande en procédant comme suit :
    1. Sélectionnez Demande.
    2. Dans l'éditeur, entrez les informations suivantes.
      {"title": "Put your title here", "body": "Put your email body here."}
    3. Choisissez Ajouter.
  11. Sélectionnez Suivant, puis Configurer.
  12. Pour fournir l'authentification, procédez comme suit :
    1. Dans la section Authentification, sélectionnez Modifier pour afficher le panneau Configurer l'authentification.
    2. Dans le menu Authentification, sélectionnez Principal de ressource OCI.
    3. Sous Source d'authentification, sélectionnez Espace de travail.
    4. Sélectionnez Configurer.
  13. Dans la section facultative Valider la tâche, sélectionnez Valider.
  14. Une fois la validation réussie, sélectionnez Créer et fermer.

4. Publication du programme de chargement de données et des tâches REST

  1. Sur la page DI_Lab Détails du projet, sélectionnez Tâches dans le sous-menu.
  2. Dans la liste des tâches, cochez les cases en regard de Chargement des données de revenus, Chargement des données client et Notifier par courriel.
  3. Sélectionnez Publier vers l'application.
  4. Dans la boîte de dialogue Publier sur une application, sélectionnez Lab Application, puis Publier.

    Un message de notification apparaît avec un lien vers l'application pour afficher les tâches publiées.

  5. Sélectionnez Visualiser l'application dans la notification. Sélectionnez ensuite X pour fermer la notification.

    La liste Patches de la page de détails de l'application apparaît. Une entrée de patch est créée pour les tâches que vous publiez.

  6. Dans la liste Patches, vous pouvez surveiller le statut du patch. Sélectionnez Actualiser pour obtenir les dernières mises à jour de statut.

    Lorsque le statut d'un patch devient Réussite, trois entrées de tâche publiées sont créées dans la liste Tâches de la page de détails de l'application.

  7. Sur la page Tâches de l'application de laboratoire, sélectionnez Tâches.

    Les tâches publiées pour Chargement des données de revenu, Chargement des données client et Notifier par courriel sont affichées dans la liste des tâches.

5. Création d'un pipeline

  1. Dans la barre d'onglets, sélectionnez l'onglet Ouvrir (icône Plus), puis Projets.
  2. Sur la page Projets, sélectionnez DI_Lab.
  3. Sur la page de détails du projet DI_Lab, sélectionnez Pipelines dans le sous-menu de gauche, puis sélectionnez Créer un pipeline.

    Le concepteur de pipeline s'ouvre dans un nouvel onglet. Un opérateur de début et un opérateur de fin sont placés sur le canevas pour vous.

  4. Dans le panneau Propriétés du pipeline, entrez Analyze Revenue en tant que nom.

    La valeur Identificateur est générée automatiquement en fonction de la valeur que vous entrez pour le nom de pipeline. Vous pouvez modifier la valeur générée, mais une fois le pipeline enregistré, vous n'êtes pas autorisé à mettre à jour l'identifiant.

  5. Choisissez Créer.

    Le concepteur reste ouvert pour que vous puissiez poursuivre vos modifications.

6. Ajout d'opérateurs de pipeline

Vous ajoutez des opérateurs de tâche pour indiquer les tâches publiées à orchestrer dans le pipeline.

En savoir plus sur les opérateurs de pipeline.

  1. Dans le panneau Opérateurs, déposez un opérateur de chargeur de données sur le canevas, en le plaçant entre les opérateurs de début et de fin.

    Le panneau Propriétés affiche désormais les détails de l'opérateur de tâche de programme de chargement de données non limité.

  2. Dans l'onglet Détails du panneau Propriétés, cliquez sur Sélectionner.

    Le panneau Sélectionner une tâche de programme de chargement de données apparaît pour vous permettre de sélectionner une tâche de programme de chargement de données publiée.

  3. Dans Lab Application, sélectionnez Chargement des données de revenu (tâche qui charge les données de revenu dans un entrepôt de données), puis cliquez sur Sélectionner.

    Le nom figurant sur l'icône de l'opérateur devient le nom de la tâche sélectionnée.

  4. Connectez l'opérateur de démarrage à la tâche du chargeur de données de revenus.
  5. Pour enregistrer le pipeline et poursuivre la modification, sélectionnez Enregistrer.
  6. Répétez les étapes pour ajouter un deuxième opérateur de chargeur de données. Cette fois, sélectionnez Chargement des données client (tâche qui charge les données client). Connectez ensuite l'opérateur de démarrage à la tâche de programme de chargement de données client.
  7. Ensuite, déposez l'opérateur Fusionner sur le canevas, en le plaçant après les deux tâches du programme de chargement de données.
  8. Connectez chaque tâche de programme de chargement de données à l'opérateur Fusionner.
  9. Dans l'onglet Détails du panneau Propriétés de l'opérateur de fusion, sélectionnez Toutes les réussites dans le menu Condition de fusion.

    Cela indique que les opérations parallèles liées en amont doivent se terminer et réussir avant que l'opération suivante en aval puisse se poursuivre.

  10. Dans le panneau Opérateurs, déposez l'opérateur Intégration sur le canevas, en le plaçant après l'opérateur de fusion.
  11. Dans l'onglet Détails du panneau Propriétés, cliquez sur Sélectionner.
  12. Dans le panneau Sélectionner une tâche d'intégration, sélectionnez la tâche Charger les clients en laboratoire et cliquez sur Sélectionner.
  13. Connectez l'opérateur de fusion à l'opérateur de tâche d'intégration.
  14. Déposez ensuite l'opérateur REST sur le canevas, en le plaçant après la tâche d'intégration.
  15. Dans l'onglet Détails du panneau Propriétés, cliquez sur Sélectionner.
  16. Dans le panneau Sélectionner une tâche REST, sélectionnez la tâche Notifier par courriel, puis cliquez sur Sélectionner.
  17. Dans l'onglet Détails du panneau Propriétés de l'opérateur de tâche REST, sélectionnez Exécuter en cas de réussite de l'opérateur précédent dans le menu Condition de lien entrante.
  18. Connectez la tâche REST à l'opérateur de fin.
  19. Sélectionnez Valider dans la barre d'outils du canevas.

    Le panneau Validation globale s'affiche pour vous permettre de vérifier les avertissements ou les erreurs.

  20. Pour enregistrer le pipeline, sélectionnez Enregistrer et fermer, puis

7. Création d'une tâche de pipeline

  1. Dans la barre d'onglets, sélectionnez l'onglet Ouvrir (icône Plus), puis Projets.
  2. Sur la page Projets, sélectionnez DI_Lab.
  3. Sur la page DI_Lab de détails de projet, sélectionnez Tâches dans le sous-menu de gauche.
  4. Sélectionnez Créer un projet, puis Pipeline.

    La page Créer une tâche de pipeline apparaît dans un nouvel onglet.

  5. Sur la page Créer une tâche de pipeline, remplacez le nom par Analyze Revenue Lab.

    La saisie d'une Description est facultative. La valeur du champ Identificateur est automatiquement générée en fonction de la valeur saisie dans Nom. Vous pouvez modifier la valeur générée, mais après avoir sauvegardé la tâche, vous n'êtes pas autorisé à mettre à jour l'identifiant.

  6. Dans la section Pipeline, cliquez sur Sélectionner.
  7. Dans le panneau Sélectionner un pipeline, sélectionnez Analyser le revenu, puis cliquez sur Sélectionner.

    La validation du pipeline commence automatiquement.

  8. Sélectionnez Créer et fermer.

8. Publication et exécution d'une tâche de pipeline

  1. Sur la page DI_Lab Détails du projet, sélectionnez Tâches dans le sous-menu.
  2. Dans la liste Tâches, sélectionnez le menu Actions (Menu Actions) pour Analyser le laboratoire de revenus, puis sélectionnez Publier vers l'application.
  3. Dans la boîte de dialogue Publier sur une application, sélectionnez Lab Application, puis Publier.

    Un message de notification apparaît, avec un lien vers l'application pour afficher les tâches publiées.

  4. Accédez à la page de détails de l'application de laboratoire, puis sélectionnez Patches dans le sous-menu de gauche pour afficher les détails du patch de tâche.

    Un patch contient des mises à jour pour une tâche publiée dans une application. Lors de la publication d'une tâche, un patch de publication est créé. En savoir plus sur les patches.

  5. Dans la liste Patches, vous pouvez surveiller le statut du patch. Sélectionnez Actualiser pour obtenir les dernières mises à jour de statut.

    Lorsque le statut d'un patch devient Réussite, une entrée de tâche publiée est créée dans la liste Tâches de la page de détails de l'application.

  6. Sur la page Tâches de l'application de laboratoire, sélectionnez Tâches.

    La tâche publiée par pipeline Analyser le laboratoire de revenus est affichée dans la liste des tâches.

  7. Sélectionnez le menu Actions (Menu Actions) de la tâche de pipeline et sélectionnez Exécuter.

    Un message de succès s'affiche. Lorsque vous exécutez une tâche, une exécution de tâche est créée. Vous êtes automatiquement redirigé vers la page Exécutions, où vous pouvez visualiser toutes les exécutions de tâche et leur statut. Le statut initial d'une exécution de pipeline est Not started.

  8. Dans la liste Exécutions de la page de détails Application de laboratoire, sélectionnez Actualiser pour obtenir les dernières mises à jour du statut d'exécution de tâche.

    L'exécution d'un pipeline comprend des étapes de prétraitement, d'acceptation et de validation avant que le moteur d'exécution ne démarre l'exécution réelle du pipeline.

    Sélectionnez Actualiser plusieurs fois jusqu'à ce que le statut En cours d'exécution s'affiche.

  9. Lorsque la tâche de pipeline est en cours d'exécution, sélectionnez le nom de l'exécution de tâche.

    La page Détails de l'exécution apparaît, dans laquelle vous pouvez surveiller la progression de l'exécution du pipeline sur le graphique de pipeline. Le statut de chaque noeud est indiqué par une icône et un libellé. Par exemple, une coche verte pour un noeud terminé, le libellé Running pour les tâches en cours d'exécution et le libellé Waiting pour une tâche en aval en attente d'exécution.

    Sélectionnez Actualiser plusieurs fois jusqu'à ce que le statut d'exécution global du pipeline soit Succès.

    Vous pouvez également sélectionner Présentation pour afficher plus de détails sur l'exécution de pipeline.

  10. Une fois l'exécution de pipeline réussie, accédez à la liste Exécutions de la page de détails de l'application de laboratoire et développez l'entrée d'exécution principale de l'exécution de tâche de pipeline.

    Vous pouvez visualiser les détails d'exécution de quatre tâches individuelles dans le pipeline.

    Vous auriez également reçu un courriel du service Notifications.