23 パブリッシュ・サブスクライブ・モデルでのアプリケーションの開発

この章では、パブリッシュ・サブスクライブ・モデルでアプリケーションを開発する方法について説明します。

トピック:

23.1 パブリッシュ・サブスクライブ・モデルの概要

データベースは企業内の最も重要な情報資源であるため、この役割を補完するために、Oracleでは、企業の情報配信およびメッセージ交換用にパブリッシュ・サブスクライブ・ソリューションを作成しました。

ネットワーキング・テクノロジおよびネットワーキング製品によって、多数のコンピュータ、アプリケーションおよびユーザーにまたがる高度な接続が可能です。これらの環境では、疎結合で自律的に動作する分散システムに非同期通信を提供することが重要であり、ネットワーク障害に強い操作性が要求されます。この要件は、メッセージ機能、メッセージ指向ミドルウェア(MOM)、メッセージ・キューイング、パブリッシュ・サブスクライブなどの特長を持つ様々なミドルウェア製品によって満たされています。

パブリッシュ・サブスクライブ・パラダイムを介して通信するアプリケーションには、受信者を明示的に指定したり意図された受信者を知らせなくても、メッセージを発行できる送信アプリケーション(パブリッシャ)が必要です。同様に、受信アプリケーション(サブスクライバ)は、サブスクライバが登録しているメッセージのみを受信する必要があります。

この送信者と受信者の切離しは、通常、パブリッシャとサブスクライバの間に存在して間接的なレベルとして機能するエンティティによって実現されます。このエンティティが、サブジェクトまたはチャネルを表すキューです。図23-1に、パブリッシュおよびサブスクライブの機能を示します。

図23-1 Oracleのパブリッシュ・サブスクライブの機能

図23-1の説明が続きます
「図23-1 Oracleのパブリッシュ・サブスクライブの機能」の説明

サブスクライバは、あるキューにエンキューされたメッセージに関心を示すことによって、また、サブジェクト・ベースまたはコンテンツ・ベースのルールをフィルタとして使用することによって、キューをサブスクライブします。この結果、指定されたキューに一連のルールベースのサブスクリプションが対応付けられます。

実行時、パブリッシャは様々なキューにメッセージを転送します。次に、キュー(基礎となるインフラストラクチャの配信メカニズム)は、様々なサブスクリプションに一致したメッセージを該当するサブスクライバに配信します。

23.2 パブリッシュ・サブスクライブのアーキテクチャ

Oracle Databaseには、データベースに対応したパブリッシュ・サブスクライブのメッセージ機能をサポートする機能が含まれています。

23.2.1 データベース・イベント

データベース・イベントは、データベース・イベントを公開するための宣言定義、検出およびこれらのイベントの実行時の発行をサポートします。この機能によって、イベント駆動方式でエンドユーザーに情報を能動的に発行し、従来のプル型の情報アクセスの方法を補足することができます。

23.2.2 Oracle Advanced Queuing

Oracle Advanced Queuing(AQ)では、キュー・ベースのパブリッシュ・サブスクライブ・パラダイムがサポートされます。データベース・キューは、メッセージの永続的な格納場所として機能し、キューを基にしたパブリッシュおよびサブスクライブが可能になります。ルール・エンジンおよびサブスクリプション・サービスが、設定された関心事項に基づいて、受信者にメッセージを動的に送付します。これによって、送信者と受信者の間のアドレス関係が切り離され、送信者と受信者間での明示的なメッセージ・アドレッシングを補足することができます。

23.2.3 クライアント通知

クライアント通知は、メッセージに関心を持つサブスクライバに対するメッセージの非同期配信をサポートします。これによって、データベース・クライアントは特定のキューに対する関心を登録し、そのようなキューで発行が発生した場合に通知を受け取ることができます。データベース・クライアントへのメッセージの非同期配信は、情報を取得するために使用される従来のポーリングとは対照的な技法です。

23.3 パブリッシュ・サブスクライブの概念

キュー

キューは、名前付きの関心対象サブジェクトをサポートするエンティティです。キューは永続または非永続(軽量)の特性を持ちます。

永続キューは、メッセージの永続的なコンテナとして機能します。メッセージは、遅延付き高信頼モードで配信されます。

非永続(軽量)キューの基礎となるインフラストラクチャでは、公開されたメッセージを接続クライアントに対して最高で1回送信します。

エージェント

パブリッシャおよびサブスクライバは、内部的にはエージェントとして表されます。

エージェントは、サブスクリプションを介してキューに関心を示す持続的で論理的なサブスクライブ・エンティティです。エージェントには、関連するサブスクリプション、アドレス、メッセージの配信モードなどのプロパティがあります。この意味では、エージェントはパブリッシャまたはサブスクライバの電子的なプロキシです。

クライアント

クライアントは、一時的な物理エンティティです。クライアントの属性には、クライアント・プログラムが実行される物理プロセス、ノード名、クライアント・アプリケーション・ロジックが含まれます。単一のエージェントにかわって複数のクライアントが動作する場合もあります。認証されている場合は、同一のクライアントが複数のエージェントにかわって動作できます。

