Oracle8 Application Developer's Guide
Release 8.0

A58241-01

Library

Product

Contents

Index

Prev Next

11
Advanced Queuing

This chapter has four sections:

Introduction to Oracle Advanced Queuing

Introduction Overview

This introductory section:

Complex Systems

Consider the following application scenarios.

Application Scenario 1: :

A brokerage firm, Makers & Breakers, advertises to the public that its new service will let clients stipulate time as well as price as a parameter i.e. a request to buy or sell is not executed unless it takes place within a specific time period (e.g., within 15 minutes). The campaign is extremely successful and a mutual fund house, America's Standard Guarantee, takes advantage of the technology to offer its clients an opportunity to buy and sell units during course of the day rather than at the close of trading. However, M&B are informed by the Securities and Exchange Commission that they have received two complaints:

 
  • That in executing the buy/sell orders M&B are giving an unfair advantage to large customers, such as the mutual fund house.
 
  • That in managing the time parameter M&B are taking advantage of their customers e.g., in a falling market selling earlier in the time period than they inform their clients, and then pocketing the difference.
 

The problem facing Makers & Breakers is not only to answer these unfounded charges but to do so quickly and in such a way as to leave no doubt in the minds of the public regarding the fairness of their practices.


 

Application Scenario 2:

A large state university(35,000 students) decides to automate its class enrollment process. Students will be able to register for classes using web templates from home or at terminals on both the main and satellite campuses for any of the more than a thousand classes offered by Big U. The administration announces that the following parameters will apply:

 
  • Priority: registration is on a `first come' basis except that

- seniors receive priority for upper level courses, followed by juniors, sophomores and first-year students (frosh);

- frosh receive priority for entry level courses, followed by a senior who needs the course to graduate.

 
  • Registration phases: the above priority criteria hold only for specific defined time phases e.g., in the second phase, seniors and juniors are treated as being on an equal `first come' basis with regard to upper-level classes, but continue to receive priority over juniors and frosh.
 
  • Full-time undergraduates must register for a minimum of three classes and may register for a maximum of four classes without special permission. Students may register for as many as ten classes (ranking their preferences 1-10) in case they are not admitted to their preferred classes, but only the first three choices are regarded as `live'. In the event that a class becomes full, the student's next choice becomes `live'. However, should a full class develop a vacancy, the student with the highest priority will be admitted, at that time being removed from the roll of a class of her/his lower preference.
 

Application Scenario 3:

A power utility, Most Power, develops a sophisticated model in order to decide how to deploy its resources. The way the system works is that the utility gets ongoing reports from

  • numerous weather centers regarding current conditions, and
  • power stations regarding ongoing utilization.

It then compares this information to historical data in order to predict demand for power in specific geographic areas for given time periods. A crucial part of this modeling has to do with noting the rapidity and degree of change in the incoming reports as weather changes and power is deployed.

During a prolonged blizzard, matters are complicated by failure of a power station which also forwards weather data. The question facing Most Power is whether to purchase power from a neighboring utility, since there is a lead time of five days for making such arrangements.


 

Queuing and the Intricacy of Message Passing

Although not every application developer will have worked with each of these types of scenario, the basic elements of these problem domains will be familiar. Each of these scenarios describes a situation in which messages come from and are disbursed to multiple clients (nodes) in a distributed computing environment. Messages are not only passed back and forth between client and server but also are intricately interleaved between processes on the server.

If we focus on these scenarios in terms of messages, the applications can be viewed as consisting of multi-step processes in which each step is triggered by one or more messages, and gives rises to one or more messages. Another way of saying this is that messages are events that trigger other message-events.

Business Process Management, or Workflow, which is based on this notion of the interrelation of messages and events, is becoming recognized as a fundamental technology. Queuing is one of the key technologies for this class of application because it implements deferred execution of messages. This decoupling of `requests for service' from `supply of services' increases efficiency, and provides the infrastructure for complex scheduling.

Securing Messages in a Vulnerable Environment

Handling the intricacy message-passing is not the only problem. Unfortunately, networks, computing hardware, and software applications will all fail from time to time, as is the case in power utility scenario. Nevertheless, the ACID properties of the information must be preserved. Chaos would quickly follow if buy orders and the order in which they issued were `lost', or if the changing status of students could not be matched to class availability, or if power could not routed to where the combination of incoming reports an historical patterns of changes in demand indicated it would be most required. In other words, messaging must be persistent. By integrating transaction processing with queuing technology, persistent messaging in the form of queuing is made possible. The importance of queuing has been proven by TP-monitors that typically include such a facility.

Message Persistence as Extension in Time and Space

The persistence of messages that is required goes beyond the ability to recover information in the event of system failure. Applications may have to deal with multiple unprocessed messages arriving simultaneously from external clients or from programs internal to the application. The communication links between databases may not be available all the time or may be reserved for some other purpose. If the system falls short in its capacity to deal with these messages immediately, the application must be able to store the messages until they can be processed. By the same token, external clients or internal programs may not be ready to receive messages that have been processed.

Even more important, applications must be able to deal with priorities: messages arriving later may be of higher priority than messages arriving earlier; messages arriving earlier may have to wait for messages arriving later before actions are executed; the same message may have to be accessed by different processes; and so on. All these cases become more pressing in situations in which messages are communicated between remote locations.

Moreover, priorities are not fixed. One crucial dimension of handling the dynamic aspect of message persistence has to do with windows of opportunity that grow and shrink.It may be that messages in a specific queue become more important than messages in other queues, and so need to be processed with less delay or interference from messages in other queues. Similarly, it may be more pressing to send messages to some destinations than to others. In the case of the share brokerage application, the window for completing the sale shrinks to nothing (i.e. an offer to sell expires) from the time the offer to sell message is received. In the case of the student registration application, different priorities apply during different temporal phases, and data must be re-evaluated with the transition from one phase to another. And in the case of the power utility, the entire decision-making process depends on whether conditions are stable (the persistence of a large window) or dynamic (the rapid appearance and disappearance of windows).

Control Data as Essential Information

What is true for all the scenarios is the time that messages are received or dispatched is a crucial part of the message. This means that the control component of the message - in this case, time markers - is as important as the payload data. Put another way: the message retains importance as a business asset after it has been executed.

Persistent messaging thus implies accurate documentation of messages for analysis of historical patterns and future trends. For instance:

Specific Requirements of a Messaging System

What are the key requirements of a persistent messaging system given the above issues?

Possible Solutions: Synchronous versus Disconnected/Deferred Communication

Generally, attempts to provide communication between programs can be classified into one of two types: Synchronous and Disconnected/Deferred Communication.

Synchronous Communication

This model of communication, also called on-line or connected, is based on the request/reply paradigm. In this model a program sends a request to another program and waits (blocks) until the reply arrives. This model of communication, in which the sender and receiver of the message are tightly coupled, is suitable for programs that need to get a reply before they can proceed with any task. Traditional client/server architectures are based on this model.

The major drawback of the synchronous model of communication is that the programs must be available and running for the application to work. In the event of network or machine failure, or even if the program needed being busy, the entire application grinds to a halt.

Disconnected/Deferred Messaging

In this model programs in the role of producers place requests in a queue and then proceed with their work. Programs in the role of consumers retrieve requests from the queue and acts on them. This model is well suited for applications that can continue with their work after placing a request in the queue because they are not blocked waiting for a reply. It is also suited to applications that can continue with their work until there is a message to retrieve.

For deferred execution to work correctly even in the presence of network, machine and application failures, the requests must be stored persistently, and processed exactly once. This can be achieved by combining persistent queuing with transaction protection. Oracle8 provides a queuing technology that does not depend on the use of TP-monitors or any other evolving Message-Oriented Middleware (MOM) infrastructure.

Oracle Advanced Queuing - Features

Oracle AQ (Oracle Advanced Queueing) provides message queuing as an integrated part of the Oracle server. Oracle AQ provides this functionality by integrating the queuing system with the database, thereby creating a message-enabled database. By providing an integrated solution Oracle AQ frees application developers to devote their efforts to their specific business logic rather than having to construct a messaging infrastructure.

General Features

ENQUEUE Features

DEQUEUE Features

Propagation Features

Oracle Advanced Queuing - Primary Components

Queuing Entities

Message

A message is the smallest unit of information inserted into and retrieved from a queue. A message consists of control information (metadata) and payload (data). The control information represents message properties used by AQ to manage messages. The payload data is the information stored in the queue and is transparent to Oracle AQ. A message can reside in only one queue. A message is created by the enqueue call and consumed by the dequeue call.

Queue

A queue is a repository for messages. There are two types of queues: user queues, also known as normal queues, and exception queues. The user queue is for normal message processing. Messages are transferred to an exception queue if they can not be retrieved and processed for some reason. Queues can be created, altered, started, stopped, and dropped by using the Oracle AQ administrative interfaces.

Queue Table

Queues are stored in queue tables. Each queue table is a database table and contains one or more queues. Each queue table contains a default exception queue.

The following figure shows the relationship between messages, queues, and queue tables. The columns represent message queues, with rows representing individual messages.

Agents

An agent is a queue user. There are two types of agents: producers who place messages in a queue (enqueuing) and consumers who retrieve messages (dequeuing). Any number of producers and consumers may be accessing the queue at a given time. Agents insert messages into a queue and retrieve messages from the queue by using the Oracle AQ operational interfaces

An agent is identified by its name, address and protocol. The address field is a character field of up to 1024 bytes that is interpreted in the context of the protocol. For instance, the default value for the protocol is 0, signifying a database link addressing. In this case, the address for this protocol is of the form

queue_name@dblink

where queue_name is of the form [schema.]queue and dblink may either be a fully qualified database link name or the database link name without the domain name. The only supported protocol value is 0 at this time.

Queue Monitor

The queue monitor is a background process that monitors the messages in the queue. It provides the mechanism for message expiration, retry and delay.

Figure 11-1 Modeling Queue Entities

Modeling Queue Entities

Figure 11-1 (above) portrays a queue table that contains two queues, and one exception queue:

Basic Queuing

Basic Queuing - One Producer, One Consumer

At its most basic, one producer may enqueues different messages into one queue. Each message will be dequeued and processed once by one of the consumers. A message will stay in the queue until a consumer dequeues it or the message expires. A producer may stipulate a delay before the message is available to be consumed, and a time after which the message expires. Likewise, a consumer may wait when trying to dequeue a message if no message is available. Note that an agent program, or application, can act as both a producer and a consumer.

Basic Queueing - Many Producers, One Consumer

At a slightly higher level of complexity, many producers may enqueue messages into a queue, all of which are processed by one consumer.

Basic Queueing - Many Producers, Many Consumers of Discrete Messages

In this next stage, many producers may enqueue messages, each message being processed by a different consumer depending on type and correlation identifier. The figure below shows this scenario.

Illustrating Basic Queuing

Figure 11-2 (below) portrays a queue table that contains one queue into which messages are being enqueued and from which messages are being dequeued.

Producers

The figure indicates that there are 6 producers of messages, although only four are shown. This assumes that two other producers (P4 and P5) have the right to enqueue messages even though there are no messages enqueued by them at the moment portrayed by the figure. The figure shows:

Consumers

According to the figure, there are 3 consumers of messages, representing the total population of consumers. The figure shows:

Illustrating Client-Server Communication Using AQ

The previous figure portrayed the enqueuing of multiple messages by a set of producers, and the dequeuing of messages by a set of consumers. What may not be readily evident in that sketch is the notion of time, and the advantages offered by Oracle AQ.

Client-Server applications normally execute in a synchronous manner, with all the disadvantages of that tight coupling described above. Figure 11-3 demonstrates the asynchronous alternative using AQ. In this example Application B (a server) provides service to Application A (a client) using a request/response queue.

Figure 11-3 Client-Server Communication Using AQ

  1. Application A enqueues a request into the request queue.
  2. Application B dequeues the request.
  3. Application B processes the request.
  4. Application B enqueues the result in the response queue.
  5. Application A dequeues the result from the response queue.

In this way the client does not have to wait to establish a connection with the server, and the server dequeues the message at its own pace. When the server is finished processing the message, there is no need for the client to be waiting to receive the result. In this way a process of double-deferral frees both client and server.


Note:

The various enqueue and dequeue operations are part of different transactions.

 

Multiple-Consumer Dequeuing of the Same Message

A message can only be enqueued into one queue at a time. If a producer had to insert the same message into several queues in order to reach different consumers, this would require management of a very large number of queues. Oracle AQ provides two mechanisms to allow for multiple consumers to dequeue the same message: queue subscribers and message recipients. The queue must reside in a queue table that is created with multiple consumer option to allow for subscriber and recipient lists. Each message remains in the queue until it is consumed by all its intended consumers.

Queue Subscribers

Using this approach, multiple consumer-subscribers are associated with a queue. This will cause all messages enqueued in the queue to be made available to be consumed by each of the queue subscribers. The subscribers to the queue can be changed dynamically without any change to the messages or message producers. Subscribers to the queue are added and removed by using the Oracle AQ administrative package. The diagram below shows multiple producers enqueuing messages into queue, each of which is consumed by multiple consumer-subscribers.

Message Recipients

A message producer can submit a list of recipients at the time a message is enqueued. This allows for a unique set of recipients for each message in the queue. The recipient list associated with the message overrides the subscriber list associated with the queue, if there is one. The recipients need not be in the subscriber list. However, recipients may be selected from among the subscribers.

Figure 11-4 Multiple-Consumer Dequeuing of the Same Message

Illustrating Multiple-Consumer Dequeuing of the Same Message

Figure 11-4 describes the case in which three consumers are all listed as subscribers of a queue. This is the same as saying that they all subscribe to all the messages that might ever be enqueued into that queue. The drawing illustrates a number of important points:

Figure 11-5 illustrates the same technology from a dynamic perspective. This examples concerns a scenario in which more than one application needs the result produced by an application. Every message enqueued by Application A is dequeued by Application B and Application C. To make this possible, the multiple consumer queue is specially configured with Application B and Application C as queue subscribers. Consequently, they are implicit recipients of every message placed in the queue.


Note:

Queue subscribers can be applications or other queues.

 

Figure 11-6 Dequeuing of Specified Messages by Specified Recipients

Illustrating Dequeuing of Specified Messages by Specified Recipients

Figure 11-6 shows how a message can be specified for one or more recipients. In this case, Message 5 is specified to be dequeued by Recipient-1 and Recipient-2. As described by the drawing, neither of the recipients is one of the 3 subscribers to the queue.

Figure 11-7 Explicit and Implicit Recipients of Messages

We earlier referred to subscribers as "implicit recipients" in that they are able to dequeue all the messages placed into a specific queue. This is like subscribing to a magazine and thereby implicitly gaining access to all its articles. The category of consumers that we have referred to as recipients may also be viewed as "explicit recipients" in that they are designated targets of particular messages.

Figure 11-7 shows how Oracle AQ can adjust dynamically to accommodate both kinds of consumers. In this scenario Application B and Application C are implicit recipients (subscribers). But messages can also be explicitly directed toward specific consumers (recipients) who may or may not be subscribers to the queue. The list of such recipients is specified in the enqueue call for that message and overrides the list of subscribers for that queue. In the figure, Application D is specified as the sole recipient of a message enqueued by Application A.


Note:

Multiple producers may simultaneously enqueue messages aimed at different targeted recipients.

 

Illustrating the Implementation of Workflows using AQ

Figure 11-8 illustrates the use of AQ for implementing workflows, also knows as chained application transactions. It shows a workflow consisting of 4 steps performed by Applications A, B, C and D. The queues are used to buffer the flow of information between different processing stages of the business process. By specifying delay interval and expiration time for a message, a window of execution can be provided for each of the applications.

Figure 11-8 Implementing Workflows using AQ

From a workflow perspective, the passing of messages is a business asset above and beyond the value of the payload data. Hence, AQ supports the optional retention of messages for analysis of historical patterns and prediction of future trends. For instance, two of the three application scenarios at the head of the chapter are founded in an implementation of workflow analysis.


Note:

The contents of the messages 1, 2 and 3 can be the same or different. Even when they are different, messages may contain parts of the of the contents of previous messages.

 

Message Propagation

Fanning-out of messages

In AQ, message recipients can be either consumers or other queues. If the message recipient is a queue, the actual recipients are determined by the subscribers to the queue (which may in turn be other queues). Thus it is possible to fan-out messages to a large number of recipients without requiring them all to dequeue messages from a single queue.

For example: A queue, Source, may have as its as its subscribers queues dispatch1@dest1 and dispatch2@dest2. Queue dispatch1@dest1 may in turn have as its subscribers the queues outerreach1@dest3 and outerreach2@dest4, while queue dispatch2@dest2 has as subscribers the queue outerreach3@dest21 and outerreach4@dest4. In this way, messages enqueued in Source will be propagated to all the subscribers of four different queues.

Funneling-in of messages

Another use of queues as a message recipient is the ability to combine messages from different queues into a single queue. This process is sometimes described as "compositing"

For example, if queue composite@endpoint is a subscriber to both queues funnel1@source1 and funnel2@source2 then the subscribers to queue composite@endpoint can get all messages enqueued in those queues as well as messages enqueued directly into itself.

Figure 11-9 IMessage Propagation

llustration of Message Propagation

Figure 11-9 illustrates applications on different databases communicating via AQ. Each application has an inbox and an outbox for handling incoming and outgoing messages. An application enqueues a message into its outbox irrespective of whether the message has to be sent to an application that is local (on the same node) or remote (on a different node).

Likewise, an application is not concerned as to whether a message originates locally or remotely. In all cases, an application dequeues messages from its inbox.

Oracle AQ facilitates all this interchange, treating messages on the same basis.

Oracle Advanced Queuing by Example

Overview Summary

Oracle AQ by Example guides users by means of a step-by-step approach.

Assign Roles and Privileges

/* Create user and grant privileges: */
CONNECT sys/change_on_install as sysdba; 
CREATE user aq identified by AQ;
GRANT AQ_ADMINISTRATOR_ROLE TO aq;
GRANT CONNECT TO aq;
GRANT RESOURCE TO aq;
EXECUTE dbms_aqadm.grant_type_access(`aq');
CONNECT aq/AQ;

