Creación y configuración de pipelines

Puede crear uno o más pipelines de carga o exportación. Al crear un pipeline, se utilizan parámetros y se definen atributos de pipeline para configurar el pipeline.

A continuación, se muestran las opciones para crear y configurar un pipeline:

Creación y configuración de un pipeline para la carga de datos

Puede crear un pipeline para cargar datos de archivos externos en el almacén de objetos en tablas de Autonomous Database.

Un pipeline de carga consume datos colocados en el almacén de objetos y los carga en una tabla de Autonomous Database. Al crear un pipeline de carga, el pipeline se ejecuta a intervalos regulares para consumir los datos colocados en el almacén de objetos. Cuando llegan nuevos archivos de datos, el pipeline carga los nuevos datos. También puede utilizar un pipeline para copiar de forma fiable archivos, con capacidades de reanudación y reintento, del almacén de objetos a una tabla de la base de datos.

Con un pipeline de carga, el paquete de pipeline utiliza DBMS_CLOUD.COPY_DATA para cargar datos.

En Autonomous Database, utilice una tabla existente o cree la tabla de base de datos en la que está cargando datos. Por ejemplo:

CREATE TABLE EMPLOYEE
            (name     VARCHAR2(128),
             age      NUMBER,
             salary   NUMBER);
  1. Cree un pipeline para cargar datos del almacén de objetos.
    BEGIN
         DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
            pipeline_name => 'MY_PIPE1',
            pipeline_type => 'LOAD',
            description   => 'Load metrics from object store into a table'
      );
    END;
    /

    Consulte procedimiento CREATE_PIPELINE para obtener más información.

  2. Cree un objeto de credencial para acceder al almacén de objetos que contiene los archivos que está cargando.

    Especifique la credencial para la ubicación de origen de pipeline con el atributo credential_name. Si no proporciona credential_name en el siguiente paso, el valor credential_name se define en NULL. Puede utilizar el valor NULL por defecto cuando el atributo location es una URL pública o autenticada previamente.

    Consulte Procedimiento CREATE_CREDENTIAL para obtener más información.

  3. Defina los atributos de pipeline, incluidos los atributos necesarios: location, table_name y format.
    BEGIN
         DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
           pipeline_name => 'MY_PIPE1',
           attributes    => JSON_OBJECT(
                'credential_name' VALUE 'OBJECT_STORE_CRED',
                'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
                'table_name' VALUE 'employee',
                'format' VALUE '{"type":"json", "columnpath":["$.NAME", "$.AGE", "$.SALARY"]}',
                'priority' VALUE 'HIGH',
                'interval' VALUE '20')
      );
    END;
    /

    Se deben definir los siguientes atributos para ejecutar un pipeline de carga:

    • location: especifica la ubicación del archivo de origen en el almacén de objetos.

    • table_name: especifica la tabla de la base de datos en la que está cargando los datos. El location que especifique es para un table_name por pipeline.

    • format: describe el formato de los datos que está cargando.

      Consulte Opciones de formato del paquete DBMS_CLOUD para obtener más información.

    credential_name es la credencial que ha creado en el paso anterior.

    El valor priority determina el número de archivos cargados en paralelo. Un pipeline con una prioridad más alta consume más recursos de base de datos y completa cada ejecución más rápido, en comparación con una prioridad más baja.

    El valor interval especifica el intervalo de tiempo en minutos entre ejecuciones consecutivas de un trabajo de pipeline. El valor por defecto de interval es de 15 minutos.

    Consulte Atributos de DBMS_CLOUD_PIPELINE para obtener más información sobre los atributos de pipeline.

    Después de crear un pipeline, puede probarlo o iniciarlo:

Como alternativa, para definir el formato para JSON, puede utilizar el siguiente formato:

BEGIN
    DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
        pipeline_name   => 'MY_PIPE1',
        attribute_name  => 'format',
        attribute_value => JSON_OBJECT('type' value 'json', 'columnpath' value '["$.NAME", "$.AGE", "$.SALARY"]')
    );
END;
/

Creación y Configuración de un Pipeline para la Exportación con Columna de Registro de Hora

