Oracle Streamsアドバンスト・キューイング(AQ)は、データベース統合型のメッセージ・キューイング機能を提供します。Oracle Streams AQはOracle Streamsを基盤とし、Oracle Databaseの機能を活用して、メッセージの永続的な格納、異なるコンピュータおよびデータベース上のキュー間でのメッセージの伝播、Oracle Net ServicesおよびHTTP(S)を使用したメッセージの送信を可能にしています。
|
関連項目: Oracle Streamsアドバンスト・キューイング・ユーザーズ・ガイドおよびリファレンス |
Oracle Streams AQはデータベース表に実装されるため、運用上の利点である高可用性、スケーラビリティ、信頼性のすべてがキュー・データにも適用されます。Oracle Streams AQは、リカバリや再起動、セキュリティなどの標準的なデータベース機能もサポートしています。
次の各項でOracle Streams AQの概念について説明します。
キューおよびキュー表
キューにエンキューされたメッセージは、キュー表に格納されます。キュー表に基づいてキューを作成するには、先にキュー表を作成しておく必要があります。キュー表およびキューの作成と管理には、DBMS_AQADM PL/SQLパッケージまたはOracle Developer Tools for Visual Studioを使用します。
キューはOracleAQQueueオブジェクトで表されます。
単一コンシューマ・キューおよび複数コンシューマ・キュー
単一コンシューマ・キューは、単一コンシューマ・キュー表に基づいて作成されます。単一コンシューマ・キューにエンキューされたメッセージは、単一コンシューマのみがデキューできます。
複数コンシューマ・キューは、複数コンシューマ・キュー表に基づいて作成されます。このキューは、キュー・サブスクライバとメッセージ受信者をサポートしています。
メッセージ受信者
メッセージ・プロデューサは、メッセージのエンキュー時に受信者のリストを発行できます。これにより、キュー内の各メッセージと一連の受信者を一意に対応させることができます。キューに関連付けられたサブスクライバ・リストがある場合でも、メッセージに関連付けられた受信者リストが優先されます。受信者がサブスクライバ・リストに指定されている必要はありません。ただし、サブスクライバの中から受信者を選択できます。OracleAQMessageのRecipientsプロパティを使用すると、OracleAQAgentオブジェクトの観点から特定のメッセージの受信者を指定できます。
エンキュー
プロデューサ・アプリケーションがキューにメッセージをプッシュすると、そのメッセージはエンキューされます。これを実現するには、OracleAQQueueオブジェクトに対してEnqueueメソッドをコールします。EnqueueArrayメソッドを使用すれば、複数のメッセージをエンキューできます。
デキュー
コンシューマ・アプリケーションがキューからメッセージをプルすると、そのメッセージはデキューされます。これを実現するには、OracleAQQueueオブジェクトに対してDequeueメソッドをコールします。DequeueArrayメソッドを使用すれば、複数のメッセージをデキューできます。
リスニング
サブスクライバ・アプリケーションでは、Listenコールを使用して複数のキューを監視することで、様々なキューをサブスクライブできます。これは、サブスクライバ・アプリケーションが多くのキューをサブスクライブしており、どのキューに到着したメッセージもすべて受信する場合に適している、スケーラブルなソリューションです。これを実現するには、OracleAQQueueクラスのListenメソッドをコールし、サブスクリプションのリストを配列の形式で渡します。
通知
サブスクライバ・アプリケーションでは、通知メカニズムを利用して、キュー内のメッセージの可用性に関する通知を受信できます。受信した情報に基づいて、メッセージをスキップするか、キューからデキューすることを決定できます。
サブスクライバ・アプリケーションでは、通知の受信元となるキューのイベント通知を登録する必要があります。これは、OracleAQQueueのMessageAvailableイベントで表されます。サブスクリプションに一致するメッセージが到着すると、イベントがトリガーされます。
通知は、通常の通知またはグループ化通知として登録できます。これらの通知のタイムアウト値も指定可能です。OracleAQQueue.Notificationプロパティを使用すると、様々な通知オプションを設定できます。OracleAQQueueオブジェクトに設定した通知は、オブジェクトが破棄されると自動的に取り消されます。
バッファ・メッセージング
バッファ・メッセージングはOracle Streams AQ 10gリリース2(10.2)で導入されました。バッファ・メッセージングでは、メッセージは共有メモリー領域に格納されます。そのため、バッファ・メッセージングは永続メッセージングより高速です。メッセージがディスクに書き込まれるのは、バッファ・メッセージの総メモリー使用率が使用可能な共有メモリーの上限に達した場合のみです。バッファ・メッセージングは、Oracle Streams AQの永続メッセージングの信頼性とトランザクション・サポートを必要としないアプリケーションに最適です。
バッファ・メッセージと永続メッセージは、同一の単一コンシューマ・キューまたは複数コンシューマ・キューと、同一の管理インタフェースおよび操作インタフェースを使用します。この2種類のメッセージは、配信モード・パラメータで区別されます。アプリケーションでは、Oracle Streams AQキューにメッセージをエンキューする際に、配信モード・パラメータも設定します。
OracleAQMessageに配信モード・パラメータを設定するには、DeliveryModeプロパティを変更します。バッファ・メッセージングは、8.1以上の互換性で作成されたすべてのキュー表でサポートされます。
.NETアプリケーションでは、ODP.NETを使用して、エンキュー、デキュー、リスニング、通知など、AQのすべての操作機能を利用できます。
表3-27は、AQ機能を対応するODP.NET実装にマップしたものです。
表3-27 AQ機能とODP.NET実装とのマッピング
| 機能 | ODP.NET実装 |
|---|---|
|
メッセージの作成 |
|
|
単一メッセージのエンキュー |
メッセージを |
|
複数メッセージのエンキュー |
|
|
単一メッセージのデキュー |
|
|
複数メッセージのデキュー |
|
|
キューのメッセージのリスニング |
|
|
メッセージ通知 |
|
|
注意: ORACLE_BASE\ORACLE_HOME\ODP.NET\SamplesディレクトリにAQのサンプルが用意されています。 |
次の例では、単一コンシューマ・キューを使用するエンキューおよびデキューのメッセージを示しています。この例の最初の部分では、データベース・ユーザーSCOTTの必要なデータベース設定を実行しています。この例の2番目の部分は、エンキューおよびデキューのメッセージを示しています。
-- Part I: Database setup required for this demo
------------------------------------------------------------------
-- SQL to grant appropriate privilege to database user, SCOTT
------------------------------------------------------------------
SQL> ALTER USER SCOTT ACCOUNT UNLOCK IDENTIFIED BY Pwd4Sct;
User altered.
SQL> GRANT ALL ON DBMS_AQADM TO scott;
------------------------------------------------------------------
-- PL/SQL to create queue-table and queue and start queue for SCOTT
------------------------------------------------------------------
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
queue_table=>'scott.test_q_tab',
queue_payload_type=>'RAW',
multiple_consumers=>FALSE);
DBMS_AQADM.CREATE_QUEUE(
queue_name=>'scott.test_q',
queue_table=>'scott.test_q_tab');
DBMS_AQADM.START_QUEUE(queue_name=>'scott.test_q');
END;
/
------------------------------------------------------------------
-- PL/SQL to stop queue and drop queue & queue-table from SCOTT
------------------------------------------------------------------
BEGIN
DBMS_AQADM.STOP_QUEUE('scott.test_q');
DBMS_AQADM.DROP_QUEUE(
queue_name => 'scott.test_q',
auto_commit => TRUE);
DBMS_AQADM.DROP_QUEUE_TABLE(
queue_table => 'scott.test_q_tab',
force => FALSE,
auto_commit => TRUE);
END;
/
-- End of Part I, database setup.
//Part II: Enqueuing and dequeuing messages
//C#
using System;
using System.Text;
using Oracle.DataAccess.Client;
using Oracle.DataAccess.Types;
namespace ODPSample
{
/// <summary>
/// Demonstrates Enqueuing and Dequeuing raw message
/// using a single consumer queue
/// </summary>
class EnqueueDequeue
{
static void Main(string[] args)
{
// Create connection
string constr = "user id=scott;password=Pwd4Sct;data source=oracle";
OracleConnection con = new OracleConnection(constr);
// Create queue
OracleAQQueue queue = new OracleAQQueue("scott.test_q", con);
try
{
// Open connection
con.Open();
// Begin txn for enqueue
OracleTransaction txn = con.BeginTransaction();
// Set message type for the queue
queue.MessageType = OracleAQMessageType.Raw;
// Prepare message and RAW payload
OracleAQMessage enqMsg = new OracleAQMessage();
byte[] bytePayload = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
enqMsg.Payload = bytePayload;
// Prepare to Enqueue
queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
// Enqueue message
queue.Enqueue(enqMsg);
Console.WriteLine("Enqueued Message Payload : "
+ ByteArrayToString(enqMsg.Payload as byte[]));
Console.WriteLine("MessageId of Enqueued Message : "
+ ByteArrayToString(enqMsg.MessageId));
// Enqueue txn commit
txn.Commit();
// Begin txn for Dequeue
txn = con.BeginTransaction();
// Prepare to Dequeue
queue.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
queue.DequeueOptions.Wait = 10;
// Dequeue message
OracleAQMessage deqMsg = queue.Dequeue();
Console.WriteLine("Dequeued Message Payload : "
+ ByteArrayToString(deqMsg.Payload as byte[]));
Console.WriteLine("MessageId of Dequeued Message : "
+ ByteArrayToString(deqMsg.MessageId));
// Dequeue txn commit
txn.Commit();
}
catch (Exception e)
{
Console.WriteLine("Error: {0}", e.Message);
}
finally
{
// Close/Dispose objects
queue.Dispose();
con.Close();
con.Dispose();
}
}
// Function to convert byte[] to string
static private string ByteArrayToString(byte[] byteArray)
{
StringBuilder sb = new StringBuilder();
for (int n = 0; n < byteArray.Length; n++)
{
sb.Append((int.Parse(byteArray[n].ToString())).ToString("X"));
}
return sb.ToString();
}
}
}