SET ECHO ON;
SET SERVEROUTPUT ON;

Create Queue Tables and Queues

Create a queue table and queue of object type

/* Create a message type: */
CREATE type aq.message_type as object (
subject     VARCHAR2(30),
text        VARCHAR2(80));   

/* Create a object type queue table and queue: */
EXECUTE dbms_aqadm.create_queue_table (
queue_table        => 'aq.msg',
queue_payload_type => 'aq.message_type');

EXECUTE dbms_aqadm.create_queue (
queue_name         => 'msg_queue',
queue_table        => 'aq.msg');

EXECUTE dbms_aqadm.start_queue (
queue_name         => 'msg_queue');

Create a queue table and queue of raw type

/* Create a RAW type queue table and queue: */
EXECUTE dbms_aqadm.create_queue_table ( 
queue_table          => 'aq.raw_msg', 
queue_payload_type   => 'RAW'); 
  
EXECUTE dbms_aqadm.create_queue ( 
queue_name          => 'raw_msg_queue', 
queue_table         => 'aq.raw_msg'); 
  
EXECUTE dbms_aqadm.start_queue ( 
queue_name          => 'raw_msg_queue'); 

Create a prioritized message queue table and queue


EXECUTE dbms_aqadm.create_queue_table (
queue_table        => 'aq.priority_msg', 
sort_list          => 'PRIORITY,ENQ_TIME', 
queue_payload_type => 'aq.message_type');

EXECUTE dbms_aqadm.create_queue (
queue_name         => 'priority_msg_queue', 
queue_table        => 'aq.priority_msg');

EXECUTE dbms_aqadm.start_queue ( 
queue_name         => 'priority_msg_queue');

Create a multiple consumer queue table and queue

EXECUTE dbms_aqadm.create_queue_table (
queue_table        => 'aq.msg_multiple',
multiple_consumers => TRUE, 
queue_payload_type => 'aq.message_type');

EXECUTE dbms_aqadm.create_queue (
queue_name         => 'msg_queue_multiple',
queue_table        => 'aq.msg_multiple');

EXECUTE dbms_aqadm.start_queue (
queue_name         => 'msg_queue_multiple');

Create a queue to demonstrate propagation

EXECUTE dbms_aqadm.create_queue (
queue_name        => 'another_msg_queue',
queue_table       => 'aq.msg_multiple');

EXECUTE dbms_aqadm.start_queue (
queue_name         => 'another_msg_queue');

Enqueue and Dequeue of Object Type Messages

To enqueue a single message without any other parameters specify the queue name and the payload.

/* Enqueue to msg_queue: */
DECLARE
enqueue_options     dbms_aq.enqueue_options_t;
message_properties  dbms_aq.message_properties_t;
message_handle      RAW(16);
message             aq.message_type;

BEGIN
message := message_type('NORMAL MESSAGE',
'enqueued to msg_queue first.');

dbms_aq.enqueue(queue_name => 'msg_queue',           
enqueue_options      => enqueue_options,       
message_properties   => message_properties,     
payload              => message,               
msgid                => message_handle);

COMMIT;
END;
/

/* Dequeue from msg_queue: */
DECLARE
dequeue_options     dbms_aq.dequeue_options_t;
message_properties  dbms_aq.message_properties_t;
message_handle      RAW(16);
message             aq.message_type;

BEGIN
dbms_aq.dequeue(queue_name => 'msg_queue',
              dequeue_options    => dequeue_options,
              message_properties => message_properties,
              payload            => message,
              msgid              => message_handle);

dbms_output.put_line ('Message: ' || message.subject ||
                                      ' ... ' || message.text );
COMMIT;
END;
/

Enqueue and Dequeue of Object Type Messages Using Pro*C/C++


#include  <stdio.h>
#include  <string.h>
#include  <sqlca.h>
#include  <sql2oci.h>
/* The header file generated by processing 
object type 'aq.message_type': */
#include  "pceg.h"

void sql_error(msg)
char *msg;
{
EXEC SQL WHENEVER SQLERROR CONTINUE;
printf("%s\n", msg);
printf("\n% .800s \n", sqlca.sqlerrm.sqlerrmc);
EXEC SQL ROLLBACK WORK RELEASE;
exit(1);
}

main()
{
message_type     *message = (message_type*)0;  /* payload */
char             user[60]="aq/AQ";  /* user logon password */
char             subject[30];  /* components of the */ 
char             txt[80];  /* payload type */
                                       
/* ENQUEUE and DEQUEUE to an OBJECT QUEUE */    
       
/*  Connect to database: */
EXEC SQL CONNECT :user;  
 
/* On an oracle error print the error number :*/
EXEC SQL WHENEVER SQLERROR DO sql_error("Oracle Error :");

/* Allocate memory for the host variable from the object cache : */
EXEC SQL ALLOCATE :message;

/* ENQUEUE */

strcpy(subject, "NORMAL ENQUEUE");
strcpy(txt, "The Enqueue was done through PLSQL embedded in PROC");

/* Initialize the components of message : */
EXEC SQL OBJECT SET SUBJECT, TEXT OF  :message TO :subject, :txt;

/* Embedded PLSQL call to the AQ enqueue procedure : */
EXEC SQL EXECUTE
DECLARE
message_properties   dbms_aq.message_properties_t;
enqueue_options      dbms_aq.enqueue_options_t; 
msgid                RAW(16);
BEGIN
 /* Bind the host variable 'message' to the payload: */
dbms_aq.enqueue(queue_name => 'msg_queue',
message_properties => message_properties,
enqueue_options => enqueue_options,
payload => :message,  
msgid => msgid);
END;
END-EXEC;
/* Commit work */
EXEC SQL COMMIT;                            
            
printf("Enqueued Message \n");
printf("Subject  :%s\n",subject);
printf("Text     :%s\n",txt);

/* Dequeue */

/* Embedded PLSQL call to the AQ dequeue procedure : */
EXEC SQL EXECUTE
DECLARE
message_properties  dbms_aq.message_properties_t;
dequeue_options     dbms_aq.dequeue_options_t; 
msgid               RAW(16);
BEGIN  
/* Return the payload into the host variable 'message':  */
dbms_aq.dequeue(queue_name => 'msg_queue',
message_properties => message_properties,
dequeue_options => dequeue_options,
payload => :message,
msgid => msgid);
END;
END-EXEC;
 /* Commit work :*/
EXEC SQL COMMIT; 
                                       
/* Extract the components of message: */
EXEC SQL OBJECT GET SUBJECT,TEXT FROM :message INTO :subject,:txt;

printf("Dequeued Message \n");
printf("Subject  :%s\n",subject);
printf("Text     :%s\n",txt);
}

Enqueue and Dequeue of Object Type Messages Using OCI

#ifndef SL_ORACLE
#include <sl.h>
#endif

#ifndef OCI_ORACLE
#include <oci.h>
#endif

struct message
{
  OCIString   *subject;
  OCIString   *data;
};
typedef struct message message;

struct null_message
{
  OCIInd    null_adt;
  OCIInd    null_subject;
  OCIInd    null_data;
};
typedef struct null_message null_message;

int main()
{
  OCIEnv 	*envhp;
  OCIServer 	*srvhp;
  OCIError 	*errhp;
  OCISvcCtx 	*svchp;
  dvoid	     	*tmp;
  OCIType 	*mesg_tdo = (OCIType *) 0;
  message  	msg;
  null_message 	nmsg;
  message 	*mesg = &msg;
  null_message 	*nmesg = &nmsg;
  message 	*deqmesg = (message *)0;
  null_message 	*ndeqmesg = (null_message *)0;
  
  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)()) 0,
	(dvoid * (*)()) 0,  (void (*)()) 0 );
  
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
	   52, (dvoid **) &tmp);
  
  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
	   52, (dvoid **) &tmp);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
	   52, (dvoid **) &tmp);
  
  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
	   52, (dvoid **) &tmp);
  
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
	   (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);
 
  OCILogon(envhp, errhp, &svchp, "AQ", strlen("AQ"), "AQ", strlen("AQ"), 0, 0);

  /* obtain TDO of message_type */
  OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", strlen("AQ"),
	       (CONST text *)"MESSAGE_TYPE", strlen("MESSAGE_TYPE"), 
	       (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo);

  /* prepare the message payload */
  mesg->subject = (OCIString *)0;
  mesg->data = (OCIString *)0;
  OCIStringAssignText(envhp, errhp,
	    (CONST text *)"NORMAL MESSAGE", strlen("NORMAL MESSAGE"), 
	    &mesg->subject);
  OCIStringAssignText(envhp, errhp,
	    (CONST text *)"OCI ENQUEUE", strlen("OCI ENQUEUE"), 
	    &mesg->data);
  nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL;  

  /* enqueue into the msg_queue */
  OCIAQEnq(svchp, errhp, (CONST text *)"msg_queue", 0, 0,
	       mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, 0, 0);
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* dequeue from the msg_queue */
  OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue", 0, 0,
	       mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0);
  printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
  printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  OCITransCommit(svchp, errhp, (ub4) 0);

}

Enqueue and Dequeue of RAW Type Messages

/* Enqueue a message containing a RAW: */ 
DECLARE 
      enqueue_options     dbms_aq.enqueue_options_t; 
      message_properties  dbms_aq.message_properties_t; 
      message_handle      RAW(16); 
      message             RAW(4096); 
  
BEGIN 
      message :=  hextoraw(rpad('FF',4095,'FF')); 
      dbms_aq.enqueue(queue_name => 'raw_msg_queue',            
              enqueue_options    => enqueue_options,        
              message_properties => message_properties,      
              payload  => message,                
              msgid              => message_handle); 

      COMMIT; 
END; 
/* Dequeue from raw_msg_queue: */ 
DECLARE 
      dequeue_options     dbms_aq.dequeue_options_t; 
      message_properties  dbms_aq.message_properties_t; 
      message_handle      RAW(16); 
      message             RAW(4096); 
  
BEGIN 
 dbms_aq.dequeue(queue_name => 'raw_msg_queue', 
dequeue_options    => dequeue_options, 
message_properties => message_properties, 
payload            => message, 
msgid              => message_handle); 
    
COMMIT; 
END;

Enqueue and Dequeue of RAW Type Messages using Pro*C/C++

#include  <stdio.h>
#include  <string.h>
#include  <sqlca.h>
#include  <sql2oci.h>

void sql_error(msg)
char *msg;
{
EXEC SQL WHENEVER SQLERROR CONTINUE;
printf("%s\n", msg);
printf("\n% .800s \n", sqlca.sqlerrm.sqlerrmc);
EXEC SQL ROLLBACK WORK RELEASE;
exit(1);
}

main()
{
OCIEnv         *oeh;   /* OCI Env handle */
OCIError       *err;   /* OCI Err handle */
OCIRaw         *message= (OCIRaw*)0;   /* payload */
ub1            message_txt[100];   /* data for payload */
char           user[60]="aq/AQ";   /* user logon password */
int            status;   /* returns status of the OCI call */

/* Enqueue and dequeue to a RAW queue */

/* Connect to database: */
EXEC SQL CONNECT :user;

/* On an oracle error print the error number: */
EXEC SQL WHENEVER SQLERROR DO sql_error("Oracle Error :");
  
/* Get the OCI Env handle: */
if (SQLEnvGet(SQL_SINGLE_RCTX, &oeh) != OCI_SUCCESS)
{
printf(" error in SQLEnvGet \n");
exit(1);
}
/* Get the OCI Error handle: */ 
if (status = OCIHandleAlloc((dvoid *)oeh, (dvoid **)&err,
(ub4)OCI_HTYPE_ERROR, (ub4)0, (dvoid **)0))
{
printf(" error in OCIHandleAlloc %d \n", status);
exit(1);
}

/* Enqueue */
/* The bytes to be put into the raw payload:*/
strcpy(message_txt, "Enqueue to a Raw payload queue ");

/* Assign bytes to the OCIRaw pointer :
Memory needs to be allocated explicitly to OCIRaw*: */
if (status=OCIRawAssignBytes(oeh, err, message_txt, 100,
 &message))
{
printf(" error in  OCIRawAssignBytes  %d \n", status);
exit(1);
}

/*  Embedded PLSQL call to the AQ enqueue procedure : */
EXEC SQL EXECUTE
DECLARE
message_properties   dbms_aq.message_properties_t;
enqueue_options      dbms_aq.enqueue_options_t; 
msgid                RAW(16);
BEGIN
/* Bind the host variable message to the raw payload: */ 
dbms_aq.enqueue(queue_name => 'raw_msg_queue',
message_properties => message_properties,
enqueue_options => enqueue_options,
payload => :message,
msgid => msgid);
END;
END-EXEC;
/* Commit work: */ 
EXEC SQL COMMIT;                        
 
/* Dequeue */
/*  Embedded PLSQL call to the AQ dequeue procedure :*/  
EXEC SQL EXECUTE
DECLARE
message_properties  dbms_aq.message_properties_t;
dequeue_options     dbms_aq.dequeue_options_t; 
msgid               RAW(16);
BEGIN
/* Return the raw payload into the host variable 'message':*/
dbms_aq.dequeue(queue_name => 'raw_msg_queue',
message_properties => message_properties,
dequeue_options => dequeue_options,
payload => :message,
msgid => msgid);
END;
END-EXEC;
/* Commit work: */  
EXEC SQL COMMIT;                             
}

