Oracle8i Application Developer's Guide - Advanced Queuing
Release 2 (8.1.6)

Part Number A76938-01

Library

Product

Contents

Index

Go to previous page Go to beginning of chapter Go to next page

A Sample Application Using AQ, 5 of 6


DEQUEUE Features

Dequeue Methods

A message can be dequeued from a queue using one of two dequeue methods: a correlation identifier or a message identifier.

A correlation identifier is a user defined message property (of VARCHAR2 datatype) while a message identifier is a system-assigned value (of RAW datatype). Multiple messages with the same correlation identifier can be present in a queue while only one message with a given message identifier can be present. A dequeue call with a correlation identifier will directly remove a message of specific interest rather than using a combination of locked and remove mode to first examine the content and then remove the message. Hence, the correlation identifier usually contains the most useful attribute of a payload. If there are multiple messages with the same correlation identifier, the ordering (enqueue order) between messages may not be preserved on dequeue calls. The correlation identifier cannot be changed between successive dequeue calls without specifying the first message navigation option.

Note that dequeueing a message with either of the two dequeue methods will not preserve the message grouping property (see "Message Grouping" and "Message Navigation in Dequeue" for further information).

Example Scenario

In the following scenario of the BooksOnLine example, rush orders received by the East shipping site are processed first. This is achieved by dequeueing the message using the correlation identifier which has been defined to contain the order type (rush/normal). For an illustration of dequeueing using a message identifier please refer to the get_northamerican_orders procedure discussed in the example under "Modes of Dequeuing".

PL/SQL (DBMS_AQ/ADM Package): Example Code

CONNECT boladm/boladm;

/*
 * get_rushtitles: dequeue every rush order waiting for one shipping consumer
 * and print the title and quantity of the first item of each order.
 *
 * consumer - consumer name used for the dequeue. 'West_Shipping' and
 *            'East_Shipping' read from their own booked-orders queue; any
 *            other value reads from OS.OS_bookedorders_que.
 *
 * Rush orders are selected by dequeuing on the correlation identifier
 * 'RUSH'. The loop stops when the dequeue raises no_messages (ORA-25228).
 */
create or replace procedure get_rushtitles(consumer in varchar2) as

deq_book_data            BOLADM.book_typ;
deq_item_data            BOLADM.orderitem_typ;
deq_msgid                RAW(16);
dopt                     dbms_aq.dequeue_options_t;
mprop                    dbms_aq.message_properties_t;
deq_order_data           BOLADM.order_typ;
qname                    varchar2(30);
no_messages              exception;
pragma exception_init    (no_messages, -25228);
new_orders               BOOLEAN := TRUE;

begin

        dopt.consumer_name := consumer;
        /* Wait at most 1 second for a matching message. */
        dopt.wait := 1;
        /* Dequeue by correlation identifier: only rush orders qualify. */
        dopt.correlation := 'RUSH';

        /* Pick the booked-orders queue that belongs to the consumer. */
        IF (consumer = 'West_Shipping') THEN
                qname := 'WS.WS_bookedorders_que';
        ELSIF (consumer = 'East_Shipping') THEN
                qname := 'ES.ES_bookedorders_que';
        ELSE
                qname := 'OS.OS_bookedorders_que';
        END IF;

        WHILE (new_orders) LOOP
          BEGIN
            dbms_aq.dequeue(
                queue_name => qname,
                dequeue_options => dopt,
                message_properties => mprop,
                payload => deq_order_data,
                msgid => deq_msgid);
            /* Each dequeue is committed before the order is printed. */
            commit;

            /* Only the first item of the order is reported. */
            deq_item_data := deq_order_data.items(1);
            deq_book_data := deq_item_data.item;

            dbms_output.put_line(' rushorder book_title: ' ||
                                deq_book_data.title ||
                        ' quantity: ' || deq_item_data.quantity);
          EXCEPTION
            WHEN no_messages THEN
                 dbms_output.put_line (' ---- NO MORE RUSH TITLES ---- ');
                 new_orders := FALSE;
          END;
        END LOOP;

end;
/
 
/* Let the ES schema execute the procedure owned by BOLADM: */
GRANT EXECUTE on get_rushtitles to ES;

/* Dequeue the orders: */
CONNECT ES/ES;

/*  Dequeue all rush order titles for East_Shipping: */
EXECUTE BOLADM.get_rushtitles('East_Shipping');
 

Visual Basic (OO4O): Example Code

   ' Create one queue object for each queue a back order can be re-enqueued on.
   set oraaq1 = OraDatabase.CreateAQ("WS.WS_backorders_que")
   set oraaq2 = OraDatabase.CreateAQ("ES.ES_backorders_que")
   set oraaq3 = OraDatabase.CreateAQ("CBADM.deferbilling_que")
   ' Message object whose payload is an object of type BOLADM.order_typ.
   ' NOTE(review): OraAq is not created in this snippet (only oraaq1..oraaq3 are) - confirm which queue object is intended.
   Set OraMsg = OraAq.AQMsg(ORATYPE_OBJECT, "BOLADM.order_typ")
   ' Object instance of BOLADM.order_typ that holds the back order to enqueue.
   Set OraBackOrder = OraDatabase.CreateOraObject("BOLADM.order_typ")

' Re-enqueue a back order on the queue that matches the sale region:
' WEST -> oraaq1, EAST -> oraaq2, anything else -> oraaq3.
' Relies on sale_region, WEST, EAST, OraMsg, OraBackOrder, Msgid and the
' oraaq1..oraaq3 queue objects being defined outside this routine.
Private Sub Requeue_backorder
   Dim q As Object

   ' Object references must be assigned with Set.
   If sale_region = WEST Then
      Set q = oraaq1
   ElseIf sale_region = EAST Then
      Set q = oraaq2
   Else
      Set q = oraaq3
   End If

   ' 7 days expressed in seconds. The & suffix forces Long arithmetic;
   ' plain Integer arithmetic would overflow.
   OraMsg.delay = 7& * 60 * 60 * 24
   OraMsg = OraBackOrder 'OraBackOrder contains the order details
   Msgid = q.enqueue

End Sub

Java (JDBC): Example Code

/**
 * Dequeues every rush order waiting for the given consumer and prints the
 * order number of each one.
 *
 * Rush orders are selected with the correlation identifier "RUSH". The queue
 * is chosen from the consumer name: "West_Shipping" and "East_Shipping" use
 * their own booked-orders queue, any other name uses OS.OS_bookedorders_que.
 * The loop ends on the first AQException raised by a dequeue; that exception
 * is printed rather than rethrown. Any other exception is printed as
 * "Exception-2".
 *
 * @param db_conn  JDBC connection used to create the AQ session
 * @param consumer consumer name to dequeue for
 */
public static void getRushTitles(Connection db_conn, String consumer)
{
    AQSession         aq_sess;
    Order             deq_order;
    AQDequeueOption   deq_option;
    AQQueue           bookedorders_q;
    AQMessage         message;
    AQObjectPayload   obj_payload;
    boolean           new_orders = true;

    try
    {
        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        /* Dequeue for this consumer, by correlation id, with a wait time of 1 */
        deq_option = new AQDequeueOption();

        deq_option.setConsumerName(consumer);
        deq_option.setWaitTime(1);
        deq_option.setCorrelation("RUSH");

        /* Pick the booked-orders queue that belongs to the consumer */
        if(consumer.equals("West_Shipping"))
        {
            bookedorders_q = aq_sess.getQueue("WS", "WS_bookedorders_que");
        }
        else if(consumer.equals("East_Shipping"))
        {
            bookedorders_q = aq_sess.getQueue("ES", "ES_bookedorders_que");
        }
        else
        {
            bookedorders_q = aq_sess.getQueue("OS", "OS_bookedorders_que");
        }

        while(new_orders)
        {
            try
            {
              /* Dequeue the message */
              message = bookedorders_q.dequeue(deq_option, Order.getFactory());

              obj_payload = message.getObjectPayload();

              deq_order = (Order)(obj_payload.getPayloadData());

              System.out.println("Order number " + deq_order.getOrderno() +
                                 " is a rush order");

            }
            catch (AQException aqex)
            {
              /* Any AQ error ends the loop */
              new_orders = false;
              System.out.println("No more rush titles");
              System.out.println("Exception-1: " + aqex);
            }
        }
    }
    catch (Exception ex)
    {
        System.out.println("Exception-2: " + ex);
    }

}

Multiple Recipients

A consumer can dequeue a message from a multi-consumer normal queue by supplying the name that was used in the AQ$_AGENT type of the DBMS_AQADM.ADD_SUBSCRIBER procedure or the recipient list of the message properties (see "Add a Subscriber" or Enqueue a Message [Specify Message Properties]).

