ヘッダーをスキップ
Oracle® Streamsアドバンスト・キューイング・ユーザーズ・ガイド
11gリリース2 (11.2)
B61355-04
  目次へ移動
目次
索引へ移動
索引

前
 
次
 

23 Oracle Streamsのメッセージの例

この章の例では、Oracle Streamsを使用して作成できるメッセージ環境を説明します。この例では、db01という名前のデータベースへのアクセス権を持つSQL*Plusテスト環境を使用していることを前提にしています。

内容は次のとおりです。

メッセージ例の概要

この例では、1つのANYDATAキューを使用して、同じキュー内に異なるタイプのメッセージ・ペイロードが格納されるOracle Streamsメッセージ環境を説明します。特にこの例ではOracle Streamsの次のメッセージ機能を説明します。

  • ANYDATAペイロードとして注文ペイロードを含むメッセージのエンキュー

  • ANYDATAペイロードとして顧客ペイロードを含むメッセージのエンキュー

  • ANYDATAペイロードとしてLCR行を含むメッセージのエンキュー

  • イベントを適用するためのルール・セットの作成

  • ルール・セットで使用される評価コンテキストの作成

  • デキューするOracle Streams適用プロセスとルールに基づくイベント処理の作成

  • メッセージ・ハンドラの作成と適用プロセスとの関連付け

  • 適用プロセスを使用せずにルールに基づく明示的なイベントのデキューおよび処理

図23-1は、この環境の概要を示したものです。

図23-1 Oracle Streamsメッセージ環境の例

図23-1の説明が続きます。
「図23-1 Oracle Streamsメッセージ環境の例」の説明

ユーザーの設定およびANYDATAキューの作成

この章の例は、oeサンプル・スキーマを使用しているため、oeユーザーにはDBMS_AQパッケージのサブプログラムを実行する権限が必要です。これは、例23-1で実行されます。


注意:

oeユーザーは、例23-2ANYDATAキューを作成するときにキュー・ユーザーとして指定されます。SET_UP_QUEUEプロシージャは、oeユーザーにキューのエンキューおよびデキュー権限を付与しますが、oeユーザーには、メッセージをエンキューおよびデキューするために、DBMS_AQパッケージのEXECUTE権限も必要です。

この例で説明するほとんどの構成および管理アクションは、Oracle Streams管理者strmadminによって実行されます。例23-1は、このユーザーを作成して、必要な権限も付与します。これらの権限によって、ユーザーはOracle Streamsに関連するパッケージでのサブプログラムの実行、ルール・セットの作成、ルールの作成、およびデータ・ディクショナリ・ビューの問合せによるOracle Streams環境の監視ができるようになります。

例23-1では、管理権限を持つユーザーとしてデータベースdb01に接続します。

例23-1 ANYDATAユーザーの設定

GRANT EXECUTE ON DBMS_AQ TO oe;
CREATE USER strmadmin IDENTIFIED  BY strmadmin DEFAULT TABLESPACE example;
GRANT DBA, SELECT_CATALOG_ROLE    TO strmadmin; 
GRANT EXECUTE ON DBMS_APPLY_ADM   TO strmadmin;
GRANT EXECUTE ON DBMS_AQ          TO strmadmin;
GRANT EXECUTE ON DBMS_AQADM       TO strmadmin;
GRANT EXECUTE ON DBMS_STREAMS_ADM TO strmadmin;
BEGIN 
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ, 
    grantee      => 'strmadmin', 
    grant_option => FALSE);
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.CREATE_RULE_OBJ, 
    grantee      => 'strmadmin', 
    grant_option => FALSE);
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ, 
    grantee      => 'strmadmin', 
    grant_option => FALSE);
END;
/

注意:

  • SELECT_CATALOG_ROLEは、Oracle Streams管理者に必須ではありません。Oracle Streams管理者が環境を容易に監視できるよう、この例では付与されています。

  • Oracle Enterprise ManagerでOracle Streamsツールを使用する場合は、Oracle Streams管理者にこの手順で示した権限の他に、SELECT ANY DICTIONARY権限を付与します。


例23-2では、管理者ユーザーstrmadminとしてデータベースdb01に接続し、ANYDATAキューoe_queueを作成します。SET_UP_QUEUEプロシージャは、キューのキュー表を作成してから、キューを作成および開始します。

例23-2 ANYDATAキューの作成

SET ECHO OFF
SET VERIFY OFF
ACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDE
CONNECT strmadmin/&password@db01;
set echo on

BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE( 
    queue_table => 'oe_queue_table', 
    queue_name  => 'oe_queue');
END;
/

例23-3では、管理者ユーザーstrmadminとしてデータベースdb01に接続し、キューoe_queueに対してoeユーザー権限を付与して、キューに対する明示的なエンキュー操作の実行に使用されるエージェントexplicit_enqを作成し、oeユーザーをエージェントに関連付けます。

キューoe_queueは、SET_UP_QUEUEを使用して作成されたため保護キューです。保護キューに対してエンキューおよびデキュー操作を実行するユーザーは、キューの保護キュー・ユーザーとして構成する必要があります。oeユーザーをエージェントexplicit_enqに関連付けると、oeユーザーはこのキューに対してエンキュー操作を実行できます。

例23-3 ANYDATAキューに対するエンキューの有効化

SET ECHO OFF
SET VERIFY OFF
ACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDE
CONNECT strmadmin/&password@db01;
set echo on

BEGIN
  SYS.DBMS_AQADM.GRANT_QUEUE_PRIVILEGE(
    privilege  => 'ALL',
    queue_name => 'strmadmin.oe_queue',
    grantee    => 'oe');
  SYS.DBMS_AQADM.CREATE_AQ_AGENT(
    agent_name  => 'explicit_enq');
  DBMS_AQADM.ENABLE_DB_ACCESS(
    agent_name  => 'explicit_enq',
    db_username => 'oe');
END;
/

エンキュー・プロシージャの作成

この項の例では、メッセージをANYDATAキューoe_queueにエンキューする2つのPL/SQLプロシージャを作成します。一方のプロシージャはLCR以外のメッセージをエンキューし、もう一方のプロシージャはLCR行メッセージをエンキューします。

例23-4では、サンプル・スキーマ・ユーザーoeとしてデータベースdb01に接続し、oe.orders表の列に基づき注文を表す型を作成します。この型は、ANYDATAキューoe_queueにエンキューされるメッセージに使用されます。型属性には、actionという1つの追加属性とともにoe.orders表に列が含まれています。この型のインスタンスのaction属性の値は、インスタンス(適用プロセス・デキューまたは明示的デキュー)に対して実行する正しいアクションの決定に使用されます。

例23-4 注文の型の作成

SET ECHO OFF
SET VERIFY OFF
ACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDE
CONNECT oe/&password@db01;
set echo on

CREATE TYPE order_event_typ AS OBJECT(
  order_id       NUMBER(12),
  order_date     TIMESTAMP(6) WITH LOCAL TIME ZONE,
  order_mode     VARCHAR2(8),
  customer_id    NUMBER(6),
  order_status   NUMBER(2),
  order_total    NUMBER(8,2),
  sales_rep_id   NUMBER(6),
  promotion_id   NUMBER(6),
  action         VARCHAR(7));
/

例23-5では、サンプル・スキーマ・ユーザーoeとしてデータベースdb01に接続し、oe.customers表の列に基づき顧客を表す型を作成します。この型は、ANYDATAキューoe_queueにエンキューされるメッセージに使用されます。型属性には、actionという1つの追加属性とともに、oe.customers表に列が含まれています。この型のインスタンスのaction属性の値は、インスタンス(適用プロセス・デキューまたは明示的デキュー)に対して実行する正しいアクションの決定に使用されます。


注意:

この例は、oe.customers表からcust_geo_location列が削除されていることを前提としています。この列は、Oracle Spatialの場合のみ有効です。

例23-5 顧客の型の作成

SET ECHO OFFSET VERIFY OFFACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDECONNECT oe/&password@db01;set echo on

CREATE TYPE customer_event_typ AS OBJECT(
  customer_id         NUMBER(6),
  cust_first_name     VARCHAR2(20),
  cust_last_name      VARCHAR2(20),
  cust_address        CUST_ADDRESS_TYP,
  phone_numbers       PHONE_LIST_TYP,
  nls_language        VARCHAR2(3),
  nls_territory       VARCHAR2(30),
  credit_limit        NUMBER(9,2),
  cust_email          VARCHAR2(30),
  account_mgr_id      NUMBER(6),
  date_of_birth       DATE,
  marital_status      VARCHAR2(20),
  gender              VARCHAR2(1),
  income_level        VARCHAR2(20),
  action              VARCHAR(7));
/

例23-6では、サンプル・スキーマ・ユーザーoeとしてデータベースdb01に接続し、enq_procというPL/SQLプロシージャを作成して、LCR以外のメッセージをANYDATAキューoe_queueにエンキューします。


注意:

エンキューされた単一メッセージは、適用プロセスおよび明示的デキューによってデキューできますが、この章の例ではこの機能の説明はしません。

例23-6 LCR以外のメッセージをエンキューするプロシージャの作成

SET ECHO OFFSET VERIFY OFFACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDECONNECT oe/&password@db01;set echo on

CREATE PROCEDURE oe.enq_proc (event IN ANYDATA) IS
    enqopt       DBMS_AQ.ENQUEUE_OPTIONS_T;
    mprop        DBMS_AQ.MESSAGE_PROPERTIES_T;
    enq_eventid  RAW(16);
  BEGIN
    mprop.SENDER_ID := SYS.AQ$_AGENT('explicit_enq', NULL, NULL);
    DBMS_AQ.ENQUEUE(
      queue_name          =>  'strmadmin.oe_queue',
      enqueue_options     =>  enqopt,
      message_properties  =>  mprop,
      payload             =>  event,
      msgid               =>  enq_eventid);
END;
/

例23-7では、サンプル・スキーマ・ユーザーoeとしてデータベースdb01に接続し、LCR行を作成するenq_row_lcrというPL/SQLプロシージャを作成し、LCR行をANYDATAキューoe_queueにエンキューします。


関連項目:

LCRコンストラクタの詳細は、『Oracle Database PL/SQLパッケージ・プロシージャおよびタイプ・リファレンス』を参照してください。

例23-7 LCRイベント行を作成およびエンキューするプロシージャの作成

SET ECHO OFFSET VERIFY OFFACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDECONNECT oe/&password@db01;set echo on

CREATE PROCEDURE oe.enq_row_lcr(
  source_dbname  VARCHAR2,
  cmd_type       VARCHAR2,
  obj_owner      VARCHAR2,
  obj_name       VARCHAR2,
  old_vals       SYS.LCR$_ROW_LIST,
  new_vals       SYS.LCR$_ROW_LIST) 
AS
  eopt           DBMS_AQ.ENQUEUE_OPTIONS_T;
  mprop          DBMS_AQ.MESSAGE_PROPERTIES_T;
  enq_msgid      RAW(16);
  row_lcr        SYS.LCR$_ROW_RECORD;
BEGIN
  mprop.SENDER_ID := SYS.AQ$_AGENT('explicit_enq', NULL, NULL); 
  row_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
    source_database_name  =>  source_dbname,
    command_type          =>  cmd_type,
    object_owner          =>  obj_owner,
    object_name           =>  obj_name,
    old_values            =>  old_vals,
    new_values            =>  new_vals);
  DBMS_AQ.ENQUEUE(
    queue_name         =>  'strmadmin.oe_queue', 
    enqueue_options    =>  eopt,
    message_properties =>  mprop,
    payload            =>  ANYDATA.ConvertObject(row_lcr),
    msgid              =>  enq_msgid);
END enq_row_lcr;
/

適用プロセスの構成

この項の例では、適用プロセスを構成して、ANYDATAキューoe_queue内のユーザーによってエンキューされたメッセージを適用します。

例23-8では、サンプル・スキーマ・ユーザーoeとしてデータベースdb01に接続し、get_oe_actionという関数を作成して、関数に対するEXECUTE権限を管理者ユーザーstrmadminに付与します。

この関数は、キューoe_queueにあるメッセージのaction属性の値を決定します。これは、後にこの章で、イベントのaction属性値を決定するためにルールで使用されます。その後、ルール・エンジンのクライアントが、イベントの適切なアクションを実行します(プロセスの適用によるデキューまたは明示的なデキュー)。この例では、ルール・エンジンのクライアントは適用プロセスおよびoe.explicit_dq PL/SQLプロシージャです。

例23-8 action属性値を決定する関数の作成

SET ECHO OFFSET VERIFY OFFACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDECONNECT oe/&password@db01;set echo on

CREATE FUNCTION oe.get_oe_action (event IN ANYDATA) 
RETURN VARCHAR2
IS 
  ord         oe.order_event_typ;
  cust        oe.customer_event_typ;
  num         NUMBER;
  type_name   VARCHAR2(61);
BEGIN
  type_name := event.GETTYPENAME; 
  IF type_name = 'OE.ORDER_EVENT_TYP' THEN
    num := event.GETOBJECT(ord);
    RETURN ord.action;  
  ELSIF type_name = 'OE.CUSTOMER_EVENT_TYP' THEN
    num := event.GETOBJECT(cust);
    RETURN cust.action; 
  ELSE
    RETURN NULL;
  END IF;
END;
/
GRANT EXECUTE ON get_oe_action TO strmadmin;

例23-9では、サンプル・スキーマ・ユーザーoeとしてデータベースdb01に接続し、適用プロセスによってメッセージ・ハンドラとして使用されるmes_handlerというPL/SQLプロシージャを作成します。また、このプロシージャに対するEXECUTE権限を管理者ユーザーstrmadminに付与します。このプロシージャでは、型oe.order_event_typまたはoe.customer_event_typのユーザーによってエンキューされたメッセージのペイロードを取得し、それぞれoe.orders表とoe.customers表の行として挿入します。

例23-9 メッセージ・ハンドラの作成

set echo offset verify offACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDECONNECT oe/&password@db01;set echo on

CREATE PROCEDURE oe.mes_handler (event IN ANYDATA) IS
  ord           oe.order_event_typ;
  cust          oe.customer_event_typ;
  num           NUMBER;
  type_name     VARCHAR2(61);
BEGIN
  type_name := event.GETTYPENAME;
  IF type_name = 'OE.ORDER_EVENT_TYP' THEN
    num := event.GETOBJECT(ord);
    INSERT INTO oe.orders VALUES (ord.order_id, ord.order_date, 
      ord.order_mode, ord.customer_id, ord.order_status, ord.order_total, 
      ord.sales_rep_id, ord.promotion_id); 
  ELSIF type_name = 'OE.CUSTOMER_EVENT_TYP' THEN
    num := event.GETOBJECT(cust);
    INSERT INTO oe.customers VALUES (cust.customer_id, cust.cust_first_name, 
      cust.cust_last_name, cust.cust_address, cust.phone_numbers, 
      cust.nls_language, cust.nls_territory, cust.credit_limit, cust.cust_email, 
      cust.account_mgr_id, cust.date_of_birth, cust.marital_status, 
      cust.gender, cust.income_level); 
  END IF;
END;
/
GRANT EXECUTE ON mes_handler TO strmadmin;

例23-10では、管理者ユーザーstrmadminとしてデータベースdb01に接続し、ルール・セットの評価コンテキストを作成します。