Enqueue and Dequeue of RAW Type Messages using OCI

#ifndef SL_ORACLE
#include <sl.h>
#endif

#ifndef OCI_ORACLE
#include <oci.h>
#endif

int main()
{
  OCIEnv 	*envhp;
  OCIServer 	*srvhp;
  OCIError 	*errhp;
  OCISvcCtx 	*svchp;
  dvoid	     	*tmp;
  OCIType 	*mesg_tdo = (OCIType *) 0;
  char  	msg_text[100];
  OCIRaw  	*mesg = (OCIRaw *)0;
  OCIRaw	*deqmesg = (OCIRaw *)0;
  OCIInd   	ind = 0;
  dvoid	  	*indptr = (dvoid *)&ind;
  int		i;
  
  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)()) 0,
	(dvoid * (*)()) 0,  (void (*)()) 0 );
  
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
	   52, (dvoid **) &tmp);
  
  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
	   52, (dvoid **) &tmp);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
	   52, (dvoid **) &tmp);
  
  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
	   52, (dvoid **) &tmp);
  
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
	   (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);
 
  OCILogon(envhp, errhp, &svchp, "AQ", strlen("AQ"), "AQ", strlen("AQ"), 0, 0);

  /* obtain the TDO of the RAW data type */
  OCITypeByName(envhp, errhp, svchp, (CONST text *)"SYS", strlen("SYS"),
	       (CONST text *)"RAW", strlen("RAW"), 
	       (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo);

  /* prepare the message payload */
  strcpy(msg_text, "Enqueue to a RAW queue");
  OCIRawAssignBytes(envhp, errhp, msg_text, strlen(msg_text), &mesg);

  /* enqueue the message into raw_msg_queue */
  OCIAQEnq(svchp, errhp, (CONST text *)"raw_msg_queue", 0, 0,
	       mesg_tdo, (dvoid **)&mesg, (dvoid **)&indptr, 0, 0);
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* dequeue the same message into C variable deqmesg */
  OCIAQDeq(svchp, errhp, (CONST text *)"raw_msg_queue", 0, 0, 
	   mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&indptr, 0, 0);
  for (i = 0; i < OCIRawSize(envhp, deqmesg); i++)
    printf("%c", *(OCIRawPtr(envhp, deqmesg) + i));
  OCITransCommit(svchp, errhp, (ub4) 0);
}

Enqueue and Dequeue of Messages by Priority

When two messages are enqued with the same priority, the message which was enqued earlier will be dequeued first. However, if two messages are of different priorities, the message with the lower value (higher priority) will be dequeued first.


/* Enqueue two messages with priority 30 and 5: */
DECLARE
enqueue_options     dbms_aq.enqueue_options_t;
message_properties  dbms_aq.message_properties_t;
message_handle      RAW(16);
      message             aq.message_type;

BEGIN
message := message_type('PRIORITY MESSAGE',
enqued at priority 30.');

message_properties.priority := 30;

dbms_aq.enqueue(queue_name => 'priority_msg_queue',
enqueue_options    => enqueue_options,
message_properties => message_properties,
payload            => message,
msgid              => message_handle);

message := message_type('PRIORITY MESSAGE',
'Enqueued at priority 5.');

message_properties.priority := 5;

      dbms_aq.enqueue(queue_name => 'priority_msg_queue',
              enqueue_options    => enqueue_options,
              message_properties => message_properties,
              payload            => message,
              msgid              => message_handle);
END;
/

/* Dequeue from priority queue: */
DECLARE
dequeue_options     dbms_aq.dequeue_options_t;
message_properties  dbms_aq.message_properties_t;
message_handle      RAW(16);
message             aq.message_type;

BEGIN
dbms_aq.dequeue(queue_name  => 'priority_msg_queue',
dequeue_options       => dequeue_options,
message_properties    => message_properties,
payload               => message,
msgid                 => message_handle);

dbms_output.put_line ('Message: ' || message.subject ||
' ... ' || message.text );

COMMIT;

dbms_aq.dequeue(queue_name => 'priority_msg_queue',
dequeue_options      => dequeue_options,
message_properties   => message_properties,
payload              => message,
msgid                => message_handle);

dbms_output.put_line ('Message: ' || message.subject ||
' ... ' || message.text );
COMMIT;
END;
/
On return, the second message with priority set to 5 will be retrieved before 
the message with priority set to 30 since priority takes precedence over 
enqueue time.

Dequeue of Messages after Preview by Criterion

An application can preview messages in browse mode or locked mode without deleting the message. The message of interest can then be removed from the queue.


/* Enqueue 6 messages to msg_queue
- GREEN, GREEN, YELLOW, VIOLET, BLUE, RED */

DECLARE
enqueue_options     dbms_aq.enqueue_options_t;
message_properties  dbms_aq.message_properties_t;
message_handle      RAW(16);
message             aq.message_type;

BEGIN
message := message_type('GREEN',
'GREEN enqueued to msg_queue first.');

dbms_aq.enqueue(queue_name => 'msg_queue',           
enqueue_options      => enqueue_options,       
message_properties   => message_properties,     
payload              => message,               
msgid                => message_handle);

message := message_type('GREEN',
'GREEN also enqueued to msg_queue second.');

dbms_aq.enqueue(queue_name => 'msg_queue',           
enqueue_options      => enqueue_options,       
message_properties   => message_properties,     
payload              => message,               
msgid                => message_handle);

message := message_type('YELLOW',
'YELLOW enqueued to msg_queue third.');

dbms_aq.enqueue(queue_name => 'msg_queue',           
enqueue_options      => enqueue_options,       
message_properties   => message_properties,     
payload              => message,               
msgid                => message_handle);

dbms_output.put_line ('Message handle: ' || message_handle);

message := message_type('VIOLET',
'VIOLET enqueued to msg_queue fourth.');

dbms_aq.enqueue(queue_name => 'msg_queue',           
enqueue_options      => enqueue_options,       
message_properties   => message_properties,     
payload              => message,               
msgid                => message_handle);

message := message_type('BLUE',
'BLUE enqueued to msg_queue fifth.');

dbms_aq.enqueue(queue_name => 'msg_queue',           
enqueue_options      => enqueue_options,       
message_properties   => message_properties,     
payload              => message,               
msgid                => message_handle);

message := message_type('RED',
'RED enqueued to msg_queue sixth.');

dbms_aq.enqueue(queue_name => 'msg_queue',           
enqueue_options      => enqueue_options,       
message_properties   => message_properties,     
payload              => message,               
msgid                => message_handle);

      COMMIT;
END;
/

/* Dequeue in BROWSE mode until RED is found,
and remove RED from queue: */
DECLARE
dequeue_options     dbms_aq.dequeue_options_t;
message_properties  dbms_aq.message_properties_t;
message_handle      RAW(16);
message             aq.message_type;

BEGIN
dequeue_options.dequeue_mode := dbms_aq.BROWSE;

      LOOP
dbms_aq.dequeue(queue_name          => 'msg_queue',
                 dequeue_options    => dequeue_options,
                 message_properties => message_properties,
                 payload            => message,
                 msgid              => message_handle);

dbms_output.put_line ('Message: ' || message.subject ||
                                         ' ... ' || message.text );
         
EXIT WHEN message.subject = 'RED';

END LOOP;

dequeue_options.dequeue_mode := dbms_aq.REMOVE;
dequeue_options.msgid        := message_handle;

dbms_aq.dequeue(queue_name => 'msg_queue',
dequeue_options    => dequeue_options,
message_properties => message_properties,
payload            => message,
msgid              => message_handle);

dbms_output.put_line ('Message: ' || message.subject ||
' ... ' || message.text );

      COMMIT;
END;
/

/* Dequeue in LOCKED mode until BLUE is found,
and remove BLUE from queue: */
DECLARE
dequeue_options     dbms_aq.dequeue_options_t;
message_properties  dbms_aq.message_properties_t;
message_handle      RAW(16);
message             aq.message_type;

BEGIN
dequeue_options.dequeue_mode := dbms_aq.LOCKED;

      LOOP

dbms_aq.dequeue(queue_name => 'msg_queue',
                 dequeue_options    => dequeue_options,
                 message_properties => message_properties,
                 payload            => message,
                 msgid              => message_handle);

dbms_output.put_line ('Message: ' || message.subject ||
          ' ... ' || message.text );
         
EXIT WHEN message.subject = 'BLUE';
      END LOOP;

dequeue_options.dequeue_mode := dbms_aq.REMOVE;
dequeue_options.msgid        := message_handle;

dbms_aq.dequeue(queue_name => 'msg_queue',
dequeue_options      => dequeue_options,
message_properties   => message_properties,
payload              => message,
msgid => message_handle);

dbms_output.put_line ('Message: ' || message.subject ||
' ... ' || message.text );

      COMMIT;
END;
/

Enqueue and Dequeue of Messages with Time Delay and Expiration

An enqueue can specify the time before which a message cannot be retrieved by a dequeue call. To do this, the producer (i.e the agent enqueuing the message) can also specify the time when a message expires, at which time the message is can use the parameter "delay" when enqueuing the message. The producer can also specify the time when a message expires, at which time the message is moved to an exception queue.


Note:

Expiration is calculated from the earliest dequeue time. So, if an application wants a message to be dequeued no earlier than a week from now, but no later than 3 weeks from now, this requires setting the expiration time for 2 weeks. This scenario is described in the following code segment.

 

/* Enqueue message for delayed availability: */
DECLARE
enqueue_options     dbms_aq.enqueue_options_t;
message_properties  dbms_aq.message_properties_t;
message_handle      RAW(16);
message             aq.message_type;

BEGIN
message := message_type('DELAYED', 
'This message is delayed one week.');
message_properties.delay := 7*24*60*60;
message_properties.expiration := 2*7*24*60*60;

dbms_aq.enqueue(queue_name => 'msg_queue',
enqueue_options      => enqueue_options,
message_properties   => message_properties,
payload            => message,
msgid              => message_handle);

      COMMIT;
END;

Enqueue and Dequeue of Messages by Correlation and Message Id Using Pro*C/C++

#include  <stdio.h>
#include  <string.h>
#include  <sqlca.h>
#include  <sql2oci.h>
/* The header file generated by processing 
object type 'aq.message_type': */  
#include  "pceg.h" 

void sql_error(msg)
char *msg;
{
EXEC SQL WHENEVER SQLERROR CONTINUE;
printf("%s\n", msg);
printf("\n% .800s \n", sqlca.sqlerrm.sqlerrmc);
EXEC SQL ROLLBACK WORK RELEASE;
exit(1);
}

main()
{
OCIEnv            *oeh;  /* OCI Env Handle */  
OCIError          *err;  /* OCI Error Handle */
message_type      *message = (message_type*)0; /* queue  payload */
OCIRaw            *msgid = (OCIRaw*)0;  /* message id */
ub1               msgmem[16]="";  /* memory for msgid */
char              user[60]="aq/AQ";  /* user login password */
char              subject[30];  /* components of */
char              txt[80];  /* message_type */
char              correlation1[30];  /* message correlation  */
char              correlation2[30]; 
int               status;  /* code returned by the OCI calls */
                                                     
/* Dequeue by correlation and msgid */

/* Connect to the database: */
EXEC SQL CONNECT :user;                      
EXEC SQL WHENEVER SQLERROR DO sql_error("Oracle Error :");

/* Allocate space in the object cache for the host variable: */
EXEC SQL ALLOCATE :message;

/* Get the OCI Env handle: */
if (SQLEnvGet(SQL_SINGLE_RCTX, &oeh) != OCI_SUCCESS)
{
 printf(" error in SQLEnvGet \n");
 exit(1);
} 
/* Get the OCI Error handle: */
if (status = OCIHandleAlloc((dvoid *)oeh, (dvoid **)&err,
(ub4)OCI_HTYPE_ERROR, (ub4)0, (dvoid **)0))
{
printf(" error in OCIHandleAlloc %d \n", status);
exit(1);
}


/* Assign memory for msgid: 
Memory needs to be allocated explicitly to OCIRaw*: */
if (status=OCIRawAssignBytes(oeh, err, msgmem, 16, &msgid))
{
printf(" error in  OCIRawAssignBytes  %d \n", status);
exit(1);
}

/* First enqueue */

strcpy(correlation1, "1st message");
strcpy(subject, "NORMAL ENQUEUE1");
strcpy(txt, "The Enqueue was done through PLSQL embedded in PROC"); 
 
/* Initialize the components of message: */
EXEC SQL OJECT SET SUBJECT, TEXT OF :message TO :subject, :txt;

/* Embedded PLSQL call to the AQ enqueue procedure: */
EXEC SQL EXECUTE
DECLARE
message_properties   dbms_aq.message_properties_t;
enqueue_options      dbms_aq.enqueue_options_t;
BEGIN
/* Bind the host variable 'correlation1': to message correlation*/
message_properties.correlation := :correlation1;

/* Bind the host variable 'message' to payload and 
 return message id into host variable 'msgid': */
dbms_aq.enqueue(queue_name => 'msg_queue',
message_properties => message_properties,
enqueue_options => enqueue_options,
payload => :message,
msgid => :msgid);
END;
END-EXEC;
/* Commit work: */
EXEC SQL COMMIT;

printf("Enqueued Message \n");
printf("Subject  :%s\n",subject);
printf("Text     :%s\n",txt);
  
/* Second enqueue */

strcpy(correlation2, "2nd message");
strcpy(subject, "NORMAL ENQUEUE2");
strcpy(txt, "The Enqueue was done through PLSQL embedded in PROC");

/* Initialize the components of message: */
EXEC SQL OBJECT SET SUBJECT, TEXT OF :messsage TO :subject,:txt;

/* Embedded PLSQL call to the AQ enqueue procedure: */  
EXEC SQL EXECUTE
DECLARE
message_properties   dbms_aq.message_properties_t;
enqueue_options      dbms_aq.enqueue_options_t; 
msgid                RAW(16);
BEGIN
/* Bind the host variable 'correlation2':  to message correlaiton */  
message_properties.correlation := :correlation2;

/* Bind the host variable 'message': to payload */  
dbms_aq.enqueue(queue_name => 'msg_queue',
message_properties => message_properties,
enqueue_options => enqueue_options,
payload => :message,
msgid => msgid);
END;
END-EXEC;
/* Commit work: */
EXEC SQL COMMIT;
printf("Enqueued Message \n");
printf("Subject  :%s\n",subject);
printf("Text     :%s\n",txt);

/* First dequeue - by  correlation */
 
EXEC SQL EXECUTE
DECLARE
message_properties  dbms_aq.message_properties_t;
dequeue_options     dbms_aq.dequeue_options_t; 
msgid               RAW(16);
BEGIN
/* Dequeue by  correlation in host variable 'correlation2': */  
dequeue_options.correlation := :correlation2;

/* Return the payload into host variable 'message': */
dbms_aq.dequeue(queue_name => 'msg_queue',
message_properties => message_properties,
dequeue_options => dequeue_options,
payload => :message,
msgid => msgid);
END;
END-EXEC;
/* Commit work : */
EXEC SQL COMMIT;

/* Extract the values of the components of message: */
EXEC SQL OBJECT GET SUBJECT, TEXT FROM :message INTO :subject,:txt;

printf("Dequeued Message \n");
printf("Subject  :%s\n",subject);
printf("Text     :%s\n",txt);

/* SECOND DEQUEUE - by MSGID  */   
                                
EXEC SQL EXECUTE
DECLARE
message_properties  dbms_aq.message_properties_t;
dequeue_options     dbms_aq.dequeue_options_t; 
msgid               RAW(16);
BEGIN
/* Dequeue by msgid in host variable 'msgid': */
dequeue_options.msgid := :msgid;

/* Return the payload into host variable 'message':  */
dbms_aq.dequeue(queue_name => 'msg_queue',
message_properties => message_properties,
dequeue_options => dequeue_options,
payload => :message,
msgid => msgid);
END;
END-EXEC;
/* Commit work: */
EXEC SQL COMMIT;
}

Enqueue and Dequeue of Messages by Correlation and Message ID using OCI

#ifndef SL_ORACLE
#include <sl.h>
#endif

#ifndef OCI_ORACLE
#include <oci.h>
#endif

struct message
{
  OCIString   *subject;
  OCIString   *data;
};
typedef struct message message;

struct null_message
{
  OCIInd    null_adt;
  OCIInd    null_subject;
  OCIInd    null_data;
};
typedef struct null_message null_message;

int main()
{
  OCIEnv 	*envhp;
  OCIServer 	*srvhp;
  OCIError 	*errhp;
  OCISvcCtx 	*svchp;
  dvoid	     	*tmp;
  OCIType 	*mesg_tdo = (OCIType *) 0;
  message  	msg;
  null_message 	nmsg;
  message 	*mesg = &msg;
  null_message 	*nmesg = &nmsg;
  message 	*deqmesg = (message *)0;
  null_message 	*ndeqmesg = (null_message *)0;
  
  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)()) 0,
	(dvoid * (*)()) 0,  (void (*)()) 0 );
  
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
	   52, (dvoid **) &tmp);
  
  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
	   52, (dvoid **) &tmp);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
	   52, (dvoid **) &tmp);
  
  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
	   52, (dvoid **) &tmp);
  
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
	   (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);
 
  OCILogon(envhp, errhp, &svchp, "AQ", strlen("AQ"), "AQ", strlen("AQ"), 0, 0);

  /* obtain TDO of message_type */
  OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", strlen("AQ"),
	       (CONST text *)"MESSAGE_TYPE", strlen("MESSAGE_TYPE"), 
	       (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo);

  /* prepare the message payload */
  mesg->subject = (OCIString *)0;
  mesg->data = (OCIString *)0;
  OCIStringAssignText(envhp, errhp,
	    (CONST text *)"NORMAL MESSAGE", strlen("NORMAL MESSAGE"), 
	    &mesg->subject);
  OCIStringAssignText(envhp, errhp,
	    (CONST text *)"OCI ENQUEUE", strlen("OCI ENQUEUE"), 
	    &mesg->data);
  nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL;  

  /* enqueue into the msg_queue */
  OCIAQEnq(svchp, errhp, (CONST text *)"msg_queue", 0, 0,
	       mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, 0, 0);
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* dequeue from the msg_queue */
  OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue", 0, 0,
	       mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0);
  printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
  printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  OCITransCommit(svchp, errhp, (ub4) 0);

}

