13 通知メソッドとStreamsアドバンスト・キューイング

この章では、連続問合せ通知、パブリッシュ・サブスクライブ通知およびStreamsアドバンスト・キューイング機能について説明します。

連続問合せ通知について

連続問合せ通知(CQN)を使用すると、クライアント・アプリケーションがデータベースに問合せを登録し、オブジェクトでのDMLまたはDDL変更に応じて、あるいは問合せに関連付けられた結果セット変更に応じて通知を受信できます。

通知は、DMLトランザクションまたはDDLトランザクションのコミット時にデータベースによって発行されます。

登録時に、アプリケーションは通知ハンドラを指定し、登録する一連の問合せを通知ハンドラに関連付けます。通知ハンドラは、サーバー側のPL/SQLプロシージャでも、クライアント側のC言語のコールバックでもかまいません。登録は、オブジェクト・レベルまたは問合せレベルで作成されます。登録がオブジェクト・レベルにある場合、トランザクションで登録済オブジェクトのいずれかが変更されてコミットされると常に、通知ハンドラが起動されます。登録が問合せレベルにある場合、トランザクションで問合せの結果セットが変わるような変更内容がコミットされると常に、通知ハンドラが起動されますが、変更内容が問合せの結果セットに影響しない場合、通知ハンドラは起動されません。

問合せ変更通知は、OCI_STMT_SELECTOCI_STMT_BEGINOCI_STMT_DECLAREおよびOCI_STMT_CALLといった種類の文に登録できます。

問合せ変更通知では、PLSQLコードにより、SELECT文のみが実行され、SELECT文ごとに登録されるとみなされます。そうではなく、PLSQLコードにSELECT以外の文が含まれると、エラーが発生します。

連続問合せ通知の用途の1つは、データをキャッシュし、そのキャッシュをバックエンド・データベースのためにできるだけ最新の状態で維持する必要がある中間層アプリケーションでの使用です。

通知には次の情報が含まれます。

  • 結果セットが変更された問合せの問合せID。これは、登録が問合せの細分化レベルにあった場合です。

  • 変更されたオブジェクトの名前または変更された行。

  • 操作タイプ(INSERTUPDATEDELETEALTER TABLEDROP TABLE)。

  • 変更された行のROWIDと関連するDML操作(INSERTUPDATEDELETE)。

  • グローバル・データベース・イベント(STARTUPSHUTDOWN)。Oracle Real Application Clusters (Oracle RAC)では、データベースは最初のインスタンスの起動時または最後のインスタンスの停止時に通知を送信します。

関連項目:

OCIでのパブリッシュ・サブスクライブの通知

パブリッシュ・サブスクライブの通知機能により、OCIアプリケーションでは、クライアント通知の直接受信、通知を送信できる電子メール・アドレスの登録、通知を送信できるHTTP URLの登録、あるいは通知に対してコールされるPL/SQLプロシージャの登録を行うことができます。

図13-1は、そのプロセスを示しています。

図13-1 パブリッシュ・サブスクライブのモデル

図13-1の説明が続きます
「図13-1 パブリッシュ・サブスクライブのモデル」の説明

OCIアプリケーションでは次の処理を行います。

  • AQネームスペースに通知の実行を登録し、エンキューが発生したときに通知を受け取ります。

  • データベース・イベントのサブスクリプションの実行を登録し、そのイベントがトリガーされたときに通知を受け取ります。

  • 一時的な登録の解除またはすべての登録の破棄など、登録を管理します。

  • 登録されたクライアントに通知を送信します。

前述のすべての場合において、OCIアプリケーションでの通知の直接受信、事前に指定された電子メール・アドレスへの通知の送信、事前に定義されたHTTP URLへの通知の送信、または通知の結果として、事前に指定したデータベースPL/SQLプロシージャのコールが可能です。

登録されたクライアントは、イベントが発生した時、または明示的なAQエンキューにあるとき、非同期に通知を受け取ります。クライアントは、データベースに接続する必要はありません。

関連項目:

OCIでのパブリッシュ・サブスクライブの登録関数

データベースに直接登録することも、Lightweight Directory Access Protocol (LDAP)を使用して登録することもできます。

登録する方法には、次の2通りがあります。

  • 直接登録。直接データベースに登録します。この方法は単純で、登録は即時に有効になります。

  • オープン登録。Lightweight Directory Access Protocol (LDAP)を使用して登録し、データベースはここから登録要求を受け取ります。これは、クライアントがデータベースに接続できない場合(データベースの停止中にクライアントがデータベースのオープン・イベントに登録する場合)、またはクライアントが複数のデータベース内の同じイベントまたは複数のイベントに同時に登録する場合に役立ちます。

データベースへのパブリッシュ・サブスクライブの直接登録

イベント通知の直接登録および受信を行うには、OCIアプリケーションで次のステップを実行する必要があります。

適切なイベント・トリガーまたはAQエンキューがセットアップされていることを前提としています。初期化パラメータCOMPATIBLEは8.1以上に設定されている必要があります。

ノート:

