Créer et configurer des pipelines
Vous pouvez créer un ou plusieurs pipelines de chargement ou d'exportation. Lorsque vous créez un pipeline, vous utilisez des paramètres et définissez des attributs de pipeline pour configurer le pipeline.
Les options de création et de configuration d'un pipeline sont les suivantes :
-
Pipeline de chargement :
-
Pipeline d'exportation :
-
Exporter les résultats incrémentiels d'une interrogation vers le magasin d'objets à l'aide d'une colonne de date ou d'horodatage comme clé pour le suivi des données plus récentes. Voir Créer et configurer un pipeline à exporter avec une colonne d'horodatage.
-
Exporter les données incrémentielles d'une table vers le magasin d'objets à l'aide d'une colonne de date ou d'horodatage comme clé pour le suivi des données plus récentes. Voir Créer et configurer un pipeline à exporter avec une colonne d'horodatage.
-
Exportez les données d'une table vers un magasin d'objets à l'aide d'une interrogation pour sélectionner des données sans référence à une colonne de date ou d'horodatage (de sorte que le pipeline exporte toutes les données sélectionnées par l'interrogation pour chaque exécution du programmateur). Voir Créer et configurer un pipeline pour exporter les résultats d'interrogation (sans horodatage).
-
Créer et configurer un pipeline pour le chargement de données
Vous pouvez créer un pipeline pour charger des données à partir de fichiers externes dans le magasin d'objets ou les répertoires vers des tables de la base de données IA autonome.
Un pipeline de chargement consomme les données placées sur le magasin d'objets ou dans les répertoires et les charge dans une table de la base de données d'intelligence artificielle autonome. Lorsque vous créez un pipeline de chargement, celui-ci s'exécute à intervalles réguliers pour consommer les données placées à l'emplacement source, lorsque de nouveaux fichiers de données arrivent, le pipeline charge les nouvelles données. Vous pouvez également utiliser un pipeline pour copier de manière fiable des fichiers, avec les fonctionnalités de reprise et de nouvelle tentative, de l'emplacement source vers une table de votre base de données.
Avec un pipeline de chargement, l'ensemble de pipeline utilise DBMS_CLOUD.COPY_DATA pour charger des données.
Dans votre base de données d'intelligence artificielle autonome, utilisez une table existante ou créez la table de base de données dans laquelle vous chargez les données. Par exemple :
CREATE TABLE EMPLOYEE
(name VARCHAR2(128),
age NUMBER,
salary NUMBER);
- Créez un pipeline pour charger des données à partir d'un magasin d'objets ou d'objets de répertoire.
BEGIN DBMS_CLOUD_PIPELINE.CREATE_PIPELINE( pipeline_name => 'MY_PIPE1', pipeline_type => 'LOAD', description => 'Load metrics from object store into a table' ); END; /Pour plus d'informations, voir Procédure CREATE_PIPELINE.
-
Créez un objet de données d'identification pour accéder au magasin d'objets qui contient les fichiers que vous chargez.
Vous spécifiez les données d'identification pour l'emplacement source du pipeline avec l'attribut
credential_name. Si vous n'indiquez pascredential_nameà l'étape suivante, la valeurcredential_nameest réglée àNULL. Vous pouvez utiliser la valeurNULLpar défaut lorsque l'attributlocationest une URL publique ou préauthentifiée.Pour plus d'informations, voir Procédure CREATE_CREDENTIAL.
-
Définissez les attributs de pipeline, y compris les attributs requis :
location,table_nameetformat.Cas 1 : Créer un pipeline pour charger des données à partir du magasin d'objets.
BEGIN DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE( pipeline_name => 'MY_PIPE1', attributes => JSON_OBJECT( 'credential_name' VALUE 'OBJECT_STORE_CRED', 'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/', 'table_name' VALUE 'employee', 'format' VALUE '{"type":"json", "columnpath":["$.NAME", "$.AGE", "$.SALARY"]}', 'priority' VALUE 'HIGH', 'interval' VALUE '20') ); END; /Cas 2 : Créer un pipeline pour charger des données à partir d'objets de répertoire.
BEGIN DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE( pipeline_name => 'MY_PIPE1', pipeline_type => 'LOAD', attributes => JSON_OBJECT( 'location' VALUE 'MY_DIR:*.csv', 'table_name' VALUE 'employee', 'format' VALUE '{"type":"csv"}', 'priority' VALUE 'HIGH', 'interval' VALUE '20') ); END; /Les attributs suivants doivent être définis pour exécuter un pipeline de chargement dans les deux cas :
-
location: Spécifie l'emplacement du fichier source dans le magasin d'objets ou l'objet de répertoire. -
table_name: Spécifie la table dans la base de données où vous chargez les données. La valeurlocationque vous spécifiez est pour une valeurtable_namepar pipeline. -
format: Décrit le format des données que vous chargez.Pour plus d'informations, voir Options de format d'ensemble DBMS_CLOUD.
credential_nameest les données d'identification que vous avez créées à l'étape précédente.La valeur
prioritydétermine le nombre de fichiers chargés en parallèle. Un pipeline avec une priorité supérieure consomme plus de ressources de base de données et exécute chaque exécution plus rapidement, par rapport à une priorité inférieure.La valeur
intervalspécifie l'intervalle de temps en minutes entre les exécutions consécutives d'une tâche de pipeline. La valeur par défaut deintervalest 15 minutes.Voir Attributs DBMS_CLOUD_PIPELINE pour plus de détails sur les attributs de pipeline.
-
-
Après avoir créé un pipeline, vous pouvez tester le pipeline ou le démarrer :
Comme alternative, pour définir le format pour JSON, vous pouvez utiliser le format suivant :
BEGIN
DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
pipeline_name => 'MY_PIPE1',
attribute_name => 'format',
attribute_value => JSON_OBJECT('type' value 'json', 'columnpath' value '["$.NAME", "$.AGE", "$.SALARY"]')
);
END;
/
Créer et configurer un pipeline pour l'exportation avec une colonne d'horodatage
Vous pouvez créer un pipeline d'exportation pour exporter automatiquement des données de série chronologique de votre base de données IA autonome vers le magasin d'objets.
À l'aide de cette option de pipeline d'exportation, vous spécifiez une table ou une interrogation SQL et une colonne avec un horodatage que le pipeline utilise pour suivre l'heure du dernier chargement. Vous pouvez utiliser un pipeline d'exportation pour partager des données en vue de leur utilisation par d'autres applications ou pour enregistrer des données dans le magasin d'objets.
Avec un pipeline d'exportation, l'ensemble de pipeline utilise DBMS_CLOUD.EXPORT_DATA pour exporter des données.
Un pipeline d'exportation exporte les données de votre base de données d'intelligence artificielle autonome vers le magasin d'objets. Lorsque vous créez un pipeline d'exportation, celui-ci s'exécute à intervalles réguliers et place des données dans le magasin d'objets.
-
Créez un pipeline pour exporter des données vers le magasin d'objets.
BEGIN DBMS_CLOUD_PIPELINE.CREATE_PIPELINE( pipeline_name=>'EXP_PIPE1', pipeline_type=>'EXPORT', description=>'Export time series metrics to object store'); END; /Pour plus d'informations, voir Procédure CREATE_PIPELINE.
-
Créez un objet de données d'identification pour accéder à l'emplacement du magasin d'objets de destination où vous exportez des fichiers de données.
Vous spécifiez les données d'identification pour l'emplacement de destination du pipeline avec l'attribut
credential_name. Si vous n'indiquez pascredential_nameà l'étape suivante, la valeurcredential_nameest réglée àNULL. Vous pouvez utiliser la valeurNULLpar défaut lorsque l'attributlocationest une URL publique ou préauthentifiée.Pour plus d'informations, voir Procédure CREATE_CREDENTIAL.
-
Définissez les attributs du pipeline d'exportation.
Lorsque vous spécifiez un paramètre
table_name, les rangées de table sont exportées vers le magasin d'objets. Lorsque vous spécifiez un paramètrequery, l'interrogation spécifie un énoncéSELECTafin que seules les données requises soient exportées vers le magasin d'objets.-
Utilisation d'un paramètre
table_name:BEGIN DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE( pipeline_name => 'EXP_PIPE1', attributes => JSON_OBJECT('credential_name' VALUE 'OBJECT_STORE_CRED', 'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/', 'table_name' VALUE 'metric_table', 'key_column' VALUE 'metric_time', 'format' VALUE '{"type": "json"}', 'priority' VALUE 'MEDIUM', 'interval' VALUE '20') ); END; / -
Utilisation d'un paramètre
query:BEGIN DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE( pipeline_name => 'EXP_PIPE1', attributes => JSON_OBJECT('credential_name' VALUE 'OBJECT_STORE_CRED', 'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/', 'query' VALUE 'SELECT * from metrics_table', 'key_column' VALUE 'metric_time', 'format' VALUE '{"type": "json"}', 'priority' VALUE 'MEDIUM', 'interval' VALUE '20') ); END; /Où
credential_nameest les données d'identification que vous avez créées à l'étape précédente.
Les attributs suivants doivent être définis pour exécuter un pipeline d'exportation :
-
location: Spécifie l'emplacement du magasin d'objets de destination. La valeurlocationque vous spécifiez est pour une valeurtable_namepar pipeline. -
table_name: Spécifie la table de votre base de données qui contient les données que vous exportez (le paramètretable_nameou le paramètrequeryest requis). -
query: Spécifie l'interrogation à exécuter dans la base de données qui fournit les données que vous exportez (le paramètretable_nameou le paramètrequeryest requis). -
format: Décrit le format des données que vous exportez.Pour plus d'informations, voir Options de format d'ensemble DBMS_CLOUD pour EXPORT_DATA.
La valeur
prioritydétermine le degré de parallélisme pour l'extraction des données de la base de données.La valeur
intervalspécifie l'intervalle de temps en minutes entre les exécutions consécutives d'une tâche de pipeline. La valeur par défaut deintervalest 15 minutes.Voir Attributs DBMS_CLOUD_PIPELINE pour plus de détails sur les attributs de pipeline.
Après avoir créé un pipeline, vous pouvez tester le pipeline ou le démarrer :
-
Créer et configurer un pipeline pour exporter des résultats d'interrogation (sans horodatage)
Vous pouvez créer un pipeline d'exportation pour exporter automatiquement des données de votre base de données d'intelligence artificielle autonome vers le magasin d'objets. À l'aide de cette option de pipeline d'exportation, vous spécifiez une interrogation SQL que le pipeline exécute périodiquement pour exporter des données vers le magasin d'objets. Vous pouvez utiliser cette option d'exportation pour partager les données les plus récentes de votre base de données Autonomous AI Database vers le magasin d'objets afin que d'autres applications puissent consommer les données.
Un pipeline d'exportation exporte les données de votre base de données d'intelligence artificielle autonome vers le magasin d'objets. Lorsque vous créez un pipeline d'exportation, celui-ci s'exécute à intervalles réguliers et place des données dans le magasin d'objets.
-
Créez un pipeline pour exporter des données vers le magasin d'objets.
BEGIN DBMS_CLOUD_PIPELINE.CREATE_PIPELINE( pipeline_name=>'EXP_PIPE2', pipeline_type=>'EXPORT', description=>'Export query results to object store.'); END; /Pour plus d'informations, voir Procédure CREATE_PIPELINE.
-
Créez un objet de données d'identification pour accéder à l'emplacement du magasin d'objets de destination où vous exportez des fichiers de données.
Vous spécifiez les données d'identification pour l'emplacement de destination du pipeline avec l'attribut
credential_name. Si vous n'indiquez pascredential_nameà l'étape suivante, la valeurcredential_nameest réglée àNULL. Vous pouvez utiliser la valeurNULLpar défaut lorsque l'attributlocationest une URL publique ou préauthentifiée.Pour plus d'informations, voir Procédure CREATE_CREDENTIAL.
-
Définissez les attributs du pipeline d'exportation.
BEGIN DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE( pipeline_name => 'EXP_PIPE2', attributes => JSON_OBJECT( 'credential_name' VALUE 'OBJECT_STORE_CRED', 'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/', 'query' VALUE 'SELECT * FROM table_name', 'format' VALUE '{"type": "json"}', 'priority' VALUE 'MEDIUM', 'interval' VALUE '20') ); END; /Où
credential_nameest les données d'identification que vous avez créées à l'étape précédente.Les attributs suivants doivent être définis pour exécuter un pipeline d'exportation :
-
location: Spécifie l'emplacement du magasin d'objets de destination. -
query: Spécifie l'interrogation à exécuter dans la base de données qui fournit les données que vous exportez. -
format: Décrit le format des données que vous exportez.Pour plus d'informations, voir Options de format d'ensemble DBMS_CLOUD pour EXPORT_DATA.
La valeur
prioritydétermine le degré de parallélisme pour l'extraction des données de la base de données.La valeur
intervalspécifie l'intervalle de temps en minutes entre les exécutions consécutives d'une tâche de pipeline. La valeur par défaut deintervalest 15 minutes.Voir Attributs DBMS_CLOUD_PIPELINE pour plus de détails sur les attributs de pipeline.
Après avoir créé un pipeline, vous pouvez tester le pipeline ou le démarrer :
-
Tester les pipelines
Utilisez RUN_PIPELINE_ONCE pour exécuter un pipeline une fois sur demande sans créer de tâche programmée.
RUN_PIPELINE_ONCE est utile pour tester un pipeline avant de le démarrer. Après avoir exécuté un pipeline une fois pour tester le pipeline et vérifier qu'il fonctionne comme prévu, utilisez RESET_PIPELINE pour réinitialiser l'état du pipeline (à l'état avant d'exécuter RUN_PIPELINE_ONCE).
-
Créez un pipeline.
Pour plus d'informations, voir Créer et configurer un pipeline pour le chargement des données.
-
Exécutez un pipeline une fois pour tester le pipeline.
BEGIN DBMS_CLOUD_PIPELINE.RUN_PIPELINE_ONCE( pipeline_name => 'MY_PIPE1' ); END; /Pour plus d'informations, voir Procédure RUN_PIPELINE_ONCE.
-
Effectuez toutes les vérifications requises pour vérifier que le pipeline fonctionne comme prévu.
Pour plus d'informations, voir Surveiller et dépanner les pipelines.
-
Réinitialisez le pipeline.
BEGIN DBMS_CLOUD_PIPELINE.RESET_PIPELINE( pipeline_name => 'MY_PIPE1', purge_data => TRUE ); END; /Pour plus d'informations, voir Procédure RESET_PIPELINE.