Enqueue and Dequeue of Messages to/from a Multiconsumer Queue using PL/SQL

/* Create subscriber list: */
DECLARE
subscriber sys.aq$_agent;

/* Add subscribers RED and GREEN to the suscriber list: */
BEGIN
subscriber := sys.aq$_agent('RED', NULL, NULL);
dbms_aqadm.add_subscriber(queue_name => 'msg_queue_multiple',
subscriber => subscriber);

subscriber := sys.aq$_agent('GREEN', NULL, NULL);
dbms_aqadm.add_subscriber(queue_name => 'msg_queue_multiple',
subscriber => subscriber); 
END;
/
DECLARE
enqueue_options     dbms_aq.enqueue_options_t;
message_properties  dbms_aq.message_properties_t;
recipients          dbms_aq.aq$_recipient_list_t;
message_handle      RAW(16);
message             aq.message_type;

/* Enqueue MESSAGE 1 for subscribers to the queue 
i.e. for RED and GREEN: */
BEGIN
message := message_type('MESSAGE 1',
'This message is queued for queue subscribers.');

dbms_aq.enqueue(queue_name => 'msg_queue_multiple',
enqueue_options    => enqueue_options,
message_properties => message_properties,
payload            => message,
msgid              => message_handle);

/* Enqueue MESSAGE 2 for specified recipients i.e. for RED and BLUE.*/
message := message_type('MESSAGE 2',
'This message is queued for two recipients.');
recipients(1) := sys.aq$_agent('RED', NULL, NULL);
recipients(2) := sys.aq$_agent('BLUE', NULL, NULL);
message_properties.recipient_list := recipients;

dbms_aq.enqueue(queue_name => 'msg_queue_multiple',
enqueue_options    => enqueue_options,
message_properties => message_properties,
payload            => message,
msgid              => message_handle);

COMMIT;
END;
/

Note that RED is both a subscriber to the queue, as well as being a specified recipient of MESSAGE 2. By contrast, GREEN is only a subscriber to those messages in the queue (in this case, MESSAGE) for which no recipients have been specified. BLUE, while not a subscriber to the queue, is nevertheless specified to receive MESSAGE 2.


/* Dequeue messages from msg_queue_multiple: */
DECLARE
dequeue_options     dbms_aq.dequeue_options_t;
message_properties  dbms_aq.message_properties_t;
message_handle      RAW(16);
message             aq.message_type;
no_messages   exception;
pragma exception_init (no_messages, -25228);

BEGIN

dequeue_options.wait := dbms_aq.NO_WAIT;
   
/* Consumer BLUE will get MESSAGE 2: */
dequeue_options.consumer_name := 'BLUE';

LOOP

dbms_aq.dequeue(queue_name   => 'msg_queue_multiple',
          dequeue_options    => dequeue_options,
          message_properties => message_properties,
          payload            => message,
          msgid              => message_handle);

dbms_output.put_line ('Message: ' || message.subject ||
         ' ... ' || message.text );

END LOOP;
EXCEPTION
WHEN no_messages THEN
dbms_output.put_line ('No more messages for BLUE');
COMMIT;
END;

BEGIN
/* Consumer RED will get MESSAGE 1 and MESSAGE 2: */
dequeue_options.consumer_name := 'RED';
      
LOOP
dbms_aq.dequeue(queue_name   => 'msg_queue_multiple',
          dequeue_options    => dequeue_options,
          message_properties => message_properties,
          payload            => message,
          msgid              => message_handle);

dbms_output.put_line ('Message: ' || message.subject ||
                                         ' ... ' || message.text );
END LOOP;
EXCEPTION
WHEN no_messages THEN
dbms_output.put_line ('No more messages for RED');
COMMIT;
END;

BEGIN
/* Consumer GREEN will get MESSAGE 1: */
dequeue_options.consumer_name := 'GREEN';
      
LOOP
dbms_aq.dequeue(queue_name   => 'msg_queue_multiple',
          dequeue_options    => dequeue_options,
          message_properties => message_properties,
          payload            => message,
          msgid              => message_handle);

dbms_output.put_line ('Message: ' || message.subject ||
          ' ... ' || message.text );
END LOOP;
EXCEPTION
WHEN no_messages THEN
dbms_output.put_line ('No more messages for GREEN');
COMMIT;
END;
/

Enqueue and Dequeue of Messages to/from a Multiconsumer Queue using OCI

#ifndef SL_ORACLE
#include <sl.h>
#endif

#ifndef OCI_ORACLE
#include <oci.h>
#endif

struct message
{
  OCIString   *subject;
  OCIString   *data;
};
typedef struct message message;

struct null_message
{
  OCIInd    null_adt;
  OCIInd    null_subject;
  OCIInd    null_data;
};
typedef struct null_message null_message;

int main()
{
  OCIEnv 		*envhp;
  OCIServer 		*srvhp;
  OCIError 		*errhp;
  OCISvcCtx 		*svchp;
  dvoid	     		*tmp;
  OCIType 		*mesg_tdo = (OCIType *) 0;
  message  		msg;
  null_message 		nmsg;
  message 		*mesg = &msg;
  null_message 		*nmesg = &nmsg;
  message 		*deqmesg = (message *)0;
  null_message 		*ndeqmesg = (null_message *)0;
  OCIAQMsgProperties 	*msgprop = (OCIAQMsgProperties *)0;
  OCIAQAgent 		*agents[2];
  OCIAQDeqOptions 	*deqopt = (OCIAQDeqOptions *)0;
  ub4			wait = OCI_DEQ_NO_WAIT;
  ub4	 		navigation = OCI_DEQ_FIRST_MSG;
  
  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)()) 0,
	(dvoid * (*)()) 0,  (void (*)()) 0 );
  
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
	   52, (dvoid **) &tmp);
  
  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
	   52, (dvoid **) &tmp);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
	   52, (dvoid **) &tmp);
  
  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
	   52, (dvoid **) &tmp);
  
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
	   (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);
 
  OCILogon(envhp, errhp, &svchp, "AQ", strlen("AQ"), "AQ", strlen("AQ"), 0, 0);

  /* obtain TDO of message_type */
  OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", strlen("AQ"),
	       (CONST text *)"MESSAGE_TYPE", strlen("MESSAGE_TYPE"), 
	       (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo);

  /* prepare the message payload */
  mesg->subject = (OCIString *)0;
  mesg->data = (OCIString *)0;
  OCIStringAssignText(envhp, errhp,
	    (CONST text *)"MESSAGE 1", strlen("MESSAGE 1"), 
	    &mesg->subject);
  OCIStringAssignText(envhp, errhp,
	    (CONST text *)"mesg for queue subscribers", 
	    strlen("mesg for queue subscribers"), &mesg->data);
  nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL;  

  /* enqueue MESSAGE 1 for subscribers to the queue i.e. for RED and GREEN */
  OCIAQEnq(svchp, errhp, (CONST text *)"msg_queue_multiple", 0, 0,
	       mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, 0, 0);

  /* enqueue MESSAGE 2 for specified recipients i.e. for RED and BLUE */
  /* prepare message payload */
  OCIStringAssignText(envhp, errhp,
	    (CONST text *)"MESSAGE 2", strlen("MESSAGE 2"), 
	    &mesg->subject);
  OCIStringAssignText(envhp, errhp,
	    (CONST text *)"mesg for two recipients", 
	    strlen("mesg for two recipients"), &mesg->data);

  /* allocate AQ message properties and agent descriptors */
  OCIDescriptorAlloc(envhp, (dvoid **)&msgprop, 
			OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0);
  OCIDescriptorAlloc(envhp, (dvoid **)&agents[0], 
				  OCI_DTYPE_AQAGENT, 0, (dvoid **)0);
  OCIDescriptorAlloc(envhp, (dvoid **)&agents[1], 
				  OCI_DTYPE_AQAGENT, 0, (dvoid **)0);

  /* prepare the recipient list, RED and BLUE */
  OCIAttrSet(agents[0], OCI_DTYPE_AQAGENT, "RED", strlen("RED"),
	     OCI_ATTR_AGENT_NAME, errhp);
  OCIAttrSet(agents[1], OCI_DTYPE_AQAGENT, "BLUE", strlen("BLUE"),
	     OCI_ATTR_AGENT_NAME, errhp);
  OCIAttrSet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *)agents, 2,
		      OCI_ATTR_RECIPIENT_LIST, errhp);

  OCIAQEnq(svchp, errhp, (CONST text *)"msg_queue_multiple", 0, msgprop,
	       mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, 0, 0);

  OCITransCommit(svchp, errhp, (ub4) 0);

  /* now dequeue the messages using different consumer names */
  /* allocate dequeue options descriptor to set the dequeue options */
  OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 0, 
		     (dvoid **)0);

  /* set wait parameter to NO_WAIT so that the dequeue returns immediately */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&wait, 0, 
	     OCI_ATTR_WAIT, errhp);

  /* set navigation to FIRST_MESSAGE so that the dequeue resets the position */
  /* after a new consumer_name is set in the dequeue options		     */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&navigation, 0, 
	     OCI_ATTR_NAVIGATION, errhp);
  
  /* dequeue from the msg_queue_multiple as consumer BLUE */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"BLUE", strlen("BLUE"), 
	     OCI_ATTR_CONSUMER_NAME, errhp);
  while (OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue_multiple", deqopt, 0,
	 mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0) 
	 == OCI_SUCCESS)
  {
    printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
    printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  }
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* dequeue from the msg_queue_multiple as consumer RED */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"RED", strlen("RED"), 
	     OCI_ATTR_CONSUMER_NAME, errhp);
  while (OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue_multiple", deqopt, 0,
	 mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0) 
	 == OCI_SUCCESS)
  {
    printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
    printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  }
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* dequeue from the msg_queue_multiple as consumer GREEN */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS,(dvoid *)"GREEN",strlen("GREEN"), 
	     OCI_ATTR_CONSUMER_NAME, errhp);
  while (OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue_multiple", deqopt, 0,
	 mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0) 
	 == OCI_SUCCESS)
  {
    printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
    printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  }
  OCITransCommit(svchp, errhp, (ub4) 0);
}

Enqueue of Messages for remote subscribers/recipients to a Multiconsumer Queue and Propagation Scheduling

/* Create subscriber list: */
DECLARE
subscriber sys.aq$_agent;

/* Add subscribers RED and GREEN with different addresses  to the suscriber 
list: */
BEGIN
/* Add subscriber RED that will dequeue messages from another_msg_queue queue 
in the same datatbase */
subscriber := sys.aq$_agent('RED', 'another_msg_queue', NULL);
dbms_aqadm.add_subscriber(queue_name => 'msg_queue_multiple',
subscriber => subscriber);

/* Schedule propagation from msg_queue_multiple to other queues in the same 
database: */
dbms_aqadm.schedule_propagation(queue_name => 'msg_queue_multiple');

/* Add subscriber GREEN that will dequeue messages from the msg_queue queue in 
another database reached by the database link another_db.world */
subscriber := sys.aq$_agent('GREEN', 'msg_queue@another_db.world', NULL);
dbms_aqadm.add_subscriber(queue_name => 'msg_queue_multiple',
subscriber => subscriber); 

