102 DBMS_KAFKA

DBMS_KAFKAパッケージは、Kafkaクラスタ内のトピックへのOracle SQLアクセスを有効にするためのPL/SQLインタフェースを提供します。

Oracle SQL access to Kafka (OSAK)クラスタに対するREADアクセス権を付与されたユーザーは、DBMS_KAFKAパッケージを使用して、Oracle Databaseのビューおよび表からKafkaデータを問い合せるアプリケーションを作成できます。

102.1 DBMS_KAFKAの概要

DBMS_KAFKAパッケージでは、Oracle SQL access to Kafka (OSAK)を使用してKafkaデータにアクセスして処理できます。

アプリケーションでKafkaデータを使用できるようにするには、DBMS_KAFKAパッケージを使用します。Kafkaリソースを使用する前に、Kafkaの管理者がKafkaクラスタを登録してアクセスを有効にしておく必要があります。

Kafkaトピックにアクセスするには、Oracle SQL Access to Kafka (OSAK)アプリケーションを作成します。まず、使用するモードを決定する必要があります。

  • ロード・モード: KafkaトピックのデータをOracle Database表にロードするために使用します。
  • ストリーミング・モード: Kafkaトピックを順番に読み取るために使用します。
  • シーク可能モード: 開始タイムスタンプと終了タイムスタンプの間のKafkaトピックにランダムにアクセスするために使用します。

次に、適切なOSAKパッケージを使用してアプリケーションを作成します。

  • CREATE_LOAD_APP: ロード・モードで使用できるアプリケーションを作成します。
  • CREATE_STREAMING_APP: ストリーミング・モードで使用できるアプリケーションを作成します。
  • CREATE_SEEKABLE_APP: シーク可能モードで使用できるアプリケーションを作成します。

他のDBMS_KAFKAパッケージでは、Kafkaデータを管理できます。

各パッケージの手順の概要を次に示します

ロード・アプリケーションへのデータのロード

  • DBMS_KAFKA.CREATE_LOAD_APPを使用して、Oracle SQL Access to Kafkaロード・アプリケーションを作成します
  • オプションで、DBMS_KAFKA_INIT_OFFSET_TSまたはDBMS_KAFKA_INIT_OFFSETを使用して、読み取る最初のKafkaレコードを設定します。
  • 完了するまでLOOPを実行します。
    • DBMS_KAFKA.EXECUTE_LOAD_APPを使用して、前の読取りが中断した場所から現在の最高水位標までのKafkaデータをロードします。
  • Kafkaデータの操作が終了したら、DBMS_KAFKA.DROP_LOAD_APPを使用してロード・アプリケーションを削除します。

ストリーミング・モード・アプリケーションへのデータのロード

ストリーミング・モードでKafkaデータを問い合せて、Kafkaトピックを順番に読み取る手順は、次のとおりです。

  1. DBMS_KAFKA.CREATE_STREAMING_APPを使用して、Oracle SQL Access to Kafkaストリーミング・アプリケーションを作成します。
  2. オプションで、DBMS_KAFKA_INIT_OFFSET_TSまたはDBMS_KAFKA_INIT_OFFSETを使用して、読み取る最初のKafkaレコードを設定します。
  3. SQLでデータに対してLOOPを実行します。
    1. DBMS_KAFKA.LOAD_TEMP_TABLEをコールして、Kafkaの次の行セットを含むグローバル一時表をロードします
    2. OSAKグローバル一時表からのSELECTを使用します。
    3. 取得されたデータを処理します
    4. 処理が成功した場合、DBMS_KAFKA.UPDATE_OFFSETを使用してKafkaレコードの次のセットに進みます。
    5. COMMITを使用して、オフセット・トラッキング情報をコミットします。
  4. アプリケーションでの作業が終了したら、DBMS_KAFKA.DROP_STREAMING_APPを使用してアプリケーションを削除します。

シーク可能モード・アプリケーションへのデータのロード

2つのタイムスタンプ間のKafkaレコードにアクセスできるように、シーク可能モードでKafkaデータを問い合せる手順の概要は、次のとおりです。

  1. DBMS_KAFKA.CREATE_SEEKABLE_APPを使用して、Oracle SQL Access to Kafkaシーク可能アプリケーションを作成します
  2. SQLでKafkaデータに対してLOOPを実行します。
    1. DBMS_KAFKA.SEEK_OFFSET_TSを使用して、Kafkaトピック内の、定義した時間ウィンドウをシークします。
    2. DBMS_KAFKA.LOAD_TEMP_TABLEをコールして、分析するKafkaの行セットを含むグローバル一時表をロードします。
    3. OSAKグローバル一時表からのSELECTを使用します。
    4. データを処理します。
  3. アプリケーションでの作業が終わったら、DBMS_KAFKA.DROP_SEEKABLE_APPを使用してアプリケーションを削除します。

102.2 DBMS_KAFKAロード・モード

KafkaデータをOracle Databaseに増分的にロードするには、DBMS_KAFKA LOADINGモード・パッケージを使用します。

ロード・プロシージャを使用すると、使用可能なKafkaレコードをOracle Databaseにロードでき、これをその後、そのデータのデータ・ウェアハウスとして使用できます。その後、分析のためにそのデータをOracle Database表と結合できます。

アプリケーションは、PL/SQLプロシージャDBMS_KAFKA.CREATE_LOAD_APPをコールすることにより、それがロード・アプリケーションであることを宣言して、DBMS_KAFKA.EXECUTE_LOAD_APPへの後続のコールについて状態を初期化します。DBMS_KAFKA.CREATE_LOAD_APPは、トピックのすべてのパーティションを網羅する単一のビューを作成します。アプリケーションは、オプションでDBMS_KAFKA.INIT_OFFSET[_TS]プロシージャをコールして、Kafkaトピック・パーティションの開始ポイントを設定することもできます。

DBMS_KAFKA.EXECUTE_LOAD_APPプロシージャがアプリケーション・ループでコールされると、前のコールが中断した場所からKafkaトピックの現在の最高水位標までのデータがロードされます。このプロシージャは自律型トランザクションで実行されます。

Kafkaデータの操作が終了したら、DBMS_KAFKA.DROP_LOAD_APPを使用してアプリケーションを削除できます。

102.2.1 CREATE_LOAD_APP

このプロシージャは、Kafkaトピックのすべてのパーティションからデータを取得して、そのKafkaデータをOracle Database表にロードするOracle SQL Access to Kafkaロード・アプリケーションを作成します。 また、KafkaクラスタでKafkaトピックに関するトピックおよびパーティションのライブ情報を調べるために使用されるメタデータ・ビューも作成します(まだ存在しない場合)。このビューは1回作成され、同じクラスタを共有するすべてのアプリケーションで使用されます。このモデルは、1つのアプリケーション・インスタンスのみが、作成されたLOADアプリケーションについてDBMS_KAFKA.EXECUTE_LOAD_APPのコールを許可されるという点で限定的です。

パラメータ

パラメータ 説明

cluster_name

このアプリケーションに関連付けるトピックが含まれている登録済Oracle SQL access to Kafkaクラスタの名前。

大/小文字は区別されません。

登録済クラスタの名前は、OSAK管理者から、次の文を使用して取得できます。

SELECT cluster_name FROM sys.user_kafka_clusters;

application_name

アプリケーション名。このパラメータは、トピックを読み取ることができるKafkaグループとしても使用されます。

大/小文字は区別されません。

topic_name

コンテンツを取得する、Kafkaクラスタのトピック名。

大/小文字が区別されます。

options

JSONドキュメント形式のプロパティのリストを含みます。オプションの詳細は、「CREATE_XXX_APPに渡されるDBMS_KAFKAのオプション」のトピックを参照してください。

例102-1 Oracle SQL Access to KafkaのCREATE_LOAD_APPプロシージャ

