Scenario: streaming di messaggi da una coda dati normalizzata

Creare una coda di database per lo streaming dei dati IoT normalizzati.

Dopo aver impostato il dispositivo e aver inviato e ricevuto i dati, è possibile eseguire lo streaming dei messaggi. Questo esempio mostra come eseguire lo streaming dei messaggi dai dati normalizzati. È possibile utilizzare un approccio simile per eseguire la sottoscrizione ai dati raw o alle coda dati rifiutate. Per ulteriori informazioni, vedere Introduzione alle code degli eventi transazionali e all'accodamento avanzato.
  • Raw Data In: acquisisce i dati IoT iniziali in entrata.
  • Dati rifiutati in: memorizza i dati che non hanno superato la convalida o l'elaborazione.
  • Dati normalizzati: memorizza i dati IoT elaborati e standardizzati pronti per l'uso a valle.

Prima di iniziare

L'uso delle code di database richiede la configurazione di una connessione diretta al database.

Nota

Periodo di conservazione massimo per le code pubbliche 24 ore.

Passo 1: aggiungere un sottoscrittore

Utilizzare questo comando del database SQL per registrare un sottoscrittore nella coda per la ricezione dei messaggi. Specificare il nome della coda e un nome di sottoscrittore univoco per consentire a questo sottoscrittore o consumatore di rimuovere dalla coda i messaggi.

Per ulteriori informazioni, vedere DBMS_AQADM.ADD_SUBSCRIBER.

BEGIN
    dbms_aqadm.add_subscriber(
        queue_name => '&&domain.__IOT.NORMALIZED_DATA',
        subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
    );
    COMMIT;
END;
/

Passo 2: Rimozione di massa dei messaggi

Utilizzare questo comando di rimozione dalla coda di massa del database SQL per elaborare più messaggi contemporaneamente. Questo esempio elabora o consuma fino a 30 messaggi alla volta per il sottoscrittore specifico.

Per ulteriori informazioni, vedere DBMS_AQ.DEQUEUE_ARRAY.

DECLARE
    dequeue_options    dbms_aq.dequeue_options_t;
    message_properties dbms_aq.message_properties_array_t;
    l_normalized_data_records           JSON;
    msgid_array        dbms_aq.msgid_array_t;
    retval             PLS_INTEGER; 
    l_json_size NUMBER;
BEGIN
    dequeue_options.consumer_name := 'test_public_subscriber';   -- Filter messages for this subscriber
    dequeue_options.wait          := 10;                        -- Wait up to 10 seconds for messages
    dequeue_options.dequeue_mode  := dbms_aq.remove;             -- Remove after dequeueing
    dequeue_options.navigation    := dbms_aq.first_message;      -- Get oldest message first
    
    retval := dbms_aq.dequeue_array(
        queue_name               => '&&domain.__IOT.NORMALIZED_DATA',
        dequeue_options          => dequeue_options,
        array_size               => 30,                         -- Try to fetch up to 30 messages at once
        message_properties_array => message_properties,
        payload_array            => l_normalized_data_records,  -- The JSON data
        msgid_array              => msgid_array
    );
    
    l_json_size := json_value(json_query( l_normalized_data_records, '$.size()'), '$');
    dbms_output.put_line('Successfully dequeued! ' || l_json_size);  -- Output number of messages
    COMMIT;
EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('Error Code: ' || sqlcode);
        dbms_output.put_line('Error Message: ' || sqlerrm);
        ROLLBACK;
        raise_application_error(-20001, 'Unable to dequeue: ' || sqlerrm);
END;
/

Passo facoltativo: Rimuovere l'iscritto

Dopo aver elaborato le informazioni, utilizzare questo comando del database SQL per rimuovere il sottoscrittore. La rimozione dell'abbonato è una procedura ottimale quando l'abbonato non è più necessario, impedendo la conservazione indesiderata dei messaggi. Per ulteriori informazioni, vedere DBMS_AQADM.REMOVE_SUBSCRIBER.

BEGIN
    DBMS_AQADM.REMOVE_SUBSCRIBER (
        queue_name => '&&domain.__IOT.NORMALIZED_DATA',
        subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
    );
    COMMIT;
EXCEPTION
WHEN OTHERS THEN 
    NULL;   -- Ignore errors (in case the subscriber wasn't present)
END;
/