Scénario : transmission en continu de messages à partir d'une file d'attente de données normalisée
Créez une file d'attente de base de données pour diffuser les données IoT normalisées.
- Données brutes entrantes : capture les données entrantes initiales IoT.
- Données rejetées dans : stocke les données dont la validation ou le traitement a échoué.
- Données normalisées : stocke les données IoT traitées et standardisées prêtes à être utilisées en aval.
Avant de débuter
L'utilisation de files d'attente de base de données nécessite la configuration d'une connexion de base de données directe.
Période de conservation maximale pour les files d'attente publiques de 24 heures.
Etape 1 : Ajouter un abonné
Utilisez cette commande de base de données SQL pour inscrire un abonné dans la file d'attente afin de recevoir des messages. Indiquez le nom de la file d'attente et un nom d'abonné unique pour permettre à cet abonné ou destinataire de retirer des messages de la file d'attente.
Pour plus d'informations, reportez-vous à DBMS_AQADM.ADD_SUBSCRIBER.
BEGIN
dbms_aqadm.add_subscriber(
queue_name => '&&domain.__IOT.NORMALIZED_DATA',
subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
);
COMMIT;
END;
/Etape 2 : Messages de retrait en masse de la file d'attente
Utilisez cette commande de sortie en masse de la file d'attente SQL pour traiter plusieurs messages à la fois. Cet exemple traite ou consomme jusqu'à 30 messages à la fois pour l'abonné spécifique.
Pour plus d'informations, reportez-vous à DBMS_AQ.DEQUEUE_ARRAY.
DECLARE
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_array_t;
l_normalized_data_records JSON;
msgid_array dbms_aq.msgid_array_t;
retval PLS_INTEGER;
l_json_size NUMBER;
BEGIN
dequeue_options.consumer_name := 'test_public_subscriber'; -- Filter messages for this subscriber
dequeue_options.wait := 10; -- Wait up to 10 seconds for messages
dequeue_options.dequeue_mode := dbms_aq.remove; -- Remove after dequeueing
dequeue_options.navigation := dbms_aq.first_message; -- Get oldest message first
retval := dbms_aq.dequeue_array(
queue_name => '&&domain.__IOT.NORMALIZED_DATA',
dequeue_options => dequeue_options,
array_size => 30, -- Try to fetch up to 30 messages at once
message_properties_array => message_properties,
payload_array => l_normalized_data_records, -- The JSON data
msgid_array => msgid_array
);
l_json_size := json_value(json_query( l_normalized_data_records, '$.size()'), '$');
dbms_output.put_line('Successfully dequeued! ' || l_json_size); -- Output number of messages
COMMIT;
EXCEPTION
WHEN OTHERS THEN
dbms_output.put_line('Error Code: ' || sqlcode);
dbms_output.put_line('Error Message: ' || sqlerrm);
ROLLBACK;
raise_application_error(-20001, 'Unable to dequeue: ' || sqlerrm);
END;
/Etape facultative : suppression de l'abonné
Après avoir traité les informations, utilisez cette commande de base de données SQL pour enlever l'abonné. La suppression de l'abonné est une bonne pratique lorsque l'abonné n'est plus nécessaire, ce qui empêche la conservation des messages indésirables. Pour plus d'informations, reportez-vous à DBMS_AQADM.REMOVE_SUBSCRIBER.
BEGIN
DBMS_AQADM.REMOVE_SUBSCRIBER (
queue_name => '&&domain.__IOT.NORMALIZED_DATA',
subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
);
COMMIT;
EXCEPTION
WHEN OTHERS THEN
NULL; -- Ignore errors (in case the subscriber wasn't present)
END;
/