次の例では、Kafkaトピックmy-company-app-event1およびオプションProducerRecordを使用して、KafkaクラスタExampleClusterのデータについてExampleAppというロード・アプリケーションが作成されます。

  PROCEDURE CREATE_LOAD_APP (
                             ExampleCluster         IN  VARCHAR2,
                             ExampleApp             IN  VARCHAR2,
                             my-company-app-event1  IN  VARCHAR2,
                             ProducerRecord         IN  CLOB

102.2.2 DROP_LOAD_APP

DBMS_KAFKA_ADMのプロシージャDROP_LOAD_APPは、Oracle SQL for Kafka (OSAK) LOADアプリケーションを削除するとともに、関連するメタデータを削除します。

構文

EXEC DROP_LOAD_APP ( 
                           cluster_name     IN VARCHAR2,
                           application_name IN VARCHAR2
  );

パラメータ

表102-1 DBMS_KAFKA_ADMのDROP_LOAD_APPプロシージャのパラメータ。

パラメータ 説明

cluster_name

既存のKafkaクラスタの名前

大/小文字は区別されません。

application_name

Kafkaクラスタに関連付けられている既存のアプリケーションの名前。

大/小文字は区別されません。

使用上のノート

KafkaトピックのデータをOracle Database表にロードする必要がなくなった場合、このプロシージャを使用してOracle SQL access to Kafka (OSAK)アプリケーションを削除します。

KafkaクラスタExampleClusterのデータを使用するExampleAppというアプリケーションでの作業が完了したとします。このプロシージャを使用してアプリケーションを削除します。

EXEC DROP_LOAD_APP ( 
                           ExampleCluster   IN VARCHAR2,
                           ExampleApp       IN VARCHAR2
  );

102.2.3 EXECUTE_LOAD_APP

DBMS_KAFKAのプロシージャEXECUTE_LOAD_APPは、専用のOracle SQL access to Kafka (OSAK)ビューからユーザー表をロードします。このプロシージャを使用するには、CREATE_LOAD_APPを使用してロード・アプリケーションを事前に作成しておく必要があります。

構文

PROCEDURE EXECUTE_LOAD_APP (
                              cluster_name        IN  VARCHAR2,
                              application_name    IN  VARCHAR2,
                              target_table        IN  VARCHAR2,
                              records_loaded      OUT INTEGER,
                              parallel_hint      IN INTEGER DEFAULT 0
);

パラメータ

表102-2 DBMS_KAFKAのEXECUTE_LOAD_APPSプロシージャのパラメータ

パラメータ 説明

cluster_name

このアプリケーションに関連付けたトピックが含まれている登録済Oracle SQL access to Kafkaクラスタの名前。

大/小文字は区別されません。

登録済クラスタの名前は、OSAK管理者から取得するか、次のSQL文を使用して取得できます。SELECT cluster_name from SYS.USER_KAFKA_CLUSTERS;

application_name

Kafkaクラスタに関連付けられている既存のアプリケーションの名前

大/小文字は区別されません。

target_table

KafkaトピックのデータとともにロードされるOracle Databaseのターゲット表。LOAD操作によって作成されたOSAKビューによってKafkaクラスタ・トピックから取得された列の一部またはすべてと一致している必要があります。

records_loaded

(OUT)ロードされたKafkaレコードの数

parallel_hint

(IN) (オプション)特定のOSAKビューに排他的にマップされるアプリケーションのロード時に使用する並列度。パラレル・ヒントが指定されていないか、1以下の場合、表をロードするために並列処理は使用されません。

ノート: パラレル・ヒントは、ユーザー・セッションまたはシステムのPARALLEL_DEGREE_POLICYMANUALに設定されている場合にのみ使用します。他のすべてのポリシー(AUTOなど)については、パラレル・ヒントを渡さないでください。パラレル・ヒントがOSAKビューのグラニュル数を超えると、例外が発生します。

使用上のノート

Oracle SQL access to Kafka (OSAK)ビューは、CREATE_LOAD_APPプロシージャによって事前に作成されたKafkaクラスタ内のトピックのレコードをロードするためにPL/SQLコールによって透過的に使用されます。アプリケーションは、EXECUTE_LOAD_APPをコールして、専用のOSAKビューからOracle Databaseユーザー表をロードします。

ノート:

OSAKアプリケーションを処理する単一のデータベース・アプリケーション・インスタンスを使用してOSAK一時表を問い合せることができるのは、1人のユーザーのみです。ただし、EXECUTE_LOAD_APPを使用すると、複数のアプリケーションからアクセスできる永続データベース表にKafkaデータがロードされます。

EXECUTE_LOAD_APPプロシージャをコールするたびに、Kafkaトピックから新しいレコードが読み取られ、これらのレコードがOracle Database表に挿入されます。また、EXECUTE_LOAD_APPはすべてのKafkaパーティションのオフセットを進めるため、次にEXECUTE_LOAD_APPが実行されたときには、新しい行が挿入されます。EXECUTE_LOAD_APPを使用すると、増分ロードを実行できるため、Kafkaトピックに対する更新でOracle Database表を更新できます。そのKafkaデータは標準のOracle Database表に移動されるため、複数のアプリケーションで処理および分析に使用できるようになります。

KafkaクラスタExampleClusterの初期クラスタ定義を完了し、クラスタを登録したとします。次に、このプロシージャを使用して、KafkaクラスタExampleClusterからアプリケーションExampleLoadAppにデータをロードします。

DECLARE
    v_records_inserted INTEGER;
BEGIN
    SYS.DBMS_KAFKA.EXECUTE_LOAD_APP (
                                                                  ‘ExampleCluster’,
                                                                  ‘ExampleLoadApp’,
                                                                  ‘ExampleLoadTable’,
                                                                  v_records_inserted);
END;

102.3 DBMS_KAFKAのグローバル一時表

Oracle DatabaseのKafkaデータ・ビューの作成元のKafkaデータを一時表にロードするには、DBMS_KAFKA LOAD_TEMP_TABLEモード・パッケージを使用します。

DBMS_KAFKA.LOAD_TEMP_TABLEプロシージャがアプリケーション・ループでコールされると、Oracle SQL access to Kafka (OSAK)アプリケーションにデータがロードされます。STREAMINGアプリケーションとSEEKABLEアプリケーションの両方について、CREATE_APP_xxx (xxxはSTREAMINGまたはSEEKABLE)を使用してアプリケーションを作成します。次に、アプリケーションについてLOAD_TEMP_TABLEをコールするときにアプリケーション・ループを使用し、一時表にロードされたデータを処理します。

102.3.1 LOAD_TEMP_TABLE

DBMS_KAFKAのプロシージャLOAD_TEMP_TABLEは、一時表に対するすべてのデータをOracle SQL access to Kafkaビューから選択します。このプロシージャを使用して、データを分析したり、Oracle Database表と結合するためにアプリケーションがSQL問合せで使用できるOracle SQL access to Kafka (OSAK)専用一時表を作成します。

構文

FUNCTION LOAD_TEMP_TABLE(
                   temporary-table-name   IN VARCHAR2
  ) RETURN INTEGER;

パラメータ

表102-3 DBMS_KAFKAのLOAD_TEMP_TABLEプロシージャのパラメータ

パラメータ 説明

temporary-table-name

作成する一時表の名前

タイプ: VARCHAR

大/小文字は区別されません。

parallel-hint

(オプション)特定のOSAKビューに排他的にマップされるグローバル一時表のロード時の並列度。パラレル・ヒントが指定されていないか、1以下の場合、表をロードするために並列処理は使用されません。

ノート: パラレル・ヒントは、ユーザー・セッションまたはシステムのPARALLEL_DEGREE_POLICYMANUALに設定されている場合にのみ使用します。他のすべてのポリシー(AUTOなど)については、パラレル・ヒントを渡さないでください。パラレル・ヒントがOSAKビューのグラニュル数を超えると、例外が発生します。

使用上のノート

Oracle SQL access to Kafka (OSAK)ビューは、専用のOSAKグローバル一時表またはユーザー定義表にKafkaトピックのレコードをロードするためにPL/SQLコールによって透過的に使用されます。STREAMINGおよびSEEKABLEアプリケーションは、専用のOSAKビューからOSAKグローバル一時表をロードするLOAD_TEMP_TABLEをコールし、LOADアプリケーションは、EXECUTE_LOAD_APPをコールして専用のOSAKビューからユーザー表をロードします。

注意:

OSAKビューは専用ビューであり、1つのアプリケーション・インスタンスでのみ使用できます。これらはアプリケーションの他のインスタンスでは共有されず、汎用ツール(SQL*PlusやSQL Developerなど)では問い合せないでください。OSAKビューの同時アクセスによって競合状態が発生する可能性があり、ツールが専用アプリケーションよりも多くの行を誤って読み取り、専用アプリケーションが読み取ったものを超えてKafkaオフセットを誤って進めることがあります。その結果、アプリケーションは、処理する必要があったKafkaトピック・レコードの取得に失敗する場合があります。

KafkaクラスタExampleClusterから、ExampleAppというストリーミング・アプリケーションに関連付けられたレコードをロードして、処理するとします。デフォルトでは、OSAKビューは、最初に作成されたときには存在する最も古いレコードから現在パブリッシュされている最後のレコードまでを読み取るように構成されます。DBMS_KAFKA.CREATE_STREAMING_APPを使用してストリーミング・アプリケーションを作成した後の、そのストリーミング・アプリケーションのKafkaデータ処理ループの例を次に示します。

BEGIN
   LOOP 
       SYS.DBMS_KAFKA.LOAD_TEMP_TABLE 
                                              (ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0);
       FOR kafka_record IN (
             SELECT kafka_offset offset 
                    FROM ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0)
       LOOP
              SYS.DBMS_OUTPUT.PUT_LINE (‘Processing record: ‘ || kafka_record.offset);
              --application logic to process the Kafka records
       END LOOP;
       IF (application logic was successful) THEN
            --Update internal metadata to confirm Kafka records were successfully processed
            SYS.DBMS_KAFKA.UPDATE_OFFSET 
                                                       (‘ORA$DKV_EXAMPLECLUSTER_EXAMPLEAPP_0’);
            COMMIT;
       ELSE
            --add your application logic to correct for any failures
       END IF;
   END LOOP;
END;

102.4 DBMS_KAFKAストリーミング・モード

Kafkaトピックのデータを専用のOracle SQL access to Kafka (OSAK)ビューからOSAKグローバル一時表にストリームするには、DBMS_KAFKA STREAMINGモード・パッケージを使用します。 このパッケージを使用するには、まずLOAD_TEMP_TABLEを使用して専用のOSAKビューを作成する必要があります。

最初から、またはKafkaトピックの特定の開始ポイントから順番にKafkaトピックにアクセスする必要があるアプリケーションには、STREAMINGモードを使用します。このモードでは、OSAK一時表を使用するSQL問合せが、アプリケーション処理ループでKafkaレコードに順番にアクセスできます。アプリケーションは、PL/SQLプロシージャDBMS_KAFKA.CREATE_STREAMING_APPをコールすることにより、それがストリーミング・アプリケーションであることを宣言して、OSAKビューの後続の問合せについて状態を初期化します。

102.4.1 CREATE_STREAMING_APP

DBMS_KAFKAのプロシージャCREATE_STREAMING_APPは、Oracle SQL access to Kafka (OSAK)ストリーミング・アプリケーションを作成します。 このアプリケーションには、Kafkaトピックのパーティションから新規および読み取られていないレコードを取得するために使用される、専用のOSAKグローバル一時表およびOSAKビューのセットが含まれています。また、KafkaクラスタでKafkaトピックに関するトピックおよびパーティションのライブ情報を調べるために使用されるメタデータ・ビューも作成します(まだ存在しない場合)。このビューは1回作成され、同じクラスタを共有するすべてのアプリケーションで使用されます。

構文

PROCEDURE CREATE_STREAMING_APP (
                                  cluster_name         IN  VARCHAR2,
                                  application_name     IN  VARCHAR2,
                                  topic_name           IN  VARCHAR2,
                                  options              IN  CLOB,
                                  view_count           IN  INTEGER DEFAULT 1
  );

パラメータ

表102-4 DBMS_KAFKAのCREATE_STREAMING_APPプロシージャのパラメータ

パラメータ 説明

cluster_name

このアプリケーションに関連付けたトピックが含まれている登録済Oracle SQL access to Kafkaクラスタの名前。

大/小文字は区別されません。

登録済クラスタの名前は、OSAK管理者から取得するか、次のSQL文を使用して取得できます。SELECT cluster_name from SYS.USER_KAFKA_CLUSTERS;

application_name

アプリケーションの名前。トピックを読み取るKafkaグループとしても使用されます。

大/小文字は区別されません。

topic_name

コンテンツを取得する、Kafkaクラスタのトピック名。

options

JSONドキュメント形式のプロパティのリストを含みます。オプションの詳細は、「CREATE_XXX_APPに渡されるDBMS_KAFKAのオプション」のトピックを参照してください。

view_count

(オプション)作成するOracle SQL access to Kafka (OSAK)ビュー・ペアの数を指定します。有効な値は、1からN (Nはトピック内のKafkaパーティションの数)、または0 (デフォルトでNに設定される)です。(デフォルトは1です)。

使用上のノート

各OSAKビューは、Oracle SQL access to Kafkaアプリケーションの1つのインスタンスによって排他的に使用されます。各アプリケーション・インスタンス・コールによって、ビューにKafka行が移入されます。その後、アプリケーションはOSAKビュー内のコンテンツに対して1つ以上のSQL問合せを実行できます。STREAMINGアプリケーションは、作成する必要があるOSAKビューの数を選択できるという点で、LOADまたはSEEKINGアプリケーションとは異なります。OSAKビューの数は、1からN (NはKafkaトピック内のパーティションの数)の間である必要があります。

他のタイプのOSAKアプリケーションと同様に、各アプリケーション・インスタンスは、1つの一意のOSAK一時表を排他的に問い合せます。各OSAKビューには、クラスタ名、アプリケーション名およびアプリケーション・インスタンス識別子(ID)が含まれます。複数のアプリケーション・インスタンスを作成すると、アプリケーションはスケール・アウトして、1つ以上のスレッド、プロセスまたはシステムで同時に実行されているアプリケーション・インスタンス間で、Kafkaデータを分析するワークロードを分割できます。

特定のOSAKビューおよびそれに関連付けられたOSAKグローバル一時表にバインドされるKafkaパーティションの数は、作成されるビューの数および存在するパーティションの数によって異なります。N個のOSAKビュー/一時表のペアを作成する場合は、N個のアプリケーション・インスタンスを同時に実行できるように、アプリケーション・ユーザーに、ユーザー当たり少なくともN個のセッションが割り当てられている必要があります。

4つのパーティションがあるKafkaクラスタExampleClusterExampleTopicというKafkaトピックからストリームされるExampleAppというストリーミング・アプリケーションについて一連の4つのビューを作成し、各ビューが1つのパーティションに関連付けられているとします。次の文を入力できます。

DECLARE
   v_options  VARCHAR2;
BEGIN
  v_options := '{"fmt" : "DSV", "reftable" : "user_reftable_name"}';
  SYS.DBMS_KAFKA.CREATE_STREAMING_APP (
                                    'ExampleCluster',
                                    'ExampleApp',
                                    'ExampleTopic',
                                     v_options, 
                                     4);
END;
/

または、トピックの4つのパーティションすべてに関連付けられているアプリケーションについて1つのビューを作成するには、次の文を入力します。

DECLARE
v_options VARCHAR2;
BEGIN
              v_options := '{"fmt" : "DSV", "reftable" : "user_reftable_name"}';
 SYS.DBMS_KAFKA.CREATE_STREAMING_APP (
                                                                   'ExampleCluster',
                                                                   'ExampleApp',
                                                                   'ExampleTopic',
                                                                    v_options,
                                                                    1);
END;
/

102.4.2 DROP_STREAMING_APP

DBMS_KAFKAのプロシージャDROP_STREAMING_APPは、ストリーミング・アプリケーションを削除します。このファンクションは、Oracle SQL access to Kafka (OSAK)ビューを削除するとともに、関連するすべてのデータベース・オブジェクトを削除します。

構文

PROCEDURE DROP_STREAMING_APP ( 
                                cluster_name     IN VARCHAR2,
                                application_name IN VARCHAR2
  );

パラメータ

表102-5 DBMS_KAFKAのDROP_STREAMING_APPプロシージャのパラメータ

パラメータ 説明

cluster_name

既存のKafkaクラスタの名前

大/小文字は区別されません。

application_name

Kafkaクラスタを使用する既存のアプリケーションの名前。

大/小文字は区別されません。

使用上のノート

アプリケーションでの作業が終わったら、このファンクションを使用して、アプリケーションに関連付けられているOSAKビューを削除します。

ストリーミング・アプリケーションExampleAppで使用されていたKafkaクラスタExampleClusterでの作業が完了したとします。その場合、このプロシージャを使用してアプリケーションを削除できます。

EXEC SYS.DBMS_KAFKA.DROP_STREAMING_APP (
                                        ‘ExampleCluster’, ‘ExampleApp’);

102.4.3 INIT_OFFSET

DBMS_KAFKA_ADMのプロシージャINIT_OFFSETを使用すると、Kafkaデータの読取りの開始ポイントとして特定のオフセットを選択できます。 このオプションは、使用可能な最初のレコードからデータをロードするのではなく、ロードするデータの特定の開始ポイントを選択する場合に使用します。

構文

PROCEDURE INIT_OFFSET (
         view_name IN VARCHAR2,
         record_count IN INTEGER,
         water_mark IN VARCHAR2 ;

パラメータ

表102-6 DBMS_KAFKAのINIT_OFFSETプロシージャのパラメータ

パラメータ 説明

view_name

既存のOSAKビューの名前(VARCHAR2)

record_count

レコード数(INTEGER)

water_mark

水位標(VARCHAR)

目的の相対位置を示す最高または最低水位標。値は、WATER_MARK_HIGH ('WMH')またはWATER_MARK_LOW ('WM')定数に制限されています。

デフォルト: WATER_MARK_HIGH

使用上のノート

INIT_OFSETプロシージャを使用すると、STREAMINGまたはLOADアプリケーションは、有用でなくなった古いレコードのバックログを最初に読み取ることを強制されることなく、アプリケーションが作成された後の現在のレコードの読取りを開始できます。

このファンクションを使用して、Kafkaトピックのデータ・レコードの最初から読取りポイントを開始するのではなく、アプリケーションでデータを読み取る各パーティションの最高または最低水位標からのレコードの差分(デルタ)数に基づいて開始ポイントを指定します。

使用可能な最後の100個のレコードを使用してアプリケーションで処理を再開し、そのポイントから続行するとします。この結果を得るには、アプリケーションを再開する前に、またはデータ取得ループの前にアプリケーション・ロジックの一部として次のプロシージャを実行します。

SYS.DBMS_KAFKA.INIT_OFFSET (
                                          ‘ORA$DKV_EXAMPLECLUSTER_EXAMPLEAPP_0’,
                                          100, SYS.DBMS_KAFKA.WATER_MARK_HIGH);

102.4.4 INIT_OFFSET_TS (Epoch以降のミリ秒)

Epoch以降のミリ秒を使用するDBMS_KAFKAのプロシージャINIT_OFFSET_TSは、タイムスタンプを使用して開始オフセットを指定します。

Epoch以降のミリ秒を使用すると、OSAKビューに属するそれぞれのKafkaパーティションについてタイムスタンプに関連する開始オフセットが初期化されます。INIT_OFFSET_TSは通常、ビューを処理する専用の新規アプリケーション・インスタンスの開始時、あるいはアプリケーション・インスタンスの停止または障害後のリカバリ時にコールされます。

.

構文

PROCEDURE INIT_OFFSET_TS (
                            view_name           IN VARCHAR2,
                            start_timestamp_ms  IN INTEGER);

パラメータ

表102-7 DBMS_KAFKAのINIT_OFFSET_TS (Epoch以降のミリ秒)プロシージャのパラメータ

パラメータ 説明

view_name

既存のKafkaクラスタの名前(VARCHAR)

大/小文字は区別されません。

start_timestamp

アプリケーションを開始するオフセットのタイムスタンプ(INTEGER)。戻される最初のレコードのタイムスタンプは、指定したタイムスタンプと同じか、指定したタイムスタンプより後の最も近いタイムスタンプになります。

使用上のノート

Kafka Epoch以降のミリ秒を使用するINIT_OFFSET_TSは、OSAKビューに属するそれぞれのKafkaパーティションについてタイムスタンプに関連する開始オフセットを初期化します。一般的なユースケースでは、ビューを処理する専用の新規アプリケーション・インスタンスの開始時に、あるいはアプリケーション・インスタンスの停止または障害後にリカバリするために、Epoch開始ポイントを指定してINIT_OFFSET_TSをコールします。

このプロシージャは、Kafkaトピック・レコードの処理を相対的に最新のポイントに配置するために使用され、Kafkaパーティション内の未処理の古いレコードをスキップする可能性があります。

ノート:

オフセットの初期化から最初のフェッチまでの時間が遅延する可能性があることに注意してください。この時間のギャップの間に、レコードがKafkaの保存期間を超えるか、レコードが明示的に削除されることにより、選択したオフセットのレコードが削除される可能性があります。

入力のTIMESTAMPからEpoch時間以降のミリ秒数を計算して戻すとします。タイムゾーンが指定されていないかぎり、タイムスタンプはセッションのタイムゾーンとみなされます。Epoch時間以降のミリ秒に変換するパラメータdatetime (整数)のタイムスタンプを指定します。パラメータtimezone (整数)はオプションであり、タイムスタンプのタイムゾーンを指定します。タイムゾーンを指定しない場合、タイムゾーンはデフォルトでセッションのタイムゾーンになります。

SYS.DBMS_KAFKA.SEEK_OFFSET_TS (‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
                               1593829800,
                               1593833400);

SYS.DBMS_KAFKA.LOAD_TEMP_TABLE (‘ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0’);
SELECT <columns> FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;

102.4.5 INIT_OFFSET_TS (別個のタイムゾーン・パラメータ付きタイムスタンプ)

TIMESTAMP TIME ZONEを指定する文字列パラメータとともにTIMESTAMPを使用するDBMS_KAFKAのプロシージャINIT_OFFSET_TSは、データを戻す開始位置を指定しますタイムゾーン付きタイムスタンプで指定されたタイムスタンプを使用すると、Oracle SQL access to Kafka (OSAK)ビューに属するそれぞれのKafkaパーティションについて、タイムスタンプに関連するタイムゾーンにKafkaデータの処理が配置されます。INIT_OFFSET_TSは通常、ビューを処理する専用の新規アプリケーション・インスタンスの開始時、あるいはアプリケーション・インスタンスの停止または障害後のリカバリ時にコールされます。

構文

PROCEDURE INIT_OFFSET_TS (
                            view_name           IN VARCHAR2,
                            start_timestamp     IN TIMESTAMP,
                            timezone            IN VARCHAR2 DEFAULT NULL);

パラメータ

表102-8 DBMS_KAFKAのINIT_OFFSET_TS (タイムゾーン付きタイムスタンプ)プロシージャのパラメータ

パラメータ 説明

view_name

既存のKafkaクラスタの名前(VARCHAR)

大/小文字は区別されません。

start_timestamp

アプリケーションを開始するオフセットのタイムスタンプ(INTEGER)。戻される最初のレコードのタイムスタンプは、指定したタイムスタンプと同じか、指定したタイムスタンプより後の最も近いタイムスタンプになります。

timezone

タイムスタンプのタイムゾーン(INTEGER)。値を指定しない場合は、デフォルトでセッションのタイムゾーンが使用されます。

使用上のノート

別個のタイムゾーン・パラメータとともにタイムスタンプを使用するINIT_OFFSET_TSは、OSAKビューに属するそれぞれのKafkaパーティションについてタイムスタンプおよびタイムゾーンに関連する開始オフセットを初期化します。一般的なユースケースでは、ビューを処理する専用の新規アプリケーション・インスタンスの開始時に、あるいはアプリケーション・インスタンスの停止または障害後にリカバリするために、別個のタイムゾーン・パラメータとともにタイムスタンプを指定してINIT_OFFSET_TSをコールします。

このプロシージャは、Kafkaトピック・レコードの処理を相対的に最新のポイントに配置するために使用され、Kafkaパーティション内の未処理の古いレコードをスキップする可能性があります。

ノート:

オフセットの初期化から最初のフェッチまでの時間が遅延する可能性があることに注意してください。この時間のギャップの間に、レコードがKafkaの保存期間を超えるか、レコードが明示的に削除されることにより、選択したオフセットのレコードが削除される可能性があります。

使用可能な最初のレコードよりも後の時間を選択して、Kafkaトピックのデータを処理するとします。たとえば、データ・センターがメンテナンスのために週末に停止しており、データ・センターが午後6時(18:00:00)に再開された後に生成された新しいKafkaデータのみを処理する場合は、午後6時より後のタイムスタンプが付いたデータ・レコードを開始します。OSAKビューでこれを実現するには、アプリケーションを再開する前に、またはデータ取得ループの前にアプリケーション・ロジックの一部として次のプロシージャを実行します。

SYS.DBMS_KAFKA.INIT_OFFSET_TS (
                                          ‘ORA$DKV_EXAMPLECLUSTER_EXAMPLEAPP_0’,
                                           TO_DATE (‘2023/07/05 18:00:00’, ‘YYYY/MM/DD HH:MI:SS’))
                                           timestamp=1603507387101;

102.4.6 INIT_OFFSET_TS (タイムゾーン付きタイムスタンプ)

タイムゾーン付きタイムスタンプを使用するDBMS_KAFKAのプロシージャINIT_OFFSET_TSは、タイムスタンプを使用して開始オフセットを指定します。タイムゾーン付きで指定されたタイムスタンプを使用すると、Oracle SQL access to Kafka (OSAK)ビューに属するそれぞれのKafkaパーティションについてタイムスタンプに関連する開始オフセットが初期化されます。INIT_OFFSET_TSは通常、ビューを処理する専用の新規アプリケーション・インスタンスの開始時、あるいはアプリケーション・インスタンスの停止または障害後のリカバリ時にコールされます。

構文

PROCEDURE INIT_OFFSET_TS (
                            view_name           IN VARCHAR2,
                            start_timestamp     IN TIMESTAMP WITH TIME ZONE);

パラメータ

表102-9 DBMS_KAFKAのINIT_OFFSET_TSプロシージャのパラメータ

パラメータ 説明

view_name

既存のKafkaクラスタの名前(VARCHAR)

大/小文字は区別されません。

start_timestamp

アプリケーションを開始するオフセットのタイムスタンプ(INTEGER)。戻される最初のレコードのタイムスタンプは、指定したタイムスタンプと同じか、指定したタイムスタンプより後の最も近いタイムスタンプになります。

timezone

(オプション)。タイムスタンプのタイムゾーン(INTEGER)。

指定しない場合は、デフォルトでセッションのタイムゾーンになります

使用上のノート

INIT_OFFSET_TSは、OSAKビューに属するそれぞれのKafkaパーティションについてタイムスタンプに関連する開始オフセットを初期化します。一般的なユースケースでは、ビューを処理する専用の新規アプリケーション・インスタンスの開始時に、あるいはアプリケーション・インスタンスの停止または障害後にリカバリするためにINIT_OFFSET_TSをコールします。

このプロシージャは、Kafkaトピック・レコードの処理を相対的に最新のポイントに配置するために使用され、Kafkaパーティション内の未処理の古いレコードをスキップする可能性があります。

ノート:

オフセットの初期化から最初のフェッチまでの時間が遅延する可能性があることに注意してください。この時間のギャップの間に、レコードがKafkaの保存期間を超えるか、レコードが明示的に削除されることにより、選択したオフセットのレコードが削除される可能性があります。

タイムゾーンが指定されたタイムスタンプ

使用可能な最初のレコードよりも後の時間を選択して、Kafkaトピックのデータを処理するとします。たとえば、データ・センターがメンテナンスのために週末に停止しており、データ・センターが午後6時(18:00:00)に再開された後に生成された新しいKafkaデータのみを処理する場合は、午後6時より後のタイムスタンプが付いたデータ・レコードを開始します。OSAKビューでこれを実現するには、アプリケーションを再開する前に、またはデータ取得ループの前にアプリケーション・ロジックの一部として次のプロシージャを実行します。

SYS.DBMS_KAFKA.SEEK_OFFSET_TS (‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
                               TO_TIMESTAMP (‘2020/07/04 02:30:00’, ‘YYYY/MM/DD HH:MI:SS’,
                               TO_TIMESTAMP (‘2020/07/04 03:30:00’, ‘YYYY/MM/DD HH:MI:SS’), 
                               ‘UTC’);

SYS.DBMS_KAFKA.LOAD_TEMP_TABLE (‘ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0’);
SELECT <columns> FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;

タイムゾーンが指定されていないタイムスタンプ

SYS.DBMS_KAFKA.SEEK_OFFSET_TS (‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
                               TO_TIMESTAMP_TZ (‘2020/07/04 02:30:00 -8:00’, ‘YYYY/MM/DD HH:MI:SS TZH:TZM’,
                               TO_TIMESTAMP_TZ (‘2020/07/04 03:30:00 -8:00’, ‘YYYY/MM/DD HH:MI:SS TZH:TZM’));

SYS.DBMS_KAFKA.LOAD_TEMP_TABLE (‘ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0’);
SELECT <columns> FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;

102.4.7 UPDATE_OFFSET

DBMS_KAFKAのプロシージャUPDATE_OFFSETは、読み取られていない新しいKafkaレコードをループ内の次のパスが取得して処理するように、最後に読み取られたKafkaオフセットを更新します。 UPDATE_OFFSETは、Oracle SQL access to Kafka (OSAK)ビューに属するすべてのパーティションについてKafkaグループIDのKafkaパーティション・オフセットを透過的に進めるため、DBMS_KAFKA.LOAD_TEMP_TABLEをコールするたびに、読み取られていないKafkaレコードの新しいセットが取得されて処理されます。

構文

PROCEDURE UPDATE_OFFSET (view_name IN VARCHAR2);

パラメータ

表102-10 DBMS_KAFKAのUPDATE_OFFSETプロシージャのパラメータ

パラメータ 説明

view_name

OSAKビューの名前

大/小文字は区別されません。

使用上のノート

UPDATE_OFFSETは、前のレコードの処理が成功した場合、Kafkaオフセット読取りポイントを次のKafkaレコード・セットに進めます。また、UPDATE_OFFSETは、Oracle Databaseトランザクションを開始し(トランザクションがまだ開始されていない場合)、最後のオフセットをメタデータ表に記録します。このため、アプリケーションは、UPDATE_OFFSETへの各コールの後にトランザクションをコミットする必要があります。OSAKはオフセットをOracle Databaseトランザクション内で管理し、ACID (原子性、一貫性、独立性、永続性)がトランザクションで保持されるため、レコードが失われたり、再読取りされることはありません。トランザクションが正常に完了しなかった場合、オフセットは進められず、アプリケーションによって、前にオフになった場所が再開時に取得されます。

一般的なユースケースでは、ストリーミング・モード・アプリケーションなどで前のKafkaレコード・セットが正常に処理された後に、次のセットにオフセット読取りを進めます。次に例を示します。

BEGIN
   LOOP 
       SYS.DBMS_KAFKA.LOAD_TEMP_TABLE 
                                              (ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0);
       FOR kafka_record IN (
             SELECT kafka_offset offset 
                    FROM ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0)
       LOOP
              SYS.DBMS_OUTPUT.PUT_LINE (‘Processing record: ‘ || kafka_record.offset);
              --application logic to process the Kafka records
       END LOOP;
       IF (application logic was successful) THEN
            --Update internal metadata to confirm Kafka records were successfully processed
            SYS.DBMS_KAFKA.UPDATE_OFFSET 
                                                       (‘ORA$DKV_EXAMPLECLUSTER_EXAMPLEAPP_0’);
            COMMIT;
       ELSE
            --application logic to correct what failed
       END IF;
   END LOOP;
END;

OSAKビューに直接アクセスして更新を取得するプロセスの一部として使用することもできます。たとえば、Lab1というラボで一定期間にわたって平均温度を監視しているストリーミング・モード・アプリケーションがあるとします。UPDATE_OFFSETを使用して、最後のチェック以降の平均温度を取得できます。

EXEC DBMS_KAFKA.ENABLE_VIEW_QUERY(
                                                      ‘ORA$DKV_EXAMPLECLUSTER_STREAMINGAPP_0’);
SELECT AVG(temperature) FROM ORA$DKV_EXAMPLECLUSTER_STREAMINGAPP_0
                                                 WHERE sensor_name = ‘LAB1_SENSOR’;
EXEC DBMS_KAFKA.UPDATE_OFFSET(
                                                         ‘ORA$DKV_EXAMPLECLUSTER_STREAMINGAPP_0’);
COMMIT;

102.5 DBMS_KAFKAシーク可能モード

2つの時点の間に存在するKafkaレコードを読み取るには、DBMS_KAFKA SEEKABLEモード・パッケージを使用します。

アプリケーションは、PL/SQLプロシージャDBMS_KAFKA.CREATE_SEEKABLE_APPをコールすることにより、それがロード・アプリケーションであることを宣言して、シーク可能なOracle SQL access to Kafka (OSAK)ビューと一時表を設定および作成します。

SEEK_OFFSET_TSを指定してシーク可能モード・ビューを使用することで、Kafkaデータを検索して、2つの時点の間にタイムスタンプが存在するデータのセットを見つけます。その後、LOAD_TEMP_TABLEを使用してそのデータ範囲をOSAK一時表にロードし、そのタイムスタンプ範囲内のKafkaレコードを含むOSAKグローバル一時表に対して1つ以上のアプリケーション問合せを実行できます。問合せが完了したら、DROP_SEEKABLE_APPを使用してアプリケーションを削除できます。

.

102.5.1 CREATE_SEEKABLE_APP

DBMS_KAFKAのプロシージャCREATE_SEEKABLE_APPは、1つのOracle SQL access to Kafka (OSAK)ビューおよび関連するグローバル一時表を作成します CREATE_SEEKABLE_APPを使用して、特定の時間枠の間のKafkaレコードをシークしてロードできます。このプロシージャの使用は、シーク操作を実行する1つのアプリケーション・インスタンスに制限されています。

構文

PROCEDURE CREATE_SEEKABLE_APP (
                                 cluster_name         IN  VARCHAR2,
                                 application_name     IN  VARCHAR2,
                                 topic_name           IN  VARCHAR2,
                                 options              IN  CLOB

パラメータ

パラメータ 説明

cluster_name

このアプリケーションに関連付けるトピックが含まれている登録済Oracle SQL access to Kafkaクラスタの名前。

大/小文字は区別されません。

登録済クラスタの名前は、OSAK管理者から、次の文を使用して取得できます。

SELECT cluster_name FROM sys.user_kafka_clusters;

application_name

アプリケーション名。このパラメータは、トピックを読み取ることができるKafkaグループとしても使用されます。

大/小文字は区別されません。

topic_name

コンテンツを取得する、Kafkaクラスタのトピック名。

大/小文字が区別されます。

options

JSONドキュメント形式のプロパティのリストを含みます。オプションの詳細は、「CREATE_XXX_APPに渡されるDBMS_KAFKAのオプション」のトピックを参照してください。

例102-2 Oracle SQL Access to KafkaのCREATE_SEEKABLE_APPプロシージャ

DECLARE
   v_options  VARCHAR2;
BEGIN
  v_options := ‘{“fmt” : “DSV”, “reftable” : “user_reftable_name”}’;
  SYS.DBMS_KAFKA.CREATE_SEEKABLE_APP (
                                    ‘ExampleCluster’,
                                    ‘ExampleApp’,
                                    ‘ExampleTopic’,
END;
/


102.5.2 DROP_SEEKABLE_APP

DBMS_KAFKAのプロシージャDROP_SEEKABLE_APPは、シーク・アプリケーションを削除します。 このファンクションは、Oracle SQL access to Kafka (OSAK)ビューおよびそれに関連するメタデータを削除します。

構文

PROCEDURE DROP_SEEKABLE_APP ( 
                               cluster_name     IN VARCHAR2,
                               application_name IN VARCHAR2
  );

パラメータ

表102-11 DBMS_KAFKAのDROP_SEEKABLE_APPプロシージャのパラメータ

パラメータ 説明

cluster_name

既存のKafkaクラスタの名前

大/小文字は区別されません。

application_name

Kafkaクラスタを使用する既存のアプリケーションの名前。

大/小文字は区別されません。

使用上のノート

アプリケーションでの作業が終わったら、このファンクションを使用して、アプリケーションに関連付けられているOSAKビューを削除します。

シーク可能アプリケーションExampleAppで使用されていたKafkaクラスタExampleClusterでの作業が完了したとします。その場合、このプロシージャを使用してアプリケーションを削除できます。

EXEC SYS.DBMS_KAFKA.DROP_SEEKABLE_APP (
                                        ‘ExampleCluster’, ‘ExampleApp’);

102.5.3 SEEK_OFFSET_TS (別個のタイムゾーン・パラメータ付きタイムスタンプ)

タイムゾーン・パラメータとともにタイムスタンプを使用するDBMS_KAFKAのプロシージャSEEK_OFFSET_TSは、指定した2つのKafka Epochタイムスタンプ間でKafkaレコードの読取りを開始するようにOracle SQL access to Kafka (OSAK)ビューを配置します。このプロシージャを使用して、Epoch時間(ミリ秒)で指定される、シークするKafkaトピック内のレコードの時間枠を指定します。

構文

PROCEDURE SEEK_OFFSET_TS(
                view_name           IN VARCHAR2, 
                start_timestamp_ms  IN INTEGER,
                end_timestamp_ms    IN INTEGER,
                timezone  IN INTEGER);
 

パラメータ

表102-12 DBMS_KAFKAのSEEK_OFFSET_TS (別個のタイムゾーン・パラメータ付きタイムスタンプ)プロシージャのパラメータ

パラメータ 説明

view_name

既存のKafkaクラスタの名前(VARCHAR)

大/小文字は区別されません。

start_timestamp

アプリケーションを開始するオフセットのタイムスタンプ(INTEGER)。戻される最後のレコードのタイムスタンプは、指定したタイムスタンプと同じか、指定したタイムスタンプより後の最も近いタイムスタンプになります。

end_timestamp

アプリケーションを終了するオフセットのタイムスタンプ(INTEGER)。戻される最後のレコードのタイムスタンプは、指定したタイムスタンプと同じか、指定したタイムスタンプより前の最も近いタイムスタンプになります。戻されるレコードの範囲に開始タイムスタンプは含まれますが、終了タイムスタンプは含まれません。

たとえば、2:00PMから3:00PMの範囲では、2:00:00.000000 PMから2:59:59.999999PMの範囲のタイムスタンプを持つレコードが戻されます。この範囲を使用すると、アプリケーションで2:00から3:00までを検索した後、3:00から4:00までを検索でき、これらのセット間に重複するレコードが含まれることはありません。

timezone

両方のタイムスタンプ引数のタイムゾーン(INTEGER)。値を指定しない場合は、デフォルトでセッションのタイムゾーンが使用されます。

使用上のノート

SEEK_OFFSET_TSは、Kafka Epoch時間(ミリ秒)で指定された、選択した特定の時間枠内に存在するKafkaトピック・レコードをシークします。戻される最後のレコードは、最後にパブリッシュされたレコードです。

SEEK_OFFSET_TSの目的は、指定したEpochタイムスタンプによって定義される特定の時間枠内でKafkaレコードの読取りを開始するようにOSAKビューを配置することです。レコードのウィンドウがKafkaトピックの実際のレコードの範囲を超える場合、このプロシージャは、存在するすべてのレコードを戻します。

次に例を示します。

  • タイムスタンプがどちらも最低水位標を下回るか、どちらも最高水位標を超える場合、レコードは戻されません。
  • 開始タイムスタンプが最低水位標を下回る場合、戻される最初のレコードは最低水位標になります。
  • 終了タイムスタンプが最高水位標を超える場合、戻される最後のレコードは、最高水位標(HWM)から1を差し引いた差分(HWM - 1)になります。たとえば、新しいトピックに100個のレコードがある場合、オフセット範囲は0から99までで、HWMは100に設定されます。

ノート:

Kafka Epoch時間枠を取得の仕様として使用するSEEKABLEアプリケーションによって取得されるデータには、指定したタイムスタンプ・ウィンドウ内にタイムスタンプがある場合でも、OSAKグローバル一時表のロード後に配信された外れ値レコードを含めることはできません。

過去に発生した問題を調査するとします。データがまだKafkaストリームに存在する場合は、DBMS_KAFKA.CREATE_SEEKABLE_APPをコールしてシーク可能アプリケーションを作成できます。その後、SEEK_OFFSET_TSプロシージャをコールして、OSAKビューに一連のデータ・レコードを取得するよう要求できます。たとえば、午前3時頃に本番環境の問題が発生したことがITコンサルタントに知らされた場合、コンサルタントは次のプロシージャを使用して一時表をロードし、その時間を中心とする1時間分のデータを取得することを選択できます。

SYS.DBMS_KAFKA.SEEK_OFFSET_TS (
                                          ‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
                                          TO_DATE (‘2023/04/02 02:30:00’, ‘YYYY/MM/DD HH:MI:SS’,
                                          TO_DATE (‘2023/04/02 03:30:00’, ‘YYYY/MM/DD HH:MI:SS’));
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE 
                                              (ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0);
SELECT <columns> FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;

別のユースケースとして、Kafkaストリームへの順次アクセス権を持つアプリケーションが異常の可能性を検出しており、異常表に行を挿入するように構成されていたとします。異常表には、Kafkaタイムスタンプと、記録するように構成された他のデータが含まれます。その場合、別のアプリケーションがその後、この情報を使用して、疑わしいレコード周辺のレコードを取得し、他の問題があるかどうかを確認できます。この目標を達成するには、次のプロシージャを実行し、一時表をロードした後、アプリケーション・ロジックを選択して結果に適用します。

SYS.DBMS_KAFKA.SEEK_OFFSET_TS (
                                           ‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
                                          TO_DATE (‘2020/07/04 02:30:00’, ‘YYYY/MM/DD HH:MI:SS’,
                                          TO_DATE (‘2020/07/04 03:30:00’, ‘YYYY/MM/DD HH:MI:SS’));

SYS.DBMS_KAFKA.LOAD_TEMP_TABLE 
                                              (ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0);
SELECT <columns> FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;
--application logic

102.5.4 SEEK_OFFSET_TS (ミリ秒単位のタイムスタンプ)

DBMS_KAFKAのプロシージャSEEK_OFFSET_TS (ミリ秒単位のタイムスタンプ)は、ミリ秒単位のEpoch時間で指定された時間ウィンドウの間に存在するトピック内のKafkaレコードをシークします。

構文

PROCEDURE SEEK_OFFSET_TS(
                view_name           IN VARCHAR2, 
                start_timestamp_ms  IN INTEGER,
                end_timestamp_ms    IN INTEGER);

パラメータ

表102-13 DBMS_KAFKAのSEEK_OFFSET_TS (ミリ秒単位のタイムスタンプ)プロシージャのパラメータ

パラメータ 説明

view_name

既存のOracle SQL access to Kafka (OSAK)ビューの名前。

大/小文字は区別されません。

start_timestamp_ms

最初のレコードをシークするタイムスタンプ(ミリ秒)

end_timestamp_ms

最後のレコードをシークするタイムスタンプ(ミリ秒)

使用上のノート

ミリ秒単位のタイムスタンプ範囲をシークするSEEK_OFFSET_TSは、ミリ秒単位のKafka Epoch時間で指定された、選択した開始タイムスタンプと終了タイムスタンプのウィンドウ内に存在するKafkaトピック・レコードをシークします。戻される最後のレコードは、最後にパブリッシュされたレコードです。

ミリ秒を使用してウィンドウを指定するSEEK_OFFSET_TSの目的は、指定したEpochタイムスタンプによって定義される特定の時間枠内でKafkaレコードの読取りを開始するようにOSAKビューを配置することです。レコードのウィンドウがKafkaトピックの実際のレコードの範囲を超える場合、このプロシージャは、存在するすべてのレコードを戻します。

次に例を示します。

  • タイムスタンプがどちらも最低水位標を下回るか、どちらも最高水位標を超える場合、レコードは戻されません。
  • 開始タイムスタンプが最低水位標を下回る場合、戻される最初のレコードは最低水位標になります。
  • 終了タイムスタンプが最高水位標を超える場合、戻される最後のレコードは、最高水位標(HWM)から1を差し引いた差分(HWM - 1)になります。たとえば、新しいトピックに100個のレコードがある場合、オフセット範囲は0から99までで、HWMは100に設定されます。

ノート:

Kafkaレコードのタイムスタンプは、Kafkaによって(つまり、トランザクション時間によって)割り当てられるか、アプリケーション(つまり、有効または決定時間)によって割り当てられます。SEEKABLEアプリケーションによって取得されるデータには、指定したタイムスタンプ・ウィンドウ内にタイムスタンプがある、OSAKグローバル一時表のロード後に配信された外れ値レコードは含まれません。

過去に発生した問題を調査するとします。データがまだKafkaストリームに存在する場合は、DBMS_KAFKA.CREATE_SEEKABLE_APPをコールしてシーク可能アプリケーションを作成できます。その後、SEEK_OFFSET_TSプロシージャをコールして、OSAKビューに一連のデータ・レコードを取得するよう要求できます。たとえば、午前3時頃に本番環境の問題が発生したことがITコンサルタントに知らされた場合、コンサルタントは次のプロシージャを使用して一時表をロードし、その時間を中心とする1時間分のデータを取得することを選択できます。

SYS.DBMS_KAFKA.SEEK_OFFSET_TS (
                                          ‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
                                          TO_DATE (‘2023/04/02 02:30:00’, ‘YYYY/MM/DD HH:MI:SS’,
                                          TO_DATE (‘2023/04/02 03:30:00’, ‘YYYY/MM/DD HH:MI:SS’));
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE 
                                              (ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0);
SELECT <columns> FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;

別のユースケースとして、Kafkaストリームへの順次アクセス権を持つアプリケーションが異常の可能性を検出しており、異常表に行を挿入するように構成されていたとします。異常表には、Kafkaタイムスタンプと、記録するように構成された他のデータが含まれます。その場合、別のアプリケーションがその後、この情報を使用して、疑わしいレコード周辺のレコードを取得し、他の問題があるかどうかを確認できます。この目標を達成するには、次のプロシージャを実行し、一時表をロードした後、アプリケーション・ロジックを選択して結果に適用します。

SYS.DBMS_KAFKA.SEEK_OFFSET_TS (
                                           ‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
                                          TO_DATE (‘2020/07/04 02:30:00’, ‘YYYY/MM/DD HH:MI:SS’,
                                          TO_DATE (‘2020/07/04 03:30:00’, ‘YYYY/MM/DD HH:MI:SS’));

SYS.DBMS_KAFKA.LOAD_TEMP_TABLE 
                                              (ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0);
SELECT <columns> FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;
--application logic

102.5.5 SEEK_OFFSET_TS (タイムゾーン付きタイムスタンプ)

DBMS_KAFKAのプロシージャSEEK_OFFSET_TSは、TIMESTAMP WITH TIME ZONEで指定された時間ウィンドウの間に存在するトピック内のKafkaレコードをシークします。このプロシージャを使用して、指定した2つのTIMESTAMP WITH TIME ZONEタイムスタンプの間でKafkaレコードの読取りを開始するようにOracle SQL access to Kafka (OSAK)ビューを配置します。

構文

PROCEDURE SEEK_OFFSET_TS(
                view_name           IN VARCHAR2, 
                start_timestamp_ms  IN INTEGER,
                end_timestamp_ms    IN INTEGER);

パラメータ

表102-14 DBMS_KAFKAのSEEK_OFFSET_TSプロシージャのパラメータ

パラメータ 説明

view_name

既存のKafkaビューの名前(VARCHAR)

大/小文字は区別されません。

start_timestamp

最初のレコードをシークするタイムスタンプ(INTEGER)

end_timestamp

最後のレコードをシークするタイムスタンプ(INTEGER)。

使用上のノート

開始および終了タイムスタンプによって定義されるSEEK_OFFSET_TSの目的は、指定したEpochタイムスタンプによって定義される特定の時間枠内でKafkaレコードの読取りを開始するようにOSAKビューを配置することです。レコードのウィンドウがKafkaトピックの実際のレコードの範囲を超える場合、このプロシージャは、存在するすべてのレコードを戻します。

次に例を示します。

  • タイムスタンプがどちらも最低水位標を下回るか、どちらも最高水位標を超える場合、レコードは戻されません。
  • 開始タイムスタンプが最低水位標を下回る場合、戻される最初のレコードは最低水位標になります。
  • 終了タイムスタンプが最高水位標を超える場合、戻される最後のレコードは、最高水位標(HWM)から1を差し引いた差分(HWM - 1)になります。たとえば、新しいトピックに100個のレコードがある場合、オフセット範囲は0から99までで、HWMは100に設定されます。

ノート:

Kafkaレコードのタイムスタンプは、Kafka (つまり、トランザクション時間)によって割り当てられるか、アプリケーション(つまり、有効または決定時間)によって割り当てられます。Kafka Epoch時間枠を取得の仕様として使用するSEEKABLEアプリケーションによって取得されるデータには、指定したタイムスタンプ・ウィンドウ内にタイムスタンプがある場合でも、OSAKグローバル一時表のロード後に配信された外れ値レコードを含めることはできません。

過去に発生した問題を調査するとします。データがまだKafkaストリームに存在する場合は、DBMS_KAFKA.CREATE_SEEKABLE_APPをコールしてシーク可能アプリケーションを作成できます。その後、SEEK_OFFSET_TSプロシージャをコールして、OSAKビューに一連のデータ・レコードを取得するよう要求できます。たとえば、午前3時頃に本番環境の問題が発生したことがITコンサルタントに知らされた場合、コンサルタントは次のプロシージャを使用して一時表をロードし、その時間を中心とする1時間分のデータを取得することを選択できます。

SYS.DBMS_KAFKA.SEEK_OFFSET_TS (
                                          ‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
                                          TO_DATE (‘2023/04/02 02:30:00’, ‘YYYY/MM/DD HH:MI:SS’,
                                          TO_DATE (‘2023/04/02 03:30:00’, ‘YYYY/MM/DD HH:MI:SS’));
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE 
                                              (ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0);
SELECT <columns> FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;

別のユースケースとして、Kafkaストリームへの順次アクセス権を持つアプリケーションが異常の可能性を検出しており、異常表に行を挿入するように構成されていたとします。異常表には、Kafkaタイムスタンプと、記録するように構成された他のデータが含まれます。その場合、別のアプリケーションがその後、この情報を使用して、疑わしいレコード周辺のレコードを取得し、他の問題があるかどうかを確認できます。この目標を達成するには、次のプロシージャを実行し、一時表をロードした後、アプリケーション・ロジックを選択して結果に適用します。

SYS.DBMS_KAFKA.SEEK_OFFSET_TS (
                                           ‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
                                          TO_DATE (‘2020/07/04 02:30:00’, ‘YYYY/MM/DD HH:MI:SS’,
                                          TO_DATE (‘2020/07/04 03:30:00’, ‘YYYY/MM/DD HH:MI:SS’));

SYS.DBMS_KAFKA.LOAD_TEMP_TABLE 
                                              (ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0);
SELECT <columns> FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;
--application logic

102.6 ADD_PARTITIONS

DBMS_KAFKA_ADMのプロシージャADD_PARTITIONSは、既存のOracle SQL access to Kafka (OSAK)ビューのセットにKafkaパーティションを追加します。

構文

PROCEDURE ADD_PARTITIONS (
                       cluster_name         IN  VARCHAR2,
                       application_name     IN  VARCHAR2

パラメータ

表102-15 DBMS_KAFKAのADD_PARTITIONSプロシージャのパラメータ

パラメータ 説明

cluster_name

既存のKafkaクラスタの名前

大/小文字は区別されません。

application_name

Kafkaクラスタに関連付けられている既存のアプリケーションの名前。

大/小文字は区別されません。

使用上のノート

ADD_PARTITIONSのセマンティクスはCREATE_xxx_APPのコールに似ていますが、既存のKafkaトピック・パーティションに関する状態情報(committed_offsetなど)を保持し、新しいパーティションを既存のビューまたは新しいOSAKビューにバインドする点が異なります。

注意:

ADD_PARTITIONSをコールする前に、このアプリケーションのすべてのアプリケーション・インスタンスを停止する必要があります。ADD_PARTITIONSが正常にコールされた後、アプリケーション・インスタンスを再起動できます。アプリケーションが、パーティションごとに1つのビュー・インスタンスを作成するように構成されたストリーミング・アプリケーションである場合は、新しいビューが作成されます。これらのビューを使用するには、それぞれが、作成された新しいビューの1つを排他的に処理する専用のものである、追加のアプリケーション・インスタンスを起動する必要があります。

パーティションがトピックに追加されていない場合、このプロシージャは、既存のビューを変更せずに正常に終了します。

OSAKビューが作成され、パーティションが割り当てられると、ビューによってパーティションのリストが管理されます。たとえば、Kafkaトピックのパーティション0、1、2、3を読み取るためにOSAKビューが以前に割り当てられているとします。ADD_PARTITIONSによって後でこのビューに新しいパーティション(例: パーティション16)が追加された場合、OSAKビューは、パーティション0、1、2、3および16からKafkaレコードをフェッチするように構成されます。

このプロシージャでは、トランザクション処理であるDDLとDMLの両方が実行されます。このコールは、既存のトランザクション・コンテキストの外部でのみ実行する必要があります。

BEGIN
  SYS.DBMS_KAFKA.ADD_PARTITIONS (‘ExampleCluster’, 
                                                    ‘ExampleApp’);
END;
/

102.7 DROP_ALL_APPS

DBMS_KAFKAのプロシージャDROP_ALL_APPSは、Kafkaクラスタのすべてのアプリケーションを削除します。このプロシージャを使用して、構成されたセキュリティ情報で接続を確立できることを確認します。このファンクションは、クラスタの状態を戻します。

構文

PROCEDURE DROP_ALL_APPS (
                            \cluster_name    IN VARCHAR2
);

パラメータ

表102-16 DBMS_KAFKAのDROP_ALL_APPSプロシージャのパラメータ

パラメータ 説明

cluster_name

既存のKafkaクラスタの名前

大/小文字は区別されません。

使用上のノート

DROP_ALL_APPSプロシージャは、トランザクション処理であるデータ定義言語(DDL)とデータ操作言語(DML)の両方の変更を実行します。このコールは、既存のトランザクション・コンテキストの外部でのみ実行してください。

ノート:

これは自律型トランザクションです。

1つ以上のOSAKアプリケーションがKafkaクラスタExampleClusterに存在しなくなったとします。その場合、次の文を使用して、そのクラスタ内のすべてのアプリケーションを削除できます。

EXEC SYS.DBMS_KAFKA.DROP_ALL_APPS (‘ExampleCluster’);

102.8 ENABLE_VIEW_QUERY

DBMS_KAFKAのプロシージャENABLE_VIEW_QUERYは、現在のOracle Databaseセッション内でコンテキストを設定し、アプリケーションがビューを問い合せることができるようにします。 このプロシージャを使用して、アプリケーションがOracle SQL access to Kafka (OSAK)ビューを直接問い合せることができるようにします。

構文

PROCEDURE ENABLE_VIEW_QUERY(view_name IN VARCHAR2);

パラメータ

表102-17 DBMS_KAFKAのENABLE_VIEW_QUERYプロシージャのパラメータ

パラメータ 説明

view_name

既存のKafkaビューの名前

大/小文字は区別されません。

使用上のノート

1つのトランザクション内で行うOSAKビューに対する問合せは1つのみにしてください。OSAKビューの複数の問合せでは、繰返し可能な読取り動作を保証できません。Kafkaはストリーミング・サービスであるため、SELECT問合せの間に新しいレコードがKafkaトピックにパブリッシュされる可能性があります。新しいレコードがパブリッシュされると、アプリケーションによって追加の行が認識される場合があります。

注意:

これは高度なプロシージャです。このコールは、Oracle SQL access to Kafka (OSAK)処理モデルを深く理解し、Oracleアクセス計画のデバッグに関する知識がある開発者のみが使用することをお薦めします。これには、OSAKビューの問合せ後にOSAKによってKafkaオフセットがどのように進められるのかについての理解も含まれます。

通常は、LOAD_TEMP_TABLEまたはEXECUTE_LOAD_TABLEを使用することをお薦めします。これらのプロシージャはKafkaビューを透過的に問い合せて、グローバル一時表に、またはSQLを使用してさらに分析するためにOracle Database表にそれらを1回ロードします。

ユーザーがOSAKビューを直接問い合せることができるようにする方法は、次の2つのケースでのみ使用してください:

  1. 問合せがFROM句のOSAKビューを参照してKafkaストリームをスキャンするのみである場合。たとえば、OSAKビューに直接アクセスし、最後のチェック以降のラボ1のセンサーの平均温度を取得するとします。次の問合せを使用できます。

    EXEC DBMS_KAFKA.ENABLE_VIEW_QUERY(
    
    ‘ORA$DKV_EXAMPLECLUSTER_STREAMINGAPP_0’);
    SELECT AVG(temperature) 
    FROM ORA$DKV_EXAMPLECLUSTER_STREAMINGAPP_0WHERE sensor_name = ‘LAB1_SENSOR’;
    EXEC DBMS_KAFKA.UPDATE_OFFSET(
                                   ‘ORA$DKV_EXAMPLECLUSTER_STREAMINGAPP_0’);
    COMMIT;
    
  2. 問合せによってOSAKビューとOracle Database表の間に単純な表結合が作成され、問合せでORDEREDヒントを使用することにより、OSAKビューが強制的に結合の外部表になる場合。このケースでは、Kafkaデータが確実に1回のみ取得されるようになります。次に例を示します。

    SELECT /*+ ORDERED *\/ COUNT(*) 
    FROM thermostat_spec s, ora$dkv_thermostat_0 t  
    WHERE s.device_type = t.device_type 
    AND  t.current_setting >
     t.temperature_setting + 
    s.device_max_variation;

102.9 SET_TRACING

DBMS_KAFKAのプロシージャSET_TRACINGは、Oracle SQL access to Kafka (OSAK)アプリケーションに関連付けられた外部表ドライバ・コードのデバッグ・レベル・トレースを有効または無効にします。このファンクションは、セッションのトレース・ファイルへのロギング出力を生成します。

構文

PROCEDURE SET_TRACING(
                        cluster_name        IN VARCHAR2,
                        application_name IN VARCHAR2,
                        enable                   IN BOOLEAN);

パラメータ

表102-18 DBMS_KAFKAのDROP_SEEKABLE_APPプロシージャのパラメータ

パラメータ 説明

cluster_name

既存のKafkaクラスタの名前

大/小文字は区別されません。

application_name

Kafkaクラスタを使用する既存のアプリケーションの名前。

大/小文字は区別されません。

enable

[true |false]

デフォルト: false

デバッグ出力を有効にするにはtrueを設定し、デバッグ出力を無効にするにはfalseを設定します

使用上のノート

セッションのトレース・ファイルへのデバッグ・ロギング出力を生成するために使用します。デバッグ出力を有効にするにはtrueに設定し、デバッグ出力を無効にするにはfalseに設定します。

ノート:

次のイベントがデータベースについてすでに有効になっている必要があります。

event="39431 trace name context forever, level 1" # Enable external table debug tracing 

アプリケーションExampleAppで使用されていたKafkaクラスタExampleClusterでトレースを開始するとします。その場合、このプロシージャを使用できます。

EXEC SYS.DBMS_KAFKA.SET_TRACING('KAFKACLUS1', 'KAFKACLUS1', True);

102.10 CREATE_xxx_APPに渡されるDBMS_KAFKAのオプション

DBMS_KAFKA.CREATE_LOAD_APPCREATE_STREAMING_APPCREATE_SEEKABLE_APPパッケージのオプション・パラメータは、JSONドキュメントのパッケージに提供されます
DBMS_KAFKA のオプションを各DBMS_KAFKAパッケージで使用して、Oracle SQL access to Kafka (OSAK)アプリケーションを作成および定義できます。

ノート:

バッファ内の最大Kafkaメタデータ・チャンク・サイズとして、54バイトが予約されています。たとえば、1Mのbufsizeは1Mから54バイトのレコードを保持できます。bufsizeのデフォルトは1000 KBで、最大10,000 KBです。そのため、サポートされる最大のKafkaレコード・サイズは10000 kb – 54です。

表102-19 DBMS_KAFKAのオプション

オプション名 説明

avrodecimaltype

バイト配列に格納されている10進数の表現を指定します。

有効な値: intintegerstrstring

デフォルト: このパラメータを使用しない場合、Avroの10進数列は、バイト配列に格納されているのはAvro仕様で定義されているとおりの値の数値表現(デフォルトはint)であるという想定で読み取られます。

fmtオプションがAVROとして指定されている場合にのみ許可されます。

関連するアクセス・パラメータ: com.oracle.bigdata.avro.decimaltpe

avroschema

Kafka値ペイロードのスキーマを表すJSONドキュメント。スキーマではAvroレコード・タイプを定義する必要があります。

デフォルト値はありません。fmtオプションがAVROとして指定されている場合は、値を指定する必要があります

blankasnull

trueに設定すると、空白で構成されるフィールドがNULLとしてロードされます。

値: [true | false]

デフォルト: false

fmtオプションがDSVとして指定されている場合にのみ許可されます

関連するアクセス・パラメータ: com.oracle.bigdata.blankasnull

bufsize

大きいレコード読取りのバッファ・サイズをKB単位で設定します。デフォルトのバッファ・サイズよりも大きいレコードを読み取る必要がある場合にこの値を設定します。

デフォルト(KB): 1000

関連するアクセス・パラメータ: com.oracle.bigdata.buffersize

conversionerrs

行でデータ型の変換エラーが発生した場合、関連する列をnullとして格納するか、その行を拒否します。

値: [reject_record | store_null]

デフォルト: store_null

fmtオプションがDSVとして指定されている場合にのみ許可されます

関連するアクセス・パラメータ: com.oracle.bigdata.conversionerrors

datefmt

ソース・ファイル内の日付書式を指定します。値autoでは、次の書式がチェックされます。

J、MM-DD-YYYYBC、MM-DD-YYYY、YYYYMMDD HHMISS、YYMMDD HHMISS、YYYY.DDD、YYYY-MM-DD

デフォルト: yyyy-mm-dd hh24:mi:ss

次の場合にのみ許可されます

fmtオプションがDSVとして指定されている

関連するアクセス・パラメータ: com.oracle.bigdata.dateformat

escapedby

埋込みフィールドの終了記号またはフィールド値の行の終了文字をエスケープするために使用する文字を指定します。文字値は一重引用符で囲む必要があります。例: '\'

パラメータを指定しない場合、値はありません。

fmtオプションがDSVとして指定されている場合にのみ許可されます

関連するアクセス・パラメータ: com.oracle.bigdata.csv.rowformat.fields.escapedby

fmt

Kafkaレコードの値ペイロードの書式。

値: [DSV | JSON | AVRO]

デフォルト値はありません。渡されるオプションで書式を指定する必要があります。

関連するアクセス・パラメータ: com.oracle.bigdata.kafka.format

jsondt

JSONをvarchar2 (最大サイズ)またはclobに格納して、大規模なJSONレコードをサポートします。

値: [varchar2 | clob]

デフォルト: varchar2

fmtオプションがJSONとして指定されている場合にのみ許可されます。

nulldefinedas

フィールドの値を示すために使用する文字がNULLであることを指定します。パラメータを指定しない場合、値はありません。

fmtオプションがDSVとして指定されている場合にのみ許可されます

関連するアクセス・パラメータ: com.oracle.bigdata.csv.rowformat.nulldefinedas

quote

フィールドの引用符文字を指定します。指定すると、引用符文字として定義されている文字がロード中に削除されます。

有効な値: 文字

デフォルト: Null (引用符文字なしを意味します)

fmtオプションがDSVとして指定されている場合にのみ許可されます

関連するアクセス・パラメータ: com.oracle.bigdata.quote

rejectlmt

指定した行数(number、整数)が拒否されると、操作はエラーになります。この操作は、変換エラーでレコードが拒否される場合にのみ適用されます。変換エラーが存在する場合でもすべてのレコードを処理できるようにするには、値'unlimited'を渡します。

有効な値: [number | 'unlimited']

デフォルト: 0 (変換エラーが許可されないことを意味します)。

removequotes

ソース・ファイル内のフィールドを囲む引用符を削除します。

値: [true | false]

デフォルト: false

fmtオプションがDSVとして指定されている場合にのみ許可されます

関連するアクセス・パラメータ: com.oracle.bigdata.removequotes

separator

フィールド値を区切るために使用する文字を指定します。文字値は一重引用符で囲む必要があります。例: '|'

デフォルト: ','

関連するアクセス・パラメータ: com.oracle.bigdata.csv.rowformat.fields.terminator

terminator

レコード値を区切るために使用する文字を指定します。文字値は一重引用符で囲む必要があります。例: '|'

デフォルト: '\n'

関連するアクセス・パラメータ: com.oracle.bigdata.csv.rowformat.lines.terminator

trimspaces

フィールドの先頭と末尾の空白を切り捨てる方法を指定します。

有効な値: rtrimltrimnotrimltrimldrtrim

デフォルト: notrim

fmtオプションがDSVとして指定されている場合にのみ許可されます

関連するアクセス・パラメータ: com.oracle.bigdata.trimspaces

truncatecol

ファイルのデータがフィールドに対して長すぎる場合、このオプションでは、行を拒否したりフィールドをNULLに設定したりするかわりに、フィールドの値を切り捨てます。

値: [true | false]

デフォルト: false

fmtオプションがDSVとして指定されている場合にのみ許可されます

関連するアクセス・パラメータ: com.oracle.bigdata.truncatecol

tsfmt

ソース・ファイル内のタイムスタンプ書式を指定します。値autoでは、次の書式がチェックされます。

YYYY-MM-DD HH:MI:SS.FF、YYYY-MM-DD HH:MI:SS.FF3、MM/DD/YYYY HH:MI:SS.FF3

デフォルト: yyyy-mm-dd hh24:mi:ss.ff

fmtオプションがDSVとして指定されている場合にのみ許可されます

関連するアクセス・パラメータ: com.oracle.bigdata.timestampformat

tslzfmt

ソース・ファイル内のローカル・タイムゾーン付きタイムスタンプ書式を指定します。値autoでは、次の書式がチェックされます。

DD Mon YYYY HH:MI:SS.FF TZR、MM/DD/YYYY HH:MI:SS.FF TZR、YYYY-MM-DD HH:MI:SS+/-TZR、YYYY-MM-DD HH:MI:SS.FF3、DD.MM.YYYY HH:MI:SS TZR

デフォルト: yyyy-mm-dd hh24:mi:ss.ff

fmtオプションがDSVとして指定されている場合にのみ許可されます

関連するアクセス・パラメータ: com.oracle.bigdata.timestampltzformat

tstzfmt

ソース・ファイル内のタイムゾーン付きタイムスタンプ書式を指定します。値autoでは、次の書式がチェックされます。

DD Mon YYYY HH:MI:SS.FF TZR、MM/DD/YYYY HH:MI:SS.FF TZR、YYYY-MM-DD HH:MI:SS+/-TZR、YYYY-MM-DD HH:MI:SS.FF3、DD.MM.YYYY HH:MI:SS TZR

デフォルト: yyyy-mm-dd hh24:mi:ss.ff

fmtオプションがDSVとして指定されている場合にのみ許可されます

関連するアクセス・パラメータ: com.oracle.bigdata.timestamptzformat

例102-3 AVROスキーマ・レコード・タイプ

AVROデータ・スキーマの例を次に示します。この場合、定義されているAVROデータのタイプはレコードであり、データは、一定期間にわたって監視された温度モニターのセンサー値のレコードです。

{
  "type" : "record",
  "name" : "sensor_value",
  "namespace" : "example.sensor",
  "fields" : [ {
    "name" : "EventTime",
    "type" : "long",
    "logicalType" : "timestamp-millis"
  }, {
    "name" : "IotDeviceType",
    "type" : "int"
  }, {
    "name" : "IotDeviceUnitId",
    "type" : "int"
  }, {
    "name" : "TempSetting",
    "type" : "double"
  }, {
    "name" : "TempReading",
    "type" : "double"
  } ]
}