Oracle9i Streams Release 2 (9.2) Part Number A96571-01
This chapter provides instructions for managing logical change records (LCRs) and Streams tags.
Each task described in this chapter should be completed by a Streams administrator who has been granted the appropriate privileges, unless specified otherwise.
This section describes managing logical change records (LCRs). Make sure you meet the following requirements when you create or modify an LCR:
- If you create or modify a row LCR, then make sure the command_type attribute is consistent with the presence or absence of old column values and the presence or absence of new column values.
- If you create or modify a DDL LCR, then make sure the ddl_text is consistent with the base_table_name, base_table_owner, object_type, object_owner, object_name, and command_type attributes.

Use the following LCR constructors to create LCRs:

- To create a row LCR, use the SYS.LCR$_ROW_RECORD constructor.
- To create a DDL LCR, use the SYS.LCR$_DDL_RECORD constructor. Make sure the DDL text specified in the ddl_text attribute of each DDL LCR conforms to Oracle SQL syntax.

The following example creates a queue in an Oracle database and an apply process associated with the queue. Then, it creates a PL/SQL procedure that constructs a row LCR based on information passed to it and enqueues the row LCR into the queue:
1. Create a queue in the Oracle database.

BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table    => 'strm02_queue_table',
    storage_clause => NULL,
    queue_name     => 'strm02_queue');
END;
/

2. Create an apply process associated with the queue. Make sure the apply_captured parameter is set to false when you create the apply process, because the apply process will be applying user-enqueued events, not events captured by a capture process.

BEGIN
  DBMS_APPLY_ADM.CREATE_APPLY(
    queue_name     => 'strm02_queue',
    apply_name     => 'strm02_apply',
    apply_captured => false);
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name => 'strm02_apply',
    parameter  => 'disable_on_error',
    value      => 'n');
END;
/

EXEC DBMS_APPLY_ADM.START_APPLY('strm02_apply');

3. Create a procedure called construct_row_lcr that constructs a row LCR and then enqueues it into the queue created in Step 1.

CREATE OR REPLACE PROCEDURE construct_row_lcr(
  source_dbname  VARCHAR2,
  cmd_type       VARCHAR2,
  obj_owner      VARCHAR2,
  obj_name       VARCHAR2,
  old_vals       SYS.LCR$_ROW_LIST,
  new_vals       SYS.LCR$_ROW_LIST) AS
  eopt       DBMS_AQ.ENQUEUE_OPTIONS_T;
  mprop      DBMS_AQ.MESSAGE_PROPERTIES_T;
  enq_msgid  RAW(16);
  row_lcr    SYS.LCR$_ROW_RECORD;
BEGIN
  mprop.SENDER_ID := SYS.AQ$_AGENT('strmadmin', NULL, NULL);
  -- Construct the LCR based on information passed to procedure
  row_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
    source_database_name => source_dbname,
    command_type         => cmd_type,
    object_owner         => obj_owner,
    object_name          => obj_name,
    old_values           => old_vals,
    new_values           => new_vals);
  -- Enqueue the created row LCR
  DBMS_AQ.ENQUEUE(
    queue_name         => 'strm02_queue',
    enqueue_options    => eopt,
    message_properties => mprop,
    payload            => SYS.AnyData.ConvertObject(row_lcr),
    msgid              => enq_msgid);
END construct_row_lcr;
/
See Also:
Oracle9i Supplied PL/SQL Packages and Types Reference for more information about LCR constructors
4. Create and enqueue LCRs using the construct_row_lcr procedure created in Step 3.

a. Create a row LCR that inserts a row into the hr.regions table.

DECLARE
  newunit1  SYS.LCR$_ROW_UNIT;
  newunit2  SYS.LCR$_ROW_UNIT;
  newvals   SYS.LCR$_ROW_LIST;
BEGIN
  newunit1 := SYS.LCR$_ROW_UNIT(
    'region_id',
    SYS.AnyData.ConvertNumber(5),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newunit2 := SYS.LCR$_ROW_UNIT(
    'region_name',
    SYS.AnyData.ConvertVarchar2('Moon'),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newvals := SYS.LCR$_ROW_LIST(newunit1,newunit2);
  construct_row_lcr(
    source_dbname => 'dbs1.net',
    cmd_type      => 'INSERT',
    obj_owner     => 'hr',
    obj_name      => 'regions',
    old_vals      => NULL,
    new_vals      => newvals);
END;
/
COMMIT;

b. Create a row LCR that updates a row in the hr.regions table.

DECLARE
  oldunit1  SYS.LCR$_ROW_UNIT;
  oldunit2  SYS.LCR$_ROW_UNIT;
  oldvals   SYS.LCR$_ROW_LIST;
  newunit1  SYS.LCR$_ROW_UNIT;
  newvals   SYS.LCR$_ROW_LIST;
BEGIN
  oldunit1 := SYS.LCR$_ROW_UNIT(
    'region_id',
    SYS.AnyData.ConvertNumber(5),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  oldunit2 := SYS.LCR$_ROW_UNIT(
    'region_name',
    SYS.AnyData.ConvertVarchar2('Moon'),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  oldvals := SYS.LCR$_ROW_LIST(oldunit1,oldunit2);
  newunit1 := SYS.LCR$_ROW_UNIT(
    'region_name',
    SYS.AnyData.ConvertVarchar2('Mars'),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newvals := SYS.LCR$_ROW_LIST(newunit1);
  construct_row_lcr(
    source_dbname => 'dbs1.net',
    cmd_type      => 'UPDATE',
    obj_owner     => 'hr',
    obj_name      => 'regions',
    old_vals      => oldvals,
    new_vals      => newvals);
END;
/
COMMIT;

c. Create a row LCR that deletes a row from the hr.regions table.

DECLARE
  oldunit1  SYS.LCR$_ROW_UNIT;
  oldunit2  SYS.LCR$_ROW_UNIT;
  oldvals   SYS.LCR$_ROW_LIST;
BEGIN
  oldunit1 := SYS.LCR$_ROW_UNIT(
    'region_id',
    SYS.AnyData.ConvertNumber(5),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  oldunit2 := SYS.LCR$_ROW_UNIT(
    'region_name',
    SYS.AnyData.ConvertVarchar2('Mars'),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  oldvals := SYS.LCR$_ROW_LIST(oldunit1,oldunit2);
  construct_row_lcr(
    source_dbname => 'dbs1.net',
    cmd_type      => 'DELETE',
    obj_owner     => 'hr',
    obj_name      => 'regions',
    old_vals      => oldvals,
    new_vals      => NULL);
END;
/
COMMIT;
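The example above enqueues only row LCRs. As a complementary sketch, a DDL LCR could be built with the SYS.LCR$_DDL_RECORD constructor and enqueued into the same strm02_queue in much the same way. The index name regions_name_ix, the logon user, and the current schema are hypothetical values chosen for illustration, and the block assumes the queue and apply process from Step 1 and Step 2 exist. Note how command_type, object_type, object_owner, object_name, base_table_owner, and base_table_name all agree with ddl_text, as the requirements at the start of this section ask.

```sql
DECLARE
  eopt       DBMS_AQ.ENQUEUE_OPTIONS_T;
  mprop      DBMS_AQ.MESSAGE_PROPERTIES_T;
  enq_msgid  RAW(16);
  ddl_lcr    SYS.LCR$_DDL_RECORD;
BEGIN
  mprop.SENDER_ID := SYS.AQ$_AGENT('strmadmin', NULL, NULL);
  -- Construct a DDL LCR. REGIONS_NAME_IX is a hypothetical index name;
  -- the base table attributes name the table on which the index is built.
  ddl_lcr := SYS.LCR$_DDL_RECORD.CONSTRUCT(
    source_database_name => 'dbs1.net',
    command_type         => 'CREATE INDEX',
    object_owner         => 'HR',
    object_name          => 'REGIONS_NAME_IX',
    object_type          => 'INDEX',
    ddl_text             => 'CREATE INDEX hr.regions_name_ix ON hr.regions (region_name)',
    logon_user           => 'HR',
    current_schema       => 'HR',
    base_table_owner     => 'HR',
    base_table_name      => 'REGIONS');
  -- Enqueue the DDL LCR, wrapped in a SYS.AnyData payload
  DBMS_AQ.ENQUEUE(
    queue_name         => 'strm02_queue',
    enqueue_options    => eopt,
    message_properties => mprop,
    payload            => SYS.AnyData.ConvertObject(ddl_lcr),
    msgid              => enq_msgid);
  COMMIT;
END;
/
```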
The following sections contain information about the requirements you must meet when constructing or processing LOBs and about apply process behavior for LCRs containing LOBs. This section also includes an example that constructs and enqueues LCRs containing LOBs.
If your environment uses LCRs that contain LOB columns, then you must meet the following requirements when you construct these LCRs or process them with an apply handler or a rule-based transformation:
- The data portion of a LOB column in an LCR must be of type VARCHAR2 or RAW. A VARCHAR2 is interpreted as a CLOB, and a RAW is interpreted as a BLOB.
- LOB WRITE, LOB ERASE, and LOB TRIM are the only valid command types for out-of-line LOBs.
- For LOB WRITE, LOB ERASE, and LOB TRIM LCRs, the old_values collection should be empty or NULL and new_values should not be empty.
- The lob_offset should be a valid value for LOB WRITE and LOB ERASE LCRs. For all other command types, lob_offset should be NULL, under the assumption that LOB chunks for that column will follow.
- The lob_operation_size should be a valid value for LOB ERASE and LOB TRIM LCRs. For all other command types, lob_operation_size should be NULL.
- LOB TRIM and LOB ERASE are valid command types only for an LCR containing a LOB column with lob_information set to LAST_LOB_CHUNK.
- LOB WRITE is a valid command type only for an LCR containing a LOB column with lob_information set to LAST_LOB_CHUNK or LOB_CHUNK.
- For LOBs with lob_information set to NULL_LOB, the data portion of the column should be a NULL of VARCHAR2 type (for a CLOB) or a NULL of RAW type (for a BLOB). Otherwise, it is interpreted as a non-NULL inline LOB column.
- Only one LOB column reference with one new chunk is allowed for each LOB WRITE, LOB ERASE, and LOB TRIM LCR.
- The new LOB chunk for a LOB ERASE and a LOB TRIM LCR should be a NULL value encapsulated in a SYS.AnyData.

All validation of these requirements is done by an apply process. If these requirements are not met, then an LCR containing a LOB column cannot be applied by an apply process or processed by an apply handler. In this case, the LCR is moved to the error queue with the rest of the LCRs in the same transaction.
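As a rough illustration of the requirements above, the following sketch builds one LOB TRIM LCR and one LOB ERASE LCR for a CLOB column. The database, owner, table, and column names (MYDB.NET, LOB_USER, WITH_CLOB, A, C1) and the offset and size values are assumptions chosen for illustration. The block only constructs the LCRs and prints their command types; because validation is done by the apply process, constructing an LCR successfully does not show that it would be applied.

```sql
SET SERVEROUTPUT ON
DECLARE
  pk_unit     SYS.LCR$_ROW_UNIT;
  trim_unit   SYS.LCR$_ROW_UNIT;
  erase_unit  SYS.LCR$_ROW_UNIT;
  trim_lcr    SYS.LCR$_ROW_RECORD;
  erase_lcr   SYS.LCR$_ROW_RECORD;
BEGIN
  -- Key column: not a LOB, so offset and operation size are NULL
  pk_unit := SYS.LCR$_ROW_UNIT(
    'A', SYS.AnyData.ConvertNumber(3), DBMS_LCR.NOT_A_LOB, NULL, NULL);
  -- LOB TRIM: NULL chunk in a SYS.AnyData, LAST_LOB_CHUNK,
  -- lob_offset NULL, lob_operation_size set (100 is an assumed value)
  trim_unit := SYS.LCR$_ROW_UNIT(
    'C1', SYS.AnyData.ConvertVarchar2(NULL), DBMS_LCR.LAST_LOB_CHUNK, NULL, 100);
  -- LOB ERASE: NULL chunk in a SYS.AnyData, LAST_LOB_CHUNK,
  -- lob_offset and lob_operation_size both set (1 and 50 are assumed values)
  erase_unit := SYS.LCR$_ROW_UNIT(
    'C1', SYS.AnyData.ConvertVarchar2(NULL), DBMS_LCR.LAST_LOB_CHUNK, 1, 50);
  -- old_values is NULL and new_values holds exactly one LOB column reference
  trim_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
    source_database_name => 'MYDB.NET',
    command_type         => 'LOB TRIM',
    object_owner         => 'LOB_USER',
    object_name          => 'WITH_CLOB',
    old_values           => NULL,
    new_values           => SYS.LCR$_ROW_LIST(pk_unit, trim_unit));
  erase_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
    source_database_name => 'MYDB.NET',
    command_type         => 'LOB ERASE',
    object_owner         => 'LOB_USER',
    object_name          => 'WITH_CLOB',
    old_values           => NULL,
    new_values           => SYS.LCR$_ROW_LIST(pk_unit, erase_unit));
  DBMS_OUTPUT.PUT_LINE('Constructed: ' || trim_lcr.GET_COMMAND_TYPE());
  DBMS_OUTPUT.PUT_LINE('Constructed: ' || erase_lcr.GET_COMMAND_TYPE());
END;
/
```

For a BLOB column, the NULL chunk would be wrapped with SYS.AnyData.ConvertRaw(NULL) instead, since a RAW is interpreted as a BLOB.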
An apply process behaves in the following way when it encounters an LCR that contains a LOB:
- If an LCR whose command type is INSERT or UPDATE has a new LOB that contains data and the lob_information is not DBMS_LCR.LOB_CHUNK or DBMS_LCR.LAST_LOB_CHUNK, then the data is applied.
- If an LCR whose command type is INSERT or UPDATE has a new LOB that contains no data, and the lob_information is DBMS_LCR.EMPTY_LOB, then it is applied as an empty LOB.
- If an LCR whose command type is INSERT or UPDATE has a new LOB that contains no data, and the lob_information is DBMS_LCR.NULL_LOB or DBMS_LCR.INLINE_LOB, then it is applied as a NULL.
- If an LCR whose command type is INSERT or UPDATE has a new LOB and the lob_information is DBMS_LCR.LOB_CHUNK or DBMS_LCR.LAST_LOB_CHUNK, then any LOB value is ignored. If the command type is INSERT, then an empty LOB is inserted into the column under the assumption that LOB chunks will follow. If the command type is UPDATE, then the column value is ignored under the assumption that LOB chunks will follow.
- If all of the new columns in an LCR whose command type is UPDATE are LOBs whose lob_information is DBMS_LCR.LOB_CHUNK or DBMS_LCR.LAST_LOB_CHUNK, then the update is skipped under the assumption that LOB chunks will follow.
- For an LCR whose command type is UPDATE or DELETE, old LOB values are ignored.

The following example illustrates creating a PL/SQL procedure for constructing and enqueuing LCRs containing LOBs. This example assumes that you have prepared your database for Streams by completing the necessary actions in Chapter 10, "Configuring a Streams Environment".
/************************* BEGINNING OF SCRIPT ******************************

Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.

*/

SET ECHO ON
SPOOL lob_construct.out

/*
*/

SET ECHO ON
SET FEEDBACK 1
SET NUMWIDTH 10
SET LINESIZE 80
SET TRIMSPOOL ON
SET TAB OFF
SET PAGESIZE 100
SET SERVEROUTPUT ON

CONNECT strmadmin/strmadminpw

/*
*/

BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table => 'lobex_queue_table',
    queue_name  => 'lobex_queue');
END;
/

/*
*/

BEGIN
  DBMS_APPLY_ADM.CREATE_APPLY(
    queue_name     => 'strmadmin.lobex_queue',
    apply_name     => 'apply_lob',
    apply_captured => false);
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name => 'apply_lob',
    parameter  => 'disable_on_error',
    value      => 'n');
END;
/

BEGIN
  DBMS_APPLY_ADM.START_APPLY(
    'apply_lob');
END;
/

/*
*/

CONNECT sys/change_on_install AS SYSDBA

CREATE USER lob_user IDENTIFIED BY lob_user_pw;
GRANT CONNECT,RESOURCE TO lob_user;

CONNECT lob_user/lob_user_pw

CREATE TABLE with_clob (a NUMBER PRIMARY KEY,
                        c1 CLOB,
                        c2 CLOB,
                        c3 CLOB);

CREATE TABLE with_blob (a NUMBER PRIMARY KEY,
                        b BLOB);

/*
Granting these privileges enables the Streams administrator to get the LOB length for offset and to perform DML operations on the tables.
*/

GRANT ALL ON with_clob TO strmadmin;
GRANT ALL ON with_blob TO strmadmin;
COMMIT;

/*
*/

CONNECT strmadmin/strmadminpw

CREATE OR REPLACE PROCEDURE enq_row_lcr(source_dbname  VARCHAR2,
                                        cmd_type       VARCHAR2,
                                        obj_owner      VARCHAR2,
                                        obj_name       VARCHAR2,
                                        old_vals       SYS.LCR$_ROW_LIST,
                                        new_vals       SYS.LCR$_ROW_LIST) AS
  eopt       DBMS_AQ.ENQUEUE_OPTIONS_T;
  mprop      DBMS_AQ.MESSAGE_PROPERTIES_T;
  enq_msgid  RAW(16);
  xr_lcr     SYS.LCR$_ROW_RECORD;
BEGIN
  xr_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
    source_database_name => source_dbname,
    command_type         => cmd_type,
    object_owner         => obj_owner,
    object_name          => obj_name,
    old_values           => old_vals,
    new_values           => new_vals);
  -- Enqueue a row lcr
  DBMS_AQ.ENQUEUE(
    queue_name         => 'lobex_queue',
    enqueue_options    => eopt,
    message_properties => mprop,
    payload            => SYS.AnyData.ConvertObject(xr_lcr),
    msgid              => enq_msgid);
END enq_row_lcr;
/
SHOW ERRORS

/*
*/

-- Description of each variable:
-- src_dbname : Source database name
-- tab_owner  : Table owner
-- tab_name   : Table name
-- col_name   : Name of the CLOB column
-- new_vals   : SYS.LCR$_ROW_LIST containing primary key and supplementally
--              logged columns
-- clob_data  : CLOB that contains data to be sent
-- offset     : Offset from which data should be sent, default is 1
-- lsize      : Size of data to be sent, default is 0
-- chunk_size : Size used for creating LOB chunks, default is 2048

CREATE OR REPLACE FUNCTION do_enq_clob(src_dbname  VARCHAR2,
                                       tab_owner   VARCHAR2,
                                       tab_name    VARCHAR2,
                                       col_name    VARCHAR2,
                                       new_vals    SYS.LCR$_ROW_LIST,
                                       clob_data   CLOB,
                                       offset      NUMBER default 1,
                                       lsize       NUMBER default 0,
                                       chunk_size  NUMBER default 2048)
RETURN NUMBER IS
  lob_offset  NUMBER; -- maintain lob offset
  newunit     SYS.LCR$_ROW_UNIT;
  tnewvals    SYS.LCR$_ROW_LIST;
  lob_flag    NUMBER;
  lob_data    VARCHAR2(32767);
  lob_size    NUMBER;
  unit_pos    NUMBER;
  final_size  NUMBER;
  exit_flg    BOOLEAN;
  c_size      NUMBER;
  i           NUMBER;
BEGIN
  lob_size := DBMS_LOB.GETLENGTH(clob_data);
  unit_pos := new_vals.count + 1;
  tnewvals := new_vals;
  c_size   := chunk_size;
  i := 0;
  -- validate parameters
  IF (unit_pos <= 1) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid new_vals list');
    RETURN 1;
  END IF;
  IF (c_size < 1) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid LOB chunk size');
    RETURN 1;
  END IF;
  IF (lsize < 0 OR lsize > lob_size) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid LOB size');
    RETURN 1;
  END IF;
  IF (offset < 1 OR offset >= lob_size) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid lob offset');
    RETURN 1;
  ELSE
    lob_offset := offset;
  END IF;
  -- calculate final size
  IF (lsize = 0) THEN
    final_size := lob_size;
  ELSE
    final_size := lob_offset + lsize;
  END IF;
  -- The following output lines are for debugging purposes only.
  -- DBMS_OUTPUT.PUT_LINE('Final size: ' || final_size);
  -- DBMS_OUTPUT.PUT_LINE('Lob size: ' || lob_size);
  IF (final_size < 1 OR final_size > lob_size) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid lob size');
    RETURN 1;
  END IF;
  -- expand new_vals list for LOB column
  tnewvals.extend();
  exit_flg := FALSE;
  -- Enqueue all LOB chunks
  LOOP
    -- The following output line is for debugging purposes only.
    DBMS_OUTPUT.PUT_LINE('About to write chunk#' || i);
    i := i + 1;
    -- check if last LOB chunk
    IF ((lob_offset + c_size) < final_size) THEN
      lob_flag := DBMS_LCR.LOB_CHUNK;
    ELSE
      lob_flag := DBMS_LCR.LAST_LOB_CHUNK;
      exit_flg := TRUE;
      -- The following output line is for debugging purposes only.
      DBMS_OUTPUT.PUT_LINE('Last LOB chunk');
    END IF;
    -- The following output lines are for debugging purposes only.
    DBMS_OUTPUT.PUT_LINE('lob offset: ' || lob_offset);
    DBMS_OUTPUT.PUT_LINE('Chunk size: ' || to_char(c_size));
    lob_data := DBMS_LOB.SUBSTR(clob_data, c_size, lob_offset);
    -- create row unit for clob
    newunit := SYS.LCR$_ROW_UNIT(col_name,
                                 SYS.AnyData.ConvertVarChar2(lob_data),
                                 lob_flag,
                                 lob_offset,
                                 NULL);
    -- insert new LCR$_ROW_UNIT
    tnewvals(unit_pos) := newunit;
    -- enqueue lcr
    enq_row_lcr(
      source_dbname => src_dbname,
      cmd_type      => 'LOB WRITE',
      obj_owner     => tab_owner,
      obj_name      => tab_name,
      old_vals      => NULL,
      new_vals      => tnewvals);
    -- calculate next chunk size
    lob_offset := lob_offset + c_size;
    IF ((final_size - lob_offset) < c_size) THEN
      c_size := final_size - lob_offset + 1;
    END IF;
    -- The following output line is for debugging purposes only.
    DBMS_OUTPUT.PUT_LINE('Next chunk size : ' || TO_CHAR(c_size));
    IF (c_size < 1) THEN
      exit_flg := TRUE;
    END IF;
    EXIT WHEN exit_flg;
  END LOOP;
  RETURN 0;
END do_enq_clob;
/
SHOW ERRORS

/*
*/

SET SERVEROUTPUT ON

DECLARE
  c1_data   CLOB;
  c2_data   CLOB;
  c3_data   CLOB;
  newunit1  SYS.LCR$_ROW_UNIT;
  newunit2  SYS.LCR$_ROW_UNIT;
  newunit3  SYS.LCR$_ROW_UNIT;
  newunit4  SYS.LCR$_ROW_UNIT;
  newvals   SYS.LCR$_ROW_LIST;
  big_data  VARCHAR(22000);
  n         NUMBER;
BEGIN
  -- Create primary key for LCR$_ROW_UNIT
  newunit1 := SYS.LCR$_ROW_UNIT('A',
                                SYS.AnyData.ConvertNumber(3),
                                NULL,
                                NULL,
                                NULL);
  -- Create empty CLOBs
  newunit2 := SYS.LCR$_ROW_UNIT('C1',
                                SYS.AnyData.ConvertVarChar2(NULL),
                                DBMS_LCR.EMPTY_LOB,
                                NULL,
                                NULL);
  newunit3 := SYS.LCR$_ROW_UNIT('C2',
                                SYS.AnyData.ConvertVarChar2(NULL),
                                DBMS_LCR.EMPTY_LOB,
                                NULL,
                                NULL);
  newunit4 := SYS.LCR$_ROW_UNIT('C3',
                                SYS.AnyData.ConvertVarChar2(NULL),
                                DBMS_LCR.EMPTY_LOB,
                                NULL,
                                NULL);
  newvals := SYS.LCR$_ROW_LIST(newunit1,newunit2,newunit3,newunit4);
  -- Perform an insert
  enq_row_lcr(
    source_dbname => 'MYDB.NET',
    cmd_type      => 'INSERT',
    obj_owner     => 'LOB_USER',
    obj_name      => 'WITH_CLOB',
    old_vals      => NULL,
    new_vals      => newvals);
  -- construct clobs
  big_data := RPAD('Hello World', 1000, '_');
  big_data := big_data || '#';
  big_data := big_data || big_data || big_data || big_data || big_data;
  DBMS_LOB.CREATETEMPORARY(
    lob_loc => c1_data,
    cache   => TRUE);
  DBMS_LOB.WRITEAPPEND(
    lob_loc => c1_data,
    amount  => length(big_data),
    buffer  => big_data);
  big_data := RPAD('1234567890#', 1000, '_');
  big_data := big_data || big_data || big_data || big_data;
  DBMS_LOB.CREATETEMPORARY(
    lob_loc => c2_data,
    cache   => TRUE);
  DBMS_LOB.WRITEAPPEND(
    lob_loc => c2_data,
    amount  => length(big_data),
    buffer  => big_data);
  big_data := RPAD('ASDFGHJKLQW', 2000, '_');
  big_data := big_data || '#';
  big_data := big_data || big_data || big_data || big_data || big_data;
  DBMS_LOB.CREATETEMPORARY(
    lob_loc => c3_data,
    cache   => TRUE);
  DBMS_LOB.WRITEAPPEND(
    lob_loc => c3_data,
    amount  => length(big_data),
    buffer  => big_data);
  -- pk info
  newunit1 := SYS.LCR$_ROW_UNIT('A',
                                SYS.AnyData.ConvertNumber(3),
                                NULL,
                                NULL,
                                NULL);
  newvals := SYS.LCR$_ROW_LIST(newunit1);
  -- write c1 clob
  n := do_enq_clob(
         src_dbname => 'MYDB.NET',
         tab_owner  => 'LOB_USER',
         tab_name   => 'WITH_CLOB',
         col_name   => 'C1',
         new_vals   => newvals,
         clob_data  => c1_data,
         offset     => 1,
         chunk_size => 1024);
  DBMS_OUTPUT.PUT_LINE('n=' || n);
  -- write c2 clob
  newvals := SYS.LCR$_ROW_LIST(newunit1);
  n := do_enq_clob(
         src_dbname => 'MYDB.NET',
         tab_owner  => 'LOB_USER',
         tab_name   => 'WITH_CLOB',
         col_name   => 'C2',
         new_vals   => newvals,
         clob_data  => c2_data,
         offset     => 1,
         chunk_size => 2000);
  DBMS_OUTPUT.PUT_LINE('n=' || n);
  -- write c3 clob
  newvals := SYS.LCR$_ROW_LIST(newunit1);
  n := do_enq_clob(
         src_dbname => 'MYDB.NET',
         tab_owner  => 'LOB_USER',
         tab_name   => 'WITH_CLOB',
         col_name   => 'C3',
         new_vals   => newvals,
         clob_data  => c3_data,
         offset     => 1,
         chunk_size => 500);
  DBMS_OUTPUT.PUT_LINE('n=' || n);
  COMMIT;
END;
/

/*
Check the lob_construct.out spool file to ensure that all actions completed successfully after this script completes.

*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/
After you run the script, you can check the lob_user.with_clob table to list the rows applied by the apply process. The DBMS_LOCK.SLEEP statement is used to give the apply process time to apply the enqueued rows.

CONNECT lob_user/lob_user_pw

EXECUTE DBMS_LOCK.SLEEP(10);

SELECT a, c1, c2, c3 FROM with_clob ORDER BY a;

SELECT a, LENGTH(c1), LENGTH(c2), LENGTH(c3) FROM with_clob ORDER BY a;
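If the expected rows do not appear, one place to look is the error queue, because LCRs that fail the LOB requirements are moved there with the rest of their transaction. The following query is a minimal sketch that assumes the Streams administrator can read the DBA_APPLY_ERROR data dictionary view and that the apply process name was stored in uppercase as APPLY_LOB.

```sql
CONNECT strmadmin/strmadminpw

-- List any transactions that the apply_lob apply process moved to the
-- error queue, with the number of events in each and the error raised.
SELECT apply_name,
       local_transaction_id,
       message_count,
       error_message
  FROM dba_apply_error
  WHERE apply_name = 'APPLY_LOB';
```

An empty result suggests that no transaction for this apply process is currently in the error queue.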
You can set or get the value of the tags generated by the current session or by an apply process. The following sections describe how to set and get tag values.
This section contains instructions for setting and getting the tag for the current session.
You can set the tag for all redo entries generated by the current session using the SET_TAG procedure in the DBMS_STREAMS package. For example, to set the tag to the hexadecimal value of '1D' in the current session, run the following procedure:

BEGIN
  DBMS_STREAMS.SET_TAG(
    tag => HEXTORAW('1D'));
END;
/
After running this procedure, each redo entry generated by DML or DDL statements in the current session will have a tag value of 1D. Running this procedure affects only the current session.
You can get the tag for all redo entries generated by the current session using the GET_TAG function in the DBMS_STREAMS package. For example, to get the hexadecimal value of the tags generated in the redo entries for the current session, run the following PL/SQL block:

SET SERVEROUTPUT ON
DECLARE
  raw_tag RAW(2048);
BEGIN
  raw_tag := DBMS_STREAMS.GET_TAG();
  DBMS_OUTPUT.PUT_LINE('Tag Value = ' || RAWTOHEX(raw_tag));
END;
/
You can also display the tag value for the current session by querying the DUAL table:
SELECT DBMS_STREAMS.GET_TAG FROM DUAL;
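Because SET_TAG affects the whole session, it can be useful to save the current tag before changing it and to restore it afterward. The block below is a sketch of that pattern using only GET_TAG and SET_TAG; the UPDATE against hr.regions is a hypothetical stand-in for whatever DML should carry the tag, and it assumes the session has the privileges to run it.

```sql
DECLARE
  saved_tag RAW(2048);
BEGIN
  -- Remember the tag currently in effect for this session (may be NULL)
  saved_tag := DBMS_STREAMS.GET_TAG();
  -- Redo generated from here on in this session is tagged 1D
  DBMS_STREAMS.SET_TAG(tag => HEXTORAW('1D'));
  -- Hypothetical DML whose redo entries should carry the tag
  UPDATE hr.regions SET region_name = region_name WHERE region_id = 1;
  COMMIT;
  -- Restore the tag that was in effect before this block ran
  DBMS_STREAMS.SET_TAG(tag => saved_tag);
EXCEPTION
  WHEN OTHERS THEN
    -- Undo the failed work, restore the tag, and re-raise the error
    ROLLBACK;
    DBMS_STREAMS.SET_TAG(tag => saved_tag);
    RAISE;
END;
/
```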
This section contains instructions for setting and removing the tag for an apply process.
An apply process generates redo entries when it applies changes to a database or invokes handlers. You can set the default tag for all redo entries generated by an apply process when you create the apply process using the CREATE_APPLY procedure in the DBMS_APPLY_ADM package, or when you alter an existing apply process using the ALTER_APPLY procedure in the DBMS_APPLY_ADM package. In both of these procedures, set the apply_tag parameter to the value you want to specify for the tags generated by the apply process.

For example, to set the value of the tags generated in the redo log by an existing apply process named strm01_apply to the hexadecimal value of '7', run the following procedure:

BEGIN
  DBMS_APPLY_ADM.ALTER_APPLY(
    apply_name => 'strm01_apply',
    apply_tag  => HEXTORAW('7'));
END;
/
After running this procedure, each redo entry generated by the apply process will have a tag value of 7.
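The tag can also be set when the apply process is created. The following sketch passes apply_tag to CREATE_APPLY; the queue name strm03_queue and the apply process name strm03_apply are hypothetical, and the block assumes the queue already exists (for example, one created with DBMS_STREAMS_ADM.SET_UP_QUEUE).

```sql
BEGIN
  -- strm03_queue and strm03_apply are hypothetical names for illustration
  DBMS_APPLY_ADM.CREATE_APPLY(
    queue_name     => 'strm03_queue',
    apply_name     => 'strm03_apply',
    apply_tag      => HEXTORAW('7'),
    apply_captured => false);
END;
/
```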
You remove the apply tag for an apply process by setting the remove_apply_tag parameter to true in the ALTER_APPLY procedure in the DBMS_APPLY_ADM package. Removing the apply tag means that each redo entry generated by the apply process has a NULL tag. For example, the following procedure removes the apply tag from an apply process named strm02_apply.

BEGIN
  DBMS_APPLY_ADM.ALTER_APPLY(
    apply_name       => 'strm02_apply',
    remove_apply_tag => true);
END;
/
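To confirm the result of setting or removing an apply tag, you can inspect the apply processes in the data dictionary. The query below is a sketch that assumes the current user can read the DBA_APPLY view and that the apply process names were stored in uppercase.

```sql
-- Show the current apply tag for each apply process used in this section.
-- A NULL in the apply_tag column indicates that the tag has been removed.
SELECT apply_name,
       RAWTOHEX(apply_tag) AS apply_tag
  FROM dba_apply
  WHERE apply_name IN ('STRM01_APPLY', 'STRM02_APPLY');
```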
Copyright © 2002 Oracle Corporation. All Rights Reserved.