创建和配置管道
您可以创建一个或多个负载或导出管道。创建管道时,可以使用参数并设置管道属性来配置管道。
用于创建和配置管道的选项如下:
-
加载管道:
- 请参阅创建和配置用于加载数据的管道。
-
导出管道:
-
使用日期或时间戳列作为跟踪较新数据的键,将查询的增量结果导出到对象存储。请参阅创建和配置用于使用时间戳列导出的管道。
-
使用日期或时间戳列作为跟踪较新数据的键,将表的增量数据导出到对象存储。请参阅创建和配置用于使用时间戳列导出的管道。
-
使用查询将表的数据导出到对象存储,以在不引用日期或时间戳列的情况下选择数据(以便管道导出查询为每个计划程序运行选择的所有数据)。请参阅创建和配置管道以导出查询结果(无时间戳)。
-
创建和配置用于加载数据的管道
您可以创建管道,将数据从对象存储中的外部文件或目录加载到自治 AI 数据库中的表。
负载管道使用放置在对象存储或目录中的数据,并将其加载到自治 AI 数据库中的表。创建加载管道时,当新数据文件到达管道加载新数据时,管道将定期运行以使用放置在源位置的数据。您还可以使用管道通过恢复和重试功能,将文件从源位置可靠地复制到数据库上的表。
使用加载管道时,管道程序包使用 DBMS_CLOUD.COPY_DATA 加载数据。
在自治 AI 数据库上,使用现有表或创建要在其中加载数据的数据库表。例如:
CREATE TABLE EMPLOYEE
(name VARCHAR2(128),
age NUMBER,
salary NUMBER);
- 创建管道以从对象存储或目录对象加载数据。
BEGIN DBMS_CLOUD_PIPELINE.CREATE_PIPELINE( pipeline_name => 'MY_PIPE1', pipeline_type => 'LOAD', description => 'Load metrics from object store into a table' ); END; /有关更多信息,请参见 CREATE_PIPELINE Procedure 。
-
创建身份证明对象以访问包含要加载的文件的对象存储。
使用属性
credential_name为管道源位置指定身份证明。如果未在下一步中提供credential_name,则credential_name值将设置为NULL。当location属性是公共或预先验证的 URL 时,可以使用默认的NULL值。有关更多信息,请参见 CREATE_CREDENTIAL Procedure 。
-
设置管道属性,包括必需属性:
location、table_name和format。案例 1:创建用于从对象存储加载数据的管道。
BEGIN DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE( pipeline_name => 'MY_PIPE1', attributes => JSON_OBJECT( 'credential_name' VALUE 'OBJECT_STORE_CRED', 'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/', 'table_name' VALUE 'employee', 'format' VALUE '{"type":"json", "columnpath":["$.NAME", "$.AGE", "$.SALARY"]}', 'priority' VALUE 'HIGH', 'interval' VALUE '20') ); END; /案例 2:创建用于从目录对象加载数据的管道。
BEGIN DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE( pipeline_name => 'MY_PIPE1', pipeline_type => 'LOAD', attributes => JSON_OBJECT( 'location' VALUE 'MY_DIR:*.csv', 'table_name' VALUE 'employee', 'format' VALUE '{"type":"csv"}', 'priority' VALUE 'HIGH', 'interval' VALUE '20') ); END; /在两种情况下,必须设置以下属性才能运行加载管道:
-
location:指定对象存储或目录对象上的源文件位置。 -
table_name:指定要在其中加载数据的数据库中的表。您指定的location用于每个管道一个table_name。 -
format:描述要加载的数据的格式。有关更多信息,请参见 DBMS_CLOUD Package Format Options 。
credential_name是您在上一步中创建的身份证明。priority值确定并行加载的文件数。优先级较高的管道会消耗更多数据库资源,并且每条管道的运行速度都比优先级较低的管道快。interval值指定管道作业连续运行之间的时间间隔(分钟)。默认interval为 15 分钟。有关管道属性的详细信息,请参见 DBMS_CLOUD_PIPELINE Attributes 。
-
-
创建管道后,您可以测试管道或启动管道:
或者,要设置 JSON 格式,可以使用以下格式:
BEGIN
DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
pipeline_name => 'MY_PIPE1',
attribute_name => 'format',
attribute_value => JSON_OBJECT('type' value 'json', 'columnpath' value '["$.NAME", "$.AGE", "$.SALARY"]')
);
END;
/
创建和配置管道以使用时间戳列导出
您可以创建导出管道,以自动将时间序列数据从自治 AI 数据库导出到对象存储。
使用此导出管道选项可以指定表或 SQL 查询以及具有时间戳的列,该时间戳用于跟踪上次上载的时间。您可以使用导出管道共享数据以供其他应用程序使用,或者将数据保存到对象存储。
对于导出管道,管道程序包使用 DBMS_CLOUD.EXPORT_DATA 导出数据。
导出管道将数据从自治 AI 数据库导出到对象存储。创建导出管道时,管道会定期运行,并将数据放置在对象存储上。
-
创建用于将数据导出到对象存储的管道。
BEGIN DBMS_CLOUD_PIPELINE.CREATE_PIPELINE( pipeline_name=>'EXP_PIPE1', pipeline_type=>'EXPORT', description=>'Export time series metrics to object store'); END; /有关更多信息,请参见 CREATE_PIPELINE Procedure 。
-
创建身份证明对象以访问要导出数据文件的目标对象存储位置。
使用属性
credential_name为管道目标位置指定身份证明。如果未在下一步中提供credential_name,则credential_name值将设置为NULL。当location属性是公共或预先验证的 URL 时,可以使用默认的NULL值。有关更多信息,请参见 CREATE_CREDENTIAL Procedure 。
-
设置导出管道属性。
指定
table_name参数时,表行将导出到对象存储。指定query参数时,查询将指定SELECT语句,以便仅将所需数据导出到对象存储。-
使用
table_name参数:BEGIN DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE( pipeline_name => 'EXP_PIPE1', attributes => JSON_OBJECT('credential_name' VALUE 'OBJECT_STORE_CRED', 'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/', 'table_name' VALUE 'metric_table', 'key_column' VALUE 'metric_time', 'format' VALUE '{"type": "json"}', 'priority' VALUE 'MEDIUM', 'interval' VALUE '20') ); END; / -
使用
query参数:BEGIN DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE( pipeline_name => 'EXP_PIPE1', attributes => JSON_OBJECT('credential_name' VALUE 'OBJECT_STORE_CRED', 'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/', 'query' VALUE 'SELECT * from metrics_table', 'key_column' VALUE 'metric_time', 'format' VALUE '{"type": "json"}', 'priority' VALUE 'MEDIUM', 'interval' VALUE '20') ); END; /其中,
credential_name是您在上一步中创建的身份证明。
必须设置以下属性才能运行导出管道:
-
location:指定目标对象存储位置。您指定的location用于每个管道一个table_name。 -
table_name:指定数据库中包含要导出的数据的表(需要table_name参数或query参数)。 -
query:指定要在数据库中运行的查询,该查询提供要导出的数据(需要table_name参数或query参数)。 -
format:描述要导出的数据的格式。有关更多信息,请参见 DBMS_CLOUD Package Format Options for EXPORT_DATA 。
priority值确定从数据库提取数据的并行度。interval值指定管道作业连续运行之间的时间间隔(分钟)。默认interval为 15 分钟。有关管道属性的详细信息,请参见 DBMS_CLOUD_PIPELINE Attributes 。
创建管道后,您可以测试管道或启动管道:
-
创建和配置管道以导出查询结果(无时间戳)
您可以创建导出管道,以自动将数据从自治 AI 数据库导出到对象存储。使用此导出管道选项可以指定管道定期运行的 SQL 查询,以将数据导出到对象存储。您可以使用此导出选项将自治 AI 数据库中的最新数据共享到对象存储,以便其他应用使用这些数据。
导出管道将数据从自治 AI 数据库导出到对象存储。创建导出管道时,管道会定期运行并在对象存储上放置数据。
-
创建用于将数据导出到对象存储的管道。
BEGIN DBMS_CLOUD_PIPELINE.CREATE_PIPELINE( pipeline_name=>'EXP_PIPE2', pipeline_type=>'EXPORT', description=>'Export query results to object store.'); END; /有关更多信息,请参见 CREATE_PIPELINE Procedure 。
-
创建身份证明对象以访问要导出数据文件的目标对象存储位置。
使用属性
credential_name为管道目标位置指定身份证明。如果未在下一步中提供credential_name,则credential_name值将设置为NULL。当location属性是公共或预先验证的 URL 时,可以使用默认的NULL值。有关更多信息,请参见 CREATE_CREDENTIAL Procedure 。
-
设置导出管道属性。
BEGIN DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE( pipeline_name => 'EXP_PIPE2', attributes => JSON_OBJECT( 'credential_name' VALUE 'OBJECT_STORE_CRED', 'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/', 'query' VALUE 'SELECT * FROM table_name', 'format' VALUE '{"type": "json"}', 'priority' VALUE 'MEDIUM', 'interval' VALUE '20') ); END; /其中,
credential_name是您在上一步中创建的身份证明。必须设置以下属性才能运行导出管道:
-
location:指定目标对象存储位置。 -
query:指定要在数据库中运行的查询,该查询提供要导出的数据。 -
format:描述要导出的数据的格式。有关更多信息,请参见 DBMS_CLOUD Package Format Options for EXPORT_DATA 。
priority值确定从数据库提取数据的并行度。interval值指定管道作业连续运行之间的时间间隔(分钟)。默认interval为 15 分钟。有关管道属性的详细信息,请参见 DBMS_CLOUD_PIPELINE Attributes 。
创建管道后,您可以测试管道或启动管道:
-
测试管道
使用 RUN_PIPELINE_ONCE 可按需运行一次管道,而无需创建调度的作业。
RUN_PIPELINE_ONCE 用于在启动管道之前测试管道。运行一次管道以测试管道并检查管道是否按预期工作后,使用 RESET_PIPELINE 将管道的状态重置为运行 RUN_PIPELINE_ONCE 之前的状态。
-
创建一个管道。
有关详细信息,请参阅创建和配置用于加载数据的管道。
-
运行一次管道以测试管道。
BEGIN DBMS_CLOUD_PIPELINE.RUN_PIPELINE_ONCE( pipeline_name => 'MY_PIPE1' ); END; /有关更多信息,请参见 RUN_PIPELINE_ONCE Procedure 。
-
执行任何必需的检查以验证管道是否按预期运行。
有关更多信息,请参见 Monitor and Troubleshoot Pipelines 。
-
重置管道。
BEGIN DBMS_CLOUD_PIPELINE.RESET_PIPELINE( pipeline_name => 'MY_PIPE1', purge_data => TRUE ); END; /有关更多信息,请参见 RESET_PIPELINE Procedure 。