例23-10 ルール・セットの評価コンテキストの作成

SET ECHO OFF
SET VERIFY OFF
ACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDE
CONNECT strmadmin/&password@db01;
set echo on

DECLARE
  table_alias     SYS.RE$TABLE_ALIAS_LIST;
BEGIN
  table_alias := SYS.RE$TABLE_ALIAS_LIST(
    SYS.RE$TABLE_ALIAS('tab', 'strmadmin.oe_queue_table'));
  DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT(
    evaluation_context_name  =>  'oe_eval_context', 
    table_aliases            =>  table_alias);
END;
/

例23-11では、管理者ユーザーstrmadminとしてデータベースdb01に接続し、適用プロセスのルール・セットを作成します。

例23-11 適用プロセスのルール・セットの作成

SET ECHO OFF
SET VERIFY OFF
ACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDE
CONNECT strmadmin/&password@db01;
set echo on

BEGIN
  DBMS_RULE_ADM.CREATE_RULE_SET(
    rule_set_name       =>  'apply_oe_rs',
    evaluation_context  =>  'strmadmin.oe_eval_context');
END;
/

例23-12では、管理者ユーザーstrmadminとしてデータベースdb01に接続し、メッセージのaction値がapplyの場合にTRUEと評価するルールを作成します。tab.user_dataは、oe.get_oe_action関数に渡されます。tab.user_data列には、キュー表内のイベント・ペイロードが保持されます。キュー表の表別名は、例23-10tabとして指定されています。

例23-12 actionがapplyの場合にTRUEと評価するルールの作成

SET ECHO OFF
SET VERIFY OFF
ACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDE
CONNECT strmadmin/&password@db01; 
set echo on

BEGIN
  DBMS_RULE_ADM.CREATE_RULE(
    rule_name   => 'strmadmin.apply_action',
    condition   => 'oe.get_oe_action(tab.user_data) = ''APPLY'' ');
END;
/

例23-13では、管理者ユーザーstrmadminとしてデータベースdb01に接続し、キュー内のイベントがoe.orders表またはoe.customers表を変更するLCR行の場合にTRUEと評価するルールを作成します。このルールによって適用プロセスが使用可能になり、ユーザーによってエンキューされた変更が直接、表に適用されます。

Oracleによって提供されている評価コンテキストSYS.STREAMS$_EVALUATION_CONTEXTがLCR評価に使用されているため、便宜上このルールが使用されます。このルールがルール・セットに追加されると、評価時には例23-10で作成された評価コンテキストoe_eval_contextのかわりに、Oracleによって提供されている評価コンテキストが使用されます。

例23-13 LCR行イベントの場合にTRUEと評価するルールの作成

SET ECHO OFF
SET VERIFY OFF
ACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDE
CONNECT strmadmin/&password@db01;
set echo on

BEGIN
  DBMS_RULE_ADM.CREATE_RULE(
    rule_name           =>  'apply_lcrs',
    condition           =>  ':dml.GET_OBJECT_OWNER() = ''OE'' AND ' || 
                            ' (:dml.GET_OBJECT_NAME() = ''ORDERS'' OR ' || 
                            ':dml.GET_OBJECT_NAME() = ''CUSTOMERS'') ',
    evaluation_context  =>  'SYS.STREAMS$_EVALUATION_CONTEXT');
END;
/

例23-14では、管理者ユーザーstrmadminとしてデータベースdb01に接続し、例23-12で作成されたapply_actionルール、および例23-13で作成されたapply_lcrsルールを、例23-11で作成されたapply_oe_rsルール・セットに追加します。

例23-14 ルール・セットへのルールの追加

SET ECHO OFFSET VERIFY OFFACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDECONNECT strmadmin/&password@db01;set echo on

BEGIN
  DBMS_RULE_ADM.ADD_RULE(
    rule_name          =>  'apply_action',
    rule_set_name      =>  'apply_oe_rs');
  DBMS_RULE_ADM.ADD_RULE(
    rule_name          =>  'apply_lcrs',
    rule_set_name      =>  'apply_oe_rs');
END;
/

例23-15では、管理者ユーザーstrmadminとしてデータベースdb01に接続し、キューoe_queueに関連付けられ、apply_oe_rsルール・セットを使用し、メッセージ・ハンドラとしてmes_handlerプロシージャを使用する適用プロセスを作成します。

例23-15 適用プロセスの作成

SET ECHO OFFSET VERIFY OFFACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDECONNECT strmadmin/&password@db01;set echo on

BEGIN
  DBMS_APPLY_ADM.CREATE_APPLY(
    queue_name       =>  'strmadmin.oe_queue',
    apply_name       =>  'apply_oe',
    rule_set_name    =>  'strmadmin.apply_oe_rs',
    message_handler  =>  'oe.mes_handler',
    apply_user       =>  'oe',
    apply_captured   =>  false);
END;
/

例23-15で適用プロセスが作成された時、oeが適用ユーザーとして指定されているため、このユーザーには適用プロセスによって使用されるstrmadmin.apply_oe_rsルール・セットに対するEXECUTE権限が必要です。例23-16でこれを実行するには、管理者ユーザーstrmadminとしてデータベースdb01に接続します。

例23-16 ルール・セットに対するEXECUTE権限をoeユーザーに付与

SET ECHO OFF
SET VERIFY OFF
ACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDECONNECT strmadmin/&password@db01;set echo on

BEGIN 
  DBMS_RULE_ADM.GRANT_OBJECT_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.EXECUTE_ON_RULE_SET,
    object_name  => 'strmadmin.apply_oe_rs',
    grantee      => 'oe', 
    grant_option => FALSE);
END;
/

例23-17では、管理者ユーザーstrmadminとしてデータベースdb01に接続し、エラーが発生したとき適用プロセスが使用不可にならないように、disable_on_errorパラメータをnに設定して適用プロセスを開始します。

例23-17 適用プロセスの開始

SET ECHO OFFSET VERIFY OFFACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDECONNECT strmadmin/&password@db01;set echo on

BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name  => 'apply_oe', 
    parameter   => 'disable_on_error', 
    value       => 'n');
  DBMS_APPLY_ADM.START_APPLY(
    apply_name  =>  'apply_oe');
END;
/

明示的デキューの構成

この項の例では、メッセージ内容に基づきメッセージの明示的デキューを構成する方法について説明します。

例23-18では、管理者ユーザーstrmadminとしてデータベースdb01に接続し、エージェントexplicit_dqを作成します。このエージェントは、oe_queueキューに対する明示的なデキュー操作の実行に使用されます。

例23-18 明示的デキューに対するエージェントの作成

SET ECHO OFFSET VERIFY OFFACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDECONNECT strmadmin/&password@db01;set echo on

BEGIN
  SYS.DBMS_AQADM.CREATE_AQ_AGENT(
    agent_name      => 'explicit_dq');
END;
/

キューoe_queueは、例23-2SET_UP_QUEUEを使用して作成されたため保護キューです。保護キューに対してエンキューおよびデキュー操作を実行するユーザーは、キューの保護キュー・ユーザーとして構成する必要があります。

例23-19では、管理者ユーザーstrmadminとしてデータベースdb01に接続し、oeユーザーをエージェントexplicit_dqに関連付けます。エージェントを使用して例23-20のキューにサブスクライバを作成する場合、oeユーザーはoe_queueキューに対するデキュー操作を実行できます。

例23-19 ユーザーoeとエージェントexplicit_dqの関連付け

set echo offset verify offACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDECONNECT strmadmin/&password@db01;set echo on

BEGIN
  DBMS_AQADM.ENABLE_DB_ACCESS(
    agent_name  => 'explicit_dq',
    db_username => 'oe');
END;
/

