Oracle9i Application Developer's Guide - Advanced Queuing
Release 1 (9.0.1)

Part Number A88890-02
Go To Documentation Library
Home
Go To Product List
Book List
Go To Table Of Contents
Contents
Go To Index
Index

Master Index

Feedback

Go to previous page Go to beginning of chapter Go to next page

A Sample Application Using AQ, 7 of 7


Propagation Features

Propagation

This feature enables applications to communicate with each other without having to be connected to the same database, or to the same queue. Messages can be propagated from one Oracle AQ to another, irrespective of whether these are local or remote. Propagation is performed by snapshot (job_queue_processes) background processes. Propagation to remote queues is done using database links and Net8.

The propagation feature is used as follows. First one or more subscribers are defined for the queue from which messages are to be propagated (see "Subscriptions and Recipient Lists"). Second, a schedule is defined for each destination where messages are to be propagated from the queue. Enqueued messages will be propagated and automatically available for dequeuing at the destination queues.

Note that two or more job_queue background processes must be running to use propagation. This is in addition to the number of job_queue background processes needed for handling non-propagation related jobs. Also, if you wish to deploy remote propagation, you must ensure that the database link specified for the schedule is valid and have proper privileges for enqueuing into the destination queue. For more information about the administrative commands for managing propagation schedules, see "Propagation Scheduling".

Propagation also has mechanisms for handling failure. For example, if the database link specified is invalid, then the appropriate error message is reported.

Finally, propagation provides detailed statistics about the messages propagated and the schedule itself. This information can be used to properly tune the schedules for best performance. See "Enhanced Propagation Scheduling Capabilities" for a discussion of the failure handling and error reporting facilities of propagation and propagation statistics.

Propagation Scheduling

A propagation schedule is defined for a pair of source and destination queues. If a queue has messages to be propagated to several queues, a schedule has to be defined for each of the destination queues. A schedule indicates the time frame during which messages can be propagated from the source queue. This time frame may depend on a number of factors such as network traffic, load at source database, load at destination database, and so on. The schedule therefore has to be tailored for the specific source and destination. When a schedule is created, a job is automatically submitted to the job_queue facility to handle propagation.

The administrative calls for propagation scheduling provide flexibility for managing the schedules (see "Scheduling a Queue Propagation" in Chapter 9, "Administrative Interface"). The duration or propagation window parameter of a schedule specifies the time frame during which propagation has to take place. If the duration is unspecified, the time frame is an infinite single window. If a window has to be repeated periodically, a finite duration is specified along with a next_time function that defines the periodic interval between successive windows.

The latency parameter for a schedule is relevant only when a queue does not have any messages to be propagated. This parameter specifies the time interval within which a queue has to be rechecked for messages. Note that if the latency parameter is to be enforced, then the job_queue_interval parameter for the job_queue_processes should be less than or equal to the latency parameter.

The propagation schedules defined for a queue can be changed or dropped at anytime during the life of the queue. In addition there are calls for temporarily disabling a schedule (instead of dropping the schedule) and enabling a disabled schedule. A schedule is active when messages are being propagated in that schedule. All the administrative calls can be made irrespective of whether the schedule is active or not. If a schedule is active, it will take a few seconds for the calls to be executed.

Scenario

In the BooksOnLine example, messages in the OE_bookedorders_que are propagated to different shipping sites. The following example code illustrates the various administrative calls available for specifying and managing schedules. It also shows the calls for enqueuing messages into the source queue and for dequeuing the messages at the destination site. The catalog view USER_QUEUE_SCHEDULES provides all information relevant to a schedule (see "Selecting Propagation Schedules in User Schema" in Chapter 10, "Administrative Interface: Views").

PL/SQL (DBMS_AQADM Package): Example Code

CONNECT OE/OE; 
 
/* Schedule Propagation from bookedorders_que to shipping: */ 
EXECUTE dbms_aqadm.schedule_propagation( \
   queue_name      => 'OE.OE_bookedorders_que'); 
 
