Gerenciar Pipelines

Depois de criar e testar um pipeline, você o controla iniciando, interrompendo ou eliminando-o. Você também pode redefinir um pipeline.

Iniciar um Pipeline

Depois de criar um pipeline, você poderá iniciá-lo.

Quando um pipeline é iniciado, ele é executado continuamente em um job programado. O job programado do pipeline se repete, por padrão a cada 15 minutos ou no intervalo definido com o atributo interval.

  1. Inicie um pipeline.
    BEGIN
      DBMS_CLOUD_PIPELINE.START_PIPELINE(
          pipeline_name => 'EMPLOYEE_PIPELINE'
      );
    END;
    /

    Por padrão, um job de pipeline começa imediatamente, assim que o pipeline é iniciado. Para iniciar um job de pipeline posteriormente, especifique uma data futura ou um timestamp válido usando o parâmetro start_date.

    Consulte START_PIPELINE Procedimento para obter mais informações.

  2. Verifique se o pipeline foi iniciado.

    Por exemplo:

    
    SELECT pipeline_name, status from USER_CLOUD_PIPELINES
       WHERE pipeline_name = 'EMPLOYEE_PIPELINE';
    
    PIPELINE_NAME            STATUS  
    ------------------------ ------- 
    EMPLOYEE_PIPELINE        STARTED

Interromper um Pipeline

Use STOP_PIPELINE para interromper um pipeline. Quando um pipeline é interrompido, nenhum job futuro é programado para o pipeline.

Por padrão, os jobs em execução no momento são concluídos quando você interrompe um pipeline. Defina o parâmetro force como TRUE para encerrar qualquer job em execução e interromper o pipeline imediatamente.

  1. Interromper um pipeline.
    BEGIN
      DBMS_CLOUD_PIPELINE.STOP_PIPELINE(
          pipeline_name => 'EMPLOYEE_PIPELINE'
      );
    END;
    /

    Consulte STOP_PIPELINE Procedimento para obter mais informações.

  2. Verifique se o pipeline foi interrompido.
    
    SELECT pipeline_name, status from USER_CLOUD_PIPELINES
       WHERE pipeline_name = 'EMPLOYEE_PIPELINE';
    
    PIPELINE_NAME            STATUS  
    ------------------------ ------- 
    EMPLOYEE_PIPELINE        STOPPED

Consulte STOP_PIPELINE Procedimento para obter mais informações.

Eliminar um Pipeline

O procedimento DROP_PIPELINE elimina um pipeline existente.

Se um pipeline tiver sido iniciado, ele deverá ser interrompido para que possa ser eliminado. Consulte STOP_PIPELINE Procedimento para obter mais informações.

Para eliminar um pipeline que foi iniciado, defina o parâmetro force como TRUE para encerrar qualquer job em execução e eliminar o pipeline imediatamente

  1. Elimine um pipeline.
    BEGIN
      DBMS_CLOUD_PIPELINE.DROP_PIPELINE(
          pipeline_name => 'EMPLOYEE_PIPELINE'
      );
    END;
    /
  2. Verifique se o pipeline foi eliminado.
    SELECT pipeline_name, status from USER_CLOUD_PIPELINES
       WHERE pipeline_name = 'EMPLOYEE_PIPELINE';
    
    No rows selected

Consulte DROP_PIPELINE Procedimento para obter mais informações.

Redefinir um Pipeline

Use a operação de redefinição do pipeline para limpar o registro do pipeline para o estado inicial.

Observação:

Opcionalmente, você pode usar redefinir pipeline para expurgar dados na tabela de banco de dados associada a um pipeline de carga ou para remover arquivos no armazenamento de objetos de um pipeline de exportação. Geralmente, essa opção é usada quando você está testando um pipeline durante o desenvolvimento do pipeline.