/* Schedule propagation from msg_queue_multiple to other queues in the  
database "another_database": */
BEGIN
dbms_aqadm.schedule_propagation(queue_name => 'msg_queue_multiple', 
destination  => 'another_db.world');

END;
/

DECLARE
enqueue_options     dbms_aq.enqueue_options_t;
message_properties  dbms_aq.message_properties_t;
recipients          dbms_aq.aq$_recipient_list_t;
message_handle      RAW(16);
message             aq.message_type;

/* Enqueue MESSAGE 1 for subscribers to the queue  
i.e. for RED at address another_msg_queue and GREEN at address 
msg_queue@another_db.world: */
BEGIN
message := message_type('MESSAGE 1',
'This message is queued for queue subscribers.');

dbms_aq.enqueue(queue_name => 'msg_queue_multiple',
enqueue_options    => enqueue_options,
message_properties => message_properties,
payload            => message,
msgid              => message_handle);

/* Enqueue MESSAGE 2 for specified recipients i.e. for RED at address 
another_msg_queue and BLUE.*/
message := message_type('MESSAGE 2',
'This message is queued for two recipients.'); 
recipients(1) := sys.aq$_agent('RED', 'another_msg_queue', NULL); 
recipients(2) := sys.aq$_agent('BLUE', NULL, NULL);
message_properties.recipient_list := recipients;

dbms_aq.enqueue(queue_name => 'msg_queue_multiple',
enqueue_options    => enqueue_options,
message_properties => message_properties,
payload            => message, 
msgid              => message_handle);

COMMIT;
END;
/


Note:

RED at address another_msg_queue is both a subscriber to the queue, as well as being a specified recipient of MESSAGE 2. By contrast, GREEN at address msg_queue@another_db.world is only a subscriber to those messages in the queue (in this case, MESSAGE 1) for which no recipients have been specified. BLUE, while not a subscriber to the queue, is nevertheless specified to receive MESSAGE 2.

 

Unscheduling Propagation

/* unschedule propagation from msg_queue_multiple to the destination 
another_db.world */
execute dbms_aqadm.unschedule_propagation(queue_name => 'msg_queue_multiple', 
destination => 'another_db.world');

Enqueue and Dequeue using Message Grouping

CONNECT aq/aq

EXECUTE dbms_aqadm.create_queue_table ( 
queue_table        => 'aq.msggroup',
queue_payload_type => 'aq.message_type',
message_grouping => dbms_aqadm.TRANSACTIONAL);

EXECUTE dbms_aqadm.create_queue(
queue_name	   => 'msggroup_queue',
queue_table        => 'aq.msggroup');

EXECUTE dbms_aqadm.start_queue(queue_name => 'msggroup_queue');

/* Enqueue three messages in each transaction */
DECLARE
enqueue_options     dbms_aq.enqueue_options_t;
message_properties  dbms_aq.message_properties_t;
message_handle      RAW(16);
message             aq.message_type;

BEGIN

  /* loop through three times, committing after every iteration */
  FOR txnno in 1..3 LOOP

    /* loop through three times, enqueuing each iteration */
    FOR mesgno in 1..3 LOOP
      message := message_type('GROUP#' || txnno, 
			      'Message#' || mesgno ||  ' in group' || txnno);

      dbms_aq.enqueue(queue_name 	     => 'msggroup_queue',
	              enqueue_options        => enqueue_options,       
		      message_properties     => message_properties,     
		      payload                => message,               
		      msgid                  => message_handle);
    END LOOP;
    /* commit the transaction */
    COMMIT; 
  END LOOP;
END;
/

/* Now dequeue the messages as groups */
DECLARE
dequeue_options     dbms_aq.dequeue_options_t;
message_properties  dbms_aq.message_properties_t;
message_handle      RAW(16);
message             aq.message_type;

no_messages   exception;
end_of_group  exception;

pragma exception_init (no_messages, -25228);
pragma exception_init (end_of_group, -25235);

BEGIN
dequeue_options.wait 	   := DBMS_AQ.NO_WAIT;
dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;     

LOOP
   BEGIN
   dbms_aq.dequeue(queue_name   	   => 'msggroup_queue',
          	dequeue_options    => dequeue_options,
          	message_properties => message_properties,
          	payload            => message,
          	msgid              => message_handle);

  dbms_output.put_line ('Message: ' || message.subject || 
  			' ... ' || message.text );

  dequeue_options.navigation := DBMS_AQ.NEXT_MESSAGE;

  EXCEPTION
    WHEN end_of_group THEN
      dbms_output.put_line ('Finished processing a group of messages');
      COMMIT;
      dequeue_options.navigation := DBMS_AQ.NEXT_TRANSACTION;
  END;
END LOOP;
EXCEPTION
  WHEN no_messages THEN
    dbms_output.put_line ('No more messages');
END;
/

Drop AQ Objects

/* Cleans up all objects related to the object type: */
CONNECT aq/AQ;

EXECUTE dbms_aqadm.stop_queue ( 
queue_name => 'msg_queue');  

EXECUTE dbms_aqadm.drop_queue ( 
queue_name => 'msg_queue');  

EXECUTE dbms_aqadm.drop_queue_table ( 
queue_table => 'aq.msg');

/* Cleans up all objects related to the RAW type: */
EXECUTE dbms_aqadm.stop_queue ( 
        queue_name      => 'raw_msg_queue');   
  
EXECUTE dbms_aqadm.drop_queue ( 
        queue_name      => 'raw_msg_queue');   
  
EXECUTE dbms_aqadm.drop_queue_table (
            queue_table => 'aq.raw_msg'); 

/* Cleans up all objects related to the priority queue: */
EXECUTE dbms_aqadm.stop_queue ( 
        queue_name     => 'priority_msg_queue');  

EXECUTE dbms_aqadm.drop_queue ( 
        queue_name     => 'priority_msg_queue');  

EXECUTE dbms_aqadm.drop_queue_table ( 
queue_table   => 'aq.priority_msg'); 

/* Cleans up all objects related to the multiple-consumer queue: */
EXECUTE dbms_aqadm.stop_queue ( 
queue_name  => 'msg_queue_multiple');  

EXECUTE dbms_aqadm.drop_queue ( 
queue_name  => 'msg_queue_multiple');  

EXECUTE dbms_aqadm.drop_queue_table ( 
queue_table => 'aq.msg_multiple');

drop type aq.message_type;

Revoke Roles and Privileges

CONNECT sys/change_on_install;
drop user aq;

Oracle Advanced Queuing Reference

Reference Overview

This section contains a detailed description of the technical specifications:

- Init ora Parameter

- Data Structures

- Agent

- Message Properties

- Queue Options

- Operational Interface

- Administrative Interface

- Administration Topics

- Data Objects

INIT.ORA Parameter

AQ_TM_PROCESSES

A parameter called AQ_TM_PROCESSES should be specified in the init.ora PARAMETER file if you want to perform time monitoring on queue messages. This will be used for messages which have delay and expiration properties specified. This parameter can be set in a range from 0 to 10. Setting it to any other number will result in an error. If this parameter is set to 1, one queue monitor process will be created as a background process to monitor the messages. If the parameter is not specified, or is set to 0, the queue monitor process is not created. The administrative interfaces to start and stop the queue monitor are only valid if the queue monitor process is started as part of instance startup by specifying this parameter.

Parameter Name:  

aq_tm_processes  

Parameter Type:  

integer  

Parameter Class:  

Dynamic  

Allowable Values:  

0 to 10  

Syntax:  

aq_tm_processes = <0 to 10>  

Name of process:  

ora_qmon_<oracle sid>  

Example:  

aq_tm_processes = 1  

JOB_QUEUE_PROCESSES

Propagation is handled by job queue (SNP) processes. The number of job queue processes started in an instance is controlled by the init.ora parameter JOB_QUEUE_PROCESSES. The default value of this parameter is 0. In order for message propagation to take place, this parameter must be set to at least 1. The DBA can set it to higher values if there are many queues from which the messages have to be propagated, or if there are many destinations to which the messages have to be propagated, or if there are other jobs in the job queue.

See Also:

Oracle8 Reference for complete details about JOB_QUEUE_PROCESSES.

 

COMPATIBLE

The COMPATIBLE init.ora parameter must be set to 8.0.4 in order to use the AQ propagation feature. Specifically, the COMPATIBLE parameter will be checked under the following three conditions:

  1. An AQ agent's (see sys.aq$_agent) address field is specified in the DBMS_AQADM.ADD_SUBSCRIBER command.
  2. An AQ agent's (see sys.aq$_agent) address field is specified in the recipient_list of dbms_aq.message_properties_t.
  3. The DBMS_AQADM.SCHEDULE_PROPAGATION command is used.

Users can downgrade to 8.0.3 after using the 8.0.4 features by using

        ALTER DATABASE RESET COMPATIBILITY

Users will not be allowed to restart the database in 8.0.3 compatible mode under the following conditions:

  1. There are messages in queues that have not yet been propagated to their destinations.
  2. There are propagation schedules that are still pending, in which case you may use the DBMS_AQADM.UNSCHEDULE_PROPAGATION command to remove the schedules.
  3. There are queues that have remote subscribers (i.e. a non NULL address field in sys.aq$_agent), in which case you may remove remote subscribers by means of the DBMS_AQADM.REMOVE_SUBSCRIBER command.

    See Also:

    For more details on compatibility, refer to the upgrade/downgrade section of the migration guide.

     

Data Structures

The following data structures are used in the operational and administrative interfaces.

Object name

Purpose:

Naming of database objects. This naming convention applies to queues, queue tables and object types.

Syntax:

object_name := VARCHAR2
object_name := [<schema_name>.]<name>

Usage:

Names for objects are specified by an optional schema name and a name. If the schema name is not specified then the current schema is assumed. The schema name and the name can each be up to 30 bytes long. However, queue names and queue table names can be a maximum of 24 bytes.

Type name

Purpose:

Defining queue types.

Syntax:

type_name := VARCHAR2
type_name := <object_type> | "RAW"

Usage:

Table 11-1 Type Name
Parameter   Description  

<object_types>  

For details on creating object types please refer to Server concepts manual. The maximum number of attributes in the object type is limited to 900.  

"RAW"  

To store payload of type RAW, AQ will create a queue table with a LOB column as the payload repository. The size of the payload is limited to 32K bytes of data. Because LOB columns are used for storing RAW payload, the AQ administrator can choose the LOB tablespace and configure the LOB storage by constructing a LOB storage string in the storage_clause parameter during queue table creation time.  

Agent

Purpose:

To identify a producer or a consumer of a message.

Syntax:


TYPE sys.aq$_agent IS OBJECT (

name            VARCHAR2(30), 
address         VARCHAR2(1024),
protocol        NUMBER)

Usage:

Table 11-2 Agent
Parameter   Description  

name  

Name of a producer or consumer of a message.  

address  

Protocol specific address of the recipient. If the protocol is 0 (default) the address is of the form [schema.]queue[@dblink]  

protocol  

Protocol to interpret the address and propagate the message. The default (and currently the only supported) value is 0.  

Message Properties

Purpose:

The Message Properties describe the information that is used by AQ to manage individual messages. These are set at enqueue time and their values are returned at dequeue time.

Syntax:

TYPE message_properties_t IS RECORD (

priority          BINARY_INTEGER default 1,
delay             BINARY_INTEGER default NO_DELAY,
expiration        BINARY_INTEGER default NEVER,
correlation       VARCHAR2(128) default NULL,
attempts          BINARY_INTEGER,
recipient_list    aq$_recipient_list_t,
exception_queue   VARCHAR2(51) default NULL,
enqueue_time      DATE,
state             BINARY_INTEGER)
TYPE aq$_recipient_list_t IS TABLE OF sys.aq$_agent
INDEX BY BINARY_INTEGER

Usage

:

Table 11-3 Message properties
Parameter   Description  

priority  

Specifies the priority of the message. A smaller number indicates higher priority. The priority can be any number, including negative numbers.  

delay  

Specifies the delay of the enqueued message. The delay represents the number of seconds after which a message is available for dequeuing. Dequeuing by msgid overrides the delay specification. A message enqueued with delay set will be in the WAITING state, when the delay expires the messages goes to the READY state. DELAY processing requires the queue monitor to be started. Note that delay is set by the producer who enqueues the message.

NO_DELAY: the message is available for immediate dequeuing.

number: the number of seconds to delay the message.  

expiration  

Specifies the expiration of the message. It determines, in seconds, the duration the message is available for dequeuing. This parameter is an offset from the delay. Expiration processing requires the queue monitor to be running.

NEVER: message will not expire.

number: number of seconds message will remain in READY state. If the message is not dequeued before it expires, it will be moved to the exception queue in the EXPIRED state.  

correlation  

Specifies the identification supplied by the producer for a message at enqueuing.  

attempts  

Specifies the number of attempts that have been made to dequeue this message. This parameter can not be set at enqueue time.  

recipient_list  

For type definition please refer to section titled "Agent".

This parameter is only valid for queues which allow multiple consumers. The default recipients are the queue subscribers. This parameter is not returned to a consumer at dequeue time.  

exception_queue  

Specifies the name of the queue to which the message is moved if it cannot be processed successfully. Messages are moved in two cases: The number of unsuccessful dequeue attempts has exceeded max_retries or the message has expired. All messages in the exception queue are in the EXPIRED state.

The default is the exception queue associated with the queue table. If the exception queue specified does not exist at the time of the move the message will be moved to the default exception queue associated with the queue table and a warning will be logged in the alert file. If the default exception queue is used the parameter will return a NULL value at dequeue time.  

enqueue_time  

Specifies the time the message was enqueued. This value is determined by the system and cannot be set by the user. This parameter can not be set at enqueue time.  

state  

Specifies the state of the message at the time of the dequeue. This parameter can not be set at enqueue time.

0: The message is ready to be processed.

1: The message delay has not yet been reached.

3: The message has been processed and is retained.

4: The message has been moved to the exception queue.  

Queue Options

Enqueue options

Purpose:

To specify the options available for the enqueue operation.

Syntax:

TYPE enqueue_options_t IS RECORD (

visibility          BINARY_INTEGER default ON_COMMIT,
relative_msgid      RAW(16) default NULL,
sequence_deviation  BINARY_INTEGER default NULL)

Usage:
Table 11-4
Parameter   Description  

visibility  

Specifies the transactional behavior of the enqueue request.

ON_COMMIT: The enqueue is part of the current transaction. The operation is complete when the transaction commits. This is the default case.

IMMEDIATE: The enqueue is not part of the current transaction. The operation constitutes a transaction on its own.  

relative_msgid

 

Specifies the message identifier of the message which is referenced in the sequence deviation operation. This field is valid if and only if BEFORE is specified in sequence_deviation. This parameter will be ignored if sequence deviation is not specified.  

sequence_deviation

 

Specifies if the message being enqueued should be dequeued before other message(s) already in the queue.

BEFORE: The message is enqueued ahead of the message specified by relative_msgid.

TOP: The message is enqueued ahead of any other messages.

NULL: Default  

Dequeue options

Purpose:

To specify the options available for the dequeue operation.

Syntax:

TYPE dequeue_options_t IS RECORD (

consumer_name    VARCHAR2(30) default NULL,
dequeue_mode     BINARY_INTEGER default REMOVE,
navigation       BINARY_INTEGER default NEXT_MESSAGE,
visibility       BINARY_INTEGER default ON_COMMIT,
wait             BINARY_INTEGER default FOREVER
msgid            RAW(16) default NULL,
correlation      VARCHAR2(128) default NULL)

Usage

Table 11-5 DEQUEUE options
Parameter   Description  

consumer_name  

Name of the consumer. Only those messages matching the consumer name are accessed. If a queue is not set up for multiple consumers, this field should be set to NULL.  

dequeue_mode  

Specifies the locking behavior associated with the dequeue.

BROWSE: Read the message without acquiring any lock on the message. This is equivalent to a select statement.

LOCKED: Read and obtain a write lock on the message. The lock lasts for the duration of the transaction. This is equivalent to a select for update statement.

REMOVE: Read the message and update or delete it. This is the default. The message can be retained in the queue table based on the retention properties.  

navigation  

Specifies the position of the message that will be retrieved. First, the position is determined. Second, the search criterion is applied. Finally, the message is retrieved.

NEXT_MESSAGE: Retrieve the next message which is available and matches the search criteria. If the previous message belongs to a message group, AQ will retrieve the next available message which matches the search criteria and belongs to the message group. This is the default.

NEXT_TRANSACTION: Skip the remainder of the current transaction group (if any) and retrieve the first message of the next transaction group. This option can only be used if message grouping is enabled for the current queue.

FIRST_MESSAGE: Retrieves the first message which is available and matches the search criteria. This will reset the position to the beginning of the queue.  

visibility  

Specifies whether the new message is dequeued as part of the current transaction.The visibility parameter is ignored when using the BROWSE mode.

ON_COMMIT: The dequeue will be part of the current transaction. This is the default case.

IMMEDIATE: The dequeued message is not part of the current transaction. It constitutes a transaction on its own.  

wait  

Specifies the wait time if there is currently no message available which matches the search criteria.

FOREVER: wait forever. This is the default.

NO_WAIT: do not wait

number: wait time in seconds  

msgid  

Specifies the message identifier of the message to be dequeued.  

correlation

 

Specifies the correlation identifier of the message to be dequeued. Special pattern matching characters, such as the percent sign ( %) and the underscore (_) can be used. If more than one message satisfies the pattern, the order of dequeuing is undetermined.  

Operational Interface

The following interface calls are available to enqueue and dequeue messages from queues.

DBMS_AQ.ENQUEUE

Purpose:

Adds a message to the specified queue. In the simplest case, if the user wants to enqueue a message, without any other parameters, only the queue name and the payload have to be specified.

Syntax:


DBMS_AQ.ENQUEUE (

queue_name          IN      VARCHAR2,
enqueue_options     IN      enqueue_options_t,
message_properties  IN      message_properties_t,
payload             IN      "<type_name>",
msgid               OUT     RAW)

Usage:

Table 11-6 DBMS_AQ.ENQUEUE
Parameter   Description  

queue_name

(IN VARCHAR2)  

Specifies the name of the queue to which this message should be enqueued. The queue cannot be an exception queue.  

enqueue_options

(IN enqueue_option_t)  

For the definition please refer to the section titled "ENQUEUE Options."  

message_properties

(IN message_properties_t)  

For the definition please refer to the section titled "Message Properties."  

payload

(IN "<type_name>")  

Not interpreted by Oracle AQ.

The payload must be specified according to the specification in the associated queue table. NULL is an acceptable parameter. For the definition of <type_name> please refer to section titled "Type name"  

msgid

(OUT RAW)  

The system generated identification of the message. This is a globally unique identifier that can be used to identify the message at dequeue time.  

Using sequence deviation:

The sequence_deviation parameter in enqueue_options can be used to change the order of processing between two messages. The identity of the other message, if any, is specified by the enqueue_options parameter relative_msgid. The relationship is identified by the sequence_deviation parameter.

Specifying sequence_deviation for a message introduces some restrictions for the delay and priority values that can be specified for this message. The delay of this message has to be less than or equal to the delay of the message before which this message is to be enqueued. The priority of this message has to be greater than or equal to the priority of the message before which this message is to be enqueued.

DBMS_AQ.DEQUEUE

Purpose:

Dequeues a message from the specified queue.

Syntax:

DBMS_AQ.DEQUEUE (

queue_name          IN      VARCHAR2,
dequeue_options     IN      dequeue_options_t,
message_properties  OUT     message_properties_t,
payload             OUT    "<type_name>",
msgid               OUT     raw)

Usage:

Table 11-7 DBMS_AQ.DEQUEUE
Parameter   Description  

queue_name

(IN VARCHAR2)  

Specifies the name of the queue.

 

dequeue_options

(IN dequeue_option_t)  

For the definition please refer to the section titled "DEQUEUE Options."

 

message_properties

(OUT message_properties_t)  

For the definition please refer to the section titled "Message Properties."

 

payload

(OUT "<type_name>")  

Not interpreted by Oracle AQ.

The payload must be specified according to the specification in the associated queue table. For the definition of <type_name> please refer to section titled "Type name"  

msgid

(OUT RAW)  

The system generated identification of the message.  

Search criteria and dequeue order for messages:

The search criteria for messages to be dequeued is determined by the consumer_name, msgid and correlation parameters in the dequeue_options. Msgid uniquely identifies the message to be dequeued. Correlation identifiers are application-defined identifiers that are not interpreted by AQ.

Only messages in the READY state are dequeued unless a msgid is specified.

The dequeue order is determined by the values specified at the time the queue table is created unless overridden by the msgid and correlation id in dequeue_options.

The database consistent read mechanism is applicable for queue operations. For example, a BROWSE call may not see a message that is enqueued after the beginning of the browsing transaction.

Navigating through a queue:

The default NAVIGATION parameter during dequeue is NEXT_MESSAGE. This means that subsequent dequeues will retrieve the messages from the queue based on the snapshot obtained in the first dequeue. In particular, a message that is enqueued after the first dequeue command will be processed only after processing all the remaining messages in the queue. This is usually sufficient when all the messages have already been enqueued into the queue, or when the queue does not have a priority-based ordering. However, applications must use the FIRST_MESSAGE navigation option when the first message in the queue needs to be processed by every dequeue command. This usually becomes necessary when a higher priority message arrives in the queue while messages already-enqueued are being processed.


Note:

It may also be more efficient to use the FIRST_MESSAGE navigation option when there are messages being concurrently enqueued. If the FIRST_MESSAGE option is not specified, AQ will have to continually generate the snapshot as of the first dequeue command, leading to poor performance. If the FIRST_MESSAGE option is specified, AQ will use a new snapshot for every dequeue command.

 

Dequeue by Message Grouping:

Messages enqueued in the same transaction into a queue that has been enabled for message grouping will form a group. If only one message is enqueued in the transaction, this will effectively form a group of one message. There is no upper limit to the number of messages that can be grouped in a single transaction.

In queues that have not been enabled for message grouping, a dequeue in LOCKED or REMOVE mode locks only a single message. By contrast, a dequeue operation that seeks to dequeue a message that is part of a group will lock the entire group. This is useful when all the messages in a group need to be processed as an atomic unit.

When all the messages in a group have been dequeued, the dequeue returns an error indicating that all messages in the group have been processed. The application can then use the NEXT_TRANSACTION to start dequeuing messages from the next available group. In the event that no groups are available, the dequeue will time-out after the specified WAIT period.

Enumerated Constants in the Operational Interface

When using enumerated constants such as BROWSE, LOCKED, REMOVE, the PL/SQL constants need to be specified with the scope of the packages defining it. All types associated with the operational interfaces have to be prepended with dbms_aq. For example:

dbms_aq.BROWSE

Table 11-8 Enumerated types in the operational interface
Parameter   Options  

visibility  

IMMEDIATE, ON_COMMIT  

mode  

BROWSE, LOCKED, REMOVE  

navigation  

FIRST_MESSAGE, NEXT_MESSAGE, NEXT_TRANSACTION  

state  

WAITING, READY, PROCESSED, EXPIRED  

sequence_deviation  

BEFORE, TOP  

wait  

FOREVER, NO_WAIT  

delay  

NO_DELAY  

expiration  

NEVER  

Administrative Interface

Configuration information can be managed through procedures in the DBMS_AQADM package. Because incorrect usage of the administration interface can have substantial performance impact on the database system, the administration interface should be treated as privileged commands, and only the designated queue administrator or privileged users should be granted access to the administration package. Initially, only SYS has the execution privilege for the procedures in DBMS_AQADM and DBMS_AQ.

Privileges and access control

Access to AQ operations are granted to users through roles. These roles provide execution privileges on the AQ procedures. Currently, we do not support fine grained access control at the database object level. This implies that a user with the AQ_USER_ROLE can enqueue and dequeue to any queue in the system.

Administrator role

AQ_ADMINISTRATOR_ROLE grants execute privileges to procedures in the DBMS_AQADM and DBMS_AQ packages. These include all the administrative and operational interfaces. The user 'SYS' must grant the AQ_ADMINISTRATOR_ROLE to the AQ administrator.

User role

AQ_USER_ROLE grants execute privileges to procedures in the DBMS_AQ packages. These include all the operational interfaces. The AQ administrator must grant the AQ_USER_ROLE to AQ users.

Access to AQ object types

The procedure grant_type_access must first be executed by the user 'SYS' to grant access for AQ object types to the AQ administrator. The AQ administrator can then execute this procedure to grant access for AQ object types to other AQ users. The procedure needs to be executed if the user wishes to perform any administrative operation involving a multiple consumer queue. These include CREATE_QUEUE_TABLE, CREATE_QUEUE, ADD_SUBSCRIBER and REMOVE_SUBSCRIBER.

Syntax:

PROCEDURE grant_type_access (user_name IN VARCHAR2);

Calling DBMS_AQ from a PL/SQL function or procedure

If you wish to call DBMS_AQ from a PL/SQL function or procedure, you will need to have been explicitly granted the EXECUTE privilege. You cannot inherit this right from either the AQ_USER_ROLE or the AQ_ADMINISTRATOR_ROLE.

Syntax:

GRANT EXECUTE ON DBMS_AQ TO <user>;

Example

  1. Scott is appointed as the AQ administrator.
    CONNECT sys/change_on_install
    GRANT AQ_ADMINISTRATOR_ROLE to scott with admin option; 
    execute dbms_aqadm.grant_type_access('scott');
    

  1. Scott lets Jones use AQ.
    CONNECT scott/tiger
    GRANT AQ_USER_ROLE to jones;
    

  1. Jones wishes to create queue tables that are enabled for multiple dequeues.
    CONNECT scott/tiger
    execute dbms_aqadm.grant_type_access('jones');
    

DBMS_AQADM.CREATE_QUEUE_TABLE

Purpose:

Create a queue table for messages of a pre-defined type. The sort keys for dequeue ordering, if any, need to be defined at table creation time. The following objects are created at this time:

  1. The default exception queue associated with the queue table called aq$_<queue_table_name>_e.
  2. A read-only view which is used by AQ applications for querying queue data called aq$<queue_table_name>.
  3. An index for the queue monitor operations called aq$_<queue_table_name>_t.
  4. An index or an index organized table (IOT) in the case of multiple consumer queues for dequeue operations called aq$_<queue_table_name>_i.
Syntax:


DBMS_AQADM.CREATE_QUEUE_TABLE (

queue_table        IN      VARCHAR2,
queue_payload_type IN      VARCHAR2,
storage_clause     IN      VARCHAR2 default NULL,
sort_list          IN      VARCHAR2 default NULL,
multiple_consumers IN      BOOLEAN default FALSE,
message_grouping   IN      BINARY_INTEGER default NONE,
comment            IN      VARCHAR2 default NULL,
auto_commit        IN      BOOLEAN default TRUE)

Usage

Table 11-9 DBMS_AQADM.CREATE_QUEUE_TABLE
Parameter   Description  

queue_table

(IN VARCHAR2)  

Specifies the name of a queue table to be created.  

queue_payload_type

(IN VARCHAR2)  

Specifies the type of the user data stored. Please see section entitled "Type name" for valid values for this parameter.  

storage_clause

(IN VARCHAR2)  

Specifies the storage parameter. The storage parameter will be

included in the `CREATE TABLE' statement when the queue table is created. The storage parameter can be made up of any combinations of the following parameters:

PCTFREE, PCTUSED, INITRANS, MAXTRANS, TABLEPSACE, LOB and a table storage clause.

Please refer to the SQL reference guide for the usage of these parameters.  

sort_list

(IN VARCHAR2)  

Specifies the columns to be used as the sort key in ascending order.

Sort_list has the following format: `<sort_column_1>,<sort_column_2>'.

The allowed column names are priority and enq_time. If both columns are specified then <sort_column_1> defines the most significant order.

Once a queue table is created with a specific ordering mechanism, all queues in the queue table inherit the same defaults. The order of a queue table cannot be altered once the queue table has been created.

If no sort list is specified all the queues in this queue table will be sorted by the enqueue time in ascending order. This order is equivalent to FIFO order.

Even with the default ordering defined, a dequeuer is allowed to choose a message to dequeue by specifying its msgid or correlation. Msgid, correlation and sequence_deviation take precedence over the default dequeueing order if they are specified.  

multiple_consumers

(IN BOOLEAN)  

FALSE: Queues created in the table can only have one consumer per message. This is the default.

TRUE: Queues created in the table can have multiple consumers per message. The user must have been granted type access by executing the grant_type_access procedure.  

message_grouping

(IN BINARY_INTEGER)  

Specifies the message grouping behavior for queues created in the table.

NONE: Each message is treated individually.

TRANSACTIONAL: Messages enqueued as part of one transaction are considered part of the same group and can be dequeued as a group of related messages.  

comment

(IN VARCHAR2)  

Specifies the user-specified description of the queue table. This user comment will be added to the queue catalog.  

auto_commit

(IN BOOLEAN)  

TRUE: causes the current transaction, if any, to commit before the CREATE_QUEUE_TABLE operation is carried out. The CREATE_QUEUE_TABLE operation becomes persistent when the call returns. This is the default.

FALSE: The operation is part of the current transaction and will become persistent only when the caller issues a commit.  

DBMS_AQADM.CREATE_QUEUE

Purpose:

Create a queue in the specified queue table. All queue names must be unique within a schema. Once a queue is created with CREATE_QUEUE, it can be enabled by calling START_QUEUE. By default, the queue is created with both enqueue and dequeue disabled.

Syntax:


DBMS_AQADM.CREATE_QUEUE (

queue_name          IN       VARCHAR2,
queue_table         IN       VARCHAR2,
queue_type          IN       BINARY_INTEGER default
                             NORMAL_QUEUE,
max_retries         IN       NUMBER default 0,
retry_delay         IN       NUMBER default 0,
retention_time      IN       NUMBER default 0,
dependency_tracking IN       BOOLEAN default FALSE,
comment             IN       VARCHAR2 default NULL,
auto_commit         IN       BOOLEAN default TRUE)

Usage:

Table 11-10 DBMS_AQADM.CREATE_QUEUE
Parameter   Description  

queue_name

(IN VARCHAR2)  

Specifies the name of the queue that is to be created.  

queue_table

(IN VARCHAR2)  

Specifies the name of the queue table that will contain the queue.  

