Skip Headers

Oracle9i Streams
Release 2 (9.2)

Part Number A96571-01
Go To Documentation Library
Home
Go To Product List
Book List
Go To Table Of Contents
Contents
Go To Index
Index

Master Index

Feedback

Go to previous page Go to next page

12
Managing Staging and Propagation

This chapter provides instructions for managing Streams queues, propagation jobs, and messaging environments.

This chapter contains these topics:

Each task described in this section should be completed by a Streams administrator that has been granted the appropriate privileges, unless specified otherwise.

See Also:

Managing Streams Queues

A Streams queue stages events whose payloads are of SYS.AnyData type. Therefore, a Streams queue can stage an event with payload of nearly any type, if the payload is wrapped in a SYS.AnyData wrapper. Each Streams capture process and apply process is associated with one Streams queue, and each Streams propagation job is associated with one Streams source queue and one Streams destination queue.

This section provides instructions for completing the following tasks related to Streams queues:

Creating a Streams Queue

You use the SET_UP_QUEUE procedure in the DBMS_STREAMS_ADM package to create a Streams queue. This procedure enables you to specify the following for the Streams queue it creates:

This procedure creates a queue that is both a secure queue and a transactional queue and starts the newly created queue.

For example, to create a Streams queue named strm01_queue with a queue table named strm01_queue_table and grant the hr user the privileges necessary to enqueue events into and dequeue events from the queue, run the following procedure:

BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table  => 'strm01_queue_table',
    queue_name   => 'strm01_queue',
    queue_user   => 'hr');
END;
/

You can also use procedures in the DBMS_AQADM package to create a SYS.AnyData queue.

See Also:

Enabling a User to Perform Operations on a Secure Queue

For a user to perform queue operations, such as enqueue and dequeue, on a secure queue, the user must be configured as a secure queue user of the queue. If you use the SET_UP_QUEUE procedure in the DBMS_STREAMS_ADM package to create the secure queue, then the queue owner and the user specified by the queue_user parameter are configured as secure users of the queue automatically. If you want to enable other users to perform operations on the queue, then you can configure these users in one of the following ways:

The following example illustrates associating a user with an agent manually. Suppose you want to enable the oe user to perform queue operations on the strm01_queue created in "Creating a Streams Queue". The following steps configure the oe user as a secure queue user of strm01_queue:

  1. Connect as an administrative user who can create agents and alter users.
  2. Create an agent:
    EXEC DBMS_AQADM.CREATE_AQ_AGENT(agent_name => 'strm01_queue_agent');
    
    
  3. If the user must be able to dequeue events from queue, then make the agent a subscriber of the secure queue:
    DECLARE
      subscriber SYS.AQ$_AGENT;
    BEGIN
      subscriber :=  SYS.AQ$_AGENT('strm01_queue_agent', NULL, NULL);  
      SYS.DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name          =>  'strmadmin.strm01_queue',
        subscriber                  =>  subscriber,
        rule                =>  NULL,
        transformation      =>  NULL);
    END;
    /
    
    
  4. Associate the user with the agent:
    BEGIN
      DBMS_AQADM.ENABLE_DB_ACCESS(
        agent_name  => 'strm01_queue_agent',
        db_username => 'oe');
    END;
    /
    
    
  5. Grant the user EXECUTE privilege on the DBMS_AQ package, if the user is not already granted this privilege.
    GRANT EXECUTE ON DBMS_AQ TO oe;
    
    

When these steps are complete, the oe user is a secure user of the strm01_queue queue and can perform operations on the queue. You still must grant the user specific privileges to perform queue operations, such as enqueue and dequeue privileges.

See Also:

Disabling a User from Performing Operations on a Secure Queue

You may want to disable a user from performing queue operations on a secure queue for the following reasons:

To disable a secure queue user, you can revoke ENQUEUE and DEQUEUE privilege on the queue from the user, or you can run the DISABLE_DB_ACCESS procedure in the DBMS_AQADM package. For example, suppose you want to disable the oe user from performing queue operations on the strm01_queue created in "Creating a Streams Queue".


Attention:

