Escenario: transmisión de mensajes desde una cola de datos normalizada

Cree una cola de base de datos para transmitir los datos IoT normalizados.

Después de configurar el dispositivo y de enviar y recibir datos, puede transmitir mensajes. En este ejemplo se muestra cómo transmitir mensajes de datos normalizados. Puede utilizar un enfoque similar para suscribirse a las colas de datos raw o las colas de datos rechazadas. Para obtener más información, consulte Introducción a las colas de eventos transaccionales y Advanced Queuing.

Antes de empezar

El uso de colas de base de datos necesita configurar una conexión de base de datos directa.

Nota

Período máximo de retención para colas públicas de 24 horas.

Paso 1: Agregar un Suscriptor

Utilice este comando de base de datos SQL para registrar un suscriptor en la cola para recibir mensajes. Especifique el nombre de cola y un nombre de suscriptor único para permitir que este suscriptor o consumidor elimine los mensajes de la cola.

Para obtener más información, consulte DBMS_AQADM.ADD_SUBSCRIBER.

BEGIN
    dbms_aqadm.add_subscriber(
        queue_name => '&&domain.__IOT.NORMALIZED_DATA',
        subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
    );
    COMMIT;
END;
/

Paso 2: Eliminación masiva de mensajes de cola

Utilice este comando de eliminación de cola en bloque de la base de datos SQL para procesar muchos mensajes a la vez. En este ejemplo, se procesan o consumen hasta 30 mensajes a la vez para el suscriptor específico.

Para obtener más información, consulte DBMS_AQ.DEQUEUE_ARRAY.

DECLARE
    dequeue_options    dbms_aq.dequeue_options_t;
    message_properties dbms_aq.message_properties_array_t;
    l_normalized_data_records           JSON;
    msgid_array        dbms_aq.msgid_array_t;
    retval             PLS_INTEGER; 
    l_json_size NUMBER;
BEGIN
    dequeue_options.consumer_name := 'test_public_subscriber';   -- Filter messages for this subscriber
    dequeue_options.wait          := 10;                        -- Wait up to 10 seconds for messages
    dequeue_options.dequeue_mode  := dbms_aq.remove;             -- Remove after dequeueing
    dequeue_options.navigation    := dbms_aq.first_message;      -- Get oldest message first
    
    retval := dbms_aq.dequeue_array(
        queue_name               => '&&domain.__IOT.NORMALIZED_DATA',
        dequeue_options          => dequeue_options,
        array_size               => 30,                         -- Try to fetch up to 30 messages at once
        message_properties_array => message_properties,
        payload_array            => l_normalized_data_records,  -- The JSON data
        msgid_array              => msgid_array
    );
    
    l_json_size := json_value(json_query( l_normalized_data_records, '$.size()'), '$');
    dbms_output.put_line('Successfully dequeued! ' || l_json_size);  -- Output number of messages
    COMMIT;
EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('Error Code: ' || sqlcode);
        dbms_output.put_line('Error Message: ' || sqlerrm);
        ROLLBACK;
        raise_application_error(-20001, 'Unable to dequeue: ' || sqlerrm);
END;
/

Paso opcional: eliminar el suscriptor

Después de procesar la información, utilice este comando de la base de datos SQL para eliminar el suscriptor. La eliminación del suscriptor es una práctica recomendada cuando el suscriptor ya no es necesario, lo que evita la retención de mensajes no deseados. Para obtener más información, consulte DBMS_AQADM.REMOVE_SUBSCRIBER.

BEGIN
    DBMS_AQADM.REMOVE_SUBSCRIBER (
        queue_name => '&&domain.__IOT.NORMALIZED_DATA',
        subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
    );
    COMMIT;
EXCEPTION
WHEN OTHERS THEN 
    NULL;   -- Ignore errors (in case the subscriber wasn't present)
END;
/