2.9.6 Query Kafka Data as Continuous Stream

After creating the Oracle SQL access to Kafka (OSaK) views, the views can be queried using standard SQL. The view name consists of a generated application_id (which is the concatenation of cluster name, Kafka group name, topic name) concatenated with view_id.

The sensor topic described above has just one partition, and therefore one view. The view name would be “KV_MA1_QUERYAPP_SENSOR_0”. Note, view names can be identified by querying the ORA_KAFKA_PARTITION metadata table in the schema in which the ORA_KAFKA package was installed.

OSaK views can be accessed continuously, reading from an initial offset or timestamp to the end of the stream. This is the typical usage case for querying the most recent Kafka records for the specified topic.

The example below sets the starting offset to 100 records below the Kafka partition high water mark using the ORA_KAFKA.INIT_OFFSET procedure, and reads from there to the end of the stream. The next time through the loop, you read from where you left off last time to the new end of the stream. In the example, the analytics are reduced to a simple example doing a count(*) with LOOP logic in PL/SQL. Expected usage is: there is a loop within an application which executes a call to ORA_KAFKA.NEXT_OFFSET, queries the OSaK views, performs analytics on retrieved Kafka records, and if satisfied, calls ORA_KAFKA.UPDATE_OFFSET, and commits the transaction. The ORA_KAFKA.NEXT_OFFSET procedure records the next Kafka offset from which to read, based on the results of ORA_KAFKA.UPDATE_OFFSET or ORA_KAFKA.INIT_OFFSET/ORA_KAFKA.INIT_OFFSET_TS. ORA_KAFKA.UPDATE_OFFSET saved the last Kafka offset read when the view was accessed.

It is also possible to set the point from which to start reading to a particular timestamp. ORA_KAFKA.INIT_OFFSET_TS initializes the starting offset related to a timestamp for each Kafka partition belonging to the OSaK view. As with ORA_KAFKA.INIT_OFFSET, ORA_KAFKA.INIT_OFFSET_TS would normally be called at the outset of a new application instance dedicated to processing the view or when recovering after an application instance shutdown or failure.

Note:

Multiple applications reading the same set of OSaK views can result in duplicate Kafka records being processed or Kafka records being skipped, because each application will attempt to manage the offset. When multiple applications read the same topic, create a set of views for each application by using application-specific Kafka group names. Then each application can use their own offset to determine where to read. One application can call ORA_KAFKA.INIT_OFFSET with 100 and use one set of views, another application can call ORA_KAFKA.INIT_OFFSET with 550 and use another set of views, and so on.

BEGIN
   -- Before entering the loop, we initialize the starting offsets for the view relative to the current Kafka high water mark for the Kafka partition managed by our view.
   -- Without an INIT_OFFSET call, records are read from either the beginning of the stream or from the offset last recorded by a COMMIT after an UPDATE_OFFSET call.
   ORA_KAFKA.INIT_OFFSET
     ('KV_MA1_QUERYAPP_SENSOR_0',          -- The view for which to initialize offsets
      100,                                 -- The number of records before the high water mark that designates the starting offset
      ORA_KAFKA.WATER_MARK_HIGH);          -- The above record count parameter is 100 records below the high water mark
   LOOP

    -- Set the offset of the next Kafka record to be processed.  
    -- Since we have called INIT_OFFSET,
    -- the starting offset will be 100 records below the high water mark.
    ORA_KAFKA.NEXT_OFFSET
      ('KV_MA1_QUERYAPP_SENSOR_0');       -- The view for which to set offsets  

    -- Now query for rows starting at 100 records below the high water mark.
    SELECT count(*) from KV_MA1_QUERYAPP_SENSOR_0;

    -- Now that we've done a query, record the last offset processed.
    ORA_KAFKA.UPDATE_OFFSET
      ('KV_MA1_QUERYAPP_SENSOR_0);        -- The view for which to set offsets

    COMMIT;
  END LOOP;
END;
/