There can be multiple processes or operating system threads that use the same consumer_name to dequeue concurrently from a queue. In that case AQ will provide the first unlocked message that is at the head of the queue and is intended for the consumer. Unless the message ID of a specific message is specified during dequeue, the consumers can dequeue messages that are in the READY state.

A message is considered PROCESSED only when all intended consumers have successfully dequeued the message. A message is considered EXPIRED if one or more consumers did not dequeue the message before the EXPIRATION time. When a message has expired, it is moved to an exception queue.

The exception queue must also be a multi-consumer queue. Expired messages from multi-consumer queues cannot be dequeued by the intended recipients of the message. However, they can be dequeued in the REMOVE mode exactly once by specifying a NULL consumer name in the dequeue options. Hence, from a dequeue perspective, multi-consumer exception queues behave like single-consumer queues because each expired message can be dequeued only once using a NULL consumer name. Note that expired messages can be dequeued only by specifying a message ID if the multi-consumer exception queue was created in a queue table without the compatible parameter or with the compatible parameter set to '8.0'.

In release 8.0.x, when two or more processes/threads that are using different consumer_names are dequeuing from a queue, only one process/thread can dequeue a given message in the LOCKED or REMOVE mode at any time. What this means is that other consumers that need to dequeue the same message will have to wait until the consumer that has locked the message commits or aborts the transaction and releases the lock on the message. However, while release 8.0.x did not support concurrency among different consumers for the same message, with release 8.1.6 all consumers can access the same message concurrently. The result is that two processes/threads that are using different consumer_names to dequeue the same message do not block each other. AQ achieves this improvement by decoupling the task of dequeuing a message and the process of removing the message from the queue. In release 8.1.6 only the queue monitor removes messages from multi-consumer queues. This allows dequeuers to complete the dequeue operation by not locking the message in the queue table. Since the queue monitor performs the task of removing messages that have been processed by all consumers from multi-consumer queues approximately once every minute, users may see a delay between when the messages have been completely processed and when they are physically removed from the queue.


Local and Remote Recipients

