パイプラインの作成および構成

1つ以上のロード・パイプラインまたはエクスポート・パイプラインを作成できます。パイプラインを作成する場合は、パラメータを使用し、パイプライン属性を設定してパイプラインを構成します。

パイプラインを作成および構成するオプションは、次のとおりです。

データをロードするためのパイプラインの作成および構成

パイプラインを作成して、オブジェクト・ストアの外部ファイルからAutonomous Databaseの表にデータをロードできます。

ロード・パイプラインは、オブジェクト・ストアに配置されたデータを消費し、Autonomous Databaseの表にロードします。ロード・パイプラインを作成すると、パイプラインはオブジェクト・ストアに配置されたデータを消費するために定期的に実行され、新しいデータ・ファイルが到着すると、パイプラインは新しいデータがロードされます。パイプラインを使用して、オブジェクト・ストアからデータベースの表に、履歴書および再試行機能を備えたファイルを確実にコピーすることもできます。

ロード・パイプラインでは、パイプライン・パッケージはDBMS_CLOUD.COPY_DATAを使用してデータをロードします。

Autonomous Databaseで、既存の表を使用するか、データをロードするデータベース表を作成します。例:

CREATE TABLE EMPLOYEE
            (name     VARCHAR2(128),
             age      NUMBER,
             salary   NUMBER);
  1. パイプラインを作成して、オブジェクト・ストアからデータをロードします。
    BEGIN
         DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
            pipeline_name => 'MY_PIPE1',
            pipeline_type => 'LOAD',
            description   => 'Load metrics from object store into a table'
      );
    END;
    /

    詳細は、CREATE_PIPELINEプロシージャを参照してください。

  2. 資格証明オブジェクトを作成して、ロードするファイルを含むオブジェクト・ストアにアクセスします。

    パイプライン・ソースの場所の資格証明は、属性credential_nameで指定します。次のステップでcredential_nameを指定しない場合、credential_name値はNULLに設定されます。location属性がパブリックURLまたは事前認証済URLの場合、デフォルトのNULL値を使用できます。

    詳細は、CREATE_CREDENTIALプロシージャを参照してください。

  3. 必要な属性(locationtable_nameおよびformat)を含むパイプライン属性を設定します。
    BEGIN
         DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
           pipeline_name => 'MY_PIPE1',
           attributes    => JSON_OBJECT(
                'credential_name' VALUE 'OBJECT_STORE_CRED',
                'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
                'table_name' VALUE 'employee',
                'format' VALUE '{"type":"json", "columnpath":["$.NAME", "$.AGE", "$.SALARY"]}',
                'priority' VALUE 'HIGH',
                'interval' VALUE '20')
      );
    END;
    /

    ロード・パイプラインを実行するには、次の属性を設定する必要があります。

    • location: オブジェクト・ストアのソース・ファイルの場所を指定します。

    • table_name: データをロードするデータベース内の表を指定します。指定するlocationは、パイプラインごとに1つのtable_nameに対するものです。

    • format: ロードするデータの形式を示します。

      詳細は、DBMS_CLOUDパッケージ形式オプションに関する項を参照してください。

    credential_nameは、前のステップで作成した資格証明です。

    priority値は、パラレルにロードされるファイルの数を決定します。優先度が高いパイプラインは、より低い優先度で実行する場合と比較して、より多くのデータベース・リソースを消費し、各実行を高速に完了します。

    interval値は、パイプライン・ジョブの連続実行間の時間間隔を分単位で指定します。デフォルトのintervalは15分間です。

    パイプライン属性の詳細は、DBMS_CLOUD_PIPELINE属性を参照してください。

    パイプラインを作成した後は、そのパイプラインをテストしたり、そのパイプラインを起動したりできます。

別の方法として、JSONの形式を設定するには、次の形式を使用できます。

BEGIN
    DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
        pipeline_name   => 'MY_PIPE1',
        attribute_name  => 'format',
        attribute_value => JSON_OBJECT('type' value 'json', 'columnpath' value '["$.NAME", "$.AGE", "$.SALARY"]')
    );
END;
/

タイムスタンプ列を使用したエクスポートのためのパイプラインの作成および構成