例23-20では、管理者ユーザーstrmadminとしてデータベースdb01に接続し、oe_queueキューにサブスクライバを追加します。このサブスクライバによってメッセージの明示的デキューが実行されます。サブスクライバ・ルールを使用して、action値がapplyではない任意のメッセージをデキューします。メッセージに対するアクション値がapplyの場合、メッセージはサブスクライバによって無視されます。そのようなメッセージはデキューされ、適用プロセスによって処理されます。

例23-20 oe_queueキューへのサブスクライバの追加

SET ECHO OFFSET VERIFY OFFACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDECONNECT strmadmin/&password@db01;set echo on

DECLARE
  subscriber SYS.AQ$_AGENT;
BEGIN
  subscriber :=  SYS.AQ$_AGENT('explicit_dq', NULL, NULL);
  SYS.DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name  =>  'strmadmin.oe_queue',
    subscriber  =>  subscriber,
    rule        =>  'oe.get_oe_action(tab.user_data) != ''APPLY''');
END;
/

例23-21では、サンプル・スキーマ・ユーザーoeとしてデータベースdb01に接続し、explicit_dqというPL/SQLプロシージャを作成して、例23-20で作成されたサブスクライバを使用してメッセージを明示的にデキューします。

このプロシージャはメッセージのデキュー後にコミットされます。コミットによって、デキューされたメッセージがサブスクライバによって正常にコンシュームされたことがキューに通知されます。

このプロシージャは複数のトランザクションを処理でき、2つの例外ハンドラを使用します。他にメッセージがない場合は、例外ハンドラnext_transは次のトランザクションに移動し、例外ハンドラno_messagesはループを終了します。

例23-21 メッセージを明示的にデキューするプロシージャの作成

SET ECHO OFFSET VERIFY OFF
ACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDECONNECT oe/&password@db01;set echo on

CREATE PROCEDURE oe.explicit_dq (consumer IN VARCHAR2) AS
  deqopt       DBMS_AQ.DEQUEUE_OPTIONS_T;
  mprop        DBMS_AQ.MESSAGE_PROPERTIES_T;
  msgid        RAW(16);
  payload      ANYDATA;
  new_messages BOOLEAN := TRUE;
  ord          oe.order_event_typ;
  cust         oe.customer_event_typ;
  tc           pls_integer;
  next_trans   EXCEPTION;
  no_messages  EXCEPTION; 
  pragma exception_init (next_trans, -25235);
  pragma exception_init (no_messages, -25228);
BEGIN
  deqopt.consumer_name := consumer;
  deqopt.wait := 1;
  WHILE (new_messages) LOOP
    BEGIN
    DBMS_AQ.DEQUEUE(
      queue_name          =>  'strmadmin.oe_queue',
      dequeue_options     =>  deqopt,
      message_properties  =>  mprop,
      payload             =>  payload,
      msgid               =>  msgid);
    COMMIT;
    deqopt.navigation := DBMS_AQ.NEXT;
    DBMS_OUTPUT.PUT_LINE('Message Dequeued');
    DBMS_OUTPUT.PUT_LINE('Type Name := ' || payload.GetTypeName);
    IF (payload.GetTypeName = 'OE.ORDER_EVENT_TYP') THEN
      tc := payload.GetObject(ord); 
      DBMS_OUTPUT.PUT_LINE('order_id     - ' || ord.order_id);
      DBMS_OUTPUT.PUT_LINE('order_date   - ' || ord.order_date);
      DBMS_OUTPUT.PUT_LINE('order_mode   - ' || ord.order_mode);
      DBMS_OUTPUT.PUT_LINE('customer_id  - ' || ord.customer_id);
      DBMS_OUTPUT.PUT_LINE('order_status - ' || ord.order_status);
      DBMS_OUTPUT.PUT_LINE('order_total  - ' || ord.order_total);
      DBMS_OUTPUT.PUT_LINE('sales_rep_id - ' || ord.sales_rep_id);
      DBMS_OUTPUT.PUT_LINE('promotion_id - ' || ord.promotion_id);
    END IF;
    IF (payload.GetTypeName = 'OE.CUSTOMER_EVENT_TYP') THEN
      tc := payload.GetObject(cust);
      DBMS_OUTPUT.PUT_LINE('customer_id       - ' || cust.customer_id);
      DBMS_OUTPUT.PUT_LINE('cust_first_name   - ' || cust.cust_first_name);
      DBMS_OUTPUT.PUT_LINE('cust_last_name    - ' || cust.cust_last_name);
      DBMS_OUTPUT.PUT_LINE('street_address    - ' || 
                              cust.cust_address.street_address);
      DBMS_OUTPUT.PUT_LINE('postal_code       - ' || 
                              cust.cust_address.postal_code);
      DBMS_OUTPUT.PUT_LINE('city              - ' || cust.cust_address.city);
      DBMS_OUTPUT.PUT_LINE('state_province    - ' || 
                              cust.cust_address.state_province);
      DBMS_OUTPUT.PUT_LINE('country_id        - ' || 
                              cust.cust_address.country_id);
      DBMS_OUTPUT.PUT_LINE('phone_number1     - ' || cust.phone_numbers(1));
      DBMS_OUTPUT.PUT_LINE('phone_number2     - ' || cust.phone_numbers(2));
      DBMS_OUTPUT.PUT_LINE('phone_number3     - ' || cust.phone_numbers(3));
      DBMS_OUTPUT.PUT_LINE('nls_language      - ' || cust.nls_language);
      DBMS_OUTPUT.PUT_LINE('nls_territory     - ' || cust.nls_territory);
      DBMS_OUTPUT.PUT_LINE('credit_limit      - ' || cust.credit_limit);
      DBMS_OUTPUT.PUT_LINE('cust_email        - ' || cust.cust_email);
      DBMS_OUTPUT.PUT_LINE('account_mgr_id    - ' || cust.account_mgr_id);
      DBMS_OUTPUT.PUT_LINE('date_of_birth     - ' || cust.date_of_birth);
      DBMS_OUTPUT.PUT_LINE('marital_status    - ' || cust.marital_status);
      DBMS_OUTPUT.PUT_LINE('gender            - ' || cust.gender);
      DBMS_OUTPUT.PUT_LINE('income_level      - ' || cust.income_level);
    END IF;
    EXCEPTION
      WHEN next_trans THEN
      deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
      WHEN no_messages THEN
        new_messages  := FALSE;
        DBMS_OUTPUT.PUT_LINE('No more messagess');
     END;
  END LOOP; 
END;
/

メッセージのエンキュー

この項の例では、LCR以外のメッセージおよびLCR行メッセージをキューにエンキューする方法について説明します。


注意:

ユーザーによってエンキューされたLCRを明示的にデキューできますが、この例ではこの機能は説明しません。

例23-22では、サンプル・スキーマ・ユーザーoeとしてデータベースdb01に接続し、action値をapplyにして2つのメッセージをエンキューします。適用プロセス・ルールに基づき、適用プロセスは例23-9で作成されたoe.mes_handlerメッセージ・ハンドラ・プロシージャを使用して、これらのメッセージをデキューして処理します。エンキュー後のCOMMITによって、これらの2つのエンキューが同じトランザクションに属すようになります。エンキューされたメッセージは、それをエンキューしたセッションがエンキューをコミットするまで表示されません。

例23-22 適用プロセスによってデキューされるLCR以外のメッセージのエンキュー

SET ECHO OFFSET VERIFY OFFACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDECONNECT oe/&password@db01;set echo on

BEGIN
  oe.enq_proc(ANYDATA.convertobject(oe.order_event_typ(
    2500,'05-MAY-01','online',117,3,44699,161,NULL,'APPLY')));
