ヘッダーをスキップ
Oracle® Streams拡張例
11gリリース2 (11.2)
B61356-02
  目次へ移動
目次
索引へ移動
索引

前
 
次
 

5 LOBを含む論理変更レコードの例

この章では、LOBを含むLCRを構成およびエンキューするPL/SQLプロシージャを作成する例を説明します。

この章の内容は次のとおりです。

LOBを含むLCRを構成およびエンキューするスクリプトの例

  1. 出力およびスプール結果の表示

  2. Oracle Streams管理者へのDBMS_STREAMS_MESSAGINGに対するEXECUTE権限の付与

  3. Oracle Streams管理者としての接続

  4. ANYDATAキューの作成

  5. 適用プロセスの作成および起動

  6. LOB列を含む表が存在するスキーマの作成

  7. Oracle Streams管理者への表に対する必要な権限の付与

  8. LOBを含むLCRをエンキューするPL/SQLプロシージャの作成

  9. CLOBデータをエンキューするdo_enq_clobファンクションの作成

  10. do_enq_clobファンクションを使用したCLOBデータのエンキュー

  11. スプール結果のチェック


注意:

このマニュアルをオンラインで参照している場合、この注意の後にある「BEGINNING OF SCRIPT」の行から次の「END OF SCRIPT」の行までのテキストをテキスト・エディタにコピーし、テキストを編集してご使用の環境用のスクリプトを作成できます。環境内のすべてのデータベースに接続可能なコンピュータで、SQL*Plusを使用してスクリプトを実行します。

/************************* BEGINNING OF SCRIPT ******************************
手順1: 出力およびスプール結果の表示

SET ECHO ONを実行し、スクリプトのスプール・ファイルを指定します。このスクリプトの実行後に、スプール・ファイルにエラーがないかをチェックします。

*/

SET ECHO ON
SPOOL lob_construct.out

/*
手順2: Oracle Streams管理者へのDBMS_STREAMS_MESSAGINGに対するEXECUTE権限の付与

手順8のPL/SQLプロシージャ内でこのパッケージのプロシージャをコールするため、このパッケージに対する明示的なEXECUTE権限が必要です。

*/

CONNECT / AS SYSDBA;

GRANT EXECUTE ON DBMS_STREAMS_MESSAGING TO strmadmin;

/*
手順3: Oracle Streams管理者としての接続
*/
SET ECHO ON
SET FEEDBACK 1
SET NUMWIDTH 10
SET LINESIZE 80
SET TRIMSPOOL ON
SET TAB OFF
SET PAGESIZE 100
SET SERVEROUTPUT ON SIZE 100000

CONNECT strmadmin

/*
手順4: ANYDATAキューの作成
*/
BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE( 
    queue_table => 'lobex_queue_table', 
    queue_name  => 'lobex_queue');
END;
/

/*
手順5: 適用プロセスの作成および起動
*/
BEGIN
  DBMS_APPLY_ADM.CREATE_APPLY(
    queue_name      => 'strmadmin.lobex_queue',
    apply_name      => 'apply_lob',
    apply_captured  => FALSE);
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name => 'apply_lob', 
    parameter  => 'disable_on_error',
    value      => 'N');
END;
/

BEGIN
  DBMS_APPLY_ADM.START_APPLY(
    'apply_lob');
END;
/

/*
手順6: LOB列を含む表が存在するスキーマの作成
*/
CONNECT system

CREATE TABLESPACE lob_user_tbs DATAFILE 'lob_user_tbs.dbf' 
  SIZE 5M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

ACCEPT password PROMPT 'Enter password for user: ' HIDE

CREATE USER lob_user
IDENTIFIED BY &password
  DEFAULT TABLESPACE lob_user_tbs
  QUOTA UNLIMITED ON lob_user_tbs;

GRANT ALTER SESSION, CREATE CLUSTER, CREATE DATABASE LINK, CREATE SEQUENCE,
  CREATE SESSION, CREATE SYNONYM, CREATE TABLE, CREATE VIEW, CREATE INDEXTYPE, 
  CREATE OPERATOR, CREATE PROCEDURE, CREATE TRIGGER, CREATE TYPE
TO lob_user;

CONNECT lob_user/lob_user_pw