エクスポート・パイプラインを作成して、Autonomous Databaseからオブジェクト・ストアに時系列データを自動的にエクスポートできます。

このエクスポート・パイプライン・オプションを使用して、表またはSQL問合せ、およびパイプラインが前回のアップロードの時間を追跡するために使用するタイムスタンプを含む列を指定します。エクスポート・パイプラインを使用すると、他のアプリケーションで使用するためにデータを共有したり、データをオブジェクト・ストアに保存できます。

エクスポート・パイプラインでは、パイプライン・パッケージはDBMS_CLOUD.EXPORT_DATAを使用してデータをエクスポートします。

エクスポート・パイプラインは、Autonomous Databaseからオブジェクト・ストアにデータをエクスポートします。エクスポート・パイプラインを作成すると、パイプラインは一定の間隔で実行され、データをオブジェクト・ストアに配置します。

  1. データをオブジェクト・ストアにエクスポートするパイプラインを作成します。
    BEGIN
         DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
            pipeline_name=>'EXP_PIPE1',
            pipeline_type=>'EXPORT',
            description=>'Export time series metrics to object store');
    END;
    /

    詳細は、CREATE_PIPELINEプロシージャを参照してください。

  2. データ・ファイルをエクスポートする宛先オブジェクト・ストアの場所にアクセスするための資格証明オブジェクトを作成します。

    パイプライン宛先の場所の資格証明は、属性credential_nameで指定します。次のステップでcredential_nameを指定しない場合、credential_name値はNULLに設定されます。location属性がパブリックURLまたは事前認証済URLの場合、デフォルトのNULL値を使用できます。

    詳細は、CREATE_CREDENTIALプロシージャを参照してください。

  3. エクスポート・パイプライン属性を設定します。

    table_nameパラメータを指定すると、表の行がオブジェクト・ストアにエクスポートされます。queryパラメータを指定すると、問合せでSELECT文が指定され、必要なデータのみがオブジェクト・ストアにエクスポートされます。

    • table_nameパラメータの使用:

      BEGIN
           DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
             pipeline_name => 'EXP_PIPE1',
             attributes    => JSON_OBJECT('credential_name' VALUE 'OBJECT_STORE_CRED',
                'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
                'table_name' VALUE 'metric_table',
                'key_column' VALUE 'metric_time',
                'format' VALUE '{"type": "json"}',
                'priority' VALUE 'MEDIUM',
                'interval' VALUE '20')
        );
      END;
      /
    • queryパラメータの使用:

      BEGIN
           DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
             pipeline_name => 'EXP_PIPE1',
             attributes    => JSON_OBJECT('credential_name' VALUE 'OBJECT_STORE_CRED',
                 'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
                 'query' VALUE 'SELECT * from metrics_table',
                 'key_column' VALUE 'metric_time',
                 'format' VALUE '{"type": "json"}',
                 'priority' VALUE 'MEDIUM',
                 'interval' VALUE '20')
        );
      END;
      /

    credential_nameは、前のステップで作成した資格証明です。

    エクスポート・パイプラインを実行するには、次の属性を設定する必要があります。

    • location: 宛先オブジェクト・ストアの場所を指定します。指定するlocationは、パイプラインごとに1つのtable_nameに対するものです。

    • table_name: エクスポートするデータを含むデータベース内の表を指定します(table_nameパラメータまたはqueryパラメータのいずれかが必要です)。

    • query: エクスポートするデータを提供するデータベースで実行する問合せを指定します(table_nameパラメータまたはqueryパラメータのいずれかが必要です)。

    • format: エクスポートするデータの形式を示します。

      詳細は、DBMS_CLOUD EXPORT_DATAのパッケージ形式オプションを参照してください。

    priority値は、データベースからデータをフェッチするための並列度を決定します。

    interval値は、パイプライン・ジョブの連続実行間の時間間隔を分単位で指定します。デフォルトのintervalは15分間です。

    パイプライン属性の詳細は、DBMS_CLOUD_PIPELINE属性を参照してください。

    パイプラインを作成した後は、そのパイプラインをテストしたり、そのパイプラインを起動したりできます。

問合せ結果をエクスポートするためのパイプラインの作成および構成(タイムスタンプなし)