キューでのルール

キューでのルールは、メッセージ形式属性またはメッセージ・ヘッダー属性に関する一連の事前定義済演算子を使用する条件式として指定されます。各キューには、そのキューが示すメッセージの構造を記述したメッセージ内容形式が対応付けられています。メッセージ形式は、構造化されていなくても(RAW)、正しく定義された構造体(ADT)であってもかまいません。これによって、サブジェクト・ベースおよびコンテンツ・ベースの両方のサブスクリプションが可能です。

サブスクライバ

サブスクライバ(エージェント)は、ルールを使用してキューにサブスクリプションを指定できます。サブスクライバは永続的であり、カタログに格納されます。

データベース・イベント発行フレームワーク

データベースは、重要な情報公開元を表します。イベント・フレームワークは、データベース・イベント発行の宣言定義ができるように提案されたものです。これらの事前定義済イベントが発生すると、このフレームワークがイベントを検出および公開します。これによって、パブリッシュ・サブスクライブ機能の一部として、エンドユーザーに対してイベント駆動方式によるアクティブな情報配信が可能になります。

登録

登録は、エージェントにかわって動作する所定のクライアントによって対応付けられた配信情報のプロセスです。エージェントとクライアントの区別に関連して、サブスクリプションと登録の間には重要な区別があります。

サブスクリプションは、エージェントによる特定のキューへの関心を示します。配信の場所および方法は指定しません。配信情報は、クライアントに対応付けられた物理的なプロパティで、論理エージェント(サブスクライバ)の一時的な発現です。エージェントにかわって動作する特定のクライアント・プロセスは、配信を行う 場所を示すホストとポート、および配信の方法を示すコールバックを対応付けることによって、配信情報を登録します。

メッセージの公開

パブリッシャは、適切なキューイング・インタフェースを使用して、キューにメッセージを公開します。インタフェースは、キューが実装されているモデルに依存することがあります。たとえば、エンキュー・コールは、メッセージの公開を表します。

ルール・エンジン

指定されたキューにメッセージが転送または公開されると、ルール・エンジンはそのキューに関して定義されたすべてのルールから、公開されたメッセージと一致する一連の候補となるルールを取得します。

サブスクリプション・サービス

指定されたキュー上の候補ルールのリストに応じて、候補ルールに一致する一連のサブスクライバを評価できます。次に、このサブスクリプション・リストに対応する一連のエージェントが決定され、通知されます。

ポスト

キューは、登録されたすべてのクライアントに対して、公開された該当メッセージを通知します。この概念はポストと呼ばれます。関心があるすべてのクライアントに通知が必要な場合、キューは、登録されたすべてのクライアントにメッセージをポストします。

メッセージの受信

サブスクライバは、次のメカニズムのいずれかを介して、メッセージを受信できます。

  • サブスクライバにかわって動作するクライアント・プロセスが、登録メカニズムを使用してコールバックを指定します。その後、メッセージがサブスクライバのサブスクリプションと一致する場合、ポスト・メカニズムによってコールバックが非同期に起動されます。メッセージ・コンテンツは、コールバック関数に渡される場合があります(非永続キューの場合のみ)。

  • サブスクライバにかわって動作するクライアント・プロセスが、登録メカニズムを使用してコールバックを指定します。その後、ポスト・メカニズムによってコールバック関数が非同期に起動されますが、完全なメッセージ・コンテンツは渡されません。コールバック関数はクライアントへの通知として機能し、これに続いてプル型の方法でメッセージ・コンテンツを取得します(永続的キューの場合のみ)。

  • サブスクライバにかわって動作するクライアント・プロセスが、周期的またはその他の方法で適宜キューから単純にメッセージを取得します。メッセージの遅延がある場合、エンド・クライアントへの非同期配信は行われません。

23.4 パブリッシュ・サブスクライブ・メカニズムの例

この例では、データベース・イベント、クライアント通知、AQがどのように連携してパブリッシュ・サブスクライブを実装するかを示します。

  • ユーザー・スキーマの下で、パブリッシュ・サブスクライブ・メカニズムをサポートするために必要なすべてのオブジェクトを持つpubsubを作成します。このコードでは、エージェントsnoopは、ログイン・イベントで公開されるメッセージをサブスクライブします。ユーザーpubsubがAQ機能を使用するには、AQ_ADMINISTRATOR_ROLE権限と、DBMS_AQおよびDBMS_AQADMEXECUTE権限が必要です。

Rem ------------------------------------------------------
REM create queue table for persistent multiple consumers:
Rem ------------------------------------------------------

Rem  Create or replace a queue table
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
   Queue_table        =>  'Pubsub.Raw_msg_table', 
   Multiple_consumers =>   TRUE,
   Queue_payload_type =>  'RAW',
   Compatible         =>  '8.1');
END;
/
Rem ------------------------------------------------------
Rem  Create a persistent queue for publishing messages:
Rem ------------------------------------------------------

Rem  Create a queue for logon events
BEGIN
   DBMS_AQADM.CREATE_QUEUE(
         Queue_name     =>   'Pubsub.Logon',
         Queue_table    =>   'Pubsub.Raw_msg_table',
         Comment        =>   'Q for error triggers');
