Manage Pipelines
After you create and test a pipeline, you control it by starting, stopping, or dropping it. You can also reset a pipeline.
Start a Pipeline
After you create a pipeline, you can start the pipeline.
When a pipeline is started, it runs continuously in a scheduled job. The scheduled job repeats every 15 minutes by default, or at the interval you set with the interval attribute.
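The interval attribute mentioned above can be set with DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE before the pipeline is started; a minimal sketch, where the 20-minute value and the pipeline name are arbitrary examples:

```sql
BEGIN
  DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
    pipeline_name   => 'EMPLOYEE_PIPELINE',
    attribute_name  => 'interval',
    attribute_value => '20'   -- repeat interval, in minutes
  );
END;
/
```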
-
Start a pipeline.

BEGIN
  DBMS_CLOUD_PIPELINE.START_PIPELINE(
    pipeline_name => 'EMPLOYEE_PIPELINE'
  );
END;
/

By default, a pipeline job begins immediately, as soon as the pipeline is started. To start a pipeline job at a later time, specify a valid future date or timestamp using the start_date parameter. See START_PIPELINE Procedure for more information.
-
Verify that the pipeline is started.
For example:
SELECT pipeline_name, status FROM USER_CLOUD_PIPELINES
  WHERE pipeline_name = 'EMPLOYEE_PIPELINE';

PIPELINE_NAME     STATUS
----------------- -------
EMPLOYEE_PIPELINE STARTED
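To schedule the first job rather than run it immediately, pass a future timestamp in the start_date parameter described above; a sketch, where the timestamp shown is an arbitrary example:

```sql
BEGIN
  DBMS_CLOUD_PIPELINE.START_PIPELINE(
    pipeline_name => 'EMPLOYEE_PIPELINE',
    start_date    => TO_TIMESTAMP_TZ('2024-12-01 04:00:00 -05:00',
                                     'YYYY-MM-DD HH24:MI:SS TZH:TZM')
  );
END;
/
```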
Stop a Pipeline
Use STOP_PIPELINE to stop a pipeline. When a pipeline is stopped, no future jobs are scheduled for the pipeline.
By default, currently running jobs complete when you stop a pipeline. Set the force parameter to TRUE to terminate any running jobs and stop the pipeline immediately.
-
Stop a pipeline.
BEGIN
  DBMS_CLOUD_PIPELINE.STOP_PIPELINE(
    pipeline_name => 'EMPLOYEE_PIPELINE'
  );
END;
/

See STOP_PIPELINE Procedure for more information.
-
Verify that the pipeline is stopped.
SELECT pipeline_name, status FROM USER_CLOUD_PIPELINES
  WHERE pipeline_name = 'EMPLOYEE_PIPELINE';

PIPELINE_NAME     STATUS
----------------- -------
EMPLOYEE_PIPELINE STOPPED
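To terminate any running jobs instead of letting them complete, set the force parameter described above to TRUE; a sketch:

```sql
BEGIN
  DBMS_CLOUD_PIPELINE.STOP_PIPELINE(
    pipeline_name => 'EMPLOYEE_PIPELINE',
    force         => TRUE   -- terminate running jobs; stop immediately
  );
END;
/
```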
Drop a Pipeline
The procedure DROP_PIPELINE drops an existing pipeline.
If a pipeline has been started, it must be stopped before it can be dropped. See STOP_PIPELINE Procedure for more information. Alternatively, to drop a started pipeline in one step, set the force parameter to TRUE to terminate any running jobs and drop the pipeline immediately.
-
Drop a pipeline.
BEGIN
  DBMS_CLOUD_PIPELINE.DROP_PIPELINE(
    pipeline_name => 'EMPLOYEE_PIPELINE'
  );
END;
/

-
Verify the pipeline is dropped.

SELECT pipeline_name, status FROM USER_CLOUD_PIPELINES
  WHERE pipeline_name = 'EMPLOYEE_PIPELINE';

No rows selected
See DROP_PIPELINE Procedure for more information.
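As noted above, a started pipeline can be dropped in one step by setting the force parameter to TRUE; a sketch:

```sql
BEGIN
  DBMS_CLOUD_PIPELINE.DROP_PIPELINE(
    pipeline_name => 'EMPLOYEE_PIPELINE',
    force         => TRUE   -- terminate running jobs; drop immediately
  );
END;
/
```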
Reset a Pipeline
Use the reset pipeline operation to clear the pipeline's record and return the pipeline to its initial state.
Note: You can optionally use reset pipeline to purge data in the database table associated with a load pipeline, or to remove files in the object store for an export pipeline. This option is typically used when you are testing a pipeline during development.
Reset pipeline operates as follows:
-
Load Pipeline: For a load pipeline, resetting the pipeline clears the record of the files loaded by the pipeline. When you call either START_PIPELINE or RUN_PIPELINE_ONCE after resetting a load pipeline, the pipeline repeats the data load and includes all the files present in the object store location. When purge_data is set to TRUE, the procedure truncates the data in the database table.
-
Export Pipeline: For an export pipeline, resetting the pipeline clears the last tracked data in the database table. When you call either START_PIPELINE or RUN_PIPELINE_ONCE after resetting an export pipeline, the pipeline repeats exporting data from the table or query. When purge_data is set to TRUE, the procedure deletes the existing files in the object store location specified with the location attribute.
To reset a pipeline:
-
Stop the pipeline you want to reset.
A pipeline must be in a stopped state before you can reset it. See STOP_PIPELINE Procedure for more information.
-
Reset the pipeline.
BEGIN
  DBMS_CLOUD_PIPELINE.RESET_PIPELINE(
    pipeline_name => 'EMPLOYEE_PIPELINE',
    purge_data    => TRUE
  );
END;
/

Only use the purge_data parameter with value TRUE if you want to clear data in your database table for a load pipeline, or to clear files in the object store for an export pipeline. This option is typically used when you are testing a pipeline during development. See RESET_PIPELINE Procedure for more information.
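After a reset, you can trigger a single on-demand run with RUN_PIPELINE_ONCE (mentioned in the load and export pipeline descriptions above) to re-test the pipeline without scheduling recurring jobs; a sketch:

```sql
BEGIN
  DBMS_CLOUD_PIPELINE.RUN_PIPELINE_ONCE(
    pipeline_name => 'EMPLOYEE_PIPELINE'
  );
END;
/
```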
Monitor and Troubleshoot Pipelines
All pipelines that are created are logged in the DBMS_CLOUD_PIPELINE views.
View Pipeline Status Information
Check pipeline status and obtain other pipeline information using the USER_CLOUD_PIPELINES or DBA_CLOUD_PIPELINES views. For example, the following SELECT statement with a WHERE clause predicate on the pipeline_name shows that MY_TREE_DATA is a load pipeline and the pipeline is started:
SELECT pipeline_name, pipeline_type, status
FROM USER_CLOUD_PIPELINES
WHERE pipeline_name = 'MY_TREE_DATA';
PIPELINE_NAME PIPELINE_TYPE STATUS
------------- ------------- -------
MY_TREE_DATA LOAD STARTED
See DBMS_CLOUD_PIPELINE Views for more information.
View Pipeline Attributes
Monitor pipeline attributes by querying the USER_CLOUD_PIPELINE_ATTRIBUTES or DBA_CLOUD_PIPELINE_ATTRIBUTES views.
For example:
SELECT pipeline_name, attribute_name, attribute_value FROM user_cloud_pipeline_attributes
WHERE pipeline_name = 'MY_TREE_DATA';
PIPELINE_NAME ATTRIBUTE_NAME ATTRIBUTE_VALUE
------------- --------------- ---------------------------------------------------------------------------------
MY_TREE_DATA credential_name DEF_CRED_OBJ_STORE
MY_TREE_DATA format {"type": "csv"}
MY_TREE_DATA interval 20
MY_TREE_DATA location https://objectstorage.us-ashburn-1.oraclecloud.com/n/namespace/b/treetypes/o/
MY_TREE_DATA priority high
MY_TREE_DATA table_name TREES
See DBMS_CLOUD_PIPELINE Views for more information.
View Pipeline History
The USER_CLOUD_PIPELINE_HISTORY and DBA_CLOUD_PIPELINE_HISTORY views show the state of running jobs. Use the pipeline history views to help you monitor the health of a pipeline and detect failures in a running pipeline.
For example:
SELECT pipeline_id, pipeline_name, status, error_message FROM user_cloud_pipeline_history
WHERE pipeline_name = 'MY_TREE_DATA';
PIPELINE_ID PIPELINE_NAME STATUS ERROR_MESSAGE
----------- ------------- --------- -------------
7 MY_TREE_DATA SUCCEEDED
See DBMS_CLOUD_PIPELINE Views for more information.
Pipeline Status Table: Additional Monitoring for Load Pipelines
The pipeline status table shows each file name and its status for a load pipeline. The STATUS_TABLE column in DBA_CLOUD_PIPELINES and USER_CLOUD_PIPELINES shows the status table name.
For example, the following SELECT statement with a WHERE clause predicate on the pipeline_name shows the status table name for a pipeline:
SELECT pipeline_name, status_table FROM user_cloud_pipelines
WHERE pipeline_name = 'MY_TREE_DATA';
PIPELINE_NAME STATUS_TABLE
------------- --------------------
MY_TREE_DATA PIPELINE$9$41_STATUS
View the status table to see information about the pipeline, including the following:
-
If an operation on a specific file fails, the relevant error number and error message are recorded in the status table.
-
For completed pipeline operations, the time needed for each operation can be calculated using the reported START_TIME and END_TIME.
For example, the following shows that the load operation failed for two files and completed for one file:
SELECT id, name, status, error_code, error_message, sid FROM PIPELINE$9$41_STATUS;
ID NAME STATUS ERROR_CODE ERROR_MESSAGE SID
-- ---------- --------- ---------- -------------------------------- -----
1 trees1.txt FAILED 30653 ORA-30653: reject limit reached 18070
2 trees2.txt FAILED 30653 ORA-30653: reject limit reached 18070
3 trees3.txt COMPLETED 18070
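Because START_TIME and END_TIME are recorded per file, the duration of each completed operation can be computed by subtracting the two timestamps, which in Oracle SQL yields an INTERVAL DAY TO SECOND value; a sketch, using the example status table name from above:

```sql
SELECT id, name, status,
       (end_time - start_time) AS duration   -- INTERVAL DAY TO SECOND
  FROM PIPELINE$9$41_STATUS
 WHERE status = 'COMPLETED'
 ORDER BY id;
```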
Pipelines for loading data, where the pipeline_type is 'LOAD', reserve an ID that is shown in USER_LOAD_OPERATIONS and in DBA_LOAD_OPERATIONS. The ID value in these views maps to the pipeline’s OPERATION_ID in USER_CLOUD_PIPELINES and DBA_CLOUD_PIPELINES.
To obtain more information for a load pipeline, query the pipeline’s OPERATION_ID:
SELECT PIPELINE_NAME, OPERATION_ID FROM USER_CLOUD_PIPELINES
WHERE PIPELINE_NAME = 'MY_TREE_DATA';
PIPELINE_NAME OPERATION_ID
------------- ------------
MY_TREE_DATA 41
Next, query either USER_LOAD_OPERATIONS or DBA_LOAD_OPERATIONS with a WHERE clause predicate on the ID column (using the OPERATION_ID value).
For example:
SELECT ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE, STATUS_TABLE FROM USER_LOAD_OPERATIONS
WHERE ID = 41;
ID TYPE LOGFILE_TABLE BADFILE_TABLE STATUS_TABLE
-- -------- ----------------- ----------------- --------------------
41 PIPELINE PIPELINE$9$41_LOG PIPELINE$9$41_BAD PIPELINE$9$41_STATUS
This query shows the ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE (if one exists), and STATUS_TABLE. You can query these tables for additional pipeline load information.
Pipeline Status Table Details
| Column | Datatype | Description |
|---|---|---|
| ID | NUMBER | Unique number assigned to the pipeline. |
| NAME | VARCHAR2(4000) | Name of the pipeline. |
| BYTES | NUMBER | Bytes. |
| CHECKSUM | VARCHAR2(128) | Checksum. |
| LAST_MODIFIED | TIMESTAMP(6) WITH TIME ZONE | Last modification time for the pipeline. |
| STATUS | VARCHAR2(30) | Status of the operation for the file (for example, COMPLETED or FAILED). |
| ERROR_CODE | NUMBER | Error code. |
| ERROR_MESSAGE | VARCHAR2(4000) | Error message. |
| START_TIME | TIMESTAMP(6) WITH TIME ZONE | Start time for the pipeline. |
| END_TIME | TIMESTAMP(6) WITH TIME ZONE | End time for the pipeline. |
| SID | NUMBER | The session SID and SERIAL# indicate the job session that was running the pipeline load operation. |
| SERIAL# | NUMBER | The session SID and SERIAL# indicate the job session that was running the pipeline load operation. |
| ROWS_LOADED | NUMBER | Number of rows loaded. |
| OPERATION_ID | NUMBER | Reserved for future use. |
Pipeline Log File and Bad File Tables
To obtain the log file and bad file names for a load pipeline, query the pipeline’s OPERATION_ID. For example:
SELECT PIPELINE_NAME, OPERATION_ID FROM USER_CLOUD_PIPELINES
WHERE PIPELINE_NAME = 'MY_TREE_DATA';
PIPELINE_NAME OPERATION_ID
------------- ------------
MY_TREE_DATA 41
Next, query either USER_LOAD_OPERATIONS or DBA_LOAD_OPERATIONS with a WHERE clause predicate on the ID column (using the OPERATION_ID value).
For example:
SELECT ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE, STATUS_TABLE FROM USER_LOAD_OPERATIONS
WHERE ID = 41;
ID TYPE LOGFILE_TABLE BADFILE_TABLE STATUS_TABLE
-- -------- ----------------- ----------------- --------------------
41 PIPELINE PIPELINE$9$41_LOG PIPELINE$9$41_BAD PIPELINE$9$41_STATUS
This query shows the ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE (if one exists), and STATUS_TABLE. You can query these tables for additional pipeline load information.
View the pipeline log file table to see a complete log of the pipeline’s load operations.
For example:
SELECT * FROM PIPELINE$9$41_LOG;
View the pipeline bad file table to see details on input records with format errors. The bad file table shows information for the rows that reported errors during loading. Depending on the errors shown in the log file table and the rows shown in the pipeline's bad file table, you might be able to correct the errors either by modifying the pipeline's format attribute options, or by modifying the data in the file you are loading.
For example:
SELECT * FROM PIPELINE$9$41_BAD;
See Monitor and Troubleshoot Data Loading for more information.