/* Check if a schedule has been created: */ 
SELECT * FROM user_queue_schedules; 
 
/* Enqueue some orders into OE_bookedorders_que: */ 
EXECUTE BOLADM.order_enq('My First   Book', 1, 1001, 'CA', 'USA', \
   'WESTERN', 'NORMAL'); 
EXECUTE BOLADM.order_enq('My Second  Book', 2, 1002, 'NY', 'USA', \
   'EASTERN', 'NORMAL'); 
EXECUTE BOLADM.order_enq('My Third   Book', 3, 1003, '',   'Canada', \
   'INTERNATIONAL', 'NORMAL'); 
EXECUTE BOLADM.order_enq('My Fourth  Book', 4, 1004, 'NV', 'USA', \
   'WESTERN', 'RUSH'); 
EXECUTE BOLADM.order_enq('My Fifth   Book', 5, 1005, 'MA', 'USA', \
   'EASTERN', 'RUSH'); 
EXECUTE BOLADM.order_enq('My Sixth   Book', 6, 1006, ''  , 'UK', \ 
   'INTERNATIONAL', 'NORMAL'); 
EXECUTE BOLADM.order_enq('My Seventh Book', 7, 1007, '',   'Canada', \
   'INTERNATIONAL', 'RUSH'); 
EXECUTE BOLADM.order_enq('My Eighth  Book', 8, 1008, '',   'Mexico', \
   'INTERNATIONAL', 'NORMAL'); 
EXECUTE BOLADM.order_enq('My Ninth   Book', 9, 1009, 'CA', 'USA', \
   'WESTERN', 'RUSH'); 
EXECUTE BOLADM.order_enq('My Tenth   Book', 8, 1010, ''  , 'UK', \
   'INTERNATIONAL', 'NORMAL'); 
EXECUTE BOLADM.order_enq('My Last    Book', 7, 1011, ''  , 'Mexico', \
   'INTERNATIONAL', 'NORMAL'); 
 
/* Wait for propagation to happen: */ 
EXECUTE dbms_lock.sleep(100); 
 
/* Connect to shipping sites and check propagated messages: */
CONNECT WS/WS; 
set serveroutput on; 
 
/*  Dequeue all booked orders for West_Shipping: */
EXECUTE BOLADM.shipping_bookedorder_deq('West_Shipping', DBMS_AQ.REMOVE); 
 
CONNECT ES/ES; 
SET SERVEROUTPUT ON; 
 
/* Dequeue all remaining booked orders (normal order) for East_Shipping: */  
EXECUTE BOLADM.shipping_bookedorder_deq('East_Shipping', DBMS_AQ.REMOVE); 
 
CONNECT OS/OS; 
SET SERVEROUTPUT ON; 
 
/* Dequeue all international North American orders for Overseas_Shipping: */ 
EXECUTE BOLADM.get_northamerican_orders('Overseas_Shipping'); 
 
/* Dequeue rest of the booked orders for Overseas_Shipping: */ 
EXECUTE BOLADM.shipping_bookedorder_deq('Overseas_Shipping', DBMS_AQ.REMOVE); 
 
/* Disable propagation schedule for booked orders 
EXECUTE dbms_aqadm.disable_propagation_schedule(   \
   queue_name   => 'OE_bookedorders_que'); 
 
/* Wait for some time for call to be effected: */ 
EXECUTE dbms_lock.sleep(30); 
 
/*  Check if the schedule has been disabled: */ 
SELECT schedule_disabled FROM user_queue_schedules; 
 
/* Alter propagation schedule for booked orders to execute every  
   15 mins (900 seconds) for a window duration of 300 seconds: */ 
EXECUTE dbms_aqadm.alter_propagation_schedule( \
   queue_name     => 'OE_bookedorders_que', \
   duration       => 300, \
   next_time      => 'SYSDATE + 900/86400',\
   latency        => 25); 
 
/* Wait for some time for call to be effected: */ 
EXECUTE dbms_lock.sleep(30); 
 