エクスポート・パイプラインを作成して、Autonomous Databaseからオブジェクト・ストアにデータを自動的にエクスポートできます。このエクスポート・パイプライン・オプションを使用して、パイプラインが定期的に実行してデータをオブジェクト・ストアにエクスポートするSQL問合せを指定します。このエクスポート・オプションを使用すると、Autonomous Databaseからオブジェクト・ストアに最新データを共有して、他のアプリケーションでデータを消費できます。

エクスポート・パイプラインは、Autonomous Databaseからオブジェクト・ストアにデータをエクスポートします。エクスポート・パイプラインを作成すると、パイプラインは一定の間隔で実行され、データをオブジェクト・ストアに配置します。

  1. データをオブジェクト・ストアにエクスポートするパイプラインを作成します。
    BEGIN
         DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
            pipeline_name=>'EXP_PIPE2',
            pipeline_type=>'EXPORT',
            description=>'Export query results to object store.');
    END;
    /

    詳細は、CREATE_PIPELINEプロシージャを参照してください。

  2. データ・ファイルをエクスポートする宛先オブジェクト・ストアの場所にアクセスするための資格証明オブジェクトを作成します。

    パイプライン宛先の場所の資格証明は、属性credential_nameで指定します。次のステップでcredential_nameを指定しない場合、credential_name値はNULLに設定されます。location属性がパブリックURLまたは事前認証済URLの場合、デフォルトのNULL値を使用できます。

    詳細は、CREATE_CREDENTIALプロシージャを参照してください。

  3. エクスポート・パイプライン属性を設定します。
    BEGIN
         DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
           pipeline_name => 'EXP_PIPE2',
           attributes    => JSON_OBJECT(
              'credential_name' VALUE 'OBJECT_STORE_CRED',
              'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
              'query' VALUE 'SELECT * FROM table_name',
              'format' VALUE '{"type": "json"}',
              'priority' VALUE 'MEDIUM',
              'interval' VALUE '20')
      );
    END;
    /

    credential_nameは、前のステップで作成した資格証明です。

    エクスポート・パイプラインを実行するには、次の属性を設定する必要があります。

    • location: 宛先オブジェクト・ストアの場所を指定します。

    • query: エクスポートするデータを提供するデータベースで実行する問合せを指定します。

    • format: エクスポートするデータの形式を示します。

      詳細は、DBMS_CLOUD EXPORT_DATAのパッケージ形式オプションを参照してください。

    priority値は、データベースからデータをフェッチするための並列度を決定します。

    interval値は、パイプライン・ジョブの連続実行間の時間間隔を分単位で指定します。デフォルトのintervalは15分間です。

    パイプライン属性の詳細は、DBMS_CLOUD_PIPELINE属性を参照してください。

    パイプラインを作成した後は、そのパイプラインをテストしたり、そのパイプラインを起動したりできます。

パイプラインのテスト

RUN_PIPELINE_ONCEを使用して、スケジュール済ジョブを作成せずにパイプラインをオンデマンドで1回実行します。

RUN_PIPELINE_ONCEは、パイプラインを開始する前にパイプラインをテストする場合に役立ちます。パイプラインを1回実行してパイプラインをテストし、期待どおりに機能していることを確認したら、RESET_PIPELINEを使用してパイプラインの状態を(RUN_PIPELINE_ONCEを実行する前の状態に)リセットします。

  1. パイプラインの作成
  2. パイプラインを1回実行して、パイプラインをテストします。
    BEGIN
        DBMS_CLOUD_PIPELINE.RUN_PIPELINE_ONCE(
            pipeline_name => 'MY_PIPE1'
    );
    END;
    /

    詳細は、RUN_PIPELINE_ONCEプロシージャを参照してください。

  3. 必要なチェックを実行して、パイプラインが期待どおりに動作していることを確認します。
  4. パイプラインをリセットします。
    BEGIN  
       DBMS_CLOUD_PIPELINE.RESET_PIPELINE(
         pipeline_name => 'MY_PIPE1',
         purge_data => TRUE
    );
    END;
    /

    詳細は、RESET_PIPELINEプロシージャを参照してください。