この章では、単一データベースの例を説明します。例では、取得プロセスを使用して表に対する変更を取得し、取得された変更をキューに再エンキューします。その後、適用中にDMLハンドラを使用して、変更のサブセットを異なる表に挿入します。
次の項では、単一データベースの取得および適用例の構成について説明します。
この章の例では、Oracle Streamsを使用して、単一データベースcpap.example.comでデータ操作言語(DML)の変更を取得および適用する方法を示します。具体的には、hrスキーマでのemployees表に対するDML変更が取得されます。ここでは、行の論理変更レコード(LCR)がキューstreams_queueに含められます。次に、適用プロセスによってこれらの行LCRは同じキューからデキューされ、このキューに再エンキューされた後、DMLハンドラに送られます。
行LCRは、取得されるとバッファ・キューに存在することになり、明示的にはデキューできません。行LCRが適用中に再エンキューされると、アプリケーションで明示的にデキュー可能になります。この例では、これらの行LCRをデキューするアプリケーションは作成されません。
この例では、削除された従業員のレコードをhrスキーマのemp_del表に挿入するDMLハンドラについて説明します。この例では、すべての削除された従業員のレコードを保持するために、emp_del表が使用されることを想定しています。DMLハンドラは、各行LCRにDELETE文が含まれるかどうかを判別するために使用されます。行LCRにDELETE文が含まれることがDMLハンドラで検出されると、DMLハンドラによって、emp_del表に対するDELETE文がINSERT文に変換され、行が挿入されます。
図32-1に、この環境の概要を示します。
この章の例を開始する前に、前提条件となる次の作業を完了する必要があります。
環境内のすべてのデータベースについて、次の初期化パラメータを指定の値に設定します。
STREAMS_POOL_SIZE: オプションで、このパラメータを環境内の各データベースの適切な値に設定します。このパラメータでは、Oracle Streamsプールのサイズを指定します。Oracle Streamsプールは、バッファ・キューにメッセージを格納し、パラレル取得およびパラレル適用中の内部通信に使用されます。MEMORY_TARGET、MEMORY_MAX_TARGETまたはSGA_TARGET初期化パラメータを0(ゼロ)以外の値に設定した場合、Oracle Streamsプールのサイズは自動的に管理されます。
ARCHIVELOGモードで実行されるようにデータベースを設定します。取得される変更を生成するデータベースは、ARCHIVELOGモードで実行されている必要があります。
|
関連項目: ARCHIVELOGモードでデータベースを実行する方法については、Oracle Database管理者ガイドを参照 |
データベースにOracle Streams管理者を作成します。この例では、Oracle Streams管理者のユーザー名がstrmadminであると想定しています。
この例では、ストアド・プロシージャ内のOracle Streamsパッケージのサブプログラムを実行します。具体的には、手順8で作成したemp_dqプロシージャによって、DBMS_STREAMS_MESSAGINGパッケージのDEQUEUEプロシージャが実行されます。そのため、Oracle Streams管理者は、このパッケージに対するEXECUTE権限が明示的に付与されている必要があります。この場合、EXECUTE権限は、ロールを介して付与することはできません。DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGEプロシージャを実行すると、Oracle Streamsに関連する他の権限に加えて、すべてのOracle Streamsパッケージに対するEXECUTE権限が付与されます。EXECUTE権限は、パッケージに直接付与するか、またはGRANT_ADMIN_PRIVILEGEプロシージャを使用して付与します。
次の手順を実行して、hr.emp_del表を作成し、Oracle Streams管理者を設定して、キューを作成します。
|
注意: このマニュアルをオンラインで参照している場合は、この注意の後にある「BEGINNING OF SCRIPT」の行から次の「END OF SCRIPT」の行までのテキストをテキスト・エディタにコピーし、テキストを編集してご使用の環境用のスクリプトを作成できます。データベースに接続可能なコンピュータで、SQL*Plusを使用してスクリプトを実行します。 |
/************************* BEGINNING OF SCRIPT ******************************
手順1: 出力およびスプール結果の表示
SET ECHO ONを実行し、スクリプトのスプール・ファイルを指定します。このスクリプトの実行後に、スプール・ファイルにエラーがないかをチェックします。
*/ SET ECHO ON SPOOL streams_setup_capapp.out /*
手順2: hr.emp_del表の作成
hrユーザーとしてcpap.example.comに接続します。
*/ CONNECT hr@cpap.example.com /*
hr.emp_del表を作成します。emp_del表の列は、emp_del表に行が挿入された日付を記録する追加のtimestamp列を除いて、employees表の列と同じです。
*/ CREATE TABLE emp_del( employee_id NUMBER(6), first_name VARCHAR2(20), last_name VARCHAR2(25), email VARCHAR2(25), phone_number VARCHAR2(20), hire_date DATE, job_id VARCHAR2(10), salary NUMBER(8,2), commission_pct NUMBER(2,2), manager_id NUMBER(6), department_id NUMBER(4), timestamp DATE); CREATE UNIQUE INDEX emp_del_id_pk ON emp_del (employee_id); ALTER TABLE emp_del ADD (CONSTRAINT emp_del_id_pk PRIMARY KEY (employee_id)); /*
手順3: Oracle Streams管理者への追加の権限の付与
SYSTEMユーザーとしてcpap.example.comに接続します。
*/ CONNECT SYSTEM@cpap.example.com /*
Oracle Streams管理者は適用ユーザーであり、emp_del表にレコードを挿入できる必要があるため、Oracle Streams管理者にこの表に対するすべての権限を付与します。または、適用プロセスを変更してhrを適用ユーザーとして指定できます。
*/ GRANT ALL ON hr.emp_del TO STRMADMIN; /*
手順4: cpap.example.comでのANYDATAキューの作成
strmadminユーザーとしてcpap.example.comに接続します。
*/ CONNECT strmadmin@cpap.example.com /*
SET_UP_QUEUEプロシージャを実行して、cpap.example.comでstreams_queueというキューを作成します。このキューは、適用プロセスによってデキューされる取得済の変更、およびデキュー・プロシージャによってデキューされるユーザー作成変更をステージングするANYDATAキューです。
SET_UP_QUEUEプロシージャを実行すると、次のアクションが実行されます。
streams_queue_tableというキュー表を作成します。このキュー表は、Oracle Streams管理者(strmadmin)が所有し、このユーザーのデフォルトの記憶域を使用します。
Oracle Streams管理者(strmadmin)が所有するstreams_queueというキューを作成します。
キューを起動します。
*/
BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
queue_table => 'strmadmin.streams_queue_table',
queue_name => 'strmadmin.streams_queue');
END;
/
/*
手順5: スプール結果のチェック
streams_setup_capapp.outスプール・ファイルをチェックして、このスクリプトの完了後にすべてのアクションが正常に終了していることを確認します。
*/ SET ECHO OFF SPOOL OFF /*************************** END OF SCRIPT ******************************/
次の手順を実行して、hr.employees表に対する変更を取得し、DMLハンドラを使用してカスタマイズした方法で、1つのデータベースにこれらの変更を適用します。
|
注意: このマニュアルをオンラインで参照している場合は、この注意の後にある「BEGINNING OF SCRIPT」の行から次の「END OF SCRIPT」の行までのテキストをテキスト・エディタにコピーし、テキストを編集してご使用の環境用のスクリプトを作成できます。データベースに接続可能なコンピュータで、SQL*Plusを使用してスクリプトを実行します。 |
/************************* BEGINNING OF SCRIPT ******************************
手順1: 出力およびスプール結果の表示
SET ECHO ONを実行し、スクリプトのスプール・ファイルを指定します。このスクリプトの実行後に、スプール・ファイルにエラーがないかをチェックします。
*/ SET ECHO ON SPOOL streams_config_capapp.out /*
手順2: cpap.example.comでの取得プロセスの構成
strmadminユーザーとしてcpap.example.comに接続します。
*/ CONNECT strmadmin@cpap.example.com /*
cpap.example.comでhr.employees表に対するDML変更を取得するように取得プロセスを構成します。この手順を実行すると、取得プロセスが作成され、この表に対するDML変更を取得するように取得プロセスに指示するポジティブ・ルール・セットにルールが追加されます。また、hr.employees表がインスタンス化のために準備され、表の主キー、一意キー、ビットマップ索引および外部キーの列のサプリメンタル・ロギングが有効になります。
サプリメンタル・ロギングによって、表に対する変更の追加の情報がREDOログに記録されます。適用プロセスでは、一意の行の識別など、一部の操作を実行するために、この追加情報が必要です。
*/
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'hr.employees',
streams_type => 'capture',
streams_name => 'capture_emp',
queue_name => 'strmadmin.streams_queue',
include_dml => TRUE,
include_ddl => FALSE,
inclusion_rule => TRUE);
END;
/
/*
手順3: hr.employees表のインスタンス化SCNの設定
この例では1つのデータベース内で変更を取得および適用するため、インスタンス化は必要ありません。ただし、cpap.example.comデータベースの適用プロセスには、特定のシステム変更番号(SCN)後にhr.employees表に行われた変更を適用するように指示する必要があります。
この例では、DBMS_FLASHBACKパッケージのGET_SYSTEM_CHANGE_NUMBERファンクションを使用して、データベースの現行のSCNを取得します。このSCNを使用して、DBMS_APPLY_ADMパッケージのSET_TABLE_INSTANTIATION_SCNプロシージャを実行します。
SET_TABLE_INSTANTIATION_SCNプロシージャは、適用プロセスで無視する表のLCRおよび適用プロセスで適用する表のLCRを制御します。ソース・データベースからの表のLCRのコミットSCNが、宛先データベースでのその表のインスタンス化SCN以下である場合、宛先データベースの適用プロセスはそのLCRを廃棄します。そうでない場合、適用プロセスはLCRを適用します。この例では、ソース・データベースと宛先データベースの両方がcpap.example.comデータベースです。
適用プロセスは、この手順で取得したSCNの後にコミットされたSCNを持つトランザクションをhr.employees表に適用します。
|
注意: hr.employees表は、インスタンス化のために準備する必要もあります。この準備は、手順2でhr.employees表に対するDML変更を取得するルールで取得プロセスを構成したときに自動的に行われています。 |
*/
DECLARE
iscn NUMBER; -- Variable to hold instantiation SCN value
BEGIN
iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name => 'hr.employees',
source_database_name => 'cpap.example.com',
instantiation_scn => iscn);
END;
/
/*
この手順を実行すると、emp_dml_handlerプロシージャが作成されます。このプロシージャは、hr.employees表に対するDELETE変更のDMLハンドラです。このプロシージャによって、DELETEコマンド・タイプを含むすべての行LCRがINSERTの行LCRに変換され、その行LCRを実行することで、変換された行LCRがhr.emp_del表に挿入されます。
*/
CREATE OR REPLACE PROCEDURE emp_dml_handler(in_any IN ANYDATA) IS
lcr SYS.LCR$_ROW_RECORD;
rc PLS_INTEGER;
command VARCHAR2(30);
old_values SYS.LCR$_ROW_LIST;
BEGIN
-- Access the LCR
rc := in_any.GETOBJECT(lcr);
-- Get the object command type
command := lcr.GET_COMMAND_TYPE();
-- Check for DELETE command on the hr.employees table
IF command = 'DELETE' THEN
-- Set the command_type in the row LCR to INSERT
lcr.SET_COMMAND_TYPE('INSERT');
-- Set the object_name in the row LCR to EMP_DEL
lcr.SET_OBJECT_NAME('EMP_DEL');
-- Get the old values in the row LCR
old_values := lcr.GET_VALUES('old');
-- Set the old values in the row LCR to the new values in the row LCR
lcr.SET_VALUES('new', old_values);
-- Set the old values in the row LCR to NULL
lcr.SET_VALUES('old', NULL);
-- Add a SYSDATE value for the timestamp column
lcr.ADD_COLUMN('new', 'TIMESTAMP', ANYDATA.ConvertDate(SYSDATE));
-- Apply the row LCR as an INSERT into the hr.emp_del table
lcr.EXECUTE(TRUE);
END IF;
END;
/
/*
手順5: hr.employees表のDMLハンドラの設定
hr.employees表のDMLハンドラを手順4で作成したプロシージャに設定します。表に対して実行可能な各操作(INSERT、UPDATE、DELETEなど)にDMLハンドラが使用されるように、operation_nameパラメータはDEFAULTに設定することに注意してください。
*/
BEGIN
DBMS_APPLY_ADM.SET_DML_HANDLER(
object_name => 'hr.employees',
object_type => 'TABLE',
operation_name => 'DEFAULT',
error_handler => FALSE,
user_procedure => 'strmadmin.emp_dml_handler',
apply_database_link => NULL,
apply_name => NULL);
END;
/
/*
アプリケーションで、再エンキューされたメッセージのデキューに使用できるメッセージ・クライアントを作成します。メッセージ・クライアントは、メッセージをキューに再エンキューする前に指定する必要があります。
*/
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'hr.employees',
streams_type => 'dequeue',
streams_name => 'hr',
queue_name => 'strmadmin.streams_queue',
include_dml => TRUE,
include_ddl => FALSE,
inclusion_rule => TRUE);
END;
/
/*
手順7: cpap.example.comでの適用プロセスの構成
hr.employees表に対するDML変更を適用する適用プロセスを作成します。適用プロセスのDMLハンドラによって、削除された従業員がemp_del表に挿入されますが、キュー内の行LCRにはemp_del表ではなくemployees表に対する変更が含まれるため、このルールはemployees表を指定します。ADD_TABLE_RULESプロシージャを実行して適用プロセスを作成する場合、出力パラメータdml_rule_nameには作成したDMLルールの名前が含まれます。このルール名は、SET_ENQUEUE_DESTINATIONプロシージャに渡されます。
DBMS_APPLY_ADMパッケージのSET_ENQUEUE_DESTINATIONプロシージャを実行すると、ADD_TABLE_RULESで生成されたDMLルールを使用するすべての適用プロセスが、このルールを満たすメッセージをstreams_queueにエンキューするように指定されます。この場合、DMLルールは、hr.employees表に対するDML変更を含む行LCRに適用されます。必要に応じて、適用プロセス・キュー以外のローカル・キューを指定できます。
*/
DECLARE
emp_rule_name_dml VARCHAR2(30);
emp_rule_name_ddl VARCHAR2(30);
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'hr.employees',
streams_type => 'apply',
streams_name => 'apply_emp',
queue_name => 'strmadmin.streams_queue',
include_dml => TRUE,
include_ddl => FALSE,
source_database => 'cpap.example.com',
dml_rule_name => emp_rule_name_dml,
ddl_rule_name => emp_rule_name_ddl);
DBMS_APPLY_ADM.SET_ENQUEUE_DESTINATION(
rule_name => emp_rule_name_dml,
destination_queue_name => 'strmadmin.streams_queue');
END;
/
/*
この手順で作成したemp_dqプロシージャを使用して、適用プロセスによって再エンキューされたメッセージをデキューできます。手順7では、SET_ENQUEUE_DESTINATIONプロシージャを使用してhr.employees表に対する変更を含む行LCRをstreams_queueにエンキューするように適用プロセスに指示しました。emp_dqプロシージャを実行すると、キュー内の各行LCRがデキューされ、行LCR内のコマンドのタイプ(INSERT、UPDATEまたはDELETE)が表示されます。コマンド・タイプのみでなく、行LCR内のすべての情報にアクセスして表示できます。
*/
CREATE OR REPLACE PROCEDURE emp_dq (consumer IN VARCHAR2) AS
msg ANYDATA;
row_lcr SYS.LCR$_ROW_RECORD;
num_var pls_integer;
more_messages BOOLEAN := TRUE;
navigation VARCHAR2(30);
BEGIN
navigation := 'FIRST MESSAGE';
WHILE (more_messages) LOOP
BEGIN
DBMS_STREAMS_MESSAGING.DEQUEUE(
queue_name => 'strmadmin.streams_queue',
streams_name => consumer,
payload => msg,
navigation => navigation,
wait => DBMS_STREAMS_MESSAGING.NO_WAIT);
IF msg.GETTYPENAME() = 'SYS.LCR$_ROW_RECORD' THEN
num_var := msg.GetObject(row_lcr);
DBMS_OUTPUT.PUT_LINE(row_lcr.GET_COMMAND_TYPE || ' row LCR dequeued');
END IF;
navigation := 'NEXT MESSAGE';
COMMIT;
EXCEPTION WHEN SYS.DBMS_STREAMS_MESSAGING.ENDOFCURTRANS THEN
navigation := 'NEXT TRANSACTION';
WHEN DBMS_STREAMS_MESSAGING.NOMOREMSGS THEN
more_messages := FALSE;
DBMS_OUTPUT.PUT_LINE('No more messages.');
WHEN OTHERS THEN
RAISE;
END;
END LOOP;
END;
/
/*
手順9: cpap.example.comでの適用プロセスの起動
エラーが発生した場合に適用プロセスが無効化されないようにdisable_on_errorパラメータをnに設定し、cpap.example.comで適用プロセスを起動します。
*/
BEGIN
DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'apply_emp',
parameter => 'disable_on_error',
value => 'N');
END;
/
BEGIN
DBMS_APPLY_ADM.START_APPLY(
apply_name => 'apply_emp');
END;
/
/*
手順10: cpap.example.comでの取得プロセスの起動
cpap.example.comで取得プロセスを起動します。
*/
BEGIN
DBMS_CAPTURE_ADM.START_CAPTURE(
capture_name => 'capture_emp');
END;
/
/*
手順11: スプール結果のチェック
streams_config_capapp.outスプール・ファイルをチェックして、このスクリプトの完了後にすべてのアクションが正常に終了していることを確認します。
*/ SET ECHO OFF SPOOL OFF /*************************** END OF SCRIPT ******************************/
次の手順を実行して、適用プロセスが正しく構成されていることを確認し、hr.employees表にDML変更を行って、hr.emp_del表に挿入された結果およびstreams_queue_table内の再エンキューされたメッセージを問い合せ、DMLハンドラによって再エンキューされたメッセージをデキューします。
手順1: ルール・アクション・コンテキストの確認
手順7では、ルールを満たすLCRがエンキューされる宛先キューを指定する、適用プロセスのルールを作成します。この場合、ルールを満たすLCRは、hr.employees表に対する変更を含む行LCRです。
次の手順を実行して、このルールが宛先キューを指定することを確認します。
次の問合せを実行して、適用プロセスapply_empで使用されるhr.employees表に対するDML変更のルール名を判別します。
CONNECT strmadmin@cpap.example.com
Enter password: password
SELECT RULE_OWNER, RULE_NAME FROM DBA_STREAMS_RULES
WHERE STREAMS_NAME = 'APPLY_EMP' AND
STREAMS_TYPE = 'APPLY' AND
SCHEMA_NAME = 'HR' AND
OBJECT_NAME = 'EMPLOYEES' AND
RULE_TYPE = 'DML'
ORDER BY RULE_NAME;
出力は次のようになります。
RULE_OWNER RULE_NAME ------------------------------ ------------------------------ STRMADMIN EMPLOYEES3
手順1の問合せで戻されるルールのアクション・コンテキストを参照します。
COLUMN RULE_OWNER HEADING 'Rule Owner' FORMAT A15 COLUMN DESTINATION_QUEUE_NAME HEADING 'Destination Queue' FORMAT A30 SELECT RULE_OWNER, DESTINATION_QUEUE_NAME FROM DBA_APPLY_ENQUEUE WHERE RULE_NAME = 'EMPLOYEES3' ORDER BY DESTINATION_QUEUE_NAME;
WHERE句で、手順1で戻されたルール名に置き換えます。出力は次のようになります。
Rule Owner Destination Queue --------------- ------------------------------ STRMADMIN "STRMADMIN"."STREAMS_QUEUE"
出力に、適用プロセスのルールを満たすLCRがstreams_queueにエンキューされていることが示されます。
手順2: hr.employeesでのINSERT、UPDATEおよびDELETEの実行
hr.employees表に次のDML変更を行います。
CONNECT hr@cpap.example.com
Enter password: password
INSERT INTO hr.employees VALUES(207, 'JOHN', 'SMITH', 'JSMITH@EXAMPLE.COM',
NULL, '07-JUN-94', 'AC_ACCOUNT', 777, NULL, NULL, 110);
COMMIT;
UPDATE hr.employees SET salary=5999 WHERE employee_id=206;
COMMIT;
DELETE FROM hr.employees WHERE employee_id=207;
COMMIT;
手順3: hr.emp_del表およびstreams_queue_tableの問合せ
前述の手順で行った変更が取得および適用されるまで待ってから、次の問合せを実行して結果を表示します。
CONNECT strmadmin@cpap.example.com
Enter password: password
SELECT employee_id, first_name, last_name, timestamp
FROM hr.emp_del ORDER BY employee_id;
SELECT MSG_ID, MSG_STATE, CONSUMER_NAME
FROM AQ$STREAMS_QUEUE_TABLE ORDER BY MSG_ID;
最初の問合せを実行すると、employee_idが207の従業員のレコードが表示されます。この従業員は、前述の手順で削除されています。2つ目の問合せを実行すると、前述の手順で行ったすべての変更の結果として再エンキューされたメッセージが表示され、これらのメッセージのMSG_STATEがREADYであることを確認できます。
手順4: DMLハンドラによって再エンキューされたメッセージのデキュー
emp_dqプロシージャを使用して、DMLハンドラによって再エンキューされたメッセージをデキューします。
SET SERVEROUTPUT ON SIZE 100000
EXEC emp_dq('HR');
DML文で変更された各行に対して1行が戻され、各行には変更のコマンド・タイプ(INSERT、UPDATEまたはDELETE)が示されます。メッセージのデキュー後に手順3のキュー表に問合せを繰り返すと、デキューされたメッセージはコンシュームされているはずです。つまり、これらのメッセージのMSG_STATEがPROCESSEDであるか、またはメッセージがキューに存在しません。
SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$STREAMS_QUEUE_TABLE ORDER BY MSG_ID;