Oracle8i Application Developer's Guide - Advanced Queuing
Release 8.1.5

A68005-01


2
Implementing AQ -- A Sample Application

In Chapter 1 we described a messaging system for an imaginary company, BooksOnLine. In this chapter we consider the features of AQ in the context of a sample application based on that scenario.

A Sample Application

The operations of a large bookseller, BooksOnLine, are based on an online book ordering system which automates activities across the various departments involved in the entire sale process. The front end of the system is an order entry application which is used to enter new orders. These incoming orders are processed by an order processing application which validates and records the order. Shipping departments located at regional warehouses are then responsible for ensuring that these orders are shipped in a timely fashion. There are three regional warehouses: one serving the East Region, one serving the West Region, and a third warehouse for shipping International orders. Once an order has been shipped, the order information is routed to a central billing department which handles payment processing. The customer service department, located at its own site, is responsible for maintaining order status and handling inquiries about orders.

This sample application has been devised for the sole purpose of demonstrating the features of Oracle AQ. Our aim in creating this integrated scenario is to make it easier to grasp the possibilities of this technology by locating our explanations within a single context. We have also provided the complete script for the code as an appendix (see Appendix A, "Scripts for Implementing 'BooksOnLine'"). However, please keep in mind that it is not possible within the scope of a single, relatively small code sample to demonstrate every possible application of AQ.

General Features

System Level Access Control

Oracle 8i supports system level access control for all queueing operations. This feature allows an application designer or DBA to create users as queue administrators. A queue administrator can invoke all AQ interfaces (both administration and operation) on any queue in the database. This simplifies administrative work, because all administrative scripts for the queues in a database can be managed under one schema (for more information, see "Security" in Chapter 3, "Managing Oracle AQ").

Example Scenario and Code

In the BooksOnLine application, the DBA creates BOLADM, the BooksOnLine Administrator account, as the queue administrator of the database. This allows BOLADM to create, drop, manage, and monitor any queue in the database. If you decide to create PL/SQL packages in the BOLADM schema that can be used by any application to enqueue or dequeue, then you should also grant BOLADM the ENQUEUE_ANY and DEQUEUE_ANY system privileges.

CREATE USER BOLADM IDENTIFIED BY BOLADM; 
GRANT CONNECT, RESOURCE, aq_administrator_role TO BOLADM; 
GRANT EXECUTE ON dbms_aq TO BOLADM; 
GRANT EXECUTE ON dbms_aqadm TO BOLADM; 
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','BOLADM',FALSE); 
EXECUTE dbms_aqadm.grant_system_privilege('DEQUEUE_ANY','BOLADM',FALSE); 
 

In the application, AQ propagators propagate messages from the OE (Order Entry) schema to the WS (Western Sales), ES (Eastern Sales) and OS (Worldwide Sales) schemas. The WS, ES and OS schemas in turn propagate messages to the CB (Customer Billing) and CS (Customer Service) schemas. Hence the OE, WS, ES and OS schemas all host queues that serve as the source queues for the propagators.

When messages arrive at the destination queues, sessions based on the source queue schema name are used for enqueuing the newly arrived messages into the destination queues. This means that you need to grant schemas of the source queues enqueue privileges to the destination queues.

To simplify administration, all schemas that host a source queue in the BooksOnLine application are granted the ENQUEUE_ANY system privilege.

EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','OE',FALSE); 
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','WS',FALSE); 
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','ES',FALSE); 
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','OS',FALSE);  
 

To propagate to a remote destination queue, the login user specified in the database link in the address field of the agent structure should either be granted the 'ENQUEUE ANY QUEUE' privilege, or be granted the rights to enqueue to the destination queue. However, you do not need to grant any explicit privileges if the login user in the database link also owns the queue tables at the destination.

Structured Payload

Oracle AQ lets you use object types to structure and manage the payload of messages. Object Relational Database Systems (ORDBMSs) generally have a richer type system than messaging systems. The object-relational capabilities of Oracle 8i provide a rich set of data types that range from traditional relational data types to user-defined types (see "Enqueuing and Dequeuing Object Type Messages That Contain LOB Attributes Using PL/SQL" in Chapter 8, "Oracle Advanced Queuing by Example").

Many powerful features are enabled as a result of having strongly typed content, that is, content whose format is defined by an external type system.

Example Scenario and Code

The BooksOnLine application uses a rich set of data types to model book orders as message content.

Queue Level Access Control

Oracle 8i supports queue level access control for enqueue and dequeue operations. This feature allows the application designer to protect queues created in one schema from applications running in other schemas. You need to grant only minimal access privileges to the applications that run outside the queue's schema. The supported access privileges on a queue are ENQUEUE, DEQUEUE and ALL (for more information, see "Security" in Chapter 3, "Managing Oracle AQ").
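
As a rough illustration only, the C sketch below models the three queue-level privileges as bit flags and checks a requested operation against what has been granted. The names queue_priv and priv_allows are hypothetical and are not part of the AQ interface; in AQ itself the check is carried out by the database. Treating ALL as the combination of ENQUEUE and DEQUEUE is an assumption of this sketch.

```c
#include <stdbool.h>

/* Hypothetical model of the queue-level privileges named above.
   ALL is modeled as ENQUEUE plus DEQUEUE (an assumption of this sketch). */
typedef enum {
    PRIV_NONE    = 0,
    PRIV_ENQUEUE = 1 << 0,
    PRIV_DEQUEUE = 1 << 1,
    PRIV_ALL     = PRIV_ENQUEUE | PRIV_DEQUEUE
} queue_priv;

/* Returns true when every operation in 'requested' is covered by 'granted'. */
bool priv_allows(queue_priv granted, queue_priv requested)
{
    return requested != PRIV_NONE && (granted & requested) == requested;
}
```

In this model, an application that holds only PRIV_DEQUEUE on a queue passes the check for a dequeue and fails it for an enqueue, which is the kind of minimal grant the paragraph above recommends.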

Example Scenario

The BooksOnLine application processes customer billings in its CB and CBADM schemas. The CB (Customer Billing) schema hosts the customer billing application, and the CBADM schema hosts all related billing data stored as queue tables.

To protect the billing data, the billing application and the billing data reside in different schemas. The billing application is allowed only to dequeue messages from CBADM_shippedorders_que, the shipped order queue. It processes the messages, and then enqueues new messages into CBADM_billedorders_que, the billed order queue.

To protect the queues from any other, unauthorized operations by the application, the following two grant calls are made:

Example Code

/* Grant dequeue privilege on the shipped orders queue to the Customer 
   Billing application. The CB application retrieves orders that are shipped but 
   not billed from the shipped orders queue. */  
EXECUTE dbms_aqadm.grant_queue_privilege(
   'DEQUEUE','CBADM_shippedorders_que', 'CB', FALSE); 
 
/* Grant enqueue privilege on the billed orders queue to the Customer Billing 
   application. The CB application is allowed to put billed orders into this 
   queue after processing the orders. */ 
 
EXECUTE dbms_aqadm.grant_queue_privilege(
   'ENQUEUE', 'CBADM_billedorders_que', 'CB', FALSE); 
 

Non-Persistent Queues

Messages in a non-persistent queue are not persistent in that they are not stored in database tables.

You create a non-persistent RAW queue which can be of either single-consumer or multi-consumer type. These queues are created in a system created queue-table (AQ$_MEM_SC for single-consumer queues and AQ$_MEM_MC for multi-consumer queues) in the schema specified by the create_np_queue command. Subscribers can be added to the multi-consumer queues (see "Create a Non-Persistent Queue" in Chapter 2, "Implementing AQ -- A Sample Application"). Non-persistent queues can be destinations for propagation.

You use the enqueue interface to enqueue messages into a non-persistent queue in the normal way. You retrieve messages from a non-persistent queue through the asynchronous notification mechanism, registering for notification (using OCISubscriptionRegister) for those queues in which you are interested (see "Register for Notification" in Chapter 6, "Operational Interface: Basic Operations").

When a message is enqueued into a queue, it is delivered to the clients that have active registrations for the queue. The messages are then published to the interested clients without incurring the overhead of storing them in the database.



 

Example Scenario

Assume that there are three application processes servicing user requests at the Order Entry system. The connection dispatcher process, which shares out the connection requests among the application processes, would like to maintain a count of the number of users logged on to the Order Entry system as well as the number of users per application process. The application processes are named APP1, APP2 and APP3. To simplify things we shall not worry about application process failures.

One way to meet this requirement is to use non-persistent queues. When a user logs on to the database, the application process enqueues to the multi-consumer non-persistent queue, LOGON_LOGOFF, with the application name as the consumer name. The same process occurs when a user logs off. To distinguish between the two events, the correlation of the message is 'LOGON' for logons and 'LOGOFF' for logoffs.

The callback function counts the login/logout events per application process. Note that the dispatcher process only needs to connect to the database for registering the subscriptions. The notifications themselves can be received while the process is disconnected from the database.

Example Code

CONNECT oe/oe; 

/* Create the multiconsumer nonpersistent queue in OE schema: */ 
EXECUTE dbms_aqadm.create_np_queue(queue_name         => 'LOGON_LOGOFF', 
                                   multiple_consumers => TRUE);                   
 
/* Enable the queue for enqueue and dequeue: */
EXECUTE dbms_aqadm.start_queue(queue_name => 'LOGON_LOGOFF'); 
 
/* Non Persistent Queue Scenario - procedure to be executed upon logon: */ 
CREATE OR REPLACE PROCEDURE  User_Logon(app_process IN VARCHAR2)  
AS 
  msgprop        dbms_aq.message_properties_t; 
  enqopt         dbms_aq.enqueue_options_t; 
  enq_msgid      RAW(16); 
  payload        RAW(1); 
BEGIN 
  /* visibility must always be immediate for NonPersistent queues */ 
  enqopt.visibility:=dbms_aq.IMMEDIATE; 
  msgprop.correlation:= 'LOGON'; 
  msgprop.recipient_list(0) := aq$_agent(app_process, NULL, NULL); 
  /* payload is NULL */ 
  dbms_aq.enqueue( 
        queue_name         => 'LOGON_LOGOFF', 
        enqueue_options    => enqopt, 
        message_properties => msgprop, 
        payload            => payload, 
        msgid              => enq_msgid);  
 
END; 
/ 
 
/* Non Persistent queue scenario - procedure to be executed upon logoff: */ 
CREATE OR REPLACE PROCEDURE  User_Logoff(app_process IN VARCHAR2) 
AS 
  msgprop        dbms_aq.message_properties_t; 
  enqopt         dbms_aq.enqueue_options_t; 
  enq_msgid      RAW(16); 
  payload        RAW(1); 
BEGIN 
  /* Visibility must always be immediate for NonPersistent queues: */ 
  enqopt.visibility:=dbms_aq.IMMEDIATE; 
  msgprop.correlation:= 'LOGOFF'; 
  msgprop.recipient_list(0) := aq$_agent(app_process, NULL, NULL); 
  /* Payload is NULL: */ 
  dbms_aq.enqueue( 
        queue_name         => 'LOGON_LOGOFF', 
        enqueue_options    => enqopt, 
        message_properties => msgprop, 
        payload            => payload, 
        msgid              => enq_msgid);  
 END; 
/ 
 
  
/* If there is a logon at APP1, enqueue a message into 'LOGON_LOGOFF' with 
   correlation 'LOGON': */ 
EXECUTE User_logon('APP1'); 
 
/* If there is a logoff at APP3, enqueue a message into 'LOGON_LOGOFF' with 
   correlation 'LOGOFF': */ 
EXECUTE User_logoff('APP3'); 
 
 
/* The OCI program which waits for notifications: */ 
#include <stdio.h> 
#include <stdlib.h> 
#include <string.h> 
#include <oci.h> 
#ifdef WIN32COMMON 
#define sleep(x)   Sleep(1000*(x)) 
#else 
#include <unistd.h>                /* declares sleep() */ 
#endif 
 
/* Username / password:  */ 
static text *username = (text *) "OE"; 
static text *password = (text *) "OE"; 
 
/* The correlation strings of messages: */ 
static char  *logon = "LOGON"; 
static char  *logoff = "LOGOFF"; 
 
/* The possible consumer names of queues: */ 
static char *applist[] = {"APP1", "APP2","APP3"}; 
 
static OCIEnv *envhp; 
static OCIServer *srvhp; 
static OCIError *errhp; 
static OCISvcCtx *svchp; 
 
static void checkerr(/*_ OCIError *errhp, sword status _*/); 
 
struct process_statistics 
{ 
  ub4  logon; 
  ub4  logoff; 
}; 
 
typedef struct process_statistics process_statistics; 
 
int main(/*_ int argc, char *argv[] _*/); 
 
 
/* Notify Callback: */ 
ub4 notifyCB(ctx, subscrhp, pay, payl, desc, mode) 
dvoid *ctx; 
OCISubscription *subscrhp; 
dvoid *pay; 
ub4    payl; 
dvoid *desc; 
ub4    mode; 
{ 
 text                *subname;   /* subscription name */ 
 ub4                  lsub;      /* length of subscription name */ 
 text                *queue;     /* queue name */ 
 ub4                  lqueue;    /* length of queue name */ 
 text                *consumer;  /* consumer name */ 
 ub4                  lconsumer;   
 text                *correlation; 
 ub4                  lcorrelation; 
 ub4                  size; 
 ub4                  appno; 
 OCIRaw              *msgid;               
 OCIAQMsgProperties  *msgprop;   /* message properties descriptor */ 
 process_statistics   *user_count = (process_statistics *)ctx; 
 
 OCIAttrGet((dvoid *)subscrhp, OCI_HTYPE_SUBSCRIPTION, 
                             (dvoid *)&subname, &lsub, 
                             OCI_ATTR_SUBSCR_NAME, errhp); 
 
 /* Extract the attributes from the AQ descriptor: */ 
 /* Queue name: */ 
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&queue, &size,  
            OCI_ATTR_QUEUE_NAME, errhp); 
   
 /* Consumer name: */ 
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&consumer, &lconsumer,  
            OCI_ATTR_CONSUMER_NAME, errhp); 
 
 /* Message properties: */ 
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgprop, &size,  
            OCI_ATTR_MSG_PROP, errhp); 
 
 /* Get correlation from message properties: */ 
  checkerr(errhp, OCIAttrGet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES,  
                             (dvoid *)&correlation, &lcorrelation,  
                             OCI_ATTR_CORRELATION, errhp)); 
   
  if (lconsumer == strlen(applist[0])) 
  { 
    if (!memcmp((dvoid *)consumer, (dvoid *)applist[0], strlen(applist[0]))) 
     appno = 0; 
    else if (!memcmp((dvoid *)consumer, (dvoid *)applist[1], 
                     strlen(applist[1]))) 
     appno = 1; 
    else if (!memcmp((dvoid *)consumer, (dvoid *)applist[2], 
                     strlen(applist[2]))) 
     appno = 2; 
    else  
    { 
     printf("Wrong consumer in notification\n"); 
     return 1;           /* the callback is declared to return a ub4 */ 
    } 
  } 
  else 
  {  /* consumer name must be "APP1", "APP2" or "APP3"  */ 
    printf("Wrong consumer in notification\n");   
    return 1; 
  } 
 
  if (lcorrelation == strlen(logon) &&                   /* logon event */ 
       !memcmp((dvoid *)correlation, (dvoid *)logon, strlen(logon))) 
  { 
     user_count[appno].logon++; 
                           /* increment logon count for the app process */     
         printf("Logon by APP%d \n", (appno+1));  
   } 
  else if  (lcorrelation == strlen(logoff) &&           /* logoff event */ 
       !memcmp((dvoid *)correlation,(dvoid *)logoff, strlen(logoff))) 
  { 
     user_count[appno].logoff++;  
                          /* increment logoff count for the app process */ 
     printf("Logoff by APP%d \n", (appno+1));  
  }  
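  else                       /* correlation must be "LOGON" or "LOGOFF" */ 
    printf("Wrong correlation in notification\n");   
 
  printf("Total  : \n"); 
 
  /* Users currently logged on per application process (logons - logoffs): */ 
  printf("App1 : %d \n", (int)(user_count[0].logon-user_count[0].logoff)); 
  printf("App2 : %d \n", (int)(user_count[1].logon-user_count[1].logoff)); 
  printf("App3 : %d \n", (int)(user_count[2].logon-user_count[2].logoff)); 
 
  return 1;                  /* the callback is declared to return a ub4 */ 
} 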
 
int main(argc, argv) 
int argc; 
char *argv[]; 
{ 
  OCISession *authp = (OCISession *) 0; 
  OCISubscription *subscrhp[3]; 
  ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ; 
  process_statistics  ctx[3] = {{0,0}, {0,0}, {0,0}}; 
  ub4 sleep_time = 0; 
 
  printf("Initializing OCI Process\n"); 
 
  /* Initialize OCI environment with OCI_EVENTS flag set: */ 
  (void) OCIInitialize((ub4) OCI_EVENTS|OCI_OBJECT, (dvoid *)0, 
                       (dvoid * (*)(dvoid *, size_t)) 0, 
                       (dvoid * (*)(dvoid *, dvoid *, size_t))0, 
                       (void (*)(dvoid *, dvoid *)) 0 ); 
 
  printf("Initialization successful\n"); 
 
  printf("Initializing OCI Env\n"); 
  (void) OCIEnvInit( (OCIEnv **) &envhp, OCI_DEFAULT, (size_t) 0, 
                     (dvoid **) 0 ); 
  printf("Initialization successful\n"); 
 
  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, 
                   OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0)); 
 
  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, 
                   OCI_HTYPE_SERVER, (size_t) 0, (dvoid **) 0)); 
 
  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, 
                   OCI_HTYPE_SVCCTX, (size_t) 0, (dvoid **) 0)); 
 
  printf("connecting to server\n"); 
  checkerr(errhp, OCIServerAttach( srvhp, errhp, (text *)"inst1_alias", 
           strlen("inst1_alias"), (ub4) OCI_DEFAULT)); 
  printf("connect successful\n"); 
 
  /* Set attribute server context in the service context: */ 
  checkerr(errhp, OCIAttrSet( (dvoid *) svchp, OCI_HTYPE_SVCCTX, (dvoid *)srvhp,  
                    (ub4) 0, OCI_ATTR_SERVER, (OCIError *) errhp)); 
 
  checkerr(errhp, OCIHandleAlloc((dvoid *) envhp, (dvoid **)&authp, 
                       (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0)); 
  
  /* Set username and password in the session handle: */ 
  checkerr(errhp, OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION, 
                  (dvoid *) username, (ub4) strlen((char *)username), 
                  (ub4) OCI_ATTR_USERNAME, errhp)); 
  
  checkerr(errhp, OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION, 
                  (dvoid *) password, (ub4) strlen((char *)password), 
                  (ub4) OCI_ATTR_PASSWORD, errhp)); 
 
  /* Begin session: */ 
  checkerr(errhp, OCISessionBegin ( svchp,  errhp, authp, OCI_CRED_RDBMS,  
                          (ub4) OCI_DEFAULT)); 
 
  (void) OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, 
                   (dvoid *) authp, (ub4) 0, 
                   (ub4) OCI_ATTR_SESSION, errhp); 
 
   /* Register for notification: */ 
   printf("allocating subscription handle\n"); 
  subscrhp[0] = (OCISubscription *)0; 
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[0],  
                        (ub4) OCI_HTYPE_SUBSCRIPTION, 
                        (size_t) 0, (dvoid **) 0); 
  
  /* For application process APP1: */ 
  printf("setting subscription name\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) "OE.LOGON_LOGOFF:APP1",  
                 (ub4) strlen("OE.LOGON_LOGOFF:APP1"), 
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp); 
  
  printf("setting subscription callback\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) notifyCB, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); 
 
 (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *)&ctx, (ub4)sizeof(ctx), 
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp); 
 
  printf("setting subscription namespace\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) &namespace, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); 
 
 printf("allocating subscription handle\n"); 
  subscrhp[1] = (OCISubscription *)0; 
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[1],  
                        (ub4) OCI_HTYPE_SUBSCRIPTION, 
                        (size_t) 0, (dvoid **) 0); 
  
  /* For application process APP2: */ 
  printf("setting subscription name\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) "OE.LOGON_LOGOFF:APP2",  
                 (ub4) strlen("OE.LOGON_LOGOFF:APP2"), 
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp); 
  
  printf("setting subscription callback\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) notifyCB, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); 
 
 (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *)&ctx, (ub4)sizeof(ctx), 
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp); 
 
  printf("setting subscription namespace\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) &namespace, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); 
 
   printf("allocating subscription handle\n"); 
  subscrhp[2] = (OCISubscription *)0; 
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[2],  
                        (ub4) OCI_HTYPE_SUBSCRIPTION, 
                        (size_t) 0, (dvoid **) 0); 
 
  /* For application process APP3: */ 
  printf("setting subscription name\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) "OE.LOGON_LOGOFF:APP3",  
                 (ub4) strlen("OE.LOGON_LOGOFF:APP3"), 
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp); 
  
  printf("setting subscription callback\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) notifyCB, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); 
 
 (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *)&ctx, (ub4)sizeof(ctx), 
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp); 
 
  printf("setting subscription namespace\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) &namespace, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); 
 
  printf("Registering for notifications\n"); 
  checkerr(errhp, OCISubscriptionRegister(svchp, subscrhp, 3, errhp,  
                                          OCI_DEFAULT)); 
 
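  /* The time to wait, in seconds, is expected as the first argument: */ 
  if (argc < 2) 
  { 
    printf("Usage: %s <seconds to wait>\n", argv[0]); 
    exit(1); 
  } 
  sleep_time = (ub4)atoi(argv[1]); 
  printf ("waiting for %u s \n", sleep_time); 
  sleep(sleep_time); 
 
  printf("Exiting\n"); 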
  exit(0); 
} 
 
void checkerr(errhp, status) 
OCIError *errhp; 
sword status; 
{ 
  text errbuf[512]; 
  sb4 errcode = 0; 
 
  switch (status) 
  { 
  case OCI_SUCCESS: 
    break; 
  case OCI_SUCCESS_WITH_INFO: 
    (void) printf("Error - OCI_SUCCESS_WITH_INFO\n"); 
    break; 
  case OCI_NEED_DATA: 
    (void) printf("Error - OCI_NEED_DATA\n"); 
    break; 
  case OCI_NO_DATA: 
    (void) printf("Error - OCI_NO_DATA\n"); 
    break; 
  case OCI_ERROR: 
    (void) OCIErrorGet((dvoid *)errhp, (ub4) 1, (text *) NULL, &errcode, 
                        errbuf, (ub4) sizeof(errbuf), OCI_HTYPE_ERROR); 
    (void) printf("Error - %.*s\n", 512, errbuf); 
    break; 
  case OCI_INVALID_HANDLE: 
    (void) printf("Error - OCI_INVALID_HANDLE\n"); 
    break; 
  case OCI_STILL_EXECUTING: 
    (void) printf("Error - OCI_STILL_EXECUTING\n"); 
    break; 
  case OCI_CONTINUE: 
    (void) printf("Error - OCI_CONTINUE\n"); 
    break; 
  default: 
    break; 
  } 
} 
 
/* End of file tkaqdocn.c */ 
 

Retention and Message History

AQ allows users to retain messages in the queue table, which means that SQL can then be used to query these messages for analysis. Messages are often related to each other. For example, if a message is produced as a result of the consumption of another message, the two are related. As the application designer, you may want to keep track of such relationships. Along with retention and message identifiers, AQ lets you automatically create message journals, also referred to as tracking journals or event journals. Taken together, retention, message identifiers and SQL queries make it possible to build powerful message warehouses.

Example Scenario

Let us suppose that the shipping application needs to determine the average processing time of orders. This includes the time an order has to wait in the back order queue (WS_backorders_que). The application would also like to find out the average wait time in the back order queue. If retention is specified as TRUE for the shipping queues and the order number is specified in the correlation field of each message, SQL queries can be written to determine the wait time for orders in the shipping application.

For simplicity, we will only analyze orders that have already been processed. The processing time for an order in the shipping application is the difference between the enqueue time in the WS_bookedorders_que queue and the enqueue time in the WS_shippedorders_que queue.

Example Code

SELECT  SUM(SO.enq_time - BO.enq_time) / count (*) AVG_PRCS_TIME 
   FROM WS.AQ$WS_orders_pr_mqtab BO , WS.AQ$WS_orders_mqtab SO  
   WHERE SO.msg_state = 'PROCESSED' and BO.msg_state = 'PROCESSED' 
   AND SO.corr_id = BO.corr_id and SO.queue = 'WS_shippedorders_que'; 
 
/* Average waiting time in the backed order queue: */ 
SELECT SUM(BACK.deq_time - BACK.enq_time)/count (*) AVG_BACK_TIME 
   FROM WS.AQ$WS_orders_mqtab BACK  
   WHERE BACK.msg_state = 'PROCESSED' AND BACK.queue = 'WS_backorders_que'; 

Publish/Subscribe Support

Oracle AQ adds various features that allow you to develop an application based on a publish/subscribe model. The aim of this application model is to enable flexible and dynamic communication between applications functioning as publishers and applications playing the role of subscribers. The specific design point is that the applications playing these different roles should be decoupled in their communication, that they should interact based on messages and message content.

In distributing messages, publisher applications do not have to explicitly handle or manage message recipients. This allows the dynamic addition of new subscriber applications to receive messages without changing any publisher application logic. Subscriber applications receive messages based on message content without regard to which publisher applications are sending messages. This allows the dynamic addition of publisher applications without changing any subscriber application logic. Subscriber applications specify interest by defining a rule-based subscription on message content (payload) and message header properties of a queue. The system automatically routes messages by computing recipients for published messages using the rule-based subscriptions.

You can implement a publish/subscribe model of communication using AQ by taking the steps illustrated in the example scenario that follows: define queues, set up subscriptions, set up propagation, publish messages, and receive messages.

Example Scenario

The BooksOnLine application illustrates the use of a publish/subscribe model for communicating between applications, as follows.

Define queues

The Order Entry application defines a queue (OE_bookedorders_que) to communicate orders that are booked to various applications. The Order Entry application is not aware of the various subscriber applications, and thus a new subscriber application may be added without disrupting any setup or logic in the Order Entry (publisher) application.

Set up Subscriptions

The various shipping applications and the customer service application (that is, Eastern region shipping, Western region shipping, Overseas shipping and Customer Service) are defined as subscribers to the booked orders queue of the Order Entry application. Rules are used to route messages of interest to the various subscribers. Thus, Eastern Region shipping, which handles shipment of all orders for the East coast and all rush US orders, would express its subscription rule as follows:

rule  => 'tab.user_data.orderregion = ''EASTERN'' OR 
(tab.user_data.ordertype = ''RUSH'' AND  
tab.user_data.customer.country = ''USA'') ' 
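
To make the logic of this rule concrete, the following C sketch evaluates the same condition against an in-memory order. It is an illustration only: in AQ the rule is evaluated by the database against the message payload, and the struct order type, its flattened field names, and the function east_shipping_matches are hypothetical. The sketch also assumes that all three attributes are non-NULL strings, so SQL NULL handling is not modeled.

```c
#include <stdbool.h>
#include <string.h>

/* Hypothetical flattened view of the payload attributes used by the rule. */
struct order {
    const char *orderregion;       /* e.g. "EASTERN" */
    const char *ordertype;         /* e.g. "RUSH" */
    const char *customer_country;  /* e.g. "USA" */
};

/* Mirrors the Eastern Region shipping rule:
   orderregion = 'EASTERN' OR (ordertype = 'RUSH' AND country = 'USA'). */
bool east_shipping_matches(const struct order *o)
{
    return strcmp(o->orderregion, "EASTERN") == 0 ||
           (strcmp(o->ordertype, "RUSH") == 0 &&
            strcmp(o->customer_country, "USA") == 0);
}
```

Under these assumptions, an order for the Eastern region matches whatever its type, a rush order from another region matches only when the customer country is USA, and every other order is left to the remaining subscribers.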
 

Each subscriber can specify a local queue to which messages are to be delivered. The Eastern region shipping application specifies a local queue (ES_bookedorders_que) for message delivery by specifying the subscriber address as follows:

subscriber := aq$_agent('East_Shipping', 'ES.ES_bookedorders_que', null); 
 
Set up propagation

Enable propagation from each publisher application queue. To allow subscribed messages to be delivered to remote queues, the Order Entry application enables propagation by means of the following statement:

execute dbms_aqadm.schedule_propagation(queue_name => 'OE.OE_bookedorders_que');  

Publish Messages

Booked orders are published by the Order Entry application when it enqueues orders that have been validated and are ready for shipping into the OE_bookedorders_que queue. These messages are then routed to each of the subscribing applications. Messages are delivered to local queues (if specified) at each of the subscriber applications.

Receive Messages

Each of the shipping applications and the Customer Service application will then receive these messages in their local queues. For example, Eastern Region Shipping only receives booked orders that are for East Coast addresses or any US order that is marked RUSH. This application then dequeues messages and processes its orders for shipping.

Support for Oracle Parallel Server (OPS)

The Oracle Parallel Server facility can be used to improve AQ performance by allowing different queues to be managed by different instances. You do this by specifying different instance affinities (preferences) for the queue tables that store the queues. This allows queue operations (enqueue/dequeue) on different queues to occur in parallel.

The AQ queue monitor process continuously monitors the instance affinities of the queue tables. The queue monitor assigns ownership of a queue table to the specified primary instance if it is available, failing which it assigns it to the specified secondary instance. If the owner instance of a queue table ceases to exist at any time, the queue monitor changes the ownership of the queue table to a suitable instance -- the secondary instance or some other available instance if the secondary instance is also unavailable.
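
The order in which the queue monitor looks for an owner can be sketched in C as follows. This is a simplified model, not AQ code: the function choose_owner and its parameters are hypothetical, instances are assumed to be numbered from 1, and picking the lowest-numbered instance when neither the primary nor the secondary is available is an assumption, since the text only says "some other available instance".

```c
#include <stdbool.h>

/* Picks the owner instance for a queue table in the order described above:
   primary, then secondary, then any other available instance.
   available[i] is true when instance i+1 is up.
   Returns 0 when no instance is available. */
int choose_owner(int primary, int secondary,
                 const bool *available, int n_instances)
{
    int i;

    if (primary >= 1 && primary <= n_instances && available[primary - 1])
        return primary;
    if (secondary >= 1 && secondary <= n_instances && available[secondary - 1])
        return secondary;

    /* Assumption of this sketch: fall back to the lowest-numbered instance. */
    for (i = 1; i <= n_instances; i++)
        if (available[i - 1])
            return i;
    return 0;
}
```

With two instances both available, a queue table with primary 1 and secondary 2 is owned by instance 1, and one with primary 2 and secondary 1 is owned by instance 2. If only instance 1 is available, both queue tables are owned by instance 1.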

AQ propagation is able to make use of OPS, although this is completely transparent to the user. The affinities for jobs submitted on behalf of the propagation schedules are set to the same values as the affinities of the respective queue tables. Thus a job_queue_process associated with the owner instance of a queue table will handle the propagation from queues stored in that queue table, thereby minimizing 'pinging'. Additional discussion on this topic can be found under AQ propagation scheduling (see "Schedule a Queue Propagation" in Chapter 4, "Administrative Interface: Basic Operations").



 

Example Scenario

In the BooksOnLine example, operations on the OE_neworders_que and OE_bookedorders_que queues at the order entry (OE) site can be made faster if the two queues are associated with different instances. This is done by creating the queues in different queue tables and specifying different affinities for the queue tables in the create_queue_table() command.

In the example, the queue table OE_orders_sqtab stores the queue OE_neworders_que, and its primary and secondary instances are 1 and 2 respectively. The queue table OE_orders_mqtab stores the queue OE_bookedorders_que, and its primary and secondary instances are 2 and 1 respectively. The objective is to let instances 1 and 2 manage the two queues in parallel. By default, only one instance is available, in which case the owner instance of both queue tables will be set to instance 1. However, if OPS is set up correctly and both instances 1 and 2 are available, then queue table OE_orders_sqtab will be owned by instance 1 and the other queue table will be owned by instance 2. The primary and secondary instance specification of a queue table can be changed dynamically using the alter_queue_table() command as shown in the example below. Information about the primary, secondary and owner instance of a queue table can be obtained by querying the view USER_QUEUE_TABLES (see "Select Queue Tables in User Schema" in "Administrative Interface: Views").

Example Code

/* Create queue tables, queues for OE  */
CONNECT OE/OE; 
EXECUTE dbms_aqadm.create_queue_table( -
        queue_table        => 'OE_orders_sqtab', -
        comment            => 'Order Entry Single-Consumer Orders queue table', -
        queue_payload_type => 'BOLADM.order_typ', -
        compatible         => '8.1', -
        primary_instance   => 1, -
        secondary_instance => 2);

EXECUTE dbms_aqadm.create_queue_table( -
        queue_table        => 'OE_orders_mqtab', -
        comment            => 'Order Entry Multi Consumer Orders queue table', -
        multiple_consumers => TRUE, -
        queue_payload_type => 'BOLADM.order_typ', -
        compatible         => '8.1', -
        primary_instance   => 2, -
        secondary_instance => 1);

EXECUTE dbms_aqadm.create_queue ( -
        queue_name         => 'OE_neworders_que', -
        queue_table        => 'OE_orders_sqtab');

EXECUTE dbms_aqadm.create_queue ( -
        queue_name         => 'OE_bookedorders_que', -
        queue_table        => 'OE_orders_mqtab');
  
/* Check instance affinity of OE queue tables from AQ administrative view: */ 
SELECT queue_table, primary_instance, secondary_instance, owner_instance 
FROM user_queue_tables; 
  
/* Alter instance affinity of OE queue tables: */ 
EXECUTE dbms_aqadm.alter_queue_table( -
        queue_table        => 'OE.OE_orders_sqtab', -
        primary_instance   => 2, -
        secondary_instance => 1);

EXECUTE dbms_aqadm.alter_queue_table( -
        queue_table        => 'OE.OE_orders_mqtab', -
        primary_instance   => 1, -
        secondary_instance => 2);
  
/* Check instance affinity of OE queue tables from AQ administrative view: */
SELECT queue_table, primary_instance, secondary_instance, owner_instance 
FROM user_queue_tables; 

Support for Statistics Views

Each instance keeps its own AQ statistics information in its own SGA, and does not have knowledge of the statistics gathered by other instances. When a GV$AQ view is queried by an instance, all other instances funnel their AQ statistics information to the instance issuing the query.

Example Scenario

The GV$AQ view can be queried at any time to see the number of messages in the waiting, ready or expired state. The view also displays the average number of seconds for which messages have been waiting to be processed. The order processing application can use this information to dynamically tune the number of order processing processes (see "Select the Number of Messages in Different States for the Whole Database" in Chapter 5, "Administrative Interface: Views").

Example Code

CONNECT oe/oe 
 
/* Count the number of ready messages and the average time for which the messages
   have been waiting. Queue names are stored in uppercase in the data dictionary: */
SELECT READY, AVERAGE_WAIT FROM gv$aq Stats, user_queues Qs
  WHERE Stats.qid = Qs.qid and Qs.Name = 'OE_NEWORDERS_QUE';

ENQUEUE Features

Subscriptions and Recipient Lists

In a single-consumer queue a message can be processed once by only one consumer. What happens when there are multiple processes or operating system threads concurrently dequeuing from the same queue? Given that a locked message cannot be dequeued by a process other than the one which has created the lock, each process will dequeue the first unlocked message that is at the head of the queue. After processing, the message is removed if the retention_time of the queue is 0, or retained for the specified retention time. While the message is retained the message can be either queried using SQL on the queue table view or by dequeuing using the BROWSE mode and specifying the message ID of the processed message.
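The retention behavior described above can be sketched as follows. This is only an illustration: the one-day retention value is an arbitrary assumption, and the column names selected from the queue table view (msg_id, msg_state, enq_time) are assumed to match the AQ$<queue_table> view in your release.

```sql
CONNECT OE/OE;

/* Retain processed messages for one day (86400 seconds; illustrative value): */
BEGIN
  dbms_aqadm.alter_queue(
        queue_name     => 'OE.OE_neworders_que',
        retention_time => 86400);
END;
/

/* Query messages that have been dequeued but are still retained: */
SELECT msg_id, msg_state, enq_time
  FROM AQ$OE_orders_sqtab
 WHERE msg_state = 'PROCESSED';
```

A retained message found this way could then be examined by dequeuing in BROWSE mode with its message ID.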

AQ allows a single message to be processed/consumed by more than one consumer. To use this feature, you must create multi-consumer queues and enqueue the messages into these multi-consumer queues. AQ allows two methods of identifying the list of consumers for a message: subscriptions and recipient lists.

Subscriptions

You can add a subscription to a queue by using the DBMS_AQADM.ADD_SUBSCRIBER PL/SQL procedure (see "Add a Subscriber" in Chapter 4, "Administrative Interface: Basic Operations"). This lets you specify, by means of the AQ$_AGENT parameter, a consumer for messages enqueued to the queue. You can add more subscribers by calling the DBMS_AQADM.ADD_SUBSCRIBER procedure repeatedly, up to a maximum of 1024 subscribers for a multi-consumer queue. (Note that you are limited to 32 subscribers for a multi-consumer queue created using Oracle 8.0.3.)

All consumers that are added as subscribers to a multi-consumer queue must have unique values for the AQ$_AGENT parameter. This means that two subscribers cannot have the same values for the NAME, ADDRESS and PROTOCOL attributes for the AQ$_AGENT type. At least one of the three attributes must be different for two subscribers (see "Agent" in Chapter 3, "Managing Oracle AQ" for formal description of this data structure).

You cannot add subscriptions to single-consumer queues or exception queues. A consumer that is added as a subscriber to a queue will only be able to dequeue messages that are enqueued after the DBMS_AQADM.ADD_SUBSCRIBER procedure has completed. In other words, messages that were enqueued before this procedure was executed will not be available for dequeue by this consumer.

You can remove a subscription by using the DBMS_AQADM.REMOVE_SUBSCRIBER procedure (see "Remove a Subscriber" in Chapter 4, "Administrative Interface: Basic Operations"). AQ will automatically remove from the queue all metadata corresponding to the consumer identified by the AQ$_AGENT parameter. In other words, it is not an error to execute the REMOVE_SUBSCRIBER procedure even when there are pending messages that are available for dequeue by the consumer. These messages will be automatically made unavailable for dequeue after the REMOVE_SUBSCRIBER procedure is executed. In a queue table that is created with the compatible parameter set to '8.1' or higher, such messages that were not dequeued by the consumer will be shown as "UNDELIVERABLE" in the AQ$<queue_table> view. Note that a multi-consumer queue table created without the compatible parameter, or with the compatible parameter set to '8.0', does not display the state of a message on a consumer basis, but only displays the global state of the message.
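As a minimal sketch of the two procedures discussed above, the following block adds a local subscriber to the multi-consumer booked orders queue and a second block later removes it. The agent name Order_Audit is hypothetical and used only for illustration; the queue and queue table names are those of the BooksOnLine examples in this chapter, and the view columns are assumed to be those of an 8.1-compatible queue table.

```sql
CONNECT OE/OE;

/* Add a subscriber; only messages enqueued after this call are visible to it: */
DECLARE
  audit_agent  sys.aq$_agent;
BEGIN
  /* Order_Audit is a hypothetical consumer name (NAME, ADDRESS, PROTOCOL): */
  audit_agent := sys.aq$_agent('Order_Audit', NULL, NULL);
  dbms_aqadm.add_subscriber(
        queue_name => 'OE.OE_bookedorders_que',
        subscriber => audit_agent);
END;
/

/* Remove the subscriber; this is allowed even if messages are still pending: */
DECLARE
  audit_agent  sys.aq$_agent;
BEGIN
  audit_agent := sys.aq$_agent('Order_Audit', NULL, NULL);
  dbms_aqadm.remove_subscriber(
        queue_name => 'OE.OE_bookedorders_que',
        subscriber => audit_agent);
END;
/

/* In an 8.1-compatible queue table, list messages the removed consumer
   never dequeued: */
SELECT queue, msg_id, consumer_name, msg_state
  FROM AQ$OE_orders_mqtab
 WHERE msg_state = 'UNDELIVERABLE';
```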

Recipient Lists

You do not need to specify subscriptions for a multi-consumer queue provided that producers of messages for enqueue supply a recipient list of consumers. In some situations it may be desirable to enqueue a message that is targeted to a specific set of consumers rather than the default list of subscribers. You accomplish this by specifying a recipient list at the time of enqueuing the message.

If a recipient list is specified during enqueue, it overrides the subscription list. In other words, messages that have a specified recipient list will not be available for dequeue by the subscribers of the queue. The consumers specified in the recipient list may or may not be subscribers for the queue. It is an error if the queue does not have any subscribers and the enqueue does not specify a recipient list (see "Enqueue a Message" in Chapter 6, "Operational Interface: Basic Operations").
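The following sketch shows one way a producer might supply a recipient list at enqueue time. It assumes the BOLADM.order_typ constructor takes the eight attributes used elsewhere in this chapter; the order number and the 'NORMAL' and 'WESTERN' values are placeholders, and the two consumer names are taken from the shipping examples.

```sql
CONNECT OE/OE;

DECLARE
  enqopt       dbms_aq.enqueue_options_t;
  msgprop      dbms_aq.message_properties_t;
  recipients   dbms_aq.aq$_recipient_list_t;
  enq_msgid    RAW(16);
  order_data   BOLADM.order_typ;
BEGIN
  /* Placeholder payload; most attributes are left NULL for brevity: */
  order_data := BOLADM.order_typ(1005, NULL, 'NORMAL', 'WESTERN',
                                 NULL, NULL, NULL, NULL);

  /* Target two specific consumers instead of the queue's subscribers: */
  recipients(1) := sys.aq$_agent('West_Shipping', NULL, NULL);
  recipients(2) := sys.aq$_agent('East_Shipping', NULL, NULL);
  msgprop.recipient_list := recipients;

  dbms_aq.enqueue('OE.OE_bookedorders_que', enqopt, msgprop,
                  order_data, enq_msgid);
  COMMIT;
END;
/
```

Because the recipient list overrides the subscription list, other subscribers of OE_bookedorders_que would not see this message.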

Priority and Ordering of Messages

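The message ordering dictates the order in which messages will be dequeued from a queue. The ordering method for a queue is specified when a queue table is created (see "Create a Queue Table" in Chapter 4, "Administrative Interface: Basic Operations"). Currently, AQ supports two types of message ordering: priority ordering and first-in, first-out (FIFO) ordering. The two can be combined in the sort list of a queue table to give a FIFO-priority queue, as in the example below.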

Example Scenario

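In the BooksOnLine application, a customer can request Fed-ex shipping (priority 1), Priority air shipping (priority 2), or Regular ground shipping (priority 3).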

The Order Entry application uses a FIFO-priority queue to store booked orders. Booked orders are propagated to the regional booked orders queues. At each region, orders in these regional booked orders queues are processed in the order of the shipping priorities.

The following calls create the FIFO-priority queues for the Order Entry application.

Example Code

/* Create a priority queue table for OE: */ 
EXECUTE dbms_aqadm.create_queue_table( -
   queue_table         => 'OE_orders_pr_mqtab', -
   sort_list           => 'priority,enq_time', -
   comment             => 'Order Entry Priority MultiConsumer Orders queue table', -
   multiple_consumers  => TRUE, -
   queue_payload_type  => 'BOLADM.order_typ', -
   compatible          => '8.1', -
   primary_instance    => 2, -
   secondary_instance  => 1);

EXECUTE dbms_aqadm.create_queue ( -
   queue_name          => 'OE_bookedorders_que', -
   queue_table         => 'OE_orders_pr_mqtab');
 
/* When an order arrives, the order entry application can use the following 
   procedure to enqueue the order into its booked orders queue. A shipping 
   priority is specified for each order: */
CREATE OR REPLACE procedure order_enq(book_title        IN VARCHAR2, 
                                      book_qty          IN NUMBER, 
                                      order_num         IN NUMBER, 
                                      shipping_priority IN NUMBER, 
                                      cust_state        IN VARCHAR2, 
                                      cust_country      IN VARCHAR2, 
                                      cust_region       IN VARCHAR2, 
                                      cust_ord_typ      IN VARCHAR2) AS 
 
OE_enq_order_data        BOLADM.order_typ; 
OE_enq_cust_data         BOLADM.customer_typ; 
OE_enq_book_data         BOLADM.book_typ; 
OE_enq_item_data         BOLADM.orderitem_typ; 
OE_enq_item_list         BOLADM.orderitemlist_vartyp; 
enqopt                   dbms_aq.enqueue_options_t; 
msgprop                  dbms_aq.message_properties_t; 
enq_msgid                RAW(16); 
  
BEGIN 
   msgprop.correlation := cust_ord_typ; 
   OE_enq_cust_data    := BOLADM.customer_typ(NULL, NULL, NULL, NULL, 
                                cust_state, NULL, cust_country); 
   OE_enq_book_data    := BOLADM.book_typ(book_title, NULL, NULL, NULL); 
   OE_enq_item_data    := BOLADM.orderitem_typ(book_qty,  
                                OE_enq_book_data, NULL); 
   OE_enq_item_list    := BOLADM.orderitemlist_vartyp( 
                                BOLADM.orderitem_typ(book_qty,  
                                OE_enq_book_data, NULL)); 
   OE_enq_order_data   := BOLADM.order_typ(order_num, NULL,  
                                cust_ord_typ, cust_region, 
                                OE_enq_cust_data, NULL,  
                                OE_enq_item_list, NULL); 
 
   /* Put the shipping priority into the message properties before enqueueing
      the message: */
   msgprop.priority    := shipping_priority;
   dbms_aq.enqueue('OE.OE_bookedorders_que', enqopt, msgprop,
                   OE_enq_order_data, enq_msgid);
   COMMIT;
END;
/
 
 
/* At each region, similar booked order queues are created. The orders are
   propagated from the central Order Entry's booked order queues to the regional
   booked order queues. For example, at the western region, the booked orders
   queue is created as follows.
   Create a priority queue table for WS shipping: */
EXECUTE dbms_aqadm.create_queue_table( -
   queue_table        => 'WS_orders_pr_mqtab', -
   sort_list          => 'priority,enq_time', -
   comment            => 'West Shipping Priority MultiConsumer Orders queue table', -
   multiple_consumers => TRUE, -
   queue_payload_type => 'BOLADM.order_typ', -
   compatible         => '8.1');

/* Booked orders are stored in the priority queue table: */
EXECUTE dbms_aqadm.create_queue ( -
   queue_name         => 'WS_bookedorders_que', -
   queue_table        => 'WS_orders_pr_mqtab');
 
/* At each region, the shipping application dequeues orders from the regional 
   booked order queue according to the orders' shipping priorities, processes 
   the orders, and enqueues the processed orders into the shipped orders queues 
   or the back orders queues. */

Time Specification: Delay

Messages can be enqueued to a queue with a delay. The delay represents a time interval after which the message becomes available for dequeuing. A message specified with a delay is in a waiting state until the delay expires and the message becomes available. Note that delay processing requires the queue monitor to be started. Note also that dequeuing by msgid overrides the delay specification.

Example Scenario

In the BooksOnLine application, delay can be used to implement deferred billing. A billing application can define a deferred billing queue in which shipped orders that are not to be billed immediately are placed with a delay. For example, a certain class of customer accounts, such as those of corporate customers, may not be billed for 15 days. The billing application dequeues incoming shipped order messages (from the shippedorders queue) and, if the order is for a corporate customer, enqueues it into the deferred billing queue with a delay.

Example Code

/* Enqueue an order to implement deferred billing so that the order is not made 
   visible again until delay has expired: */
CREATE OR REPLACE PROCEDURE defer_billing(deferred_billing_order order_typ) 
AS 
  defer_bill_queue_name    VARCHAR2(62); 
  enqopt                   dbms_aq.enqueue_options_t; 
  msgprop                  dbms_aq.message_properties_t; 
  enq_msgid                RAW(16); 
BEGIN 
 
/* Enqueue the order into the deferred billing queue with a delay of 15 days: */ 
  defer_bill_queue_name := 'CBADM.deferbilling_que'; 
  msgprop.delay := 15*60*60*24; 
  dbms_aq.enqueue(defer_bill_queue_name, enqopt, msgprop,  
                  deferred_billing_order, enq_msgid); 
END; 
/ 
 

Time Specification: Expiration

Messages can be enqueued with an expiration which specifies the interval of time the message is available for dequeuing. Note that expiration processing requires that the queue monitor be running.

Example Scenario

In the BooksOnLine application, expiration can be used to control the amount of time that is allowed to process a back order. The shipping application places orders for books that are not available on a back order queue. If the shipping policy is that all back orders must be shipped within a week, then messages can be enqueued into the back order queue with an expiration of 1 week. In this case, any back orders that are not processed within one week are moved to the exception queue with the message state set to EXPIRED. This can be used to flag any orders that have not been shipped according to the back order shipping policy.

Example Code

CONNECT BOLADM/BOLADM 
/* Re-enqueue a back order into a back order queue and set an expiration of
   7 days; all back orders must be processed in 7 days or they are moved to the
   exception queue: */
CREATE OR REPLACE PROCEDURE requeue_back_order(sale_region varchar2,  
                                               backorder order_typ) 
AS 
  back_order_queue_name    VARCHAR2(62); 
  enqopt                   dbms_aq.enqueue_options_t; 
  msgprop                  dbms_aq.message_properties_t; 
  enq_msgid                RAW(16); 
BEGIN 
  /* Look up a back order queue based on the region by means of a directory
     service: */
  IF sale_region = 'WEST' THEN 
    back_order_queue_name := 'WS.WS_backorders_que';  
  ELSIF sale_region = 'EAST' THEN 
    back_order_queue_name := 'ES.ES_backorders_que';  
  ELSE 
    back_order_queue_name := 'OS.OS_backorders_que';  
  END IF; 
 
  /* Enqueue the order with expiration set to 7 days: */ 
  msgprop.expiration := 7*60*60*24; 
  dbms_aq.enqueue(back_order_queue_name, enqopt, msgprop,  
                  backorder, enq_msgid); 
END; 
/ 
 

Message Grouping

Messages belonging to one queue can be grouped to form a set that can only be consumed by one user at a time. This requires the queue be created in a queue table that is enabled for transactional message grouping (see "Create a Queue Table" in Chapter 4, "Administrative Interface: Basic Operations"). All messages belonging to a group have to be created in the same transaction and all messages created in one transaction belong to the same group. This feature allows you to segment complex messages into simple messages.

For example, messages directed to a queue containing invoices could be constructed as a group of messages starting with the header message, followed by messages representing details, followed by the trailer message. Message grouping is also very useful if the message payload contains complex large objects such as images and video that can be segmented into smaller objects.

The general message properties (priority, delay, expiration) for the messages in a group are determined solely by the message properties specified for the first message (head) of the group irrespective of which properties are specified for subsequent messages in the group.

The message grouping property is preserved across propagation. However, it is important to note that the destination queue to which messages have to be propagated must also be enabled for transactional grouping. There are also some restrictions you need to keep in mind if the message grouping property is to be preserved while dequeuing messages from a queue enabled for transactional grouping (see "Dequeue Methods" and "Modes of Dequeuing" for additional information).

Example Scenario

In the BooksOnLine application, message grouping can be used to handle new orders. Each order contains a number of books ordered one by one in succession. Items ordered over the Web exhibit similar behavior.

In the example given below, each enqueue corresponds to an individual book that is part of an order and the group/transaction represents a complete order. Only the first enqueue contains customer information. Note that the OE_neworders_que is stored in the table OE_orders_sqtab which has been enabled for transactional grouping. Refer to the example code for descriptions of procedures new_order_enq() and same_order_enq().

Example Code

connect OE/OE; 
 
/* Create queue table for OE:  */
EXECUTE dbms_aqadm.create_queue_table( -
        queue_table        => 'OE_orders_sqtab', -
        comment            => 'Order Entry Single-Consumer Orders queue table', -
        queue_payload_type => 'BOLADM.order_typ', -
        message_grouping   => DBMS_AQADM.TRANSACTIONAL, -
        compatible         => '8.1', -
        primary_instance   => 1, -
        secondary_instance => 2);

/* Create neworders queue for OE: */
EXECUTE dbms_aqadm.create_queue ( -
        queue_name         => 'OE_neworders_que', -
        queue_table        => 'OE_orders_sqtab');
 
/* Log in to the OE account: */
CONNECT OE/OE; 
SET serveroutput on; 
  
/* Enqueue some orders using message grouping into OE_neworders_que,
   First Order Group: */
EXECUTE BOLADM.new_order_enq('My First   Book', 1, 1001, 'CA'); 
EXECUTE BOLADM.same_order_enq('My Second  Book', 2); 
COMMIT; 
/ 
/* Second Order Group: */ 
EXECUTE BOLADM.new_order_enq('My Third   Book', 1, 1002, 'WA'); 
COMMIT; 
/ 
/* Third Order Group: */ 
EXECUTE BOLADM.new_order_enq('My Fourth  Book', 1, 1003, 'NV'); 
EXECUTE BOLADM.same_order_enq('My Fifth   Book', 3); 
EXECUTE BOLADM.same_order_enq('My Sixth   Book', 2); 
COMMIT; 
/ 
/* Fourth Order Group: */
EXECUTE BOLADM.new_order_enq('My Seventh Book', 1, 1004, 'MA'); 
EXECUTE BOLADM.same_order_enq('My Eighth  Book', 3); 
EXECUTE BOLADM.same_order_enq('My Ninth   Book', 2); 
COMMIT; 
/

Asynchronous Notifications

This feature allows OCI clients to receive notifications when there is a message in a queue of interest. The client can use it to monitor multiple subscriptions. The client does not have to be connected to the database to receive notifications regarding its subscriptions.

You use the OCI function OCISubscriptionRegister to register interest in messages in a queue (see "Register for Notification" in Chapter 6, "Operational Interface: Basic Operations").



The client can specify a callback function which is invoked for every new message that is enqueued. For non-persistent queues, the message is delivered to the client as part of the notification. For persistent queues, only the message properties are delivered as part of the notification. Consequently, in the case of persistent queues, the client has to make an explicit dequeue to access the contents of the message.

Example Scenario

In the BooksOnLine application, a customer can request Fed-ex shipping (priority 1), Priority air shipping (priority 2), or Regular ground shipping (priority 3).

The shipping application then ships the orders according to the user's request. It is of interest to BooksOnLine to find out how many requests of each shipping type come in each day. The application uses the asynchronous notification facility for this purpose. It registers for notification on the WS.WS_bookedorders_que. When it is notified of a new message in the queue, it updates the count for the appropriate shipping type depending on the priority of the message.

Example Code

This example illustrates the use of OCISubscriptionRegister. At the shipping site, an OCI client program keeps track of how many orders were made for each of the shipping types, FEDEX, AIR and GROUND. The priority field of the message enables us to determine the type of shipping desired.


#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <oci.h>
#ifdef WIN32COMMON
#define sleep(x)   Sleep(1000*(x))
#endif
static text *username = (text *) "WS";
static text *password = (text *) "WS";

static OCIEnv *envhp;
static OCIServer *srvhp;
static OCIError *errhp;
static OCISvcCtx *svchp;

static void checkerr(/*_ OCIError *errhp, sword status _*/);

struct ship_data
{
  ub4  fedex;
  ub4  air;
  ub4  ground;
};

typedef struct ship_data ship_data;

int main(/*_ int argc, char *argv[] _*/);


/* Notify callback: */
ub4 notifyCB(ctx, subscrhp, pay, payl, desc, mode)
dvoid *ctx;
OCISubscription *subscrhp;
dvoid *pay;
ub4    payl;
dvoid *desc;
ub4    mode;
{
 text                *subname;
 ub4                 size;
 ship_data           *ship_stats = (ship_data *)ctx;
 text                *queue;
 text                *consumer;
 OCIRaw              *msgid;
 ub4                 priority;
 OCIAQMsgProperties  *msgprop;

 OCIAttrGet((dvoid *)subscrhp, OCI_HTYPE_SUBSCRIPTION,
                             (dvoid *)&subname, &size,
                             OCI_ATTR_SUBSCR_NAME, errhp);

 /* Extract the attributes from the AQ descriptor.
    Queue name: */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&queue, &size, 
            OCI_ATTR_QUEUE_NAME, errhp);
  
 /* Consumer name: */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&consumer, &size, 
            OCI_ATTR_CONSUMER_NAME, errhp);

 /* Msgid: */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgid, &size, 
            OCI_ATTR_NFY_MSGID, errhp);

 /* Message properties: */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgprop, &size, 
            OCI_ATTR_MSG_PROP, errhp);

 /* Get priority from message properties: */
 checkerr(errhp, OCIAttrGet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, 
                             (dvoid *)&priority, 0, 
                             OCI_ATTR_PRIORITY, errhp));

  switch (priority)
  {
  case 1:  ship_stats->fedex++;
           break;
  case 2 : ship_stats->air++;
           break;
  case 3:  ship_stats->ground++;
           break;
  default: 
           printf(" Error priority %u\n", (unsigned) priority);
  }
}


int main(argc, argv)
int argc;
char *argv[];
{
  OCISession *authp = (OCISession *) 0;
  OCISubscription *subscrhp[8];
  ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ;
  ship_data  ctx = {0,0,0};
  ub4 sleep_time = 0;

  printf("Initializing OCI Process\n");

  /* Initialize OCI environment with OCI_EVENTS flag set: */
  (void) OCIInitialize((ub4) OCI_EVENTS|OCI_OBJECT, (dvoid *)0,
                       (dvoid * (*)(dvoid *, size_t)) 0,
                       (dvoid * (*)(dvoid *, dvoid *, size_t))0,
                       (void (*)(dvoid *, dvoid *)) 0 );

  printf("Initialization successful\n");

  printf("Initializing OCI Env\n");
  (void) OCIEnvInit( (OCIEnv **) &envhp, OCI_DEFAULT, (size_t) 0,
                     (dvoid **) 0 );
  printf("Initialization successful\n");

  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp,
                   OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0));

  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp,
                   OCI_HTYPE_SERVER, (size_t) 0, (dvoid **) 0));

  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp,
                   OCI_HTYPE_SVCCTX, (size_t) 0, (dvoid **) 0));


  printf("connecting to server\n");
  checkerr(errhp, OCIServerAttach( srvhp, errhp, (text *)"inst1_alias",
           strlen("inst1_alias"), (ub4) OCI_DEFAULT));
  printf("connect successful\n");

  /* Set attribute server context in the service context: */
  checkerr(errhp, OCIAttrSet( (dvoid *) svchp, OCI_HTYPE_SVCCTX, (dvoid *)srvhp, 
                    (ub4) 0, OCI_ATTR_SERVER, (OCIError *) errhp));

  checkerr(errhp, OCIHandleAlloc((dvoid *) envhp, (dvoid **)&authp,
                       (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0));
 
  /* Set username and password in the session handle: */
  checkerr(errhp, OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION,
                  (dvoid *) username, (ub4) strlen((char *)username),
                  (ub4) OCI_ATTR_USERNAME, errhp));
 
  checkerr(errhp, OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION,
                  (dvoid *) password, (ub4) strlen((char *)password),
                  (ub4) OCI_ATTR_PASSWORD, errhp));

  /* Begin session: */
  checkerr(errhp, OCISessionBegin ( svchp,  errhp, authp, OCI_CRED_RDBMS, 
                          (ub4) OCI_DEFAULT));

  (void) OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX,
                   (dvoid *) authp, (ub4) 0,
                   (ub4) OCI_ATTR_SESSION, errhp);


  /* Register for notification: */
  printf("allocating subscription handle\n");
  subscrhp[0] = (OCISubscription *)0;
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[0], 
                        (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (size_t) 0, (dvoid **) 0);
 
  printf("setting subscription name\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) "WS.WS_BOOKEDORDERS_QUE:BOOKED_ORDERS", 
                 (ub4) strlen("WS.WS_BOOKEDORDERS_QUE:BOOKED_ORDERS"),
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp);
 
  printf("setting subscription callback\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) notifyCB, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

 (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *)&ctx, (ub4)sizeof(ctx),
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

  printf("setting subscription namespace\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) &namespace, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);

  printf("Registering \n");
  checkerr(errhp, OCISubscriptionRegister(svchp, subscrhp, 1, errhp, 
                                          OCI_DEFAULT));

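  /* Sleep for the number of seconds given on the command line (default 0): */
  sleep_time = (argc > 1) ? (ub4)atoi(argv[1]) : 0;
  printf("waiting for %u s\n", (unsigned) sleep_time);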
  sleep(sleep_time);

  printf("Exiting");
  exit(0);
}

void checkerr(errhp, status)
OCIError *errhp;
sword status;
{
  text errbuf[512];
  sb4 errcode = 0;

  switch (status)
  {
  case OCI_SUCCESS:
    break;
  case OCI_SUCCESS_WITH_INFO:
    (void) printf("Error - OCI_SUCCESS_WITH_INFO\n");
    break;
  case OCI_NEED_DATA:
    (void) printf("Error - OCI_NEED_DATA\n");
    break;
  case OCI_NO_DATA:
    (void) printf("Error - OCI_NO_DATA\n");
    break;
  case OCI_ERROR:
    (void) OCIErrorGet((dvoid *)errhp, (ub4) 1, (text *) NULL, &errcode,
                        errbuf, (ub4) sizeof(errbuf), OCI_HTYPE_ERROR);
    (void) printf("Error - %.*s\n", 512, errbuf);
    break;
  case OCI_INVALID_HANDLE:
    (void) printf("Error - OCI_INVALID_HANDLE\n");
    break;
  case OCI_STILL_EXECUTING:
    (void) printf("Error - OCI_STILL_EXECUTING\n");
    break;
  case OCI_CONTINUE:
    (void) printf("Error - OCI_CONTINUE\n");
    break;
  default:
    break;
  }
}

DEQUEUE Features

Dequeue Methods

A message can be dequeued from a queue using one of two dequeue methods: a correlation identifier or a message identifier.

A correlation identifier is a user defined message property (of VARCHAR2 datatype) while a message identifier is a system-assigned value (of RAW datatype). Multiple messages with the same correlation identifier can be present in a queue while only one message with a given message identifier can be present. A dequeue call with a correlation identifier will directly remove a message of specific interest rather than using a combination of locked and remove mode to first examine the content and then remove the message. Hence, the correlation identifier usually contains the most useful attribute of a payload. If there are multiple messages with the same correlation identifier, the ordering (enqueue order) between messages may not be preserved on dequeue calls. The correlation identifier cannot be changed between successive dequeue calls without specifying the first message navigation option.

Note that dequeueing a message with either of the two dequeue methods will not preserve the message grouping property (see "Message Grouping" and "Message Navigation in Dequeue" for further information).

Example Scenario

In the following scenario of the BooksOnLine example, rush orders received by the East shipping site are processed first. This is achieved by dequeueing the message using the correlation identifier which has been defined to contain the order type (rush/normal). For an illustration of dequeueing using a message identifier please refer to the get_northamerican_orders procedure discussed in the example under "Modes of Dequeuing".

Example Code

CONNECT boladm/boladm; 
 
/*  Create a procedure to dequeue rush orders from the regional booked orders
    queues by correlation identifier: */
create or replace procedure get_rushtitles(consumer in varchar2) as 
  
deq_cust_data            BOLADM.customer_typ; 
deq_book_data            BOLADM.book_typ; 
deq_item_data            BOLADM.orderitem_typ; 
deq_msgid                RAW(16); 
dopt                     dbms_aq.dequeue_options_t; 
mprop                    dbms_aq.message_properties_t; 
deq_order_data           BOLADM.order_typ; 
qname                    varchar2(30); 
no_messages              exception; 
pragma exception_init    (no_messages, -25228); 
new_orders               BOOLEAN := TRUE; 
  
begin 
  
        dopt.consumer_name := consumer; 
        dopt.wait := 1; 
        dopt.correlation := 'RUSH'; 
  
        IF (consumer = 'West_Shipping') THEN 
                qname := 'WS.WS_bookedorders_que'; 
        ELSIF (consumer = 'East_Shipping') THEN 
                qname := 'ES.ES_bookedorders_que'; 
        ELSE 
                qname := 'OS.OS_bookedorders_que'; 
        END IF; 
  
        WHILE (new_orders) LOOP 
          BEGIN 
            dbms_aq.dequeue( 
                queue_name => qname, 
                dequeue_options => dopt, 
                message_properties => mprop, 
                payload => deq_order_data, 
                msgid => deq_msgid); 
            commit; 
         
            deq_item_data := deq_order_data.items(1); 
            deq_book_data := deq_item_data.item; 
  
            dbms_output.put_line(' rushorder book_title: ' ||  
                                deq_book_data.title ||  
                        ' quantity: ' || deq_item_data.quantity); 
          EXCEPTION 
            WHEN no_messages THEN 
                 dbms_output.put_line (' ---- NO MORE RUSH TITLES ---- '); 
                 new_orders := FALSE; 
          END; 
        END LOOP; 
  
end; 
/ 
 
GRANT EXECUTE on get_rushtitles to ES; 
 
/* Dequeue the orders: */ 
CONNECT ES/ES; 
 
/*  Dequeue all rush order titles for East_Shipping: */ 
EXECUTE BOLADM.get_rushtitles('East_Shipping'); 

Multiple Recipients

A consumer can dequeue a message from a multi-consumer normal queue by supplying the name that was used in the AQ$_AGENT type of the DBMS_AQADM.ADD_SUBSCRIBER procedure or in the recipient list of the message properties (see "Add a Subscriber" or "Enqueue a Message [Specify Message Properties]").

There can be multiple processes or operating system threads that use the same consumer_name to dequeue concurrently from a queue. In that case AQ will provide the first unlocked message that is at the head of the queue and is intended for the consumer. Unless the message ID of a specific message is specified during dequeue, the consumers can dequeue messages that are in the READY state.

A message is considered PROCESSED only when all intended consumers have successfully dequeued the message. A message is considered EXPIRED if one or more consumers did not dequeue the message before the EXPIRATION time. When a message has expired, it is moved to an exception queue.

The exception queue must also be a multi-consumer queue. Expired messages from multi-consumer queues cannot be dequeued by the intended recipients of the message. However, they can be dequeued in the REMOVE mode exactly once by specifying a NULL consumer name in the dequeue options. Hence, from a dequeue perspective, multi-consumer exception queues behave like single-consumer queues because each expired message can be dequeued only once using a NULL consumer name. Note that expired messages can be dequeued only by specifying a message ID if the multi-consumer exception queue was created in a queue table without the compatible parameter or with the compatible parameter set to '8.0'.

In release 8.0.x, when two or more processes or threads that are using different consumer_names are dequeuing from a queue, only one process or thread can dequeue a given message in the LOCKED or REMOVE mode at any time. This means that other consumers that need to dequeue the same message have to wait until the consumer that has locked the message commits or aborts the transaction and releases the lock on the message. Whereas release 8.0.x did not support concurrency among different consumers for the same message, with release 8.1.x all consumers can access the same message concurrently. The result is that two processes or threads that are using different consumer_names to dequeue the same message do not block each other. AQ achieves this improvement by decoupling the task of dequeuing a message from the process of removing the message from the queue. In release 8.1.x only the queue monitor removes messages from multi-consumer queues. This allows dequeuers to complete the dequeue operation without locking the message in the queue table. Because the queue monitor removes messages that have been processed by all consumers from multi-consumer queues approximately once every minute, users may see a delay between the time when the messages have been completely processed and the time when they are physically removed from the queue.


Local and Remote Recipients

Consumers of a message in multi-consumer queues (either by virtue of being a subscriber to the queue or because the consumer was a recipient in the enqueuer's recipient list) can be local or remote.

When a consumer is remote, a message will be marked as PROCESSED in the source queue immediately after the message has been propagated even though the consumer may not have dequeued the message at the remote queue. Similarly, when a propagated message expires at the remote queue, the message is moved to the DEFAULT exception queue of the remote queue's queue table, and not to the exception queue of the local queue. As can be seen in both cases, AQ does not currently propagate the exceptions to the source queue. You can use the MSGID and the ORIGINAL_MSGID columns in the queue table view (AQ$<queue_table>) to chain the propagated messages. When a message with message ID m1 is propagated to a remote queue, m1 is stored in the ORIGINAL_MSGID column of the remote queue.

The DELAY, EXPIRATION and PRIORITY parameters apply identically to both local and remote consumers. AQ accounts for any delay in propagation by adjusting the DELAY and EXPIRATION parameters accordingly. For example, if the EXPIRATION is set to one hour, and the message is propagated after 15 minutes, the expiration at the remote queue will be set to 45 minutes.
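The adjustment in the example above amounts to subtracting the time the message has already spent in the source queue from its original EXPIRATION. The block below is only a sketch of that arithmetic. It does not call AQ, and the variable names are illustrative and not part of any AQ interface.

```sql
SET SERVEROUTPUT ON

DECLARE
  /* Illustrative values only, all in seconds: */
  original_expiration    NUMBER := 60*60;   /* EXPIRATION set at enqueue: 1 hour */
  time_in_source_queue   NUMBER := 15*60;   /* propagated after 15 minutes      */
  remaining_expiration   NUMBER;
BEGIN
  /* Time left before the message expires at the remote queue;
     never below zero: */
  remaining_expiration :=
        GREATEST(original_expiration - time_in_source_queue, 0);

  dbms_output.put_line('expiration at remote queue: ' ||
                       remaining_expiration/60 || ' minutes');
END;
/
```

With the values shown, the block reports 45 minutes, matching the example in the text.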

Message Navigation in Dequeue

You have several options for selecting a message from a queue. You can select the 'first message'. Alternatively, once you have selected a message and established its position in the queue (for example, as the fourth message), you can then retrieve the 'next message'.

These selections work in a slightly different way if the queue is enabled for transactional grouping.

Note that the transaction grouping property is negated if a dequeue is performed in one of the following ways: dequeue by specifying a correlation identifier, dequeue by specifying a message identifier, or dequeueing some of the messages of a transaction and committing. For additional information on dequeueing by specifying a correlation identifier or a message identifier please refer to the section on dequeue methods.

If, in navigating through the queue, the program reaches the end of the queue while using the 'next message' or 'next transaction' option, and you have specified a non-zero wait time, then the navigating position is automatically changed to the beginning of the queue.

Example Scenario

The following scenario in the BooksOnLine example continues the message grouping example already discussed with regard to enqueuing (see "Message Grouping").

The get_new_orders procedure dequeues orders from the OE_neworders_que. Recall that each transaction refers to an order and each message corresponds to an individual book in the order. The get_new_orders procedure loops through the messages to dequeue the book orders. It resets the position to the beginning of the queue using the first message option before the first dequeue. It then uses the next message navigation option to retrieve the next book (message) of an order (transaction). If it gets an error message indicating that all messages in the current group/transaction have been fetched, it changes the navigation option to next transaction and gets the first book of the next order. It then changes the navigation option back to next message for fetching subsequent messages in the same transaction. This is repeated until all orders (transactions) have been fetched.

Example Code

CONNECT boladm/boladm; 
 
create or replace procedure get_new_orders as 
 
deq_cust_data            BOLADM.customer_typ; 
deq_book_data            BOLADM.book_typ; 
deq_item_data            BOLADM.orderitem_typ; 
deq_msgid                RAW(16); 
dopt                     dbms_aq.dequeue_options_t; 
mprop                    dbms_aq.message_properties_t; 
deq_order_data           BOLADM.order_typ; 
qname                    VARCHAR2(30); 
no_messages              exception; 
end_of_group             exception; 
pragma exception_init    (no_messages, -25228); 
pragma exception_init    (end_of_group, -25235); 
new_orders               BOOLEAN := TRUE; 
  
begin 
  
        dopt.wait := 1; 
        dopt.navigation := DBMS_AQ.FIRST_MESSAGE;  
        qname := 'OE.OE_neworders_que'; 
        WHILE (new_orders) LOOP 
          BEGIN 
            LOOP 
                BEGIN 
                    dbms_aq.dequeue( 
                        queue_name          => qname, 
                        dequeue_options     => dopt, 
                        message_properties  => mprop, 
                        payload             => deq_order_data, 
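                        msgid               => deq_msgid); 

                    /* Fetch the remaining books of this order with NEXT_MESSAGE: */ 
                    dopt.navigation := DBMS_AQ.NEXT_MESSAGE; 
         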
                    deq_item_data := deq_order_data.items(1); 
                    deq_book_data := deq_item_data.item; 
                    deq_cust_data := deq_order_data.customer; 
  
                    IF (deq_cust_data IS NOT NULL) THEN 
                      dbms_output.put_line(' **** NEXT ORDER **** ');  
                      dbms_output.put_line('order_num: ' ||  
                                deq_order_data.orderno); 
                      dbms_output.put_line('ship_state: ' ||  
                                deq_cust_data.state); 
                    END IF; 
                    dbms_output.put_line(' ---- next book ---- ');  
                    dbms_output.put_line(' book_title: ' ||  
                                deq_book_data.title ||  
                                ' quantity: ' || deq_item_data.quantity); 
                EXCEPTION 
                    WHEN end_of_group THEN 
                      dbms_output.put_line ('*** END OF ORDER ***'); 
                      commit; 
                      dopt.navigation := DBMS_AQ.NEXT_TRANSACTION; 
                END; 
            END LOOP; 
          EXCEPTION 
            WHEN no_messages THEN 
                 dbms_output.put_line (' ---- NO MORE NEW ORDERS ---- '); 
                 new_orders := FALSE; 
          END; 
        END LOOP; 
  
end; 
/ 
 
GRANT EXECUTE ON get_new_orders to OE; 
 
/*  Dequeue the orders: */
CONNECT OE/OE; 
EXECUTE BOLADM.get_new_orders; 

Modes of Dequeuing

A dequeue request can either view a message or delete a message (see "Dequeue a Message" in Chapter 6, "Operational Interface: Basic Operations").

If a message is browsed it remains available for further processing. Similarly if a message is locked it remains available for further processing once the lock on it is released by performing a transaction commit or rollback. Once a message is deleted using either of the remove modes, it is no longer available for dequeue requests.

When a message is dequeued using REMOVE_NODATA mode, the payload of the message is not retrieved. This mode can be useful when the user has already examined the message payload, possibly by means of a previous BROWSE dequeue. In this way, you can avoid the overhead of payload retrieval, which can be substantial for large payloads.

A message is retained in the queue table after it has been removed only if a retention time is specified for a queue. Messages cannot be retained in exception queues (refer to the section on exceptions for further information). Removing a message with no data is generally used if the payload is known (from a previous browse/locked mode dequeue call), or the message will not be used.

Note that after a message has been browsed there is no guarantee that the message can be dequeued again since a dequeue call from a concurrent user might have removed the message. To prevent a viewed message from being dequeued by a concurrent user, you should view the message in the locked mode.

You need to take special care while using the browse mode for other reasons as well. The dequeue position is automatically changed to the beginning of the queue if a non-zero wait time is specified and the navigating position reaches the end of the queue. Hence repeating a dequeue call in the browse mode with the 'next message' navigation option and a non-zero wait time can dequeue the same message over and over again. We recommend that you use a non-zero wait time for the first dequeue call on a queue in a session, and then use a zero wait time with the next message navigation option for subsequent dequeue calls. If a dequeue call gets an 'end of queue' error message, the dequeue position can be explicitly set by the dequeue call to the beginning of the queue using the 'first message' navigation option, following which the messages in the queue can be browsed again.
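The recommendation above can be sketched as follows. This is a minimal illustration and not the shipping_bookedorder_deq procedure from the sample scripts. It assumes the ES.ES_bookedorders_que queue and the East_Shipping consumer used elsewhere in this chapter, a caller with dequeue privilege on that queue, and an arbitrary initial wait of 5 seconds.

```sql
SET SERVEROUTPUT ON

DECLARE
  dopt             dbms_aq.dequeue_options_t;
  mprop            dbms_aq.message_properties_t;
  deq_order_data   BOLADM.order_typ;
  deq_msgid        RAW(16);
  no_messages      exception;
  pragma exception_init (no_messages, -25228);
  more_messages    BOOLEAN := TRUE;
BEGIN
  dopt.consumer_name := 'East_Shipping';
  dopt.dequeue_mode  := DBMS_AQ.BROWSE;

  /* First call: start at the head of the queue, non-zero wait
     (5 seconds is an arbitrary value chosen for this sketch): */
  dopt.navigation := DBMS_AQ.FIRST_MESSAGE;
  dopt.wait       := 5;

  WHILE (more_messages) LOOP
    BEGIN
      dbms_aq.dequeue(
          queue_name         => 'ES.ES_bookedorders_que',
          dequeue_options    => dopt,
          message_properties => mprop,
          payload            => deq_order_data,
          msgid              => deq_msgid);

      dbms_output.put_line('browsed order_no: ' || deq_order_data.orderno);

      /* Subsequent calls: next message with zero wait, so that the
         position is not reset to the beginning of the queue: */
      dopt.navigation := DBMS_AQ.NEXT_MESSAGE;
      dopt.wait       := DBMS_AQ.NO_WAIT;
    EXCEPTION
      WHEN no_messages THEN
        /* End of queue reached; stop browsing: */
        more_messages := FALSE;
    END;
  END LOOP;
END;
/
```

Because the messages are only browsed, they remain in the queue and no commit is needed.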

Example Scenario

In the following scenario from the BooksOnLine example, international orders destined for Mexico and Canada are to be processed separately due to trade policies and carrier discounts. Hence, a message is viewed in the locked mode (so that no other concurrent user removes the message) and the customer country (in the message payload) is checked. If the customer country is Mexico or Canada, the message is deleted from the queue using the remove with no data mode, since the payload is already known. Otherwise, the lock on the message is released by the commit call. Note that the remove dequeue call uses the message identifier obtained from the locked mode dequeue call. The shipping_bookedorder_deq call (refer to the example code for the description of this procedure) illustrates the use of the browse mode.

Example Code

CONNECT boladm/boladm; 
 
create or replace procedure get_northamerican_orders as 
  
deq_cust_data            BOLADM.customer_typ; 
deq_book_data            BOLADM.book_typ; 
deq_item_data            BOLADM.orderitem_typ; 
deq_msgid                RAW(16); 
dopt                     dbms_aq.dequeue_options_t; 
mprop                    dbms_aq.message_properties_t; 
deq_order_data           BOLADM.order_typ; 
deq_order_nodata         BOLADM.order_typ; 
qname                    VARCHAR2(30); 
no_messages              exception; 
pragma exception_init    (no_messages, -25228); 
new_orders               BOOLEAN := TRUE; 
  
begin 
  
        /* Dequeue as the Overseas_Shipping consumer (this procedure takes 
           no parameters): */ 
        dopt.consumer_name := 'Overseas_Shipping'; 
        dopt.wait := DBMS_AQ.NO_WAIT; 
        dopt.navigation := dbms_aq.FIRST_MESSAGE; 
        dopt.dequeue_mode := DBMS_AQ.LOCKED; 
  
        qname := 'OS.OS_bookedorders_que'; 
  
        WHILE (new_orders) LOOP 
          BEGIN 
            dbms_aq.dequeue( 
                queue_name => qname, 
                dequeue_options => dopt, 
                message_properties => mprop, 
                payload => deq_order_data, 
                msgid => deq_msgid); 
         
            deq_item_data := deq_order_data.items(1); 
            deq_book_data := deq_item_data.item; 
            deq_cust_data := deq_order_data.customer; 
  
            IF (deq_cust_data.country = 'Canada' OR  
                deq_cust_data.country = 'Mexico' ) THEN 
  
                dopt.dequeue_mode := dbms_aq.REMOVE_NODATA; 
                dopt.msgid := deq_msgid; 
                dbms_aq.dequeue( 
                        queue_name => qname, 
                        dequeue_options => dopt, 
                        message_properties => mprop, 
                        payload => deq_order_nodata, 
                        msgid => deq_msgid); 
                commit; 
  
                dbms_output.put_line(' **** next booked order **** ');  
                dbms_output.put_line('order_no: ' || deq_order_data.orderno ||  
                        ' book_title: ' || deq_book_data.title ||  
                        ' quantity: ' || deq_item_data.quantity); 
                dbms_output.put_line('ship_state: ' || deq_cust_data.state || 
                        ' ship_country: ' || deq_cust_data.country || 
                        ' ship_order_type: ' || deq_order_data.ordertype); 
  
            END IF; 
  
            commit; 
            dopt.dequeue_mode := DBMS_AQ.LOCKED; 
            dopt.msgid := NULL; 
            dopt.navigation := dbms_aq.NEXT_MESSAGE; 
          EXCEPTION 
            WHEN no_messages THEN 
                 dbms_output.put_line (' ---- NO MORE BOOKED ORDERS ---- '); 
                 new_orders := FALSE; 
          END; 
        END LOOP; 
  
end; 
/ 
 
GRANT EXECUTE on get_northamerican_orders to OS; 
 
CONNECT ES/ES; 
 
/*  Browse all booked orders for East_Shipping: */ 
EXECUTE BOLADM.shipping_bookedorder_deq('East_Shipping', DBMS_AQ.BROWSE); 
 
CONNECT OS/OS; 
 
/*  Dequeue all international North American orders for Overseas_Shipping: */ 
EXECUTE BOLADM.get_northamerican_orders; 
 

Optimization of Waiting for Arrival of Messages

One of the most important features of AQ is that it allows applications to block on one or more queues while waiting for the arrival of either a newly enqueued message or a message that becomes ready. You can use the DEQUEUE operation to wait for the arrival of a message in a queue (see "Dequeue a Message") or the LISTEN operation to wait for the arrival of a message in more than one queue (see "Listen to One (Many) Queue(s)" in Chapter 6, "Operational Interface: Basic Operations").

When the blocking DEQUEUE call returns, it returns the message properties and the message payload. By contrast, when the blocking LISTEN call returns, it discloses only the name of the queue in which a message has arrived. A subsequent DEQUEUE operation is needed to dequeue the message.

Applications can optionally specify a timeout of zero or more seconds to indicate the time that AQ must wait for the arrival of a message. The default is to wait forever until a message arrives in the queue. This optimization is important in two ways. It removes the burden of continually polling for messages from the application. It also saves CPU and network resources because the application remains blocked until a new message is enqueued or becomes READY after its DELAY time. In release 8.1.5 applications can also perform a blocking dequeue on exception queues to wait for the arrival of EXPIRED messages.

A process or thread that is blocked on a dequeue is either woken up directly by the enqueuer if the new message has no DELAY or is woken up by the queue monitor process when the DELAY or EXPIRATION time has passed. Applications can wait for the arrival of a message not only in the queue into which an enqueuer enqueues the message, but also in a remote queue, provided that propagation to the remote queue has been scheduled using DBMS_AQADM.SCHEDULE_PROPAGATION. In this case the AQ propagator will wake up the blocked dequeuer after a message has been propagated.

Example Scenario

In the BooksOnLine example, the get_rushtitles procedure discussed under dequeue methods specifies a wait time of 1 second in the dequeue_options argument for the dequeue call. Wait time can be specified in different ways as illustrated in the code below.

Example Code

/* dopt is a variable of type dbms_aq.dequeue_options_t. 
   Set the dequeue wait time to 10 seconds: */
dopt.wait := 10; 
 
/* Set the dequeue wait time to 0 seconds: */
dopt.wait := DBMS_AQ.NO_WAIT; 
 
/* Set the dequeue wait time to infinite (forever): */ 
dopt.wait := DBMS_AQ.FOREVER; 

Retry with Delay Interval

AQ supports delayed delivery of messages by letting the enqueuer specify a delay interval on a message when enqueuing the message, that is, the time before which a message cannot be retrieved by a dequeue call (see "Enqueue a Message [Specify Message Properties]" in Chapter 6, "Operational Interface: Basic Operations"). The delay interval determines when an enqueued message is marked as available to the dequeuers after the message is enqueued. The producer can also specify the time when a message expires, at which time the message is moved to an exception queue.

When a message is enqueued with a delay time set, the message is marked as being in the WAIT state. Messages in the WAIT state are masked from the default dequeue calls.

A background time-manager daemon wakes up periodically, scans an internal index for all WAIT state messages, and marks messages as READY if their delay time has passed. The time-manager will then post to all foreground processes that are waiting on queues in which messages have just been made available.

Example Scenario

An order is placed in a back order queue at a specific shipping region if the order cannot be filled immediately. To avoid repeatedly processing an unfilled order, all unfilled orders are enqueued into the backorder queue with a delay time of 1 day. The shipping application will attempt to ship a backorder by dequeuing an order from the backorder queue. If the order cannot be filled, it will re-enqueue the order into the same backorder queue with the delay interval set to 1 day.

The following procedure re-enqueues an unfilled order. It demonstrates enqueuing a backorder with the delay time set to 1 day. This guarantees that each backorder will be processed only once a day until the order is filled.

Example Code

/*  Create a procedure that enqueues with delay set to one day: */ 
CONNECT BOLADM/BOLADM 
CREATE OR REPLACE PROCEDURE requeue_unfilled_order(sale_region varchar2,  
                                                   backorder order_typ) 
AS 
  back_order_queue_name    VARCHAR2(62); 
  enqopt                   dbms_aq.enqueue_options_t; 
  msgprop                  dbms_aq.message_properties_t; 
  enq_msgid                RAW(16); 
BEGIN 
  /* Choose a back order queue based on the region: */ 
  IF sale_region = 'WEST' THEN 
    back_order_queue_name := 'WS.WS_backorders_que';  
  ELSIF sale_region = 'EAST' THEN 
    back_order_queue_name := 'ES.ES_backorders_que';  
  ELSE 
    back_order_queue_name := 'OS.OS_backorders_que';  
  END IF; 
 
  /* Enqueue the order with delay time set to 1 day: */ 
  msgprop.delay := 60*60*24; 
  dbms_aq.enqueue(back_order_queue_name, enqopt, msgprop,  
                  backorder, enq_msgid); 
END; 
/ 
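As noted above, the producer can specify an expiration as well as a delay. The following variation is a sketch only: the procedure name requeue_with_expiration and the five-day expiration are assumptions made for illustration and do not appear in the BooksOnLine scripts. To our understanding the expiration interval is counted from the time the message becomes ready, that is, after the delay has elapsed.

```sql
CONNECT BOLADM/BOLADM

/* Hypothetical procedure, for illustration only: */
CREATE OR REPLACE PROCEDURE requeue_with_expiration(backorder order_typ)
AS
  enqopt       dbms_aq.enqueue_options_t;
  msgprop      dbms_aq.message_properties_t;
  enq_msgid    RAW(16);
BEGIN
  /* The message is not visible to dequeuers for 1 day: */
  msgprop.delay := 60*60*24;

  /* Once visible, the message expires if it is not dequeued within
     5 days (an assumed value) and is then moved to an exception queue: */
  msgprop.expiration := 5*60*60*24;

  dbms_aq.enqueue('WS.WS_backorders_que', enqopt, msgprop,
                  backorder, enq_msgid);
END;
/
```

Both intervals are given in seconds, as in the requeue_unfilled_order procedure above.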

Exception Handling

AQ provides four integrated mechanisms to support exception handling in applications: EXCEPTION_QUEUES, EXPIRATION, MAX_RETRIES and RETRY_DELAY.

An exception_queue is a repository for all expired or unserviceable messages. Applications cannot directly enqueue into exception queues. Also, a multi-consumer exception queue cannot have subscribers associated with it. However, an application that intends to handle these expired or unserviceable messages can dequeue from the exception queue. The exception queue created for messages intended for a multi-consumer queue must itself be a multi-consumer queue. Like any other queue, the exception queue must be enabled for dequeue using the DBMS_AQADM.START_QUEUE procedure. You will get an Oracle error if you try to enable an exception queue for enqueue.

When a message has expired, it is moved to an exception queue. The exception queue for a message in a multi-consumer queue must also be a multi-consumer queue. Expired messages from multi-consumer queues cannot be dequeued by the intended recipients of the message. However, they can be dequeued in the REMOVE mode exactly once by specifying a NULL consumer name in the dequeue options. Hence, from a dequeue perspective, multi-consumer exception queues behave like single-consumer queues because each expired message can be dequeued only once using a NULL consumer name. Messages can also be dequeued from the exception queue by specifying the message ID. Note that expired messages can be dequeued only by specifying a message ID if the multi-consumer exception queue was created in a queue table without the compatible parameter or with the compatible parameter set to '8.0'.

The exception queue is a message property that can be specified during enqueue time (see "Enqueue a Message [Specify Message Properties]" in Chapter 6, "Operational Interface: Basic Operations"). In PL/SQL users can use the exception_queue attribute of the DBMS_AQ.MESSAGE_PROPERTIES_T record to specify the exception queue. In OCI users can use the OCISetAttr procedure to set the OCI_ATTR_EXCEPTION_QUEUE attribute of the OCIAQMsgProperties descriptor.

If an exception queue is not specified, the default exception queue is used. If the queue is created in a queue table, say QTAB, the default exception queue will be called AQ$_QTAB_E. The default exception queue is automatically created when the queue table is created. AQ moves a message to the exception queue when the message expires before it is dequeued, or when the number of failed attempts to dequeue it exceeds the MAX_RETRIES value of the queue. The following restrictions apply once a message has been moved.

Messages intended for 8.1-compatible multiconsumer queues cannot be dequeued by the intended recipients once the messages have been moved to an exception queue. These messages should instead be dequeued in the REMOVE or BROWSE mode exactly once by specifying a NULL consumer name in the dequeue options. The messages can also be dequeued by their message IDs.

Messages intended for single consumer queues, or for 8.0-compatible multi-consumer queues, can only be dequeued by their message IDs once the messages have been moved to an exception queue.

Users can associate a RETRY_DELAY with a queue. The default value for this parameter is 0 which means that the message will be available for dequeue immediately after the RETRY_COUNT is incremented. Otherwise the message will be unavailable for RETRY_DELAY seconds. After RETRY_DELAY seconds the queue monitor will mark the message as READY.

Example Scenario

In the BooksOnLine application, the business rule for each shipping region is that an order will be placed in a back order queue if the order cannot be filled immediately. The back order application will try to fill the order once a day. If the order cannot be filled within 5 days, it is placed in an exception queue for special processing. You can implement this process by making use of the retry and exception handling features in AQ.

The example below shows how you can create a queue with a specific maximum number of retries and a specific retry delay interval.

Example Code

/* Example for creating a back order queue in Western Region which allows a 
   maximum of 5 retries and 1 day delay between each retry. */  
CONNECT BOLADM/BOLADM 
BEGIN 
  dbms_aqadm.create_queue ( 
        queue_name              => 'WS.WS_backorders_que', 
        queue_table             => 'WS.WS_orders_mqtab', 
        max_retries             => 5, 
        retry_delay             => 60*60*24); 
END; 
/ 
 
/* Create an exception queue for the back order queue for Western Region. */
CONNECT BOLADM/BOLADM 
BEGIN 
  dbms_aqadm.create_queue ( 
        queue_name              => 'WS.WS_backorders_excpt_que', 
        queue_table             => 'WS.WS_orders_mqtab', 
        queue_type              => DBMS_AQADM.EXCEPTION_QUEUE); 
end; 
/ 
 
/* Enqueue a message to WS_backorders_que and specify WS_backorders_excpt_que as 
the exception queue for the message: */ 
CONNECT BOLADM/BOLADM 
CREATE OR REPLACE PROCEDURE enqueue_WS_unfilled_order(backorder order_typ) 
 AS 
   back_order_queue_name    varchar2(62); 
   enqopt                   dbms_aq.enqueue_options_t; 
   msgprop                  dbms_aq.message_properties_t; 
   enq_msgid                raw(16); 
 BEGIN 
     
   /* Set back order queue name for this message: */ 
   back_order_queue_name := 'WS.WS_backorders_que'; 
 
   /* Set exception queue name for this message: */ 
   msgprop.exception_queue := 'WS.WS_backorders_excpt_que'; 
 
   dbms_aq.enqueue(back_order_queue_name, enqopt, msgprop, 
                   backorder, enq_msgid); 
 END; 
 / 
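An application that handles the orders which end up in the exception queue might dequeue them as sketched below. This is an illustration under stated assumptions and is not part of the sample scripts: it assumes that WS.WS_orders_mqtab is an 8.1-compatible multi-consumer queue table, that the caller has the privileges needed to start and dequeue from the queue, and that at most one expired order is retrieved per run.

```sql
CONNECT BOLADM/BOLADM

SET SERVEROUTPUT ON

/* Enable the exception queue for dequeue only; enabling it for
   enqueue is not allowed: */
BEGIN
  dbms_aqadm.start_queue(
        queue_name => 'WS.WS_backorders_excpt_que',
        enqueue    => FALSE,
        dequeue    => TRUE);
END;
/

DECLARE
  dopt             dbms_aq.dequeue_options_t;
  mprop            dbms_aq.message_properties_t;
  expired_order    BOLADM.order_typ;
  deq_msgid        RAW(16);
  no_messages      exception;
  pragma exception_init (no_messages, -25228);
BEGIN
  /* A NULL consumer name is required for a multi-consumer
     exception queue: */
  dopt.consumer_name := NULL;
  dopt.dequeue_mode  := DBMS_AQ.REMOVE;
  dopt.navigation    := DBMS_AQ.FIRST_MESSAGE;
  dopt.wait          := DBMS_AQ.NO_WAIT;

  dbms_aq.dequeue(
        queue_name         => 'WS.WS_backorders_excpt_que',
        dequeue_options    => dopt,
        message_properties => mprop,
        payload            => expired_order,
        msgid              => deq_msgid);

  dbms_output.put_line('unfilled order_no: ' || expired_order.orderno);
  commit;
EXCEPTION
  WHEN no_messages THEN
    dbms_output.put_line(' ---- NO EXPIRED BACK ORDERS ---- ');
END;
/
```

Each message can be removed this way only once, so the special processing for the order should complete before the commit.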

Rule-based Subscription

Messages may be routed to various recipients based on message properties or message content. Users define a rule-based subscription for a given queue to specify interest in receiving messages that meet particular conditions.

Rules are boolean expressions that evaluate to TRUE or FALSE. Similar in syntax to the WHERE clause of a SQL query, rules are expressed in terms of the attributes that represent message properties or message content. These subscriber rules are evaluated against incoming messages and those rules that match are used to determine message recipients. This feature thus supports the notions of content-based subscriptions and content-based routing of messages.

Example Scenario and Code

For the BooksOnLine application, we illustrate how rule-based subscriptions are used to implement a publish/subscribe paradigm utilizing content-based subscription and content-based routing of messages. The interaction between the Order Entry application and each of the Shipping applications is modeled as follows:

Each shipping application subscribes to the OE booked orders queue. The following rule-based subscriptions are defined by the Order Entry user to handle the routing of booked orders from the Order Entry application to each of the Shipping applications.

CONNECT OE/OE; 
 

Western Region Shipping defines an agent called West_Shipping with the WS booked orders queue as the agent address (the destination queue to which messages must be delivered). This agent subscribes to the OE booked orders queue using a rule specified on the orderregion and ordertype attributes.

/*  Add a rule-based subscriber for West Shipping - 
    West Shipping handles Western region US orders, 
    Rush Western region orders are handled by East Shipping: */ 
DECLARE 
  subscriber     aq$_agent; 
BEGIN 
  subscriber := aq$_agent('West_Shipping', 'WS.WS_bookedorders_que', null); 
  dbms_aqadm.add_subscriber( 
                queue_name => 'OE.OE_bookedorders_que', 
                subscriber => subscriber, 
                rule       => 'tab.user_data.orderregion =  
                    ''WESTERN'' AND tab.user_data.ordertype != ''RUSH'''); 
END; 
/ 
 

Eastern Region Shipping defines an agent called East_Shipping with the ES booked orders queue as the agent address (the destination queue to which messages must be delivered). This agent subscribes to the OE booked orders queue using a rule specified on the orderregion, ordertype and customer attributes.

/*  Add a rule-based subscriber for East Shipping - 
    East shipping handles all Eastern region orders, 
    East shipping also handles all US rush orders: */ 
DECLARE 
  subscriber     aq$_agent; 
BEGIN 
  subscriber := aq$_agent('East_Shipping', 'ES.ES_bookedorders_que', null); 
  dbms_aqadm.add_subscriber( 
        queue_name => 'OE.OE_bookedorders_que', 
        subscriber => subscriber, 
        rule       => 'tab.user_data.orderregion = ''EASTERN'' OR  
                      (tab.user_data.ordertype = ''RUSH'' AND  
                       tab.user_data.customer.country = ''USA'') '); 
END; 
/ 
 

Overseas Shipping defines an agent called Overseas_Shipping with the OS booked orders queue as the agent address (the destination queue to which messages must be delivered). This agent subscribes to the OE booked orders queue using a rule specified on the orderregion attribute.

/*  Add a rule-based subscriber for Overseas Shipping 
    Intl Shipping handles all non-US orders: */ 
DECLARE 
  subscriber     aq$_agent; 
BEGIN 
  subscriber := aq$_agent('Overseas_Shipping', 'OS.OS_bookedorders_que', 
null); 
  dbms_aqadm.add_subscriber( 
        queue_name => 'OE.OE_bookedorders_que', 
        subscriber => subscriber, 
        rule       => 'tab.user_data.orderregion = ''INTERNATIONAL'''); 
END; 
/ 
 

Listen Capability

In Oracle8i release 8.1.x, AQ can monitor multiple queues for messages with a single call, listen. An application can use listen to wait for messages for multiple subscriptions. Gateway applications can also use it to monitor multiple queues. If the listen call returns successfully, a dequeue must be used to retrieve the message (see "Listen to One (Many) Queue(s)" in Chapter 6, "Operational Interface: Basic Operations").

Without the listen call, an application which sought to dequeue from a set of queues would have to continuously poll the queues to determine if there were a message. Alternatively, you could design your application to have a separate dequeue process for each queue. However, if there are long periods with no traffic in any of the queues, these approaches will create an unacceptable overhead. The listen call is well suited for such applications.

Note that when there are messages for multiple agents in the agent list, listen returns with the first agent for whom there is a message. In that sense listen is not 'fair' in monitoring the queues. The application designer must keep this in mind when using the call. To prevent one agent from 'starving' other agents for messages, the application could change the order of the agents in the agent list.
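One way to change the order of the agents is to rotate the agent list after each call, as in the sketch below. The queue names (CS.CS_bookedorders_que and so on) and agent names are assumptions made for this illustration, as is the 10-second wait. The sketch also assumes that a listen call which times out raises ORA-25254.

```sql
SET SERVEROUTPUT ON

DECLARE
  qlist            dbms_aq.aq$_agent_list_t;
  ready_agent      aq$_agent;
  first_agent      aq$_agent;
  listen_timeout   exception;
  pragma exception_init (listen_timeout, -25254);
BEGIN
  /* Hypothetical agents and queues, for illustration only: */
  qlist(1) := aq$_agent('BOOKED_ORDER',  'CS.CS_bookedorders_que',  null);
  qlist(2) := aq$_agent('SHIPPED_ORDER', 'CS.CS_shippedorders_que', null);
  qlist(3) := aq$_agent('BILLED_ORDER',  'CS.CS_billedorders_que',  null);

  /* Wait up to 10 seconds for a message for any agent in the list: */
  dbms_aq.listen(
        agent_list => qlist,
        wait       => 10,
        agent      => ready_agent);

  dbms_output.put_line('message for agent ' || ready_agent.name ||
                       ' in queue ' || ready_agent.address);

  /* A dequeue from ready_agent.address is still needed to retrieve
     the message itself. */

  /* Rotate the list so that a different agent is checked first on
     the next listen call: */
  first_agent := qlist(1);
  FOR i IN 1 .. qlist.COUNT - 1 LOOP
    qlist(i) := qlist(i + 1);
  END LOOP;
  qlist(qlist.COUNT) := first_agent;
EXCEPTION
  WHEN listen_timeout THEN
    dbms_output.put_line(' ---- NO MESSAGES IN ANY QUEUE ---- ');
END;
/
```

In a long-running application the listen call and the rotation would sit inside a loop, so the rotated list is the one passed to the next call.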

Example Scenario

In the customer service component of the BooksOnLine example, messages from different databases arrive in the customer service queues, indicating the state of the order. The customer service application monitors the queues and whenever there is a message about a customer order, it updates the order status in the order_status_table. The application uses the listen call to monitor the different queues. Whenever there is a message in any of the queues, it dequeues the message and updates the order status accordingly.

Example Code

CODE (in tkaqdocd.sql) 
 
/* Update the status of the order in the order status table: */ 
CREATE OR REPLACE PROCEDURE update_status( 
                                new_status    IN VARCHAR2, 
                                order_msg     IN BOLADM.ORDER_TYP) 
IS 
 old_status    VARCHAR2(30); 
BEGIN 
 
  BEGIN   
    /* Query old status from the table: */ 
    SELECT st.status INTO old_status FROM order_status_table st  
       WHERE st.customer_order.orderno = order_msg.orderno; 
 
    /* Status can be 'BOOKED_ORDER', 'SHIPPED_ORDER', 'BACK_ORDER' 
       and 'BILLED_ORDER': */ 
 
    IF new_status = 'SHIPPED_ORDER' THEN   
      IF old_status = 'BILLED_ORDER' THEN 
        RETURN;             /* message about a previous state */ 
      END IF; 
    ELSIF new_status = 'BACK_ORDER' THEN 
      IF old_status = 'SHIPPED_ORDER' OR old_status = 'BILLED_ORDER' THEN 
        RETURN;             /* message about a previous state */ 
      END IF; 
    END IF; 
 
    /* Update the status of this order only: */ 
    UPDATE order_status_table st 
       SET st.customer_order = order_msg, st.status = new_status 
     WHERE st.customer_order.orderno = order_msg.orderno; 
 
    COMMIT; 
 
  EXCEPTION 
  WHEN NO_DATA_FOUND THEN 
    /* First update for the order: */ 
    INSERT INTO order_status_table(customer_order, status) 
    VALUES (order_msg, new_status); 
    COMMIT; 
 
  END; 
END; 
/ 
         
 
/* Dequeues message from 'QUEUE' for 'CONSUMER': */ 
CREATE OR REPLACE PROCEDURE DEQUEUE_MESSAGE( 
                         queue      IN   VARCHAR2, 
                         consumer   IN   VARCHAR2, 
                         message    OUT  BOLADM.order_typ) 
IS 
  
dopt                     dbms_aq.dequeue_options_t; 
mprop                    dbms_aq.message_properties_t; 
deq_msgid                RAW(16); 
BEGIN 
  dopt.dequeue_mode := dbms_aq.REMOVE; 
  dopt.navigation := dbms_aq.FIRST_MESSAGE; 
  dopt.consumer_name := consumer; 
 
  dbms_aq.dequeue( 
                queue_name => queue, 
                dequeue_options => dopt, 
                message_properties => mprop, 
                payload => message, 
                msgid => deq_msgid); 
  commit; 
END; 
/ 
 
/* Monitor the queues in the customer service database for 'time' seconds: */ 
 CREATE OR REPLACE PROCEDURE MONITOR_STATUS_QUEUE(time  IN  NUMBER)  
IS 
  agent_w_message   aq$_agent; 
  agent_list        dbms_aq.agent_list_t; 
  wait_time         INTEGER := 120; 
  no_message        EXCEPTION; 
  pragma EXCEPTION_INIT(no_message, -25254); 
  order_msg         boladm.order_typ;  
  new_status        VARCHAR2(30); 
  monitor           BOOLEAN := TRUE; 
  begin_time        NUMBER; 
  end_time          NUMBER; 
BEGIN 
 
 begin_time :=  dbms_utility.get_time;     
 WHILE (monitor) 
 LOOP 
 BEGIN 
 
  /* Construct the waiters list: */ 
  agent_list(1) := aq$_agent('BILLED_ORDER', 'CS_billedorders_que', NULL); 
  agent_list(2) := aq$_agent('SHIPPED_ORDER', 'CS_shippedorders_que', 
NULL); 
  agent_list(3) := aq$_agent('BACK_ORDER', 'CS_backorders_que', NULL); 
  agent_list(4) := aq$_agent('Booked_ORDER', 'CS_bookedorders_que', NULL); 
 
   /* Wait for order status messages: */ 
   dbms_aq.listen(agent_list, wait_time, agent_w_message); 
    
   dbms_output.put_line('Agent ' || agent_w_message.name || ' Address ' || 
                        agent_w_message.address); 
   /* Dequeue the message from the queue: */ 
   dequeue_message(agent_w_message.address, agent_w_message.name, order_msg); 
 
   /* Update the status of the order depending on the type of the message,  
    * the name of the agent contains the new state: */ 
   update_status(agent_w_message.name, order_msg); 
 
  /* Exit if we have been working long enough. dbms_utility.get_time 
     returns hundredths of a second, so convert 'time' before comparing: */ 
   end_time := dbms_utility.get_time; 
   IF  (end_time - begin_time > time * 100)   THEN 
     EXIT; 
   END IF; 
 
  EXCEPTION 
  WHEN  no_message  THEN 
    dbms_output.put_line('No messages in the past 2 minutes'); 
    end_time := dbms_utility.get_time; 
    /* Exit if we have done enough work: */ 
    IF  (end_time - begin_time > time * 100)   THEN 
      EXIT; 
    END IF; 
  END; 
  
  END LOOP; 
END; 
/ 
 

Propagation Features

Propagation

This feature enables applications to communicate with each other without having to be connected to the same database, or to the same queue. Messages can be propagated from one Oracle AQ queue to another, irrespective of whether the queues are local or remote. Propagation is performed by snapshot (job_queue) background processes. Propagation to remote queues is done using database links and Net8.

The propagation feature is used as follows. First one or more subscribers are defined for the queue from which messages are to be propagated (see "Subscriptions and Recipient Lists"). Second, a schedule is defined for each destination to which messages are to be propagated from the queue. Enqueued messages will now be propagated and automatically be available for dequeuing at the destination queues.
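
The two steps can be sketched as follows for a remote destination. This is an illustration under stated assumptions rather than part of the BooksOnLine scripts: the agent name Remote_Shipping and the database link ws_dblink are hypothetical, and the link is assumed to exist and to connect as a user allowed to enqueue into the destination queue.

```sql
/* Step 1: define a subscriber whose address is the destination queue.
   Remote_Shipping and ws_dblink are hypothetical names. */
DECLARE
  subscriber  aq$_agent;
BEGIN
  subscriber := aq$_agent('Remote_Shipping',
                          'WS.WS_bookedorders_que@ws_dblink', NULL);
  dbms_aqadm.add_subscriber(
        queue_name => 'OE.OE_bookedorders_que',
        subscriber => subscriber);
END;
/

/* Step 2: define a schedule from the source queue to that destination.
   The destination is the database link, not the queue name. */
BEGIN
  dbms_aqadm.schedule_propagation(
        queue_name  => 'OE.OE_bookedorders_que',
        destination => 'ws_dblink');
END;
/
```

For propagation between queues in the same database, the destination parameter is left at its default and the subscriber address carries no database link.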

Note that two or more job_queue background processes must be running to use propagation. This is in addition to the number of job_queue background processes needed for handling non-propagation related jobs. Also, if you wish to deploy remote propagation, you must ensure that the database link specified for the schedule is valid and has the proper privileges for enqueuing into the destination queue. For more information about the administrative commands for managing propagation schedules, see "Asynchronous Notifications" below.

Propagation also has mechanisms for handling failure. For example, if the database link specified is invalid, or if the remote database is unavailable, or if the remote queue is not enabled for enqueuing, then the appropriate error message is reported.

Finally, propagation provides detailed statistics about the messages propagated and the schedule itself. This information can be used to properly tune the schedules for best performance. Failure handling/error reporting facilities of propagation and propagation statistics are discussed under "Enhanced Propagation Scheduling Capabilities".

Propagation Scheduling

A propagation schedule is defined for a pair of source and destination queues. If a queue has messages to be propagated to several queues then a schedule has to be defined for each of the destination queues. A schedule indicates the time frame during which messages can be propagated from the source queue. This time frame may depend on a number of factors such as network traffic, load at source database, load at destination database, and so on. The schedule therefore has to be tailored for the specific source and destination. When a schedule is created, a job is automatically submitted to the job_queue facility to handle propagation.

The administrative calls for propagation scheduling provide great flexibility for managing the schedules (see "Schedule a Queue Propagation" in Chapter 4, "Administrative Interface: Basic Operations"). The duration or propagation window parameter of a schedule specifies the time frame during which propagation has to take place. If the duration is unspecified then the time frame is an infinite single window. If a window has to be repeated periodically then a finite duration is specified along with a next_time function that defines the periodic interval between successive windows.
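
As an illustration of a periodic window, the sketch below schedules a two-hour window that repeats once a day. The values are arbitrary and chosen only for the example. It assumes that next_time is evaluated at the end of a window, which is why the duration is subtracted to keep the start time fixed from day to day, and that no schedule is already defined for this queue and destination.

```sql
BEGIN
  dbms_aqadm.schedule_propagation(
        queue_name => 'OE.OE_bookedorders_que',
        start_time => SYSDATE,                      /* first window opens now  */
        duration   => 7200,                         /* window lasts 2 hours    */
        next_time  => 'SYSDATE + 1 - 7200/86400',   /* next window 24 hours
                                                       after this one opened   */
        latency    => 60);                          /* recheck an empty queue
                                                       every 60 seconds        */
END;
/
```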

The latency parameter for a schedule is relevant only when a queue does not have any messages to be propagated. This parameter specifies the time interval within which a queue has to be rechecked for messages. Note that if the latency parameter is to be enforced, then the job_queue_interval parameter for the job_queue_processes should be less than or equal to the latency parameter.

The propagation schedules defined for a queue can be changed or dropped at any time during the life of the queue. In addition there are calls for temporarily disabling a schedule (instead of dropping the schedule) and enabling a disabled schedule. A schedule is active when messages are being propagated in that schedule. All the administrative calls can be made irrespective of whether the schedule is active or not. If a schedule is active then it will take a few seconds for the calls to be executed.

Example Scenario

In the BooksOnLine example, messages in the OE_bookedorders_que are propagated to different shipping sites. The following example code illustrates the various administrative calls available for specifying and managing schedules. It also shows the calls for enqueuing messages into the source queue and for dequeuing the messages at the destination site. The catalog view USER_QUEUE_SCHEDULES provides all information relevant to a schedule (see "Select Propagation Schedules in User Schema" in Chapter 5, "Administrative Interface: Views").

Example Code

CONNECT OE/OE; 
 
/* Schedule Propagation from bookedorders_que to shipping: */ 
EXECUTE dbms_aqadm.schedule_propagation( \
   queue_name      => 'OE.OE_bookedorders_que'); 
 
/* Check if a schedule has been created: */ 
SELECT * FROM user_queue_schedules; 
 
/* Enqueue some orders into OE_bookedorders_que: */ 
EXECUTE BOLADM.order_enq('My First   Book', 1, 1001, 'CA', 'USA', \
   'WESTERN', 'NORMAL'); 
EXECUTE BOLADM.order_enq('My Second  Book', 2, 1002, 'NY', 'USA', \
   'EASTERN', 'NORMAL'); 
EXECUTE BOLADM.order_enq('My Third   Book', 3, 1003, '',   'Canada', \
   'INTERNATIONAL', 'NORMAL'); 
EXECUTE BOLADM.order_enq('My Fourth  Book', 4, 1004, 'NV', 'USA', \
   'WESTERN', 'RUSH'); 
EXECUTE BOLADM.order_enq('My Fifth   Book', 5, 1005, 'MA', 'USA', \
   'EASTERN', 'RUSH'); 
EXECUTE BOLADM.order_enq('My Sixth   Book', 6, 1006, ''  , 'UK', \
   'INTERNATIONAL', 'NORMAL'); 
EXECUTE BOLADM.order_enq('My Seventh Book', 7, 1007, '',   'Canada', \
   'INTERNATIONAL', 'RUSH'); 
EXECUTE BOLADM.order_enq('My Eighth  Book', 8, 1008, '',   'Mexico', \
   'INTERNATIONAL', 'NORMAL'); 
EXECUTE BOLADM.order_enq('My Ninth   Book', 9, 1009, 'CA', 'USA', \
   'WESTERN', 'RUSH'); 
EXECUTE BOLADM.order_enq('My Tenth   Book', 8, 1010, ''  , 'UK', \
   'INTERNATIONAL', 'NORMAL'); 
EXECUTE BOLADM.order_enq('My Last    Book', 7, 1011, ''  , 'Mexico', \
   'INTERNATIONAL', 'NORMAL'); 
 
/* Wait for propagation to happen: */ 
EXECUTE dbms_lock.sleep(100); 
 
/* Connect to shipping sites and check propagated messages: */
CONNECT WS/WS; 
set serveroutput on; 
 
/*  Dequeue all booked orders for West_Shipping: */
EXECUTE BOLADM.shipping_bookedorder_deq('West_Shipping', DBMS_AQ.REMOVE); 
 
CONNECT ES/ES; 
SET SERVEROUTPUT ON; 
 
/* Dequeue all remaining booked orders (normal order) for East_Shipping: */  
EXECUTE BOLADM.shipping_bookedorder_deq('East_Shipping', DBMS_AQ.REMOVE); 
 
CONNECT OS/OS; 
SET SERVEROUTPUT ON; 
 
/* Dequeue all international North American orders for Overseas_Shipping: */ 
EXECUTE BOLADM.get_northamerican_orders('Overseas_Shipping'); 
 
/* Dequeue rest of the booked orders for Overseas_Shipping: */ 
EXECUTE BOLADM.shipping_bookedorder_deq('Overseas_Shipping', DBMS_AQ.REMOVE); 
 
/* Reconnect as the owner of the source queue and disable the 
   propagation schedule for booked orders: */ 
CONNECT OE/OE; 
EXECUTE dbms_aqadm.disable_propagation_schedule(   \
   queue_name   => 'OE.OE_bookedorders_que'); 
 
/* Wait for some time for call to be effected: */ 
EXECUTE dbms_lock.sleep(30); 
 
/*  Check if the schedule has been disabled: */ 
SELECT schedule_disabled FROM user_queue_schedules; 
 
/* Alter propagation schedule for booked orders to execute every  
   15 mins (900 seconds) for a window duration of 300 seconds: */ 
EXECUTE dbms_aqadm.alter_propagation_schedule( \
   queue_name     => 'OE.OE_bookedorders_que', \
   duration       => 300, \
   next_time      => 'SYSDATE + 900/86400',\
   latency        => 25); 
 
/* Wait for some time for call to be effected: */ 
EXECUTE dbms_lock.sleep(30); 
 
/*  Check if the schedule parameters have changed: */ 
SELECT next_time, latency, propagation_window FROM user_queue_schedules; 
 
/* Enable propagation schedule for booked orders: */ 
EXECUTE dbms_aqadm.enable_propagation_schedule( \
   queue_name     => 'OE.OE_bookedorders_que'); 
 
/* Wait for some time for call to be effected: */ 
EXECUTE dbms_lock.sleep(30); 
 
/* Check if the schedule has been enabled: */ 
SELECT schedule_disabled FROM user_queue_schedules; 
 
/* Unschedule propagation for booked orders: */ 
EXECUTE dbms_aqadm.unschedule_propagation(   \
   queue_name      => 'OE.OE_bookedorders_que'); 
 
/* Wait for some time for call to be effected: */ 
EXECUTE dbms_lock.sleep(30); 
 
/*  Check if the schedule has been dropped: */ 
SELECT *  FROM user_queue_schedules; 
 

Propagation of Messages with LOB Attributes

Large Objects can be propagated using AQ using two methods:

Note that AQ does not support propagation from Object queues that have BFILE or REF attributes in the payload.

Example Scenario

In the BooksOnLine application, the company may wish to send promotional coupons along with the book orders. These coupons are generated depending on the content of the order, and other customer preferences. The coupons are images generated from some multimedia database, and are stored as LOBs.

When the order information is sent to the shipping warehouses, the coupon contents are also sent to the warehouses. In the code shown below the order_typ is enhanced to contain a coupon attribute of LOB type. The code demonstrates how the LOB contents are inserted into the message that is enqueued into OE_bookedorders_que when an order is placed. The message payload is first constructed with an empty LOB. The place holder (LOB locator) information is obtained from the queue table and is then used in conjunction with the LOB manipulation routines, such as DBMS_LOB.WRITE(), to fill the LOB contents. The example also shows the enqueue and dequeue of messages that have LOBs as part of the payload.

A COMMIT is issued only after the LOB contents are filled in with the appropriate image data. Propagation automatically takes care of moving the LOB contents along with the rest of the message contents. The code below also shows a dequeue at the destination queue for reading the LOB contents from the propagated message. The LOB contents are read into a buffer that can be sent to a printer for printing the coupon.

Example Code

/* Enhance the type order_typ to contain coupon field (lob field): */ 
CREATE OR REPLACE TYPE order_typ AS OBJECT ( 
        orderno         NUMBER, 
        status          VARCHAR2(30), 
        ordertype       VARCHAR2(30), 
        orderregion     VARCHAR2(30), 
        customer        customer_typ, 
        paymentmethod   VARCHAR2(30), 
        items           orderitemlist_vartyp, 
        total           NUMBER, 
        coupon          BLOB);    
/ 
 
/* lob_loc is a variable of type BLOB, 
   buffer is a variable of type RAW, 
   length is a variable of type NUMBER. */ 
 
/* Complete the order data and perform the enqueue using the order_enq() 
   procedure: */
dbms_aq.enqueue('OE.OE_bookedorders_que', enqopt, msgprop,  
                OE_enq_order_data, enq_msgid); 
 
/* Get the lob locator in the queue table after enqueue: */ 
SELECT t.user_data.coupon INTO lob_loc 
FROM   OE.OE_orders_pr_mqtab t 
WHERE  t.msgid = enq_msgid; 
 
/* Generate a sample LOB of 100 bytes (200 hexadecimal digits): */ 
buffer := hextoraw(rpad('FF',200,'FF')); 
 
/* Fill in the lob using LOB routines in the dbms_lob package: */
dbms_lob.write(lob_loc, 100, 1, buffer); 
 
/* Issue a commit only after filling in lob contents: */ 
COMMIT; 
 
/* Sleep until propagation is complete: */ 
 
/* Perform dequeue at the Western Shipping warehouse: */ 
dbms_aq.dequeue( 
        queue_name         => qname, 
        dequeue_options    => dopt, 
        message_properties => mprop, 
        payload            => deq_order_data, 
        msgid              => deq_msgid); 
 
/* Get the LOB locator after dequeue: */ 
lob_loc := deq_order_data.coupon; 
 
/* Get the length of the LOB: */ 
length := dbms_lob.getlength(lob_loc); 
 
/* Read the LOB contents into the buffer: */ 
dbms_lob.read(lob_loc, length, 1, buffer); 
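
A single dbms_lob.read call works only when the whole coupon fits in the buffer. For larger LOBs the contents can be read in pieces. The block below is a minimal, self-contained sketch of such a loop. It is not taken from the BooksOnLine scripts: it uses a temporary LOB in place of the dequeued coupon, a deliberately small 32-byte chunk so that several reads are needed, and the variable names are hypothetical.

```sql
DECLARE
  lob_loc      BLOB;
  sample_data  RAW(100) := hextoraw(rpad('FF', 200, 'FF'));  /* 100 bytes */
  chunk        RAW(32);            /* small on purpose, to force several reads */
  total        INTEGER;
  pos          INTEGER := 1;
  amount       BINARY_INTEGER;
BEGIN
  /* Stand-in for the LOB locator obtained from the dequeued payload: */
  dbms_lob.createtemporary(lob_loc, TRUE);
  dbms_lob.write(lob_loc, 100, 1, sample_data);

  total := dbms_lob.getlength(lob_loc);

  /* Read the LOB one chunk at a time until every byte has been read: */
  WHILE pos <= total LOOP
    amount := LEAST(32, total - pos + 1);
    dbms_lob.read(lob_loc, amount, pos, chunk);
    dbms_output.put_line('read ' || amount || ' bytes at offset ' || pos);
    pos := pos + amount;
  END LOOP;

  dbms_lob.freetemporary(lob_loc);
END;
/
```

With the 100-byte sample, the loop performs three reads of 32 bytes followed by one read of 4 bytes. In the coupon scenario each chunk would be passed on to the printer instead of being reported with dbms_output.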

Enhanced Propagation Scheduling Capabilities

Detailed information about the schedules can be obtained from the catalog views defined for propagation. Information about active schedules -- such as the name of the background process handling that schedule, the SID (session, serial number) for the session handling the propagation and the Oracle instance handling a schedule (relevant if OPS is being used) -- can be obtained from the catalog views. The same catalog views also provide information about the previous successful execution of a schedule (last successful propagation of message) and the next execution of the schedule.

For each schedule, detailed propagation statistics are maintained. These include the total number of messages propagated in a schedule, total number of bytes propagated in a schedule, maximum number of messages propagated in a window, maximum number of bytes propagated in a window, average number of messages propagated in a window, average size of propagated messages and the average time to propagate a message. These statistics have been designed to provide useful information to the queue administrators for tuning the schedules such that maximum efficiency can be achieved.

Propagation has built-in support for handling failures and reporting errors. For example, if the database link specified is invalid, if the remote database is unavailable, or if the remote queue is not enabled for enqueuing, then the appropriate error message is reported. Propagation uses an exponential backoff scheme for retrying propagation from a schedule that encountered a failure. If a schedule continuously encounters failures, the first retry happens after 30 seconds, the second after 60 seconds, the third after 120 seconds and so forth. If the retry time is beyond the expiration time of the current window then the next retry is attempted at the start time of the next window. A maximum of 16 retry attempts are made after which the schedule is automatically disabled. When a schedule is disabled automatically due to failures, the relevant information is written into the alert log. At any time it is possible to check whether a schedule has encountered failures and, if so, how many successive failures were encountered, the error message indicating the cause of the failure, and the time at which the last failure was encountered. By examining this information, a queue administrator can fix the failure and enable the schedule. If propagation succeeds during a retry, the number of failures is reset to 0.
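
The retry intervals described above can be tabulated with a short PL/SQL block. This is only a worked illustration of the arithmetic, not the algorithm AQ itself runs: it assumes that the delay keeps doubling for all 16 attempts and it ignores the rule that defers a retry to the start of the next window.

```sql
DECLARE
  delay_secs  NUMBER := 30;   /* first retry comes 30 seconds after the failure */
  elapsed     NUMBER := 0;    /* total time spent waiting so far                */
BEGIN
  FOR attempt IN 1 .. 16 LOOP
    elapsed := elapsed + delay_secs;
    dbms_output.put_line('retry ' || attempt ||
                         ': wait ' || delay_secs || ' s' ||
                         ', cumulative ' || elapsed || ' s');
    delay_secs := delay_secs * 2;   /* exponential backoff: double each time */
  END LOOP;
END;
/
```

Under these assumptions the later retries are many hours apart, so a schedule that fails persistently spends most of its time waiting before it is finally disabled.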

Propagation has built-in support for OPS that is completely transparent to the user and the queue administrator. The job that handles propagation is submitted to the same instance as the owner of the queue table in which the queue resides. If at any time there is a failure at an instance and the queue table that stores the queue is migrated to a different instance, the propagation job is also automatically migrated to the new instance. This minimizes the 'pinging' between instances and thus offers better performance. Propagation has been designed to handle any number of concurrent schedules. Note that the number of job_queue_processes is limited to a maximum of 36 and some of these may be used to handle non-propagation related jobs. Hence, propagation has built-in support for multi-tasking and load balancing. The propagation algorithms are designed such that multiple schedules can be handled by a single snapshot (job_queue) process. The propagation load on a job_queue process can be skewed based on the arrival rate of messages in the different source queues. If one process is overburdened with several active schedules while another is underloaded with many passive schedules, propagation automatically redistributes the schedules among the processes such that they are loaded uniformly.

Example Scenario

In the BooksOnLine example, the OE_bookedorders_que is a busy queue since messages in it are propagated to different shipping sites. The following example code illustrates the calls supported by enhanced propagation scheduling for error checking and schedule monitoring.

Example Code

CONNECT OE/OE; 
 
/*  Get averages: */ 
select avg_time, avg_number, avg_size from user_queue_schedules; 
 
/*  Get totals: */ 
select total_time, total_number, total_bytes from user_queue_schedules; 
 
/*  Get maximums for a window: */ 
select max_number, max_bytes from user_queue_schedules; 
 
/*  Get current status information of schedule: */ 
select process_name, session_id, instance, schedule_disabled  
   from user_queue_schedules; 
 
/*  Get information about last and next execution: */ 
select last_run_date, last_run_time, next_run_date, next_run_time 
   from user_queue_schedules; 
 
/*  Get last error information, if any: */ 
select failures, last_error_msg, last_error_date, last_error_time  
   from user_queue_schedules; 

Exception Handling During Propagation

When a system error such as a network failure occurs, AQ will continue to attempt to propagate messages using an exponential back-off algorithm. In some situations that indicate application errors, AQ will mark messages as UNDELIVERABLE if there is an error in propagating the message.

Examples of such errors are when the remote queue does not exist or when there is a type mismatch between the source queue and the remote queue. In such situations users must query the DBA_QUEUE_SCHEDULES view to determine the last error that occurred during propagation to a particular destination. The trace files in the $ORACLE_HOME/log directory can provide additional information about the error.

Example Scenario

In the BooksOnLine example, the ES_bookedorders_que in the Eastern Shipping region is stopped intentionally using the stop_queue() call. After a short while the propagation schedule for OE_bookedorders_que will display an error indicating that the remote queue ES_bookedorders_que is disabled for enqueuing. When the ES_bookedorders_que is started using the start_queue() call, propagation to that queue resumes and there is no error message associated with schedule for OE_bookedorders_que.

Example Code

/*  Intentionally stop the eastern shipping queue : */
connect BOLADM/BOLADM 
EXECUTE dbms_aqadm.stop_queue(queue_name => 'ES.ES_bookedorders_que');   
 
/* Wait for some time before error shows up in dba_queue_schedules: */ 
EXECUTE dbms_lock.sleep(100); 

/* This query will return an ORA-25207 enqueue failed error: */ 
SELECT qname, last_error_msg from dba_queue_schedules; 
 
/* Start the eastern shipping queue: */ 
EXECUTE dbms_aqadm.start_queue(queue_name => 'ES.ES_bookedorders_que');  
 
/* Wait for Propagation to resume for eastern shipping queue: */ 
EXECUTE dbms_lock.sleep(100); 
 
/* This query will indicate that there are no errors with propagation: */ 
SELECT qname, last_error_msg from dba_queue_schedules; 

 



Copyright © 1999 Oracle Corporation.

All Rights Reserved.
