Oracle9i Application Developer's Guide - Advanced Queuing
Release 1 (9.0.1)

Part Number A88890-02

Creating Applications Using JMS, 8 of 9


JMS Propagation

Remote Subscribers

This feature enables applications to communicate with each other without having to be connected to the same database.

AQ allows a remote subscriber, that is a subscriber at another database, to subscribe to a topic. When a message published to the topic meets the criterion of the remote subscriber, AQ will automatically propagate the message to the queue/topic at the remote database specified for the remote subscriber.

The snapshot (job_queue) background process performs propagation. Propagation is performed using database links and Net8.

There are two kinds of remote subscribers:

Case 1

The remote subscriber is a topic. This occurs when no name is specified for the remote subscriber in the AQjmsAgent object and the address is a topic. The message satisfying the subscriber's subscription is propagated to the remote topic. The propagated message is now available to all the subscriptions of the remote topic that it satisfies.

Case 2

The remote subscriber is a specific recipient. The remote subscription can be for a particular consumer at the remote database. If the name of the remote recipient is specified (in the AQjmsAgent object), then the message satisfying the subscription is propagated to the remote database for that recipient only. The recipient at the remote database uses the TopicReceiver interface to retrieve its messages. The remote subscription can also be for a point-to-point queue.

Example Scenario for Case 1

Assume the order entry application and Western region shipping application are on different databases, db1 and db2. Further assume that there is a dblink dblink_oe_ws from database db1, the order entry database, to the western shipping database db2. The WS_bookedorders_topic at db2 is a remote subscriber to the OE_bookedorders_topic in db1.

Example Scenario for Case 2

Assume the order entry application and Western region shipping application are on different databases, db1 and db2. Further assume that there is a dblink dblink_oe_ws from the local order entry database db1 to the western shipping database db2. The agent "Priority" at WS_bookedorders_topic in db2 is a remote subscriber to the OE_bookedorders_topic in db1. Messages propagated to the WS_bookedorders_topic are for "Priority" only.

public void remote_subscriber(TopicSession jms_session)
   {
     Topic            topic;
     ObjectMessage    obj_message;
     AQjmsAgent       remote_sub;

    try
    {
      /* get a handle to the OE_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("OE",
                                                   "OE_bookedorders_topic");
      /* create the remote subscriber, name unspecified and address
       * the topic WS_bookedorders_topic at db2
       */
      remote_sub = new AQjmsAgent(null,
                                  "WS.WS_bookedorders_topic@dblink_oe_ws");

      /* subscribe for western region orders */
      ((AQjmsSession)jms_session).createRemoteSubscriber(topic, remote_sub,
                                                         "Region = 'Western' ");
    }
    catch (JMSException ex)
    { System.out.println("Exception :" + ex); }
    catch (java.sql.SQLException  ex1)
    {System.out.println("SQL Exception :" + ex1); }
  }
   

Database db2 - shipping database: The WS_bookedorders_topic has two subscribers, one for priority shipping and the other for normal shipping. The messages from the Order Entry database are propagated to the Shipping database and delivered to the correct subscriber. Priority orders have a message priority of 1.

public void  get_priority_messages(TopicSession jms_session)
   {
     Topic            topic;
     TopicSubscriber  tsubs;
     ObjectMessage    obj_message;
     BolCustomer      customer;
     BolOrder         booked_order;

    try
    {
      /* get a handle to the WS_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                   "WS_bookedorders_topic");

       /* Create local subscriber - for priority messages */
      tsubs = jms_session.createDurableSubscriber(topic, "PRIORITY",
                                       " JMSPriority = 1 ", false);

      obj_message = (ObjectMessage) tsubs.receive();
      
      booked_order = (BolOrder)obj_message.getObject();
      customer = booked_order.getCustomer();
      System.out.println("Priority Order:  for customer " +
                         customer.getName());

      jms_session.commit();
    }
    catch (JMSException ex)
    { System.out.println("Exception :" + ex); }
  }

  public void  get_normal_messages(TopicSession jms_session)
   {
     Topic            topic;
     TopicSubscriber  tsubs;
     ObjectMessage    obj_message;
     BolCustomer      customer;
     BolOrder         booked_order;

    try
    {
      /* get a handle to the WS_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                   "WS_bookedorders_topic");

      /* Create local subscriber - for normal messages */
      tsubs = jms_session.createDurableSubscriber(topic, "NORMAL",
                                       " JMSPriority > 1 ", false);

      obj_message = (ObjectMessage) tsubs.receive();
      
      booked_order = (BolOrder)obj_message.getObject();
      customer = booked_order.getCustomer();
      System.out.println("Normal Order:  for customer " +  customer.getName()); 

      jms_session.commit();
    }
    catch (JMSException ex)
    { System.out.println("Exception :" + ex); }
  }
 

public void remote_subscriber1(TopicSession jms_session)
   {
     Topic            topic;
     ObjectMessage    obj_message;
     AQjmsAgent       remote_sub;

    try
    {
      /* get a handle to the OE_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("OE",
                                                   "OE_bookedorders_topic");
      /* create the remote subscriber, name "Priority" and address
       * the topic WS_bookedorders_topic at db2
       */
      remote_sub = new AQjmsAgent("Priority",
                                  "WS.WS_bookedorders_topic@dblink_oe_ws");

      /* subscribe for western region orders */
      ((AQjmsSession)jms_session).createRemoteSubscriber(topic, remote_sub,
                                                         "Region = 'Western' ");
    }
    catch (JMSException ex)
    { System.out.println("Exception :" + ex); }
    catch (java.sql.SQLException  ex1)
    {System.out.println("SQL Exception :" + ex1); }
  }


Database db2 - Western Shipping database (the remote database):

   /* get messages for subscriber priority */
   public void  get_priority_messages1(TopicSession jms_session)
   {
     Topic            topic;
     TopicReceiver    trecs;
     ObjectMessage    obj_message;
     BolCustomer      customer;
     BolOrder         booked_order;

    try
    {
      /* get a handle to the WS_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                   "WS_bookedorders_topic");

      /* create a local receiver "Priority" for the remote subscription
       * to WS_bookedorders_topic 
       */
      trecs = ((AQjmsSession)jms_session).createTopicReceiver(topic, "Priority",
                                                              null);

      obj_message = (ObjectMessage) trecs.receive();
      
      booked_order = (BolOrder)obj_message.getObject();
      customer = booked_order.getCustomer();
      System.out.println("Priority Order:  for customer " +
                         customer.getName());

      jms_session.commit();
    }
    catch (JMSException ex)
    { System.out.println("Exception :" + ex); }
  }

Scheduling Propagation

Propagation must be scheduled via the schedule_propagation method for every topic from which messages are propagated to target destination databases.

A schedule indicates the time frame during which messages can be propagated from the source topic. This time frame may depend on a number of factors such as network traffic, load at source database, load at destination database, and so on. The schedule therefore has to be tailored for the specific source and destination. When a schedule is created, a job is automatically submitted to the job_queue facility to handle propagation.

The administrative calls for propagation scheduling provide great flexibility for managing the schedules (see "Scheduling a Queue Propagation", Chapter 9, "Administrative Interface"). The duration or propagation window parameter of a schedule specifies the time frame during which propagation has to take place. If the duration is unspecified then the time frame is an infinite single window. If a window has to be repeated periodically then a finite duration is specified along with a next_time function that defines the periodic interval between successive windows.
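As a rough illustration of the window semantics just described, the following plain-Java sketch models a schedule by its start time, an optional duration, and an optional repeat interval. It does not use the AQ API. The class name PropagationWindow and all of its members are hypothetical, and the fixed repeat interval is a simplifying assumption that stands in for the next_time function.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical model of a propagation window; not part of the AQ API. */
class PropagationWindow {
    private final Instant start;      // when the first window opens
    private final Duration duration;  // null = unspecified (infinite single window)
    private final Duration period;    // null = window is not repeated

    PropagationWindow(Instant start, Duration duration, Duration period) {
        if (start == null) {
            throw new IllegalArgumentException("start is required");
        }
        if (period != null) {
            // Assumption: a repeating window needs a finite duration
            // that is no longer than the repeat interval.
            if (duration == null || period.getSeconds() <= 0
                    || duration.compareTo(period) > 0) {
                throw new IllegalArgumentException("invalid duration/period combination");
            }
        }
        this.start = start;
        this.duration = duration;
        this.period = period;
    }

    /** Returns true if messages may be propagated at time t under this model. */
    boolean isOpenAt(Instant t) {
        if (t.isBefore(start)) {
            return false;                       // schedule has not started yet
        }
        if (duration == null) {
            return true;                        // infinite single window
        }
        long elapsed = Duration.between(start, t).getSeconds();
        if (period != null) {
            elapsed %= period.getSeconds();     // position inside the current cycle
        }
        return elapsed < duration.getSeconds();
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2001-06-01T00:00:00Z");
        // A 5-minute window that reopens every hour (values chosen for illustration).
        PropagationWindow hourly =
            new PropagationWindow(start, Duration.ofMinutes(5), Duration.ofHours(1));
        System.out.println(hourly.isOpenAt(start.plusSeconds(120)));   // inside first window
        System.out.println(hourly.isOpenAt(start.plusSeconds(1800)));  // between windows
    }
}
```

Passing null for both the duration and the repeat interval gives the infinite single window; passing only a duration gives one finite window that never reopens.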

The latency parameter for a schedule is relevant only when a queue does not have any messages to be propagated. This parameter specifies the time interval within which a queue has to be rechecked for messages. Note that if the latency parameter is to be enforced, then the job_queue_interval parameter for the job queue processes should be less than or equal to the latency parameter.

The propagation schedules defined for a queue can be changed or dropped at any time during the life of the queue. In addition, there are calls for temporarily disabling a schedule (instead of dropping the schedule) and enabling a disabled schedule. A schedule is active when messages are being propagated in that schedule. All the administrative calls can be made irrespective of whether the schedule is active or not. If a schedule is active, then it will take a few seconds for the calls to be executed.

Job queue processes must be started for propagation to take place. At least 2 job queue processes must be started. The dblinks to the destination database must also be valid. The source and destination topics of the propagation must be of the same message type. The remote topic must be enabled for enqueue. The user of the dblink must also have enqueue privileges to the remote topic.

Example Code

 public void  schedule_propagation(TopicSession jms_session)
  {
     Topic            topic;

    try
    {
      /* get a handle to the WS_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                   "WS_bookedorders_topic");

      /* Schedule propagation immediately with duration of 5 minutes and
       * latency 20 sec */
      ((AQjmsDestination)topic).schedulePropagation(jms_session, "dba", null,
                                       new Double(5*60), null, new Double(20));
    }catch (JMSException ex)
    {System.out.println("Exception: " + ex);}
  }

  Propagation schedule parameters can also be altered.


  /* alter duration to 10 minutes and latency to zero */
  public void  alter_propagation(TopicSession jms_session)
  {
     Topic            topic;
    try
    {
      /* get a handle to the WS_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                   "WS_bookedorders_topic");

      /* Alter the schedule: duration of 10 minutes and latency of zero */
      ((AQjmsDestination)topic).alterPropagationSchedule(jms_session, "dba",
                          new Double(10*60), null, new Double(0));
    }catch (JMSException ex)
    {System.out.println("Exception: " + ex);}
  }

Enhanced Propagation Scheduling Capabilities

Detailed information about the schedules can be obtained from the catalog views defined for propagation. Information about active schedules -- such as the name of the background process handling that schedule, the SID (session, serial number) for the session handling the propagation and the Oracle instance handling a schedule (relevant if Real Application Clusters are being used) -- can be obtained from the catalog views. The same catalog views also provide information about the previous successful execution of a schedule (last successful propagation of message) and the next execution of the schedule.

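For each schedule, detailed propagation statistics are maintained, including totals, maximums for a window, and averages (see the queries against user_queue_schedules in the example code in this section). These statistics have been designed to provide useful information to the queue administrators for tuning the schedules such that maximum efficiency can be achieved.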

Propagation has built-in support for handling failures and reporting errors. For example, if the database link specified is invalid, or the remote database is unavailable, or the remote topic/queue is not enabled for enqueuing, then the appropriate error message is reported. Propagation uses an exponential backoff scheme for retrying propagation from a schedule that encountered a failure. If a schedule continuously encounters failures, the first retry happens after 30 seconds, the second after 60 seconds, the third after 120 seconds and so forth. If the retry time is beyond the expiration time of the current window then the next retry is attempted at the start time of the next window.
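The retry timing described above can be sketched in plain Java as follows. This is only an illustration of the arithmetic, not AQ code: the class PropagationBackoff and its methods are hypothetical, and the sketch assumes that each delay is measured from the preceding failure. The limit of 16 retries is the one stated in this section.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical model of the retry timing; not part of the AQ API. */
class PropagationBackoff {
    static final long FIRST_RETRY_SECONDS = 30; // first retry happens after 30 seconds
    static final int MAX_RETRIES = 16;          // retries attempted before the schedule is disabled

    /** Delay before the given retry (1-based): 30 s, 60 s, 120 s, doubling each time. */
    static Duration retryDelay(int retryNumber) {
        if (retryNumber < 1 || retryNumber > MAX_RETRIES) {
            throw new IllegalArgumentException("retryNumber must be between 1 and " + MAX_RETRIES);
        }
        return Duration.ofSeconds(FIRST_RETRY_SECONDS << (retryNumber - 1));
    }

    /**
     * Time of the next retry. If the backed-off time falls beyond the end of the
     * current window, the retry is deferred to the start of the next window.
     * Assumption: the delay is counted from the time of the last failure.
     */
    static Instant nextRetryTime(Instant lastFailure, int retryNumber,
                                 Instant windowEnd, Instant nextWindowStart) {
        Instant candidate = lastFailure.plus(retryDelay(retryNumber));
        return candidate.isAfter(windowEnd) ? nextWindowStart : candidate;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 4; n++) {
            System.out.println("retry " + n + " after "
                               + retryDelay(n).getSeconds() + " seconds");
        }
    }
}
```

Because the delay doubles on every attempt, later retries quickly outgrow a short propagation window, which is when the deferral to the next window applies.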

A maximum of 16 retry attempts is made, after which the schedule is automatically disabled. When a schedule is disabled automatically due to failures, the relevant information is written into the alert log. At any time it is possible to check whether a schedule encountered failures and, if so, how many successive failures were encountered, the error message indicating the cause of the failure, and the time at which the last failure was encountered. By examining this information, an administrator can fix the failure and enable the schedule.

During a retry, if propagation is successful, then the number of failures is reset to 0. Propagation has built-in support for Real Application Clusters that is completely transparent to the user and the administrator. The job that handles propagation is submitted to the same instance as the owner of the queue table where the source topic resides. If at any time there is a failure at an instance and the queue table that stores the topic is migrated to a different instance, the propagation job is also automatically migrated to the new instance. This minimizes the pinging between instances and thus offers better performance. Propagation has been designed to handle any number of concurrent schedules.

Note that the number of job_queue_processes is limited to a maximum of 36, and some of these may be used to handle non-propagation related jobs. Hence, propagation has built-in support for multi-tasking and load balancing. The propagation algorithms are designed such that multiple schedules can be handled by a single snapshot (job_queue) process. The propagation load on a job_queue process can be skewed based on the arrival rate of messages in the different source topics. If one process is overburdened with several active schedules while another is less loaded with many passive schedules, propagation automatically redistributes the schedules among the processes such that they are loaded uniformly.

Example Scenario

In the BooksOnLine example, the OE_bookedorders_topic is busy since messages in it are propagated to different shipping sites. The following example code illustrates the calls supported by enhanced propagation scheduling for error checking and schedule monitoring.

Example Code

CONNECT OE/OE; 
/*  get averages */
select avg_time, avg_number, avg_size from user_queue_schedules; 
 
/*  get totals */
select total_time, total_number, total_bytes from user_queue_schedules; 
 
/*  get maximums for a window */
select max_number, max_bytes from user_queue_schedules; 
 
/*  get current status information of schedule */
select process_name, session_id, instance, schedule_disabled  
   from user_queue_schedules; 
 
/*  get information about last and next execution */
select last_run_date, last_run_time, next_run_date, next_run_time 
   from user_queue_schedules; 
 
/*  get last error information if any */
select failures, last_error_msg, last_error_date, last_error_time  
   from user_queue_schedules; 

Exception Handling During Propagation

When a system error such as a network failure occurs, AQ will continue to attempt to propagate messages using an exponential back-off algorithm. In some situations that indicate application errors, AQ will mark messages as UNDELIVERABLE if there is an error in propagating the message.

Examples of such errors are when the remote queue/topic does not exist or when there is a type mismatch between the source queue/topic and the remote queue/topic. In such situations users must query the DBA_QUEUE_SCHEDULES view to determine the last error that occurred during propagation to a particular destination. The trace files in the $ORACLE_HOME/log directory can provide additional information about the error.


Copyright © 1996-2001, Oracle Corporation.

All Rights Reserved.