Oracle8i Application Developer's Guide - Advanced Queuing
Release 2 (8.1.6)

A76938-01

Library

Product

Contents

Index

Prev Up Next

Oracle Advanced Queuing by Example, 7 of 8


Deploy AQ with XA


Note:

You may need to set up the following data structures for certain examples to work:

CONNECT system/manager;
DROP USER aqadm CASCADE;
GRANT CONNECT, RESOURCE TO aqadm; 
CREATE USER aqadm IDENTIFIED BY aqadm;
GRANT EXECUTE ON DBMS_AQADM TO aqadm;
GRANT Aq_administrator_role TO aqadm;
DROP USER aq CASCADE;
CREATE USER aq IDENTIFIED BY aq;
GRANT CONNECT, RESOURCE TO aq; 
GRANT EXECUTE ON dbms_aq TO aq;
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE(
   queue_table => 'aq.qtable', 
   queue_payload_type => 'RAW');

EXECUTE DBMS_AQADM.CREATE_QUEUE(
   queue_name => 'aq.aqsqueue', 
   queue_table => 'aq.qtable');

EXECUTE DBMS_AQADM.START_QUEUE(queue_name => 
'aq.aqsqueue');
 


/* 
 * The program uses the XA interface to enqueue 100 messages and then 
 * dequeue them.
 * Login: aq/aq
 * Requires: AQ_USER_ROLE to be granted to aq
 *        a RAW queue called "aqsqueue" to be created in aqs schema
 *        (above steps can be performed by running aqaq.sql)
 * Message Format: Msgno: [0-1000] HELLO, WORLD!
 * Author: schandra@us.oracle.com
 */

#ifndef OCI_ORACLE
#include <oci.h>
#endif

#include <xa.h> 

/* XA open string */
char xaoinfo[] =  "oracle_xa+ACC=P/AQ/AQ+SESTM=30+Objects=T"; 

/* template for generating XA XIDs */
XID  xidtempl = { 0x1e0a0a1e, 12, 8, "GTRID001BQual001" }; 

/* Pointer to Oracle XA function table */
extern struct xa_switch_t xaosw;                         /* Oracle XA switch */
static struct xa_switch_t *xafunc = &xaosw; 

/* dummy stubs for ax_reg and ax_unreg */
int ax_reg(rmid, xid, flags)
int  rmid;
XID *xid;
long flags;
{
  xid->formatID = -1;
  return 0;
}
 
int ax_unreg(rmid, flags)
int     rmid;
long    flags;
{
  return 0;
}

/* generate an XID */
void xidgen(xid, serialno) 
XID *xid;
int  serialno;
{ 
  char seq [11]; 
 
  sprintf(seq, "%d", serialno); 
  memcpy((void *)xid, (void *)&xidtempl, sizeof(XID));
  strncpy((&xid->data[5]), seq, 3); 
} 

/* check if XA operation succeeded */
#define checkXAerr(action, funcname)    \
    if ((action) != XA_OK)        \
    {                  \
      printf("%s failed!\n", funcname);   \
      exit(-1);            \
    } else

/* check if OCI operation succeeded */
static void checkOCIerr(errhp, status)
OCIError *errhp;
sword      status;
{
  text errbuf[512];
  ub4 buflen;
  sb4 errcode;

  if (status == OCI_SUCCESS) return;

  if (status == OCI_ERROR)
  {
    OCIErrorGet((dvoid *) errhp, 1, (text *)0, &errcode, errbuf,
      (ub4)sizeof(errbuf), OCI_HTYPE_ERROR);
    printf("Error - %s\n", errbuf);
  }
  else
    printf("Error - %d\n", status);
  exit (-1);
}