END;
/
BEGIN
  oe.enq_proc(ANYDATA.convertobject(oe.customer_event_typ(
    990,'Hester','Prynne',oe.cust_address_typ('555 Beacon Street',
    '02109','Boston','MA','US'),oe.phone_list_typ('+1 617 123 4104',
    '+1 617 083 4381','+1 617 742 5813'),'i','AMERICA',5000,
    'a@scarlet_letter.com',145,NULL,'SINGLE','F','UNDER 50,000','APPLY')));
END;
/ 
COMMIT;

例23-23では、サンプル・スキーマ・ユーザーoeとしてデータベースdb01に接続し、action値をdequeueにして2つのメッセージをエンキューします。actionapplyでないため、例23-21で作成されたoe.explicit_dqプロシージャはこれらのメッセージをデキューします。適用プロセス・ルールに基づき、適用プロセスではこれらのメッセージを無視します。エンキュー後のCOMMITによって、これらの2つのエンキューが同じトランザクションに属すようになります。

例23-23 明示的にデキューされるLCR以外のメッセージのエンキュー

SET ECHO OFFSET VERIFY OFFACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDECONNECT oe/&password@db01;set echo on

BEGIN
  oe.enq_proc(ANYDATA.convertobject(oe.order_event_typ(
    2501,'22-JAN-00','direct',117,3,22788,161,NULL,'DEQUEUE')));
END;
/
BEGIN
  oe.enq_proc(ANYDATA.convertobject(oe.customer_event_typ(
    991,'Nick','Carraway',oe.cust_address_typ('10th Street',
    '11101','Long Island','NY','US'),oe.phone_list_typ('+1 718 786 2287', 
    '+1 718 511 9114', '+1 718 888 4832'),'i','AMERICA',3000,
    'nick@great_gatsby.com',149,NULL,'MARRIED','M','OVER 150,000','DEQUEUE')));
END;
/
COMMIT;

例23-24では、サンプル・スキーマ・ユーザーoeとしてデータベースdb01に接続し、oe.orders表に行を挿入するLCR行、およびその行を更新する別のLCRを作成します。適用プロセスはこれらのメッセージに直接適用されます。


注意:

エンキューされたLCRはトランザクションの境界でコミットする必要があります。この例では、各エンキュー後にCOMMIT文が実行され、各エンキューを別々のトランザクションにします。ただし、トランザクション内に複数のLCRがある場合は、コミット前に複数のLCRエンキューを実行できます。

例23-24 適用プロセスによってデキューされるLCR行のエンキュー

SET ECHO OFF
SET VERIFY OFF
ACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDE
CONNECT oe/&password@db01;
set echo on

DECLARE
  newunit1  SYS.LCR$_ROW_UNIT;
  newunit2  SYS.LCR$_ROW_UNIT;
  newunit3  SYS.LCR$_ROW_UNIT;
  newunit4  SYS.LCR$_ROW_UNIT;
  newunit5  SYS.LCR$_ROW_UNIT;
  newunit6  SYS.LCR$_ROW_UNIT;
  newunit7  SYS.LCR$_ROW_UNIT;
  newunit8  SYS.LCR$_ROW_UNIT;
  newvals   SYS.LCR$_ROW_LIST;
BEGIN
  newunit1 := SYS.LCR$_ROW_UNIT(
    'ORDER_ID',ANYDATA.ConvertNumber(2502),DBMS_LCR.NOT_A_LOB,NULL,NULL);
  newunit2 := SYS.LCR$_ROW_UNIT(
    'ORDER_DATE',ANYDATA.ConvertTimestampLTZ('04-NOV-00'),DBMS_LCR.NOT_A_LOB,
    NULL,NULL);
  newunit3 := SYS.LCR$_ROW_UNIT(
    'ORDER_MODE',ANYDATA.ConvertVarchar2('online'),DBMS_LCR.NOT_A_LOB,NULL,NULL);
  newunit4 := SYS.LCR$_ROW_UNIT(
    'CUSTOMER_ID',ANYDATA.ConvertNumber(145),DBMS_LCR.NOT_A_LOB,NULL,NULL);
  newunit5 := SYS.LCR$_ROW_UNIT(
    'ORDER_STATUS',ANYDATA.ConvertNumber(3),DBMS_LCR.NOT_A_LOB,NULL,NULL);
  newunit6 := SYS.LCR$_ROW_UNIT(
    'ORDER_TOTAL',ANYDATA.ConvertNumber(35199),DBMS_LCR.NOT_A_LOB,NULL,NULL);
  newunit7 := SYS.LCR$_ROW_UNIT(
    'SALES_REP_ID',ANYDATA.ConvertNumber(160),DBMS_LCR.NOT_A_LOB,NULL,NULL);
  newunit8 := SYS.LCR$_ROW_UNIT(
    'PROMOTION_ID',ANYDATA.ConvertNumber(1),DBMS_LCR.NOT_A_LOB,NULL,NULL);
  newvals := SYS.LCR$_ROW_LIST(
    newunit1,newunit2,newunit3,newunit4,newunit5,newunit6,newunit7,newunit8);
  oe.enq_row_lcr('DB01','INSERT','OE','ORDERS',NULL,newvals);
END;
/
COMMIT;
DECLARE
  oldunit1  SYS.LCR$_ROW_UNIT;
  oldunit2  SYS.LCR$_ROW_UNIT;
  oldvals   SYS.LCR$_ROW_LIST;
  newunit1  SYS.LCR$_ROW_UNIT;
  newvals   SYS.LCR$_ROW_LIST;
BEGIN
  oldunit1 := SYS.LCR$_ROW_UNIT(
    'ORDER_ID',ANYDATA.ConvertNumber(2502),DBMS_LCR.NOT_A_LOB,NULL,NULL);
  oldunit2 := SYS.LCR$_ROW_UNIT(
    'ORDER_TOTAL',ANYDATA.ConvertNumber(35199),DBMS_LCR.NOT_A_LOB,NULL,NULL);
  oldvals := SYS.LCR$_ROW_LIST(oldunit1,oldunit2);
  newunit1 := SYS.LCR$_ROW_UNIT(
    'ORDER_TOTAL',ANYDATA.ConvertNumber(5235),DBMS_LCR.NOT_A_LOB,NULL,NULL);
  newvals := SYS.LCR$_ROW_LIST(newunit1);
  oe.enq_row_lcr('DB01','UPDATE','OE','ORDERS',oldvals,newvals);
END;
/
COMMIT;

メッセージの明示的なデキューと適用されたメッセージの問合せ

この項の例では、メッセージを明示的にデキューして、適用プロセスによって適用されたメッセージを問い合せる方法について説明します。この例では、前の項でエンキューされたメッセージを使用します。

例23-25では、サンプル・スキーマ・ユーザーoeとしてデータベースdb01に接続し、例23-21で作成されたプロシージャexplicit_dqを実行します。デキューするメッセージのコンシューマとして、例23-20で追加されたサブスクライバexplicit_dqを指定します。この例では、このプロシージャによって明示的にデキューされないメッセージは、適用プロセスによってデキューされます。

例23-25 メッセージの明示的なデキュー

SET ECHO OFF
SET VERIFY OFF
ACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDE
CONNECT oe/&password@db01;
set echo on

CREATE PROCEDURE oe.enq_proc (payload ANYDATA)  IS 
SET SERVEROUTPUT ON SIZE 100000;
EXEC oe.explicit_dq('explicit_dq');

この例は、例23-23でエンキューされたメッセージのペイロードを戻します。

