创建和配置管道

您可以创建一个或多个负载或导出管道。创建管道时,可以使用参数并设置管道属性来配置管道。

用于创建和配置管道的选项如下:

创建和配置用于加载数据的管道

您可以创建管道,将数据从对象存储中的外部文件或目录加载到自治 AI 数据库中的表。

负载管道使用放置在对象存储或目录中的数据,并将其加载到自治 AI 数据库中的表。创建加载管道时,当新数据文件到达管道加载新数据时,管道将定期运行以使用放置在源位置的数据。您还可以使用管道通过恢复和重试功能,将文件从源位置可靠地复制到数据库上的表。

使用加载管道时,管道程序包使用 DBMS_CLOUD.COPY_DATA 加载数据。

在自治 AI 数据库上,使用现有表或创建要在其中加载数据的数据库表。例如:

 CREATE TABLE EMPLOYEE
     (name     VARCHAR2(128),
      age      NUMBER,
      salary   NUMBER);
  1. 创建管道以从对象存储或目录对象加载数据。
      BEGIN
       DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
          pipeline_name => 'MY_PIPE1',
          pipeline_type => 'LOAD',
          description   => 'Load metrics from object store into a table'
       );
      END;
     /
    

    有关更多信息,请参见 CREATE_PIPELINE Procedure

  2. 创建身份证明对象以访问包含要加载的文件的对象存储。

    使用属性 credential_name 为管道源位置指定身份证明。如果未在下一步中提供 credential_name,则 credential_name 值将设置为 NULL。当 location 属性是公共或预先验证的 URL 时,可以使用默认的 NULL 值。

    有关更多信息,请参见 CREATE_CREDENTIAL Procedure

  3. 设置管道属性,包括必需属性:locationtable_nameformat

    案例 1:创建用于从对象存储加载数据的管道。

     BEGIN
          DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
            pipeline_name => 'MY_PIPE1',
            attributes    => JSON_OBJECT(
                 'credential_name' VALUE 'OBJECT_STORE_CRED',
                 'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
                 'table_name' VALUE 'employee',
                 'format' VALUE '{"type":"json", "columnpath":["$.NAME", "$.AGE", "$.SALARY"]}',
                 'priority' VALUE 'HIGH',
                 'interval' VALUE '20')
       );
     END;
     /
    

    案例 2:创建用于从目录对象加载数据的管道。

     BEGIN
          DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
            pipeline_name => 'MY_PIPE1',
            pipeline_type => 'LOAD',
            attributes    => JSON_OBJECT(
                 'location' VALUE 'MY_DIR:*.csv',
                 'table_name' VALUE 'employee',
                 'format' VALUE '{"type":"csv"}',
                 'priority' VALUE 'HIGH',
                 'interval' VALUE '20')
       );
     END;
     /
    

    在两种情况下,必须设置以下属性才能运行加载管道:

    • location:指定对象存储或目录对象上的源文件位置。

    • table_name:指定要在其中加载数据的数据库中的表。您指定的 location 用于每个管道一个 table_name

    • format:描述要加载的数据的格式。

      有关更多信息,请参见 DBMS_CLOUD Package Format Options

    credential_name 是您在上一步中创建的身份证明。

    priority 值确定并行加载的文件数。优先级较高的管道会消耗更多数据库资源,并且每条管道的运行速度都比优先级较低的管道快。

    interval 值指定管道作业连续运行之间的时间间隔(分钟)。默认 interval 为 15 分钟。

    有关管道属性的详细信息,请参见 DBMS_CLOUD_PIPELINE Attributes

  4. 创建管道后,您可以测试管道或启动管道:

或者,要设置 JSON 格式,可以使用以下格式:

BEGIN
    DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
        pipeline_name   => 'MY_PIPE1',
        attribute_name  => 'format',
        attribute_value => JSON_OBJECT('type' value 'json', 'columnpath' value '["$.NAME", "$.AGE", "$.SALARY"]')
    );
END;
/

创建和配置管道以使用时间戳列导出

您可以创建导出管道,以自动将时间序列数据从自治 AI 数据库导出到对象存储。

使用此导出管道选项可以指定表或 SQL 查询以及具有时间戳的列,该时间戳用于跟踪上次上载的时间。您可以使用导出管道共享数据以供其他应用程序使用,或者将数据保存到对象存储。

对于导出管道,管道程序包使用 DBMS_CLOUD.EXPORT_DATA 导出数据。

