5 Logical Change Records with LOBs Example

This chapter illustrates an example that creates a PL/SQL procedure for constructing and enqueuing LCRs that contain LOBs.

This chapter contains this topic:

5.1 Example Script for Constructing and Enqueuing LCRs Containing LOBs

  1. "Show Output and Spool Results"
  2. "Grant the Oracle Streams Administrator EXECUTE Privilege on DBMS_STREAMS_MESSAGING"
  3. "Connect as the Oracle Streams Administrator"
  4. "Create an ANYDATA Queue"
  5. "Create and Start an Apply Process"
  6. "Create a Schema with Tables Containing LOB Columns"
  7. "Grant the Oracle Streams Administrator Necessary Privileges on the Tables"
  8. "Create a PL/SQL Procedure to Enqueue LCRs Containing LOBs"
  9. "Create the do_enq_clob Function to Enqueue CLOB Data"
  10. "Enqueue CLOB Data Using the do_enq_clob Function"
  11. "Check the Spool Results"

Note:

If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line after this note to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect to all of the databases in the environment.

/************************* BEGINNING OF SCRIPT ******************************
Show Output and Spool Results

Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.

*/

SET ECHO ON
SPOOL lob_construct.out

/*
Grant the Oracle Streams Administrator EXECUTE Privilege on DBMS_STREAMS_MESSAGING

Explicit EXECUTE privilege on the package is required because a procedure in the package is called in within a PL/SQL procedure in Step "Create a PL/SQL Procedure to Enqueue LCRs Containing LOBs".

*/

CONNECT / AS SYSDBA;

GRANT EXECUTE ON DBMS_STREAMS_MESSAGING TO strmadmin;

/*
Connect as the Oracle Streams Administrator
*/
SET ECHO ON
SET FEEDBACK 1
SET NUMWIDTH 10
SET LINESIZE 80
SET TRIMSPOOL ON
SET TAB OFF
SET PAGESIZE 100
SET SERVEROUTPUT ON SIZE 100000

CONNECT strmadmin

/*
Create an ANYDATA Queue
*/
BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE( 
    queue_table => 'lobex_queue_table', 
    queue_name  => 'lobex_queue');
END;
/

/*
Create and Start an Apply Process
*/
BEGIN
  DBMS_APPLY_ADM.CREATE_APPLY(
    queue_name      => 'strmadmin.lobex_queue',
    apply_name      => 'apply_lob',
    apply_captured  => FALSE);
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name => 'apply_lob', 
    parameter  => 'disable_on_error',
    value      => 'N');
END;
/

BEGIN
  DBMS_APPLY_ADM.START_APPLY(
    'apply_lob');
END;
/

/*
Create a Schema with Tables Containing LOB Columns
*/
CONNECT system

CREATE TABLESPACE lob_user_tbs DATAFILE 'lob_user_tbs.dbf' 
  SIZE 5M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

ACCEPT password PROMPT 'Enter password for user: ' HIDE

CREATE USER lob_user
IDENTIFIED BY &password
  DEFAULT TABLESPACE lob_user_tbs
  QUOTA UNLIMITED ON lob_user_tbs;

GRANT ALTER SESSION, CREATE CLUSTER, CREATE DATABASE LINK, CREATE SEQUENCE,
  CREATE SESSION, CREATE SYNONYM, CREATE TABLE, CREATE VIEW, CREATE INDEXTYPE, 
  CREATE OPERATOR, CREATE PROCEDURE, CREATE TRIGGER, CREATE TYPE
TO lob_user;

CONNECT lob_user/lob_user_pw

CREATE TABLE with_clob (a  NUMBER PRIMARY KEY,
                        c1 CLOB,
                        c2 CLOB,
                        c3 CLOB);

CREATE TABLE with_blob (a NUMBER PRIMARY KEY,
                        b BLOB);

/*
Grant the Oracle Streams Administrator Necessary Privileges on the Tables

Granting these privileges enables the Oracle Streams administrator to get the LOB length for offset and to perform DML operations on the tables.

*/