Consumers of a message in multi-consumer queues (either by virtue of being a subscriber to the queue or because the consumer was a recipient in the enqueuer's recipient list) can be local or remote.

When a consumer is remote, a message will be marked as PROCESSED in the source queue immediately after the message has been propagated even though the consumer may not have dequeued the message at the remote queue. Similarly, when a propagated message expires at the remote queue, the message is moved to the DEFAULT exception queue of the remote queue's queue table, and not to the exception queue of the local queue. As can be seen in both cases, AQ does not currently propagate the exceptions to the source queue. You can use the MSGID and the ORIGINAL_MSGID columns in the queue table view (AQ$<queue_table>) to chain the propagated messages. When a message with message ID m1 is propagated to a remote queue, m1 is stored in the ORIGINAL_MSGID column of the remote queue.

The DELAY, EXPIRATION and PRIORITY parameters apply identically to both local and remote consumers. AQ accounts for any delay in propagation by adjusting the DELAY and EXPIRATION parameters accordingly. For example, if the EXPIRATION is set to one hour, and the message is propagated after 15 minutes, the expiration at the remote queue will be set to 45 minutes.

Since the database handles message propagation, OO4O does not differentiate between remote and local recipients. The same sequence of calls/steps is required to dequeue a message for local and remote recipients.

Message Navigation in Dequeue

You have several options for selecting a message from a queue. You can select the "first message". Alternatively, once you have selected a message and established its position in the queue (for example, as the fourth message), you can then retrieve the "next message".

These selections work in a slightly different way if the queue is enabled for transactional grouping.

Note that the transaction grouping property is negated if a dequeue is performed in one of the following ways: dequeue by specifying a correlation identifier, dequeue by specifying a message identifier, or dequeueing some of the messages of a transaction and committing (see "Dequeue Methods").

If in navigating through the queue, the program reaches the end of the queue while using the "next message" or "next transaction" option, and you have specified a non-zero wait time, then the navigating position is automatically changed to the beginning of the queue.

Example Scenario

The following scenario in the BooksOnLine example continues the message grouping example already discussed with regard to enqueuing (see "Dequeue Methods").

The get_new_orders procedure dequeues orders from the OE_neworders_que. Recall that each transaction refers to an order and each message corresponds to an individual book in the order. The get_new_orders procedure loops through the messages to dequeue the book orders. It resets the position to the beginning of the queue using the first message option before the first dequeue. It then uses the next message navigation option to retrieve the next book (message) of an order (transaction). If it gets an error message indicating that all messages in the current group/transaction have been fetched, it changes the navigation option to next transaction and gets the first book of the next order. It then changes the navigation option back to next message for fetching subsequent messages in the same transaction. This is repeated until all orders (transactions) have been fetched.

PL/SQL (DBMS_AQ/ADM Package): Example Code

CONNECT boladm/boladm;

/*
 * get_new_orders: dequeue all new orders from OE.OE_neworders_que and print
 * them, one order (transaction group) at a time.
 *
 * Navigation:
 *   - the first dequeue uses FIRST_MESSAGE;
 *   - after every successful dequeue the option is set to NEXT_MESSAGE so
 *     the next call fetches the next book of the same order;
 *   - when end_of_group (ORA-25235) is raised the work is committed and the
 *     option is set to NEXT_TRANSACTION so the next call fetches the first
 *     book of the next order;
 *   - no_messages (ORA-25228) ends the procedure.
 */
create or replace procedure get_new_orders as

deq_cust_data            BOLADM.customer_typ;
deq_book_data            BOLADM.book_typ;
deq_item_data            BOLADM.orderitem_typ;
deq_msgid                RAW(16);
dopt                     dbms_aq.dequeue_options_t;
mprop                    dbms_aq.message_properties_t;
deq_order_data           BOLADM.order_typ;
qname                    VARCHAR2(30);
no_messages              exception;
end_of_group             exception;
pragma exception_init    (no_messages, -25228);
pragma exception_init    (end_of_group, -25235);
new_orders               BOOLEAN := TRUE;

BEGIN

        dopt.wait := 1;
        dopt.navigation := DBMS_AQ.FIRST_MESSAGE;
        qname := 'OE.OE_neworders_que';
        WHILE (new_orders) LOOP
          BEGIN
            /* This inner loop is left only through the no_messages
               exception, which the outer handler catches. */
            LOOP
                BEGIN
                    dbms_aq.dequeue(
                        queue_name          => qname,
                        dequeue_options     => dopt,
                        message_properties  => mprop,
                        payload             => deq_order_data,
                        msgid               => deq_msgid);

                    /* Stay inside the current order for the next dequeue.
                       Without this reset the option would remain
                       FIRST_MESSAGE or NEXT_TRANSACTION. */
                    dopt.navigation := DBMS_AQ.NEXT_MESSAGE;

                    deq_item_data := deq_order_data.items(1);
                    deq_book_data := deq_item_data.item;
                    deq_cust_data := deq_order_data.customer;

                    /* The order header is printed only for messages that
                       carry customer data. */
                    IF (deq_cust_data IS NOT NULL) THEN
                      dbms_output.put_line(' **** NEXT ORDER **** ');
                      dbms_output.put_line('order_num: ' ||
                                deq_order_data.orderno);
                      dbms_output.put_line('ship_state: ' ||
                                deq_cust_data.state);
                    END IF;
                    dbms_output.put_line(' ---- next book ---- ');
                    dbms_output.put_line(' book_title: ' ||
                                deq_book_data.title ||
                                ' quantity: ' || deq_item_data.quantity);
                EXCEPTION
                    WHEN end_of_group THEN
                      dbms_output.put_line ('*** END OF ORDER ***');
                      commit;
                      /* Move on to the first message of the next order. */
                      dopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
                END;
            END LOOP;
          EXCEPTION
            WHEN no_messages THEN
                 dbms_output.put_line (' ---- NO MORE NEW ORDERS ---- ');
                 new_orders := FALSE;
          END;
        END LOOP;

END;
/
 
/* Let the OE schema execute the procedure owned by BOLADM: */
GRANT EXECUTE ON get_new_orders to OE;

/*  Dequeue the orders: */
CONNECT OE/OE;
EXECUTE BOLADM.get_new_orders;
 

Visual Basic (OO4O): Example Code

' Objects shared by the get_new_orders routine: session, database, queue,
' message, and the pieces of the dequeued order.
Dim OraSession as object
Dim OraDatabase as object
Dim OraAq as object
Dim OraMsg as Object
Dim OraOrder,OraItemList,OraItem,OraBook,OraCustomer  as Object
Dim Msgid as String

   ' Open a database connection as boladm and create a queue object for
   ' OE.OE_neworders_que.
   Set OraSession = CreateObject("OracleInProcServer.XOraSession")
   Set OraDatabase = OraSession.DbOpenDatabase("", "boladm/boladm", 0&)
   set oraaq = OraDatabase.CreateAQ("OE.OE_neworders_que")
   ' Message object whose payload is an object of type BOLADM.order_typ.
   Set OraMsg = OraAq.AQMsg(ORATYPE_OBJECT, "BOLADM.order_typ")
       ' Dequeue wait time of 1; the first dequeue starts at the first message.
       OraAq.wait = 1
   OraAq.Navigation = ORAAQ_DQ_FIRST_MESSAGE

' Dequeue one message from the new-orders queue and show its book title and
' quantity (plus order number and ship state when customer data is present).
' After a successful dequeue the navigation is set to "next message". When the
' dequeue fails with ORA-25235 (the end-of-group error in the PL/SQL example)
' or ORA-25228, the navigation is set to "next transaction" and the
' end-of-order message is shown.
private sub get_new_orders
   Dim MsgIsDequeued as Boolean
   On Error goto ErrHandler
   MsgIsDequeued = TRUE
   ' The queue object created for OE.OE_neworders_que is OraAq.
   msgid = OraAq.Dequeue
   ' If the dequeue failed, ErrHandler cleared the flag and resumed here.
   if MsgIsDequeued then
      set OraOrder = OraMsg
      OraItemList = OraOrder("items")
      OraItem = OraItemList(1)
      OraBook = OraItem("item")
      OraCustomer = OraOrder("customer")

         ' Populate the textboxes with the values
      if( OraCustomer ) then
         if OraAq.Navigation <> ORAAQ_DQ_NEXT_MESSAGE then
            MsgBox " ********* NEXT ORDER *******"
         end if
         txt_book_orderno = OraOrder("orderno")
         txt_book_shipstate = OraCustomer("state")
      End if
      OraAq.Navigation = ORAAQ_DQ_NEXT_MESSAGE
      txt_book_title = OraBook("title")
      txt_book_qty  = OraItem("quantity")
    Else
      MsgBox " ********* END OF  ORDER *******"
   End if

   ' Do not run the error handler on the normal path.
   Exit Sub

ErrHandler :
   'Handle error case, like no message etc
   If OraDatabase.LastServerErr = 25235 Or OraDatabase.LastServerErr = 25228 then
      OraAq.Navigation = ORAAQ_DQ_NEXT_TRANSACTION
      MsgIsDequeued = FALSE
      Resume Next
   End If
   'Process other errors
end sub

Java (JDBC): Example Code

No example is provided with this release.


Modes of Dequeuing

A dequeue request can either view a message or delete a message (see "Dequeue a Message" in Chapter 11, "Operational Interface: Basic Operations").

If a message is browsed it remains available for further processing. Similarly if a message is locked it remains available for further processing once the lock on it is released by performing a transaction commit or rollback. Once a message is deleted using either of the remove modes, it is no longer available for dequeue requests.

When a message is dequeued using REMOVE_NODATA mode, the payload of the message is not retrieved. This mode can be useful when the user has already examined the message payload, possibly by means of a previous BROWSE dequeue. In this way, you can avoid the overhead of payload retrieval, which can be substantial for large payloads.

A message is retained in the queue table after it has been removed only if a retention time is specified for a queue. Messages cannot be retained in exception queues (refer to the section on exceptions for further information). Removing a message with no data is generally used if the payload is known (from a previous browse/locked mode dequeue call), or the message will not be used.

Note that after a message has been browsed there is no guarantee that the message can be dequeued again since a dequeue call from a concurrent user might have removed the message. To prevent a viewed message from being dequeued by a concurrent user, you should view the message in the locked mode.

You need to take special care while using the browse mode for other reasons as well. The dequeue position is automatically changed to the beginning of the queue if a non-zero wait time is specified and the navigating position reaches the end of the queue. Hence repeating a dequeue call in the browse mode with the "next message" navigation option and a non-zero wait time can dequeue the same message over and over again. We recommend that you use a non-zero wait time for the first dequeue call on a queue in a session, and then use a zero wait time with the next message navigation option for subsequent dequeue calls. If a dequeue call gets an "end of queue" error message, the dequeue position can be explicitly set by the dequeue call to the beginning of the queue using the "first message" navigation option, following which the messages in the queue can be browsed again.

Example Scenario

In the following scenario from the BooksOnLine example, international orders destined to Mexico and Canada are to be processed separately due to trade policies and carrier discounts. Hence, a message is viewed in the locked mode (so no other concurrent user removes the message) and the customer country (message payload) is checked. If the customer country is Mexico or Canada, the message is deleted from the queue using the remove with no data mode (since the payload is already known). Otherwise, the lock on the message is released by the commit call. Note that the remove dequeue call uses the message identifier obtained from the locked mode dequeue call. The shipping_bookedorder_deq (refer to the example code for the description of this procedure) call illustrates the use of the browse mode.

PL/SQL (DBMS_AQ/ADM Package): Example Code

CONNECT boladm/boladm;

/*
 * get_northamerican_orders: walk through OS.OS_bookedorders_que as the
 * Overseas_Shipping consumer and remove the orders whose customer country
 * is Canada or Mexico.
 *
 * Each message is first dequeued in LOCKED mode so that its payload can be
 * examined. If the country matches, the same message is dequeued again by
 * message identifier in REMOVE_NODATA mode and the order is printed.
 * Otherwise the commit releases the lock. The procedure ends when the
 * dequeue raises no_messages (ORA-25228).
 */
create or replace procedure get_northamerican_orders as

deq_cust_data            BOLADM.customer_typ;
deq_book_data            BOLADM.book_typ;
deq_item_data            BOLADM.orderitem_typ;
deq_msgid                RAW(16);
dopt                     dbms_aq.dequeue_options_t;
mprop                    dbms_aq.message_properties_t;
deq_order_data           BOLADM.order_typ;
deq_order_nodata         BOLADM.order_typ;
qname                    VARCHAR2(30);
no_messages              exception;
pragma exception_init    (no_messages, -25228);
new_orders               BOOLEAN := TRUE;

begin

        /* This procedure takes no parameters, so the consumer name is a
           literal; it matches the Java version of the example. */
        dopt.consumer_name := 'Overseas_Shipping';
        dopt.wait := DBMS_AQ.NO_WAIT;
        dopt.navigation := dbms_aq.FIRST_MESSAGE;
        dopt.dequeue_mode := DBMS_AQ.LOCKED;

        qname := 'OS.OS_bookedorders_que';

        WHILE (new_orders) LOOP
          BEGIN
            /* Examine the next message without removing it. */
            dbms_aq.dequeue(
                queue_name => qname,
                dequeue_options => dopt,
                message_properties => mprop,
                payload => deq_order_data,
                msgid => deq_msgid);

            deq_item_data := deq_order_data.items(1);
            deq_book_data := deq_item_data.item;
            deq_cust_data := deq_order_data.customer;

            IF (deq_cust_data.country = 'Canada' OR
                deq_cust_data.country = 'Mexico' ) THEN

                /* Remove exactly the message just examined. The payload is
                   already known, so it is not fetched again. */
                dopt.dequeue_mode := dbms_aq.REMOVE_NODATA;
                dopt.msgid := deq_msgid;
                dbms_aq.dequeue(
                        queue_name => qname,
                        dequeue_options => dopt,
                        message_properties => mprop,
                        payload => deq_order_nodata,
                        msgid => deq_msgid);
                commit;

                dbms_output.put_line(' **** next booked order **** ');
                dbms_output.put_line('order_no: ' || deq_order_data.orderno ||
                        ' book_title: ' || deq_book_data.title ||
                        ' quantity: ' || deq_item_data.quantity);
                dbms_output.put_line('ship_state: ' || deq_cust_data.state ||
                        ' ship_country: ' || deq_cust_data.country ||
                        ' ship_order_type: ' || deq_order_data.ordertype);

            END IF;

            /* Release the lock taken by the LOCKED dequeue, then restore
               the options for examining the next message. */
            commit;
            dopt.dequeue_mode := DBMS_AQ.LOCKED;
            dopt.msgid := NULL;
            dopt.navigation := dbms_aq.NEXT_MESSAGE;
          EXCEPTION
            WHEN no_messages THEN
                 dbms_output.put_line (' ---- NO MORE BOOKED ORDERS ---- ');
                 new_orders := FALSE;
          END;
        END LOOP;

end;
/
 
/* Let the OS schema execute the procedure owned by BOLADM: */
GRANT EXECUTE on get_northamerican_orders to OS;

CONNECT ES/ES;

/*  Browse all booked orders for East_Shipping: */
EXECUTE BOLADM.shipping_bookedorder_deq('East_Shipping', DBMS_AQ.BROWSE);

CONNECT OS/OS;

/*  Dequeue all international North American orders for Overseas_Shipping: */
EXECUTE BOLADM.get_northamerican_orders;

Visual Basic (OO4O): Example Code

OO4O supports all the modes of dequeuing described above. Possible values include:

Java (JDBC): Example Code

/**
 * Walks through OS.OS_bookedorders_que as the "Overseas_Shipping" consumer
 * and removes the orders whose customer country is Canada or Mexico.
 *
 * Each message is first dequeued in locked mode so its payload can be
 * examined. If the country matches, the same message is dequeued again by
 * message id in remove-no-data mode and the order is printed. The commit that
 * follows is issued for every message, matched or not. An order with no
 * customer or no country is treated as a non-match. The loop ends on the
 * first AQException raised by a dequeue; that exception is printed rather
 * than rethrown.
 *
 * @param db_conn JDBC connection used to create the AQ session and to commit
 */
public static void get_northamerican_orders(Connection db_conn)
{

    AQSession         aq_sess;
    Order             deq_order;
    Customer          deq_cust;
    String            cust_country;
    byte[]            deq_msgid;
    AQDequeueOption   deq_option;
    AQQueue           bookedorders_q;
    AQMessage         message;
    AQObjectPayload   obj_payload;
    boolean           new_orders = true;

    try
    {
        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        deq_option = new AQDequeueOption();

        deq_option.setConsumerName("Overseas_Shipping");
        deq_option.setWaitTime(AQDequeueOption.WAIT_NONE);
        deq_option.setNavigationMode(AQDequeueOption.NAVIGATION_FIRST_MESSAGE);
        deq_option.setDequeueMode(AQDequeueOption.DEQUEUE_LOCKED);

        bookedorders_q = aq_sess.getQueue("OS", "OS_bookedorders_que");

        while(new_orders)
        {
            try
            {
              /* Dequeue the message - browse with lock */
              message = bookedorders_q.dequeue(deq_option, Order.getFactory());

              obj_payload = message.getObjectPayload();

              deq_msgid = message.getMessageId();
              deq_order = (Order)(obj_payload.getPayloadData());

              deq_cust = deq_order.getCustomer();

              /* Null-safe: a missing customer or country is not a match */
              cust_country = (deq_cust == null) ? null : deq_cust.getCountry();

              if("Canada".equals(cust_country) ||
                 "Mexico".equals(cust_country))
              {
                /* Target exactly the message just examined; its payload is
                   already known, so it is not fetched again */
                deq_option.setDequeueMode(
                                  AQDequeueOption.DEQUEUE_REMOVE_NODATA);
                deq_option.setMessageId(deq_msgid);

                /* Delete the message */
                bookedorders_q.dequeue(deq_option, Order.getFactory());

                System.out.println("----  next booked order ------");
                System.out.println("Order no: " + deq_order.getOrderno());
                System.out.println("Ship state: " + deq_cust.getState());
                System.out.println("Ship country: " + deq_cust.getCountry());
                System.out.println("Order type: " + deq_order.getOrdertype());

              }

              db_conn.commit();

              /* Restore the options for examining the next message */
              deq_option.setDequeueMode(AQDequeueOption.DEQUEUE_LOCKED);
              deq_option.setMessageId(null);
              deq_option.setNavigationMode(
                                    AQDequeueOption.NAVIGATION_NEXT_MESSAGE);
            }
            catch (AQException aqex)
            {
              /* Any AQ error ends the loop */
              new_orders = false;
              System.out.println("--- No more booked orders ----");
              System.out.println("Exception-1: " + aqex);
            }
        }

    }
    catch (Exception ex)
    {
        System.out.println("Exception-2: " + ex);
    }

}

Optimization of Waiting for Arrival of Messages

One of the most important features of AQ is that it allows applications to block on one or more queues waiting for the arrival of either a newly enqueued message or for a message that becomes ready. You can use the DEQUEUE operation to wait for arrival of a message in a queue (see "Dequeue a Message") or the LISTEN operation to wait for the arrival of a message in more than one queue (see "Listen to One (Many) Queue(s)" in Chapter 11, "Operational Interface: Basic Operations").

When the blocking DEQUEUE call returns, it returns the message properties and the message payload. By contrast, when the blocking LISTEN call returns, it discloses only the name of the queue in which a message has arrived. A subsequent DEQUEUE operation is needed to dequeue the message.

Applications can optionally specify a timeout of zero or more seconds to indicate the time that AQ must wait for the arrival of a message. The default is to wait forever until a message arrives in the queue. This optimization is important in two ways. It removes the burden of continually polling for messages from the application. And it saves CPU and network resource because the application remains blocked until a new message is enqueued or becomes READY after its DELAY time. In release 8.1.5 applications can also perform a blocking dequeue on exception queues to wait for arrival of EXPIRED messages.

A process or thread that is blocked on a dequeue is either woken up directly by the enqueuer if the new message has no DELAY or is woken up by the queue monitor process when the DELAY or EXPIRATION time has passed. Applications can wait not only for the arrival of a message in the queue into which an enqueuer enqueues a message, but also for its arrival in a remote queue, provided that propagation has been scheduled to the remote queue using DBMS_AQADM.SCHEDULE_PROPAGATION. In this case the AQ propagator will wake up the blocked dequeuer after a message has been propagated.

Example Scenario

In the BooksOnLine example, the get_rushtitles procedure discussed under dequeue methods specifies a wait time of 1 second in the dequeue_options argument for the dequeue call. Wait time can be specified in different ways as illustrated in the code below.

PL/SQL (DBMS_AQ/ADM Package): Example Code

-- dopt is declared as dbms_aq.dequeue_options_t.

-- Wait up to 10 seconds for a message:
dopt.wait := 10;

-- Do not wait at all (0 seconds):
dopt.wait := dbms_aq.no_wait;

-- Wait indefinitely (forever):
dopt.wait := dbms_aq.forever;

Visual Basic (OO4O): Example Code

OO4O supports asynchronous dequeuing of messages. First, the monitor is started for a particular queue. When messages that fulfil the user criteria are dequeued, the user's callback object is notified. For more details, refer to the MonitorStart method of the OraAQ Object.

Java (JDBC): Example Code

AQDequeueOption deq-opt;

deq-opt = new AQDequeueOption ();


Asynchronous Notifications

This feature allows OCI clients to receive notifications when there is a message in a queue of interest. The client can use it to monitor multiple subscriptions. The client does not have to be connected to the database to receive notifications regarding its subscriptions.

You use the OCI function, OCISubscriptionRegister, to register interest in messages in a queue (see "Register for Notification" in Chapter 11, "Operational Interface: Basic Operations").


For more information about the OCI operation Register for Notification see:

 

The client can specify a callback function which is invoked for every new message that is enqueued. For non-persistent queues, the message is delivered to the client as part of the notification. For persistent queues, only the message properties are delivered as part of the notification. Consequently, in the case of persistent queues, the client has to make an explicit dequeue to access the contents of the message.

Example Scenario

In the BooksOnLine application, a customer can request Fed-ex shipping (priority 1), Priority air shipping (priority 2), or Regular ground shipping (priority 3).

The shipping application then ships the orders according to the user's request. It is of interest to BooksOnLine to find out how many requests of each shipping type come in each day. The application uses the asynchronous notification facility for this purpose. It registers for notification on the WS.WS_bookedorders_que. When it is notified of a new message in the queue, it updates the count for the appropriate shipping type depending on the priority of the message.

Visual Basic (OO4O): Example Code

Refer to the Visual Basic online help, "Monitoring Messages".

Java (JDBC): Example Code

This feature is not supported by the Java API.

C (OCI): Example Code

This example illustrates the use of OCISubscriptionRegister. At the shipping site, an OCI client program keeps track of how many orders were made for each of the shipping types, FEDEX, AIR and GROUND. The priority field of the message enables us to determine the type of shipping desired.


#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <oci.h>
/* On the WIN32COMMON build, map sleep(x) onto Sleep(), scaling the argument
   by 1000 (presumably seconds to milliseconds -- confirm against the
   platform headers). */
#ifdef WIN32COMMON
#define sleep(x)   Sleep(1000*(x))
#endif
/* Credentials used to open the database session in main(). */
static text *username = (text *) "WS";
static text *password = (text *) "WS";

/* OCI handles shared by main(), the notification callback and checkerr(). */
static OCIEnv *envhp;
static OCIServer *srvhp;
static OCIError *errhp;
static OCISvcCtx *svchp;

/* Reports the outcome of an OCI call; defined at the end of the program. */
static void checkerr(/*_ OCIError *errhp, sword status _*/);

/* Per-shipping-type order counters, updated by the notification callback. */
struct ship_data
{
  ub4  fedex;    /* incremented for messages with priority 1 */
  ub4  air;      /* incremented for messages with priority 2 */
  ub4  ground;   /* incremented for messages with priority 3 */
};

typedef struct ship_data ship_data;

int main(/*_ int argc, char *argv[] _*/);


/* Notify callback: */
ub4 notifyCB(ctx, subscrhp, pay, payl, desc, mode)
dvoid *ctx;
OCISubscription *subscrhp;
dvoid *pay;
ub4    payl;
dvoid *desc;
ub4    mode;
{
 text                *subname;
 ub4                 size;
 ship_data           *ship_stats = (ship_data *)ctx;
 text                *queue;
 text                *consumer;
 OCIRaw              *msgid;
 ub4                 priority;
 OCIAQMsgProperties  *msgprop;

 OCIAttrGet((dvoid *)subscrhp, OCI_HTYPE_SUBSCRIPTION,
                             (dvoid *)&subname, &size,
                             OCI_ATTR_SUBSCR_NAME, errhp);

 /* Extract the attributes from the AQ descriptor.
    Queue name: */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&queue, &size, 
            OCI_ATTR_QUEUE_NAME, errhp);
  
 /* Consumer name: */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&consumer, &size, 
            OCI_ATTR_CONSUMER_NAME, errhp);

 /* Msgid: */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgid, &size, 
            OCI_ATTR_NFY_MSGID, errhp);

 /* Message properties: */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgprop, &size, 
            OCI_ATTR_MSG_PROP, errhp);

 /* Get priority from message properties: */
 checkerr(errhp, OCIAttrGet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, 
                             (dvoid *)&priority, 0, 
                             OCI_ATTR_PRIORITY, errhp));

  switch (priority)
  {
  case 1:  ship_stats->fedex++;
           break;
  case 2 : ship_stats->air++;
           break;
  case 3:  ship_stats->ground++;
           break;
  default: 
           printf(" Error priority %d", priority);
  }
}


