Szenario: Nachrichten aus einer normalisierten Datenqueue streamen
Erstellen Sie eine Datenbankqueue, um normalisierte IoT-Daten zu streamen.
- Raw-Daten in: Erfasst erste eingehende IoT-Daten.
- Abgelehnte Daten in: Speichert Daten, deren Validierung oder Verarbeitung nicht erfolgreich war.
- Normalisierte Daten: Speichert verarbeitete und standardisierte IoT-Daten, die für die nachgelagerte Verwendung bereit sind.
Bevor Sie beginnen
Für die Verwendung von Datenbankqueues muss eine direkte Datenbankverbindung konfiguriert werden.
Maximale Aufbewahrungsfrist für öffentliche Queues 24 Stunden.
Schritt 1: Subscriber hinzufügen
Mit diesem SQL-Datenbankbefehl können Sie einen Subscriber in der Queue für den Empfang von Nachrichten registrieren. Geben Sie den Queue-Namen und einen eindeutigen Subscriber-Namen an, damit dieser Subscriber oder Consumer Nachrichten aus der Queue entfernen kann.
Weitere Informationen finden Sie unter DBMS_AQADM.ADD_SUBSCRIBER.
BEGIN
dbms_aqadm.add_subscriber(
queue_name => '&&domain.__IOT.NORMALIZED_DATA',
subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
);
COMMIT;
END;
/Schritt 2: Nachrichten aus der Queue entfernen
Mit diesem Bulk Dequeuing SQL-Datenbankbefehl können Sie viele Nachrichten gleichzeitig verarbeiten. In diesem Beispiel werden bis zu 30 Nachrichten gleichzeitig für den jeweiligen Subscriber verarbeitet oder konsumiert.
Weitere Informationen finden Sie unter DBMS_AQ.DEQUEUE_ARRAY.
DECLARE
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_array_t;
l_normalized_data_records JSON;
msgid_array dbms_aq.msgid_array_t;
retval PLS_INTEGER;
l_json_size NUMBER;
BEGIN
dequeue_options.consumer_name := 'test_public_subscriber'; -- Filter messages for this subscriber
dequeue_options.wait := 10; -- Wait up to 10 seconds for messages
dequeue_options.dequeue_mode := dbms_aq.remove; -- Remove after dequeueing
dequeue_options.navigation := dbms_aq.first_message; -- Get oldest message first
retval := dbms_aq.dequeue_array(
queue_name => '&&domain.__IOT.NORMALIZED_DATA',
dequeue_options => dequeue_options,
array_size => 30, -- Try to fetch up to 30 messages at once
message_properties_array => message_properties,
payload_array => l_normalized_data_records, -- The JSON data
msgid_array => msgid_array
);
l_json_size := json_value(json_query( l_normalized_data_records, '$.size()'), '$');
dbms_output.put_line('Successfully dequeued! ' || l_json_size); -- Output number of messages
COMMIT;
EXCEPTION
WHEN OTHERS THEN
dbms_output.put_line('Error Code: ' || sqlcode);
dbms_output.put_line('Error Message: ' || sqlerrm);
ROLLBACK;
raise_application_error(-20001, 'Unable to dequeue: ' || sqlerrm);
END;
/Optionaler Schritt: Abonnenten entfernen
Nachdem Sie die Informationen verarbeitet haben, entfernen Sie den Subscriber mit diesem SQL-Datenbankbefehl. Das Entfernen des Subscribers ist eine Best Practice, wenn der Subscriber nicht mehr benötigt wird. Dadurch wird eine unerwünschte Aufbewahrung von Nachrichten verhindert. Weitere Informationen finden Sie unter DBMS_AQADM.REMOVE_SUBSCRIBER.
BEGIN
DBMS_AQADM.REMOVE_SUBSCRIBER (
queue_name => '&&domain.__IOT.NORMALIZED_DATA',
subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
);
COMMIT;
EXCEPTION
WHEN OTHERS THEN
NULL; -- Ignore errors (in case the subscriber wasn't present)
END;
/