GRANT ALL ON with_clob TO strmadmin;
GRANT ALL ON with_blob TO strmadmin;
COMMIT;

/*
Create a PL/SQL Procedure to Enqueue LCRs Containing LOBs
*/
CONNECT strmadmin

CREATE OR REPLACE PROCEDURE enq_row_lcr(source_dbname  VARCHAR2,
                                            cmd_type       VARCHAR2,
                                            obj_owner      VARCHAR2,
                                            obj_name       VARCHAR2,
                                            old_vals       SYS.LCR$_ROW_LIST,
                                            new_vals       SYS.LCR$_ROW_LIST) AS
  xr_lcr         SYS.LCR$_ROW_RECORD;
BEGIN
  xr_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
              source_database_name => source_dbname,
              command_type         => cmd_type,
              object_owner         => obj_owner,
              object_name          => obj_name,
              old_values           => old_vals,
              new_values           => new_vals);
  -- Enqueue a row lcr
  DBMS_STREAMS_MESSAGING.ENQUEUE(
        queue_name         => 'lobex_queue', 
        payload            => ANYDATA.ConvertObject(xr_lcr));
END enq_row_lcr;
/
SHOW ERRORS

/*
Create the do_enq_clob Function to Enqueue CLOB Data
*/
-- Description of each variable:
-- src_dbname  : Source database name
-- tab_owner   : Table owner
-- tab_name    : Table name
-- col_name    : Name of the CLOB column
-- new_vals    : SYS.LCR$_ROW_LIST containing primary key and supplementally  
--               logged colums
-- clob_data   : CLOB that contains data to be sent
-- offset      : Offset from which data should be sent, default is 1
-- lsize       : Size of data to be sent, default is 0
-- chunk_size  : Size used for creating LOB chunks, default is 2048

CREATE OR REPLACE FUNCTION do_enq_clob(src_dbname     VARCHAR2,
                                       tab_owner      VARCHAR2,
                                       tab_name       VARCHAR2,
                                       col_name       VARCHAR2,
                                       new_vals       SYS.LCR$_ROW_LIST,
                                       clob_data      CLOB,
                                       offset         NUMBER default 1,
                                       lsize          NUMBER default 0,
                                       chunk_size     NUMBER default 2048) 
RETURN NUMBER IS
  lob_offset NUMBER; -- maintain lob offset
  newunit    SYS.LCR$_ROW_UNIT;
  tnewvals   SYS.LCR$_ROW_LIST;
  lob_flag   NUMBER;
  lob_data   VARCHAR2(32767);
  lob_size   NUMBER;
  unit_pos   NUMBER;
  final_size NUMBER;
  exit_flg   BOOLEAN;
  c_size     NUMBER;
  i          NUMBER;