END;
/

Rem ------------------------------------------------------
Rem  Start the queue:
Rem ------------------------------------------------------

BEGIN
   DBMS_AQADM.START_QUEUE('pubsub.logon');
END;
/

Rem ------------------------------------------------------
Rem  define new_enqueue for convenience:
Rem ------------------------------------------------------

CREATE OR REPLACE PROCEDURE New_enqueue(
               Queue_name      IN VARCHAR2,
               Payload         IN RAW ,
               Correlation     IN VARCHAR2 := NULL,
               Exception_queue IN VARCHAR2 := NULL)
AS

Enq_ct     DBMS_AQ.Enqueue_options_t;
Msg_prop   DBMS_AQ.Message_properties_t;
Enq_msgid  RAW(16);
Userdata   RAW(1000);
 
BEGIN
   Msg_prop.Exception_queue := Exception_queue;
   Msg_prop.Correlation := Correlation;
   Userdata := Payload;

DBMS_AQ.ENQUEUE(Queue_name, Enq_ct, Msg_prop, Userdata, Enq_msgid);
END;
/

Rem ------------------------------------------------------
Rem  add subscriber with rule based on current user name, 
Rem  using correlation_id
Rem ------------------------------------------------------

 
DECLARE
Subscriber Sys.Aq$_agent;
BEGIN
   Subscriber := sys.aq$_agent('SNOOP', NULL, NULL);
DBMS_AQADM.ADD_SUBSCRIBER(
    Queue_name         => 'Pubsub.logon',
    Subscriber         => subscriber,
    Rule               => 'CORRID = ''HR'' ');
END;
/

Rem ------------------------------------------------------
Rem  create a trigger on logon on database:
Rem ------------------------------------------------------


Rem  create trigger on after logon:
CREATE OR REPLACE TRIGGER pubsub.Systrig2
   AFTER LOGON
   ON DATABASE
   BEGIN
      New_enqueue('Pubsub.Logon', HEXTORAW('9999'), Dbms_standard.login_user);
   END;
/
  • サブスクリプションが作成された後、次のステップとして、クライアントがコールバック関数を使用した通知を登録します。これにはOracle Call Interface(OCI)を使用します。次のコードは、登録に必要なステップを実行します。セッション・ハンドルの割当ておよび初期化を行う最初のステップは、例をわかりやすくするために、ここでは省略します。

ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ;

/* callback function for notification of logon of user 'HR' on database: */

ub4 notifySnoop(ctx, subscrhp, pay, payl, desc, mode)
dvoid *ctx;
OCISubscription *subscrhp;
dvoid *pay;
ub4 payl;
dvoid *desc;
ub4 mode;
{
    printf("Notification : User HR Logged on\n");
}

int main()
{
    OCISession *authp = (OCISession *) 0;
    OCISubscription *subscrhpSnoop = (OCISubscription *)0;

    /*****************************************************
       Initialize OCI Process/Environment
       Initialize Server Contexts
       Connect to Server
       Set Service Context
    ******************************************************/

    /* Registration Code Begins */

    /* Each call to initSubscriptionHn allocates 
           and Initialises a Registration Handle */

   
    initSubscriptionHn(    &subscrhpSnoop,    /* subscription handle */
        "ADMIN:PUBSUB.SNOOP", /* subscription name */ 
                  /* <agent_name>:<queue_name> */
        (dvoid*)notifySnoop); /* callback function */

     /*****************************************************
       The Client Process does not need a live Session for Callbacks
       End Session and Detach from Server
     ******************************************************/

    OCISessionEnd ( svchp,  errhp, authp, (ub4) OCI_DEFAULT);

    /* detach from server */
    OCIServerDetach( srvhp, errhp, OCI_DEFAULT);

    while (1)     /* wait for callback */
        sleep(1);

}

void initSubscriptionHn (subscrhp,
subscriptionName,
func)

OCISubscription **subscrhp;
char* subscriptionName;
dvoid * func;
{

    /* allocate subscription handle: */

    (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)subscrhp, 
        (ub4) OCI_HTYPE_SUBSCRIPTION,
        (size_t) 0, (dvoid **) 0);

    /* set subscription name in handle: */

    (void) OCIAttrSet((dvoid *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
        (dvoid *) subscriptionName, 
        (ub4) strlen((char *)subscriptionName),
        (ub4) OCI_ATTR_SUBSCR_NAME, errhp);

    /* set callback function in handle: */

    (void) OCIAttrSet((dvoid *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
        (dvoid *) func, (ub4) 0,
        (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

    (void) OCIAttrSet((dvoid *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
        (dvoid *) 0, (ub4) 0,
        (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

    /* set namespace in handle: */

    (void) OCIAttrSet((dvoid *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
        (dvoid *) &namespace, (ub4) 0,
        (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);

    checkerr(errhp, OCISubscriptionRegister(svchp, subscrhp, 1, errhp,

        OCI_DEFAULT));
}

ユーザーHRがデータベースにログインすると、クライアントに通知され、コールバック関数notifySnoopが起動されます。