O pipeline de redefinição funciona da seguinte forma:

  • Pipeline de Carga: Para um pipeline de carga, a redefinição do pipeline limpa o registro dos arquivos que estão sendo carregados pelo pipeline. Quando você chama START_PIPELINE ou RUN_PIPELINE_ONCE após redefinir um pipeline de carga, o pipeline repete a carga de dados e inclui todos os arquivos presentes no local de armazenamento de objetos.

    Quando purge_data é definido como TRUE, o procedimento trunca os dados na tabela do banco de dados.

  • Pipeline de Exportação: Para um pipeline de exportação, a redefinição do pipeline limpa os últimos dados rastreados na tabela do banco de dados. Quando você chama START_PIPELINE ou RUN_PIPELINE_ONCE após redefinir um pipeline de exportação, o pipeline repete a exportação de dados da tabela ou da consulta.

    Quando purge_data é definido como TRUE, o procedimento exclui arquivos existentes no local de armazenamento de objetos especificado com o atributo location.

Para redefinir um pipeline:

  1. Interrompa o pipeline que você deseja redefinir.

    Um pipeline de dados deve estar no estado interrompido para redefini-lo. Consulte STOP_PIPELINE Procedimento para obter mais informações.

  2. Redefina o pipeline.
    BEGIN
         DBMS_CLOUD_PIPELINE.RESET_PIPELINE(
            pipeline_name => 'EMPLOYEE_PIPELINE',
            purge_data => TRUE);
    END;
    /

    Só use o parâmetro purge_data com o valor TRUE se quiser limpar dados em sua tabela de banco de dados, para um pipeline de carga ou limpar arquivos no armazenamento de objetos para um pipeline de exportação. Geralmente, essa opção é usada quando você está testando um pipeline durante o desenvolvimento do pipeline.

    Consulte RESET_PIPELINE Procedimento para obter mais informações.

Monitorar, Diagnosticar e Solucionar Problemas de Pipelines

Todos os pipelines criados são registrados nas views DBMS_CLOUD_PIPELINE.

Exibir Informações de Status do Pipeline

Verifique o status do pipeline e obtenha outras informações do pipeline usando as views USER_CLOUD_PIPELINES ou DBA_CLOUD_PIPELINES. Por exemplo, a seguinte instrução SELECT com um predicado de cláusula WHERE em pipeline_name mostra que MY_TREE_DATA é um pipeline de carga e o pipeline é iniciado:

SELECT pipeline_name, pipeline_type, status FROM USER_CLOUD_PIPELINES
   WHERE pipeline_name = 'MY_TREE_DATA';


PIPELINE_NAME PIPELINE_TYPE STATUS  
------------- ------------- ------- 
MY_TREE_DATA  LOAD          STARTED

Consulte DBMS_CLOUD_PIPELINE Views para obter mais informações.

Exibir Atributos do Pipeline

Os atributos do pipeline podem ser monitorados consultando as views USER_CLOUD_PIPELINE_ATTRIBUTES ou DBA_CLOUD_PIPELINE_ATTRIBUTES. Consulte essas views para ver as informações do atributo do pipeline.

Por exemplo:

SELECT pipeline_name, attribute_name, attribute_value FROM user_cloud_pipeline_attributes
     WHERE pipeline_name = 'MY_TREE_DATA';

PIPELINE_NAME ATTRIBUTE_NAME  ATTRIBUTE_VALUE                                                                   
------------- --------------- --------------------------------------------------------------------------------- 
MY_TREE_DATA  credential_name DEF_CRED_OBJ_STORE                                                                
MY_TREE_DATA  format          {"type": "csv"}                                                                   
MY_TREE_DATA  interval        20                                                                                
MY_TREE_DATA  location        https://objectstorage.us-ashburn-1.oraclecloud.com/n/namespace/b/treetypes/o/ 
MY_TREE_DATA  priority        high                                                                              
MY_TREE_DATA  table_name      TREES

Consulte DBMS_CLOUD_PIPELINE Views para obter mais informações.