CREATE TABLE with_clob (a  NUMBER PRIMARY KEY,
                        c1 CLOB,
                        c2 CLOB,
                        c3 CLOB);

CREATE TABLE with_blob (a NUMBER PRIMARY KEY,
                        b BLOB);

/*
手順7: Oracle Streams管理者への表に対する必要な権限の付与

次の権限を付与すると、Oracle Streams管理者は、LOBオフセット長を取得し、表にDML操作を実行できるようになります。

*/

GRANT ALL ON with_clob TO strmadmin;
GRANT ALL ON with_blob TO strmadmin;
COMMIT;

/*
手順8: LOBを含むLCRをエンキューするPL/SQLプロシージャの作成
*/
CONNECT strmadmin

CREATE OR REPLACE PROCEDURE enq_row_lcr(source_dbname  VARCHAR2,
                                            cmd_type       VARCHAR2,
                                            obj_owner      VARCHAR2,
                                            obj_name       VARCHAR2,
                                            old_vals       SYS.LCR$_ROW_LIST,
                                            new_vals       SYS.LCR$_ROW_LIST) AS
  xr_lcr         SYS.LCR$_ROW_RECORD;
BEGIN
  xr_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
              source_database_name => source_dbname,
              command_type         => cmd_type,
              object_owner         => obj_owner,
              object_name          => obj_name,
              old_values           => old_vals,
              new_values           => new_vals);
  -- Enqueue a row lcr
  DBMS_STREAMS_MESSAGING.ENQUEUE(
        queue_name         => 'lobex_queue', 
        payload            => ANYDATA.ConvertObject(xr_lcr));
END enq_row_lcr;
/
SHOW ERRORS

/*
手順9: CLOBデータをエンキューするdo_enq_clobファンクションの作成
*/
-- Description of each variable:
-- src_dbname  : Source database name
-- tab_owner   : Table owner
-- tab_name    : Table name
-- col_name    : Name of the CLOB column
-- new_vals    : SYS.LCR$_ROW_LIST containing primary key and supplementally  
--               logged colums
-- clob_data   : CLOB that contains data to be sent
-- offset      : Offset from which data should be sent, default is 1
-- lsize       : Size of data to be sent, default is 0
-- chunk_size  : Size used for creating LOB chunks, default is 2048

CREATE OR REPLACE FUNCTION do_enq_clob(src_dbname     VARCHAR2,
                                       tab_owner      VARCHAR2,
                                       tab_name       VARCHAR2,
                                       col_name       VARCHAR2,
                                       new_vals       SYS.LCR$_ROW_LIST,
                                       clob_data      CLOB,
                                       offset         NUMBER default 1,
                                       lsize          NUMBER default 0,
                                       chunk_size     NUMBER default 2048) 
RETURN NUMBER IS
  lob_offset NUMBER; -- maintain lob offset
  newunit    SYS.LCR$_ROW_UNIT;
  tnewvals   SYS.LCR$_ROW_LIST;
  lob_flag   NUMBER;
  lob_data   VARCHAR2(32767);
  lob_size   NUMBER;
  unit_pos   NUMBER;
  final_size NUMBER;
  exit_flg   BOOLEAN;
  c_size     NUMBER;
  i          NUMBER;
