2.9.4 Create Views to Access CSV Data in a Kafka Topic

In order to query data from a Kafka topic, you first need to create the Oracle SQL Access to Kafka (OSaK) views that map to the Kafka topic.

When the Kafka record format is CSV, before creating the views, you must create a reference table whose schema is mapped to records in the Kafka topic.

Note:

If the Kafka record format is JSON_VARCHAR2, a reference table should be passed as NULL. See Create Views to Access JSON Data in a Kafka Topic.

The following example creates reference table SENSOR_RECORD_SHAPE whose schema maps to records in the Kafka topic ‘sensor’:

CREATE TABLE sensor_record_shape(
 msg_number            INTEGER PRIMARY KEY,
 msg_timestamp         TIMESTAMP,
 sensor_type_id        INTEGER,
 sensor_unit_id        INTEGER,
 temperature_setting   NUMBER(6,3),
 temperature_reading   NUMBER(6,3)
 );
Now that the reference table has been created, we can create the OSaK views. The following example creates one view per partition in the topic ‘sensor’ where the record format is CSV. Since the 'sensor' topic has one partition, one view is created.
DECLARE
    views_created INTEGER;
    application_id VARCHAR2(128);
  BEGIN
    ORA_KAFKA.CREATE_VIEWS
      ('MA1',                        -- The name of the cluster (specified in ORA_KAFKA.REGISTER_CLUSTER)
       'QUERYAPP',                   -- The name given by the user for a set of views, corresponds to the concept of a Kafka group
       'sensor',                     -- The name of the Kafka topic
       'CSV',                        -- The format of the topic record
       'SENSOR_RECORD_SHAPE',        -- The name of the database reference table
       views_created,                -- Output: number of views created
       application_id);              -- Output: the application id of the set of views                                                   
                                     -- created that uniquely identifies the view objects                                                  
    dbms_output.put_line(‘views created = ‘ || views_created);
    dbms_output.put_line(‘application id = ‘ || application_id);
  END;
/
The above example causes the following view to be created:
SQL> describe KV_MA1_QUERYAPP_SENSOR_0;
 Name                                      Null?    Type
 ----------------------------------------- -------- ----------------------------
 KAFKA$PARTITION                                    NUMBER(38)
 KAFKA$OFFSET                                       NUMBER(38)
 KAFKA$EPOCH_TIMESTAMP                              NUMBER(38)
 MSG_NUMBER                                NOT NULL NUMBER
 MSG_TIMESTAMP                                      TIMESTAMP(6)
 SENSOR_TYPE_ID                                     NUMBER
 SENSOR_UNIT_ID                                     NUMBER
 TEMPERATURE_SETTING                                NUMBER(6,3)
 TEMPERATURE_READING                                NUMBER(6,3)

Where KAFKA$PARTITION is the Kafka partition id, KAFKA$OFFSET is the offset of the Kafka record, KAFKA$EPOCH_TIMESTAMP is the timestamp of the Kafka record. The remaining columns represent the fields in the CSV data.