queue_type

(IN BINARY_INTEGER)  

Specifies whether the queue being created is an exception queue or a normal queue.

NORMAL_QUEUE: The queue is a normal queue. This is the default.

EXCEPTION_QUEUE: It is an exception queue. Only the dequeue operation is allowed on the exception queue.  

max_retries

(IN NUMBER)  

Limits the number of times a dequeue with the REMOVE mode can be attempted on a message. The count is incremented when the application issues a rollback after executing the dequeue. The message is moved to the exception queue when it is reaches its max_retries. Default is 0, which means no retry is allowed.  

retry_delay

(IN NUMBER)  

Specifies the delay time, in seconds before this message is scheduled for processing again after an application rollback. The default is 0, which means the message can be retried as soon as possible. This parameter will have no effect if max_retries is set to 0. Retry_delay cannot be specified with multiple consumer queues.  

retention_time

(IN NUMBER)  

Specifies the number of seconds for which a message will be retained in the queue table after being dequeued from the queue.

INFINITE: Message will be retained forever.

number: Number of seconds for which to retain the messages. The default is 0, i.e. no retention.  

dependency_tracking

(IN BOOLEAN)  

Reserved for future use.

FALSE: This is the default.

TRUE: Not permitted in this release.  

comment

(IN VARCHAR2)  

User-specified description of the queue. This user comment will be added to the queue catalog.  

auto_commit

(IN BOOLEAN)  

TRUE: Causes the current transaction, if any, to commit before the CREATE_QUEUE operation is carried out. The CREATE_QUEUE operation becomes persistent when the call returns. This is the default.

FALSE: The operation is part of the current transaction and will become persistent only when the caller issues a commit.  

DBMS_AQADM.DROP_QUEUE_TABLE

Purpose:

Drop an existing queue table. All the queues in a queue table have to be stopped and dropped before the queue table can be dropped.

Syntax:


DBMS_AQADM.DROP_QUEUE_TABLE (

queue_table       IN    VARCHAR2,
force             IN    BOOLEAN default FALSE,
auto_commit       IN    BOOLEAN default TRUE)

Usage:

Table 11-11 DBMS_AQADM.DROP_QUEUE_TABLE
Parameter   Description  

queue_table

(IN VARCHAR2)  

Specifies the name of a queue table to be dropped.  

force

(IN BOOLEAN)  

FALSE: The operation will not succeed if there are any queues in the table.This is the default.

TRUE: All queues in the table are stopped and dropped automatically.  

auto_commit

(IN BOOLEAN)  

TRUE: Causes the current transaction, if any, to commit before the DROP_QUEUE_TABLE operation is carried out. The DROP_QUEUE_TABLE operation becomes persistent when the call returns. This is the default.

FALSE: The operation is part of the current transaction and will become persistent only when the caller issues a commit.  

DBMS_AQADM.DROP_QUEUE

Purpose:

Drops an existing queue. DROP_QUEUE is not allowed unless STOP_QUEUE has been called to disable the queue for both enqueuing and dequeuing. All the queue data is deleted as part of the drop operation.

Syntax:


DBMS_AQADM.DROP_QUEUE (

queue_name        IN            VARCHAR2,
auto_commit       IN            BOOLEAN default TRUE)

Usage:

Table 11-12 DBMS_AQADM.DROP_QUEUE
Parameter   Description  

queue_name

(IN VARCHAR2)  

Specifies the name of the queue that is to be dropped.  

auto_commit

(IN BOOLEAN)  

TRUE: Causes the current transaction, if any, to commit before the DROP_QUEUE operation is carried out. The DROP_QUEUE operation becomes persistent when the call returns. This is the default.

FALSE: The operation is part of the current transaction and will become persistent only when the caller issues a commit.  

DBMS_AQADM.ALTER_QUEUE

Purpose:

Alter existing properties of a queue. Only max_retries, retry_delay, and retention_time can be altered.

Syntax:

DBMS_AQADM.ALTER_QUEUE (

queue_name        IN    VARCHAR2,
max_retries       IN    NUMBER default NULL,
retry_delay       IN    NUMBER default NULL,
retention_time    IN    NUMBER default NULL,
auto_commit       IN    BOOLEAN default TRUE)

Usage:

Table 11-13 DBMS_AQADM.ALTER_QUEUE
Parameter   Description  

queue_name

(IN VARCHAR2)  

Specifies the name of the queue that is to be altered.  

max_retries

(IN NUMBER)  

Limits the number of times a dequeue with REMOVE mode can be attempted on a message. The count is incremented when the application issues a rollback after executing the dequeue. If the time at which one of the retries has passed the expiration time, no further retries will be attempted. Default is NULL which means that the value will not be altered.  

retry_delay

(IN NUMBER)  

Specifies the delay time in seconds before this message is scheduled for processing again after an application rollback. The default is NULL which means that the value will not be altered.  

retention_time

(IN NUMBER)  

Specifies the retention time in seconds for which a message will be retained in the queue table after being dequeued. The default is NULL which means that the value will not be altered.  

auto_commit

(IN BOOLEAN)  

TRUE: Causes the current transaction, if any, to commit before the ALTER_QUEUE operation is carried out. The ALTER_QUEUE operation become persistent when the call returns. This is the default.

FALSE: The operation is part of the current transaction and will become persistent only when the caller issues a commit.  

DBMS_AQADM.START_QUEUE

Purpose:

Enables the specified queue for enqueuing and/or dequeueing. After creating a queue the administrator must use START_QUEUE to enable the queue. The default is to enable it for both ENQUEUE and DEQUEUE. Only dequeue operations are allowed on an exception queue. This operation takes effect when the call completes and does not have any transactional characteristics.

Syntax:

DBMS_AQADM.START_QUEUE ( 

queue_name      IN       VARCHAR2,
enqueue         IN       BOOLEAN default TRUE,
dequeue         IN       BOOLEAN default TRUE)

Usage
Table 11-14 DBMS_AQADM.START_QUEUE
Parameter   Description  

queue_name

(IN VARCHAR2)  

Specifies the name of the queue to be enabled.  

enqueue

(IN BOOLEAN)  

Specifies whether ENQUEUE should be enabled on this queue.

TRUE: Enable ENQUEUE. This is the default.

FALSE: Do not alter the current setting.  

dequeue

(IN BOOLEAN)  

Specifies whether DEQUEUE should be enabled on this queue.

TRUE: Enable DEQUEUE. This is the default.

FALSE: Do not alter the current setting.  

DBMS_AQADM.STOP_QUEUE

Purpose:

Disables enqueuing and/or dequeuing on the specified queue. By default, it disables both ENQUEUEs or DEQUEUEs. A queue cannot be stopped if there are outstanding transactions against the queue. This operation takes effect when the call completes and does not have any transactional characteristics.

Syntax:

DBMS_AQADM.STOP_QUEUE (   

queue_name      IN       VARCHAR2,
enqueue         IN       BOOLEAN default TRUE,
dequeue         IN       BOOLEAN default TRUE,
wait            IN       BOOLEAN default TRUE)

Usage:

Table 11-15 DBMS_AQADM.STOP_QUEUE
Parameter   Description  

queue_name

(IN VARCHAR2)  

Specifies the name of the queue to be disabled.  

enqueue

(IN BOOLEAN)  

Specifies whether ENQUEUE should be disabled on this queue.

TRUE: Disable ENQUEUE. This is the default.

FALSE: Do not alter the current setting.  

dequeue

(IN BOOLEAN)  

Specifies whether DEQUEUE should be disabled on this queue.

TRUE: Disable DEQUEUE. This is the default.

FALSE: Do not alter the current setting.  

wait

(IN BOOLEAN)  

The wait parameter allows you to specify whether to wait for the completion of outstanding transactions.

TRUE: Wait if there are any outstanding transactions. In this state no new transactions are allowed to enqueue to or dequeue from this queue.

FALSE: Return immediately either with a success or an error.  

DBMS_AQADM.ADD_SUBSCRIBER

Purpose:

Add a default subscriber to a queue. A program can enqueue messages to a specific list of recipients or to the default list of subscribers. This operation will only succeed on queues that allow multiple consumers. This operation takes effect immediately and the containing transaction is committed. Enqueue requests that are executed after the completion of this call will reflect the new behavior. The user must have been granted type access by executing the grant_type_access procedure.

Syntax:

DBMS_AQADM.ADD_SUBSCRIBER(

queue_name     IN   VARCHAR2,
subscriber     IN   sys.aq$_agent)

Usage:
Table 11-16 DBMS_AQADM.ADD_SUBSCRIBER
Parameter   Description  

queue_name

(IN VARCHAR2)  

Specifies the name of the queue.  

subscriber

(IN aq$_agent)  

See definition in section titled `Agent'.  

DBMS_AQADM.REMOVE_SUBSCRIBER

Purpose:

Remove a default subscriber from a queue. This operation takes effect immediately and the containing transaction is committed. All references to the subscriber in existing messages are removed as part of the operation. The user must have been granted type access by executing the grant_type_access procedure.

Syntax:


DBMS_AQADM.REMOVE_SUBSCRIBER(
    queue_name         IN         VARCHAR2,
    subscriber         IN         sys.aq$_agent)

Usage:

Table 11-17
Parameter   Description  

queue_name

(IN VARCHAR2)  

Specifies the name of the queue.  

subscriber

(IN aq$_agent)  

See definition in section titled `Agent'.  

DBMS_AQADM.SCHEDULE_PROPAGATION

Purpose:

Schedule propagation of messages from a queue to a destination identified by a specific dblink. Messages may also be propagated to other queues in the same database by specifying a NULL destination. If a message has multiple recipients at the same destination in either the same or different queues the message will be propagated to all of them at the same time.

Syntax:

DBMS_AQADM.SCHEDULE_PROPAGATION(

src_queue_name  IN    VARCHAR2,
destination     IN    VARCHAR2 default NULL
start_time      IN    DATE default SYSDATE,
duration        IN    NUMBER default NULL,
next_time       IN    VARCHAR2 default NULL,
latency         IN    NUMBER default 60)

Usage:
Table 11-18 DBMS_AQADM.SCHEDULE_PROPAGATION
Parameter   Description  

src_queue_name

(IN VARCHAR2)  

Specifies the name of the source queue whose messages are to be propagated, including the schema name. If the schema name is not specified, it defaults to the schema name of the administrative user.  

destination

(IN VARCHAR2)  

Specifies the destination dblink. Messages in the source queue for recipients at this destination will be propagated. If it is NULL, the destination is the local database and messages will be propagated to other queues in the local database. The length of this field is currently limited to 128 bytes and if the name is not fully qualified the default domain name is used.  

start_time

(IN DATE)  

Specifies the initial start time for the propagation window for messages from the source queue to the destination.  

duration

(IN NUMBER)  

Specifies the duration of the propagation window in seconds. A NULL value means the propagation window is forever or until the propagation is unscheduled.  

next_time

(IN VARCHAR2)  