Exibir Histórico do Pipeline

As views USER_CLOUD_PIPELINE_HISTORY e DBA_CLOUD_PIPELINE_HISTORY mostram o estado dos jobs em execução. Use as views do histórico do pipeline para ajudar a monitorar a integridade de um pipeline e detectar falhas em um pipeline em execução.

Por exemplo:

SELECT pipeline_id, pipeline_name, status, error_message  FROM user_cloud_pipeline_history      
     WHERE pipeline_name = 'MY_TREE_DATA';

PIPELINE_ID PIPELINE_NAME STATUS    ERROR_MESSAGE 
----------- ------------- --------- ------------- 
          7  MY_TREE_DATA SUCCEEDED

Consulte DBMS_CLOUD_PIPELINE Views para obter mais informações.

Tabela de Status do Pipeline: Monitoramento Adicional para Pipelines de Carga

A tabela de status do pipeline mostra cada nome de arquivo e seu status para um pipeline de carga. A coluna STATUS_TABLE em DBA_CLOUD_PIPELINES e USER_CLOUD_PIPELINES mostra o nome da tabela de status.

Por exemplo, a seguinte instrução SELECT com um predicado de cláusula WHERE em pipeline_name mostra o nome da tabela de status de um pipeline:

SELECT pipeline_name, status_table FROM user_cloud_pipelines
   WHERE pipeline_name = 'MY_TREE_DATA';

PIPELINE_NAME STATUS_TABLE
------------- --------------------
MY_TREE_DATA  PIPELINE$9$41_STATUS

Exiba a tabela de status para ver informações sobre o pipeline, incluindo o seguinte:

  • O número de erro relevante e a mensagem de erro serão registrados na tabela de status se uma operação em um arquivo específico falhar.

  • Para operações de pipeline concluídas, o tempo necessário para cada operação pode ser calculado usando os START_TIME e END_TIME reportados.

Por exemplo, o seguinte mostra que a operação de carregamento de dois arquivos falhou e um foi concluído:

SELECT id, name, status, error_code, error_message, sid FROM PIPELINE$9$41_STATUS;

ID NAME       STATUS    ERROR_CODE ERROR_MESSAGE                      SID 
-- ---------- --------- ---------- -------------------------------- ----- 
 1 trees1.txt FAILED         30653 ORA-30653: reject limit reached  18070 
 2 trees2.txt FAILED         30653 ORA-30653: reject limit reached  18070 
 3 trees3.txt COMPLETED                                             18070 

Os pipelines para carregar dados, em que pipeline_type é 'LOAD', reservam um ID que é mostrado em USER_LOAD_OPERATIONS e em DBA_LOAD_OPERATIONS. O valor ID nessas views é mapeado para o OPERATION_ID do pipeline em USER_CLOUD_PIPELINES e DBA_CLOUD_PIPELINES.

Para obter mais informações sobre um pipeline de carga, consulte o OPERATION_ID do pipeline:

SELECT PIPELINE_NAME, OPERATION_ID FROM USER_CLOUD_PIPELINES
     WHERE PIPELINE_NAME = 'MY_TREE_DATA';

PIPELINE_NAME OPERATION_ID 
------------- ------------ 
MY_TREE_DATA            41

Em seguida, consulte USER_LOAD_OPERATIONS ou DBA_LOAD_OPERATIONS com um predicado de cláusula WHERE na coluna ID (usando o valor OPERATION_ID).

Por exemplo:

SELECT ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE, STATUS_TABLE FROM USER_LOAD_OPERATIONS
     WHERE ID = 41;

ID TYPE     LOGFILE_TABLE     BADFILE_TABLE     STATUS_TABLE
-- -------- ----------------- ----------------- --------------------
41 PIPELINE PIPELINE$9$41_LOG PIPELINE$9$41_BAD PIPELINE$9$41_STATUS