int main(argc, argv)
int argc;
char *argv[];
{
  OCISession *authp = (OCISession *) 0;
  OCISubscription *subscrhp[8];
  ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ;
  ship_data  ctx = {0,0,0};
  ub4 sleep_time = 0;

  printf("Initializing OCI Process\n");

  /* Initialize OCI environment with OCI_EVENTS flag set: */
  (void) OCIInitialize((ub4) OCI_EVENTS|OCI_OBJECT, (dvoid *)0,
                       (dvoid * (*)(dvoid *, size_t)) 0,
                       (dvoid * (*)(dvoid *, dvoid *, size_t))0,
                       (void (*)(dvoid *, dvoid *)) 0 );

  printf("Initialization successful\n");

  printf("Initializing OCI Env\n");
  (void) OCIEnvInit( (OCIEnv **) &envhp, OCI_DEFAULT, (size_t) 0, (dvoid **) 0 
);
  printf("Initialization successful\n");

  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, OCI_HTYPE_
ERROR, 
                   (size_t) 0, (dvoid **) 0));

  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, OCI_HTYPE_
SERVER,
                   (size_t) 0, (dvoid **) 0));

  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, OCI_HTYPE_
SVCCTX,
                   (size_t) 0, (dvoid **) 0));


  printf("connecting to server\n");
  checkerr(errhp, OCIServerAttach( srvhp, errhp, (text *)"inst1_alias",
           strlen("inst1_alias"), (ub4) OCI_DEFAULT));
  printf("connect successful\n");

  /* Set attribute server context in the service context: */
  checkerr(errhp, OCIAttrSet( (dvoid *) svchp, OCI_HTYPE_SVCCTX, (dvoid *)srvhp, 
                    (ub4) 0, OCI_ATTR_SERVER, (OCIError *) errhp));

  checkerr(errhp, OCIHandleAlloc((dvoid *) envhp, (dvoid **)&authp,
                       (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0));
 
  /* Set username and password in the session handle: */
  checkerr(errhp, OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION,
                  (dvoid *) username, (ub4) strlen((char *)username),
                  (ub4) OCI_ATTR_USERNAME, errhp));
 
  checkerr(errhp, OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION,
                  (dvoid *) password, (ub4) strlen((char *)password),
                  (ub4) OCI_ATTR_PASSWORD, errhp));

  /* Begin session: */
  checkerr(errhp, OCISessionBegin ( svchp,  errhp, authp, OCI_CRED_RDBMS, 
                          (ub4) OCI_DEFAULT));

  (void) OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX,
                   (dvoid *) authp, (ub4) 0,
                   (ub4) OCI_ATTR_SESSION, errhp);


  /* Register for notification: */
  printf("allocating subscription handle\n");
  subscrhp[0] = (OCISubscription *)0;
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[0], 
                        (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (size_t) 0, (dvoid **) 0);
 
  printf("setting subscription name\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) "WS.WS_BOOKEDORDERS_QUE:BOOKED_ORDERS", 
                 (ub4) strlen("WS.WS_BOOKEDORDERS_QUE:BOOKED_ORDERS"),
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp);
 
  printf("setting subscription callback\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) notifyCB, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

 (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *)&ctx, (ub4)sizeof(ctx),
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

  printf("setting subscription namespace\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) &namespace, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);

  printf("Registering \n");
  checkerr(errhp, OCISubscriptionRegister(svchp, subscrhp, 1, errhp, 
                                          OCI_DEFAULT));

  sleep_time = (ub4)atoi(argv[1]);
  printf ("waiting for %d s", sleep_time);
  sleep(sleep_time);

  printf("Exiting");
  exit(0);
}

void checkerr(errhp, status)
OCIError *errhp;
sword status;
{
  text errbuf[512];
  sb4 errcode = 0;

  switch (status)
  {
  case OCI_SUCCESS:
    break;
  case OCI_SUCCESS_WITH_INFO:
    (void) printf("Error - OCI_SUCCESS_WITH_INFO\n");
    break;
  case OCI_NEED_DATA:
    (void) printf("Error - OCI_NEED_DATA\n");
    break;
  case OCI_NO_DATA:
    (void) printf("Error - OCI_NODATA\n");
    break;
  case OCI_ERROR:
    (void) OCIErrorGet((dvoid *)errhp, (ub4) 1, (text *) NULL, &errcode,
                        errbuf, (ub4) sizeof(errbuf), OCI_HTYPE_ERROR);
    (void) printf("Error - %.*s\n", 512, errbuf);
    break;
  case OCI_INVALID_HANDLE:
    (void) printf("Error - OCI_INVALID_HANDLE\n");
    break;
  case OCI_STILL_EXECUTING:
    (void) printf("Error - OCI_STILL_EXECUTE\n");
    break;
  case OCI_CONTINUE:
    (void) printf("Error - OCI_CONTINUE\n");
    break;
  default:
    break;
  }
}

Retry with Delay Interval

If the transaction dequeuing the message from a queue fails, it is regarded as an unsuccessful attempt to remove the message. AQ records the number of failed attempts to remove the message in the message history. Applications can query the retry_count column of the queue table view to find out the number of unsuccessful attempts on a message. In addition, AQ also allows the application to specify, at the queue level, the maximum number of retries for messages in the queue. If the number of failed attempts to remove a message exceed this number, the message is moved to the exception queue and is no longer available to applications.

Retry Delay

If the transaction receiving a message aborted, this could be because of a 'bad' condition. AQ allows users to 'hide' the bad message for a pre-specified interval. A retry_delay can be specified along with maximum retries. This means that a message which has had a failed attempt will become visible in the queue for dequeue only after the 'retry_delay' interval has elapsed. Until then it will be in the 'WAITING' state. The AQ background process, the time manager, enforces the retry delay property. The default value for maximum retries is 5 and that for retry delay is 0. Note that maximum retries and retry delay are not available with 8.0 compatible multi-consumer queues.

PL/SQL (DBMS_AQ/ADM Package): Example Code

 /* Configure retry handling on the Western Region booked orders queue:
    a message whose dequeue transaction fails is retried at most 4 times,
    and is hidden for 12 hours (retry_delay is in seconds) after each
    failed attempt. */
 CONNECT BOLADM/BOLADM

 /* queue has max retries = 4 and retry delay = 12 hours */
 BEGIN
   dbms_aqadm.alter_queue(
         queue_name   => 'WS.WS_bookedorders_que',
         max_retries  => 4,
         retry_delay  => 3600*12);
 END;
 /

      /* Processes the next order available in the booked orders queue.
         The first message is dequeued; if the ordered book is not in the
         warehouse the transaction is rolled back, which counts as a failed
         dequeue attempt and makes the message subject to the queue's
         max_retries / retry_delay settings. Otherwise the order is
         processed. search_inventory and process_order are assumed to be
         defined elsewhere. */
      CREATE OR REPLACE PROCEDURE process_next_order
      AS
        dqqopt                   dbms_aq.dequeue_options_t;
        msgprop                  dbms_aq.message_properties_t;
        deq_msgid                RAW(16);
        book                     BOLADM.book_typ;
        item                     BOLADM.orderitem_typ;
        /* named next_order because ORDER is a reserved word */
        next_order               BOLADM.order_typ;
      BEGIN

        /* start from the first message in the queue */
        dqqopt.navigation := DBMS_AQ.FIRST_MESSAGE;
        dbms_aq.dequeue('WS.WS_bookedorders_que', dqqopt, msgprop,
                        next_order, deq_msgid);

        /* for simplicity, assume order has a single item */
        item := next_order.items(1);
        book := item.item;

        /* assume search_inventory searches inventory for the book */
        /* if we don't find the book in the warehouse, abort transaction */
        IF (search_inventory(book) != TRUE) THEN
          ROLLBACK;
        ELSE
          process_order(next_order);
        END IF;

      END;
      /

Visual Basic (OO4O): Example Code

Use the dbexecutesql interface from the database for this functionality.

Java (JDBC): Example Code

/**
 * Alters the WS booked orders queue so that it allows at most 4 retries
 * with a retry interval of 12 hours. Any exception is printed to standard
 * output and otherwise ignored.
 *
 * @param db_conn database connection used to create the AQ session
 */
public static void setup_queue(Connection db_conn)
{
    try
    {
        /* Open an AQ session on the supplied connection: */
        AQSession session = AQDriverManager.createAQSession(db_conn);

        AQQueue queue = session.getQueue("WS", "WS_bookedorders_que");

        /* max retries = 4, retry delay = 12 hours (given in seconds) */
        AQQueueProperty props = new AQQueueProperty();
        props.setMaxRetries(4);
        props.setRetryInterval(3600*12);

        queue.alterQueue(props);
    }
    catch (Exception ex)
    {
        System.out.println("Exception: " + ex);
    }
}
/**
 * Dequeues the first message from the WS booked orders queue and handles
 * the order it carries: when search_inventory reports the book as
 * available the order is passed to process_order, otherwise the
 * transaction on db_conn is rolled back. AQ exceptions and all other
 * exceptions are printed to standard output with distinct prefixes.
 *
 * @param db_conn database connection used to create the AQ session
 */
public static void process_next_order(Connection db_conn)
{
    try
    {
        /* Open an AQ session on the supplied connection: */
        AQSession session = AQDriverManager.createAQSession(db_conn);

        /* Navigate to the first message of the queue: */
        AQDequeueOption options = new AQDequeueOption();
        options.setNavigationMode(AQDequeueOption.NAVIGATION_FIRST_MESSAGE);

        AQQueue queue = session.getQueue("WS", "WS_bookedorders_que");

        /* Dequeue the message and unwrap the order payload: */
        AQMessage dequeued = queue.dequeue(options, Order.getFactory());
        AQObjectPayload payload = dequeued.getObjectPayload();
        Order order = (Order)(payload.getPayloadData());

        /* for simplicity, assume order has a single item */
        OrderItem firstItem = order.getItems().getElement(0);
        Book orderedBook = firstItem.getItem();

        /* assume search_inventory searches inventory for the book;
         * a book missing from the warehouse aborts the transaction
         */
        if (search_inventory(orderedBook) == true)
        {
            process_order(order);
        }
        else
        {
            db_conn.rollback();
        }
    }
    catch (AQException aqex)
    {
        System.out.println("Exception-1: " + aqex);
    }
    catch (Exception ex)
    {
        System.out.println("Exception-2: " + ex);
    }
}

Exception Handling

AQ provides four integrated mechanisms to support exception handling in applications: EXCEPTION_QUEUES, EXPIRATION, MAX_RETRIES and RETRY_DELAY.

An exception_queue is a repository for all expired or unserviceable messages. Applications cannot directly enqueue into exception queues. Also, a multi-consumer exception queue cannot have subscribers associated with it. However, an application that intends to handle these expired or unserviceable messages can dequeue from the exception queue. The exception queue created for messages intended for a multi-consumer queue must itself be a multi-consumer queue. Like any other queue, the exception queue must be enabled for dequeue using the DBMS_AQADM.START_QUEUE procedure. You will get an Oracle error if you try to enable an exception queue for enqueue.

When a message has expired, it is moved to an exception queue. The exception queue for a message in multi-consumer queue must also be a multi-consumer queue. Expired messages from multi-consumer queues cannot be dequeued by the intended recipients of the message. However, they can be dequeued in the REMOVE mode exactly once by specifying a NULL consumer name in the dequeue options. Hence, from a dequeue perspective multi-consumer exception queues behave like single-consumer queues because each expired message can be dequeued only once using a NULL consumer name. Messages can also be dequeued from the exception queue by specifying the message ID. Note that expired messages can be dequeued only by specifying a message ID if the multi-consumer exception queue was created in a queue table without the compatible parameter or with the compatible parameter set to '8.0'.

The exception queue is a message property that can be specified during enqueue time (see "Enqueue a Message [Specify Message Properties]" in Chapter 11, "Operational Interface: Basic Operations"). In PL/SQL users can use the exception_queue attribute of the DBMS_AQ.MESSAGE_PROPERTIES_T record to specify the exception queue. In OCI users can use the OCIAttrSet function to set the OCI_ATTR_EXCEPTION_QUEUE attribute of the OCIAQMsgProperties descriptor.

If an exception queue is not specified, the default exception queue is used. If the queue is created in a queue table, say QTAB, the default exception queue will be called AQ$_QTAB_E. The default exception queue is automatically created when the queue table is created. Messages are moved to the exception queues by AQ under the following conditions.

Messages intended for 8.1-compatible multi-consumer queues cannot be dequeued by the intended recipients once the messages have been moved to an exception queue. These messages should instead be dequeued in the REMOVE or BROWSE mode exactly once by specifying a NULL consumer name in the dequeue options. The messages can also be dequeued by their message IDs.

Messages intended for single consumer queues, or for 8.0-compatible multi-consumer queues, can only be dequeued by their message IDs once the messages have been moved to an exception queue.

Users can associate a RETRY_DELAY with a queue. The default value for this parameter is 0 which means that the message will be available for dequeue immediately after the RETRY_COUNT is incremented. Otherwise the message will be unavailable for RETRY_DELAY seconds. After RETRY_DELAY seconds the queue monitor will mark the message as READY.

Example Scenario

In the BooksOnLine application, the business rule for each shipping region is that an order will be placed in a back order queue if the order cannot be filled immediately. The back order application will try to fill the order once a day. If the order cannot be filled within 5 days, it is placed in an exception queue for special processing. You can implement this process by making use of the retry and exception handling features in AQ.

The example below shows how you can create a queue with specific maximum retry and retry delay interval.

PL/SQL (DBMS_AQ/ADM Package): Example Code

/* Back order queue for the Western Region: at most 5 retries, with a
   delay of one day (expressed in seconds) between retries. */
CONNECT BOLADM/BOLADM
DECLARE
  seconds_per_day   CONSTANT NUMBER := 60*60*24;
BEGIN
  dbms_aqadm.create_queue(
    queue_name  => 'WS.WS_backorders_que',
    queue_table => 'WS.WS_orders_mqtab',
    max_retries => 5,
    retry_delay => seconds_per_day);
END;
/
 
/* Exception queue that accompanies the Western Region back order queue;
   it is created in the same queue table. */
CONNECT BOLADM/BOLADM
DECLARE
  excpt_queue_name    CONSTANT VARCHAR2(62) := 'WS.WS_backorders_excpt_que';
  orders_queue_table  CONSTANT VARCHAR2(62) := 'WS.WS_orders_mqtab';
BEGIN
  dbms_aqadm.create_queue(
    queue_name  => excpt_queue_name,
    queue_table => orders_queue_table,
    queue_type  => DBMS_AQADM.EXCEPTION_QUEUE);
END;
/
 
/* Enqueues an unfilled order into WS_backorders_que and names
   WS_backorders_excpt_que as the exception queue for that message.
   The procedure does not commit. */
CONNECT BOLADM/BOLADM
CREATE OR REPLACE PROCEDURE enqueue_WS_unfilled_order(backorder order_typ)
IS
  enq_options   dbms_aq.enqueue_options_t;
  msg_props     dbms_aq.message_properties_t;
  new_msgid     RAW(16);
BEGIN
  /* Exception queue for this particular message: */
  msg_props.exception_queue := 'WS.WS_backorders_excpt_que';

  dbms_aq.enqueue(
    queue_name         => 'WS.WS_backorders_que',
    enqueue_options    => enq_options,
    message_properties => msg_props,
    payload            => backorder,
    msgid              => new_msgid);
END;
/
 

Visual Basic (OO4O): Example Code

The exception queue is a message property that can be provided at the time of enqueuing a message. If this property is not set, the default exception queue of the queue will be used for any error conditions.

'Enqueue a back order into the Western Region back order queue and name
'WS.WS_backorders_excpt_que as the exception queue for the message.
'The original listing targeted the deferred-billing queue, applied that
'example's 15 day delay, and named the back order queue itself as the
'exception queue.
Set OraAq = OraDatabase.CreateAQ("WS.WS_backorders_que")
Set OraMsg = OraAq.AQMsg(ORATYPE_OBJECT, "BOLADM.order_typ")
Set OraOrder = OraDatabase.CreateOraObject("BOLADM.order_typ")
   OraMsg.ExceptionQueue = "WS.WS_backorders_excpt_que"
   'Fill up the order values
   OraMsg = OraOrder 'OraOrder contains the order details
   Msgid = OraAq.Enqueue

Java (JDBC): Example Code

/**
 * Creates and starts the Western Region back order queue, then creates its
 * exception queue, both in the WS.WS_orders_mqtab queue table. The back
 * order queue allows a maximum of 5 retries with a one day delay between
 * retries. Any exception is printed to standard output and otherwise
 * ignored.
 *
 * The retry interval is given in seconds. The original listing passed
 * 60*24*24 (34560 s, about 9.6 hours) although the documented delay is one
 * day; it is now 60*60*24, matching the PL/SQL version of this example.
 *
 * @param db_conn database connection used to create the AQ session
 */
public static void createBackOrderQueues(Connection db_conn)
{
    AQSession         aq_sess;
    AQQueue           backorders_q;
    AQQueue           backorders_excp_q;
    AQQueueProperty   q_prop;
    AQQueueProperty   q_prop2;
    AQQueueTable      mq_table;

    try
    {
        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        mq_table = aq_sess.getQueueTable("WS", "WS_orders_mqtab");


        /* Create a back order queue in Western Region which allows a
           maximum of 5 retries and 1 day delay between each retry. */

        q_prop = new AQQueueProperty();
        q_prop.setMaxRetries(5);
        q_prop.setRetryInterval(60*60*24);   // one day, in seconds

        backorders_q = aq_sess.createQueue(mq_table, "WS_backorders_que",
                                          q_prop);

        /* Enable the queue for both enqueue and dequeue: */
        backorders_q.start(true, true);

        /* Create an exception queue for the back order queue for
           Western Region. */
        q_prop2 = new AQQueueProperty();
        q_prop2.setQueueType(AQQueueProperty.EXCEPTION_QUEUE);

        backorders_excp_q = aq_sess.createQueue(mq_table,
                                          "WS_backorders_excpt_que", q_prop2);

    }
    catch (Exception ex)
    {
        System.out.println("Exception " + ex);
    }

}

/**
 * Enqueues an unfilled order into WS_backorders_que, naming
 * WS.WS_backorders_excpt_que as the exception queue for the message, and
 * commits on the supplied connection. Any exception is printed to standard
 * output and otherwise ignored.
 *
 * @param db_conn    database connection used to create the AQ session
 * @param back_order the order to place on the back order queue
 */
public static void enqueue_WS_unfilled_order(Connection db_conn,
                                             Order back_order)
{
    try
    {
        /* Open an AQ session and look up the back order queue: */
        AQSession session = AQDriverManager.createAQSession(db_conn);
        AQQueue queue = session.getQueue("WS", "WS_backorders_que");

        AQMessage msg = queue.createMessage();

        /* Name the exception queue for this message: */
        msg.getMessageProperty()
           .setExceptionQueue("WS.WS_backorders_excpt_que");

        /* Attach the order as the message payload: */
        msg.getObjectPayload().setPayloadData(back_order);

        /* Enqueue with default options; the returned id is not needed: */
        AQEnqueueOption options = new AQEnqueueOption();
        queue.enqueue(options, msg);

        db_conn.commit();
    }
    catch (Exception ex)
    {
        System.out.println("Exception: " + ex);
    }
}

Rule-based Subscription

Messages may be routed to various recipients based on message properties or message content. Users define a rule-based subscription for a given queue to specify interest in receiving messages that meet particular conditions.

Rules are boolean expressions that evaluate to TRUE or FALSE. Similar in syntax to the WHERE clause of a SQL query, rules are expressed in terms of the attributes that represent message properties or message content. These subscriber rules are evaluated against incoming messages and those rules that match are used to determine message recipients. This feature thus supports the notions of content-based subscriptions and content-based routing of messages.

Example Scenario and Code

For the BooksOnLine application, we illustrate how rule-based subscriptions are used to implement a publish/subscribe paradigm utilizing content-based subscription and content-based routing of messages. The interaction between the Order Entry application and each of the Shipping Applications is modeled as follows:

Each shipping application subscribes to the OE booked orders queue. The following rule-based subscriptions are defined by the Order Entry user to handle the routing of booked orders from the Order Entry application to each of the Shipping applications.

PL/SQL (DBMS_AQ/ADM Package): Example Code

CONNECT OE/OE; 

Western Region Shipping defines an agent called 'West_Shipping' with the WS booked orders queue as the agent address (destination queue to which messages must be delivered). This agent subscribes to the OE booked orders queue using a rule specified on order region and ordertype attributes.

/*  Rule-based subscription for West Shipping on the OE booked orders
    queue: Western region orders that are not rush orders are routed to
    the WS booked orders queue (rush Western region orders are handled
    by East Shipping). */ 
BEGIN 
  dbms_aqadm.add_subscriber( 
                queue_name => 'OE.OE_bookedorders_que', 
                subscriber => aq$_agent('West_Shipping', 
                                        'WS.WS_bookedorders_que', null), 
                rule       => 'tab.user_data.orderregion =  
                    ''WESTERN'' AND tab.user_data.ordertype != ''RUSH'''); 
END; 
/ 

Eastern Region Shipping defines an agent called East_Shipping with the ES booked orders queue as the agent address (the destination queue to which messages must be delivered). This agent subscribes to the OE booked orders queue using a rule specified on orderregion, ordertype and customer attributes.

/*  Rule-based subscription for East Shipping on the OE booked orders
    queue: all Eastern region orders, plus all rush orders from US
    customers, are routed to the ES booked orders queue. */ 
BEGIN 
  dbms_aqadm.add_subscriber( 
        queue_name => 'OE.OE_bookedorders_que', 
        subscriber => aq$_agent('East_Shipping', 
                                'ES.ES_bookedorders_que', null), 
        rule       => 'tab.user_data.orderregion = ''EASTERN'' OR  
                      (tab.user_data.ordertype = ''RUSH'' AND  
                       tab.user_data.customer.country = ''USA'') '); 
END; 
/ 

Overseas Shipping defines an agent called Overseas_Shipping with the OS booked orders queue as the agent address (destination queue to which messages must be delivered). This agent subscribes to the OE booked orders queue using a rule specified on orderregion attribute.

/*  Rule-based subscription for Overseas Shipping on the OE booked orders
    queue: orders whose region is INTERNATIONAL are routed to the OS
    booked orders queue. */ 
BEGIN 
  dbms_aqadm.add_subscriber( 
        queue_name => 'OE.OE_bookedorders_que', 
        subscriber => aq$_agent('Overseas_Shipping', 
                                'OS.OS_bookedorders_que', null), 
        rule       => 'tab.user_data.orderregion = ''INTERNATIONAL'''); 
END; 
/ 
 

Visual Basic (OO4O): Example Code

This functionality is currently not available.

Java (JDBC): Example Code

/**
 * Adds the three rule-based subscribers (West, East and Overseas Shipping)
 * to the Order Entry booked orders queue. Each subscriber is an agent
 * whose address is the booked orders queue of the corresponding shipping
 * site, with a rule over the order's region, type and customer country.
 * Any exception is printed to standard output and otherwise ignored.
 *
 * The queue is looked up as "OE_bookedorders_que", the name used by the
 * PL/SQL version of this example; the original listing used
 * "OE_booked_orders_que".
 *
 * @param db_conn database connection used to create the AQ session
 */
public static void addRuleBasedSubscribers(Connection db_conn)
{

    AQSession         aq_sess;
    AQQueue           bookedorders_q;
    String            rule;
    AQAgent           agt1, agt2, agt3;

    try
    {
        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);


        bookedorders_q = aq_sess.getQueue("OE", "OE_bookedorders_que");

        /* Add a rule-based subscriber for West Shipping -
           West Shipping handles Western region US orders,
           Rush Western region orders are handled by East Shipping: */

        agt1 = new AQAgent("West_Shipping", "WS.WS_bookedorders_que");

        rule = "tab.user_data.orderregion = 'WESTERN' AND " +
               "tab.user_data.ordertype != 'RUSH'";

        bookedorders_q.addSubscriber(agt1, rule);


        /*  Add a rule-based subscriber for East Shipping -
            East shipping handles all Eastern region orders,
            East shipping also handles all US rush orders: */

        agt2 = new AQAgent("East_Shipping", "ES.ES_bookedorders_que");
        rule = "tab.user_data.orderregion = 'EASTERN' OR " +
               "(tab.user_data.ordertype = 'RUSH' AND " +
               "tab.user_data.customer.country = 'USA')";

        bookedorders_q.addSubscriber(agt2, rule);

        /*  Add a rule-based subscriber for Overseas Shipping
            Intl Shipping handles all non-US orders: */

        agt3 = new AQAgent("Overseas_Shipping", "OS.OS_bookedorders_que");
        rule = "tab.user_data.orderregion = 'INTERNATIONAL'";

        bookedorders_q.addSubscriber(agt3, rule);
    }
    catch (Exception ex)
    {
        System.out.println("Exception: " + ex);
    }

}
 

