3.9.9 Oracle Databaseに格納された表へのKafkaデータのロード

Oracle SQL access to Kafka (OSaK)のORA_KAFKA.LOAD_TABLEプロシージャは、Kafkaトピックからデータベース表にデータをロードします。ORA_KAFKA.LOAD_TABLEは、内部で使用されるビューを作成し、Kafkaトピックのすべてのパーティションにマップします。

このビューは、ORA_KAFKA.LOAD_TABLEの実行の最後に削除されません。つまり、次回に、前回と同じクラスタ、グループおよびトピックの引数を指定してORA_KAFKA.LOAD_TABLEをコールした場合には、同じビューを使用して、前回ORA_KAFKA.LOAD_TABLEが中断した位置からロードが開始されるということです。

Kafkaデータをデータベースに連続的にロードするには、ORA_KAFKA.LOAD_TABLEプロシージャをループでコールします。

例1:

次の例では、sensorトピックからOracleデータベース表sensortabにデータをロードするORA_KAFKA.LOAD_TABLEプロシージャを1回コールします。

ノート:

クラスタ名、グループ名およびトピック名は、それぞれ30文字以内にする必要があります。
DECLARE
  num_records_loaded INTEGER;
BEGIN
  ORA_KAFKA.LOAD_TABLE
    ('MA1',                   -- The name of the cluster
    'LOADAPP',                -- The name of the Kafka group
    'sensor',                 -- The name of the topic
    'CSV',                    -- The format of the Kafka record
    'sensortab',              -- The name of the target table in Oracle.
                              -- This table must reflect the shape of the rows
                              -- retrieved from Kafka
    num_records_loaded);      -- The number of Kafka records loaded
  dbms_output.put_line(‘Kafka records loaded = ‘ || num_records_loaded);
  COMMIT;
END;
/
例2:
次の例では、sensorjトピックからOracleデータベース表sensortab_jsonにJSONデータをロードするORA_KAFKA.LOAD_TABLEプロシージャを1回コールをします。ここでのsensortab_json表は次のように定義されています。
CREATE TABLE sensortab_json (KEY VARCHAR2(32767), VALUE VARCHAR2(32767));

ロード後、JSONデータを格納するOracle表内の任意の列と同様に、VALUE列を問い合せることができます。なお、このJSONデータ・レコードのサイズの上限は32767となります。

DECLARE
        num_records_loaded INTEGER;
BEGIN
        ORA_KAFKA.LOAD_TABLE
                ('MA1', -- The name of the cluster
                'LOADAPP', -- The name of the Kafka group
                'sensorj', -- The name of the topic
                ‘JSON_VARCHAR2’, -- The format of the Kafka record
                'sensortab_json', -- The name of the target table in Oracle. This
                                     -- table must reflect the shape of the rows
                                     -- retrieved from Kafka
                num_records_loaded); -- The number of Kafka records loaded

        dbms_output.put_line(‘Kafka records loaded = ' || num_records_loaded;

COMMIT;
END;
/