Oracle9i Application Developer's Guide - Advanced Queuing
Release 1 (9.0.1)

Part Number A88890-02
Go To Documentation Library
Home
Go To Product List
Book List
Go To Table Of Contents
Contents
Go To Index
Index

Master Index

Feedback

Go to previous page Go to beginning of chapter Go to next page

A Sample Application Using AQ, 6 of 7


Asynchronous Notifications

This feature allows clients to receive notification of a message of interest. The client can use it to monitor multiple subscriptions. The client does not have to be connected to the database to receive notifications regarding its subscriptions.

Clients can use the OCI function, OCISubcriptionRegister, or the PL/SQL procedure DBMS_AQ.REGISTER to register interest in messages in a queue (see "Registering for Notification" in Chapter 11, "Operational Interface: Basic Operations").


See also:

Documentation on Register for Notification in the Oracle Call Interface Programmer's Guide and documentation on DBMS_AQ.REGISTER the Oracle Supplied PL/SQL Packages reference 


The client can specify a callback function that is invoked for every new message that is enqueued. The callback can be a C function in an OCI client, a user-defined PL/SQL procedure. The user can also specify an email address to which notifications will be mailed.

For nonpersistent queues, the message is delivered to the client as part of the notification. For persistent queues, only the message properties are delivered as part of the notification. Consequently, in the case of persistent queues, the client has to make an explicit dequeue to access the contents of the message.

Clients can also specify the presentation for notifications, either RAW or XML.

Scenario

In the BooksOnLine application, a customer can request Fed-Ex shipping (priority 1), priority air shipping (priority 2), or regular ground shipping (priority 3).

The shipping application then ships the orders according to the user's request. It is of interest to BooksOnLine to find out how many requests of each shipping type come in each day. The application uses asynchronous notification facility for this purpose. It registers for notification on the WS.WS_bookedorders_que. When it is notified of new message in the queue, it updates the count for the appropriate shipping type depending on the priority of the message.

Visual Basic (OO4O): Example Code

Refer to the Visual Basic online help, "Monitoring Messages".

Java (JDBC): Example Code

This feature is not supported by the Java API.

C (OCI): Example Code

This example illustrates the use of OCIRegister. At the shipping site, an OCI client program keeps track of how many orders were made for each of the shipping types, FEDEX, AIR and GROUND. The priority field of the message enables us to determine the type of shipping desired.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <oci.h>
#ifdef WIN32COMMON
#define sleep(x)   Sleep(1000*(x))
#endif
static text *username = (text *) "WS";
static text *password = (text *) "WS";

static OCIEnv *envhp;
static OCIServer *srvhp;
static OCIError *errhp;
static OCISvcCtx *svchp;

static void checkerr(/*_ OCIError *errhp, sword status _*/);

struct ship_data
{
  ub4  fedex;
  ub4  air;
  ub4  ground;
};

typedef struct ship_data ship_data;

int main(/*_ int argc, char *argv[] _*/);


/* Notify callback: */
ub4 notifyCB(ctx, subscrhp, pay, payl, desc, mode)
dvoid *ctx;
OCISubscription *subscrhp;
dvoid *pay;
ub4    payl;
dvoid *desc;
ub4    mode;
{
 text                *subname;
 ub4                 size;
 ship_data           *ship_stats = (ship_data *)ctx;
 text                *queue;
 text                *consumer;
 OCIRaw              *msgid;
 ub4                 priority;
 OCIAQMsgProperties  *msgprop;

 OCIAttrGet((dvoid *)subscrhp, OCI_HTYPE_SUBSCRIPTION,
                             (dvoid *)&subname, &size,
                             OCI_ATTR_SUBSCR_NAME, errhp);

 /* Extract the attributes from the AQ descriptor.
    Queue name: */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&queue, &size, 
            OCI_ATTR_QUEUE_NAME, errhp);
  
 /* Consumer name: */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&consumer, &size, 
            OCI_ATTR_CONSUMER_NAME, errhp);

 /* Msgid: */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgid, &size, 
            OCI_ATTR_NFY_MSGID, errhp);

 /* Message properties: */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgprop, &size, 
            OCI_ATTR_MSG_PROP, errhp);

 /* Get priority from message properties: */
 checkerr(errhp, OCIAttrGet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, 
                             (dvoid *)&priority, 0, 
                             OCI_ATTR_PRIORITY, errhp));

  switch (priority)
  {
  case 1:  ship_stats->fedex++;
           break;
  case 2 : ship_stats->air++;
           break;
  case 3:  ship_stats->ground++;
           break;
  default: 
           printf(" Error priority %d", priority);
  }
}


