Control Pipelines (Start, Stop, Drop, or Reset a Pipeline)

After you create and test a pipeline, you control it by starting, stopping, or dropping it. You can also reset a pipeline.

Start a Pipeline

After you create a pipeline, you can start the pipeline.

When a pipeline is started, it runs continuously in a scheduled job. The job repeats every 15 minutes by default, or at the interval you set with the interval attribute.

  1. Start a pipeline.
    BEGIN
      DBMS_CLOUD_PIPELINE.START_PIPELINE(
          pipeline_name => 'EMPLOYEE_PIPELINE'
      );
    END;
    /

    By default, a pipeline job begins immediately, as soon as the pipeline is started. To start a pipeline job at a later time, specify a valid future date or timestamp using the start_date parameter.

    See START_PIPELINE Procedure for more information.

  2. Verify that the pipeline is started.

    For example:

    
    SELECT pipeline_name, status from USER_CLOUD_PIPELINES
       WHERE pipeline_name = 'EMPLOYEE_PIPELINE';
    
    PIPELINE_NAME            STATUS  
    ------------------------ ------- 
    EMPLOYEE_PIPELINE        STARTED
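
The start_date parameter mentioned in step 1 can be illustrated with a minimal sketch. It assumes the same EMPLOYEE_PIPELINE and an arbitrary one-hour delay; the delay is an illustrative choice, not a recommended value.

```sql
BEGIN
  -- Illustrative sketch: schedule the first pipeline job one hour from now
  -- instead of starting it immediately. The one-hour offset is an assumption
  -- chosen for the example.
  DBMS_CLOUD_PIPELINE.START_PIPELINE(
      pipeline_name => 'EMPLOYEE_PIPELINE',
      start_date    => SYSTIMESTAMP + INTERVAL '1' HOUR
  );
END;
/
```

SYSTIMESTAMP returns a timestamp with time zone, so the computed value is a future timestamp in the session's database time zone.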

Stop a Pipeline

Use STOP_PIPELINE to stop a pipeline. When a pipeline is stopped, no future jobs are scheduled for the pipeline.

By default, currently running jobs complete when you stop a pipeline. Set the force parameter to TRUE to terminate any running jobs and stop the pipeline immediately.

  1. Stop a pipeline.
    BEGIN
      DBMS_CLOUD_PIPELINE.STOP_PIPELINE(
          pipeline_name => 'EMPLOYEE_PIPELINE'
      );
    END;
    /

    See STOP_PIPELINE Procedure for more information.

  2. Verify that the pipeline is stopped.
    
    SELECT pipeline_name, status from USER_CLOUD_PIPELINES
       WHERE pipeline_name = 'EMPLOYEE_PIPELINE';
    
    PIPELINE_NAME            STATUS  
    ------------------------ ------- 
    EMPLOYEE_PIPELINE        STOPPED
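
To stop the pipeline without waiting for running jobs to finish, the force parameter described above can be passed explicitly. The following is a minimal sketch for the same EMPLOYEE_PIPELINE:

```sql
BEGIN
  -- Illustrative sketch: force => TRUE terminates any running jobs
  -- and stops the pipeline immediately.
  DBMS_CLOUD_PIPELINE.STOP_PIPELINE(
      pipeline_name => 'EMPLOYEE_PIPELINE',
      force         => TRUE
  );
END;
/
```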


Drop a Pipeline

The procedure DROP_PIPELINE drops an existing pipeline.

A started pipeline must normally be stopped before it can be dropped. See STOP_PIPELINE Procedure for more information.

To drop a pipeline that is still started, set the force parameter to TRUE. This terminates any running jobs and drops the pipeline immediately.

  1. Drop a pipeline.
    BEGIN
      DBMS_CLOUD_PIPELINE.DROP_PIPELINE(
          pipeline_name => 'EMPLOYEE_PIPELINE'
      );
    END;
    /
  2. Verify the pipeline is dropped.
    SELECT pipeline_name, status from USER_CLOUD_PIPELINES
       WHERE pipeline_name = 'EMPLOYEE_PIPELINE';
    
    No rows selected
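
The force option described above can be sketched as follows, assuming EMPLOYEE_PIPELINE is still in the started state:

```sql
BEGIN
  -- Illustrative sketch: force => TRUE terminates any running jobs
  -- and drops the started pipeline in a single call.
  DBMS_CLOUD_PIPELINE.DROP_PIPELINE(
      pipeline_name => 'EMPLOYEE_PIPELINE',
      force         => TRUE
  );
END;
/
```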

See DROP_PIPELINE Procedure for more information.

Reset a Pipeline

Use the reset pipeline operation to return the pipeline's record to its initial state.

Note: You can optionally use reset pipeline to purge data in the database table associated with a load pipeline or to remove files in object store for an export pipeline. Usually this option is used when you are testing a pipeline during pipeline development.

Reset pipeline operates as follows:

  • Load Pipeline: For a load pipeline, resetting the pipeline clears the record of the files being loaded by the pipeline. When you call either START_PIPELINE or RUN_PIPELINE_ONCE after resetting a load pipeline, the pipeline repeats the data load and includes all the files present in the object store location.

    When purge_data is set to TRUE, the procedure truncates the data in the database table.

  • Export Pipeline: For an export pipeline, resetting the pipeline clears the last tracked data in the database table. When you call either START_PIPELINE or RUN_PIPELINE_ONCE after resetting an export pipeline, the pipeline repeats exporting data from the table or query.

    When purge_data is set to TRUE, the procedure deletes existing files in the object store location specified with the location attribute.

To reset a pipeline:

  1. Stop the pipeline you want to reset.

    A pipeline must be in the stopped state before you can reset it. See STOP_PIPELINE Procedure for more information.

  2. Reset the pipeline.
    BEGIN
      DBMS_CLOUD_PIPELINE.RESET_PIPELINE(
          pipeline_name => 'EMPLOYEE_PIPELINE',
          purge_data    => TRUE
      );
    END;
    /

    Set purge_data to TRUE only if you want to clear the data in your database table (for a load pipeline) or the files in object store (for an export pipeline). This option is typically used when you are testing a pipeline during pipeline development.

    See RESET_PIPELINE Procedure for more information.
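
During pipeline development, the stop, reset, and rerun steps are often performed together. The following is a minimal sketch of that sequence, assuming EMPLOYEE_PIPELINE is currently started and that existing data should be kept (purge_data => FALSE). Rerunning with RUN_PIPELINE_ONCE rather than START_PIPELINE is an illustrative choice.

```sql
BEGIN
  -- Step 1: the pipeline must be stopped before it can be reset.
  DBMS_CLOUD_PIPELINE.STOP_PIPELINE(
      pipeline_name => 'EMPLOYEE_PIPELINE'
  );

  -- Step 2: clear the pipeline's tracking record but keep the table data
  -- (load pipeline) or the exported files (export pipeline).
  DBMS_CLOUD_PIPELINE.RESET_PIPELINE(
      pipeline_name => 'EMPLOYEE_PIPELINE',
      purge_data    => FALSE
  );

  -- Step 3: run the pipeline one time to repeat the load or export.
  DBMS_CLOUD_PIPELINE.RUN_PIPELINE_ONCE(
      pipeline_name => 'EMPLOYEE_PIPELINE'
  );
END;
/
```

For a load pipeline, keeping purge_data as FALSE means the repeated load adds to whatever rows the table already contains, so this variant suits cases where the target table was emptied or corrected separately.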

Monitor and Troubleshoot Pipelines

All pipelines that are created are logged in the DBMS_CLOUD_PIPELINE views.

View Pipeline Status Information

Check pipeline status and obtain other pipeline information using the USER_CLOUD_PIPELINES or DBA_CLOUD_PIPELINES views. For example, the following SELECT statement with a WHERE clause predicate on the pipeline_name shows that MY_TREE_DATA is a load pipeline and the pipeline is started:

SELECT pipeline_name, pipeline_type, status FROM USER_CLOUD_PIPELINES
   WHERE pipeline_name = 'MY_TREE_DATA';


PIPELINE_NAME PIPELINE_TYPE STATUS  
------------- ------------- ------- 
MY_TREE_DATA  LOAD          STARTED

See DBMS_CLOUD_PIPELINE Views for more information.

View Pipeline Attributes

Monitor pipeline attributes by querying the USER_CLOUD_PIPELINE_ATTRIBUTES or DBA_CLOUD_PIPELINE_ATTRIBUTES views.

For example:

SELECT pipeline_name, attribute_name, attribute_value FROM user_cloud_pipeline_attributes
     WHERE pipeline_name = 'MY_TREE_DATA';

PIPELINE_NAME ATTRIBUTE_NAME  ATTRIBUTE_VALUE                                                                   
------------- --------------- --------------------------------------------------------------------------------- 
MY_TREE_DATA  credential_name DEF_CRED_OBJ_STORE                                                                
MY_TREE_DATA  format          {"type": "csv"}                                                                   
MY_TREE_DATA  interval        20                                                                                
MY_TREE_DATA  location        https://objectstorage.us-ashburn-1.oraclecloud.com/n/namespace/b/treetypes/o/ 
MY_TREE_DATA  priority        high                                                                              
MY_TREE_DATA  table_name      TREES

See DBMS_CLOUD_PIPELINE Views for more information.

View Pipeline History

The USER_CLOUD_PIPELINE_HISTORY and DBA_CLOUD_PIPELINE_HISTORY views show the state of running jobs. Use the pipeline history views to help you monitor the health of a pipeline and detect failures in a running pipeline.

For example:

SELECT pipeline_id, pipeline_name, status, error_message  FROM user_cloud_pipeline_history      
     WHERE pipeline_name = 'MY_TREE_DATA';

PIPELINE_ID PIPELINE_NAME STATUS    ERROR_MESSAGE 
----------- ------------- --------- ------------- 
          7  MY_TREE_DATA SUCCEEDED

See DBMS_CLOUD_PIPELINE Views for more information.
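
To focus on problem runs, the history query can be narrowed with a filter. This is a sketch only: SUCCEEDED is the one status value shown in the example above, so the predicate simply excludes it rather than assuming the names of other status values.

```sql
-- Illustrative sketch: list history entries for MY_TREE_DATA that did not
-- report SUCCEEDED or that recorded an error message.
SELECT pipeline_id, pipeline_name, status, error_message
FROM   user_cloud_pipeline_history
WHERE  pipeline_name = 'MY_TREE_DATA'
AND    (status <> 'SUCCEEDED' OR error_message IS NOT NULL);
```

Because the filter excludes only SUCCEEDED, it may also return entries for jobs that are still in progress.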

Pipeline Status Table: Additional Monitoring for Load Pipelines

The pipeline status table shows each file name and its status for a load pipeline. The STATUS_TABLE column in DBA_CLOUD_PIPELINES and USER_CLOUD_PIPELINES shows the status table name.

For example, the following SELECT statement with a WHERE clause predicate on the pipeline_name shows the status table name for a pipeline:

SELECT pipeline_name, status_table FROM user_cloud_pipelines
   WHERE pipeline_name = 'MY_TREE_DATA';

PIPELINE_NAME STATUS_TABLE
------------- --------------------
MY_TREE_DATA  PIPELINE$9$41_STATUS

View the status table to see information about the pipeline, including the following:

  • The relevant error number and error message are recorded in the status table if an operation on a specific file fails.

  • For completed pipeline operations, the time needed for each operation can be calculated using the reported START_TIME and END_TIME.

For example, the following shows that the load operation for two files failed and one completed:

SELECT id, name, status, error_code, error_message, sid FROM PIPELINE$9$41_STATUS;

ID NAME       STATUS    ERROR_CODE ERROR_MESSAGE                      SID 
-- ---------- --------- ---------- -------------------------------- ----- 
 1 trees1.txt FAILED         30653 ORA-30653: reject limit reached  18070 
 2 trees2.txt FAILED         30653 ORA-30653: reject limit reached  18070 
 3 trees3.txt COMPLETED                                             18070 
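
The two uses of the status table listed above, inspecting errors and calculating operation time, can be sketched with the following queries. They assume the status table name PIPELINE$9$41_STATUS from the earlier example; substitute the name reported in the STATUS_TABLE column for your pipeline.

```sql
-- Illustrative sketch: show only the files whose load operation failed.
SELECT name, error_code, error_message
FROM   PIPELINE$9$41_STATUS
WHERE  status = 'FAILED';

-- Illustrative sketch: elapsed time per completed file. Subtracting the two
-- timestamp columns yields an INTERVAL DAY TO SECOND value.
SELECT name,
       status,
       end_time - start_time AS elapsed
FROM   PIPELINE$9$41_STATUS
WHERE  status = 'COMPLETED'
ORDER  BY elapsed DESC;
```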

Pipelines for loading data, where the pipeline_type is 'LOAD', reserve an ID that is shown in USER_LOAD_OPERATIONS and in DBA_LOAD_OPERATIONS. The ID value in these views maps to the pipeline's OPERATION_ID in USER_CLOUD_PIPELINES and DBA_CLOUD_PIPELINES.

To obtain more information for a load pipeline, query the pipeline's OPERATION_ID:

SELECT PIPELINE_NAME, OPERATION_ID FROM USER_CLOUD_PIPELINES
     WHERE PIPELINE_NAME = 'MY_TREE_DATA';

PIPELINE_NAME OPERATION_ID 
------------- ------------ 
MY_TREE_DATA            41

Next, query either USER_LOAD_OPERATIONS or DBA_LOAD_OPERATIONS with a WHERE clause predicate on the ID column (using the OPERATION_ID value).

For example:

SELECT ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE, STATUS_TABLE FROM USER_LOAD_OPERATIONS
     WHERE ID = 41;

ID TYPE     LOGFILE_TABLE     BADFILE_TABLE     STATUS_TABLE
-- -------- ----------------- ----------------- --------------------
41 PIPELINE PIPELINE$9$41_LOG PIPELINE$9$41_BAD PIPELINE$9$41_STATUS

This query shows ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE if it exists, and the STATUS_TABLE. You can view these tables for additional pipeline load information.
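
The two lookups above can also be combined into a single statement. The following sketch joins the views on the OPERATION_ID to ID mapping described earlier, using only the columns shown in the preceding examples:

```sql
-- Illustrative sketch: find the log, bad file, and status tables for a
-- load pipeline by name, without looking up the OPERATION_ID first.
SELECT p.pipeline_name,
       o.id,
       o.type,
       o.logfile_table,
       o.badfile_table,
       o.status_table
FROM   user_cloud_pipelines p
JOIN   user_load_operations o
       ON o.id = p.operation_id
WHERE  p.pipeline_name = 'MY_TREE_DATA';
```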

Pipeline Status Table Details

Column         Datatype                     Description
-------------  ---------------------------  ------------------------------------------------------------
ID             NUMBER                       Unique number assigned to the pipeline.
NAME           VARCHAR2(4000)               Name of the pipeline.
BYTES          NUMBER                       Bytes
CHECKSUM       VARCHAR2(128)                Checksum
LAST_MODIFIED  TIMESTAMP(6) WITH TIME ZONE  Last modification time for the pipeline.
STATUS         VARCHAR2(30)                 The STATUS value is one of:
                                              • COMPLETED: The file operation completed successfully.
                                              • FAILED: The file operation failed; a retry may be attempted two times.
                                              • PENDING: The file operation has not yet started.
                                              • RUNNING: The file operation is currently in progress.
                                              • SKIPPED: The file operation was skipped.
ERROR_CODE     NUMBER                       Error code
ERROR_MESSAGE  VARCHAR2(4000)               Error message
START_TIME     TIMESTAMP(6) WITH TIME ZONE  Start time for the pipeline.
END_TIME       TIMESTAMP(6) WITH TIME ZONE  End time for the pipeline.
SID            NUMBER                       The session SID and SERIAL# indicate the job session that was running the pipeline load operation.
SERIAL#        NUMBER                       The session SID and SERIAL# indicate the job session that was running the pipeline load operation.
ROWS_LOADED    NUMBER                       Number of rows loaded.
OPERATION_ID   NUMBER                       Reserved for future use.

Pipeline Log File and Bad File Tables

To obtain the log file and bad file names for a load pipeline, query the pipeline's OPERATION_ID. For example:

SELECT PIPELINE_NAME, OPERATION_ID FROM USER_CLOUD_PIPELINES
     WHERE PIPELINE_NAME = 'MY_TREE_DATA';

PIPELINE_NAME OPERATION_ID 
------------- ------------ 
MY_TREE_DATA            41

Next, query either USER_LOAD_OPERATIONS or DBA_LOAD_OPERATIONS with a WHERE clause predicate on the ID column (using the OPERATION_ID value).

For example:

SELECT ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE, STATUS_TABLE FROM USER_LOAD_OPERATIONS
     WHERE ID = 41;

ID TYPE     LOGFILE_TABLE     BADFILE_TABLE     STATUS_TABLE
-- -------- ----------------- ----------------- --------------------
41 PIPELINE PIPELINE$9$41_LOG PIPELINE$9$41_BAD PIPELINE$9$41_STATUS

This query shows ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE if it exists, and the STATUS_TABLE. You can view these tables for additional pipeline load information.

View the pipeline log file table to see a complete log of the pipeline's load operations.

For example:

SELECT * FROM PIPELINE$9$41_LOG;

View the pipeline bad file table to see details on input format records with errors. The bad file table shows information for the rows that reported errors during loading. Depending on the errors shown in the log file table and the rows shown in the pipeline's bad file table, you might be able to correct the errors either by modifying the pipeline format attribute options or by modifying the data in the file you are loading.

For example:

SELECT * FROM PIPELINE$9$41_BAD;

See Monitor and Troubleshoot Data Loading for more information.