If an agent is used for multiple secure queues, then running DISABLE_DB_ACCESS for the agent prevents the user from performing operations on all of these queues.


  1. Run the following procedure to disable the oe user from performing queue operations on the secure queue strm01_queue:
    BEGIN
      DBMS_AQADM.DISABLE_DB_ACCESS(
        agent_name  => 'strm01_queue_agent',
        db_username => 'oe');
    END;
    /
    
    
  2. If the agent is no longer needed, you can drop the agent:
    BEGIN
      DBMS_AQADM.DROP_AQ_AGENT(
        agent_name  => 'strm01_queue_agent',
    END;
    /
    
    
  3. Revoke privileges on the queue from the user, if the user no longer needs these privileges.
    BEGIN
      DBMS_AQADM.REVOKE_QUEUE_PRIVILEGE (
       privilege   => 'ALL',
       queue_name  => 'strm01_queue',
       grantee     => 'oe');
    END;
    /
    
    See Also:

Managing Streams Propagation Jobs

A propagation job propagates events from a Streams source queue to a Streams destination queue. This section provides instructions for completing the following tasks:

In addition, you can use the features of Oracle Advanced Queuing (AQ) to manage Streams propagation jobs.

See Also:

Oracle9i Application Developer's Guide - Advanced Queuing for more information about managing propagation jobs with the features of AQ

Creating a Propagation Job

You can use any of the following procedures to create a propagation job:

Each of the procedures in the DBMS_STREAMS_ADM package creates a propagation job with the specified name if it does not already exist, creates a rule set for the propagation job if the propagation job does not have a rule set, and may add table, schema, or global rules to the rule set. The CREATE_PROPAGATION procedure creates a propagation job, but does not create a rule set or rules for the propagation job. All propagation jobs are started automatically upon creation.

The following tasks must be completed before you create a propagation job:

Example of Creating a Propagation Job Using DBMS_STREAMS_ADM

The following is an example that runs the ADD_TABLE_RULES procedure in the DBMS_STREAMS_ADM package to create a propagation job:

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
    table_name              => 'hr.departments',
    streams_name            => 'strm01_propagation',
    source_queue_name       => 'strmadmin.strm01_queue',
    destination_queue_name  => 'strmadmin.strm02_queue@dbs2.net',
    include_dml             => true,
    include_ddl             => true,
    include_tagged_lcr      => false,
    source_database         => 'dbs1.net' );
END;
/

Running this procedure performs the following actions:

Example of Creating a Propagation Job Using DBMS_PROPAGATION_ADM

The following is an example that runs the CREATE_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package to create a propagation job:

BEGIN
  DBMS_PROPAGATION_ADM.CREATE_PROPAGATION(
    propagation_name   => 'strm02_propagation',
    source_queue       => 'strmadmin.strm01_queue',
    destination_queue  => 'strmadmin.strm02_queue',
    destination_dblink => 'dbs2.net',
    rule_set_name      => 'strmadmin.strm01_rule_set');
END;
/

Running this procedure performs the following actions:

Enabling a Propagation Job

By default, propagation jobs are enabled upon creation. If you disable a propagation job and want to enable it, then use the ENABLE_PROPAGATION_SCHEDULE procedure in the DBMS_AQADM package.

For example, to enable a propagation job that propagates events from the strmadmin.strm01_queue source queue using the dbs2.net database link, run the following procedure:

BEGIN
  DBMS_AQADM.ENABLE_PROPAGATION_SCHEDULE(
    queue_name  => 'strmadmin.strm01_queue',
    destination => 'dbs2.net');
END;
/

Note:

Completing this task affects the propagation of events from the source queue to all destination queues that use the dbs2.net database link.


See Also:

Oracle9i Application Developer's Guide - Advanced Queuing for more information about using the ENABLE_PROPAGATION_SCHEDULE procedure

Scheduling a Propagation Job

You can schedule a propagation job using the SCHEDULE_PROPAGATION procedure in the DBMS_AQADM package. If there is a problem with a propagation job, then unscheduling and scheduling the propagation job may correct the problem.

For example, the following procedure schedules a propagation job that propagates events from the strmadmin.strm01_queue source queue using the dbs2.net database link:

BEGIN
  DBMS_AQADM.SCHEDULE_PROPAGATION(
   queue_name  => 'strmadmin.strm01_queue',
   destination => 'dbs2.net'); 
END;
/

Note:

Completing this task affects the propagation of events from the source queue to all destination queues that use the dbs2.net database link.


See Also:

Altering the Schedule of a Propagation Job

You can alter the schedule of an existing propagation job using the ALTER_PROPAGATION_SCHEDULE procedure in the DBMS_AQADM package.

For example, suppose you want to alter the schedule of a propagation job that propagates events from the strmadmin.strm01_queue source queue using the dbs2.net database link. The following procedure sets the propagation job to propagate events every 15 minutes (900 seconds), with each propagation lasting 300 seconds, and a 25 second wait before new events in a completely propagated queue are propagated.

BEGIN
  DBMS_AQADM.ALTER_PROPAGATION_SCHEDULE(
   queue_name  => 'strmadmin.strm01_queue',
   destination => 'dbs2.net',
   duration    => 300,
   next_time   => 'SYSDATE + 900/86400',
   latency     => 25); 
END;
/

Note:

Completing this task affects the propagation of events from the source queue to all destination queues that use the dbs2.net database link.


See Also:

Oracle9i Application Developer's Guide - Advanced Queuing for more information about using the ALTER_PROPAGATION_SCHEDULE procedure

Unscheduling a Propagation Job

You can unschedule a propagation job using the UNSCHEDULE_PROPAGATION procedure in the DBMS_AQADM package. If there is a problem with a propagation job, then unscheduling and scheduling the propagation job may correct the problem.

For example, the following procedure unschedules a propagation job that propagates events from the strmadmin.strm01_queue source queue using the dbs2.net database link:

BEGIN
  DBMS_AQADM.UNSCHEDULE_PROPAGATION(
   queue_name  => 'strmadmin.strm01_queue',
   destination => 'dbs2.net'); 
END;
/

Note:

Completing this task affects the propagation of events from the source queue to all destination queues that use the dbs2.net database link.


See Also:

Specifying the Rule Set for a Propagation Job

You specify the rule set that you want to associate with a propagation job using the rule_set_name parameter in the ALTER_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package. For example, the following procedure sets the rule set for a propagation job named strm01_propagation to strm02_rule_set.

BEGIN
  DBMS_PROPAGATION_ADM.ALTER_PROPAGATION(
    propagation_name  => 'strm01_propagation',
    rule_set_name     => 'strmadmin.strm02_rule_set');
END;
/
See Also:

Adding Rules to the Rule Set for a Propagation Job

Yo add rules to the rule set of a propagation job, you can run one of the following procedures:

The following is an example that runs the ADD_TABLE_RULES procedure in the DBMS_STREAMS_ADM package to add rules to the rule set of a propagation job named strm01_propagation:

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
    table_name              => 'hr.locations',
    streams_name            => 'strm01_propagation',
    source_queue_name       => 'strmadmin.strm01_queue',
    destination_queue_name  => 'strmadmin.strm02_queue@dbs2.net',
    include_dml             => true,
    include_ddl             => true,
    source_database         => 'dbs1.net' );
END;
/

Running this procedure performs the following actions:

Removing a Rule from the Rule Set for a Propagation Job

You specify that you want to remove a rule from the rule set for an existing propagation job by running the REMOVE_RULE procedure in the DBMS_STREAMS_ADM package. For example, the following procedure removes a rule named DEPARTMENTS3 from the rule set of a propagation job named strm01_propagation.

BEGIN
  DBMS_STREAMS_ADM.REMOVE_RULE(
    rule_name        => 'DEPARTMENTS3',
    streams_type     => 'propagate',
    streams_name     => 'strm01_propagation',
    drop_unused_rule => true);
END;
/

In this example, the drop_unused_rule parameter in the REMOVE_RULE procedure is set to true, which is the default setting. Therefore, if the rule being removed is not in any other rule set, then it will be dropped from the database. If the drop_unused_rule parameter is set to false, then the rule is removed from the rule set, but it is not dropped from the database.

In addition, if you want to remove all of the rules in the rule set for the propagation job, then specify NULL for the rule_name parameter when you run the REMOVE_RULE procedure.


Note:

If you drop all of the rules in the rule set for a propagation job, then the propagation job propagations no events in the source queue to the destination queue.


Removing the Rule Set for a Propagation Job

