Cenário: Streaming de Mensagens de uma Fila de Dados Normalizada
Crie uma fila de banco de dados para transmitir dados IoT normalizados.
- Dados Brutos de Entrada: Captura os dados iniciais de entrada IoT.
- Dados Rejeitados em: Armazena dados que falharam na validação ou no processamento.
- Dados Normalizados: Armazena dados IoT processados e padronizados prontos para uso downstream.
Antes de começar
O uso de filas de banco de dados requer configuração de uma conexão direta com o banco de dados.
Período máximo de retenção para filas públicas de 24 horas.
Etapa 1: Adicionar um Assinante
Use este comando do banco de dados SQL para registrar um assinante na fila para receber mensagens. Especifique o nome da fila e um nome de assinante exclusivo para permitir que este assinante ou consumidor desenfileire mensagens.
Para obter mais informações, consulte DBMS_AQADM.ADD_SUBSCRIBER.
BEGIN
dbms_aqadm.add_subscriber(
queue_name => '&&domain.__IOT.NORMALIZED_DATA',
subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
);
COMMIT;
END;
/Etapa 2: Mensagens de Desenfileiramento em Massa
Use este comando de desenfileiramento em massa do banco de dados SQL para processar muitas mensagens de uma só vez. Este exemplo processa ou consome até 30 mensagens por vez para o assinante específico.
Para obter mais informações, consulte DBMS_AQ.DEQUEUE_ARRAY.
DECLARE
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_array_t;
l_normalized_data_records JSON;
msgid_array dbms_aq.msgid_array_t;
retval PLS_INTEGER;
l_json_size NUMBER;
BEGIN
dequeue_options.consumer_name := 'test_public_subscriber'; -- Filter messages for this subscriber
dequeue_options.wait := 10; -- Wait up to 10 seconds for messages
dequeue_options.dequeue_mode := dbms_aq.remove; -- Remove after dequeueing
dequeue_options.navigation := dbms_aq.first_message; -- Get oldest message first
retval := dbms_aq.dequeue_array(
queue_name => '&&domain.__IOT.NORMALIZED_DATA',
dequeue_options => dequeue_options,
array_size => 30, -- Try to fetch up to 30 messages at once
message_properties_array => message_properties,
payload_array => l_normalized_data_records, -- The JSON data
msgid_array => msgid_array
);
l_json_size := json_value(json_query( l_normalized_data_records, '$.size()'), '$');
dbms_output.put_line('Successfully dequeued! ' || l_json_size); -- Output number of messages
COMMIT;
EXCEPTION
WHEN OTHERS THEN
dbms_output.put_line('Error Code: ' || sqlcode);
dbms_output.put_line('Error Message: ' || sqlerrm);
ROLLBACK;
raise_application_error(-20001, 'Unable to dequeue: ' || sqlerrm);
END;
/Etapa Opcional: Remover o Assinante
Depois de processar as informações, use este comando do banco de dados SQL para remover o assinante. A remoção do assinante é uma prática recomendada quando o assinante não é mais necessário, evitando a retenção indesejada de mensagens. Para obter mais informações, consulte DBMS_AQADM.REMOVE_SUBSCRIBER.
BEGIN
DBMS_AQADM.REMOVE_SUBSCRIBER (
queue_name => '&&domain.__IOT.NORMALIZED_DATA',
subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
);
COMMIT;
EXCEPTION
WHEN OTHERS THEN
NULL; -- Ignore errors (in case the subscriber wasn't present)
END;
/