/*  Check if the schedule parameters have changed: */ 
SELECT next_time, latency, propagation_window FROM user_queue_schedules; 
 
/* Enable propagation schedule for booked orders: 
EXECUTE dbms_aqadm.enable_propagation_schedule( \
   queue_name     => 'OE_bookedorders_que'); 
 
/* Wait for some time for call to be effected: */ 
EXECUTE dbms_lock.sleep(30); 
 
/* Check if the schedule has been enabled: */ 
SELECT schedule_disabled FROM user_queue_schedules; 
 
/* Unschedule propagation for booked orders: */ 
EXECUTE dbms_aqadm.unschedule_propagation(   \
   queue_name      => 'OE.OE_bookedorders_que'); 
 
/* Wait for some time for call to be effected: */ 
EXECUTE dbms_lock.sleep(30); 
 
/*  Check if the schedule has been dropped 
SELECT *  FROM user_queue_schedules; 

Visual Basic (OO4O): Example Code

This functionality is currently not available.

Java (JDBC): Example Code

No example is provided with this release.

Propagation of Messages with LOB Attributes

Large Objects can be propagated using AQ using two methods:

Note that AQ does not support propagation from Object queues that have BFILE or REF attributes in the payload.

Scenario

In the BooksOnLine application, the company may wish to send promotional coupons along with the book orders. These coupons are generated depending on the content of the order, and other customer preferences. The coupons are images generated from some multimedia database, and are stored as LOBs.

When the order information is sent to the shipping warehouses, the coupon contents are also sent to the warehouses. In the code shown below the order_typ is enhanced to contain a coupon attribute of LOB type. The code demonstrates how the LOB contents are inserted into the message that is enqueued into OE_bookedorders_que when an order is placed. The message payload is first constructed with an empty LOB. The place holder (LOB locator) information is obtained from the queue table and is then used in conjunction with the LOB manipulation routines, such as DBMS_LOB.WRITE(), to fill the LOB contents. The example has additional examples regarding for enqueue and dequeue of messages with LOBs as part the payload.

A COMMIT is issued only after the LOB contents are filled in with the appropriate image data. Propagation automatically takes care of moving the LOB contents along with the rest of the message contents. The code below also shows a dequeue at the destination queue for reading the LOB contents from the propagated message. The LOB contents are read into a buffer that can be sent to a printer for printing the coupon.

PL/SQL (DBMS_AQADM Package): Example Code

/* Enhance the type order_typ to contain coupon field (lob field): */ 
CREATE OR REPLACE TYPE order_typ AS OBJECT ( 
        orderno         NUMBER, 
        status          VARCHAR2(30), 
        ordertype       VARCHAR2(30), 
        orderregion     VARCHAR2(30), 
        customer        customer_typ, 
        paymentmethod   VARCHAR2(30), 
        items           orderitemlist_vartyp, 
        total           NUMBER, 
        coupon          BLOB);    
/ 
 
/* lob_loc is a variable of type BLOB, 
   buffer is a variable of type RAW, 
   length is a variable of type NUMBER. */ 
 
/* Complete the order data and perform the enqueue using the order_enq() 
   procedure: */
dbms_aq.enqueue('OE.OE_bookedorders_que', enqopt, msgprop,  
                OE_enq_order_data, enq_msgid); 
 
/* Get the lob locator in the queue table after enqueue: */ 
SELECT t.user_data.coupon INTO lob_loc 
FROM   OE.OE_orders_pr_mqtab t 
WHERE  t.msgid = enq_msgid; 
 
/* Generate a sample LOB of 100 bytes: */ 
buffer := hextoraw(rpad('FF',100,'FF')); 
 
/* Fill in the lob using LOB routines in the dbms_lob package: */
dbms_lob.write(lob_loc, 90, 1, buffer); 
 
/* Issue a commit only after filling in lob contents: */ 
COMMIT; 
 
/* Sleep until propagation is complete: */ 
 
