2.9.4 Create Views to Access CSV Data in a Kafka Topic
In order to query data from a Kafka topic, you first need to create the Oracle SQL Access to Kafka (OSaK) views that map to the Kafka topic.
When the Kafka record format is CSV, before creating the views, you must create a
reference table whose schema is mapped to records in the Kafka topic.
Note:
If the Kafka record format isJSON_VARCHAR2
, a reference table should be passed as NULL. See
Create Views to Access JSON Data in a Kafka Topic.
The following example creates reference table SENSOR_RECORD_SHAPE
whose
schema maps to records in the Kafka topic ‘sensor’:
CREATE TABLE sensor_record_shape(
msg_number INTEGER PRIMARY KEY,
msg_timestamp TIMESTAMP,
sensor_type_id INTEGER,
sensor_unit_id INTEGER,
temperature_setting NUMBER(6,3),
temperature_reading NUMBER(6,3)
);
Now that the reference table has been created, we can create the OSaK views.
The following example creates one view per partition in the topic ‘sensor’ where the
record format is CSV. Since the 'sensor' topic has one partition, one view is
created.
DECLARE
views_created INTEGER;
application_id VARCHAR2(128);
BEGIN
ORA_KAFKA.CREATE_VIEWS
('MA1', -- The name of the cluster (specified in ORA_KAFKA.REGISTER_CLUSTER)
'QUERYAPP', -- The name given by the user for a set of views, corresponds to the concept of a Kafka group
'sensor', -- The name of the Kafka topic
'CSV', -- The format of the topic record
'SENSOR_RECORD_SHAPE', -- The name of the database reference table
views_created, -- Output: number of views created
application_id); -- Output: the application id of the set of views
-- created that uniquely identifies the view objects
dbms_output.put_line(‘views created = ‘ || views_created);
dbms_output.put_line(‘application id = ‘ || application_id);
END;
/
The above example causes the following view to be
created:
SQL> describe KV_MA1_QUERYAPP_SENSOR_0;
Name Null? Type
----------------------------------------- -------- ----------------------------
KAFKA$PARTITION NUMBER(38)
KAFKA$OFFSET NUMBER(38)
KAFKA$EPOCH_TIMESTAMP NUMBER(38)
MSG_NUMBER NOT NULL NUMBER
MSG_TIMESTAMP TIMESTAMP(6)
SENSOR_TYPE_ID NUMBER
SENSOR_UNIT_ID NUMBER
TEMPERATURE_SETTING NUMBER(6,3)
TEMPERATURE_READING NUMBER(6,3)
Where KAFKA$PARTITION
is the Kafka partition id,
KAFKA$OFFSET
is the offset of the Kafka record,
KAFKA$EPOCH_TIMESTAMP
is the timestamp of the Kafka record. The
remaining columns represent the fields in the CSV data.