Scenario: Streaming Messages from a Normalized Data Queue

Create a database queue to stream normalized IoT data.

After your device is set up and is sending and receiving data, you can stream messages. This example shows how to stream messages from the normalized data queue. You can use a similar approach to subscribe to the raw data or rejected data queues. For more information, see Introduction to Transactional Event Queues and Advanced Queuing.

Before you begin

Using database queues requires configuring a direct database connection.

Note

The maximum retention period for public queues is 24 hours.

Step 1: Add a Subscriber

Use this SQL database command to register a subscriber on the queue so that it can receive messages. Specify the queue name and a unique subscriber name; the subscriber (or consumer) can then dequeue messages under that name.

For more information, see DBMS_AQADM.ADD_SUBSCRIBER.

BEGIN
    dbms_aqadm.add_subscriber(
        queue_name => '&&domain.__IOT.NORMALIZED_DATA',
        subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
    );
    COMMIT;
END;
/
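To confirm that the subscriber was registered, you can query the queue subscriber data dictionary view. This is a minimal sketch: it assumes the connected user can read ALL_QUEUE_SUBSCRIBERS and that the view lists subscribers for this queue, neither of which is confirmed by this example's source. The name is compared case-insensitively because the stored case of the subscriber name is also an assumption.

```sql
-- Sketch: list queues on which the example subscriber is registered.
-- Assumes ALL_QUEUE_SUBSCRIBERS is visible to the connected user.
SELECT owner,
       name AS queue_name,
       consumer_name
FROM   all_queue_subscribers
WHERE  UPPER(consumer_name) = UPPER('test_public_subscriber');
```

If the query returns no rows, either the subscriber was not added or the view is not visible to your database user.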

Step 2: Bulk Dequeue Messages

Use this bulk dequeuing SQL database command to process many messages at once. This example waits up to 10 seconds for messages, then consumes up to 30 messages in a single call for the specified subscriber and removes them from the queue.

For more information, see DBMS_AQ.DEQUEUE_ARRAY.

DECLARE
    dequeue_options           dbms_aq.dequeue_options_t;
    message_properties        dbms_aq.message_properties_array_t;
    l_normalized_data_records JSON;
    msgid_array               dbms_aq.msgid_array_t;
    retval                    PLS_INTEGER;
    l_json_size               NUMBER;
BEGIN
    dequeue_options.consumer_name := 'test_public_subscriber';   -- Filter messages for this subscriber
    dequeue_options.wait          := 10;                        -- Wait up to 10 seconds for messages
    dequeue_options.dequeue_mode  := dbms_aq.remove;             -- Remove after dequeueing
    dequeue_options.navigation    := dbms_aq.first_message;      -- Get oldest message first
    
    retval := dbms_aq.dequeue_array(
        queue_name               => '&&domain.__IOT.NORMALIZED_DATA',
        dequeue_options          => dequeue_options,
        array_size               => 30,                         -- Try to fetch up to 30 messages at once
        message_properties_array => message_properties,
        payload_array            => l_normalized_data_records,  -- The JSON data
        msgid_array              => msgid_array
    );
    
    l_json_size := json_value(json_query( l_normalized_data_records, '$.size()'), '$');
    dbms_output.put_line('Successfully dequeued ' || l_json_size || ' message(s).');  -- Output number of messages
    COMMIT;
EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('Error Code: ' || sqlcode);
        dbms_output.put_line('Error Message: ' || sqlerrm);
        ROLLBACK;
        raise_application_error(-20001, 'Unable to dequeue: ' || sqlerrm);
END;
/
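The block above fetches a single batch. To drain the queue, the same call can be repeated until no messages remain. The following is a minimal sketch under stated assumptions: it assumes that DBMS_AQ.DEQUEUE_ARRAY raises ORA-25228 when the wait time expires with nothing to dequeue (as DBMS_AQ.DEQUEUE does), and that the function's return value is the number of messages dequeued. The names l_batch, l_count, l_total, and e_no_messages are illustrative only.

```sql
DECLARE
    dequeue_options    dbms_aq.dequeue_options_t;
    message_properties dbms_aq.message_properties_array_t;
    l_batch            JSON;
    msgid_array        dbms_aq.msgid_array_t;
    l_count            PLS_INTEGER;
    l_total            PLS_INTEGER := 0;
    e_no_messages      EXCEPTION;
    PRAGMA EXCEPTION_INIT(e_no_messages, -25228);  -- Assumed timeout / end-of-fetch error
BEGIN
    dequeue_options.consumer_name := 'test_public_subscriber';
    dequeue_options.wait          := 10;                     -- Wait up to 10 seconds per batch
    dequeue_options.dequeue_mode  := dbms_aq.remove;
    dequeue_options.navigation    := dbms_aq.first_message;

    LOOP
        BEGIN
            l_count := dbms_aq.dequeue_array(
                queue_name               => '&&domain.__IOT.NORMALIZED_DATA',
                dequeue_options          => dequeue_options,
                array_size               => 30,
                message_properties_array => message_properties,
                payload_array            => l_batch,
                msgid_array              => msgid_array
            );
        EXCEPTION
            WHEN e_no_messages THEN
                EXIT;                                        -- Nothing arrived within the wait time
        END;

        EXIT WHEN l_count = 0;

        -- Process l_batch here before committing.
        l_total := l_total + l_count;
        COMMIT;                                              -- Make the removal of this batch permanent
    END LOOP;

    dbms_output.put_line('Total messages dequeued: ' || l_total);
END;
/
```

Committing after each batch keeps transactions short. If processing a batch fails, roll back instead of committing so that the messages in that batch are not lost.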

Optional Step: Remove the Subscriber

After processing the information, use this SQL database command to remove the subscriber. Removing the subscriber is a best practice when the subscriber is no longer needed, preventing unwanted message retention. For more information, see DBMS_AQADM.REMOVE_SUBSCRIBER.

BEGIN
    DBMS_AQADM.REMOVE_SUBSCRIBER (
        queue_name => '&&domain.__IOT.NORMALIZED_DATA',
        subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
    );
    COMMIT;
EXCEPTION
    WHEN OTHERS THEN
        NULL;   -- Ignore errors (in case the subscriber wasn't present)
END;
/

Additional Resources

Java Samples: Use this sample Java Maven project on GitHub to access OCI IoT domain data, either by querying the database directly to count raw data records or by subscribing to queues to stream raw and normalized messages in real time:
  • SampleDataAccess: Gets the IoT domain's database connection details, connects directly to the IoT domain database using IAM database token authentication, and queries <domainShortId>__IOT.RAW_DATA.
  • SampleDataStreaming: Subscribes to the public RAW_DATA_IN and NORMALIZED_DATA TxEventQ queues, lets you filter by digital twin instance OCID, prints messages, and removes the subscribers it created.

MCP Server: Use this IoT MCP server on GitHub to manage IoT resources.