/* Perform dequeue at the Western Shipping warehouse: */ 
dbms_aq.dequeue( 
        queue_name         => qname, 
        dequeue_options    => dopt, 
        message_properties => mprop, 
        payload            => deq_order_data, 
        msgid              => deq_msgid); 
 
/* Get the LOB locator after dequeue: */ 
lob_loc := deq_order_data.coupon; 
 
/* Get the length of the LOB: */ 
length := dbms_lob.getlength(lob_loc); 
 
/* Read the LOB contents into the buffer: */ 
dbms_lob.read(lob_loc, length, 1, buffer); 
 

Visual Basic (OO4O): Example Code

This functionality is not available currently.

Java (JDBC): Example Code

No example is provided with this release.

Enhanced Propagation Scheduling Capabilities

Detailed information about the schedules can be obtained from the catalog views defined for propagation. Information about active schedules--such as the name of the background process handling that schedule, the SID (session, serial number) for the session handling the propagation and the Oracle instance handling a schedule (relevant if Oracle Real Application Clusters are being used)--can be obtained from the catalog views. The same catalog views also provide information about the previous successful execution of a schedule (last successful propagation of message) and the next execution of the schedule.

For each schedule, detailed propagation statistics are maintained:

This includes the total number of messages propagated in a schedule, total number of bytes propagated in a schedule, maximum number of messages propagated in a window, maximum number of bytes propagated in a window, average number of messages propagated in a window, average size of propagated messages and the average time to propagated a message. These statistics have been designed to provide useful information to the queue administrators for tuning the schedules such that maximum efficiency can be achieved.

Propagation has built-in support for handling failures and reporting errors. For example, if the specified database link is invalid, the remote database is unavailable, or if the remote queue is not enabled for enqueuing, then the appropriate error message is reported. Propagation uses an exponential backoff scheme for retrying propagation from a schedule that encountered a failure.

If a schedule continuously encounters failures, the first retry happens after 30 seconds, the second after 60 seconds, the third after 120 seconds and so forth. If the retry time is beyond the expiration time of the current window, the next retry is attempted at the start time of the next window. A maximum of 16 retry attempts is made, after which the schedule is automatically disabled. When a schedule is disabled automatically due to failures, the relevant information is written into the alert log.

A check for scheduling failures indicates:

By examining this information, a queue administrator can fix the failure and enable the schedule. During a retry, if propagation is successful, the number of failures is reset to 0.

Propagation has support built-in for Oracle Real Application Clusters and is transparent to the user and the queue administrator. The job that handles propagation is submitted to the same instance as the owner of the queue table where the queue resides.

If there is a failure at an instance and the queue table that stores the queue is migrated to a different instance, the propagation job is also migrated to the new instance. This will minimize pinging between instances and thus offer better performance. Propagation has been designed to handle any number of concurrent schedules. Note that the number of job_queue_processes is limited to a maximum of 36 and some of these may be used to handle non-propagation related jobs. Hence, propagation has built in support for multi-tasking and load balancing.

The propagation algorithms are designed such that multiple schedules can be handled by a single snapshot (job_queue) process. The propagation load on a job_queue processes can be skewed based on the arrival rate of messages in the different source queues.

If one process is overburdened with several active schedules while another is less loaded with many passive schedules, propagation automatically re-distributes the schedules so they are loaded uniformly.

Scenario

In the BooksOnLine example, the OE_bookedorders_que is a busy queue since messages in it are propagated to different shipping sites. The following example code illustrates the calls supported by enhanced propagation scheduling for error checking and schedule monitoring.

PL/SQL (DBMS_AQADM Package): Example Code

CONNECT OE/OE; 
 