パブリッシュ・サブスクライブ機能は、マルチスレッド・オペレーティング・システム上でのみ使用できます。

  1. OCI_EVENTSモードでOCIEnvCreate()またはOCIEnvNlsCreate()をコールし、アプリケーションで通知の登録および受信を行うことを指定します。クライアント上で、通知専用のリスニング・スレッドが起動します。

  2. ハンドル・タイプをOCI_HTYPE_SUBSCRIPTIONに指定してOCIHandleAlloc()をコールし、サブスクリプション・ハンドルを割り当てます。

  3. OCIAttrSet()をコールし、次のサブスクリプション・ハンドル属性を設定します。

    • OCI_ATTR_SUBSCR_NAME- サブスクリプション名。

    • OCI_ATTR_SUBSCR_NAMESPACE- サブスクリプションのネームスペース。

    • OCI_ATTR_SUBSCR_HOSTADDR- 通信の送信先であるクライアントIP (IPv4またはIPv6書式)を設定する環境ハンドル属性。

      Oracle Databaseコンポーネントおよびユーティリティでは、インターネット・プロトコル・バージョン6 (IPv6)アドレスをサポートします。

      関連項目:

      IPアドレスのIPv6書式の詳細は、「OCI_ATTR_SUBSCR_HOSTADDR」「OCI_ATTR_SUBSCR_IPADDR」および『Oracle Database Net Services管理者ガイド』を参照してください

    • OCI_ATTR_SUBSCR_CALLBACK- 通知コールバック。

    • OCI_ATTR_SUBSCR_CTX- コールバック・コンテキスト。

    • OCI_ATTR_SUBSCR_PAYLOAD- 送信用ペイロード・バッファ。

    • OCI_ATTR_SUBSCR_RECPT- 受信者名。

    • OCI_ATTR_SUBSCR_RECPTPROTO- 通知を受信するプロトコル。

    • OCI_ATTR_SUBSCR_RECPTPRES- 通知を受信するための表示。

    • OCI_ATTR_SUBSCR_QOSFLAGS- QOS (サービス品質)レベルで、次の値を設定します。

      • OCI_SUBSCR_QOS_PURGE_ON_NTFNを設定すると、最初の通知で登録が削除されます。

      • OCI_SUBSCR_QOS_RELIABLEを設定すると、通知は永続的になります。ノードに障害が発生した後でも、この登録に関連付けられた無効化メッセージがデータベースのキューに永続的に残っているために、残存しているOracle RACのインスタンスを使用して変更通知メッセージを送受信できます。FALSEの場合、無効化は高速インメモリー・キューにエンキューされます。このオプションは、登録の永続性ではなく通知の永続性を指定するものです。デフォルトでは、登録は自動的に持続されます。

    • OCI_ATTR_SUBSCR_TIMEOUT- 登録のタイムアウト間隔(秒単位)。タイムアウトが設定されていない場合、デフォルトは0 (ゼロ)です。

    • OCI_ATTR_SUBSCR_NTFN_GROUPING_CLASS- 通知グループ化クラス。

      次の定数でNTFNグループ化オプションを使用することで、通知の間隔を置くことができます。通知グループ化クラスでサポートされる値は、次のとおりです。

      #define OCI_SUBSCR_NTFN_GROUPING_CLASS_TIME   1 /* time  */
      
    • OCI_ATTR_SUBSCR_NTFN_GROUPING_VALUE- 通知グループ化の値(秒単位)。

    • OCI_ATTR_SUBSCR_NTFN_GROUPING_TYPE- 通知グループ化タイプ。

      通知グループ化タイプでサポートされる値は、次のとおりです。

      #define OCI_SUBSCR_NTFN_GROUPING_TYPE_SUMMARY 1  /* summary */
      #define OCI_SUBSCR_NTFN_GROUPING_TYPE_LAST    2  /* last */ 
      
    • OCI_ATTR_SUBSCR_NTFN_GROUPING_START_TIME- 通知グループ化開始時間。

    • OCI_ATTR_SUBSCR_NTFN_GROUPING_REPEAT_COUNT- 通知グループ化繰返し回数。

    サブスクリプションを登録する前に、OCI_ATTR_SUBSCR_NAMEOCI_ATTR_SUBSCR_NAMESPACEおよびOCI_ATTR_SUBSCR_RECPTPROTOを設定しておく必要があります。

    OCI_ATTR_SUBSCR_RECPTPROTOOCI_SUBSCR_PROTO_OCIに設定した場合は、OCI_ATTR_SUBSCR_CALLBACKおよびOCI_ATTR_SUBSCR_CTXも設定する必要があります。

    OCI_ATTR_SUBSCR_RECPTPROTOOCI_SUBSCR_PROTO_MAILOCI_SUBSCR_PROTO_SERVERまたはOCI_SUBSCR_PROTO_HTTPに設定した場合は、OCI_ATTR_SUBSCR_RECPTも設定する必要があります。

    OCI_ATTR_SUBSCR_CALLBACKOCI_ATTR_SUBSCR_RECPTを同時に設定すると、アプリケーション・エラーが発生します。

    アプリケーションでサブスクリプションに対する通知の送信を実行するには、OCI_ATTR_SUBSCR_PAYLOADが必要です。

    関連項目:

    mode = OCI_EVENTS | OCI_OBJECTによる環境の設定については、「サブスクリプション・ハンドル属性」および「OCI環境の作成について」を参照してください。OCI_OBJECTは通知のグループ化に必須です。

  4. QOS、タイムアウト間隔、ネームスペースおよびポートの値を設定します(例9 15を参照) 。

  5. OCI_ATTR_SUBSCR_RECPTPROTOOCI_SUBSCR_PROTO_OCIに設定し、サブスクリプション・ハンドルで使用するコールバック・ルーチンを定義します。

  6. OCI_ATTR_SUBSCR_RECPTPROTOOCI_SUBSCR_PROTO_SERVERに設定し、通知に対してコールするPL/SQLプロシージャをデータベースに定義します。

    関連項目:

    通知プロシージャ

  7. OCISubscriptionRegister()をコールし、サブスクリプションを登録します。このコールにより、複数のサブスクリプションを同時に登録できます。

例13-1では、QOSレベルの設定の例を示しています。

例13-1 QOSレベル、通知グループ化のクラス、値およびタイプ、ネームスペース固有のコンテキストの設定

/* Set QOS levels */
ub4 qosflags = OCI_SUBSCR_QOS_PAYLOAD;
 
/* Set QOS flags in subscription handle */
(void) OCIAttrSet((dvoid *) subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) &qosflags, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_QOSFLAGS, errhp);
 
/* Set notification grouping class */
ub4 ntfn_grouping_class = OCI_SUBSCR_NTFN_GROUPING_CLASS_TIME;
(void) OCIAttrSet((dvoid *) subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) &ntfn_grouping_class, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_NTFN_GROUPING_CLASS, errhp);
 
/* Set notification grouping value of 10 minutes */
ub4 ntfn_grouping_value = 600;
(void) OCIAttrSet((dvoid *) subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) &ntfn_grouping_value, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_NTFN_GROUPING_VALUE, errhp);
 
/* Set notification grouping type */
ub4 ntfn_grouping_type = OCI_SUBSCR_NTFN_GROUPING_TYPE_SUMMARY;
 
/* Set notification grouping type in subscription handle */
(void) OCIAttrSet((dvoid *) subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) &ntfn_grouping_type, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_NTFN_GROUPING_TYPE, errhp);
 
/* Set namespace specific context */
(void) OCIAttrSet((dvoid *) subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) NULL, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE_CTX, errhp);
パブリッシュ・サブスクライブのオープン登録

パブリッシュ・サブスクライブのオープン登録の前提条件を示します。

パブリッシュ・サブスクライブのオープン登録の前提条件は、次のとおりです。

  • データベースの互換性は、9.0以上であることが必要です。

  • LDAP_REGISTRATION_ENABLEDTRUEに設定する必要があります。次のように設定します。

     ALTER SYSTEM SET LDAP_REGISTRATION_ENABLED=TRUE
    

    デフォルトはFALSEです。

  • LDAP_REG_SYNC_INTERVALをLDAPからの登録をリフレッシュする時間隔(秒単位)に設定します。

     ALTER SYSTEM SET LDAP_REG_SYNC_INTERVAL =  time_interval
    

    デフォルトは0 (ゼロ)で、この場合リフレッシュは行われません。

  • LDAPの登録情報の即時データベース・リフレッシュを強制するには:

    ALTER SYSTEM REFRESH LDAP_REGISTRATION
    

Oracle Enterprise Security Manager (OESM)を使用したオープン登録のステップは、次のとおりです。

  1. 各エンタープライズ・ドメインで、エンタープライズ・ロールのENTERPRISE_AQ_USER_ROLEを作成します。
  2. エンタープライズ・ドメイン内の各データベースについて、グローバル・ロールのGLOBAL_AQ_USER_ROLEをエンタープライズ・ロールのENTERPRISE_AQ_USER_ROLEに追加します。
  3. 各エンタープライズ・ドメインについて、エンタープライズ・ロールのENTERPRISE_AQ_USER_ROLEを、管理コンテキストの下のcn=oraclecontextの下にある権限グループのcn=OracleDBAQUsersに追加します。
  4. データベース内のイベントへの登録が承認される各エンタープライズ・ユーザーに対して、エンタープライズ・ロールのENTERPRISE_AQ_USER_ROLEを付与します。
OCIを使用したLDAPのオープン登録

