2.4.4 Kafkaトピック内のCSVデータにアクセスするためのビューの作成
Kafkaトピック内のデータを問い合せるには、まず、KafkaトピックにマップするOracle SQL Access to Kafka (OSaK)ビューを作成する必要があります。
Kafkaのレコード形式がCSVである場合は、ビューを作成する前に、スキーマがKafkaトピック内のレコードにマップされている参照表を作成する必要があります。
注意:
Kafkaのレコード形式がJSON_VARCHAR2
である場合は、参照表をNULLとして渡す必要があります。Kafkaトピック内のJSONデータにアクセスするためのビューの作成を参照してください。
次の例では、スキーマがKafkaトピックsensor内のレコードにマップされている参照表SENSOR_RECORD_SHAPE
を作成します。
CREATE TABLE sensor_record_shape(
msg_number INTEGER PRIMARY KEY,
msg_timestamp TIMESTAMP,
sensor_type_id INTEGER,
sensor_unit_id INTEGER,
temperature_setting NUMBER(6,3),
temperature_reading NUMBER(6,3)
);
参照表が作成されたので、OSaKビューを作成できます。次の例では、レコード形式がCSVであるトピックsensor内のパーティションごとに1つのビューを作成します。sensorトピックには1つのパーティションがあるため、1つのビューが作成されます。
DECLARE
views_created INTEGER;
application_id VARCHAR2(128);
BEGIN
ORA_KAFKA.CREATE_VIEWS
('MA1', -- The name of the cluster (specified in ORA_KAFKA.REGISTER_CLUSTER)
'QUERYAPP', -- The name given by the user for a set of views, corresponds to the concept of a Kafka group
'sensor', -- The name of the Kafka topic
'CSV', -- The format of the topic record
'SENSOR_RECORD_SHAPE', -- The name of the database reference table
views_created, -- Output: number of views created
application_id); -- Output: the application id of the set of views
-- created that uniquely identifies the view objects
dbms_output.put_line(‘views created = ‘ || views_created);
dbms_output.put_line(‘application id = ‘ || application_id);
END;
/
前述の例により、次のビューが作成されます。
SQL> describe KV_MA1_QUERYAPP_SENSOR_0;
Name Null? Type
----------------------------------------- -------- ----------------------------
KAFKA$PARTITION NUMBER(38)
KAFKA$OFFSET NUMBER(38)
KAFKA$EPOCH_TIMESTAMP NUMBER(38)
MSG_NUMBER NOT NULL NUMBER
MSG_TIMESTAMP TIMESTAMP(6)
SENSOR_TYPE_ID NUMBER
SENSOR_UNIT_ID NUMBER
TEMPERATURE_SETTING NUMBER(6,3)
TEMPERATURE_READING NUMBER(6,3)
ここでは、KAFKA$PARTITION
はKafkaパーティションID、KAFKA$OFFSET
はKafkaレコードのオフセット、KAFKA$EPOCH_TIMESTAMP
はKafkaレコードのタイムスタンプです。残りの列は、CSVデータ内のフィールドを表します。