日本語PDF

Listen(string[])

このメソッドは、listenConsumersに代わってキューのメッセージをリスニングします。

宣言

// C#
public string Listen(string[] listenConsumers);

パラメータ

  • listenConsumers

    このキューでリスニングするコンシューマの配列。単一コンシューマ・キューの場合は、このパラメータをnullにする必要があります。

戻り値

string

例外

InvalidOperationException - 接続がオープンされていません。

ObjectDisposedException - オブジェクトはすでに処理されています。

備考

このコールは、listenConsumers配列内のコンシューマがメッセージを消費できるようになるまでコール元のスレッドをブロックします。このコールは、消費可能になったメッセージのコンシューマ名を表すstringを戻します。

Listenは、listenConsumersに指定された名前のコンシューマがキュー内のメッセージを使用できるようになるまで待つ必要がある場合に便利です。

次の例は、Listenメソッドの使用方法を示すものです。この例の最初の部分では、データベース・ユーザーSCOTTの必要なデータベース設定を実行しています。この例の2番目の部分は、メッセージがエンキューされるまでのスレッドのリスニング方法および待機方法を示しています。

-- Part I: Database setup required for this demo
 
------------------------------------------------------------------
-- SQL to grant appropriate privilege to database user, SCOTT
------------------------------------------------------------------
SQL> ALTER USER SCOTT ACCOUNT UNLOCK IDENTIFIED BY Pwd4Sct;
User altered.
GRANT ALL ON DBMS_AQADM TO scott;
 
------------------------------------------------------------------
-- PLSQL to create queue-table and queue and start queue for SCOTT
------------------------------------------------------------------
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table=>'scott.test_q_tab', 
    queue_payload_type=>'RAW', 
    multiple_consumers=>FALSE);
 
  DBMS_AQADM.CREATE_QUEUE(
    queue_name=>'scott.test_q', 
    queue_table=>'scott.test_q_tab');
 
  DBMS_AQADM.START_QUEUE(queue_name=>'scott.test_q');
END;
/
 
------------------------------------------------------------------
-- PLSQL to stop queue and drop queue & queue-table from SCOTT
------------------------------------------------------------------
BEGIN
  DBMS_AQADM.STOP_QUEUE('scott.test_q');
 
  DBMS_AQADM.DROP_QUEUE(
    queue_name => 'scott.test_q', 
    auto_commit => TRUE);
 
  DBMS_AQADM.DROP_QUEUE_TABLE(
    queue_table => 'scott.test_q_tab',
    force => FALSE, 
    auto_commit => TRUE);
END;
/
-- End of Part I, database setup.

//Part II: Demonstrates using the Listen method
//C#
using System;
using System.Text;
using Oracle.DataAccess.Client;
using Oracle.DataAccess.Types;
using System.Threading;
 
namespace ODPSample
{
  /// <summary>
  /// Demonstrates how a thread can listen and wait until a message is enqueued.
  /// Once a message is enqueued, the listening thread returns from the 
  /// blocked Listen() method invocation and dequeues the message.
  /// </summary>
  class EnqueueDequeue
  {
    static bool s_bListenReturned = false;
 
    static void Main(string[] args)
    {
      // Create connection
      string constr = "user id=scott;password=Pwd4Sct;data source=oracle";
      OracleConnection con = new OracleConnection(constr);
 
      // Create queue
      OracleAQQueue queue = new OracleAQQueue("scott.test_q", con);
 
      try
      {
        // Open connection
        con.Open();
 
        // Set message type for the queue
        queue.MessageType = OracleAQMessageType.Raw;
 
        // Spawning a thread which will listen for a message
        ThreadStart ts = new ThreadStart(TestListen);
        Thread t = new Thread(ts);
        t.Start();
 
        System.Threading.Thread.Sleep(2000);
 
        // Begin transaction for enqueue
        OracleTransaction txn = con.BeginTransaction();
 
        // Prepare message and RAW payload
        OracleAQMessage enqMsg = new OracleAQMessage();
        byte[] bytePayload = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        enqMsg.Payload = bytePayload;
 
        // Prepare to Enqueue
        queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
 
        Console.WriteLine("[Main Thread]   Enqueuing a message...");
        Console.WriteLine("[Main Thread]   Enqueued Message Payload      : "
          + ByteArrayToString(enqMsg.Payload as byte[]));
        Console.WriteLine();
 
        // Enqueue message
        queue.Enqueue(enqMsg);
 
        // Enqueue transaction commit
        txn.Commit();
 
        // Loop till Listen returns
        while (!s_bListenReturned)
          System.Threading.Thread.Sleep(1000);
      }
      catch (Exception e)
      {
        Console.WriteLine("Error: {0}", e.Message);
      }
      finally
      {
        // Close/Dispose objects
        queue.Dispose();
        con.Close();
        con.Dispose();
      }
    }
 
    static void TestListen()
    {
      // Create connection
      string constr = "user id=scott;password=Pwd4Sct;data source=oracle";
      OracleConnection conListen = new OracleConnection(constr);
 
      // Create queue
      OracleAQQueue queueListen = new OracleAQQueue("scott.test_q", conListen);
 
      try
      {
          // Open the connection for Listen thread.
          // Connection blocked on Listen thread can not be used for other DB 
          // operations
          conListen.Open();
 
          // Set message type for the queue
          queueListen.MessageType = OracleAQMessageType.Raw;
 
        // Listen
        queueListen.Listen(null);
 
        Console.WriteLine("[Listen Thread] Listen returned... Dequeuing...");
 
        // Begin txn for Dequeue
        OracleTransaction txn = conListen.BeginTransaction();
 
        // Prepare to Dequeue
        queueListen.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
        queueListen.DequeueOptions.Wait = 10;
 
        // Dequeue message
        OracleAQMessage deqMsg = queueListen.Dequeue();
        Console.WriteLine("[Listen Thread] Dequeued Message Payload      : "
          + ByteArrayToString(deqMsg.Payload as byte[]));
 
        // Dequeue txn commit
        txn.Commit();
 
        // Allow the main thread to exit
        s_bListenReturned = true;
      }
      catch (Exception e)
      {
        Console.WriteLine("Error: {0}", e.Message);
      }
      finally
      {
        // Close/Dispose objects
        queueListen.Dispose();
        conListen.Close();
        conListen.Dispose();
      }
    }
 
    // Function to convert byte[] to string
    static private string ByteArrayToString(byte[] byteArray)
    {
      StringBuilder sb = new StringBuilder();
      for (int n = 0; n < byteArray.Length; n++)
      {
        sb.Append((int.Parse(byteArray[n].ToString())).ToString("X"));
      }
      return sb.ToString();
    }
  }
}