10 Oracle Streamsアドバンスト・キューイング

この章では、メッセージに対応するOracle Streams Advanced Queuing (AQ)のOCCI実装について説明します。

この章には次のトピックが含まれます:

関連項目:

10.1 Oracle Streams Advanced Queuingの概要

Oracle Streamsは、レプリケーション、メッセージ・キューイング、データ・ウェアハウスへのロード、およびイベント通知を提供する、新規の情報共有機能です。また、Oracle StreamsはOracle Streams Advanced Queuing(AQ)の基盤です。

Advanced Queuingは、Oracle Streamsのメッセージ・キューイング機能を公開する、統合されたメッセージ・キューイング機能です。AQにより、アプリケーションでは次のことが可能になります。

  • Oracleデータベースから、SQL操作に類似したメッセージ・キューイング操作を実行

  • AQキューのメッセージを介して非同期に通信

  • データベースとの統合により、メッセージ・キューイングに対してこれまでにない操作上の簡潔性、信頼性およびセキュリティを提供

  • メッセージの監査および追跡

  • 同期モードおよび非同期モードの通信をサポート

関連項目:

アドバンスト・キューイングの詳細は、Oracle Technology Network — アドバンスト・キューイングを参照してください。

OCCIアプリケーションでAQを使用する利点は、次のとおりです。

  • 一貫性と信頼性を保ちながらセキュアな自律型の方法で相互に通信するアプリケーションを作成できます。

  • メッセージをデータベース表に格納することで、メッセージ交換インフラストラクチャにデータベースの信頼性とリカバリ可能性を取り込みます。

  • メッセージが自動的にデータベースに保存されることで、監査およびビジネス・インテリジェンスに使用可能です。

  • 異なるセキュリティ、データ型または操作モードを使用することなくメッセージを送受信するアプリケーションを作成できます。

  • データベースのトランザクションに関する特性を活用できます。

従来のメッセージ・ソリューションでは単一のサブスクライバ・キューを使用するため、キューは相互に通信するアプリケーションのペアごとに1つずつ作成する必要があります。AQのパブリッシュ/サブスクライブ・プロトコルにより、複数のアプリケーション間の対話にアプリケーション(サブスクライバ)を簡単に追加できます。

10.2 OCCIでのAQ実装について

OCCI AQは、企業のメッセージ・アプリケーションにおいて、メッセージ・クライアントがOracleのアドバンスト・キューイング機能にアクセスできるようにする一連のインタフェースです。現在、OCCI AQでは管理インタフェースではなく操作インタフェースのみがサポートされていますが、埋込みのPL/SQLコールによって管理操作にもアクセスできます。

関連項目:

AQサポートでのPL/SQLによる管理操作の詳細は、『Oracle Database PL/SQLパッケージおよびタイプ・リファレンス』DBMS_AQADMパッケージに関する項を参照してください。

AQ機能は、OCCIで使用可能な他のインタフェースと組み合せて、メッセージ対応データベースでの送信、受信、パブリッシュおよびサブスクライブに使用できます。メッセージ選択ルールに基づいた同期および非同期メッセージの受入れが可能です。

エンキューとはキューへのメッセージ送信を指し、デキューとはメッセージを受け取ることを指します。クライアント・アプリケーションでは、メッセージを作成して必要なプロパティを設定し、キュー(データベース内の表)に格納することでメッセージをエンキューできます。メッセージをデキューする場合、アプリケーションでは、キューに対して受取りメソッドをコールしてメッセージを同期的にデキューするか、データベースからの通知を待機して非同期的にデキューできます。

AQ機能は、次の抽象化を通じて実装されます。

10.2.1 Message

メッセージは情報の基本単位で、キューに挿入されたりキューから取得されたりします。メッセージは、制御情報およびペイロード・データで構成されます。制御情報とは、AQでメッセージの管理に使用されるメッセージ・プロパティを指します。ペイロード・データは、キューに格納される情報で、AQに対して透過的です。

10.2.2 Agent

