Oracle9i Application Developer's Guide - Advanced Queuing Release 1 (9.0.1) Part Number A88890-02 |
|
A Sample Application Using AQ, 5 of 7
A message can be dequeued using one of the following dequeue methods:
A correlation identifier is a user-defined message property (of VARCHAR2
datatype) while a message identifier is a system-assigned value (of RAW
datatype). Multiple messages with the same correlation identifier can be present in a queue, while only one message with a given message identifier can be present. A dequeue call with a correlation identifier will directly remove a message of specific interest, rather than using a combination of locked and remove mode to first examine the content and then remove the message. Hence, the correlation identifier usually contains the most useful attribute of a payload. If there are multiple messages with the same correlation identifier, the ordering (enqueue order) between messages may not be preserved on dequeue calls. The correlation identifier cannot be changed between successive dequeue calls without specifying the first message navigation option.
A dequeue condition is an expression that is similar in syntax to the WHERE
clause of a SQL query. Dequeue conditions are expressed in terms of the attributes that represent message properties or message content. The messages in the queue are evaluated against the conditions and a message that satisfies the given condition is returned.
Note that dequeueing a message with any of the dequeue methods will not preserve the message grouping property (see "Message Grouping" and "Message Navigation in Dequeue" for further information).
In the BooksOnLine example, rush orders received by the East shipping site are processed first. This is achieved by dequeueing the message using the correlation identifier, which has been defined to contain the order type (rush/normal). For an illustration of dequeueing using a message identifier, refer to the get_northamerican_orders
procedure discussed in the example under "Modes of Dequeuing".
CONNECT boladm/boladm; /* Create procedures to dequeue RUSH orders */ create or replace procedure get_rushtitles(consumer in varchar2) as deq_cust_data BOLADM.customer_typ; deq_book_data BOLADM.book_typ; deq_item_data BOLADM.orderitem_typ; deq_msgid RAW(16); dopt dbms_aq.dequeue_options_t; mprop dbms_aq.message_properties_t; deq_order_data BOLADM.order_typ; qname varchar2(30); no_messages exception; pragma exception_init (no_messages, -25228); new_orders BOOLEAN := TRUE; begin dopt.consumer_name := consumer; dopt.wait := 1; dopt.correlation := 'RUSH'; IF (consumer = 'West_Shipping') THEN qname := 'WS.WS_bookedorders_que'; ELSIF (consumer = 'East_Shipping') THEN qname := 'ES.ES_bookedorders_que'; ELSE qname := 'OS.OS_bookedorders_que'; END IF; WHILE (new_orders) LOOP BEGIN dbms_aq.dequeue( queue_name => qname, dequeue_options => dopt, message_properties => mprop, payload => deq_order_data, msgid => deq_msgid); commit; deq_item_data := deq_order_data.items(1); deq_book_data := deq_item_data.item; dbms_output.put_line(' rushorder book_title: ' || deq_book_data.title || ' quantity: ' || deq_item_data.quantity); EXCEPTION WHEN no_messages THEN dbms_output.put_line (' ---- NO MORE RUSH TITLES ---- '); new_orders := FALSE; END; END LOOP; end; / CONNECT EXECUTE on get_rushtitles to ES; /* Dequeue the orders: */ CONNECT ES/ES; /* Dequeue all rush order titles for East_Shipping: */ EXECUTE BOLADM.get_rushtitles('East_Shipping');
set oraaq1 = OraDatabase.CreateAQ("WS.WS_backorders_que") set oraaq2 = OraDatabase.CreateAQ("ES.ES_backorders_que") set oraaq3 = OraDatabase.CreateAQ("CBADM.deferbilling_que") Set OraMsg = OraAq.AQMsg(ORATYPE_OBJECT, "BOLADM.order_typ") Set OraBackOrder = OraDatabase.CreateOraObject("BOLADM.order_typ") Private Sub Requeue_backorder Dim q as oraobject If sale_region = WEST then q = oraaq1 else if sale_region = EAST then q = oraaq2 else q = oraaq3 end if OraMsg.delay = 7*60*60*24 OraMsg = OraBackOrder 'OraOrder contains the order details Msgid = q.enqueue End Sub
public static void getRushTitles(Connection db_conn, String consumer) { AQSession aq_sess; Order deq_order; byte[] deq_msgid; AQDequeueOption deq_option; AQMessageProperty msg_prop; AQQueue bookedorders_q; AQMessage message; AQObjectPayload obj_payload; boolean new_orders = true; try { /* Create an AQ Session: */ aq_sess = AQDriverManager.createAQSession(db_conn); deq_option = new AQDequeueOption(); deq_option.setConsumerName(consumer); deq_option.setWaitTime(1); deq_option.setCorrelation("RUSH"); if(consumer.equals("West_Shipping")) { bookedorders_q = aq_sess.getQueue("WS", "WS_bookedorders_que"); } else if(consumer.equals("East_Shipping")) { bookedorders_q = aq_sess.getQueue("ES", "ES_bookedorders_que"); } else { bookedorders_q = aq_sess.getQueue("OS", "OS_bookedorders_que"); } while(new_orders) { try { /* Dequeue the message */ message = bookedorders_q.dequeue(deq_option, Order.getFactory()); obj_payload = message.getObjectPayload(); deq_order = (Order)(obj_payload.getPayloadData()); System.out.println("Order number " + deq_order.getOrderno() + " is a rush order"); } catch (AQException aqex) { new_orders = false; System.out.println("No more rush titles"); System.out.println("Exception-1: " + aqex); } } } catch (Exception ex) { System.out.println("Exception-2: " + ex); } }
A consumer can dequeue a message from a multi-consumer normal queue by supplying the name that was used in the AQ$_AGENT
type of the DBMS_AQADM
.ADD_SUBSCRIBER
procedure or the recipient list of the message properties. (See "Adding a Subscriber" or Enqueuing a Message [Specify Message Properties]).
consumer_name
field of the dequeue_options_t
record.
OCISetAttr
procedure to specify a text string as the OCI_ATTR_CONSUMER_NAME
of an OCI_DTYPE_AQDEQ_OPTIONS
descriptor.
Multiple processes or operating system threads can use the same consumer_name
to dequeue concurrently from a queue. In that case AQ will provide the first unlocked message that is at the head of the queue and is intended for the consumer. Unless the message ID of a specific message is specified during dequeue, the consumers can dequeue messages that are in the READY
state.
A message is considered PROCESSED
only when all intended consumers have successfully dequeued the message. A message is considered EXPIRED
if one or more consumers did not dequeue the message before the EXPIRATION
time. When a message has expired, it is moved to an exception queue.
The exception queue must also be a multi-consumer queue. Expired messages from multi-consumer queues cannot be dequeued by the intended recipients of the message. However, they can be dequeued in the REMOVE
mode exactly once by specifying a NULL
consumer name in the dequeue options. Hence, from a dequeue perspective, multi-consumer exception queues behave like single-consumer queues because each expired message can be dequeued only once using a NULL
consumer name. Note that expired messages can be dequeued only by specifying a message ID if the multi-consumer exception queue was created in a queue table with the compatible parameter set to '8.0'.
Beginning with release 8.1.6, only the queue monitor removes messages from multi-consumer queues. This allows dequeuers to complete the dequeue operation by not locking the message in the queue table. Since the queue monitor removes messages that have been processed by all consumers from multi-consumer queues approximately once every minute, users may see a delay when the messages have been completely processed and when they are physically removed from the queue.
Consumers of a message in multi-consumer queues (either by virtue of being a subscriber to the queue or because the consumer was a recipient in the enqueuer's recipient list) can be local or remote.
NULL
NAME
and NULL
ADDRESS
and PROTOCOL
field in the AQ$_AGENT
type (see "Agent Type (aq$_agent)" in Chapter 2, "Basic Components").
ADDRESS
field refers to a queue in the same database. In this case the consumer will dequeue the message from a different queue in the same database. These addresses will be of the form [schema]
.queue_name
where queue_name
(optionally qualified by the schema name) is the target queue. If the schema is not specified, the schema of the current user executing the ADD_SUBSCRIBER
procedure or the enqueue is used (see "Adding a Subscriber", or "Enqueuing a Message" in Chapter 11, "Operational Interface: Basic Operations"). Use the DBMS_AQADM
.SCHEDULE_PROPAGATION
command with a NULL
destination (which is the default) to schedule propagation to such remote consumers (see "Scheduling a Queue Propagation" in Chapter 9, "Administrative Interface").
ADDRESS
field refers to a queue in a different database. In this case the database must be reachable using database links and the PROTOCOL
must be either NULL
or 0. These addresses will be of the form [schema]
.queue_name@dblink
. If the schema is not specified, the schema of the current user executing the ADD_SUBSCRIBER
procedure or the enqueue is used. If the database link is not a fully qualified name (does not have a domain name specified), the default domain as specified by the db_domain
init
.ora
parameter will be used. Use the DBMS_AQADM
.SCHEDULE_PROPAGATION
procedure with the database link as the destination to schedule the propagation. AQ does not support the use of synonyms to refer to queues or database links.
ADDRESS
field refers to a destination that can be reached by a third party protocol. You will need to refer to the documentation of the third party software to determine how to specify the ADDRESS
and the PROTOCOL
database link, and on how to schedule propagation.
When a consumer is remote, a message will be marked as PROCESSED
in the source queue immediately after the message has been propagated, even though the consumer may not have dequeued the message at the remote queue. Similarly, when a propagated message expires at the remote queue, the message is moved to the DEFAULT
exception queue of the remote queue's queue table, and not to the exception queue of the local queue. As can be seen in both cases, AQ does not currently propagate the exceptions to the source queue. You can use the MSGID
and the ORIGINAL_MSGID
columns in the queue table view (AQ$<queue_table>
) to chain the propagated messages. When a message with message ID m1 is propagated to a remote queue, m1 is stored in the ORIGINAL_MSGID
column of the remote queue.
The DELAY
, EXPIRATION
and PRIORITY
parameters apply identically to both local and remote consumers. AQ accounts for any delay in propagation by adjusting the DELAY
and EXPIRATION
parameters accordingly. For example, if the EXPIRATION
is set to one hour, and the message is propagated after 15 minutes, the expiration at the remote queue will be set to 45 minutes.
Since the database handles message propagation, OO4O does not differentiate between remote and local recipients. The same sequence of calls/steps are required to dequeue a message for local and remote recipients.
You have several options for selecting a message from a queue. You can select the "first message". Alternatively, once you have selected a message and established its position in the queue (for example, as the fourth message), you can then retrieve the "next message".
These selections work in a slightly different way if the queue is enabled for transactional grouping.
Note that the transaction grouping property is negated if a dequeue is performed in one of the following ways: dequeue by specifying a correlation identifier, dequeue by specifying a message identifier, or dequeueing some of the messages of a transaction and committing (see "Dequeue Methods").
In navigating through the queue, if the program reaches the end of the queue while using the "next message" or "next transaction" option, and you have specified a non-zero wait time, then the navigating position is automatically changed to the beginning of the queue. If a zero wait time is specified, you may get an exception when the end of the queue is reached.
The following scenario in the BooksOnLine
example continues the message grouping example already discussed with regard to enqueuing (see "Dequeue Methods").
The get_orders
() procedure dequeues orders from the OE_neworders_que
. Recall that each transaction refers to an order and each message corresponds to an individual book in the order. The get_orders
() procedure loops through the messages to dequeue the book orders. It resets the position to the beginning of the queue using the first message option before the first dequeues. It then uses the next message navigation option to retrieve the next book (message) of an order (transaction). If it gets an error message indicating all message in the current group/transaction have been fetched, it changes the navigation option to next transaction and gets the first book of the next order. It then changes the navigation option back to next message for fetching subsequent messages in the same transaction. This is repeated until all orders (transactions) have been fetched.
CONNECT boladm/boladm; create or replace procedure get_new_orders as deq_cust_data BOLADM.customer_typ; deq_book_data BOLADM.book_typ; deq_item_data BOLADM.orderitem_typ; deq_msgid RAW(16); dopt dbms_aq.dequeue_options_t; mprop dbms_aq.message_properties_t; deq_order_data BOLADM.order_typ; qname VARCHAR2(30); no_messages exception; end_of_group exception; pragma exception_init (no_messages, -25228); pragma exception_init (end_of_group, -25235); new_orders BOOLEAN := TRUE; BEGIN dopt.wait := 1; dopt.navigation := DBMS_AQ.FIRST_MESSAGE; qname := 'OE.OE_neworders_que'; WHILE (new_orders) LOOP BEGIN LOOP BEGIN dbms_aq.dequeue( queue_name => qname, dequeue_options => dopt, message_properties => mprop, payload => deq_order_data, msgid => deq_msgid); deq_item_data := deq_order_data.items(1); deq_book_data := deq_item_data.item; deq_cust_data := deq_order_data.customer; IF (deq_cust_data IS NOT NULL) THEN dbms_output.put_line(' **** NEXT ORDER **** '); dbms_output.put_line('order_num: ' || deq_order_data.orderno); dbms_output.put_line('ship_state: ' || deq_cust_data.state); END IF; dbms_output.put_line(' ---- next book ---- '); dbms_output.put_line(' book_title: ' || deq_book_data.title || ' quantity: ' || deq_item_data.quantity); EXCEPTION WHEN end_of_group THEN dbms_output.put_line ('*** END OF ORDER ***'); commit; dopt.navigation := DBMS_AQ.NEXT_TRANSACTION; END; END LOOP; EXCEPTION WHEN no_messages THEN dbms_output.put_line (' ---- NO MORE NEW ORDERS ---- '); new_orders := FALSE; END; END LOOP; END; / CONNECT EXECUTE ON get_new_orders to OE; /* Dequeue the orders: */ CONNECT OE/OE; EXECUTE BOLADM.get_new_orders;
Dim OraSession as object Dim OraDatabase as object Dim OraAq as object Dim OraMsg as Object Dim OraOrder,OraItemList,OraItem,OraBook,OraCustomer as Object Dim Msgid as String Set OraSession = CreateObject("OracleInProcServer.XOraSession") Set OraDatabase = OraSession.DbOpenDatabase("", "boladm/boladm", 0&) set oraaq = OraDatabase.CreateAQ("OE.OE_neworders_que") Set OraMsg = OraAq.AQMsg(ORATYPE_OBJECT, "BOLADM.order_typ") OraAq.wait = 1 OraAq.Navigation = ORAAQ_DQ_FIRST_MESSAGE private sub get_new_orders Dim MsgIsDequeued as Boolean On Error goto ErrHandler MsgIsDequeued = TRUE msgid = q.Dequeue if MsgIsDequeued then set OraOrder = OraMsg OraItemList = OraOrder("items") OraItem = OraItemList(1) OraBook = OraItem("item") OraCustomer = OraOrder("customer") ' Populate the textboxes with the values if( OraCustomer ) then if OraAq.Navigation <> ORAAQ_DQ_NEXT_MESSAGE then MsgBox " ********* NEXT ORDER *******" end if txt_book_orderno = OraOrder("orderno") txt_book_shipstate = OraCustomer("state") End if OraAq.Navigation = ORAAQ_DQ_NEXT_MESSAGE txt_book_title = OraBook("title") txt_book_qty = OraItem("quantity") Else MsgBox " ********* END OF ORDER *******" End if ErrHandler : 'Handle error case, like no message etc If OraDatabase.LastServerErr = 25228 then OraAq.Navigation = ORAAQ_DQ_NEXT_TRANSACTION MsgIsDequeued = FALSE Resume Next End If 'Process other errors end sub
No example is provided with this release.
A dequeue request can either view a message or delete a message (see "Dequeuing a Message" in Chapter 11, "Operational Interface: Basic Operations").
If a message is browsed, it remains available for further processing. Similarly if a message is locked, it remains available for further processing after the lock is released by performing a transaction commit or rollback. After a message is consumed, using either of the remove modes, it is no longer available for dequeue requests.
When a message is dequeued using REMOVE_NODATA
mode, the payload of the message is not retrieved. This mode can be useful when the user has already examined the message payload, possibly by means of a previous BROWSE
dequeue. In this way, you can avoid the overhead of payload retrieval that can be substantial for large payloads
A message is retained in the queue table after it has been consumed only if a retention time is specified for a queue. Messages cannot be retained in exception queues (refer to the section on exceptions for further information). Removing a message with no data is generally used if the payload is known (from a previous browse/locked mode dequeue call), or the message will not be used.
Note that after a message has been browsed, there is no guarantee that the message can be dequeued again since a dequeue call from a concurrent user might have removed the message. To prevent a viewed message from being dequeued by a concurrent user, you should view the message in the locked mode.
In general, use care while using the browse mode. The dequeue position is automatically changed to the beginning of the queue if a non-zero wait time is specified and the navigating position reaches the end of the queue. Hence repeating a dequeue call in the browse mode with the "next message" navigation option and a non-zero wait time can dequeue the same message over and over again. We recommend that you use a non-zero wait time for the first dequeue call on a queue in a session, and then use a zero wait time with the next message navigation option for subsequent dequeue calls. If a dequeue call gets an "end of queue" error message, the dequeue position can be explicitly set by the dequeue call to the beginning of the queue using the "first message" navigation option, following which the messages in the queue can be browsed again.
In the following scenario from the BooksOnLine
example, international orders destined to Mexico and Canada are to be processed separately due to trade policies and carrier discounts. Hence, a message is viewed in the locked mode (so no other concurrent user removes the message) and the customer country (message payload) is checked. If the customer country is Mexico or Canada, the message is consumed (deleted from the queue) using REMOVE_NODATA
(since the payload is already known). Otherwise, the lock on the message is released by the commit call. Note that the remove dequeue call uses the message identifier obtained from the locked mode dequeue call. The shipping_bookedorder_deq
(refer to the example code for the description of this procedure) call illustrates the use of the browse mode.
CONNECT boladm/boladm; create or replace procedure get_northamerican_orders as deq_cust_data BOLADM.customer_typ; deq_book_data BOLADM.book_typ; deq_item_data BOLADM.orderitem_typ; deq_msgid RAW(16); dopt dbms_aq.dequeue_options_t; mprop dbms_aq.message_properties_t; deq_order_data BOLADM.order_typ; deq_order_nodata BOLADM.order_typ; qname VARCHAR2(30); no_messages exception; pragma exception_init (no_messages, -25228); new_orders BOOLEAN := TRUE; begin dopt.consumer_name := consumer; dopt.wait := DBMS_AQ.NO_WAIT; dopt.navigation := dbms_aq.FIRST_MESSAGE; dopt.dequeue_mode := DBMS_AQ.LOCKED; qname := 'OS.OS_bookedorders_que'; WHILE (new_orders) LOOP BEGIN dbms_aq.dequeue( queue_name => qname, dequeue_options => dopt, message_properties => mprop, payload => deq_order_data, msgid => deq_msgid); deq_item_data := deq_order_data.items(1); deq_book_data := deq_item_data.item; deq_cust_data := deq_order_data.customer; IF (deq_cust_data.country = 'Canada' OR deq_cust_data.country = 'Mexico' ) THEN dopt.dequeue_mode := dbms_aq.REMOVE_NODATA; dopt.msgid := deq_msgid; dbms_aq.dequeue( queue_name => qname, dequeue_options => dopt, message_properties => mprop, payload => deq_order_nodata, msgid => deq_msgid); commit; dbms_output.put_line(' **** next booked order **** '); dbms_output.put_line('order_no: ' || deq_order_data.orderno || ' book_title: ' || deq_book_data.title || ' quantity: ' || deq_item_data.quantity); dbms_output.put_line('ship_state: ' || deq_cust_data.state || ' ship_country: ' || deq_cust_data.country || ' ship_order_type: ' || deq_order_data.ordertype); END IF; commit; dopt.dequeue_mode := DBMS_AQ.LOCKED; dopt.msgid := NULL; dopt.navigation := dbms_aq.NEXT_MESSAGE; EXCEPTION WHEN no_messages THEN dbms_output.put_line (' ---- NO MORE BOOKED ORDERS ---- '); new_orders := FALSE; END; END LOOP; end; / CONNECT EXECUTE on get_northamerican_orders to OS; CONNECT ES/ES; /* Browse all booked orders for East_Shipping: */ EXECUTE BOLADM.shipping_bookedorder_deq('East_Shipping', DBMS_AQ.BROWSE); CONNECT OS/OS; /* Dequeue all international North American orders for Overseas_Shipping: */ EXECUTE BOLADM.get_northamerican_orders;
OO4O supports all the modes of dequeuing described above. Possible values include:
Dim OraSession as object Dim OraDatabase as object Dim OraAq as object Dim OraMsg as Object Dim OraOrder,OraItemList,OraItem,OraBook,OraCustomer as Object Dim Msgid as String Set OraSession = CreateObject("OracleInProcServer.XOraSession") Set OraDatabase = OraSession.DbOpenDatabase("", "boladm/boladm", 0&) set oraaq = OraDatabase.CreateAQ("OE.OE_neworders_que") OraAq.DequeueMode = ORAAQ_DQ_BROWSE
public static void get_northamerican_orders(Connection db_conn) { AQSession aq_sess; Order deq_order; Customer deq_cust; String cust_country; byte[] deq_msgid; AQDequeueOption deq_option; AQMessageProperty msg_prop; AQQueue bookedorders_q; AQMessage message; AQObjectPayload obj_payload; boolean new_orders = true; try { /* Create an AQ Session: */ aq_sess = AQDriverManager.createAQSession(db_conn); deq_option = new AQDequeueOption(); deq_option.setConsumerName("Overseas_Shipping"); deq_option.setWaitTime(AQDequeueOption.WAIT_NONE); deq_option.setNavigationMode(AQDequeueOption.NAVIGATION_FIRST_MESSAGE); deq_option.setDequeueMode(AQDequeueOption.DEQUEUE_LOCKED); bookedorders_q = aq_sess.getQueue("OS", "OS_bookedorders_que"); while(new_orders) { try { /* Dequeue the message - browse with lock */ message = bookedorders_q.dequeue(deq_option, Order.getFactory()); obj_payload = message.getObjectPayload(); deq_msgid = message.getMessageId(); deq_order = (Order)(obj_payload.getPayloadData()); deq_cust = deq_order.getCustomer(); cust_country = deq_cust.getCountry(); if(cust_country.equals("Canada") || cust_country.equals("Mexico")) { deq_option.setDequeueMode( AQDequeueOption.DEQUEUE_REMOVE_NODATA); deq_option.setMessageId(deq_msgid); /* Delete the message */ bookedorders_q.dequeue(deq_option, Order.getFactory()); System.out.println("---- next booked order ------"); System.out.println("Order no: " + deq_order.getOrderno()); System.out.println("Ship state: " + deq_cust.getState()); System.out.println("Ship country: " + deq_cust.getCountry()); System.out.println("Order type: " + deq_order.getOrdertype()); } db_conn.commit(); deq_option.setDequeueMode(AQDequeueOption.DEQUEUE_LOCKED); deq_option.setMessageId(null); deq_option.setNavigationMode( AQDequeueOption.NAVIGATION_NEXT_MESSAGE); } catch (AQException aqex) { new_orders = false; System.out.println("--- No more booked orders ----"); System.out.println("Exception-1: " + aqex); } } } catch (Exception ex) { System.out.println("Exception-2: " + ex); } }
AQ allows applications to block on one or more queues waiting for the arrival of either a newly enqueued message or for a message that becomes ready. You can use the DEQUEUE
operation to wait for the arrival of a message in a queue (see "Dequeuing a Message") or the LISTEN
operation to wait for the arrival of a message in more than one queue (see "Listening to One (Many) Queue(s)" in Chapter 11, "Operational Interface: Basic Operations").
When the blocking DEQUEUE
call returns, it returns the message properties and the message payload. By contrast, when the blocking LISTEN
call returns, it discloses only the name of the queue where a message has arrived. A subsequent DEQUEUE
operation is needed to dequeue the message.
Applications can optionally specify a timeout of zero or more seconds to indicate the time that AQ must wait for the arrival of a message. The default is to wait forever until a message arrives in the queue. This optimization is important in two ways. It removes the burden of continually polling for messages from the application. And it saves CPU and network resource because the application remains blocked until a new message is enqueued or becomes READY
after its DELAY
time. Applications can also perform a blocking dequeue on exception queues to wait for arrival of EXPIRED
messages.
A process or thread that is blocked on a dequeue is either awakened directly by the enqueuer if the new message has no DELAY
or is awakened by the queue monitor process when the DELAY
or EXPIRATION
time has passed. Applications cannot only wait for the arrival of a message in the queue that an enqueuer enqueues a message, but also on a remote queue, provided that propagation has been scheduled to the remote queue using DBMS_AQADM
.SCHEDULE_PROPAGATION
. In this case, the AQ propagator will wake up the blocked dequeuer after a message has been propagated.
In the BooksOnLine example, the get_rushtitles
procedure discussed under dequeue methods specifies a wait time of 1 second in the dequeue_options
argument for the dequeue call. Wait time can be specified in different ways as illustrated in the code below.
DBMS_AQ
.NO_WAIT
, a wait time of 0 seconds is implemented. The dequeue call in this case will return immediately even if there are no messages in the queue.
DBMS_AQ
.FOREVER
, the dequeue call is blocked without a time out until a message is available in the queue.
/* dopt is a variable of type dbms_aq.dequeue_options_t. Set the dequeue wait time to 10 seconds: */ dopt.wait := 10; /* Set the dequeue wait time to 0 seconds: */ dopt.wait := DBMS_AQ.NO_WAIT; /* Set the dequeue wait time to infinite (forever): */ dopt.wait := DBMS_AQ.FOREVER;
OO4O supports asynchronous dequeuing of messages. First, the monitor is started for a particular queue. When messages that fulfil the user criteria are dequeued, the user's callback object is notified.
AQDequeueOption deq-opt;
deq-opt = new AQDequeueOption ();
If the transaction dequeuing the message from a queue fails, it is regarded as an unsuccessful attempt to consume the message. AQ records the number of failed attempts to consume the message in the message history. Applications can query the retry_count column of the queue table view to find out the number of unsuccessful attempts on a message. In addition, AQ allows the application to specify, at the queue level, the maximum number of retries for messages in the queue. If the number of failed attempts to remove a message exceeds this number, the message is moved to the exception queue and is no longer available to applications.
A bad condition can cause the transaction receiving a message to abort. AQ allows users to hide the bad message for a pre-specified interval. A retry_delay can be specified along with maximum retries. This means that a message that has had a failed attempt will be visible in the queue for dequeue after the retry_delay interval. Until then it will be in the WAITING state. In the AQ background process, the time manager enforces the retry delay property. The default value for maximum retries is 5. The default value for retry delay is 0. Note that maximum retries and retry delay are not available with 8.0-compatible multi-consumer queues.
/* Create a package that enqueue with delay set to one day: /* CONNECT BOLADM/BOLADM > /* queue has max retries = 4 and retry delay = 12 hours */ execute dbms_aqadm.alter_queue(queue_name = 'WS.WS_BOOKED_ORDERS_QUE', max_retr ies = 4, retry_delay = 3600*12); > /* processes the next order available in the booked_order_queue */ CREATE OR REPLACE PROCEDURE process_next_order() AS dqqopt dbms_aq.dequeue_options_t; msgprop dbms_aq.message_properties_t; deq_msgid RAW(16); book BOLADM.book_typ; item BOLADM.orderitem_typ; BOLADM.order_typ order; BEGIN > dqqopt.dequeue_option := DBMS_AQ.FIRST_MESSAGE; dbms_aq.dequeue('WS.WS_BOOKED_ORDERS_QUEUE', dqqopt, msgprop, order, deq_msgid ); > /* for simplicity, assume order has a single item */ item = order.items(1); book = the_orders.item; > /* assume search_inventory searches inventory for the book */ /* if we don't find the book in the warehouse, abort transaction */ IF (search_inventory(book) != TRUE) rollback; ELSE process_order(order); END IF; > END; /
Use the dbexecutesql interface from the database for this functionality.
public static void setup_queue(Connection db_conn) { AQSession aq_sess; AQQueue bookedorders_q; AQQueueProperty q_prop; try { /* Create an AQ Session: */ aq_sess = AQDriverManager.createAQSession(db_conn); bookedorders_q = aq_sess.getQueue("WS", "WS_bookedorders_que"); /* Alter queue - set max retries = 4 and retry delay = 12 hours */ q_prop = new AQQueueProperty(); q_prop.setMaxRetries(4); q_prop.setRetryInterval(3600*12); // specified in seconds bookedorders_q.alterQueue(q_prop); } catch (Exception ex) { System.out.println("Exception: " + ex); } } public static void process_next_order(Connection db_conn) { AQSession aq_sess; Order deq_order; OrderItem order_item; Book book; AQDequeueOption deq_option; AQMessageProperty msg_prop; AQQueue bookedorders_q; AQMessage message; AQObjectPayload obj_payload; try { /* Create an AQ Session: */ aq_sess = AQDriverManager.createAQSession(db_conn); deq_option = new AQDequeueOption(); deq_option.setNavigationMode(AQDequeueOption.NAVIGATION_FIRST_MESSAGE); bookedorders_q = aq_sess.getQueue("WS", "WS_bookedorders_que"); /* Dequeue the message */ message = bookedorders_q.dequeue(deq_option, Order.getFactory()); obj_payload = message.getObjectPayload(); deq_order = (Order)(obj_payload.getPayloadData()); /* for simplicity, assume order has a single item */ order_item = deq_order.getItems().getElement(0); book = order_item.getItem(); /* assume search_inventory searches inventory for the book * if we don't find the book in the warehouse, abort transaction */ if(search_inventory(book) != true) db_conn.rollback(); else process_order(deq_order); } catch (AQException aqex) { System.out.println("Exception-1: " + aqex); } catch (Exception ex) { System.out.println("Exception-2: " + ex); } }
AQ provides four integrated mechanisms to support exception handling in applications: EXCEPTION_QUEUES
, EXPIRATION
, MAX_RETRIES
and RETRY_DELAY
.
An exception_queue
is a repository for all expired or unserviceable messages. Applications cannot directly enqueue into exception queues. Also, a multi-consumer exception queue cannot have subscribers associated with it. However, an application that intends to handle these expired or unserviceable messages can dequeue from the exception queue. The exception queue created for messages intended for a multi-consumer queue must itself be a multi-consumer queue. Like any other queue, the exception queue must be enabled for dequeue using the DBMS_AQADM
.START_QUEUE
procedure. You will get an Oracle error if you try to enable an exception queue for enqueue.
When a message has expired, it is moved to an exception queue. The exception queue for a message in multi-consumer queue must also be a multi-consumer queue. Expired messages from multi-consumer queues cannot be dequeued by the intended recipients of the message. However, they can be dequeued in the REMOVE
mode exactly once by specifying a NULL
consumer name in the dequeue options. Hence, from a dequeue perspective multi-consumer exception queues behave like single-consumer queues because each expired message can be dequeued only once using a NULL
consumer name. Messages can also be dequeued from the exception queue by specifying the message ID. Note that expired messages can be dequeued only by specifying a message ID if the multi-consumer exception queue was created in a queue table without the compatible parameter or with the compatible parameter set to '8.0'.
The exception queue is a message property that can be specified during enqueue time (see "Enqueuing a Message [Specify Message Properties]" in Chapter 11, "Operational Interface: Basic Operations"). In PL/SQL users can use the exception_queue
attribute of the DBMS_AQ
.MESSAGE_PROPERTIES_T
record to specify the exception queue. In OCI users can use the OCISetAttr
procedure to set the OCI_ATTR_EXCEPTION_QUEUE
attribute of the OCIAQMsgProperties
descriptor.
If an exception queue is not specified, the default exception queue is used. If the queue is created in a queue table, for example, QTAB
, the default exception queue will be called AQ$_QTAB_E. The default exception queue is automatically created when the queue table is created. Messages are moved to the exception queues by AQ under the following conditions:
DBMS_AQ
.NEVER
, meaning the messages will not expire.
For messages intended for multiple recipients, each message keeps a separate retry count for each recipient. The message is moved to the exception queue only when retry counts for all recipients of the message have exceeded the specified retry limit. The default retry limit is 5 for single-consumer queues and 8.1-compatible multi-consumer queues. No retry limit is not supported for 8.0- compatible multi-consumer queues.
DBMS_AQ
.DEQUEUE
. If the dequeue procedure succeeds but the PL/SQL procedure raises an exception, AQ will attempt to increment the RETRY_COUNT
of the message returned by the dequeue procedure.
Messages intended for 8.1-compatible multi-consumer queues cannot be dequeued by the intended recipients once the messages have been moved to an exception queue. These messages should instead be dequeued in the REMOVE
or BROWSE
mode exactly once by specifying a NULL
consumer name in the dequeue options. The messages can also be dequeued by their message IDs.
Messages intended for single consumer queues, or for 8.0-compatible multi-consumer queues, can only be dequeued by their message IDs once the messages have been moved to an exception queue.
Users can associate a RETRY_DELAY
with a queue. The default value for this parameter is 0, meaning that the message will be available for dequeue immediately after the RETRY_COUNT
is incremented. Otherwise the message will be unavailable for RETRY_DELAY
seconds. After RETRY_DELAY
seconds, the queue monitor marks the message as READY
.
For a multi-consumer queue, RETRY_DELAY
is per subscriber.
In the BooksOnLine
application, the business rule for each shipping region is that an order will be placed in a back order queue if the order cannot be filled immediately. The back order application will try to fill the order once a day. If the order cannot be filled within 5 days, it is placed in an exception queue for special processing. You can implement this process by making use of the retry and exception handling features in AQ.
The example below shows how you can create a queue with specific maximum retry and retry delay interval.
/* Example for creating a back order queue in Western Region which allows a maximum of 5 retries and 1 day delay between each retry. */ CONNECT BOLADM/BOLADM BEGIN dbms_aqadm.create_queue ( queue_name => 'WS.WS_backorders_que', queue_table => 'WS.WS_orders_mqtab', max_retries => 5, retry_delay => 60*60*24); END; / /* Create an exception queue for the back order queue for Western Region. */ CONNECT BOLADM/BOLADM BEGIN dbms_aqadm.create_queue ( queue_name => 'WS.WS_backorders_excpt_que', queue_table => 'WS.WS_orders_mqtab', queue_type => DBMS_AQADM.EXCEPTION_QUEUE); end; / /* Enqueue a message to WS_backorders_que and specify WS_backorders_excpt_que as the exception queue for the message: */ CONNECT BOLADM/BOLADM CREATE OR REPLACE PROCEDURE enqueue_WS_unfilled_order(backorder order_typ) AS back_order_queue_name varchar2(62); enqopt dbms_aq.enqueue_options_t; msgprop dbms_aq.message_properties_t; enq_msgid raw(16); BEGIN /* Set back order queue name for this message: */ back_order_queue_name := 'WS.WS_backorders_que'; /* Set exception queue name for this message: */ msgprop.exception_queue := 'WS.WS_backorders_excpt_que'; dbms_aq.enqueue(back_order_queue_name, enqopt, msgprop, backorder, enq_msgid); END; /
The exception queue is a message property that can be provided at the time of enqueuing a message. If this property is not set, the default exception queue of the queue will be used for any error conditions.
set oraaq = OraDatabase.CreateAQ("CBADM.deferbilling_que") Set OraMsg = OraAq.AQMsg(ORATYPE_OBJECT, "BOLADM.order_typ") Set OraOrder = OraDatabase.CreateOraObject("BOLADM.order_typ") OraMsg = OraOrder OraMsg.delay = 15*60*60*24 OraMsg.ExceptionQueue = "WS.WS_backorders_que" 'Fill up the order values OraMsg = OraOrder 'OraOrder contains the order details Msgid = OraAq.enqueue
public static void createBackOrderQueues(Connection db_conn) { AQSession aq_sess; AQQueue backorders_q; AQQueue backorders_excp_q; AQQueueProperty q_prop; AQQueueProperty q_prop2; AQQueueTable mq_table; try { /* Create an AQ Session: */ aq_sess = AQDriverManager.createAQSession(db_conn); mq_table = aq_sess.getQueueTable("WS", "WS_orders_mqtab"); /* Create a back order queue in Western Region which allows a maximum of 5 retries and 1 day delay between each retry. */ q_prop = new AQQueueProperty(); q_prop.setMaxRetries(5); q_prop.setRetryInterval(60*24*24); backorders_q = aq_sess.createQueue(mq_table, "WS_backorders_que", q_prop); backorders_q.start(true, true); /* Create an exception queue for the back order queue for Western Region. */ q_prop2 = new AQQueueProperty(); q_prop2.setQueueType(AQQueueProperty.EXCEPTION_QUEUE); backorders_excp_q = aq_sess.createQueue(mq_table, "WS_backorders_excpt_que", q_prop2); } catch (Exception ex) { System.out.println("Exception " + ex); } } /* Enqueue a message to WS_backorders_que and specify WS_backorders_excpt_que as the exception queue for the message: */ public static void enqueue_WS_unfilled_order(Connection db_conn, Order back_order) { AQSession aq_sess; AQQueue back_order_q; AQEnqueueOption enq_option; AQMessageProperty m_property; AQMessage message; AQObjectPayload obj_payload; byte[] enq_msg_id; try { /* Create an AQ Session: */ aq_sess = AQDriverManager.createAQSession(db_conn); back_order_q = aq_sess.getQueue("WS", "WS_backorders_que"); message = back_order_q.createMessage(); /* Set exception queue name for this message: */ m_property = message.getMessageProperty(); m_property.setExceptionQueue("WS.WS_backorders_excpt_que"); obj_payload = message.getObjectPayload(); obj_payload.setPayloadData(back_order); enq_option = new AQEnqueueOption(); /* Enqueue the message */ enq_msg_id = back_order_q.enqueue(enq_option, message); db_conn.commit(); } catch (Exception ex) { System.out.println("Exception: " + ex); } }
Messages can be routed to various recipients based on message properties or message content. Users define a rule-based subscription for a given queue to specify interest in receiving messages that meet particular conditions.
Rules are Boolean expressions that evaluate to TRUE
or FALSE
. Similar in syntax to the WHERE
clause of a SQL query, rules are expressed in terms of the attributes that represent message properties or message content. These subscriber rules are evaluated against incoming messages and those rules that match are used to determine message recipients. This feature thus supports the notions of content-based subscriptions and content-based routing of messages.
Subscription rules can also be defined on an attribute of type XMLType using XML operators such as ExistsNode
.
For the BooksOnLine application, we illustrate how rule-based subscriptions are used to implement a publish-subscribe paradigm utilizing content-based subscription and content-based routing of messages. The interaction between the Order Entry application and each of the Shipping Applications is modeled as follows:
XMLType
attribute to identify special handling.
Each shipping application subscribes to the OE booked orders queue. The following rule-based subscriptions are defined by the Order Entry user to handle the routing of booked orders from the Order Entry application to each of the Shipping applications.
CONNECT OE/OE;
Western Region Shipping defines an agent called 'West_Shipping
' with the WS
booked orders queue as the agent address (destination queue where messages must be delivered). This agent subscribes to the OE booked orders queue using a rule specified on order region and ordertype
attributes.
/* Add a rule-based subscriber for West Shipping - West Shipping handles Western region U.S. orders, Rush Western region orders are handled by East Shipping: */ DECLARE subscriber aq$_agent; BEGIN subscriber := aq$_agent('West_Shipping', 'WS.WS_bookedorders_que', null); dbms_aqadm.add_subscriber( queue_name => 'OE.OE_bookedorders_que', subscriber => subscriber, rule => 'tab.user_data.orderregion = ''WESTERN'' AND tab.user_data.ordertype != ''RUSH'''); END; /
Eastern Region Shipping defines an agent called East_Shipping
with the ES
booked orders queue as the agent address (the destination queue where messages must be delivered). This agent subscribes to the OE
booked orders queue using a rule specified on orderregion
, ordertype
and customer attributes.
/* Add a rule-based subscriber for East Shipping - East shipping handles all Eastern region orders, East shipping also handles all U.S. rush orders: */ DECLARE subscriber aq$_agent; BEGIN subscriber := aq$_agent('East_Shipping', 'ES.ES_bookedorders_que', null); dbms_aqadm.add_subscriber( queue_name => 'OE.OE_bookedorders_que', subscriber => subscriber, rule => 'tab.user_data.orderregion = ''EASTERN'' OR (tab.user_data.ordertype = ''RUSH'' AND tab.user_data.customer.country = ''USA'') '); END;
Overseas Shipping defines an agent called Overseas_Shipping
with the OS booked orders queue as the agent address (destination queue to which messages must be delivered). This agent subscribes to the OE booked orders queue using a rule specified on the orderregion
attribute. Since the representation of orders at the Overseas Shipping site is different from the representation of orders at the Order Entry site, a transformation is applied before messages are propagated from the Order Entry site to the Overseas Shipping site.
/* Add a rule-based subscriber (for Overseas Shipping) to the Booked orders queues with Transformation. Overseas Shipping handles all non-US orders: */ DECLARE subscriber aq$_agent; BEGIN subscriber := aq$_agent('Overseas_Shipping','OS.OS_bookedorders_que',null); dbms_aqadm.add_subscriber( queue_name => 'OE.OE_bookedorders_que', subscriber => subscriber, rule => 'tab.user_data.orderregion = ''INTERNATIONAL''', transformation => 'OS.OE2XML'); END;
See "Message Format Transformation" for more details on defining transformations.
Assume that the Overseas Shipping site has a subscriber, Overseas_DHL
, for handling RUSH
orders. Since OS_bookedorders_que
has the order details represented as an XMLType, the rule uses XPath syntax.
DECLARE subscriber aq$_agent; BEGIN subscriber := aq$_agent('Overseas_DHL', null, null); dbms_aqadm.add_subscriber( queue_name => 'OS.OS_bookedorders_que', subscriber => subscriber, rule => 'tab.user_data.xdata.extract(''/ORDER_TYP/ORDERTYPE/
text()'').getStringVal()=''RUSH'''); END;
This functionality is currently not available.
public static void addRuleBasedSubscribers(Connection db_conn) { AQSession aq_sess; AQQueue bookedorders_q; String rule; AQAgent agt1, agt2, agt3; try { /* Create an AQ Session: */ aq_sess = AQDriverManager.createAQSession(db_conn); bookedorders_q = aq_sess.getQueue("OE", "OE_booked_orders_que"); /* Add a rule-based subscriber for West Shipping - West Shipping handles Western region U.S. orders, Rush Western region orders are handled by East Shipping: */ agt1 = new AQAgent("West_Shipping", "WS.WS_bookedorders_que"); rule = "tab.user_data.orderregion = 'WESTERN' AND " + "tab.user_data.ordertype != 'RUSH'"; bookedorders_q.addSubscriber(agt1, rule); /* Add a rule-based subscriber for East Shipping - East shipping handles all Eastern region orders, East shipping also handles all U.S. rush orders: */ agt2 = new AQAgent("East_Shipping", "ES.ES_bookedorders_que"); rule = "tab.user_data.orderregion = 'EASTERN' OR " + "(tab.user_data.ordertype = 'RUSH' AND " + "tab.user_data.customer.country = 'USA')"; bookedorders_q.addSubscriber(agt2, rule); /* Add a rule-based subscriber for Overseas Shipping Intl Shipping handles all non-U.S. orders: */ agt3 = new AQAgent("Overseas_Shipping", "OS.OS_bookedorders_que"); rule = "tab.user_data.orderregion = 'INTERNATIONAL'"; bookedorders_q.addSubscriber(agt3, rule); } catch (Exception ex) { System.out.println("Exception: " + ex); } }
Advanced Queuing can monitor multiple queues for messages with a single call, LISTEN
. An application can use LISTEN
to wait for messages for multiple subscriptions. It can also be used by gateway applications to monitor multiple queues. If the LISTEN
call returns successfully, a dequeue must be used to retrieve the message (see Listening to One (Many) Queue(s) in Chapter 11, "Operational Interface: Basic Operations").
Without the LISTEN
call, an application which sought to dequeue from a set of queues would have to continuously poll the queues to determine if there were a message. Alternatively, you could design your application to have a separate dequeue process for each queue. However, if there are long periods with no traffic in any of the queues, these approaches will create unacceptable overhead. The LISTEN
call is well suited for such applications.
Note that when there are messages for multiple agents in the agent list, LISTEN
returns with the first agent for whom there is a message. In that sense LISTEN
is not 'fair' in monitoring the queues. The application designer must keep this in mind when using the call. To prevent one agent from 'starving' other agents for messages, the application can change the order of the agents in the agent list.
In the customer service component of the BooksOnLine
example, messages from different databases arrive in the customer service queues, indicating the state of the message. The customer service application monitors the queues and whenever there is a message about a customer order, it updates the order status in the order_status_table
. The application uses the listen
call to monitor the different queues. Whenever there is a message in any of the queues, it dequeues the message and updates the order status accordingly.
CODE (in tkaqdocd.sql) /* Update the status of the order in the order status table: */ CREATE OR REPLACE PROCEDURE update_status( new_status IN VARCHAR2, order_msg IN BOLADM.ORDER_TYP) IS old_status VARCHAR2(30); dummy NUMBER; BEGIN BEGIN /* Query old status from the table: */ SELECT st.status INTO old_status FROM order_status_table st WHERE st.customer_order.orderno = order_msg.orderno; /* Status can be 'BOOKED_ORDER', 'SHIPPED_ORDER', 'BACK_ORDER' and 'BILLED_ORDER': */ IF new_status = 'SHIPPED_ORDER' THEN IF old_status = 'BILLED_ORDER' THEN return; /* message about a previous state */ END IF; ELSIF new_status = 'BACK_ORDER' THEN IF old_status = 'SHIPPED_ORDER' OR old_status = 'BILLED_ORDER' THEN return; /* message about a previous state */ END IF; END IF; /* Update the order status: */ UPDATE order_status_table st SET st.customer_order = order_msg, st.status = new_status; COMMIT; EXCEPTION WHEN OTHERS THEN /* change to no data found */ /* First update for the order: */ INSERT INTO order_status_table(customer_order, status) VALUES (order_msg, new_status); COMMIT; END; END; / /* Dequeues message from 'QUEUE' for 'CONSUMER': */ CREATE OR REPLACE PROCEDURE DEQUEUE_MESSAGE( queue IN VARCHAR2, consumer IN VARCHAR2, message OUT BOLADM.order_typ) IS dopt dbms_aq.dequeue_options_t; mprop dbms_aq.message_properties_t; deq_msgid RAW(16); BEGIN dopt.dequeue_mode := dbms_aq.REMOVE; dopt.navigation := dbms_aq.FIRST_MESSAGE; dopt.consumer_name := consumer; dbms_aq.dequeue( queue_name => queue, dequeue_options => dopt, message_properties => mprop, payload => message, msgid => deq_msgid); commit; END; / /* Monitor the queues in the customer service databse for 'time' seconds: */ CREATE OR REPLACE PROCEDURE MONITOR_STATUS_QUEUE(time IN NUMBER) IS agent_w_message aq$_agent; agent_list dbms_aq.agent_list_t; wait_time INTEGER := 120; no_message EXCEPTION; pragma EXCEPTION_INIT(no_message, -25254); order_msg boladm.order_typ; new_status VARCHAR2(30); monitor BOOLEAN := TRUE; begin_time NUMBER; end_time NUMBER; BEGIN begin_time := dbms_utility.get_time; WHILE (monitor) LOOP BEGIN /* Construct the waiters list: */ agent_list(1) := aq$_agent('BILLED_ORDER', 'CS_billedorders_que', NULL); agent_list(2) := aq$_agent('SHIPPED_ORDER', 'CS_shippedorders_que', NULL); agent_list(3) := aq$_agent('BACK_ORDER', 'CS_backorders_que', NULL); agent_list(4) := aq$_agent('Booked_ORDER', 'CS_bookedorders_que', NULL); /* Wait for order status messages: */ dbms_aq.listen(agent_list, wait_time, agent_w_message); dbms_output.put_line('Agent' || agent_w_message.name || ' Address '|| agent_w_message.address); /* Dequeue the message from the queue: */ dequeue_message(agent_w_message.address, agent_w_message.name, order_msg); /* Update the status of the order depending on the type of the message, * the name of the agent contains the new state: */ update_status(agent_w_message.name, order_msg); /* Exit if we have been working long enough: */ end_time := dbms_utility.get_time; IF (end_time - begin_time > time) THEN EXIT; END IF; EXCEPTION WHEN no_message THEN dbms_output.put_line('No messages in the past 2 minutes'); end_time := dbms_utility.get_time; /* Exit if we have done enough work: */ IF (end_time - begin_time > time) THEN EXIT; END IF; END; END LOOP; END; /
Feature not currently available.
public static void monitor_status_queue(Connection db_conn) { AQSession aq_sess; AQAgent[] agt_list = null; AQAgent ret_agt = null; Order deq_order; AQDequeueOption deq_option; AQQueue orders_q; AQMessage message; AQObjectPayload obj_payload; String owner = null; String queue_name = null; int idx = 0; try { /* Create an AQ Session: */ aq_sess = AQDriverManager.createAQSession(db_conn); /* Construct the waiters list: */ agt_list = new AQAgent[4]; agt_list[0] = new AQAgent("BILLED_ORDER", "CS_billedorders_que", 0); agt_list[1] = new AQAgent("SHIPPED_ORDER", "CS_shippedorders_que", 0); agt_list[2] = new AQAgent("BACK_ORDER", "CS_backorders_que", 0); agt_list[3] = new AQAgent("BOOKED_ORDER", "CS_bookedorders_que", 0); /* Wait for order status messages for 120 seconds: */ ret_agt = aq_sess.listen(agt_list, 120); System.out.println("Message available for agent: " + ret_agt.getName() + " " + ret_agt.getAddress()); /* Get owner, queue where message is available */ idx = ret_agt.getAddress().indexOf("."); if(idx != -1) { owner = ret_agt.getAddress().substring(0, idx); queue_name = ret_agt.getAddress().substring(idx + 1); /* Dequeue the message */ deq_option = new AQDequeueOption(); deq_option.setConsumerName(ret_agt.getName()); deq_option.setWaitTime(1); orders_q = aq_sess.getQueue(owner, queue_name); /* Dequeue the message */ message = orders_q.dequeue(deq_option, Order.getFactory()); obj_payload = message.getObjectPayload(); deq_order = (Order)(obj_payload.getPayloadData()); System.out.println("Order number " + deq_order.getOrderno() + " retrieved"); } catch (AQException aqex) { System.out.println("Exception-1: " + aqex); } catch (Exception ex) { System.out.println("Exception-2: " + ex); } }
Continuing the scenario introduced in "Message Format Transformation" and "Message Transformation During Enqueue", the queues in the OE schema are of payload type OE.orders_typ
and the queues in the WS schema are of payload type WS.orders_typ_sh
.
At dequeue time, an application can move messages from OE_booked_orders_topic
to the WS_booked_orders_topi
c by using a selection criteria on dequeue to dequeue only orders with order_region
"WESTERN"
and order_type
not equal to "RUSH."
At the same time, the transformation is applied and the order in the ws.order_typ_sh
type is retrieved. Then the message is enqueued into the WS.ws_booked_orders
queue.
CREATE OR REPLACE PROCEDURE fwd_message_to_ws_shipping AS enq_opt dbms_aq.enqueue_options_t; deq_opt dbms_aq.dequeue_options_t; msg_prp dbms_aq.message_properties_t; booked_order WS.order_typ_sh; BEGIN /* First dequeue the message from OE booked orders topic */ deq_opt.transformation := 'OE.OE2WS'; deq_opt.condition := 'tab.user_data.order_region = ''WESTERN'' and tab.user_ data.order_type != ''RUSH'''; dbms_aq.dequeue('OE.oe_bookedorders_topic', deq_opt, msg_prp, booked_order); /* enqueue the message in the WS booked orders topic */ msg_prp.recipient_list(0) := aq$_agent('West_shipping', null, null); dbms_aq.enqueue('WS.ws_bookedorders_topic', enq_opt, msg_prp, booked_order); END;
No example is provided with this release.
No example is provided with this release.
You can perform dequeue requests over the Internet using IDAP. See Chapter 17, "Internet Access to Advanced Queuing" for more information on receiving AQ messages using IDAP.
In the BooksOnline scenario, assume that the East shipping application receives AQ messages with a correlation identifier 'RUSH' over the Internet. The dequeue request will have the following format:
<?xml version="1.0"?> <Envelope xmlns= "http://ns.oracle.com/AQ/schemas/envelope"> <Body> <AQXmlReceive xmlns = "http://ns.oracle.com/AQ/schemas/access"> <consumer_options> <destination>ES_ES_bookedorders_que</destination> <consumer_name>East_Shipping</consumer_name> <wait_time>0</wait_time> <selector> <correlation>RUSH</correlation> </selector> </consumer_options> <AQXmlCommit/> </AQXmlReceive> </Body> </Envelope>
|
![]() Copyright © 1996-2001, Oracle Corporation. All Rights Reserved. |
|