パイプラインの作成および構成

1つ以上のロード・パイプラインまたはエクスポート・パイプラインを作成できます。パイプラインを作成する場合、パラメータを使用し、パイプライン属性を設定してパイプラインを構成します。

パイプラインの作成と構成のオプションは次のとおりです:

データをロードするためのパイプラインの作成および構成

パイプラインを作成して、オブジェクト・ストア内の外部ファイルまたはディレクトリからAutonomous AI Database内の表にデータをロードできます。

ロード・パイプラインは、オブジェクト・ストアまたはディレクトリに配置されたデータを消費し、Autonomous AI Databaseの表にロードします。ロード・パイプラインを作成すると、パイプラインは定期的に実行され、ソースの場所に配置されたデータが消費されます。新しいデータファイルがパイプラインに到着すると、新しいデータがロードされます。パイプラインを使用して、再開および再試行機能を使用して、ソースの場所からデータベース上の表にファイルを確実にコピーすることもできます。

ロード・パイプラインでは、パイプライン・パッケージはDBMS_CLOUD.COPY_DATAを使用してデータをロードします。

Autonomous AI Databaseで、既存の表を使用するか、データをロードするデータベース表を作成します。たとえば:

 CREATE TABLE EMPLOYEE
     (name     VARCHAR2(128),
      age      NUMBER,
      salary   NUMBER);
  1. オブジェクト・ストアまたはディレクトリ・オブジェクトからデータをロードするパイプラインを作成します。
      BEGIN
       DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
          pipeline_name => 'MY_PIPE1',
          pipeline_type => 'LOAD',
          description   => 'Load metrics from object store into a table'
       );
      END;
     /
    

    詳細は、CREATE_PIPELINEプロシージャを参照してください。

  2. 資格証明オブジェクトを作成して、ロードするファイルを含むオブジェクト・ストアにアクセスします。

    属性credential_nameを使用して、パイプライン・ソースの場所の資格証明を指定します。次のステップでcredential_nameを指定しない場合、credential_name値はNULLに設定されます。location属性がパブリックURLまたは事前認証済URLの場合、デフォルトのNULL値を使用できます。

    詳細は、「CREATE_CREDENTIALプロシージャ」を参照してください。

  3. 必要な属性を含むパイプライン属性(locationtable_nameおよびformat)を設定します。

    ケース1: オブジェクト・ストアからデータをロードするためのパイプラインを作成します。

     BEGIN
          DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
            pipeline_name => 'MY_PIPE1',
            attributes    => JSON_OBJECT(
                 'credential_name' VALUE 'OBJECT_STORE_CRED',
                 'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
                 'table_name' VALUE 'employee',
                 'format' VALUE '{"type":"json", "columnpath":["$.NAME", "$.AGE", "$.SALARY"]}',
                 'priority' VALUE 'HIGH',
                 'interval' VALUE '20')
       );
     END;
     /
    

    ケース2: ディレクトリ・オブジェクトからデータをロードするためのパイプラインを作成します。

     BEGIN
          DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
            pipeline_name => 'MY_PIPE1',
            pipeline_type => 'LOAD',
            attributes    => JSON_OBJECT(
                 'location' VALUE 'MY_DIR:*.csv',
                 'table_name' VALUE 'employee',
                 'format' VALUE '{"type":"csv"}',
                 'priority' VALUE 'HIGH',
                 'interval' VALUE '20')
       );
     END;
     /
    

    どちらの場合も、ロード・パイプラインを実行するには、次の属性を設定する必要があります。

    • location: オブジェクト・ストアまたはディレクトリ・オブジェクトのソース・ファイルの場所を指定します。

    • table_name: データをロードするデータベース内の表を指定します。指定するlocationは、パイプラインごとに1つのtable_name用です。

    • format: ロードするデータの形式を示します。

      詳細は、DBMS_CLOUDパッケージ・フォーマット・オプションを参照してください。

    credential_nameは、前のステップで作成した資格証明です。

    priority値は、パラレルにロードされるファイルの数を決定します。優先度の高いパイプラインは、より低い優先度での実行と比較して、より多くのデータベース・リソースを消費し、各実行を高速に完了します。

    interval値は、パイプライン・ジョブの連続実行間の時間間隔を分単位で指定します。デフォルトのintervalは15分です。

    パイプライン属性の詳細は、「DBMS_CLOUD_PIPELINEの属性」を参照してください。

  4. パイプラインを作成した後、パイプラインをテストするか、パイプラインを開始できます:

かわりに、JSONの形式を設定するには、次の形式を使用できます。

BEGIN
    DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
        pipeline_name   => 'MY_PIPE1',
        attribute_name  => 'format',
        attribute_value => JSON_OBJECT('type' value 'json', 'columnpath' value '["$.NAME", "$.AGE", "$.SALARY"]')
    );
END;
/

タイムスタンプ列を使用したエクスポートのためのパイプラインの作成および構成

エクスポート・パイプラインを作成して、Autonomous AI Databaseからオブジェクト・ストアに時系列データを自動的にエクスポートできます。

このエクスポート・パイプライン・オプションを使用して、表またはSQL問合せと、パイプラインが最後にアップロードした時間を追跡するために使用するタイムスタンプを持つ列を指定します。エクスポート・パイプラインを使用すると、他のアプリケーションによる消費のためにデータを共有したり、データをオブジェクト・ストアに保存したりできます。

エクスポート・パイプラインでは、パイプライン・パッケージはDBMS_CLOUD.EXPORT_DATAを使用してデータをエクスポートします。

エクスポート・パイプラインは、Autonomous AI Databaseからオブジェクト・ストアにデータをエクスポートします。エクスポート・パイプラインを作成すると、パイプラインは定期的に実行され、データがオブジェクト・ストアに配置されます。

  1. データをオブジェクト・ストアにエクスポートするパイプラインを作成します。

     BEGIN
          DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
             pipeline_name=>'EXP_PIPE1',
             pipeline_type=>'EXPORT',
             description=>'Export time series metrics to object store');
     END;
     /
    

    詳細は、CREATE_PIPELINEプロシージャを参照してください。

  2. 資格証明オブジェクトを作成して、データファイルをエクスポートする宛先オブジェクト・ストアの場所にアクセスします。

    パイプライン宛先の場所の資格証明は、属性credential_nameを使用して指定します。次のステップでcredential_nameを指定しない場合、credential_name値はNULLに設定されます。location属性がパブリックURLまたは事前認証済URLの場合、デフォルトのNULL値を使用できます。

    詳細は、「CREATE_CREDENTIALプロシージャ」を参照してください。

  3. エクスポート・パイプライン属性を設定します。

    table_nameパラメータを指定すると、表の行がオブジェクト・ストアにエクスポートされます。queryパラメータを指定すると、必要なデータのみがオブジェクト・ストアにエクスポートされるように、問合せによってSELECT文が指定されます。

    • table_nameパラメータの使用:

      BEGIN
           DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
             pipeline_name => 'EXP_PIPE1',
             attributes    => JSON_OBJECT('credential_name' VALUE 'OBJECT_STORE_CRED',
                'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
                'table_name' VALUE 'metric_table',
                'key_column' VALUE 'metric_time',
                'format' VALUE '{"type": "json"}',
                'priority' VALUE 'MEDIUM',
                'interval' VALUE '20')
        );
      END;
      /
      
    • queryパラメータを使用すると、次のようになります。

      BEGIN
           DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
             pipeline_name => 'EXP_PIPE1',
             attributes    => JSON_OBJECT('credential_name' VALUE 'OBJECT_STORE_CRED',
                 'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
                 'query' VALUE 'SELECT * from metrics_table',
                 'key_column' VALUE 'metric_time',
                 'format' VALUE '{"type": "json"}',
                 'priority' VALUE 'MEDIUM',
                 'interval' VALUE '20')
        );
      END;
      /
      

      ここで、credential_nameは、前のステップで作成した資格証明です。

    エクスポート・パイプラインを実行するには、次の属性を設定する必要があります。

    • location: 宛先オブジェクト・ストアの場所を指定します。指定するlocationは、パイプラインごとに1つのtable_name用です。

    • table_name: エクスポートするデータを含むデータベース内の表を指定します(table_nameパラメータまたはqueryパラメータのいずれかが必要です)。

    • query: エクスポートするデータを提供するデータベースで実行する問合せを指定します(table_nameパラメータまたはqueryパラメータのいずれかが必要です)。

    • format: エクスポートするデータの形式を示します。

      詳細は、「EXPORT_DATAのDBMS_CLOUDパッケージ形式オプション」を参照してください。

    priority値は、データベースからデータをフェッチするための並列度を決定します。

    interval値は、パイプライン・ジョブの連続実行間の時間間隔を分単位で指定します。デフォルトのintervalは15分です。

    パイプライン属性の詳細は、「DBMS_CLOUD_PIPELINEの属性」を参照してください。

    パイプラインを作成した後、パイプラインをテストするか、パイプラインを開始できます:

問合せ結果をエクスポートするためのパイプラインの作成および構成(タイムスタンプなし)

エクスポート・パイプラインを作成して、Autonomous AI Databaseからオブジェクト・ストアにデータを自動的にエクスポートできます。このエクスポート・パイプライン・オプションを使用して、パイプラインが定期的に実行してデータをオブジェクト・ストアにエクスポートするSQL問合せを指定します。このエクスポート・オプションを使用して、自律型AIデータベースからオブジェクト・ストアに最新データを共有し、他のアプリケーションでデータを消費できます。

エクスポート・パイプラインは、Autonomous AI Databaseからオブジェクト・ストアにデータをエクスポートします。エクスポート・パイプラインを作成すると、パイプラインは定期的に実行され、データがオブジェクト・ストアに配置されます。

  1. データをオブジェクト・ストアにエクスポートするパイプラインを作成します。

     BEGIN
          DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
             pipeline_name=>'EXP_PIPE2',
             pipeline_type=>'EXPORT',
             description=>'Export query results to object store.');
     END;
     /
    

    詳細は、CREATE_PIPELINEプロシージャを参照してください。

  2. 資格証明オブジェクトを作成して、データファイルをエクスポートする宛先オブジェクト・ストアの場所にアクセスします。

    パイプライン宛先の場所の資格証明は、属性credential_nameを使用して指定します。次のステップでcredential_nameを指定しない場合、credential_name値はNULLに設定されます。location属性がパブリックURLまたは事前認証済URLの場合、デフォルトのNULL値を使用できます。

    詳細は、「CREATE_CREDENTIALプロシージャ」を参照してください。

  3. エクスポート・パイプライン属性を設定します。

     BEGIN
          DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
            pipeline_name => 'EXP_PIPE2',
            attributes    => JSON_OBJECT(
               'credential_name' VALUE 'OBJECT_STORE_CRED',
               'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
               'query' VALUE 'SELECT * FROM table_name',
               'format' VALUE '{"type": "json"}',
               'priority' VALUE 'MEDIUM',
               'interval' VALUE '20')
       );
     END;
     /
    

    ここで、credential_nameは、前のステップで作成した資格証明です。

    エクスポート・パイプラインを実行するには、次の属性を設定する必要があります。

    • location: 宛先オブジェクト・ストアの場所を指定します。

    • query: エクスポートするデータを提供するデータベースで実行する問合せを指定します。

    • format: エクスポートするデータの形式を示します。

      詳細は、「EXPORT_DATAのDBMS_CLOUDパッケージ形式オプション」を参照してください。

    priority値は、データベースからデータをフェッチするための並列度を決定します。

    interval値は、パイプライン・ジョブの連続実行間の時間間隔を分単位で指定します。デフォルトのintervalは15分です。

    パイプライン属性の詳細は、「DBMS_CLOUD_PIPELINEの属性」を参照してください。

    パイプラインを作成した後、パイプラインをテストするか、パイプラインを開始できます:

パイプラインのテスト

RUN_PIPELINE_ONCEを使用して、スケジュール済ジョブを作成せずに、オンデマンドでパイプラインを1回実行します。

RUN_PIPELINE_ONCEは、パイプラインを開始する前にパイプラインをテストする場合に便利です。パイプラインを1回実行してパイプラインをテストし、予期したとおりに動作していることを確認した後、RESET_PIPELINEを使用してパイプラインの状態を(RUN_PIPELINE_ONCEを実行する前の状態に)リセットします。

  1. パイプラインを作成します。

    詳細は、データをロードするためのパイプラインの作成および構成を参照してください。

  2. パイプラインを1回実行して、パイプラインをテストします。

     BEGIN
         DBMS_CLOUD_PIPELINE.RUN_PIPELINE_ONCE(
             pipeline_name => 'MY_PIPE1'
     );
     END;
     /
    

    詳細は、「RUN_PIPELINE_ONCEプロシージャ」を参照してください。

  3. 必要なチェックを実行して、パイプラインが想定どおりに動作していることを確認します。

    詳細は、パイプラインのモニターおよびトラブルシューティングを参照してください。

  4. パイプラインをリセットします。

     BEGIN
        DBMS_CLOUD_PIPELINE.RESET_PIPELINE(
          pipeline_name => 'MY_PIPE1',
          purge_data => TRUE
     );
     END;
     /
    

    詳細は、「RESET_PIPELINEプロシージャ」を参照してください。