102 DBMS_KAFKA
DBMS_KAFKA
パッケージは、Kafkaクラスタ内のトピックへのOracle SQLアクセスを有効にするためのPL/SQLインタフェースを提供します。
Oracle SQL access to Kafka (OSAK)クラスタに対するREAD
アクセス権を付与されたユーザーは、DBMS_KAFKA
パッケージを使用して、Oracle Databaseのビューおよび表からKafkaデータを問い合せるアプリケーションを作成できます。
102.1 DBMS_KAFKAの概要
DBMS_KAFKA
パッケージでは、Oracle SQL access to Kafka (OSAK)を使用してKafkaデータにアクセスして処理できます。
アプリケーションでKafkaデータを使用できるようにするには、DBMS_KAFKA
パッケージを使用します。Kafkaリソースを使用する前に、Kafkaの管理者がKafkaクラスタを登録してアクセスを有効にしておく必要があります。
Kafkaトピックにアクセスするには、Oracle SQL Access to Kafka (OSAK)アプリケーションを作成します。まず、使用するモードを決定する必要があります。
- ロード・モード: KafkaトピックのデータをOracle Database表にロードするために使用します。
- ストリーミング・モード: Kafkaトピックを順番に読み取るために使用します。
- シーク可能モード: 開始タイムスタンプと終了タイムスタンプの間のKafkaトピックにランダムにアクセスするために使用します。
次に、適切なOSAKパッケージを使用してアプリケーションを作成します。
CREATE_LOAD_APP
: ロード・モードで使用できるアプリケーションを作成します。CREATE_STREAMING_APP
: ストリーミング・モードで使用できるアプリケーションを作成します。CREATE_SEEKABLE_APP
: シーク可能モードで使用できるアプリケーションを作成します。
他のDBMS_KAFKAパッケージでは、Kafkaデータを管理できます。
各パッケージの手順の概要を次に示します
ロード・アプリケーションへのデータのロード
DBMS_KAFKA.CREATE_LOAD_APP
を使用して、Oracle SQL Access to Kafkaロード・アプリケーションを作成します- オプションで、
DBMS_KAFKA_INIT_OFFSET_TS
またはDBMS_KAFKA_INIT_OFFSET
を使用して、読み取る最初のKafkaレコードを設定します。 - 完了するまで
LOOP
を実行します。DBMS_KAFKA.EXECUTE_LOAD_APP
を使用して、前の読取りが中断した場所から現在の最高水位標までのKafkaデータをロードします。
- Kafkaデータの操作が終了したら、
DBMS_KAFKA.DROP_LOAD_APP
を使用してロード・アプリケーションを削除します。
ストリーミング・モード・アプリケーションへのデータのロード
ストリーミング・モードでKafkaデータを問い合せて、Kafkaトピックを順番に読み取る手順は、次のとおりです。
DBMS_KAFKA.CREATE_STREAMING_APP
を使用して、Oracle SQL Access to Kafkaストリーミング・アプリケーションを作成します。- オプションで、
DBMS_KAFKA_INIT_OFFSET_TS
またはDBMS_KAFKA_INIT_OFFSET
を使用して、読み取る最初のKafkaレコードを設定します。 - SQLでデータに対して
LOOP
を実行します。DBMS_KAFKA.LOAD_TEMP_TABLE
をコールして、Kafkaの次の行セットを含むグローバル一時表をロードします- OSAKグローバル一時表からの
SELECT
を使用します。 - 取得されたデータを処理します
- 処理が成功した場合、
DBMS_KAFKA.UPDATE_OFFSET
を使用してKafkaレコードの次のセットに進みます。 COMMIT
を使用して、オフセット・トラッキング情報をコミットします。
- アプリケーションでの作業が終了したら、
DBMS_KAFKA.DROP_STREAMING_APP
を使用してアプリケーションを削除します。
シーク可能モード・アプリケーションへのデータのロード
2つのタイムスタンプ間のKafkaレコードにアクセスできるように、シーク可能モードでKafkaデータを問い合せる手順の概要は、次のとおりです。
DBMS_KAFKA.CREATE_SEEKABLE_APP
を使用して、Oracle SQL Access to Kafkaシーク可能アプリケーションを作成します- SQLでKafkaデータに対して
LOOP
を実行します。DBMS_KAFKA.SEEK_OFFSET_TS
を使用して、Kafkaトピック内の、定義した時間ウィンドウをシークします。DBMS_KAFKA.LOAD_TEMP_TABLE
をコールして、分析するKafkaの行セットを含むグローバル一時表をロードします。- OSAKグローバル一時表からの
SELECT
を使用します。 - データを処理します。
- アプリケーションでの作業が終わったら、
DBMS_KAFKA.DROP_SEEKABLE_APP
を使用してアプリケーションを削除します。
102.2 DBMS_KAFKAロード・モード
KafkaデータをOracle Databaseに増分的にロードするには、DBMS_KAFKA
LOADING
モード・パッケージを使用します。
ロード・プロシージャを使用すると、使用可能なKafkaレコードをOracle Databaseにロードでき、これをその後、そのデータのデータ・ウェアハウスとして使用できます。その後、分析のためにそのデータをOracle Database表と結合できます。
アプリケーションは、PL/SQLプロシージャDBMS_KAFKA.CREATE_LOAD_APP
をコールすることにより、それがロード・アプリケーションであることを宣言して、DBMS_KAFKA.EXECUTE_LOAD_APP
への後続のコールについて状態を初期化します。DBMS_KAFKA.CREATE_LOAD_APP
は、トピックのすべてのパーティションを網羅する単一のビューを作成します。アプリケーションは、オプションでDBMS_KAFKA.INIT_OFFSET[_TS]
プロシージャをコールして、Kafkaトピック・パーティションの開始ポイントを設定することもできます。
DBMS_KAFKA.EXECUTE_LOAD_APP
プロシージャがアプリケーション・ループでコールされると、前のコールが中断した場所からKafkaトピックの現在の最高水位標までのデータがロードされます。このプロシージャは自律型トランザクションで実行されます。
Kafkaデータの操作が終了したら、DBMS_KAFKA.DROP_LOAD_APP
を使用してアプリケーションを削除できます。
102.2.1 CREATE_LOAD_APP
LOAD
アプリケーションについてDBMS_KAFKA.EXECUTE_LOAD_APP
のコールを許可されるという点で限定的です。
パラメータ
パラメータ | 説明 |
---|---|
|
このアプリケーションに関連付けるトピックが含まれている登録済Oracle SQL access to Kafkaクラスタの名前。 大/小文字は区別されません。 登録済クラスタの名前は、OSAK管理者から、次の文を使用して取得できます。
|
|
アプリケーション名。このパラメータは、トピックを読み取ることができるKafkaグループとしても使用されます。 大/小文字は区別されません。 |
|
コンテンツを取得する、Kafkaクラスタのトピック名。 大/小文字が区別されます。 |
|
JSONドキュメント形式のプロパティのリストを含みます。オプションの詳細は、「CREATE_XXX_APPに渡されるDBMS_KAFKAのオプション」のトピックを参照してください。 |
例
例102-1 Oracle SQL Access to KafkaのCREATE_LOAD_APPプロシージャ
次の例では、Kafkaトピックmy-company-app-event1
およびオプションProducerRecord
を使用して、KafkaクラスタExampleCluster
のデータについてExampleApp
というロード・アプリケーションが作成されます。
PROCEDURE CREATE_LOAD_APP (
ExampleCluster IN VARCHAR2,
ExampleApp IN VARCHAR2,
my-company-app-event1 IN VARCHAR2,
ProducerRecord IN CLOB
102.2.2 DROP_LOAD_APP
DBMS_KAFKA_ADM
のプロシージャDROP_LOAD_APP
は、Oracle SQL for Kafka (OSAK) LOADアプリケーションを削除するとともに、関連するメタデータを削除します。
構文
EXEC DROP_LOAD_APP (
cluster_name IN VARCHAR2,
application_name IN VARCHAR2
);
パラメータ
表102-1 DBMS_KAFKA_ADMのDROP_LOAD_APPプロシージャのパラメータ。
パラメータ | 説明 |
---|---|
|
既存のKafkaクラスタの名前 大/小文字は区別されません。 |
application_name |
Kafkaクラスタに関連付けられている既存のアプリケーションの名前。 大/小文字は区別されません。 |
使用上のノート
KafkaトピックのデータをOracle Database表にロードする必要がなくなった場合、このプロシージャを使用してOracle SQL access to Kafka (OSAK)アプリケーションを削除します。
例
KafkaクラスタExampleCluster
のデータを使用するExampleApp
というアプリケーションでの作業が完了したとします。このプロシージャを使用してアプリケーションを削除します。
EXEC DROP_LOAD_APP (
ExampleCluster IN VARCHAR2,
ExampleApp IN VARCHAR2
);
102.2.3 EXECUTE_LOAD_APP
DBMS_KAFKA
のプロシージャEXECUTE_LOAD_APP
は、専用のOracle SQL access to Kafka (OSAK)ビューからユーザー表をロードします。このプロシージャを使用するには、CREATE_LOAD_APP
を使用してロード・アプリケーションを事前に作成しておく必要があります。
構文
PROCEDURE EXECUTE_LOAD_APP (
cluster_name IN VARCHAR2,
application_name IN VARCHAR2,
target_table IN VARCHAR2,
records_loaded OUT INTEGER,
parallel_hint IN INTEGER DEFAULT 0
);
パラメータ
表102-2 DBMS_KAFKAのEXECUTE_LOAD_APPSプロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
このアプリケーションに関連付けたトピックが含まれている登録済Oracle SQL access to Kafkaクラスタの名前。 大/小文字は区別されません。 登録済クラスタの名前は、OSAK管理者から取得するか、次のSQL文を使用して取得できます。 |
|
Kafkaクラスタに関連付けられている既存のアプリケーションの名前 大/小文字は区別されません。 |
|
KafkaトピックのデータとともにロードされるOracle Databaseのターゲット表。 |
|
(OUT)ロードされたKafkaレコードの数 |
|
(IN) (オプション)特定のOSAKビューに排他的にマップされるアプリケーションのロード時に使用する並列度。パラレル・ヒントが指定されていないか、1以下の場合、表をロードするために並列処理は使用されません。 ノート: パラレル・ヒントは、ユーザー・セッションまたはシステムの |
使用上のノート
Oracle SQL access to Kafka (OSAK)ビューは、CREATE_LOAD_APP
プロシージャによって事前に作成されたKafkaクラスタ内のトピックのレコードをロードするためにPL/SQLコールによって透過的に使用されます。アプリケーションは、EXECUTE_LOAD_APP
をコールして、専用のOSAKビューからOracle Databaseユーザー表をロードします。
ノート:
OSAKアプリケーションを処理する単一のデータベース・アプリケーション・インスタンスを使用してOSAK一時表を問い合せることができるのは、1人のユーザーのみです。ただし、EXECUTE_LOAD_APP
を使用すると、複数のアプリケーションからアクセスできる永続データベース表にKafkaデータがロードされます。
EXECUTE_LOAD_APP
プロシージャをコールするたびに、Kafkaトピックから新しいレコードが読み取られ、これらのレコードがOracle Database表に挿入されます。また、EXECUTE_LOAD_APP
はすべてのKafkaパーティションのオフセットを進めるため、次にEXECUTE_LOAD_APP
が実行されたときには、新しい行が挿入されます。EXECUTE_LOAD_APPを使用すると、増分ロードを実行できるため、Kafkaトピックに対する更新でOracle Database表を更新できます。そのKafkaデータは標準のOracle Database表に移動されるため、複数のアプリケーションで処理および分析に使用できるようになります。
例
KafkaクラスタExampleCluster
の初期クラスタ定義を完了し、クラスタを登録したとします。次に、このプロシージャを使用して、KafkaクラスタExampleCluster
からアプリケーションExampleLoadApp
にデータをロードします。
DECLARE
v_records_inserted INTEGER;
BEGIN
SYS.DBMS_KAFKA.EXECUTE_LOAD_APP (
‘ExampleCluster’,
‘ExampleLoadApp’,
‘ExampleLoadTable’,
v_records_inserted);
END;
102.3 DBMS_KAFKAのグローバル一時表
Oracle DatabaseのKafkaデータ・ビューの作成元のKafkaデータを一時表にロードするには、DBMS_KAFKA
LOAD_TEMP_TABLE
モード・パッケージを使用します。
DBMS_KAFKA.LOAD_TEMP_TABLE
プロシージャがアプリケーション・ループでコールされると、Oracle SQL access to Kafka (OSAK)アプリケーションにデータがロードされます。STREAMING
アプリケーションとSEEKABLE
アプリケーションの両方について、CREATE_APP_xxx
(xxxはSTREAMING
またはSEEKABLE
)を使用してアプリケーションを作成します。次に、アプリケーションについてLOAD_TEMP_TABLE
をコールするときにアプリケーション・ループを使用し、一時表にロードされたデータを処理します。
102.3.1 LOAD_TEMP_TABLE
DBMS_KAFKA
のプロシージャLOAD_TEMP_TABLE
は、一時表に対するすべてのデータをOracle SQL access to Kafkaビューから選択します。このプロシージャを使用して、データを分析したり、Oracle Database表と結合するためにアプリケーションがSQL問合せで使用できるOracle SQL access to Kafka (OSAK)専用一時表を作成します。
構文
FUNCTION LOAD_TEMP_TABLE(
temporary-table-name IN VARCHAR2
) RETURN INTEGER;
パラメータ
表102-3 DBMS_KAFKAのLOAD_TEMP_TABLEプロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
作成する一時表の名前 タイプ: VARCHAR 大/小文字は区別されません。 |
|
(オプション)特定のOSAKビューに排他的にマップされるグローバル一時表のロード時の並列度。パラレル・ヒントが指定されていないか、1以下の場合、表をロードするために並列処理は使用されません。 ノート: パラレル・ヒントは、ユーザー・セッションまたはシステムの |
使用上のノート
Oracle SQL access to Kafka (OSAK)ビューは、専用のOSAKグローバル一時表またはユーザー定義表にKafkaトピックのレコードをロードするためにPL/SQLコールによって透過的に使用されます。STREAMING
およびSEEKABLE
アプリケーションは、専用のOSAKビューからOSAKグローバル一時表をロードするLOAD_TEMP_TABLE
をコールし、LOAD
アプリケーションは、EXECUTE_LOAD_APP
をコールして専用のOSAKビューからユーザー表をロードします。
注意:
OSAKビューは専用ビューであり、1つのアプリケーション・インスタンスでのみ使用できます。これらはアプリケーションの他のインスタンスでは共有されず、汎用ツール(SQL*PlusやSQL Developerなど)では問い合せないでください。OSAKビューの同時アクセスによって競合状態が発生する可能性があり、ツールが専用アプリケーションよりも多くの行を誤って読み取り、専用アプリケーションが読み取ったものを超えてKafkaオフセットを誤って進めることがあります。その結果、アプリケーションは、処理する必要があったKafkaトピック・レコードの取得に失敗する場合があります。例
KafkaクラスタExampleCluster
から、ExampleApp
というストリーミング・アプリケーションに関連付けられたレコードをロードして、処理するとします。デフォルトでは、OSAKビューは、最初に作成されたときには存在する最も古いレコードから現在パブリッシュされている最後のレコードまでを読み取るように構成されます。DBMS_KAFKA.CREATE_STREAMING_APP
を使用してストリーミング・アプリケーションを作成した後の、そのストリーミング・アプリケーションのKafkaデータ処理ループの例を次に示します。
BEGIN
LOOP
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE
(ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0);
FOR kafka_record IN (
SELECT kafka_offset offset
FROM ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0)
LOOP
SYS.DBMS_OUTPUT.PUT_LINE (‘Processing record: ‘ || kafka_record.offset);
--application logic to process the Kafka records
END LOOP;
IF (application logic was successful) THEN
--Update internal metadata to confirm Kafka records were successfully processed
SYS.DBMS_KAFKA.UPDATE_OFFSET
(‘ORA$DKV_EXAMPLECLUSTER_EXAMPLEAPP_0’);
COMMIT;
ELSE
--add your application logic to correct for any failures
END IF;
END LOOP;
END;
102.4 DBMS_KAFKAストリーミング・モード
DBMS_KAFKA
STREAMING
モード・パッケージを使用します。 このパッケージを使用するには、まずLOAD_TEMP_TABLE
を使用して専用のOSAKビューを作成する必要があります。
最初から、またはKafkaトピックの特定の開始ポイントから順番にKafkaトピックにアクセスする必要があるアプリケーションには、STREAMING
モードを使用します。このモードでは、OSAK一時表を使用するSQL問合せが、アプリケーション処理ループでKafkaレコードに順番にアクセスできます。アプリケーションは、PL/SQLプロシージャDBMS_KAFKA.CREATE_STREAMING_APP
をコールすることにより、それがストリーミング・アプリケーションであることを宣言して、OSAKビューの後続の問合せについて状態を初期化します。
102.4.1 CREATE_STREAMING_APP
DBMS_KAFKA
のプロシージャCREATE_STREAMING_APP
は、Oracle SQL access to Kafka (OSAK)ストリーミング・アプリケーションを作成します。 このアプリケーションには、Kafkaトピックのパーティションから新規および読み取られていないレコードを取得するために使用される、専用のOSAKグローバル一時表およびOSAKビューのセットが含まれています。また、KafkaクラスタでKafkaトピックに関するトピックおよびパーティションのライブ情報を調べるために使用されるメタデータ・ビューも作成します(まだ存在しない場合)。このビューは1回作成され、同じクラスタを共有するすべてのアプリケーションで使用されます。
構文
PROCEDURE CREATE_STREAMING_APP (
cluster_name IN VARCHAR2,
application_name IN VARCHAR2,
topic_name IN VARCHAR2,
options IN CLOB,
view_count IN INTEGER DEFAULT 1
);
パラメータ
表102-4 DBMS_KAFKAのCREATE_STREAMING_APPプロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
このアプリケーションに関連付けたトピックが含まれている登録済Oracle SQL access to Kafkaクラスタの名前。 大/小文字は区別されません。 登録済クラスタの名前は、OSAK管理者から取得するか、次のSQL文を使用して取得できます。 |
|
アプリケーションの名前。トピックを読み取るKafkaグループとしても使用されます。 大/小文字は区別されません。 |
|
コンテンツを取得する、Kafkaクラスタのトピック名。 |
|
JSONドキュメント形式のプロパティのリストを含みます。オプションの詳細は、「CREATE_XXX_APPに渡されるDBMS_KAFKAのオプション」のトピックを参照してください。 |
|
(オプション)作成するOracle SQL access to Kafka (OSAK)ビュー・ペアの数を指定します。有効な値は、1からN (Nはトピック内のKafkaパーティションの数)、または0 (デフォルトでNに設定される)です。(デフォルトは1です)。 |
使用上のノート
各OSAKビューは、Oracle SQL access to Kafkaアプリケーションの1つのインスタンスによって排他的に使用されます。各アプリケーション・インスタンス・コールによって、ビューにKafka行が移入されます。その後、アプリケーションはOSAKビュー内のコンテンツに対して1つ以上のSQL問合せを実行できます。STREAMINGアプリケーションは、作成する必要があるOSAKビューの数を選択できるという点で、LOADまたはSEEKINGアプリケーションとは異なります。OSAKビューの数は、1からN (NはKafkaトピック内のパーティションの数)の間である必要があります。
他のタイプのOSAKアプリケーションと同様に、各アプリケーション・インスタンスは、1つの一意のOSAK一時表を排他的に問い合せます。各OSAKビューには、クラスタ名、アプリケーション名およびアプリケーション・インスタンス識別子(ID)が含まれます。複数のアプリケーション・インスタンスを作成すると、アプリケーションはスケール・アウトして、1つ以上のスレッド、プロセスまたはシステムで同時に実行されているアプリケーション・インスタンス間で、Kafkaデータを分析するワークロードを分割できます。
特定のOSAKビューおよびそれに関連付けられたOSAKグローバル一時表にバインドされるKafkaパーティションの数は、作成されるビューの数および存在するパーティションの数によって異なります。N個のOSAKビュー/一時表のペアを作成する場合は、N個のアプリケーション・インスタンスを同時に実行できるように、アプリケーション・ユーザーに、ユーザー当たり少なくともN個のセッションが割り当てられている必要があります。
例
4つのパーティションがあるKafkaクラスタExampleCluster
のExampleTopic
というKafkaトピックからストリームされるExampleApp
というストリーミング・アプリケーションについて一連の4つのビューを作成し、各ビューが1つのパーティションに関連付けられているとします。次の文を入力できます。
DECLARE
v_options VARCHAR2;
BEGIN
v_options := '{"fmt" : "DSV", "reftable" : "user_reftable_name"}';
SYS.DBMS_KAFKA.CREATE_STREAMING_APP (
'ExampleCluster',
'ExampleApp',
'ExampleTopic',
v_options,
4);
END;
/
または、トピックの4つのパーティションすべてに関連付けられているアプリケーションについて1つのビューを作成するには、次の文を入力します。
DECLARE
v_options VARCHAR2;
BEGIN
v_options := '{"fmt" : "DSV", "reftable" : "user_reftable_name"}';
SYS.DBMS_KAFKA.CREATE_STREAMING_APP (
'ExampleCluster',
'ExampleApp',
'ExampleTopic',
v_options,
1);
END;
/
102.4.2 DROP_STREAMING_APP
DBMS_KAFKA
のプロシージャDROP_STREAMING_APP
は、ストリーミング・アプリケーションを削除します。このファンクションは、Oracle SQL access to Kafka (OSAK)ビューを削除するとともに、関連するすべてのデータベース・オブジェクトを削除します。
構文
PROCEDURE DROP_STREAMING_APP (
cluster_name IN VARCHAR2,
application_name IN VARCHAR2
);
パラメータ
表102-5 DBMS_KAFKAのDROP_STREAMING_APPプロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
既存のKafkaクラスタの名前 大/小文字は区別されません。 |
|
Kafkaクラスタを使用する既存のアプリケーションの名前。 大/小文字は区別されません。 |
使用上のノート
アプリケーションでの作業が終わったら、このファンクションを使用して、アプリケーションに関連付けられているOSAKビューを削除します。
例
ストリーミング・アプリケーションExampleApp
で使用されていたKafkaクラスタExampleCluster
での作業が完了したとします。その場合、このプロシージャを使用してアプリケーションを削除できます。
EXEC SYS.DBMS_KAFKA.DROP_STREAMING_APP (
‘ExampleCluster’, ‘ExampleApp’);
102.4.3 INIT_OFFSET
DBMS_KAFKA_ADM
のプロシージャINIT_OFFSET
を使用すると、Kafkaデータの読取りの開始ポイントとして特定のオフセットを選択できます。 このオプションは、使用可能な最初のレコードからデータをロードするのではなく、ロードするデータの特定の開始ポイントを選択する場合に使用します。
構文
PROCEDURE INIT_OFFSET (
view_name IN VARCHAR2,
record_count IN INTEGER,
water_mark IN VARCHAR2 ;
パラメータ
表102-6 DBMS_KAFKAのINIT_OFFSETプロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
既存のOSAKビューの名前(VARCHAR2) |
|
レコード数(INTEGER) |
|
水位標(VARCHAR) 目的の相対位置を示す最高または最低水位標。値は、 デフォルト: |
使用上のノート
INIT_OFSET
プロシージャを使用すると、STREAMING
またはLOAD
アプリケーションは、有用でなくなった古いレコードのバックログを最初に読み取ることを強制されることなく、アプリケーションが作成された後の現在のレコードの読取りを開始できます。
このファンクションを使用して、Kafkaトピックのデータ・レコードの最初から読取りポイントを開始するのではなく、アプリケーションでデータを読み取る各パーティションの最高または最低水位標からのレコードの差分(デルタ)数に基づいて開始ポイントを指定します。
例
使用可能な最後の100個のレコードを使用してアプリケーションで処理を再開し、そのポイントから続行するとします。この結果を得るには、アプリケーションを再開する前に、またはデータ取得ループの前にアプリケーション・ロジックの一部として次のプロシージャを実行します。
SYS.DBMS_KAFKA.INIT_OFFSET (
‘ORA$DKV_EXAMPLECLUSTER_EXAMPLEAPP_0’,
100, SYS.DBMS_KAFKA.WATER_MARK_HIGH);
102.4.4 INIT_OFFSET_TS (Epoch以降のミリ秒)
DBMS_KAFKA
のプロシージャINIT_OFFSET_TS
は、タイムスタンプを使用して開始オフセットを指定します。
Epoch以降のミリ秒を使用すると、OSAKビューに属するそれぞれのKafkaパーティションについてタイムスタンプに関連する開始オフセットが初期化されます。INIT_OFFSET_TSは通常、ビューを処理する専用の新規アプリケーション・インスタンスの開始時、あるいはアプリケーション・インスタンスの停止または障害後のリカバリ時にコールされます。
.構文
PROCEDURE INIT_OFFSET_TS (
view_name IN VARCHAR2,
start_timestamp_ms IN INTEGER);
パラメータ
表102-7 DBMS_KAFKAのINIT_OFFSET_TS (Epoch以降のミリ秒)プロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
既存のKafkaクラスタの名前( 大/小文字は区別されません。 |
|
アプリケーションを開始するオフセットのタイムスタンプ(INTEGER)。戻される最初のレコードのタイムスタンプは、指定したタイムスタンプと同じか、指定したタイムスタンプより後の最も近いタイムスタンプになります。 |
使用上のノート
Kafka Epoch以降のミリ秒を使用するINIT_OFFSET_TS
は、OSAKビューに属するそれぞれのKafkaパーティションについてタイムスタンプに関連する開始オフセットを初期化します。一般的なユースケースでは、ビューを処理する専用の新規アプリケーション・インスタンスの開始時に、あるいはアプリケーション・インスタンスの停止または障害後にリカバリするために、Epoch開始ポイントを指定してINIT_OFFSET_TS
をコールします。
このプロシージャは、Kafkaトピック・レコードの処理を相対的に最新のポイントに配置するために使用され、Kafkaパーティション内の未処理の古いレコードをスキップする可能性があります。
ノート:
オフセットの初期化から最初のフェッチまでの時間が遅延する可能性があることに注意してください。この時間のギャップの間に、レコードがKafkaの保存期間を超えるか、レコードが明示的に削除されることにより、選択したオフセットのレコードが削除される可能性があります。例
入力のTIMESTAMP
からEpoch時間以降のミリ秒数を計算して戻すとします。タイムゾーンが指定されていないかぎり、タイムスタンプはセッションのタイムゾーンとみなされます。Epoch時間以降のミリ秒に変換するパラメータdatetime
(整数)のタイムスタンプを指定します。パラメータtimezone (整数)はオプションであり、タイムスタンプのタイムゾーンを指定します。タイムゾーンを指定しない場合、タイムゾーンはデフォルトでセッションのタイムゾーンになります。
SYS.DBMS_KAFKA.SEEK_OFFSET_TS (‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
1593829800,
1593833400);
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE (‘ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0’);
SELECT <columns> FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;
102.4.5 INIT_OFFSET_TS (別個のタイムゾーン・パラメータ付きタイムスタンプ)
DBMS_KAFKA
のプロシージャINIT_OFFSET_TS
は、データを戻す開始位置を指定しますタイムゾーン付きタイムスタンプで指定されたタイムスタンプを使用すると、Oracle SQL access to Kafka (OSAK)ビューに属するそれぞれのKafkaパーティションについて、タイムスタンプに関連するタイムゾーンにKafkaデータの処理が配置されます。INIT_OFFSET_TS
は通常、ビューを処理する専用の新規アプリケーション・インスタンスの開始時、あるいはアプリケーション・インスタンスの停止または障害後のリカバリ時にコールされます。
構文
PROCEDURE INIT_OFFSET_TS (
view_name IN VARCHAR2,
start_timestamp IN TIMESTAMP,
timezone IN VARCHAR2 DEFAULT NULL);
パラメータ
表102-8 DBMS_KAFKAのINIT_OFFSET_TS (タイムゾーン付きタイムスタンプ)プロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
既存のKafkaクラスタの名前( 大/小文字は区別されません。 |
|
アプリケーションを開始するオフセットのタイムスタンプ(INTEGER)。戻される最初のレコードのタイムスタンプは、指定したタイムスタンプと同じか、指定したタイムスタンプより後の最も近いタイムスタンプになります。 |
|
タイムスタンプのタイムゾーン(INTEGER)。値を指定しない場合は、デフォルトでセッションのタイムゾーンが使用されます。 |
使用上のノート
別個のタイムゾーン・パラメータとともにタイムスタンプを使用するINIT_OFFSET_TS
は、OSAKビューに属するそれぞれのKafkaパーティションについてタイムスタンプおよびタイムゾーンに関連する開始オフセットを初期化します。一般的なユースケースでは、ビューを処理する専用の新規アプリケーション・インスタンスの開始時に、あるいはアプリケーション・インスタンスの停止または障害後にリカバリするために、別個のタイムゾーン・パラメータとともにタイムスタンプを指定してINIT_OFFSET_TS
をコールします。
このプロシージャは、Kafkaトピック・レコードの処理を相対的に最新のポイントに配置するために使用され、Kafkaパーティション内の未処理の古いレコードをスキップする可能性があります。
ノート:
オフセットの初期化から最初のフェッチまでの時間が遅延する可能性があることに注意してください。この時間のギャップの間に、レコードがKafkaの保存期間を超えるか、レコードが明示的に削除されることにより、選択したオフセットのレコードが削除される可能性があります。例
使用可能な最初のレコードよりも後の時間を選択して、Kafkaトピックのデータを処理するとします。たとえば、データ・センターがメンテナンスのために週末に停止しており、データ・センターが午後6時(18:00:00)に再開された後に生成された新しいKafkaデータのみを処理する場合は、午後6時より後のタイムスタンプが付いたデータ・レコードを開始します。OSAKビューでこれを実現するには、アプリケーションを再開する前に、またはデータ取得ループの前にアプリケーション・ロジックの一部として次のプロシージャを実行します。
SYS.DBMS_KAFKA.INIT_OFFSET_TS (
‘ORA$DKV_EXAMPLECLUSTER_EXAMPLEAPP_0’,
TO_DATE (‘2023/07/05 18:00:00’, ‘YYYY/MM/DD HH:MI:SS’))
timestamp=1603507387101;
102.4.6 INIT_OFFSET_TS (タイムゾーン付きタイムスタンプ)
DBMS_KAFKA
のプロシージャINIT_OFFSET_TS
は、タイムスタンプを使用して開始オフセットを指定します。タイムゾーン付きで指定されたタイムスタンプを使用すると、Oracle SQL access to Kafka (OSAK)ビューに属するそれぞれのKafkaパーティションについてタイムスタンプに関連する開始オフセットが初期化されます。INIT_OFFSET_TS
は通常、ビューを処理する専用の新規アプリケーション・インスタンスの開始時、あるいはアプリケーション・インスタンスの停止または障害後のリカバリ時にコールされます。
構文
PROCEDURE INIT_OFFSET_TS (
view_name IN VARCHAR2,
start_timestamp IN TIMESTAMP WITH TIME ZONE);
パラメータ
表102-9 DBMS_KAFKAのINIT_OFFSET_TSプロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
既存のKafkaクラスタの名前( 大/小文字は区別されません。 |
|
アプリケーションを開始するオフセットのタイムスタンプ(INTEGER)。戻される最初のレコードのタイムスタンプは、指定したタイムスタンプと同じか、指定したタイムスタンプより後の最も近いタイムスタンプになります。 |
|
(オプション)。タイムスタンプのタイムゾーン(INTEGER)。 指定しない場合は、デフォルトでセッションのタイムゾーンになります |
使用上のノート
INIT_OFFSET_TS
は、OSAKビューに属するそれぞれのKafkaパーティションについてタイムスタンプに関連する開始オフセットを初期化します。一般的なユースケースでは、ビューを処理する専用の新規アプリケーション・インスタンスの開始時に、あるいはアプリケーション・インスタンスの停止または障害後にリカバリするためにINIT_OFFSET_TSをコールします。
このプロシージャは、Kafkaトピック・レコードの処理を相対的に最新のポイントに配置するために使用され、Kafkaパーティション内の未処理の古いレコードをスキップする可能性があります。
ノート:
オフセットの初期化から最初のフェッチまでの時間が遅延する可能性があることに注意してください。この時間のギャップの間に、レコードがKafkaの保存期間を超えるか、レコードが明示的に削除されることにより、選択したオフセットのレコードが削除される可能性があります。例
タイムゾーンが指定されたタイムスタンプ
使用可能な最初のレコードよりも後の時間を選択して、Kafkaトピックのデータを処理するとします。たとえば、データ・センターがメンテナンスのために週末に停止しており、データ・センターが午後6時(18:00:00)に再開された後に生成された新しいKafkaデータのみを処理する場合は、午後6時より後のタイムスタンプが付いたデータ・レコードを開始します。OSAKビューでこれを実現するには、アプリケーションを再開する前に、またはデータ取得ループの前にアプリケーション・ロジックの一部として次のプロシージャを実行します。
SYS.DBMS_KAFKA.SEEK_OFFSET_TS (‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
TO_TIMESTAMP (‘2020/07/04 02:30:00’, ‘YYYY/MM/DD HH:MI:SS’,
TO_TIMESTAMP (‘2020/07/04 03:30:00’, ‘YYYY/MM/DD HH:MI:SS’),
‘UTC’);
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE (‘ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0’);
SELECT <columns> FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;
タイムゾーンが指定されていないタイムスタンプ
SYS.DBMS_KAFKA.SEEK_OFFSET_TS (‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
TO_TIMESTAMP_TZ (‘2020/07/04 02:30:00 -8:00’, ‘YYYY/MM/DD HH:MI:SS TZH:TZM’,
TO_TIMESTAMP_TZ (‘2020/07/04 03:30:00 -8:00’, ‘YYYY/MM/DD HH:MI:SS TZH:TZM’));
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE (‘ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0’);
SELECT <columns> FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;
102.4.7 UPDATE_OFFSET
DBMS_KAFKA
のプロシージャUPDATE_OFFSET
は、読み取られていない新しいKafkaレコードをループ内の次のパスが取得して処理するように、最後に読み取られたKafkaオフセットを更新します。 UPDATE_OFFSET
は、Oracle SQL access to Kafka (OSAK)ビューに属するすべてのパーティションについてKafkaグループIDのKafkaパーティション・オフセットを透過的に進めるため、DBMS_KAFKA.LOAD_TEMP_TABLE
をコールするたびに、読み取られていないKafkaレコードの新しいセットが取得されて処理されます。
構文
PROCEDURE UPDATE_OFFSET (view_name IN VARCHAR2);
パラメータ
表102-10 DBMS_KAFKAのUPDATE_OFFSETプロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
OSAKビューの名前 大/小文字は区別されません。 |
使用上のノート
UPDATE_OFFSET
は、前のレコードの処理が成功した場合、Kafkaオフセット読取りポイントを次のKafkaレコード・セットに進めます。また、UPDATE_OFFSET
は、Oracle Databaseトランザクションを開始し(トランザクションがまだ開始されていない場合)、最後のオフセットをメタデータ表に記録します。このため、アプリケーションは、UPDATE_OFFSET
への各コールの後にトランザクションをコミットする必要があります。OSAKはオフセットをOracle Databaseトランザクション内で管理し、ACID (原子性、一貫性、独立性、永続性)がトランザクションで保持されるため、レコードが失われたり、再読取りされることはありません。トランザクションが正常に完了しなかった場合、オフセットは進められず、アプリケーションによって、前にオフになった場所が再開時に取得されます。
例
一般的なユースケースでは、ストリーミング・モード・アプリケーションなどで前のKafkaレコード・セットが正常に処理された後に、次のセットにオフセット読取りを進めます。次に例を示します。
BEGIN
LOOP
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE
(ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0);
FOR kafka_record IN (
SELECT kafka_offset offset
FROM ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0)
LOOP
SYS.DBMS_OUTPUT.PUT_LINE (‘Processing record: ‘ || kafka_record.offset);
--application logic to process the Kafka records
END LOOP;
IF (application logic was successful) THEN
--Update internal metadata to confirm Kafka records were successfully processed
SYS.DBMS_KAFKA.UPDATE_OFFSET
(‘ORA$DKV_EXAMPLECLUSTER_EXAMPLEAPP_0’);
COMMIT;
ELSE
--application logic to correct what failed
END IF;
END LOOP;
END;
OSAKビューに直接アクセスして更新を取得するプロセスの一部として使用することもできます。たとえば、Lab1
というラボで一定期間にわたって平均温度を監視しているストリーミング・モード・アプリケーションがあるとします。UPDATE_OFFSET
を使用して、最後のチェック以降の平均温度を取得できます。
EXEC DBMS_KAFKA.ENABLE_VIEW_QUERY(
‘ORA$DKV_EXAMPLECLUSTER_STREAMINGAPP_0’);
SELECT AVG(temperature) FROM ORA$DKV_EXAMPLECLUSTER_STREAMINGAPP_0
WHERE sensor_name = ‘LAB1_SENSOR’;
EXEC DBMS_KAFKA.UPDATE_OFFSET(
‘ORA$DKV_EXAMPLECLUSTER_STREAMINGAPP_0’);
COMMIT;
102.5 DBMS_KAFKAシーク可能モード
DBMS_KAFKA
SEEKABLE
モード・パッケージを使用します。
アプリケーションは、PL/SQLプロシージャDBMS_KAFKA.CREATE_SEEKABLE_APP
をコールすることにより、それがロード・アプリケーションであることを宣言して、シーク可能なOracle SQL access to Kafka (OSAK)ビューと一時表を設定および作成します。
SEEK_OFFSET_TS
を指定してシーク可能モード・ビューを使用することで、Kafkaデータを検索して、2つの時点の間にタイムスタンプが存在するデータのセットを見つけます。その後、LOAD_TEMP_TABLE
を使用してそのデータ範囲をOSAK一時表にロードし、そのタイムスタンプ範囲内のKafkaレコードを含むOSAKグローバル一時表に対して1つ以上のアプリケーション問合せを実行できます。問合せが完了したら、DROP_SEEKABLE_APP
を使用してアプリケーションを削除できます。
.
102.5.1 CREATE_SEEKABLE_APP
DBMS_KAFKA
のプロシージャCREATE_SEEKABLE_APP
は、1つのOracle SQL access to Kafka (OSAK)ビューおよび関連するグローバル一時表を作成します CREATE_SEEKABLE_APP
を使用して、特定の時間枠の間のKafkaレコードをシークしてロードできます。このプロシージャの使用は、シーク操作を実行する1つのアプリケーション・インスタンスに制限されています。
構文
PROCEDURE CREATE_SEEKABLE_APP (
cluster_name IN VARCHAR2,
application_name IN VARCHAR2,
topic_name IN VARCHAR2,
options IN CLOB
パラメータ
パラメータ | 説明 |
---|---|
|
このアプリケーションに関連付けるトピックが含まれている登録済Oracle SQL access to Kafkaクラスタの名前。 大/小文字は区別されません。 登録済クラスタの名前は、OSAK管理者から、次の文を使用して取得できます。
|
|
アプリケーション名。このパラメータは、トピックを読み取ることができるKafkaグループとしても使用されます。 大/小文字は区別されません。 |
|
コンテンツを取得する、Kafkaクラスタのトピック名。 大/小文字が区別されます。 |
|
JSONドキュメント形式のプロパティのリストを含みます。オプションの詳細は、「CREATE_XXX_APPに渡されるDBMS_KAFKAのオプション」のトピックを参照してください。 |
例
例102-2 Oracle SQL Access to KafkaのCREATE_SEEKABLE_APPプロシージャ
DECLARE
v_options VARCHAR2;
BEGIN
v_options := ‘{“fmt” : “DSV”, “reftable” : “user_reftable_name”}’;
SYS.DBMS_KAFKA.CREATE_SEEKABLE_APP (
‘ExampleCluster’,
‘ExampleApp’,
‘ExampleTopic’,
END;
/
102.5.2 DROP_SEEKABLE_APP
DBMS_KAFKA
のプロシージャDROP_SEEKABLE_APP
は、シーク・アプリケーションを削除します。 このファンクションは、Oracle SQL access to Kafka (OSAK)ビューおよびそれに関連するメタデータを削除します。
構文
PROCEDURE DROP_SEEKABLE_APP (
cluster_name IN VARCHAR2,
application_name IN VARCHAR2
);
パラメータ
表102-11 DBMS_KAFKAのDROP_SEEKABLE_APPプロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
既存のKafkaクラスタの名前 大/小文字は区別されません。 |
|
Kafkaクラスタを使用する既存のアプリケーションの名前。 大/小文字は区別されません。 |
使用上のノート
アプリケーションでの作業が終わったら、このファンクションを使用して、アプリケーションに関連付けられているOSAKビューを削除します。
例
シーク可能アプリケーションExampleApp
で使用されていたKafkaクラスタExampleCluster
での作業が完了したとします。その場合、このプロシージャを使用してアプリケーションを削除できます。
EXEC SYS.DBMS_KAFKA.DROP_SEEKABLE_APP (
‘ExampleCluster’, ‘ExampleApp’);
102.5.3 SEEK_OFFSET_TS (別個のタイムゾーン・パラメータ付きタイムスタンプ)
DBMS_KAFKA
のプロシージャSEEK_OFFSET_TS
は、指定した2つのKafka Epochタイムスタンプ間でKafkaレコードの読取りを開始するようにOracle SQL access to Kafka (OSAK)ビューを配置します。このプロシージャを使用して、Epoch時間(ミリ秒)で指定される、シークするKafkaトピック内のレコードの時間枠を指定します。
構文
PROCEDURE SEEK_OFFSET_TS(
view_name IN VARCHAR2,
start_timestamp_ms IN INTEGER,
end_timestamp_ms IN INTEGER,
timezone IN INTEGER);
パラメータ
表102-12 DBMS_KAFKAのSEEK_OFFSET_TS (別個のタイムゾーン・パラメータ付きタイムスタンプ)プロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
既存のKafkaクラスタの名前( 大/小文字は区別されません。 |
|
アプリケーションを開始するオフセットのタイムスタンプ(INTEGER)。戻される最後のレコードのタイムスタンプは、指定したタイムスタンプと同じか、指定したタイムスタンプより後の最も近いタイムスタンプになります。 |
|
アプリケーションを終了するオフセットのタイムスタンプ(INTEGER)。戻される最後のレコードのタイムスタンプは、指定したタイムスタンプと同じか、指定したタイムスタンプより前の最も近いタイムスタンプになります。戻されるレコードの範囲に開始タイムスタンプは含まれますが、終了タイムスタンプは含まれません。 たとえば、2:00PMから3:00PMの範囲では、2:00:00.000000 PMから2:59:59.999999PMの範囲のタイムスタンプを持つレコードが戻されます。この範囲を使用すると、アプリケーションで2:00から3:00までを検索した後、3:00から4:00までを検索でき、これらのセット間に重複するレコードが含まれることはありません。 |
|
両方のタイムスタンプ引数のタイムゾーン(INTEGER)。値を指定しない場合は、デフォルトでセッションのタイムゾーンが使用されます。 |
使用上のノート
SEEK_OFFSET_TS
は、Kafka Epoch時間(ミリ秒)で指定された、選択した特定の時間枠内に存在するKafkaトピック・レコードをシークします。戻される最後のレコードは、最後にパブリッシュされたレコードです。
SEEK_OFFSET_TS
の目的は、指定したEpochタイムスタンプによって定義される特定の時間枠内でKafkaレコードの読取りを開始するようにOSAKビューを配置することです。レコードのウィンドウがKafkaトピックの実際のレコードの範囲を超える場合、このプロシージャは、存在するすべてのレコードを戻します。
次に例を示します。
- タイムスタンプがどちらも最低水位標を下回るか、どちらも最高水位標を超える場合、レコードは戻されません。
- 開始タイムスタンプが最低水位標を下回る場合、戻される最初のレコードは最低水位標になります。
- 終了タイムスタンプが最高水位標を超える場合、戻される最後のレコードは、最高水位標(HWM)から1を差し引いた差分(HWM - 1)になります。たとえば、新しいトピックに100個のレコードがある場合、オフセット範囲は0から99までで、HWMは100に設定されます。
ノート:
Kafka Epoch時間枠を取得の仕様として使用するSEEKABLEアプリケーションによって取得されるデータには、指定したタイムスタンプ・ウィンドウ内にタイムスタンプがある場合でも、OSAKグローバル一時表のロード後に配信された外れ値レコードを含めることはできません。例
過去に発生した問題を調査するとします。データがまだKafkaストリームに存在する場合は、DBMS_KAFKA.CREATE_SEEKABLE_APP
をコールしてシーク可能アプリケーションを作成できます。その後、SEEK_OFFSET_TS
プロシージャをコールして、OSAKビューに一連のデータ・レコードを取得するよう要求できます。たとえば、午前3時頃に本番環境の問題が発生したことがITコンサルタントに知らされた場合、コンサルタントは次のプロシージャを使用して一時表をロードし、その時間を中心とする1時間分のデータを取得することを選択できます。
SYS.DBMS_KAFKA.SEEK_OFFSET_TS (
‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
TO_DATE (‘2023/04/02 02:30:00’, ‘YYYY/MM/DD HH:MI:SS’,
TO_DATE (‘2023/04/02 03:30:00’, ‘YYYY/MM/DD HH:MI:SS’));
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE
(ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0);
SELECT <columns> FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;
別のユースケースとして、Kafkaストリームへの順次アクセス権を持つアプリケーションが異常の可能性を検出しており、異常表に行を挿入するように構成されていたとします。異常表には、Kafkaタイムスタンプと、記録するように構成された他のデータが含まれます。その場合、別のアプリケーションがその後、この情報を使用して、疑わしいレコード周辺のレコードを取得し、他の問題があるかどうかを確認できます。この目標を達成するには、次のプロシージャを実行し、一時表をロードした後、アプリケーション・ロジックを選択して結果に適用します。
SYS.DBMS_KAFKA.SEEK_OFFSET_TS (
‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
TO_DATE (‘2020/07/04 02:30:00’, ‘YYYY/MM/DD HH:MI:SS’,
TO_DATE (‘2020/07/04 03:30:00’, ‘YYYY/MM/DD HH:MI:SS’));
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE
(ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0);
SELECT <columns> FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;
--application logic
102.5.4 SEEK_OFFSET_TS (ミリ秒単位のタイムスタンプ)
DBMS_KAFKA
のプロシージャSEEK_OFFSET_TS
(ミリ秒単位のタイムスタンプ)は、ミリ秒単位のEpoch時間で指定された時間ウィンドウの間に存在するトピック内のKafkaレコードをシークします。
構文
PROCEDURE SEEK_OFFSET_TS(
view_name
IN VARCHAR2,
start_timestamp_ms
IN INTEGER,
end_timestamp_ms
IN INTEGER);
パラメータ
表102-13 DBMS_KAFKAのSEEK_OFFSET_TS (ミリ秒単位のタイムスタンプ)プロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
既存のOracle SQL access to Kafka (OSAK)ビューの名前。 大/小文字は区別されません。 |
|
最初のレコードをシークするタイムスタンプ(ミリ秒) |
|
最後のレコードをシークするタイムスタンプ(ミリ秒) |
使用上のノート
ミリ秒単位のタイムスタンプ範囲をシークするSEEK_OFFSET_TS
は、ミリ秒単位のKafka Epoch時間で指定された、選択した開始タイムスタンプと終了タイムスタンプのウィンドウ内に存在するKafkaトピック・レコードをシークします。戻される最後のレコードは、最後にパブリッシュされたレコードです。
ミリ秒を使用してウィンドウを指定するSEEK_OFFSET_TS
の目的は、指定したEpochタイムスタンプによって定義される特定の時間枠内でKafkaレコードの読取りを開始するようにOSAKビューを配置することです。レコードのウィンドウがKafkaトピックの実際のレコードの範囲を超える場合、このプロシージャは、存在するすべてのレコードを戻します。
次に例を示します。
- タイムスタンプがどちらも最低水位標を下回るか、どちらも最高水位標を超える場合、レコードは戻されません。
- 開始タイムスタンプが最低水位標を下回る場合、戻される最初のレコードは最低水位標になります。
- 終了タイムスタンプが最高水位標を超える場合、戻される最後のレコードは、最高水位標(HWM)から1を差し引いた差分(HWM - 1)になります。たとえば、新しいトピックに100個のレコードがある場合、オフセット範囲は0から99までで、HWMは100に設定されます。
ノート:
Kafkaレコードのタイムスタンプは、Kafkaによって(つまり、トランザクション時間によって)割り当てられるか、アプリケーション(つまり、有効または決定時間)によって割り当てられます。SEEKABLEアプリケーションによって取得されるデータには、指定したタイムスタンプ・ウィンドウ内にタイムスタンプがある、OSAKグローバル一時表のロード後に配信された外れ値レコードは含まれません。例
過去に発生した問題を調査するとします。データがまだKafkaストリームに存在する場合は、DBMS_KAFKA.CREATE_SEEKABLE_APP
をコールしてシーク可能アプリケーションを作成できます。その後、SEEK_OFFSET_TS
プロシージャをコールして、OSAKビューに一連のデータ・レコードを取得するよう要求できます。たとえば、午前3時頃に本番環境の問題が発生したことがITコンサルタントに知らされた場合、コンサルタントは次のプロシージャを使用して一時表をロードし、その時間を中心とする1時間分のデータを取得することを選択できます。
SYS.DBMS_KAFKA.SEEK_OFFSET_TS (
‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
TO_DATE (‘2023/04/02 02:30:00’, ‘YYYY/MM/DD HH:MI:SS’,
TO_DATE (‘2023/04/02 03:30:00’, ‘YYYY/MM/DD HH:MI:SS’));
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE
(ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0);
SELECT <columns> FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;
別のユースケースとして、Kafkaストリームへの順次アクセス権を持つアプリケーションが異常の可能性を検出しており、異常表に行を挿入するように構成されていたとします。異常表には、Kafkaタイムスタンプと、記録するように構成された他のデータが含まれます。その場合、別のアプリケーションがその後、この情報を使用して、疑わしいレコード周辺のレコードを取得し、他の問題があるかどうかを確認できます。この目標を達成するには、次のプロシージャを実行し、一時表をロードした後、アプリケーション・ロジックを選択して結果に適用します。
SYS.DBMS_KAFKA.SEEK_OFFSET_TS (
‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
TO_DATE (‘2020/07/04 02:30:00’, ‘YYYY/MM/DD HH:MI:SS’,
TO_DATE (‘2020/07/04 03:30:00’, ‘YYYY/MM/DD HH:MI:SS’));
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE
(ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0);
SELECT <columns> FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;
--application logic
102.5.5 SEEK_OFFSET_TS (タイムゾーン付きタイムスタンプ)
DBMS_KAFKA
のプロシージャSEEK_OFFSET_TS
は、TIMESTAMP WITH TIME ZONE
で指定された時間ウィンドウの間に存在するトピック内のKafkaレコードをシークします。このプロシージャを使用して、指定した2つのTIMESTAMP WITH TIME ZONE
タイムスタンプの間でKafkaレコードの読取りを開始するようにOracle SQL access to Kafka (OSAK)ビューを配置します。
構文
PROCEDURE SEEK_OFFSET_TS(
view_name
IN VARCHAR2,
start_timestamp_ms
IN INTEGER,
end_timestamp_ms
IN INTEGER);
パラメータ
表102-14 DBMS_KAFKAのSEEK_OFFSET_TSプロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
既存のKafkaビューの名前( 大/小文字は区別されません。 |
|
最初のレコードをシークするタイムスタンプ( |
|
最後のレコードをシークするタイムスタンプ( |
使用上のノート
開始および終了タイムスタンプによって定義されるSEEK_OFFSET_TS
の目的は、指定したEpochタイムスタンプによって定義される特定の時間枠内でKafkaレコードの読取りを開始するようにOSAKビューを配置することです。レコードのウィンドウがKafkaトピックの実際のレコードの範囲を超える場合、このプロシージャは、存在するすべてのレコードを戻します。
次に例を示します。
- タイムスタンプがどちらも最低水位標を下回るか、どちらも最高水位標を超える場合、レコードは戻されません。
- 開始タイムスタンプが最低水位標を下回る場合、戻される最初のレコードは最低水位標になります。
- 終了タイムスタンプが最高水位標を超える場合、戻される最後のレコードは、最高水位標(HWM)から1を差し引いた差分(HWM - 1)になります。たとえば、新しいトピックに100個のレコードがある場合、オフセット範囲は0から99までで、HWMは100に設定されます。
ノート:
Kafkaレコードのタイムスタンプは、Kafka (つまり、トランザクション時間)によって割り当てられるか、アプリケーション(つまり、有効または決定時間)によって割り当てられます。Kafka Epoch時間枠を取得の仕様として使用するSEEKABLEアプリケーションによって取得されるデータには、指定したタイムスタンプ・ウィンドウ内にタイムスタンプがある場合でも、OSAKグローバル一時表のロード後に配信された外れ値レコードを含めることはできません。例
過去に発生した問題を調査するとします。データがまだKafkaストリームに存在する場合は、DBMS_KAFKA.CREATE_SEEKABLE_APP
をコールしてシーク可能アプリケーションを作成できます。その後、SEEK_OFFSET_TS
プロシージャをコールして、OSAKビューに一連のデータ・レコードを取得するよう要求できます。たとえば、午前3時頃に本番環境の問題が発生したことがITコンサルタントに知らされた場合、コンサルタントは次のプロシージャを使用して一時表をロードし、その時間を中心とする1時間分のデータを取得することを選択できます。
SYS.DBMS_KAFKA.SEEK_OFFSET_TS (
‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
TO_DATE (‘2023/04/02 02:30:00’, ‘YYYY/MM/DD HH:MI:SS’,
TO_DATE (‘2023/04/02 03:30:00’, ‘YYYY/MM/DD HH:MI:SS’));
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE
(ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0);
SELECT <columns> FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;
別のユースケースとして、Kafkaストリームへの順次アクセス権を持つアプリケーションが異常の可能性を検出しており、異常表に行を挿入するように構成されていたとします。異常表には、Kafkaタイムスタンプと、記録するように構成された他のデータが含まれます。その場合、別のアプリケーションがその後、この情報を使用して、疑わしいレコード周辺のレコードを取得し、他の問題があるかどうかを確認できます。この目標を達成するには、次のプロシージャを実行し、一時表をロードした後、アプリケーション・ロジックを選択して結果に適用します。
SYS.DBMS_KAFKA.SEEK_OFFSET_TS (
‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
TO_DATE (‘2020/07/04 02:30:00’, ‘YYYY/MM/DD HH:MI:SS’,
TO_DATE (‘2020/07/04 03:30:00’, ‘YYYY/MM/DD HH:MI:SS’));
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE
(ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0);
SELECT <columns> FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;
--application logic
102.6 ADD_PARTITIONS
DBMS_KAFKA_ADM
のプロシージャADD_PARTITIONS
は、既存のOracle SQL access to Kafka (OSAK)ビューのセットにKafkaパーティションを追加します。
構文
PROCEDURE ADD_PARTITIONS (
cluster_name IN VARCHAR2,
application_name IN VARCHAR2
パラメータ
表102-15 DBMS_KAFKAのADD_PARTITIONSプロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
既存のKafkaクラスタの名前 大/小文字は区別されません。 |
|
Kafkaクラスタに関連付けられている既存のアプリケーションの名前。 大/小文字は区別されません。 |
使用上のノート
ADD_PARTITIONS
のセマンティクスはCREATE_xxx_APP
のコールに似ていますが、既存のKafkaトピック・パーティションに関する状態情報(committed_offset
など)を保持し、新しいパーティションを既存のビューまたは新しいOSAKビューにバインドする点が異なります。
注意:
ADD_PARTITIONS
をコールする前に、このアプリケーションのすべてのアプリケーション・インスタンスを停止する必要があります。ADD_PARTITIONS
が正常にコールされた後、アプリケーション・インスタンスを再起動できます。アプリケーションが、パーティションごとに1つのビュー・インスタンスを作成するように構成されたストリーミング・アプリケーションである場合は、新しいビューが作成されます。これらのビューを使用するには、それぞれが、作成された新しいビューの1つを排他的に処理する専用のものである、追加のアプリケーション・インスタンスを起動する必要があります。
パーティションがトピックに追加されていない場合、このプロシージャは、既存のビューを変更せずに正常に終了します。
OSAKビューが作成され、パーティションが割り当てられると、ビューによってパーティションのリストが管理されます。たとえば、Kafkaトピックのパーティション0、1、2、3を読み取るためにOSAKビューが以前に割り当てられているとします。ADD_PARTITIONS
によって後でこのビューに新しいパーティション(例: パーティション16)が追加された場合、OSAKビューは、パーティション0、1、2、3および16からKafkaレコードをフェッチするように構成されます。
このプロシージャでは、トランザクション処理であるDDLとDMLの両方が実行されます。このコールは、既存のトランザクション・コンテキストの外部でのみ実行する必要があります。
例
BEGIN
SYS.DBMS_KAFKA.ADD_PARTITIONS (‘ExampleCluster’,
‘ExampleApp’);
END;
/
102.7 DROP_ALL_APPS
DBMS_KAFKA
のプロシージャDROP_ALL_APPS
は、Kafkaクラスタのすべてのアプリケーションを削除します。このプロシージャを使用して、構成されたセキュリティ情報で接続を確立できることを確認します。このファンクションは、クラスタの状態を戻します。
構文
PROCEDURE DROP_ALL_APPS (
\cluster_name IN VARCHAR2
);
パラメータ
表102-16 DBMS_KAFKAのDROP_ALL_APPSプロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
既存のKafkaクラスタの名前 大/小文字は区別されません。 |
使用上のノート
DROP_ALL_APPS
プロシージャは、トランザクション処理であるデータ定義言語(DDL)とデータ操作言語(DML)の両方の変更を実行します。このコールは、既存のトランザクション・コンテキストの外部でのみ実行してください。
ノート:
これは自律型トランザクションです。例
1つ以上のOSAKアプリケーションがKafkaクラスタExampleClusterに存在しなくなったとします。その場合、次の文を使用して、そのクラスタ内のすべてのアプリケーションを削除できます。
EXEC SYS.DBMS_KAFKA.DROP_ALL_APPS (‘ExampleCluster’);
102.8 ENABLE_VIEW_QUERY
DBMS_KAFKA
のプロシージャENABLE_VIEW_QUERY
は、現在のOracle Databaseセッション内でコンテキストを設定し、アプリケーションがビューを問い合せることができるようにします。 このプロシージャを使用して、アプリケーションがOracle SQL access to Kafka (OSAK)ビューを直接問い合せることができるようにします。
構文
PROCEDURE ENABLE_VIEW_QUERY(view_name IN VARCHAR2);
パラメータ
表102-17 DBMS_KAFKAのENABLE_VIEW_QUERYプロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
既存のKafkaビューの名前 大/小文字は区別されません。 |
使用上のノート
1つのトランザクション内で行うOSAKビューに対する問合せは1つのみにしてください。OSAKビューの複数の問合せでは、繰返し可能な読取り動作を保証できません。Kafkaはストリーミング・サービスであるため、SELECT
問合せの間に新しいレコードがKafkaトピックにパブリッシュされる可能性があります。新しいレコードがパブリッシュされると、アプリケーションによって追加の行が認識される場合があります。
注意:
これは高度なプロシージャです。このコールは、Oracle SQL access to Kafka (OSAK)処理モデルを深く理解し、Oracleアクセス計画のデバッグに関する知識がある開発者のみが使用することをお薦めします。これには、OSAKビューの問合せ後にOSAKによってKafkaオフセットがどのように進められるのかについての理解も含まれます。
通常は、LOAD_TEMP_TABLE
またはEXECUTE_LOAD_TABLE
を使用することをお薦めします。これらのプロシージャはKafkaビューを透過的に問い合せて、グローバル一時表に、またはSQLを使用してさらに分析するためにOracle Database表にそれらを1回ロードします。
例
ユーザーがOSAKビューを直接問い合せることができるようにする方法は、次の2つのケースでのみ使用してください:
-
問合せが
FROM
句のOSAKビューを参照してKafkaストリームをスキャンするのみである場合。たとえば、OSAKビューに直接アクセスし、最後のチェック以降のラボ1のセンサーの平均温度を取得するとします。次の問合せを使用できます。EXEC DBMS_KAFKA.ENABLE_VIEW_QUERY( ‘ORA$DKV_EXAMPLECLUSTER_STREAMINGAPP_0’); SELECT AVG(temperature) FROM ORA$DKV_EXAMPLECLUSTER_STREAMINGAPP_0WHERE sensor_name = ‘LAB1_SENSOR’; EXEC DBMS_KAFKA.UPDATE_OFFSET( ‘ORA$DKV_EXAMPLECLUSTER_STREAMINGAPP_0’); COMMIT;
-
問合せによってOSAKビューとOracle Database表の間に単純な表結合が作成され、問合せで
ORDERED
ヒントを使用することにより、OSAKビューが強制的に結合の外部表になる場合。このケースでは、Kafkaデータが確実に1回のみ取得されるようになります。次に例を示します。SELECT /*+ ORDERED *\/ COUNT(*) FROM thermostat_spec s, ora$dkv_thermostat_0 t WHERE s.device_type = t.device_type AND t.current_setting > t.temperature_setting + s.device_max_variation;
102.9 SET_TRACING
DBMS_KAFKA
のプロシージャSET_TRACING
は、Oracle SQL access to Kafka (OSAK)アプリケーションに関連付けられた外部表ドライバ・コードのデバッグ・レベル・トレースを有効または無効にします。このファンクションは、セッションのトレース・ファイルへのロギング出力を生成します。
構文
PROCEDURE SET_TRACING(
cluster_name IN VARCHAR2,
application_name IN VARCHAR2,
enable IN BOOLEAN);
パラメータ
表102-18 DBMS_KAFKAのDROP_SEEKABLE_APPプロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
既存のKafkaクラスタの名前 大/小文字は区別されません。 |
|
Kafkaクラスタを使用する既存のアプリケーションの名前。 大/小文字は区別されません。 |
|
デフォルト: デバッグ出力を有効にするには |
使用上のノート
セッションのトレース・ファイルへのデバッグ・ロギング出力を生成するために使用します。デバッグ出力を有効にするにはtrue
に設定し、デバッグ出力を無効にするにはfalse
に設定します。
ノート:
次のイベントがデータベースについてすでに有効になっている必要があります。
event="39431 trace name context forever, level 1" # Enable external table debug tracing
例
アプリケーションExampleApp
で使用されていたKafkaクラスタExampleCluster
でトレースを開始するとします。その場合、このプロシージャを使用できます。
EXEC SYS.DBMS_KAFKA.SET_TRACING('KAFKACLUS1', 'KAFKACLUS1', True);
102.10 CREATE_xxx_APPに渡されるDBMS_KAFKAのオプション
DBMS_KAFKA.CREATE_LOAD_APP
、CREATE_STREAMING_APP
、CREATE_SEEKABLE_APP
パッケージのオプション・パラメータは、JSONドキュメントのパッケージに提供されます
DBMS_KAFKA
のオプションを各DBMS_KAFKA
パッケージで使用して、Oracle SQL access to Kafka (OSAK)アプリケーションを作成および定義できます。
ノート:
バッファ内の最大Kafkaメタデータ・チャンク・サイズとして、54バイトが予約されています。たとえば、1Mのbufsizeは1Mから54バイトのレコードを保持できます。bufsizeのデフォルトは1000 KBで、最大10,000 KBです。そのため、サポートされる最大のKafkaレコード・サイズは10000 kb – 54です。表102-19 DBMS_KAFKAのオプション
オプション名 | 説明 |
---|---|
|
バイト配列に格納されている10進数の表現を指定します。 有効な値: デフォルト: このパラメータを使用しない場合、Avroの10進数列は、バイト配列に格納されているのはAvro仕様で定義されているとおりの値の数値表現(デフォルトは
関連するアクセス・パラメータ: |
|
Kafka値ペイロードのスキーマを表すJSONドキュメント。スキーマではAvroレコード・タイプを定義する必要があります。 デフォルト値はありません。 |
blankasnull |
値: デフォルト:
関連するアクセス・パラメータ: |
|
大きいレコード読取りのバッファ・サイズをKB単位で設定します。デフォルトのバッファ・サイズよりも大きいレコードを読み取る必要がある場合にこの値を設定します。 デフォルト(KB): 1000 関連するアクセス・パラメータ: |
|
行でデータ型の変換エラーが発生した場合、関連する列を 値: デフォルト:
関連するアクセス・パラメータ: |
|
ソース・ファイル内の日付書式を指定します。値
デフォルト: 次の場合にのみ許可されます
関連するアクセス・パラメータ: |
|
埋込みフィールドの終了記号またはフィールド値の行の終了文字をエスケープするために使用する文字を指定します。文字値は一重引用符で囲む必要があります。例: パラメータを指定しない場合、値はありません。
関連するアクセス・パラメータ: |
fmt |
Kafkaレコードの値ペイロードの書式。 値: デフォルト値はありません。渡されるオプションで書式を指定する必要があります。 関連するアクセス・パラメータ: |
|
JSONを 値: デフォルト:
|
|
フィールドの値を示すために使用する文字が
関連するアクセス・パラメータ: |
quote |
フィールドの引用符文字を指定します。指定すると、引用符文字として定義されている文字がロード中に削除されます。 有効な値: 文字 デフォルト:
関連するアクセス・パラメータ: |
|
指定した行数( 有効な値: デフォルト: |
|
ソース・ファイル内のフィールドを囲む引用符を削除します。 値: デフォルト:
関連するアクセス・パラメータ: |
|
フィールド値を区切るために使用する文字を指定します。文字値は一重引用符で囲む必要があります。例: デフォルト: 関連するアクセス・パラメータ: |
|
レコード値を区切るために使用する文字を指定します。文字値は一重引用符で囲む必要があります。例: デフォルト: 関連するアクセス・パラメータ: |
|
フィールドの先頭と末尾の空白を切り捨てる方法を指定します。 有効な値: デフォルト:
関連するアクセス・パラメータ: |
|
ファイルのデータがフィールドに対して長すぎる場合、このオプションでは、行を拒否したりフィールドをNULLに設定したりするかわりに、フィールドの値を切り捨てます。 値: デフォルト:
関連するアクセス・パラメータ: |
tsfmt |
ソース・ファイル内のタイムスタンプ書式を指定します。値
デフォルト:
関連するアクセス・パラメータ: |
tslzfmt |
ソース・ファイル内のローカル・タイムゾーン付きタイムスタンプ書式を指定します。値
デフォルト:
関連するアクセス・パラメータ: |
|
ソース・ファイル内のタイムゾーン付きタイムスタンプ書式を指定します。値
デフォルト:
関連するアクセス・パラメータ: |
例102-3 AVROスキーマ・レコード・タイプ
AVROデータ・スキーマの例を次に示します。この場合、定義されているAVROデータのタイプはレコードであり、データは、一定期間にわたって監視された温度モニターのセンサー値のレコードです。
{
"type" : "record",
"name" : "sensor_value",
"namespace" : "example.sensor",
"fields" : [ {
"name" : "EventTime",
"type" : "long",
"logicalType" : "timestamp-millis"
}, {
"name" : "IotDeviceType",
"type" : "int"
}, {
"name" : "IotDeviceUnitId",
"type" : "int"
}, {
"name" : "TempSetting",
"type" : "double"
}, {
"name" : "TempReading",
"type" : "double"
} ]
}