Puede crear un pipeline de exportación para exportar automáticamente datos de serie temporal de Autonomous Database al almacén de objetos.

Con esta opción de pipeline de exportación se especifica una tabla o consulta SQL y una columna con un registro de hora que utiliza el pipeline para realizar un seguimiento de la hora de la última carga. Puede utilizar un pipeline de exportación para compartir datos para que los utilicen otras aplicaciones o para guardar datos en el almacén de objetos.

Con un pipeline de exportación, el paquete de pipeline utiliza DBMS_CLOUD.EXPORT_DATA para exportar datos.

Un pipeline de exportación exporta datos de Autonomous Database al almacén de objetos. Al crear un pipeline de exportación, el pipeline se ejecuta a intervalos regulares y coloca los datos en el almacén de objetos.

  1. Cree un pipeline para exportar datos al almacén de objetos.
    BEGIN
         DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
            pipeline_name=>'EXP_PIPE1',
            pipeline_type=>'EXPORT',
            description=>'Export time series metrics to object store');
    END;
    /

    Consulte CREATE_PIPELINE Procedimiento para obtener más información.

  2. Cree un objeto de credencial para acceder a la ubicación del almacén de objetos de destino en la que está exportando archivos de datos.

    Especifique la credencial para la ubicación de destino de pipeline con el atributo credential_name. Si no proporciona credential_name en el siguiente paso, el valor credential_name se define en NULL. Puede utilizar el valor NULL por defecto cuando el atributo location es una URL pública o autenticada previamente.

    Consulte Procedimiento CREATE_CREDENTIAL para obtener más información.

  3. Defina los atributos del pipeline de exportación.

    Al especificar un parámetro table_name, las filas de tabla se exportan al almacén de objetos. Al especificar un parámetro query, la consulta especifica una sentencia SELECT para que solo se exporten los datos necesarios al almacén de objetos.

    • Uso de un parámetro table_name:

      BEGIN
           DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
             pipeline_name => 'EXP_PIPE1',
             attributes    => JSON_OBJECT('credential_name' VALUE 'OBJECT_STORE_CRED',
                'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
                'table_name' VALUE 'metric_table',
                'key_column' VALUE 'metric_time',
                'format' VALUE '{"type": "json"}',
                'priority' VALUE 'MEDIUM',
                'interval' VALUE '20')
        );
      END;
      /
    • Mediante un parámetro query:

      BEGIN
           DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
             pipeline_name => 'EXP_PIPE1',
             attributes    => JSON_OBJECT('credential_name' VALUE 'OBJECT_STORE_CRED',
                 'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
                 'query' VALUE 'SELECT * from metrics_table',
                 'key_column' VALUE 'metric_time',
                 'format' VALUE '{"type": "json"}',
                 'priority' VALUE 'MEDIUM',
                 'interval' VALUE '20')
        );
      END;
      /

    Donde credential_name es la credencial que ha creado en el paso anterior.

    Se deben definir los siguientes atributos para ejecutar un pipeline de exportación:

    • location: especifica la ubicación del almacén de objetos de destino. El location que especifique es para un table_name por pipeline.

    • table_name: especifica la tabla de la base de datos que contiene los datos que está exportando (se necesita el parámetro table_name o el parámetro query).

    • query: especifica la consulta que se va a ejecutar en la base de datos que proporciona los datos que está exportando (se necesita el parámetro table_name o el parámetro query).

    • format: describe el formato de los datos que está exportando.

      Consulte DBMS_CLOUD Opciones de formato del paquete para EXPORT_DATA para obtener más información.

    El valor priority determina el grado de paralelismo para recuperar datos de la base de datos.

    El valor interval especifica el intervalo de tiempo en minutos entre ejecuciones consecutivas de un trabajo de pipeline. El valor por defecto de interval es de 15 minutos.

    Consulte Atributos de DBMS_CLOUD_PIPELINE para obtener más información sobre los atributos de pipeline.

    Después de crear un pipeline, puede probarlo o iniciarlo:

Creación y configuración de un pipeline para exportar resultados de consultas (sin registro de hora)