int main(argc, argv)
int argc;
char *argv[];
{
  OCISession *authp = (OCISession *) 0;
  OCISubscription *subscrhp[8];
  ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ;
  ship_data  ctx = {0,0,0};
  ub4 sleep_time = 0;

  printf("Initializing OCI Process\n");

  /* Initialize OCI environment with OCI_EVENTS flag set: */
  (void) OCIInitialize((ub4) OCI_EVENTS|OCI_OBJECT, (dvoid *)0,
                       (dvoid * (*)(dvoid *, size_t)) 0,
                       (dvoid * (*)(dvoid *, dvoid *, size_t))0,
                       (void (*)(dvoid *, dvoid *)) 0 );

  printf("Initialization successful\n");

  printf("Initializing OCI Env\n");
  (void) OCIEnvInit( (OCIEnv **) &envhp, OCI_DEFAULT, (size_t) 0, (dvoid **) 0 
);
  printf("Initialization successful\n");

  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, OCI_HTYPE_
ERROR, 
                   (size_t) 0, (dvoid **) 0));

  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, OCI_HTYPE_
SERVER,
                   (size_t) 0, (dvoid **) 0));

  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, OCI_HTYPE_
SVCCTX,
                   (size_t) 0, (dvoid **) 0));


  printf("connecting to server\n");
  checkerr(errhp, OCIServerAttach( srvhp, errhp, (text *)"inst1_alias",
           strlen("inst1_alias"), (ub4) OCI_DEFAULT));
  printf("connect successful\n");

  /* Set attribute server context in the service context: */
  checkerr(errhp, OCIAttrSet( (dvoid *) svchp, OCI_HTYPE_SVCCTX, (dvoid *)srvhp, 
                    (ub4) 0, OCI_ATTR_SERVER, (OCIError *) errhp));

  checkerr(errhp, OCIHandleAlloc((dvoid *) envhp, (dvoid **)&authp,
                       (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0));
 
  /* Set username and password in the session handle: */
  checkerr(errhp, OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION,
                  (dvoid *) username, (ub4) strlen((char *)username),
                  (ub4) OCI_ATTR_USERNAME, errhp));
 
  checkerr(errhp, OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION,
                  (dvoid *) password, (ub4) strlen((char *)password),
                  (ub4) OCI_ATTR_PASSWORD, errhp));

  /* Begin session: */
  checkerr(errhp, OCISessionBegin ( svchp,  errhp, authp, OCI_CRED_RDBMS, 
                          (ub4) OCI_DEFAULT));

  (void) OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX,
                   (dvoid *) authp, (ub4) 0,
                   (ub4) OCI_ATTR_SESSION, errhp);


  /* Register for notification: */
  printf("allocating subscription handle\n");
  subscrhp[0] = (OCISubscription *)0;
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[0], 
                        (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (size_t) 0, (dvoid **) 0);
 
  printf("setting subscription name\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) "WS.WS_BOOKEDORDERS_QUE:BOOKED_ORDERS", 
                 (ub4) strlen("WS.WS_BOOKEDORDERS_QUE:BOOKED_ORDERS"),
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp);
 
  printf("setting subscription callback\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) notifyCB, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

 (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *)&ctx, (ub4)sizeof(ctx),
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

  printf("setting subscription namespace\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) &namespace, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);

  printf("Registering \n");
  checkerr(errhp, OCISubscriptionRegister(svchp, subscrhp, 1, errhp, 
                                          OCI_DEFAULT));

  sleep_time = (ub4)atoi(argv[1]);
  printf ("waiting for %d s", sleep_time);
  sleep(sleep_time);

  printf("Exiting");
  exit(0);
}

void checkerr(errhp, status)
OCIError *errhp;
sword status;
{
  text errbuf[512];
  sb4 errcode = 0;

  switch (status)
  {
  case OCI_SUCCESS:
    break;
  case OCI_SUCCESS_WITH_INFO:
    (void) printf("Error - OCI_SUCCESS_WITH_INFO\n");
    break;
  case OCI_NEED_DATA:
    (void) printf("Error - OCI_NEED_DATA\n");
    break;
  case OCI_NO_DATA:
    (void) printf("Error - OCI_NODATA\n");
    break;
  case OCI_ERROR:
    (void) OCIErrorGet((dvoid *)errhp, (ub4) 1, (text *) NULL, &errcode,
                        errbuf, (ub4) sizeof(errbuf), OCI_HTYPE_ERROR);
    (void) printf("Error - %.*s\n", 512, errbuf);
    break;
  case OCI_INVALID_HANDLE:
    (void) printf("Error - OCI_INVALID_HANDLE\n");
    break;
  case OCI_STILL_EXECUTING:
    (void) printf("Error - OCI_STILL_EXECUTE\n");
    break;
  case OCI_CONTINUE:
    (void) printf("Error - OCI_CONTINUE\n");
    break;
  default:
    break;
  }
}

PL/SQL (DBMS_AQ package): Example Code

This example illustrates the use of the DBMS_AQ.REGISTER procedure.

In the BooksOnline scenario, assume that we want a PL/SQL callback WS.notifyCB() to be invoked when the subscriber BOOKED_ORDER receives a message in the WS.WS_BOOKED_ORDERS_QUE queue. In addition, we want to send an email to john@company.com when an order is enqueued in the queue for subscriber BOOKED_ORDERS. Also assume that we want to invoke the servlet http://xyz.company.com/servlets/NofifyServlet. This can be done as follows:

First define a PL/SQL procedure that will be invoked on notification.

connect ws/ws;
set echo on;
set serveroutput on;

-- notifyCB callback 
create or replace procedure notifyCB(
  context raw, reginfo sys.aq$_reg_info, descr sys.aq$_descriptor,
  payload raw, payloadl number)
AS
  dequeue_options   DBMS_AQ.dequeue_options_t;
  message_properies DBMS_AQ.message_properties_t;
  message_handle    RAW(16);
  message           BOLADM.order_typ;
BEGIN
  -- get the consumer name and msg_id from the descriptor
  dequeue_options.msgid := descr.msg_id;
  dequeue_options.consumer_name := descr.consumer_name;

  -- Dequeue the message
  DBMS_AQ.DEQUEUE(queue_name => descr.queue_name,
                  dequeue_options => dequeue_options,
                  message_properties => message_properties,
                  payload => message,
                  msgid => message_handle);

  commit;

  DBMS_OUTPUT.PUTLINE('Received Order: ' || message.orderno);

END;
/

The PL/SQL procedure, email address, and HTTP URL can be registered as follows:

connect ws/ws; 
set echo on; 
set serveroutput on; 

DECLARE 
  reginfo1     sys.aq$_reg_info; 
  reginfo2     sys.aq$_reg_info; 
  reginfo3     sys.aq$_reg_info; 
  reginfolist  sys.aq$_reg_info_list; 

BEGIN 
   -- register for the pl/sql procedure notifyCB to be called on notification 
  reginfo1 := sys.aq$_reg_info('WS.WS_BOOKEDORDERS_QUE:BOOKED_ORDERS', 
                     DBMS_AQ.NAMESPACE_AQ, 'plsql://WS.notifyCB', 
                     HEXTORAW('FF')); 

  -- register for an email to be sent to john@company.com on notification 
  reginfo2 := sys.aq$_reg_info('WS.WS_BOOKEDORDERS_QUE:BOOKED_ORDERS', 
                          DBMS_AQ.NAMESPACE_AQ, 'mailto://john@company.com', 
                            HEXTORAW('FF')); 

  -- register for an HTTP servlet to be invoked for notification 
  reginfo3 := sys.aq$_reg_info('WS.WS_BOOKEDORDERS_QUE:BOOKED_ORDERS', 
                          DBMS_AQ.NAMESPACE_AQ, 
                          'http://xyz.oracle.com/servlets/NotifyServlet', 
                            HEXTORAW('FF')); 
  
  -- Create the registration info list 
  reginfolist := sys.aq$_reg_info_list(reginfo1); 
  reginfolist.EXTEND; 
  reginfolist(2) := reginfo2; 

  reginfolist.EXTEND; 
  reginfolist(3) := reginfo3; 

-- do the registration 
  sys.dbms_aq.register(reginfolist, 3); 

END; 

Registering for Notifications Using the AQ XML Servlet

Clients can register for AQ notifications over the Internet. See Chapter 17, "Internet Access to Advanced Queuing" for more information on registering for AQ notifications using IDAP.

The register request has the following format:

?xml version="1.0"?>
<Envelope xmlns= "http://ns.oracle.com/AQ/schemas/envelope">
      <Body>
      
        <AQXmlRegister xmlns = "http://ns.oracle.com/AQ/schemas/access">

          <register_options>
            <destination>WS.WS_BOOKEDORDERS_QUE</destination>
            <consumer_name>BOOKED_ORDERS</consumer_name>
            <notify_url>mailto://john@company.com</notify_url>
          </register_options>
          
          <AQXmlCommit/>  
        </AQXmlRegister>  
      </Body>
</Envelope>

The email notification sent to john@company.com will have the following format:

<?xml version="1.0"?>
<Envelope xmlns="http://www.oracle.com/schemas/IDAP/envelope">
    <Body>
        <AQXmlNotification xmlns="http://www.oracle.com/schemas/AQ/access">
            <notification_options>
                <destination>WS.WS_BOOKEDORDERS_QUE</destination>
            </notification_options>
            <message_set>
                <message>
                    <message_header>                       
                       <message_id>81128B6AC46D4B15E03408002092AA15</message_id>
                       <correlation>RUSH</correlation>
                       <priority>1</priority>
                       <delivery_count>0</delivery_count>
                       <sender_id>
                            <agent_name>john</agent_name>
                       </sender_id>
                       <message_state>0</message_state>
                    </message_header>
                </message>
            </message_set>
        </AQXmlNotification>
    </Body>
</Envelope>

Go to previous page Go to beginning of chapter Go to next page
Oracle
Copyright © 1996-2001, Oracle Corporation.

All Rights Reserved.
Go To Documentation Library
Home
Go To Product List
Book List
Go To Table Of Contents
Contents
Go To Index
Index

Master Index

Feedback