Oracle9i Application Developer's Guide - Advanced Queuing Release 1 (9.0.1) Part Number A88890-02
A Sample Application Using AQ, 3 of 7
Oracle supports system-level access control for all queuing operations, allowing an application designer or DBA to designate users as queue administrators. A queue administrator can invoke AQ administrative and operational interfaces on any queue in the database. This simplifies the administrative work because all administrative scripts for the queues in a database can be managed under one schema. For more information, see "Oracle Enterprise Manager Support".
In the BooksOnLine application, the DBA creates BOLADM, the BooksOnLine Administrator account, as the queue administrator of the database. This allows BOLADM to create, drop, manage, and monitor queues in the database. If PL/SQL packages are needed in the BOLADM schema for applications to enqueue and dequeue, the DBA should grant ENQUEUE_ANY and DEQUEUE_ANY system privileges to BOLADM:
CREATE USER BOLADM IDENTIFIED BY BOLADM;
GRANT CONNECT, RESOURCE, aq_administrator_role TO BOLADM;
GRANT EXECUTE ON dbms_aq TO BOLADM;
GRANT EXECUTE ON dbms_aqadm TO BOLADM;
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','BOLADM',FALSE);
EXECUTE dbms_aqadm.grant_system_privilege('DEQUEUE_ANY','BOLADM',FALSE);
If using the Java AQ API, BOLADM must be granted execute privileges on the DBMS_AQIN package:
GRANT EXECUTE ON DBMS_AQIN to BOLADM;
In the application, AQ propagators populate messages from the Order Entry (OE) schema to the Western Sales (WS), Eastern Sales (ES) and Worldwide Sales (OS) schemas. The WS, ES and OS schemas in turn populate messages to the Customer Billing (CB) and Customer Service (CS) schemas. Hence the OE, WS, ES and OS schemas all host queues that serve as the source queues for the propagators.
When messages arrive at the destination queues, sessions based on the source queue schema name are used for enqueuing the newly arrived messages into the destination queues. This means that you need to grant schemas of the source queues enqueue privileges to the destination queues.
To simplify administration, all schemas that host a source queue in the BooksOnLine application are granted the ENQUEUE_ANY system privilege:
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','OE',FALSE);
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','WS',FALSE);
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','ES',FALSE);
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','OS',FALSE);
To propagate to a remote destination queue, the login user specified in the database link in the address field of the agent structure should either be granted the ENQUEUE ANY QUEUE privilege, or be granted the rights to enqueue to the destination queue. If the login user in the database link also owns the queue tables at the destination, no explicit privilege grant is needed.
Use the dbexecutesql interface from the database for this functionality.
No example is provided with this release.
You can specify transformations between different Oracle and user-defined types. Transformations must be expressed as SQL expressions, PL/SQL functions (including callouts), or Java stored procedures, with a return type of the target type. Only one-to-one message transformations are supported. The transformation engine is tightly integrated with Advanced Queuing to facilitate transformation of messages as they move through the database messaging system.
An Advanced Queuing application can enqueue or dequeue messages from a queue in the format specified by the application. An application can also specify a message format when subscribing to queues. The AQ propagator transforms messages to the format of the destination queue message, as specified by the remote subscription. The transformation function cannot write the database state or commit/rollback the current transaction. Transformations are exported with a schema or a full database export. Figure 8-1 shows how transformations are integrated with Advanced Queuing.
In the BooksOnLine application, assume that the order type is represented differently in the order entry and the shipping applications.
The order type of the Order Entry application (in schema OE) is as follows:
create or replace type order_typ as object (
  orderno number,
  status varchar2(30),
  ordertype varchar2(30),
  orderregion varchar2(30),
  custno number,
  paymentmethod varchar2(30),
  items orderitemlist_vartyp,
  ccnumber varchar2(20),
  order_date date);
create or replace type customer_typ as object (
  custno number,
  custid varchar2(20),
  name varchar2(100),
  street varchar2(100),
  city varchar2(30),
  state varchar2(2),
  zip number,
  country varchar2(100));
create or replace type book_typ as object (
  title varchar2(100),
  authors varchar2(100),
  ISBN varchar2(20),
  price number);
create or replace type orderitem_typ as object (
  quantity number,
  item book_typ,
  subtotal number);
create or replace type orderitemlist_vartyp as varray (20) of orderitem_typ;
The order type of the shipping application is defined as follows:
create or replace type order_typ_sh as object (
  orderno number,
  status varchar2(30),
  ordertype varchar2(30),
  orderregion varchar2(30),
  customer customer_typ_sh,
  paymentmethod varchar2(30),
  items orderitemlist_vartyp,
  ccnumber varchar2(20),
  order_date date);
create or replace type customer_typ_sh as object (
  custno number,
  name varchar2(100),
  street varchar2(100),
  city varchar2(30),
  state varchar2(2),
  zip number);
create or replace type book_typ_sh as object (
  title varchar2(100),
  authors varchar2(100),
  ISBN varchar2(20),
  price number);
create or replace type orderitem_typ_sh as object (
  quantity number,
  item book_typ,
  subtotal number);
create or replace type orderitemlist_vartyp_sh as varray (20) of orderitem_typ_sh;
The Overseas Shipping application uses a composite type that contains an XMLType attribute. The payload type of the Overseas Shipping application is:
CREATE OR REPLACE TYPE order_xml_typ as OBJECT ( orderno NUMBER, details XMLTYPE);
You can create transformations in the following ways:
Specify a single expression that constructs the whole target object. This representation is preferable for simple transformations or transformations that are not easily broken down into independent transformations for each attribute.
BEGIN
  dbms_transform.create_transformation(
    schema => 'OE',
    name => 'OE2WS',
    from_schema => 'OE',
    from_type => 'order_typ',
    to_schema => 'WS',
    to_type => 'order_typ_sh',
    transformation => 'WS.order_typ_sh(
      source.user_data.orderno,
      source.user_data.status,
      source.user_data.ordertype,
      source.user_data.orderregion,
      WS.get_customer_info(source.user_data.custno),
      source.user_data.paymentmethod,
      source.user_data.items,
      source.user_data.ccnumber,
      source.user_data.order_date)');
END;
/
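The mapping performed by the OE2WS expression can be pictured as a plain function from one source message to exactly one target message. The following is a minimal in-memory sketch in Java, not the AQ or DBMS_TRANSFORM API: the classes OeOrder, ShCustomer and ShOrder are simplified stand-ins for the SQL types, and the CUSTOMERS lookup table is a hypothetical replacement for WS.get_customer_info.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** In-memory model of a one-to-one message transformation; this is not the AQ API. */
class TransformSketch {
    /** Simplified stand-in for OE.order_typ: the customer is only a number. */
    static final class OeOrder {
        final int orderNo; final String status; final int custNo;
        OeOrder(int orderNo, String status, int custNo) {
            this.orderNo = orderNo; this.status = status; this.custNo = custNo;
        }
    }

    /** Simplified stand-in for WS.customer_typ_sh. */
    static final class ShCustomer {
        final int custNo; final String name;
        ShCustomer(int custNo, String name) { this.custNo = custNo; this.name = name; }
    }

    /** Simplified stand-in for WS.order_typ_sh: the customer is a nested object. */
    static final class ShOrder {
        final int orderNo; final String status; final ShCustomer customer;
        ShOrder(int orderNo, String status, ShCustomer customer) {
            this.orderNo = orderNo; this.status = status; this.customer = customer;
        }
    }

    /** Hypothetical lookup table playing the role of WS.get_customer_info. */
    static final Map<Integer, ShCustomer> CUSTOMERS = new HashMap<>();
    static { CUSTOMERS.put(7, new ShCustomer(7, "Sample Customer")); }

    static ShCustomer getCustomerInfo(int custNo) { return CUSTOMERS.get(custNo); }

    /** One source message in, exactly one target message out. */
    static final Function<OeOrder, ShOrder> OE2WS =
        src -> new ShOrder(src.orderNo, src.status, getCustomerInfo(src.custNo));

    public static void main(String[] args) {
        ShOrder out = OE2WS.apply(new OeOrder(1001, "BOOKED", 7));
        System.out.println(out.orderNo + " " + out.status + " " + out.customer.name);
    }
}
```

Attributes that exist on both sides are copied unchanged; only the customer attribute needs a computed value, which is why the SQL expression calls a function for that one position.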
In the BooksOnLine application, assume that the Overseas Shipping site represents the order as ORDER_XML_TYP, with the order information in an XMLType attribute. The Order Entry site represents the order as an Oracle object, ORDER_TYP. Because the Overseas Shipping site subscribes to messages in the OE_BOOKEDORDERS_QUE queue, a transformation is applied before messages are propagated from the Order Entry site to the Overseas Shipping site.
ORDER_XML_TYP is a composite type that contains an XMLType attribute:
CREATE OR REPLACE TYPE order_xml_typ as OBJECT ( orderno NUMBER, details XMLTYPE);
The transformation is defined as follows:
CREATE OR REPLACE FUNCTION CONVERT_TO_ORDER_XML(input_order OE.ORDER_TYP)
RETURN OS.ORDER_XML_TYP AS
  xdata SYS.XMLType;
  new_order OS.ORDER_XML_TYP;
BEGIN
  select SYS_XMLGEN(input_order) into xdata from dual;
  new_order := OS.ORDER_XML_TYP(input_order.orderno, xdata);
  RETURN new_order;
END CONVERT_TO_ORDER_XML;
/
BEGIN
  dbms_transform.create_transformation(
    schema => 'OS',
    name => 'OE2XML',
    from_schema => 'OE',
    from_type => 'ORDER_TYP',
    to_schema => 'OS',
    to_type => 'ORDER_XML_TYP',
    transformation => 'CONVERT_TO_ORDER_XML(source.user_data)');
END;
/
/* Add a rule-based subscriber for Overseas Shipping to the Booked orders
   queues with Transformation. Overseas Shipping handles all non-US orders: */
DECLARE
  subscriber aq$_agent;
BEGIN
  subscriber := aq$_agent('Overseas_Shipping','OS.OS_bookedorders_que',null);
  dbms_aqadm.add_subscriber(
    queue_name => 'OE.OE_bookedorders_que',
    subscriber => subscriber,
    rule => 'tab.user_data.orderregion = ''INTERNATIONAL''',
    transformation => 'OS.OE2XML');
END;
/
BEGIN
  /* first create the transformation without any transformation expression */
  dbms_transform.create_transformation(
    schema => 'OE', name => 'OE2WS',
    from_schema => 'OE', from_type => 'order_typ',
    to_schema => 'WS', to_type => 'order_typ_sh');

  /* specify each attribute of the target type as a function of the source type;
     attribute_number is the position of the attribute in order_typ_sh */
  dbms_transform.modify_transformation(
    schema => 'OE', name => 'OE2WS', attribute_number => 1,
    transformation => 'source.user_data.orderno');
  dbms_transform.modify_transformation(
    schema => 'OE', name => 'OE2WS', attribute_number => 2,
    transformation => 'source.user_data.status');
  dbms_transform.modify_transformation(
    schema => 'OE', name => 'OE2WS', attribute_number => 3,
    transformation => 'source.user_data.ordertype');
  dbms_transform.modify_transformation(
    schema => 'OE', name => 'OE2WS', attribute_number => 4,
    transformation => 'source.user_data.orderregion');
  dbms_transform.modify_transformation(
    schema => 'OE', name => 'OE2WS', attribute_number => 5,
    transformation => 'WS.get_customer_info(source.user_data.custno)');
  dbms_transform.modify_transformation(
    schema => 'OE', name => 'OE2WS', attribute_number => 6,
    transformation => 'source.user_data.paymentmethod');
  dbms_transform.modify_transformation(
    schema => 'OE', name => 'OE2WS', attribute_number => 7,
    transformation => 'source.user_data.items');
  dbms_transform.modify_transformation(
    schema => 'OE', name => 'OE2WS', attribute_number => 8,
    transformation => 'source.user_data.ccnumber');
  dbms_transform.modify_transformation(
    schema => 'OE', name => 'OE2WS', attribute_number => 9,
    transformation => 'source.user_data.order_date');
END;
/
No example is provided with this release.
No example is provided with this release.
With Oracle AQ, you can use object types to structure and manage the payload of messages. The object-relational capabilities of Oracle provide a rich set of data types that range from traditional relational data types to user-defined types (see "Enqueuing and Dequeuing Object Type Messages That Contain LOB Attributes Using PL/SQL" in Appendix A, "Oracle Advanced Queuing by Example").
Using strongly typed content, that is, content whose format is defined by an external type system, makes the following features available:
You can also create payloads that contain Oracle objects with XMLType attributes. These can be used for transmitting and storing messages that contain XML documents. By defining Oracle objects with XMLType attributes, you can do the following:
XMLType.existsNode(), XMLType.extract(), and so on.
The BooksOnLine application uses a rich set of data types to model book orders as message content.
Customers are modeled as the object type customer_typ.
CREATE OR REPLACE TYPE customer_typ AS OBJECT (
  custno NUMBER,
  name VARCHAR2(100),
  street VARCHAR2(100),
  city VARCHAR2(30),
  state VARCHAR2(2),
  zip NUMBER,
  country VARCHAR2(100));
Books are modeled as the object type book_typ.
CREATE OR REPLACE TYPE book_typ AS OBJECT (
  title VARCHAR2(100),
  authors VARCHAR2(100),
  ISBN NUMBER,
  price NUMBER);
Order items are modeled as the object type orderitem_typ. An order item is a nested type that includes the book type.
CREATE OR REPLACE TYPE orderitem_typ AS OBJECT (
  quantity NUMBER,
  item BOOK_TYP,
  subtotal NUMBER);
An order item list is a varray of order items:
CREATE OR REPLACE TYPE orderitemlist_vartyp AS VARRAY (20) OF orderitem_typ;
Orders are modeled as the object type order_typ. The order type is a composite type that includes nested object types defined above. The order type captures details of the order, the customer information, and the item list.
CREATE OR REPLACE TYPE order_typ as object (
  orderno NUMBER,
  status VARCHAR2(30),
  ordertype VARCHAR2(30),
  orderregion VARCHAR2(30),
  customer CUSTOMER_TYP,
  paymentmethod VARCHAR2(30),
  items ORDERITEMLIST_VARTYP,
  total NUMBER);
The Overseas Shipping payload is modeled as order_xml_typ. This type is a composite type that contains an XMLType attribute:
CREATE OR REPLACE TYPE order_xml_typ as OBJECT ( orderno NUMBER, details XMLTYPE);
Use the dbexecutesql interface from the database for this functionality.
After creating the types, use JPublisher to generate Java classes that map to the SQL types.
TYPE boladm.customer_typ AS Customer
TYPE boladm.book_typ AS Book
TYPE boladm.orderitem_typ AS OrderItem
TYPE boladm.orderitemlist_vartyp AS OrderItemList
TYPE boladm.order_typ AS Order
jpub -input=jaqbol.typ -user=boladm/boladm -case=mixed -methods=false -compatible=CustomDatum
This creates the Java classes Customer, Book, OrderItem, OrderItemList and Order, which map to the SQL object types created above:
public static Connection loadDriver(String user, String passwd)
{
  Connection db_conn = null;
  try
  {
    Class.forName("oracle.jdbc.driver.OracleDriver");

    /* your actual hostname, port number, and SID will vary from what follows.
       Here we use 'dlsun736,' '5521,' and 'test,' respectively: */
    db_conn = DriverManager.getConnection(
                  "jdbc:oracle:thin:@dlsun736:5521:test", user, passwd);
    System.out.println("JDBC Connection opened ");
    db_conn.setAutoCommit(false);

    /* Load the Oracle8i AQ driver: */
    Class.forName("oracle.AQ.AQOracleDriver");
    System.out.println("Successfully loaded AQ driver ");
  }
  catch (Exception ex)
  {
    System.out.println("Exception: " + ex);
    ex.printStackTrace();
  }
  return db_conn;
}
You can create queues with payloads that contain XMLType attributes. These can be used for transmitting and storing messages that contain XML documents. By defining Oracle objects with XMLType attributes, you can do the following:
XMLType.existsNode(), XMLType.extract(), and so on.
For details on XMLType operations, refer to the Application Developer's Guide - XML.
XMLType.existsNode() and XMLType.extract().
In the BooksOnLine application, assume that the Overseas Shipping site represents the order as ORDER_XML_TYP, with the order information in an XMLType attribute. The Order Entry site represents the order as an Oracle object, ORDER_TYP.
ORDER_XML_TYP is a composite type that contains an XMLType attribute:
CREATE OR REPLACE TYPE order_xml_typ as OBJECT ( orderno NUMBER, details XMLTYPE);
The Overseas queue table and queue are created as follows:
BEGIN
  dbms_aqadm.create_queue_table(
    queue_table => 'OS_orders_pr_mqtab',
    comment => 'Overseas Shipping MultiConsumer Orders queue table',
    multiple_consumers => TRUE,
    queue_payload_type => 'OS.order_xml_typ',
    compatible => '8.1');
END;
/
BEGIN
  dbms_aqadm.create_queue (
    queue_name => 'OS_bookedorders_que',
    queue_table => 'OS_orders_pr_mqtab');
END;
/
Since the representation of orders at the Overseas Shipping site is different from the representation of orders at the Order Entry site, a transformation is applied before messages are propagated from the Order Entry site to the Overseas Shipping site.
/* Add a rule-based subscriber (for Overseas Shipping) to the Booked orders
   queues with Transformation. Overseas Shipping handles all non-US orders: */
DECLARE
  subscriber aq$_agent;
BEGIN
  subscriber := aq$_agent('Overseas_Shipping','OS.OS_bookedorders_que',null);
  dbms_aqadm.add_subscriber(
    queue_name => 'OE.OE_bookedorders_que',
    subscriber => subscriber,
    rule => 'tab.user_data.orderregion = ''INTERNATIONAL''',
    transformation => 'OS.OE2XML');
END;
/
For more details on defining transformations that convert the type used by the Order Entry application to the type used by Overseas Shipping, see "Creating Transformations".
Assume that an application processes orders for customers in Canada. This application can dequeue messages using the following procedure:
/* Create a procedure that dequeues Canadian orders from the
   multi-consumer Overseas Shipping queue: */
create or replace procedure get_canada_orders as
  deq_msgid RAW(16);
  dopt dbms_aq.dequeue_options_t;
  mprop dbms_aq.message_properties_t;
  deq_order_data OS.order_xml_typ;
  no_messages exception;
  pragma exception_init (no_messages, -25228);
  new_orders BOOLEAN := TRUE;
begin
  dopt.wait := 1;

  /* Specify dequeue condition to select Orders for Canada.
     The XMLType attribute of order_xml_typ is named details. */
  dopt.deq_condition := 'tab.user_data.details.extract(''/ORDER_TYP/CUSTOMER/COUNTRY/text()'').getStringVal()=''CANADA''';
  dopt.consumer_name := 'Overseas_Shipping';

  WHILE (new_orders) LOOP
    BEGIN
      dbms_aq.dequeue(
        queue_name => 'OS.OS_bookedorders_que',
        dequeue_options => dopt,
        message_properties => mprop,
        payload => deq_order_data,
        msgid => deq_msgid);
      commit;
      dbms_output.put_line(' Order for Canada - Order No: ' || deq_order_data.orderno);
    EXCEPTION
      WHEN no_messages THEN
        dbms_output.put_line (' ---- NO MORE ORDERS ---- ');
        new_orders := FALSE;
    END;
  END LOOP;
end;
/
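The control flow of this procedure (dequeue the next message that satisfies a condition, repeat until none is left) can be illustrated without a database. The sketch below is an in-memory model in Java and does not use the AQ API: the OrderMsg class, the list standing in for the queue, and an empty Optional standing in for the ORA-25228 timeout are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** In-memory model of dequeuing with a condition; this is not the AQ API. */
class DequeueConditionSketch {
    /** Simplified message: order number plus the customer's country. */
    static final class OrderMsg {
        final int orderNo; final String country;
        OrderMsg(int orderNo, String country) { this.orderNo = orderNo; this.country = country; }
    }

    /** Removes and returns the first message that satisfies the condition, if any. */
    static Optional<OrderMsg> dequeue(List<OrderMsg> queue, Predicate<OrderMsg> condition) {
        Iterator<OrderMsg> it = queue.iterator();
        while (it.hasNext()) {
            OrderMsg m = it.next();
            if (condition.test(m)) {
                it.remove();
                return Optional.of(m);
            }
        }
        return Optional.empty(); // plays the role of the "no messages" exception
    }

    /** Dequeues every Canadian order and returns the order numbers in queue order. */
    static List<Integer> drainCanada(List<OrderMsg> queue) {
        List<Integer> orderNos = new ArrayList<>();
        Optional<OrderMsg> next;
        while ((next = dequeue(queue, m -> "CANADA".equals(m.country))).isPresent()) {
            orderNos.add(next.get().orderNo);
        }
        return orderNos;
    }

    public static void main(String[] args) {
        List<OrderMsg> queue = new ArrayList<>();
        queue.add(new OrderMsg(1, "CANADA"));
        queue.add(new OrderMsg(2, "FRANCE"));
        queue.add(new OrderMsg(3, "CANADA"));
        System.out.println(drainCanada(queue)); // prints [1, 3]
        System.out.println(queue.size());       // prints 1
    }
}
```

Messages that do not match the condition stay in the queue for other consumers or other conditions, which is the point of filtering at dequeue time rather than after retrieval.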
Oracle supports queue-level access control for enqueue and dequeue operations. This feature allows the application designer to protect queues created in one schema from applications running in other schemas. The application designer needs to grant only minimal access privileges to the applications that run outside the queue schema. The supported access privileges on a queue are ENQUEUE, DEQUEUE and ALL. For more information, see "Oracle Enterprise Manager Support".
The BooksOnLine application processes customer billings in its CB and CBADM schemas. The CB (Customer Billing) schema hosts the customer billing application, and the CBADM schema hosts all related billing data stored as queue tables.
To protect the billing data, the billing application and the billing data reside in different schemas. The billing application is allowed only to dequeue messages from CBADM_shippedorders_que, the shipped order queue. It processes the messages, and then enqueues new messages into CBADM_billedorders_que, the billed order queue.
To prevent the application from performing any other operations on the queues, only the following two grant calls are issued:
/* Grant dequeue privilege on the shipped orders queue to the Customer Billing
   application. The CB application retrieves orders that are shipped but not
   billed from the shipped orders queue. */
EXECUTE dbms_aqadm.grant_queue_privilege('DEQUEUE', 'CBADM_shippedorders_que', 'CB', FALSE);

/* Grant enqueue privilege on the billed orders queue to Customer Billing
   application. The CB application is allowed to put billed orders into this
   queue after processing the orders. */
EXECUTE dbms_aqadm.grant_queue_privilege('ENQUEUE', 'CBADM_billedorders_que', 'CB', FALSE);
Use the dbexecutesql interface from the database for this functionality.
public static void grantQueuePrivileges(Connection db_conn)
{
  AQSession aq_sess;
  AQQueue sh_queue;
  AQQueue bi_queue;

  try
  {
    /* Create an AQ Session: */
    aq_sess = AQDriverManager.createAQSession(db_conn);

    /* Grant dequeue privilege on the shipped orders queue to the Customer
       Billing application. The CB application retrieves orders that are
       shipped but not billed from the shipped orders queue. */
    sh_queue = aq_sess.getQueue("CBADM", "CBADM_shippedorders_que");
    sh_queue.grantQueuePrivilege("DEQUEUE", "CB", false);

    /* Grant enqueue privilege on the billed orders queue to Customer Billing
       application. The CB application is allowed to put billed orders into
       this queue after processing the orders. */
    bi_queue = aq_sess.getQueue("CBADM", "CBADM_billedorders_que");
    bi_queue.grantQueuePrivilege("ENQUEUE", "CB", false);
  }
  catch (AQException ex)
  {
    System.out.println("AQ Exception: " + ex);
  }
}
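The effect of these two grants is easiest to see as a lookup table of (queue, grantee, privilege). The following Java sketch is an in-memory model only, not the AQ API: the QueuePrivilegeSketch class and its grant and isAllowed methods are hypothetical names, and the way ALL expands to both privileges is an assumption made for illustration.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** In-memory model of queue-level access control; this is not the AQ API. */
class QueuePrivilegeSketch {
    enum Privilege { ENQUEUE, DEQUEUE }

    // queue name -> grantee -> privileges held on that queue
    private final Map<String, Map<String, Set<Privilege>>> grants = new HashMap<>();

    /** Records a grant; "ALL" is treated here as both ENQUEUE and DEQUEUE. */
    void grant(String privilege, String queue, String grantee) {
        Set<Privilege> held = grants
            .computeIfAbsent(queue, q -> new HashMap<>())
            .computeIfAbsent(grantee, g -> EnumSet.noneOf(Privilege.class));
        if ("ALL".equals(privilege)) {
            held.addAll(EnumSet.allOf(Privilege.class));
        } else {
            held.add(Privilege.valueOf(privilege));
        }
    }

    /** True only if the grantee was explicitly granted the privilege on the queue. */
    boolean isAllowed(String grantee, Privilege privilege, String queue) {
        Map<String, Set<Privilege>> byGrantee = grants.get(queue);
        if (byGrantee == null) {
            return false;
        }
        Set<Privilege> held = byGrantee.get(grantee);
        return held != null && held.contains(privilege);
    }

    public static void main(String[] args) {
        QueuePrivilegeSketch acl = new QueuePrivilegeSketch();
        acl.grant("DEQUEUE", "CBADM_shippedorders_que", "CB");
        acl.grant("ENQUEUE", "CBADM_billedorders_que", "CB");
        System.out.println(acl.isAllowed("CB", Privilege.DEQUEUE, "CBADM_shippedorders_que")); // prints true
        System.out.println(acl.isAllowed("CB", Privilege.ENQUEUE, "CBADM_shippedorders_que")); // prints false
    }
}
```

Because nothing is allowed by default, the billing application can read shipped orders and write billed orders, and every other combination is refused.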
A message in a nonpersistent queue is not stored in a database table. A nonpersistent queue can be either single-consumer or multi-consumer. These queues are created in a system-created queue table (AQ$_MEM_SC for single-consumer queues and AQ$_MEM_MC for multi-consumer queues) in the schema specified by the create_np_queue command. Subscribers can be added to the multi-consumer queues (see "Creating a Nonpersistent Queue"). Nonpersistent queues can be destinations for propagation.
You use the enqueue interface to enqueue messages into a nonpersistent queue in the normal way. You can enqueue RAW and Object Type (ADT) messages into a nonpersistent queue. You retrieve messages from a nonpersistent queue through the asynchronous notification mechanism, registering for the notification (using OCISubscriptionRegister or DBMS_AQ.REGISTER) for the queues you are interested in (see "Registering for Notification").
When a message is enqueued into a queue, it is delivered to clients with active registrations for the queue. The messages are published to the interested clients without incurring the overhead of storing them in the database.
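The delivery rule described here (a message reaches only the clients registered at the moment it is enqueued, and nothing is kept afterwards) can be sketched as follows. This is an in-memory model in Java, not the AQ or OCI notification API; the class name and the register and enqueue methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** In-memory model of nonpersistent delivery; this is not the AQ API. */
class NonpersistentQueueSketch {
    // Callbacks of the clients that currently hold a registration.
    private final List<Consumer<String>> registrations = new ArrayList<>();

    /** Registers a callback that is invoked for every message enqueued from now on. */
    void register(Consumer<String> callback) {
        registrations.add(callback);
    }

    /**
     * Delivers the message to the callbacks registered right now and returns
     * how many received it. The message is not retained anywhere.
     */
    int enqueue(String message) {
        for (Consumer<String> callback : registrations) {
            callback.accept(message);
        }
        return registrations.size();
    }

    public static void main(String[] args) {
        NonpersistentQueueSketch queue = new NonpersistentQueueSketch();
        List<String> received = new ArrayList<>();

        queue.enqueue("sent before anyone registered"); // nobody receives this one
        queue.register(received::add);
        queue.enqueue("LOGON");

        System.out.println(received); // prints [LOGON]
    }
}
```

A client that registers late never sees earlier messages, which is the trade-off for avoiding the cost of storing them.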
Assume that there are three application processes servicing user requests at the Order Entry system. The connection dispatcher shares out connection requests from the application processes. It attempts to maintain a count of the number of users logged on to the Order Entry system and the number of users per application process. The application processes are named APP1, APP2, APP3. (Application process failures are not considered in this example.)
Using nonpersistent queues meets the requirements in this scenario. When a user logs on to the database, the application process enqueues to the multi-consumer nonpersistent queue, LOGON_LOGOFF, with the application name as the consumer name. The same process occurs when a user logs out. To distinguish between the two events, the correlation of the message is LOGON for logins and LOGOFF for logouts.
The callback function counts the login/logout events per application process. Note that the dispatcher process needs to connect to the database for registering the subscriptions only. The notifications themselves can be received while the process is disconnected from the database.
CONNECT oe/oe;

/* Create the Object Type/ADT adtmsg */
CREATE OR REPLACE TYPE adtmsg AS OBJECT (id NUMBER, data VARCHAR2(4000));

/* Create the multiconsumer nonpersistent queue in OE schema: */
EXECUTE dbms_aqadm.create_np_queue(queue_name => 'LOGON_LOGOFF', multiple_consumers => TRUE);

/* Enable the queue for enqueue and dequeue: */
EXECUTE dbms_aqadm.start_queue(queue_name => 'LOGON_LOGOFF');

/* Nonpersistent Queue Scenario - procedure to be executed upon logon: */
CREATE OR REPLACE PROCEDURE User_Logon(app_process IN VARCHAR2)
AS
  msgprop dbms_aq.message_properties_t;
  enqopt dbms_aq.enqueue_options_t;
  enq_msgid RAW(16);
  payload RAW(1);
BEGIN
  /* visibility must always be immediate for NonPersistent queues */
  enqopt.visibility:=dbms_aq.IMMEDIATE;
  msgprop.correlation:= 'LOGON';
  msgprop.recipient_list(0) := aq$_agent(app_process, NULL, NULL);
  /* payload is NULL */
  dbms_aq.enqueue(
    queue_name => 'LOGON_LOGOFF',
    enqueue_options => enqopt,
    message_properties => msgprop,
    payload => payload,
    msgid => enq_msgid);
END;
/

/* Nonpersistent queue scenario - procedure to be executed upon logoff: */
CREATE OR REPLACE PROCEDURE User_Logoff(app_process IN VARCHAR2)
AS
  msgprop dbms_aq.message_properties_t;
  enqopt dbms_aq.enqueue_options_t;
  enq_msgid RAW(16);
  payload adtmsg;
BEGIN
  /* Visibility must always be immediate for NonPersistent queues: */
  enqopt.visibility:=dbms_aq.IMMEDIATE;
  msgprop.correlation:= 'LOGOFF';
  msgprop.recipient_list(0) := aq$_agent(app_process, NULL, NULL);
  /* Payload is NOT NULL: */
  payload := adtmsg(1, 'Logging Off');
  dbms_aq.enqueue(
    queue_name => 'LOGON_LOGOFF',
    enqueue_options => enqopt,
    message_properties => msgprop,
    payload => payload,
    msgid => enq_msgid);
END;
/

/* If there is a logon at APP1, enqueue a message into 'LOGON_LOGOFF' with
   correlation 'LOGON': */
EXECUTE User_logon('APP1');

/* If there is a logoff at APP3, enqueue a message into 'LOGON_LOGOFF' with
   correlation 'LOGOFF' and payload adtmsg(1, 'Logging Off'): */
EXECUTE User_logoff('APP3');

/* The OCI program which waits for notifications: */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <oci.h>

#ifdef WIN32COMMON
#define sleep(x) Sleep(1000*(x))
#else
#include <unistd.h>   /* sleep() */
#endif

/* LOGON / password: */
static text *username = (text *) "OE";
static text *password = (text *) "OE";

/* The correlation strings of messages: */
static char *logon = "LOGON";
static char *logoff = "LOGOFF";

/* The possible consumer names of queues: */
static char *applist[] = {"APP1", "APP2", "APP3"};

static OCIEnv *envhp;
static OCIServer *srvhp;
static OCIError *errhp;
static OCISvcCtx *svchp;

static void checkerr(OCIError *errhp, sword status);

struct process_statistics
{
  ub4 logon;
  ub4 logoff;
};
typedef struct process_statistics process_statistics;

int main(int argc, char *argv[]);

/* Notify Callback: */
ub4 notifyCB(dvoid *ctx, OCISubscription *subscrhp, dvoid *pay, ub4 payl,
             dvoid *desc, ub4 mode)
{
  text *subname;                 /* subscription name */
  ub4 lsub;                      /* length of subscription name */
  text *queue;                   /* queue name */
  ub4 lqueue;                    /* length of queue name (unused) */
  text *consumer;                /* consumer name */
  ub4 lconsumer;
  text *correlation;
  ub4 lcorrelation;
  ub4 size;
  ub4 appno;
  OCIRaw *msgid;
  OCIAQMsgProperties *msgprop;   /* message properties descriptor */
  process_statistics *user_count = (process_statistics *)ctx;

  OCIAttrGet((dvoid *)subscrhp, OCI_HTYPE_SUBSCRIPTION,
             (dvoid *)&subname, &lsub, OCI_ATTR_SUBSCR_NAME, errhp);

  /* Extract the attributes from the AQ descriptor: */
  /* Queue name: */
  OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&queue, &size,
             OCI_ATTR_QUEUE_NAME, errhp);

  /* Consumer name: */
  OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&consumer, &lconsumer,
             OCI_ATTR_CONSUMER_NAME, errhp);

  /* Message properties: */
  OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgprop, &size,
             OCI_ATTR_MSG_PROP, errhp);

  /* Get correlation from message properties: */
  checkerr(errhp, OCIAttrGet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES,
                             (dvoid *)&correlation, &lcorrelation,
                             OCI_ATTR_CORRELATION, errhp));

  if (lconsumer == strlen(applist[0]))
  {
    if (!memcmp((dvoid *)consumer, (dvoid *)applist[0], strlen(applist[0])))
      appno = 0;
    else if (!memcmp((dvoid *)consumer, (dvoid *)applist[1], strlen(applist[1])))
      appno = 1;
    else if (!memcmp((dvoid *)consumer, (dvoid *)applist[2], strlen(applist[2])))
      appno = 2;
    else
    {
      printf("Wrong consumer in notification");
      return 1;
    }
  }
  else
  {
    /* consumer name must be "APP1", "APP2" or "APP3" */
    printf("Wrong consumer in notification");
    return 1;
  }

  if (lcorrelation == strlen(logon) &&            /* logon event */
      !memcmp((dvoid *)correlation, (dvoid *)logon, strlen(logon)))
  {
    user_count[appno].logon++;   /* increment logon count for the app process */
    printf("Logon by APP%d \n", (appno+1));
    printf("Logon Payload length = %d \n", payl);
  }
  else if (lcorrelation == strlen(logoff) &&      /* logoff event */
           !memcmp((dvoid *)correlation, (dvoid *)logoff, strlen(logoff)))
  {
    user_count[appno].logoff++;  /* increment logoff count for the app process */
    printf("Logoff by APP%d \n", (appno+1));
    printf("Logoff Payload length = %d \n", payl);
  }
  else                           /* correlation must be "LOGON" or "LOGOFF" */
    printf("Wrong correlation in notification");

  printf("Total : \n");
  printf("App1 : %d \n", user_count[0].logon-user_count[0].logoff);
  printf("App2 : %d \n", user_count[1].logon-user_count[1].logoff);
  printf("App3 : %d \n", user_count[2].logon-user_count[2].logoff);
  return 1;
}

int main(int argc, char *argv[])
{
  OCISession *authp = (OCISession *) 0;
  OCISubscription *subscrhp[3];
  ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ;
  process_statistics ctx[3] = {{0,0}, {0,0}, {0,0}};
  ub4 sleep_time = 0;

  printf("Initializing OCI Process\n");

  /* Initialize OCI environment with OCI_EVENTS flag set: */
  (void) OCIInitialize((ub4) OCI_EVENTS|OCI_OBJECT, (dvoid *)0,
                       (dvoid * (*)(dvoid *, size_t)) 0,
                       (dvoid * (*)(dvoid *, dvoid *, size_t))0,
                       (void (*)(dvoid *, dvoid *)) 0 );
  printf("Initialization successful\n");

  printf("Initializing OCI Env\n");
  (void) OCIEnvInit( (OCIEnv **) &envhp, OCI_DEFAULT, (size_t) 0, (dvoid **) 0 );
  printf("Initialization successful\n");

  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp,
                                  OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0));
  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp,
                                  OCI_HTYPE_SERVER, (size_t) 0, (dvoid **) 0));
  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp,
                                  OCI_HTYPE_SVCCTX, (size_t) 0, (dvoid **) 0));

  printf("connecting to server\n");
  checkerr(errhp, OCIServerAttach( srvhp, errhp, (text *)"inst1_alias",
                                   strlen("inst1_alias"), (ub4) OCI_DEFAULT));
  printf("connect successful\n");

  /* Set attribute server context in the service context: */
  checkerr(errhp, OCIAttrSet( (dvoid *) svchp, OCI_HTYPE_SVCCTX, (dvoid *)srvhp,
                              (ub4) 0, OCI_ATTR_SERVER, (OCIError *) errhp));
  checkerr(errhp, OCIHandleAlloc((dvoid *) envhp, (dvoid **)&authp,
                                 (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0));

  /* Set username and password in the session handle: */
  checkerr(errhp, OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION,
                             (dvoid *) username, (ub4) strlen((char *)username),
                             (ub4) OCI_ATTR_USERNAME, errhp));
  checkerr(errhp, OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION,
                             (dvoid *) password, (ub4) strlen((char *)password),
                             (ub4) OCI_ATTR_PASSWORD, errhp));

  /* Begin session: */
  checkerr(errhp, OCISessionBegin ( svchp, errhp, authp, OCI_CRED_RDBMS,
                                    (ub4) OCI_DEFAULT));
  (void) OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *) authp,
                    (ub4) 0, (ub4) OCI_ATTR_SESSION, errhp);

  /* Register for notification: */
  printf("allocating subscription handle\n");
  subscrhp[0] = (OCISubscription *)0;
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[0],
                        (ub4) OCI_HTYPE_SUBSCRIPTION, (size_t) 0, (dvoid **) 0);

  /* For application process APP1: */
  printf("setting subscription name\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                    (dvoid *) "OE.LOGON_LOGOFF:APP1",
                    (ub4) strlen("OE.LOGON_LOGOFF:APP1"),
                    (ub4) OCI_ATTR_SUBSCR_NAME, errhp);
  printf("setting subscription callback\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                    (dvoid *) notifyCB, (ub4) 0,
                    (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                    (dvoid *)&ctx, (ub4)sizeof(ctx),
                    (ub4) OCI_ATTR_SUBSCR_CTX, errhp);
  printf("setting subscription namespace\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                    (dvoid *) &namespace, (ub4) 0,
                    (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);

  printf("allocating subscription handle\n");
  subscrhp[1] = (OCISubscription *)0;
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[1],
                        (ub4) OCI_HTYPE_SUBSCRIPTION, (size_t) 0, (dvoid **) 0);

  /* For application process APP2: */
  printf("setting subscription name\n");
  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                    (dvoid *) "OE.LOGON_LOGOFF:APP2",
                    (ub4) strlen("OE.LOGON_LOGOFF:APP2"),
                    (ub4) OCI_ATTR_SUBSCR_NAME, errhp);
  printf("setting subscription callback\n");
  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                    (dvoid *) notifyCB, (ub4) 0,
                    (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);
  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                    (dvoid *)&ctx, (ub4)sizeof(ctx),
                    (ub4) OCI_ATTR_SUBSCR_CTX, errhp);
  printf("setting subscription namespace\n");
  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                    (dvoid *) &namespace, (ub4) 0,
                    (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);

  printf("allocating subscription handle\n");
  subscrhp[2] = (OCISubscription *)0;
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[2],
                        (ub4) OCI_HTYPE_SUBSCRIPTION, (size_t) 0, (dvoid **) 0);

  /* For application process APP3: */
  printf("setting subscription name\n");
  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION,
                    (dvoid *) "OE.LOGON_LOGOFF:APP3",
                    (ub4) strlen("OE.LOGON_LOGOFF:APP3"),
                    (ub4) OCI_ATTR_SUBSCR_NAME, errhp);
  printf("setting subscription callback\n");
  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION,
                    (dvoid *) notifyCB, (ub4) 0,
                    (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);
  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION,
                    (dvoid *)&ctx, (ub4)sizeof(ctx),
                    (ub4) OCI_ATTR_SUBSCR_CTX, errhp);
  printf("setting subscription namespace\n");
  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION,
                    (dvoid *) &namespace, (ub4) 0,
                    (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);

  printf("Registering for notifications \n");
  checkerr(errhp, OCISubscriptionRegister(svchp, subscrhp, 3, errhp, OCI_DEFAULT));

  /* The wait time in seconds is the first command-line argument: */
  if (argc > 1)
    sleep_time = (ub4)atoi(argv[1]);
  printf ("waiting for %d s \n", sleep_time);
  sleep(sleep_time);

  printf("Exiting");
  exit(0);
}

void checkerr(OCIError *errhp, sword status)
{
  text errbuf[512];
  sb4 errcode = 0;

  switch (status)
  {
  case OCI_SUCCESS:
    break;
  case OCI_SUCCESS_WITH_INFO:
    (void) printf("Error - OCI_SUCCESS_WITH_INFO\n");
    break;
  case OCI_NEED_DATA:
    (void) printf("Error - OCI_NEED_DATA\n");
    break;
  case OCI_NO_DATA:
    (void) printf("Error - OCI_NODATA\n");
    break;
  case OCI_ERROR:
    (void) OCIErrorGet((dvoid *)errhp, (ub4) 1, (text *) NULL, &errcode,
                       errbuf, (ub4) sizeof(errbuf), OCI_HTYPE_ERROR);
    (void) printf("Error - %.*s\n", 512, errbuf);
    break;
  case OCI_INVALID_HANDLE:
    (void) printf("Error - OCI_INVALID_HANDLE\n");
    break;
  case OCI_STILL_EXECUTING:
    (void) printf("Error - OCI_STILL_EXECUTE\n");
    break;
  case OCI_CONTINUE:
    (void) printf("Error - OCI_CONTINUE\n");
    break;
  default:
    break;
  }
}
/* End of file tkaqdocn.c */
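The bookkeeping done by the notification callback is independent of OCI and can be checked in isolation. The sketch below is an in-memory Java model of that counting logic only: the LogonCounterSketch class and its method names are hypothetical, and no registration or notification API is involved.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of the per-process logon/logoff counting; not the OCI API. */
class LogonCounterSketch {
    // consumer name -> {logon count, logoff count}
    private final Map<String, int[]> counts = new HashMap<>();

    /**
     * Records one notification. The consumer name identifies the application
     * process and the correlation identifies the event. Returns false when
     * the correlation is neither LOGON nor LOGOFF.
     */
    boolean onNotification(String consumer, String correlation) {
        int[] c = counts.computeIfAbsent(consumer, k -> new int[2]);
        if ("LOGON".equals(correlation)) {
            c[0]++;
        } else if ("LOGOFF".equals(correlation)) {
            c[1]++;
        } else {
            return false;
        }
        return true;
    }

    /** Users currently logged on through the given process: logons minus logoffs. */
    int activeUsers(String consumer) {
        int[] c = counts.get(consumer);
        return c == null ? 0 : c[0] - c[1];
    }

    public static void main(String[] args) {
        LogonCounterSketch counter = new LogonCounterSketch();
        counter.onNotification("APP1", "LOGON");
        counter.onNotification("APP1", "LOGON");
        counter.onNotification("APP1", "LOGOFF");
        counter.onNotification("APP3", "LOGON");
        System.out.println(counter.activeUsers("APP1")); // prints 1
        System.out.println(counter.activeUsers("APP2")); // prints 0
        System.out.println(counter.activeUsers("APP3")); // prints 1
    }
}
```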
This feature is not supported currently.
This feature is not supported through the Java API.
AQ allows users to retain messages in the queue table and therefore use SQL to query messages for analysis. Messages are often related to each other. For example, if a message is produced as a result of the consumption of another message, the two are related. As the application designer, you may want to keep track of such relationships. Taken together, retention, message identifiers, and SQL queries make it possible to build powerful message warehouses.
Assume that you need to determine the average order processing time, including the time an order has to wait in the backed_order queue, and that you also want to know the average wait time in the backed_order queue itself. SQL queries can determine the wait time for orders in the shipping application. Specify the retention as TRUE for the shipping queues and specify the order number in the correlation field of the message.
For simplicity, only orders that have already been processed are analyzed. The processing time for an order in the shipping application is the difference between the enqueue time in the WS_bookedorders_que and the enqueue time in the WS_shippedorders_que (see "tkaqdoca.sql: Script to Create Users, Objects, Queue Tables, Queues & Subscribers" in Appendix C, "Scripts for Implementing 'BooksOnLine'").
SELECT SUM(SO.enq_time - BO.enq_time) / count (*) AVG_PRCS_TIME
  FROM WS.AQ$WS_orders_pr_mqtab BO, WS.AQ$WS_orders_mqtab SO
 WHERE SO.msg_state = 'PROCESSED' and BO.msg_state = 'PROCESSED'
   AND SO.corr_id = BO.corr_id and SO.queue = 'WS_shippedorders_que';

/* Average waiting time in the backed order queue: */
SELECT SUM(BACK.deq_time - BACK.enq_time) / count (*) AVG_BACK_TIME
  FROM WS.AQ$WS_orders_mqtab BACK
 WHERE BACK.msg_state = 'PROCESSED' AND BACK.queue = 'WS_backorders_que';
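The pairing on the correlation field that the first query relies on can also be sketched outside the database. The following Java fragment is only an illustration under stated assumptions: the class ProcessingTimeSketch, its method, and the in-memory maps are hypothetical and are not part of any AQ interface. It assumes one message per order number in each queue and uses epoch milliseconds, whereas the SQL above subtracts Oracle dates.

```java
import java.util.HashMap;
import java.util.Map;

/* Hypothetical illustration; not part of the AQ API. */
class ProcessingTimeSketch {

    /*
     * Averages (shipped enqueue time - booked enqueue time) over every
     * correlation ID that appears in both maps. Times are epoch milliseconds.
     * Returns 0.0 when no correlation ID appears in both maps.
     */
    static double averageProcessingMillis(Map<String, Long> bookedEnqTime,
                                          Map<String, Long> shippedEnqTime) {
        long total = 0L;
        int matched = 0;
        for (Map.Entry<String, Long> shipped : shippedEnqTime.entrySet()) {
            Long booked = bookedEnqTime.get(shipped.getKey());
            if (booked != null) {          /* same role as SO.corr_id = BO.corr_id */
                total += shipped.getValue() - booked;
                matched++;
            }
        }
        return matched == 0 ? 0.0 : (double) total / matched;
    }

    public static void main(String[] args) {
        Map<String, Long> booked = new HashMap<>();
        booked.put("ORDER-1", 1_000L);
        booked.put("ORDER-2", 2_000L);
        booked.put("ORDER-3", 3_000L);     /* never shipped, so it is ignored */

        Map<String, Long> shipped = new HashMap<>();
        shipped.put("ORDER-1", 5_000L);
        shipped.put("ORDER-2", 4_000L);

        System.out.println(averageProcessingMillis(booked, shipped));
    }
}
```

Orders without a matching shipped message drop out of the average, just as unmatched rows drop out of the join.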
Use the dbexecutesql interface from the database for this functionality.
No example is provided with this release.
Oracle AQ adds various features that allow you to develop an application based on a publish-subscribe model. The aim of this application model is to enable flexible and dynamic communication between applications functioning as publishers and applications playing the role of subscribers. The specific design point is that the applications playing these different roles should be decoupled in their communication. They should interact based on messages and message content.
In distributing messages, publisher applications do not have to explicitly handle or manage message recipients. This allows the dynamic addition of new subscriber applications to receive messages without changing any publisher application logic. Subscriber applications receive messages based on message content without regard to which publisher applications are sending messages. This allows the dynamic addition of new publisher applications without changing any subscriber application logic. Subscriber applications specify interest by defining a rule-based subscription on message content (payload) and message header properties of a queue. The system automatically routes messages by computing recipients for published messages using the rule-based subscriptions.
You can implement a publish-subscribe model of communication using AQ as follows:
The BooksOnLine application illustrates the use of a publish-subscribe model for communicating between applications. The following subsections give some examples.
The Order Entry application defines a queue (OE_bookedorders_que) to communicate orders that are booked to various applications. The Order Entry application is not aware of the various subscriber applications, so a new subscriber application can be added without disrupting any setup or logic in the Order Entry (publisher) application.
The various shipping applications and the customer service application (that is, Eastern region shipping, Western region shipping, Overseas shipping and Customer Service) are defined as subscribers to the OE_bookedorders_que queue of the Order Entry application. Rules are used to route messages of interest to the various subscribers. Thus, Eastern Region shipping, which handles shipment of all orders for the East coast and all rush U.S. orders, expresses the subscription rule as follows:
rule => 'tab.user_data.orderregion = ''EASTERN'' OR (tab.user_data.ordertype = ''RUSH'' AND tab.user_data.customer.country = ''USA'') '
Each subscriber can specify a local queue where messages are to be delivered. The Eastern region shipping application specifies a local queue (ES_bookedorders_que) for message delivery by specifying the subscriber address as follows:
subscriber := aq$_agent('East_Shipping', 'ES.ES_bookedorders_que', null);
Enable propagation from each publisher application queue. To allow subscribed messages to be delivered to remote queues, the Order Entry application enables propagation by means of the following statement:
execute dbms_aqadm.schedule_propagation(queue_name => 'OE.OE_bookedorders_que');
Booked orders are published by the Order Entry application when it enqueues orders (into the OE_bookedorders_que) that have been validated and are ready for shipping. These messages are then routed to each of the subscribing applications. Messages are delivered to local queues (if specified) at each of the subscriber applications.
Each of the shipping applications and the Customer Service application will then receive these messages in their local queues. For example, Eastern Region Shipping only receives booked orders that are for East Coast addresses or any U.S. order that is marked RUSH. This application then dequeues messages and processes its orders for shipping.
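The way rule-based subscriptions determine the recipients of a published order can be sketched in plain Java. This is a minimal illustration, not how AQ evaluates rules internally: the class RuleRoutingSketch, the Order stand-in for the order payload, and the recipients method are hypothetical names. Only the East_Shipping rule is taken from the text above; the West_Shipping rule in main is an assumption added so that there is more than one subscriber.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/* Hypothetical illustration; not part of the AQ API. */
class RuleRoutingSketch {

    /* Stand-in for the order payload; holds only the fields the rules read. */
    static final class Order {
        final String orderRegion;
        final String orderType;
        final String customerCountry;

        Order(String orderRegion, String orderType, String customerCountry) {
            this.orderRegion = orderRegion;
            this.orderType = orderType;
            this.customerCountry = customerCountry;
        }
    }

    /* The Eastern Region shipping rule: EASTERN orders, or RUSH orders from the USA. */
    static final Predicate<Order> EAST_SHIPPING_RULE = o ->
            "EASTERN".equals(o.orderRegion)
            || ("RUSH".equals(o.orderType) && "USA".equals(o.customerCountry));

    /* Returns the names of all subscribers whose rule matches the order. */
    static List<String> recipients(Order order,
                                   Map<String, Predicate<Order>> subscriptions) {
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, Predicate<Order>> s : subscriptions.entrySet()) {
            if (s.getValue().test(order)) {
                matched.add(s.getKey());
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        Map<String, Predicate<Order>> subs = new LinkedHashMap<>();
        subs.put("East_Shipping", EAST_SHIPPING_RULE);
        /* Assumed rule, for illustration only. */
        subs.put("West_Shipping", o -> "WESTERN".equals(o.orderRegion));

        /* A rush U.S. order placed in the Western region matches both rules. */
        System.out.println(recipients(new Order("WESTERN", "RUSH", "USA"), subs));
        /* A normal Western order matches only the assumed Western rule. */
        System.out.println(recipients(new Order("WESTERN", "NORMAL", "USA"), subs));
    }
}
```

The publisher never names a recipient. Adding a subscriber only adds an entry to the subscription map, which mirrors the decoupling described above.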
The Oracle Real Application Clusters facility can be used to improve AQ performance by allowing different queues to be managed by different instances. You do this by specifying different instance affinities (preferences) for the queue tables that store the queues. This allows queue operations (enqueue/dequeue) on different queues to occur in parallel.
The AQ queue monitor process continuously monitors the instance affinities of the queue tables. The queue monitor assigns ownership of a queue table to the specified primary instance if it is available, failing which it assigns it to the specified secondary instance.
If the owner instance of a queue table terminates, the queue monitor changes ownership to a suitable instance such as the secondary instance.
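The ownership policy described above (primary instance if available, otherwise the secondary) can be sketched as follows. This is an illustration under stated assumptions, not the queue monitor's actual algorithm: the class OwnerInstanceSketch, the chooseOwner method, and the NO_OWNER value are hypothetical. In particular, falling back to the lowest-numbered available instance when neither preferred instance is up is an assumption; the text only says that a suitable instance is chosen.

```java
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;

/* Hypothetical illustration; not part of the AQ API. */
class OwnerInstanceSketch {

    /* Returned when no instance is available at all (assumed convention). */
    static final int NO_OWNER = -1;

    /*
     * Picks the owner of a queue table: the primary instance if it is up,
     * otherwise the secondary instance, otherwise (assumption) the
     * lowest-numbered instance that is up.
     */
    static int chooseOwner(int primary, int secondary, SortedSet<Integer> available) {
        if (available.contains(primary)) {
            return primary;
        }
        if (available.contains(secondary)) {
            return secondary;
        }
        return available.isEmpty() ? NO_OWNER : available.first();
    }

    public static void main(String[] args) {
        SortedSet<Integer> both = new TreeSet<>(Arrays.asList(1, 2));
        SortedSet<Integer> onlyOne = new TreeSet<>(Arrays.asList(1));

        /* Both instances up: each queue table goes to its primary instance. */
        System.out.println(chooseOwner(1, 2, both));
        System.out.println(chooseOwner(2, 1, both));

        /* Instance 2 is down: a table with primary 2 falls back to secondary 1. */
        System.out.println(chooseOwner(2, 1, onlyOne));
    }
}
```

Re-running the selection with the failed instance removed from the available set models the change of ownership after the owner instance terminates.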
AQ propagation can make use of Oracle Real Application Clusters, and does so transparently to the user. The affinities for jobs submitted on behalf of the propagation schedules are set to the same values as the affinities of the respective queue tables. Thus a job_queue_process associated with the owner instance of a queue table handles the propagation from queues stored in that queue table, thereby minimizing pinging. Additional discussion on this topic can be found under AQ propagation scheduling (see "Scheduling a Queue Propagation" in Chapter 9, "Administrative Interface").
In the BooksOnLine example, operations on the OE_neworders_que and OE_bookedorders_que queues at the order entry (OE) site can be made faster if the two queues are associated with different instances. This is done by creating the queues in different queue tables and specifying different affinities for the queue tables in the create_queue_table() command.
In the example, the queue table OE_orders_sqtab stores queue OE_neworders_que, and its primary and secondary instances are 1 and 2 respectively. Queue table OE_orders_mqtab stores queue OE_bookedorders_que, and its primary and secondary instances are 2 and 1 respectively. The objective is to let instances 1 and 2 manage the two queues in parallel. By default, only one instance is available, in which case the owner instances of both queue tables will be set to instance 1. However, if Oracle Real Application Clusters are set up correctly and both instances 1 and 2 are available, then queue table OE_orders_sqtab will be owned by instance 1 and the other queue table will be owned by instance 2. The primary and secondary instance specification of a queue table can be changed dynamically using the alter_queue_table() command as shown in the example below. Information about the primary, secondary and owner instance of a queue table can be obtained by querying the view USER_QUEUE_TABLES (see "Selecting Queue Tables in User Schema" in "Administrative Interface: Views").
/* Create queue tables, queues for OE */
CONNECT OE/OE;

EXECUTE dbms_aqadm.create_queue_table( \
        queue_table        => 'OE_orders_sqtab',\
        comment            => 'Order Entry Single-Consumer Orders queue table',\
        queue_payload_type => 'BOLADM.order_typ',\
        compatible         => '8.1',\
        primary_instance   => 1,\
        secondary_instance => 2);

EXECUTE dbms_aqadm.create_queue_table(\
        queue_table        => 'OE_orders_mqtab',\
        comment            => 'Order Entry Multi Consumer Orders queue table',\
        multiple_consumers => TRUE,\
        queue_payload_type => 'BOLADM.order_typ',\
        compatible         => '8.1',\
        primary_instance   => 2,\
        secondary_instance => 1);

EXECUTE dbms_aqadm.create_queue ( \
        queue_name  => 'OE_neworders_que',\
        queue_table => 'OE_orders_sqtab');

EXECUTE dbms_aqadm.create_queue ( \
        queue_name  => 'OE_bookedorders_que',\
        queue_table => 'OE_orders_mqtab');

/* Check instance affinity of OE queue tables from AQ administrative view: */
SELECT queue_table, primary_instance, secondary_instance, owner_instance
  FROM user_queue_tables;

/* Alter instance affinity of OE queue tables: */
EXECUTE dbms_aqadm.alter_queue_table( \
        queue_table        => 'OE.OE_orders_sqtab',\
        primary_instance   => 2,\
        secondary_instance => 1);

EXECUTE dbms_aqadm.alter_queue_table( \
        queue_table        => 'OE.OE_orders_mqtab', \
        primary_instance   => 1,\
        secondary_instance => 2);

/* Check instance affinity of OE queue tables from AQ administrative view: */
SELECT queue_table, primary_instance, secondary_instance, owner_instance
  FROM user_queue_tables;
This feature is currently not supported.
public static void createQueueTablesAndQueues(Connection db_conn)
{
    AQSession            aq_sess;
    AQQueueTableProperty sqt_prop;
    AQQueueTableProperty mqt_prop;
    AQQueueTable         sq_table;
    AQQueueTable         mq_table;
    AQQueueProperty      q_prop;
    AQQueue              neworders_q;
    AQQueue              bookedorders_q;

    try
    {
        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        /* Create a single-consumer orders queue table */
        sqt_prop = new AQQueueTableProperty("BOLADM.order_typ");
        sqt_prop.setComment("Order Entry Single-Consumer Orders queue table");
        sqt_prop.setCompatible("8.1");
        sqt_prop.setPrimaryInstance(1);
        sqt_prop.setSecondaryInstance(2);

        sq_table = aq_sess.createQueueTable("OE", "OE_orders_sqtab", sqt_prop);

        /* Create a multi-consumer orders queue table */
        mqt_prop = new AQQueueTableProperty("BOLADM.order_typ");
        mqt_prop.setComment("Order Entry Multi Consumer Orders queue table");
        mqt_prop.setCompatible("8.1");
        mqt_prop.setMultiConsumer(true);
        mqt_prop.setPrimaryInstance(2);
        mqt_prop.setSecondaryInstance(1);

        mq_table = aq_sess.createQueueTable("OE", "OE_orders_mqtab", mqt_prop);

        /* Create Queues in these queue tables */
        q_prop = new AQQueueProperty();

        neworders_q = aq_sess.createQueue(sq_table, "OE_neworders_que",
                                          q_prop);

        bookedorders_q = aq_sess.createQueue(mq_table, "OE_bookedorders_que",
                                             q_prop);
    }
    catch (AQException ex)
    {
        System.out.println("AQ Exception: " + ex);
    }
}

public static void alterInstanceAffinity(Connection db_conn)
{
    AQSession            aq_sess;
    AQQueueTableProperty sqt_prop;
    AQQueueTableProperty mqt_prop;
    AQQueueTable         sq_table;
    AQQueueTable         mq_table;
    AQQueueProperty      q_prop;

    try
    {
        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        /* Check instance affinities */
        sq_table = aq_sess.getQueueTable("OE", "OE_orders_sqtab");
        sqt_prop = sq_table.getProperty();
        System.out.println("Current primary instance for OE_orders_sqtab: " +
                           sqt_prop.getPrimaryInstance());

        mq_table = aq_sess.getQueueTable("OE", "OE_orders_mqtab");
        mqt_prop = mq_table.getProperty();
        System.out.println("Current primary instance for OE_orders_mqtab: " +
                           mqt_prop.getPrimaryInstance());

        /* Alter queue table affinities */
        sq_table.alter(null, 2, 1);
        mq_table.alter(null, 1, 2);

        sqt_prop = sq_table.getProperty();
        System.out.println("Current primary instance for OE_orders_sqtab: " +
                           sqt_prop.getPrimaryInstance());

        mq_table = aq_sess.getQueueTable("OE", "OE_orders_mqtab");
        mqt_prop = mq_table.getProperty();
        System.out.println("Current primary instance for OE_orders_mqtab: " +
                           mqt_prop.getPrimaryInstance());
    }
    catch (AQException ex)
    {
        System.out.println("AQ Exception: " + ex);
    }
}
Each instance keeps its own AQ statistics information in its own SGA, and does not have knowledge of the statistics gathered by other instances. When a GV$AQ view is queried by an instance, all other instances funnel their AQ statistics information to the instance issuing the query.
The GV$AQ view can be queried at any time to see the number of messages in waiting, ready or expired state. The view also displays the average number of seconds messages have been waiting to be processed. The order processing application can use this to dynamically tune the number of order processing processes (see "Selecting the Number of Messages in Different States for the Whole Database" in Chapter 10, "Administrative Interface: Views").
CONNECT oe/oe

/* Count the number of messages and the average time for which the messages
   have been waiting: */
SELECT READY, AVERAGE_WAIT
  FROM gv$aq Stats, user_queues Qs
 WHERE Stats.qid = Qs.qid and Qs.Name = 'OE_neworders_que';
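One way an order processing application might act on the READY and AVERAGE_WAIT values is sketched below. The policy is purely an assumption made for illustration: the class WorkerTuningSketch, the targetWorkers method, and all of its thresholds are hypothetical, and AQ does not prescribe any particular tuning rule.

```java
/* Hypothetical illustration; not part of the AQ API. */
class WorkerTuningSketch {

    /*
     * Suggests a number of order processing processes:
     * one worker for each messagesPerWorker ready messages (rounded up),
     * plus one more if the average wait exceeds maxWaitSeconds,
     * clamped to the range [minWorkers, maxWorkers].
     * Assumes messagesPerWorker > 0 and minWorkers <= maxWorkers.
     */
    static int targetWorkers(long ready, double averageWaitSeconds,
                             int messagesPerWorker, double maxWaitSeconds,
                             int minWorkers, int maxWorkers) {
        /* Integer ceiling of ready / messagesPerWorker */
        long needed = (ready + messagesPerWorker - 1) / messagesPerWorker;
        if (averageWaitSeconds > maxWaitSeconds) {
            needed++;
        }
        return (int) Math.max(minWorkers, Math.min(maxWorkers, needed));
    }

    public static void main(String[] args) {
        /* 250 ready messages, short waits: three workers of 100 messages each */
        System.out.println(targetWorkers(250, 5.0, 100, 30.0, 1, 8));
        /* Same backlog, but messages wait too long: one extra worker */
        System.out.println(targetWorkers(250, 45.0, 100, 30.0, 1, 8));
    }
}
```

The two inputs would come from a query such as the one above, run periodically by the application.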
Use the dbexecutesql interface from the database for this functionality.
No example is provided with this release.
See Chapter 17, "Internet Access to Advanced Queuing" for information on Internet access to Advanced Queuing features.
Copyright © 1996-2001, Oracle Corporation. All Rights Reserved.