Listen(string[])
このメソッドは、listenConsumersに代わってキューのメッセージをリスニングします。
宣言
// C#
public string Listen(string[] listenConsumers);パラメータ
-
listenConsumersこのキューでリスニングするコンシューマの配列。単一コンシューマ・キューの場合は、このパラメータを
nullにする必要があります。
戻り値
string。
例外
InvalidOperationException - 接続がオープンされていません。
ObjectDisposedException - オブジェクトはすでに処理されています。
備考
このコールは、listenConsumers配列内のコンシューマがメッセージを消費できるようになるまでコール元のスレッドをブロックします。このコールは、消費可能になったメッセージのコンシューマ名を表すstringを戻します。
Listenは、listenConsumersに指定された名前のコンシューマがキュー内のメッセージを使用できるようになるまで待つ必要がある場合に便利です。
例
次の例は、Listenメソッドの使用方法を示すものです。この例の最初の部分では、データベース・ユーザーSCOTTの必要なデータベース設定を実行しています。この例の2番目の部分は、メッセージがエンキューされるまでのスレッドのリスニング方法および待機方法を示しています。
-- Part I: Database setup required for this demo
------------------------------------------------------------------
-- SQL to grant appropriate privilege to database user, SCOTT
------------------------------------------------------------------
SQL> ALTER USER SCOTT ACCOUNT UNLOCK IDENTIFIED BY Pwd4Sct;
User altered.
GRANT ALL ON DBMS_AQADM TO scott;
------------------------------------------------------------------
-- PLSQL to create queue-table and queue and start queue for SCOTT
------------------------------------------------------------------
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
queue_table=>'scott.test_q_tab',
queue_payload_type=>'RAW',
multiple_consumers=>FALSE);
DBMS_AQADM.CREATE_QUEUE(
queue_name=>'scott.test_q',
queue_table=>'scott.test_q_tab');
DBMS_AQADM.START_QUEUE(queue_name=>'scott.test_q');
END;
/
------------------------------------------------------------------
-- PLSQL to stop queue and drop queue & queue-table from SCOTT
------------------------------------------------------------------
BEGIN
DBMS_AQADM.STOP_QUEUE('scott.test_q');
DBMS_AQADM.DROP_QUEUE(
queue_name => 'scott.test_q',
auto_commit => TRUE);
DBMS_AQADM.DROP_QUEUE_TABLE(
queue_table => 'scott.test_q_tab',
force => FALSE,
auto_commit => TRUE);
END;
/
-- End of Part I, database setup.
//Part II: Demonstrates using the Listen method
//C#
using System;
using System.Text;
using Oracle.DataAccess.Client;
using Oracle.DataAccess.Types;
using System.Threading;
namespace ODPSample
{
/// <summary>
/// Demonstrates how a thread can listen and wait until a message is enqueued.
/// Once a message is enqueued, the listening thread returns from the
/// blocked Listen() method invocation and dequeues the message.
/// </summary>
class EnqueueDequeue
{
static bool s_bListenReturned = false;
static void Main(string[] args)
{
// Create connection
string constr = "user id=scott;password=Pwd4Sct;data source=oracle";
OracleConnection con = new OracleConnection(constr);
// Create queue
OracleAQQueue queue = new OracleAQQueue("scott.test_q", con);
try
{
// Open connection
con.Open();
// Set message type for the queue
queue.MessageType = OracleAQMessageType.Raw;
// Spawning a thread which will listen for a message
ThreadStart ts = new ThreadStart(TestListen);
Thread t = new Thread(ts);
t.Start();
System.Threading.Thread.Sleep(2000);
// Begin transaction for enqueue
OracleTransaction txn = con.BeginTransaction();
// Prepare message and RAW payload
OracleAQMessage enqMsg = new OracleAQMessage();
byte[] bytePayload = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
enqMsg.Payload = bytePayload;
// Prepare to Enqueue
queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
Console.WriteLine("[Main Thread] Enqueuing a message...");
Console.WriteLine("[Main Thread] Enqueued Message Payload : "
+ ByteArrayToString(enqMsg.Payload as byte[]));
Console.WriteLine();
// Enqueue message
queue.Enqueue(enqMsg);
// Enqueue transaction commit
txn.Commit();
// Loop till Listen returns
while (!s_bListenReturned)
System.Threading.Thread.Sleep(1000);
}
catch (Exception e)
{
Console.WriteLine("Error: {0}", e.Message);
}
finally
{
// Close/Dispose objects
queue.Dispose();
con.Close();
con.Dispose();
}
}
static void TestListen()
{
// Create connection
string constr = "user id=scott;password=Pwd4Sct;data source=oracle";
OracleConnection conListen = new OracleConnection(constr);
// Create queue
OracleAQQueue queueListen = new OracleAQQueue("scott.test_q", conListen);
try
{
// Open the connection for Listen thread.
// Connection blocked on Listen thread can not be used for other DB
// operations
conListen.Open();
// Set message type for the queue
queueListen.MessageType = OracleAQMessageType.Raw;
// Listen
queueListen.Listen(null);
Console.WriteLine("[Listen Thread] Listen returned... Dequeuing...");
// Begin txn for Dequeue
OracleTransaction txn = conListen.BeginTransaction();
// Prepare to Dequeue
queueListen.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
queueListen.DequeueOptions.Wait = 10;
// Dequeue message
OracleAQMessage deqMsg = queueListen.Dequeue();
Console.WriteLine("[Listen Thread] Dequeued Message Payload : "
+ ByteArrayToString(deqMsg.Payload as byte[]));
// Dequeue txn commit
txn.Commit();
// Allow the main thread to exit
s_bListenReturned = true;
}
catch (Exception e)
{
Console.WriteLine("Error: {0}", e.Message);
}
finally
{
// Close/Dispose objects
queueListen.Dispose();
conListen.Close();
conListen.Dispose();
}
}
// Function to convert byte[] to string
static private string ByteArrayToString(byte[] byteArray)
{
StringBuilder sb = new StringBuilder();
for (int n = 0; n < byteArray.Length; n++)
{
sb.Append((int.Parse(byteArray[n].ToString())).ToString("X"));
}
return sb.ToString();
}
}
}