Oracle9i Application Developer's Guide - Advanced Queuing
Release 1 (9.0.1)

Part Number A88890-02

A Sample Application Using AQ, 5 of 7


Dequeue Features

Dequeue Methods

A message can be dequeued using one of the following dequeue methods: by correlation identifier, by message identifier, or by dequeue condition.

A correlation identifier is a user-defined message property (of VARCHAR2 datatype) while a message identifier is a system-assigned value (of RAW datatype). Multiple messages with the same correlation identifier can be present in a queue, while only one message with a given message identifier can be present. A dequeue call with a correlation identifier will directly remove a message of specific interest, rather than using a combination of locked and remove mode to first examine the content and then remove the message. Hence, the correlation identifier usually contains the most useful attribute of a payload. If there are multiple messages with the same correlation identifier, the ordering (enqueue order) between messages may not be preserved on dequeue calls. The correlation identifier cannot be changed between successive dequeue calls without specifying the first message navigation option.

A dequeue condition is an expression that is similar in syntax to the WHERE clause of a SQL query. Dequeue conditions are expressed in terms of the attributes that represent message properties or message content. The messages in the queue are evaluated against the conditions and a message that satisfies the given condition is returned.

Note that dequeueing a message with any of the dequeue methods will not preserve the message grouping property (see "Message Grouping" and "Message Navigation in Dequeue" for further information).
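
The three dequeue methods can be compared with a small in-memory sketch. This is only a model for illustration and does not use the oracle.AQ classes: DequeueMethodsSketch, Msg, and all method names are hypothetical. The model always returns the earliest matching message, which AQ itself does not guarantee for messages that share a correlation identifier.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

/** In-memory model of the three dequeue methods; hypothetical, not the oracle.AQ API. */
final class DequeueMethodsSketch {

    /** A message with a system-assigned id, a user-defined correlation, and one payload attribute. */
    static final class Msg {
        final String msgId;
        final String correlation;
        final int orderNo;

        Msg(String msgId, String correlation, int orderNo) {
            this.msgId = msgId;
            this.correlation = correlation;
            this.orderNo = orderNo;
        }
    }

    private final List<Msg> queue = new ArrayList<>();

    void enqueue(Msg m) {
        queue.add(m);
    }

    int size() {
        return queue.size();
    }

    /** Removes and returns the first message that satisfies the test, or null if none does. */
    private Msg removeFirst(Predicate<Msg> test) {
        for (Iterator<Msg> it = queue.iterator(); it.hasNext(); ) {
            Msg m = it.next();
            if (test.test(m)) {
                it.remove();
                return m;
            }
        }
        return null;
    }

    /** Several messages may share a correlation identifier. */
    Msg dequeueByCorrelation(String correlation) {
        return removeFirst(m -> m.correlation.equals(correlation));
    }

    /** At most one message carries a given message identifier. */
    Msg dequeueByMsgId(String msgId) {
        return removeFirst(m -> m.msgId.equals(msgId));
    }

    /** The predicate plays the role of a WHERE-like dequeue condition. */
    Msg dequeueByCondition(Predicate<Msg> condition) {
        return removeFirst(condition);
    }

    public static void main(String[] args) {
        DequeueMethodsSketch q = new DequeueMethodsSketch();
        q.enqueue(new Msg("id-1", "NORMAL", 100));
        q.enqueue(new Msg("id-2", "RUSH", 101));
        q.enqueue(new Msg("id-3", "RUSH", 102));

        System.out.println(q.dequeueByCorrelation("RUSH").msgId);             // id-2
        System.out.println(q.dequeueByMsgId("id-1").orderNo);                 // 100
        System.out.println(q.dequeueByCondition(m -> m.orderNo > 101).msgId); // id-3
    }
}
```

In the model, the correlation lookup removes a message of interest in a single call, which mirrors why the correlation identifier usually carries the most useful payload attribute.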

Scenario

In the BooksOnLine example, rush orders received by the East shipping site are processed first. This is achieved by dequeueing the message using the correlation identifier, which has been defined to contain the order type (rush/normal). For an illustration of dequeueing using a message identifier, refer to the get_northamerican_orders procedure discussed in the example under "Modes of Dequeuing".

PL/SQL (DBMS_AQADM Package): Example Code

CONNECT boladm/boladm; 
 
/*  Create procedures to dequeue RUSH orders */ 
create or replace procedure get_rushtitles(consumer in varchar2) as 
  
deq_cust_data            BOLADM.customer_typ; 
deq_book_data            BOLADM.book_typ; 
deq_item_data            BOLADM.orderitem_typ; 
deq_msgid                RAW(16); 
dopt                     dbms_aq.dequeue_options_t; 
mprop                    dbms_aq.message_properties_t; 
deq_order_data           BOLADM.order_typ; 
qname                    varchar2(30); 
no_messages              exception; 
pragma exception_init    (no_messages, -25228); 
new_orders               BOOLEAN := TRUE; 
  
begin 
  
        dopt.consumer_name := consumer; 
        dopt.wait := 1; 
        dopt.correlation := 'RUSH'; 
  
        IF (consumer = 'West_Shipping') THEN 
                qname := 'WS.WS_bookedorders_que'; 
        ELSIF (consumer = 'East_Shipping') THEN 
                qname := 'ES.ES_bookedorders_que'; 
        ELSE 
                qname := 'OS.OS_bookedorders_que'; 
        END IF; 
  
        WHILE (new_orders) LOOP 
          BEGIN 
            dbms_aq.dequeue( 
                queue_name => qname, 
                dequeue_options => dopt, 
                message_properties => mprop, 
                payload => deq_order_data, 
                msgid => deq_msgid); 
            commit; 
         
            deq_item_data := deq_order_data.items(1); 
            deq_book_data := deq_item_data.item; 
  
            dbms_output.put_line(' rushorder book_title: ' ||  
                                deq_book_data.title ||  
                        ' quantity: ' || deq_item_data.quantity); 
          EXCEPTION 
            WHEN no_messages THEN 
                 dbms_output.put_line (' ---- NO MORE RUSH TITLES ---- '); 
                 new_orders := FALSE; 
          END; 
        END LOOP; 
  
end; 
/ 
 
GRANT EXECUTE on get_rushtitles to ES; 
 
/* Dequeue the orders: */ 
CONNECT ES/ES; 
 
/*  Dequeue all rush order titles for East_Shipping: */ 
EXECUTE BOLADM.get_rushtitles('East_Shipping'); 

Visual Basic (OO4O): Example Code

   set oraaq1 = OraDatabase.CreateAQ("WS.WS_backorders_que")
   set oraaq2 = OraDatabase.CreateAQ("ES.ES_backorders_que")
   set oraaq3 = OraDatabase.CreateAQ("CBADM.deferbilling_que")
   Set OraMsg = OraAq.AQMsg(ORATYPE_OBJECT, "BOLADM.order_typ")
   Set OraBackOrder = OraDatabase.CreateOraObject("BOLADM.order_typ")

Private Sub Requeue_backorder
   Dim q as oraobject
   If sale_region = WEST then
      Set q = oraaq1
   ElseIf sale_region = EAST then
      Set q = oraaq2
   Else
      Set q = oraaq3
   End If

   OraMsg.delay = 7*60*60*24
   OraMsg = OraBackOrder 'OraBackOrder contains the order details
   Msgid = q.enqueue

End Sub

Java (JDBC): Example Code

public static void getRushTitles(Connection db_conn, String consumer)
{
    AQSession         aq_sess;
    Order             deq_order;
    byte[]            deq_msgid;
    AQDequeueOption   deq_option;
    AQMessageProperty msg_prop;
    AQQueue           bookedorders_q;
    AQMessage         message;
    AQObjectPayload   obj_payload;
    boolean           new_orders = true;

    try
    {
        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        deq_option = new AQDequeueOption();

        deq_option.setConsumerName(consumer);
        deq_option.setWaitTime(1);
        deq_option.setCorrelation("RUSH");

        if(consumer.equals("West_Shipping"))
        {
            bookedorders_q = aq_sess.getQueue("WS", "WS_bookedorders_que");    
        }       
        else if(consumer.equals("East_Shipping"))
        {
            bookedorders_q = aq_sess.getQueue("ES", "ES_bookedorders_que");    
        }       
        else
        {
            bookedorders_q = aq_sess.getQueue("OS", "OS_bookedorders_que");   
        }       

        while(new_orders)
        {
            try
            {
              /* Dequeue the message */
              message = bookedorders_q.dequeue(deq_option, Order.getFactory());

              obj_payload = message.getObjectPayload();

              deq_order = (Order)(obj_payload.getPayloadData());

              System.out.println("Order number " + deq_order.getOrderno() + 
                                 " is a rush order");
              
            }
            catch (AQException aqex)
            {
              new_orders = false;       
              System.out.println("No more rush titles");        
              System.out.println("Exception-1: " + aqex);       
            }
        }
    }
    catch (Exception ex)
    {
        System.out.println("Exception-2: " + ex);
    }   

}

Multiple Recipients

A consumer can dequeue a message from a multi-consumer normal queue by supplying the name that was used in the AQ$_AGENT type of the DBMS_AQADM.ADD_SUBSCRIBER procedure or the recipient list of the message properties. (See "Adding a Subscriber" or "Enqueuing a Message [Specify Message Properties]".)

Multiple processes or operating system threads can use the same consumer_name to dequeue concurrently from a queue. In that case AQ will provide the first unlocked message that is at the head of the queue and is intended for the consumer. Unless the message ID of a specific message is specified during dequeue, the consumers can dequeue messages that are in the READY state.

A message is considered PROCESSED only when all intended consumers have successfully dequeued the message. A message is considered EXPIRED if one or more consumers did not dequeue the message before the EXPIRATION time. When a message has expired, it is moved to an exception queue.
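
The state rules in the preceding paragraph can be sketched as a small state machine. This is an in-memory illustration only, with hypothetical names (MultiConsumerStateSketch, dequeueFor, expire); it does not call AQ and does not model the move to the exception queue.

```java
import java.util.HashSet;
import java.util.Set;

/** In-memory model of message state in a multi-consumer queue; hypothetical, not the oracle.AQ API. */
final class MultiConsumerStateSketch {

    enum State { READY, PROCESSED, EXPIRED }

    private final Set<String> pendingConsumers;
    private State state = State.READY;

    MultiConsumerStateSketch(Set<String> intendedConsumers) {
        this.pendingConsumers = new HashSet<>(intendedConsumers);
    }

    /** Records a successful dequeue by one consumer; returns false if that consumer was not pending. */
    boolean dequeueFor(String consumer) {
        if (state != State.READY || !pendingConsumers.remove(consumer)) {
            return false;
        }
        if (pendingConsumers.isEmpty()) {
            // Every intended consumer has now dequeued the message.
            state = State.PROCESSED;
        }
        return true;
    }

    /** Called when the expiration time passes: one pending consumer is enough to expire the message. */
    void expire() {
        if (state == State.READY && !pendingConsumers.isEmpty()) {
            state = State.EXPIRED;
        }
    }

    State state() {
        return state;
    }

    public static void main(String[] args) {
        Set<String> consumers = new HashSet<>();
        consumers.add("East_Shipping");
        consumers.add("West_Shipping");

        MultiConsumerStateSketch msg = new MultiConsumerStateSketch(consumers);
        msg.dequeueFor("East_Shipping");
        System.out.println(msg.state()); // READY: West_Shipping has not dequeued yet
        msg.dequeueFor("West_Shipping");
        System.out.println(msg.state()); // PROCESSED
    }
}
```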

The exception queue must also be a multi-consumer queue. Expired messages from multi-consumer queues cannot be dequeued by the intended recipients of the message. However, they can be dequeued in the REMOVE mode exactly once by specifying a NULL consumer name in the dequeue options. Hence, from a dequeue perspective, multi-consumer exception queues behave like single-consumer queues because each expired message can be dequeued only once using a NULL consumer name. Note that expired messages can be dequeued only by specifying a message ID if the multi-consumer exception queue was created in a queue table with the compatible parameter set to '8.0'.

Beginning with release 8.1.6, only the queue monitor removes messages from multi-consumer queues. This allows dequeuers to complete the dequeue operation without locking the message in the queue table. Since the queue monitor removes messages that have been processed by all consumers from multi-consumer queues approximately once every minute, users may see a delay between the time when the messages have been completely processed and the time when they are physically removed from the queue.

Local and Remote Recipients

Consumers of a message in multi-consumer queues (either by virtue of being a subscriber to the queue or because the consumer was a recipient in the enqueuer's recipient list) can be local or remote.

When a consumer is remote, a message will be marked as PROCESSED in the source queue immediately after the message has been propagated, even though the consumer may not have dequeued the message at the remote queue. Similarly, when a propagated message expires at the remote queue, the message is moved to the DEFAULT exception queue of the remote queue's queue table, and not to the exception queue of the local queue. As can be seen in both cases, AQ does not currently propagate the exceptions to the source queue. You can use the MSGID and the ORIGINAL_MSGID columns in the queue table view (AQ$<queue_table>) to chain the propagated messages. When a message with message ID m1 is propagated to a remote queue, m1 is stored in the ORIGINAL_MSGID column of the remote queue.

The DELAY, EXPIRATION and PRIORITY parameters apply identically to both local and remote consumers. AQ accounts for any delay in propagation by adjusting the DELAY and EXPIRATION parameters accordingly. For example, if the EXPIRATION is set to one hour, and the message is propagated after 15 minutes, the expiration at the remote queue will be set to 45 minutes.
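
The adjustment in the example above is simple subtraction, sketched here for concreteness. The class and method names are hypothetical, and clamping the result at zero is an assumption of this sketch rather than documented AQ behavior.

```java
/** Arithmetic sketch of the expiration adjustment; hypothetical, not the oracle.AQ API. */
final class RemoteExpirationSketch {

    /**
     * Returns the expiration (in seconds) to apply at the remote queue, given the
     * original expiration and the time that elapsed before propagation.
     */
    static long remainingExpirationSeconds(long expirationSeconds, long elapsedSeconds) {
        // Assumption: a message propagated at or after its deadline has no time left.
        return Math.max(0L, expirationSeconds - elapsedSeconds);
    }

    public static void main(String[] args) {
        long remaining = remainingExpirationSeconds(60 * 60, 15 * 60);
        System.out.println(remaining / 60 + " minutes"); // prints "45 minutes"
    }
}
```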

Since the database handles message propagation, OO4O does not differentiate between remote and local recipients. The same sequence of calls/steps is required to dequeue a message for local and remote recipients.

Message Navigation in Dequeue

You have several options for selecting a message from a queue. You can select the "first message". Alternatively, once you have selected a message and established its position in the queue (for example, as the fourth message), you can then retrieve the "next message".

These selections work in a slightly different way if the queue is enabled for transactional grouping.

Note that the transaction grouping property is negated if a dequeue is performed in one of the following ways: dequeue by specifying a correlation identifier, dequeue by specifying a message identifier, or dequeueing some of the messages of a transaction and committing (see "Dequeue Methods").

In navigating through the queue, if the program reaches the end of the queue while using the "next message" or "next transaction" option, and you have specified a non-zero wait time, then the navigating position is automatically changed to the beginning of the queue. If a zero wait time is specified, you may get an exception when the end of the queue is reached.

Scenario

The following scenario in the BooksOnLine example continues the message grouping example already discussed with regard to enqueuing (see "Dequeue Methods").

The get_orders() procedure dequeues orders from the OE_neworders_que. Recall that each transaction refers to an order and each message corresponds to an individual book in the order. The get_orders() procedure loops through the messages to dequeue the book orders. It resets the position to the beginning of the queue using the first message option before the first dequeue. It then uses the next message navigation option to retrieve the next book (message) of an order (transaction). If it gets an error message indicating that all messages in the current group/transaction have been fetched, it changes the navigation option to next transaction and gets the first book of the next order. It then changes the navigation option back to next message for fetching subsequent messages in the same transaction. This is repeated until all orders (transactions) have been fetched.
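
The navigation loop described above can be modeled in memory as follows. This is a hypothetical sketch (GroupNavigationSketch, EndOfGroup, and NoMessages are invented names standing in for the AQ navigation options and errors) and it reads messages without removing them, so it shows only the control flow.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** In-memory model of navigation over a grouped queue; hypothetical, not the oracle.AQ API. */
final class GroupNavigationSketch {

    enum Navigation { FIRST_MESSAGE, NEXT_MESSAGE, NEXT_TRANSACTION }

    static final class EndOfGroup extends RuntimeException { }
    static final class NoMessages extends RuntimeException { }

    private final List<List<String>> groups; // one inner list per transaction (order)
    private int group = 0;                   // index of the current transaction
    private int next = 0;                    // index of the next message within it

    GroupNavigationSketch(List<List<String>> groups) {
        this.groups = groups;
    }

    /** Returns the message selected by the navigation option. */
    String read(Navigation nav) {
        if (nav == Navigation.FIRST_MESSAGE) {
            group = 0;
            next = 0;
        } else if (nav == Navigation.NEXT_TRANSACTION) {
            group++;
            next = 0;
        }
        if (group >= groups.size()) {
            throw new NoMessages();
        }
        List<String> current = groups.get(group);
        if (next >= current.size()) {
            throw new EndOfGroup();
        }
        return current.get(next++);
    }

    /** Walks every order: first message, then next message, then next transaction on end of group. */
    static List<String> readAll(GroupNavigationSketch q) {
        List<String> seen = new ArrayList<>();
        Navigation nav = Navigation.FIRST_MESSAGE;
        while (true) {
            try {
                seen.add(q.read(nav));
                nav = Navigation.NEXT_MESSAGE;      // stay inside the current order
            } catch (EndOfGroup e) {
                seen.add("<end of order>");
                nav = Navigation.NEXT_TRANSACTION;  // move to the first book of the next order
            } catch (NoMessages e) {
                return seen;
            }
        }
    }

    public static void main(String[] args) {
        GroupNavigationSketch q = new GroupNavigationSketch(Arrays.asList(
                Arrays.asList("order1-book1", "order1-book2"),
                Arrays.asList("order2-book1")));
        // Prints [order1-book1, order1-book2, <end of order>, order2-book1, <end of order>]
        System.out.println(readAll(q));
    }
}
```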

PL/SQL (DBMS_AQADM Package): Example Code

CONNECT boladm/boladm; 
 
create or replace procedure get_new_orders as 
 
deq_cust_data            BOLADM.customer_typ; 
deq_book_data            BOLADM.book_typ; 
deq_item_data            BOLADM.orderitem_typ; 
deq_msgid                RAW(16); 
dopt                     dbms_aq.dequeue_options_t; 
mprop                    dbms_aq.message_properties_t; 
deq_order_data           BOLADM.order_typ; 
qname                    VARCHAR2(30); 
no_messages              exception; 
end_of_group             exception; 
pragma exception_init    (no_messages, -25228); 
pragma exception_init    (end_of_group, -25235); 
new_orders               BOOLEAN := TRUE; 
  
BEGIN 
  
        dopt.wait := 1; 
        dopt.navigation := DBMS_AQ.FIRST_MESSAGE;  
        qname := 'OE.OE_neworders_que'; 
        WHILE (new_orders) LOOP 
          BEGIN 
            LOOP 
                BEGIN 
                    dbms_aq.dequeue( 
                        queue_name          => qname, 
                        dequeue_options     => dopt, 
                        message_properties  => mprop, 
                        payload             => deq_order_data, 
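                        msgid               => deq_msgid); 

                    /* Stay within the current order for subsequent dequeues: */
                    dopt.navigation := DBMS_AQ.NEXT_MESSAGE; 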
         
                    deq_item_data := deq_order_data.items(1); 
                    deq_book_data := deq_item_data.item; 
                    deq_cust_data := deq_order_data.customer; 
  
                    IF (deq_cust_data IS NOT NULL) THEN 
                      dbms_output.put_line(' **** NEXT ORDER **** ');  
                      dbms_output.put_line('order_num: ' ||  
                                deq_order_data.orderno); 
                      dbms_output.put_line('ship_state: ' ||  
                                deq_cust_data.state); 
                    END IF; 
                    dbms_output.put_line(' ---- next book ---- ');  
                    dbms_output.put_line(' book_title: ' ||  
                                deq_book_data.title ||  
                                ' quantity: ' || deq_item_data.quantity); 
                EXCEPTION 
                    WHEN end_of_group THEN 
                      dbms_output.put_line ('*** END OF ORDER ***'); 
                      commit; 
                      dopt.navigation := DBMS_AQ.NEXT_TRANSACTION; 
                END; 
            END LOOP; 
          EXCEPTION 
            WHEN no_messages THEN 
                 dbms_output.put_line (' ---- NO MORE NEW ORDERS ---- '); 
                 new_orders := FALSE; 
          END; 
        END LOOP; 
  
END; 
/ 
 
GRANT EXECUTE ON get_new_orders to OE; 
 
/*  Dequeue the orders: */
CONNECT OE/OE; 
EXECUTE BOLADM.get_new_orders; 

Visual Basic (OO4O): Example Code

Dim OraSession as object
Dim OraDatabase as object
Dim OraAq as object
Dim OraMsg as Object
Dim OraOrder,OraItemList,OraItem,OraBook,OraCustomer  as Object
Dim Msgid as String

   Set OraSession = CreateObject("OracleInProcServer.XOraSession")
   Set OraDatabase = OraSession.DbOpenDatabase("", "boladm/boladm", 0&)
   set oraaq = OraDatabase.CreateAQ("OE.OE_neworders_que")
   Set OraMsg = OraAq.AQMsg(ORATYPE_OBJECT, "BOLADM.order_typ")
       OraAq.wait = 1
   OraAq.Navigation = ORAAQ_DQ_FIRST_MESSAGE

private sub get_new_orders
   Dim MsgIsDequeued as Boolean
   On Error goto ErrHandler
   MsgIsDequeued = TRUE
       msgid = OraAq.Dequeue
        if MsgIsDequeued then
      set OraOrder = OraMsg   
      Set OraItemList = OraOrder("items")
      Set OraItem = OraItemList(1)
      Set OraBook = OraItem("item")
      Set OraCustomer = OraOrder("customer")

         ' Populate the textboxes with the values 
      if( OraCustomer ) then
         if OraAq.Navigation <> ORAAQ_DQ_NEXT_MESSAGE then
            MsgBox " ********* NEXT ORDER *******"
         end if
         txt_book_orderno = OraOrder("orderno")
         txt_book_shipstate = OraCustomer("state")
      End if
      OraAq.Navigation = ORAAQ_DQ_NEXT_MESSAGE
      txt_book_title = OraBook("title")
      txt_book_qty  = OraItem("quantity")
    Else
      MsgBox " ********* END OF  ORDER *******"
   End if
   Exit Sub
 
ErrHandler :
   'Handle error case: all messages of the current order have been fetched (ORA-25235)
   If OraDatabase.LastServerErr = 25235 then
      OraAq.Navigation = ORAAQ_DQ_NEXT_TRANSACTION
      MsgIsDequeued = FALSE
      Resume Next
   End If
   'Process other errors
end sub

Java (JDBC): Example Code

No example is provided with this release.

Modes of Dequeuing

A dequeue request can either view a message or delete a message (see "Dequeuing a Message" in Chapter 11, "Operational Interface: Basic Operations").

If a message is browsed, it remains available for further processing. Similarly if a message is locked, it remains available for further processing after the lock is released by performing a transaction commit or rollback. After a message is consumed, using either of the remove modes, it is no longer available for dequeue requests.

When a message is dequeued using REMOVE_NODATA mode, the payload of the message is not retrieved. This mode can be useful when the user has already examined the message payload, possibly by means of a previous BROWSE dequeue. In this way, you can avoid the overhead of payload retrieval, which can be substantial for large payloads.

A message is retained in the queue table after it has been consumed only if a retention time is specified for a queue. Messages cannot be retained in exception queues (refer to the section on exceptions for further information). Removing a message with no data is generally used if the payload is known (from a previous browse/locked mode dequeue call), or the message will not be used.

Note that after a message has been browsed, there is no guarantee that the message can be dequeued again since a dequeue call from a concurrent user might have removed the message. To prevent a viewed message from being dequeued by a concurrent user, you should view the message in the locked mode.
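
The difference between browsing and locking can be illustrated with an in-memory model. Everything here is hypothetical (DequeueModesSketch, the session names, endTransaction); it is a sketch of the behavior described above, not the oracle.AQ API. As a simplifying assumption, a browse in this model ignores locks held by other sessions.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory model of the BROWSE, LOCKED and REMOVE dequeue modes; hypothetical, not the oracle.AQ API. */
final class DequeueModesSketch {

    enum Mode { BROWSE, LOCKED, REMOVE }

    private static final class Entry {
        final String payload;
        String lockedBy; // session holding the lock, or null

        Entry(String payload) {
            this.payload = payload;
        }
    }

    private final List<Entry> queue = new ArrayList<>();

    void enqueue(String payload) {
        queue.add(new Entry(payload));
    }

    int size() {
        return queue.size();
    }

    /** Returns the first message available to the session in the given mode, or null. */
    String dequeue(String session, Mode mode) {
        for (int i = 0; i < queue.size(); i++) {
            Entry e = queue.get(i);
            boolean lockedByOther = e.lockedBy != null && !e.lockedBy.equals(session);
            if (lockedByOther && mode != Mode.BROWSE) {
                continue; // a concurrent user holds the lock
            }
            if (mode == Mode.LOCKED) {
                e.lockedBy = session;
            } else if (mode == Mode.REMOVE) {
                queue.remove(i);
            }
            return e.payload; // BROWSE leaves the message untouched
        }
        return null;
    }

    /** A commit or rollback releases every lock held by the session. */
    void endTransaction(String session) {
        for (Entry e : queue) {
            if (session.equals(e.lockedBy)) {
                e.lockedBy = null;
            }
        }
    }

    public static void main(String[] args) {
        DequeueModesSketch q = new DequeueModesSketch();

        q.enqueue("order-1");
        q.dequeue("A", Mode.BROWSE);                     // A only looks at the message
        System.out.println(q.dequeue("B", Mode.REMOVE)); // order-1: B removes it first
        System.out.println(q.dequeue("A", Mode.REMOVE)); // null: nothing left for A

        q.enqueue("order-2");
        q.dequeue("A", Mode.LOCKED);                     // A views the message under a lock
        System.out.println(q.dequeue("B", Mode.REMOVE)); // null: B cannot remove it
        System.out.println(q.dequeue("A", Mode.REMOVE)); // order-2
    }
}
```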

In general, use care while using the browse mode. The dequeue position is automatically changed to the beginning of the queue if a non-zero wait time is specified and the navigating position reaches the end of the queue. Hence repeating a dequeue call in the browse mode with the "next message" navigation option and a non-zero wait time can dequeue the same message over and over again. We recommend that you use a non-zero wait time for the first dequeue call on a queue in a session, and then use a zero wait time with the next message navigation option for subsequent dequeue calls. If a dequeue call gets an "end of queue" error message, the dequeue position can be explicitly set by the dequeue call to the beginning of the queue using the "first message" navigation option, following which the messages in the queue can be browsed again.

Scenario

In the following scenario from the BooksOnLine example, international orders destined to Mexico and Canada are to be processed separately due to trade policies and carrier discounts. Hence, a message is viewed in the locked mode (so no other concurrent user removes the message) and the customer country (message payload) is checked. If the customer country is Mexico or Canada, the message is consumed (deleted from the queue) using REMOVE_NODATA (since the payload is already known). Otherwise, the lock on the message is released by the commit call. Note that the remove dequeue call uses the message identifier obtained from the locked mode dequeue call. The shipping_bookedorder_deq (refer to the example code for the description of this procedure) call illustrates the use of the browse mode.

PL/SQL (DBMS_AQADM Package): Example Code

CONNECT boladm/boladm; 
 
create or replace procedure get_northamerican_orders as 
  
deq_cust_data            BOLADM.customer_typ; 
deq_book_data            BOLADM.book_typ; 
deq_item_data            BOLADM.orderitem_typ; 
deq_msgid                RAW(16); 
dopt                     dbms_aq.dequeue_options_t; 
mprop                    dbms_aq.message_properties_t; 
deq_order_data           BOLADM.order_typ; 
deq_order_nodata         BOLADM.order_typ; 
qname                    VARCHAR2(30); 
no_messages              exception; 
pragma exception_init    (no_messages, -25228); 
new_orders               BOOLEAN := TRUE; 
  
begin 
  
        dopt.consumer_name := 'Overseas_Shipping'; 
        dopt.wait := DBMS_AQ.NO_WAIT; 
        dopt.navigation := dbms_aq.FIRST_MESSAGE; 
        dopt.dequeue_mode := DBMS_AQ.LOCKED; 
  
        qname := 'OS.OS_bookedorders_que'; 
  
        WHILE (new_orders) LOOP 
          BEGIN 
            dbms_aq.dequeue( 
                queue_name => qname, 
                dequeue_options => dopt, 
                message_properties => mprop, 
                payload => deq_order_data, 
                msgid => deq_msgid); 
         
            deq_item_data := deq_order_data.items(1); 
            deq_book_data := deq_item_data.item; 
            deq_cust_data := deq_order_data.customer; 
  
            IF (deq_cust_data.country = 'Canada' OR  
                deq_cust_data.country = 'Mexico' ) THEN 
  
                dopt.dequeue_mode := dbms_aq.REMOVE_NODATA; 
                dopt.msgid := deq_msgid; 
                dbms_aq.dequeue( 
                        queue_name => qname, 
                        dequeue_options => dopt, 
                        message_properties => mprop, 
                        payload => deq_order_nodata, 
                        msgid => deq_msgid); 
                commit; 
  
                dbms_output.put_line(' **** next booked order **** ');  
                dbms_output.put_line('order_no: ' || deq_order_data.orderno ||  
                        ' book_title: ' || deq_book_data.title ||  
                        ' quantity: ' || deq_item_data.quantity); 
                dbms_output.put_line('ship_state: ' || deq_cust_data.state || 
                        ' ship_country: ' || deq_cust_data.country || 
                        ' ship_order_type: ' || deq_order_data.ordertype); 
  
            END IF; 
  
            commit; 
            dopt.dequeue_mode := DBMS_AQ.LOCKED; 
            dopt.msgid := NULL; 
            dopt.navigation := dbms_aq.NEXT_MESSAGE; 
          EXCEPTION 
            WHEN no_messages THEN 
                 dbms_output.put_line (' ---- NO MORE BOOKED ORDERS ---- '); 
                 new_orders := FALSE; 
          END; 
        END LOOP; 
  
end; 
/ 
 
GRANT EXECUTE on get_northamerican_orders to OS; 
 
CONNECT ES/ES; 
 
/*  Browse all booked orders for East_Shipping: */ 
EXECUTE BOLADM.shipping_bookedorder_deq('East_Shipping', DBMS_AQ.BROWSE); 
 
CONNECT OS/OS; 
 
/*  Dequeue all international North American orders for Overseas_Shipping: */ 
EXECUTE BOLADM.get_northamerican_orders; 

Visual Basic (OO4O): Example Code

OO4O supports all the modes of dequeuing described above. Possible values include:

Java (JDBC): Example Code

public static void get_northamerican_orders(Connection db_conn)
{

    AQSession         aq_sess;
    Order             deq_order;
    Customer          deq_cust;
    String            cust_country;
    byte[]            deq_msgid;
    AQDequeueOption   deq_option;
    AQMessageProperty msg_prop;
    AQQueue           bookedorders_q;
    AQMessage         message;
    AQObjectPayload   obj_payload;
    boolean           new_orders = true;

    try
    {
        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        deq_option = new AQDequeueOption();

        deq_option.setConsumerName("Overseas_Shipping");
        deq_option.setWaitTime(AQDequeueOption.WAIT_NONE);
        deq_option.setNavigationMode(AQDequeueOption.NAVIGATION_FIRST_MESSAGE);
        deq_option.setDequeueMode(AQDequeueOption.DEQUEUE_LOCKED);

        bookedorders_q = aq_sess.getQueue("OS", "OS_bookedorders_que");    
    
        while(new_orders)
        {
            try
            {
              /* Dequeue the message - browse with lock */
              message = bookedorders_q.dequeue(deq_option, Order.getFactory());

              obj_payload = message.getObjectPayload();

              deq_msgid = message.getMessageId();
              deq_order = (Order)(obj_payload.getPayloadData());

              deq_cust = deq_order.getCustomer();

              cust_country = deq_cust.getCountry();

              if(cust_country.equals("Canada") ||
                 cust_country.equals("Mexico"))
              {
                deq_option.setDequeueMode(
                                  AQDequeueOption.DEQUEUE_REMOVE_NODATA);
                deq_option.setMessageId(deq_msgid);

                
                /* Delete the message */
                bookedorders_q.dequeue(deq_option, Order.getFactory());

                System.out.println("----  next booked order ------");
                System.out.println("Order no: " + deq_order.getOrderno());
                System.out.println("Ship state: " + deq_cust.getState());
                System.out.println("Ship country: " + deq_cust.getCountry());
                System.out.println("Order type: " + deq_order.getOrdertype());
                
              }

              db_conn.commit();

              deq_option.setDequeueMode(AQDequeueOption.DEQUEUE_LOCKED);
              deq_option.setMessageId(null);
              deq_option.setNavigationMode(
                                    AQDequeueOption.NAVIGATION_NEXT_MESSAGE); 
            }
            catch (AQException aqex)
            {
              new_orders = false;       
              System.out.println("--- No more booked orders ----");     
              System.out.println("Exception-1: " + aqex);       
            }
        }

    }
    catch (Exception ex)
    {
        System.out.println("Exception-2: " + ex);
    }

}

Optimization of Waiting for Arrival of Messages

AQ allows applications to block on one or more queues waiting for the arrival of either a newly enqueued message or for a message that becomes ready. You can use the DEQUEUE operation to wait for the arrival of a message in a queue (see "Dequeuing a Message") or the LISTEN operation to wait for the arrival of a message in more than one queue (see "Listening to One (Many) Queue(s)" in Chapter 11, "Operational Interface: Basic Operations").

When the blocking DEQUEUE call returns, it returns the message properties and the message payload. By contrast, when the blocking LISTEN call returns, it discloses only the name of the queue where a message has arrived. A subsequent DEQUEUE operation is needed to dequeue the message.

Applications can optionally specify a timeout of zero or more seconds to indicate the time that AQ must wait for the arrival of a message. The default is to wait forever until a message arrives in the queue. This optimization is important in two ways. It removes the burden of continually polling for messages from the application. And it saves CPU and network resource because the application remains blocked until a new message is enqueued or becomes READY after its DELAY time. Applications can also perform a blocking dequeue on exception queues to wait for arrival of EXPIRED messages.

A process or thread that is blocked on a dequeue is either awakened directly by the enqueuer if the new message has no DELAY or is awakened by the queue monitor process when the DELAY or EXPIRATION time has passed. Applications can wait not only for the arrival of a message in the queue into which an enqueuer enqueues a message, but also for the arrival of a message on a remote queue, provided that propagation has been scheduled to the remote queue using DBMS_AQADM.SCHEDULE_PROPAGATION. In this case, the AQ propagator will wake up the blocked dequeuer after a message has been propagated.
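
As an analogy for blocking instead of polling, the standard java.util.concurrent.BlockingQueue offers the same three choices: return immediately, wait up to a timeout, or wait forever. The sketch below uses only JDK classes. WaitTimeSketch and its NO_WAIT and FOREVER constants are hypothetical names local to the sketch, and it does not call AQ.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Analogy for dequeue wait times using java.util.concurrent; hypothetical, not the oracle.AQ API. */
final class WaitTimeSketch {

    static final int NO_WAIT = 0;  // local constant of this sketch
    static final int FOREVER = -1; // local constant of this sketch

    /** Returns the next message, or null if none arrives within waitSeconds. */
    static String dequeue(BlockingQueue<String> queue, int waitSeconds)
            throws InterruptedException {
        if (waitSeconds == FOREVER) {
            return queue.take(); // blocks until a message arrives
        }
        return queue.poll(waitSeconds, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // An enqueuer thread delivers a message after a short pause.
        Thread enqueuer = new Thread(() -> {
            try {
                Thread.sleep(200);
                queue.put("rush order");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        System.out.println(dequeue(queue, NO_WAIT)); // null: the queue is still empty
        enqueuer.start();
        System.out.println(dequeue(queue, 10));      // rush order: wakes when the message arrives
        enqueuer.join();
    }
}
```

The blocked caller consumes no CPU while it waits, which is the saving the text attributes to a blocking dequeue.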

Scenario

In the BooksOnLine example, the get_rushtitles procedure discussed under dequeue methods specifies a wait time of 1 second in the dequeue_options argument for the dequeue call. Wait time can be specified in different ways as illustrated in the code below.

PL/SQL (DBMS_AQADM Package): Example Code

/* dopt is a variable of type dbms_aq.dequeue_options_t. 
   Set the dequeue wait time to 10 seconds: */
dopt.wait := 10; 
 
/* Set the dequeue wait time to 0 seconds: */
dopt.wait := DBMS_AQ.NO_WAIT; 
 
/* Set the dequeue wait time to infinite (forever): */ 
dopt.wait := DBMS_AQ.FOREVER; 

Visual Basic (OO4O): Example Code

OO4O supports asynchronous dequeuing of messages. First, the monitor is started for a particular queue. When messages that fulfil the user criteria are dequeued, the user's callback object is notified.

Java (JDBC): Example Code

AQDequeueOption deq_opt;

deq_opt = new AQDequeueOption();

/* Set the dequeue wait time to 10 seconds: */
deq_opt.setWaitTime(10);

Retry with Delay Interval

If the transaction dequeuing the message from a queue fails, it is regarded as an unsuccessful attempt to consume the message. AQ records the number of failed attempts to consume the message in the message history. Applications can query the retry_count column of the queue table view to find out the number of unsuccessful attempts on a message. In addition, AQ allows the application to specify, at the queue level, the maximum number of retries for messages in the queue. If the number of failed attempts to remove a message exceeds this number, the message is moved to the exception queue and is no longer available to applications.

Retry Delay

A bad condition can cause the transaction receiving a message to abort. AQ allows users to hide the bad message for a pre-specified interval. A retry_delay can be specified along with maximum retries. This means that a message that has had a failed attempt will be visible in the queue for dequeue after the retry_delay interval. Until then it will be in the WAITING state. In the AQ background process, the time manager enforces the retry delay property. The default value for maximum retries is 5. The default value for retry delay is 0. Note that maximum retries and retry delay are not available with 8.0-compatible multi-consumer queues.
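
The interaction of maximum retries and retry delay can be sketched with an in-memory model. The names (RetryDelaySketch, recordFailedAttempt, stateAt) are hypothetical and the code does not use AQ. Following the text above, the model moves a message to the exception queue once the number of failed attempts exceeds the maximum.

```java
/** In-memory model of max_retries and retry_delay handling; hypothetical, not the oracle.AQ API. */
final class RetryDelaySketch {

    enum State { READY, WAITING, IN_EXCEPTION_QUEUE }

    private final int maxRetries;
    private final long retryDelaySeconds;
    private int retryCount = 0;
    private long visibleAt = 0;             // time (in seconds) at which the message is READY again
    private boolean inExceptionQueue = false;

    RetryDelaySketch(int maxRetries, long retryDelaySeconds) {
        this.maxRetries = maxRetries;
        this.retryDelaySeconds = retryDelaySeconds;
    }

    /** Records a failed (rolled back) dequeue attempt at the given time. */
    void recordFailedAttempt(long nowSeconds) {
        retryCount++;
        if (retryCount > maxRetries) {
            inExceptionQueue = true;        // no longer available to applications
        } else {
            visibleAt = nowSeconds + retryDelaySeconds;
        }
    }

    /** Returns the state of the message at the given time. */
    State stateAt(long nowSeconds) {
        if (inExceptionQueue) {
            return State.IN_EXCEPTION_QUEUE;
        }
        return nowSeconds < visibleAt ? State.WAITING : State.READY;
    }

    int retryCount() {
        return retryCount;
    }

    public static void main(String[] args) {
        long twelveHours = 12 * 3600;
        RetryDelaySketch msg = new RetryDelaySketch(4, twelveHours);

        msg.recordFailedAttempt(0);
        System.out.println(msg.stateAt(3600));        // WAITING: hidden during the retry delay
        System.out.println(msg.stateAt(twelveHours)); // READY: visible again

        for (int i = 0; i < 4; i++) {
            msg.recordFailedAttempt(twelveHours);
        }
        System.out.println(msg.retryCount());              // 5
        System.out.println(msg.stateAt(10 * twelveHours)); // IN_EXCEPTION_QUEUE
    }
}
```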

PL/SQL (DBMS_AQADM Package): Example Code

/*  Set the retry properties of the queue, then create a procedure that
    processes the next booked order: */
CONNECT BOLADM/BOLADM

/* queue has max retries = 4 and retry delay = 12 hours */
BEGIN
  dbms_aqadm.alter_queue(
      queue_name  => 'WS.WS_bookedorders_que',
      max_retries => 4,
      retry_delay => 3600*12);
END;
/

/* processes the next order available in the booked orders queue */
CREATE OR REPLACE PROCEDURE process_next_order
AS
  dqqopt                   dbms_aq.dequeue_options_t;
  msgprop                  dbms_aq.message_properties_t;
  deq_msgid                RAW(16);
  book                     BOLADM.book_typ;
  item                     BOLADM.orderitem_typ;
  deq_order                BOLADM.order_typ;
BEGIN

  dqqopt.navigation := DBMS_AQ.FIRST_MESSAGE;
  dbms_aq.dequeue('WS.WS_bookedorders_que', dqqopt, msgprop, deq_order,
                  deq_msgid);

  /* for simplicity, assume order has a single item */
  item := deq_order.items(1);
  book := item.item;

  /* assume search_inventory searches inventory for the book */
  /* if we don't find the book in the warehouse, abort transaction */
  IF (search_inventory(book) != TRUE) THEN
    ROLLBACK;
  ELSE
    process_order(deq_order);
  END IF;

END;
/

Visual Basic (OO4O): Example Code

Use the dbexecutesql interface from the database for this functionality.

Java (JDBC): Example Code

public static void setup_queue(Connection db_conn)
{
    AQSession         aq_sess;
    AQQueue           bookedorders_q;
    AQQueueProperty   q_prop;

    try
    {
        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        bookedorders_q = aq_sess.getQueue("WS", "WS_bookedorders_que");    

        /* Alter queue - set max retries = 4 and retry delay = 12 hours */
        q_prop = new AQQueueProperty();
        q_prop.setMaxRetries(4);

        q_prop.setRetryInterval(3600*12);  // specified in seconds

        bookedorders_q.alterQueue(q_prop);


    }
    catch (Exception ex)
    {
        System.out.println("Exception: " + ex); 
    }

}
public static void process_next_order(Connection db_conn)
{
    AQSession         aq_sess;
    Order             deq_order;
    OrderItem         order_item;
    Book              book;
    AQDequeueOption   deq_option;
    AQMessageProperty msg_prop;
    AQQueue           bookedorders_q;
    AQMessage         message;
    AQObjectPayload   obj_payload;

    try
    {
        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        deq_option = new AQDequeueOption();

        deq_option.setNavigationMode(AQDequeueOption.NAVIGATION_FIRST_MESSAGE);

        bookedorders_q = aq_sess.getQueue("WS", "WS_bookedorders_que");    


        /* Dequeue the message */
        message = bookedorders_q.dequeue(deq_option, Order.getFactory());

        obj_payload = message.getObjectPayload();

        deq_order = (Order)(obj_payload.getPayloadData());
        
        /* for simplicity, assume order has a single item */
        order_item = deq_order.getItems().getElement(0);
        book = order_item.getItem(); 

        /* assume search_inventory searches inventory for the book 
         * if we don't find the book in the warehouse, abort transaction 
         */
        if(search_inventory(book) != true)
          db_conn.rollback();
        else
          process_order(deq_order);
        
    }
    catch (AQException aqex)
    {
        System.out.println("Exception-1: " + aqex);     
    }
    catch (Exception ex)
    {
        System.out.println("Exception-2: " + ex);
    }   
}

Exception Handling

AQ provides four integrated mechanisms to support exception handling in applications: EXCEPTION_QUEUES, EXPIRATION, MAX_RETRIES and RETRY_DELAY.

An exception_queue is a repository for all expired or unserviceable messages. Applications cannot directly enqueue into exception queues. Also, a multi-consumer exception queue cannot have subscribers associated with it. However, an application that intends to handle these expired or unserviceable messages can dequeue from the exception queue. The exception queue created for messages intended for a multi-consumer queue must itself be a multi-consumer queue. Like any other queue, the exception queue must be enabled for dequeue using the DBMS_AQADM.START_QUEUE procedure. You will get an Oracle error if you try to enable an exception queue for enqueue.

When a message has expired, it is moved to an exception queue. The exception queue for a message in multi-consumer queue must also be a multi-consumer queue. Expired messages from multi-consumer queues cannot be dequeued by the intended recipients of the message. However, they can be dequeued in the REMOVE mode exactly once by specifying a NULL consumer name in the dequeue options. Hence, from a dequeue perspective multi-consumer exception queues behave like single-consumer queues because each expired message can be dequeued only once using a NULL consumer name. Messages can also be dequeued from the exception queue by specifying the message ID. Note that expired messages can be dequeued only by specifying a message ID if the multi-consumer exception queue was created in a queue table without the compatible parameter or with the compatible parameter set to '8.0'.

The exception queue is a message property that can be specified during enqueue time (see "Enqueuing a Message [Specify Message Properties]" in Chapter 11, "Operational Interface: Basic Operations"). In PL/SQL users can use the exception_queue attribute of the DBMS_AQ.MESSAGE_PROPERTIES_T record to specify the exception queue. In OCI users can use the OCISetAttr procedure to set the OCI_ATTR_EXCEPTION_QUEUE attribute of the OCIAQMsgProperties descriptor.

If an exception queue is not specified, the default exception queue is used. If the queue is created in a queue table, for example, QTAB, the default exception queue will be called AQ$_QTAB_E. The default exception queue is automatically created when the queue table is created. AQ moves a message to the exception queue when the message expires, or when the number of failed attempts to dequeue it exceeds the maximum retries set for the queue.

Messages intended for 8.1-compatible multi-consumer queues cannot be dequeued by the intended recipients once the messages have been moved to an exception queue. These messages should instead be dequeued in the REMOVE or BROWSE mode exactly once by specifying a NULL consumer name in the dequeue options. The messages can also be dequeued by their message IDs.

Messages intended for single consumer queues, or for 8.0-compatible multi-consumer queues, can only be dequeued by their message IDs once the messages have been moved to an exception queue.

Users can associate a RETRY_DELAY with a queue. The default value for this parameter is 0, meaning that the message will be available for dequeue immediately after the RETRY_COUNT is incremented. Otherwise the message will be unavailable for RETRY_DELAY seconds. After RETRY_DELAY seconds, the queue monitor marks the message as READY.

For a multi-consumer queue, RETRY_DELAY is per subscriber.

Scenario

In the BooksOnLine application, the business rule for each shipping region is that an order will be placed in a back order queue if the order cannot be filled immediately. The back order application will try to fill the order once a day. If the order cannot be filled within 5 days, it is placed in an exception queue for special processing. You can implement this process by making use of the retry and exception handling features in AQ.

The example below shows how you can create a queue with a specific maximum number of retries and a specific retry delay interval.

PL/SQL (DBMS_AQADM Package): Example Code

/* Example for creating a back order queue in Western Region which allows a 
   maximum of 5 retries and 1 day delay between each retry. */  
CONNECT BOLADM/BOLADM 
BEGIN 
  dbms_aqadm.create_queue ( 
        queue_name              => 'WS.WS_backorders_que', 
        queue_table             => 'WS.WS_orders_mqtab', 
        max_retries             => 5, 
        retry_delay             => 60*60*24); 
END; 
/ 
 
/* Create an exception queue for the back order queue for Western Region. */
CONNECT BOLADM/BOLADM 
BEGIN 
  dbms_aqadm.create_queue ( 
        queue_name              => 'WS.WS_backorders_excpt_que', 
        queue_table             => 'WS.WS_orders_mqtab', 
        queue_type              => DBMS_AQADM.EXCEPTION_QUEUE); 
end; 
/ 
 
/* Enqueue a message to WS_backorders_que and specify WS_backorders_excpt_que as 
the exception queue for the message: */ 
CONNECT BOLADM/BOLADM 
CREATE OR REPLACE PROCEDURE enqueue_WS_unfilled_order(backorder order_typ) 
 AS 
   back_order_queue_name    varchar2(62); 
   enqopt                   dbms_aq.enqueue_options_t; 
   msgprop                  dbms_aq.message_properties_t; 
   enq_msgid                raw(16); 
 BEGIN 
     
   /* Set back order queue name for this message: */ 
   back_order_queue_name := 'WS.WS_backorders_que'; 
 
   /* Set exception queue name for this message: */ 
   msgprop.exception_queue := 'WS.WS_backorders_excpt_que'; 
 
   dbms_aq.enqueue(back_order_queue_name, enqopt, msgprop, 
                   backorder, enq_msgid); 
 END; 
 / 

Visual Basic (OO4O): Example Code

The exception queue is a message property that can be provided at the time of enqueuing a message. If this property is not set, the default exception queue of the queue will be used for any error conditions.

Set OraAq = OraDatabase.CreateAQ("WS.WS_backorders_que")
Set OraMsg = OraAq.AQMsg(ORATYPE_OBJECT, "BOLADM.order_typ")
Set OraOrder = OraDatabase.CreateOraObject("BOLADM.order_typ")
   'Set the exception queue for this message
   OraMsg.ExceptionQueue = "WS.WS_backorders_excpt_que"
   'Fill up the order values
   OraMsg = OraOrder 'OraOrder contains the order details
   Msgid = OraAq.enqueue

Java (JDBC): Example Code

public static void createBackOrderQueues(Connection db_conn)
{
    AQSession         aq_sess;
    AQQueue           backorders_q;
    AQQueue           backorders_excp_q;
    AQQueueProperty   q_prop;
    AQQueueProperty   q_prop2;
    AQQueueTable      mq_table;
    
    try
    {
        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        mq_table = aq_sess.getQueueTable("WS", "WS_orders_mqtab");
        
    
        /* Create a back order queue in Western Region which allows a 
           maximum of 5 retries and 1 day delay between each retry. */  

        q_prop = new AQQueueProperty();
        q_prop.setMaxRetries(5);
        q_prop.setRetryInterval(60*60*24);  // 1 day, specified in seconds

        backorders_q = aq_sess.createQueue(mq_table, "WS_backorders_que", 
                                          q_prop);
        
        backorders_q.start(true, true);

        /* Create an exception queue for the back order queue for 
           Western Region. */
        q_prop2 = new AQQueueProperty();
        q_prop2.setQueueType(AQQueueProperty.EXCEPTION_QUEUE);

        backorders_excp_q = aq_sess.createQueue(mq_table, 
                                          "WS_backorders_excpt_que", q_prop2);

    }
    catch (Exception ex)
    {
        System.out.println("Exception " + ex);
    }

}

/* Enqueue a message to WS_backorders_que and specify WS_backorders_excpt_que 
   as the exception queue for the message: */ 
public static void enqueue_WS_unfilled_order(Connection db_conn, 
                                             Order back_order)
{
    AQSession         aq_sess;
    AQQueue           back_order_q;     
    AQEnqueueOption   enq_option;
    AQMessageProperty m_property;
    AQMessage         message;
    AQObjectPayload   obj_payload;
    byte[]            enq_msg_id;

    try
    {
        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        back_order_q = aq_sess.getQueue("WS", "WS_backorders_que");

        message = back_order_q.createMessage();

        /* Set exception queue name for this message: */ 
        m_property = message.getMessageProperty();

        m_property.setExceptionQueue("WS.WS_backorders_excpt_que");

        obj_payload = message.getObjectPayload();
        obj_payload.setPayloadData(back_order);

        enq_option = new AQEnqueueOption();

        /* Enqueue the message */
        enq_msg_id = back_order_q.enqueue(enq_option, message);

        db_conn.commit();
    }
    catch (Exception ex)
    {
        System.out.println("Exception: " + ex); 
    }
    
}

Rule-Based Subscription

Messages can be routed to various recipients based on message properties or message content. Users define a rule-based subscription for a given queue to specify interest in receiving messages that meet particular conditions.

Rules are Boolean expressions that evaluate to TRUE or FALSE. Similar in syntax to the WHERE clause of a SQL query, rules are expressed in terms of the attributes that represent message properties or message content. These subscriber rules are evaluated against incoming messages and those rules that match are used to determine message recipients. This feature thus supports the notions of content-based subscriptions and content-based routing of messages.

Subscription rules can also be defined on an attribute of type XMLType using XML operators such as ExistsNode.

Scenario

For the BooksOnLine application, we illustrate how rule-based subscriptions are used to implement a publish-subscribe paradigm utilizing content-based subscription and content-based routing of messages. The interaction between the Order Entry application and each of the Shipping Applications is modeled as follows:

Each shipping application subscribes to the OE booked orders queue. The following rule-based subscriptions are defined by the Order Entry user to handle the routing of booked orders from the Order Entry application to each of the Shipping applications.

PL/SQL (DBMS_AQADM Package): Example Code

CONNECT OE/OE; 

Western Region Shipping defines an agent called West_Shipping with the WS booked orders queue as the agent address (the destination queue where messages must be delivered). This agent subscribes to the OE booked orders queue using a rule specified on the orderregion and ordertype attributes.

/*  Add a rule-based subscriber for West Shipping - 
    West Shipping handles Western region U.S. orders, 
    Rush Western region orders are handled by East Shipping: */ 
DECLARE 
  subscriber     aq$_agent; 
BEGIN 
  subscriber := aq$_agent('West_Shipping', 'WS.WS_bookedorders_que', null); 
  dbms_aqadm.add_subscriber( 
                queue_name => 'OE.OE_bookedorders_que', 
                subscriber => subscriber, 
                rule       => 'tab.user_data.orderregion =  
                    ''WESTERN'' AND tab.user_data.ordertype != ''RUSH'''); 
END; 
/ 

Eastern Region Shipping defines an agent called East_Shipping with the ES booked orders queue as the agent address (the destination queue where messages must be delivered). This agent subscribes to the OE booked orders queue using a rule specified on orderregion, ordertype and customer attributes.

/*  Add a rule-based subscriber for East Shipping - 
    East shipping handles all Eastern region orders, 
    East shipping also handles all U.S. rush orders: */ 
DECLARE 
  subscriber     aq$_agent; 
BEGIN 
  subscriber := aq$_agent('East_Shipping', 'ES.ES_bookedorders_que', null); 
  dbms_aqadm.add_subscriber( 
        queue_name => 'OE.OE_bookedorders_que', 
        subscriber => subscriber, 
        rule       => 'tab.user_data.orderregion = ''EASTERN'' OR  
                      (tab.user_data.ordertype = ''RUSH'' AND  
                       tab.user_data.customer.country = ''USA'') '); 
END; 
/ 

Overseas Shipping defines an agent called Overseas_Shipping with the OS booked orders queue as the agent address (destination queue to which messages must be delivered). This agent subscribes to the OE booked orders queue using a rule specified on the orderregion attribute. Since the representation of orders at the Overseas Shipping site is different from the representation of orders at the Order Entry site, a transformation is applied before messages are propagated from the Order Entry site to the Overseas Shipping site.

/*  Add a rule-based subscriber (for Overseas Shipping) to the Booked orders 
queues with Transformation. Overseas Shipping handles all non-US orders: */ 
DECLARE 
  subscriber     aq$_agent; 
BEGIN 
  subscriber := aq$_agent('Overseas_Shipping','OS.OS_bookedorders_que',null); 

  dbms_aqadm.add_subscriber( 
        queue_name     => 'OE.OE_bookedorders_que', 
        subscriber     => subscriber, 
        rule           => 'tab.user_data.orderregion = ''INTERNATIONAL''',
        transformation => 'OS.OE2XML'); 
END; 
/ 

See "Message Format Transformation" for more details on defining transformations.

Assume that the Overseas Shipping site has a subscriber, Overseas_DHL, for handling RUSH orders. Since OS_bookedorders_que has the order details represented as an XMLType, the rule uses XPath syntax.

DECLARE 
  subscriber     aq$_agent; 
BEGIN 
  subscriber := aq$_agent('Overseas_DHL', null, null); 

  dbms_aqadm.add_subscriber( 
        queue_name     => 'OS.OS_bookedorders_que', 
        subscriber     => subscriber, 
        rule           => 'tab.user_data.xdata.extract(''/ORDER_TYP/ORDERTYPE/text()'').getStringVal() = ''RUSH'''); 
END; 
/ 

Visual Basic (OO4O): Example Code

This functionality is currently not available.

Java (JDBC): Example Code

public static void addRuleBasedSubscribers(Connection db_conn)
{

    AQSession         aq_sess;
    AQQueue           bookedorders_q;
    String            rule;
    AQAgent           agt1, agt2, agt3;

    try
    {
        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        bookedorders_q = aq_sess.getQueue("OE", "OE_bookedorders_que");

        /* Add a rule-based subscriber for West Shipping - 
           West Shipping handles Western region U.S. orders, 
           Rush Western region orders are handled by East Shipping: */ 

        agt1 = new AQAgent("West_Shipping", "WS.WS_bookedorders_que"); 

        rule = "tab.user_data.orderregion = 'WESTERN' AND " +
               "tab.user_data.ordertype != 'RUSH'";

        bookedorders_q.addSubscriber(agt1, rule);

        /*  Add a rule-based subscriber for East Shipping - 
            East shipping handles all Eastern region orders, 
            East shipping also handles all U.S. rush orders: */ 

        agt2 = new AQAgent("East_Shipping", "ES.ES_bookedorders_que"); 
        rule = "tab.user_data.orderregion = 'EASTERN' OR " +  
               "(tab.user_data.ordertype = 'RUSH' AND " + 
               "tab.user_data.customer.country = 'USA')"; 

        bookedorders_q.addSubscriber(agt2, rule);

        /*  Add a rule-based subscriber for Overseas Shipping - 
            Overseas Shipping handles all non-U.S. orders: */ 

        agt3 = new AQAgent("Overseas_Shipping", "OS.OS_bookedorders_que");
        rule = "tab.user_data.orderregion = 'INTERNATIONAL'";

        bookedorders_q.addSubscriber(agt3, rule);
    }
    catch (Exception ex)
    {
        System.out.println("Exception: " + ex);
    }
} 
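
To see how the three subscriber rules above route a given order, the rules can be restated as plain Java predicates and evaluated against sample orders. This is a sketch only and does not use the AQ API: the class `SubscriberRuleModel`, its nested `Order` class, and the `recipients` method are hypothetical stand-ins, and SQL NULL semantics are not modeled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/* Hypothetical model of rule-based routing; not the AQ API. */
final class SubscriberRuleModel {

    /* Stand-in for the order attributes that the rules refer to. */
    static final class Order {
        final String orderRegion;
        final String orderType;
        final String customerCountry;

        Order(String orderRegion, String orderType, String customerCountry) {
            this.orderRegion = orderRegion;
            this.orderType = orderType;
            this.customerCountry = customerCountry;
        }
    }

    /* One rule per subscriber, mirroring the rule strings in the example. */
    static final Map<String, Predicate<Order>> RULES = new LinkedHashMap<>();
    static {
        RULES.put("West_Shipping",
            o -> o.orderRegion.equals("WESTERN") && !o.orderType.equals("RUSH"));
        RULES.put("East_Shipping",
            o -> o.orderRegion.equals("EASTERN")
                 || (o.orderType.equals("RUSH") && o.customerCountry.equals("USA")));
        RULES.put("Overseas_Shipping",
            o -> o.orderRegion.equals("INTERNATIONAL"));
    }

    /* Returns every subscriber whose rule evaluates to true for the order. */
    static List<String> recipients(Order order) {
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, Predicate<Order>> rule : RULES.entrySet()) {
            if (rule.getValue().test(order)) {
                matched.add(rule.getKey());
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        /* A rush order from the Western region goes to East Shipping only. */
        System.out.println(recipients(new Order("WESTERN", "RUSH", "USA")));
    }
}
```

In this model a Western region rush order from a U.S. customer matches only the East_Shipping rule, which is the behavior the comments in the example describe.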

Listen Capability

Advanced Queuing can monitor multiple queues for messages with a single call, LISTEN. An application can use LISTEN to wait for messages for multiple subscriptions. It can also be used by gateway applications to monitor multiple queues. If the LISTEN call returns successfully, a dequeue must be used to retrieve the message (see Listening to One (Many) Queue(s) in Chapter 11, "Operational Interface: Basic Operations").

Without the LISTEN call, an application which sought to dequeue from a set of queues would have to continuously poll the queues to determine if there were a message. Alternatively, you could design your application to have a separate dequeue process for each queue. However, if there are long periods with no traffic in any of the queues, these approaches will create unacceptable overhead. The LISTEN call is well suited for such applications.

Note that when there are messages for multiple agents in the agent list, LISTEN returns with the first agent for whom there is a message. In that sense LISTEN is not 'fair' in monitoring the queues. The application designer must keep this in mind when using the call. To prevent one agent from 'starving' other agents for messages, the application can change the order of the agents in the agent list.
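
One way to reorder the agent list is to move the agent that was just served to the end of the list before the next call. The following sketch models that idea with plain Java collections and does not call the AQ API: the class `ListenRotationModel` and its `listen` and `demote` methods are hypothetical, and `listen` simply imitates the "first agent in the list with a message" behavior described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/* Hypothetical model of agent list reordering; not the AQ API. */
final class ListenRotationModel {

    /* Returns the first agent, in list order, that has a message, or null. */
    static String listen(List<String> agentList, Set<String> agentsWithMessages) {
        for (String agent : agentList) {
            if (agentsWithMessages.contains(agent)) {
                return agent;
            }
        }
        return null;
    }

    /* Moves the agent that was just served to the end of the list. */
    static void demote(List<String> agentList, String served) {
        if (agentList.remove(served)) {
            agentList.add(served);
        }
    }

    public static void main(String[] args) {
        List<String> agents = new ArrayList<>(
            Arrays.asList("BILLED_ORDER", "SHIPPED_ORDER", "BACK_ORDER"));
        /* Assume these two agents always have messages waiting. */
        Set<String> busy = new HashSet<>(Arrays.asList("BILLED_ORDER", "BACK_ORDER"));

        for (int i = 0; i < 4; i++) {
            String served = listen(agents, busy);
            System.out.println(served);
            demote(agents, served);
        }
    }
}
```

Without the call to `demote`, the loop would return BILLED_ORDER every time. With it, the two busy agents are served alternately.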

Scenario

In the customer service component of the BooksOnLine example, messages from different databases arrive in the customer service queues, indicating the state of the order. The customer service application monitors the queues and whenever there is a message about a customer order, it updates the order status in the order_status_table. The application uses the listen call to monitor the different queues. Whenever there is a message in any of the queues, it dequeues the message and updates the order status accordingly.

PL/SQL (DBMS_AQ Package): Example Code

CODE (in tkaqdocd.sql) 
 
/* Update the status of the order in the order status table: */ 
CREATE OR REPLACE PROCEDURE update_status( 
                                new_status    IN VARCHAR2, 
                                order_msg     IN BOLADM.ORDER_TYP) 
IS 
 old_status    VARCHAR2(30); 
 dummy         NUMBER; 
BEGIN 
 
  BEGIN   
    /* Query old status from the table: */ 
    SELECT st.status INTO old_status FROM order_status_table st  
       WHERE st.customer_order.orderno = order_msg.orderno; 
 
  /* Status can be 'BOOKED_ORDER', 'SHIPPED_ORDER', 'BACK_ORDER' 
     and   'BILLED_ORDER': */ 
 
   IF new_status = 'SHIPPED_ORDER' THEN   
      IF old_status = 'BILLED_ORDER' THEN 
        return;             /* message about a previous state */ 
      END IF; 
   ELSIF new_status = 'BACK_ORDER' THEN 
      IF old_status = 'SHIPPED_ORDER' OR old_status = 'BILLED_ORDER' THEN 
        return;             /* message about a previous state */ 
      END IF; 
   END IF; 
 
   /* Update the status of this order only:  */ 
     UPDATE order_status_table st 
        SET st.customer_order = order_msg, st.status = new_status 
      WHERE st.customer_order.orderno = order_msg.orderno; 
 
   COMMIT; 
 
  EXCEPTION 
  WHEN NO_DATA_FOUND  THEN 
    /* First update for the order: */ 
    INSERT INTO order_status_table(customer_order, status) 
    VALUES (order_msg, new_status); 
    COMMIT; 
 
  END; 
END; 
/ 
         
 
/* Dequeues message from 'QUEUE' for 'CONSUMER': */ 
CREATE OR REPLACE PROCEDURE DEQUEUE_MESSAGE( 
                         queue      IN   VARCHAR2, 
                         consumer   IN   VARCHAR2, 
                         message    OUT  BOLADM.order_typ) 
IS 
  
dopt                     dbms_aq.dequeue_options_t; 
mprop                    dbms_aq.message_properties_t; 
deq_msgid                RAW(16); 
BEGIN 
  dopt.dequeue_mode := dbms_aq.REMOVE; 
  dopt.navigation := dbms_aq.FIRST_MESSAGE; 
  dopt.consumer_name := consumer; 
 
  dbms_aq.dequeue( 
                queue_name => queue, 
                dequeue_options => dopt, 
                message_properties => mprop, 
                payload => message, 
                msgid => deq_msgid); 
  commit; 
END; 
/ 
 
/* Monitor the queues in the customer service database for 'time' seconds: */ 
CREATE OR REPLACE PROCEDURE MONITOR_STATUS_QUEUE(time  IN  NUMBER)  
IS 
  agent_w_message   aq$_agent; 
  agent_list        dbms_aq.agent_list_t; 
  wait_time         INTEGER := 120; 
  no_message        EXCEPTION; 
  pragma EXCEPTION_INIT(no_message, -25254); 
  order_msg         boladm.order_typ;  
  new_status        VARCHAR2(30); 
  monitor           BOOLEAN := TRUE; 
  begin_time        NUMBER; 
  end_time          NUMBER; 
BEGIN 
 
 begin_time :=  dbms_utility.get_time;     
 WHILE (monitor) 
 LOOP 
 BEGIN 
 
  /* Construct the waiters list: */ 
  agent_list(1) := aq$_agent('BILLED_ORDER', 'CS_billedorders_que', NULL); 
  agent_list(2) := aq$_agent('SHIPPED_ORDER', 'CS_shippedorders_que', NULL); 
  agent_list(3) := aq$_agent('BACK_ORDER', 'CS_backorders_que', NULL); 
  agent_list(4) := aq$_agent('BOOKED_ORDER', 'CS_bookedorders_que', NULL); 
 
   /* Wait for order status messages: */ 
   dbms_aq.listen(agent_list, wait_time, agent_w_message); 
    
   dbms_output.put_line('Agent ' || agent_w_message.name || ' Address ' || 
                        agent_w_message.address); 
   /* Dequeue the message from the queue: */ 
   dequeue_message(agent_w_message.address, agent_w_message.name, order_msg); 
 
   /* Update the status of the order depending on the type of the message,  
    * the name of the agent contains the new state: */ 
   update_status(agent_w_message.name, order_msg); 
 
  /* Exit if we have been working long enough; dbms_utility.get_time 
     counts in hundredths of a second, so scale 'time' to match: */ 
   end_time := dbms_utility.get_time; 
   IF  (end_time - begin_time > time * 100)   THEN 
     EXIT; 
   END IF; 
 
  EXCEPTION 
  WHEN  no_message  THEN 
    dbms_output.put_line('No messages in the past 2 minutes'); 
       end_time := dbms_utility.get_time; 
    /* Exit if we have done enough work: */ 
    IF  (end_time - begin_time > time * 100)   THEN 
      EXIT; 
    END IF; 
  END; 
  
  END LOOP; 
END; 
/ 

Visual Basic (OO4O): Example Code

Feature not currently available.

Java (JDBC): Example Code

public static void monitor_status_queue(Connection db_conn)
{
    AQSession         aq_sess;
    AQAgent[]         agt_list = null;
    AQAgent           ret_agt  = null;
    Order             deq_order;
    AQDequeueOption   deq_option;
    AQQueue           orders_q;
    AQMessage         message;
    AQObjectPayload   obj_payload;
    String            owner = null;
    String            queue_name = null;
    int               idx = 0;

    try
    {
        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

/* Construct the waiters list: */ 
agt_list = new AQAgent[4];

agt_list[0] = new AQAgent("BILLED_ORDER", "CS_billedorders_que", 0);
agt_list[1] = new AQAgent("SHIPPED_ORDER", "CS_shippedorders_que", 0);
agt_list[2] = new AQAgent("BACK_ORDER", "CS_backorders_que", 0);
agt_list[3] = new AQAgent("BOOKED_ORDER", "CS_bookedorders_que", 0);

/* Wait for order status messages for 120 seconds: */
ret_agt = aq_sess.listen(agt_list, 120);

System.out.println("Message available for agent: " + 
   ret_agt.getName()  + "   "  + ret_agt.getAddress());

/* Get owner, queue where message is available */
idx = ret_agt.getAddress().indexOf(".");

if(idx != -1)
{
  owner = ret_agt.getAddress().substring(0, idx);
  queue_name = ret_agt.getAddress().substring(idx + 1);

/* Dequeue the message */
deq_option = new AQDequeueOption();

deq_option.setConsumerName(ret_agt.getName());
deq_option.setWaitTime(1);

orders_q = aq_sess.getQueue(owner, queue_name);    

/* Dequeue the message */
message = orders_q.dequeue(deq_option, Order.getFactory());

obj_payload = message.getObjectPayload();

deq_order = (Order)(obj_payload.getPayloadData());

   System.out.println("Order number " + deq_order.getOrderno() + " retrieved");
   } /* end if (idx != -1) */
   
   }
   catch (AQException aqex)
   {
   System.out.println("Exception-1: " + aqex);       
   }
   catch (Exception ex)
   {
    System.out.println("Exception-2: " + ex);
    }   

}

Message Transformation During Dequeue

Continuing the scenario introduced in "Message Format Transformation" and "Message Transformation During Enqueue", the queues in the OE schema are of payload type OE.orders_typ and the queues in the WS schema are of payload type WS.orders_typ_sh.

Scenario

At dequeue time, an application can move messages from OE_bookedorders_topic to WS_bookedorders_topic by using a selection criterion on dequeue to dequeue only orders with order_region "WESTERN" and order_type not equal to "RUSH". At the same time, the transformation is applied and the order is retrieved in the ws.order_typ_sh type. Then the message is enqueued into the WS.ws_bookedorders_topic queue.

PL/SQL (DBMS_AQ Package): Example Code

CREATE OR REPLACE PROCEDURE   fwd_message_to_ws_shipping AS

  enq_opt   dbms_aq.enqueue_options_t;
  deq_opt   dbms_aq.dequeue_options_t;
  msg_prp   dbms_aq.message_properties_t;
  msg_id    RAW(16);   /* message ID returned by dequeue and enqueue */
  booked_order WS.order_typ_sh;
BEGIN

/* First dequeue the message from OE booked orders topic */
    deq_opt.transformation := 'OE.OE2WS';
    deq_opt.condition := 'tab.user_data.order_region = ''WESTERN'' and ' ||
                         'tab.user_data.order_type != ''RUSH''';

    dbms_aq.dequeue('OE.oe_bookedorders_topic', deq_opt,
                     msg_prp, booked_order, msg_id);

/* enqueue the message in the WS booked orders topic */
    msg_prp.recipient_list(0) := aq$_agent('West_shipping', null, null);

    dbms_aq.enqueue('WS.ws_bookedorders_topic',
                     enq_opt, msg_prp, booked_order, msg_id);

END;
/

Visual Basic (OO4O): Example Code

No example is provided with this release.

Java (JDBC): Example Code

No example is provided with this release.

Dequeue Using the AQ XML Servlet

You can perform dequeue requests over the Internet using IDAP. See Chapter 17, "Internet Access to Advanced Queuing" for more information on receiving AQ messages using IDAP.

In the BooksOnLine scenario, assume that the East shipping application receives AQ messages with a correlation identifier 'RUSH' over the Internet. The dequeue request will have the following format:

<?xml version="1.0"?>
<Envelope xmlns= "http://ns.oracle.com/AQ/schemas/envelope">
      <Body>
        <AQXmlReceive xmlns = "http://ns.oracle.com/AQ/schemas/access">
          <consumer_options>
            <destination>ES.ES_bookedorders_que</destination>
            <consumer_name>East_Shipping</consumer_name>
            <wait_time>0</wait_time>
            <selector>
                 <correlation>RUSH</correlation>
            </selector>
          </consumer_options>

          <AQXmlCommit/>

        </AQXmlReceive>  
      </Body>
</Envelope> 

Copyright © 1996-2001, Oracle Corporation.

All Rights Reserved.