LDAP登録を使用したオープン登録の方法を示します。

  1. modeOCI_EVENTS | OCI_USE_LDAPに設定したOCIEnvCreate()または OCIEnvNlsCreate()をコールします。
  2. OCIAttrSet()をコールし、次の環境ハンドル属性を設定してLDAPにアクセスします。
    • OCI_ATTR_LDAP_HOST- LDAPサーバーが常駐しているホスト名。

    • OCI_ATTR_LDAP_PORT- LDAPサーバーがリスニングしているポート。

    • OCI_ATTR_BIND_DN- LDAPサーバーにログインするための識別名で、通常はエンタープライズ・ユーザーのDNです。

    • OCI_ATTR_LDAP_CRED- クライアントの認証に使用する資格証明で、簡易認証(ユーザー名およびパスワード)のパスワードなどです。

    • OCI_ATTR_WALL_LOC- SSL認証の場合のクライアントWalletの場所。

    • OCI_ATTR_LDAP_AUTH- 認証方式コード。

      関連項目:

      認証モードの完全なリストは、「環境ハンドル属性」を参照してください

    • OCI_ATTR_LDAP_CTX- LDAPサーバーのOracle Databaseに対する管理コンテキスト。

  3. ハンドル・タイプをOCI_HTYPE_SUBSCRIPTION,に指定してOCIHandleAlloc()をコールし、サブスクリプション・ハンドルを割り当てます。
  4. 記述子タイプをOCI_DTYPE_SRVDNに指定してOCIArrayDescriptorAlloc()をコールし、サーバーのDN記述子を割り当てます。
  5. OCIAttrSet()をコールし、サーバーのDN記述子属性を、クライアントが通知を受信するデータベースの識別名であるOCI_ATTR_SERVER_DNに設定します。OCIAttrSet()は、この属性について複数回コールできるため、登録には複数のデータベース・サーバーが含まれます。
  6. OCIAttrSet()をコールし、次のサブスクリプション・ハンドル属性を設定します。
    • OCI_ATTR_SUBSCR_NAME- サブスクリプション名。

    • OCI_ATTR_SUBSCR_NAMESPACE- サブスクリプションのネームスペース。

    • OCI_ATTR_SUBSCR_CALLBACK- 通知コールバック。

    • OCI_ATTR_SUBSCR_CTX- コールバック・コンテキスト。

    • OCI_ATTR_SUBSCR_PAYLOAD- 送信用ペイロード・バッファ。

    • OCI_ATTR_SUBSCR_RECPT- 受信者名。

    • OCI_ATTR_SUBSCR_RECPTPROTO- 通知を受信するプロトコル。

    • OCI_ATTR_SUBSCR_RECPTRES- 通知を受信するための表示。

    • OCI_ATTR_SUBSCR_QOSFLAGS- QOS (サービス品質)レベル。

    • OCI_ATTR_SUBSCR_TIMEOUT- 登録のタイムアウト間隔(秒単位)。タイムアウトが設定されていない場合、デフォルトは0 (ゼロ)です。

    • OCI_ATTR_SUBSCR_SERVER_DN- ステップ5で設定した記述子ハンドル。

  7. QOS、タイムアウト間隔、ネームスペースおよびポートの値を設定します。「QOS、タイムアウト間隔、ネームスペース、クライアント・アドレスおよびポート番号の設定」を参照してください。
  8. OCISubscriptionRegister()をコールし、サブスクリプションを登録します。データベースがLDAPにアクセスして新規登録を取り出すと、登録が有効になります。取り出す頻度は、LDAP_REG_SYNC_INTERVALの値によって決まります。
QOS、タイムアウト間隔、ネームスペース、クライアント・アドレスおよびポート番号の設定

OCIAttrSet()を使用して、QOSFLAGSをQOSレベルに設定する方法を示します。

OCIAttrSet()を使用して、QOSFLAGSを次のQOSレベルに設定できます。

  • OCI_SUBSCR_QOS_RELIABLE- 信頼性のある通知は、インスタンスやデータベースが再起動しても持続します。信頼性とは、サーバーの信頼性のことであり、それも永続的なキューやバッファ・メッセージについての信頼性です。このオプションは、通知の永続性を記述します。デフォルトでは、登録は永続的です。

  • OCI_SUBSCR_QOS_PURGE_ON_NTFN- 通知を受信すると、最初の通知で登録を削除します。(サブスクリプションの登録は解除されます。)

/* Set QOS levels */
ub4 qosflags = OCI_SUBSCR_QOS_RELIABLE | OCI_SUBSCR_QOS_PURGE_ON_NTFN;

/* Set flags in subscription handle */
(void)OCIAttrSet((void *)subscrhp, (ub4)OCI_HTYPE_SUBSCRIPTION,
               (void *)&qosflags, (ub4)0, (ub4)OCI_ATTR_SUBSCR_QOSFLAGS, errhp);

/* Set auto-expiration after 30 seconds */
ub4 timeout = 30;
(void)OCIAttrSet((void *)subscrhp, (ub4)OCI_HTYPE_SUBSCRIPTION,
                 (void *)&timeout, (ub4)0, (ub4)OCI_ATTR_SUBSCR_TIMEOUT, errhp);

タイムアウトを過ぎると、登録が削除され、通知がクライアントに送信されるため、クライアントはコールバックを呼び出し、必要なアクションを実行できるようになります。タイムアウト前にクライアントに障害が発生した場合、登録は消去されます。

環境ハンドルにはポート番号を設定でき、クライアントがファイアウォールの背後のシステム上にあり、特定のポートの通知のみを受信できる場合に、この番号が重要になります。クライアントは、環境ハンドルの属性を使用して、最初の登録の前にリスナー・スレッドのポートを指定できます。このスレッドは、OCISubscriptionRegister()の初回のコール時に開始されます。指定されたこのポート番号が使用可能な場合、この番号が使用されます。クライアントが異なる環境ハンドルを使用して異なるポートで別のスレッドを開始しようとすると、エラーが戻されます。

ub4 port = 1581;
(void)OCIAttrSet((void *)envhp, (ub4)OCI_HTYPE_ENV, (void *)&port, (ub4)0,
                 (ub4)OCI_ATTR_SUBSCR_PORTNO, errhp);

これとは異なりポートが自動的に決定される場合は、環境ハンドルから属性を取得することにより、クライアント・スレッドが通知をリスニングしているポート番号を取得できます。

(void)OCIAttrGet((void *)subhp, (ub4)OCI_HTYPE_ENV, (void *)&port, (ub4)0, 
                 (ub4)OCI_ATTR_SUBSCR_PORTNO, errhp);

クライアント・アドレス設定の例

text ipaddr[16] = "10.177.246.40";
(void)(OCIAttrSet((dvoid *) envhp, (ub4) OCI_HTYPE_ENV,
       (dvoid *) ipaddr, (ub4) strlen((const char *)ipaddr),
       (ub4) OCI_ATTR_SUBSCR_IPADDR, errhp));

関連項目:

OCI_ATTR_SUBSCR_IPADDR

パブリッシュ・サブスクライブ通知の管理に使用するOCI関数

パブリッシュ・サブスクライブ通知の管理に使用する関数をリストし、説明します。

表13-1は、パブリッシュ・サブスクライブ通知に使用される関数のリストです。

表13-1 パブリッシュ・サブスクライブ関数

関数 用途

OCISubscriptionDisable()

サブスクリプションを無効にします。

OCISubscriptionEnable()

サブスクリプションを有効にします。

OCISubscriptionPost()

サブスクリプションを送信します。

OCISubscriptionRegister()

サブスクリプションを登録します。

OCISubscriptionUnRegister()

サブスクリプションの登録を解除します。

OCIでの通知コールバック

クライアントは、登録したサブスクリプションに対してアクティビティが発生したときにコールされる、通知コールバックを登録する必要があります。

たとえば、AQネームスペースでは、目的のメッセージがエンキューされたときに通知コールバックが発生します。