/*  get averages 
select avg_time, avg_number, avg_size from user_queue_schedules; 
 
/*  get totals 
select total_time, total_number, total_bytes from user_queue_schedules; 
 
/*  get maximums for a window 
select max_number, max_bytes from user_queue_schedules; 
 
/*  get current status information of schedule 
select process_name, session_id, instance, schedule_disabled  
   from user_queue_schedules; 
 
/*  get information about last and next execution 
select last_run_date, last_run_time, next_run_date, next_run_time 
   from user_queue_schedules; 
 
/*  get last error information if any 
select failures, last_error_msg, last_error_date, last_error_time  
   from user_queue_schedules; 

Visual Basic (OO4O): Example Code

This functionality is currently not available.

Java (JDBC): Example Code

No example is provided with this release.

Exception Handling During Propagation

When system errors such as a network failure occur, Advanced Queuing continues to attempt to propagate messages using an exponential backoff algorithm. In some situations that indicate application errors, AQ will mark messages as UNDELIVERABLE if there is an error in propagating the message.

Examples of such errors are when the remote queue does not exist or when there is a type mismatch between the source queue and the remote queue. In such situations users must query the DBA_SCHEDULES view to determine the last error that occurred during propagation to a particular destination. The trace files in the $ORACLE_HOME/log directory can provide additional information about the error.

Scenario

In the BooksOnLine example, the ES_bookedorders_que in the Eastern Shipping region is stopped intentionally using the stop_queue() call. After a short while the propagation schedule for OE_bookedorders_que will display an error indicating that the remote queue ES_bookedorders_que is disabled for enqueuing. When the ES_bookedorders_que is started using the start_queue() call, propagation to that queue resumes and there is no error message associated with schedule for OE_bookedorders_que.

PL/SQL (DBMS_AQADM Package): Example Code

/*  Intentionally stop the eastern shipping queue : */
connect BOLADM/BOLADM 
EXECUTE dbms_aqadm.stop_queue(queue_name => 'ES.ES_bookedorders_que');   
 
/* Wait for some time before error shows up in dba_queue_schedules: */ 
EXECUTE dbms_lock.sleep(100); 

/* This query will return an ORA-25207 enqueue failed error: */ 
SELECT qname, last_error_msg from dba_queue_schedules; 
 
/* Start the eastern shipping queue: */ 
EXECUTE dbms_aqadm.start_queue(queue_name => 'ES.ES_bookedorders_que');  
 
/* Wait for Propagation to resume for eastern shipping queue: */ 
EXECUTE dbms_lock.sleep(100); 
 