BEGIN
  lob_size := DBMS_LOB.GETLENGTH(clob_data);
  unit_pos := new_vals.count + 1;
  tnewvals := new_vals;
  c_size   := chunk_size;
  i := 0;
  -- validate parameters
  IF (unit_pos <= 1) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid new_vals list');
    RETURN 1;
  END IF;

  IF (c_size < 1) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid LOB chunk size');
    RETURN 1;
  END IF;

  IF (lsize < 0 OR lsize > lob_size) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid LOB size');
    RETURN 1;
  END IF;

  IF (offset < 1 OR offset >= lob_size) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid lob offset');
    RETURN 1;
  ELSE
    lob_offset := offset;
  END IF;

  -- calculate final size
  IF (lsize = 0) THEN
    final_size := lob_size;
  ELSE
    final_size := lob_offset + lsize;
  END IF;

  --  The following output lines are for debugging purposes only.
  -- DBMS_OUTPUT.PUT_LINE('Final size: ' || final_size);
  -- DBMS_OUTPUT.PUT_LINE('Lob size: ' || lob_size);

  IF (final_size < 1 OR final_size > lob_size) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid lob size');
    RETURN 1;
  END IF;

  -- expand new_vals list for LOB column
  tnewvals.extend();

  exit_flg := FALSE;

  -- Enqueue all LOB chunks
  LOOP
    --  The following output line is for debugging purposes only.
    DBMS_OUTPUT.PUT_LINE('About to write chunk#' || i);
    i := i + 1;
 
    -- check if last LOB chunk
    IF ((lob_offset + c_size) < final_size) THEN
      lob_flag := DBMS_LCR.LOB_CHUNK;
    ELSE
      lob_flag := DBMS_LCR.LAST_LOB_CHUNK;
      exit_flg := TRUE;
      --  The following output line is for debugging purposes only.
      DBMS_OUTPUT.PUT_LINE('Last LOB chunk');
    END IF;

    --  The following output lines are for debugging purposes only.
    DBMS_OUTPUT.PUT_LINE('lob offset: ' || lob_offset);
    DBMS_OUTPUT.PUT_LINE('Chunk size: ' || to_char(c_size));

    lob_data := DBMS_LOB.SUBSTR(clob_data, c_size, lob_offset); 

    -- create row unit for clob
    newunit := SYS.LCR$_ROW_UNIT(col_name,
                                 ANYDATA.ConvertVarChar2(lob_data), 
                                 lob_flag, 
                                 lob_offset, 
                                 NULL);

    -- insert new LCR$_ROW_UNIT
    tnewvals(unit_pos) := newunit;  

    -- enqueue lcr
    enq_row_lcr(
          source_dbname => src_dbname,
          cmd_type      => 'LOB WRITE',
          obj_owner     => tab_owner,
          obj_name      => tab_name,
          old_vals      => NULL,
          new_vals      => tnewvals);

    -- calculate next chunk size 
    lob_offset := lob_offset + c_size;
    
    IF ((final_size - lob_offset) < c_size) THEN
      c_size := final_size - lob_offset + 1;
    END IF;

    --  The following output line is for debugging purposes only.
    DBMS_OUTPUT.PUT_LINE('Next chunk size : ' || TO_CHAR(c_size));

    IF (c_size < 1) THEN
      exit_flg := TRUE;
    END IF;

    EXIT WHEN exit_flg;

  END LOOP;

  RETURN 0;
END do_enq_clob;
/

SHOW ERRORS

/*
Enqueue CLOB Data Using the do_enq_clob Function

The DBMS_OUTPUT lines in the following example can be used for debugging purposes if necessary. If they are not needed, then they can be commented out or deleted.

*/

SET SERVEROUTPUT ON SIZE 100000
DECLARE
  c1_data CLOB;
  c2_data CLOB;
  c3_data CLOB;
  newunit1 SYS.LCR$_ROW_UNIT;
  newunit2 SYS.LCR$_ROW_UNIT;
  newunit3 SYS.LCR$_ROW_UNIT;
  newunit4 SYS.LCR$_ROW_UNIT;
  newvals  SYS.LCR$_ROW_LIST;
  big_data VARCHAR(22000);
  n        NUMBER;