Agentは、キューのユーザーである、メッセージのプロデューサまたはコンシューマ、あるいはエンド・ユーザーまたはアプリケーションを指し、識別します。Agentは、名前、アドレスおよびプロトコルによって識別されます。名前は、アプリケーションによって割り当てられるか、またはアプリケーション自体です。アドレスは、通信プロトコルとして判別されます。プロトコルが0 (デフォルト)である場合、アドレスの書式は[schema.]queuename[@dblink] (データベース・リンク)となります。

同一キュー上のAgentには、名前、アドレスおよびプロトコルの一意の組合せが必要です。例10-1は、クライアント・プログラムにおける新規Agentオブジェクトのインスタンス化を示しています。

例10-1 Agentの作成

Agent agt(env, "Billing_app", "billqueue", 0);

10.2.3 Producer

クライアントは、Producerオブジェクトを使用してMessageをキューにエンキューします。また、各種エンキュー・オプションの指定にも使用されます。

10.2.4 Consumer

クライアントは、Consumerオブジェクトを使用して、キューに配信されたMessageをデキューします。また、各種デキュー・オプションを指定します。

Consumerがメッセージを受信するには、ConsumerでAgentを設定します。

例10-2 ConsumerでのAgentの設定

Consumer cons(conn);
...
cons.setAgent(ag);
cons.receive();

10.2.5 Listener

Listenerは、指定されたキューで、登録されたAgentMessageをリスニングします。

10.2.6 Subscription

Subscriptionは、通知用のサブスクライバの登録に必要な情報および操作をカプセル化します。

10.3 メッセージの作成について

前述のように、Messageは、メッセージのプロパティとその内容(ペイロード)の両方を含む情報の基本単位です。各メッセージはProducerによってエンキューされ、Consumerオブジェクトによってデキューされます。

この項には次のトピックが含まれます:

10.3.1 メッセージ・ペイロードについて

OCCIは、3つのタイプのメッセージ・ペイロードをサポートしています。
10.3.1.1 RAW

RAWペイロードは、OCCIでBytesクラスのオブジェクトとしてマッピングされます。

10.3.1.2 AnyData

AnyData型は、自己記述データのカプセル化をモデル化します。型情報と実際のデータの値の両方が含まれています。大半のSQL型のデータ値はAnyDataに変換可能であり、さらに元のデータ型に変換可能です。AnyDataはユーザー定義のデータ型もサポートしています。AnyDataペイロードを使用する利点は、エンキュー処理およびデキュー処理の後にも両方の型の保持が保証されることと、ユーザーがアプリケーションで使用するすべての型に単一キューを使用できることです。例10-3AnyDataメッセージを生成する方法を示します。例10-4にメッセージから元データ型を取得する方法を示します。

例10-3 文字列ペイロードを含むAnyDataメッセージの作成

AnyData any(conn);
any.setFromString("item1");
Message mes(env);
mes.setAnyData(any);

例10-4 AnyDataメッセージにおけるペイロードの型の判別

TypeCode tc = any.getType();
10.3.1.3 ユーザー定義型のペイロードとしての使用

OCCIでは、ユーザー定義型のペイロードのエンキューおよびデキューをサポートしています。例10-5は、ユーザー定義のEmployeeオブジェクトによるペイロードの作成方法を示しています。

例10-5 ユーザー定義ペイロードの作成

// Assuming type Employee ( name varchar2(25),
//                          deptid number(10),
//                          manager varchar2(25) )
Employee *emp = new Employee();
emp.setName("Scott");
emp.setDeptid(10);
emp.setManager("James");
Message mes(env);
mes.setObject(emp);

10.3.2 メッセージ・プロパティ

ペイロードの他に、ユーザーは、次のようないくつかの追加メッセージ・プロパティを指定できます。
10.3.2.1 相関

例10-6に示すように、アプリケーションでは、エンキュー・プロセスでメッセージの相関識別子を指定できます。この識別子は、後で、デキューするアプリケーションで使用できます。

例10-6 相関識別子の指定

mes.setCorrelationId("enq_corr_di");
10.3.2.2 送信者

例10-7に示すように、アプリケーションではメッセージの送信者を指定できます。送信者識別子は、後でメッセージの受信者が使用できます。