このコールバックは通常、サブスクリプション・ハンドルのOCI_ATTR_SUBSCR_CALLBACK属性によって設定されます。

通知コールバックからは、OCI_CONTINUE値が戻される必要があります。また、次の仕様に準拠している必要があります。

typedef ub4 (*OCISubscriptionNotify) ( void            *pCtx,
                                       OCISubscription *pSubscrHp,
                                       void            *pPayload,
                                       ub4             iPayloadLen,
                                       void            *pDescriptor,
                                       ub4             iMode);

次に、各パラメータについて説明します。

pCtx (IN)

コールバックが登録されたときに指定されたユーザー定義コンテキスト。

pSubscrHp (IN)

コールバックが登録されたときに指定されたサブスクリプション・ハンドル。

pPayload (IN)

この通知のペイロード。現在のところ、ペイロードのub1 * (バイト・シーケンス)のみがサポートされています。

iPayloadLen (IN)

この通知のペイロードの長さ。

pDescriptor (IN)

ネームスペース固有の記述子。この記述子からネームスペース指定のパラメータを抽出できます。この記述子の構造はユーザーには不透明で、型はネームスペースによって異なります。

記述子の属性は、ネームスペース固有です。アドバンスト・キューイング(AQ)の場合、記述子はOCI_DTYPE_AQNFYです。AQネームスペースの場合、グループで受信する通知の数は通知記述子に提供されます。pDescriptorの属性は次のとおりです。

  • 通知フラグ(通常=0、タイムアウト=1、通知のグループ化=2)- OCI_ATTR_NFY_FLAGS

  • キュー名- OCI_ATTR_QUEUE_NAME

  • コンシューマ名- OCI_ATTR_CONSUMER_NAME

  • メッセージID - OCI_ATTR_NFY_MSGID

  • メッセージ・プロパティ- OCI_ATTR_MSG_PROP

  • グループで受信する通知の数- OCI_ATTR_AQ_NTFN_GROUPING_COUNT

  • グループ(OCIコレクション)- OCI_ATTR_AQ_NTFN_GROUPING_MSGID_ARRAY

iMode (IN)

コール固有モードです。有効な値はOCI_DEFAULTのみです。この値は、デフォルトのコールを実行します。

例13-2では、通知コールバックでのAQグループ化通信属性の使用方法を示しています。

例13-2 OCI通知コールバックでのAQグループ化通信属性の使用

ub4 notifyCB1(void *ctx, OCISubscription *subscrhp, void *pay, ub4 payl,
              void *desc, ub4 mode)
{
 oratext            *subname;
 ub4                 size;
 OCIColl            *msgid_array = (OCIColl *)0;
 ub4                 msgid_cnt = 0;
 OCIRaw             *msgid;
 void              **msgid_ptr;
 sb4                 num_msgid = 0;
 void               *elemind = (void *)0;
 boolean             exist;
 ub2                 flags;
 oratext            *hexit = (oratext *)"0123456789ABCDEF";
 ub4                 i, j;
 
 /* get subscription name */
 OCIAttrGet(subscrhp, OCI_HTYPE_SUBSCRIPTION, (void *)&subname, &size,
            OCI_ATTR_SUBSCR_NAME,ctxptr->errhp);
 
 /* print subscripton name */
 printf("Got notification for %.*s\n", size, subname);
 fflush((FILE *)stdout);
 
 /* get the #ntfns received in this group */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY, (void *)&msgid_cnt, &size,
            OCI_ATTR_AQ_NTFN_GROUPING_COUNT, ctxptr->errhp);
 
 /* get the group - collection of msgids */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY, (void *)&msgid_array, &size,
            OCI_ATTR_AQ_NTFN_GROUPING_MSGID_ARRAY, ctxptr->errhp);
 
 /* get notification flag - regular, timeout, or grouping notification? */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY, (void *)&flags, &size,
            OCI_ATTR_NFY_FLAGS, ctxptr->errhp);
 
 /* print notification flag */
 printf("Flag: %d\n", (int)flags);
 
 /* get group (collection) size */
 if (msgid_array)
   checkerr(ctxptr->errhp,
        OCICollSize(ctxptr->envhp, ctxptr->errhp,
        CONST OCIColl *) msgid_array, &num_msgid),
        "Inside notifyCB1-OCICollSize");
 else
   num_msgid =0;
 
 /* print group size */
 printf("Collection size: %d\n", num_msgid);
 
 /* print all msgids in the group */
 for(i = 0; i < num_msgid; i++)
 {
   ub4  rawSize;                                             /* raw size    */
   ub1 *rawPtr;                                              /* raw pointer */
     /* get msgid from group */
   checkerr(ctxptr->errhp,
        OCICollGetElem(ctxptr->envhp, ctxptr->errhp,
               (OCIColl *) msgid_array, i, &exist,
               (void **)(&msgid_ptr), &elemind),
        "Inside notifyCB1-OCICollGetElem");
   msgid = *msgid_ptr;
   rawSize = OCIRawSize(ctxptr->envhp, msgid);
   rawPtr = OCIRawPtr(ctxptr->envhp, msgid);
 
   /* print msgid size */
   printf("Msgid size: %d\n", rawSize);
 
   /* print msgid in hexadecimal format */
   for (j = 0; j < rawSize; j++)
   {                                           /* for each byte in the raw */
     printf("%c", hexit[(rawPtr[j] & 0xf0) >> 4]);
     printf("%c", hexit[(rawPtr[j] & 0x0f)]);
   }
   printf("\n");
 }
 
 /* print #ntfns received in group */
 printf("Notification Count: %d\n", msgid_cnt);
 printf("\n");
 printf("***********************************************************\n");
 fflush((FILE *)stdout);
 return 1;
}

通知プロシージャ

実行が登録されているサブスクリプションでなんらかのアクティビティが発生したときにコールするPL/SQLプロシージャは、データベース内に作成する必要があります。

このプロシージャは通常、サブスクリプション・ハンドルのOCI_ATTR_SUBSCR_RECPT属性によって設定されます。

関連項目:

パブリッシュ・サブスクライブの直接登録の例

直接登録を使用したパブリッシュ・サブスクライブ通知の実装例を示します。

例13-3では、システム・イベント、クライアント通知およびアドバンスト・キューイングが連動して、パブリッシュ・サブスクライブ通知を実装する方法を示しています。

例13-3のPL/SQLコードでは、ユーザー・スキーマpubsub環境で公開サブスクライブ・メカニズムをサポートするために必要なすべてのオブジェクトを作成します。このコードでは、エージェントsnoopが、ログイン・イベント時にパブリッシュされるメッセージをサブスクライブします。ユーザーpubsubには、アドバンスト・キューイング機能を使用するためにAQ_ADMINISTRATOR_ROLEおよびAQ_USER_ROLEの権限が必要です。システム・イベントのトリガーを有効にするには、初期化パラメータ_SYSTEM_TRIG_ENABLEDTRUE (デフォルト)に設定する必要があります。 例13-3を実行する前に、pubsubとして接続します。

サブスクリプションが作成された後、クライアントはコールバック関数を使用して通知を登録する必要があります。例13-4では、登録に必要なステップを実行するサンプル・コードを示しています。ここでは、わかりやすくするために、セッション・ハンドルの割当ておよび初期化のステップを省略しています。

ユーザーIXがデータベースにログインした場合、クライアントは電子メールで通知を受け取り、コールバック関数notifySnoopがコールされます。電子メール通知は、アドレスxyz@company.comに送信され、PL/SQLプロシージャplsqlnotifySnoopもデータベースでコールされます。

