Create and Configure Pipelines
You can create one or more load or export pipelines. When you create a pipeline, you use parameters and set pipeline attributes to configure the pipeline.
The options to create and configure a pipeline follow:
- Load Pipeline: Load data from files in object store or in a directory to a table in Autonomous AI Database. See Create and Configure a Pipeline for Loading Data.
- Export Pipeline:
  - Export incremental results of a query to object store using a date or timestamp column as the key for tracking newer data. See Create and Configure a Pipeline for Export with Timestamp Column.
  - Export incremental data of a table to object store using a date or timestamp column as the key for tracking newer data. See Create and Configure a Pipeline for Export with Timestamp Column.
  - Export data of a table to object store using a query to select data without a reference to a date or timestamp column, so that the pipeline exports all the data that the query selects for each scheduler run. See Create and Configure a Pipeline to Export Query Results (Without a Timestamp).

Create and Configure a Pipeline for Loading Data
Create and Configure a Pipeline for Loading Data
You can create a pipeline to load data from external files in object store or directories to tables in Autonomous AI Database.
A load pipeline consumes data placed on object store or in directories and loads it into a table in Autonomous AI Database. When you create a load pipeline, the pipeline runs at regular intervals to consume data placed on the source location; when new data files arrive, the pipeline loads the new data. You can also use a pipeline to reliably copy files, with resume and retry capabilities, from the source location into a table in your database.
With a load pipeline, the pipeline package uses DBMS_CLOUD.COPY_DATA to load data.
- On your Autonomous AI Database, either use an existing table or create the database table where you are loading data. For example:
CREATE TABLE EMPLOYEE
(name VARCHAR2(128),
age NUMBER,
salary NUMBER);
- Create a pipeline to load data from object store or from directory objects.

  BEGIN
    DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
      pipeline_name => 'MY_PIPE1',
      pipeline_type => 'LOAD',
      description   => 'Load metrics from object store into a table'
    );
  END;
  /

  See CREATE_PIPELINE Procedure for more information.
- Create a credential object to access the object store that contains the files you are loading.

  You specify the credential for the pipeline source location with the attribute credential_name. If you do not supply a credential_name in the next step, the credential_name value is set to NULL. You can use the default NULL value when the location attribute is a public or pre-authenticated URL.

  See CREATE_CREDENTIAL Procedure for more information.
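  The credential step above can be sketched with DBMS_CLOUD.CREATE_CREDENTIAL. This is a minimal example, not the only supported form: the credential name OBJECT_STORE_CRED matches the name used in the attribute examples in this topic, and the username and password (auth token) values are placeholders you replace with your own cloud account details.

```sql
BEGIN
  DBMS_CLOUD.CREATE_CREDENTIAL(
    credential_name => 'OBJECT_STORE_CRED',
    username        => 'oci_user@example.com',  -- placeholder: your cloud user name
    password        => 'auth_token_value'       -- placeholder: your auth token
  );
END;
/
```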
- Set the pipeline attributes, including the required attributes: location, table_name, and format.

  Case 1: Create a pipeline for loading data from object store.

  BEGIN
    DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
      pipeline_name => 'MY_PIPE1',
      attributes    => JSON_OBJECT(
        'credential_name' VALUE 'OBJECT_STORE_CRED',
        'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
        'table_name' VALUE 'employee',
        'format' VALUE '{"type":"json", "columnpath":["$.NAME", "$.AGE", "$.SALARY"]}',
        'priority' VALUE 'HIGH',
        'interval' VALUE '20')
    );
  END;
  /

  Case 2: Create a pipeline for loading data from directory objects.

  BEGIN
    DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
      pipeline_name => 'MY_PIPE1',
      attributes    => JSON_OBJECT(
        'location' VALUE 'MY_DIR:*.csv',
        'table_name' VALUE 'employee',
        'format' VALUE '{"type":"csv"}',
        'priority' VALUE 'HIGH',
        'interval' VALUE '20')
    );
  END;
  /

  The following attributes must be set to run a load pipeline in both cases:
  - location: Specifies the source file location on object store or directory object.
  - table_name: Specifies the table in your database where you are loading data. The location you specify is for one table_name per pipeline.
  - format: Describes the format of the data you are loading. See DBMS_CLOUD Package Format Options for more information.
  The credential_name is the credential you created in the previous step.

  The priority value determines the number of files loaded in parallel. A pipeline with a higher priority consumes more database resources and completes each run faster, compared to running at a lower priority.

  The interval value specifies the time interval in minutes between consecutive runs of a pipeline job. The default interval is 15 minutes.

  See DBMS_CLOUD_PIPELINE Attributes for details on the pipeline attributes.
After you create a pipeline, you can test the pipeline or start the pipeline. See Test Pipelines for more information.
As an alternative, to set the format for JSON, you could use the following format:
BEGIN
DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
pipeline_name => 'MY_PIPE1',
attribute_name => 'format',
attribute_value => JSON_OBJECT('type' VALUE 'json', 'columnpath' VALUE '["$.NAME", "$.AGE", "$.SALARY"]')
);
END;
/
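Once the attributes are set, the pipeline is started so that it runs on its schedule. The following is a sketch using the DBMS_CLOUD_PIPELINE.START_PIPELINE procedure; the optional start_date parameter is assumed here, so check the START_PIPELINE reference for the authoritative signature before relying on it.

```sql
BEGIN
  DBMS_CLOUD_PIPELINE.START_PIPELINE(
    pipeline_name => 'MY_PIPE1'
    -- start_date is omitted; by default the pipeline job starts immediately
  );
END;
/
```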
Create and Configure a Pipeline for Export with Timestamp Column
You can create an export pipeline to automatically export time-series data from your Autonomous AI Database to object store.
Using this export pipeline option, you specify a table or SQL query and a column with a timestamp that the pipeline uses to track the time of the last upload. You can use an export pipeline to share data for consumption by other applications or to save data to object store.
With an export pipeline, the pipeline package uses DBMS_CLOUD.EXPORT_DATA to export data.
An export pipeline exports data from your Autonomous AI Database to object store. When you create an export pipeline, the pipeline runs at regular intervals and places data on object store.
- Create a pipeline to export data to object store.

  BEGIN
    DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
      pipeline_name => 'EXP_PIPE1',
      pipeline_type => 'EXPORT',
      description   => 'Export time series metrics to object store'
    );
  END;
  /

  See CREATE_PIPELINE Procedure for more information.
- Create a credential object to access the destination object store location where you are exporting data files.

  You specify the credential for the pipeline destination location with the attribute credential_name. If you do not supply a credential_name in the next step, the credential_name value is set to NULL. You can use the default NULL value when the location attribute is a public or pre-authenticated URL.

  See CREATE_CREDENTIAL Procedure for more information.
- Set the export pipeline attributes.

  When you specify a table_name parameter, table rows are exported to object store. When you specify a query parameter, the query specifies a SELECT statement so that only the required data is exported to object store.

  Using a table_name parameter:

  BEGIN
    DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
      pipeline_name => 'EXP_PIPE1',
      attributes    => JSON_OBJECT(
        'credential_name' VALUE 'OBJECT_STORE_CRED',
        'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
        'table_name' VALUE 'metric_table',
        'key_column' VALUE 'metric_time',
        'format' VALUE '{"type": "json"}',
        'priority' VALUE 'MEDIUM',
        'interval' VALUE '20')
    );
  END;
  /

  Using a query parameter:

  BEGIN
    DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
      pipeline_name => 'EXP_PIPE1',
      attributes    => JSON_OBJECT(
        'credential_name' VALUE 'OBJECT_STORE_CRED',
        'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
        'query' VALUE 'SELECT * FROM metrics_table',
        'key_column' VALUE 'metric_time',
        'format' VALUE '{"type": "json"}',
        'priority' VALUE 'MEDIUM',
        'interval' VALUE '20')
    );
  END;
  /

  Where the credential_name is the credential you created in the previous step.
  The following attributes must be set to run an export pipeline:

  - location: Specifies the destination object store location. The location you specify is for one table_name per pipeline.
  - table_name: Specifies the table in your database that contains the data you are exporting (either the table_name parameter or the query parameter is required).
  - query: Specifies the query to run in your database that provides the data you are exporting (either the table_name parameter or the query parameter is required).
  - format: Describes the format of the data you are exporting. See DBMS_CLOUD Package Format Options for EXPORT_DATA for more information.

  The priority value determines the degree of parallelism for fetching data from the database.

  The interval value specifies the time interval in minutes between consecutive runs of a pipeline job. The default interval is 15 minutes.

  See DBMS_CLOUD_PIPELINE Attributes for details on the pipeline attributes.
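Attributes can also be changed one at a time after the initial setup, using the single-attribute form of SET_ATTRIBUTE shown earlier in this topic. As a sketch, the following lengthens the run interval of the example pipeline to 30 minutes; the new value is expected to apply to subsequent runs.

```sql
BEGIN
  DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
    pipeline_name   => 'EXP_PIPE1',
    attribute_name  => 'interval',
    attribute_value => '30'  -- minutes between consecutive pipeline runs
  );
END;
/
```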
After you create a pipeline, you can test the pipeline or start the pipeline. See Test Pipelines for more information.
Create and Configure a Pipeline to Export Query Results (Without a Timestamp)
You can create an export pipeline to automatically export data from your Autonomous AI Database to object store. Using this export pipeline option, you specify a SQL query that the pipeline runs periodically to export data to object store. You can use this export option to share the latest data from your Autonomous AI Database to object store for other applications to consume.
An export pipeline exports data from your Autonomous AI Database to object store. When you create an export pipeline, the pipeline runs at regular intervals and places data on object store.
- Create a pipeline to export data to object store.

  BEGIN
    DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
      pipeline_name => 'EXP_PIPE2',
      pipeline_type => 'EXPORT',
      description   => 'Export query results to object store.'
    );
  END;
  /

  See CREATE_PIPELINE Procedure for more information.
- Create a credential object to access the destination object store location where you are exporting data files.

  You specify the credential for the pipeline destination location with the attribute credential_name. If you do not supply a credential_name in the next step, the credential_name value is set to NULL. You can use the default NULL value when the location attribute is a public or pre-authenticated URL.

  See CREATE_CREDENTIAL Procedure for more information.
- Set the export pipeline attributes.

  BEGIN
    DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
      pipeline_name => 'EXP_PIPE2',
      attributes    => JSON_OBJECT(
        'credential_name' VALUE 'OBJECT_STORE_CRED',
        'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
        'query' VALUE 'SELECT * FROM table_name',
        'format' VALUE '{"type": "json"}',
        'priority' VALUE 'MEDIUM',
        'interval' VALUE '20')
    );
  END;
  /

  Where the credential_name is the credential you created in the previous step.

  The following attributes must be set to run an export pipeline:
  - location: Specifies the destination object store location.
  - query: Specifies the query to run in your database that provides the data you are exporting.
  - format: Describes the format of the data you are exporting. See DBMS_CLOUD Package Format Options for EXPORT_DATA for more information.
  The priority value determines the degree of parallelism for fetching data from the database.

  The interval value specifies the time interval in minutes between consecutive runs of a pipeline job. The default interval is 15 minutes.

  See DBMS_CLOUD_PIPELINE Attributes for details on the pipeline attributes.
After you create a pipeline, you can test the pipeline or start the pipeline. See Test Pipelines for more information.
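When a pipeline is no longer needed, it can be stopped and dropped. This sketch assumes the DBMS_CLOUD_PIPELINE.STOP_PIPELINE and DROP_PIPELINE procedures; check their reference pages for the full parameter lists before using them.

```sql
BEGIN
  DBMS_CLOUD_PIPELINE.STOP_PIPELINE(pipeline_name => 'EXP_PIPE2');  -- stop the scheduled pipeline job
  DBMS_CLOUD_PIPELINE.DROP_PIPELINE(pipeline_name => 'EXP_PIPE2');  -- remove the pipeline definition
END;
/
```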
Test Pipelines
Use RUN_PIPELINE_ONCE to run a pipeline once on-demand without creating a scheduled job.
RUN_PIPELINE_ONCE is useful for testing a pipeline before you start the pipeline. After you run a pipeline once to test the pipeline and check that it is working as expected, use RESET_PIPELINE to reset the pipeline’s state (to the state before you ran RUN_PIPELINE_ONCE).
- Create a pipeline.

  See Create and Configure a Pipeline for Loading Data for more information.
- Run a pipeline once to test the pipeline.

  BEGIN
    DBMS_CLOUD_PIPELINE.RUN_PIPELINE_ONCE(
      pipeline_name => 'MY_PIPE1'
    );
  END;
  /

  See RUN_PIPELINE_ONCE Procedure for more information.
- Perform any required checks to verify the pipeline is operating as expected.

  See Monitor and Troubleshoot Pipelines for more information.
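  One quick check is to query the pipeline views. This sketch assumes the USER_CLOUD_PIPELINES and USER_CLOUD_PIPELINE_HISTORY views that accompany DBMS_CLOUD_PIPELINE; the exact column list may differ by release, so verify it against the view reference.

```sql
-- Current state of the pipeline
SELECT pipeline_name, pipeline_type, status
  FROM user_cloud_pipelines
 WHERE pipeline_name = 'MY_PIPE1';

-- Outcome of recent pipeline runs
SELECT pipeline_name, status, error_message
  FROM user_cloud_pipeline_history
 WHERE pipeline_name = 'MY_PIPE1';
```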
- Reset the pipeline.

  BEGIN
    DBMS_CLOUD_PIPELINE.RESET_PIPELINE(
      pipeline_name => 'MY_PIPE1',
      purge_data    => TRUE
    );
  END;
  /

  See RESET_PIPELINE Procedure for more information.