例10-7 送信者識別子の指定

mes.setSenderId(agt);
10.3.2.3 遅延および有効期限

例10-8に示すように、時間設定では、メッセージの遅延時間および有効期間が秒数で制御されます。

例10-8 メッセージの遅延時間および有効期間の指定

mes.setDelay(10);
mes.setExpirationTime(60);
10.3.2.4 受信者

例10-9に示すように、メッセージ・エンコーディングの際に、メッセージの宛先となるエージェントを指定できます。これによって、指定された受信者のみがメッセージにアクセスできるようになります。

例10-9 メッセージ受信者の指定

vector<Agent> agt_list;
for (i=0; i<num_recipients; i++)
   agt_list.push_back(Agent(name, address, protocol));
mes.setRecipientList(agt_list);
10.3.2.5 優先順位および順序付け

優先順位レベルをメッセージに割り当てることで、送信者はメッセージが受信者によってデキューされる順序を制御できます。例10-10は、メッセージの優先順位の設定方法を示しています。

例10-10 メッセージの優先順位の指定

mes.setPriority(3);

10.4 メッセージのキューへの登録

メッセージはProducerによってエンキューされます。また、Producerクラスはエンキュー・オプションの指定にも使用されます。例10-11に示すように、エンキューが実行される有効な接続においてProducerオブジェクトを作成できます。

エンキュー操作のトランザクションの動作は、アプリケーションの要件に基づいて定義できます。アプリケーションでは、エンキュー操作の結果を、例10-11のように操作の完了後すぐに、またはエンクロージング・トランザクションのコミット後にのみ外部に表示するにようにできます。

メッセージをエンキューするには、例10-11に示すようにsend()メソッドを使用します。クライアントは、Messageオブジェクトが送信された後もこれを保持し、変更して再度送信できます。

例10-11 Producerの作成、可視性の設定およびメッセージのエンキュー

Producer prod(conn);
...
prod.setVisibility(Producer::ENQ_IMMEDIATE);
...
Message mes(env);
...
mes.setBytes(obj);            // obj represents the content of the message
prod.send(mes, queueName);    // queueName is the name of the queue

10.5 メッセージのデキュー

キューに配信されたメッセージは、Consumerによってデキューされます。また、Consumerクラスはデキュー・オプションの指定にも使用されます。例10-12に示すように、キューが存在するデータベースへの有効な接続上で、Consumerオブジェクトを作成できます。

同一のキューで複数のコンシューマをサポートするアプリケーションでは、例10-12に示すように、コンシューマの名前をキューの登録済サブスクライバとして指定する必要があります。

メッセージをデキューするには、例10-12に示すようにreceive()メソッドを使用します。receive()メソッドのtypeNameパラメータおよびschemaNameパラメータは、ペイロードの型およびペイロード型のスキーマを指定します。

キューのペイロード型がRAWまたはAnyDataの場合、schemaNameおよびtypeNameはオプションですが、ユーザー定義ペイロードを使用する場合、これらのパラメータを明示的に指定する必要があります。これについては例10-13に示しています。

例10-12 Consumerの作成、Consumerのネーミングおよびメッセージの受信

Consumer cons(conn);
...
// Name must be registered with the queue through administrative interface
cons.setConsumerName("BillApp");
cons.setQueueName(queueName);
...
Message mes = cons.receive(Message::OBJECT, "BILL_TYPE", "BILL_PROCESSOR");
...
// obj is is assigned the content of the message
obj = mes.getObject();

例10-13 メッセージの受信

//receiving a RAW message
Message mes = cons.receive(Message::RAW);
...
//receiving an ANYDATA message
Message mes = cons.receive(Message::ANYDATA);
...
この項には次のトピックが含まれます。「デキュー・オプションについて」

10.5.1 デキュー・オプションについて

デキューするアプリケーションでは、メッセージの受信を開始する前に、複数のデキュー・オプションを指定できます。これには次のものがあります。
10.5.1.1 相関

例10-14に示すように、setCorrelationId()メソッドを使用して、相関識別子の値に基づいてメッセージをデキューできます。