void main(argc, argv)
int    argc;
char **argv;
{ 
  int         msgno = 0;             /* message being enqueued */
  OCIEnv       *envhp;                /* OCI environment handle */
  OCIError     *errhp;                 /* OCI Error handle */
  OCISvcCtx    *svchp;                    /* OCI Service handle */
  char      message[128];                /* message buffer */
  ub4      mesglen;             /* length of message */
  OCIRaw       *rawmesg = (OCIRaw *)0;       /* message in OCI RAW format */
  OCIInd      ind = 0;                 /* OCI null indicator */
  dvoid          *indptr = (dvoid *)&ind;       /* null indicator pointer */
  OCIType      *mesg_tdo = (OCIType *) 0;         /* TDO for RAW datatype */
  XID      xid;                 /* XA's global transaction id */
  ub4      i;                      /* array index */
  
  checkXAerr(xafunc->xa_open_entry(xaoinfo, 1, TMNOFLAGS), "xaoopen");

  svchp = xaoSvcCtx((text *)0);           /* get service handle from XA */
  envhp = xaoEnv((text *)0);          /* get enviornment handle from XA */

  if (!svchp || !envhp)
  {
    printf("Unable to obtain OCI Handles from XA!\n");
    exit (-1);
  }

  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&errhp, 
       OCI_HTYPE_ERROR, 0, (dvoid **)0);  /* allocate error handle */

  /* enqueue 1000 messages, 1 message per XA transaction */
  for (msgno = 0; msgno < 1000; msgno++)         
  {
    sprintf((const char *)message, "Msgno: %d, Hello, World!", msgno);
    mesglen = (ub4)strlen((const char *)message);
    xidgen(&xid, msgno);                 /* generate an XA xid */

    checkXAerr(xafunc->xa_start_entry(&xid, 1, TMNOFLAGS), "xaostart");

    checkOCIerr(errhp, OCIRawAssignBytes(envhp, errhp, (ub1 *)message, mesglen,
                &rawmesg));

    if (!mesg_tdo)          /* get Type descriptor (TDO) for RAW type */
      checkOCIerr(errhp, OCITypeByName(envhp, errhp, svchp, 
                      (CONST text *)"AQADM", strlen("AQADM"),
                       (CONST text *)"RAW", strlen("RAW"), 
                   (text *)0, 0, OCI_DURATION_SESSION, 
                   OCI_TYPEGET_ALL, &mesg_tdo));

    checkOCIerr(errhp, OCIAQEnq(svchp, errhp, (CONST text *)"aqsqueue",
               0, 0, mesg_tdo, (dvoid **)&rawmesg, &indptr, 
            0, 0)); 

    checkXAerr(xafunc->xa_end_entry(&xid, 1, TMSUCCESS), "xaoend");
    checkXAerr(xafunc->xa_commit_entry(&xid, 1, TMONEPHASE), "xaocommit");
    printf("%s Enqueued\n", message);
  }

  /* dequeue 1000 messages within one XA transaction */
  xidgen(&xid, msgno);                           /* generate an XA xid */
  checkXAerr(xafunc->xa_start_entry(&xid, 1, TMNOFLAGS), "xaostart");
  for (msgno = 0; msgno < 1000; msgno++)         
  {
    checkOCIerr(errhp, OCIAQDeq(svchp, errhp, (CONST text *)"aqsqueue", 
             0, 0, mesg_tdo, (dvoid **)&rawmesg, &indptr, 
            0, 0));
    if (ind)
      printf("Null Raw Message");
    else
      for (i = 0; i < OCIRawSize(envhp, rawmesg); i++)
   printf("%c", *(OCIRawPtr(envhp, rawmesg) + i));
    printf("\n");

  }    
  checkXAerr(xafunc->xa_end_entry(&xid, 1, TMSUCCESS), "xaoend");
  checkXAerr(xafunc->xa_commit_entry(&xid, 1, TMONEPHASE), "xaocommit");
} 


Prev Up Next
Oracle
Copyright © 1999 Oracle Corporation.

All Rights Reserved.

Library

Product

Contents

Index