BEGIN
  -- Create primary key for LCR$_ROW_UNIT
  newunit1 := SYS.LCR$_ROW_UNIT('A',
                                ANYDATA.ConvertNumber(3), 
                                NULL, 
                                NULL, 
                                NULL);
  -- Create empty CLOBs
  newunit2 := sys.lcr$_row_unit('C1',
                                ANYDATA.ConvertVarChar2(NULL),
                                DBMS_LCR.EMPTY_LOB, 
                                NULL, 
                                NULL);
  newunit3 := SYS.LCR$_ROW_UNIT('C2',
                                ANYDATA.ConvertVarChar2(NULL),
                                DBMS_LCR.EMPTY_LOB, 
                                NULL, 
                                NULL);
  newunit4 := SYS.LCR$_ROW_UNIT('C3',
                                ANYDATA.ConvertVarChar2(NULL),
                                DBMS_LCR.EMPTY_LOB, 
                                NULL, 
                                NULL);
  newvals := SYS.LCR$_ROW_LIST(newunit1,newunit2,newunit3,newunit4);

  -- Perform an insert
  enq_row_lcr(
    source_dbname => 'MYDB.EXAMPLE.COM',
    cmd_type      => 'INSERT',
    obj_owner     => 'LOB_USER',
    obj_name      => 'WITH_CLOB',
    old_vals      => NULL,
    new_vals      => newvals);

  -- construct clobs
  big_data := RPAD('Hello World', 1000, '_');
  big_data := big_data || '#';
  big_data := big_data || big_data || big_data || big_data || big_data;
  DBMS_LOB.CREATETEMPORARY(
    lob_loc => c1_data, 
    cache   => TRUE);
  DBMS_LOB.WRITEAPPEND(
    lob_loc => c1_data, 
    amount  => length(big_data), 
    buffer  => big_data);

  big_data := RPAD('1234567890#', 1000, '_');
  big_data := big_data || big_data || big_data || big_data;
  DBMS_LOB.CREATETEMPORARY(
    lob_loc => c2_data, 
    cache   => TRUE);
  DBMS_LOB.WRITEAPPEND(
    lob_loc => c2_data, 
    amount  => length(big_data), 
    buffer  => big_data);

  big_data := RPAD('ASDFGHJKLQW', 2000, '_');
  big_data := big_data || '#';
  big_data := big_data || big_data || big_data || big_data || big_data;
  DBMS_LOB.CREATETEMPORARY(
    lob_loc => c3_data, 
    cache   => TRUE);
  DBMS_LOB.WRITEAPPEND(
    lob_loc => c3_data, 
    amount  => length(big_data), 
    buffer  => big_data);

  -- pk info
  newunit1 := SYS.LCR$_ROW_UNIT('A',
                                ANYDATA.ConvertNumber(3), 
                                NULL, 
                                NULL, 
                                NULL);
  newvals  := SYS.LCR$_ROW_LIST(newunit1); 

  -- write c1 clob
  n := do_enq_clob(
         src_dbname => 'MYDB.EXAMPLE.COM',
         tab_owner  => 'LOB_USER',
         tab_name   => 'WITH_CLOB',
         col_name   => 'C1',
         new_vals   => newvals,
         clob_data  => c1_data,
         offset     => 1,
         chunk_size => 1024);
  DBMS_OUTPUT.PUT_LINE('n=' || n);
 
  -- write c2 clob
  newvals  := SYS.LCR$_ROW_LIST(newunit1); 
  n := do_enq_clob(
         src_dbname => 'MYDB.EXAMPLE.COM',
         tab_owner  => 'LOB_USER',
         tab_name   => 'WITH_CLOB',
         col_name   => 'C2',
         new_vals   => newvals,
         clob_data  => c2_data,
         offset     => 1,
         chunk_size => 2000);
  DBMS_OUTPUT.PUT_LINE('n=' || n);
 
  -- write c3 clob
  newvals  := SYS.LCR$_ROW_LIST(newunit1); 
  n := do_enq_clob(src_dbname=>'MYDB.EXAMPLE.COM',
         tab_owner  => 'LOB_USER',
         tab_name   => 'WITH_CLOB',
         col_name   => 'C3',
         new_vals   => newvals,
         clob_data  => c3_data,
         offset     => 1,
         chunk_size => 500);
  DBMS_OUTPUT.PUT_LINE('n=' || n);
 
  COMMIT;

END;
/

/*
Check the Spool Results

Check the lob_construct.out spool file to ensure that all actions completed successfully after this script completes.

*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/

After you run the script, you can check the lob_user.with_clob table to list the rows applied by the apply process. The DBMS_LOCK.SLEEP statement is used to give the apply process time to apply the enqueued rows.

CONNECT lob_user/lob_user_pw

EXECUTE DBMS_LOCK.SLEEP(10);

SELECT a, c1, c2, c3 FROM with_clob ORDER BY a;

SELECT a, LENGTH(c1), LENGTH(c2), LENGTH(c3) FROM with_clob ORDER BY a;