Listen Capability

In Oracle8i release 8.1.6, AQ has the capability to monitor multiple queues for messages with a single call, listen. An application can use listen to wait for messages for multiple subscriptions. It can also be used by gateway applications to monitor multiple queues. If the listen call returns successfully, a dequeue must be used to retrieve the message (see Listen to One (Many) Queue(s) in Chapter 11, "Operational Interface: Basic Operations").

Without the listen call, an application which sought to dequeue from a set of queues would have to continuously poll the queues to determine if there were a message. Alternatively, you could design your application to have a separate dequeue process for each queue. However, if there are long periods with no traffic in any of the queues, these approaches will create an unacceptable overhead. The listen call is well suited for such applications.

Note that when there are messages for multiple agents in the agent list, listen returns with the first agent for whom there is a message. In that sense listen is not 'fair' in monitoring the queues. The application designer must keep this in mind when using the call. To prevent one agent from 'starving' other agents for messages, the application could change the order of the agents in the agent list.

Example Scenario

In the customer service component of the BooksOnLine example, messages from different databases arrive in the customer service queues, indicating the state of the message. The customer service application monitors the queues and whenever there is a message about a customer order, it updates the order status in the order_status_table. The application uses the listen call to monitor the different queues. Whenever there is a message in any of the queues, it dequeues the message and updates the order status accordingly.

