Cenário: Streaming de Mensagens de uma Fila de Dados Normalizada

Crie uma fila de banco de dados para transmitir dados IoT normalizados.

Depois de configurar o dispositivo e de os dados serem enviados e recebidos, é possível transmitir mensagens. Este exemplo mostra como transmitir mensagens de dados normalizados. Você pode usar uma abordagem semelhante para se inscrever nas filas de dados brutos ou filas de dados rejeitadas. Para obter mais informações, consulte Introduction to Transactional Event Queues and Advanced Queuing.

Antes de começar

O uso de filas de banco de dados requer configuração de uma conexão direta com o banco de dados.

Observação

Período máximo de retenção para filas públicas de 24 horas.

Etapa 1: Adicionar um Assinante

Use este comando do banco de dados SQL para registrar um assinante na fila para receber mensagens. Especifique o nome da fila e um nome de assinante exclusivo para permitir que este assinante ou consumidor desenfileire mensagens.

Para obter mais informações, consulte DBMS_AQADM.ADD_SUBSCRIBER.

BEGIN
    dbms_aqadm.add_subscriber(
        queue_name => '&&domain.__IOT.NORMALIZED_DATA',
        subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
    );
    COMMIT;
END;
/

Etapa 2: Mensagens de Desenfileiramento em Massa

Use este comando de desenfileiramento em massa do banco de dados SQL para processar muitas mensagens de uma só vez. Este exemplo processa ou consome até 30 mensagens por vez para o assinante específico.

Para obter mais informações, consulte DBMS_AQ.DEQUEUE_ARRAY.

DECLARE
    dequeue_options    dbms_aq.dequeue_options_t;
    message_properties dbms_aq.message_properties_array_t;
    l_normalized_data_records           JSON;
    msgid_array        dbms_aq.msgid_array_t;
    retval             PLS_INTEGER; 
    l_json_size NUMBER;
BEGIN
    dequeue_options.consumer_name := 'test_public_subscriber';   -- Filter messages for this subscriber
    dequeue_options.wait          := 10;                        -- Wait up to 10 seconds for messages
    dequeue_options.dequeue_mode  := dbms_aq.remove;             -- Remove after dequeueing
    dequeue_options.navigation    := dbms_aq.first_message;      -- Get oldest message first
    
    retval := dbms_aq.dequeue_array(
        queue_name               => '&&domain.__IOT.NORMALIZED_DATA',
        dequeue_options          => dequeue_options,
        array_size               => 30,                         -- Try to fetch up to 30 messages at once
        message_properties_array => message_properties,
        payload_array            => l_normalized_data_records,  -- The JSON data
        msgid_array              => msgid_array
    );
    
    l_json_size := json_value(json_query( l_normalized_data_records, '$.size()'), '$');
    dbms_output.put_line('Successfully dequeued! ' || l_json_size);  -- Output number of messages
    COMMIT;
EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('Error Code: ' || sqlcode);
        dbms_output.put_line('Error Message: ' || sqlerrm);
        ROLLBACK;
        raise_application_error(-20001, 'Unable to dequeue: ' || sqlerrm);
END;
/

Etapa Opcional: Remover o Assinante

Depois de processar as informações, use este comando do banco de dados SQL para remover o assinante. A remoção do assinante é uma prática recomendada quando o assinante não é mais necessário, evitando a retenção indesejada de mensagens. Para obter mais informações, consulte DBMS_AQADM.REMOVE_SUBSCRIBER.

BEGIN
    DBMS_AQADM.REMOVE_SUBSCRIBER (
        queue_name => '&&domain.__IOT.NORMALIZED_DATA',
        subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
    );
    COMMIT;
EXCEPTION
WHEN OTHERS THEN 
    NULL;   -- Ignore errors (in case the subscriber wasn't present)
END;
/