导出管道将数据从自治 AI 数据库导出到对象存储。创建导出管道时,管道会定期运行,并将数据放置在对象存储上。

  1. 创建用于将数据导出到对象存储的管道。

     BEGIN
          DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
             pipeline_name=>'EXP_PIPE1',
             pipeline_type=>'EXPORT',
             description=>'Export time series metrics to object store');
     END;
     /
    

    有关更多信息,请参见 CREATE_PIPELINE Procedure

  2. 创建身份证明对象以访问要导出数据文件的目标对象存储位置。

    使用属性 credential_name 为管道目标位置指定身份证明。如果未在下一步中提供 credential_name,则 credential_name 值将设置为 NULL。当 location 属性是公共或预先验证的 URL 时,可以使用默认的 NULL 值。

    有关更多信息,请参见 CREATE_CREDENTIAL Procedure

  3. 设置导出管道属性。

    指定 table_name 参数时,表行将导出到对象存储。指定 query 参数时,查询将指定 SELECT 语句,以便仅将所需数据导出到对象存储。

    • 使用 table_name 参数:

      BEGIN
           DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
             pipeline_name => 'EXP_PIPE1',
             attributes    => JSON_OBJECT('credential_name' VALUE 'OBJECT_STORE_CRED',
                'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
                'table_name' VALUE 'metric_table',
                'key_column' VALUE 'metric_time',
                'format' VALUE '{"type": "json"}',
                'priority' VALUE 'MEDIUM',
                'interval' VALUE '20')
        );
      END;
      /
      
    • 使用 query 参数:

      BEGIN
           DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
             pipeline_name => 'EXP_PIPE1',
             attributes    => JSON_OBJECT('credential_name' VALUE 'OBJECT_STORE_CRED',
                 'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
                 'query' VALUE 'SELECT * from metrics_table',
                 'key_column' VALUE 'metric_time',
                 'format' VALUE '{"type": "json"}',
                 'priority' VALUE 'MEDIUM',
                 'interval' VALUE '20')
        );
      END;
      /
      

      其中,credential_name 是您在上一步中创建的身份证明。

    必须设置以下属性才能运行导出管道:

    • location:指定目标对象存储位置。您指定的 location 用于每个管道一个 table_name

    • table_name:指定数据库中包含要导出的数据的表(需要 table_name 参数或 query 参数)。

    • query:指定要在数据库中运行的查询,该查询提供要导出的数据(需要 table_name 参数或 query 参数)。

    • format:描述要导出的数据的格式。

      有关更多信息,请参见 DBMS_CLOUD Package Format Options for EXPORT_DATA

    priority 值确定从数据库提取数据的并行度。

    interval 值指定管道作业连续运行之间的时间间隔(分钟)。默认 interval 为 15 分钟。

    有关管道属性的详细信息,请参见 DBMS_CLOUD_PIPELINE Attributes

    创建管道后,您可以测试管道或启动管道:

创建和配置管道以导出查询结果(无时间戳)

您可以创建导出管道,以自动将数据从自治 AI 数据库导出到对象存储。使用此导出管道选项可以指定管道定期运行的 SQL 查询,以将数据导出到对象存储。您可以使用此导出选项将自治 AI 数据库中的最新数据共享到对象存储,以便其他应用使用这些数据。

导出管道将数据从自治 AI 数据库导出到对象存储。创建导出管道时,管道会定期运行并在对象存储上放置数据。

  1. 创建用于将数据导出到对象存储的管道。

     BEGIN
          DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
             pipeline_name=>'EXP_PIPE2',
             pipeline_type=>'EXPORT',
             description=>'Export query results to object store.');
     END;
     /
    

    有关更多信息,请参见 CREATE_PIPELINE Procedure

  2. 创建身份证明对象以访问要导出数据文件的目标对象存储位置。

    使用属性 credential_name 为管道目标位置指定身份证明。如果未在下一步中提供 credential_name,则 credential_name 值将设置为 NULL。当 location 属性是公共或预先验证的 URL 时,可以使用默认的 NULL 值。

    有关更多信息,请参见 CREATE_CREDENTIAL Procedure

  3. 设置导出管道属性。

     BEGIN
          DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
            pipeline_name => 'EXP_PIPE2',
            attributes    => JSON_OBJECT(
               'credential_name' VALUE 'OBJECT_STORE_CRED',
               'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
               'query' VALUE 'SELECT * FROM table_name',
               'format' VALUE '{"type": "json"}',
               'priority' VALUE 'MEDIUM',
               'interval' VALUE '20')
       );
     END;
     /
    

    其中,credential_name 是您在上一步中创建的身份证明。

    必须设置以下属性才能运行导出管道:

    • location:指定目标对象存储位置。

    • query:指定要在数据库中运行的查询,该查询提供要导出的数据。

    • format:描述要导出的数据的格式。

      有关更多信息,请参见 DBMS_CLOUD Package Format Options for EXPORT_DATA

    priority 值确定从数据库提取数据的并行度。

    interval 值指定管道作业连续运行之间的时间间隔(分钟)。默认 interval 为 15 分钟。

    有关管道属性的详细信息,请参见 DBMS_CLOUD_PIPELINE Attributes

    创建管道后,您可以测试管道或启动管道:

测试管道

使用 RUN_PIPELINE_ONCE 可按需运行一次管道,而无需创建调度的作业。

RUN_PIPELINE_ONCE 用于在启动管道之前测试管道。运行一次管道以测试管道并检查管道是否按预期工作后,使用 RESET_PIPELINE 将管道的状态重置为运行 RUN_PIPELINE_ONCE 之前的状态。

  1. 创建一个管道。

    有关详细信息,请参阅创建和配置用于加载数据的管道

  2. 运行一次管道以测试管道。

     BEGIN
         DBMS_CLOUD_PIPELINE.RUN_PIPELINE_ONCE(
             pipeline_name => 'MY_PIPE1'
     );
     END;
     /
    

    有关更多信息,请参见 RUN_PIPELINE_ONCE Procedure

  3. 执行任何必需的检查以验证管道是否按预期运行。

    有关更多信息,请参见 Monitor and Troubleshoot Pipelines

  4. 重置管道。

     BEGIN
        DBMS_CLOUD_PIPELINE.RESET_PIPELINE(
          pipeline_name => 'MY_PIPE1',
          purge_data => TRUE
     );
     END;
     /
    

    有关更多信息,请参见 RESET_PIPELINE Procedure