PL/SQL (DBMS_AQ/ADM Package): Example Code

CODE (in tkaqdocd.sql) 
 
/* Update the status of the order in the order status table.

   new_status - the state reported by the incoming message; one of
                'BOOKED_ORDER', 'SHIPPED_ORDER', 'BACK_ORDER' and
                'BILLED_ORDER'
   order_msg  - the order the message is about

   If no row exists yet for the order, one is inserted. Otherwise the row
   for that order is updated, unless the message describes a state the
   order has already moved past. The procedure commits its own work.

   Changes from the original listing: the UPDATE is restricted to the row
   of the order in question (it had no WHERE clause and rewrote every row),
   and only NO_DATA_FOUND is treated as "first message for this order", so
   other errors are no longer hidden behind an INSERT. */
CREATE OR REPLACE PROCEDURE update_status(
                                new_status    IN VARCHAR2,
                                order_msg     IN BOLADM.ORDER_TYP)
IS
  old_status    VARCHAR2(30);
BEGIN

  BEGIN
    /* Query old status from the table: */
    SELECT st.status INTO old_status FROM order_status_table st
       WHERE st.customer_order.orderno = order_msg.orderno;

  EXCEPTION
    WHEN NO_DATA_FOUND THEN
      /* First update for the order: */
      INSERT INTO order_status_table(customer_order, status)
      VALUES (order_msg, new_status);
      COMMIT;
      RETURN;
  END;

  /* Ignore messages about a state the order has already left: */
  IF new_status = 'SHIPPED_ORDER' THEN
    IF old_status = 'BILLED_ORDER' THEN
      RETURN;             /* message about a previous state */
    END IF;
  ELSIF new_status = 'BACK_ORDER' THEN
    IF old_status = 'SHIPPED_ORDER' OR old_status = 'BILLED_ORDER' THEN
      RETURN;             /* message about a previous state */
    END IF;
  END IF;

  /* Update the status of this order only: */
  UPDATE order_status_table st
     SET st.customer_order = order_msg, st.status = new_status
   WHERE st.customer_order.orderno = order_msg.orderno;

  COMMIT;

