日本語PDF

Oracle Databaseアドバンスト・キューイングのサポート

Oracle Databaseアドバンスト・キューイング(AQ)は、データベース統合型のメッセージ・キューイング機能を提供します。Oracle Database AQでは、メッセージを永続的に格納し、異なるコンピュータおよびデータベースにあるキュー間でメッセージを伝播し、Oracle Net ServicesおよびHTTP(S)を使用してメッセージを送信できるように、Oracle Databaseの機能を利用します。

注意:

ODP.NET管理対象ドライバおよびODP.NET Coreは、AQ .NETのクラスをサポートしていません。

Oracle Database AQはデータベース表に実装されるため、運用上の利点である高可用性、スケーラビリティ、信頼性のすべてがキュー・データにも適用されます。Oracle Database AQは、リカバリや再起動、セキュリティなどの標準的なデータベース機能もサポートしています。

次の各項でOracle Database AQの概念について説明します。

  • キューおよびキュー表

    キューにエンキューされたメッセージは、キュー表に格納されます。キュー表に基づいてキューを作成するには、先にキュー表を作成しておく必要があります。キュー表およびキューの作成と管理には、DBMS_AQADM PL/SQLパッケージまたはOracle Developer Tools for Visual Studioを使用します。

    キューはOracleAQQueueオブジェクトで表されます。

  • 単一コンシューマ・キューおよび複数コンシューマ・キュー

    単一コンシューマ・キューは、単一コンシューマ・キュー表に基づいて作成されます。単一コンシューマ・キューにエンキューされたメッセージは、単一コンシューマのみがデキューできます。

    複数コンシューマ・キューは、複数コンシューマ・キュー表に基づいて作成されます。このキューは、キュー・サブスクライバとメッセージ受信者をサポートしています。

  • メッセージ受信者

    メッセージ・プロデューサは、メッセージのエンキュー時に受信者のリストを発行できます。これにより、キュー内の各メッセージと一連の受信者を一意に対応させることができます。キューに関連付けられたサブスクライバ・リストがある場合でも、メッセージに関連付けられた受信者リストが優先されます。受信者がサブスクライバ・リストに指定されている必要はありません。ただし、サブスクライバの中から受信者を選択できます。OracleAQMessageRecipientsプロパティを使用すると、OracleAQAgentオブジェクトの観点から特定のメッセージの受信者を指定できます。

  • エンキュー

    プロデューサ・アプリケーションがキューにメッセージをプッシュすると、そのメッセージはエンキューされます。これを実現するには、OracleAQQueueオブジェクトに対してEnqueueメソッドをコールします。EnqueueArrayメソッドを使用すれば、複数のメッセージをエンキューできます。

  • デキュー

    コンシューマ・アプリケーションがキューからメッセージをプルすると、そのメッセージはデキューされます。これを実現するには、OracleAQQueueオブジェクトに対してDequeueメソッドをコールします。DequeueArrayメソッドを使用すれば、複数のメッセージをデキューできます。

  • リスニング

    サブスクライバ・アプリケーションでは、Listenコールを使用して複数のキューを監視することで、様々なキューをサブスクライブできます。これは、サブスクライバ・アプリケーションが多くのキューをサブスクライブしており、どのキューに到着したメッセージもすべて受信する場合に適している、スケーラブルなソリューションです。これを実現するには、OracleAQQueueクラスのListenメソッドをコールし、サブスクリプションのリストを配列の形式で渡します。

  • 通知

    サブスクライバ・アプリケーションでは、通知メカニズムを利用して、キュー内のメッセージの可用性に関する通知を受信できます。受信した情報に基づいて、メッセージをスキップするか、キューからデキューすることを決定できます。

    サブスクライバ・アプリケーションでは、通知の受信元となるキューのイベント通知を登録する必要があります。これは、OracleAQQueueMessageAvailableイベントで表されます。サブスクリプションに一致するメッセージが到着すると、イベントがトリガーされます。

    通知は、通常の通知またはグループ化通知として登録できます。これらの通知のタイムアウト値も指定可能です。OracleAQQueue.Notificationプロパティを使用すると、様々な通知オプションを設定できます。OracleAQQueueオブジェクトに設定した通知は、オブジェクトが破棄されると自動的に取り消されます。

  • バッファ・メッセージング

    バッファ・メッセージングでは、メッセージは共有メモリー領域に格納されます。そのため、バッファ・メッセージングは永続メッセージングより高速です。メッセージがディスクに書き込まれるのは、バッファ・メッセージの総メモリー使用率が使用可能な共有メモリーの上限に達した場合のみです。バッファ・メッセージングは、Oracle Database AQの永続メッセージングの信頼性とトランザクション・サポートを必要としないアプリケーションに最適です。

    バッファ・メッセージと永続メッセージは、同一の単一コンシューマ・キューまたは複数コンシューマ・キューと、同一の管理インタフェースおよび操作インタフェースを使用します。この2種類のメッセージは、配信モード・パラメータで区別されます。アプリケーションでは、Oracle Database AQキューにメッセージをエンキューする際に、配信モード・パラメータも設定します。

    OracleAQMessageに配信モード・パラメータを設定するには、DeliveryModeプロパティを変更します。バッファ・メッセージングは、8.1以上の互換性で作成されたすべてのキュー表でサポートされます。

アドバンスト・キューイングでのODP.NETの使用

.NETアプリケーションでは、ODP.NETを使用して、エンキュー、デキュー、リスニング、通知など、AQのすべての操作機能を利用できます。

表3-27は、AQ機能を対応するODP.NET実装にマップしたものです。

