Szenario: Nachrichten aus einer normalisierten Datenqueue streamen

Erstellen Sie eine Datenbankqueue, um normalisierte IoT-Daten zu streamen.

Nachdem Sie Ihr Gerät eingerichtet haben und Daten gesendet und empfangen werden, können Sie Nachrichten streamen. In diesem Beispiel wird gezeigt, wie Sie Nachrichten aus normalisierten Daten streamen. Sie können einen ähnlichen Ansatz verwenden, um die Raw-Daten oder abgelehnten Datenqueues zu abonnieren. Weitere Informationen finden Sie unter Einführung in Transactional Event Queues und Advanced Queuing.
  • Raw-Daten in: Erfasst erste eingehende IoT-Daten.
  • Abgelehnte Daten in: Speichert Daten, deren Validierung oder Verarbeitung nicht erfolgreich war.
  • Normalisierte Daten: Speichert verarbeitete und standardisierte IoT-Daten, die für die nachgelagerte Verwendung bereit sind.

Bevor Sie beginnen

Für die Verwendung von Datenbankqueues muss eine direkte Datenbankverbindung konfiguriert werden.

Hinweis

Maximale Aufbewahrungsfrist für öffentliche Queues 24 Stunden.

Schritt 1: Subscriber hinzufügen

Mit diesem SQL-Datenbankbefehl können Sie einen Subscriber in der Queue für den Empfang von Nachrichten registrieren. Geben Sie den Queue-Namen und einen eindeutigen Subscriber-Namen an, damit dieser Subscriber oder Consumer Nachrichten aus der Queue entfernen kann.

Weitere Informationen finden Sie unter DBMS_AQADM.ADD_SUBSCRIBER.

BEGIN
    dbms_aqadm.add_subscriber(
        queue_name => '&&domain.__IOT.NORMALIZED_DATA',
        subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
    );
    COMMIT;
END;
/

Schritt 2: Nachrichten aus der Queue entfernen

Mit diesem Bulk Dequeuing SQL-Datenbankbefehl können Sie viele Nachrichten gleichzeitig verarbeiten. In diesem Beispiel werden bis zu 30 Nachrichten gleichzeitig für den jeweiligen Subscriber verarbeitet oder konsumiert.

Weitere Informationen finden Sie unter DBMS_AQ.DEQUEUE_ARRAY.

DECLARE
    dequeue_options    dbms_aq.dequeue_options_t;
    message_properties dbms_aq.message_properties_array_t;
    l_normalized_data_records           JSON;
    msgid_array        dbms_aq.msgid_array_t;
    retval             PLS_INTEGER; 
    l_json_size NUMBER;
BEGIN
    dequeue_options.consumer_name := 'test_public_subscriber';   -- Filter messages for this subscriber
    dequeue_options.wait          := 10;                        -- Wait up to 10 seconds for messages
    dequeue_options.dequeue_mode  := dbms_aq.remove;             -- Remove after dequeueing
    dequeue_options.navigation    := dbms_aq.first_message;      -- Get oldest message first
    
    retval := dbms_aq.dequeue_array(
        queue_name               => '&&domain.__IOT.NORMALIZED_DATA',
        dequeue_options          => dequeue_options,
        array_size               => 30,                         -- Try to fetch up to 30 messages at once
        message_properties_array => message_properties,
        payload_array            => l_normalized_data_records,  -- The JSON data
        msgid_array              => msgid_array
    );
    
    l_json_size := json_value(json_query( l_normalized_data_records, '$.size()'), '$');
    dbms_output.put_line('Successfully dequeued! ' || l_json_size);  -- Output number of messages
    COMMIT;
EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('Error Code: ' || sqlcode);
        dbms_output.put_line('Error Message: ' || sqlerrm);
        ROLLBACK;
        raise_application_error(-20001, 'Unable to dequeue: ' || sqlerrm);
END;
/

Optionaler Schritt: Abonnenten entfernen

Nachdem Sie die Informationen verarbeitet haben, entfernen Sie den Subscriber mit diesem SQL-Datenbankbefehl. Das Entfernen des Subscribers ist eine Best Practice, wenn der Subscriber nicht mehr benötigt wird. Dadurch wird eine unerwünschte Aufbewahrung von Nachrichten verhindert. Weitere Informationen finden Sie unter DBMS_AQADM.REMOVE_SUBSCRIBER.

BEGIN
    DBMS_AQADM.REMOVE_SUBSCRIBER (
        queue_name => '&&domain.__IOT.NORMALIZED_DATA',
        subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
    );
    COMMIT;
EXCEPTION
WHEN OTHERS THEN 
    NULL;   -- Ignore errors (in case the subscriber wasn't present)
END;
/