END;
/
         
 
/* Removes the first message available to 'consumer' from 'queue',
   returns its payload in 'message', and commits. */
CREATE OR REPLACE PROCEDURE DEQUEUE_MESSAGE(
                         queue      IN   VARCHAR2,
                         consumer   IN   VARCHAR2,
                         message    OUT  BOLADM.order_typ)
IS
  deq_options    dbms_aq.dequeue_options_t;
  msg_props      dbms_aq.message_properties_t;
  removed_msgid  RAW(16);
BEGIN
  /* Dequeue destructively, starting at the first message,
     on behalf of the given consumer: */
  deq_options.consumer_name := consumer;
  deq_options.navigation    := dbms_aq.FIRST_MESSAGE;
  deq_options.dequeue_mode  := dbms_aq.REMOVE;

  dbms_aq.dequeue(queue, deq_options, msg_props, message, removed_msgid);

  COMMIT;
END;
/
 
/* Monitor the queues in the customer service database for 'time' seconds.

   Listens on the billed, shipped, back and booked order queues. Whenever
   a message is available for one of the agents, the message is dequeued
   and the order status table is updated; the name of the agent is used as
   the new status. A listen call waits at most wait_time seconds, so the
   procedure may run up to that long past the requested time.

   Changes from the original listing:
    - each agent gets its own slot in the list (slot 1 was assigned twice,
      so the billed orders queue was never monitored);
    - the booked orders agent is named 'BOOKED_ORDER', matching the status
      values used by update_status;
    - dbms_utility.get_time returns hundredths of a second, so the elapsed
      time is compared against time * 100;
    - the agent list is built once, and the elapsed-time check is made in
      one place after each iteration. */
