3.9.4 Kafkaトピック内のCSVデータにアクセスするためのビューの作成
Kafkaトピック内のデータを問い合せるには、まず、KafkaトピックにマップするOracle SQL Access to Kafka (OSaK)ビューを作成する必要があります。
Kafkaのレコード形式がCSVである場合は、ビューを作成する前に、スキーマがKafkaトピック内のレコードにマップされている参照表を作成する必要があります。
ノート:
Kafkaのレコード形式がJSON_VARCHAR2
である場合は、参照表をNULLとして渡す必要があります。Kafkaトピック内のJSONデータにアクセスするためのビューの作成を参照してください。
その参照表は、そのCSVデータの形状を説明するものであり、そのデータからOSAK外部表およびビューを作成するために使用されます。参照表内の列の順序が、Kafkaレコードのキーおよび値での、フィールドの順序と一致する必要があります。Kafkaレコードのキーは、必ずNULLであるか、必ずNULL以外である必要があります。キーがNULLでない場合は、キー内のフィールドが、参照表内の最初の列である必要があります。Kafkaレコードの値内のフィールドが、参照表内の残りの列となります。
次の例では、スキーマがKafkaトピックsensor内のレコードにマップされている参照表SENSOR_RECORD_SHAPE
を作成します。
CREATE TABLE sensor_record_shape(
msg_number INTEGER PRIMARY KEY,
msg_timestamp TIMESTAMP,
sensor_type_id INTEGER,
sensor_unit_id INTEGER,
temperature_setting NUMBER(6,3),
temperature_reading NUMBER(6,3)
);
参照表が作成されたので、OSaKビューを作成できます。次の例では、レコード形式がCSVであるトピックsensor内のパーティションごとに1つのビューを作成します。sensorトピックには1つのパーティションがあるため、1つのビューが作成されます。
ノート:
クラスタ名、グループ名およびトピック名は、それぞれ30文字以内にする必要があります。DECLARE
views_created INTEGER;
application_id VARCHAR2(128);
BEGIN
ORA_KAFKA.CREATE_VIEWS
('MA1', -- The name of the cluster (specified in ORA_KAFKA.REGISTER_CLUSTER)
'QUERYAPP', -- The name given by the user for a set of views, corresponds to the concept of a Kafka group
'sensor', -- The name of the Kafka topic
'CSV', -- The format of the topic record
'SENSOR_RECORD_SHAPE', -- The name of the database reference table
views_created, -- Output: number of views created
application_id); -- Output: the application id of the set of views
-- created that uniquely identifies the view objects
dbms_output.put_line(‘views created = ‘ || views_created);
dbms_output.put_line(‘application id = ‘ || application_id);
END;
/
前述の例により、次のビューが作成されます。
SQL> describe KV_MA1_QUERYAPP_SENSOR_0;
Name Null? Type
----------------------------------------- -------- ----------------------------
KAFKA$PARTITION NUMBER(38)
KAFKA$OFFSET NUMBER(38)
KAFKA$EPOCH_TIMESTAMP NUMBER(38)
MSG_NUMBER NOT NULL NUMBER
MSG_TIMESTAMP TIMESTAMP(6)
SENSOR_TYPE_ID NUMBER
SENSOR_UNIT_ID NUMBER
TEMPERATURE_SETTING NUMBER(6,3)
TEMPERATURE_READING NUMBER(6,3)
ここでは、KAFKA$PARTITION
はKafkaパーティションID、KAFKA$OFFSET
はKafkaレコードのオフセット、KAFKA$EPOCH_TIMESTAMP
はKafkaレコードのタイムスタンプです。残りの列は、CSVデータ内のフィールドを表します。