Puede crear un pipeline de exportación para exportar automáticamente datos de Autonomous Database al almacén de objetos. Con esta opción de pipeline de exportación se especifica una consulta SQL que el pipeline se ejecuta periódicamente para exportar datos al almacén de objetos. Puede utilizar esta opción de exportación para compartir los datos más recientes de Autonomous Database en el almacén de objetos para que otras aplicaciones consuman los datos.

Un pipeline de exportación exporta datos de Autonomous Database al almacén de objetos. Al crear un pipeline de exportación, el pipeline se ejecuta a intervalos regulares y coloca los datos en el almacén de objetos.

  1. Cree un pipeline para exportar datos al almacén de objetos.
    BEGIN
         DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
            pipeline_name=>'EXP_PIPE2',
            pipeline_type=>'EXPORT',
            description=>'Export query results to object store.');
    END;
    /

    Consulte CREATE_PIPELINE Procedimiento para obtener más información.

  2. Cree un objeto de credencial para acceder a la ubicación del almacén de objetos de destino en la que está exportando archivos de datos.

    Especifique la credencial para la ubicación de destino de pipeline con el atributo credential_name. Si no proporciona credential_name en el siguiente paso, el valor credential_name se define en NULL. Puede utilizar el valor NULL por defecto cuando el atributo location es una URL pública o autenticada previamente.

    Consulte Procedimiento CREATE_CREDENTIAL para obtener más información.

  3. Defina los atributos del pipeline de exportación.
    BEGIN
         DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
           pipeline_name => 'EXP_PIPE2',
           attributes    => JSON_OBJECT(
              'credential_name' VALUE 'OBJECT_STORE_CRED',
              'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
              'query' VALUE 'SELECT * FROM table_name',
              'format' VALUE '{"type": "json"}',
              'priority' VALUE 'MEDIUM',
              'interval' VALUE '20')
      );
    END;
    /

    Donde credential_name es la credencial que ha creado en el paso anterior.

    Se deben definir los siguientes atributos para ejecutar un pipeline de exportación:

    • location: especifica la ubicación del almacén de objetos de destino.

    • query: especifica la consulta que se va a ejecutar en la base de datos que proporciona los datos que está exportando.

    • format: describe el formato de los datos que está exportando.

      Consulte DBMS_CLOUD Opciones de formato del paquete para EXPORT_DATA para obtener más información.

    El valor priority determina el grado de paralelismo para recuperar datos de la base de datos.

    El valor interval especifica el intervalo de tiempo en minutos entre ejecuciones consecutivas de un trabajo de pipeline. El valor por defecto de interval es de 15 minutos.

    Consulte Atributos de DBMS_CLOUD_PIPELINE para obtener más información sobre los atributos de pipeline.

    Después de crear un pipeline, puede probarlo o iniciarlo:

Probar pipelines

Utilice RUN_PIPELINE_ONCE para ejecutar un pipeline una vez bajo demanda sin crear un trabajo programado.

RUN_PIPELINE_ONCE es útil para probar un pipeline antes de iniciar el pipeline. Después de ejecutar un pipeline una vez para probar el pipeline y comprobar que funciona como se esperaba, utilice RESET_PIPELINE para restablecer el estado del pipeline (al estado antes de ejecutar RUN_PIPELINE_ONCE).

  1. Crear un pipeline.

    Consulte Creación y configuración de un pipeline para la carga de datos para obtener más información.

  2. Ejecute un pipeline una vez para probar el pipeline.
    BEGIN
        DBMS_CLOUD_PIPELINE.RUN_PIPELINE_ONCE(
            pipeline_name => 'MY_PIPE1'
    );
    END;
    /

    Consulte RUN_PIPELINE_ONCE Procedimiento para obtener más información.

  3. Realice las comprobaciones necesarias para verificar que el pipeline funciona como se esperaba.

    Consulte Supervisión y solución de problemas de pipelines para obtener más información.

  4. Restablezca el pipeline.
    BEGIN  
       DBMS_CLOUD_PIPELINE.RESET_PIPELINE(
         pipeline_name => 'MY_PIPE1',
         purge_data => TRUE
    );
    END;
    /

    Consulte RESET_PIPELINE Procedimiento para obtener más información.