BEGIN
  lob_size := DBMS_LOB.GETLENGTH(clob_data);
  unit_pos := new_vals.count + 1;
  tnewvals := new_vals;
  c_size   := chunk_size;
  i := 0;
  -- validate parameters
  IF (unit_pos <= 1) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid new_vals list');
    RETURN 1;
  END IF;

  IF (c_size < 1) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid LOB chunk size');
    RETURN 1;
  END IF;

  IF (lsize < 0 OR lsize > lob_size) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid LOB size');
    RETURN 1;
  END IF;

  IF (offset < 1 OR offset >= lob_size) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid lob offset');
    RETURN 1;
  ELSE
    lob_offset := offset;
  END IF;

  -- calculate final size
  IF (lsize = 0) THEN
    final_size := lob_size;
  ELSE
    final_size := lob_offset + lsize;
  END IF;

  --  The following output lines are for debugging purposes only.
  -- DBMS_OUTPUT.PUT_LINE('Final size: ' || final_size);
  -- DBMS_OUTPUT.PUT_LINE('Lob size: ' || lob_size);

  IF (final_size < 1 OR final_size > lob_size) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid lob size');
    RETURN 1;
  END IF;

  -- expand new_vals list for LOB column
  tnewvals.extend();

  exit_flg := FALSE;

  -- Enqueue all LOB chunks
  LOOP
    --  The following output line is for debugging purposes only.
    DBMS_OUTPUT.PUT_LINE('About to write chunk#' || i);
    i := i + 1;
 
    -- check if last LOB chunk
    IF ((lob_offset + c_size) < final_size) THEN
      lob_flag := DBMS_LCR.LOB_CHUNK;
    ELSE
      lob_flag := DBMS_LCR.LAST_LOB_CHUNK;
      exit_flg := TRUE;
      --  The following output line is for debugging purposes only.
      DBMS_OUTPUT.PUT_LINE('Last LOB chunk');
    END IF;

    --  The following output lines are for debugging purposes only.
    DBMS_OUTPUT.PUT_LINE('lob offset: ' || lob_offset);
    DBMS_OUTPUT.PUT_LINE('Chunk size: ' || to_char(c_size));

    lob_data := DBMS_LOB.SUBSTR(clob_data, c_size, lob_offset); 

    -- create row unit for clob
    newunit := SYS.LCR$_ROW_UNIT(col_name,
                                 ANYDATA.ConvertVarChar2(lob_data), 
                                 lob_flag, 
                                 lob_offset, 
                                 NULL);

    -- insert new LCR$_ROW_UNIT
    tnewvals(unit_pos) := newunit;  

    -- enqueue lcr
    enq_row_lcr(
          source_dbname => src_dbname,
          cmd_type      => 'LOB WRITE',
          obj_owner     => tab_owner,
          obj_name      => tab_name,
          old_vals      => NULL,
          new_vals      => tnewvals);

    -- calculate next chunk size 
    lob_offset := lob_offset + c_size;
    
    IF ((final_size - lob_offset) < c_size) THEN
      c_size := final_size - lob_offset + 1;
    END IF;

    --  The following output line is for debugging purposes only.
    DBMS_OUTPUT.PUT_LINE('Next chunk size : ' || TO_CHAR(c_size));

    IF (c_size < 1) THEN
      exit_flg := TRUE;
    END IF;

    EXIT WHEN exit_flg;

  END LOOP;

  RETURN 0;
END do_enq_clob;
/

SHOW ERRORS

/*
手順10: do_enq_clobファンクションを使用したCLOBデータのエンキュー

次の例に示すDBMS_OUTPUT行は、必要に応じて、デバッグのために使用できます。この行が不要な場合は、コメント・アウトまたは削除できます。

*/

SET SERVEROUTPUT ON SIZE 100000
DECLARE
  c1_data CLOB;
  c2_data CLOB;
  c3_data CLOB;
  newunit1 SYS.LCR$_ROW_UNIT;
  newunit2 SYS.LCR$_ROW_UNIT;
  newunit3 SYS.LCR$_ROW_UNIT;
  newunit4 SYS.LCR$_ROW_UNIT;
  newvals  SYS.LCR$_ROW_LIST;
  big_data VARCHAR(22000);
  n        NUMBER;
