Scénario : Messages de diffusion en continu à partir d'une file d'attente de données normalisée
Créez une file d'attente de base de données pour diffuser les données IoT normalisées.
- Données brutes entrantes : saisit les données IoT entrantes initiales.
- Données rejetées dans : Stocke les données dont la validation ou le traitement a échoué.
- Données normalisées : Stocke les données IoT traitées et normalisées prêtes pour une utilisation en aval.
Avant de commencer
L'utilisation des files d'attente de base de données nécessite la configuration d'une connexion directe à la base de données.
Période de conservation maximale pour les files d'attente publiques 24 heures.
Étape 1 : Ajouter un abonné
Utilisez cette commande de base de données SQL pour enregistrer un abonné dans la file d'attente pour recevoir des messages. Spécifiez le nom de la file d'attente et un nom d'abonné unique pour permettre à cet abonné ou consommateur de retirer les messages de la file d'attente.
Pour plus de renseignements, consultez la page DBMS_AQADM.ADD_SUBSCRIBER.
BEGIN
dbms_aqadm.add_subscriber(
queue_name => '&&domain.__IOT.NORMALIZED_DATA',
subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
);
COMMIT;
END;
/Étape 2 : Messages de sortie de file d'attente en masse
Utilisez cette commande de sortie en masse de la base de données SQL pour traiter plusieurs messages à la fois. Cet exemple traite ou consomme jusqu'à 30 messages à la fois pour l'abonné spécifique.
Pour plus de renseignements, consultez la page DBMS_AQ.DEQUEUE_ARRAY.
DECLARE
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_array_t;
l_normalized_data_records JSON;
msgid_array dbms_aq.msgid_array_t;
retval PLS_INTEGER;
l_json_size NUMBER;
BEGIN
dequeue_options.consumer_name := 'test_public_subscriber'; -- Filter messages for this subscriber
dequeue_options.wait := 10; -- Wait up to 10 seconds for messages
dequeue_options.dequeue_mode := dbms_aq.remove; -- Remove after dequeueing
dequeue_options.navigation := dbms_aq.first_message; -- Get oldest message first
retval := dbms_aq.dequeue_array(
queue_name => '&&domain.__IOT.NORMALIZED_DATA',
dequeue_options => dequeue_options,
array_size => 30, -- Try to fetch up to 30 messages at once
message_properties_array => message_properties,
payload_array => l_normalized_data_records, -- The JSON data
msgid_array => msgid_array
);
l_json_size := json_value(json_query( l_normalized_data_records, '$.size()'), '$');
dbms_output.put_line('Successfully dequeued! ' || l_json_size); -- Output number of messages
COMMIT;
EXCEPTION
WHEN OTHERS THEN
dbms_output.put_line('Error Code: ' || sqlcode);
dbms_output.put_line('Error Message: ' || sqlerrm);
ROLLBACK;
raise_application_error(-20001, 'Unable to dequeue: ' || sqlerrm);
END;
/Étape facultative : Supprimer l'abonné
Après avoir traité les informations, utilisez cette commande de base de données SQL pour supprimer l'abonné. La suppression de l'abonné est une bonne pratique lorsque l'abonné n'est plus nécessaire, ce qui empêche la conservation des messages indésirables. Pour plus de renseignements, consultez la page DBMS_AQADM.REMOVE_SUBSCRIBER.
BEGIN
DBMS_AQADM.REMOVE_SUBSCRIBER (
queue_name => '&&domain.__IOT.NORMALIZED_DATA',
subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
);
COMMIT;
EXCEPTION
WHEN OTHERS THEN
NULL; -- Ignore errors (in case the subscriber wasn't present)
END;
/