12.7.6.1 MessageAvailableイベント
このイベントは、NotificationConsumers
がキュー内のメッセージを使用できるようになると通知されます。
宣言
// C# public event OracleAQMessageAvailableEventHandler MessageAvailable;
イベント・データ
このイベント・ハンドラは、OracleAQMessageAvailableEventArgs
オブジェクトを受信します。
例外
InvalidOperationException
- 接続がオープンされていません。
備考
非同期通知は、8.1以上のデータベース互換性レベルで作成されたすべてのキュー表でサポートされます。
メッセージの可用性に関する通知を受信するために、クライアントはOracleAQMessageAvailableEventHandler
デリゲートを作成してこのイベントをリスニングする必要があります。デリゲートをこのイベントに追加するのは、必ずNotificationConsumersおよびNotificationプロパティを設定した後にしてください。
通知の登録が行われるのは、最初のデリゲートがイベントに追加された後です。最後のデリゲートがイベントから削除されると、通知の登録は解除されます。OracleAQQueue
オブジェクトに設定された通知は、オブジェクトが破棄されると自動的に取り消されます。
Oracle Data Provider for .NETは、通知をリスニングするためのポートをオープンします。HAイベント、ロード・バランシング、連続問合せ通知の各機能も同じポートを共有します。このポートは、アプリケーション構成ファイルまたはWeb構成ファイルにデータベース通知ポートを設定することで、一元的に構成できます。次のサンプル・コードでは、ポート番号1200を指定しています。
<configuration> <oracle.dataaccess.client> <settings> <add name="DbNotificationPort" value="1200"/> </settings> </oracle.dataaccess.client> </configuration>
構成ファイルが存在しないか、データベース通知ポートが指定されていない場合、ODP.NETは有効なポート番号をランダムに使用します。構成ファイルにデータベース通知ポートの値として-1を指定した場合も、ランダムなポート番号が要求されます。
ODP.NETと同じアプリケーション・ドメイン内で実行される通知リスナーは、指定のポート番号を使用してデータベースからの通知をリスニングします。通知リスナーが作成されるのは、アプリケーションがOracleAQQueue.MessageAvailable
イベントに登録されたときです。1つの通知リスナーがすべての通知タイプをリスニングできます。通知リスナーは、アプリケーション・ドメインごとに1つだけ作成されます。
例
次の例は、アプリケーション通知を示しています。この例の最初の部分では、データベース・ユーザーSCOTT
の必要なデータベース設定を実行しています。この例の2番目の部分は、キューのメッセージが使用可能になったときのアプリケーションへの通知方法を示しています。
-- Part I: Database setup required for this demo ------------------------------------------------------------------ -- SQL to grant appropriate privilege to database user, SCOTT ------------------------------------------------------------------ SQL> ALTER USER SCOTT ACCOUNT UNLOCK IDENTIFIED BY Pwd4Sct; User altered. SQL> GRANT ALL ON DBMS_AQADM TO scott; ------------------------------------------------------------------ -- PLSQL to create queue-table and queue and start queue for SCOTT ------------------------------------------------------------------ BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE( queue_table=>'scott.test_q_tab', queue_payload_type=>'RAW', multiple_consumers=>FALSE); DBMS_AQADM.CREATE_QUEUE( queue_name=>'scott.test_q', queue_table=>'scott.test_q_tab'); DBMS_AQADM.START_QUEUE(queue_name=>'scott.test_q'); END; / ------------------------------------------------------------------ -- PLSQL to stop queue and drop queue & queue-table from SCOTT ------------------------------------------------------------------ BEGIN DBMS_AQADM.STOP_QUEUE('scott.test_q'); DBMS_AQADM.DROP_QUEUE( queue_name => 'scott.test_q', auto_commit => TRUE); DBMS_AQADM.DROP_QUEUE_TABLE( queue_table => 'scott.test_q_tab', force => FALSE, auto_commit => TRUE); END; / -- End of Part I, database setup. //Part II: Demonstrates application notification //C# using System; using System.Text; using Oracle.DataAccess.Client; using Oracle.DataAccess.Types; namespace ODPSample { /// <summary> /// Demonstrates how the application can be notified when a message is /// available in a queue. /// </summary> class Notification { static bool isNotified = false; static void Main(string[] args) { // Create connection string constr = "user id=scott;password=Pwd4Sct;data source=oracle"; OracleConnection con = new OracleConnection(constr); // Create queue OracleAQQueue queue = new OracleAQQueue("scott.test_q", con); try { // Open connection con.Open(); // Set message type for the queue queue.MessageType = OracleAQMessageType.Raw; // Add the event handler to handle the notification. The // MsgReceived method will be invoked when a message is enqueued queue.MessageAvailable += new OracleAQMessageAvailableEventHandler(Notification.MsgReceived); Console.WriteLine("Notification registered..."); // Begin txn for enqueue OracleTransaction txn = con.BeginTransaction(); Console.WriteLine("Now enqueuing message..."); // Prepare message and RAW payload OracleAQMessage enqMsg = new OracleAQMessage(); byte[] bytePayload = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; enqMsg.Payload = bytePayload; // Prepare to Enqueue queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit; // Enqueue message queue.Enqueue(enqMsg); Console.WriteLine("Enqueued Message Payload : " + ByteArrayToString(enqMsg.Payload as byte[])); Console.WriteLine("MessageId of Enqueued Message : " + ByteArrayToString(enqMsg.MessageId)); Console.WriteLine(); // Enqueue txn commit txn.Commit(); // Loop while waiting for notification while (isNotified == false) { System.Threading.Thread.Sleep(2000); } } catch (Exception e) { Console.WriteLine("Error: {0}", e.Message); } finally { // Close/Dispose objects queue.Dispose(); con.Close(); con.Dispose(); } } static void MsgReceived(object src, OracleAQMessageAvailableEventArgs arg) { try { Console.WriteLine("Notification Received..."); Console.WriteLine("QueueName : {0}", arg.QueueName); Console.WriteLine("Notification Type : {0}", arg.NotificationType); //following type-cast to "byte[]" is required only for .NET 1.x byte[] notifiedMsgId = (byte[]) arg.MessageId[0]; Console.WriteLine("MessageId of Notified Message : " + ByteArrayToString(notifiedMsgId)); isNotified = true; } catch (Exception e) { Console.WriteLine("Error: {0}", e.Message); } } // Function to convert byte[] to string static private string ByteArrayToString(byte[] byteArray) { StringBuilder sb = new StringBuilder(); for (int n = 0; n < byteArray.Length; n++) { sb.Append((int.Parse(byteArray[n].ToString())).ToString("X")); } return sb.ToString(); } } }