Escenario: transmisión de mensajes desde una cola de datos normalizada
Cree una cola de base de datos para transmitir los datos IoT normalizados.
- Datos brutos de entrada: captura los datos IoT entrantes iniciales.
- Datos rechazados en: almacena los datos que han fallado en la validación o el procesamiento.
- Datos normalizados: almacena los datos IoT procesados y estandarizados listos para su uso descendente.
Antes de empezar
El uso de colas de base de datos necesita configurar una conexión de base de datos directa.
Período máximo de retención para colas públicas de 24 horas.
Paso 1: Agregar un Suscriptor
Utilice este comando de base de datos SQL para registrar un suscriptor en la cola para recibir mensajes. Especifique el nombre de cola y un nombre de suscriptor único para permitir que este suscriptor o consumidor elimine los mensajes de la cola.
Para obtener más información, consulte DBMS_AQADM.ADD_SUBSCRIBER.
BEGIN
dbms_aqadm.add_subscriber(
queue_name => '&&domain.__IOT.NORMALIZED_DATA',
subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
);
COMMIT;
END;
/Paso 2: Eliminación masiva de mensajes de cola
Utilice este comando de eliminación de cola en bloque de la base de datos SQL para procesar muchos mensajes a la vez. En este ejemplo, se procesan o consumen hasta 30 mensajes a la vez para el suscriptor específico.
Para obtener más información, consulte DBMS_AQ.DEQUEUE_ARRAY.
DECLARE
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_array_t;
l_normalized_data_records JSON;
msgid_array dbms_aq.msgid_array_t;
retval PLS_INTEGER;
l_json_size NUMBER;
BEGIN
dequeue_options.consumer_name := 'test_public_subscriber'; -- Filter messages for this subscriber
dequeue_options.wait := 10; -- Wait up to 10 seconds for messages
dequeue_options.dequeue_mode := dbms_aq.remove; -- Remove after dequeueing
dequeue_options.navigation := dbms_aq.first_message; -- Get oldest message first
retval := dbms_aq.dequeue_array(
queue_name => '&&domain.__IOT.NORMALIZED_DATA',
dequeue_options => dequeue_options,
array_size => 30, -- Try to fetch up to 30 messages at once
message_properties_array => message_properties,
payload_array => l_normalized_data_records, -- The JSON data
msgid_array => msgid_array
);
l_json_size := json_value(json_query( l_normalized_data_records, '$.size()'), '$');
dbms_output.put_line('Successfully dequeued! ' || l_json_size); -- Output number of messages
COMMIT;
EXCEPTION
WHEN OTHERS THEN
dbms_output.put_line('Error Code: ' || sqlcode);
dbms_output.put_line('Error Message: ' || sqlerrm);
ROLLBACK;
raise_application_error(-20001, 'Unable to dequeue: ' || sqlerrm);
END;
/Paso opcional: eliminar el suscriptor
Después de procesar la información, utilice este comando de la base de datos SQL para eliminar el suscriptor. La eliminación del suscriptor es una práctica recomendada cuando el suscriptor ya no es necesario, lo que evita la retención de mensajes no deseados. Para obtener más información, consulte DBMS_AQADM.REMOVE_SUBSCRIBER.
BEGIN
DBMS_AQADM.REMOVE_SUBSCRIBER (
queue_name => '&&domain.__IOT.NORMALIZED_DATA',
subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
);
COMMIT;
EXCEPTION
WHEN OTHERS THEN
NULL; -- Ignore errors (in case the subscriber wasn't present)
END;
/