Oracle® Streams Advanced Queuing User's Guide and Reference
Release 10.1

Part Number B10785-01

23 Staging and Propagating with Oracle Streams AQ

This chapter describes how to use and manage Oracle Streams AQ when staging and propagating. It describes SYS.AnyData queues and user messages.

Oracle Streams Event Staging and Propagation Overview

Oracle Streams uses queues of type SYS.AnyData to stage events. Two types of events can be staged in an Oracle Streams queue:

  • Logical change records (LCRs)

  • User messages

Both types of events are of type SYS.AnyData and can be used for information sharing within a single database or between databases.

Staged events can be consumed or propagated, or both. These events can be consumed by an apply process or by a user application that explicitly dequeues them. Even after an event is consumed, it can remain in the queue if you have also configured Oracle Streams to propagate the event to one or more other queues or if message retention is specified. These other queues can reside in the same database or in different databases. In either case, the queue from which the events are propagated is called the source queue, and the queue that receives the events is called the destination queue.

SYS.AnyData Queues and User Messages

Oracle Streams enables messaging with queues of type SYS.AnyData. SYS.AnyData queues can stage user messages whose payloads are of SYS.AnyData type. A SYS.AnyData payload can be a wrapper for payloads of different datatypes. Queues that can stage messages of only a particular type are called typed queues.

By using SYS.AnyData wrappers for message payloads, publishing applications can enqueue messages of different types into a single queue. Subscribing applications can then dequeue these messages, either explicitly using a dequeue API or implicitly using an apply process. If the subscribing application is remote, then the messages can be propagated to the remote site, and the subscribing application can dequeue the messages from a local queue in the remote database. Alternatively, a remote subscribing application can dequeue messages directly from the source queue using a variety of standard protocols, such as PL/SQL and Oracle Call Interface (OCI).

Oracle Streams interoperates with Oracle Streams AQ, which supports all the standard features of message queuing systems, including multiconsumer queues, publish and subscribe, content-based routing, internet propagation, transformations, and gateways to other messaging subsystems.

SYS.AnyData Wrapper for User Messages Payloads

You can wrap almost any type of payload in a SYS.AnyData payload. To do this, use the Convert<data_type> static functions of the SYS.AnyData type, where <data_type> is the type of object to wrap (for example, ConvertVarchar2, ConvertNumber, or ConvertObject). These functions take the object as input and return a SYS.AnyData object.
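As a minimal sketch of the Convert functions described above, the following anonymous block wraps a VARCHAR2 value and a NUMBER value and then reads each one back. It assumes a SQL*Plus session; the variable names are illustrative only and are not part of any Oracle-supplied example.

```sql
SET SERVEROUTPUT ON
DECLARE
  -- Each Convert function returns a SYS.AnyData object that wraps its argument.
  wrapped_text  SYS.AnyData := SYS.AnyData.ConvertVarchar2('Chemicals - SW');
  wrapped_num   SYS.AnyData := SYS.AnyData.ConvertNumber(16);
BEGIN
  -- GetTypeName reports the type of the wrapped payload.
  DBMS_OUTPUT.PUT_LINE(wrapped_text.GetTypeName());
  DBMS_OUTPUT.PUT_LINE(wrapped_num.GetTypeName());
  -- The Access functions return the wrapped value for scalar types.
  DBMS_OUTPUT.PUT_LINE(wrapped_text.AccessVarchar2());
  DBMS_OUTPUT.PUT_LINE(TO_CHAR(wrapped_num.AccessNumber()));
END;
/
```

Checking GetTypeName before calling an Access or Get function is the usual way to avoid a type mismatch when a queue carries payloads of several types.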

The following datatypes cannot be wrapped in a SYS.AnyData wrapper:

  • Nested table

  • NCLOB

  • ROWID and UROWID

The following datatypes can be directly wrapped in a SYS.AnyData wrapper, but these datatypes cannot be present in a user-defined type payload wrapped in a SYS.AnyData wrapper:

Programmatic Environments for Enqueue and Dequeue of User Messages

Your applications can use the following programmatic environments to enqueue user messages into a SYS.AnyData queue and dequeue user messages from a SYS.AnyData queue:

  • PL/SQL (the DBMS_AQ package)

  • Oracle Call Interface (OCI)

  • Java Message Service (JMS)

The following sections provide information about using these interfaces to enqueue user messages into and dequeue user messages from a SYS.AnyData queue.


See Also:

Chapter 4, "Oracle Streams AQ: Programmatic Environments" for more information about these programmatic interfaces

Enqueuing User Messages Using PL/SQL

To enqueue a user message containing an LCR into a SYS.AnyData queue using PL/SQL, first create the LCR to be enqueued. You use the constructor for the SYS.LCR$_ROW_RECORD type to create a row LCR, and you use the constructor for the SYS.LCR$_DDL_RECORD type to create a DDL LCR. Then you use the SYS.AnyData.ConvertObject function to convert the LCR into SYS.AnyData payload and enqueue it using the DBMS_AQ.ENQUEUE procedure.

To enqueue a user message containing a non-LCR object into a SYS.AnyData queue using PL/SQL, you use one of the SYS.AnyData.Convert* functions to convert the object into SYS.AnyData payload and enqueue it using the DBMS_AQ.ENQUEUE procedure.

Enqueuing User Messages Using OCI or JMS

To enqueue a user message containing an LCR into a SYS.AnyData queue using JMS or OCI, you must represent the LCR in XML format. To construct an LCR, use the oracle.xdb.XMLType class. LCRs are defined in the SYS schema. The LCR schema must be loaded into the SYS schema using the catxlcr.sql script in Oracle home in the rdbms/admin/ directory.

To enqueue a message using OCI, perform the same actions that you would to enqueue a message into a typed queue. A typed queue is a queue that can stage messages of a particular type only. To enqueue a message using JMS, a user must have EXECUTE privilege on the DBMS_AQ, DBMS_AQIN, and DBMS_AQJMS packages.


Note:

Enqueue of JMS types and XML types does not work with Oracle Streams SYS.AnyData queues unless you call DBMS_AQADM.ENABLE_JMS_TYPES(queue_table_name) after DBMS_STREAMS_ADM.SET_UP_QUEUE(). Enabling an Oracle Streams queue for these types may affect import/export of the queue table.
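The call order in the preceding note can be sketched as follows. The queue and queue table names are borrowed from the examples later in this chapter, and the block assumes you are connected as the queue owner. The single positional argument to ENABLE_JMS_TYPES follows the form shown in the note; check its exact parameter list in PL/SQL Packages and Types Reference for your release.

```sql
BEGIN
  -- Create the SYS.AnyData queue first.
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table  => 'oe_q_table_any',
    queue_name   => 'oe_q_any',
    queue_user   => 'oe');
  -- Then enable JMS and XML types on its queue table
  -- (argument passed positionally; table name is an assumption).
  DBMS_AQADM.ENABLE_JMS_TYPES('oe_q_table_any');
END;
/
```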

A non-LCR user message can be a message of any user-defined type or a JMS type. The JMS types include the following:

  • javax.jms.TextMessage

  • javax.jms.MapMessage

  • javax.jms.StreamMessage

  • javax.jms.ObjectMessage

  • javax.jms.BytesMessage

When using user-defined types, you must use JPublisher to generate a Java class for the message that implements the ORAData interface. To enqueue a message into a SYS.AnyData queue, you can use the QueueSender.send or TopicPublisher.publish method.


Dequeuing User Messages Using PL/SQL

To dequeue a user message from a SYS.AnyData queue using PL/SQL, you use the DBMS_AQ.DEQUEUE procedure and specify SYS.AnyData as the payload. The user message can contain an LCR or another type of object.
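The following is a minimal sketch of such a dequeue, not a complete application. It assumes the strmadmin.oe_q_any queue and the LOCAL_AGENT subscriber that are set up in the examples later in this chapter, and it handles only scalar VARCHAR2 and NUMBER payloads. With NO_WAIT, the call raises ORA-25228 if no message is available.

```sql
SET SERVEROUTPUT ON
DECLARE
  deqopt   DBMS_AQ.DEQUEUE_OPTIONS_T;
  mprop    DBMS_AQ.MESSAGE_PROPERTIES_T;
  msgid    RAW(16);
  payload  SYS.AnyData;   -- the payload variable is always SYS.AnyData
BEGIN
  deqopt.consumer_name := 'LOCAL_AGENT';     -- assumed subscriber name
  deqopt.wait          := DBMS_AQ.NO_WAIT;   -- do not block on an empty queue
  DBMS_AQ.DEQUEUE(
    queue_name          =>  'strmadmin.oe_q_any',
    dequeue_options     =>  deqopt,
    message_properties  =>  mprop,
    payload             =>  payload,
    msgid               =>  msgid);
  -- Inspect the wrapped type before unwrapping the value.
  CASE payload.GetTypeName()
    WHEN 'SYS.VARCHAR2' THEN
      DBMS_OUTPUT.PUT_LINE('Text: ' || payload.AccessVarchar2());
    WHEN 'SYS.NUMBER' THEN
      DBMS_OUTPUT.PUT_LINE('Number: ' || TO_CHAR(payload.AccessNumber()));
    ELSE
      DBMS_OUTPUT.PUT_LINE('Other type: ' || payload.GetTypeName());
  END CASE;
  COMMIT;
END;
/
```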

Dequeuing User Messages Using OCI or JMS

In a SYS.AnyData queue, user messages containing LCRs in XML format are represented as oracle.xdb.XMLType. Non-LCR messages can be one of the following formats:

  • A JMS type (javax.jms.TextMessage, javax.jms.MapMessage, javax.jms.StreamMessage, javax.jms.ObjectMessage, or javax.jms.BytesMessage)

  • A user-defined type

To dequeue a message from a SYS.AnyData queue using JMS, you can use a QueueReceiver, TopicSubscriber, or TopicReceiver. Because the queue can contain different types of objects wrapped in a SYS.AnyData wrapper, you must register a list of SQL types and their corresponding Java classes in the typemap of the JMS session. JMS types are preregistered in the typemap.

For example, suppose a queue contains LCR messages represented as oracle.xdb.XMLType and messages of type person and address. The classes JPerson.java and JAddress.java are the ORAData mappings for person and address, respectively. Before dequeuing the message, the type map must be populated as follows:

java.util.Map map = ((AQjmsSession)q_sess).getTypeMap();

map.put("SCOTT.PERSON", Class.forName("JPerson"));
map.put("SCOTT.ADDRESS", Class.forName("JAddress"));
map.put("SYS.XMLTYPE", Class.forName("oracle.xdb.XMLType"));  // For LCRs

When using message selectors with a QueueReceiver or TopicSubscriber, the selector can contain any SQL92 expression that has a combination of one or more of the following:

  • JMS message header fields or properties, including JMSPriority, JMSCorrelationID, JMSType, JMSXUserID, JMSXAppID, JMSXGroupID, and JMSXGroupSeq. The following is an example of a selector that uses JMS message header fields:

    JMSPriority < 3 AND JMSCorrelationID = 'Fiction'
    
    
  • User-defined message properties, as in the following example:

    color IN ('RED', 'BLUE', 'GREEN') AND price < 30000
    
    
  • PL/SQL functions, as in the following example:

    hr.GET_TYPE(tab.user_data) = 'HR.EMPLOYEES'  
    
    

To dequeue a message using OCI, perform the same actions that you would to dequeue a message from a typed queue.


Message Propagation and SYS.AnyData Queues

SYS.AnyData queues can interoperate with typed queues in an Oracle Streams environment. A typed queue can stage messages of a particular type only. Table 23-1 shows the types of propagation possible between queues.

Table 23-1 Propagation Between Different Types of Queues

Source Queue   Destination Queue   Transformation
SYS.AnyData    SYS.AnyData         None
Typed          SYS.AnyData         Implicit (see the note that follows this table)
SYS.AnyData    Typed               Requires a rule to filter messages and a user-defined transformation
Typed          Typed               Follows Oracle Streams AQ rules

Note: Propagation from a typed queue to a SYS.AnyData queue is possible only if the messages in the typed queue meet the restrictions outlined in "User-Defined Type Messages".

To propagate messages containing a payload of a certain type from a SYS.AnyData source queue to a typed destination queue, you must perform a transformation. Only messages containing a payload of the same type as the typed queue can be propagated to the typed queue.

Although you cannot use Simple Object Access Protocol (SOAP) to interact directly with a SYS.AnyData queue, you can use SOAP with Oracle Streams by propagating messages between a SYS.AnyData queue and a typed queue. If you want to enqueue a message into a SYS.AnyData queue using SOAP, then you can configure propagation from a typed queue to SYS.AnyData queue. Then, you can use SOAP to enqueue a message into the typed queue. The message is propagated automatically from the typed queue to the SYS.AnyData queue.

If you want to use SOAP to dequeue a message that is in a SYS.AnyData queue, then you can configure propagation from a SYS.AnyData queue to a typed queue. The message is propagated automatically from the SYS.AnyData queue to the typed queue. Then, the message would be available for access using SOAP.


Note:

Certain Oracle Streams capabilities, such as capturing changes using a capture process and applying changes with an apply process, can be configured only with SYS.AnyData queues.


See Also:

Oracle Streams Concepts and Administration, "Propagating Messages Between a SYS.AnyData Queue and a Typed Queue"

User-Defined Type Messages

If you plan to enqueue, propagate, or dequeue user-defined type messages in an Oracle Streams environment, then each type used in these messages must exist at every database where the message can be staged in a queue. Some environments use directed networks to route messages through intermediate databases before they reach their destination. In such environments, the type must exist at each intermediate database, even if the messages of this type are never enqueued or dequeued at a particular intermediate database.

In addition, the following requirements must be met for such types:

  • The type name must be the same at each database.

  • The type must be in the same schema at each database.

  • The shape of the type must match exactly at each database.

  • The type cannot use inheritance or type evolution at any database.

  • The type cannot contain varrays, nested tables, LOBs, rowids, or urowids.

The object identifier need not match at each database.
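One way to spot-check that the shape of a type matches at two databases is to compare its attribute lists in the data dictionary. The following query is only a sketch. It assumes the oe.cust_address_typ type and the dbs2.net database link used in the examples later in this chapter, and it assumes the connected user can see the type in ALL_TYPE_ATTRS at both databases.

```sql
-- Attributes defined locally that are missing or defined differently
-- at the remote database.
SELECT attr_no, attr_name, attr_type_name, length, precision, scale
  FROM all_type_attrs
 WHERE owner = 'OE'
   AND type_name = 'CUST_ADDRESS_TYP'
MINUS
SELECT attr_no, attr_name, attr_type_name, length, precision, scale
  FROM all_type_attrs@dbs2.net
 WHERE owner = 'OE'
   AND type_name = 'CUST_ADDRESS_TYP';
```

If the query returns no rows in this direction and again with the two halves swapped, the attribute lists agree. This check does not cover the other requirements in the list, such as the restriction on inheritance and type evolution.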


Managing an Oracle Streams Messaging Environment

Oracle Streams enables messaging with queues of type SYS.AnyData. These queues stage user messages whose payloads are of SYS.AnyData type, and a SYS.AnyData payload can be a wrapper for payloads of different datatypes.

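This section provides instructions for completing the following tasks:

  • Wrapping User Message Payloads in a SYS.AnyData Wrapper

  • Propagating Messages Between a SYS.AnyData Queue and a Typed Queue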

Wrapping User Message Payloads in a SYS.AnyData Wrapper

You can wrap almost any type of payload in a SYS.AnyData payload. The following sections provide examples of enqueuing messages into, and dequeuing messages from, a SYS.AnyData queue.

Example 23-1 Example of Wrapping a Payload in a SYS.AnyData Payload and Enqueuing It

The following steps illustrate how to wrap payloads of various types in a SYS.AnyData payload.

  1. Connect as an administrative user who can create users, grant privileges, create tablespaces, and alter users at the dbs1.net database.

  2. Grant EXECUTE privilege on the DBMS_AQ package to the oe user so that this user can run the ENQUEUE and DEQUEUE procedures in that package:

    GRANT EXECUTE ON DBMS_AQ TO oe;
    
    
  3. Connect as the Oracle Streams administrator, as in the following example:

    CONNECT strmadmin/strmadminpw@dbs1.net
    
    
  4. Create a SYS.AnyData queue if one does not already exist.

    BEGIN
      DBMS_STREAMS_ADM.SET_UP_QUEUE(
        queue_table  => 'oe_q_table_any',
        queue_name   => 'oe_q_any',
        queue_user   => 'oe');
    END;
    /
    
    

    The oe user is configured automatically as a secure user of the oe_q_any queue and is given ENQUEUE and DEQUEUE privileges on the queue.

  5. Add a subscriber to the oe_q_any queue. This subscriber performs explicit dequeues of events. The ADD_SUBSCRIBER procedure will automatically create an AQ_AGENT.

    DECLARE
      subscriber SYS.AQ$_AGENT;
    BEGIN
      subscriber :=  SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL);
      SYS.DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name  =>  'strmadmin.oe_q_any',
        subscriber  =>  subscriber);
    END;
    /
    
    
  6. Grant the oe user enqueue and dequeue privileges on queue strmadmin.oe_q_any.

    BEGIN
      DBMS_AQADM.GRANT_QUEUE_PRIVILEGE(
        privilege  =>  'ALL',
        queue_name => 'strmadmin.oe_q_any',
        grantee    => 'oe');
    END;
    /
    
    
  7. Associate the oe user with the local_agent agent:

    BEGIN
      DBMS_AQADM.ENABLE_DB_ACCESS(
        agent_name  => 'local_agent',
        db_username => 'oe');
    END;
    /
    
    
  8. Connect as the oe user.

    CONNECT oe/oe@dbs1.net
    
    
  9. Create a procedure that takes as an input parameter an object of SYS.AnyData type and enqueues a message containing the payload into an existing SYS.AnyData queue.

    CREATE OR REPLACE PROCEDURE oe.enq_proc (payload SYS.AnyData) 
    IS
      enqopt     DBMS_AQ.ENQUEUE_OPTIONS_T;
      mprop      DBMS_AQ.MESSAGE_PROPERTIES_T;
      enq_msgid  RAW(16);
    BEGIN
      mprop.SENDER_ID := SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL); 
      DBMS_AQ.ENQUEUE(
        queue_name          =>  'strmadmin.oe_q_any',
        enqueue_options     =>  enqopt,
        message_properties  =>  mprop,
        payload             =>  payload,
        msgid               =>  enq_msgid);
    END;
    /
    
    
  10. Run the procedure you created in Step 9 by specifying the appropriate Convert function for the payload type (for example, ConvertVarchar2, ConvertNumber, or ConvertObject). The following commands enqueue messages of various types.

    VARCHAR2 type:

    EXEC oe.enq_proc(SYS.AnyData.ConvertVarchar2('Chemicals - SW'));
    COMMIT;
    
    

    NUMBER type:

    EXEC oe.enq_proc(SYS.AnyData.ConvertNumber(16));
    COMMIT;
    
    

    User-defined type:

    BEGIN
      oe.enq_proc(SYS.AnyData.ConvertObject(oe.cust_address_typ(
        '1646 Brazil Blvd','361168','Chennai','Tam', 'IN')));
    END;
    /
    COMMIT;
    

    See Also:

    Oracle Streams Concepts and Administration, "Viewing the Contents of User-Enqueued Events in a Queue" for information about viewing the contents of these enqueued messages

Example 23-2 Example of Dequeuing a Payload That Is Wrapped in a SYS.AnyData Payload

The following steps illustrate how to dequeue a payload wrapped in a SYS.AnyData payload. This example assumes that you have completed the steps in "Example of Wrapping a Payload in a SYS.AnyData Payload and Enqueuing It".

To dequeue messages, you must know the consumer of the messages. To find the consumer for the messages in a queue, connect as the owner of the queue and query the AQ$queue_table_name view, where queue_table_name is the name of the queue table. For example, to find the consumers of the messages in the oe_q_any queue, run the following query:

CONNECT strmadmin/strmadminpw@dbs1.net

SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_ANY;

  1. Connect as the oe user:

    CONNECT oe/oe@dbs1.net
    
    
  2. Create a procedure that takes as an input the consumer of the messages you want to dequeue. The following example procedure dequeues messages of oe.cust_address_typ and prints the contents of the messages.

    CREATE OR REPLACE PROCEDURE oe.get_cust_address (
    consumer IN VARCHAR2) AS
      address         OE.CUST_ADDRESS_TYP;
      deq_address     SYS.AnyData; 
      msgid           RAW(16); 
      deqopt          DBMS_AQ.DEQUEUE_OPTIONS_T; 
      mprop           DBMS_AQ.MESSAGE_PROPERTIES_T;
      new_addresses   BOOLEAN := TRUE;
      next_trans      EXCEPTION;
      no_messages     EXCEPTION; 
      pragma exception_init (next_trans, -25235);
      pragma exception_init (no_messages, -25228);
      num_var         pls_integer;
    BEGIN
         deqopt.consumer_name := consumer;
         deqopt.wait := 1;
         WHILE (new_addresses) LOOP
         BEGIN
          DBMS_AQ.DEQUEUE( 
             queue_name          =>  'strmadmin.oe_q_any',
             dequeue_options     =>  deqopt,
             message_properties  =>  mprop,
             payload             =>  deq_address,
             msgid               =>  msgid);
             deqopt.navigation := DBMS_AQ.NEXT_MESSAGE;
             DBMS_OUTPUT.PUT_LINE('****');
             IF (deq_address.GetTypeName() = 'OE.CUST_ADDRESS_TYP') THEN
                 DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' ||
                                       deq_address.GetTypeName()); 
                 num_var := deq_address.GetObject(address);
                 DBMS_OUTPUT.PUT_LINE(' **** CUSTOMER ADDRESS **** ');
                 DBMS_OUTPUT.PUT_LINE(address.street_address);
                 DBMS_OUTPUT.PUT_LINE(address.postal_code);
                 DBMS_OUTPUT.PUT_LINE(address.city);
                 DBMS_OUTPUT.PUT_LINE(address.state_province);
                 DBMS_OUTPUT.PUT_LINE(address.country_id);
             ELSE
                DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' ||
                                      deq_address.GetTypeName()); 
             END IF;
           COMMIT;
        EXCEPTION
          WHEN next_trans THEN
            deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
          WHEN no_messages THEN
            new_addresses := FALSE;
            DBMS_OUTPUT.PUT_LINE('No more messages');
         END;
      END LOOP; 
    END;
    /
    
    
  3. Run the procedure you created in Step 2 and specify the consumer of the messages you want to dequeue, as in the following example:

    SET SERVEROUTPUT ON SIZE 100000
    EXEC oe.get_cust_address('LOCAL_AGENT');
    

Propagating Messages Between a SYS.AnyData Queue and a Typed Queue

SYS.AnyData queues can interoperate with typed queues in an Oracle Streams environment. A typed queue is a queue that can stage messages of a particular type only. To propagate a message from a SYS.AnyData queue to a typed queue, the message must be transformed to match the type of the typed queue. The following sections provide examples of propagating non-LCR user messages and LCRs between a SYS.AnyData queue and a typed queue.


Note:

The examples in this section assume that you have completed the examples in "Wrapping User Message Payloads in a SYS.AnyData Wrapper".


See Also:

"Message Propagation and SYS.AnyData Queues" for more information about propagation between SYS.AnyData and typed queues

Example 23-3 Example of Propagating Non-LCR User Messages to a Typed Queue

The following steps set up propagation from a SYS.AnyData queue named oe_q_any to a typed queue of type oe.cust_address_typ named oe_q_address. The source queue oe_q_any is at the dbs1.net database, and the destination queue oe_q_address is at the dbs2.net database. Both queues are owned by strmadmin.

  1. Connect as an administrative user who can grant privileges at dbs1.net.

  2. Grant the following privilege to strmadmin, if it was not already granted.

    GRANT EXECUTE ON DBMS_TRANSFORM TO strmadmin;
    
    
  3. Grant strmadmin EXECUTE privilege on oe.cust_address_typ at dbs1.net and dbs2.net.

    CONNECT oe/oe@dbs1.net
    
    GRANT EXECUTE ON oe.cust_address_typ TO strmadmin;
    
    CONNECT oe/oe@dbs2.net
    
    GRANT EXECUTE ON oe.cust_address_typ TO strmadmin;
    
    
  4. Create a typed queue at dbs2.net, if one does not already exist.

    CONNECT strmadmin/strmadminpw@dbs2.net
    
    BEGIN 
      DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table         => 'strmadmin.oe_q_table_address', 
        queue_payload_type  => 'oe.cust_address_typ', 
        multiple_consumers  => true);
      DBMS_AQADM.CREATE_QUEUE(
        queue_name   => 'strmadmin.oe_q_address', 
        queue_table  => 'strmadmin.oe_q_table_address');
      DBMS_AQADM.START_QUEUE(
        queue_name   => 'strmadmin.oe_q_address');
    END;
    /
    
    
  5. Create a database link between dbs1.net and dbs2.net if one does not already exist.

    CONNECT strmadmin/strmadminpw@dbs1.net
    
    CREATE DATABASE LINK dbs2.net CONNECT TO strmadmin IDENTIFIED BY strmadminpw
      USING 'DBS2.NET'; 
    
    
  6. Create a function called any_to_cust_address_typ in the strmadmin schema at dbs1.net that takes a SYS.AnyData payload containing an oe.cust_address_typ object and returns the oe.cust_address_typ object.

    CREATE OR REPLACE FUNCTION strmadmin.any_to_cust_address_typ(
      in_any IN SYS.AnyData) 
    RETURN OE.CUST_ADDRESS_TYP
    AS
      address       OE.CUST_ADDRESS_TYP;
      num_var       NUMBER;
      type_name     VARCHAR2(100);
    BEGIN
      -- Get the type of object
      type_name := in_any.GetTypeName();
      -- Check if the object type is OE.CUST_ADDRESS_TYP
      IF (type_name = 'OE.CUST_ADDRESS_TYP') THEN
        -- Put the address in the message into the address variable
        num_var := in_any.GetObject(address);
        RETURN address;
      ELSE
        raise_application_error(-20101, 'Conversion failed - ' || type_name);
      END IF;
    END;
    /
    
    
  7. Create a transformation at dbs1.net using the DBMS_TRANSFORM package.

    BEGIN
      DBMS_TRANSFORM.CREATE_TRANSFORMATION( 
       schema         => 'strmadmin', 
       name           => 'anytoaddress', 
       from_schema    => 'SYS', 
       from_type      => 'ANYDATA', 
       to_schema      => 'oe', 
       to_type        => 'cust_address_typ', 
       transformation => 'strmadmin.any_to_cust_address_typ(source.user_data)'); 
    END;
    /
    
    
  8. Create a subscriber for the typed queue if one does not already exist. The subscriber must contain a rule that ensures that only messages of the appropriate type are propagated to the destination queue.

    DECLARE 
      subscriber  SYS.AQ$_AGENT; 
    BEGIN 
      subscriber := SYS.AQ$_AGENT ('ADDRESS_AGENT_REMOTE', 
                                   'STRMADMIN.OE_Q_ADDRESS@DBS2.NET', 
                                   0); 
      DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name     => 'strmadmin.oe_q_any', 
        subscriber     => subscriber,
        rule           => 
                    'TAB.USER_DATA.GetTypeName()=''OE.CUST_ADDRESS_TYP''',
        transformation => 'strmadmin.anytoaddress'); 
    END; 
    /
    
    
  9. Schedule propagation between the SYS.AnyData queue at dbs1.net and the typed queue at dbs2.net.

    BEGIN 
      DBMS_AQADM.SCHEDULE_PROPAGATION(
        queue_name   => 'strmadmin.oe_q_any', 
        destination  => 'dbs2.net'); 
    END;
    /
    
    
  10. Enqueue a message of oe.cust_address_typ type wrapped in a SYS.AnyData wrapper:

    CONNECT oe/oe@dbs1.net
    
    BEGIN
      oe.enq_proc(SYS.AnyData.ConvertObject(oe.cust_address_typ(
        '1668 Chong Tao','111181','Beijing',NULL, 'CN')));
    END;
    /
    COMMIT;
    
    
  11. After allowing some time for propagation, query the queue table at dbs2.net to view the propagated message:

    CONNECT strmadmin/strmadminpw@dbs2.net
    
    SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_ADDRESS;
    
    

    See Also:

    Chapter 21, "Oracle Messaging Gateway Message Conversion" for more information about transformations during propagation
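If the propagated message does not appear at dbs2.net, one place to look is the propagation schedule at the source database. The following query is a sketch that assumes you are connected as strmadmin, the owner of the source queue; it lists whether the schedule is disabled and shows any recorded failures.

```sql
CONNECT strmadmin/strmadminpw@dbs1.net

-- One row is expected for each destination scheduled for the queue.
SELECT qname, destination, schedule_disabled, failures, last_error_msg
  FROM user_queue_schedules
 WHERE qname = 'OE_Q_ANY';
```

A nonzero FAILURES value together with LAST_ERROR_MSG usually points to the cause, such as a database link that cannot connect or a type that is missing at the destination.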

Example 23-4 Example of Propagating LCRs to a Typed Queue

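To propagate LCRs from a SYS.AnyData queue to a typed queue, you complete the same steps as you do for non-LCR events, but Oracle supplies the transformation functions. You can use the following functions in the DBMS_STREAMS package to transform LCRs in SYS.AnyData queues to messages in typed queues:

  • CONVERT_ANYDATA_TO_LCR_ROW, which transforms a SYS.AnyData payload containing a row LCR into a SYS.LCR$_ROW_RECORD payload

  • CONVERT_ANYDATA_TO_LCR_DDL, which transforms a SYS.AnyData payload containing a DDL LCR into a SYS.LCR$_DDL_RECORD payload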

You can propagate user-enqueued LCRs to an appropriate typed queue, but propagation of captured LCRs to a typed queue is not supported.

The following example sets up propagation of row LCRs from a SYS.AnyData queue named oe_q_any to a typed queue of type SYS.LCR$_ROW_RECORD named oe_q_lcr. The source queue oe_q_any is at the dbs1.net database, and the destination queue oe_q_lcr is at the dbs3.net database.

  1. Connect as an administrative user who can grant privileges at dbs1.net.

  2. Grant the following privilege to strmadmin, if it was not already granted.

    GRANT EXECUTE ON DBMS_TRANSFORM TO strmadmin;
    
    
  3. Create a queue of the LCR type if one does not already exist.

    CONNECT strmadmin/strmadminpw@dbs3.net
    
    BEGIN 
      DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table         => 'strmadmin.oe_q_table_lcr', 
        queue_payload_type  => 'SYS.LCR$_ROW_RECORD', 
        multiple_consumers  => true);
      DBMS_AQADM.CREATE_QUEUE(
        queue_name   => 'strmadmin.oe_q_lcr', 
        queue_table  => 'strmadmin.oe_q_table_lcr');
      DBMS_AQADM.START_QUEUE(
        queue_name   => 'strmadmin.oe_q_lcr');
    END;
    /
    
    
  4. Create a database link between dbs1.net and dbs3.net if one does not already exist.

    CONNECT strmadmin/strmadminpw@dbs1.net
    
    CREATE DATABASE LINK dbs3.net CONNECT TO strmadmin IDENTIFIED BY strmadminpw
      USING 'DBS3.NET'; 
    
    
  5. Create a transformation at dbs1.net using the DBMS_TRANSFORM package.

    BEGIN
      DBMS_TRANSFORM.CREATE_TRANSFORMATION( 
        schema         => 'strmadmin', 
        name           => 'anytolcr', 
        from_schema    => 'SYS', 
        from_type      => 'ANYDATA', 
        to_schema      => 'SYS', 
        to_type        => 'LCR$_ROW_RECORD', 
        transformation =>
              'SYS.DBMS_STREAMS.CONVERT_ANYDATA_TO_LCR_ROW(source.user_data)'); 
    END;
    /
    
    
  6. Create a subscriber at the typed queue if one does not already exist. The subscriber specifies the anytolcr transformation created in Step 5, which calls the CONVERT_ANYDATA_TO_LCR_ROW function.

    DECLARE 
      subscriber  SYS.AQ$_AGENT; 
    BEGIN 
      subscriber := SYS.AQ$_AGENT (
        'ROW_LCR_AGENT_REMOTE', 
        'STRMADMIN.OE_Q_LCR@DBS3.NET', 
        0); 
      DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name     => 'strmadmin.oe_q_any',
        subscriber     => subscriber,
        rule           => 'TAB.USER_DATA.GetTypeName()=''SYS.LCR$_ROW_RECORD''',
        transformation => 'strmadmin.anytolcr'); 
    END; 
    /
    
    
  7. Schedule propagation between the SYS.AnyData queue at dbs1.net and the LCR queue at dbs3.net.

    BEGIN 
      DBMS_AQADM.SCHEDULE_PROPAGATION(
        queue_name   => 'strmadmin.oe_q_any', 
        destination  => 'dbs3.net'); 
    END;
    /
    
    
  8. Create a procedure to construct and enqueue a row LCR into the strmadmin.oe_q_any queue:

    CONNECT oe/oe@dbs1.net
    
    CREATE OR REPLACE PROCEDURE oe.enq_row_lcr_proc(
                     source_dbname  VARCHAR2,
                     cmd_type       VARCHAR2,
                     obj_owner      VARCHAR2,
                     obj_name       VARCHAR2,
                     old_vals       SYS.LCR$_ROW_LIST,
                     new_vals       SYS.LCR$_ROW_LIST) AS
      eopt           DBMS_AQ.ENQUEUE_OPTIONS_T;
      mprop          DBMS_AQ.MESSAGE_PROPERTIES_T;
      enq_msgid      RAW(16);
      row_lcr        SYS.LCR$_ROW_RECORD;
    BEGIN
      mprop.SENDER_ID := SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL); 
      -- Construct the LCR based on information passed to procedure
      row_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
        source_database_name  =>  source_dbname,
        command_type          =>  cmd_type,
        object_owner          =>  obj_owner,
        object_name           =>  obj_name,
        old_values            =>  old_vals,
        new_values            =>  new_vals);
      -- Enqueue the created row LCR
      DBMS_AQ.ENQUEUE(
        queue_name         =>  'strmadmin.oe_q_any', 
        enqueue_options    =>  eopt,
        message_properties =>  mprop,
        payload            =>  SYS.AnyData.ConvertObject(row_lcr),
        msgid              =>  enq_msgid);
    END enq_row_lcr_proc;
    /
    
    
  9. Create a row LCR that inserts a row into the oe.inventories table and enqueue the row LCR into the strmadmin.oe_q_any queue.

    DECLARE
      newunit1  SYS.LCR$_ROW_UNIT;
      newunit2  SYS.LCR$_ROW_UNIT;
      newunit3  SYS.LCR$_ROW_UNIT;
      newvals   SYS.LCR$_ROW_LIST;
    BEGIN
      newunit1 := SYS.LCR$_ROW_UNIT(
        'PRODUCT_ID', 
        SYS.AnyData.ConvertNumber(3503),
        DBMS_LCR.NOT_A_LOB,
        NULL,
        NULL);
      newunit2 := SYS.LCR$_ROW_UNIT(
        'WAREHOUSE_ID', 
        SYS.AnyData.ConvertNumber(1),
        DBMS_LCR.NOT_A_LOB,
        NULL,
        NULL);
      newunit3 := SYS.LCR$_ROW_UNIT(
        'QUANTITY_ON_HAND', 
        SYS.AnyData.ConvertNumber(157),
        DBMS_LCR.NOT_A_LOB,
        NULL,
        NULL);
      newvals := SYS.LCR$_ROW_LIST(newunit1,newunit2,newunit3);
      oe.enq_row_lcr_proc(
        source_dbname  =>  'DBS1.NET',
        cmd_type       =>  'INSERT',
        obj_owner      =>  'OE',
        obj_name       =>  'INVENTORIES',
        old_vals       =>  NULL,
        new_vals       =>  newvals);
    END;
    /
    COMMIT;
    
    
  10. After allowing some time for propagation, query the queue table at dbs3.net to view the propagated message:

    CONNECT strmadmin/strmadminpw@dbs3.net
    
    SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_LCR;
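To go one step beyond the query in Step 10, the following sketch dequeues the propagated row LCR from the typed queue and prints the command type and target object. It assumes you are still connected as strmadmin at dbs3.net, that strmadmin can run DBMS_AQ, and that the CONSUMER_NAME shown by the query is ROW_LCR_AGENT_REMOTE. With NO_WAIT, the call raises ORA-25228 if the message has not arrived yet.

```sql
SET SERVEROUTPUT ON
DECLARE
  deqopt   DBMS_AQ.DEQUEUE_OPTIONS_T;
  mprop    DBMS_AQ.MESSAGE_PROPERTIES_T;
  msgid    RAW(16);
  row_lcr  SYS.LCR$_ROW_RECORD;   -- payload type of the typed queue
BEGIN
  deqopt.consumer_name := 'ROW_LCR_AGENT_REMOTE';  -- assumed consumer name
  deqopt.wait          := DBMS_AQ.NO_WAIT;
  DBMS_AQ.DEQUEUE(
    queue_name          =>  'strmadmin.oe_q_lcr',
    dequeue_options     =>  deqopt,
    message_properties  =>  mprop,
    payload             =>  row_lcr,
    msgid               =>  msgid);
  -- The transformation has already unwrapped the LCR, so no SYS.AnyData
  -- handling is needed here.
  DBMS_OUTPUT.PUT_LINE(row_lcr.GET_COMMAND_TYPE() || ' on ' ||
                       row_lcr.GET_OBJECT_OWNER() || '.' ||
                       row_lcr.GET_OBJECT_NAME());
  COMMIT;
END;
/
```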
    
    

    See Also:

    "DBMS_STREAMS" in PL/SQL Packages and Types Reference for more information about the row LCR and DDL LCR conversion functions