4 単一データベースの取得および適用の例
この章では、単一データベースの例を説明します。例では、表に対する変更を取得し、取得した変更をキューに再エンキューします。その後、適用時にプロシージャDMLハンドラを使用して、変更のサブセットを別の表に挿入します。
ここでは、単一データベースの取得および適用の例の構成について説明します。
4.1 単一データベースの取得および適用の例の概要
この章の例では、Oracle Streamsを使用して、単一データベースcpap.example.com
でデータ操作言語(DML)の変更を取得および適用する方法を示します。具体的には、この例ではhr
スキーマ内のemployees
表に対するDML変更を取得し、行の論理変更レコード(LCR)をキューstreams_queue
に入れます。次に、適用プロセスによってこれらの行LCRが同じキューからデキューされ、このキューに再エンキューされてからプロシージャDMLハンドラに送信されます。
取得された行LCRは、バッファ・キューに存在することになるため、明示的にはデキューできません。適用時に行LCRを再エンキューすると、アプリケーションによる明示的なデキューに使用できるようになります。この例では、これらの行LCRをデキューするアプリケーションは作成しません。
この例では、削除された従業員のレコードをhr
スキーマ内のemp_del
表に挿入するプロシージャDMLハンドラについて説明します。この例では、emp_del
表を使用して、削除されたすべての従業員のレコードを保持すると想定しています。プロシージャDMLハンドラを使用して、各行LCRにDELETE
文が含まれているかどうかを判別します。プロシージャDMLハンドラは、DELETE
文を含む行LCRを検出すると、DELETE
をemp_del
表に対するINSERT
に変換し、その行を挿入します。
図4-1に、この環境の概要を示します。
関連項目:
4.2 前提条件
この章の例を開始する前に、前提条件となる次の作業を完了する必要があります。
-
オプションで、
STREAMS_POOL_SIZE
初期化パラメータを適切な値に設定します。このパラメータでは、Oracle Streamsプールのサイズを指定します。Oracle Streamsプールは、バッファ・キューにメッセージを格納し、パラレル取得およびパラレル適用中の内部通信に使用されます。MEMORY_TARGET
、MEMORY_MAX_TARGET
またはSGA_TARGET
初期化パラメータを0 (ゼロ)以外の値に設定した場合、Oracle Streamsプールのサイズは自動的に管理されます。関連項目:
Oracle Streamsに関連する初期化パラメータの設定については、『Oracle Streamsレプリケーション管理者ガイド』を参照してください。
-
データベースを
ARCHIVELOG
モードで実行するように設定します。取得される変更を生成するデータベースは、ARCHIVELOG
モードで実行する必要があります。関連項目:
ARCHIVELOG
モードでデータベースを実行する方法の詳細は、『Oracle Database管理者ガイド』を参照 -
データベースでOracle Streams管理者を作成します。この例では、Oracle Streams管理者のユーザー名が
strmadmin
であると想定しています。この例では、ストアド・プロシージャ内でOracle Streamsパッケージのサブプログラムを実行します。具体的には、手順「メッセージをデキューするプロシージャの作成」で作成した
emp_dq
プロシージャで、DBMS_STREAMS_MESSAGING
パッケージのDEQUEUE
プロシージャを実行します。したがって、Oracle Streams管理者に、このパッケージに対するEXECUTE
権限を明示的に付与する必要があります。この場合、EXECUTE
権限はロールを介して付与することはできません。DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE
プロシージャを使用すると、すべてのOracle Streamsパッケージに対するEXECUTE
と、Oracle Streamsに関連するその他の権限を付与できます。パッケージに対するEXECUTE
権限は、直接付与することも、GRANT_ADMIN_PRIVILEGE
プロシージャを使用して付与することもできます。関連項目:
Oracle Streams管理者を作成する方法の詳細は、『Oracle Streamsレプリケーション管理者ガイド』を参照
4.3 環境の設定
次の手順を実行してhr.emp_del
表を作成し、Oracle Streams管理者を設定して、キューを作成します。
注意:
このマニュアルをオンラインで参照している場合、この注意の後にある「BEGINNING OF SCRIPT」の行から次の「END OF SCRIPT」の行までのテキストをテキスト・エディタにコピーし、テキストを編集してご使用の環境用のスクリプトを作成できます。データベースに接続可能なコンピュータで、SQL*Plusを使用してスクリプトを実行します。
/************************* BEGINNING OF SCRIPT ******************************
- 出力およびスプール結果の表示
-
SET
ECHO
ON
を実行し、スクリプトのスプール・ファイルを指定します。このスクリプトの実行後に、スプール・ファイルにエラーがないかをチェックします。*/ SET ECHO ON SPOOL streams_setup_capapp.out /*
- hr.emp_del表の作成
-
hr
ユーザーとしてcpap.example.com
に接続します。*/ CONNECT hr@cpap.example.com /*
hr.emp_del
表を作成します。emp_del
表の列は、emp_del
表に行が挿入されたときにその日付を記録する追加のtimestamp
列を除いて、employees
表の列と同じです。*/ CREATE TABLE emp_del( employee_id NUMBER(6), first_name VARCHAR2(20), last_name VARCHAR2(25), email VARCHAR2(25), phone_number VARCHAR2(20), hire_date DATE, job_id VARCHAR2(10), salary NUMBER(8,2), commission_pct NUMBER(2,2), manager_id NUMBER(6), department_id NUMBER(4), timestamp DATE); CREATE UNIQUE INDEX emp_del_id_pk ON emp_del (employee_id); ALTER TABLE emp_del ADD (CONSTRAINT emp_del_id_pk PRIMARY KEY (employee_id)); /*
- Oracle Streams管理者への追加の権限の付与
-
SYSTEM
ユーザーとしてcpap.example.com
に接続します。*/ CONNECT SYSTEM@cpap.example.com /*
Oracle Streams管理者は適用ユーザーであり、
emp_del
表にレコードを挿入できる必要があるため、Oracle Streams管理者にこの表に対するすべての権限を付与します。または、適用プロセスを変更してhr
を適用ユーザーとして指定できます。*/ GRANT ALL ON hr.emp_del TO STRMADMIN; /*
- cpap.example.comでのANYDATAキューの作成
-
strmadmin
ユーザーとしてcpap.example.com
に接続します。*/ CONNECT strmadmin@cpap.example.com /*
SET_UP_QUEUE
プロシージャを実行して、cpap.example.com
でstreams_queue
というキューを作成します。このキューは適用プロセスによってデキューされる取得済の変更、およびデキュー・プロシージャによってデキューされるユーザーが行った変更をステージングするANYDATA
キューです。SET_UP_QUEUE
プロシージャを実行すると、次のアクションが実行されます。-
streams_queue_table
というキュー表の作成。このキュー表は、Oracle Streams管理者(strmadmin
)が所有し、このユーザーのデフォルトの記憶域を使用します。 -
Oracle Streams管理者(
strmadmin
)が所有するstreams_queue
というキューの作成。 -
キューの開始。
*/ BEGIN DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => 'strmadmin.streams_queue_table', queue_name => 'strmadmin.streams_queue'); END; / /*
-
- スプール結果のチェック
-
streams_setup_capapp.out
スプール・ファイルをチェックして、このスクリプトの完了後にすべてのアクションが正常に終了していることを確認します。*/ SET ECHO OFF SPOOL OFF /*************************** END OF SCRIPT ******************************/
4.4 取得および適用の構成
次の手順を実行してhr.employees
表に対する変更を取得し、プロシージャDMLハンドラを使用して、カスタマイズされた方法で単一データベースにこれらの変更を適用します。
- 出力およびスプール結果の表示
- cpap.example.comでの取得プロセスの構成
- hr.employees表に対するインスタンス化SCNの設定
- プロシージャDMLハンドラのハンドラ・プロシージャの作成
- hr.employees表に対するプロシージャDMLハンドラの設定
- キューのメッセージ・クライアントの作成
- cpap.example.comでの適用プロセスの構成
- メッセージをデキューするプロシージャの作成
- cpap.example.comでの適用プロセスの起動
- cpap.example.comでの取得プロセスの起動
- スプール結果のチェック
注意:
このマニュアルをオンラインで参照している場合、この注意の後にある「BEGINNING OF SCRIPT」の行から次の「END OF SCRIPT」の行までのテキストをテキスト・エディタにコピーし、テキストを編集してご使用の環境用のスクリプトを作成できます。データベースに接続可能なコンピュータで、SQL*Plusを使用してスクリプトを実行します。
/************************* BEGINNING OF SCRIPT ******************************
- 出力およびスプール結果の表示
-
SET
ECHO
ON
を実行し、スクリプトのスプール・ファイルを指定します。このスクリプトの実行後に、スプール・ファイルにエラーがないかをチェックします。*/ SET ECHO ON SPOOL streams_config_capapp.out /*
- cpap.example.comでの取得プロセスの構成
-
strmadmin
ユーザーとしてcpap.example.com
に接続します。*/ CONNECT strmadmin@cpap.example.com /*
cpap.example.com
でhr.employees
表に対するDML変更を取得するように取得プロセスを構成します。この手順では、取得プロセスを作成し、そのポジティブ・ルール・セットに、この表に対するDML変更を取得するように取得プロセスに指示するルールを追加します。また、この手順では、hr.employees
表をインスタンス化のために準備し、表のすべての主キー列、一意キー列、ビットマップ索引列および外部キー列のサプリメンタル・ロギングを有効にします。サプリメンタル・ロギングによって、表に対する変更の追加の情報がREDOログに記録されます。適用プロセスでは、一意の行の識別など、一部の操作を実行するために、この追加情報が必要です。
*/ BEGIN DBMS_STREAMS_ADM.ADD_TABLE_RULES( table_name => 'hr.employees', streams_type => 'capture', streams_name => 'capture_emp', queue_name => 'strmadmin.streams_queue', include_dml => TRUE, include_ddl => FALSE, inclusion_rule => TRUE); END; / /*
- hr.employees表に対するインスタンス化SCNの設定
-
この例では単一データベースで変更を取得および適用するため、インスタンス化は不要です。ただし、
cpap.example.com
データベースの適用プロセスに対しては、特定のシステム変更番号(SCN)の後にhr.employees
表に対して行われた変更を適用するように指示する必要があります。この例では、
DBMS_FLASHBACK
パッケージのGET_SYSTEM_CHANGE_NUMBER
ファンクションを使用して、データベースの現行のSCNを取得します。このSCNは、DBMS_APPLY_ADM
パッケージのSET_TABLE_INSTANTIATION_SCN
プロシージャを実行するために使用されます。SET_TABLE_INSTANTIATION_SCN
プロシージャを使用すると、適用プロセスで無視される表のLCRと、適用プロセスで適用される表のLCRを制御できます。ソース・データベースからの表に関するLCRのコミットSCNが、接続先データベースでその表のインスタンス化SCN以下であれば、接続先データベースの適用プロセスではLCRが廃棄されます。それ以外の場合は、適用プロセスによってLCRが適用されます。この例では、cpap.example.com
データベースがソース・データベースと接続先データベースの両方になります。適用プロセスは、この手順で取得したSCNの後にコミットされたSCNを持つトランザクションを
hr.employees
表に適用します。注意:
hr.employees
表にはインスタンス化のための準備も必要です。手順「cpap.example.comでの取得プロセスの構成」の取得プロセスの構成で、hr.employees
表に対するDML変更を取得するルールを追加したときに、この準備は自動的に行われています。*/ DECLARE iscn NUMBER; -- Variable to hold instantiation SCN value BEGIN iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER(); DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN( source_object_name => 'hr.employees', source_database_name => 'cpap.example.com', instantiation_scn => iscn); END; / /*
- プロシージャDMLハンドラのハンドラ・プロシージャの作成
-
この手順を実行すると、
emp_dml_handler
プロシージャが作成されます。このプロシージャは、hr.employees
表に対するDELETE
変更のプロシージャDMLハンドラになります。このプロシージャによって、DELETE
コマンド・タイプを含むすべての行LCRがINSERT
の行LCRに変換され、その行LCRを実行することで、変換された行LCRがhr.emp_del
表に挿入されます。*/ CREATE OR REPLACE PROCEDURE emp_dml_handler(in_any IN ANYDATA) IS lcr SYS.LCR$_ROW_RECORD; rc PLS_INTEGER; command VARCHAR2(30); old_values SYS.LCR$_ROW_LIST; BEGIN -- Access the LCR rc := in_any.GETOBJECT(lcr); -- Get the object command type command := lcr.GET_COMMAND_TYPE(); -- Check for DELETE command on the hr.employees table IF command = 'DELETE' THEN -- Set the command_type in the row LCR to INSERT lcr.SET_COMMAND_TYPE('INSERT'); -- Set the object_name in the row LCR to EMP_DEL lcr.SET_OBJECT_NAME('EMP_DEL'); -- Get the old values in the row LCR old_values := lcr.GET_VALUES('old'); -- Set the old values in the row LCR to the new values in the row LCR lcr.SET_VALUES('new', old_values); -- Set the old values in the row LCR to NULL lcr.SET_VALUES('old', NULL); -- Add a SYSDATE value for the timestamp column lcr.ADD_COLUMN('new', 'TIMESTAMP', ANYDATA.ConvertDate(SYSDATE)); -- Apply the row LCR as an INSERT into the hr.emp_del table lcr.EXECUTE(TRUE); END IF; END; / /*
- hr.employees表に対するプロシージャDMLハンドラの設定
-
hr.employees
表のプロシージャDMLハンドラを手順「プロシージャDMLハンドラのハンドラ・プロシージャの作成」で作成したプロシージャに設定します。表に対して実行可能な各操作(INSERT
、UPDATE
、DELETE
など)でプロシージャDMLハンドラが使用されるように、operation_name
パラメータをDEFAULT
に設定する必要があることに注意してください。*/ BEGIN DBMS_APPLY_ADM.SET_DML_HANDLER( object_name => 'hr.employees', object_type => 'TABLE', operation_name => 'DEFAULT', error_handler => FALSE, user_procedure => 'strmadmin.emp_dml_handler', apply_database_link => NULL, apply_name => NULL); END; / /*
- キューのメッセージ・クライアントの作成
-
再エンキューされたメッセージをデキューするために、アプリケーションで使用できるメッセージ・クライアントを作成します。メッセージをキューに再エンキューするには、メッセージ・クライアントを指定する必要があります。
*/ BEGIN DBMS_STREAMS_ADM.ADD_TABLE_RULES( table_name => 'hr.employees', streams_type => 'dequeue', streams_name => 'hr', queue_name => 'strmadmin.streams_queue', include_dml => TRUE, include_ddl => FALSE, inclusion_rule => TRUE); END; / /*
- cpap.example.comでの適用プロセスの構成
-
hr.employees
表に対するDML変更を適用する適用プロセスを作成します。適用プロセスのプロシージャDMLハンドラでは、削除された従業員はemp_del
表に挿入されますが、キュー内の行LCRにはemp_del
表ではなくemployees
表に対する変更が含まれるため、このルールではemployees
表を指定します。ADD_TABLE_RULES
プロシージャを実行して適用プロセスを作成すると、作成したDMLルールの名前がoutパラメータdml_rule_name
に含まれます。このルール名は、SET_ENQUEUE_DESTINATION
プロシージャに渡されます。DBMS_APPLY_ADM
パッケージのSET_ENQUEUE_DESTINATION
プロシージャを実行すると、ADD_TABLE_RULES
で生成されたDMLルールを使用する適用プロセスによって、このルールを満たすメッセージがstreams_queue
にエンキューされるように指定されます。この場合、DMLルールは、hr.employees
表に対するDML変更を含む行LCR用です。必要に応じて、適用プロセス・キュー以外のローカル・キューを指定できます。*/ DECLARE emp_rule_name_dml VARCHAR2(30); emp_rule_name_ddl VARCHAR2(30); BEGIN DBMS_STREAMS_ADM.ADD_TABLE_RULES( table_name => 'hr.employees', streams_type => 'apply', streams_name => 'apply_emp', queue_name => 'strmadmin.streams_queue', include_dml => TRUE, include_ddl => FALSE, source_database => 'cpap.example.com', dml_rule_name => emp_rule_name_dml, ddl_rule_name => emp_rule_name_ddl); DBMS_APPLY_ADM.SET_ENQUEUE_DESTINATION( rule_name => emp_rule_name_dml, destination_queue_name => 'strmadmin.streams_queue'); END; / /*
- メッセージをデキューするプロシージャの作成
-
この手順で作成する
emp_dq
プロシージャを使用すると、適用プロセスによって再エンキューされたメッセージをデキューできます。手順「cpap.example.comでの適用プロセスの構成」では、SET_ENQUEUE_DESTINATION
プロシージャを使用して、hr.employees
表に対する変更が含まれる行LCRをstreams_queue
にエンキューするように適用プロセスに指示しました。emp_dq
プロシージャを実行すると、キュー内の各行LCRがデキューされ、行LCRのコマンドのタイプ(INSERT
、UPDATE
またはDELETE
のいずれか)が表示されます。コマンド・タイプのみでなく、行LCR内のすべての情報にアクセスして表示できます。関連項目:
LCR内の情報を表示する方法の詳細は、『Oracle Streams概要および管理』を参照してください。
*/ CREATE OR REPLACE PROCEDURE emp_dq (consumer IN VARCHAR2) AS msg ANYDATA; row_lcr SYS.LCR$_ROW_RECORD; num_var pls_integer; more_messages BOOLEAN := TRUE; navigation VARCHAR2(30); BEGIN navigation := 'FIRST MESSAGE'; WHILE (more_messages) LOOP BEGIN DBMS_STREAMS_MESSAGING.DEQUEUE( queue_name => 'strmadmin.streams_queue', streams_name => consumer, payload => msg, navigation => navigation, wait => DBMS_STREAMS_MESSAGING.NO_WAIT); IF msg.GETTYPENAME() = 'SYS.LCR$_ROW_RECORD' THEN num_var := msg.GetObject(row_lcr); DBMS_OUTPUT.PUT_LINE(row_lcr.GET_COMMAND_TYPE || ' row LCR dequeued'); END IF; navigation := 'NEXT MESSAGE'; COMMIT; EXCEPTION WHEN SYS.DBMS_STREAMS_MESSAGING.ENDOFCURTRANS THEN navigation := 'NEXT TRANSACTION'; WHEN DBMS_STREAMS_MESSAGING.NOMOREMSGS THEN more_messages := FALSE; DBMS_OUTPUT.PUT_LINE('No more messages.'); WHEN OTHERS THEN RAISE; END; END LOOP; END; / /*
- cpap.example.comでの適用プロセスの起動
-
エラーが発生した場合に適用プロセスが無効化されないように、
disable_on_error
パラメータをn
に設定して、cpap.example.com
で適用プロセスを起動します。*/ BEGIN DBMS_APPLY_ADM.SET_PARAMETER( apply_name => 'apply_emp', parameter => 'disable_on_error', value => 'N'); END; / BEGIN DBMS_APPLY_ADM.START_APPLY( apply_name => 'apply_emp'); END; / /*
- cpap.example.comでの取得プロセスの起動
-
cpap.example.com
で取得プロセスを起動します。*/ BEGIN DBMS_CAPTURE_ADM.START_CAPTURE( capture_name => 'capture_emp'); END; / /*
- スプール結果のチェック
-
streams_config_capapp.out
スプール・ファイルをチェックして、このスクリプトの完了後にすべてのアクションが正常に終了していることを確認します。*/ SET ECHO OFF SPOOL OFF /*************************** END OF SCRIPT ******************************/
4.5 DML変更の実行、結果の問合せ、およびメッセージのデキュー
次の手順を実行して適用プロセスが正しく構成されていることを確認し、hr.employees
表に対するDML変更を実行して、その結果であるhr.emp_del
表への挿入と、streams_queue_table
に再エンキューされたメッセージを問い合せ、適用プロセスによって再エンキューされたメッセージをデキューします。
- ルールのアクション・コンテキストの確認
-
手順「cpap.example.comでの適用プロセスの構成」では、ルールを満たすLCRのエンキュー先となる宛先キューを指定する適用プロセス・ルールを作成しています。この場合、ルールを満たすLCRとは、
hr.employees
表に対する変更を含む行LCRです。次の手順を実行して、ルールが宛先キューを指定していることを確認します。
-
次の問合せを実行し、適用プロセス
apply_emp
で使用されるhr.employees
表に対するDML変更のルール名を判断します。CONNECT strmadmin@cpap.example.com Enter password: password SELECT RULE_OWNER, RULE_NAME FROM DBA_STREAMS_RULES WHERE STREAMS_NAME = 'APPLY_EMP' AND STREAMS_TYPE = 'APPLY' AND SCHEMA_NAME = 'HR' AND OBJECT_NAME = 'EMPLOYEES' AND RULE_TYPE = 'DML' ORDER BY RULE_NAME;
出力は次のようになります。
RULE_OWNER RULE_NAME ------------------------------ ------------------------------ STRMADMIN EMPLOYEES3
-
手順1の問合せで戻されたルールのアクション・コンテキストを表示します。
COLUMN RULE_OWNER HEADING 'Rule Owner' FORMAT A15 COLUMN DESTINATION_QUEUE_NAME HEADING 'Destination Queue' FORMAT A30 SELECT RULE_OWNER, DESTINATION_QUEUE_NAME FROM DBA_APPLY_ENQUEUE WHERE RULE_NAME = 'EMPLOYEES3' ORDER BY DESTINATION_QUEUE_NAME;
手順1で戻されたルール名を
WHERE
句に代入します。出力は次のようになります。Rule Owner Destination Queue --------------- ------------------------------ STRMADMIN "STRMADMIN"."STREAMS_QUEUE"
出力には、適用プロセス・ルールを満たすLCRが
streams_queue
にエンキューされることが示されます。
-
- hr.employeesに対するINSERT、UPDATEおよびDELETEの実行
-
hr.employees
表に次のDML変更を行います。CONNECT hr@cpap.example.com Enter password: password INSERT INTO hr.employees VALUES(207, 'JOHN', 'SMITH', 'JSMITH@EXAMPLE.COM', NULL, '07-JUN-94', 'AC_ACCOUNT', 777, NULL, NULL, 110); COMMIT; UPDATE hr.employees SET salary=5999 WHERE employee_id=207; COMMIT; DELETE FROM hr.employees WHERE employee_id=207; COMMIT;
- hr.emp_del表およびstreams_queue_tableの問合せ
-
前述の手順で行った変更が取得および適用されてから、次の問合せを実行して結果を確認します。
CONNECT strmadmin@cpap.example.com Enter password: password SELECT employee_id, first_name, last_name, timestamp FROM hr.emp_del ORDER BY employee_id; SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$STREAMS_QUEUE_TABLE ORDER BY MSG_ID;
最初の問合せを実行すると、
employee_id
が207
の従業員のレコードが表示されます。この従業員は前述の手順で削除されています。2番目の問合せを実行すると、前述の手順で行ったすべての変更の結果として再エンキューされたメッセージが表示され、これらのメッセージのMSG_STATE
がREADY
であることを確認できます。 - プロシージャDMLハンドラによって再エンキューされたメッセージのデキュー
-
emp_dq
プロシージャを使用して、プロシージャDMLハンドラによって再エンキューされたメッセージをデキューします。SET SERVEROUTPUT ON SIZE 100000 EXEC emp_dq('HR');
DML文で変更された各行に対して1行が戻され、各行には変更のコマンド・タイプ(
INSERT
、UPDATE
またはDELETE
)が示されます。メッセージのデキュー後に手順「hr.emp_del表およびstreams_queue_tableの問合せ」のキュー表に問合せを繰り返すと、デキューされたメッセージはコンシュームされているはずです。つまり、これらのメッセージのMSG_STATE
がPROCESSED
であるか、またはメッセージがキューに存在しません。SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$STREAMS_QUEUE_TABLE ORDER BY MSG_ID;