Oracle8i Application Developer's Guide - Advanced Queuing
Release 2 (8.1.6)

Part Number A76938-01

Creating Applications Using JMS, 7 of 8


Message Consumer Features

Receiving Messages

A JMS application can receive messages by creating a message consumer. Messages can be received synchronously using the receive call or asynchronously via a Message Listener.

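There are three modes of receive: block until a message arrives, block for a maximum of a specified time, and non-blocking (return immediately whether or not a message is available).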

Example Code: Block Until a Message Arrives

public BolOrder get_new_order1(QueueSession jms_session)
    {
      Queue            queue;
      QueueReceiver    qrec;
      ObjectMessage    obj_message;
      BolCustomer      customer;
      BolOrder         new_order = null;
      String           state;
        
      try
      {
      /* get a handle to the new_orders queue */
       queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");

       qrec = jms_session.createReceiver(queue);

       /* wait for a message to show up in the queue */
       obj_message = (ObjectMessage)qrec.receive();
       
       new_order = (BolOrder)obj_message.getObject();
       
       customer = new_order.getCustomer();
       state    = customer.getState();
      
       System.out.println("Order:  for customer " + 
                           customer.getName()); 
  
      }
      catch (JMSException ex)
      {
        System.out.println("Exception: " + ex);
      }
      return new_order;

   }

Example Code: Block for a Maximum of 60 Seconds

public BolOrder get_new_order2(QueueSession jms_session)
    {
      Queue            queue;
      QueueReceiver    qrec;
      ObjectMessage    obj_message;
      BolCustomer      customer;
      BolOrder         new_order = null;
      String           state;
        
      try
      {
            /* get a handle to the new_orders queue */
       queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");

       qrec = jms_session.createReceiver(queue);

       /* wait up to 60 seconds for a message to show up in the queue */
       obj_message = (ObjectMessage)qrec.receive(60000);

       /* receive returns null if the timeout expires with no message */
       if (obj_message != null)
       {
         new_order = (BolOrder)obj_message.getObject();

         customer = new_order.getCustomer();
         state    = customer.getState();

         System.out.println("Order:  for customer " +
                             customer.getName());
       }
  
      }
      catch (JMSException ex)
      {
        System.out.println("Exception: " + ex);
      }
      return new_order;

   }

Example Code: Non-Blocking

public BolOrder poll_new_order3(QueueSession jms_session)
    {
      Queue            queue;
      QueueReceiver    qrec;
      ObjectMessage    obj_message;
      BolCustomer      customer;
      BolOrder         new_order = null;
      String           state;
        
      try
      {
            /* get a handle to the new_orders queue */
       queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");

       qrec = jms_session.createReceiver(queue);

       /* check for a message in the queue without blocking */
       obj_message = (ObjectMessage)qrec.receiveNoWait();

       /* receiveNoWait returns null if no message is available */
       if (obj_message != null)
       {
         new_order = (BolOrder)obj_message.getObject();

         customer = new_order.getCustomer();
         state    = customer.getState();

         System.out.println("Order:  for customer " +
                             customer.getName());
       }
  
      }
      catch (JMSException ex)
      {
        System.out.println("Exception: " + ex);
      }
      return new_order;

   }
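
The three modes correspond to the receive(), receive(timeout) and receiveNoWait() calls used above. As a rough stand-alone analogy (this is not AQ or JMS code), the sketch below uses java.util.concurrent.LinkedBlockingQueue, whose take(), poll(timeout, unit) and poll() methods behave in a similar way. The class name ReceiveModesSketch, its method names and the "order-1" string are hypothetical. The point it illustrates is that the timed and non-blocking forms return null when nothing is available, so the caller should check the result before using it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/* Stand-alone analogy only: a BlockingQueue is not a JMS queue. */
class ReceiveModesSketch {

    /* like receive(): block until an element is available */
    static String blockUntilAvailable(BlockingQueue<String> q) {
        try {
            return q.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /* like receive(timeout): block for at most millis, then give up with null */
    static String blockAtMost(BlockingQueue<String> q, long millis) {
        try {
            return q.poll(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /* like receiveNoWait(): return at once, null if the queue is empty */
    static String noWait(BlockingQueue<String> q) {
        return q.poll();
    }

    public static void main(String[] args) {
        BlockingQueue<String> q = new LinkedBlockingQueue<>();
        q.add("order-1");
        System.out.println(blockUntilAvailable(q)); // the queued element
        System.out.println(blockAtMost(q, 50));     // queue is now empty
        System.out.println(noWait(q));              // still empty
    }
}
```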

Message Navigation in Receive

When a consumer does the first receive in its session, it gets the first message in the queue or topic. Subsequent receives get the next message, and so on. This default behavior works well for FIFO queues and topics but not for priority-ordered queues: if a high-priority message arrives for the consumer, the client program will not receive it until it has cleared the messages that were already there for it.

To give the consumer better control in navigating the queue for its messages, the AQ navigation modes are made available to it as JMS extensions. These modes can be set on the TopicSubscriber, QueueReceiver, or TopicReceiver.

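For transaction grouping, FIRST_MESSAGE resets the position to the beginning of the queue, NEXT_MESSAGE retrieves the next message in the current transaction, and NEXT_TRANSACTION retrieves the first message of the next transaction.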

Note that the transaction grouping property may be negated if messages are received in the following ways:

If, while navigating through the queue, the program reaches the end of the queue using the NEXT_MESSAGE or NEXT_TRANSACTION option, and you have specified a blocking receive, then the navigating position is automatically changed to the beginning of the queue.

By default, a QueueReceiver, TopicReceiver, or TopicSubscriber uses FIRST_MESSAGE for the first receive call, and NEXT_MESSAGE for the subsequent receive calls.

Example Scenario

The get_new_orders() procedure retrieves orders from the OE_neworders_que. Each transaction refers to an order, and each message corresponds to an individual book in that order. The procedure loops through the messages to retrieve the book orders. It resets the position to the beginning of the queue using the FIRST_MESSAGE option before the first receive. It then uses the NEXT_MESSAGE navigation option to retrieve the next book (message) of an order (transaction). If it gets an exception indicating that all messages in the current group/transaction have been fetched, it changes the navigation option to NEXT_TRANSACTION and gets the first book of the next order. It then changes the navigation option back to NEXT_MESSAGE for fetching subsequent messages in the same transaction. This is repeated until all orders (transactions) have been fetched.

Example Code

public void get_new_orders(QueueSession jms_session)
    {
      Queue            queue;
      QueueReceiver    qrec;
      ObjectMessage    obj_message;
      BolCustomer      customer;
      BolOrder         new_order;
      String           state;
      int              new_orders = 1;  
             
      try
      {

         /* get a handle to the new_orders queue */
         queue = ((AQjmsSession) jms_session).getQueue("OE","OE_neworders_que");
         qrec = jms_session.createReceiver(queue);
        
         /* set navigation to first message */
         ((AQjmsQueueReceiver)qrec).setNavigationMode(
                                  AQjmsConstants.NAVIGATION_FIRST_MESSAGE);

         while (new_orders != 0)
         {
           try
           {
             /* check for a message in the queue without blocking */
             obj_message = (ObjectMessage)qrec.receiveNoWait();

             if (obj_message == null)   /* no more orders in the queue */
             {
               System.out.println(" No more orders ");
               new_orders = 0;
             }
             else
             {
               new_order = (BolOrder)obj_message.getObject();
               customer  = new_order.getCustomer();
               state     = customer.getState();

               System.out.println("Order: for customer " +
                                  customer.getName());

               /* Now get the next message */
               ((AQjmsQueueReceiver)qrec).setNavigationMode(
                                   AQjmsConstants.NAVIGATION_NEXT_MESSAGE);
             }
           }
           catch (AQjmsException ex)
           {
             if (ex.getErrorNumber() == 25235)
             {
               System.out.println("End of transaction group");
               ((AQjmsQueueReceiver)qrec).setNavigationMode(
                                   AQjmsConstants.NAVIGATION_NEXT_TRANSACTION);
             }
             else
               throw ex;
           }
         }
     }catch (JMSException ex)
     {
        System.out.println("Exception: " + ex);
      }
   }
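
To make the control flow of the scenario easier to follow, here is a small in-memory model of navigation over transaction groups. It is only a sketch and does not use AQ or JMS: the class NavigationSketch, the EndOfGroup exception (standing in for the end-of-group error the example checks for), the cursor-based receive method, and the sample transaction ids are all assumptions made for illustration. Unlike a real receive, the model does not remove messages; it only moves a cursor.

```java
import java.util.ArrayList;
import java.util.List;

/* In-memory model of transaction-grouped navigation; not AQ code. */
class NavigationSketch {
    static final int FIRST_MESSAGE = 0, NEXT_MESSAGE = 1, NEXT_TRANSACTION = 2;

    /* hypothetical stand-in for the end-of-transaction-group error */
    static class EndOfGroup extends Exception {}

    /* each message is {transaction id, book title}, in queue order */
    final List<String[]> messages = new ArrayList<>();
    int pos = -1;   /* index of the last message returned, -1 if none */

    private boolean sameGroup(int a, int b) {
        return messages.get(a)[0].equals(messages.get(b)[0]);
    }

    /* returns the book title at the new position, or null at end of queue */
    String receive(int mode) throws EndOfGroup {
        int next = pos + 1;
        if (mode == FIRST_MESSAGE) {
            next = 0;
        } else if (mode == NEXT_MESSAGE) {
            /* the next message belongs to another order: signal the boundary */
            if (pos >= 0 && next < messages.size() && !sameGroup(pos, next)) {
                throw new EndOfGroup();
            }
        } else {
            /* NEXT_TRANSACTION: skip whatever is left of the current order */
            while (pos >= 0 && next < messages.size() && sameGroup(pos, next)) {
                next++;
            }
        }
        if (next >= messages.size()) {
            return null;
        }
        pos = next;
        return messages.get(pos)[1];
    }

    /* the loop described in the scenario; "--" marks an order boundary */
    static List<String> drain(NavigationSketch q) {
        List<String> log = new ArrayList<>();
        int mode = FIRST_MESSAGE;
        while (true) {
            try {
                String book = q.receive(mode);
                if (book == null) {
                    break;                  /* no more orders */
                }
                log.add(book);
                mode = NEXT_MESSAGE;        /* stay within the current order */
            } catch (EndOfGroup e) {
                log.add("--");
                mode = NEXT_TRANSACTION;    /* move on to the next order */
            }
        }
        return log;
    }

    public static void main(String[] args) {
        NavigationSketch q = new NavigationSketch();
        q.messages.add(new String[] {"T1", "Book A"});
        q.messages.add(new String[] {"T1", "Book B"});
        q.messages.add(new String[] {"T2", "Book C"});
        System.out.println(drain(q));
    }
}
```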

Modes for Receiving Messages

For Point-to-Point Mode

Aside from the normal receive, which allows the dequeuing client to delete the message from the queue, JMS provides an interface that allows the JMS client to Browse its messages in the queue. A QueueBrowser can be created via the createBrowser method from QueueSession.

If a message is browsed, it remains available for further processing. Note that after a message has been browsed there is no guarantee that the message will be available to the JMS session again as a receive call from a concurrent session might remove the message.

To prevent a viewed message from being removed by a concurrent JMS client, you can view the message in locked mode. To do this, create a QueueBrowser with the locked mode using the AQ extension to the JMS interface. The lock held on a message by a locked-mode browser is released when the session performs a commit or a rollback.

To remove the message viewed by a QueueBrowser, the session must create a QueueReceiver and use the JMSMessageID as the selector.

Example Code

Refer to the QueueBrowser Example in Point to Point features

Remove-No-Data

The MessageConsumer can remove the message from the queue or topic without retrieving the message using the receiveNoData call. This is useful when the application has already examined the message, perhaps using the QueueBrowser. This mode allows the JMS client to avoid the overhead of retrieving the payload from the database, which can be substantial for a large message.

Example Scenario and Code

In the following scenario from the BooksOnLine example, international orders destined for Mexico and Canada are to be processed separately due to trade policies and carrier discounts. Hence, a message is viewed in the locked mode (so that no other concurrent user removes the message) via the QueueBrowser, and the customer country (message payload) is checked. If the customer country is Mexico or Canada, the message is deleted from the queue using the remove-with-no-data mode (since the payload is already known). Otherwise, the lock on the message is released by the commit call. Note that the receive call uses the message identifier obtained from the locked mode browse.

 public void process_international_orders(QueueSession jms_session)
  {
   QueueBrowser    browser;
   Queue           queue;
   ObjectMessage   obj_message;
   BolOrder        new_order;
   Enumeration     messages;
   String          customer_name;
   String          customer_country;
   QueueReceiver   qrec;
   String          msg_sel;

   try
   {
     /* get a handle to the new_orders queue */
     queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");

     /* create a locked-mode Browser to look at the new orders */
     browser = ((AQjmsSession)jms_session).createBrowser(queue, null, true);

     for (messages = browser.getEnumeration() ; messages.hasMoreElements() ;)
     {
       obj_message = (ObjectMessage)messages.nextElement();
       
       new_order = (BolOrder)obj_message.getObject();

       customer_name = new_order.getCustomer().getName();

       customer_country = new_order.getCustomer().getCountry();
       
       if (customer_country.equals("Canada") ||
           customer_country.equals("Mexico"))
       {
         System.out.println("Order for Canada or Mexico");
         msg_sel = "JMSMessageID = '" + obj_message.getJMSMessageID() + "'";
         qrec = jms_session.createReceiver(queue, msg_sel);
         ((AQjmsQueueReceiver)qrec).receiveNoData();
       }
     }
   }catch (JMSException ex)
    { System.out.println("Exception " + ex);

    }
  }
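
The selector string in the example is built by plain concatenation. A slightly more defensive version, shown below as a minimal sketch, doubles any single quote inside the identifier, which is how a quote is written inside a string literal in JMS selector syntax. The class name MessageIdSelector, the method name and the sample identifier "ID:1234ABCD" are hypothetical.

```java
/* Builds a selector that matches one message by its JMSMessageID. */
class MessageIdSelector {

    static String forMessageId(String jmsMessageId) {
        /* a single quote inside a selector string literal is written as '' */
        String escaped = jmsMessageId.replace("'", "''");
        return "JMSMessageID = '" + escaped + "'";
    }

    public static void main(String[] args) {
        /* "ID:1234ABCD" is a made-up identifier for illustration */
        System.out.println(forMessageId("ID:1234ABCD"));
    }
}
```

The resulting string would be passed as the second argument to createReceiver, as in the example above.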

Retry With Delay Interval

Max Retries

If the transaction receiving the message from a queue/topic fails, it is regarded as an unsuccessful attempt to remove the message. AQ records the number of failed attempts to remove the message in the message history.

AQ also allows the application to specify, at the queue/topic level, the maximum number of retries supported on messages. If the number of failed attempts to remove a message exceeds this number, the message is moved to the exception queue and is no longer available to applications.

Retry Delay

If the transaction receiving a message aborts, this could be because of a 'bad' condition, for example, an order that could not be fulfilled because there were insufficient books in stock. Since inventory updates are made every 12 hours, it makes sense to retry after that time. If an order is not filled after 4 attempts, this could indicate that there is a problem.

AQ allows users to specify a retry_delay along with max_retries. This means that a message that has undergone a failed receive attempt remains in the queue but becomes visible for dequeue only after the retry_delay interval has elapsed. Until then it is in the 'WAITING' state. The AQ background process, the time manager, enforces the retry delay property.

The maximum retries and retry delay are properties of the queue/topic which can be set when the queue/topic is created or via the alter method on the queue/topic. The default value for MAX_RETRIES is 5.
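
As a quick worked calculation, the time a message spends being retried before it is moved to the exception queue is roughly max_retries multiplied by retry_delay. The sketch below uses the figures mentioned above (4 attempts, 12 hours apart) and ignores processing time; the class and method names are hypothetical, and the result should be read as an approximation rather than an exact AQ guarantee.

```java
import java.time.Duration;

/* Rough estimate of how long a message can remain in the retry cycle. */
class RetryWindowSketch {

    static Duration retryWindow(int maxRetries, Duration retryDelay) {
        return retryDelay.multipliedBy(maxRetries);
    }

    public static void main(String[] args) {
        Duration window = retryWindow(4, Duration.ofHours(12));
        System.out.println(window.toHours() + " hours = "
                           + window.toDays() + " days");
    }
}
```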

Example Scenario and Code

If an order cannot be filled because of insufficient inventory, the transaction processing the order is aborted. The booked_orders topic is set up with max_retries = 4 and retry_delay = 12 hours. Thus, if an order is not filled within two days, it is moved to an exception queue.

public BolOrder process_booked_order(TopicSession jms_session)
  {
    Topic            topic;
    TopicSubscriber  tsubs;
    ObjectMessage    obj_message;
    BolCustomer      customer;
    BolOrder         booked_order = null;
    String           country;
    int              i = 0;

    try
    {
      /* get a handle to the WS_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                   "WS_bookedorders_topic");

      /* Create local subscriber - to track messages for Western Region  */
      tsubs = jms_session.createDurableSubscriber(topic, "SUBS1",
                                       "Region = 'Western' ",
                                                   false);

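       /* wait up to 10 milliseconds for a message to show up in the topic */
       obj_message = (ObjectMessage)tsubs.receive(10);

       /* receive returns null if no message arrived within the timeout */
       if (obj_message == null)
       {
          return null;
       }

       booked_order = (BolOrder)obj_message.getObject();

       customer = booked_order.getCustomer();
       country  = customer.getCountry();

       /* compare string contents with equals, not reference identity */
       if ("US".equals(country))
       {
          jms_session.commit();
       }
       else
       {
          jms_session.rollback();
          booked_order = null;
       }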
    }catch (JMSException ex)
    { System.out.println("Exception " + ex) ;}
     
     return booked_order;
   }

Asynchronously Receiving Message Using Message Listener

Message Listener for a Message Consumer

The JMS client can receive messages asynchronously by setting the MessageListener using the setMessageListener method available with the Consumer.

When a message arrives for the message consumer, the onMessage method of the message listener is invoked with the message. The message listener can commit or abort the receipt of the message. The message listener will not receive messages if the JMS Connection has been stopped. The receive call must not be used to receive messages once the message listener has been set for the consumer.

Example

The application processing the new orders queue can be set up for asynchronously receiving messages from the queue.

 public class OrderListener implements MessageListener
   {
      QueueSession   the_sess;
    
      /* constructor */
      OrderListener(QueueSession  my_sess)
      {
        the_sess = my_sess;
      }
      
      /* message listener interface */
      public void onMessage(Message m)
      {
        ObjectMessage    obj_msg;
        BolCustomer      customer;
        BolOrder         new_order = null;

        try {
          /* cast to JMS Object Message */
          obj_msg = (ObjectMessage)m;

          /*  Print some useful information */
          new_order = (BolOrder)obj_msg.getObject();
          customer = new_order.getCustomer();
          System.out.println("Order:  for customer " +  customer.getName()); 

          /* call the process order method
           * NOTE: we are assuming it is defined elsewhere
           */
          process_order(new_order);
      
          /* commit the asynchronous receipt of the message */
           the_sess.commit();
        }catch (JMSException ex)
        { System.out.println("Exception " + ex) ;}
     
      }
   }

     public void setListener1(QueueSession jms_session)
    {
      Queue            queue;
      QueueReceiver    qrec;
      MessageListener  ourListener;
        
      try
      {
       /* get a handle to the new_orders queue */
       queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");

       /* create a queue receiver */
       qrec = jms_session.createReceiver(queue);

       /* create the message listener */
       ourListener = new OrderListener(jms_session);
 
       /* set the message listener for the receiver */
       qrec.setMessageListener(ourListener);
      }
      catch (JMSException ex)
      {
        System.out.println("Exception: " + ex);
      }
   }

Message Listener for All Consumers on a Session

The JMS client can receive messages asynchronously for all the consumers of the session by setting the MessageListener at the session.

When a message arrives for any of the message consumers of the session, the onMessage method of the message listener is invoked with the message. The message listener can commit or abort the receipt of the message. The message listener will not receive messages if the JMS connection has been stopped. No other mode for receiving messages must be used in the session once the message listener has been set.

Example Scenario and Code

In the customer service component of the BooksOnLine example, messages from different databases arrive at the customer service topics, indicating the state of the order. The customer service application monitors the topics and whenever there is a message about a customer order, it updates the order status in the order_status_table. The application uses the session listener to monitor the different topics. Whenever there is a message in any of the topics, the onMessage method of the session MessageListener is invoked.

/* define our message listener class */
   public class CustomerListener implements MessageListener
   {
      TopicSession   the_sess;
    
      /* constructor */
      CustomerListener(TopicSession  my_sess)
      {
        the_sess = my_sess;
      }
      
      /* message listener interface */
      public void onMessage(Message m)
      {
        ObjectMessage    obj_msg;
        BolCustomer      customer;
        BolOrder         new_order = null;

        try
        {
          /* cast to JMS Object Message */
          obj_msg = (ObjectMessage)m;

          /*  Print some useful information */
          new_order = (BolOrder)obj_msg.getObject();
          customer = new_order.getCustomer();
          System.out.println("Order:  for customer " +  customer.getName()); 

          /* call the update status method
           * NOTE: we are assuming it is defined elsewhere
           */
          update_status(new_order, new_order.getStatus());
      
          /* commit the asynchronous receipt of the message */
          the_sess.commit();
       }catch (JMSException ex)
      {
        System.out.println("Exception: " + ex);
      }
   }

 }
 public void  monitor_status_topics(TopicSession jms_session) 
  {
    Topic[]             topic = new Topic[4];
    TopicSubscriber[]   tsubs= new TopicSubscriber[4];
   
    try
    {
      /* get handles to the customer service (CS) order status topics */
      topic[0] = ((AQjmsSession)jms_session).getTopic("CS",
                                                   "CS_bookedorders_topic");
      tsubs[0] = jms_session.createDurableSubscriber(topic[0], "BOOKED_ORDER");

      topic[1] = ((AQjmsSession)jms_session).getTopic("CS",
                                                   "CS_billedorders_topic");
      tsubs[1] = jms_session.createDurableSubscriber(topic[1], "BILLED_ORDER");

      topic[2] = ((AQjmsSession)jms_session).getTopic("CS",
                                                   "CS_backdorders_topic");
      tsubs[2] = jms_session.createDurableSubscriber(topic[2], "BACKED_ORDER");

      topic[3] = ((AQjmsSession)jms_session).getTopic("CS",
                                                   "CS_shippedorders_topic");
      tsubs[3] = jms_session.createDurableSubscriber(topic[3], "SHIPPED_ORDER");

      MessageListener  mL = new CustomerListener(jms_session);
      
      /* set the session's message listener */
      jms_session.setMessageListener(mL);

    }catch(JMSException ex)
     { System.out.println("Exception: " + ex); }
  }
  

AQ Exception Handling

AQ provides four integrated mechanisms to support exception handling in applications: EXCEPTION_QUEUES, EXPIRATION, MAX_RETRIES and RETRY_DELAY.

An exception_queue is a repository for all expired or unserviceable messages. Applications cannot directly enqueue into exception queues. However, an application that intends to handle these expired or unserviceable messages can receive/remove them from the exception queue.

To retrieve messages from exception queues, the JMS client must use the point to point interface. The exception queue for messages intended for a topic must be created in a queue table with multiple consumers enabled. Like any other queue, the exception queue must be enabled for receiving messages using the start method in the AQOracleQueue class. You will get an exception if you try to enable it for enqueue.

The exception queue for a message is specified through a provider-specific (Oracle) message property called "JMS_OracleExcpQ", which can be set on the message before sending/publishing it. If an exception queue is not specified, the default exception queue is used. If the queue/topic is created in a queue table, say QTAB, the default exception queue will be called AQ$_QTAB_E. The default exception queue is automatically created when the queue table is created.
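
The naming rule above is simple enough to express as a helper. The sketch below derives the default exception queue name from a queue table name, and builds a schema-qualified name of the form used for the JMS_OracleExcpQ property in the BooksOnLine code ("WS.WS_back_orders_excp_que"). The class and method names are hypothetical, and the helper does no validation of the names it is given.

```java
/* Name helpers for exception queues; string handling only, no AQ calls. */
class ExceptionQueueNames {

    /* default exception queue for a queue table, e.g. QTAB -> AQ$_QTAB_E */
    static String defaultExceptionQueue(String queueTable) {
        return "AQ$_" + queueTable + "_E";
    }

    /* schema-qualified name, e.g. WS.WS_back_orders_excp_que */
    static String qualified(String schema, String queue) {
        return schema + "." + queue;
    }

    public static void main(String[] args) {
        System.out.println(defaultExceptionQueue("QTAB"));
        System.out.println(qualified("WS", "WS_back_orders_excp_que"));
    }
}
```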

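Messages are moved to the exception queue by AQ when they expire before being received, or when the number of failed attempts to receive them exceeds the max_retries setting of the queue/topic.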

Example Scenarios

The section "Retry With Delay Interval" has an example with MAX_RETRIES. In the BooksOnLine application, the business rule for each shipping region is that an order will be placed in a back order queue if the order cannot be filled immediately. The back order application will try to fill the order once a day. If the order cannot be filled within 7 days, it is placed in an exception queue for special processing. We implement this using the Time-to-Live property of messages in conjunction with exception queues.

  1. Create the exception queue WS_back_orders_excp_que

    public void create_excp_que(TopicSession jms_session)
        {
          AQQueueTable     q_table;
          Queue            excpq;
    
          try {
             /* create the exception queue in the queue table with multiple
              * consumer flag true
              */
              q_table = ((AQjmsSession)jms_session).getQueueTable("WS",
                                                          "WS_orders_mqtab");

              AQjmsDestinationProperty dest_prop = new AQjmsDestinationProperty();

              dest_prop.setQueueType(AQjmsDestinationProperty.EXCEPTION_QUEUE);
              excpq = ((AQjmsSession)jms_session).createQueue(q_table,
                                                "WS_back_orders_excp_que",
                                                dest_prop);
              /* start the exception queue for receiving (dequeuing) messages only */
              ((AQjmsDestination)excpq).start(jms_session, false, true);
    
              }
          catch (JMSException ex)
          { System.out.println("Exception  " + ex); }
        }   
    
    
  2. Publish the message on the back orders topic with the exception queue set to WS_back_orders_excp_que

    
     public static void requeue_back_order(TopicSession jms_session, 
                      String sale_region, BolOrder back_order)
     {
        Topic             back_order_topic;   
        ObjectMessage     obj_message;
        TopicPublisher    tpub;
        long              timetolive;
    
        try
        {
            back_order_topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                   "WS_backorders_topic");
            obj_message = jms_session.createObjectMessage();
            obj_message.setObject(back_order);

            /* set exception queue */
            obj_message.setStringProperty("JMS_OracleExcpQ",
                                          "WS.WS_back_orders_excp_que");

            tpub = jms_session.createPublisher(null);

            /* Set message expiration to 7 days: */
            timetolive = 7*60*60*24*1000;          // specified in milliseconds

            /* Publish the message */
            tpub.publish(back_order_topic, obj_message, DeliveryMode.PERSISTENT,
                         1, timetolive);
            jms_session.commit();
        }
        catch (Exception ex)
        {
       System.out.println("Exception :" + ex); 
        }
    }
    
    
  3. Receive expired messages from the exception queue using the point to point interface

    public BolOrder get_expired_order(QueueSession jms_session)
       {
          Queue            queue;
          QueueReceiver    qrec;
          ObjectMessage    obj_message;
          BolCustomer      customer;
          BolOrder         exp_order = null;
            
          try
          {
          /* get a handle to the exception queue */
           queue = ((AQjmsSession) jms_session).getQueue("WS",
                                                "WS_back_orders_excp_que");
    
           qrec = jms_session.createReceiver(queue);
    
           /* wait for a message to show up in the queue */
           obj_message = (ObjectMessage)qrec.receive();
           
           exp_order = (BolOrder)obj_message.getObject();
           
           customer = exp_order.getCustomer();
          
           System.out.println("Expired Order:  for customer " + 
                               customer.getName()); 
      
          }
          catch (JMSException ex)
          {
            System.out.println("Exception: " + ex);
          }
          return exp_order;
       }
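
The 7-day Time-to-Live in step 2 is written as an int expression, which fits for 7 days but would silently overflow for longer periods (30 days in milliseconds no longer fits in an int and wraps to a negative value). One way to avoid that, sketched below with hypothetical class and method names, is to let java.time.Duration do the conversion to a long.

```java
import java.time.Duration;

/* Converts a time-to-live given in days to the milliseconds JMS expects. */
class TimeToLiveSketch {

    static long daysToMillis(int days) {
        return Duration.ofDays(days).toMillis();
    }

    public static void main(String[] args) {
        System.out.println(daysToMillis(7));              // same value as 7*60*60*24*1000
        System.out.println(daysToMillis(30));             // computed as a long, no overflow
        System.out.println(30 * 60 * 60 * 24 * 1000 < 0); // the int expression wraps
    }
}
```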
    
    

Oracle
Copyright © 1996-2000, Oracle Corporation.

All Rights Reserved.
