3.9.9 Oracle Databaseに格納された表へのKafkaデータのロード
Oracle SQL access to Kafka (OSaK)のORA_KAFKA.LOAD_TABLE
プロシージャは、Kafkaトピックからデータベース表にデータをロードします。ORA_KAFKA.LOAD_TABLE
は、内部で使用されるビューを作成し、Kafkaトピックのすべてのパーティションにマップします。
このビューは、ORA_KAFKA.LOAD_TABLE
の実行の最後に削除されません。つまり、次回に、前回と同じクラスタ、グループおよびトピックの引数を指定してORA_KAFKA.LOAD_TABLE
をコールした場合には、同じビューを使用して、前回ORA_KAFKA.LOAD_TABLE
が中断した位置からロードが開始されるということです。
Kafkaデータをデータベースに連続的にロードするには、ORA_KAFKA.LOAD_TABLE
プロシージャをループでコールします。
例1:
次の例では、sensorトピックからOracleデータベース表sensortabにデータをロードするORA_KAFKA.LOAD_TABLE
プロシージャを1回コールします。
ノート:
クラスタ名、グループ名およびトピック名は、それぞれ30文字以内にする必要があります。DECLARE
num_records_loaded INTEGER;
BEGIN
ORA_KAFKA.LOAD_TABLE
('MA1', -- The name of the cluster
'LOADAPP', -- The name of the Kafka group
'sensor', -- The name of the topic
'CSV', -- The format of the Kafka record
'sensortab', -- The name of the target table in Oracle.
-- This table must reflect the shape of the rows
-- retrieved from Kafka
num_records_loaded); -- The number of Kafka records loaded
dbms_output.put_line(‘Kafka records loaded = ‘ || num_records_loaded);
COMMIT;
END;
/
例2:
次の例では、sensorjトピックからOracleデータベース表
sensortab_json
にJSONデータをロードするORA_KAFKA.LOAD_TABLE
プロシージャを1回コールをします。ここでのsensortab_json
表は次のように定義されています。CREATE TABLE sensortab_json (KEY VARCHAR2(32767), VALUE VARCHAR2(32767));
ロード後、JSONデータを格納するOracle表内の任意の列と同様に、VALUE列を問い合せることができます。なお、このJSONデータ・レコードのサイズの上限は32767となります。
DECLARE
num_records_loaded INTEGER;
BEGIN
ORA_KAFKA.LOAD_TABLE
('MA1', -- The name of the cluster
'LOADAPP', -- The name of the Kafka group
'sensorj', -- The name of the topic
‘JSON_VARCHAR2’, -- The format of the Kafka record
'sensortab_json', -- The name of the target table in Oracle. This
-- table must reflect the shape of the rows
-- retrieved from Kafka
num_records_loaded); -- The number of Kafka records loaded
dbms_output.put_line(‘Kafka records loaded = ' || num_records_loaded;
COMMIT;
END;
/