Oracle® Streams Concepts and Administration
10g Release 1 (10.1)

Part Number B10727-01

10
Managing Staging and Propagation

This chapter provides instructions for managing SYS.AnyData queues, propagations, and messaging environments.

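This chapter contains these topics:

  • Managing SYS.AnyData Queues
  • Managing Streams Propagations and Propagation Jobs
  • Managing a Streams Messaging Environment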

Each task described in this chapter should be completed by a Streams administrator who has been granted the appropriate privileges, unless specified otherwise.

Managing SYS.AnyData Queues

A SYS.AnyData queue stages events whose payloads are of SYS.AnyData type. Therefore, a SYS.AnyData queue can stage an event with payload of nearly any type, if the payload is wrapped in a SYS.AnyData wrapper. Each Streams capture process and apply process is associated with one SYS.AnyData queue, and each Streams propagation is associated with one Streams source queue and one SYS.AnyData destination queue.

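This section provides instructions for completing the following tasks related to SYS.AnyData queues:

  • Creating a SYS.AnyData Queue
  • Enabling a User to Perform Operations on a Secure Queue
  • Disabling a User from Performing Operations on a Secure Queue
  • Removing a SYS.AnyData Queue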

Creating a SYS.AnyData Queue

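You use the SET_UP_QUEUE procedure in the DBMS_STREAMS_ADM package to create a SYS.AnyData queue. This procedure enables you to specify the following for the SYS.AnyData queue it creates:

  • The queue table for the queue
  • A storage clause for the queue table
  • The queue name
  • A queue user that is configured as a secure queue user of the queue and granted ENQUEUE and DEQUEUE privileges on the queue
  • A comment for the queue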

This procedure creates a queue that is both a secure queue and a transactional queue and starts the newly created queue.

For example, to create a SYS.AnyData queue named strm01_queue in the strmadmin schema with a queue table named strm01_queue_table and grant the hr user the privileges necessary to enqueue events into and dequeue events from the queue, run the following procedure:

BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table  => 'strmadmin.strm01_queue_table',
    queue_name   => 'strmadmin.strm01_queue',
    queue_user   => 'hr');
END;
/

You also can use procedures in the DBMS_AQADM package to create a SYS.AnyData queue.
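
As a minimal sketch of the DBMS_AQADM approach, the following block creates a queue table with a SYS.AnyData payload, creates a queue in that table, and starts the queue. The names strm05_queue_table and strm05_queue are hypothetical. The settings shown (multiple consumers, transactional message grouping, and a secure queue table) are chosen to resemble the kind of queue that SET_UP_QUEUE creates; treat them as assumptions rather than a documented equivalence.

```sql
BEGIN
  -- Queue table with a SYS.AnyData payload (hypothetical names)
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'strmadmin.strm05_queue_table',
    queue_payload_type => 'SYS.AnyData',
    multiple_consumers => true,
    message_grouping   => DBMS_AQADM.TRANSACTIONAL,
    secure             => true);
  -- Queue stored in the queue table created above
  DBMS_AQADM.CREATE_QUEUE(
    queue_name  => 'strmadmin.strm05_queue',
    queue_table => 'strmadmin.strm05_queue_table');
  -- A new queue is not started until START_QUEUE is run
  DBMS_AQADM.START_QUEUE(
    queue_name => 'strmadmin.strm05_queue');
END;
/
```

Unlike SET_UP_QUEUE, this sketch does not configure any secure queue users or grant queue privileges, so those steps would need to be completed separately.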


Note:
  • Queue names and queue table names can be a maximum of 24 bytes.
  • An event cannot be enqueued into a queue unless a subscriber who can dequeue the event is configured.

Enabling a User to Perform Operations on a Secure Queue

For a user to perform queue operations, such as enqueue and dequeue, on a secure queue, the user must be configured as a secure queue user of the queue. If you use the SET_UP_QUEUE procedure in the DBMS_STREAMS_ADM package to create the secure queue, then the queue owner and the user specified by the queue_user parameter are configured as secure users of the queue automatically. If you want to enable other users to perform operations on the queue, then you can configure these users in one of the following ways:

  • Run the SET_UP_QUEUE procedure for the queue and specify the user for the queue_user parameter.
  • Associate the user with an AQ agent manually.

The following example illustrates associating a user with an AQ agent manually. Suppose you want to enable the oe user to perform queue operations on the strm01_queue created in "Creating a SYS.AnyData Queue". The following steps configure the oe user as a secure queue user of strm01_queue:

  1. Connect as an administrative user who can create AQ agents and alter users.
  2. Create an agent:
    EXEC DBMS_AQADM.CREATE_AQ_AGENT(agent_name => 'strm01_queue_agent');
    
    
  3. If the user must be able to dequeue events from the queue, then make the agent a subscriber of the secure queue:
    DECLARE
      subscriber SYS.AQ$_AGENT;
    BEGIN
      subscriber :=  SYS.AQ$_AGENT('strm01_queue_agent', NULL, NULL);  
      DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name          =>  'strmadmin.strm01_queue',
        subscriber          =>  subscriber,
        rule                =>  NULL,
        transformation      =>  NULL);
    END;
    /
    
    
  4. Associate the user with the agent:
    BEGIN
      DBMS_AQADM.ENABLE_DB_ACCESS(
        agent_name  => 'strm01_queue_agent',
        db_username => 'oe');
    END;
    /
    
    
  5. Grant the user EXECUTE privilege on the DBMS_STREAMS_MESSAGING package or the DBMS_AQ package, if the user is not already granted these privileges:
    GRANT EXECUTE ON DBMS_STREAMS_MESSAGING TO oe;
    
    GRANT EXECUTE ON DBMS_AQ TO oe;
    
    

When these steps are complete, the oe user is a secure user of the strm01_queue queue and can perform operations on the queue. You still must grant the user specific privileges to perform queue operations, such as enqueue and dequeue privileges.
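
For instance, the queue privileges mentioned above could be granted with the GRANT_QUEUE_PRIVILEGE procedure in the DBMS_AQADM package. This is a sketch that assumes the oe user should be able to both enqueue and dequeue; specify 'ENQUEUE' or 'DEQUEUE' instead of 'ALL' to grant only one of the two.

```sql
BEGIN
  DBMS_AQADM.GRANT_QUEUE_PRIVILEGE(
    privilege    => 'ALL',                      -- both ENQUEUE and DEQUEUE
    queue_name   => 'strmadmin.strm01_queue',
    grantee      => 'oe',
    grant_option => false);                     -- oe cannot grant these privileges to others
END;
/
```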

Disabling a User from Performing Operations on a Secure Queue

You may want to disable a user from performing queue operations on a secure queue for the following reasons:

To disable a secure queue user, you can revoke ENQUEUE and DEQUEUE privilege on the queue from the user, or you can run the DISABLE_DB_ACCESS procedure in the DBMS_AQADM package. For example, suppose you want to disable the oe user from performing queue operations on the strm01_queue created in "Creating a SYS.AnyData Queue".


Attention:

If an AQ agent is used for multiple secure queues, then running DISABLE_DB_ACCESS for the agent prevents the user associated with the agent from performing operations on all of these queues.


  1. Run the following procedure to disable the oe user from performing queue operations on the secure queue strm01_queue:
    BEGIN
      DBMS_AQADM.DISABLE_DB_ACCESS(
        agent_name  => 'strm01_queue_agent',
        db_username => 'oe');
    END;
    /
    
    
  2. If the agent is no longer needed, you can drop the agent:
    BEGIN
      DBMS_AQADM.DROP_AQ_AGENT(
        agent_name  => 'strm01_queue_agent');
    END;
    /
    
    
  3. Revoke privileges on the queue from the user, if the user no longer needs these privileges.
    BEGIN
      DBMS_AQADM.REVOKE_QUEUE_PRIVILEGE (
       privilege   => 'ALL',
       queue_name  => 'strmadmin.strm01_queue',
       grantee     => 'oe');
    END;
    /
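
Before dropping an agent in Step 2, it may help to check which database users are still associated with it. The following query is a sketch that assumes the agent name from the earlier example and that you have access to the DBA_AQ_AGENT_PRIVS data dictionary view.

```sql
-- List the database users currently associated with the agent
SELECT agent_name, db_username
  FROM dba_aq_agent_privs
  WHERE UPPER(agent_name) = 'STRM01_QUEUE_AGENT';
```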
    
    

Removing a SYS.AnyData Queue

You use the REMOVE_QUEUE procedure in the DBMS_STREAMS_ADM package to remove an existing SYS.AnyData queue. When you run the REMOVE_QUEUE procedure, it waits until any existing events in the queue are consumed. Next, it stops the queue, which means that no further enqueues into the queue or dequeues from the queue are allowed. When the queue is stopped, it drops the queue.

You also can drop the queue table for the queue if it is empty and is not used by another queue. To do so, specify true, the default, for the drop_unused_queue_table parameter.

In addition, you can drop any Streams clients that use the queue by setting the cascade parameter to true. By default, the cascade parameter is set to false.

For example, to remove a SYS.AnyData queue named strm01_queue in the strmadmin schema and drop its empty queue table, run the following procedure:

BEGIN
  DBMS_STREAMS_ADM.REMOVE_QUEUE(
    queue_name              => 'strmadmin.strm01_queue',
    cascade                 => false,
    drop_unused_queue_table => true);
END;
/

In this case, because the cascade parameter is set to false, this procedure drops the strm01_queue only if no Streams clients use the queue. If the cascade parameter is set to false and any Streams client uses the queue, then an error is raised.
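
To see in advance whether any Streams clients use the queue, you could query the data dictionary before running REMOVE_QUEUE. The following is a sketch that assumes access to the DBA_ views shown. It covers only clients defined in the local database, so a propagation at another database that uses this queue as its destination queue would not appear.

```sql
-- Local Streams clients associated with strmadmin.strm01_queue
SELECT 'CAPTURE' AS client_type, capture_name AS client_name
  FROM dba_capture
  WHERE queue_owner = 'STRMADMIN' AND queue_name = 'STRM01_QUEUE'
UNION ALL
SELECT 'APPLY', apply_name
  FROM dba_apply
  WHERE queue_owner = 'STRMADMIN' AND queue_name = 'STRM01_QUEUE'
UNION ALL
SELECT 'PROPAGATION', propagation_name
  FROM dba_propagation
  WHERE source_queue_owner = 'STRMADMIN' AND source_queue_name = 'STRM01_QUEUE'
UNION ALL
SELECT 'MESSAGING CLIENT', streams_name
  FROM dba_streams_message_consumers
  WHERE queue_owner = 'STRMADMIN' AND queue_name = 'STRM01_QUEUE';
```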

Managing Streams Propagations and Propagation Jobs

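A propagation propagates events from a Streams source queue to a Streams destination queue. This section provides instructions for completing the following tasks:

  • Creating a Propagation
  • Enabling a Propagation Job
  • Scheduling a Propagation Job
  • Altering the Schedule of a Propagation Job
  • Unscheduling a Propagation Job
  • Specifying the Rule Set for a Propagation
  • Adding Rules to the Rule Set for a Propagation
  • Removing a Rule from the Rule Set for a Propagation
  • Removing a Rule Set for a Propagation
  • Disabling a Propagation Job
  • Dropping a Propagation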

In addition, you can use the features of Oracle Advanced Queuing (AQ) to manage Streams propagations.

See Also:

Oracle Streams Advanced Queuing User's Guide and Reference for more information about managing propagations with the features of AQ

Creating a Propagation

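You can use any of the following procedures to create a propagation:

  • DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES
  • DBMS_STREAMS_ADM.ADD_SUBSET_PROPAGATION_RULES
  • DBMS_STREAMS_ADM.ADD_SCHEMA_PROPAGATION_RULES
  • DBMS_STREAMS_ADM.ADD_GLOBAL_PROPAGATION_RULES
  • DBMS_PROPAGATION_ADM.CREATE_PROPAGATION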

Each of the procedures in the DBMS_STREAMS_ADM package creates a propagation with the specified name if it does not already exist, creates either a positive or negative rule set for the propagation if the propagation does not have such a rule set, and may add table, schema, or global rules to the rule set. The CREATE_PROPAGATION procedure creates a propagation, but does not create a rule set or rules for the propagation. However, the CREATE_PROPAGATION procedure enables you to specify an existing rule set to associate with the propagation, either as a positive or a negative rule set. All propagations are started automatically upon creation.

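The following tasks must be completed before you create a propagation:

  • Create a source queue and a destination queue for the propagation, if they do not exist. See "Creating a SYS.AnyData Queue" for instructions.
  • Create a database link between the database containing the source queue and the database containing the destination queue.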

Example of Creating a Propagation Using DBMS_STREAMS_ADM

The following is an example that runs the ADD_TABLE_PROPAGATION_RULES procedure in the DBMS_STREAMS_ADM package to create a propagation:

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
    table_name              => 'hr.departments',
    streams_name            => 'strm01_propagation',
    source_queue_name       => 'strmadmin.strm01_queue',
    destination_queue_name  => 'strmadmin.strm02_queue@dbs2.net',
    include_dml             => true,
    include_ddl             => true,
    include_tagged_lcr      => false,
    source_database         => 'dbs1.net',
    inclusion_rule          => true);
END;
/

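Running this procedure performs the following actions:

  • Creates a propagation named strm01_propagation, if it does not already exist. The propagation propagates events from strmadmin.strm01_queue in the current database to strmadmin.strm02_queue at the dbs2.net database.
  • Creates a positive rule set for the propagation, if the propagation does not have one, because the inclusion_rule parameter is set to true.
  • Adds rules to the positive rule set for DML changes and DDL changes to the hr.departments table, because the include_dml and include_ddl parameters are both set to true.
  • Specifies that the rules evaluate to TRUE only for LCRs whose source database is dbs1.net and, because the include_tagged_lcr parameter is set to false, only for LCRs that have a NULL tag.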

Example of Creating a Propagation Using DBMS_PROPAGATION_ADM

The following is an example that runs the CREATE_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package to create a propagation:

BEGIN
  DBMS_PROPAGATION_ADM.CREATE_PROPAGATION(
    propagation_name   => 'strm02_propagation',
    source_queue       => 'strmadmin.strm03_queue',
    destination_queue  => 'strmadmin.strm04_queue',
    destination_dblink => 'dbs2.net',
    rule_set_name      => 'strmadmin.strm01_rule_set');
END;
/

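Running this procedure performs the following actions:

  • Creates a propagation named strm02_propagation. A propagation with the same name must not exist.
  • Specifies that the propagation propagates events from strmadmin.strm03_queue in the current database to strmadmin.strm04_queue at the dbs2.net database, using the dbs2.net database link.
  • Associates the propagation with the existing rule set strmadmin.strm01_rule_set, which becomes the positive rule set for the propagation.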

Enabling a Propagation Job

By default, propagation jobs are enabled upon creation. If you disable a propagation job and want to enable it, then use the ENABLE_PROPAGATION_SCHEDULE procedure in the DBMS_AQADM package.

For example, to enable a propagation job that propagates events from the strmadmin.strm01_queue source queue using the dbs2.net database link, run the following procedure:

BEGIN
  DBMS_AQADM.ENABLE_PROPAGATION_SCHEDULE(
    queue_name  => 'strmadmin.strm01_queue',
    destination => 'dbs2.net');
END;
/


Note:

Completing this task affects all propagations that propagate events from the source queue to all destination queues that use the dbs2.net database link.


Scheduling a Propagation Job

You can schedule a propagation job using the SCHEDULE_PROPAGATION procedure in the DBMS_AQADM package. If there is a problem with a propagation job, then unscheduling and scheduling the propagation job may correct the problem.

For example, the following procedure schedules a propagation job that propagates events from the strmadmin.strm01_queue source queue using the dbs2.net database link:

BEGIN
  DBMS_AQADM.SCHEDULE_PROPAGATION(
   queue_name  => 'strmadmin.strm01_queue',
   destination => 'dbs2.net'); 
END;
/


Note:

Completing this task affects all propagations that propagate events from the source queue to all destination queues that use the dbs2.net database link.


Altering the Schedule of a Propagation Job

You can alter the schedule of an existing propagation job using the ALTER_PROPAGATION_SCHEDULE procedure in the DBMS_AQADM package.

For example, suppose you want to alter the schedule of a propagation job that propagates events from the strmadmin.strm01_queue source queue using the dbs2.net database link. The following procedure sets the propagation job to propagate events every 15 minutes (900 seconds, expressed in the next_time parameter as 900/86400 of a day), with each propagation lasting 300 seconds, and a 25-second wait before new events in a completely propagated queue are propagated.

BEGIN
  DBMS_AQADM.ALTER_PROPAGATION_SCHEDULE(
   queue_name  => 'strmadmin.strm01_queue',
   destination => 'dbs2.net',
   duration    => 300,
   next_time   => 'SYSDATE + 900/86400',
   latency     => 25); 
END;
/
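
To check the resulting schedule, you could query the DBA_QUEUE_SCHEDULES data dictionary view. The following is a sketch that assumes the source queue from the example. It does not filter on the DESTINATION column, because the exact format of that value is not shown in this chapter.

```sql
-- Current propagation schedule settings and recent errors for the source queue
SELECT destination,
       schedule_disabled,    -- Y if the propagation job is disabled
       propagation_window,   -- duration, in seconds
       next_time,
       latency,
       failures,
       last_error_msg
  FROM dba_queue_schedules
  WHERE schema = 'STRMADMIN' AND qname = 'STRM01_QUEUE';
```

The FAILURES and LAST_ERROR_MSG columns may also be useful when you are deciding whether a propagation job has a problem.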

Note:

Completing this task affects all propagations that propagate events from the source queue to all destination queues that use the dbs2.net database link.


Unscheduling a Propagation Job

You can unschedule a propagation job using the UNSCHEDULE_PROPAGATION procedure in the DBMS_AQADM package. If there is a problem with a propagation job, then unscheduling and scheduling the propagation job may correct the problem.

For example, the following procedure unschedules a propagation job that propagates events from the strmadmin.strm01_queue source queue using the dbs2.net database link:

BEGIN
  DBMS_AQADM.UNSCHEDULE_PROPAGATION(
   queue_name  => 'strmadmin.strm01_queue',
   destination => 'dbs2.net'); 
END;
/

Note:

Completing this task affects all propagations that propagate events from the source queue to all destination queues that use the dbs2.net database link.


Specifying the Rule Set for a Propagation

You can specify one positive rule set and one negative rule set for a propagation. The propagation propagates an event if the event evaluates to TRUE for at least one rule in the positive rule set, and discards an event if the event evaluates to TRUE for at least one rule in the negative rule set. The negative rule set is evaluated before the positive rule set.

Specifying a Positive Rule Set for a Propagation

You specify an existing rule set as the positive rule set for an existing propagation using the rule_set_name parameter in the ALTER_PROPAGATION procedure. This procedure is in the DBMS_PROPAGATION_ADM package.

For example, the following procedure sets the positive rule set for a propagation named strm01_propagation to strm02_rule_set.

BEGIN
  DBMS_PROPAGATION_ADM.ALTER_PROPAGATION(
    propagation_name  => 'strm01_propagation',
    rule_set_name     => 'strmadmin.strm02_rule_set');
END;
/

Specifying a Negative Rule Set for a Propagation

You specify an existing rule set as the negative rule set for an existing propagation using the negative_rule_set_name parameter in the ALTER_PROPAGATION procedure. This procedure is in the DBMS_PROPAGATION_ADM package.

For example, the following procedure sets the negative rule set for a propagation named strm01_propagation to strm03_rule_set.

BEGIN
  DBMS_PROPAGATION_ADM.ALTER_PROPAGATION(
    propagation_name        => 'strm01_propagation',
    negative_rule_set_name  => 'strmadmin.strm03_rule_set');
END;
/
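
After specifying either rule set, you could confirm which rule sets are associated with the propagation by querying the DBA_PROPAGATION data dictionary view. This is a sketch that assumes the propagation name from the preceding examples.

```sql
-- Positive and negative rule sets currently associated with the propagation
SELECT propagation_name,
       rule_set_owner,
       rule_set_name,
       negative_rule_set_owner,
       negative_rule_set_name
  FROM dba_propagation
  WHERE propagation_name = 'STRM01_PROPAGATION';
```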

Adding Rules to the Rule Set for a Propagation

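To add rules to the rule set of a propagation, you can run one of the following procedures:

  • DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES
  • DBMS_STREAMS_ADM.ADD_SUBSET_PROPAGATION_RULES
  • DBMS_STREAMS_ADM.ADD_SCHEMA_PROPAGATION_RULES
  • DBMS_STREAMS_ADM.ADD_GLOBAL_PROPAGATION_RULES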

Excluding the ADD_SUBSET_PROPAGATION_RULES procedure, these procedures can add rules to the positive or negative rule set for a propagation. The ADD_SUBSET_PROPAGATION_RULES procedure can add rules only to the positive rule set for a propagation.

Adding Rules to the Positive Rule Set for a Propagation

The following is an example that runs the ADD_TABLE_PROPAGATION_RULES procedure in the DBMS_STREAMS_ADM package to add rules to the positive rule set of an existing propagation named strm01_propagation:

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
    table_name              => 'hr.locations',
    streams_name            => 'strm01_propagation',
    source_queue_name       => 'strmadmin.strm01_queue',
    destination_queue_name  => 'strmadmin.strm02_queue@dbs2.net',
    include_dml             => true,
    include_ddl             => true,
    source_database         => 'dbs1.net',
    inclusion_rule          => true);
END;
/

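Running this procedure performs the following actions:

  • Creates two rules. One rule evaluates to TRUE for row LCRs that contain the results of DML changes to the hr.locations table, and the other rule evaluates to TRUE for DDL LCRs that contain DDL changes to the hr.locations table.
  • Specifies that the rules evaluate to TRUE only for LCRs whose source database is dbs1.net.
  • Adds the rules to the positive rule set for strm01_propagation, because the inclusion_rule parameter is set to true.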

Adding Rules to the Negative Rule Set for a Propagation

The following is an example that runs the ADD_TABLE_PROPAGATION_RULES procedure in the DBMS_STREAMS_ADM package to add rules to the negative rule set of an existing propagation named strm01_propagation:

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
    table_name              => 'hr.departments',
    streams_name            => 'strm01_propagation',
    source_queue_name       => 'strmadmin.strm01_queue',
    destination_queue_name  => 'strmadmin.strm02_queue@dbs2.net',
    include_dml             => true,
    include_ddl             => true,
    source_database         => 'dbs1.net',
    inclusion_rule          => false);
END;
/

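Running this procedure performs the following actions:

  • Creates two rules. One rule evaluates to TRUE for row LCRs that contain the results of DML changes to the hr.departments table, and the other rule evaluates to TRUE for DDL LCRs that contain DDL changes to the hr.departments table.
  • Specifies that the rules evaluate to TRUE only for LCRs whose source database is dbs1.net.
  • Adds the rules to the negative rule set for strm01_propagation, because the inclusion_rule parameter is set to false.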

Removing a Rule from the Rule Set for a Propagation

You specify that you want to remove a rule from the rule set for an existing propagation by running the REMOVE_RULE procedure in the DBMS_STREAMS_ADM package. For example, the following procedure removes a rule named departments3 from the positive rule set of a propagation named strm01_propagation.

BEGIN
  DBMS_STREAMS_ADM.REMOVE_RULE(
    rule_name        => 'departments3',
    streams_type     => 'propagation',
    streams_name     => 'strm01_propagation',
    drop_unused_rule => true,
    inclusion_rule   => true);
END;
/

In this example, the drop_unused_rule parameter in the REMOVE_RULE procedure is set to true, which is the default setting. Therefore, if the rule being removed is not in any other rule set, then it will be dropped from the database. If the drop_unused_rule parameter is set to false, then the rule is removed from the rule set, but it is not dropped from the database.

If the inclusion_rule parameter is set to false, then the REMOVE_RULE procedure removes the rule from the negative rule set for the propagation, not the positive rule set.

In addition, if you want to remove all of the rules in the rule set for the propagation, then specify NULL for the rule_name parameter when you run the REMOVE_RULE procedure.
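
Rule names such as departments3 are typically generated by the system, so you may need to look them up before running REMOVE_RULE. The following query is a sketch that assumes the DBA_STREAMS_RULES data dictionary view is available in your release and lists the rules in each rule set for the propagation.

```sql
-- Rules used by the propagation, with the type of rule set that contains each one
SELECT rule_owner,
       rule_name,
       rule_set_type   -- POSITIVE or NEGATIVE
  FROM dba_streams_rules
  WHERE streams_type = 'PROPAGATION'
    AND streams_name = 'STRM01_PROPAGATION';
```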

See Also:

"Streams Client With One or More Empty Rule Sets"

Removing a Rule Set for a Propagation

You specify that you want to remove a rule set from a propagation using the ALTER_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package. This procedure can remove the positive rule set, negative rule set, or both. Specify true for the remove_rule_set parameter to remove the positive rule set for the propagation. Specify true for the remove_negative_rule_set parameter to remove the negative rule set for the propagation.

For example, the following procedure removes both the positive and the negative rule set from a propagation named strm01_propagation.

BEGIN
  DBMS_PROPAGATION_ADM.ALTER_PROPAGATION(
    propagation_name         => 'strm01_propagation',
    remove_rule_set          => true,
    remove_negative_rule_set => true);
END;
/


Note:

If a propagation does not have a positive or negative rule set, then the propagation propagates all events in the source queue to the destination queue.


Disabling a Propagation Job

To stop a propagation job, use the DISABLE_PROPAGATION_SCHEDULE procedure in the DBMS_AQADM package.

For example, to stop a propagation job that propagates events from the strmadmin.strm01_queue source queue using the dbs2.net database link, run the following procedure:

BEGIN
  DBMS_AQADM.DISABLE_PROPAGATION_SCHEDULE(
    queue_name  => 'strmadmin.strm01_queue',
    destination => 'dbs2.net');
END;
/

Note:
  • Completing this task affects all propagations that propagate events from the source queue to all destination queues that use the dbs2.net database link.
  • The DISABLE_PROPAGATION_SCHEDULE procedure disables the propagation job immediately. It does not wait for the current duration to end.

See Also:

Oracle Streams Advanced Queuing User's Guide and Reference for more information about using the DISABLE_PROPAGATION_SCHEDULE procedure

Dropping a Propagation

You run the DROP_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package to drop an existing propagation. For example, the following procedure drops a propagation named strm01_propagation:

BEGIN
  DBMS_PROPAGATION_ADM.DROP_PROPAGATION(
    propagation_name      => 'strm01_propagation',
    drop_unused_rule_sets => true);
END;
/

Because the drop_unused_rule_sets parameter is set to true, this procedure also drops any rule sets used by the propagation strm01_propagation, unless a rule set is used by another Streams client. If the drop_unused_rule_sets parameter is set to true, then both the positive and negative rule set for the propagation may be dropped. If this procedure drops a rule set, then it also drops any rules in the rule set that are not in another rule set.


Note:

When you drop a propagation, the propagation job used by the propagation is dropped automatically, if no other propagations are using the propagation job.


Managing a Streams Messaging Environment

Streams enables messaging with queues of type SYS.AnyData. These queues stage user messages whose payloads are of SYS.AnyData type, and a SYS.AnyData payload can be a wrapper for payloads of different datatypes.

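This section provides instructions for completing the following tasks:

  • Wrapping User Message Payloads in a SYS.AnyData Wrapper and Enqueuing Them
  • Dequeuing a Payload That Is Wrapped in a SYS.AnyData Payload
  • Configuring a Messaging Client and Message Notification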

Wrapping User Message Payloads in a SYS.AnyData Wrapper and Enqueuing Them

You can wrap almost any type of payload in a SYS.AnyData payload. The following sections provide examples of enqueuing messages into, and dequeuing messages from, a SYS.AnyData queue.

The following steps illustrate how to wrap payloads of various types in a SYS.AnyData payload.

  1. Connect as an administrative user who can create users, grant privileges, create tablespaces, and alter users at the dbs1.net database.
  2. Grant EXECUTE privilege on the DBMS_AQ package to the oe user so that this user can run the ENQUEUE and DEQUEUE procedures in that package:
    GRANT EXECUTE ON DBMS_AQ TO oe;
    
    
  3. Connect as the Streams administrator, as in the following example:
    CONNECT strmadmin/strmadminpw@dbs1.net
    
    
  4. Create a SYS.AnyData queue if one does not already exist.
    BEGIN
      DBMS_STREAMS_ADM.SET_UP_QUEUE(
        queue_table  => 'oe_q_table_any',
        queue_name   => 'oe_q_any',
        queue_user   => 'oe');
    END;
    /
    
    

    The oe user is configured automatically as a secure queue user of the oe_q_any queue and is given ENQUEUE and DEQUEUE privileges on the queue. In addition, an AQ agent named oe is configured and is associated with the oe user. However, a message cannot be enqueued into a queue unless a subscriber who can dequeue the message is configured.

  5. Add a subscriber for the oe_q_any queue. This subscriber will perform explicit dequeues of events.
    DECLARE
      subscriber SYS.AQ$_AGENT;
    BEGIN
      subscriber :=  SYS.AQ$_AGENT('OE', NULL, NULL);  
      SYS.DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name  =>  'strmadmin.oe_q_any',
        subscriber  =>  subscriber);
    END;
    /
    
    
  6. Connect as the oe user.
    CONNECT oe/oe@dbs1.net
    
    
  7. Create a procedure that takes as an input parameter an object of SYS.AnyData type and enqueues a message containing the payload into an existing SYS.AnyData queue.
    CREATE OR REPLACE PROCEDURE oe.enq_proc (payload SYS.AnyData) 
    IS
      enqopt     DBMS_AQ.ENQUEUE_OPTIONS_T;
      mprop      DBMS_AQ.MESSAGE_PROPERTIES_T;
      enq_msgid  RAW(16);
    BEGIN
      mprop.SENDER_ID := SYS.AQ$_AGENT('OE', NULL, NULL); 
      DBMS_AQ.ENQUEUE(
        queue_name          =>  'strmadmin.oe_q_any',
        enqueue_options     =>  enqopt,
        message_properties  =>  mprop,
        payload             =>  payload,
        msgid               =>  enq_msgid);
    END;
    /
    
    
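  8. Run the procedure you created in Step 7 by specifying the appropriate Convertdata_type function, where data_type is the datatype of the payload (for example, ConvertVarchar2, ConvertNumber, or ConvertObject). The following commands enqueue messages of various types.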

    VARCHAR2 type:

    EXEC oe.enq_proc(SYS.AnyData.ConvertVarchar2('Chemicals - SW'));
    COMMIT;
    
    

    NUMBER type:

    EXEC oe.enq_proc(SYS.AnyData.ConvertNumber(16));
    COMMIT;
    
    

    User-defined type:

    BEGIN
      oe.enq_proc(SYS.AnyData.ConvertObject(oe.cust_address_typ(
        '1646 Brazil Blvd','361168','Chennai','Tam', 'IN')));
    END;
    /
    COMMIT;
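
The same pattern should apply to the other Convert functions of SYS.AnyData. As a small additional sketch, which is not part of the original example set, the following commands enqueue a DATE payload using ConvertDate:

```sql
-- DATE type: wraps the current date and time in a SYS.AnyData payload
EXEC oe.enq_proc(SYS.AnyData.ConvertDate(SYSDATE));
COMMIT;
```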
    
    See Also:

    "Viewing the Contents of User-Enqueued Events in a Queue" for information about viewing the contents of these enqueued messages

Dequeuing a Payload That Is Wrapped in a SYS.AnyData Payload

The following steps illustrate how to dequeue a payload wrapped in a SYS.AnyData payload. This example assumes that you have completed the steps in "Wrapping User Message Payloads in a SYS.AnyData Wrapper and Enqueuing Them".

To dequeue messages, you must know the consumer of the messages. To find the consumer for the messages in a queue, connect as the owner of the queue and query the AQ$queue_table_name view, where queue_table_name is the name of the queue table. For example, to find the consumers of the messages in the oe_q_any queue, run the following query:

CONNECT strmadmin/strmadminpw@dbs1.net

SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_ANY;

  1. Connect as the oe user:
    CONNECT oe/oe@dbs1.net
    
    
  2. Create a procedure that takes as an input the consumer of the messages you want to dequeue. The following example procedure dequeues messages of type oe.cust_address_typ and prints the contents of the messages.
    CREATE OR REPLACE PROCEDURE oe.get_cust_address (
      consumer IN VARCHAR2) AS
      address         OE.CUST_ADDRESS_TYP;
      deq_address     SYS.AnyData;
      msgid           RAW(16);
      deqopt          DBMS_AQ.DEQUEUE_OPTIONS_T;
      mprop           DBMS_AQ.MESSAGE_PROPERTIES_T;
      new_addresses   BOOLEAN := true;
      next_trans      EXCEPTION;
      no_messages     EXCEPTION;
      PRAGMA EXCEPTION_INIT (next_trans, -25235);
      PRAGMA EXCEPTION_INIT (no_messages, -25228);
      num_var         PLS_INTEGER;
    BEGIN
      deqopt.consumer_name := consumer;
      -- Wait up to 1 second for a message before raising no_messages
      deqopt.wait := 1;
      WHILE (new_addresses) LOOP
        BEGIN
          DBMS_AQ.DEQUEUE(
            queue_name          =>  'strmadmin.oe_q_any',
            dequeue_options     =>  deqopt,
            message_properties  =>  mprop,
            payload             =>  deq_address,
            msgid               =>  msgid);

          deqopt.navigation := DBMS_AQ.NEXT;

          DBMS_OUTPUT.PUT_LINE('****');
          IF (deq_address.GetTypeName() = 'OE.CUST_ADDRESS_TYP') THEN
            DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' ||
                                 deq_address.GetTypeName());
            -- Extract the object from the SYS.AnyData wrapper
            num_var := deq_address.GetObject(address);
            DBMS_OUTPUT.PUT_LINE(' **** CUSTOMER ADDRESS **** ');
            DBMS_OUTPUT.PUT_LINE(address.street_address);
            DBMS_OUTPUT.PUT_LINE(address.postal_code);
            DBMS_OUTPUT.PUT_LINE(address.city);
            DBMS_OUTPUT.PUT_LINE(address.state_province);
            DBMS_OUTPUT.PUT_LINE(address.country_id);
          ELSE
            -- Payloads of other types are reported but not unwrapped
            DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' ||
                                 deq_address.GetTypeName());
          END IF;
          COMMIT;
        EXCEPTION
          WHEN next_trans THEN
            -- End of the current transaction group: move to the next one
            deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
          WHEN no_messages THEN
            new_addresses := false;
            DBMS_OUTPUT.PUT_LINE('No more messages');
        END;
      END LOOP;
    END;
    /
    
    
  3. Run the procedure you created in Step 2 and specify the consumer of the messages you want to dequeue, as in the following example:
    SET SERVEROUTPUT ON SIZE 100000
    EXEC oe.get_cust_address('OE');
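
The procedure above unwraps only payloads of type oe.cust_address_typ. As a sketch of how the VARCHAR2 and NUMBER payloads enqueued earlier might be unwrapped, the following hypothetical helper function, oe.anydata_to_text, checks the type name and calls the matching Access function of SYS.AnyData. The function name and the text returned for unhandled types are assumptions made for illustration.

```sql
CREATE OR REPLACE FUNCTION oe.anydata_to_text (payload IN SYS.AnyData)
  RETURN VARCHAR2
IS
  type_name  VARCHAR2(100);
  txt        VARCHAR2(4000);
BEGIN
  type_name := payload.GetTypeName();
  IF type_name = 'SYS.VARCHAR2' THEN
    -- Payload was wrapped with ConvertVarchar2
    txt := payload.AccessVarchar2();
  ELSIF type_name = 'SYS.NUMBER' THEN
    -- Payload was wrapped with ConvertNumber
    txt := TO_CHAR(payload.AccessNumber());
  ELSE
    -- Other types are reported by name only in this sketch
    txt := '(unhandled type: ' || type_name || ')';
  END IF;
  RETURN txt;
END;
/
```

A dequeue loop could then print any payload with a call such as DBMS_OUTPUT.PUT_LINE(oe.anydata_to_text(deq_address)).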
    

Configuring a Messaging Client and Message Notification

This section contains instructions for configuring the following elements in a database:

  • A messaging client that can dequeue user-enqueued messages
  • Message notification through email

You can query the DBA_STREAMS_MESSAGE_CONSUMERS data dictionary view for information about existing messaging clients and notifications.
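
For example, a query along the following lines lists each messaging client with its queue, positive rule set, and notification settings. This is a sketch; the column selection is a choice made here for illustration.

```sql
-- Messaging clients and their message notification settings
SELECT streams_name,
       queue_owner,
       queue_name,
       rule_set_name,
       notification_type,
       notification_action
  FROM dba_streams_message_consumers;
```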

Complete the following steps to configure a messaging client and message notification:

  1. Connect as an administrative user who can grant privileges and execute subprograms in supplied packages.
  2. Set the host name used to send the email, the mail port, and the email account that sends email messages for email notifications using the DBMS_AQELM package. The following example sets the mail host name to smtp.mycompany.com, the mail port to 25, and the email account to Mary.Smith@mycompany.com:
    BEGIN
      DBMS_AQELM.SET_MAILHOST('smtp.mycompany.com') ;
      DBMS_AQELM.SET_MAILPORT(25) ;
      DBMS_AQELM.SET_SENDFROM('Mary.Smith@mycompany.com');
    END;
    /
    
    

    To determine the current mail host, mail port, and send from settings for a database, you can use procedures in the DBMS_AQELM package to get this information. For example, to determine the current mail host for a database, use the DBMS_AQELM.GET_MAILHOST procedure.

  3. Grant the necessary privileges to the users who will create the messaging client, enqueue and dequeue messages, and specify message notifications. In this example, the oe user performs all of these tasks.
    GRANT EXECUTE ON DBMS_AQ TO oe;
    GRANT EXECUTE ON DBMS_STREAMS_ADM TO oe;
    GRANT EXECUTE ON DBMS_STREAMS_MESSAGING TO oe;
    
    BEGIN 
      DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
        privilege    => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ, 
        grantee      => 'oe', 
        grant_option => false);
    END;
    /
    
    BEGIN 
      DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
        privilege    => DBMS_RULE_ADM.CREATE_RULE_OBJ, 
        grantee      => 'oe', 
        grant_option => false);
    END;
    /
    
    BEGIN 
      DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
        privilege    => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ, 
        grantee      => 'oe', 
        grant_option => false);
    END;
    /
    
    
  4. Connect as the oe user:
    CONNECT oe/oe
    
    
  5. Create a SYS.AnyData queue using SET_UP_QUEUE, as in the following example:
    BEGIN
      DBMS_STREAMS_ADM.SET_UP_QUEUE(
        queue_table  => 'oe.notification_queue_table',
        queue_name   => 'oe.notification_queue');
    END;
    /
    
    
  6. Create the types for the user-enqueued messages, as in the following example:
    CREATE TYPE oe.user_msg AS OBJECT(
      object_name    VARCHAR2(30),
      object_owner   VARCHAR2(30),
      message        VARCHAR2(50));
    /
    
    
  7. Create a trigger that enqueues a message into the queue whenever an order is inserted into the oe.orders table, as in the following example:
    CREATE OR REPLACE TRIGGER oe.order_insert AFTER INSERT
    ON oe.orders FOR EACH ROW
    DECLARE
      msg            oe.user_msg;
      str            VARCHAR2(2000);
    BEGIN
      str := 'New Order - ' || :NEW.ORDER_ID || ' Order ID';
      msg  := oe.user_msg(
                 object_name   => 'ORDERS',
                 object_owner  => 'OE',
                 message       => str);
      DBMS_STREAMS_MESSAGING.ENQUEUE (
        queue_name   => 'oe.notification_queue',
        payload      => SYS.AnyData.CONVERTOBJECT(msg));
    END;
    /
    
    
  8. Create the messaging client that will dequeue messages from the queue and the rule used by the messaging client to determine which messages to dequeue, as in the following example:
    BEGIN
      DBMS_STREAMS_ADM.ADD_MESSAGE_RULE (
        message_type   => 'oe.user_msg',
        rule_condition => ' :msg.OBJECT_OWNER = ''OE'' AND  ' ||
                          ' :msg.OBJECT_NAME = ''ORDERS'' ',
        streams_type   => 'dequeue',
        streams_name   => 'oe',
        queue_name     => 'oe.notification_queue');
    END;
    /
    
    
  9. Set the message notification to send email upon enqueue of messages that can be dequeued by the messaging client, as in the following example:
    BEGIN
      DBMS_STREAMS_ADM.SET_MESSAGE_NOTIFICATION (
        streams_name         => 'oe',
        notification_action  => 'Mary.Smith@mycompany.com',
        notification_type    => 'MAIL',
        include_notification => true,
        queue_name           => 'oe.notification_queue');
    END;
    /
    
    
  10. Create a PL/SQL procedure that dequeues messages using the messaging client, as in the following example:
    CREATE OR REPLACE PROCEDURE oe.deq_notification(consumer IN VARCHAR2) AS
      msg            SYS.AnyData;
      user_msg       oe.user_msg;
      num_var        PLS_INTEGER;
      more_messages  BOOLEAN := true;
      navigation     VARCHAR2(30);
    BEGIN
      navigation := 'FIRST MESSAGE';
      WHILE (more_messages) LOOP
        BEGIN
          DBMS_STREAMS_MESSAGING.DEQUEUE(
            queue_name   => 'oe.notification_queue',
            streams_name => consumer,
            payload      => msg,
            navigation   => navigation,
            wait         => DBMS_STREAMS_MESSAGING.NO_WAIT);
          IF msg.GETTYPENAME() = 'OE.USER_MSG' THEN
            num_var := msg.GETOBJECT(user_msg);
            DBMS_OUTPUT.PUT_LINE(user_msg.object_name);
            DBMS_OUTPUT.PUT_LINE(user_msg.object_owner);
            DBMS_OUTPUT.PUT_LINE(user_msg.message);
          END IF;
          navigation := 'NEXT MESSAGE';
          COMMIT;
        EXCEPTION WHEN SYS.DBMS_STREAMS_MESSAGING.ENDOFCURTRANS THEN
                    navigation := 'NEXT TRANSACTION';
                  WHEN DBMS_STREAMS_MESSAGING.NOMOREMSGS THEN
                    more_messages := false;
                    DBMS_OUTPUT.PUT_LINE('No more messages.');
                  WHEN OTHERS THEN
                    RAISE;  
        END;
      END LOOP;
    END;
    /
    
    
  11. Insert rows into the oe.orders table, as in the following example:
    INSERT INTO oe.orders VALUES(2521, 'direct', 144, 0, 922.57, 159, NULL);
    INSERT INTO oe.orders VALUES(2522, 'direct', 116, 0, 1608.29, 153, NULL);
    COMMIT;
    INSERT INTO oe.orders VALUES(2523, 'direct', 116, 0, 227.55, 155, NULL);
    COMMIT;
    
    

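Message notification sends a message to the email address specified in Step 9 for each message that was enqueued. Each notification is an AQXmlNotification, which includes the following:

  • notification_options, which identifies the queue (destination) and the messaging client (consumer_name) for the notification
  • message_set, which contains the message header properties of each message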

The following is an example of the AQXmlNotification format sent in an email notification:

<?xml version="1.0" encoding="UTF-8"?>
<Envelope xmlns="http://ns.oracle.com/AQ/schemas/envelope">
    <Body>
        <AQXmlNotification xmlns="http://ns.oracle.com/AQ/schemas/access">
            <notification_options>
                <destination>OE.NOTIFICATION_QUEUE</destination>
                <consumer_name>OE</consumer_name>
            </notification_options>
            <message_set>
                <message>
                    <message_header>
                        <message_id>CB510DDB19454731E034080020AE3E0A</message_id>
                        <expiration>-1</expiration>
                        <delay>0</delay>
                        <priority>1</priority>
                        <delivery_count>0</delivery_count>
                        <sender_id>
                            <agent_name>OE</agent_name>
                            <protocol>0</protocol>
                        </sender_id>
                        <message_state>0</message_state>
                    </message_header>
                </message>
            </message_set>
        </AQXmlNotification>
    </Body>
</Envelope>

You may dequeue the messages enqueued in this example by running the oe.deq_notification procedure:

SET SERVEROUTPUT ON SIZE 100000
EXEC oe.deq_notification('OE');
