この章では、単一データベースの例を説明します。例では、表に対する変更を取得し、取得した変更をキューに再エンキューします。その後、適用時にプロシージャDMLハンドラを使用して、変更のサブセットを別の表に挿入します。
ここでは、単一データベースの取得および適用の例の構成について説明します。
この章の例では、Oracle Streamsを使用して、単一データベースcpap.example.comでデータ操作言語(DML)の変更を取得および適用する方法を示します。具体的には、この例ではhrスキーマ内のemployees表に対するDML変更を取得し、行の論理変更レコード(LCR)をキューstreams_queueに入れます。次に、適用プロセスによってこれらの行LCRが同じキューからデキューされ、このキューに再エンキューされてからプロシージャDMLハンドラに送信されます。
取得された行LCRは、バッファ・キューに存在することになるため、明示的にはデキューできません。適用時に行LCRを再エンキューすると、アプリケーションによる明示的なデキューに使用できるようになります。この例では、これらの行LCRをデキューするアプリケーションは作成しません。
この例では、削除された従業員のレコードをhrスキーマ内のemp_del表に挿入するプロシージャDMLハンドラについて説明します。この例では、emp_del表を使用して、削除されたすべての従業員のレコードを保持すると想定しています。プロシージャDMLハンドラを使用して、各行LCRにDELETE文が含まれているかどうかを判別します。プロシージャDMLハンドラは、DELETE文を含む行LCRを検出すると、DELETEをemp_del表に対するINSERTに変換し、その行を挿入します。
図4-1に、この環境の概要を示します。
|
関連項目: 『Oracle Streams概要および管理』 |
この章の例を開始する前に、前提条件となる次の作業を完了する必要があります。
オプションで、STREAMS_POOL_SIZE初期化パラメータを適切な値に設定します。このパラメータでは、Oracle Streamsプールのサイズを指定します。Oracle Streamsプールは、バッファ・キューにメッセージを格納し、パラレル取得およびパラレル適用中の内部通信に使用されます。MEMORY_TARGET、MEMORY_MAX_TARGETまたはSGA_TARGET初期化パラメータを0 (ゼロ)以外の値に設定した場合、Oracle Streamsプールのサイズは自動的に管理されます。
|
関連項目: Oracle Streamsに関連する初期化パラメータの設定については、『Oracle Streamsレプリケーション管理者ガイド』を参照してください。 |
データベースをARCHIVELOGモードで実行するように設定します。取得される変更を生成するデータベースは、ARCHIVELOGモードで実行する必要があります。
|
関連項目: ARCHIVELOGモードでデータベースを実行する方法の詳細は、『Oracle Database管理者ガイド』を参照してください。 |
データベースでOracle Streams管理者を作成します。この例では、Oracle Streams管理者のユーザー名がstrmadminであると想定しています。
この例では、ストアド・プロシージャ内でOracle Streamsパッケージのサブプログラムを実行します。具体的には、手順8で作成したemp_dqプロシージャで、DBMS_STREAMS_MESSAGINGパッケージのDEQUEUEプロシージャを実行します。したがって、Oracle Streams管理者に、このパッケージに対するEXECUTE権限を明示的に付与する必要があります。この場合、EXECUTE権限はロールを介して付与することはできません。DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGEプロシージャを使用すると、すべてのOracle Streamsパッケージに対するEXECUTEと、Oracle Streamsに関連するその他の権限を付与できます。パッケージに対するEXECUTE権限は、直接付与することも、GRANT_ADMIN_PRIVILEGEプロシージャを使用して付与することもできます。
|
関連項目: Oracle Streams管理者を作成する方法の詳細は、『Oracle Streamsレプリケーション管理者ガイド』を参照してください。 |
次の手順を実行してhr.emp_del表を作成し、Oracle Streams管理者を設定して、キューを作成します。
|
注意: このマニュアルをオンラインで参照している場合、この注意の後にある「BEGINNING OF SCRIPT」の行から次の「END OF SCRIPT」の行までのテキストをテキスト・エディタにコピーし、テキストを編集してご使用の環境用のスクリプトを作成できます。データベースに接続可能なコンピュータで、SQL*Plusを使用してスクリプトを実行します。 |
/************************* BEGINNING OF SCRIPT ******************************
SET ECHO ONを実行し、スクリプトのスプール・ファイルを指定します。このスクリプトの実行後に、スプール・ファイルにエラーがないかをチェックします。
*/ SET ECHO ON SPOOL streams_setup_capapp.out /*
hrユーザーとしてcpap.example.comに接続します。
*/ CONNECT hr@cpap.example.com /*
hr.emp_del表を作成します。emp_del表の列は、emp_del表に行が挿入されたときにその日付を記録する追加のtimestamp列を除いて、employees表の列と同じです。
*/ CREATE TABLE emp_del( employee_id NUMBER(6), first_name VARCHAR2(20), last_name VARCHAR2(25), email VARCHAR2(25), phone_number VARCHAR2(20), hire_date DATE, job_id VARCHAR2(10), salary NUMBER(8,2), commission_pct NUMBER(2,2), manager_id NUMBER(6), department_id NUMBER(4), timestamp DATE); CREATE UNIQUE INDEX emp_del_id_pk ON emp_del (employee_id); ALTER TABLE emp_del ADD (CONSTRAINT emp_del_id_pk PRIMARY KEY (employee_id)); /*
SYSTEMユーザーとしてcpap.example.comに接続します。
*/ CONNECT SYSTEM@cpap.example.com /*
Oracle Streams管理者は適用ユーザーであり、emp_del表にレコードを挿入できる必要があるため、Oracle Streams管理者にこの表に対するすべての権限を付与します。または、適用プロセスを変更してhrを適用ユーザーとして指定できます。
*/ GRANT ALL ON hr.emp_del TO STRMADMIN; /*
strmadminユーザーとしてcpap.example.comに接続します。
*/ CONNECT strmadmin@cpap.example.com /*
SET_UP_QUEUEプロシージャを実行して、cpap.example.comでstreams_queueというキューを作成します。このキューは適用プロセスによってデキューされる取得済の変更、およびデキュー・プロシージャによってデキューされるユーザーが行った変更をステージングするANYDATAキューです。
SET_UP_QUEUEプロシージャを実行すると、次のアクションが実行されます。
streams_queue_tableというキュー表の作成。このキュー表は、Oracle Streams管理者(strmadmin)が所有し、このユーザーのデフォルトの記憶域を使用します。
Oracle Streams管理者(strmadmin)が所有するstreams_queueというキューの作成。
キューの起動。
*/
BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
queue_table => 'strmadmin.streams_queue_table',
queue_name => 'strmadmin.streams_queue');
END;
/
/*
streams_setup_capapp.outスプール・ファイルをチェックして、このスクリプトの完了後にすべてのアクションが正常に終了していることを確認します。
*/ SET ECHO OFF SPOOL OFF /*************************** END OF SCRIPT ******************************/
次の手順を実行してhr.employees表に対する変更を取得し、プロシージャDMLハンドラを使用して、カスタマイズされた方法で単一データベースにこれらの変更を適用します。
|
注意: このマニュアルをオンラインで参照している場合、この注意の後にある「BEGINNING OF SCRIPT」の行から次の「END OF SCRIPT」の行までのテキストをテキスト・エディタにコピーし、テキストを編集してご使用の環境用のスクリプトを作成できます。データベースに接続可能なコンピュータで、SQL*Plusを使用してスクリプトを実行します。 |
/************************* BEGINNING OF SCRIPT ******************************
SET ECHO ONを実行し、スクリプトのスプール・ファイルを指定します。このスクリプトの実行後に、スプール・ファイルにエラーがないかをチェックします。
*/ SET ECHO ON SPOOL streams_config_capapp.out /*
strmadminユーザーとしてcpap.example.comに接続します。
*/ CONNECT strmadmin@cpap.example.com /*
cpap.example.comでhr.employees表に対するDML変更を取得するように取得プロセスを構成します。この手順では、取得プロセスを作成し、そのポジティブ・ルール・セットに、この表に対するDML変更を取得するように取得プロセスに指示するルールを追加します。また、この手順では、hr.employees表をインスタンス化のために準備し、表のすべての主キー列、一意キー列、ビットマップ索引列および外部キー列のサプリメンタル・ロギングを有効にします。
サプリメンタル・ロギングによって、表に対する変更の追加の情報がREDOログに記録されます。適用プロセスでは、一意の行の識別など、一部の操作を実行するために、この追加情報が必要です。
|
関連項目: 『Oracle Streamsレプリケーション管理者ガイド』 |
*/
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'hr.employees',
streams_type => 'capture',
streams_name => 'capture_emp',
queue_name => 'strmadmin.streams_queue',
include_dml => TRUE,
include_ddl => FALSE,
inclusion_rule => TRUE);
END;
/
/*
この例では単一データベースで変更を取得および適用するため、インスタンス化は不要です。ただし、cpap.example.comデータベースの適用プロセスに対しては、特定のシステム変更番号(SCN)の後にhr.employees表に対して行われた変更を適用するように指示する必要があります。
この例では、DBMS_FLASHBACKパッケージのGET_SYSTEM_CHANGE_NUMBERファンクションを使用して、データベースの現行のSCNを取得します。このSCNは、DBMS_APPLY_ADMパッケージのSET_TABLE_INSTANTIATION_SCNプロシージャを実行するために使用されます。
SET_TABLE_INSTANTIATION_SCNプロシージャを使用すると、適用プロセスで無視される表のLCRと、適用プロセスで適用される表のLCRを制御できます。ソース・データベースからの表に関するLCRのコミットSCNが、接続先データベースでその表のインスタンス化SCN以下であれば、接続先データベースの適用プロセスではLCRが廃棄されます。それ以外の場合は、適用プロセスによってLCRが適用されます。この例では、cpap.example.comデータベースがソース・データベースと接続先データベースの両方になります。
適用プロセスは、この手順で取得したSCNの後にコミットされたSCNを持つトランザクションをhr.employees表に適用します。
|
注意: hr.employees表にはインスタンス化のための準備も必要です。手順2の取得プロセスの構成で、hr.employees表に対するDML変更を取得するルールを追加したときに、この準備は自動的に行われています。 |
*/
DECLARE
iscn NUMBER; -- Variable to hold instantiation SCN value
BEGIN
iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name => 'hr.employees',
source_database_name => 'cpap.example.com',
instantiation_scn => iscn);
END;
/
/*
この手順を実行すると、emp_dml_handlerプロシージャが作成されます。このプロシージャは、hr.employees表に対するDELETE変更のプロシージャDMLハンドラになります。このプロシージャによって、DELETEコマンド・タイプを含むすべての行LCRがINSERTの行LCRに変換され、その行LCRを実行することで、変換された行LCRがhr.emp_del表に挿入されます。
*/
CREATE OR REPLACE PROCEDURE emp_dml_handler(in_any IN ANYDATA) IS
lcr SYS.LCR$_ROW_RECORD;
rc PLS_INTEGER;
command VARCHAR2(30);
old_values SYS.LCR$_ROW_LIST;
BEGIN
-- Access the LCR
rc := in_any.GETOBJECT(lcr);
-- Get the object command type
command := lcr.GET_COMMAND_TYPE();
-- Check for DELETE command on the hr.employees table
IF command = 'DELETE' THEN
-- Set the command_type in the row LCR to INSERT
lcr.SET_COMMAND_TYPE('INSERT');
-- Set the object_name in the row LCR to EMP_DEL
lcr.SET_OBJECT_NAME('EMP_DEL');
-- Get the old values in the row LCR
old_values := lcr.GET_VALUES('old');
-- Set the old values in the row LCR to the new values in the row LCR
lcr.SET_VALUES('new', old_values);
-- Set the old values in the row LCR to NULL
lcr.SET_VALUES('old', NULL);
-- Add a SYSDATE value for the timestamp column
lcr.ADD_COLUMN('new', 'TIMESTAMP', ANYDATA.ConvertDate(SYSDATE));
-- Apply the row LCR as an INSERT into the hr.emp_del table
lcr.EXECUTE(TRUE);
END IF;
END;
/
/*
hr.employees表のプロシージャDMLハンドラを手順4で作成したプロシージャに設定します。表に対して実行可能な各操作(INSERT、UPDATE、DELETEなど)でプロシージャDMLハンドラが使用されるように、operation_nameパラメータをDEFAULTに設定する必要があることに注意してください。
*/
BEGIN
DBMS_APPLY_ADM.SET_DML_HANDLER(
object_name => 'hr.employees',
object_type => 'TABLE',
operation_name => 'DEFAULT',
error_handler => FALSE,
user_procedure => 'strmadmin.emp_dml_handler',
apply_database_link => NULL,
apply_name => NULL);
END;
/
/*
再エンキューされたメッセージをデキューするために、アプリケーションで使用できるメッセージ・クライアントを作成します。メッセージをキューに再エンキューするには、メッセージ・クライアントを指定する必要があります。
*/
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'hr.employees',
streams_type => 'dequeue',
streams_name => 'hr',
queue_name => 'strmadmin.streams_queue',
include_dml => TRUE,
include_ddl => FALSE,
inclusion_rule => TRUE);
END;
/
/*
hr.employees表に対するDML変更を適用する適用プロセスを作成します。適用プロセスのプロシージャDMLハンドラでは、削除された従業員はemp_del表に挿入されますが、キュー内の行LCRにはemp_del表ではなくemployees表に対する変更が含まれるため、このルールではemployees表を指定します。ADD_TABLE_RULESプロシージャを実行して適用プロセスを作成すると、作成したDMLルールの名前がoutパラメータdml_rule_nameに含まれます。このルール名は、SET_ENQUEUE_DESTINATIONプロシージャに渡されます。
DBMS_APPLY_ADMパッケージのSET_ENQUEUE_DESTINATIONプロシージャを実行すると、ADD_TABLE_RULESで生成されたDMLルールを使用する適用プロセスによって、このルールを満たすメッセージがstreams_queueにエンキューされるように指定されます。この場合、DMLルールは、hr.employees表に対するDML変更を含む行LCR用です。必要に応じて、適用プロセス・キュー以外のローカル・キューを指定できます。
*/
DECLARE
emp_rule_name_dml VARCHAR2(30);
emp_rule_name_ddl VARCHAR2(30);
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'hr.employees',
streams_type => 'apply',
streams_name => 'apply_emp',
queue_name => 'strmadmin.streams_queue',
include_dml => TRUE,
include_ddl => FALSE,
source_database => 'cpap.example.com',
dml_rule_name => emp_rule_name_dml,
ddl_rule_name => emp_rule_name_ddl);
DBMS_APPLY_ADM.SET_ENQUEUE_DESTINATION(
rule_name => emp_rule_name_dml,
destination_queue_name => 'strmadmin.streams_queue');
END;
/
/*
この手順で作成するemp_dqプロシージャを使用すると、適用プロセスによって再エンキューされたメッセージをデキューできます。手順7では、SET_ENQUEUE_DESTINATIONプロシージャを使用して、hr.employees表に対する変更が含まれる行LCRをstreams_queueにエンキューするように適用プロセスに指示しました。emp_dqプロシージャを実行すると、キュー内の各行LCRがデキューされ、行LCRのコマンドのタイプ(INSERT、UPDATEまたはDELETEのいずれか)が表示されます。コマンド・タイプのみでなく、行LCR内のすべての情報にアクセスして表示できます。
|
関連項目: LCR内の情報を表示する方法の詳細は、『Oracle Streams概要および管理』を参照してください。 |
*/
CREATE OR REPLACE PROCEDURE emp_dq (consumer IN VARCHAR2) AS
msg ANYDATA;
row_lcr SYS.LCR$_ROW_RECORD;
num_var pls_integer;
more_messages BOOLEAN := TRUE;
navigation VARCHAR2(30);
BEGIN
navigation := 'FIRST MESSAGE';
WHILE (more_messages) LOOP
BEGIN
DBMS_STREAMS_MESSAGING.DEQUEUE(
queue_name => 'strmadmin.streams_queue',
streams_name => consumer,
payload => msg,
navigation => navigation,
wait => DBMS_STREAMS_MESSAGING.NO_WAIT);
IF msg.GETTYPENAME() = 'SYS.LCR$_ROW_RECORD' THEN
num_var := msg.GetObject(row_lcr);
DBMS_OUTPUT.PUT_LINE(row_lcr.GET_COMMAND_TYPE || ' row LCR dequeued');
END IF;
navigation := 'NEXT MESSAGE';
COMMIT;
EXCEPTION WHEN SYS.DBMS_STREAMS_MESSAGING.ENDOFCURTRANS THEN
navigation := 'NEXT TRANSACTION';
WHEN DBMS_STREAMS_MESSAGING.NOMOREMSGS THEN
more_messages := FALSE;
DBMS_OUTPUT.PUT_LINE('No more messages.');
WHEN OTHERS THEN
RAISE;
END;
END LOOP;
END;
/
/*
エラーが発生した場合に適用プロセスが無効化されないように、disable_on_errorパラメータをnに設定して、cpap.example.comで適用プロセスを起動します。
*/
BEGIN
DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'apply_emp',
parameter => 'disable_on_error',
value => 'N');
END;
/
BEGIN
DBMS_APPLY_ADM.START_APPLY(
apply_name => 'apply_emp');
END;
/
/*
cpap.example.comで取得プロセスを起動します。
*/
BEGIN
DBMS_CAPTURE_ADM.START_CAPTURE(
capture_name => 'capture_emp');
END;
/
/*
streams_config_capapp.outスプール・ファイルをチェックして、このスクリプトの完了後にすべてのアクションが正常に終了していることを確認します。
*/ SET ECHO OFF SPOOL OFF /*************************** END OF SCRIPT ******************************/
次の手順を実行して適用プロセスが正しく構成されていることを確認し、hr.employees表に対するDML変更を実行して、その結果であるhr.emp_del表への挿入と、streams_queue_tableに再エンキューされたメッセージを問い合せ、適用プロセスによって再エンキューされたメッセージをデキューします。
手順7では、ルールを満たすLCRのエンキュー先となる宛先キューを指定する適用プロセス・ルールを作成しています。この場合、ルールを満たすLCRとは、hr.employees表に対する変更を含む行LCRです。
次の手順を実行して、ルールが宛先キューを指定していることを確認します。
次の問合せを実行し、適用プロセスapply_empで使用されるhr.employees表に対するDML変更のルール名を判断します。
CONNECT strmadmin@cpap.example.com
Enter password: password
SELECT RULE_OWNER, RULE_NAME FROM DBA_STREAMS_RULES
WHERE STREAMS_NAME = 'APPLY_EMP' AND
STREAMS_TYPE = 'APPLY' AND
SCHEMA_NAME = 'HR' AND
OBJECT_NAME = 'EMPLOYEES' AND
RULE_TYPE = 'DML'
ORDER BY RULE_NAME;
出力は次のようになります。
RULE_OWNER RULE_NAME ------------------------------ ------------------------------ STRMADMIN EMPLOYEES3
手順1の問合せで戻されたルールのアクション・コンテキストを表示します。
COLUMN RULE_OWNER HEADING 'Rule Owner' FORMAT A15 COLUMN DESTINATION_QUEUE_NAME HEADING 'Destination Queue' FORMAT A30 SELECT RULE_OWNER, DESTINATION_QUEUE_NAME FROM DBA_APPLY_ENQUEUE WHERE RULE_NAME = 'EMPLOYEES3' ORDER BY DESTINATION_QUEUE_NAME;
手順1で戻されたルール名をWHERE句に代入します。出力は次のようになります。
Rule Owner Destination Queue --------------- ------------------------------ STRMADMIN "STRMADMIN"."STREAMS_QUEUE"
出力には、適用プロセス・ルールを満たすLCRがstreams_queueにエンキューされることが示されます。
hr.employees表に次のDML変更を行います。
CONNECT hr@cpap.example.com
Enter password: password
INSERT INTO hr.employees VALUES(207, 'JOHN', 'SMITH', 'JSMITH@EXAMPLE.COM',
NULL, '07-JUN-94', 'AC_ACCOUNT', 777, NULL, NULL, 110);
COMMIT;
UPDATE hr.employees SET salary=5999 WHERE employee_id=207;
COMMIT;
DELETE FROM hr.employees WHERE employee_id=207;
COMMIT;
前述の手順で行った変更が取得および適用されてから、次の問合せを実行して結果を確認します。
CONNECT strmadmin@cpap.example.com
Enter password: password
SELECT employee_id, first_name, last_name, timestamp
FROM hr.emp_del ORDER BY employee_id;
SELECT MSG_ID, MSG_STATE, CONSUMER_NAME
FROM AQ$STREAMS_QUEUE_TABLE ORDER BY MSG_ID;
最初の問合せを実行すると、employee_idが207の従業員のレコードが表示されます。この従業員は前述の手順で削除されています。2番目の問合せを実行すると、前述の手順で行ったすべての変更の結果として再エンキューされたメッセージが表示され、これらのメッセージのMSG_STATEがREADYであることを確認できます。
emp_dqプロシージャを使用して、プロシージャDMLハンドラによって再エンキューされたメッセージをデキューします。
SET SERVEROUTPUT ON SIZE 100000
EXEC emp_dq('HR');
DML文で変更された各行に対して1行が戻され、各行には変更のコマンド・タイプ(INSERT、UPDATEまたはDELETE)が示されます。メッセージのデキュー後に手順3のキュー表に問合せを繰り返すと、デキューされたメッセージはコンシュームされているはずです。つまり、これらのメッセージのMSG_STATEがPROCESSEDであるか、またはメッセージがキューに存在しません。
SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$STREAMS_QUEUE_TABLE ORDER BY MSG_ID;