/* This query will indicate that there are no errors with propagation: 
SELECT qname, last_error_msg from dba_queue_schedules; 

Visual Basic (OO4O): Example Code

This functionality is handled by the database.

Java (JDBC): Example Code

No example is provided with this release.

Message Format Transformation During Propagation

At propagation time, a transformation can be specified when adding a rule-based subscriber to OE_bookedorders_topic for Western shipping orders. The transformation is applied to the orders, transforming them to the WS.order_typ_sh type before propagating them to WS_bookedorders_topic.

PL/SQL (DBMS_AQADM Package): Example Code

declare
subscriber     sys.aq$_agent;
begin
  subscriber :=sys.aq$_agent('West_Shipping','WS.WS_bookedorders_topic',null);
  dbms_aqadm.add_subscriber(queue_name => 'OE.OE_bookedorders_topic',
       subscriber     => subscriber,
       rule           => 'tab.user_data.orderregion =''WESTERN''
                          AND tab.user_data.ordertype != ''RUSH''',
       transformation => 'OE.OE2WS');
end;

Visual Basic (OO4O): Example Code

No example is provided with this release.

Java (JDBC): Example Code

No example is provided with this release.

Propagation Using HTTP

In Oracle9i, you can set up Advanced Queuing propagation over HTTP and HTTPS (HTTP over SSL). HTTP propagation uses the Internet access infrastructure and requires that the AQ servlet that connects to the destination database be deployed. The database link must be created with the connect string indicating the Web server address and port and indicating HTTP as the protocol. The source database must be created for running Java and XML. Otherwise, the setup for HTTP propagation is more or less the same as Oracle Net Services (formerly Net8) propagation.

Scenario

In the BooksOnLine example, messages in the OE_bookedorders_que are propagated to different shipping sites. For the purpose of this scenario, the Western Shipping application is running on another database, 'dest-db' and we will propagate to WS_bookedorders_que.

Propagation Setup

  1. Deploy the AQ Servlet.

    HTTP propagation depends on Internet access to the destination database. Create a class AQPropServlet that extends the AQxmlServlet.

    import java.io.*;
    import javax.servlet.*;
    import javax.servlet.http.*;
    import oracle.AQ.*;
    import oracle.AQ.xml.*;
    import java.sql.*;
    import oracle.jms.*;
    import javax.jms.*;
    import java.io.*;
    import oracle.jdbc.pool.*;
    
    /* This is an AQ Propagation Servlet. */
    public class AQPropServlet extends oracle.AQ.xml.AQxmlServlet
    
    /* getDBDrv - specify the database to which the servlet will connect */
    public AQxmlDataSource createAQDataSource() throws AQxmlException
    {
      AQxmlDataSource  db_drv = null;
      db_drv = new AQxmlDataSource("aqadm", "aqadm", "dest-db", "dest-host",
          5521);
        return db_drv;
      }
    
      public void init()
      {
          try {
            AQxmlDataSource axds = this.createAQDataSource();
            setAQDataSource(axds) ;
            setSessionMaxInactiveTime(180) ;
    
          } catch (Exception e) {
             System.err.println("Error in init : " +e) ;
          }
      }
    }
    
    

    This servlet must connect to the destination database. The servlet must be deployed on the Web server in the path aqserv/servlet. In Oracle9i, the propagation servlet name and deployment path are fixed; that is, they must be AQPropServlet and aqserv/servlet, respectively.

    Assume that the Web server host and port are webdest.oracle.com and 8081, respectively.

  2. Create the database link dba.

    • Specify HTTP as the protocol.

    • Specify the username and password that will be used for authentication with the Web server/servlet runner as the host and port of the Web server running the AQ servlet.

    For this example, the connect string of the database link should be as follows:

    (DESCRIPTION=(ADDRESS=(PROTOCOL=http)(HOST=webdest.oracle.com)(PORT=8081))
    
    

    If SSL is used, then specify HTTPS as the protocol in the connect string.

    Create the database link as follows:

    create public database link dba connect to john identified by welcome
    using
    '(DESCRIPTION=(ADDRESS=(PROTOCOL=http)(HOST=webdest.oracle.com)(PORT=8081))'
    ;
    
    

    If SSL is used, then specify HTTPS as the protocol in the connect string.

    Create the database link as follows:

    create public database link dba connect to john identified by welcome
    using
    '(DESCRIPTION=(ADDRESS=(PROTOCOL=http)(HOST=webdest.oracle.com)(PORT=8081))'
    ;
    
    

    Here john is the AQ HTTP agent used to access the AQ (propagation) servlet. Welcome is the password used to authenticate with the Web server.

  3. Make sure that the AQ HTTP agent, John, is authorized to perform AQ operations. Do the following at the destination database.

    1. Register the AQ agent.

      dbms_aqadm.create_aq_agent(agent_name => 'John', enable_http => true);
      
      
    2. Map the AQ agent to a database user.

      dbms_aqadm.enable_db_access(agent_name =>'John', db_username =>'CBADM')'
      
      
  4. Set up the remote subscription to OE.OE_bookedorders_que.

    execute dbms_aqadm.add_subscriber('OE.OE_bookedorders_que',
    aq$_agent(null, 'WS.WS_bookedorders_que', null));
    
    
  5. Start propagation by calling dbms_aqdm.schedule_propagation at the source database.

    dbms_aqadm.schedule_propagation('OE.OE_bookedorders_que', 'dba');
    
    

All other propagation administration APIs work the same for HTTP propagation. Use the propagation views, DBA_QUEUE_SCHEDULES, to check the propagation statistics for propagation schedules using HTTP.


Go to previous page Go to beginning of chapter Go to next page
Oracle
Copyright © 1996-2001, Oracle Corporation.

All Rights Reserved.
Go To Documentation Library
Home
Go To Product List
Book List
Go To Table Of Contents
Contents
Go To Index
Index

Master Index

Feedback