Oracle8i Application Developer's Guide - Advanced Queuing Release 8.1.5 A68005-01
In this chapter we describe the operational interface to Oracle Advanced Queuing in terms of use cases. That is, we discuss each operation (such as "Enqueue a Message") as a use case by that name. The table listing all the use cases is provided at the head of the chapter (see "Use Case Model: Operational Interface -- Basic Operations").
A summary figure, "Use Case Diagram: Operational Interface -- Basic Operations", locates all the use cases in a single drawing. If you are using the HTML version of this document, you can use this figure to navigate to the use case in which you are interested by clicking on the relevant use case title.
The individual use cases are themselves laid out as follows:
Adds a message to the specified queue.
    DBMS_AQ.ENQUEUE (
       Queue_name          IN   VARCHAR2,
       Enqueue_options     IN   enqueue_options_t,
       Message_properties  IN   message_properties_t,
       Payload             IN   "<type_name>",
       Msgid               OUT  RAW);
Parameter | Description |
---|---|
queue_name | Specifies the name of the queue to which this message should be enqueued. The queue cannot be an exception queue. |
enqueue_options | For the definition, refer to the section titled "Enqueue a Message [Specify Options]". |
message_properties | For the definition, refer to the section titled "Message Properties". |
payload | The payload must be specified according to the specification in the associated queue table. |
msgid | The system-generated identification of the message. This is a globally unique identifier that can be used to identify the message at dequeue time. |
The sequence_deviation parameter in enqueue_options can be used to change the order of processing between two messages. The identity of the other message, if any, is specified by the enqueue_options parameter relative_msgid. The relationship is identified by the sequence_deviation parameter.
Specifying sequence_deviation for a message introduces some restrictions for the delay and priority values that can be specified for this message. The delay of this message has to be less than or equal to the delay of the message before which this message is to be enqueued. The priority of this message has to be greater than or equal to the priority of the message before which this message is to be enqueued.
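The two restrictions above can be expressed as a small check. The following is only an illustrative sketch in plain C, not AQ or OCI code: the msg_props structure and the can_enqueue_before function are hypothetical names introduced here, and the comparison simply mirrors the numeric wording of the rule as stated. AQ itself enforces the rule at enqueue time.

```c
#include <stdbool.h>

/* Hypothetical, simplified view of the two message properties involved. */
typedef struct {
    int delay;     /* delay value given in message_properties */
    int priority;  /* priority value given in message_properties */
} msg_props;

/*
 * Returns true when `incoming` satisfies the restrictions for being
 * enqueued before `reference`:
 *   - its delay is less than or equal to the reference delay
 *   - its priority is greater than or equal to the reference priority
 */
bool can_enqueue_before(msg_props incoming, msg_props reference)
{
    return incoming.delay <= reference.delay &&
           incoming.priority >= reference.priority;
}
```

An application could run a check of this kind before calling the enqueue operation, so that an invalid combination is caught in the client rather than reported by the server.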
To specify the options available for the enqueue operation.
    TYPE Enqueue_options_t IS RECORD (
       Visibility          BINARY_INTEGER  DEFAULT ON_COMMIT,
       Relative_msgid      RAW(16)         DEFAULT NULL,
       Sequence_deviation  BINARY_INTEGER  DEFAULT NULL);
Do not use the immediate option when you want to use LOB locators, since LOB locators are valid only for the duration of the transaction. Because the immediate option automatically commits the transaction, your locator will not be valid.
The Message Properties describe the information that is used by AQ to manage individual messages. These are set at enqueue time and their values are returned at dequeue time.
    TYPE Message_properties_t IS RECORD (
       Priority         BINARY_INTEGER  DEFAULT 1,
       Delay            BINARY_INTEGER  DEFAULT NO_DELAY,
       Expiration       BINARY_INTEGER  DEFAULT NEVER,
       Correlation      VARCHAR2(128)   DEFAULT NULL,
       Attempts         BINARY_INTEGER,
       Recipient_list   aq$_recipient_list_t,
       Exception_queue  VARCHAR2(51)    DEFAULT NULL,
       Enqueue_time     DATE,
       State            BINARY_INTEGER,
       Sender_id        aq$_agent       DEFAULT NULL,
       Original_msgid   RAW(16)         DEFAULT NULL);

    TYPE aq$_recipient_list_t IS TABLE OF aq$_agent
       INDEX BY BINARY_INTEGER;

    TYPE aq$_agent IS OBJECT (
       Name      VARCHAR2(30),
       Address   VARCHAR2(1024),
       Protocol  NUMBER);
To store a payload of type RAW, AQ creates a queue table with a LOB column as the payload repository. The maximum size of the payload is determined by which programmatic environment you use to access AQ. For PL/SQL, Java and precompilers the limit is 32K; for the OCI the limit is 4G.
    /* Enqueue to msg_queue: */
    DECLARE
       Enqueue_options     DBMS_AQ.enqueue_options_t;
       Message_properties  DBMS_AQ.message_properties_t;
       Message_handle      RAW(16);
       Message             aq.message_typ;

    BEGIN
       Message := aq.message_typ('NORMAL MESSAGE',
                                 'enqueued to msg_queue first.');

       DBMS_AQ.ENQUEUE(queue_name          => 'msg_queue',
                       Enqueue_options     => enqueue_options,
                       Message_properties  => message_properties,
                       Payload             => message,
                       Msgid               => message_handle);

       COMMIT;
    END;
    /* The queue name priority_msg_queue is defined as an object type queue
       table. The payload object type is message. The schema of the queue
       is aq. */

    /* Enqueue a message with priority 30: */
    DECLARE
       Enqueue_options     dbms_aq.enqueue_options_t;
       Message_properties  dbms_aq.message_properties_t;
       Message_handle      RAW(16);
       Message             aq.message_typ;

    BEGIN
       /* Qualify the type with its schema, as in the declaration above */
       Message := aq.message_typ('PRIORITY MESSAGE',
                                 'enqueued at priority 30.');

       message_properties.priority := 30;

       DBMS_AQ.ENQUEUE(queue_name          => 'priority_msg_queue',
                       enqueue_options     => enqueue_options,
                       message_properties  => message_properties,
                       payload             => message,
                       msgid               => message_handle);

       COMMIT;
    END;
To monitor one or more queues on behalf of a list of agents.
    DBMS_AQ.LISTEN (
       agent_list  IN   aq$_agent_list_t,
       wait        IN   BINARY_INTEGER DEFAULT DBMS_AQ.FOREVER,
       agent       OUT  aq$_agent);

    TYPE aq$_agent_list_t IS TABLE OF aq$_agent
       INDEX BY BINARY_INTEGER;
The call takes a list of agents as an argument. You specify the queue to be monitored in the address field of each agent listed. You also must specify the name of the agent when monitoring multiconsumer queues. For single-consumer queues, an agent name must not be specified. Only local queues are supported as addresses. Protocol is reserved for future use.
This is a blocking call that returns when there is a message ready for consumption for an agent in the list. If there are messages for more than one agent, only the first agent listed is returned. If there are no messages found when the wait time expires, an error is raised.
A successful return from the listen call is only an indication that there is a message for one of the listed agents in one of the specified queues. The interested agent must still dequeue the relevant message.
Note that you cannot call listen on non-persistent queues.
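The selection rule described above (when several listed agents have messages, only the first one listed is returned) can be pictured with a small model. This is a hedged sketch in plain C that does not call AQ or OCI: the listen_agent structure, its has_message flag and the first_agent_with_message function are hypothetical names introduced only for illustration, and the -1 return value stands in for the error that the real call raises when the wait time expires with no message.

```c
#include <stddef.h>

/* Hypothetical stand-in for an agent entry; not the OCI or PL/SQL type. */
typedef struct {
    const char *name;        /* NULL for a single-consumer queue */
    const char *address;     /* queue to be monitored */
    int         has_message; /* nonzero if a message is ready for this agent */
} listen_agent;

/*
 * Returns the index of the first listed agent that has a message ready,
 * or -1 when no listed agent has one.
 */
int first_agent_with_message(const listen_agent *agents, size_t count)
{
    for (size_t i = 0; i < count; i++) {
        if (agents[i].has_message) {
            return (int)i;
        }
    }
    return -1;
}
```

Because the order of the list decides which agent is reported, an application that cares about one queue more than the others would place that queue's agent first.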
/* The listen call allows you to monitor a list of queues for messages for specific agents. You need to have dequeue privileges for all the queues you wish to monitor. */
    DECLARE
       Agent_w_msg    aq$_agent;
       My_agent_list  dbms_aq.aq$_agent_list_t;

    BEGIN
       /* NOTE: MCQ1, MCQ2, MCQ3 are multi-consumer queues in SCOTT's schema
        *       SCQ1, SCQ2, SCQ3 are single consumer queues in SCOTT's schema
        */
       My_agent_list(1) := aq$_agent(NULL, 'scott.SCQ1', NULL);
       My_agent_list(2) := aq$_agent(NULL, 'SCQ2', NULL);
       My_agent_list(3) := aq$_agent(NULL, 'SCQ3', NULL);

       /* Listen with a time-out of zero: */
       DBMS_AQ.LISTEN(
          Agent_list  => My_agent_list,
          Wait        => 0,
          Agent       => agent_w_msg);

       DBMS_OUTPUT.PUT_LINE('Message in Queue :- ' || agent_w_msg.address);
       DBMS_OUTPUT.PUT_LINE('');
    END;
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <oci.h>

    /* Report the outcome of an OCI call */
    static void checkerr(OCIError *errhp, sword status)
    {
      text errbuf[512];
      sb4  errcode;

      switch (status)
      {
      case OCI_SUCCESS:
        break;
      case OCI_SUCCESS_WITH_INFO:
        printf("Error - OCI_SUCCESS_WITH_INFO\n");
        break;
      case OCI_NEED_DATA:
        printf("Error - OCI_NEED_DATA\n");
        break;
      case OCI_NO_DATA:
        printf("Error - OCI_NO_DATA\n");
        break;
      case OCI_ERROR:
        OCIErrorGet((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode,
                    errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR);
        printf("Error - %s\n", errbuf);
        break;
      case OCI_INVALID_HANDLE:
        printf("Error - OCI_INVALID_HANDLE\n");
        break;
      case OCI_STILL_EXECUTING:
        printf("Error - OCI_STILL_EXECUTING\n");
        break;
      case OCI_CONTINUE:
        printf("Error - OCI_CONTINUE\n");
        break;
      default:
        break;
      }
    }

    /* set agent into descriptor */
    void SetAgent(OCIAQAgent *agent, text *appname, text *queue, OCIError *errhp)
    {
      OCIAttrSet(agent, OCI_DTYPE_AQAGENT,
                 appname ? (dvoid *)appname : (dvoid *)"",
                 appname ? (ub4)strlen((const char *)appname) : 0,
                 OCI_ATTR_AGENT_NAME, errhp);

      OCIAttrSet(agent, OCI_DTYPE_AQAGENT,
                 queue ? (dvoid *)queue : (dvoid *)"",
                 queue ? (ub4)strlen((const char *)queue) : 0,
                 OCI_ATTR_AGENT_ADDRESS, errhp);

      printf("Set agent name to %s\n", appname ? (char *)appname : "NULL");
      printf("Set agent address to %s\n", queue ? (char *)queue : "NULL");
    }

    /* get agent from descriptor */
    void GetAgent(OCIAQAgent *agent, OCIError *errhp)
    {
      text *appname;
      text *queue;
      ub4   appsz;
      ub4   queuesz;

      if (!agent)
      {
        printf("agent was NULL \n");
        return;
      }
      checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT,
               (dvoid *)&appname, &appsz, OCI_ATTR_AGENT_NAME, errhp));
      checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT,
               (dvoid *)&queue, &queuesz, OCI_ATTR_AGENT_ADDRESS, errhp));
      if (!appsz)
        printf("agent name: NULL\n");
      else
        printf("agent name: %.*s\n", (int)appsz, (char *)appname);
      if (!queuesz)
        printf("agent address: NULL\n");
      else
        printf("agent address: %.*s\n", (int)queuesz, (char *)queue);
    }

    int main(void)
    {
      OCIEnv     *envhp;
      OCIServer  *srvhp;
      OCIError   *errhp;
      OCISvcCtx  *svchp;
      OCISession *usrhp;
      OCIAQAgent *agent_list[3];
      OCIAQAgent *agent = (OCIAQAgent *)0;
      int         i;

      /* Standard OCI Initialization */
      OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0,
                    (dvoid * (*)()) 0, (void (*)()) 0);

      OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
                     0, (dvoid **) 0);

      OCIEnvInit(&envhp, (ub4) OCI_DEFAULT, 0, (dvoid **) 0);

      OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
                     0, (dvoid **) 0);

      OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
                     0, (dvoid **) 0);

      OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);

      OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
                     0, (dvoid **) 0);

      /* set attribute server context in the service context */
      OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp,
                 (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);

      /* allocate a user context handle (once) */
      OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
                     (size_t) 0, (dvoid **) 0);

      OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
                 (dvoid *)"scott", (ub4)strlen("scott"),
                 OCI_ATTR_USERNAME, errhp);

      OCIAttrSet((dvoid *) usrhp, (ub4) OCI_HTYPE_SESSION,
                 (dvoid *) "tiger", (ub4) strlen("tiger"),
                 (ub4) OCI_ATTR_PASSWORD, errhp);

      OCISessionBegin(svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT);

      OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX,
                 (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp);

      /* AQ LISTEN Initialization - allocate agent handles */
      for (i = 0; i < 3; i++)
      {
        agent_list[i] = (OCIAQAgent *)0;
        OCIDescriptorAlloc(envhp, (dvoid **)&agent_list[i],
                           OCI_DTYPE_AQAGENT, 0, (dvoid **)0);
      }

      /*
       * SCQ1, SCQ2, SCQ3 are single consumer queues in SCOTT's schema
       */
      SetAgent(agent_list[0], (text *)0, (text *)"SCOTT.SCQ1", errhp);
      SetAgent(agent_list[1], (text *)0, (text *)"SCOTT.SCQ2", errhp);
      SetAgent(agent_list[2], (text *)0, (text *)"SCOTT.SCQ3", errhp);

      /* Listen with a time-out of zero */
      checkerr(errhp, OCIAQListen(svchp, errhp, agent_list, 3, 0, &agent, 0));

      printf("MESSAGE for :- \n");
      GetAgent(agent, errhp);
      printf("\n");

      return 0;
    }
To listen on the same single consumer queues with a time-out of 120 seconds, the preceding program is used unchanged except for the wait argument passed to OCIAQListen(), which becomes 120 instead of 0:

    checkerr(errhp, OCIAQListen(svchp, errhp, agent_list, 3, 120, &agent, 0));
/* The listen call allows you to monitor a list of queues for messages for specific agents. You need to have dequeue privileges for all the queues you wish to monitor. */
    DECLARE
       Agent_w_msg    aq$_agent;
       My_agent_list  dbms_aq.aq$_agent_list_t;

    BEGIN
       /* NOTE: MCQ1, MCQ2, MCQ3 are multi-consumer queues in SCOTT's schema
        *       SCQ1, SCQ2, SCQ3 are single consumer queues in SCOTT's schema
        */
       My_agent_list(1) := aq$_agent('agent1', 'MCQ1', NULL);
       My_agent_list(2) := aq$_agent('agent2', 'scott.MCQ2', NULL);
       My_agent_list(3) := aq$_agent('agent3', 'scott.MCQ3', NULL);

       /* Listen with a time-out of zero: */
       DBMS_AQ.LISTEN(
          agent_list  => My_agent_list,
          wait        => 0,
          agent       => agent_w_msg);

       DBMS_OUTPUT.PUT_LINE('Message in Queue :- ' || agent_w_msg.address);
       DBMS_OUTPUT.PUT_LINE('');
    END;
    /

    DECLARE
       Agent_w_msg    aq$_agent;
       My_agent_list  dbms_aq.aq$_agent_list_t;

    BEGIN
       /* NOTE: MCQ1, MCQ2, MCQ3 are multi-consumer queues in SCOTT's schema
        *       SCQ1, SCQ2, SCQ3 are single consumer queues in SCOTT's schema
        */
       My_agent_list(1) := aq$_agent('agent1', 'MCQ1', NULL);
       My_agent_list(2) := aq$_agent(NULL, 'scott.SCQ1', NULL);
       My_agent_list(3) := aq$_agent('agent3', 'scott.MCQ3', NULL);

       /* Listen with a time-out of 100 seconds */
       DBMS_AQ.LISTEN(
          Agent_list  => My_agent_list,
          Wait        => 100,
          Agent       => agent_w_msg);

       DBMS_OUTPUT.PUT_LINE('Message in Queue :- ' || agent_w_msg.address
                            || ' for agent ' || agent_w_msg.name);
       DBMS_OUTPUT.PUT_LINE('');
    END;
    /
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <oci.h>

    /* Report the outcome of an OCI call */
    static void checkerr(OCIError *errhp, sword status)
    {
      text errbuf[512];
      sb4  errcode;

      switch (status)
      {
      case OCI_SUCCESS:
        break;
      case OCI_SUCCESS_WITH_INFO:
        printf("Error - OCI_SUCCESS_WITH_INFO\n");
        break;
      case OCI_NEED_DATA:
        printf("Error - OCI_NEED_DATA\n");
        break;
      case OCI_NO_DATA:
        printf("Error - OCI_NO_DATA\n");
        break;
      case OCI_ERROR:
        OCIErrorGet((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode,
                    errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR);
        printf("Error - %s\n", errbuf);
        break;
      case OCI_INVALID_HANDLE:
        printf("Error - OCI_INVALID_HANDLE\n");
        break;
      case OCI_STILL_EXECUTING:
        printf("Error - OCI_STILL_EXECUTING\n");
        break;
      case OCI_CONTINUE:
        printf("Error - OCI_CONTINUE\n");
        break;
      default:
        break;
      }
    }

    /*----------------------------------------------------------------*/
    /* OCI Listen examples for multi-consumers                        */
    /*----------------------------------------------------------------*/

    /* set agent into descriptor */
    void SetAgent(OCIAQAgent *agent, text *appname, text *queue, OCIError *errhp)
    {
      OCIAttrSet(agent, OCI_DTYPE_AQAGENT,
                 appname ? (dvoid *)appname : (dvoid *)"",
                 appname ? (ub4)strlen((const char *)appname) : 0,
                 OCI_ATTR_AGENT_NAME, errhp);

      OCIAttrSet(agent, OCI_DTYPE_AQAGENT,
                 queue ? (dvoid *)queue : (dvoid *)"",
                 queue ? (ub4)strlen((const char *)queue) : 0,
                 OCI_ATTR_AGENT_ADDRESS, errhp);

      printf("Set agent name to %s\n", appname ? (char *)appname : "NULL");
      printf("Set agent address to %s\n", queue ? (char *)queue : "NULL");
    }

    /* get agent from descriptor */
    void GetAgent(OCIAQAgent *agent, OCIError *errhp)
    {
      text *appname;
      text *queue;
      ub4   appsz;
      ub4   queuesz;

      if (!agent)
      {
        printf("agent was NULL \n");
        return;
      }
      checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT,
               (dvoid *)&appname, &appsz, OCI_ATTR_AGENT_NAME, errhp));
      checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT,
               (dvoid *)&queue, &queuesz, OCI_ATTR_AGENT_ADDRESS, errhp));
      if (!appsz)
        printf("agent name: NULL\n");
      else
        printf("agent name: %.*s\n", (int)appsz, (char *)appname);
      if (!queuesz)
        printf("agent address: NULL\n");
      else
        printf("agent address: %.*s\n", (int)queuesz, (char *)queue);
    }

    /* main from AQ Listen to Multi-Consumer Queue(s) */
    int main(void)
    {
      OCIEnv     *envhp;
      OCIServer  *srvhp;
      OCIError   *errhp;
      OCISvcCtx  *svchp;
      OCISession *usrhp;
      OCIAQAgent *agent_list[3];
      OCIAQAgent *agent = (OCIAQAgent *)0;
      int         i;

      /* Standard OCI Initialization */
      OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0,
                    (dvoid * (*)()) 0, (void (*)()) 0);

      OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
                     0, (dvoid **) 0);

      OCIEnvInit(&envhp, (ub4) OCI_DEFAULT, 0, (dvoid **) 0);

      OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
                     0, (dvoid **) 0);

      OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
                     0, (dvoid **) 0);

      OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);

      OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
                     0, (dvoid **) 0);

      /* set attribute server context in the service context */
      OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp,
                 (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);

      /* allocate a user context handle (once) */
      OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
                     (size_t) 0, (dvoid **) 0);

      OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
                 (dvoid *)"scott", (ub4)strlen("scott"),
                 OCI_ATTR_USERNAME, errhp);

      OCIAttrSet((dvoid *) usrhp, (ub4) OCI_HTYPE_SESSION,
                 (dvoid *) "tiger", (ub4) strlen("tiger"),
                 (ub4) OCI_ATTR_PASSWORD, errhp);

      OCISessionBegin(svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT);

      OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX,
                 (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp);

      /* AQ LISTEN Initialization - allocate agent handles */
      for (i = 0; i < 3; i++)
      {
        agent_list[i] = (OCIAQAgent *)0;
        OCIDescriptorAlloc(envhp, (dvoid **)&agent_list[i],
                           OCI_DTYPE_AQAGENT, 0, (dvoid **)0);
      }

      /*
       * MCQ1, MCQ2, MCQ3 are multi-consumer queues in SCOTT's schema
       */

      /* Listening to Multi-consumer Queues with Zero Timeout */
      SetAgent(agent_list[0], (text *)"app1", (text *)"MCQ1", errhp);
      SetAgent(agent_list[1], (text *)"app2", (text *)"MCQ2", errhp);
      SetAgent(agent_list[2], (text *)"app3", (text *)"MCQ3", errhp);

      checkerr(errhp, OCIAQListen(svchp, errhp, agent_list, 3, 0, &agent, 0));

      printf("MESSAGE for :- \n");
      GetAgent(agent, errhp);
      printf("\n");

      /* Listening to Multi-consumer Queues with Timeout of 120 Seconds */
      SetAgent(agent_list[0], (text *)"app1", (text *)"SCOTT.MCQ1", errhp);
      SetAgent(agent_list[1], (text *)"app2", (text *)"SCOTT.MCQ2", errhp);
      SetAgent(agent_list[2], (text *)"app3", (text *)"SCOTT.MCQ3", errhp);

      checkerr(errhp, OCIAQListen(svchp, errhp, agent_list, 3, 120, &agent, 0));

      printf("MESSAGE for :- \n");
      GetAgent(agent, errhp);
      printf("\n");

      /* Listening to a Mixture of Single and Multi-consumer Queues
       * with a Timeout of 100 Seconds
       */
      SetAgent(agent_list[0], (text *)"app1", (text *)"SCOTT.MCQ1", errhp);
      SetAgent(agent_list[1], (text *)"app2", (text *)"SCOTT.MCQ2", errhp);
      SetAgent(agent_list[2], (text *)0, (text *)"SCOTT.SCQ3", errhp);

      checkerr(errhp, OCIAQListen(svchp, errhp, agent_list, 3, 100, &agent, 0));

      printf("MESSAGE for :- \n");
      GetAgent(agent, errhp);
      printf("\n");

      return 0;
    }
Dequeues a message from the specified queue.
    DBMS_AQ.DEQUEUE (
       queue_name          IN   VARCHAR2,
       dequeue_options     IN   dequeue_options_t,
       message_properties  OUT  message_properties_t,
       payload             OUT  "<type_name>",
       msgid               OUT  RAW);
Only messages in the READY state are dequeued unless a msgid is specified.
A BROWSE call may not see a message that is enqueued after the beginning of the browsing transaction.
The default NAVIGATION parameter during dequeue is NEXT_MESSAGE. This means that subsequent dequeues will retrieve the messages from the queue based on the snapshot obtained in the first dequeue. In particular, a message that is enqueued after the first dequeue command will be processed only after processing all the remaining messages in the queue. This is usually sufficient when all the messages have already been enqueued into the queue, or when the queue does not have a priority-based ordering. However, applications must use the FIRST_MESSAGE navigation option when the first message in the queue needs to be processed by every dequeue command. This usually becomes necessary when a higher priority message arrives in the queue while messages already enqueued are being processed.
In a queue that is not enabled for message grouping, a dequeue in LOCKED or REMOVE mode locks only a single message. By contrast, a dequeue operation that seeks to dequeue a message that is part of a group will lock the entire group. This is useful when all the messages in a group need to be processed as an atomic unit.
Use the NEXT_TRANSACTION navigation option to start dequeuing messages from the next available group. If no groups are available, the dequeue times out after the specified WAIT period.
To specify the options available for the dequeue operation.
    TYPE dequeue_options_t IS RECORD (
       consumer_name  VARCHAR2(30)    DEFAULT NULL,
       dequeue_mode   BINARY_INTEGER  DEFAULT REMOVE,
       navigation     BINARY_INTEGER  DEFAULT NEXT_MESSAGE,
       visibility     BINARY_INTEGER  DEFAULT ON_COMMIT,
       wait           BINARY_INTEGER  DEFAULT FOREVER,
       msgid          RAW(16)         DEFAULT NULL,
       correlation    VARCHAR2(128)   DEFAULT NULL);
Typically, you expect the consumer of messages to access messages using the dequeue interface. You can view processed messages or messages still to be processed by browsing by message id or by using SELECT statements.
    /* Dequeue from msg_queue: */
    DECLARE
       dequeue_options     dbms_aq.dequeue_options_t;
       message_properties  dbms_aq.message_properties_t;
       message_handle      RAW(16);
       message             aq.message_typ;

    BEGIN
       DBMS_AQ.DEQUEUE(queue_name          => 'msg_queue',
                       dequeue_options     => dequeue_options,
                       message_properties  => message_properties,
                       payload             => message,
                       msgid               => message_handle);

       DBMS_OUTPUT.PUT_LINE('Message: ' || message.subject ||
                            ' ... ' || message.text);
       COMMIT;
    END;
To register a callback for message notification.
    sword OCISubscriptionRegister (
       OCISvcCtx        *svchp,
       OCISubscription **subscrhpp,
       ub2               count,
       OCIError         *errhp,
       ub4               mode);
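Once notified, the callback can issue an explicit message_receive (dequeue) to retrieve the message.
The namespace attribute of the subscription handle must be set to OCI_SUBSCR_NAMESPACE_AQ.
The subscription name must be a string of the form 'schema.queue' if the registration is for a single-consumer queue and 'schema.queue:consumer_name' if the registration is for a multiconsumer queue.
Related functions: OCIAQListen(), OCISubscriptionDisable(), OCISubscriptionEnable(), OCISubscriptionUnRegister()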
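The two subscription name forms described above can be produced by a small helper. The sketch below is plain C and makes no OCI calls; build_subscription_name is a hypothetical function introduced here for illustration, and it assumes that a NULL consumer argument means the queue is a single-consumer queue.

```c
#include <stdio.h>
#include <stddef.h>

/*
 * Builds a subscription name into buf.
 *   consumer == NULL  ->  "schema.queue"           (single-consumer queue)
 *   consumer != NULL  ->  "schema.queue:consumer"  (multiconsumer queue)
 * Returns 0 on success, -1 if buf is too small or formatting fails.
 */
int build_subscription_name(char *buf, size_t buflen,
                            const char *schema, const char *queue,
                            const char *consumer)
{
    int n;

    if (consumer != NULL) {
        n = snprintf(buf, buflen, "%s.%s:%s", schema, queue, consumer);
    } else {
        n = snprintf(buf, buflen, "%s.%s", schema, queue);
    }
    return (n < 0 || (size_t)n >= buflen) ? -1 : 0;
}
```

The resulting string is the kind of value that would be supplied as the OCI_ATTR_SUBSCR_NAME attribute of a subscription handle, for example "SCOTT.SCQ1" or "SCOTT.MCQ1:APP1".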
    /* OCISubscriptionRegister can be used by the client to register to
       receive notifications when messages are enqueued into non-persistent
       and normal queues. */
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>   /* sleep() */
    #include <oci.h>

    static OCIEnv    *envhp;
    static OCIServer *srvhp;
    static OCIError  *errhp;
    static OCISvcCtx *svchp;

    /* Print the error text when an OCI call fails */
    static void checkerr(OCIError *errhp, sword status)
    {
      text errbuf[512];
      sb4  errcode;

      if (status == OCI_ERROR)
      {
        OCIErrorGet((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode,
                    errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR);
        printf("Error - %s\n", errbuf);
      }
      else if (status != OCI_SUCCESS)
        printf("Error - OCI status %d\n", (int) status);
    }

    /* The callback that gets invoked on notification */
    ub4 notifyCB(dvoid *ctx,
                 OCISubscription *subscrhp,   /* subscription handle */
                 dvoid *pay,                  /* payload */
                 ub4 payl,                    /* payload length */
                 dvoid *desc,                 /* the AQ notification descriptor */
                 ub4 mode)
    {
      text               *subname;
      ub4                 size;
      ub4                *number = (ub4 *)ctx;
      text               *queue;
      text               *consumer;
      OCIRaw             *msgid;
      OCIAQMsgProperties *msgprop;

      (*number)++;

      /* Get the subscription name */
      OCIAttrGet((dvoid *)subscrhp, OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *)&subname, &size, OCI_ATTR_SUBSCR_NAME, errhp);

      printf("got notification number %d for %.*s %d \n",
             (int)*number, (int)size, (char *)subname, (int)payl);

      /* Get the queue name from the AQ notify descriptor */
      OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&queue, &size,
                 OCI_ATTR_QUEUE_NAME, errhp);

      /* Get the consumer name for which this notification was received */
      OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&consumer, &size,
                 OCI_ATTR_CONSUMER_NAME, errhp);

      /* Get the message id of the message for which we were notified */
      OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgid, &size,
                 OCI_ATTR_NFY_MSGID, errhp);

      /* Get the message properties of the message for which we were notified */
      OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgprop, &size,
                 OCI_ATTR_MSG_PROP, errhp);
    }

    int main(void)
    {
      OCISession *authp = (OCISession *) 0;

      /* The subscription handles */
      OCISubscription *subscrhp[5];

      /* Registrations are for AQ namespace */
      ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ;

      /* The context for the callback */
      ub4 ctx[5] = {0,0,0,0,0};

      printf("Initializing OCI Process\n");

      /* The OCI Process Environment must be initialized with OCI_EVENTS */
      /* OCI_OBJECT flag is set to enable us to dequeue */
      (void) OCIInitialize((ub4) OCI_EVENTS|OCI_OBJECT, (dvoid *)0,
                           (dvoid * (*)(dvoid *, size_t)) 0,
                           (dvoid * (*)(dvoid *, dvoid *, size_t))0,
                           (void (*)(dvoid *, dvoid *)) 0);

      printf("Initialization successful\n");

      /* The standard OCI setup */
      printf("Initializing OCI Env\n");
      (void) OCIEnvInit((OCIEnv **) &envhp, OCI_DEFAULT, (size_t) 0,
                        (dvoid **) 0);

      (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp,
                            OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0);

      /* Server contexts */
      (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp,
                            OCI_HTYPE_SERVER, (size_t) 0, (dvoid **) 0);
      (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp,
                            OCI_HTYPE_SVCCTX, (size_t) 0, (dvoid **) 0);

      printf("connecting to server\n");
      (void) OCIServerAttach(srvhp, errhp, (text *)"", strlen(""), 0);
      printf("connect successful\n");

      /* Set attribute server context in the service context */
      (void) OCIAttrSet((dvoid *) svchp, OCI_HTYPE_SVCCTX, (dvoid *)srvhp,
                        (ub4) 0, OCI_ATTR_SERVER, (OCIError *) errhp);

      (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&authp,
                            (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0);

      (void) OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION,
                        (dvoid *) "scott", (ub4) strlen("scott"),
                        (ub4) OCI_ATTR_USERNAME, errhp);

      (void) OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION,
                        (dvoid *) "tiger", (ub4) strlen("tiger"),
                        (ub4) OCI_ATTR_PASSWORD, errhp);

      checkerr(errhp, OCISessionBegin(svchp, errhp, authp, OCI_CRED_RDBMS,
                                      (ub4) OCI_DEFAULT));

      (void) OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX,
                        (dvoid *) authp, (ub4) 0,
                        (ub4) OCI_ATTR_SESSION, errhp);

      /* Setting the subscription handle for notification on
         a NORMAL single consumer queue */
      printf("allocating subscription handle\n");
      subscrhp[0] = (OCISubscription *)0;
      (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[0],
                            (ub4) OCI_HTYPE_SUBSCRIPTION,
                            (size_t) 0, (dvoid **) 0);

      printf("setting subscription name\n");
      (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (dvoid *) "SCOTT.SCQ1", (ub4) strlen("SCOTT.SCQ1"),
                        (ub4) OCI_ATTR_SUBSCR_NAME, errhp);

      printf("setting subscription callback\n");
      (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (dvoid *) notifyCB, (ub4) 0,
                        (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

      printf("setting subscription context \n");
      (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (dvoid *)&ctx[0], (ub4)sizeof(ctx[0]),
                        (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

      printf("setting subscription namespace\n");
      (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (dvoid *) &namespace, (ub4) 0,
                        (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);

      /* Setting the subscription handle for notification on
         a NORMAL multi-consumer queue */
      subscrhp[1] = (OCISubscription *)0;
      (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[1],
                            (ub4) OCI_HTYPE_SUBSCRIPTION,
                            (size_t) 0, (dvoid **) 0);

      (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (dvoid *) "SCOTT.MCQ1:APP1",
                        (ub4) strlen("SCOTT.MCQ1:APP1"),
                        (ub4) OCI_ATTR_SUBSCR_NAME, errhp);

      (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (dvoid *) notifyCB, (ub4) 0,
                        (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

      (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (dvoid *)&ctx[1], (ub4)sizeof(ctx[1]),
                        (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

      (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (dvoid *) &namespace, (ub4) 0,
                        (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);

      /* Setting the subscription handle for notification on
         a non-persistent single-consumer queue */
      subscrhp[2] = (OCISubscription *)0;
      (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[2],
                            (ub4) OCI_HTYPE_SUBSCRIPTION,
                            (size_t) 0, (dvoid **) 0);

      (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (dvoid *) "SCOTT.NP_SCQ1",
                        (ub4) strlen("SCOTT.NP_SCQ1"),
                        (ub4) OCI_ATTR_SUBSCR_NAME, errhp);

      (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (dvoid *) notifyCB, (ub4) 0,
                        (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

      (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (dvoid *)&ctx[2], (ub4)sizeof(ctx[2]),
                        (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

      (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (dvoid *) &namespace, (ub4) 0,
                        (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);

      /* Setting the subscription handle for notification on
         a non-persistent multi consumer queue */
      /* Waiting on user specified recipient */
      subscrhp[3] = (OCISubscription *)0;
      (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[3],
                            (ub4) OCI_HTYPE_SUBSCRIPTION,
                            (size_t) 0, (dvoid **) 0);

      (void) OCIAttrSet((dvoid *) subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (dvoid *) "SCOTT.NP_MCQ1",
                        (ub4) strlen("SCOTT.NP_MCQ1"),
                        (ub4) OCI_ATTR_SUBSCR_NAME, errhp);

      (void) OCIAttrSet((dvoid *) subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (dvoid *) notifyCB, (ub4) 0,
                        (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

      (void) OCIAttrSet((dvoid *) subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (dvoid *)&ctx[3], (ub4)sizeof(ctx[3]),
                        (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

      (void) OCIAttrSet((dvoid *) subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (dvoid *) &namespace, (ub4) 0,
                        (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);

      printf("Registering for all the subscriptions \n");
      checkerr(errhp, OCISubscriptionRegister(svchp, subscrhp, 4, errhp,
                                              OCI_DEFAULT));

      printf("Waiting for notifications \n");

      /* wait five minutes for notifications */
      sleep(300);

      printf("Exiting\n");
      return 0;
    }