2.4.4 Kafkaトピック内のCSVデータにアクセスするためのビューの作成

Kafkaトピック内のデータを問い合せるには、まず、KafkaトピックにマップするOracle SQL Access to Kafka (OSaK)ビューを作成する必要があります。

Kafkaのレコード形式がCSVである場合は、ビューを作成する前に、スキーマがKafkaトピック内のレコードにマップされている参照表を作成する必要があります。

注意:

Kafkaのレコード形式がJSON_VARCHAR2である場合は、参照表をNULLとして渡す必要があります。Kafkaトピック内のJSONデータにアクセスするためのビューの作成を参照してください。

次の例では、スキーマがKafkaトピックsensor内のレコードにマップされている参照表SENSOR_RECORD_SHAPEを作成します。

CREATE TABLE sensor_record_shape(
 msg_number            INTEGER PRIMARY KEY,
 msg_timestamp         TIMESTAMP,
 sensor_type_id        INTEGER,
 sensor_unit_id        INTEGER,
 temperature_setting   NUMBER(6,3),
 temperature_reading   NUMBER(6,3)
 );
参照表が作成されたので、OSaKビューを作成できます。次の例では、レコード形式がCSVであるトピックsensor内のパーティションごとに1つのビューを作成します。sensorトピックには1つのパーティションがあるため、1つのビューが作成されます。
DECLARE
    views_created INTEGER;
    application_id VARCHAR2(128);
  BEGIN
    ORA_KAFKA.CREATE_VIEWS
      ('MA1',                        -- The name of the cluster (specified in ORA_KAFKA.REGISTER_CLUSTER)
       'QUERYAPP',                   -- The name given by the user for a set of views, corresponds to the concept of a Kafka group
       'sensor',                     -- The name of the Kafka topic
       'CSV',                        -- The format of the topic record
       'SENSOR_RECORD_SHAPE',        -- The name of the database reference table
       views_created,                -- Output: number of views created
       application_id);              -- Output: the application id of the set of views                                                   
                                     -- created that uniquely identifies the view objects                                                  
    dbms_output.put_line(‘views created = ‘ || views_created);
    dbms_output.put_line(‘application id = ‘ || application_id);
  END;
/
前述の例により、次のビューが作成されます。
SQL> describe KV_MA1_QUERYAPP_SENSOR_0;
 Name                                      Null?    Type
 ----------------------------------------- -------- ----------------------------
 KAFKA$PARTITION                                    NUMBER(38)
 KAFKA$OFFSET                                       NUMBER(38)
 KAFKA$EPOCH_TIMESTAMP                              NUMBER(38)
 MSG_NUMBER                                NOT NULL NUMBER
 MSG_TIMESTAMP                                      TIMESTAMP(6)
 SENSOR_TYPE_ID                                     NUMBER
 SENSOR_UNIT_ID                                     NUMBER
 TEMPERATURE_SETTING                                NUMBER(6,3)
 TEMPERATURE_READING                                NUMBER(6,3)

ここでは、KAFKA$PARTITIONはKafkaパーティションID、KAFKA$OFFSETはKafkaレコードのオフセット、KAFKA$EPOCH_TIMESTAMPはKafkaレコードのタイムスタンプです。残りの列は、CSVデータ内のフィールドを表します。