この章では、パブリッシュ・サブスクライブ・モデルでアプリケーションを開発する方法について説明します。
内容は次のとおりです。
データベースは企業内の最も重要な情報資源であるため、この役割を補完するために、Oracleでは、企業の情報配信およびメッセージ交換用にパブリッシュ・サブスクライブ・ソリューションを作成しました。
ネットワーキング・テクノロジおよびネットワーキング製品によって、多数のコンピュータ、アプリケーションおよびユーザーにまたがる高度な接続が可能です。これらの環境では、疎結合で自律的に動作する分散システムに非同期通信を提供することが重要であり、ネットワーク障害に強い操作性が要求されます。この要件は、メッセージ機能、メッセージ指向ミドルウェア(MOM)、メッセージ・キューイング、パブリッシュ・サブスクライブなどの特長を持つ様々なミドルウェア製品によって満たされています。
パブリッシュ・サブスクライブ・パラダイムを介して通信するアプリケーションには、受信者を明示的に指定したり意図された受信者を知らせなくても、メッセージを発行できる送信アプリケーション(パブリッシャ)が必要です。同様に、受信アプリケーション(サブスクライバ)は、サブスクライバが登録しているメッセージのみを受信する必要があります。
この送信者と受信者の切離しは、通常、パブリッシャとサブスクライバの間に存在して間接的なレベルとして機能するエンティティによって実現されます。このエンティティが、サブジェクトまたはチャネルを表すキューです。図20-1に、パブリッシュおよびサブスクライブの機能を示します。
サブスクライバは、あるキューにエンキューされたメッセージに関心を示すことによって、また、サブジェクト・ベースまたはコンテンツ・ベースのルールをフィルタとして使用することによって、キューをサブスクライブします。この結果、指定されたキューに一連のルールベースのサブスクリプションが対応付けられます。
実行時、パブリッシャは様々なキューにメッセージを転送します。次に、キュー(基礎となるインフラストラクチャの配信メカニズム)は、様々なサブスクリプションに一致したメッセージを該当するサブスクライバに配信します。
Oracle Databaseには、データベースに対応したパブリッシュ・サブスクライブのメッセージ機能をサポートする機能が含まれています。
データベース・イベントは、データベース・イベントを公開するための宣言定義、検出およびこれらのイベントの実行時の発行をサポートします。この機能によって、イベント駆動方式でエンドユーザーに情報を能動的に発行し、従来のプル型の情報アクセスの方法を補足することができます。
参照: 『Oracle Database PL/SQL言語リファレンス』 |
Oracle Advanced Queuing(AQ)では、キュー・ベースのパブリッシュ・サブスクライブ・パラダイムがサポートされます。データベース・キューは、メッセージの永続的な格納場所として機能し、キューを基にしたパブリッシュおよびサブスクライブが可能になります。ルール・エンジンおよびサブスクリプション・サービスが、設定された関心事項に基づいて、受信者にメッセージを動的に送付します。これによって、送信者と受信者の間のアドレス関係が切り離され、送信者と受信者間での明示的なメッセージ・アドレッシングを補足することができます。
参照: 『Oracle Databaseアドバンスト・キューイング・ユーザーズ・ガイド』 |
キューは、名前付きの関心対象サブジェクトをサポートするエンティティです。キューは永続または非永続(軽量)の特性を持ちます。
永続キューは、メッセージの永続的なコンテナとして機能します。メッセージは、遅延付き高信頼モードで配信されます。
非永続(軽量)キューの基礎となるインフラストラクチャでは、公開されたメッセージを接続クライアントに対して最高で1回送信します。
パブリッシャおよびサブスクライバは、内部的にはエージェントとして表されます。
エージェントは、サブスクリプションを介してキューに関心を示す持続的で論理的なサブスクライブ・エンティティです。エージェントには、関連するサブスクリプション、アドレス、メッセージの配信モードなどのプロパティがあります。この意味では、エージェントはパブリッシャまたはサブスクライバの電子的なプロキシです。
クライアント
クライアントは、一時的な物理エンティティです。クライアントの属性には、クライアント・プログラムが実行される物理プロセス、ノード名、クライアント・アプリケーション・ロジックが含まれます。単一のエージェントにかわって複数のクライアントが動作する場合もあります。認証されている場合は、同一のクライアントが複数のエージェントにかわって動作できます。
キューでのルールは、メッセージ形式属性またはメッセージ・ヘッダー属性に関する一連の事前定義済演算子を使用する条件式として指定されます。各キューには、そのキューが示すメッセージの構造を記述したメッセージ内容形式が対応付けられています。メッセージ形式は、構造化されていなくても(RAW
)、正しく定義された構造体(ADT)であってもかまいません。これによって、サブジェクト・ベースおよびコンテンツ・ベースの両方のサブスクリプションが可能です。
サブスクライバ(エージェント)は、ルールを使用してキューにサブスクリプションを指定できます。サブスクライバは永続的であり、カタログに格納されます。
データベース・イベント発行フレームワーク
データベースは、重要な情報公開元を表します。イベント・フレームワークは、データベース・イベント発行の宣言定義ができるように提案されたものです。これらの事前定義済イベントが発生すると、このフレームワークがイベントを検出および公開します。これによって、パブリッシュ・サブスクライブ機能の一部として、エンドユーザーに対してイベント駆動方式によるアクティブな情報配信が可能になります。
登録は、エージェントにかわって動作する所定のクライアントによって対応付けられた配信情報のプロセスです。エージェントとクライアントの区別に関連して、サブスクリプションと登録の間には重要な区別があります。
サブスクリプションは、エージェントによる特定のキューへの関心を示します。配信の場所および方法は指定しません。配信情報は、クライアントに対応付けられた物理的なプロパティで、論理エージェント(サブスクライバ)の一時的な発現です。エージェントにかわって動作する特定のクライアント・プロセスは、配信を行う 場所を示すホストとポート、および配信の方法を示すコールバックを対応付けることによって、配信情報を登録します。
メッセージの公開
パブリッシャは、適切なキューイング・インタフェースを使用して、キューにメッセージを公開します。インタフェースは、キューが実装されているモデルに依存することがあります。たとえば、エンキュー・コールは、メッセージの公開を表します。
指定されたキューにメッセージが転送または公開されると、ルール・エンジンはそのキューに関して定義されたすべてのルールから、公開されたメッセージと一致する一連の候補となるルールを取得します。
指定されたキュー上の候補ルールのリストに応じて、候補ルールに一致する一連のサブスクライバを評価できます。次に、このサブスクリプション・リストに対応する一連のエージェントが決定され、通知されます。
ポスト
キューは、登録されたすべてのクライアントに対して、公開された該当メッセージを通知します。この概念はポストと呼ばれます。関心があるすべてのクライアントに通知が必要な場合、キューは、登録されたすべてのクライアントにメッセージをポストします。
メッセージの受信
サブスクライバは、次のメカニズムのいずれかを介して、メッセージを受信できます。
サブスクライバにかわって動作するクライアント・プロセスが、登録メカニズムを使用してコールバックを指定します。その後、メッセージがサブスクライバのサブスクリプションと一致する場合、ポスト・メカニズムによってコールバックが非同期に起動されます。メッセージ・コンテンツは、コールバック関数に渡される場合があります(非永続キューの場合のみ)。
サブスクライバにかわって動作するクライアント・プロセスが、登録メカニズムを使用してコールバックを指定します。その後、ポスト・メカニズムによってコールバック関数が非同期に起動されますが、完全なメッセージ・コンテンツは渡されません。コールバック関数はクライアントへの通知として機能し、これに続いてプル型の方法でメッセージ・コンテンツを取得します(永続的キューの場合のみ)。
サブスクライバにかわって動作するクライアント・プロセスが、周期的またはその他の方法で適宜キューから単純にメッセージを取得します。メッセージの遅延がある場合、エンド・クライアントへの非同期配信は行われません。
この例では、データベース・イベント、クライアント通知、AQがどのように連携してパブリッシュ・サブスクライブを実装するかを示します。
ユーザー・スキーマの下で、パブリッシュ・サブスクライブ・メカニズムをサポートするために必要なすべてのオブジェクトを持つpubsub
を作成します。このコードでは、エージェントsnoop
は、ログイン・イベントで公開されるメッセージをサブスクライブします。ユーザーpubsub
がAQ機能を使用するには、AQ_ADMINISTRATOR_ROLE
権限と、DBMS_AQ
およびDBMS_AQADM
のEXECUTE
権限が必要です。
Rem ------------------------------------------------------ REM create queue table for persistent multiple consumers: Rem ------------------------------------------------------ Rem Create or replace a queue table BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE( Queue_table => 'Pubsub.Raw_msg_table', Multiple_consumers => TRUE, Queue_payload_type => 'RAW', Compatible => '8.1'); END; / Rem ------------------------------------------------------ Rem Create a persistent queue for publishing messages: Rem ------------------------------------------------------ Rem Create a queue for logon events BEGIN DBMS_AQADM.CREATE_QUEUE( Queue_name => 'Pubsub.Logon', Queue_table => 'Pubsub.Raw_msg_table', Comment => 'Q for error triggers'); END; / Rem ------------------------------------------------------ Rem Start the queue: Rem ------------------------------------------------------ BEGIN DBMS_AQADM.START_QUEUE('pubsub.logon'); END; / Rem ------------------------------------------------------ Rem define new_enqueue for convenience: Rem ------------------------------------------------------ CREATE OR REPLACE PROCEDURE New_enqueue( Queue_name IN VARCHAR2, Payload IN RAW , Correlation IN VARCHAR2 := NULL, Exception_queue IN VARCHAR2 := NULL) AS Enq_ct DBMS_AQ.Enqueue_options_t; Msg_prop DBMS_AQ.Message_properties_t; Enq_msgid RAW(16); Userdata RAW(1000); BEGIN Msg_prop.Exception_queue := Exception_queue; Msg_prop.Correlation := Correlation; Userdata := Payload; DBMS_AQ.ENQUEUE(Queue_name, Enq_ct, Msg_prop, Userdata, Enq_msgid); END; / Rem ------------------------------------------------------ Rem add subscriber with rule based on current user name, Rem using correlation_id Rem ------------------------------------------------------ DECLARE Subscriber Sys.Aq$_agent; BEGIN Subscriber := sys.aq$_agent('SNOOP', NULL, NULL); DBMS_AQADM.ADD_SUBSCRIBER( Queue_name => 'Pubsub.logon', Subscriber => subscriber, Rule => 'CORRID = ''HR'' '); END; / Rem ------------------------------------------------------ Rem create a trigger on logon on database: Rem ------------------------------------------------------ Rem create trigger on after logon: CREATE OR REPLACE TRIGGER pubsub.Systrig2 AFTER LOGON ON DATABASE BEGIN New_enqueue('Pubsub.Logon', HEXTORAW('9999'), Dbms_standard.login_user); END; /
サブスクリプションが作成された後、次の手順として、クライアントがコールバック関数を使用した通知を登録します。これにはOracle Call Interface(OCI)を使用します。次のコードは、登録に必要な手順を実行します。セッション・ハンドルの割当ておよび初期化を行う最初の手順は、例をわかりやすくするために、ここでは省略します。
ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ; /* callback function for notification of logon of user 'HR' on database: */ ub4 notifySnoop(ctx, subscrhp, pay, payl, desc, mode) dvoid *ctx; OCISubscription *subscrhp; dvoid *pay; ub4 payl; dvoid *desc; ub4 mode; { printf("Notification : User HR Logged on\n"); } int main() { OCISession *authp = (OCISession *) 0; OCISubscription *subscrhpSnoop = (OCISubscription *)0; /***************************************************** Initialize OCI Process/Environment Initialize Server Contexts Connect to Server Set Service Context ******************************************************/ /* Registration Code Begins */ /* Each call to initSubscriptionHn allocates and Initialises a Registration Handle */ initSubscriptionHn( &subscrhpSnoop, /* subscription handle */ "ADMIN:PUBSUB.SNOOP", /* subscription name */ /* <agent_name>:<queue_name> */ (dvoid*)notifySnoop); /* callback function */ /***************************************************** The Client Process does not need a live Session for Callbacks End Session and Detach from Server ******************************************************/ OCISessionEnd ( svchp, errhp, authp, (ub4) OCI_DEFAULT); /* detach from server */ OCIServerDetach( srvhp, errhp, OCI_DEFAULT); while (1) /* wait for callback */ sleep(1); } void initSubscriptionHn (subscrhp, subscriptionName, func) OCISubscription **subscrhp; char* subscriptionName; dvoid * func; { /* allocate subscription handle: */ (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION, (size_t) 0, (dvoid **) 0); /* set subscription name in handle: */ (void) OCIAttrSet((dvoid *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) subscriptionName, (ub4) strlen((char *)subscriptionName), (ub4) OCI_ATTR_SUBSCR_NAME, errhp); /* set callback function in handle: */ (void) OCIAttrSet((dvoid *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) func, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); (void) OCIAttrSet((dvoid *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) 0, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_CTX, errhp); /* set namespace in handle: */ (void) OCIAttrSet((dvoid *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) &namespace, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); checkerr(errhp, OCISubscriptionRegister(svchp, subscrhp, 1, errhp, OCI_DEFAULT)); }
ユーザーHR
がデータベースにログインすると、クライアントに通知され、コールバック関数notifySnoop
が起動されます。