You specify that you want to remove the rule set from a propagation job by setting the rule_set_name parameter to NULL in the ALTER_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package. For example, the following procedure removes the rule set from a propagation job named strm01_propagation.

BEGIN
  DBMS_PROPAGATION_ADM.ALTER_PROPAGATION(
    propagation_name => 'strm01_propagation',
    rule_set_name    => NULL);
END;
/

Note:

If you remove a rule set for a propagation job, then the propagation job propagates all events in the source queue to the destination queue.


Disabling a Propagation Job

To stop a propagation job, use the DISABLE_PROPAGATION_SCHEDULE procedure in the DBMS_AQADM package.

For example, to stop a propagation job that propagates events from the strmadmin.strm01_queue source queue using the dbs2.net database link, run the following procedure:

BEGIN
  DBMS_AQADM.DISABLE_PROPAGATION_SCHEDULE(
    queue_name  => 'strmadmin.strm01_queue',
    destination => 'dbs2.net');
END;
/

Note:
  • Completing this task affects the propagation of events from the source queue to all destination queues that use the dbs2.net database link.
  • The DISABLE_PROPAGATION_SCHEDULE disables the propagation job immediately. It does not wait for the current duration to end.

See Also:

Oracle9i Application Developer's Guide - Advanced Queuing for more information about using the DISABLE_PROPAGATION_SCHEDULE procedure

Dropping a Propagation Job

You run the DROP_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package to drop an existing propagation job. For example, the following procedure drops a propagation job named strm01_propagation:

BEGIN
  DBMS_PROPAGATION_ADM.DROP_PROPAGATION(
    propagation_name => 'strm01_propagation');
END;
/

Managing a Streams Messaging Environment

Streams enables messaging with queues of type SYS.AnyData. These queues stage user messages whose payloads are of SYS.AnyData type, and a SYS.AnyData payload can be a wrapper for payloads of different datatypes.

This section provides instructions for completing the following tasks:

Wrapping User Messages in a SYS.AnyData Wrapper

You can wrap almost any type of payload in a SYS.AnyData payload. The following sections provide examples enqueuing messages into, and dequeuing messages from, a SYS.AnyData queue. These examples assume that you have configured a Streams administrator at each database.

See Also:

"Configuring a Streams Administrator"

Example of Wrapping a Payload in a SYS.AnyData Payload and Enqueuing It

