シナリオ: 正規化されたデータ・キューからのメッセージのストリーミング

正規化されたIoTデータをストリームするデータベース・キューを作成します。

デバイスを設定し、データが送受信された後、メッセージをストリーミングできます。この例では、正規化されたデータからメッセージをストリームする方法を示します。同様のアプローチを使用して、RAWデータまたは拒否されたデータ・キューをサブスクライブできます。詳細は、トランザクション・イベント・キューおよびアドバンスト・キューイングの概要を参照してください。
  • RAWデータ: 最初の受信IoTデータを取得します。
  • 拒否されたデータ: 検証または処理に失敗したデータを格納します。
  • 正規化データ: 処理済および標準化されたIoTデータを、ダウンストリームで使用できるように格納します。

始める前に

データベース・キューを使用するには、直接データベース接続の構成が必要です。

ノート

パブリック・キューの最大保持期間は24時間です。

ステップ1: サブスクライバの追加

このSQLデータベース・コマンドを使用して、キューにサブスクライバを登録し、メッセージを受信します。キュー名と一意のサブスクライバ名を指定して、このサブスクライバまたはコンシューマがメッセージをデキューできるようにします。

詳細については、DBMS_AQADM.ADD_SUBSCRIBERを参照してください。

BEGIN
    dbms_aqadm.add_subscriber(
        queue_name => '&&domain.__IOT.NORMALIZED_DATA',
        subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
    );
    COMMIT;
END;
/

ステップ2: 一括デキュー・メッセージ

このSQLデータベースの一括デキュー・コマンドを使用して、多数のメッセージを一度に処理します。この例では、特定のサブスクライバについて、一度に最大30個のメッセージを処理または消費します。

詳細については、DBMS_AQ.DEQUEUE_ARRAYを参照してください。

DECLARE
    dequeue_options    dbms_aq.dequeue_options_t;
    message_properties dbms_aq.message_properties_array_t;
    l_normalized_data_records           JSON;
    msgid_array        dbms_aq.msgid_array_t;
    retval             PLS_INTEGER; 
    l_json_size NUMBER;
BEGIN
    dequeue_options.consumer_name := 'test_public_subscriber';   -- Filter messages for this subscriber
    dequeue_options.wait          := 10;                        -- Wait up to 10 seconds for messages
    dequeue_options.dequeue_mode  := dbms_aq.remove;             -- Remove after dequeueing
    dequeue_options.navigation    := dbms_aq.first_message;      -- Get oldest message first
    
    retval := dbms_aq.dequeue_array(
        queue_name               => '&&domain.__IOT.NORMALIZED_DATA',
        dequeue_options          => dequeue_options,
        array_size               => 30,                         -- Try to fetch up to 30 messages at once
        message_properties_array => message_properties,
        payload_array            => l_normalized_data_records,  -- The JSON data
        msgid_array              => msgid_array
    );
    
    l_json_size := json_value(json_query( l_normalized_data_records, '$.size()'), '$');
    dbms_output.put_line('Successfully dequeued! ' || l_json_size);  -- Output number of messages
    COMMIT;
EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('Error Code: ' || sqlcode);
        dbms_output.put_line('Error Message: ' || sqlerrm);
        ROLLBACK;
        raise_application_error(-20001, 'Unable to dequeue: ' || sqlerrm);
END;
/

オプション・ステップ: サブスクライバの削除

情報を処理した後、このSQLデータベース・コマンドを使用してサブスクライバを削除します。サブスクライバの削除は、サブスクライバが不要になった場合にベスト・プラクティスであり、不要なメッセージ保持を防止します。詳細については、DBMS_AQADM.REMOVE_SUBSCRIBERを参照してください。

BEGIN
    DBMS_AQADM.REMOVE_SUBSCRIBER (
        queue_name => '&&domain.__IOT.NORMALIZED_DATA',
        subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
    );
    COMMIT;
EXCEPTION
WHEN OTHERS THEN 
    NULL;   -- Ignore errors (in case the subscriber wasn't present)
END;
/