Message Dequeued
Type Name := OE.ORDER_EVENT_TYP
order_id     - 2501
order_date   - 22-JAN-00 12.00.00.000000 AM
order_mode   - direct
customer_id  - 117
order_status - 3
order_total  - 22788
sales_rep_id - 161
promotion_id -
Message Dequeued
Type Name := OE.CUSTOMER_EVENT_TYP
customer_id       - 991
cust_first_name   - Nick
cust_last_name    - Carraway
street_address    - 10th Street
postal_code       - 11101
city              - Long Island
state_province    - NY
country_id        - US
phone_number1     - +1 718 786 2287
phone_number2     - +1 718 511 9114
phone_number3     - +1 718 888 4832
nls_language      - i
nls_territory     - AMERICA
credit_limit      - 3000
cust_email        - nick@great_gatsby.com
account_mgr_id    - 149
date_of_birth     -
marital_status    - MARRIED
gender            - M
income_level      - OVER 150,000
No more messages

例23-26では、サンプル・スキーマ・ユーザーoeとしてデータベースdb01に接続し、oe.ordersおよびoe.customers表を問い合せて、例23-15で作成された適用プロセスapply_oeによって適用されたメッセージに対応する行を確認します。

例23-26 適用されたメッセージの問合せ

SET ECHO OFF
SET VERIFY OFF
ACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDE
CONNECT oe/&password@db01; 
set echo on

CREATE PROCEDURE oe.enq_proc (payload ANYDATA)  IS 
SELECT order_id, order_date, customer_id, order_total
  FROM oe.orders WHERE order_id = 2500;
SELECT cust_first_name, cust_last_name, cust_email 
  FROM oe.customers WHERE customer_id = 990;
SELECT order_id, order_date, customer_id, order_total 
  FROM oe.orders WHERE order_id = 2502;

この例は、次の3行を戻します。

  ORDER_ID ORDER_DATE                     CUSTOMER_ID ORDER_TOTAL
---------- ------------------------------ ----------- -----------
      2500 05-MAY-01 12.00.00.000000 AM           117       44699
 
1 row selected.

CUST_FIRST_NAME      CUST_LAST_NAME       CUST_EMAIL
-------------------- -------------------- ------------------------------
Hester               Prynne               a@scarlet_letter.com
 
1 row selected.

  ORDER_ID ORDER_DATE                     CUSTOMER_ID ORDER_TOTAL
---------- ------------------------------ ----------- -----------
      2502 04-NOV-00 12.00.00.000000 AM           145        5235
 
1 row selected.

JMSを使用したメッセージのエンキューおよびデキュー

この項の例では、LCR以外のメッセージおよびLCR行をキューにエンキューし、Java Message Service(JMS)を使用してデキューする方法について説明します。

Oracle Databaseは、JDK 1.2、JDK 1.3、JDK 1.4、およびすべてのclasses12*.*ファイルをサポートしません。JDK 5.nおよびJDK 6.nでそれぞれojdbc5.jarおよびojbc6.jarを使用する必要があります。次のjarファイルとzipファイルは使用しているJDKのリリースに基づくCLASSPATHにある必要があります。

JDK 1.5.xの場合、CLASSPATHに次のパスを含める必要があります。

ORACLE_HOME/jdbc/lib/ojdbc5.jar

JDK 1.6.xの場合、CLASSPATHに次のパスを含める必要があります。

ORACLE_HOME/jdbc/lib/ojdbc6.jar 

いずれかのJDKバージョンに次のファイルを使用します。

ORACLE_HOME/lib/jta.jar
ORACLE_HOME/xdk/lib/xmlparserv2.jar
ORACLE_HOME/rdbms/jlib/xdb.jar
ORACLE_HOME/jlib/aqapi.jar 
ORACLE_HOME/rdbms/jlib/jmscommon.jar

また、LD_LIBRARY_PATH(LinuxおよびSolaris)またはPATH(Windows)に、ORACLE_HOME/libが含まれていることを確認してください。

次の例は、JMSメッセージをキューおよびエージェントexplicit_dqにエンキューした後デキューする、サンプル・スキーマ・ユーザーoeを示しています。エージェントexplicit_dq例23-18で作成され、例23-19のサンプル・スキーマ・ユーザーoeに関連付けられ、例23-20のキューoe_queueのサブスクライバになっています。

サンプル・スキーマ・ユーザーoeには、例23-1DBMS_AQに対するEXECUTE権限が付与されています。このユーザーがOracle JMSインタフェースを使用するためには、DBMS_AQINに対するEXECUTE権限も必要です。例23-27では、管理者権限を持つユーザーとしてデータベースdb01に接続し、必要な権限をoeに付与します。

例23-27 DBMS_AQINに対するEXECUTE権限をユーザーoeに付与

GRANT EXECUTE on DBMS_AQIN to oe;

DBMS_STREAMS_ADM.SET_UP_QUEUE()の後でDBMS_AQADM.ENABLE_JMS_TYPES(queue_table_name)をコールしないと、JMS型とXML型のエンキューはOracle StreamsのANYDATAキューでは機能しません。例23-28では、例23-1で作成された管理者ユーザーstrmadminとしてデータベースdb01に接続し、例23-2で作成されたANYDATAキュー表oe_queue_tableに対してENABLE_JMS_TYPESを実行します。

例23-28 ANYDATAキューに対するJMS型の有効化

CONNECT strmadmin;
Enter password: password
BEGIN
  DBMS_AQADM.ENABLE_JMS_TYPES('oe_queue_table');
END;
/

注意:

Oracle Streamsキューをこれらの型に対して使用可能にすると、キュー表のインポート/エクスポートに影響が出る場合があります。

例23-29では、サンプル・スキーマ・ユーザーoeとしてデータベースdb01に接続し、address型およびperson型を作成します。

例23-29 Oracleオブジェクト型addressおよびpersonの作成

CONNECT oe;
Enter password: password
CREATE TYPE address AS OBJECT (street VARCHAR (30), num NUMBER)
/ 
CREATE TYPE person AS OBJECT (name VARCHAR (30), home ADDRESS)
/

例23-30では、JPublisherを使用して、person型とaddress型に対してそれぞれJPersonJAddressという2つのJavaクラスを生成します。JPublisherへの入力は、次の行を含むinput.typというファイルです。

SQL PERSON AS JPerson
SQL ADDRESS AS JAddress

例23-30 Oracleオブジェクト型にマップするJavaクラスの作成

jpub -input=input.typ -user=OE/OE

例23-31は、JMSテキスト・メッセージ、LCRおよびLCR ADT以外のメッセージをOracle Streamsトピックにパブリッシュするために使用するJavaコードです。このガイドには、次のような役割があります。

  • JDBC OCIドライバを使用して、TopicConnectionFactoryを作成


    注意:

    JDBC OCIドライバは、JMSを介してOracle Streamsにアクセスする場合のみ選択できます。

  • TopicSessionの作成

  • 接続の開始

  • メソッドpublishUserMessages()を作成して、ADTメッセージおよびJMSテキスト・メッセージをOracle Streamsトピックにパブリッシュ

  • メソッドpublishLcrMessages()を作成して、XML LCRメッセージをOracle Streamsトピックにパブリッシュ

  • 3つのメッセージをパブリッシュして、実行中にフィードバックを提供

メソッドpublishUserMessages()は、次の処理を実行します。

  • トピックの取得

  • パブリッシャの作成

  • エージェントexplicit_enqを指定してキューoe_queueにアクセス

  • PERSON ADTメッセージの作成

  • メッセージ内のペイロードの設定

  • 受信者としてexplicit_dqを指定

  • PERSON ADTメッセージのパブリッシュ

  • JMSテキスト・メッセージの作成

  • JMSテキスト・メッセージのパブリッシュ

メソッドpublishLcrMessages()は、次の処理を実行します。

  • トピックの取得

  • パブリッシャの作成

  • JDBC接続の取得

  • エージェントexplicit_enqを指定してキューoe_queueにアクセス

  • ADTメッセージの作成

  • XMLでのLCR表現の作成

  • LCRを含むXMLTypeの作成

  • メッセージ内のペイロードの設定

  • 受信者としてexplicit_dqを指定

  • LCRのパブリッシュ