The following steps illustrate how to wrap payloads of various types in a SYS.AnyData payload.

  1. Connect as an administrative user who can create users, grant privileges, create tablespaces, and alter users.
  2. Grant EXECUTE privilege on the DBMS_AQ package to the oe user so that this user can run the ENQUEUE and DEQUEUE procedures in that package:
    GRANT EXECUTE ON DBMS_AQ TO oe;
    
    
  3. Connect as the Streams administrator, as in the following example:
    CONNECT strmadmin/strmadminpw
    
    
  4. Create a SYS.AnyData queue if one does not already exist.
    BEGIN
      DBMS_STREAMS_ADM.SET_UP_QUEUE(
        queue_table  => 'oe_q_table_any',
        queue_name   => 'oe_q_any',
        queue_user   => 'oe');
    END;
    /
    
    

    The oe user is configured automatically as a secure queue user of the oe_q_any queue and is given ENQUEUE and DEQUEUE privileges on the queue.

  5. Add a subscriber to the oe_q_any queue. This subscriber will perform explicit dequeues of events.
    DECLARE
      subscriber SYS.AQ$_AGENT;
    BEGIN
      subscriber :=  SYS.AQ$_AGENT('OE', NULL, NULL);  
      SYS.DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name  =>  'strmadmin.oe_q_any',
        subscriber  =>  subscriber);
    END;
    /
    
    
  6. Connect as the oe user.
    CONNECT oe/oe
    
    
  7. Create a procedure that takes as an input parameter an object of SYS.AnyData type and enqueues a message containing the payload into an existing SYS.AnyData queue.
    CREATE OR REPLACE PROCEDURE oe.enq_proc (payload SYS.AnyData) IS
      enqopt     DBMS_AQ.ENQUEUE_OPTIONS_T;
      mprop      DBMS_AQ.MESSAGE_PROPERTIES_T;
      enq_msgid  RAW(16);
    BEGIN
      mprop.SENDER_ID := SYS.AQ$_AGENT('OE', NULL, NULL); 
      DBMS_AQ.ENQUEUE(
        queue_name          =>  'strmadmin.oe_q_any',
        enqueue_options     =>  enqopt,
        message_properties  =>  mprop,
        payload             =>  payload,
        msgid               =>  enq_msgid);
    END;
    /
    
    
  8. Run the procedure you created in Step 7 by specifying the appropriate Convertdata_type function. The following commands enqueue messages of various types.

    VARCHAR2 type:

    EXEC oe.enq_proc(SYS.AnyData.ConvertVarchar2('Chemicals - SW'));
    COMMIT;
    
    

    NUMBER type:

    EXEC oe.enq_proc(SYS.AnyData.ConvertNumber('16'));
    COMMIT;
    
    

    User-defined type:

    EXEC oe.enq_proc(SYS.AnyData.ConvertObject(oe.cust_address_typ('1646 Brazil 
    Blvd','361168','Chennai','Tam', 'IN')));
    COMMIT;
    
    
    See Also:

    "Viewing the Contents of User-Enqueued Events in a Queue" for information about viewing the contents of these enqueued messages

Example of Dequeuing a Payload That Is Wrapped in a SYS.AnyData Payload

The following steps illustrate how to dequeue a payload wrapped in a SYS.AnyData payload. This example assumes that you have completed the steps in "Example of Wrapping a Payload in a SYS.AnyData Payload and Enqueuing It".

To dequeue messages, you must know the consumer of the messages. To find the consumer for the messages in a queue, connect as the owner of the queue and query the AQ$queue_table_name, where queue_table_name is the name of the queue table. For example, to find the consumers of the messages in the oe_q_any queue, run the following query:

CONNECT strmadmin/strmadminpw

SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_ANY;

  1. Connect as the oe user:
    CONNECT oe/oe
    
    
  2. Create a procedure that takes as an input the consumer of the messages you want to dequeue. The following example procedure dequeues messages of oe.cust_address_typ and prints the contents of the messages.
    CREATE OR REPLACE PROCEDURE oe.get_cust_address (
    consumer IN VARCHAR2) AS
      address         OE.CUST_ADDRESS_TYP;
      deq_address     SYS.AnyData; 
      msgid           RAW(16); 
      deqopt          DBMS_AQ.DEQUEUE_OPTIONS_T; 
      mprop           DBMS_AQ.MESSAGE_PROPERTIES_T;
      new_addresses   BOOLEAN := TRUE;
      next_trans      EXCEPTION;
      no_messages     EXCEPTION; 
      pragma exception_init (next_trans, -25235);
      pragma exception_init (no_messages, -25228);
      num_var         pls_integer;
    BEGIN
         deqopt.consumer_name := consumer;
         deqopt.wait := 1;
         WHILE (new_addresses) LOOP
         BEGIN
          DBMS_AQ.DEQUEUE( 
             queue_name          =>  'strmadmin.oe_q_any',
             dequeue_options     =>  deqopt,
             message_properties  =>  mprop,
             payload             =>  deq_address,
             msgid               =>  msgid);
    
              deqopt.navigation := DBMS_AQ.NEXT;
    
    
             DBMS_OUTPUT.PUT_LINE('****');  
             IF (deq_address.GetTypeName() = 'OE.CUST_ADDRESS_TYP') THEN
                 DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' ||  
                                       deq_address.GetTypeName()); 
                 num_var := deq_address.GetObject(address);    
                 DBMS_OUTPUT.PUT_LINE(' **** CUSTOMER ADDRESS **** ');
                 DBMS_OUTPUT.PUT_LINE(address.street_address);
                 DBMS_OUTPUT.PUT_LINE(address.postal_code);
                 DBMS_OUTPUT.PUT_LINE(address.city);
                 DBMS_OUTPUT.PUT_LINE(address.state_province);
                 DBMS_OUTPUT.PUT_LINE(address.country_id);
             ELSE
                DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' ||    
                                      deq_address.GetTypeName()); 
             END IF;  
           COMMIT;   
        EXCEPTION
          WHEN next_trans THEN
          deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
          WHEN no_messages THEN
            new_addresses := FALSE;
            DBMS_OUTPUT.PUT_LINE('No more messages');
         END;
      END LOOP; 
    END;
    /
    
    
  3. Run the procedure you created in Step 1 and specify the consumer of the messages you want to dequeue, as in the following example:
    SET SERVEROUTPUT ON SIZE 100000
    EXEC oe.get_cust_address('OE');
    

Propagating Messages Between a SYS.AnyData Queue and a Typed Queue

SYS.AnyData queues can interoperate with typed queues in a Streams environment. A typed queue is a queue that can stage messages of a particular type only. To propagate a message from a SYS.AnyData queue to a typed queue, the message must be transformed to match the type of the typed queue. The following sections provide examples of propagating non-LCR user messages and LCRs between a SYS.AnyData queue and a typed queue.

See Also:

"Message Propagation" for more information about propagation between SYS.AnyData and typed queues

Example of Propagating Non-LCR User Messages to a Typed Queue

The following steps set up propagation from a SYS.AnyData queue named oe_q_any to a typed queue of type oe.cust_address_typ named oe_q_address. The source queue oe_q_any is at the dbs1.net database, and the destination queue oe_q_address is at the dbs2.net database.

  1. Create a database link between dbs1.net and dbs2.net if one does not already exist.
    CONNECT oe/oe@dbs1.net
    
    CREATE DATABASE LINK dbs2.net CONNECT TO oe IDENTIFIED BY oe 
      USING 'DBS2.NET'; 
    
    
  2. Create a typed queue if one does not already exist.
    CONNECT system/manager@dbs2.net
    
    BEGIN 
      DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table         => 'oe.oe_q_table_address', 
        queue_payload_type  => 'oe.cust_address_typ', 
        multiple_consumers  => true);
      DBMS_AQADM.CREATE_QUEUE(
        queue_name   => 'oe.oe_q_address', 
        queue_table  => 'oe.oe_q_table_address');
      DBMS_AQADM.START_QUEUE(
        queue_name   => 'oe.oe_q_address');
    END;
    /
    
    
  3. Create a function called any_to_cust_address_typ in the oe schema at dbs1.net that takes a SYS.AnyData payload containing a oe.cust_address_typ object and returns the oe.cust_address_typ object.
    CONNECT oe/oe@dbs1.net
    
    CREATE OR REPLACE FUNCTION oe.any_to_cust_address_typ(in_any IN SYS.AnyData) 
    RETURN OE.CUST_ADDRESS_TYP
    AS
      address       OE.CUST_ADDRESS_TYP;
      num_var       NUMBER;
      type_name     VARCHAR2(100);
    BEGIN
      -- Get the type of object
      type_name := in_any.GetTypeName();
      -- Check if the object type is OE.CUST_ADDRESS_TYP
      IF (type_name = 'OE.CUST_ADDRESS_TYP') THEN
        -- Put the address in the message into the address variable
        num_var := in_any.GetObject(address);
        RETURN address;
      ELSE
        raise_application_error(-20101, 'Conversion failed - ' || type_name);   
      END IF;
    END;
    /
    
    
  4. Create a transformation at dbs1.net using the DBMS_TRANSFORM package.
    CONNECT system/manager@dbs1.net
    
    BEGIN
      DBMS_TRANSFORM.CREATE_TRANSFORMATION( 
       schema         => 'OE', 
       name           => 'ANYTOADDRESS', 
       from_schema    => 'SYS', 
       from_type      => 'ANYDATA', 
       to_schema      => 'OE', 
       to_type        => 'CUST_ADDRESS_TYP', 
       transformation => 'oe.any_to_cust_address_typ(source.user_data)'); 
    END;
    /
    
    
  5. Create a subscriber for the typed queue if one does not already exist. The subscriber must contain a rule that ensures that only messages of the appropriate type are propagated to the destination queue.
    DECLARE 
      subscriber  SYS.AQ$_AGENT; 
    BEGIN 
      subscriber := SYS.AQ$_AGENT ('OE', 
                                   'OE.OE_Q_ADDRESS@DBS2.NET', 
                                   0); 
      DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name     => 'strmadmin.oe_q_any', 
        subscriber     => subscriber,
        rule           => 
                    'TAB.USER_DATA.GetTypeName()=''OE.OE_CUST_ADDRESS''',
        transformation => 'OE.ANYTOADDRESS'); 
    END; 
    /
    
    
  6. Schedule propagation between the SYS.AnyData queue at dbs1.net and the typed queue at dbs2.net.
    BEGIN 
      DBMS_AQADM.SCHEDULE_PROPAGATION(
        queue_name   => 'strmadmin.oe_q_any', 
        destination  => 'dbs2.net'); 
    END;
    /
    
    
    See Also:

    Oracle9i Application Developer's Guide - Advanced Queuing for more information about transformations during propagation