例13-3 パブリッシュ・サブスクライブ通知の実装

----------------------------------------------------------
----create queue table for persistent multiple consumers
----------------------------------------------------------
---- Create or replace a queue table
begin
  DBMS_AQADM.CREATE_QUEUE_TABLE(
  QUEUE_TABLE=>'pubsub.raw_msg_table', 
  MULTIPLE_CONSUMERS => TRUE,
  QUEUE_PAYLOAD_TYPE =>'RAW',
  COMPATIBLE => '8.1.5');
end;
/
----------------------------------------------------------
---- Create a persistent queue for publishing messages
----------------------------------------------------------
---- Create a queue for logon events
begin
  DBMS_AQADM.CREATE_QUEUE(QUEUE_NAME=>'pubsub.logon',
  QUEUE_TABLE=>'pubsub.raw_msg_table',
  COMMENT=>'Q for error triggers');
end;
/
----------------------------------------------------------
---- Start the queue
----------------------------------------------------------
begin
  DBMS_AQADM.START_QUEUE('pubsub.logon');
end;
/
----------------------------------------------------------
---- define new_enqueue for convenience
----------------------------------------------------------
create or replace procedure new_enqueue(queue_name  in varchar2,
                                        payload  in raw ,
correlation in varchar2 := NULL,
exception_queue in varchar2 := NULL)
as
  enq_ct     dbms_aq.enqueue_options_t;
  msg_prop   dbms_aq.message_properties_t;
  enq_msgid  raw(16);
  userdata   raw(1000);
begin
  msg_prop.exception_queue := exception_queue;
  msg_prop.correlation := correlation;
  userdata := payload;
  DBMS_AQ.ENQUEUE(queue_name,enq_ct, msg_prop,userdata,enq_msgid);
end;
/
----------------------------------------------------------
---- add subscriber with rule based on current user name, 
---- using correlation_id
----------------------------------------------------------
declare
subscriber sys.aq$_agent;
begin
  subscriber := sys.aq$_agent('SNOOP', null, null);
  dbms_aqadm.add_subscriber(queue_name => 'pubsub.logon',
                            subscriber => subscriber,
                            rule => 'CORRID = ''ix'' ');
end;
/
----------------------------------------------------------
---- create a trigger on logon on database
----------------------------------------------------------
---- create trigger on after logon
create or replace trigger systrig2
   AFTER LOGON
   ON DATABASE
   begin
     new_enqueue('pubsub.logon', hextoraw('9999'), dbms_standard.login_user);
   end;
/

----------------------------------------------------------
---- create a PL/SQL callback for notification of logon 
---- of user 'ix' on database
----------------------------------------------------------
---- 
create or replace procedure plsqlnotifySnoop(
  context raw, reginfo sys.aq$_reg_info, descr sys.aq$_descriptor,
  payload raw, payloadl number)
as
begin
 dbms_output.put_line('Notification : User ix Logged on\n');
end;
/

例13-4 コールバック関数を使用する通知の登録

...
static ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ;

static OCISubscription *subscrhpSnoop = (OCISubscription *)0;
static OCISubscription *subscrhpSnoopMail = (OCISubscription *)0;
static OCISubscription *subscrhpSnoopServer = (OCISubscription *)0;

/* callback function for notification of logon of user 'ix' on database */

static ub4 notifySnoop(ctx, subscrhp, pay, payl, desc, mode)
    void *ctx;
    OCISubscription *subscrhp;
    void *pay;
    ub4 payl;
    void *desc;
    ub4 mode;
{
    printf("Notification : User ix Logged on\n");
  (void)OCIHandleFree((void *)subscrhpSnoop,
            (ub4) OCI_HTYPE_SUBSCRIPTION);
    return 1;
}

static void checkerr(errhp, status)
OCIError *errhp;
sword status;
{
  text errbuf[512];
  ub4 buflen;
  sb4 errcode;

  if (status == OCI_SUCCESS) return;

  switch (status)
  {
  case OCI_SUCCESS_WITH_INFO:
    printf("Error - OCI_SUCCESS_WITH_INFO\n");
    break;
  case OCI_NEED_DATA:
    printf("Error - OCI_NEED_DATA\n");
    break;
  case OCI_NO_DATA:
    printf("Error - OCI_NO_DATA\n");
    break;
  case OCI_ERROR:
    OCIErrorGet ((void *) errhp, (ub4) 1, (text *) NULL, &errcode,
            errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR);
    printf("Error - %s\n", errbuf);
    break;
  case OCI_INVALID_HANDLE:
    printf("Error - OCI_INVALID_HANDLE\n");
    break;
  case OCI_STILL_EXECUTING:
    printf("Error - OCI_STILL_EXECUTING\n");
    break;
  case OCI_CONTINUE:
    printf("Error - OCI_CONTINUE\n");
    break;
  default:
    printf("Error - %d\n", status);
    break;
  }
}

static void initSubscriptionHn (subscrhp,
                         subscriptionName,
                         func,
                         recpproto,
                         recpaddr,
                         recppres)