CREATE OR REPLACE PROCEDURE MONITOR_STATUS_QUEUE(time  IN  NUMBER)
IS
  agent_w_message   aq$_agent;
  agent_list        dbms_aq.agent_list_t;
  wait_time         INTEGER := 120;
  no_message        EXCEPTION;
  pragma EXCEPTION_INIT(no_message, -25254);
  order_msg         boladm.order_typ;
  begin_time        NUMBER;
BEGIN

  /* Construct the waiters list: */
  agent_list(1) := aq$_agent('BILLED_ORDER', 'CS_billedorders_que', NULL);
  agent_list(2) := aq$_agent('SHIPPED_ORDER', 'CS_shippedorders_que', NULL);
  agent_list(3) := aq$_agent('BACK_ORDER', 'CS_backorders_que', NULL);
  agent_list(4) := aq$_agent('BOOKED_ORDER', 'CS_bookedorders_que', NULL);

  begin_time := dbms_utility.get_time;

  LOOP
    BEGIN
      /* Wait for order status messages: */
      dbms_aq.listen(agent_list, wait_time, agent_w_message);

      dbms_output.put_line('Agent' || agent_w_message.name || ' Address '||
                           agent_w_message.address);

      /* Dequeue the message from the queue: */
      dequeue_message(agent_w_message.address, agent_w_message.name,
                      order_msg);

      /* Update the status of the order depending on the type of the
       * message, the name of the agent contains the new state: */
      update_status(agent_w_message.name, order_msg);

    EXCEPTION
      WHEN no_message THEN
        dbms_output.put_line('No messages in the past 2 minutes');
    END;

    /* Exit if we have been working long enough
       (get_time counts hundredths of a second): */
    EXIT WHEN (dbms_utility.get_time - begin_time) > time * 100;

  END LOOP;
END;
/
 

Visual Basic (OO4O): Example Code

Feature not currently available.

Java (JDBC): Example Code

Feature not supported in Java.


Go to previous page Go to beginning of chapter Go to next page
Oracle
Copyright © 1996-2000, Oracle Corporation.

All Rights Reserved.

Library

Product

Contents

Index