BEGIN
  -- Create primary key for LCR$_ROW_UNIT
  newunit1 := SYS.LCR$_ROW_UNIT('A',
                                ANYDATA.ConvertNumber(3), 
                                NULL, 
                                NULL, 
                                NULL);
  -- Create empty CLOBs
  newunit2 := sys.lcr$_row_unit('C1',
                                ANYDATA.ConvertVarChar2(NULL),
                                DBMS_LCR.EMPTY_LOB, 
                                NULL, 
                                NULL);
  newunit3 := SYS.LCR$_ROW_UNIT('C2',
                                ANYDATA.ConvertVarChar2(NULL),
                                DBMS_LCR.EMPTY_LOB, 
                                NULL, 
                                NULL);
  newunit4 := SYS.LCR$_ROW_UNIT('C3',
                                ANYDATA.ConvertVarChar2(NULL),
                                DBMS_LCR.EMPTY_LOB, 
                                NULL, 
                                NULL);
  newvals := SYS.LCR$_ROW_LIST(newunit1,newunit2,newunit3,newunit4);

  -- Perform an insert
  enq_row_lcr(
    source_dbname => 'MYDB.EXAMPLE.COM',
    cmd_type      => 'INSERT',
    obj_owner     => 'LOB_USER',
    obj_name      => 'WITH_CLOB',
    old_vals      => NULL,
    new_vals      => newvals);

  -- construct clobs
  big_data := RPAD('Hello World', 1000, '_');
  big_data := big_data || '#';
  big_data := big_data || big_data || big_data || big_data || big_data;
  DBMS_LOB.CREATETEMPORARY(
    lob_loc => c1_data, 
    cache   => TRUE);
  DBMS_LOB.WRITEAPPEND(
    lob_loc => c1_data, 
    amount  => length(big_data), 
    buffer  => big_data);

  big_data := RPAD('1234567890#', 1000, '_');
  big_data := big_data || big_data || big_data || big_data;
  DBMS_LOB.CREATETEMPORARY(
    lob_loc => c2_data, 
    cache   => TRUE);
  DBMS_LOB.WRITEAPPEND(
    lob_loc => c2_data, 
    amount  => length(big_data), 
    buffer  => big_data);

  big_data := RPAD('ASDFGHJKLQW', 2000, '_');
  big_data := big_data || '#';
  big_data := big_data || big_data || big_data || big_data || big_data;
  DBMS_LOB.CREATETEMPORARY(
    lob_loc => c3_data, 
    cache   => TRUE);
  DBMS_LOB.WRITEAPPEND(
    lob_loc => c3_data, 
    amount  => length(big_data), 
    buffer  => big_data);

  -- pk info
  newunit1 := SYS.LCR$_ROW_UNIT('A',
                                ANYDATA.ConvertNumber(3), 
                                NULL, 
                                NULL, 
                                NULL);
  newvals  := SYS.LCR$_ROW_LIST(newunit1); 

  -- write c1 clob
  n := do_enq_clob(
         src_dbname => 'MYDB.EXAMPLE.COM',
         tab_owner  => 'LOB_USER',
         tab_name   => 'WITH_CLOB',
         col_name   => 'C1',
         new_vals   => newvals,
         clob_data  => c1_data,
         offset     => 1,
         chunk_size => 1024);
  DBMS_OUTPUT.PUT_LINE('n=' || n);
 
  -- write c2 clob
  newvals  := SYS.LCR$_ROW_LIST(newunit1); 
  n := do_enq_clob(
         src_dbname => 'MYDB.EXAMPLE.COM',
         tab_owner  => 'LOB_USER',
         tab_name   => 'WITH_CLOB',
         col_name   => 'C2',
         new_vals   => newvals,
         clob_data  => c2_data,
         offset     => 1,
         chunk_size => 2000);
  DBMS_OUTPUT.PUT_LINE('n=' || n);
 
  -- write c3 clob
  newvals  := SYS.LCR$_ROW_LIST(newunit1); 
  n := do_enq_clob(src_dbname=>'MYDB.EXAMPLE.COM',
         tab_owner  => 'LOB_USER',
         tab_name   => 'WITH_CLOB',
         col_name   => 'C3',
         new_vals   => newvals,
         clob_data  => c3_data,
         offset     => 1,
         chunk_size => 500);
  DBMS_OUTPUT.PUT_LINE('n=' || n);
 
  COMMIT;

END;
/

/*
手順11: スプール結果のチェック

lob_construct.outスプール・ファイルをチェックして、このスクリプトの完了後にすべてのアクションが正常に終了していることを確認します。

*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/

スクリプトの実行後に、lob_user.with_clob表を問い合せて、適用プロセスによって適用された行を表示できます。DBMS_LOCK.SLEEP文を使用して、エンキューされた行を適用プロセスが適用するための時間のロックを指定します。

CONNECT lob_user/lob_user_pw

EXECUTE DBMS_LOCK.SLEEP(10);

SELECT a, c1, c2, c3 FROM with_clob ORDER BY a;

SELECT a, LENGTH(c1), LENGTH(c2), LENGTH(c3) FROM with_clob ORDER BY a;