OCISubscription **subscrhp;
  char * subscriptionName;
  void * func;
  ub4 recpproto;
  char * recpaddr;
  ub4 recppres;
{
    /* allocate subscription handle */
    (void) OCIHandleAlloc((void *) envhp, (void **)subscrhp,
        (ub4) OCI_HTYPE_SUBSCRIPTION,
        (size_t) 0, (void **) 0);

    /* set subscription name in handle */
    (void) OCIAttrSet((void *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
        (void *) subscriptionName,
        (ub4) strlen((char *)subscriptionName),
        (ub4) OCI_ATTR_SUBSCR_NAME, errhp);

    /* set callback function in handle */
    if (func)
      (void) OCIAttrSet((void *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
          (void *) func, (ub4) 0,
          (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

    /* set context in handle */
    (void) OCIAttrSet((void *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
        (void *) 0, (ub4) 0,
       (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

    /* set namespace in handle */
    (void) OCIAttrSet((void *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
        (void *) &namespace, (ub4) 0,
        (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);

    /* set receive with protocol in handle */
    (void) OCIAttrSet((void *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
        (void *) &recpproto, (ub4) 0,
        (ub4) OCI_ATTR_SUBSCR_RECPTPROTO, errhp);

    /* set recipient address in handle */
    if (recpaddr)
      (void) OCIAttrSet((void *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
          (void *) recpaddr, (ub4) strlen(recpaddr),
          (ub4) OCI_ATTR_SUBSCR_RECPT, errhp);

    /* set receive with presentation in handle */
    (void) OCIAttrSet((void *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
        (void *) &recppres, (ub4) 0,
        (ub4) OCI_ATTR_SUBSCR_RECPTPRES, errhp);

    printf("Begining Registration for subscription %s\n", subscriptionName);
    checkerr(errhp, OCISubscriptionRegister(svchp, subscrhp, 1, errhp,
        OCI_DEFAULT));
   printf("done\n");

}


int main( argc, argv)
int    argc;
char * argv[];
{
    OCISession *authp = (OCISession *) 0;

/*****************************************************
Initialize OCI Process/Environment
Initialize Server Contexts
Connect to Server
Set Service Context
******************************************************/

/* Registration Code Begins */
/* Each call to initSubscriptionHn allocates
   and initializes a Registration Handle */

/* Register for OCI notification */
    initSubscriptionHn(    &subscrhpSnoop,    /* subscription handle*/
    (char*) "PUBSUB.LOGON:SNOOP", /* subscription name */
                                  /*<queue_name>:<agent_name> */
        (void*)notifySnoop,  /* callback function */
        OCI_SUBSCR_PROTO_OCI, /* receive with protocol */
        (char *)0, /* recipient address */
        OCI_SUBSCR_PRES_DEFAULT); /* receive with presentation */

/* Register for email notification */
    initSubscriptionHn(    &subscrhpSnoopMail,  /* subscription handle */
     (char*) "PUBSUB.LOGON:SNOOP",              /* subscription name */ 
                                                /* <queue_name>:<agent_name> */
        (void*)0, /* callback function */
        OCI_SUBSCR_PROTO_MAIL, /* receive with protocol */
        (char*)  "xyz@company.com", /* recipient address */
        OCI_SUBSCR_PRES_DEFAULT); /* receive with presentation */

/* Register for server to server notification */
    initSubscriptionHn(    &subscrhpSnoopServer, /* subscription handle */
       (char*)  "PUBSUB.LOGON:SNOOP",            /* subscription name */
                                                 /* <queue_name>:<agent_name> */
        (void*)0, /* callback function */
        OCI_SUBSCR_PROTO_SERVER, /* receive with protocol */
         (char*) "pubsub.plsqlnotifySnoop", /* recipient address */
        OCI_SUBSCR_PRES_DEFAULT); /* receive with presentation */

    checkerr(errhp, OCITransCommit(svchp, errhp, (ub4) OCI_DEFAULT));

/*****************************************************
The Client Process does not need a live Session for Callbacks.
End Session and Detach from Server.
******************************************************/

    OCISessionEnd ( svchp,  errhp, authp, (ub4) OCI_DEFAULT);

    /* detach from server */
    OCIServerDetach( srvhp, errhp, OCI_DEFAULT);

   while (1)    /* wait for callback */
    sleep(1);
}

パブリッシュ・サブスクライブのLDAP登録の例

LDAP登録を行う方法を説明する例を示します。

例13-5では、LDAP登録の方法を説明するコード・フラグメントを示しています。プログラム・コメントはすべて目を通してください。

例13-5 LDAP登録

...

  /* To use the LDAP registration feature, OCI_EVENTS | OCI_EVENTS |OCI_USE_LDAP*/
  /*   must be set in OCIEnvCreate or OCIEnvNlsCreate */
  /*     (Note: OCIInitialize is deprecated): */
  (void) OCIInitialize((ub4) OCI_EVENTS|OCI_OBJECT|OCI_USE_LDAP, (void *)0,
                       (void * (*)(void *, size_t)) 0,
                       (void * (*)(void *, void *, size_t))0,
                       (void (*)(void *, void *)) 0 );

...

  /* set LDAP attributes in the environment handle */

  /* LDAP host name */
  (void) OCIAttrSet((void *)envhp, OCI_HTYPE_ENV, (void *)"yow", 3,
                    OCI_ATTR_LDAP_HOST, (OCIError *)errhp);

  /* LDAP server port */ 
  ldap_port = 389;
  (void) OCIAttrSet((void *)envhp, OCI_HTYPE_ENV, (void *)&ldap_port,
                    (ub4)0, OCI_ATTR_LDAP_PORT, (OCIError *)errhp);

  /* bind DN of the client, normally the enterprise user name */
  (void) OCIAttrSet((void *)envhp, OCI_HTYPE_ENV, (void *)"cn=orcladmin",
                    12, OCI_ATTR_BIND_DN, (OCIError *)errhp);

  /* password of the client */
  (void) OCIAttrSet((void *)envhp, OCI_HTYPE_ENV, (void *)"welcome",
                    7, OCI_ATTR_LDAP_CRED, (OCIError *)errhp);

  /* authentication method is "simple", username/password authentication */
  ldap_auth = 0x01;
  (void) OCIAttrSet((void *)envhp, OCI_HTYPE_ENV, (void *)&ldap_auth,
                    (ub4)0, OCI_ATTR_LDAP_AUTH, (OCIError *)errhp);

  /* administrative context: this is the DN above cn=oraclecontext */
  (void) OCIAttrSet((void *)envhp, OCI_HTYPE_ENV, (void *)"cn=acme,cn=com",
                    14, OCI_ATTR_LDAP_CTX, (OCIError *)errhp);

...

  /* retrieve the LDAP attributes from the environment handle */

  /* LDAP host */
  (void) OCIAttrGet((void *)envhp, OCI_HTYPE_ENV, (void *)&buf, 
                    &szp,  OCI_ATTR_LDAP_HOST,  (OCIError *)errhp);

  /* LDAP server port */
  (void) OCIAttrGet((void *)envhp, OCI_HTYPE_ENV, (void *)&intval, 
                    0,  OCI_ATTR_LDAP_PORT,  (OCIError *)errhp);

  /* client binding DN */
  (void) OCIAttrGet((void *)envhp, OCI_HTYPE_ENV, (void *)&buf, 
                    &szp,  OCI_ATTR_BIND_DN,  (OCIError *)errhp);

  /* client password */
  (void) OCIAttrGet((void *)envhp, OCI_HTYPE_ENV, (void *)&buf, 
                    &szp,  OCI_ATTR_LDAP_CRED,  (OCIError *)errhp);

  /* administrative context */
  (void) OCIAttrGet((void *)envhp, OCI_HTYPE_ENV, (void *)&buf, 
                    &szp,  OCI_ATTR_LDAP_CTX,  (OCIError *)errhp);

  /* client authentication method */
  (void) OCIAttrGet((void *)envhp, OCI_HTYPE_ENV, (void *)&intval, 
                    0,  OCI_ATTR_LDAP_AUTH,  (OCIError *)errhp);
  
  ...

  /* to set up the server DN descriptor in the subscription handle */

  /* allocate a server DN descriptor, dn is of type "OCIServerDNs **", 
     subhp is of type "OCISubscription **" */
  (void) OCIDescriptorAlloc((void *)envhp, (void **)dn, 
                         (ub4) OCI_DTYPE_SRVDN, (size_t)0, (void **)0);

  /* now *dn is the server DN descriptor, add the DN of the first database 
     that you want to register */
  (void) OCIAttrSet((void *)*dn, (ub4) OCI_DTYPE_SRVDN, 
                    (void *)"cn=server1,cn=oraclecontext,cn=acme,cn=com",
                    42, (ub4)OCI_ATTR_SERVER_DN, errhp);
  /* add the DN of another database in the descriptor */
  (void) OCIAttrSet((void *)*dn, (ub4) OCI_DTYPE_SRVDN, 
                    (void *)"cn=server2,cn=oraclecontext,cn=acme,cn=com",
                    42, (ub4)OCI_ATTR_SERVER_DN, errhp);

  /* set the server DN descriptor into the subscription handle */
  (void) OCIAttrSet((void *) *subhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (void *) *dn, (ub4)0, (ub4) OCI_ATTR_SERVER_DNS, errhp);

  ...

  /* now you will try to get the server DN information from the subscription
     handle */
 
  /* first, get the server DN descriptor out */
  (void) OCIAttrGet((void *) *subhp, (ub4) OCI_HTYPE_SUBSCRIPTION, 
                    (void *)dn, &szp, OCI_ATTR_SERVER_DNS, errhp);

  /* then, get the number of server DNs in the descriptor */
  (void) OCIAttrGet((void *) *dn, (ub4)OCI_DTYPE_SRVDN, (void *)&intval,
                    &szp, (ub4)OCI_ATTR_DN_COUNT, errhp);

  /* allocate an array of char * to hold server DN pointers returned by
     an Oracle database*/
    if (intval)
    {
      arr = (char **)malloc(intval*sizeof(char *));
      (void) OCIAttrGet((void *)*dn, (ub4)OCI_DTYPE_SRVDN, (void *)arr,
                        &intval, (ub4)OCI_ATTR_SERVER_DN, errhp);
    }

  /* OCISubscriptionRegister() calls have two modes: OCI_DEFAULT and 
     OCI_REG_LDAPONLY. If OCI_DEFAULT is used, there should be only one
     server DN in the server DN descriptor. The registration request will
     be sent to the database. If a database connection is not available,
     the registration request will be detoured to the LDAP server. However,
     if mode OCI_REG_LDAPONLY is used, the registration request
     will be directly sent to LDAP. This mode should be used when there is 
     more than one server DN in the server DN descriptor or you are sure
     that a database connection is not available.

     In this example, two DNs are entered, so you should use mode 
     OCI_REG_LDAPONLY in LDAP registration. */
  OCISubscriptionRegister(svchp, subhp, 1, errhp, OCI_REG_LDAPONLY);

  ...

  /* as OCISubscriptionRegister(), OCISubscriptionUnregister() also has
     mode OCI_DEFAULT and OCI_REG_LDAPONLY. The usage is the same. */

  OCISubscriptionUnRegister(svchp, *subhp, errhp, OCI_REG_LDAPONLY);
}
...

OCIおよびStreamsアドバンスト・キューイング

OCIでは、Streams Advanced Queuing (Streams AQ)機能へのインタフェースを提供します。Streams AQは、Oracle Databaseと統合されたメッセージ・キューイングです。

Streams AQでは、キューイング・システムをデータベースに統合することによりこの機能を実現しており、その結果メッセージ対応データベースが作成されます。Streams AQでは統合された解決策が提供されるため、アプリケーション開発者がメッセージ・インフラストラクチャを構築する必要がなくなり、特定のビジネス・ロジックに専念できるようになりました。

ノート:

Streamsアドバンスト・キューイングを使用するには、Oracle DatabaseのEnterprise Editionを使用している必要があります。

関連項目:

OCI Streamsアドバンスト・キューイング関数

OCI Streamsアドバンスト・キューイング関数を示します。

OCIライブラリには、Streamsアドバンスト・キューイングに関して次の関数が含まれています。

  • OCIAQEnq()

  • OCIAQDeq()

  • OCIAQListen() (非推奨)

  • OCIAQListen2()

  • OCIAQEnqArray()

  • OCIAQDeqArray()

シングル・キューにメッセージの配列をエンキューできます。メッセージはすべて、同じエンキュー・オプションを共有しますが、配列内の各メッセージには異なるメッセージ・プロパティを含めることができます。シングル・キューからメッセージの配列をデキューすることもできます。トランザクション・グループ・キューの場合は、1回のコールでシングル・トランザクション・グループにすべてのメッセージをデキューできます。

OCI Streamsアドバンスト・キューイングの説明

OCI Streamsアドバンスト・キューイング記述子およびその使用方法を示します。

次の記述子がOCI Streams AQ操作に使用されます。

  • OCIAQEnqOptions

  • OCIAQDeqOptions

  • OCIAQMsgProperties

  • OCIAQAgent

標準OCIDescriptorAlloc()コールを使用して、サービス・ハンドルに対してこれらの記述子を割り当てられます。次のコードでこの例を示します。

OCIDescriptorAlloc(svch, &enqueue_options, OCI_DTYPE_AQENQ_OPTIONS, 0, 0 ); 
OCIDescriptorAlloc(svch, &dequeue_options, OCI_DTYPE_AQDEQ_OPTIONS, 0, 0 ); 
OCIDescriptorAlloc(svch, &message_properties, OCI_DTYPE_AQMSG_PROPERTIES, 0, 0);
OCIDescriptorAlloc(svch, &agent, OCI_DTYPE_AQAGENT, 0, 0 ); 

各記述子には、設定または読取りができる様々な属性が含まれます。

OCI Streamsアドバンスト・キューイングとPL/SQL

OCI Streams AQ関数および記述子の関数、パラメータ、オプションと、DBMS_AQパッケージのPL/SQL AQ関数との比較を示します。

次の表は、OCI Streams AQ関数および記述子の関数、パラメータ、オプションと、DBMS_AQパッケージのPL/SQL AQ関数との比較を表します。表13-2では、AQ関数を比較しています。

表13-2 AQ関数

PL/SQLファンクション OCI関数

DBMS_AQ.ENQUEUE

OCIAQEnq()

DBMS_AQ.DEQUEUE

OCIAQDeq()

DBMS_AQ.LISTEN

OCIAQListen()、OCIAQListen2()

DBMS_AQ.ENQUEUE_ARRAY

OCIAQEnqArray()

DBMS_AQ.DEQUEUE_ARRAY

OCIAQDeqArray()

表13-3では、エンキュー関数のパラメータを比較しています。

表13-3 エンキュー・パラメータ

DBMS_AQ.ENQUEUEパラメータ OCIAQEnq()パラメータ
queue_name
queue_name
enqueue_options
enqueue_options
message_properties
message_properties
payload
payload
msgid
msgid

-

ノート: OCIAQEnq()では、さらにsvcherrh、payload_tdo、payload_indおよびflagsの各パラメータが必要です。

表13-4では、デキュー関数のパラメータを比較しています。

表13-4 デキュー・パラメータ

DBMS_AQ.DEQUEUEパラメータ OCIAQDeq()パラメータ
queue_name
queue_name
dequeue_options
dequeue_options
message_properties
message_properties
payload
payload
msgid
msgid

-

ノート: OCIAQDeq()では、さらにsvcherrh、dequeue_options、message_properties、payload_tdo、payload、payload_indおよびflagsの各パラメータが必要です。

表13-5では、リスニング関数のパラメータを比較しています。

表13-5 リスニング・パラメータ

DBMS_AQ.LISTENパラメータ OCIAQListen2()パラメータ
agent_list
agent_list
wait
wait
agent
agent
listen_delivery_mode

lopts

-

ノート: OCIAQListen2()では、さらにsvchp、errhp、agent_list、num_agents、agent、lmopsおよびflagsの各パラメータが必要です。

表13-6では、配列エンキュー関数のパラメータを比較しています。

表13-6 配列エンキュー・パラメータ

DBMS_AQ.ENQUEUE_ARRAYパラメータ OCIAQEnqArray()パラメータ
queue_name
queue_name
enqueue_options
enqopt
array_size
iters
message_properties_array
msgprop
payload_array
payload
msgid_array
msgid

-

ノート: OCIAQEnqArray()では、さらにsvcherrh、payload_tdo、payload_ind、ctxp、enqcbfpおよびflagsの各パラメータが必要です。

表13-7では、配列デキュー関数のパラメータを比較しています。

表13-7 配列デキュー・パラメータ

DBMS_AQ.DEQUEUE_ARRAYパラメータ OCIAQDeqArray()パラメータ
queue_name
queue_name
dequeue_options
deqopt
array_size
iters
message_properties_array
msgprop
payload_array
payload
msgid_array
msgid

-

ノート: OCIAQDeqArray()では、さらにsvcherrh、msgprop、payload_tdo、payload_ind、ctxp、deqcbfpおよびflagsの各パラメータが必要です。

表13-8では、エージェント属性のパラメータを比較しています。

表13-8 エージェント・パラメータ

PL/SQL Agentパラメータ OCIAQAgent属性
name

OCI_ATTR_AGENT_NAME

address

OCI_ATTR_AGENT_ADDRESS

protocol

OCI_ATTR_AGENT_PROTOCOL

表13-9では、メッセージ・プロパティのパラメータを比較しています。

表13-9 メッセージ・プロパティ

PL/SQLメッセージ・プロパティ OCIAQMsgProperties属性
priority

OCI_ATTR_PRIORITY

delay

OCI_ATTR_DELAY

expiration

OCI_ATTR_EXPIRATION

correlation

OCI_ATTR_CORRELATION

attempts

OCI_ATTR_ATTEMPTS

recipient_list

OCI_ATTR_RECIPIENT_LIST

exception_queue

OCI_ATTR_EXCEPTION_QUEUE

enqueue_time

OCI_ATTR_ENQ_TIME

state

OCI_ATTR_MSG_STATE

sender_id

OCI_ATTR_SENDER_ID

transaction_group

OCI_ATTR_TRANSACTION_NO

original_msgid

OCI_ATTR_ORIGINAL_MSGID

delivery_mode

OCI_ATTR_MSG_DELIVERY_MODE

表13-10では、エンキュー・オプション属性を比較しています。

表13-10 エンキュー・オプション属性

PL/SQLエンキュー・オプション OCIAQEnqOptions属性
visibility

OCI_ATTR_VISIBILITY

relative_msgid

OCI_ATTR_RELATIVE_MSGID

sequence_deviation

OCI_ATTR_SEQUENCE_DEVIATION

(非推奨)

transformation

OCI_ATTR_TRANSFORMATION

delivery_mode

OCI_ATTR_MSG_DELIVERY_MODE

表13-11では、デキュー・オプション属性を比較しています。

表13-11 デキュー・オプション属性

PL/SQLデキュー・オプション OCIAQDeqOptions属性
consumer_name

OCI_ATTR_CONSUMER_NAME

dequeue_mode

OCI_ATTR_DEQ_MODE

navigation

OCI_ATTR_NAVIGATION

visibility

OCI_ATTR_VISIBILITY

wait

OCI_ATTR_WAIT

msgid

OCI_ATTR_DEQ_MSGID

correlation

OCI_ATTR_CORRELATION

deq_condition

OCI_ATTR_DEQCOND

transformation

OCI_ATTR_TRANSFORMATION

delivery_mode

OCI_ATTR_MSG_DELIVERY_MODE

ノート:

OCIAQEnq()は、エンキュー・オプションOCI_ATTR_SEQUENCEOCI_ATTR_RELATIVE_MSGIDとともに指定している間は、エラーORA-25219を戻します。これは、2つのメッセージをエンキューした場合に発生します。2番目のメッセージに対しては、最初のメッセージの前にこのメッセージをデキューするように、エンキュー・オプションOCI_ATTR_SEQUENCEおよびOCI_ATTR_RELATIVE_MSGIDが設定されます。順序を指定していない場合エラーは戻されませんが、関連メッセージの前にこのメッセージはデキューされません。

OCI_ATTR_SEQUENCE属性が設定されていない場合、OCIAQEnq()はエラーを戻しませんが、このメッセージは、関連メッセージIDを持つメッセージより前にデキューされません。

関連項目:

OCIAQEnq()

バッファ・メッセージの使用

バッファ・メッセージは、Streams AQ内の非永続型メッセージ機能で、Oracle Database 10gリリース2から使用可能になりました。

バッファ・メッセージは、共有メモリーに格納され、インスタンスに障害が発生すると失われることがあります。永続型メッセージとは異なり、ディスクにはREDOが書き込まれません。永続型メッセージ操作より、バッファ・メッセージのエンキューとデキューの方がはるかに高速です。共有メモリーには制限があるため、バッファ・メッセージをディスクにオーバーフローさせることが必要になる場合があります。メッセージ・コンシューマが低速である場合や、なんらかの理由により停止した場合、フロー制御を有効にすることにより、アプリケーションによって共有メモリーが許容オーバーになるのを阻止できます。バッファ・メッセージには、次の関数が使用されます。

  • OCIAQEnq()

  • OCIAQDeq()

  • OCIAQListen2()

例13-6では、エンキュー・バッファ・メッセージの例を示しています。

例13-7では、デキュー・バッファ・メッセージの例を示しています。

ノート:

配列操作は、バッファ・メッセージに対してはサポートされません。アプリケーションでは、配列サイズを1に設定してOCIAQEnqArray()およびOCIAQDeqArray()関数を使用できます。

例13-6 エンキュー・バッファ・メッセージ

...
OCIAQMsgProperties  *msgprop;
OCIAQEnqueueOptions *enqopt;
message              msg;    /* message is an object type */
null_message         nmsg;   /* message indicator */
...
/* Allocate descriptors */
  OCIDescriptorAlloc(envhp, (void **)&enqopt, OCI_DTYPE_AQENQ_OPTIONS, 0,
                     (void **)0));
 
 OCIDescriptorAlloc(envhp, (void **)&msgprop,OCI_DTYPE_AQMSG_PROPERTIES, 0,
                    (void **)0));
 
/* Set delivery mode to buffered */
 dlvm = OCI_MSG_BUFFERED;
 OCIAttrSet(enqopt,  OCI_DTYPE_AQENQ_OPTIONS, (void *)&dlvm, sizeof(ub2),
            OCI_ATTR_MSG_DELIVERY_MODE, errhp);
/* Set visibility to Immediate (visibility must always be immediate for buffered
   messages) */
vis = OCI_ENQ_ON_COMMIT;
 
OCIAttrSet(enqopt, OCI_DTYPE_AQENQ_OPTIONS,(void *)&vis, sizeof(ub4),
           OCI_ATTR_VISIBILITY, errhp)
 
/* Message was an object type created earlier, msg_tdo is its type
   descriptor object */
OCIAQEnq(svchp, errhp, "Test_Queue", enqopt, msgprop, msg_tdo, (void **)&mesg,
         (void **)&nmesg, (OCIRaw **)0, 0));
...

例13-7 デキュー・バッファ・メッセージ

...
OCIAQMsgProperties  *msgprop;
OCIAQDequeueOptions *deqopt;
...
OCIDescriptorAlloc(envhp, (void **)&mprop, OCI_DTYPE_AQMSG_PROPERTIES, 0,
                   (void **)0));
OCIDescriptorAlloc(envhp, (void **)&deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 0,
                   (void **)0);

/* Set visibility to Immediate (visibility must always be immediate for buffered
   message operations) */
vis = OCI_ENQ_ON_COMMIT;
OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS,(void *)&vis, sizeof(ub4),
           OCI_ATTR_VISIBILITY, errhp)
/* delivery mode is buffered */
dlvm  = OCI_MSG_BUFFERED;
OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (void *)&dlvm,  sizeof(ub2),
           OCI_ATTR_MSG_DELIVERY_MODE, errhp);
/* Set the consumer for which to dequeue the message (this must be specified
   regardless of the type of message being dequeued).
*/
consumer = "FIRST_SUBSCRIBER";
OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (void *)consumer,
           (ub4)strlen((char*)consumer), OCI_ATTR_CONSUMER_NAME, errhp);
/* Dequeue the message but do not return the payload (to simplify the code
   fragment)
*/
OCIAQDeq(svchp, errhp,  "test_queue", deqopt, msgprop, msg_tdo, (void **)0,
         (void **)0, (OCIRaw**)0, 0);
...