MessageAvailable Event
このイベントは、NotificationConsumersがキュー内のメッセージを使用できるようになると通知されます。
宣言
// C# public event OracleAQMessageAvailableEventHandler MessageAvailable;
イベント・データ
このイベント・ハンドラは、OracleAQMessageAvailableEventArgsオブジェクトを受信します。
例外
InvalidOperationException - 接続がオープンされていません。
備考
非同期通知は、8.1以上のデータベース互換性レベルで作成されたすべてのキュー表でサポートされます。
メッセージの可用性に関する通知を受信するために、クライアントはOracleAQMessageAvailableEventHandlerデリゲートを作成してこのイベントをリスニングする必要があります。デリゲートをこのイベントに追加するのは、必ずNotificationConsumersおよびNotificationプロパティを設定した後にしてください。
通知の登録が行われるのは、最初のデリゲートがイベントに追加された後です。最後のデリゲートがイベントから削除されると、通知の登録は解除されます。OracleAQQueueオブジェクトに設定された通知は、オブジェクトが破棄されると自動的に取り消されます。
Oracle Data Provider for .NETは、通知をリスニングするためのポートをオープンします。HAイベント、ロード・バランシング、連続問合せ通知の各機能も同じポートを共有します。このポートは、アプリケーション構成ファイルまたはWeb構成ファイルにデータベース通知ポートを設定することで、一元的に構成できます。次のサンプル・コードでは、ポート番号1200を指定しています。
<configuration>
<oracle.dataaccess.client>
<settings>
<add name="DbNotificationPort" value="1200"/>
</settings>
</oracle.dataaccess.client>
</configuration>
構成ファイルが存在しないか、データベース通知ポートが指定されていない場合、ODP.NETは有効なポート番号をランダムに使用します。構成ファイルにデータベース通知ポートの値として-1を指定した場合も、ランダムなポート番号が要求されます。
ODP.NETと同じアプリケーション・ドメイン内で実行される通知リスナーは、指定のポート番号を使用してデータベースからの通知をリスニングします。通知リスナーが作成されるのは、アプリケーションがOracleAQQueue.MessageAvailableイベントに登録されたときです。1つの通知リスナーがすべての通知タイプをリスニングできます。通知リスナーは、アプリケーション・ドメインごとに1つだけ作成されます。
例
次の例は、アプリケーション通知を示しています。この例の最初の部分では、データベース・ユーザーSCOTTの必要なデータベース設定を実行しています。この例の2番目の部分は、キューのメッセージが使用可能になったときのアプリケーションへの通知方法を示しています。
-- Part I: Database setup required for this demo
------------------------------------------------------------------
-- SQL to grant appropriate privilege to database user, SCOTT
------------------------------------------------------------------
SQL> ALTER USER SCOTT ACCOUNT UNLOCK IDENTIFIED BY Pwd4Sct;
User altered.
SQL> GRANT ALL ON DBMS_AQADM TO scott;
------------------------------------------------------------------
-- PLSQL to create queue-table and queue and start queue for SCOTT
------------------------------------------------------------------
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
queue_table=>'scott.test_q_tab',
queue_payload_type=>'RAW',
multiple_consumers=>FALSE);
DBMS_AQADM.CREATE_QUEUE(
queue_name=>'scott.test_q',
queue_table=>'scott.test_q_tab');
DBMS_AQADM.START_QUEUE(queue_name=>'scott.test_q');
END;
/
------------------------------------------------------------------
-- PLSQL to stop queue and drop queue & queue-table from SCOTT
------------------------------------------------------------------
BEGIN
DBMS_AQADM.STOP_QUEUE('scott.test_q');
DBMS_AQADM.DROP_QUEUE(
queue_name => 'scott.test_q',
auto_commit => TRUE);
DBMS_AQADM.DROP_QUEUE_TABLE(
queue_table => 'scott.test_q_tab',
force => FALSE,
auto_commit => TRUE);
END;
/
-- End of Part I, database setup.
//Part II: Demonstrates application notification
//C#
using System;
using System.Text;
using Oracle.DataAccess.Client;
using Oracle.DataAccess.Types;
namespace ODPSample
{
/// <summary>
/// Demonstrates how the application can be notified when a message is
/// available in a queue.
/// </summary>
class Notification
{
static bool isNotified = false;
static void Main(string[] args)
{
// Create connection
string constr = "user id=scott;password=Pwd4Sct;data source=oracle";
OracleConnection con = new OracleConnection(constr);
// Create queue
OracleAQQueue queue = new OracleAQQueue("scott.test_q", con);
try
{
// Open connection
con.Open();
// Set message type for the queue
queue.MessageType = OracleAQMessageType.Raw;
// Add the event handler to handle the notification. The
// MsgReceived method will be invoked when a message is enqueued
queue.MessageAvailable +=
new OracleAQMessageAvailableEventHandler(Notification.MsgReceived);
Console.WriteLine("Notification registered...");
// Begin txn for enqueue
OracleTransaction txn = con.BeginTransaction();
Console.WriteLine("Now enqueuing message...");
// Prepare message and RAW payload
OracleAQMessage enqMsg = new OracleAQMessage();
byte[] bytePayload = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
enqMsg.Payload = bytePayload;
// Prepare to Enqueue
queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
// Enqueue message
queue.Enqueue(enqMsg);
Console.WriteLine("Enqueued Message Payload : "
+ ByteArrayToString(enqMsg.Payload as byte[]));
Console.WriteLine("MessageId of Enqueued Message : "
+ ByteArrayToString(enqMsg.MessageId));
Console.WriteLine();
// Enqueue txn commit
txn.Commit();
// Loop while waiting for notification
while (isNotified == false)
{
System.Threading.Thread.Sleep(2000);
}
}
catch (Exception e)
{
Console.WriteLine("Error: {0}", e.Message);
}
finally
{
// Close/Dispose objects
queue.Dispose();
con.Close();
con.Dispose();
}
}
static void MsgReceived(object src, OracleAQMessageAvailableEventArgs arg)
{
try
{
Console.WriteLine("Notification Received...");
Console.WriteLine("QueueName : {0}", arg.QueueName);
Console.WriteLine("Notification Type : {0}", arg.NotificationType);
//following type-cast to "byte[]" is required only for .NET 1.x
byte[] notifiedMsgId = (byte[]) arg.MessageId[0];
Console.WriteLine("MessageId of Notified Message : "
+ ByteArrayToString(notifiedMsgId));
isNotified = true;
}
catch (Exception e)
{
Console.WriteLine("Error: {0}", e.Message);
}
}
// Function to convert byte[] to string
static private string ByteArrayToString(byte[] byteArray)
{
StringBuilder sb = new StringBuilder();
for (int n = 0; n < byteArray.Length; n++)
{
sb.Append((int.Parse(byteArray[n].ToString())).ToString("X"));
}
return sb.ToString();
}
}
}