コードは例23-33でコンパイルされます。現時点では、StreamsEnq.javaとして保存します。

例23-31 メッセージをエンキューするJavaコード

import oracle.AQ.*;
import oracle.jms.*;
import javax.jms.*;
import java.lang.*;
import oracle.xdb.*;
 
public class StreamsEnq
{
  public static void main (String args [])
       throws java.sql.SQLException, ClassNotFoundException, JMSException
  {
     TopicConnectionFactory tc_fact= null;
     TopicConnection        t_conn = null;
     TopicSession           t_sess = null;
 
     try
     {
       if (args.length < 3 )
         System.out.println("Usage:java filename [SID] [HOST] [PORT]");
       else 
       {
         tc_fact = AQjmsFactory.getTopicConnectionFactory(
                      args[1], args[0], Integer.parseInt(args[2]), "oci8");
         t_conn = tc_fact.createTopicConnection( "OE","OE");
         t_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
         t_conn.start() ;
         publishUserMessages(t_sess);
         publishLcrMessages(t_sess);
         t_sess.close() ;
         t_conn.close() ;
         System.out.println("End of StreamsEnq Demo") ;
       }
     }
     catch (Exception ex)
     {
       System.out.println("Exception-1: " + ex);
       ex.printStackTrace();
     }
  }
 
  public static void publishUserMessages(TopicSession t_sess) throws Exception
  {
    Topic           topic     = null;
    TopicPublisher  t_pub     = null;
    JPerson         pers      = null;
    JAddress        addr      = null;
    TextMessage     t_msg     = null;
    AdtMessage      adt_msg   = null;
    AQjmsAgent      agent     = null;
    AQjmsAgent[]    recipList = null;
 
    try
    {
      topic = ((AQjmsSession)t_sess).getTopic("strmadmin", "oe_queue");
      t_pub = t_sess.createPublisher(topic);
      agent = new AQjmsAgent("explicit_enq", null);
      adt_msg = ((AQjmsSession)t_sess).createAdtMessage();
      pers = new JPerson();
      addr = new JAddress();
      addr.setNum(new java.math.BigDecimal(500));
      addr.setStreet("Oracle Pkwy");
      pers.setName("Mark");
      pers.setHome(addr);
      adt_msg.setAdtPayload(pers);
      ((AQjmsMessage)adt_msg).setSenderID(agent);
      System.out.println("Publish message 1 -type  PERSON\n");
      recipList = new AQjmsAgent[1];
      recipList[0] = new AQjmsAgent("explicit_dq", null);
      ((AQjmsTopicPublisher)t_pub).publish(topic, adt_msg, recipList);
      t_sess.commit();
 
      t_msg = t_sess.createTextMessage();
      t_msg.setText("Test message");
      t_msg.setStringProperty("color", "BLUE");
      t_msg.setIntProperty("year", 1999);
      ((AQjmsMessage)t_msg).setSenderID(agent);
      System.out.println("Publish message 2 -type  JMS TextMessage\n");
      ((AQjmsTopicPublisher)t_pub).publish(topic, t_msg, recipList);
      t_sess.commit();
 
    }
    catch (JMSException jms_ex)
    {
      System.out.println("JMS Exception: " + jms_ex);
      if(jms_ex.getLinkedException() != null)
        System.out.println("Linked Exception: " + jms_ex.getLinkedException());
    }
  }
 
  public static void publishLcrMessages(TopicSession t_sess) throws Exception
  {
    Topic                topic     = null;
    TopicPublisher       t_pub     = null;
    XMLType              xml_lcr   = null;
    AdtMessage           adt_msg   = null;
    AQjmsAgent           agent     = null;
    StringBuffer         lcr_data  = null;
    AQjmsAgent[]         recipList = null;
    java.sql.Connection  db_conn   = null;
 
    try
    {
      topic = ((AQjmsSession)t_sess).getTopic("strmadmin", "oe_queue");
      t_pub = t_sess.createPublisher(topic);
      db_conn = ((AQjmsSession)t_sess).getDBConnection();
      agent = new AQjmsAgent("explicit_enq", null);
      adt_msg = ((AQjmsSession)t_sess).createAdtMessage();
      lcr_data = new StringBuffer();
 
      lcr_data.append("<ROW_LCR ");
      lcr_data.append("xmlns='http://xmlns.oracle.com/streams/schemas/lcr' \n");
      lcr_data.append("xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' \n");
      lcr_data.append("xsi:schemaLocation='http://xmlns.oracle.com/streams/schemas/lcr "); 
      lcr_data.append("http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd'");
      lcr_data.append("> \n");
      lcr_data.append("<source_database_name>source_dbname</source_database_name> \n");
      lcr_data.append("<command_type>INSERT</command_type> \n");
      lcr_data.append("<object_owner>Ram</object_owner> \n");
      lcr_data.append("<object_name>Emp</object_name> \n");
      lcr_data.append("<tag>0ABC</tag> \n");
      lcr_data.append("<transaction_id>0.0.0</transaction_id> \n");
      lcr_data.append("<scn>0</scn> \n");
      lcr_data.append("<old_values> \n");
      lcr_data.append("<old_value> \n");
      lcr_data.append("<column_name>C01</column_name> \n");
      lcr_data.append("<data><varchar2>Clob old</varchar2></data> \n");
      lcr_data.append("</old_value> \n");
      lcr_data.append("<old_value> \n");
      lcr_data.append("<column_name>C02</column_name> \n");
      lcr_data.append("<data><varchar2>A123FF</varchar2></data> \n");
      lcr_data.append("</old_value> \n");
      lcr_data.append("<old_value> \n");
      lcr_data.append("<column_name>C03</column_name> \n");
      lcr_data.append("<data> \n");
      lcr_data.append("<date><value>1997-11-24</value><format>SYYYY-MM-DD</format></date> \n");
      lcr_data.append("</data> \n");
      lcr_data.append("</old_value> \n");
      lcr_data.append("<old_value> \n");
      lcr_data.append("<column_name>C04</column_name> \n");
      lcr_data.append("<data> \n");
      lcr_data.append("<timestamp><value>1999-05-31T13:20:00.000</value>");
      lcr_data.append("<format>SYYYY-MM-DD\"T\"HH24:MI:SS.FF</format></timestamp> \n");
      lcr_data.append("</data> \n");
      lcr_data.append("</old_value> \n");
      lcr_data.append("<old_value> \n");
      lcr_data.append("<column_name>C05</column_name> \n");
      lcr_data.append("<data><raw>ABCDE</raw></data> \n");
      lcr_data.append("</old_value> \n");
      lcr_data.append("</old_values> \n");
      lcr_data.append("<new_values> \n");
      lcr_data.append("<new_value> \n");
      lcr_data.append("<column_name>C01</column_name> \n");
      lcr_data.append("<data><varchar2>A123FF</varchar2></data> \n");
      lcr_data.append("</new_value> \n");
      lcr_data.append("<new_value> \n");
      lcr_data.append("<column_name>C02</column_name> \n");
      lcr_data.append("<data><number>35.23</number></data> \n");
      lcr_data.append("</new_value> \n");
      lcr_data.append("<new_value> \n");
      lcr_data.append("<column_name>C03</column_name> \n");
      lcr_data.append("<data><number>-100000</number></data> \n");
      lcr_data.append("</new_value> \n");
      lcr_data.append("<new_value> \n");
      lcr_data.append("<column_name>C04</column_name> \n");
      lcr_data.append("<data><varchar2>Hello</varchar2></data> \n");
      lcr_data.append("</new_value> \n");
      lcr_data.append("<new_value> \n");
      lcr_data.append("<column_name>C05</column_name> \n");
      lcr_data.append("<data><char>world</char></data> \n");
      lcr_data.append("</new_value> \n");
      lcr_data.append("</new_values> \n");
      lcr_data.append("</ROW_LCR>");
 
      xml_lcr = oracle.xdb.XMLType.createXML(db_conn, lcr_data.toString());
      adt_msg.setAdtPayload(xml_lcr);
      ((AQjmsMessage)adt_msg).setSenderID(agent);
      System.out.println("Publish message 3 - XMLType containing LCR ROW\n");
      recipList = new AQjmsAgent[1];
      recipList[0] = new AQjmsAgent("explicit_dq", null);
      ((AQjmsTopicPublisher)t_pub).publish(topic, adt_msg, recipList);
      t_sess.commit();
 
    }
    catch (JMSException jms_ex)
    {
      System.out.println("JMS Exception: " + jms_ex);
      if(jms_ex.getLinkedException() != null)
        System.out.println("Linked Exception: " + jms_ex.getLinkedException());
    }
  }
}