Example of Propagating LCRs to a Typed Queue

To propagate LCRs from a SYS.AnyData queue to a typed queue, you complete the same steps as you do for non-LCR events, but Oracle supplies the transformation functions. You can use the following functions in the DBMS_STREAMS package to transform LCRs in SYS.AnyData queues to messages in typed queues:

You should only propagate user-enqueued LCRs to a typed queue. Do not propagate captured LCRs to a typed queue.

The following example sets up propagation of row LCRs from a SYS.AnyData queue named oe_q_any to a typed queue of type SYS.LCR$_ROW_RECORD named oe_q_lcr. The source queue oe_q_any is at the dbs1.net database, and the destination queue oe_q_lcr is at the dbs3.net database.

  1. Create a database link between dbs1.net and dbs3.net if one does not already exist.
    CONNECT oe/oe@dbs1.net
    
    CREATE DATABASE LINK dbs3.net CONNECT TO oe IDENTIFIED BY oe 
      USING 'DBS3.NET'; 
    
    
  2. Create a queue of the LCR type if one does not already exist.
    CONNECT system/manager@dbs3.net
    
    BEGIN 
      DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table         => 'oe.oe_q_table_lcr', 
        queue_payload_type  => 'SYS.LCR$_ROW_RECORD', 
        multiple_consumers  => true);
      DBMS_AQADM.CREATE_QUEUE(
        queue_name   => 'oe.oe_q_lcr', 
        queue_table  => 'oe.oe_q_table_lcr');
      DBMS_AQADM.START_QUEUE(
        queue_name   => 'oe.oe_q_lcr');
    END;
    /
    
    
  3. Create a transformation at dbs1.net using the DBMS_TRANSFORM package.
    CONNECT system/manager@dbs1.net
    
    BEGIN
      DBMS_TRANSFORM.CREATE_TRANSFORMATION( 
        schema         => 'oe', 
        name           => 'ANYTOLCR', 
        from_schema    => 'SYS', 
        from_type      => 'ANYDATA', 
        to_schema      => 'SYS', 
        to_type        => 'LCR$_ROW_RECORD', 
        transformation =>  
              'SYS.DBMS_STREAMS.CONVERT_ANYDATA_TO_LCR_ROW(source.user_data)'); 
    END;
    /
    
    
  4. Create a subscriber at the typed queue if one does not already exist. The subscriber specifies the CONVERT_ANYDATA_TO_LCR_ROW function for the transformation parameter.
    DECLARE 
      subscriber  SYS.AQ$_AGENT; 
    BEGIN 
      subscriber := SYS.AQ$_AGENT ('OE', 'OE.OE_Q_LCR@DBS3.NET', 0); 
      DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name     => 'strmadmin.oe_q_any', 
        subscriber     => subscriber,
        rule           => 'TAB.USER_DATA.GetTypeName()=''SYS.LCR$_ROW_RECORD''',
        transformation => 'oe.anytolcr'); 
    END; 
    /
    
    
  5. Schedule propagation between the SYS.AnyData queue at dbs1.net and the LCR queue at dbs3.net.
    BEGIN 
      DBMS_AQADM.SCHEDULE_PROPAGATION(
        queue_name   => 'strmadmin.oe_q_any', 
        destination  => 'dbs3.net'); 
    END;
    /
    
    See Also:

    Oracle9i Supplied PL/SQL Packages and Types Reference for more information about the row LCR and DDL LCR conversion functions


Go to previous page Go to next page
Oracle
Copyright © 2002 Oracle Corporation.

All Rights Reserved.
Go To Documentation Library
Home
Go To Product List
Book List
Go To Table Of Contents
Contents
Go To Index
Index

Master Index

Feedback