この章では、エンキューおよび伝播時に、Oracle Streams Advanced Queuingを使用し、管理する方法を説明します。ANYDATAキューおよびユーザー・メッセージについて説明します。
Oracle Streamsでは、ANYDATA型のキューを使用して次の3種類のメッセージが格納されます。
取得済の論理変更レコード(LCR)
このメッセージ型はOracle Streamsの取得プロセスにより生成されます。このマニュアルでは、このメッセージ型については説明しません。
|
関連項目: 『Oracle Streams概要および管理』のストリームの取得プロセスに関する項 |
ユーザーによってエンキューされたLCR
これは、ユーザーまたはアプリケーションによってエンキューされたLCRを含むメッセージです。
ユーザー・メッセージ
これは、ユーザーまたはアプリケーションによって作成されエンキューされた、LCR以外のメッセージです。
この3種類のメッセージはいずれも、単一データベース内またはデータベース間での情報共有に使用できます。
内容は次のとおりです。
|
関連項目: ANYDATA型の詳細は、『Oracle Database PL/SQLパッケージ・プロシージャおよびタイプ・リファレンス』を参照してください。 |
この項の内容は次のとおりです。
|
関連項目 『Oracle Streams概要および管理』 |
ANYDATAタイプのConvertdata_type静的関数を使用して、ほとんどのタイプのペイロードをANYDATAペイロードでラップできます。data_typeはラップするオブジェクトの型です。このような関数は入力としてオブジェクトを取り、ANYDATAオブジェクトを返します。
次のデータ型は、ANYDATAラッパーでラップできません。
ネストした表
ROWIDおよびUROWID
次のデータ型はANYDATAラッパーで直接ラップできますが、ANYDATAラッパーでラップされたユーザー定義型ペイロードで表示することはできません。
アプリケーションでは、次のプログラム・インタフェースを使用してユーザー・メッセージをANYDATAキューにエンキュー、およびANYDATAキューからユーザー・メッセージをデキューできます。
PL/SQL(DBMS_AQパッケージ)
Java Message Service(JMS)
OCI
次の項では、これらのインタフェースを使用してユーザー・メッセージをANYDATAキューにエンキューしたりここからデキューする方法について説明します。
PL/SQLを使用して、LCRを含むユーザー・メッセージをANYDATAキューにエンキューするには、まずエンキュー対象のLCRを作成します。SYS.LCR$_ROW_RECORD型のコンストラクタを使用してLCR行を作成し、SYS.LCR$_DDL_RECORD型のコンストラクタを使用してDDL LCRを作成します。次に、ANYDATA.ConvertObject関数を使用してLCRをANYDATAペイロードに変換し、DBMS_AQ.ENQUEUEプロシージャを使用してそれをエンキューします。
PL/SQLを使用してLCR以外のオブジェクトを含むユーザー・メッセージをANYDATAキューにエンキューするには、ANYDATA.Convert関数の1つを使用してオブジェクトをANYDATAペイロードに変換し、DBMS_AQ.ENQUEUEプロシージャを使用してそれをエンキューします。
JMSまたはOCIを使用して、LCRを含むユーザー・メッセージをANYDATAキューにエンキューするには、LCRをXMLフォーマットで表示する必要があります。LCRを構成するには、oracle.xdb.XMLTypeクラスを使用します。LCRはSYSスキーマで定義されています。ORACLE_HOME/rdbms/adminにあるcatxlcr.sqlスクリプトを使用して、LCRスキーマをSYSスキーマにロードする必要があります。
OCIを使用してメッセージをエンキューするには、メッセージを型指定キューにエンキューする場合と同じアクションを実行します。JMSを使用してメッセージをエンキューするには、ユーザーにはDBMS_AQ、DBMS_AQINおよびDBMS_AQJMSの各パッケージに対するEXECUTE権限が必要です。
|
注意: DBMS_STREAMS_ADM.SET_UP_QUEUE(queue_name)の後でDBMS_AQADM.ENABLE_JMS_TYPES(queue_table_name)をコールしないと、JMS型とXML型のエンキューはANYDATAキューでは機能しません。キューをこれらの型に対して使用可能にすると、キュー表のインポート/エクスポートに影響が出る場合があります。 |
LCR以外のユーザー・メッセージでは、任意のユーザー定義型またはJMS型のメッセージが有効です。JMS型には、次のものが含まれます。
javax.jms.TextMessage
javax.jms.MapMessage
javax.jms.StreamMessage
javax.jms.ObjectMessage
javax.jms.BytesMessage
ユーザー定義型を使用する場合は、ORADataインタフェースを実装しているJPublisherを使用してメッセージのJavaクラスを生成する必要があります。ANYDATAキューにメッセージをエンキューする場合は、QueueSender.sendメソッドまたはTopicPublisher.publishメソッドを使用できます。
|
関連項目:
|
PL/SQLを使用して、ANYDATAキューからユーザー・メッセージをデキューするには、DBMS_AQ.DEQUEUEプロシージャを使用してANYDATAをペイロードとして指定します。ユーザー・メッセージには、LCRまたは他の型のオブジェクトを含めることができます。
ANYDATAキューでは、XMLフォーマットのLCRを含むユーザー・メッセージはoracle.xdb.XMLTypeとして表されます。LCR以外のメッセージでは、任意のユーザー定義型またはJMS型が有効です。
JMSを使用して、ANYDATAキューからメッセージをデキューする場合は、QueueReceiver、TopicSubscriberまたはTopicReceiverの各メソッドを使用できます。キューにはANYDATAラッパーでラップされた異なるオブジェクト型を含めることができるため、SQL型のリストおよび対応するJavaクラスをJMSセッションのTypeMapに登録する必要があります。JMS型はTypeMapに事前登録済です。
たとえば、キューにoracle.xdb.XMLTypeと表されるユーザーによってエンキューされたLCRメッセージと、person型とaddress型のLCR以外のメッセージが含まれているとします。JPerson.javaおよびJAddress.javaの各クラスは、それぞれpersonおよびaddressのORADataマッピングです。メッセージをデキューする前に、TypeMapを次のように移入する必要があります。
java.util.Map map = ((AQjmsSession)q_sess).getTypeMap();
map.put("SCOTT.PERSON", Class.forName("JPerson"));
map.put("SCOTT.ADDRESS", Class.forName("JAddress"));
map.put("SYS.XMLTYPE", Class.forName("oracle.xdb.XMLType")); // For LCRs
QueueReceiverまたはTopicPublisherでmessageSelectorを使用する場合は、セレクタには次のうちの1つ以上の組合せを持つ任意のSQL式を含めることができます。
JMSメッセージ・ヘッダー・フィールドまたはプロパティ
これには、JMSPriority、JMSCorrelationID、JMSType、JMSXUserI、JMSXAppID、JMSXGroupIDおよびJMSXGroupSeqなどが含まれます。次は、JMSメッセージ・フィールドmessageSelectorの例です。
JMSPriority < 3 AND JMSCorrelationID = 'Fiction'
ユーザー定義のメッセージ・プロパティ
次は、ユーザー定義のメッセージ・プロパティmessageSelectorの例です。
color IN ('RED', 'BLUE', 'GREEN') AND price < 30000
PL/SQLファンクション
次は、PL/SQLファンクションmessageSelectorの例です。
hr.GET_TYPE(tab.user_data) = 'HR.EMPLOYEES'
OCIを使用してANYDATAキューからメッセージをデキューするには、メッセージを型指定キューからデキューする場合と同じアクションを実行します。
|
関連項目:
|
ANYDATAキューは、型指定キューと相互運用できます。表22-1は、キュー間で伝播可能な型を示したものです。
表22-1 異なるキュー型間の伝播
| ソース・キュー | 宛先キュー | 変換 |
|---|---|---|
|
|
|
なし。 |
|
型指定 |
|
暗黙的。 注意: 伝播は、型指定キューのメッセージが「オブジェクト型のサポート」の制約概要に一致する場合のみ可能です。 |
|
|
型指定 |
メッセージをフィルタリングするルールとユーザー定義の変換が必要です。型指定キューと同じ型のペイロードを含むメッセージのみを型指定キューに伝播できます。 |
|
型指定 |
型指定 |
Oracle Streams Advanced Queuingルールに従います。 |
|
注意: 異なるキャラクタ・セットを使用するデータベース間でオブジェクト型、VARRAYまたはネストした表のペイロードをカプセル化する、ユーザーによってエンキューされたANYDATAメッセージは伝播できません。同じキャラクタ・セットを使用するデータベース間では、この種のメッセージを伝播できます。 |
Simple Object Access Protocol(SOAP)を使用してANYDATAキューと直接交信することはできませんが、ANYDATAキューと型指定キュー間でメッセージを伝播するとSOAPを使用できます。SOAPを使用してメッセージをANYDATAキューにエンキューする場合は、最初に型指定キューからANYDATAキューへの伝播を構成する必要があります。これで、SOAPを使用してメッセージを型指定キューにエンキューできます。メッセージは、型指定キューからANYDATAキューに自動的に伝播されます。
SOAPを使用して、ANYDATAキュー内のメッセージをデキューする場合は、伝播をANYDATAキューから型指定キューに設定できます。メッセージは、ANYDATAキューからSOAPを使用してアクセス可能な型指定キューに自動的に伝播されます。
|
関連項目: 『Oracle Streams概要および管理』のANYDATAキューと型指定キュー間のメッセージの伝播に関する項 |
この項では、ANYDATAキューにメッセージをエンキューする例を示します。この項の例では、SQL*Plusテスト環境でdb01およびdb02という2つのデータベースにアクセスできることを前提としています。最初の数例では、この章の他の例に使用できるようにテスト環境を準備します。
例22-1では、データベースdb01およびdb02に管理権限を持つユーザーとして接続し、管理者ユーザーstrmadminを作成し、サンプル・スキーマ・ユーザーoeにDBMS_AQパッケージに対するEXECUTE権限を付与します。
例22-1 ANYDATAユーザーの作成
GRANT EXECUTE ON DBMS_AQ TO oe; CREATE USER strmadmin IDENTIFIED BY strmadmin DEFAULT TABLESPACE example; GRANT DBA TO strmadmin; GRANT EXECUTE ON DBMS_STREAMS_ADM TO strmadmin; GRANT EXECUTE ON DBMS_TRANSFORM TO strmadmin;
例22-2では、db01にstrmadminとして接続し、ANYDATAキューoe_queue_anyを作成します。oeユーザーは、oe_queue_anyキューの保護ユーザーとして自動的に設定され、キューに対するENQUEUE権限とDEQUEUE権限が付与されます。
例22-2 ANYDATAキューの作成
CONNECT strmadmin;
Enter password: password
BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
queue_table => 'oe_qtab_any',
queue_name => 'oe_queue_any',
queue_user => 'oe');
END;
/
例22-3では、oe_queue_anyキューにサブスクライバを追加します。このサブスクライバによってメッセージの明示的デキューが実行されます。ADD_SUBSCRIBERプロシージャにより、AQ_AGENTが自動的に作成されます。
例22-3 ANYDATAキューへのサブスクライバの追加
DECLARE
subscriber SYS.AQ$_AGENT;
BEGIN
subscriber := SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL);
SYS.DBMS_AQADM.ADD_SUBSCRIBER(
queue_name => 'strmadmin.oe_queue_any',
subscriber => subscriber);
END;
/
例22-4では、oeユーザーをlocal_agentエージェントに関連付けます。
例22-4 ユーザーとAQ_AGENTとの関連付け
BEGIN
DBMS_AQADM.ENABLE_DB_ACCESS(
agent_name => 'local_agent',
db_username => 'oe');
END;
/
例22-5では、エンキュー・プロシージャを作成するために、データベースdb01にユーザーoeとして接続します。ANYDATA型のオブジェクトを入力パラメータとして使用し、ペイロードが含まれているメッセージを既存のANYDATAキューにエンキューします。
例22-5 エンキュー・プロシージャの作成
set echo off
set verify off
ACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDE
CONNECT oe/&password@db01;
set echo on
CREATE PROCEDURE oe.enq_proc (payload ANYDATA) IS
enqopt DBMS_AQ.ENQUEUE_OPTIONS_T;
mprop DBMS_AQ.MESSAGE_PROPERTIES_T;
enq_msgid RAW(16);
BEGIN
mprop.SENDER_ID := SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL);
DBMS_AQ.ENQUEUE(
queue_name => 'strmadmin.oe_queue_any',
enqueue_options => enqopt,
message_properties => mprop,
payload => payload,
msgid => enq_msgid);
END;
/
例22-6では、プロシージャoe.enq_procを使用してVARCHAR2型のメッセージをANYDATAキューにエンキューします。
例22-6 ANYDATAキューへのVARCHAR2メッセージのエンキュー
EXEC oe.enq_proc(ANYDATA.ConvertVarchar2('Chemicals - SW'));
COMMIT;
例22-7では、プロシージャoe.enq_procを使用してNUMBER型のメッセージをANYDATAキューにエンキューします。
例22-8では、プロシージャoe.enq_procを使用してユーザー定義型のメッセージをANYDATAキューにエンキューします。
例22-8 ANYDATAキューへのユーザー定義型メッセージのエンキュー
BEGIN
oe.enq_proc(ANYDATA.ConvertObject(oe.cust_address_typ(
'1646 Brazil Blvd','361168','Chennai','Tam', 'IN')));
END;
/
COMMIT;
|
関連項目: 『Oracle Streams概要および管理』のキューにおけるユーザーによってエンキューされたイベントの内容の表示に関する項 |
この項では、ANYDATAキューからメッセージをデキューする例を示します。この項の例では、「ANYDATAキューへのユーザー・メッセージのエンキュー」の例を完了していることを前提としています。
メッセージをデキューするには、メッセージのコンシューマがわかっている必要があります。キュー内のメッセージのコンシューマを確認するには、キューの所有者として接続し、AQ$queue_table_nameビューを問い合せます。queue_table_nameはキューが含まれているキュー表の名前です。
例22-9では、データベースdb01にキューoe_queue_anyの所有者であるstrmadminとして接続し、AQ$OE_QTAB_ANYビューの問合せを実行します。この問合せでは3行が戻され、各行にCONSUMER_NAMEとしてLOCAL_AGENTが含まれています。
例22-9 キュー内のメッセージのコンシューマの判別
CONNECT strmadmin;
Enter password: password
SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_QTAB_ANY;
例22-10では、データベースdb01にユーザーoeとして接続し、デキュー・プロシージャを作成します。このプロシージャは、入力としてデキュー対象のメッセージのコンシューマを取得し、oe.cust_address_typのメッセージをデキューして、メッセージの内容を出力します。
例22-10 ANYDATAキューのデキュー・プロシージャの作成
CONNECT oe; -- @db01
Enter password: password
CREATE PROCEDURE oe.get_cust_address (
consumer IN VARCHAR2) AS
address OE.CUST_ADDRESS_TYP;
deq_address ANYDATA;
msgid RAW(16);
deqopt DBMS_AQ.DEQUEUE_OPTIONS_T;
mprop DBMS_AQ.MESSAGE_PROPERTIES_T;
new_addresses BOOLEAN := TRUE;
next_trans EXCEPTION;
no_messages EXCEPTION;
pragma exception_init (next_trans, -25235);
pragma exception_init (no_messages, -25228);
num_var pls_integer;
BEGIN
deqopt.consumer_name := consumer;
deqopt.wait := 1;
WHILE (new_addresses) LOOP
BEGIN
DBMS_AQ.DEQUEUE(
queue_name => 'strmadmin.oe_queue_any',
dequeue_options => deqopt,
message_properties => mprop,
payload => deq_address,
msgid => msgid);
deqopt.navigation := DBMS_AQ.NEXT;
DBMS_OUTPUT.PUT_LINE('****');
IF (deq_address.GetTypeName() = 'OE.CUST_ADDRESS_TYP') THEN
DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' || deq_address.GetTypeName());
num_var := deq_address.GetObject(address);
DBMS_OUTPUT.PUT_LINE(' **** CUSTOMER ADDRESS **** ');
DBMS_OUTPUT.PUT_LINE(address.street_address);
DBMS_OUTPUT.PUT_LINE(address.postal_code);
DBMS_OUTPUT.PUT_LINE(address.city);
DBMS_OUTPUT.PUT_LINE(address.state_province);
DBMS_OUTPUT.PUT_LINE(address.country_id);
ELSE
DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' || deq_address.GetTypeName());
END IF;
COMMIT;
EXCEPTION
WHEN next_trans THEN
deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
WHEN no_messages THEN
new_addresses := FALSE;
DBMS_OUTPUT.PUT_LINE('No more messages');
END;
END LOOP;
END;
/
例22-11では、コンシューマとしてLOCAL_AGENTを指定して、例22-10で作成したプロシージャoe.get_cust_addressを使用します。
例22-11 ANYDATAキューからのメッセージのデキュー
SET SERVEROUTPUT ON SIZE 100000
EXEC oe.get_cust_address('LOCAL_AGENT');
この例は、次の行を戻します。
**** Message TYPE is: SYS.VARCHAR2 **** Message TYPE is: SYS.NUMBER **** Message TYPE is: OE.CUST_ADDRESS_TYP **** CUSTOMER ADDRESS **** 1646 Brazil Blvd 361168 Chennai Tam IN No more messages
この項では、ANYDATAキューと型指定キュー間でLCR以外のユーザー・メッセージを伝播する方法を示す例について説明します。
最初の数例では、例22-2で作成したANYDATAキューoe_queue_anyからデータベースdb02内の型指定キューへの伝播を設定します。例22-12では、サンプル・スキーマ・ユーザーoeとして接続し、データベースdb01およびdb02のoe.cust_address_typに対するEXECUTE権限を管理者ユーザーstrmadminに付与します。
例22-12 型に対するEXECUTE権限の付与
CONNECT oe; -- @db01 Enter password: password GRANT EXECUTE ON oe.cust_address_typ TO strmadmin; CONNECT oe; -- @db02 Enter password: password GRANT EXECUTE ON oe.cust_address_typ TO strmadmin;
例22-13では、データベースdb02に管理者ユーザーstrmadminとして接続し、oe.cust_address_typ型の宛先キューを作成します。
例22-13 型指定の宛先キューの作成
CONNECT strmadmin;
Enter password: password
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
queue_table => 'strmadmin.oe_qtab_address',
queue_payload_type => 'oe.cust_address_typ',
multiple_consumers => true);
DBMS_AQADM.CREATE_QUEUE(
queue_name => 'strmadmin.oe_queue_address',
queue_table => 'strmadmin.oe_qtab_address');
DBMS_AQADM.START_QUEUE(
queue_name => 'strmadmin.oe_queue_address');
END;
/
例22-14では、データベースdb01に管理者ユーザーstrmadminとして接続し、db01からdb02へのデータベース・リンクを作成します。
例22-14 データベース・リンクの作成
CONNECT strmadmin; Enter password: password CREATE DATABASE LINK db02 CONNECT TO strmadmin IDENTIFIED BY password USING 'db02';
例22-15では、db01のstrmadminスキーマ内にファンクションany_to_cust_address_typを作成します。このファンクションは、oe.cust_address_typオブジェクトを含むANYDATAペイロードを取得してoe.cust_address_typオブジェクトを戻します。
例22-15 ANYDATAオブジェクトから型指定オブジェクトを抽出するファンクションの作成
CONNECT strmadmin;
Enter password: password
CREATE FUNCTION strmadmin.any_to_cust_address_typ(in_any IN ANYDATA)
RETURN OE.CUST_ADDRESS_TYP
AS
address OE.CUST_ADDRESS_TYP;
num_var NUMBER;
type_name VARCHAR2(100);
BEGIN
type_name := in_any.GetTypeName();
IF (type_name = 'OE.CUST_ADDRESS_TYP') THEN
num_var := in_any.GetObject(address);
RETURN address;
ELSE
raise_application_error(-20101, 'Conversion failed - ' || type_name);
END IF;
END;
/
例22-16では、db01でDBMS_TRANSFORMパッケージを使用して変換を作成します。
例22-16 ANYDATAから型指定オブジェクトへの変換の作成
BEGIN DBMS_TRANSFORM.CREATE_TRANSFORMATION( schema => 'strmadmin', name => 'anytoaddress', from_schema => 'SYS', from_type => 'ANYDATA', to_schema => 'oe', to_type => 'cust_address_typ', transformation => 'strmadmin.any_to_cust_address_typ(source.user_data)'); END; /
例22-17では、型指定キューのサブスクライバを作成します。サブスクライバには、適切な型のメッセージのみが宛先キューに伝播されるルールが含まれている必要があります。
例22-17 サブスクライバADDRESS_AGENT_REMOTEの作成
DECLARE
subscriber SYS.AQ$_AGENT;
BEGIN
subscriber := SYS.AQ$_AGENT ('ADDRESS_AGENT_REMOTE',
'STRMADMIN.OE_QUEUE_ADDRESS@DB02',
0);
DBMS_AQADM.ADD_SUBSCRIBER(
queue_name => 'strmadmin.oe_queue_any',
subscriber => subscriber,
rule => 'TAB.USER_DATA.GetTypeName()=''OE.CUST_ADDRESS_TYP''',
transformation => 'strmadmin.anytoaddress');
END;
/
例22-18では、db01にあるANYDATAキューとdb02にある型指定キューの間の伝播をスケジュールします。
例22-18 ANYDATAキューから型指定キューへの伝播のスケジューリング
BEGIN
DBMS_AQADM.SCHEDULE_PROPAGATION(
queue_name => 'strmadmin.oe_queue_any',
destination => 'db02');
END;
/
例22-19では、データベースdb01にサンプル・スキーマ・ユーザーoeとして接続し、ANYDATAラッパーでラップされたoe.cust_address_typ型のメッセージをエンキューします。この例では、例22-5で作成したエンキュー・プロシージャoe.enq_procを使用します。
例22-19 ANYDATAラッパー内の型指定メッセージのエンキュー
CONNECT oe;
Enter password: password
BEGIN
oe.enq_proc(ANYDATA.ConvertObject(oe.cust_address_typ(
'1668 Chong Tao','111181','Beijing',NULL,'CN')));
END;
/
COMMIT;
伝播されるまで少し待ちます。その後、例22-20では、db02でキュー表AQ$OE_QTAB_ADDRESSを問い合せて、伝播されたメッセージを表示します。
例22-20 伝播されたメッセージの表示
CONNECT strmadmin;
Enter password: password
SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_QTAB_ADDRESS;
この例では、次のようにADDRESS_AGENT_REMOTEに関するメッセージが1件戻されます。
MSG_ID MSG_STATE CONSUMER_NAME -------------------------------- ---------------- ------------------------------ EBEF5CACC4665A6FE030578CE70A370D READY ADDRESS_AGENT_REMOTE 1 row selected.
ユーザーによってエンキューされたLCRを適切な型指定キューに伝播することはできますが、取得済LCRの型指定キューへの伝播はサポートされていません。
|
関連項目: 取得プロセスの詳細は、『Oracle Streams概要および管理』のStreamsの取得プロセスに関する項を参照してください。 |
ユーザーがエンキューしたLCRをANYDATAキューから型指定キューに伝播するには、LCR以外のメッセージの手順と同じ手順を実行しますが、Oracleによって変換ファンクションが用意されています。DBMS_STREAMSパッケージ内の次のファンクションを使用して、ANYDATAキュー内のLCRを型指定キュー内のメッセージに変換できます。
CONVERT_ANYDATA_TO_LCR_ROWは、LCR行を含むANYDATAペイロードをSYS.LCR$_ROW_RECORDペイロードに変換します。
CONVERT_ANYDATA_TO_LCR_DDLは、DDL LCRを含むANYDATAペイロードをSYS.LCR$_DDL_RECORDペイロードに変換します。
この項の例では、oe_queue_anyというANYDATAキューからoe_queue_lcrというSYS.LCR$_ROW_RECORD型の型指定キューへのLCR行の伝播を設定します。ソース・キューoe_queue_anyはデータベースdb01にあり、宛先キューoe_queue_lcrは例22-21でデータベースdb02に作成します。
|
注意: この項の例では、すでに前項の例を実行していることを前提としています。 |
例22-21 LCR$_ROW_RECORD型のキューの作成
CONNECT strmadmin;
Enter password: password
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
queue_table => 'strmadmin.oe_qtab_lcr',
queue_payload_type => 'SYS.LCR$_ROW_RECORD',
multiple_consumers => true);
DBMS_AQADM.CREATE_QUEUE(
queue_name => 'strmadmin.oe_queue_lcr',
queue_table => 'strmadmin.oe_qtab_lcr');
DBMS_AQADM.START_QUEUE(
queue_name => 'strmadmin.oe_queue_lcr');
END;
/
例22-22では、db01に管理者ユーザーstrmadminとして接続し、DBMS_TRANSFORMパッケージを使用してdb01にANYDATAからLCR$_ROW_RECORDへの変換を作成します。
例22-22 ANYDATAからLCR$_ROW_RECORDへの変換の作成
CONNECT strmadmin;
Enter password: password
BEGIN
DBMS_TRANSFORM.CREATE_TRANSFORMATION(
schema => 'strmadmin',
name => 'anytolcr',
from_schema => 'SYS',
from_type => 'ANYDATA',
to_schema => 'SYS',
to_type => 'LCR$_ROW_RECORD',
transformation =>
'SYS.DBMS_STREAMS.CONVERT_ANYDATA_TO_LCR_ROW(source.user_data)');
END;
/
例22-23では、型指定キューのサブスクライバを作成します。このサブスクライバでは、transformationパラメータに例22-22で作成したanytolcr変換を指定します。
例22-23サブスクライバROW_LCR_AGENT_REMOTEの作成
DECLARE
subscriber SYS.AQ$_AGENT;
BEGIN
subscriber := SYS.AQ$_AGENT(
'ROW_LCR_AGENT_REMOTE',
'STRMADMIN.OE_QUEUE_LCR@DB02',
0);
DBMS_AQADM.ADD_SUBSCRIBER(
queue_name => 'strmadmin.oe_queue_any',
subscriber => subscriber,
rule => 'TAB.USER_DATA.GetTypeName()=''SYS.LCR$_ROW_RECORD''',
transformation => 'strmadmin.anytolcr');
END;
/
例22-24では、データベースdb01にサンプル・スキーマ・ユーザーoeとして接続し、LCR行を構成してstrmadmin.oe_queue_anyキューにエンキューするプロシージャを作成します。
例22-24 LCR行を構成してエンキューするプロシージャの作成
CONNECT oe;
Enter password: password
CREATE PROCEDURE oe.enq_row_lcr_proc(
source_dbname VARCHAR2,
cmd_type VARCHAR2,
obj_owner VARCHAR2,
obj_name VARCHAR2,
old_vals SYS.LCR$_ROW_LIST,
new_vals SYS.LCR$_ROW_LIST)
AS
eopt DBMS_AQ.ENQUEUE_OPTIONS_T;
mprop DBMS_AQ.MESSAGE_PROPERTIES_T;
enq_msgid RAW(16);
row_lcr SYS.LCR$_ROW_RECORD;
BEGIN
mprop.SENDER_ID := SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL);
row_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
source_database_name => source_dbname,
command_type => cmd_type,
object_owner => obj_owner,
object_name => obj_name,
old_values => old_vals,
new_values => new_vals);
DBMS_AQ.ENQUEUE(
queue_name => 'strmadmin.oe_queue_any',
enqueue_options => eopt,
message_properties => mprop,
payload => ANYDATA.ConvertObject(row_lcr),
msgid => enq_msgid);
END enq_row_lcr_proc;
/
例22-25では、最初にoe.enq_row_lcr_procプロシージャを使用して、oe.inventories表に1行を挿入するLCR行を作成した後、そのLCR行をstrmadmin.oe_queue_anyキューにエンキューします。
|
注意: この例では、oe.inventories表に新規の行を挿入しません。新規の行は、Oracle Streamsの適用プロセスによりLCR行がデキューされて適用されると挿入されます。 |
例22-25 LCR行の作成とエンキュー
DECLARE
newunit1 SYS.LCR$_ROW_UNIT;
newunit2 SYS.LCR$_ROW_UNIT;
newunit3 SYS.LCR$_ROW_UNIT;
newvals SYS.LCR$_ROW_LIST;
BEGIN
newunit1 := SYS.LCR$_ROW_UNIT(
'PRODUCT_ID',
ANYDATA.ConvertNumber(3503),
DBMS_LCR.NOT_A_LOB,
NULL,
NULL);
newunit2 := SYS.LCR$_ROW_UNIT(
'WAREHOUSE_ID',
ANYDATA.ConvertNumber(1),
DBMS_LCR.NOT_A_LOB,
NULL,
NULL);
newunit3 := SYS.LCR$_ROW_UNIT(
'QUANTITY_ON_HAND',
ANYDATA.ConvertNumber(157),
DBMS_LCR.NOT_A_LOB,
NULL,
NULL);
newvals := SYS.LCR$_ROW_LIST(newunit1,newunit2,newunit3);
oe.enq_row_lcr_proc(
source_dbname => 'DB01',
cmd_type => 'INSERT',
obj_owner => 'OE',
obj_name => 'INVENTORIES',
old_vals => NULL,
new_vals => newvals);
END;
/
COMMIT;
LCRは、例22-18で作成されたスケジュールに従ってデータベースdb02に伝播されます。伝播されるまで少し待ちます。その後、例22-26では、db02でキュー表AQ$OE_QTAB_LCRを問い合せて、伝播されたメッセージを表示します。
例22-26 伝播されたLCRの表示
CONNECT strmadmin;
Enter password: password
SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_QTAB_LCR;
この例では、次のようにROW_LCR_AGENT_REMOTEに関するメッセージが1件戻されます。
MSG_ID MSG_STATE CONSUMER_NAME -------------------------------- ---------------- ------------------------------ ECE2B0F912DDFF5EE030578CE70A04BB READY ROW_LCR_AGENT_REMOTE
|
関連項目: LCR行およびDDL LCR変換関数の詳細は、『Oracle Database PL/SQLパッケージ・プロシージャおよびタイプ・リファレンス』のDBMS_STREAMSに関する項を参照してください。 |