例23-32は、Oracle Streamsトピックからのメッセージの受信に使用するJavaコードです。このガイドには、次のような役割があります。

  • JDBC OCIドライバを使用して、TopicConnectionFactoryを作成


    注意:

    JDBC OCIドライバは、JMSを介してOracle Streamsにアクセスする場合のみ選択できます。

  • TopicSessionの作成

  • 接続の開始

  • receiveMessages()メソッドを作成して、Oracle Streamsトピックからメッセージを受信

  • 3つのメッセージを受信して、実行中にフィードバックを提供

メソッドreceiveMessages()は、次の処理を実行します。

  • トピックの取得

  • TopicReceiverを作成して、コンシューマexplicit_dqのメッセージを受信

  • JMS typemapでのADDRESSおよびPERSONのマッピング登録

  • typemapでのXMLTypeのマッピング登録(LCRに必要)

  • エンキューされたメッセージの受信

コードは例23-33でコンパイルされます。現時点では、StreamsDeq.javaとして保存します。

例23-32 メッセージをデキューするJavaコード

import oracle.AQ.*;
import oracle.jms.*;
import javax.jms.*;
import java.lang.*;
import oracle.xdb.*;
import java.sql.SQLException;
 
public class StreamsDeq
{
  public static void main (String args [])
       throws java.sql.SQLException, ClassNotFoundException, JMSException
  {
     TopicConnectionFactory tc_fact= null;
     TopicConnection        t_conn = null;
     TopicSession           t_sess = null;
 
     try
     {
       if (args.length < 3 )
         System.out.println("Usage:java filename [SID] [HOST] [PORT]");
       else 
       {
         tc_fact = AQjmsFactory.getTopicConnectionFactory(
                      args[1], args[0], Integer.parseInt(args[2]), "oci8");
         t_conn = tc_fact.createTopicConnection( "OE","OE");
 
         t_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
         t_conn.start() ;
 
         receiveMessages(t_sess);
 
         t_sess.close() ;
         t_conn.close() ;
         System.out.println("\nEnd of StreamsDeq Demo") ;
       }
     }
     catch (Exception ex)
     {
       System.out.println("Exception-1: " + ex);
       ex.printStackTrace();
     }
  }
 
  public static void receiveMessages(TopicSession t_sess) throws Exception
  {
    Topic           topic   = null;
    JPerson         pers    = null;
    JAddress        addr    = null;
    XMLType         xtype   = null;
    TextMessage     t_msg   = null;
    AdtMessage      adt_msg = null;
    Message         jms_msg = null;
    TopicReceiver   t_recv  = null;
    int             i       = 0;
    java.util.Map map= null;
 
    try
    {
      topic = ((AQjmsSession)t_sess).getTopic("strmadmin", "oe_queue");
      t_recv = ((AQjmsSession)t_sess).createTopicReceiver(topic, "explicit_dq", null);
      map = ((AQjmsSession)t_sess).getTypeMap();
      map.put("OE.PERSON", Class.forName("JPerson"));
      map.put("OE.ADDRESS", Class.forName("JAddress"));
      map.put("SYS.XMLTYPE", Class.forName("oracle.xdb.XMLTypeFactory"));
      System.out.println("Receive messages ...\n");
      do
      {
        try
        {
          jms_msg = (t_recv.receive(10));
          i++;
 
          ((AQjmsTopicReceiver)t_recv).setNavigationMode(AQjmsConstants.NAVIGATION_NEXT_MESSAGE);
        }
        catch (JMSException jms_ex2)
        {
          if((jms_ex2.getLinkedException() != null) &&
             (jms_ex2.getLinkedException() instanceof SQLException))
          {
            SQLException sql_ex2 =(SQLException)(jms_ex2.getLinkedException());
            if(sql_ex2.getErrorCode() == 25235)
            {
              ((AQjmsTopicReceiver)t_recv).setNavigationMode(
                                                AQjmsConstants.NAVIGATION_NEXT_TRANSACTION);
              continue;
            }
            else
              throw jms_ex2;
          }
          else
            throw jms_ex2;
        }
        if(jms_msg == null)
        {
          System.out.println("\nNo more messages");
        }
        else
        {
          if(jms_msg instanceof AdtMessage)
          {
            adt_msg = (AdtMessage)jms_msg;
 
            System.out.println("Retrieved message " + i + ": " +
                               adt_msg.getAdtPayload());
            if(adt_msg.getAdtPayload() instanceof JPerson)
            {
              pers =(JPerson)( adt_msg.getAdtPayload());
              System.out.println("PERSON: Name: " + pers.getName());
            }
            else if(adt_msg.getAdtPayload() instanceof JAddress)
            {
              addr =(JAddress)( adt_msg.getAdtPayload());
              System.out.println("ADDRESS: Street" + addr.getStreet());
            }
            else if(adt_msg.getAdtPayload() instanceof oracle.xdb.XMLType)
            {
              xtype = (XMLType)adt_msg.getAdtPayload();
              System.out.println("XMLType: Data: \n" + xtype.getStringVal());
            }
            System.out.println("Msg id: " + adt_msg.getJMSMessageID());
            System.out.println();
          }
          else if(jms_msg instanceof TextMessage)
          {
            t_msg = (TextMessage)jms_msg;
 
            System.out.println("Retrieved message " + i + ": " +
                               t_msg.getText());
            System.out.println("Msg id: " + t_msg.getJMSMessageID());
            System.out.println();
          }
          else
            System.out.println("Invalid message type");
        }
      } while (jms_msg != null);
      t_sess.commit();
    }
    catch (JMSException jms_ex)
    {
      System.out.println("JMS Exception: " + jms_ex);
      if(jms_ex.getLinkedException() != null)
        System.out.println("Linked Exception: " + jms_ex.getLinkedException());
      t_sess.rollback();
    }
    catch (java.sql.SQLException sql_ex)
    {
      System.out.println("SQL Exception: " + sql_ex);
      sql_ex.printStackTrace();
      t_sess.rollback();
    }
  }
}

例23-33では、スクリプトをコンパイルします。

例23-33 StreamsEnq.javaおよびStreamsDeq.javaのコンパイル

javac StreamsEnq.java StreamsDeq.java JPerson.java JAddress.java

例23-34では、テスト環境に適したORACLE_SIDHOSTおよびPORTの値を指定して、エンキュー・プログラムを実行します。

例23-34 StreamsEnqの実行

java StreamsEnq  ORACLE_SID HOST PORT

この例は、次の行を戻します。

Publish message 1 -type  PERSON
Publish message 2 -type  JMS TextMessage
Publish message 3 - XMLType containing LCR ROW
End of StreamsEnq Demo

例23-35では、テスト環境に適したORACLE_SIDHOSTおよびPORTの値を指定して、デキュー・プログラムを実行します。

例23-35 StreamsDeqの実行

java StreamsDeq  ORACLE_SID HOST PORT