Scénario : Messages de diffusion en continu à partir d'une file d'attente de données normalisée

Créez une file d'attente de base de données pour diffuser les données IoT normalisées.

Une fois que vous avez configuré votre appareil et que les données sont en cours d'envoi et de réception, vous pouvez diffuser des messages en continu. Cet exemple montre comment diffuser des messages à partir de données normalisées. Vous pouvez utiliser une approche similaire pour vous abonner aux données brutes ou aux files d'attente de données rejetées. Pour plus d'informations, voir Présentation des files d'attente d'événements transactionnels et de la mise en file d'attente avancée.

Avant de commencer

L'utilisation des files d'attente de base de données nécessite la configuration d'une connexion directe à la base de données.

Note

Période de conservation maximale pour les files d'attente publiques 24 heures.

Étape 1 : Ajouter un abonné

Utilisez cette commande de base de données SQL pour enregistrer un abonné dans la file d'attente pour recevoir des messages. Spécifiez le nom de la file d'attente et un nom d'abonné unique pour permettre à cet abonné ou consommateur de retirer les messages de la file d'attente.

Pour plus de renseignements, consultez la page DBMS_AQADM.ADD_SUBSCRIBER.

BEGIN
    dbms_aqadm.add_subscriber(
        queue_name => '&&domain.__IOT.NORMALIZED_DATA',
        subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
    );
    COMMIT;
END;
/

Étape 2 : Messages de sortie de file d'attente en masse

Utilisez cette commande de sortie en masse de la base de données SQL pour traiter plusieurs messages à la fois. Cet exemple traite ou consomme jusqu'à 30 messages à la fois pour l'abonné spécifique.

Pour plus de renseignements, consultez la page DBMS_AQ.DEQUEUE_ARRAY.

DECLARE
    dequeue_options    dbms_aq.dequeue_options_t;
    message_properties dbms_aq.message_properties_array_t;
    l_normalized_data_records           JSON;
    msgid_array        dbms_aq.msgid_array_t;
    retval             PLS_INTEGER; 
    l_json_size NUMBER;
BEGIN
    dequeue_options.consumer_name := 'test_public_subscriber';   -- Filter messages for this subscriber
    dequeue_options.wait          := 10;                        -- Wait up to 10 seconds for messages
    dequeue_options.dequeue_mode  := dbms_aq.remove;             -- Remove after dequeueing
    dequeue_options.navigation    := dbms_aq.first_message;      -- Get oldest message first
    
    retval := dbms_aq.dequeue_array(
        queue_name               => '&&domain.__IOT.NORMALIZED_DATA',
        dequeue_options          => dequeue_options,
        array_size               => 30,                         -- Try to fetch up to 30 messages at once
        message_properties_array => message_properties,
        payload_array            => l_normalized_data_records,  -- The JSON data
        msgid_array              => msgid_array
    );
    
    l_json_size := json_value(json_query( l_normalized_data_records, '$.size()'), '$');
    dbms_output.put_line('Successfully dequeued! ' || l_json_size);  -- Output number of messages
    COMMIT;
EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('Error Code: ' || sqlcode);
        dbms_output.put_line('Error Message: ' || sqlerrm);
        ROLLBACK;
        raise_application_error(-20001, 'Unable to dequeue: ' || sqlerrm);
END;
/

Étape facultative : Supprimer l'abonné

Après avoir traité les informations, utilisez cette commande de base de données SQL pour supprimer l'abonné. La suppression de l'abonné est une bonne pratique lorsque l'abonné n'est plus nécessaire, ce qui empêche la conservation des messages indésirables. Pour plus de renseignements, consultez la page DBMS_AQADM.REMOVE_SUBSCRIBER.

BEGIN
    DBMS_AQADM.REMOVE_SUBSCRIBER (
        queue_name => '&&domain.__IOT.NORMALIZED_DATA',
        subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
    );
    COMMIT;
EXCEPTION
WHEN OTHERS THEN 
    NULL;   -- Ignore errors (in case the subscriber wasn't present)
END;
/