ヘッダーをスキップ
Oracle® Data Provider for .NET開発者ガイド
11g リリース2 (11.2.0.4)
B66456-02
  目次へ移動
目次
索引へ移動
索引

前
 
次
 

Oracle Streamsアドバンスト・キューイングのサポート

Oracle Streamsアドバンスト・キューイング(AQ)は、データベース統合型のメッセージ・キューイング機能を提供します。Oracle Streams AQはOracle Streamsを基盤とし、Oracle Databaseの機能を活用して、メッセージの永続的な格納、異なるコンピュータおよびデータベース上のキュー間でのメッセージの伝播、Oracle Net ServicesおよびHTTP(S)を使用したメッセージの送信を可能にしています。


関連項目:

Oracle Streamsアドバンスト・キューイング・ユーザーズ・ガイドおよびリファレンス

Oracle Streams AQはデータベース表に実装されるため、運用上の利点である高可用性、スケーラビリティ、信頼性のすべてがキュー・データにも適用されます。Oracle Streams AQは、リカバリや再起動、セキュリティなどの標準的なデータベース機能もサポートしています。

次の各項でOracle Streams AQの概念について説明します。

アドバンスト・キューイングでのODP.NETの使用

.NETアプリケーションでは、ODP.NETを使用して、エンキュー、デキュー、リスニング、通知など、AQのすべての操作機能を利用できます。

表3-27は、AQ機能を対応するODP.NET実装にマップしたものです。

表3-27 AQ機能とODP.NET実装とのマッピング

機能 ODP.NET実装

メッセージの作成

OracleAQMessageオブジェクトを作成

単一メッセージのエンキュー

メッセージをOracleAQMessageとして、キューをOracleAQQueueとして指定し、OracleAQQueueにエンキュー・オプションを指定して、OracleAQQueue.Enqueueをコール

複数メッセージのエンキュー

OracleAQQueue.EnqueueArrayにメッセージをOracleAQMessage配列として指定

単一メッセージのデキュー

OracleAQQueueにデキュー・オプションを指定し、OracleAQQueue.Dequeueをコール

複数メッセージのデキュー

OracleAQQueue.DequeueArrayをコール

キューのメッセージのリスニング

OracleAQQueue.Listenをコール。複数のキューでリスニングするには、OracleAQQueueListen静的メソッドを使用

メッセージ通知

OracleAQQueue.MessageAvailable EventNotificationConsumersプロパティとともに使用



注意:

ORACLE_BASE\ORACLE_HOME\ODP.NET\SamplesディレクトリにAQのサンプルが用意されています。

エンキューおよびデキューの例

次の例では、単一コンシューマ・キューを使用するエンキューおよびデキューのメッセージを示しています。この例の最初の部分では、データベース・ユーザーSCOTTの必要なデータベース設定を実行しています。この例の2番目の部分は、エンキューおよびデキューのメッセージを示しています。

-- Part I: Database setup required for this demo
 
------------------------------------------------------------------
-- SQL to grant appropriate privilege to database user, SCOTT
------------------------------------------------------------------
SQL> ALTER USER SCOTT ACCOUNT UNLOCK IDENTIFIED BY Pwd4Sct;
User altered.
SQL> GRANT ALL ON DBMS_AQADM TO scott;
 
------------------------------------------------------------------
-- PL/SQL to create queue-table and queue and start queue for SCOTT
------------------------------------------------------------------
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table=>'scott.test_q_tab', 
    queue_payload_type=>'RAW', 
    multiple_consumers=>FALSE);
 
  DBMS_AQADM.CREATE_QUEUE(
    queue_name=>'scott.test_q', 
    queue_table=>'scott.test_q_tab');
 
  DBMS_AQADM.START_QUEUE(queue_name=>'scott.test_q');
END;
/
 
------------------------------------------------------------------
-- PL/SQL to stop queue and drop queue & queue-table from SCOTT
------------------------------------------------------------------
BEGIN
  DBMS_AQADM.STOP_QUEUE('scott.test_q');
 
  DBMS_AQADM.DROP_QUEUE(
    queue_name => 'scott.test_q', 
    auto_commit => TRUE);
 
  DBMS_AQADM.DROP_QUEUE_TABLE(
    queue_table => 'scott.test_q_tab',
    force => FALSE, 
    auto_commit => TRUE);
END;
/
-- End of Part I, database setup.

//Part II: Enqueuing and dequeuing messages
//C#
using System;
using System.Text;
using Oracle.DataAccess.Client;
using Oracle.DataAccess.Types;
 
namespace ODPSample
{
  /// <summary>
  /// Demonstrates Enqueuing and Dequeuing raw message 
  /// using a single consumer queue
  /// </summary>
  class EnqueueDequeue
  {
    static void Main(string[] args)
    {
      // Create connection
      string constr = "user id=scott;password=Pwd4Sct;data source=oracle";
      OracleConnection con = new OracleConnection(constr);
 
      // Create queue
      OracleAQQueue queue = new OracleAQQueue("scott.test_q", con);
 
      try
      {
        // Open connection
        con.Open();
 
        // Begin txn for enqueue
        OracleTransaction txn = con.BeginTransaction();
 
        // Set message type for the queue
        queue.MessageType = OracleAQMessageType.Raw;
 
        // Prepare message and RAW payload
        OracleAQMessage enqMsg = new OracleAQMessage();
        byte[] bytePayload = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        enqMsg.Payload = bytePayload;
 
        // Prepare to Enqueue
        queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
 
        // Enqueue message
        queue.Enqueue(enqMsg);
 
        Console.WriteLine("Enqueued Message Payload      : "
          + ByteArrayToString(enqMsg.Payload as byte[]));
        Console.WriteLine("MessageId of Enqueued Message : "
          + ByteArrayToString(enqMsg.MessageId));
 
        // Enqueue txn commit
        txn.Commit();
 
        // Begin txn for Dequeue
        txn = con.BeginTransaction();
 
        // Prepare to Dequeue
        queue.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
        queue.DequeueOptions.Wait = 10;
 
        // Dequeue message
        OracleAQMessage deqMsg = queue.Dequeue();
 
        Console.WriteLine("Dequeued Message Payload      : "
          + ByteArrayToString(deqMsg.Payload as byte[]));
        Console.WriteLine("MessageId of Dequeued Message : "
          + ByteArrayToString(deqMsg.MessageId));
 
        // Dequeue txn commit
        txn.Commit();
      }
      catch (Exception e)
      {
        Console.WriteLine("Error: {0}", e.Message);
      }
      finally
      {
        // Close/Dispose objects
        queue.Dispose();
        con.Close();
        con.Dispose();
      }
    }
 
    // Function to convert byte[] to string
    static private string ByteArrayToString(byte[] byteArray)
    {
      StringBuilder sb = new StringBuilder();
      for (int n = 0; n < byteArray.Length; n++)
      {
        sb.Append((int.Parse(byteArray[n].ToString())).ToString("X"));
      }
      return sb.ToString();
    }
  }
}