表3-27 AQ機能とODP.NET実装とのマッピング

機能 ODP.NET実装

メッセージの作成

OracleAQMessageオブジェクトを作成

単一メッセージのエンキュー

メッセージをOracleAQMessageとして、キューをOracleAQQueueとして指定し、OracleAQQueueにエンキュー・オプションを指定して、OracleAQQueue.Enqueueをコール

複数メッセージのエンキュー

OracleAQQueue.EnqueueArrayにメッセージをOracleAQMessage配列として指定

単一メッセージのデキュー

OracleAQQueueにデキュー・オプションを指定し、OracleAQQueue.Dequeueをコール

複数メッセージのデキュー

OracleAQQueue.DequeueArrayをコール

キューのメッセージのリスニング

OracleAQQueue.Listenをコール。複数のキューでリスニングするには、OracleAQQueueListen静的メソッドを使用

メッセージ通知

OracleAQQueue.MessageAvailable EventNotificationConsumersプロパティとともに使用

注意:

AQサンプルは、Oracle Universal Installerを使用して実行されるODACインストールのORACLE_BASE\ORACLE_HOME\ODP.NET\Samplesディレクトリにあります。

エンキューおよびデキューの例

次の例では、単一コンシューマ・キューを使用するエンキューおよびデキューのメッセージを示しています。この例の最初の部分では、データベース・ユーザーSCOTTの必要なデータベース設定を実行しています。この例の2番目の部分は、エンキューおよびデキューのメッセージを示しています。

-- Part I: Database setup required for this demo
 
------------------------------------------------------------------
-- SQL to grant appropriate privilege to database user, SCOTT
------------------------------------------------------------------
SQL> ALTER USER SCOTT ACCOUNT UNLOCK IDENTIFIED BY Pwd4Sct;
User altered.
SQL> GRANT ALL ON DBMS_AQADM TO scott;
 
------------------------------------------------------------------
-- PL/SQL to create queue-table and queue and start queue for SCOTT
------------------------------------------------------------------
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table=>'scott.test_q_tab', 
    queue_payload_type=>'RAW', 
    multiple_consumers=>FALSE);
 
  DBMS_AQADM.CREATE_QUEUE(
    queue_name=>'scott.test_q', 
    queue_table=>'scott.test_q_tab');
 
  DBMS_AQADM.START_QUEUE(queue_name=>'scott.test_q');
END;
/
 
------------------------------------------------------------------
-- PL/SQL to stop queue and drop queue & queue-table from SCOTT
------------------------------------------------------------------
BEGIN
  DBMS_AQADM.STOP_QUEUE('scott.test_q');
 
  DBMS_AQADM.DROP_QUEUE(
    queue_name => 'scott.test_q', 
    auto_commit => TRUE);
 
  DBMS_AQADM.DROP_QUEUE_TABLE(
    queue_table => 'scott.test_q_tab',
    force => FALSE, 
    auto_commit => TRUE);
END;
/
-- End of Part I, database setup.

//Part II: Enqueuing and dequeuing messages
//C#
using System;
using System.Text;
using Oracle.DataAccess.Client;
using Oracle.DataAccess.Types;
 
namespace ODPSample
{
  /// <summary>
  /// Demonstrates Enqueuing and Dequeuing raw message 
  /// using a single consumer queue
  /// </summary>
  class EnqueueDequeue
  {
    static void Main(string[] args)
    {
      // Create connection
      string constr = "user id=scott;password=Pwd4Sct;data source=oracle";
      OracleConnection con = new OracleConnection(constr);
 
      // Create queue
      OracleAQQueue queue = new OracleAQQueue("scott.test_q", con);
 
      try
      {
        // Open connection
        con.Open();
 
        // Begin txn for enqueue
        OracleTransaction txn = con.BeginTransaction();
 
        // Set message type for the queue
        queue.MessageType = OracleAQMessageType.Raw;
 
        // Prepare message and RAW payload
        OracleAQMessage enqMsg = new OracleAQMessage();
        byte[] bytePayload = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        enqMsg.Payload = bytePayload;
 
        // Prepare to Enqueue
        queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
 
        // Enqueue message
        queue.Enqueue(enqMsg);
 
        Console.WriteLine("Enqueued Message Payload      : "
          + ByteArrayToString(enqMsg.Payload as byte[]));
        Console.WriteLine("MessageId of Enqueued Message : "
          + ByteArrayToString(enqMsg.MessageId));
 
        // Enqueue txn commit
        txn.Commit();
 
        // Begin txn for Dequeue
        txn = con.BeginTransaction();
 
        // Prepare to Dequeue
        queue.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
        queue.DequeueOptions.Wait = 10;
 
        // Dequeue message
        OracleAQMessage deqMsg = queue.Dequeue();
 
        Console.WriteLine("Dequeued Message Payload      : "
          + ByteArrayToString(deqMsg.Payload as byte[]));
        Console.WriteLine("MessageId of Dequeued Message : "
          + ByteArrayToString(deqMsg.MessageId));
 
        // Dequeue txn commit
        txn.Commit();
      }
      catch (Exception e)
      {
        Console.WriteLine("Error: {0}", e.Message);
      }
      finally
      {
        // Close/Dispose objects
        queue.Dispose();
        con.Close();
        con.Dispose();
      }
    }
 
    // Function to convert byte[] to string
    static private string ByteArrayToString(byte[] byteArray)
    {
      StringBuilder sb = new StringBuilder();
      for (int n = 0; n < byteArray.Length; n++)
      {
        sb.Append((int.Parse(byteArray[n].ToString())).ToString("X"));
      }
      return sb.ToString();
    }
  }
}