Essa consulta mostra ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE se ela existir e STATUS_TABLE. Você pode exibir essas tabelas para obter informações adicionais de carga do pipeline.

Detalhes da Tabela de Status do Pipeline

Coluna Tipo de dados Descrição
ID NUMBER Número exclusivo designado ao pipeline.
NAME VARCHAR2(4000) Nome do pipeline.
BYTES NUMBER Bytes
CHECKSUM VARCHAR2(128) Checksum
LAST_MODIFIED TIMESTAMP(6) WITH TIME ZONE Horário da última modificação do pipeline.
STATUS VARCHAR2(30) O valor STATUS é um de:
  • COMPLETED: A operação do arquivo foi concluída com sucesso.
  • FAILED: Falha na operação do arquivo; pode ser feita uma nova tentativa duas vezes.
  • PENDING: A operação do arquivo ainda não foi iniciada.
  • RUNNING: A operação do arquivo está em andamento no momento.
  • SKIPPED: A operação de arquivo foi ignorada.
ERROR_CODE NUMBER Código do erro
ERROR_MESSAGE VARCHAR2(4000) Mensagem de erro
START_TIME TIMESTAMP(6) WITH TIME ZONE Horário inicial do pipeline.
END_TIME TIMESTAMP(6) WITH TIME ZONE Hora final do pipeline.
SID NUMBER

As sessões SID e SERIAL# indicam a sessão do job que estava executando a operação de carga do pipeline.

SERIAL# NUMBER

As sessões SID e SERIAL# indicam a sessão do job que estava executando a operação de carga do pipeline.

ROWS_LOADED NUMBER Número de linhas carregadas.
OPERATION_ID NUMBER

Reservado para uso futuro.

Arquivo de Log de Pipeline e Tabelas de Arquivos Inválidos

Para obter o arquivo de log e os nomes de arquivo inválidos de um pipeline de carga, consulte o OPERATION_ID do pipeline. Por exemplo:

SELECT PIPELINE_NAME, OPERATION_ID FROM USER_CLOUD_PIPELINES
     WHERE PIPELINE_NAME = 'MY_TREE_DATA';

PIPELINE_NAME OPERATION_ID 
------------- ------------ 
MY_TREE_DATA            41

Em seguida, consulte USER_LOAD_OPERATIONS ou DBA_LOAD_OPERATIONS com um predicado de cláusula WHERE na coluna ID (usando o valor OPERATION_ID).

Por exemplo:

SELECT ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE, STATUS_TABLE FROM USER_LOAD_OPERATIONS
     WHERE ID = 41;

ID TYPE     LOGFILE_TABLE     BADFILE_TABLE     STATUS_TABLE
-- -------- ----------------- ----------------- --------------------
41 PIPELINE PIPELINE$9$41_LOG PIPELINE$9$41_BAD PIPELINE$9$41_STATUS

Essa consulta mostra ID, TYPE, LOGFILE_TABLE, BADFILE_TABLE se ela existir e STATUS_TABLE. Você pode exibir essas tabelas para obter informações adicionais de carga do pipeline.

Exiba a tabela de arquivos de log do pipeline para ver um log completo das operações de carga do pipeline.

Por exemplo:

SELECT * FROM PIPELINE$9$41_LOG;

Exiba a tabela de arquivos inválidos do pipeline para ver detalhes sobre registros de formato de entrada com erros. A tabela de arquivos incorretos mostra informações para as linhas que relatam erros durante o carregamento. Dependendo dos erros mostrados na tabela de arquivos de log e das linhas mostradas na tabela de arquivos inválidos do pipeline, talvez você possa corrigir os erros modificando as opções do atributo format do pipeline ou modificando os dados no arquivo que está sendo carregado.

Por exemplo:

SELECT * FROM PIPELINE$9$41_BAD;

Consulte Monitorar e Solucionar Problemas de Carga de Dados para obter mais informações.