Scénario : transmission en continu de messages à partir d'une file d'attente de données normalisée

Créez une file d'attente de base de données pour diffuser les données IoT normalisées.

Une fois que vous avez configuré votre appareil et que les données sont en cours d'envoi et de réception, vous pouvez diffuser des messages. Cet exemple montre comment diffuser des messages à partir de données normalisées. Vous pouvez utiliser une approche similaire pour vous abonner aux données brutes ou aux files d'attente de données rejetées. Pour plus d'informations, voir Introduction aux files d'attente d'événements transactionnels et à la file d'attente avancée.

Avant de débuter

L'utilisation de files d'attente de base de données nécessite la configuration d'une connexion de base de données directe.

Remarque

Période de conservation maximale pour les files d'attente publiques de 24 heures.

Etape 1 : Ajouter un abonné

Utilisez cette commande de base de données SQL pour inscrire un abonné dans la file d'attente afin de recevoir des messages. Indiquez le nom de la file d'attente et un nom d'abonné unique pour permettre à cet abonné ou destinataire de retirer des messages de la file d'attente.

Pour plus d'informations, reportez-vous à DBMS_AQADM.ADD_SUBSCRIBER.

BEGIN
    dbms_aqadm.add_subscriber(
        queue_name => '&&domain.__IOT.NORMALIZED_DATA',
        subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
    );
    COMMIT;
END;
/

Etape 2 : Messages de retrait en masse de la file d'attente

Utilisez cette commande de sortie en masse de la file d'attente SQL pour traiter plusieurs messages à la fois. Cet exemple traite ou consomme jusqu'à 30 messages à la fois pour l'abonné spécifique.

Pour plus d'informations, reportez-vous à DBMS_AQ.DEQUEUE_ARRAY.

DECLARE
    dequeue_options    dbms_aq.dequeue_options_t;
    message_properties dbms_aq.message_properties_array_t;
    l_normalized_data_records           JSON;
    msgid_array        dbms_aq.msgid_array_t;
    retval             PLS_INTEGER; 
    l_json_size NUMBER;
BEGIN
    dequeue_options.consumer_name := 'test_public_subscriber';   -- Filter messages for this subscriber
    dequeue_options.wait          := 10;                        -- Wait up to 10 seconds for messages
    dequeue_options.dequeue_mode  := dbms_aq.remove;             -- Remove after dequeueing
    dequeue_options.navigation    := dbms_aq.first_message;      -- Get oldest message first
    
    retval := dbms_aq.dequeue_array(
        queue_name               => '&&domain.__IOT.NORMALIZED_DATA',
        dequeue_options          => dequeue_options,
        array_size               => 30,                         -- Try to fetch up to 30 messages at once
        message_properties_array => message_properties,
        payload_array            => l_normalized_data_records,  -- The JSON data
        msgid_array              => msgid_array
    );
    
    l_json_size := json_value(json_query( l_normalized_data_records, '$.size()'), '$');
    dbms_output.put_line('Successfully dequeued! ' || l_json_size);  -- Output number of messages
    COMMIT;
EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('Error Code: ' || sqlcode);
        dbms_output.put_line('Error Message: ' || sqlerrm);
        ROLLBACK;
        raise_application_error(-20001, 'Unable to dequeue: ' || sqlerrm);
END;
/

Etape facultative : suppression de l'abonné

Après avoir traité les informations, utilisez cette commande de base de données SQL pour enlever l'abonné. La suppression de l'abonné est une bonne pratique lorsque l'abonné n'est plus nécessaire, ce qui empêche la conservation des messages indésirables. Pour plus d'informations, reportez-vous à DBMS_AQADM.REMOVE_SUBSCRIBER.

BEGIN
    DBMS_AQADM.REMOVE_SUBSCRIBER (
        queue_name => '&&domain.__IOT.NORMALIZED_DATA',
        subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
    );
    COMMIT;
EXCEPTION
WHEN OTHERS THEN 
    NULL;   -- Ignore errors (in case the subscriber wasn't present)
END;
/