2.4.6 連続ストリームとしてのKafkaデータの問合せ
Oracle SQL access to Kafka (OSaK)ビューを作成した後は、標準SQLを使用してそのビューを問い合せることができます。ビュー名は、生成されたapplication_id (クラスタ名、Kafkaグループ名、トピック名が連結されている)がview_id
と連結されたものとなります。
前述のsensorトピックにはパーティションは1つみであるため、ビューは1つとなります。このビューの名前はKV_MA1_QUERYAPP_SENSOR_0です。ビュー名はORA_KAFKAパッケージがインストールされているスキーマ内のORA_KAFKA_PARTITIONメタデータ表を問い合せることで特定できるということを覚えておいてください。
OSaKビューには連続的にアクセスでき、最初のオフセットまたはタイムスタンプからストリームの最後まで読み取ることができます。これは、指定したトピックの最新のKafkaレコードを問い合せるための一般的な使用例です。
次の例では、ORA_KAFKA.INIT_OFFSET
プロシージャを使用して開始オフセットをKafkaパーティションの最高水位標の下の100レコードに設定し、そこからストリームの最後までを読み取ります。次回ループに入ったときには、前回中断した位置から新しいストリーム終端まで読み取ります。この例では、分析は、PL/SQLでLOOPロジックを使用してcount(*)を実行するという、簡単な例に要約されています。使用例: アプリケーション内のループで、ORA_KAFKA.NEXT_OFFSET
のコールを実行し、OSaKビューを問い合せ、取得したKafkaレコードに対して分析を実行し、条件が満たされている場合はORA_KAFKA.UPDATE_OFFSET
をコールし、トランザクションをコミットします。ORA_KAFKA.NEXT_OFFSET
プロシージャは、ビューへのアクセス時に最後に読み取ったKafkaオフセットが保存されている、ORA_KAFKA.UPDATE_OFFSET
またはORA_KAFKA.INIT_OFFSET
/ORA_KAFKA.INIT_OFFSET_TS. ORA_KAFKA.UPDATE_OFFSET
の結果に基づいて、次に読み取るKafkaオフセットを記録します。
また、読取りの開始位置を特定のタイムスタンプに設定することもできます。ORA_KAFKA.INIT_OFFSET_TS
は、OSaKビューに属する各Kafkaパーティションのタイムスタンプに関連する開始オフセットを初期化します。ORA_KAFKA.INIT_OFFSET
と同様に、ORA_KAFKA.INIT_OFFSET_TS
は、通常は、そのビューの処理専用の新しいアプリケーション・インスタンスの発生時、またはアプリケーション・インスタンスの停止後または障害発生後のリカバリの際にコールされます。
注意:
同じ一連のOSaKビューを読み取るアプリケーションが複数ある場合は、各アプリケーションでオフセット管理が試みられるため、Kafkaレコードが重複して処理されるか、Kafkaレコードがスキップされる可能性があります。複数のアプリケーションが同じトピックを読み取る場合は、アプリケーション固有のKafkaグループ名を使用することで、アプリケーションごとに一連のビューを作成します。この場合、各アプリケーションで独自のオフセットを使用して読取り位置を決定できます。たとえば、あるアプリケーションで100を指定してORA_KAFKA.INIT_OFFSET
をコールし1つのビュー・セットを使用し、別のアプリケーションで550を指定してORA_KAFKA.INIT_OFFSET
をコールし別のビュー・セットを使用するということもできます。
BEGIN
-- Before entering the loop, we initialize the starting offsets for the view relative to the current Kafka high water mark for the Kafka partition managed by our view.
-- Without an INIT_OFFSET call, records are read from either the beginning of the stream or from the offset last recorded by a COMMIT after an UPDATE_OFFSET call.
ORA_KAFKA.INIT_OFFSET
('KV_MA1_QUERYAPP_SENSOR_0', -- The view for which to initialize offsets
100, -- The number of records before the high water mark that designates the starting offset
ORA_KAFKA.WATER_MARK_HIGH); -- The above record count parameter is 100 records below the high water mark
LOOP
-- Set the offset of the next Kafka record to be processed.
-- Since we have called INIT_OFFSET,
-- the starting offset will be 100 records below the high water mark.
ORA_KAFKA.NEXT_OFFSET
('KV_MA1_QUERYAPP_SENSOR_0'); -- The view for which to set offsets
-- Now query for rows starting at 100 records below the high water mark.
SELECT count(*) from KV_MA1_QUERYAPP_SENSOR_0;
-- Now that we've done a query, record the last offset processed.
ORA_KAFKA.UPDATE_OFFSET
('KV_MA1_QUERYAPP_SENSOR_0); -- The view for which to set offsets
COMMIT;
END LOOP;
END;
/