10.5.1.2 モード

ユーザーはアプリケーションの要件に基づいて、例10-14に示すように、setDequeueMode()メソッドを使用して、キューのメッセージの単なる閲覧、キューからのメッセージの削除、またはメッセージのロックを選択できます。

10.5.1.3 ナビゲーション

単一トランザクションでエンキューされたメッセージは、例10-14に示すように、setPositionOfMessage()メソッドを実装することで単一グループとして表示できます。

例10-14 デキュー・オプションの指定

cons.setCorrelationId(corrId);
...
cons.setDequeueMode(deqMode);
...
cons.setPositionOfMessage(Consumer::DEQ_NEXT_TRANSACTION);

10.6 メッセージのリスニング

リスナーは、登録クライアントの代理でキュー上のメッセージをリスニングします。リスナー・クラスにはlisten()メソッドが実装されますが、このメソッドは、登録済エージェントに対するメッセージがキューに存在すると戻り、タイム・アウト期間経過時にエラーをスローするブロッキング・コールです。例10-15にリスニング・プロトコルを示します。

例10-15 メッセージのリスニング

Listener listener(conn);

vector<Agent> agtList;
for( int i=0; i<num_agents; i++)
   agtList.push_back( Agent( name, address, protocol);

listener.setAgentList(agtList);
listener.setTimeOutForListen(10);

Agent agt(env);

try{
   agt = listener.listen();
}
catch{
   cout<<e.getMessage()<<endl;
}

10.7 通知の登録について

Subscriptionクラスは、パブリッシュ・サブスクライブ通知機能を実装しています。この機能により、OCCI AQアプリケーションでは、クライアント通知の直接受信、通知の送信先の電子メール・アドレスの登録、通知をポストするHTTP URLの登録、あるいは通知に対してコールされるPL/SQLプロシージャの登録を行うことができます。登録されたクライアントは、イベントが発生した時、または明示的なAQエンキューにあるとき、非同期に通知を受け取ります。クライアントはデータベースに接続されている必要はありません。

OCCIアプリケーションでは、次のすべての処理を実行できます。

  • AQネームスペースに通知の実行を登録し、エンキューが発生したときに通知を受け取ります。

  • データベース・イベントのサブスクリプションの実行を登録し、これらのイベントがトリガーされたときに通知を受け取ります。

  • 一時的な登録の解除またはすべての登録の削除など、登録を管理します。

  • 登録されたクライアントに通知を送信します。

10.7.1 パブリッシュ・サブスクライブ通知

通知は複数の方法で機能します。次のような場合があります。

  • OCCIアプリケーションでの直接受信

  • 事前に指定された電子メール・アドレスへの送信

  • 事前に定義されたHTTP URLへの送信

  • 事前に指定したデータベースPL/SQLプロシージャのコール

登録されたクライアントは、イベントがトリガーされたとき、またはイベントが明示的なAQエンキューにあるとき、非同期に通知を受け取ります。クライアントは、通知を機能させるためにデータベースに接続する必要はありません。登録は次のいずれかにより実行できます。
10.7.1.1 直接登録の使用方法

直接データベースに登録できます。これは比較的単純な方法で、登録は即時に有効になります。例10-16は、直接イベント通知に正常に登録するための必要なステップについて、概要を示しています。適切なイベント・トリガーまたはキューが存在し、初期化パラメータCOMPATIBLE8.1以上に設定されていることを前提としています。

例10-16 通知への登録方法(直接登録)

  1. Environment::EVENTSモードで環境を作成します。

  2. Subscriptionオブジェクトを作成します。

  3. 次のSubscription属性を設定します。

    namespaceは次のオプションに設定できます。

    • AQキューから通知を受信するには、namespaceSubscription::NS_AQに設定する必要があります。サブスクリプション名は、シングル・コンシューマ・キューで登録する場合はSCHEMA.QUEUE、またはマルチ・コンシューマ・キューで登録する場合はSCHEMA.QUEUE:CONSUMER_NAMEという書式になります。

    • conn->postToSubscription()メソッドを使用する他のアプリケーションから通知を受信するには、namespaceSubscription::NS_ANONYMOUSに設定する必要があります。

    protocolは次のオプションに設定できます。

    • OCCIクライアントがイベント通知を受信する必要がある場合、この属性をSubscription::PROTO_CBKに設定します。また、Subscriptionを登録する前に、通知コールバックおよびサブスクリプション・コンテキストを設定する必要があります。通知コールバックは、イベントが発生したときにコールされます。

    • 電子メール通知の場合、プロトコルをSubscription::PROTO_MAILに設定します。アプリケーション・エラーを避けるために、サブスクライブの前に受信者名を設定する必要があります。

    • HTTP URL通知の場合、プロトコルをSubscription::HTTPに設定します。アプリケーション・エラーを避けるために、サブスクライブの前に受信者名を設定する必要があります。

    • イベント通知に基づいてデータベースのPL/SQLプロシージャをコールするには、プロトコルをSubscription::PROTO_SERVERに設定します。アプリケーション・エラーを避けるために、サブスクライブの前に受信者名を設定する必要があります。

  4. connection->registerSubscriptions()を使用してサブスクリプションを登録します。

10.7.1.2 オープン登録の使用

データベースに登録要求を送信する中間LDAPを使用して登録することもできます。この方法は、データベースの停止中にクライアントがオープン・イベントに登録する場合など、クライアントがデータベースに直接接続できない場合に役立ちます。この方法は、クライアントが複数のデータベース内の同じイベントまたは複数のイベントに同時に登録する場合にも使用されます。

例10-17は、Oracle Enterprise Security Manager (OESM)を使用したLDAPオープン登録について、概要を示しています。オープン登録の前提条件は、次のとおりです。

  • クライアントがエンタープライズ・ユーザーであることが必要です。

    • 各エンタープライズ・ドメインで、エンタープライズ・ロールENTERPRISE_AQ_USER_ROLEを作成します。

    • エンタープライズ・ドメイン内の各データベースについて、グローバル・ロールGLOBAL_AQ_USER_ROLEをエンタープライズ・ロールENTERPRISE_AQ_USER_ROLEに追加します。

    • 各エンタープライズ・ドメインについて、エンタープライズ・ロールENTERPRISE_AQ_USER_ROLEを、管理コンテキスト内のcn=oraclecontextの下にある権限グループcn=OracleDBAQUsersに追加します。

    • データベース内のイベントへの登録が承認された各エンタープライズ・ユーザーに対して、エンタープライズ・ロールENTERPRISE_AQ_USER_ROLEを付与します。

  • データベースの互換性は、9.0以上であることが必要です。

  • LDAP_REGISTRATION_ENABLEDTRUEに設定する必要があります(デフォルトはFALSE)。

    ALTER SYSTEM SET LDAP_REGISTRATION_ENABLED=TRUE
    
  • LDAP_REG_SYNC_INTERVALを、LDAPからの登録をリフレッシュするためにtime_interval (秒単位)に設定する必要があります(デフォルトは0 (ゼロ)で、この場合リフレッシュは行われません)。

    ALTER SYSTEM SET LDAP_REG_SYNC_INTERVAL = time_interval
    

データベースでLDAPの登録情報を即時にリフレッシュするには、次のコマンドを発行します。

ALTER SYSTEM REFRESH LDAP_REGISTRATION

データベースが新規登録を取り出すためにLDAPにアクセスすると、オープン登録された登録が有効になります。取り出す頻度は、REG_SYNC_INTERVALの値によって決定されます。

クライアントは、サブスクリプションを一時的に無効にしたり、再度有効にしたり、あるいは将来の通知から永続的に登録解除できます。

例10-17 LDAPを介したオープン登録の使用方法

  1. Environment::EVENTS|Environment::USE_LDAPモードで環境を作成します。

  2. LDAPにアクセスするためのEnvironmentオブジェクトを設定します。

    • LDAPサーバーが常駐し、リスニングしているホストおよびポート

    • 認証方式(現在は、ユーザー名とパスワードの単純な認証のみがサポートされています)

    • LDAPサーバーで認証するためのユーザー名(識別名)およびパスワード

    • LDAPサーバー内のOracleに関する管理コンテキスト

  3. Subscriptionオブジェクトを作成します。

  4. クライアントがSubscriptionオブジェクトに関する通知を受信する、各データベースの識別名を設定します。

  5. 次のSubscription属性を設定します。

    namespaceは次のオプションに設定できます。

    • AQキューから通知を受信するには、namespaceSubscription::NS_AQに設定する必要があります。サブスクリプション名は、シングル・コンシューマ・キューで登録する場合はSCHEMA.QUEUE、またはマルチ・コンシューマ・キューで登録する場合はSCHEMA.QUEUE:CONSUMER_NAMEという書式になります。

    • conn->postToSubscription()メソッドを使用する他のアプリケーションから通知を受信するには、namespaceSubscription::NS_ANONYMOUSに設定する必要があります。

    protocolは次のオプションに設定できます。

    • OCCIクライアントがイベント通知を受信する必要がある場合、この属性をSubscription::PROTO_CBKに設定します。また、Subscriptionを登録する前に、通知コールバックおよびサブスクリプション・コンテキストを設定する必要があります。通知コールバックは、イベントが発生したときにコールされます。

    • 電子メール通知の場合、プロトコルをSubscription::PROTO_MAILに設定します。その後、通知を送信する必要のある電子メール・アドレスに対して受信者名を設定する必要があります。

    • HTTP URL通知の場合、プロトコルをSubscription::HTTPに設定します。通知がポストされるURLに対して受信者名を設定する必要があります。

    • イベント通知に基づいてデータベースのPL/SQLプロシージャをコールするには、プロトコルをSubscription::PROTO_SERVERに設定します。通知に対してコールされるデータベース・プロシージャに対して、受信者名を設定する必要があります。

  6. サブスクリプションenvironment->registerSubscriptions()を登録します。

10.7.2 通知コールバックについて

クライアントは通知コールバックを登録する必要があります。通知コールバックは、登録したサブスクリプションに対してアクティビティが発生したときにのみコールされます。Streams AQネームスペースでは、対象となるメッセージがエンキューされたときに通知コールバックが発生します。

通知コールバックからは、0が戻される必要があります。また、次の指定が含まれる必要があります。

typedef unsigned int (*callbackfn) (Subscription &sub, NotifyResult *nr);

説明:

  • subパラメータは、コールバックの登録時に使用されたSubscriptionオブジェクトです。

  • nrパラメータは、通知情報を保持するNotifyResultオブジェクトです。

通知への登録に使用したSubscriptionオブジェクトは、明示的にサブスクリプションを登録解除するまで破棄しないようにしてください。

ユーザーは、通知ソースに応じて、ペイロード、メッセージ、メッセージID、キュー名、コンシューマ名をNotifyResultオブジェクトから取得できます。結果のまとめを表10-1に示します。現在はバイト・ペイロードのみがサポートされており、メッセージをAQ名前空間の永続キューから明示的にデキューする必要があります。通知が非永続キューから届いた場合には、メッセージは直接コールバックで利用可能であり、RAWペイロードのみがサポートされています。通知が永続キューから届いた場合には、メッセージは明示的にデキューされる必要があり、すべてのペイロード型がサポートされています。

表10-1 通知結果属性(ANONYMOUSおよびAQネームスペース)

通知結果属性 ANONYMOUSネームスペース AQネームスペース、永続キュー AQネームスペース、非永続キュー

ペイロード

有効

無効

無効

message

無効

無効

有効

messageID

無効

有効

有効

consumer name

無効

有効

有効

queue name

無効

有効

有効

10.8 メッセージ・フォーマットの変換について

アプリケーションでは異なる形式のデータを使用することが多く、型の変換が必要となります。変換は、ソース・データ型を入力として使用し、ターゲット・データ型のオブジェクトを戻すSQL関数として実装されます。変換を適用できるのは、メッセージがエンキュー/デキューされるとき、またはメッセージがリモート・サブスクライバに伝播されるときです。