12.7.5.15 Listen(string[])

This method listens for messages on the queue on behalf of listenConsumers.

Declaration

// C#
public string Listen(string[] listenConsumers);

Parameters

  • listenConsumers

    An array of consumers to listen for on this queue. This parameter should be null in case of single consumer queues.

Return Value

A string.

Exceptions

InvalidOperationException - The connection is not open.

ObjectDisposedException - The object is already disposed.

Remarks

This call blocks the calling thread until there is a message ready for consumption for a consumer in the listenConsumers array. It returns a string representing the consumer name for which the message is ready.

Listen is useful in situations that require waiting until a message is available in the queue for consumers whose names are specified in listenConsumers.

Example

The following example demonstrates using the Listen method. The first part of the example performs the requisite database setup for the database user, SCOTT. The second part of the example demonstrates how a thread can listen and wait until a message is enqueued.

-- Part I: Database setup required for this demo
 
------------------------------------------------------------------
-- SQL to grant appropriate privilege to database user, SCOTT
------------------------------------------------------------------
SQL> ALTER USER SCOTT ACCOUNT UNLOCK IDENTIFIED BY Pwd4Sct;
User altered.
GRANT ALL ON DBMS_AQADM TO scott;
 
------------------------------------------------------------------
-- PLSQL to create queue-table and queue and start queue for SCOTT
------------------------------------------------------------------
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table=>'scott.test_q_tab', 
    queue_payload_type=>'RAW', 
    multiple_consumers=>FALSE);
 
  DBMS_AQADM.CREATE_QUEUE(
    queue_name=>'scott.test_q', 
    queue_table=>'scott.test_q_tab');
 
  DBMS_AQADM.START_QUEUE(queue_name=>'scott.test_q');
END;
/
 
------------------------------------------------------------------
-- PLSQL to stop queue and drop queue & queue-table from SCOTT
------------------------------------------------------------------
BEGIN
  DBMS_AQADM.STOP_QUEUE('scott.test_q');
 
  DBMS_AQADM.DROP_QUEUE(
    queue_name => 'scott.test_q', 
    auto_commit => TRUE);
 
  DBMS_AQADM.DROP_QUEUE_TABLE(
    queue_table => 'scott.test_q_tab',
    force => FALSE, 
    auto_commit => TRUE);
END;
/
-- End of Part I, database setup.

//Part II: Demonstrates using the Listen method
//C#
using System;
using System.Text;
using Oracle.DataAccess.Client;
using Oracle.DataAccess.Types;
using System.Threading;
 
namespace ODPSample
{
  /// <summary>
  /// Demonstrates how a thread can listen and wait until a message is enqueued.
  /// Once a message is enqueued, the listening thread returns from the 
  /// blocked Listen() method invocation and dequeues the message.
  /// </summary>
  class EnqueueDequeue
  {
    static bool s_bListenReturned = false;
 
    static void Main(string[] args)
    {
      // Create connection
      string constr = "user id=scott;password=Pwd4Sct;data source=oracle";
      OracleConnection con = new OracleConnection(constr);
 
      // Create queue
      OracleAQQueue queue = new OracleAQQueue("scott.test_q", con);
 
      try
      {
        // Open connection
        con.Open();
 
        // Set message type for the queue
        queue.MessageType = OracleAQMessageType.Raw;
 
        // Spawning a thread which will listen for a message
        ThreadStart ts = new ThreadStart(TestListen);
        Thread t = new Thread(ts);
        t.Start();
 
        System.Threading.Thread.Sleep(2000);
 
        // Begin transaction for enqueue
        OracleTransaction txn = con.BeginTransaction();
 
        // Prepare message and RAW payload
        OracleAQMessage enqMsg = new OracleAQMessage();
        byte[] bytePayload = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        enqMsg.Payload = bytePayload;
 
        // Prepare to Enqueue
        queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
 
        Console.WriteLine("[Main Thread]   Enqueuing a message...");
        Console.WriteLine("[Main Thread]   Enqueued Message Payload      : "
          + ByteArrayToString(enqMsg.Payload as byte[]));
        Console.WriteLine();
 
        // Enqueue message
        queue.Enqueue(enqMsg);
 
        // Enqueue transaction commit
        txn.Commit();
 
        // Loop till Listen returns
        while (!s_bListenReturned)
          System.Threading.Thread.Sleep(1000);
      }
      catch (Exception e)
      {
        Console.WriteLine("Error: {0}", e.Message);
      }
      finally
      {
        // Close/Dispose objects
        queue.Dispose();
        con.Close();
        con.Dispose();
      }
    }
 
    static void TestListen()
    {
      // Create connection
      string constr = "user id=scott;password=Pwd4Sct;data source=oracle";
      OracleConnection conListen = new OracleConnection(constr);
 
      // Create queue
      OracleAQQueue queueListen = new OracleAQQueue("scott.test_q", conListen);
 
      try
      {
          // Open the connection for Listen thread.
          // Connection blocked on Listen thread can not be used for other DB 
          // operations
          conListen.Open();
 
          // Set message type for the queue
          queueListen.MessageType = OracleAQMessageType.Raw;
 
        // Listen
        queueListen.Listen(null);
 
        Console.WriteLine("[Listen Thread] Listen returned... Dequeuing...");
 
        // Begin txn for Dequeue
        OracleTransaction txn = conListen.BeginTransaction();
 
        // Prepare to Dequeue
        queueListen.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
        queueListen.DequeueOptions.Wait = 10;
 
        // Dequeue message
        OracleAQMessage deqMsg = queueListen.Dequeue();
        Console.WriteLine("[Listen Thread] Dequeued Message Payload      : "
          + ByteArrayToString(deqMsg.Payload as byte[]));
 
        // Dequeue txn commit
        txn.Commit();
 
        // Allow the main thread to exit
        s_bListenReturned = true;
      }
      catch (Exception e)
      {
        Console.WriteLine("Error: {0}", e.Message);
      }
      finally
      {
        // Close/Dispose objects
        queueListen.Dispose();
        conListen.Close();
        conListen.Dispose();
      }
    }
 
    // Function to convert byte[] to string
    static private string ByteArrayToString(byte[] byteArray)
    {
      StringBuilder sb = new StringBuilder();
      for (int n = 0; n < byteArray.Length; n++)
      {
        sb.Append((int.Parse(byteArray[n].ToString())).ToString("X"));
      }
      return sb.ToString();
    }
  }
}