この章の例では、Oracle Streamsを使用して作成できるメッセージ環境を説明します。この例では、db01
という名前のデータベースへのアクセス権を持つSQL*Plusテスト環境を使用していることを前提にしています。
内容は次のとおりです。
関連項目 メッセージおよびANYDATA キューの詳細は、『Oracle Streams概要および管理』を参照してください。
|
この例では、1つのANYDATA
キューを使用して、同じキュー内に異なるタイプのメッセージ・ペイロードが格納されるOracle Streamsメッセージ環境を説明します。特にこの例ではOracle Streamsの次のメッセージ機能を説明します。
ANYDATA
ペイロードとして注文ペイロードを含むメッセージのエンキュー
ANYDATA
ペイロードとして顧客ペイロードを含むメッセージのエンキュー
ANYDATA
ペイロードとしてLCR行を含むメッセージのエンキュー
イベントを適用するためのルール・セットの作成
ルール・セットで使用される評価コンテキストの作成
デキューするOracle Streams適用プロセスとルールに基づくイベント処理の作成
メッセージ・ハンドラの作成と適用プロセスとの関連付け
適用プロセスを使用せずにルールに基づく明示的なイベントのデキューおよび処理
図23-1は、この環境の概要を示したものです。
この章の例は、oe
サンプル・スキーマを使用しているため、oe
ユーザーにはDBMS_AQ
パッケージのサブプログラムを実行する権限が必要です。これは、例23-1で実行されます。
注意: oe ユーザーは、例23-2でANYDATA キューを作成するときにキュー・ユーザーとして指定されます。SET_UP_QUEUE プロシージャは、oe ユーザーにキューのエンキューおよびデキュー権限を付与しますが、oe ユーザーには、メッセージをエンキューおよびデキューするために、DBMS_AQ パッケージのEXECUTE 権限も必要です。
|
この例で説明するほとんどの構成および管理アクションは、Oracle Streams管理者strmadmin
によって実行されます。例23-1は、このユーザーを作成して、必要な権限も付与します。これらの権限によって、ユーザーはOracle Streamsに関連するパッケージでのサブプログラムの実行、ルール・セットの作成、ルールの作成、およびデータ・ディクショナリ・ビューの問合せによるOracle Streams環境の監視ができるようになります。
例23-1では、管理権限を持つユーザーとしてデータベースdb01
に接続します。
例23-1 ANYDATAユーザーの設定
GRANT EXECUTE ON DBMS_AQ TO oe; CREATE USER strmadmin IDENTIFIED BY strmadmin DEFAULT TABLESPACE example; GRANT DBA, SELECT_CATALOG_ROLE TO strmadmin; GRANT EXECUTE ON DBMS_APPLY_ADM TO strmadmin; GRANT EXECUTE ON DBMS_AQ TO strmadmin; GRANT EXECUTE ON DBMS_AQADM TO strmadmin; GRANT EXECUTE ON DBMS_STREAMS_ADM TO strmadmin; BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ, grantee => 'strmadmin', grant_option => FALSE); DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ, grantee => 'strmadmin', grant_option => FALSE); DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( privilege => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ, grantee => 'strmadmin', grant_option => FALSE); END; /
注意:
|
例23-2では、管理者ユーザーstrmadmin
としてデータベースdb01
に接続し、ANYDATA
キューoe_queue
を作成します。SET_UP_QUEUE
プロシージャは、キューのキュー表を作成してから、キューを作成および開始します。
例23-2 ANYDATAキューの作成
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDE CONNECT strmadmin/&password@db01; set echo on BEGIN DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => 'oe_queue_table', queue_name => 'oe_queue'); END; /
例23-3では、管理者ユーザーstrmadmin
としてデータベースdb01
に接続し、キューoe_queue
に対してoe
ユーザー権限を付与して、キューに対する明示的なエンキュー操作の実行に使用されるエージェント
explicit_enq
を作成し、oe
ユーザーをエージェントに関連付けます。
キューoe_queue
は、SET_UP_QUEUE
を使用して作成されたため保護キューです。保護キューに対してエンキューおよびデキュー操作を実行するユーザーは、キューの保護キュー・ユーザーとして構成する必要があります。oe
ユーザーをエージェントexplicit_enq
に関連付けると、oe
ユーザーはこのキューに対してエンキュー操作を実行できます。
例23-3 ANYDATAキューに対するエンキューの有効化
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDE CONNECT strmadmin/&password@db01; set echo on BEGIN SYS.DBMS_AQADM.GRANT_QUEUE_PRIVILEGE( privilege => 'ALL', queue_name => 'strmadmin.oe_queue', grantee => 'oe'); SYS.DBMS_AQADM.CREATE_AQ_AGENT( agent_name => 'explicit_enq'); DBMS_AQADM.ENABLE_DB_ACCESS( agent_name => 'explicit_enq', db_username => 'oe'); END; /
この項の例では、メッセージをANYDATA
キューoe_queue
にエンキューする2つのPL/SQLプロシージャを作成します。一方のプロシージャはLCR以外のメッセージをエンキューし、もう一方のプロシージャはLCR行メッセージをエンキューします。
例23-4では、サンプル・スキーマ・ユーザーoe
としてデータベースdb01
に接続し、oe.orders
表の列に基づき注文を表す型を作成します。この型は、ANYDATA
キューoe_queue
にエンキューされるメッセージに使用されます。型属性には、action
という1つの追加属性とともにoe.orders
表に列が含まれています。この型のインスタンスのaction
属性の値は、インスタンス(適用プロセス・デキューまたは明示的デキュー)に対して実行する正しいアクションの決定に使用されます。
例23-4 注文の型の作成
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDE CONNECT oe/&password@db01; set echo on CREATE TYPE order_event_typ AS OBJECT( order_id NUMBER(12), order_date TIMESTAMP(6) WITH LOCAL TIME ZONE, order_mode VARCHAR2(8), customer_id NUMBER(6), order_status NUMBER(2), order_total NUMBER(8,2), sales_rep_id NUMBER(6), promotion_id NUMBER(6), action VARCHAR(7)); /
例23-5では、サンプル・スキーマ・ユーザーoe
としてデータベースdb01
に接続し、oe.customers
表の列に基づき顧客を表す型を作成します。この型は、ANYDATA
キューoe_queue
にエンキューされるメッセージに使用されます。型属性には、action
という1つの追加属性とともに、oe.customers
表に列が含まれています。この型のインスタンスのaction
属性の値は、インスタンス(適用プロセス・デキューまたは明示的デキュー)に対して実行する正しいアクションの決定に使用されます。
注意: この例は、oe.customers 表からcust_geo_location 列が削除されていることを前提としています。この列は、Oracle Spatialの場合のみ有効です。
|
例23-5 顧客の型の作成
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDE CONNECT oe/&password@db01; set echo on CREATE TYPE customer_event_typ AS OBJECT( customer_id NUMBER(6), cust_first_name VARCHAR2(20), cust_last_name VARCHAR2(20), cust_address CUST_ADDRESS_TYP, phone_numbers PHONE_LIST_TYP, nls_language VARCHAR2(3), nls_territory VARCHAR2(30), credit_limit NUMBER(9,2), cust_email VARCHAR2(30), account_mgr_id NUMBER(6), date_of_birth DATE, marital_status VARCHAR2(20), gender VARCHAR2(1), income_level VARCHAR2(20), action VARCHAR(7)); /
例23-6では、サンプル・スキーマ・ユーザーoe
としてデータベースdb01
に接続し、enq_proc
というPL/SQLプロシージャを作成して、LCR以外のメッセージをANYDATA
キューoe_queueにエンキューします。
注意: エンキューされた単一メッセージは、適用プロセスおよび明示的デキューによってデキューできますが、この章の例ではこの機能の説明はしません。 |
例23-6 LCR以外のメッセージをエンキューするプロシージャの作成
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDE CONNECT oe/&password@db01; set echo on CREATE PROCEDURE oe.enq_proc (event IN ANYDATA) IS enqopt DBMS_AQ.ENQUEUE_OPTIONS_T; mprop DBMS_AQ.MESSAGE_PROPERTIES_T; enq_eventid RAW(16); BEGIN mprop.SENDER_ID := SYS.AQ$_AGENT('explicit_enq', NULL, NULL); DBMS_AQ.ENQUEUE( queue_name => 'strmadmin.oe_queue', enqueue_options => enqopt, message_properties => mprop, payload => event, msgid => enq_eventid); END; /
例23-7では、サンプル・スキーマ・ユーザーoe
としてデータベースdb01
に接続し、LCR行を作成するenq_row_lcr
というPL/SQLプロシージャを作成し、LCR行をANYDATA
キューoe_queueにエンキューします。
関連項目: LCRコンストラクタの詳細は、『Oracle Database PL/SQLパッケージ・プロシージャおよびタイプ・リファレンス』を参照してください。 |
例23-7 LCRイベント行を作成およびエンキューするプロシージャの作成
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDE CONNECT oe/&password@db01; set echo on CREATE PROCEDURE oe.enq_row_lcr( source_dbname VARCHAR2, cmd_type VARCHAR2, obj_owner VARCHAR2, obj_name VARCHAR2, old_vals SYS.LCR$_ROW_LIST, new_vals SYS.LCR$_ROW_LIST) AS eopt DBMS_AQ.ENQUEUE_OPTIONS_T; mprop DBMS_AQ.MESSAGE_PROPERTIES_T; enq_msgid RAW(16); row_lcr SYS.LCR$_ROW_RECORD; BEGIN mprop.SENDER_ID := SYS.AQ$_AGENT('explicit_enq', NULL, NULL); row_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT( source_database_name => source_dbname, command_type => cmd_type, object_owner => obj_owner, object_name => obj_name, old_values => old_vals, new_values => new_vals); DBMS_AQ.ENQUEUE( queue_name => 'strmadmin.oe_queue', enqueue_options => eopt, message_properties => mprop, payload => ANYDATA.ConvertObject(row_lcr), msgid => enq_msgid); END enq_row_lcr; /
この項の例では、適用プロセスを構成して、ANYDATA
キューoe_queue内のユーザーによってエンキューされたメッセージを適用します。
例23-8では、サンプル・スキーマ・ユーザーoe
としてデータベースdb01
に接続し、get_oe_action
という関数を作成して、関数に対するEXECUTE
権限を管理者ユーザーstrmadmin
に付与します。
この関数は、キューoe_queue
内のメッセージのaction
属性値を決定します。この関数は、この章の後のルールでイベントに対するaction
属性の値の決定に使用されます。次に、ルール・エンジンのクライアントによって、イベントに対する適切なアクション(適用プロセスによるデキューまたは明示的デキュー)が実行されます。この例では、ルール・エンジンのクライアントは、適用プロセスとoe.explicit_dq
PL/SQLプロシージャです。
例23-8 action属性値を決定する関数の作成
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDE CONNECT oe/&password@db01; set echo on CREATE FUNCTION oe.get_oe_action (event IN ANYDATA) RETURN VARCHAR2 IS ord oe.order_event_typ; cust oe.customer_event_typ; num NUMBER; type_name VARCHAR2(61); BEGIN type_name := event.GETTYPENAME; IF type_name = 'OE.ORDER_EVENT_TYP' THEN num := event.GETOBJECT(ord); RETURN ord.action; ELSIF type_name = 'OE.CUSTOMER_EVENT_TYP' THEN num := event.GETOBJECT(cust); RETURN cust.action; ELSE RETURN NULL; END IF; END; / GRANT EXECUTE ON get_oe_action TO strmadmin;
例23-9では、サンプル・スキーマ・ユーザーoe
としてデータベースdb01
に接続し、適用プロセスによってメッセージ・ハンドラとして使用されるmes_handler
というPL/SQLプロシージャを作成します。また、このプロシージャに対するEXECUTE
権限を管理者ユーザーstrmadmin
に付与します。このプロシージャでは、型oe.order_event_typ
またはoe.customer_event_typ
のユーザーによってエンキューされたメッセージのペイロードを取得し、それぞれoe.orders
表とoe.customers
表の行として挿入します。
例23-9 メッセージ・ハンドラの作成
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDE CONNECT oe/&password@db01; set echo on CREATE PROCEDURE oe.mes_handler (event IN ANYDATA) IS ord oe.order_event_typ; cust oe.customer_event_typ; num NUMBER; type_name VARCHAR2(61); BEGIN type_name := event.GETTYPENAME; IF type_name = 'OE.ORDER_EVENT_TYP' THEN num := event.GETOBJECT(ord); INSERT INTO oe.orders VALUES (ord.order_id, ord.order_date, ord.order_mode, ord.customer_id, ord.order_status, ord.order_total, ord.sales_rep_id, ord.promotion_id); ELSIF type_name = 'OE.CUSTOMER_EVENT_TYP' THEN num := event.GETOBJECT(cust); INSERT INTO oe.customers VALUES (cust.customer_id, cust.cust_first_name, cust.cust_last_name, cust.cust_address, cust.phone_numbers, cust.nls_language, cust.nls_territory, cust.credit_limit, cust.cust_email, cust.account_mgr_id, cust.date_of_birth, cust.marital_status, cust.gender, cust.income_level); END IF; END; / GRANT EXECUTE ON mes_handler TO strmadmin;
例23-10では、管理者ユーザーstrmadmin
としてデータベースdb01
に接続し、ルール・セットの評価コンテキストを作成します。
例23-10 ルール・セットの評価コンテキストの作成
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDE CONNECT strmadmin/&password@db01; set echo on DECLARE table_alias SYS.RE$TABLE_ALIAS_LIST; BEGIN table_alias := SYS.RE$TABLE_ALIAS_LIST( SYS.RE$TABLE_ALIAS('tab', 'strmadmin.oe_queue_table')); DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT( evaluation_context_name => 'oe_eval_context', table_aliases => table_alias); END; /
例23-11では、管理者ユーザーstrmadmin
としてデータベースdb01
に接続し、適用プロセスのルール・セットを作成します。
例23-11 適用プロセスのルール・セットの作成
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDE CONNECT strmadmin/&password@db01; set echo on BEGIN DBMS_RULE_ADM.CREATE_RULE_SET( rule_set_name => 'apply_oe_rs', evaluation_context => 'strmadmin.oe_eval_context'); END; /
例23-12では、管理者ユーザーstrmadmin
としてデータベースdb01
に接続し、メッセージのaction
値がapply
の場合にTRUE
と評価するルールを作成します。tab.user_data
は、oe.get_oe_action
関数に渡されます。tab.user_data
列には、キュー表内のイベント・ペイロードが保持されます。キュー表の表別名は、例23-10でtab
として指定されています。
例23-12 actionがapplyの場合にTRUEと評価するルールの作成
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDE CONNECT strmadmin/&password@db01; set echo on BEGIN DBMS_RULE_ADM.CREATE_RULE( rule_name => 'strmadmin.apply_action', condition => 'oe.get_oe_action(tab.user_data) = ''APPLY'' '); END; /
例23-13では、管理者ユーザーstrmadmin
としてデータベースdb01
に接続し、キュー内のイベントがoe.orders
表またはoe.customers
表を変更するLCR行の場合にTRUE
と評価するルールを作成します。このルールによって適用プロセスが使用可能になり、ユーザーによってエンキューされた変更が直接、表に適用されます。
Oracleによって提供されている評価コンテキストSYS.STREAMS$_EVALUATION_CONTEXT
がLCR評価に使用されているため、便宜上このルールが使用されます。このルールがルール・セットに追加されると、評価時には例23-10で作成された評価コンテキストoe_eval_context
のかわりに、Oracleによって提供されている評価コンテキストが使用されます。
例23-13 LCR行イベントの場合にTRUEと評価するルールの作成
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDE CONNECT strmadmin/&password@db01; set echo on BEGIN DBMS_RULE_ADM.CREATE_RULE( rule_name => 'apply_lcrs', condition => ':dml.GET_OBJECT_OWNER() = ''OE'' AND ' || ' (:dml.GET_OBJECT_NAME() = ''ORDERS'' OR ' || ':dml.GET_OBJECT_NAME() = ''CUSTOMERS'') ', evaluation_context => 'SYS.STREAMS$_EVALUATION_CONTEXT'); END; /
例23-14では、管理者ユーザーstrmadmin
としてデータベースdb01
に接続し、例23-12で作成されたapply_action
ルール、および例23-13で作成されたapply_lcrs
ルールを、例23-11で作成されたapply_oe_rs
ルール・セットに追加します。
例23-14 ルール・セットへのルールの追加
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDE CONNECT strmadmin/&password@db01; set echo on BEGIN DBMS_RULE_ADM.ADD_RULE( rule_name => 'apply_action', rule_set_name => 'apply_oe_rs'); DBMS_RULE_ADM.ADD_RULE( rule_name => 'apply_lcrs', rule_set_name => 'apply_oe_rs'); END; /
例23-15では、管理者ユーザーstrmadmin
としてデータベースdb01
に接続し、キューoe_queue
に関連付けられ、apply_oe_rs
ルール・セットを使用し、メッセージ・ハンドラとしてmes_handler
プロシージャを使用する適用プロセスを作成します。
例23-15 適用プロセスの作成
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDE CONNECT strmadmin/&password@db01; set echo on BEGIN DBMS_APPLY_ADM.CREATE_APPLY( queue_name => 'strmadmin.oe_queue', apply_name => 'apply_oe', rule_set_name => 'strmadmin.apply_oe_rs', message_handler => 'oe.mes_handler', apply_user => 'oe', apply_captured => false); END; /
例23-15で適用プロセスが作成された時、oe
が適用ユーザーとして指定されているため、このユーザーには適用プロセスによって使用されるstrmadmin.apply_oe_rs
ルール・セットに対するEXECUTE
権限が必要です。例23-16でこれを実行するには、管理者ユーザーstrmadmin
としてデータベースdb01
に接続します。
例23-16 ルール・セットに対するEXECUTE権限をoeユーザーに付与
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDE CONNECT strmadmin/&password@db01; set echo on BEGIN DBMS_RULE_ADM.GRANT_OBJECT_PRIVILEGE( privilege => DBMS_RULE_ADM.EXECUTE_ON_RULE_SET, object_name => 'strmadmin.apply_oe_rs', grantee => 'oe', grant_option => FALSE); END; /
例23-17では、管理者ユーザーstrmadmin
としてデータベースdb01
に接続し、エラーが発生したとき適用プロセスが使用不可にならないように、disable_on_error
パラメータをn
に設定して適用プロセスを開始します。
例23-17 適用プロセスの開始
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDE CONNECT strmadmin/&password@db01; set echo on BEGIN DBMS_APPLY_ADM.SET_PARAMETER( apply_name => 'apply_oe', parameter => 'disable_on_error', value => 'n'); DBMS_APPLY_ADM.START_APPLY( apply_name => 'apply_oe'); END; /
この項の例では、メッセージ内容に基づきメッセージの明示的デキューを構成する方法について説明します。
例23-18では、管理者ユーザーstrmadmin
としてデータベースdb01
に接続し、エージェントexplicit_dq
を作成します。このエージェントは、oe_queue
キューに対する明示的なデキュー操作の実行に使用されます。
例23-18 明示的デキューに対するエージェントの作成
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDE CONNECT strmadmin/&password@db01; set echo on BEGIN SYS.DBMS_AQADM.CREATE_AQ_AGENT( agent_name => 'explicit_dq'); END; /
oe_queue
キューは、例23-2のSET_UP_QUEUE
を使用して作成されたため保護キューです。保護キューに対してエンキューおよびデキュー操作を実行するユーザーは、キューの保護キュー・ユーザーとして構成する必要があります。
例23-19では、管理者ユーザーstrmadmin
としてデータベースdb01
に接続し、oe
ユーザーをエージェントexplicit_dq
に関連付けます。エージェントを使用して例23-20のキューにサブスクライバを作成する場合、oe
ユーザーはoe_queue
キューに対するデキュー操作を実行できます。
例23-19 ユーザーoeとエージェントexplicit_dqの関連付け
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDE CONNECT strmadmin/&password@db01; set echo on BEGIN DBMS_AQADM.ENABLE_DB_ACCESS( agent_name => 'explicit_dq', db_username => 'oe'); END; /
例23-20では、管理者ユーザーstrmadmin
としてデータベースdb01
に接続し、oe_queue
キューにサブスクライバを追加します。このサブスクライバによってメッセージの明示的デキューが実行されます。サブスクライバ・ルールを使用して、action
値がapply
ではない任意のメッセージをデキューします。メッセージに対するアクション値がapply
の場合、メッセージはサブスクライバによって無視されます。そのようなメッセージはデキューされ、適用プロセスによって処理されます。
例23-20 oe_queueキューへのサブスクライバの追加
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for STRMADMIN: ' HIDE CONNECT strmadmin/&password@db01; set echo on DECLARE subscriber SYS.AQ$_AGENT; BEGIN subscriber := SYS.AQ$_AGENT('explicit_dq', NULL, NULL); SYS.DBMS_AQADM.ADD_SUBSCRIBER( queue_name => 'strmadmin.oe_queue', subscriber => subscriber, rule => 'oe.get_oe_action(tab.user_data) != ''APPLY'''); END; /
例23-21では、サンプル・スキーマ・ユーザーoe
としてデータベースdb01
に接続し、explicit_dq
というPL/SQLプロシージャを作成して、例23-20で作成されたサブスクライバを使用してメッセージを明示的にデキューします。
このプロシージャはメッセージのデキュー後にコミットされます。コミットによって、デキューされたメッセージがサブスクライバによって正常にコンシュームされたことがキューに通知されます。
このプロシージャは複数のトランザクションを処理でき、2つの例外ハンドラを使用します。他にメッセージがない場合は、例外ハンドラnext_trans
は次のトランザクションに移動し、例外ハンドラno_messages
はループを終了します。
例23-21 メッセージを明示的にデキューするプロシージャの作成
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDE CONNECT oe/&password@db01; set echo on CREATE PROCEDURE oe.explicit_dq (consumer IN VARCHAR2) AS deqopt DBMS_AQ.DEQUEUE_OPTIONS_T; mprop DBMS_AQ.MESSAGE_PROPERTIES_T; msgid RAW(16); payload ANYDATA; new_messages BOOLEAN := TRUE; ord oe.order_event_typ; cust oe.customer_event_typ; tc pls_integer; next_trans EXCEPTION; no_messages EXCEPTION; pragma exception_init (next_trans, -25235); pragma exception_init (no_messages, -25228); BEGIN deqopt.consumer_name := consumer; deqopt.wait := 1; WHILE (new_messages) LOOP BEGIN DBMS_AQ.DEQUEUE( queue_name => 'strmadmin.oe_queue', dequeue_options => deqopt, message_properties => mprop, payload => payload, msgid => msgid); COMMIT; deqopt.navigation := DBMS_AQ.NEXT; DBMS_OUTPUT.PUT_LINE('Message Dequeued'); DBMS_OUTPUT.PUT_LINE('Type Name := ' || payload.GetTypeName); IF (payload.GetTypeName = 'OE.ORDER_EVENT_TYP') THEN tc := payload.GetObject(ord); DBMS_OUTPUT.PUT_LINE('order_id - ' || ord.order_id); DBMS_OUTPUT.PUT_LINE('order_date - ' || ord.order_date); DBMS_OUTPUT.PUT_LINE('order_mode - ' || ord.order_mode); DBMS_OUTPUT.PUT_LINE('customer_id - ' || ord.customer_id); DBMS_OUTPUT.PUT_LINE('order_status - ' || ord.order_status); DBMS_OUTPUT.PUT_LINE('order_total - ' || ord.order_total); DBMS_OUTPUT.PUT_LINE('sales_rep_id - ' || ord.sales_rep_id); DBMS_OUTPUT.PUT_LINE('promotion_id - ' || ord.promotion_id); END IF; IF (payload.GetTypeName = 'OE.CUSTOMER_EVENT_TYP') THEN tc := payload.GetObject(cust); DBMS_OUTPUT.PUT_LINE('customer_id - ' || cust.customer_id); DBMS_OUTPUT.PUT_LINE('cust_first_name - ' || cust.cust_first_name); DBMS_OUTPUT.PUT_LINE('cust_last_name - ' || cust.cust_last_name); DBMS_OUTPUT.PUT_LINE('street_address - ' || cust.cust_address.street_address); DBMS_OUTPUT.PUT_LINE('postal_code - ' || cust.cust_address.postal_code); DBMS_OUTPUT.PUT_LINE('city - ' || cust.cust_address.city); DBMS_OUTPUT.PUT_LINE('state_province - ' || cust.cust_address.state_province); DBMS_OUTPUT.PUT_LINE('country_id - ' || cust.cust_address.country_id); DBMS_OUTPUT.PUT_LINE('phone_number1 - ' || cust.phone_numbers(1)); DBMS_OUTPUT.PUT_LINE('phone_number2 - ' || cust.phone_numbers(2)); DBMS_OUTPUT.PUT_LINE('phone_number3 - ' || cust.phone_numbers(3)); DBMS_OUTPUT.PUT_LINE('nls_language - ' || cust.nls_language); DBMS_OUTPUT.PUT_LINE('nls_territory - ' || cust.nls_territory); DBMS_OUTPUT.PUT_LINE('credit_limit - ' || cust.credit_limit); DBMS_OUTPUT.PUT_LINE('cust_email - ' || cust.cust_email); DBMS_OUTPUT.PUT_LINE('account_mgr_id - ' || cust.account_mgr_id); DBMS_OUTPUT.PUT_LINE('date_of_birth - ' || cust.date_of_birth); DBMS_OUTPUT.PUT_LINE('marital_status - ' || cust.marital_status); DBMS_OUTPUT.PUT_LINE('gender - ' || cust.gender); DBMS_OUTPUT.PUT_LINE('income_level - ' || cust.income_level); END IF; EXCEPTION WHEN next_trans THEN deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION; WHEN no_messages THEN new_messages := FALSE; DBMS_OUTPUT.PUT_LINE('No more messagess'); END; END LOOP; END; /
この項の例では、LCR以外のメッセージおよびLCR行メッセージをキューにエンキューする方法について説明します。
注意: ユーザーによってエンキューされたLCRを明示的にデキューできますが、この例ではこの機能は説明しません。 |
例23-22では、サンプル・スキーマ・ユーザーoe
としてデータベースdb01
に接続し、action
値をapply
にして2つのメッセージをエンキューします。適用プロセス・ルールに基づき、適用プロセスは例23-9で作成されたoe.mes_handler
メッセージ・ハンドラ・プロシージャを使用して、これらのメッセージをデキューして処理します。エンキュー後のCOMMIT
によって、これらの2つのエンキューが同じトランザクションに属すようになります。エンキューされたメッセージは、それをエンキューしたセッションがエンキューをコミットするまで表示されません。
例23-22 適用プロセスによってデキューされるLCR以外のメッセージのエンキュー
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDE CONNECT oe/&password@db01; set echo on BEGIN oe.enq_proc(ANYDATA.convertobject(oe.order_event_typ( 2500,'05-MAY-01','online',117,3,44699,161,NULL,'APPLY'))); END; / BEGIN oe.enq_proc(ANYDATA.convertobject(oe.customer_event_typ( 990,'Hester','Prynne',oe.cust_address_typ('555 Beacon Street', '02109','Boston','MA','US'),oe.phone_list_typ('+1 617 123 4104', '+1 617 083 4381','+1 617 742 5813'),'i','AMERICA',5000, 'a@scarlet_letter.com',145,NULL,'SINGLE','F','UNDER 50,000','APPLY'))); END; / COMMIT;
例23-23では、サンプル・スキーマ・ユーザーoe
としてデータベースdb01
に接続し、action
値をdequeue
にして2つのメッセージをエンキューします。action
がapply
でないため、例23-21で作成されたoe.explicit_dq
プロシージャはこれらのメッセージをデキューします。適用プロセス・ルールに基づき、適用プロセスではこれらのメッセージを無視します。エンキュー後のCOMMIT
によって、これらの2つのエンキューが同じトランザクションに属すようになります。
例23-23 明示的にデキューされるLCR以外のメッセージのエンキュー
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDE CONNECT oe/&password@db01; set echo on BEGIN oe.enq_proc(ANYDATA.convertobject(oe.order_event_typ( 2501,'22-JAN-00','direct',117,3,22788,161,NULL,'DEQUEUE'))); END; / BEGIN oe.enq_proc(ANYDATA.convertobject(oe.customer_event_typ( 991,'Nick','Carraway',oe.cust_address_typ('10th Street', '11101','Long Island','NY','US'),oe.phone_list_typ('+1 718 786 2287', '+1 718 511 9114', '+1 718 888 4832'),'i','AMERICA',3000, 'nick@great_gatsby.com',149,NULL,'MARRIED','M','OVER 150,000','DEQUEUE'))); END; / COMMIT;
例23-24では、サンプル・スキーマ・ユーザーoe
としてデータベースdb01
に接続し、oe.orders
表に行を挿入するLCR行、およびその行を更新する別のLCRを作成します。適用プロセスはこれらのメッセージに直接適用されます。
注意: エンキューされたLCRはトランザクションの境界でコミットする必要があります。この例では、各エンキュー後にCOMMIT 文が実行され、各エンキューを別々のトランザクションにします。ただし、トランザクション内に複数のLCRがある場合は、コミット前に複数のLCRエンキューを実行できます。
|
例23-24 適用プロセスによってデキューされるLCR行のエンキュー
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDE CONNECT oe/&password@db01; set echo on DECLARE newunit1 SYS.LCR$_ROW_UNIT; newunit2 SYS.LCR$_ROW_UNIT; newunit3 SYS.LCR$_ROW_UNIT; newunit4 SYS.LCR$_ROW_UNIT; newunit5 SYS.LCR$_ROW_UNIT; newunit6 SYS.LCR$_ROW_UNIT; newunit7 SYS.LCR$_ROW_UNIT; newunit8 SYS.LCR$_ROW_UNIT; newvals SYS.LCR$_ROW_LIST; BEGIN newunit1 := SYS.LCR$_ROW_UNIT( 'ORDER_ID',ANYDATA.ConvertNumber(2502),DBMS_LCR.NOT_A_LOB,NULL,NULL); newunit2 := SYS.LCR$_ROW_UNIT( 'ORDER_DATE',ANYDATA.ConvertTimestampLTZ('04-NOV-00'),DBMS_LCR.NOT_A_LOB, NULL,NULL); newunit3 := SYS.LCR$_ROW_UNIT( 'ORDER_MODE',ANYDATA.ConvertVarchar2('online'),DBMS_LCR.NOT_A_LOB,NULL,NULL); newunit4 := SYS.LCR$_ROW_UNIT( 'CUSTOMER_ID',ANYDATA.ConvertNumber(145),DBMS_LCR.NOT_A_LOB,NULL,NULL); newunit5 := SYS.LCR$_ROW_UNIT( 'ORDER_STATUS',ANYDATA.ConvertNumber(3),DBMS_LCR.NOT_A_LOB,NULL,NULL); newunit6 := SYS.LCR$_ROW_UNIT( 'ORDER_TOTAL',ANYDATA.ConvertNumber(35199),DBMS_LCR.NOT_A_LOB,NULL,NULL); newunit7 := SYS.LCR$_ROW_UNIT( 'SALES_REP_ID',ANYDATA.ConvertNumber(160),DBMS_LCR.NOT_A_LOB,NULL,NULL); newunit8 := SYS.LCR$_ROW_UNIT( 'PROMOTION_ID',ANYDATA.ConvertNumber(1),DBMS_LCR.NOT_A_LOB,NULL,NULL); newvals := SYS.LCR$_ROW_LIST( newunit1,newunit2,newunit3,newunit4,newunit5,newunit6,newunit7,newunit8); oe.enq_row_lcr('DB01','INSERT','OE','ORDERS',NULL,newvals); END; / COMMIT; DECLARE oldunit1 SYS.LCR$_ROW_UNIT; oldunit2 SYS.LCR$_ROW_UNIT; oldvals SYS.LCR$_ROW_LIST; newunit1 SYS.LCR$_ROW_UNIT; newvals SYS.LCR$_ROW_LIST; BEGIN oldunit1 := SYS.LCR$_ROW_UNIT( 'ORDER_ID',ANYDATA.ConvertNumber(2502),DBMS_LCR.NOT_A_LOB,NULL,NULL); oldunit2 := SYS.LCR$_ROW_UNIT( 'ORDER_TOTAL',ANYDATA.ConvertNumber(35199),DBMS_LCR.NOT_A_LOB,NULL,NULL); oldvals := SYS.LCR$_ROW_LIST(oldunit1,oldunit2); newunit1 := SYS.LCR$_ROW_UNIT( 'ORDER_TOTAL',ANYDATA.ConvertNumber(5235),DBMS_LCR.NOT_A_LOB,NULL,NULL); newvals := SYS.LCR$_ROW_LIST(newunit1); oe.enq_row_lcr('DB01','UPDATE','OE','ORDERS',oldvals,newvals); END; / COMMIT;
この項の例では、メッセージを明示的にデキューして、適用プロセスによって適用されたメッセージを問い合せる方法について説明します。この例では、前の項でエンキューされたメッセージを使用します。
例23-25では、サンプル・スキーマ・ユーザーoe
としてデータベースdb01
に接続し、例23-21で作成されたプロシージャexplicit_dq
を実行します。デキューするメッセージのコンシューマとして、例23-20で追加されたサブスクライバexplicit_dq
を指定します。この例では、このプロシージャによって明示的にデキューされないメッセージは、適用プロセスによってデキューされます。
例23-25 メッセージの明示的なデキュー
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDE CONNECT oe/&password@db01; set echo on SET SERVEROUTPUT ON SIZE 100000; EXEC oe.explicit_dq('explicit_dq');
この例は、例23-23でエンキューされたメッセージのペイロードを戻します。
Message Dequeued Type Name := OE.ORDER_EVENT_TYP order_id - 2501 order_date - 22-JAN-00 12.00.00.000000 AM order_mode - direct customer_id - 117 order_status - 3 order_total - 22788 sales_rep_id - 161 promotion_id - Message Dequeued Type Name := OE.CUSTOMER_EVENT_TYP customer_id - 991 cust_first_name - Nick cust_last_name - Carraway street_address - 10th Street postal_code - 11101 city - Long Island state_province - NY country_id - US phone_number1 - +1 718 786 2287 phone_number2 - +1 718 511 9114 phone_number3 - +1 718 888 4832 nls_language - i nls_territory - AMERICA credit_limit - 3000 cust_email - nick@great_gatsby.com account_mgr_id - 149 date_of_birth - marital_status - MARRIED gender - M income_level - OVER 150,000 No more messages
例23-26では、サンプル・スキーマ・ユーザーoe
としてデータベースdb01
に接続し、oe.orders
およびoe.customers
表を問い合せて、例23-15で作成された適用プロセスapply_oe
によって適用されたメッセージに対応する行を確認します。
例23-26 適用されたメッセージの問合せ
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDE CONNECT oe/&password@db01; set echo on CREATE PROCEDURE oe.enq_proc (payload ANYDATA) IS SELECT order_id, order_date, customer_id, order_total FROM oe.orders WHERE order_id = 2500; SELECT cust_first_name, cust_last_name, cust_email FROM oe.customers WHERE customer_id = 990; SELECT order_id, order_date, customer_id, order_total FROM oe.orders WHERE order_id = 2502;
この例は、次の3行を戻します。
ORDER_ID ORDER_DATE CUSTOMER_ID ORDER_TOTAL ---------- ------------------------------ ----------- ----------- 2500 05-MAY-01 12.00.00.000000 AM 117 44699 1 row selected. CUST_FIRST_NAME CUST_LAST_NAME CUST_EMAIL -------------------- -------------------- ------------------------------ Hester Prynne a@scarlet_letter.com 1 row selected. ORDER_ID ORDER_DATE CUSTOMER_ID ORDER_TOTAL ---------- ------------------------------ ----------- ----------- 2502 04-NOV-00 12.00.00.000000 AM 145 5235 1 row selected.
この項の例では、LCR以外のメッセージおよびLCR行をキューにエンキューし、Java Message Service(JMS)を使用してデキューする方法について説明します。
次のjarファイルとzipファイルは使用しているJDKのリリースに基づくCLASSPATH
にある必要があります。
JDK 1.4.xの場合、CLASSPATH
に次のパスを含める必要があります。
ORACLE_HOME/jdbc/lib/classes12.jar ORACLE_HOME/jdbc/lib/ojdbc14.jar ORACLE_HOME/jlib/jndi.jar ORACLE_HOME/lib/jta.jar ORACLE_HOME/rdbms/jlib/aqapi13.jar ORACLE_HOME/rdbms/jlib/jmscommon.jar ORACLE_HOME/rdbms/jlib/xdb.jar ORACLE_HOME/xdk/lib/xmlparserv2.jar
JDK 1.3.xの場合、CLASSPATH
に次のパスを含める必要があります。
ORACLE_HOME/jdbc/lib/classes12.jar ORACLE_HOME/jlib/jndi.jar ORACLE_HOME/rdbms/jlib/aqapi13.jar ORACLE_HOME/rdbms/jlib/jmscommon.jar ORACLE_HOME/rdbms/jlib/xdb.jar ORACLE_HOME/lib/jta.jar ORACLE_HOME/xdk/lib/xmlparserv2.jar
JDK 1.2.xの場合、CLASSPATH
に次のパスを含める必要があります。
ORACLE_HOME/jdbc/lib/classes12.jar ORACLE_HOME/jlib/jndi.jar ORACLE_HOME/lib/jta.jar ORACLE_HOME/rdbms/jlib/aqapi12.jar ORACLE_HOME/rdbms/jlib/jmscommon.jar ORACLE_HOME/rdbms/jlib/xdb.jar ORACLE_HOME/xdk/lib/xmlparserv2.jar
また、LD_LIBRARY_PATH
(LinuxおよびSolaris)またはPATH
(Windows)に、ORACLE_HOME
/lib
が含まれていることを確認してください。
次の例は、JMSメッセージをキューおよびエージェントexplicit_dq
にエンキューした後デキューする、サンプル・スキーマ・ユーザーoe
を示しています。エージェントexplicit_dq
は例23-18で作成され、例23-19のサンプル・スキーマ・ユーザーoe
に関連付けられ、例23-20のキューoe_queue
のサブスクライバになっています。
サンプル・スキーマ・ユーザーoe
には、例23-1でDBMS_AQ
に対するEXECUTE
権限が付与されています。このユーザーがOracle JMSインタフェースを使用するためには、DBMS_AQIN
に対するEXECUTE
権限も必要です。例23-27では、管理者権限を持つユーザーとしてデータベースdb01
に接続し、必要な権限をoe
に付与します。
DBMS_STREAMS_ADM.SET_UP_QUEUE()
の後でDBMS_AQADM.ENABLE_JMS_TYPES(
queue_table_name
)
をコールしないと、JMS型とXML型のエンキューはOracle StreamsのANYDATA
キューでは機能しません。例23-28では、例23-1で作成された管理者ユーザーstrmadmin
としてデータベースdb01
に接続し、例23-2で作成されたANYDATA
キュー表oe_queue_table
に対してENABLE_JMS_TYPES
を実行します。
例23-28 ANYDATAキューに対するJMS型の有効化
CONNECT strmadmin;
Enter password: password
BEGIN
DBMS_AQADM.ENABLE_JMS_TYPES('oe_queue_table');
END;
/
注意: Oracle Streamsキューをこれらの型に対して使用可能にすると、キュー表のインポート/エクスポートに影響が出る場合があります。 |
例23-29では、サンプル・スキーマ・ユーザーoe
としてデータベースdb01
に接続し、address
型およびperson
型を作成します。
例23-29 Oracleオブジェクト型addressおよびpersonの作成
CONNECT oe;
Enter password: password
CREATE TYPE address AS OBJECT (street VARCHAR (30), num NUMBER)
/
CREATE TYPE person AS OBJECT (name VARCHAR (30), home ADDRESS)
/
例23-30では、JPublisherを使用して、person
型とaddress
型に対してそれぞれJPerson
とJAddress
という2つのJavaクラスを生成します。JPublisherへの入力は、次の行を含むinput.typ
というファイルです。
SQL PERSON AS JPerson SQL ADDRESS AS JAddress
例23-31は、JMSテキスト・メッセージ、LCRおよびLCR ADT以外のメッセージをOracle Streamsトピックにパブリッシュするために使用するJavaコードです。次の処理を実行します。
JDBC OCIドライバを使用して、TopicConnectionFactory
を作成
注意: JDBC OCIドライバは、JMSを介してOracle Streamsにアクセスする場合のみ選択できます。 |
TopicSession
の作成
接続の開始
メソッドpublishUserMessages()
を作成して、ADTメッセージおよびJMSテキスト・メッセージをOracle Streamsトピックにパブリッシュ
メソッドpublishLcrMessages()
を作成して、XML LCRメッセージをOracle Streamsトピックにパブリッシュ
3つのメッセージをパブリッシュして、実行中にフィードバックを提供
メソッドpublishUserMessages()
は、次の処理を実行します。
トピックの取得
パブリッシャの作成
エージェントexplicit_enq
を指定して、キューoe_queue
にアクセス
PERSON
ADTメッセージの作成
メッセージ内のペイロードの設定
受信者としてexplicit_dq
を指定
PERSON
ADTメッセージのパブリッシュ
JMSテキスト・メッセージの作成
JMSテキスト・メッセージのパブリッシュ
メソッドpublishLcrMessages()
は、次の処理を実行します。
トピックの取得
パブリッシャの作成
JDBC接続の取得
エージェントexplicit_enq
を指定して、キューoe_queue
にアクセス
ADTメッセージの作成
XMLでのLCR表現の作成
LCRを含むXMLType
の作成
メッセージ内のペイロードの設定
受信者としてexplicit_dq
を指定
LCRのパブリッシュ
コードは例23-33でコンパイルされます。現時点では、StreamsEnq.java
として保存します。
例23-31 メッセージをエンキューするJavaコード
import oracle.AQ.*; import oracle.jms.*; import javax.jms.*; import java.lang.*; import oracle.xdb.*; public class StreamsEnq { public static void main (String args []) throws java.sql.SQLException, ClassNotFoundException, JMSException { TopicConnectionFactory tc_fact= null; TopicConnection t_conn = null; TopicSession t_sess = null; try { if (args.length < 3 ) System.out.println("Usage:java filename [SID] [HOST] [PORT]"); else { tc_fact = AQjmsFactory.getTopicConnectionFactory( args[1], args[0], Integer.parseInt(args[2]), "oci8"); t_conn = tc_fact.createTopicConnection( "OE","OE"); t_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); t_conn.start() ; publishUserMessages(t_sess); publishLcrMessages(t_sess); t_sess.close() ; t_conn.close() ; System.out.println("End of StreamsEnq Demo") ; } } catch (Exception ex) { System.out.println("Exception-1: " + ex); ex.printStackTrace(); } } public static void publishUserMessages(TopicSession t_sess) throws Exception { Topic topic = null; TopicPublisher t_pub = null; JPerson pers = null; JAddress addr = null; TextMessage t_msg = null; AdtMessage adt_msg = null; AQjmsAgent agent = null; AQjmsAgent[] recipList = null; try { topic = ((AQjmsSession)t_sess).getTopic("strmadmin", "oe_queue"); t_pub = t_sess.createPublisher(topic); agent = new AQjmsAgent("explicit_enq", null); adt_msg = ((AQjmsSession)t_sess).createAdtMessage(); pers = new JPerson(); addr = new JAddress(); addr.setNum(new java.math.BigDecimal(500)); addr.setStreet("Oracle Pkwy"); pers.setName("Mark"); pers.setHome(addr); adt_msg.setAdtPayload(pers); ((AQjmsMessage)adt_msg).setSenderID(agent); System.out.println("Publish message 1 -type PERSON\n"); recipList = new AQjmsAgent[1]; recipList[0] = new AQjmsAgent("explicit_dq", null); ((AQjmsTopicPublisher)t_pub).publish(topic, adt_msg, recipList); t_sess.commit(); t_msg = t_sess.createTextMessage(); t_msg.setText("Test message"); t_msg.setStringProperty("color", "BLUE"); t_msg.setIntProperty("year", 1999); ((AQjmsMessage)t_msg).setSenderID(agent); System.out.println("Publish message 2 -type JMS TextMessage\n"); ((AQjmsTopicPublisher)t_pub).publish(topic, t_msg, recipList); t_sess.commit(); } catch (JMSException jms_ex) { System.out.println("JMS Exception: " + jms_ex); if(jms_ex.getLinkedException() != null) System.out.println("Linked Exception: " + jms_ex.getLinkedException()); } } public static void publishLcrMessages(TopicSession t_sess) throws Exception { Topic topic = null; TopicPublisher t_pub = null; XMLType xml_lcr = null; AdtMessage adt_msg = null; AQjmsAgent agent = null; StringBuffer lcr_data = null; AQjmsAgent[] recipList = null; java.sql.Connection db_conn = null; try { topic = ((AQjmsSession)t_sess).getTopic("strmadmin", "oe_queue"); t_pub = t_sess.createPublisher(topic); db_conn = ((AQjmsSession)t_sess).getDBConnection(); agent = new AQjmsAgent("explicit_enq", null); adt_msg = ((AQjmsSession)t_sess).createAdtMessage(); lcr_data = new StringBuffer(); lcr_data.append("<ROW_LCR "); lcr_data.append("xmlns='http://xmlns.oracle.com/streams/schemas/lcr' \n"); lcr_data.append("xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' \n"); lcr_data.append("xsi:schemaLocation='http://xmlns.oracle.com/streams/schemas/lcr "); lcr_data.append("http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd'"); lcr_data.append("> \n"); lcr_data.append("<source_database_name>source_dbname</source_database_name> \n"); lcr_data.append("<command_type>INSERT</command_type> \n"); lcr_data.append("<object_owner>Ram</object_owner> \n"); lcr_data.append("<object_name>Emp</object_name> \n"); lcr_data.append("<tag>0ABC</tag> \n"); lcr_data.append("<transaction_id>0.0.0</transaction_id> \n"); lcr_data.append("<scn>0</scn> \n"); lcr_data.append("<old_values> \n"); lcr_data.append("<old_value> \n"); lcr_data.append("<column_name>C01</column_name> \n"); lcr_data.append("<data><varchar2>Clob old</varchar2></data> \n"); lcr_data.append("</old_value> \n"); lcr_data.append("<old_value> \n"); lcr_data.append("<column_name>C02</column_name> \n"); lcr_data.append("<data><varchar2>A123FF</varchar2></data> \n"); lcr_data.append("</old_value> \n"); lcr_data.append("<old_value> \n"); lcr_data.append("<column_name>C03</column_name> \n"); lcr_data.append("<data> \n"); lcr_data.append("<date><value>1997-11-24</value><format>SYYYY-MM-DD</format></date> \n"); lcr_data.append("</data> \n"); lcr_data.append("</old_value> \n"); lcr_data.append("<old_value> \n"); lcr_data.append("<column_name>C04</column_name> \n"); lcr_data.append("<data> \n"); lcr_data.append("<timestamp><value>1999-05-31T13:20:00.000</value>"); lcr_data.append("<format>SYYYY-MM-DD\"T\"HH24:MI:SS.FF</format></timestamp> \n"); lcr_data.append("</data> \n"); lcr_data.append("</old_value> \n"); lcr_data.append("<old_value> \n"); lcr_data.append("<column_name>C05</column_name> \n"); lcr_data.append("<data><raw>ABCDE</raw></data> \n"); lcr_data.append("</old_value> \n"); lcr_data.append("</old_values> \n"); lcr_data.append("<new_values> \n"); lcr_data.append("<new_value> \n"); lcr_data.append("<column_name>C01</column_name> \n"); lcr_data.append("<data><varchar2>A123FF</varchar2></data> \n"); lcr_data.append("</new_value> \n"); lcr_data.append("<new_value> \n"); lcr_data.append("<column_name>C02</column_name> \n"); lcr_data.append("<data><number>35.23</number></data> \n"); lcr_data.append("</new_value> \n"); lcr_data.append("<new_value> \n"); lcr_data.append("<column_name>C03</column_name> \n"); lcr_data.append("<data><number>-100000</number></data> \n"); lcr_data.append("</new_value> \n"); lcr_data.append("<new_value> \n"); lcr_data.append("<column_name>C04</column_name> \n"); lcr_data.append("<data><varchar2>Hello</varchar2></data> \n"); lcr_data.append("</new_value> \n"); lcr_data.append("<new_value> \n"); lcr_data.append("<column_name>C05</column_name> \n"); lcr_data.append("<data><char>world</char></data> \n"); lcr_data.append("</new_value> \n"); lcr_data.append("</new_values> \n"); lcr_data.append("</ROW_LCR>"); xml_lcr = oracle.xdb.XMLType.createXML(db_conn, lcr_data.toString()); adt_msg.setAdtPayload(xml_lcr); ((AQjmsMessage)adt_msg).setSenderID(agent); System.out.println("Publish message 3 - XMLType containing LCR ROW\n"); recipList = new AQjmsAgent[1]; recipList[0] = new AQjmsAgent("explicit_dq", null); ((AQjmsTopicPublisher)t_pub).publish(topic, adt_msg, recipList); t_sess.commit(); } catch (JMSException jms_ex) { System.out.println("JMS Exception: " + jms_ex); if(jms_ex.getLinkedException() != null) System.out.println("Linked Exception: " + jms_ex.getLinkedException()); } } }
例23-32は、Oracle Streamsトピックからのメッセージの受信に使用するJavaコードです。次の処理を実行します。
JDBC OCIドライバを使用して、TopicConnectionFactory
を作成
注意: JDBC OCIドライバは、JMSを介してOracle Streamsにアクセスする場合のみ選択できます。 |
TopicSession
の作成
接続の開始
receiveMessages()
メソッドを作成して、Oracle Streamsトピックからメッセージを受信
3つのメッセージを受信して、実行中にフィードバックを提供
メソッドreceiveMessages()
は、次の処理を実行します。
トピックの取得
TopicReceiver
を作成して、コンシューマexplicit_dq
のメッセージを受信
JMS typemapでのADDRESS
およびPERSON
のマッピング登録
typemapでのXMLType
のマッピング登録(LCRに必要)
エンキューされたメッセージの受信
コードは例23-33でコンパイルされます。現時点では、StreamsDeq.java
として保存します。
例23-32 メッセージをデキューするJavaコード
import oracle.AQ.*; import oracle.jms.*; import javax.jms.*; import java.lang.*; import oracle.xdb.*; import java.sql.SQLException; public class StreamsDeq { public static void main (String args []) throws java.sql.SQLException, ClassNotFoundException, JMSException { TopicConnectionFactory tc_fact= null; TopicConnection t_conn = null; TopicSession t_sess = null; try { if (args.length < 3 ) System.out.println("Usage:java filename [SID] [HOST] [PORT]"); else { tc_fact = AQjmsFactory.getTopicConnectionFactory( args[1], args[0], Integer.parseInt(args[2]), "oci8"); t_conn = tc_fact.createTopicConnection( "OE","OE"); t_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); t_conn.start() ; receiveMessages(t_sess); t_sess.close() ; t_conn.close() ; System.out.println("\nEnd of StreamsDeq Demo") ; } } catch (Exception ex) { System.out.println("Exception-1: " + ex); ex.printStackTrace(); } } public static void receiveMessages(TopicSession t_sess) throws Exception { Topic topic = null; JPerson pers = null; JAddress addr = null; XMLType xtype = null; TextMessage t_msg = null; AdtMessage adt_msg = null; Message jms_msg = null; TopicReceiver t_recv = null; int i = 0; java.util.Map map= null; try { topic = ((AQjmsSession)t_sess).getTopic("strmadmin", "oe_queue"); t_recv = ((AQjmsSession)t_sess).createTopicReceiver(topic, "explicit_dq", null); map = ((AQjmsSession)t_sess).getTypeMap(); map.put("OE.PERSON", Class.forName("JPerson")); map.put("OE.ADDRESS", Class.forName("JAddress")); map.put("SYS.XMLTYPE", Class.forName("oracle.xdb.XMLTypeFactory")); System.out.println("Receive messages ...\n"); do { try { jms_msg = (t_recv.receive(10)); i++; ((AQjmsTopicReceiver)t_recv).setNavigationMode(AQjmsConstants.NAVIGATION_NEXT_MESSAGE); } catch (JMSException jms_ex2) { if((jms_ex2.getLinkedException() != null) && (jms_ex2.getLinkedException() instanceof SQLException)) { SQLException sql_ex2 =(SQLException)(jms_ex2.getLinkedException()); if(sql_ex2.getErrorCode() == 25235) { ((AQjmsTopicReceiver)t_recv).setNavigationMode( AQjmsConstants.NAVIGATION_NEXT_TRANSACTION); continue; } else throw jms_ex2; } else throw jms_ex2; } if(jms_msg == null) { System.out.println("\nNo more messages"); } else { if(jms_msg instanceof AdtMessage) { adt_msg = (AdtMessage)jms_msg; System.out.println("Retrieved message " + i + ": " + adt_msg.getAdtPayload()); if(adt_msg.getAdtPayload() instanceof JPerson) { pers =(JPerson)( adt_msg.getAdtPayload()); System.out.println("PERSON: Name: " + pers.getName()); } else if(adt_msg.getAdtPayload() instanceof JAddress) { addr =(JAddress)( adt_msg.getAdtPayload()); System.out.println("ADDRESS: Street" + addr.getStreet()); } else if(adt_msg.getAdtPayload() instanceof oracle.xdb.XMLType) { xtype = (XMLType)adt_msg.getAdtPayload(); System.out.println("XMLType: Data: \n" + xtype.getStringVal()); } System.out.println("Msg id: " + adt_msg.getJMSMessageID()); System.out.println(); } else if(jms_msg instanceof TextMessage) { t_msg = (TextMessage)jms_msg; System.out.println("Retrieved message " + i + ": " + t_msg.getText()); System.out.println("Msg id: " + t_msg.getJMSMessageID()); System.out.println(); } else System.out.println("Invalid message type"); } } while (jms_msg != null); t_sess.commit(); } catch (JMSException jms_ex) { System.out.println("JMS Exception: " + jms_ex); if(jms_ex.getLinkedException() != null) System.out.println("Linked Exception: " + jms_ex.getLinkedException()); t_sess.rollback(); } catch (java.sql.SQLException sql_ex) { System.out.println("SQL Exception: " + sql_ex); sql_ex.printStackTrace(); t_sess.rollback(); } } }
例23-33では、スクリプトをコンパイルします。
例23-33 StreamsEnq.javaおよびStreamsDeq.javaのコンパイル
javac StreamsEnq.java StreamsDeq.java JPerson.java JAddress.java
例23-34では、テスト環境に適したORACLE_SID
、HOST
およびPORT
の値を指定して、エンキュー・プログラムを実行します。
この例は、次の行を戻します。
Publish message 1 -type PERSON Publish message 2 -type JMS TextMessage Publish message 3 - XMLType containing LCR ROW End of StreamsEnq Demo
例23-35では、テスト環境に適したORACLE_SID
、HOST
およびPORT
の値を指定して、デキュー・プログラムを実行します。