Oracle Streamsアドバンスト・キューイング(AQ)は、データベース統合型のメッセージ・キューイング機能を提供します。Oracle Streams AQはOracle Streamsを基盤とし、Oracle Databaseの機能を活用して、メッセージの永続的な格納、異なるコンピュータおよびデータベース上のキュー間でのメッセージの伝播、Oracle Net ServicesおよびHTTP(S)を使用したメッセージの送信を可能にしています。
関連項目: Oracle Streamsアドバンスト・キューイング・ユーザーズ・ガイドおよびリファレンス |
Oracle Streams AQはデータベース表に実装されるため、運用上の利点である高可用性、スケーラビリティ、信頼性のすべてがキュー・データにも適用されます。Oracle Streams AQは、リカバリや再起動、セキュリティなどの標準的なデータベース機能もサポートしています。
次の各項でOracle Streams AQの概念について説明します。
キューおよびキュー表
キューにエンキューされたメッセージは、キュー表に格納されます。キュー表に基づいてキューを作成するには、先にキュー表を作成しておく必要があります。キュー表およびキューの作成と管理には、DBMS_AQADM
PL/SQLパッケージまたはOracle Developer Tools for Visual Studioを使用します。
キューはOracleAQQueue
オブジェクトで表されます。
単一コンシューマ・キューおよび複数コンシューマ・キュー
単一コンシューマ・キューは、単一コンシューマ・キュー表に基づいて作成されます。単一コンシューマ・キューにエンキューされたメッセージは、単一コンシューマのみがデキューできます。
複数コンシューマ・キューは、複数コンシューマ・キュー表に基づいて作成されます。このキューは、キュー・サブスクライバとメッセージ受信者をサポートしています。
メッセージ受信者
メッセージ・プロデューサは、メッセージのエンキュー時に受信者のリストを発行できます。これにより、キュー内の各メッセージと一連の受信者を一意に対応させることができます。キューに関連付けられたサブスクライバ・リストがある場合でも、メッセージに関連付けられた受信者リストが優先されます。受信者がサブスクライバ・リストに指定されている必要はありません。ただし、サブスクライバの中から受信者を選択できます。OracleAQMessage
のRecipients
プロパティを使用すると、OracleAQAgent
オブジェクトの観点から特定のメッセージの受信者を指定できます。
エンキュー
プロデューサ・アプリケーションがキューにメッセージをプッシュすると、そのメッセージはエンキューされます。これを実現するには、OracleAQQueue
オブジェクトに対してEnqueue
メソッドをコールします。EnqueueArray
メソッドを使用すれば、複数のメッセージをエンキューできます。
デキュー
コンシューマ・アプリケーションがキューからメッセージをプルすると、そのメッセージはデキューされます。これを実現するには、OracleAQQueue
オブジェクトに対してDequeue
メソッドをコールします。DequeueArray
メソッドを使用すれば、複数のメッセージをデキューできます。
リスニング
サブスクライバ・アプリケーションでは、Listen
コールを使用して複数のキューを監視することで、様々なキューをサブスクライブできます。これは、サブスクライバ・アプリケーションが多くのキューをサブスクライブしており、どのキューに到着したメッセージもすべて受信する場合に適している、スケーラブルなソリューションです。これを実現するには、OracleAQQueue
クラスのListen
メソッドをコールし、サブスクリプションのリストを配列の形式で渡します。
通知
サブスクライバ・アプリケーションでは、通知メカニズムを利用して、キュー内のメッセージの可用性に関する通知を受信できます。受信した情報に基づいて、メッセージをスキップするか、キューからデキューすることを決定できます。
サブスクライバ・アプリケーションでは、通知の受信元となるキューのイベント通知を登録する必要があります。これは、OracleAQQueue
のMessageAvailable
イベントで表されます。サブスクリプションに一致するメッセージが到着すると、イベントがトリガーされます。
通知は、通常の通知またはグループ化通知として登録できます。これらの通知のタイムアウト値も指定可能です。OracleAQQueue.Notification
プロパティを使用すると、様々な通知オプションを設定できます。OracleAQQueue
オブジェクトに設定した通知は、オブジェクトが破棄されると自動的に取り消されます。
バッファ・メッセージング
バッファ・メッセージングはOracle Streams AQ 10gリリース2(10.2)で導入されました。バッファ・メッセージングでは、メッセージは共有メモリー領域に格納されます。そのため、バッファ・メッセージングは永続メッセージングより高速です。メッセージがディスクに書き込まれるのは、バッファ・メッセージの総メモリー使用率が使用可能な共有メモリーの上限に達した場合のみです。バッファ・メッセージングは、Oracle Streams AQの永続メッセージングの信頼性とトランザクション・サポートを必要としないアプリケーションに最適です。
バッファ・メッセージと永続メッセージは、同一の単一コンシューマ・キューまたは複数コンシューマ・キューと、同一の管理インタフェースおよび操作インタフェースを使用します。この2種類のメッセージは、配信モード・パラメータで区別されます。アプリケーションでは、Oracle Streams AQキューにメッセージをエンキューする際に、配信モード・パラメータも設定します。
OracleAQMessage
に配信モード・パラメータを設定するには、DeliveryMode
プロパティを変更します。バッファ・メッセージングは、8.1以上の互換性で作成されたすべてのキュー表でサポートされます。
.NETアプリケーションでは、ODP.NETを使用して、エンキュー、デキュー、リスニング、通知など、AQのすべての操作機能を利用できます。
表3-25は、AQ機能を対応するODP.NET実装にマップしたものです。
表3-25 AQ機能とODP.NET実装とのマッピング
機能 | ODP.NET実装 |
---|---|
メッセージの作成 |
|
単一メッセージのエンキュー |
メッセージを |
複数メッセージのエンキュー |
|
単一メッセージのデキュー |
|
複数メッセージのデキュー |
|
キューのメッセージのリスニング |
|
メッセージ通知 |
|
注意: ORACLE_BASE ¥ ORACLE_HOME ¥ODP.NET¥Samples ディレクトリにAQのサンプルが用意されています。 |
次の例では、単一コンシューマ・キューを使用するエンキューおよびデキューのメッセージを示しています。この例の最初の部分では、データベース・ユーザーSCOTT
の必要なデータベース設定を実行しています。この例の2番目の部分は、エンキューおよびデキューのメッセージを示しています。
-- Part I: Database setup required for this demo ------------------------------------------------------------------ -- SQL to grant appropriate privilege to database user, SCOTT ------------------------------------------------------------------ SQL> ALTER USER SCOTT ACCOUNT UNLOCK IDENTIFIED BY Pwd4Sct; User altered. SQL> GRANT ALL ON DBMS_AQADM TO scott; ------------------------------------------------------------------ -- PL/SQL to create queue-table and queue and start queue for SCOTT ------------------------------------------------------------------ BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE( queue_table=>'scott.test_q_tab', queue_payload_type=>'RAW', multiple_consumers=>FALSE); DBMS_AQADM.CREATE_QUEUE( queue_name=>'scott.test_q', queue_table=>'scott.test_q_tab'); DBMS_AQADM.START_QUEUE(queue_name=>'scott.test_q'); END; / ------------------------------------------------------------------ -- PL/SQL to stop queue and drop queue & queue-table from SCOTT ------------------------------------------------------------------ BEGIN DBMS_AQADM.STOP_QUEUE('scott.test_q'); DBMS_AQADM.DROP_QUEUE( queue_name => 'scott.test_q', auto_commit => TRUE); DBMS_AQADM.DROP_QUEUE_TABLE( queue_table => 'scott.test_q_tab', force => FALSE, auto_commit => TRUE); END; / -- End of Part I, database setup. //Part II: Enqueuing and dequeuing messages //C# using System; using System.Text; using Oracle.DataAccess.Client; using Oracle.DataAccess.Types; namespace ODPSample { /// <summary> /// Demonstrates Enqueuing and Dequeuing raw message /// using a single consumer queue /// </summary> class EnqueueDequeue { static void Main(string[] args) { // Create connection string constr = "user id=scott;password=Pwd4Sct;data source=oracle"; OracleConnection con = new OracleConnection(constr); // Create queue OracleAQQueue queue = new OracleAQQueue("scott.test_q", con); try { // Open connection con.Open(); // Begin txn for enqueue OracleTransaction txn = con.BeginTransaction(); // Set message type for the queue queue.MessageType = OracleAQMessageType.Raw; // Prepare message and RAW payload OracleAQMessage enqMsg = new OracleAQMessage(); byte[] bytePayload = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; enqMsg.Payload = bytePayload; // Prepare to Enqueue queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit; // Enqueue message queue.Enqueue(enqMsg); Console.WriteLine("Enqueued Message Payload : " + ByteArrayToString(enqMsg.Payload as byte[])); Console.WriteLine("MessageId of Enqueued Message : " + ByteArrayToString(enqMsg.MessageId)); // Enqueue txn commit txn.Commit(); // Begin txn for Dequeue txn = con.BeginTransaction(); // Prepare to Dequeue queue.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit; queue.DequeueOptions.Wait = 10; // Dequeue message OracleAQMessage deqMsg = queue.Dequeue(); Console.WriteLine("Dequeued Message Payload : " + ByteArrayToString(deqMsg.Payload as byte[])); Console.WriteLine("MessageId of Dequeued Message : " + ByteArrayToString(deqMsg.MessageId)); // Dequeue txn commit txn.Commit(); } catch (Exception e) { Console.WriteLine("Error: {0}", e.Message); } finally { // Close/Dispose objects queue.Dispose(); con.Close(); con.Dispose(); } } // Function to convert byte[] to string static private string ByteArrayToString(byte[] byteArray) { StringBuilder sb = new StringBuilder(); for (int n = 0; n < byteArray.Length; n++) { sb.Append((int.Parse(byteArray[n].ToString())).ToString("X")); } return sb.ToString(); } } }