|Oracle8i Application Developer's Guide - Advanced Queuing
Release 2 (8.1.6)
Part Number A76938-01
A Sample Application Using AQ, 6 of 6
This feature enables applications to communicate with each other without having to be connected to the same database, or to the same queue. Messages can be propagated from one Oracle AQ to another, irrespective of whether these are local or remote. Propagation is performed by snapshot (job_queue_processes) background processes. Propagation to remote queues is done using database links, and Net 8.
The propagation feature is used as follows. First one or more subscribers are defined for the queue from which messages are to be propagated (see "Subscriptions and Recipient Lists"). Second, a schedule is defined for each destination to which messages are to be propagated from the queue. Enqueued messages will now be propagated and automatically be available for dequeuing at the destination queues.
Note that two or more number of
job_queue background processes must be running to use propagation. This is in addition to the number of
job_queue background processes needed for handling non-propagation related jobs. Also, if you wish to deploy remote propagation, you must ensure that the database link specified for the schedule is valid and have proper privileges for enqueuing into the destination queue. For more information about the administrative commands for managing propagation schedules, see "Asynchronous Notifications" below.
Propagation also has mechanisms for handling failure. For example, if the database link specified is invalid, or if the remote database is unavailable, or if the remote queue is not enabled for enqueuing, then the appropriate error message is reported.
Finally, propagation provides detailed statistics about the messages propagated and the schedule itself. This information can be used to properly tune the schedules for best performance. Failure handling/error reporting facilities of propagation and propagation statistics are discussed under "Enhanced Propagation Scheduling Capabilities".
A propagation schedule is defined for a pair of source and destination queues. If a queue has messages to be propagated to several queues then a schedule has to be defined for each of the destination queues. A schedule indicates the time frame during which messages can be propagated from the source queue. This time frame may depend on a number of factors such as network traffic, load at source database, load at destination database, and so on. The schedule therefore has to be tailored for the specific source and destination. When a schedule is created, a job is automatically submitted to the
job_queue facility to handle propagation.
The administrative calls for propagation scheduling provide great flexibility for managing the schedules (see "Schedule a Queue Propagation" in Chapter 9, "Administrative Interface"). The duration or propagation window parameter of a schedule specifies the time frame during which propagation has to take place. If the duration is unspecified then the time frame is an infinite single window. If a window has to be repeated periodically then a finite duration is specified along with a
next_time function that defines the periodic interval between successive windows.
The latency parameter for a schedule is relevant only when a queue does not have any messages to be propagated. This parameter specifies the time interval within which a queue has to be rechecked for messages. Note that if the latency parameter is to be enforced, then the
job_queue_interval parameter for the
job_queue_processes should be less than or equal to the
The propagation schedules defined for a queue can be changed or dropped at anytime during the life of the queue. In addition there are calls for temporarily disabling a schedule (instead of dropping the schedule) and enabling a disabled schedule. A schedule is active when messages are being propagated in that schedule. All the administrative calls can be made irrespective of whether the schedule is active or not. If a schedule is active then it will take a few seconds for the calls to be executed.
BooksOnLine example, messages in the
OE_bookedorders_que are propagated to different shipping sites. The following example code illustrates the various administrative calls available for specifying and managing schedules. It also shows the calls for enqueuing messages into the source queue and for dequeuing the messages at the destination site). The catalog view
USER_QUEUE_SCHEDULES provides all information relevant to a schedule (see "Select Propagation Schedules in User Schema" in Chapter 10, "Administrative Interface: Views").
CONNECT OE/OE; /* Schedule Propagation from bookedorders_que to shipping: */ EXECUTE dbms_aqadm.schedule_propagation( \ queue_name => 'OE.OE_bookedorders_que'); /* Check if a schedule has been created: */ SELECT * FROM user_queue_schedules; /* Enqueue some orders into OE_bookedorders_que: */ EXECUTE BOLADM.order_enq('My First Book', 1, 1001, 'CA', 'USA', \ 'WESTERN', 'NORMAL'); EXECUTE BOLADM.order_enq('My Second Book', 2, 1002, 'NY', 'USA', \ 'EASTERN', 'NORMAL'); EXECUTE BOLADM.order_enq('My Third Book', 3, 1003, '', 'Canada', \ 'INTERNATIONAL', 'NORMAL'); EXECUTE BOLADM.order_enq('My Fourth Book', 4, 1004, 'NV', 'USA', \ 'WESTERN', 'RUSH'); EXECUTE BOLADM.order_enq('My Fifth Book', 5, 1005, 'MA', 'USA', \ 'EASTERN', 'RUSH'); EXECUTE BOLADM.order_enq('My Sixth Book', 6, 1006, '' , 'UK', \ 'INTERNATIONAL', 'NORMAL'); EXECUTE BOLADM.order_enq('My Seventh Book', 7, 1007, '', 'Canada', \ 'INTERNATIONAL', 'RUSH'); EXECUTE BOLADM.order_enq('My Eighth Book', 8, 1008, '', 'Mexico', \ 'INTERNATIONAL', 'NORMAL'); EXECUTE BOLADM.order_enq('My Ninth Book', 9, 1009, 'CA', 'USA', \ 'WESTERN', 'RUSH'); EXECUTE BOLADM.order_enq('My Tenth Book', 8, 1010, '' , 'UK', \ 'INTERNATIONAL', 'NORMAL'); EXECUTE BOLADM.order_enq('My Last Book', 7, 1011, '' , 'Mexico', \ 'INTERNATIONAL', 'NORMAL'); /* Wait for propagation to happen: */ EXECUTE dbms_lock.sleep(100); /* Connect to shipping sites and check propagated messages: */ CONNECT WS/WS; set serveroutput on; /* Dequeue all booked orders for West_Shipping: */ EXECUTE BOLADM.shipping_bookedorder_deq('West_Shipping', DBMS_AQ.REMOVE); CONNECT ES/ES; SET SERVEROUTPUT ON; /* Dequeue all remaining booked orders (normal order) for East_Shipping: */ EXECUTE BOLADM.shipping_bookedorder_deq('East_Shipping', DBMS_AQ.REMOVE); CONNECT OS/OS; SET SERVEROUTPUT ON; /* Dequeue all international North American orders for Overseas_Shipping: */ EXECUTE BOLADM.get_northamerican_orders('Overseas_Shipping'); /* Dequeue rest of the booked orders for Overseas_Shipping: */ EXECUTE BOLADM.shipping_bookedorder_deq('Overseas_Shipping', DBMS_AQ.REMOVE); /* Disable propagation schedule for booked orders EXECUTE dbms_aqadm.disable_propagation_schedule( \ queue_name => 'OE_bookedorders_que'); /* Wait for some time for call to be effected: */ EXECUTE dbms_lock.sleep(30); /* Check if the schedule has been disabled: */ SELECT schedule_disabled FROM user_queue_schedules; /* Alter propagation schedule for booked orders to execute every 15 mins (900 seconds) for a window duration of 300 seconds: */ EXECUTE dbms_aqadm.alter_propagation_schedule( \ queue_name => 'OE_bookedorders_que', \ duration => 300, \ next_time => 'SYSDATE + 900/86400',\ latency => 25); /* Wait for some time for call to be effected: */ EXECUTE dbms_lock.sleep(30); /* Check if the schedule parameters have changed: */ SELECT next_time, latency, propagation_window FROM user_queue_schedules; /* Enable propagation schedule for booked orders: EXECUTE dbms_aqadm.enable_propagation_schedule( \ queue_name => 'OE_bookedorders_que'); /* Wait for some time for call to be effected: */ EXECUTE dbms_lock.sleep(30); /* Check if the schedule has been enabled: */ SELECT schedule_disabled FROM user_queue_schedules; /* Unschedule propagation for booked orders: */ EXECUTE dbms_aqadm.unschedule_propagation( \ queue_name => 'OE.OE_bookedorders_que'); /* Wait for some time for call to be effected: */ EXECUTE dbms_lock.sleep(30); /* Check if the schedule has been dropped SELECT * FROM user_queue_schedules;
This functionality is currently not available.
No example is provided with this release.
Large Objects can be propagated using AQ using two methods:
RAWqueues the message payload is stored as a Binary Large Object (
BLOB). This allows users to store up to 32KB of data when using the PL/SQL interface and as much data as can be contiguously allocated by the client when using OCI. This method is supported by all releases from 8.0.4 inclusive.
LOBattributes. The user can populate the
LOBand read from the
LOBhandling routines. The
LOBattributes can be
CLOBs (not NCLOBs). If the attribute is a
CLOBAQ will automatically perform any necessary characterset conversion between the source queue and the destination queue. This method is supported by all releases from 8.1.3 inclusive.
Note that AQ does not support propagation from Object queues that have BFILE or REF attributes in the payload.
In the BooksOnLine application, the company may wish to send promotional coupons along with the book orders. These coupons are generated depending on the content of the order, and other customer preferences. The coupons are images generated from some multimedia database, and are stored as
When the order information is sent to the shipping warehouses, the coupon contents are also sent to the warehouses. In the code shown below the
order_typ is enhanced to contain a coupon attribute of LOB type. The code demonstrates how the
LOB contents are inserted into the message that is enqueued into
OE_bookedorders_que when an order is placed. The message payload is first constructed with an empty
LOB. The place holder (
LOB locator) information is obtained from the queue table and is then used in conjunction with the
LOB manipulation routines, such as DBMS_LOB.WRITE(), to fill the LOB contents. The example has additional examples regarding for enqueue and dequeue of messages with LOBs as part the payload.
COMMIT is issued only after the LOB contents are filled in with the appropriate image data. Propagation automatically takes care of moving the LOB contents along with the rest of the message contents. The code below also shows a dequeue at the destination queue for reading the LOB contents from the propagated message. The LOB contents are read into a buffer that can be sent to a printer for printing the coupon.
/* Enhance the type order_typ to contain coupon field (lob field): */ CREATE OR REPLACE TYPE order_typ AS OBJECT ( orderno NUMBER, status VARCHAR2(30), ordertype VARCHAR2(30), orderregion VARCHAR2(30), customer customer_typ, paymentmethod VARCHAR2(30), items orderitemlist_vartyp, total NUMBER, coupon BLOB); / /* lob_loc is a variable of type BLOB, buffer is a variable of type RAW, length is a variable of type NUMBER. */ /* Complete the order data and perform the enqueue using the order_enq() procedure: */ dbms_aq.enqueue('OE.OE_bookedorders_que', enqopt, msgprop, OE_enq_order_data, enq_msgid); /* Get the lob locator in the queue table after enqueue: */ SELECT t.user_data.coupon INTO lob_loc FROM OE.OE_orders_pr_mqtab t WHERE t.msgid = enq_msgid; /* Generate a sample LOB of 100 bytes: */ buffer := hextoraw(rpad('FF',100,'FF')); /* Fill in the lob using LOB routines in the dbms_lob package: */ dbms_lob.write(lob_loc, 90, 1, buffer); /* Issue a commit only after filling in lob contents: */ COMMIT; /* Sleep until propagation is complete: */ /* Perform dequeue at the Western Shipping warehouse: */ dbms_aq.dequeue( queue_name => qname, dequeue_options => dopt, message_properties => mprop, payload => deq_order_data, msgid => deq_msgid); /* Get the LOB locator after dequeue: */ lob_loc := deq_order_data.coupon; /* Get the length of the LOB: */ length := dbms_lob.getlength(lob_loc); /* Read the LOB contents into the buffer: */ dbms_lob.read(lob_loc, length, 1, buffer);
This functionality is currently not avis currently not available.
No example is provided with this release.
Detailed information about the schedules can be obtained from the catalog views defined for propagation. Information about active schedules --such as the name of the background process handling that schedule, the SID (session, serial number) for the session handling the propagation and the Oracle instance handling a schedule (relevant if OPS is being used) -- can be obtained from the catalog views. The same catalog views also provide information about the previous successful execution of a schedule (last successful propagation of message) and the next execution of the schedule.
For each schedule detailed propagation statistics are maintained. This includes the total number of messages propagated in a schedule, total number of bytes propagated in a schedule, maximum number of messages propagated in a window, maximum number of bytes propagated in a window, average number of messages propagated in a window, average size of propagated messages and the average time to propagated a message. These statistics have been designed to provide useful information to the queue administrators for tuning the schedules such that maximum efficiency can be achieved.
Propagation has built in support for handling failures and reporting errors. For example, if the database link specified is invalid, the remote database is unavailable or if the remote queue is not enabled for enqueuing then the appropriate error message is reported. Propagation uses an exponential backoff scheme for retrying propagation from a schedule that encountered a failure. If a schedule continuously encounters failures, the first retry happens after 30 seconds, the second after 60 seconds, the third after 120 seconds and so forth. If the retry time is beyond the expiration time of the current window then the next retry is attempted at the start time of the next window. A maximum of 16 retry attempts are made after which the schedule is automatically disabled. When a schedule is disabled automatically due to failures, the relevant information is written into the alert log. At anytime it is possible to check if there were failures encountered by a schedule and if so how many successive failure were encountered, the error message indicating the cause for the failure and the time at which the last failure was encountered. By examining this information, a queue administrator can fix the failure and enable the schedule. During a retry if propagation is successful then the number of failures is reset to 0.
Propagation has support built in for OPS and is completely transparent to the user and the queue administrator. The job that handles propagation is submitted to the same instance as the owner of the queue table in which the queue resides. If at anytime there is a failure at an instance and the queue table that stores the queue is migrated to a different instance, the propagation job is also automatically migrated to the new instance. This will minimize the "pinging" between instances and thus offer better performance. Propagation has been designed to handle any number of concurrent schedules. Note that the number of job_queue_processes is limited to a maximum of 36 and some of these may be used to handle non-propagation related jobs. Hence, propagation has built is support for multi-tasking and load balancing. The propagation algorithms are designed such that multiple schedules can be handled by a single snapshot (job_queue) process. The propagation load on a job_queue processes can be skewed based on the arrival rate of messages in the different source queues. If one process is overburdened with several active schedules while another is less loaded with many passive schedules, propagation automatically re-distributes the schedules among the processes such that they are loaded uniformly.
BooksOnLine example, the
OE_bookedorders_que is a busy queue since messages in it are propagated to different shipping sites. The following example code illustrates the calls supported by enhanced propagation scheduling for error checking and schedule monitoring.
CONNECT OE/OE; /* get averages select avg_time, avg_number, avg_size from user_queue_schedules; /* get totals select total_time, total_number, total_bytes from user_queue_schedules; /* get maximums for a window select max_number, max_bytes from user_queue_schedules; /* get current status information of schedule select process_name, session_id, instance, schedule_disabled from user_queue_schedules; /* get information about last and next execution select last_run_date, last_run_time, next_run_date, next_run_time from user_queue_schedules; /* get last error information if any select failures, last_error_msg, last_error_date, last_error_time from user_queue_schedules;
This functionality is currently not available.
No example is provided with this release.
When a system errors such as a network failure occurs, AQ will continue to attempt to propagate messages using an exponential backoff algorithm. In some situations that indicate application errors AQ will mark messages as
UNDELIVERABLE if there is an error in propagating the message.
Examples of such errors are when the remote queue does not exist or when there is a type mismatch between the source queue and the remote queue. In such situations users must query the
DBA_SCHEDULES view to determine the last error that occurred during propagation to a particular destination.The trace files in the
$ORACLE_HOME/log directory can provide additional information about the error.
In the BooksOnLine example, the
ES_bookedorders_que in the Eastern Shipping region is stopped intentionally using the stop_queue() call. After a short while the propagation schedule for
OE_bookedorders_que will display an error indicating that the remote queue ES_bookedorders_que is disabled for enqueuing. When the
ES_bookedorders_que is started using the
start_queue() call, propagation to that queue resumes and there is no error message associated with schedule for
/* Intentionally stop the eastern shipping queue : */connect BOLADM/BOLADM EXECUTE dbms_aqadm.stop_queue(queue_name => 'ES.ES_bookedorders_que');
/* Wait for some time before error shows up in dba_queue_schedules: */EXECUTE dbms_lock.sleep(100);
/* This query will return an ORA-25207 enqueue failed error: */SELECT qname, last_error_msg from dba_queue_schedules; /* Start the eastern shipping queue: */ EXECUTE dbms_aqadm.start_queue(queue_name => 'ES.ES_bookedorders_que');
/* Wait for Propagation to resume for eastern shipping queue: */EXECUTE dbms_lock.sleep(100); /* This query will indicate that there are no errors with propagation: SELECT qname, last_error_msg from dba_queue_schedules;
This functionality is handled by the database.
No example is provided with this release.