date function to compute the start of the next propagation window from the end of the current window. If this value is NULL, propagation will be stopped at the end of the current window. For example, to start the window at the same time every day, next_time should be specified as `SYSDATE + 1 - duration/86400'.  

latency

(IN NUMBER)  

maximum wait, in seconds, in the propagation window for a message to be propagated after it is enqueued. For example, if the latency is 60 seconds, then during the propagation window, if there are no messages to be propagated, messages from that queue for the destination will not be propagated for at least 60 more seconds. It will be at least 60 seconds before the queue will be checked again for messages to be propagated for the specified destination. If the latency is 600, then the queue will not be checked for 10 minutes and if the latency is 0, then a job queue process will be waiting for messages to be enqueued for the destination and as soon as a message is enqueued it will be propagated.  

DBMS_AQADM.UNSCHEDULE_PROPAGATION

Purpose:

Unscheduled previously scheduled propagation of messages from a queue to a destination identified by a specific dblink.

Syntax:

DBMS_AQADM.UNSCHEDULE_PROPAGATION(

src_queue_name   IN    VARCHAR2,
destination      IN    VARCHAR2 default NULL)

Usage:
Table 11-19 DBMS_AQADM.UNSCHEDULE_PROPAGATION
Parameter   Description  

src_queue_name

(IN VARCHAR2)  

Specifies the name of the source queue whose messages are to be propagated, including the schema name. If the schema name is not specified, it defaults to the schema name of the administrative user.  

destination

(IN VARCHAR2)  

Specifies the destination dblink. Messages in the source queue for recipients at this destination will be propagated. If it is NULL, the destination is the local database and messages will be propagated to other queues in the local database. The length of this field is currently limited to 128 bytes and if the name is not fully qualified the default domain name is used.  

DBMS_AQADM.VERIFY_QUEUE_TYPES

Purpose:

Verify that the source and destination queues have identical types. The result of the verification is stored in sys.aq$_message_types tables, overwriting all previous output of this command.

Syntax:
DBMS_AQADM.SCHEDULE_PROPAGATION(

src_queue_name    IN    VARCHAR2,
dest_queue_name   IN    VARCHAR2,
destination       IN    VARCHAR2 default NULL
rc                OUT   BINARY_INTEGER)

Usage:
Table 11-20 DBMS_AQADM.SCHEDULE_PROPAGATION
Parameter   Description  

src_queue_name

(IN VARCHAR2)  

Specifies the name of the source queue whose messages are to be propagated, including the schema name. If the schema name is not specified, it defaults to the schema name of the user.  

dest_queue_name

(IN VARCHAR2)  

Specifies the name of the destination queue where messages are to be propagated, including the schema name. If the schema name is not specified, it defaults to the schema name of the user.  

destination

(IN VARCHAR2)  

Specifies the destination dblink. the destination queue name is in the database that is specified by the dblink. If the destination is NULL, the destination queue is the same database as the source queue. The length of this field is currently limited to 128 bytes and if the name is not fully qualified the default domain name is used.  

rc

(OUT BINARY_INTEGER)  

Return code for the result of the procedure. If there is no error and if the source and destination queue types match the result is 1, if they do not match the result is 0. If an Oracle error is encountered it is returned in rc.  

Enumerated Constants in the Administrative Interface

When using enumerated constants such as BROWSE, LOCKED, REMOVE, the symbol needs to be specified with the scope of the packages defining it. All types associated with the administrative interfaces have to be prepended with dbms_aqadm. For example:

dbms_aqadm.NORMAL_QUEUE

Table 11-21 Enumerated types in the administrative interface
Parameter   Options  

retention  

INFINITE  

message_grouping  

TRANSACTIONAL, NONE  

queue_type  

NORMAL_QUEUE, EXCEPTION_QUEUE  

Database Objects

Queue table view

This is a view of the queue table in which message data is stored. This view is automatically created with each queue table and is called aq$<queue_table_name>. This view should be used for querying the queue data. The dequeue history data (time, user identification and transaction identification) is only valid for single consumer queues. For dequeue history of messages in a multiple consumer queue please refer to a following section.

The administrator can use any SQL statement or SQL tool to analyze and review the content of a queue or queue table. SQL provides full access to the message metadata and/or payload. Use ENQ_TXN_ID and DEQ_TXN_ID to correlate transactions. If the ENQ_TXN_ID of message m2 is the same as the DEQ_TXN_ID of m1, m2 is created in the transaction that consumed m1. (You may use CONNECT BY in your SQL statements to identify related messages). Remove retained messages that are not automatically removed by AQ. Do not update or modify messages since this may destroy the consistency of the queue metadata. Before you use SQL to correct any error in AQ, please contact the Oracle service representative.

Table 11-22 Queue Table View
Column Name & Description   Null?   Type  

QUEUE - queue name  

 

VARCHAR2(30)  

MSG_ID - unique identifier of the message  

 

RAW(16)  

CORR_ID - user-provided correlation identifier  

 

VARCHAR2(128)  

MSG_PRIORITY - message priority  

 

NUMBER  

MSG_STATE - state of this message  

 

VARCHAR2(9)  

DELAY - number of seconds the message is delayed  

 

DATE  

EXPIRATION - number of seconds in which the message will expire after being READY  

 

NUMBER  

ENQ_TIME - enqueue time  

 

DATE  

ENQ_USER_ID - enqueue user id  

 

NUMBER  

ENQ_TXN_ID - enqueue transaction id  

NOT NULL  

VARCHAR2(30)  

DEQ_TIME - dequeue time  

 

DATE  

DEQ_USER_ID - dequeue user id  

 

NUMBER  

DEQ_TXN_ID - dequeue transaction id  

 

VARCHAR2(30)  

RETRY_COUNT - number of retries  

 

NUMBER  

EXCEPTION_QUEUE_OWNER - exception queue schema  

 

VARCHAR2(30)  

EXCEPTION_QUEUE - exception queue name  

 

VARCHAR2(30)  

USER_DATA - user data  

 

BLOB  


DBA_QUEUE_TABLES

This view describes the names and types of all queue tables created in the database.

Table 11-23 DBA_QUEUE_TABLES
Column Name & Description   Null?   Type  

OWNER - queue table schema  

 

VARCHAR2(30)  

QUEUE_TABLE - queue table name  

 

VARCHAR2(30)  

TYPE - payload type  

 

VARCHAR2(7)  

OBJECT_TYPE - name of object type, if any  

 

VARCHAR2(61)  

SORT_ORDER - user specified sort order  

 

VARCHAR2(22)  

RECIPIENTS - SINGLE or MULTIPLE  

 

VARCHAR2(8)  

MESSAGE_GROUPING - NONE or TRANSACTIONAL  

 

VARCHAR2(13)  

USER_COMMENT - user comment for the queue table  

 

VARCHAR2(50)  


USER_QUEUE_TABLES

This view is the same as DBA_QUEUES_TABLES with the exception that it only shows queue tables in the user's schema. It does not contain a column for OWNER.


DBA_QUEUES

Users can specify operational characteristics for individual queues. DBA_QUEUES contains the view which contains relevant information for every queue in a database.

Table 11-24 DBA_QUEUES
Column Name & Description   Null?   Type  

OWNER - queue schema name  

NOT NULL  

VARCHAR2(30)  

NAME - queue name  

NOT NULL  

VARCHAR2(30)  

QUEUE_TABLE - queue table where this queue resides  

NOT NULL  

VARCHAR2(30)  

QID - unique queue identifier  

NOT NULL  

NUMBER  

QUEUE_TYPE - queue type  

 

VARCHAR2(15)  

MAX_RETRIES - number of dequeue attempts allowed  

 

NUMBER  

RETRY_DELAY - number of seconds before retry can be attempted  

 

NUMBER  

ENQUEUE_ENABLED - YES/NO  

 

VARCHAR2(7)  

DEQUEUE_ENABLED - YES/NO  

 

VARCHAR2(7)  

RETENTION - number of seconds message is retained after dequeue  

 

VARCHAR2(40)  

USER_COMMENT - user comment for the queue  

 

VARCHAR2(50)  

USER_QUEUES

This view is the same as DBA_QUEUES with the exception that it only shows queues in the user's schema. It does not contain a column for OWNER.

DBMS_AQADM.QUEUE_SUBSCRIBERS

Purpose:

To get a list of subscribers for a queue.

Syntax:

DBMS_AQADM.QUEUE_SUBSCRIBERS(

queue_name    IN     VARCHAR2)
RETURN aq$_subscriber_list_t

Usage:

The function returns a PL/SQL table of aq$_agent. This can be used to get the list of all subscribers for a queue.

Example:

DECLARE

subs            dbms_aqadm.aq$_subscriber_list_t; 
nsubs           BINARY_INTEGER; 
i               BINARY_INTEGER; 
BEGIN
subs  := dbms_aqadm.queue_subscribers('Q1DEF'); 
nsubs := subs.COUNT; 
FOR i IN 0..nsubs-1 LOOP 

dbms_output.put_line(subs(i).name); 
END LOOP;
END;

/

DBA_QUEUE_SCHEDULES

Purpose:
This view describes the current schedules for propagating messages.
Table 11-25 DBA_QUEUE_SCHEDULES
Column Name & Description   Null?   Type  

SCHEMA

schema name for the source queue  

NOT NULL  

VARCHAR2(30)  

QNAME

source queue name  

NOT NULL  

VARCHAR2(30)  

DESTINATION

destination name, currently limited to be a DBLINK name  

NOT NULL  

VARCHAR2(128)  

START_DATE

date to start propagation in the default date format  

 

DATE  

START_TIME

time of day at which to start propagation in HH:MI:SS format  

 

VARCHAR2(8)  

PROPAGATION_WINDOW

duration in seconds for the propagation window  

 

NUMBER  

NEXT_TIME

function to compute the start of the next propagation window  

 

VARCHAR2(128)  

LATENCY

maximum wait time to propagate a message during the propagation window.  

 

NUMBER  

Recipients and dequeue history of multiple consumer messages

The queue table view provides the dequeue history for single consumer queue messages. To query the list of recipients or the dequeue history of a message in a multiple-consumer queue you need to execute a SQL query on the queue table for the message of interest.

For example, to view the dequeue history of the message with msgid

`105E7A2EBFF11348E03400400B40F149' in queue table sys.queue_tab the following query must be executed. The query will return one row per consumer of the message.

SELECT consumer, transaction_id, deq_time, deq_user 
FROM THE(select cast(history as sys.aq$_dequeue_history_t) 

FROM sys.queue_tab 
WHERE msgid='105E7A2EBFF11348E03400400B40F149'); 

Error Messages

The error messages for AQ are reported in two ranges:

24000 - 24099

25200 - 25299

Administration Topics

Performance

Queues are stored in database tables. The performance characteristics of queue operations are very similar to the underlying database operations.

Table and index structures

To understand the performance characteristics of queues it is important to understand the tables and index layout for AQ objects.

Creating a queue table creates a database table with approximately 25 columns. These columns store the AQ meta data and the user defined payload. The payload can be of an object type or RAW. The AQ meta data contains object types and scaler types. A view and two indexes are created on the queue table. The view allows users to query the message data. The indexes are used to accelerate access to message data. Please refer to the create queue table command for a detailed description of the objects created.

Throughput

The code path of an enqueue operation is comparable to an insert into a multi-column table with two indexes. The code path of a dequeue operation is comparable to a select and delete operation on a similar table. These operations are performed using PL/SQL functions.

Availability

Oracle Parallel Server (OPS) can be used to ensure highly available access to queue data. Queues are implemented using database tables. The tail and the head of a queue can be extreme hot spots. Since OPS does not scale well in the presence of hot spots it is recommended to limit normal access to a queue from one instance only. In case of an instance failure messages managed by the failed instance can be processed immediately by one of the surviving instances.

Scalability

Queue operation scalability is similar to the underlying database operation scalability. If a dequeue operation with wait option is issued in a Multi-Threaded Server (MTS) environment the shared server process will be dedicated to the dequeue operation for the duration of the call including the wait time. The presence of many such processes could cause severe performance and availability problems and could result in deadlocking the shared server processes. For this reason it is recommended that dequeue requests with wait option be only issued via dedicated server processes. This restriction is not enforced.

Optimizing Propagation

In setting the number of JOB_QUEUE_PROCESSES, the DBA should aware that this need is determined by the number of queues from which the messages have to be propagated and the number of destinations (rather than queues) to which messages have to be propagated.

Reliability and Recoverability

The standard database reliability and recoverability characteristics apply to queue data.

Enterprise Manager Support

Enterprise manager supports GUIs for some of the administrative functions listed in the administrative interfaces section.

These include:

  1. Queues as part of schema manager to view properties.
  2. Start and stop queue.
  3. Schedule and unschedule propagation.
  4. Add and remove subscriber.
  5. View the current propagation schedule.

Importing and Exporting Queue Data

Queues are implemented on tables. The import/export of queues constitutes the import/export of the underlying queue tables and related dictionary tables. Import and export of queues can only be done at queue table granularity.

When a queue table is exported, both the table definition information and the queue data are exported. When a queue table is imported, export action procedures will maintain the queue dictionary. Because the queue table data is also exported, the user is responsible for maintaining application-level data integrity when queue table data are being transported.

Importing queue data into a queue table with existing data is not recommended. During a table mode import, if the queue table already exists at the import site the old queue table definition, and the old queue definition will be dropped and recreated. Hence, queue table and queue definitions prior to the import will be lost.

Performing EXPORTS and IMPORTS of queue tables with multiple recipients

For every queue table that supports multiple recipients, there is a index-organized table (IOT) that contains important queue metadata. This metadata is essential to the operations of the queue, so the user must export and import this IOT as well as the queue table for the queues in this table to work after import. When the schema containing the queue table is exported, the IOT is also automatically exported. The behavior is similar at import time. Because the metadata table contains rowids of some rows in the queue table, import will issue a note about the rowids being obsolete when importing the metadata table. This message can be ignored, as the queueing system will automatically correct the obsolete rowids as a part of the import process. However, if another problem is encountered while doing the import (such as running out of rollback segment space), the problem should be corrected and the import should be rerun.

Troubleshooting

This section describes some troubleshooting tips to diagnose problems with message propagation.

Message history

AQ updates the message history when a message has been successfully propagated to a destination. The message history is stored as a collection in the queue table. An administrator can execute a SQL query to determine if a message has been propagated. For example, to check if a message with msgid

105E7A2EBFF11348E03400400B40F149'

in queue table aqadmn.queue_tab has been propagated to destination 'boston', the following query can be executed:


SELECT consumer, transaction_id, deq_time, deq_user, propagated_msgid 
   FROM THE(select cast(history as sys.aq$_dequeue_history_t)  
   FROM adadmn.queue_tab 
      WHERE msgid='105E7A2EBFF11348E03400400B40F149') 
      WHERE consumer LIKE '%BOSTON%'; 

A non-NULL transaction_id indicates that the message was successfully propagated. Further, the deq_time indicates the time of propagation, the deq_user indicates the userid used for propagation, and the propagated_msgid indicates the msgid of the message that was enqueued at the destination. If the message with the msgid cannot be found in the queue table, an administrator can check the exception queue (if the exception queue is in a different queue table) for the message history.

Propagation Schedules

The administrator can check the DBA_QUEUE_SCHEDULES view to check if propagation has been scheduled for a particular combination of source queue and destination. If propagation has been scheduled, the jobno of the job used to propagate messages can be determined from the sys.aq$_schedules table. The jobno can then be used to query the DBA_JOBS view to determine the last time that the propagation was scheduled for the combination of source queue and destination. The DBA_JOBS view also indicates the next time the propagation will be scheduled, and if the job has been marked as broken. If the job has been marked as broken, check for errors in trace file(s) generated by the job_queue processes in the $ORACLE_HOME/log directory.

Database link

There are a number of points at which the propagation may break down:

Type checking

AQ will not propagate messages from one queue to another if the payload-types of the two queues are not equivalent. An administrator can verify if the source and destination's payload types match by executing the DBMS_AQADM.VERIFY_QUEUE_TYPES procedure. The results of the type checking will be stored in the sys.aq$_message_types table. This table can be accessed using the OID of the source queue and the address of the destination queue (i.e. [schema.]queue_name[@destination]).

Dynamic Statistics Views

As you can see, the GV$ view and V$ view are exactly the same:

Table 11-26 GV$AQ
Column Name   Type  

QID  

NUMBER  

WAITING  

NUMBER  

READY  

NUMBER  

EXPIRED  

NUMBER  

TOTAL_WAIT  

NUMBER  

AVERAGE_WAIT  

NUMBER  

Table 11-27 V$AQ
Column Name   Type  

QID  

NUMBER  

WAITING  

NUMBER  

READY  

NUMBER  

EXPIRED  

NUMBER  

TOTAL_WAIT  

NUMBER  

AVERAGE_WAIT  

NUMBER  

Column Name   Explanation  

QID  

the identity of the queue. This is the same as the qid in user_queues and dba_queues.  

WAITING  

the number of messages in the state 'WAITING'.  

READY  

the number of messages in state 'READY'.  

EXPIRED  

the number of messages in state 'EXPIRED'.  

TOTAL_WAIT  

the number of seconds for which messages in the queue have been waiting in state 'READY'  

AVERAGE_WAIT  

the average number of seconds a message in state 'READY' has been waiting to be dequeued.  

The difference between these two views is that the GV$ view gives information about the number of messages in different states for the whole database while the V$ view gives information regarding specific instances. The way this works is that each instance keeps its own AQ statistics information in its own SGA, and does not have knowledge of the statistics gathered by other instances. Then, when a GV$AQ view is queried by an instance, all other instances funnel their AQ statistics information to the instance issuing the query.


Note:

If you need to associate a specific queue or queues with a specific instance, you will have to enforce this at the application level.

 

Reference to Demos

The following demos may be found in the related directories:

$ORACLE_HOME/demo/aqdemo00.sql   Main driver of demo 
$ORACLE_HOME/demo/aqdemo01.sql   Create queue tables and queues using  
                                 AQ administration interface 
$ORACLE_HOME/demo/aqdemo02.sql   Load the demo package 
$ORACLE_HOME/demo/aqdemo03.sql     Submit the event handler as a job to Job
                                 Queue 
$ORACLE_HOME/demo/aqdemo04.sql   Enqueue messages 

Compatibility & Upgrade

The operational interface in Orcacle AQ 8.0.4 is backward compatible with the 8.0.3 Oracle AQ interface.

New Fields Enabled for the AQ$_AGENT Data Type

In the latest release, the address field is now enabled for the aq$_agent datatype. Consequently, it is now possible for this field to be specified wherever an interface takes an Agent as an argument - such as in the recipient list of the message properties, and the DBMS_AQADM.ADD_SUBSCRIBER administrative interface.

The Extended Address Field

The address field in the aq$_agent datatype has been extended to 1024 bytes. To use the extended address field, you will have to complete the following steps:

  1. Save the contents of the all existing queues using the Export Utility.
  2. Run CATNOQUEUE.SQL to drop the existing dictionary and queue tables:

    SVRMGRL> @CATNOQUEUE.SQL

  3. Run CATQUEUE.SQL to redefine the new types and dictionary tables:

    SVRMGRL> @CATQUEUE.SQL

  4. Import the queues you exported using the Import Utility.


    Note:

    If your application does not require you to extend the address field, you need not complete these steps. In that case there will be no need to run the scripts and to perform export and import operations.

     

New Dictionary Tables



Prev

Next
Oracle
Copyright © 1997 Oracle Corporation.

All Rights Reserved.

Library

Product

Contents

Index