Sun Java System Message Queue 3.7 UR1 Developer's Guide for Java Clients

Client Connection Failover (Auto-Reconnect)

Message Queue supports client connection failover. A failed connection can be automatically restored not only to the original broker, but to a different broker in a broker cluster. There are circumstances under which the client-side state cannot be restored on any broker during an automatic reconnection attempt; for example, when the client uses transacted sessions or temporary destinations. At such times the connection exception handler is called and the application code has to catch the exception and restore state.

This section explains how automatic reconnection is enabled, how the broker behaves during a reconnect, how automatic reconnection impacts producers and consumers, and how producers and consumers should handle exceptions that result from connection failover. For additional information about this feature, please see Connection Handling in Sun Java System Message Queue 3.7 UR1 Administration Guide.

Enabling Auto-Reconnect

The developer or the administrator can enable automatic reconnection by setting the connection factory imqReconnectEnabled attribute to true. The connection factory administered object must also be configured to specify the following:

Single-Broker Auto-Reconnect

Configure your connection-factory object as follows:


Example 3–3 Example of Command to Configure a Single Broker


imqobjmgr add -t cf -l "cn=myConnectionFactory" \
    -o "imqAddressList=mq://jpgserv/jms" \
    -o "imqReconnectEnabled=true" \
    -o "imqReconnectAttempts=10"
               

This command creates a connection-factory object with a single address in the broker address list. If the connection fails, the client runtime tries to reconnect to the broker up to 10 times. If an attempt to reconnect fails, the client runtime sleeps for three seconds (the default value of the imqReconnectInterval attribute) before trying again. After 10 unsuccessful attempts, the application receives a JMSException.
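The retry policy just described can be modeled in a few lines of plain Java. This is a minimal sketch under stated assumptions, not the Message Queue client runtime's implementation: the class ReconnectModel, its Sleeper interface, and the assumption that no pause follows the final failed attempt are all hypothetical. Injecting the sleep makes it possible to check the timing without actually waiting.

```java
import java.util.function.BooleanSupplier;

/**
 * Hypothetical model of the single-broker retry policy described above.
 * It is NOT the Message Queue client runtime's code; it only mirrors the
 * documented behavior: up to maxAttempts reconnect attempts, with a pause
 * of intervalMillis (imqReconnectInterval) after each failed attempt.
 */
final class ReconnectModel {

    /** Abstraction over Thread.sleep so the policy can be tested without waiting. */
    interface Sleeper {
        void sleep(long millis);
    }

    /** Real sleeper: restores the interrupt flag instead of swallowing it. */
    static final Sleeper THREAD_SLEEP = millis -> {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    };

    private ReconnectModel() {}

    /**
     * Returns the 1-based number of the attempt that succeeded, or -1 if every
     * attempt failed (the point at which the real runtime reports a JMSException).
     * Assumption: no pause follows the final failed attempt.
     */
    static int reconnect(BooleanSupplier tryConnect, int maxAttempts,
                         long intervalMillis, Sleeper sleeper) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (tryConnect.getAsBoolean()) {
                return attempt;
            }
            if (attempt < maxAttempts) {
                sleeper.sleep(intervalMillis);
            }
        }
        return -1;
    }
}
```

Under that assumption, the settings from Example 3–3 (10 attempts, the default 3000 ms interval) give nine pauses, or 27 seconds of sleeping, before the model gives up; the time spent in the connection attempts themselves comes on top of that.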

You can ensure that the broker starts automatically at system start-up time. See Sun Java System Message Queue 3.7 UR1 Installation Guide for information on how to configure automatic broker start-up. For example, on the Solaris platform, you can use /etc/rc.d scripts.

Parallel Broker Auto-Reconnect

Configure your connection-factory objects as follows:


Example 3–4 Example of Command to Configure Parallel Brokers


imqobjmgr add -t cf -l "cn=myCF" \
    -o "imqAddressList=myhost1, mqtcp://myhost2:12345/jms" \
    -o "imqReconnectEnabled=true" \
    -o "imqReconnectAttempts=5"
               

This command creates a connection factory object with two addresses in the broker address list. The first address describes a broker instance running on the host myhost1 at the standard port number (7676). The second address describes a jms connection service running on the host myhost2 at a statically configured port number (12345).
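To make the two address forms concrete, the following sketch interprets each entry of an imqAddressList value using only the defaults stated above: a bare host name is treated as the mq scheme, the standard port 7676, and the jms service. BrokerAddress is a hypothetical helper, not part of the Message Queue API, and it neither validates schemes nor models every address option the broker accepts. Note that the port means different things for the two forms: for the bare host it is the broker's standard port (7676), while for mqtcp it is the port of the connection service itself.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of the Message Queue API) that reads an
 * imqAddressList value using only the defaults described in this section:
 * a bare host name means the mq scheme, port 7676, and the jms service.
 */
final class BrokerAddress {
    static final int DEFAULT_PORT = 7676;
    static final String DEFAULT_SERVICE = "jms";

    final String scheme;
    final String host;
    final int port;
    final String service;

    private BrokerAddress(String scheme, String host, int port, String service) {
        this.scheme = scheme;
        this.host = host;
        this.port = port;
        this.service = service;
    }

    /** Parses one entry such as "myhost1" or "mqtcp://myhost2:12345/jms". */
    static BrokerAddress parse(String entry) {
        String trimmed = entry.trim();
        // A bare host name carries no scheme, so assume mq://
        URI uri = URI.create(trimmed.contains("://") ? trimmed : "mq://" + trimmed);
        int port = uri.getPort() == -1 ? DEFAULT_PORT : uri.getPort();
        String path = uri.getPath();
        // The path component ("/jms", "/ssljms", ...) names the connection service.
        String service = (path == null || path.length() <= 1) ? DEFAULT_SERVICE : path.substring(1);
        return new BrokerAddress(uri.getScheme(), uri.getHost(), port, service);
    }

    /** Splits a comma-separated address list and parses each entry in order. */
    static List<BrokerAddress> parseList(String addressList) {
        List<BrokerAddress> result = new ArrayList<>();
        for (String entry : addressList.split(",")) {
            result.add(parse(entry));
        }
        return result;
    }
}
```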

Clustered-Broker Auto-Reconnect

Configure your connection-factory objects as follows:


Example 3–5 Example of Command to Configure a Broker Cluster


imqobjmgr add -t cf -l "cn=myConnectionFactory" \
    -o "imqAddressList=mq://myhost1/ssljms, \
            mq://myhost2/ssljms, \
            mq://myhost3/ssljms, \
            mq://myhost4/ssljms" \
    -o "imqReconnectEnabled=true" \
    -o "imqReconnectAttempts=5" \
    -o "imqAddressListBehavior=RANDOM"
               

This command creates a connection factory object with four addresses in the imqAddressList. All the addresses point to the ssljms connection service (JMS over SSL) on different hosts. Because the imqAddressListBehavior attribute is set to RANDOM, the client connections that are established using this connection factory object are distributed randomly among the four brokers in the address list.

This is a clustered broker configuration, so you must configure one of the brokers in the cluster as the master broker. In the connection-factory address list, you can also specify a subset of all the brokers in the cluster.
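The effect of imqAddressListBehavior=RANDOM can be illustrated with a small model of the order in which a client tries the addresses. This sketch is hypothetical: it assumes that the alternative setting, PRIORITY, tries the addresses in the listed order, and that RANDOM amounts to a uniform shuffle for each connection. The real client runtime's selection logic may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/**
 * Hypothetical model of imqAddressListBehavior (not the MQ runtime's code).
 * PRIORITY: try addresses in the order listed.
 * RANDOM:   try them in a freshly shuffled order for each connection.
 */
final class AddressListOrder {
    enum Behavior { PRIORITY, RANDOM }

    private AddressListOrder() {}

    static List<String> connectionOrder(List<String> addresses, Behavior behavior, Random rng) {
        // Copy so the caller's list (the configured address list) is never reordered.
        List<String> order = new ArrayList<>(addresses);
        if (behavior == Behavior.RANDOM) {
            Collections.shuffle(order, rng);
        }
        return order;
    }
}
```

Because each new connection draws its own order, the first broker tried is spread roughly evenly across the four hosts, which is the distribution described above.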

Auto-Reconnect Behaviors

A broker treats an automatic reconnection as it would a new connection. When the original connection is lost, all resources associated with that connection are released. For example, in a broker cluster, as soon as one broker fails, the other brokers assume that the client connections associated with the failed broker are gone. After auto-reconnect takes place, the client connections are recreated from scratch.

Sometimes the client-side state cannot be fully restored by auto-reconnect. Perhaps a resource that the client needs cannot be recreated. In this case, the client runtime calls the client’s connection exception handler and the client must take appropriate action to restore state. For additional information, see Handling Exceptions When Failover Occurs.

If the client is automatically reconnected to a different broker instance, persistent messages held by the failed or disconnected broker are unavailable until that broker recovers, and other state information held by it can be lost. Once the original broker is restored, the messages it held might be delivered out of order. This is because broker instances in a cluster do not use a shared, highly available persistent store.

A transacted session is the most reliable method of ensuring that a message isn’t lost if you are careful in coding the transaction. If auto-reconnect happens in the middle of a transaction, the client runtime throws an exception when the transaction is committed, and the transaction is rolled back. At that point, you must make sure that the client restarts the whole transaction. (This is especially important when you use a broker cluster.) For additional information, see Handling Exceptions When Failover Occurs.

Automatic reconnection affects producers and consumers differently:

Auto-Reconnect Limitations

Note the following points when using the auto-reconnect feature:

Handling Exceptions When Failover Occurs

Several kinds of exceptions can occur as a result of the client being reconnected after a failover. How the client application should handle these exceptions depends on whether a session is transacted, on the kind of exception thrown, and on the client's role as producer or consumer. The following sections discuss the implications of these factors.

Regardless of how the exception is raised, the client application must never call System.exit() to exit the application, because doing so would prevent the Message Queue client runtime from reconnecting to an alternate or restarted broker.

When a failover occurs, exception messages may be shown on the application's console and recorded in the broker's log. These messages are for information only. They may be useful in troubleshooting, but minimizing or eliminating the impact of a failover is best handled preemptively by the application client in the ways described in the following sections.

Handling Exceptions in a Transacted Session

A transacted session might fail to commit (and throw an exception) either because a failover occurs while operations within the transaction are being executed or because the failover occurs during the call to Session.commit(). In the first case, the failover is said to occur during an open transaction; in the second case, the failover occurs during the commit itself.

In the case of a failover during an open transaction, when the client application calls Session.commit(), the client runtime throws a TransactionRolledBackException and rolls back the transaction, causing the following to happen.

If the client application itself calls Session.rollback() after a failover (before calling Session.commit()), the same things happen as if the application had received a TransactionRolledBackException. After receiving a TransactionRolledBackException or calling Session.rollback(), the client application must retry the failed transaction. That is, it must re-send and re-consume the messages that were involved in the failed-over transaction.
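The retry rule above means that the unit of retry is the whole transaction, not the individual send or receive. A minimal sketch of that loop follows, in plain Java so that it runs without a JMS provider. RolledBackException is a hypothetical stand-in for javax.jms.TransactionRolledBackException, and TransactionBody is an illustrative interface; in a real client, runAndCommit() would perform every send and receive of the transaction and then call Session.commit().

```java
/**
 * Hypothetical sketch of "retry the whole failed transaction".
 * RolledBackException stands in for javax.jms.TransactionRolledBackException;
 * TransactionBody is an illustrative interface, not an MQ or JMS type.
 */
final class TransactionRetry {

    /** Stand-in for javax.jms.TransactionRolledBackException. */
    static final class RolledBackException extends Exception {
        RolledBackException(String message) {
            super(message);
        }
    }

    /** One complete unit of work: every send and receive, followed by commit. */
    interface TransactionBody {
        void runAndCommit() throws RolledBackException;
    }

    private TransactionRetry() {}

    /**
     * Runs the body from the beginning until it commits, at most maxTries times.
     * Returns the number of the try that committed; rethrows the last rollback.
     */
    static int runWithRetry(TransactionBody body, int maxTries) throws RolledBackException {
        if (maxTries < 1) {
            throw new IllegalArgumentException("maxTries must be at least 1");
        }
        RolledBackException last = null;
        for (int tryNo = 1; tryNo <= maxTries; tryNo++) {
            try {
                body.runAndCommit();
                return tryNo;
            } catch (RolledBackException e) {
                // Nothing from this try was committed: redo every send and receive.
                last = e;
            }
        }
        throw last;
    }
}
```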

In the second case, when the failover occurs during a call to Session.commit(), there are three possible outcomes:

Handling Exceptions in a Non-Transacted Session

If a producer's connection fails over, the client application may receive a JMSException. The application thread that receives the exception should pause for a few seconds and then resend the messages. The client application may want to set a flag on the resent messages to indicate that they could be duplicates.
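The producer advice above (pause, resend, flag possible duplicates) is sketched below in plain Java. OutMessage and Sender are hypothetical stand-ins for javax.jms.Message and MessageProducer so that the sketch runs without a broker; the property names MESSAGE_COUNTER and MESSAGE_IN_DOUBT are the ones used by the FailoverProducer example in this section. The key points are that the resent copy keeps the same counter and carries the in-doubt flag.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the non-transacted producer advice: when a send fails,
 * pause, then resend the same payload marked as possibly duplicated.
 * OutMessage and Sender are illustrative stand-ins for JMS types.
 */
final class InDoubtResend {
    static final String MESSAGE_COUNTER = "MESSAGE_COUNTER";
    static final String MESSAGE_IN_DOUBT = "MESSAGE_IN_DOUBT";

    static final class OutMessage {
        final String body;
        final Map<String, Object> properties = new HashMap<>();

        OutMessage(String body) {
            this.body = body;
        }
    }

    interface Sender {
        void send(OutMessage m) throws Exception;
    }

    private InDoubtResend() {}

    /** Returns the attempt number that succeeded, or -1 if all attempts failed. */
    static int sendWithResend(Sender sender, long counter, String body,
                              int maxAttempts, long pauseMillis) {
        boolean inDoubt = false;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            OutMessage m = new OutMessage(body);
            m.properties.put(MESSAGE_COUNTER, counter);   // same counter on every resend
            m.properties.put(MESSAGE_IN_DOUBT, inDoubt);
            try {
                sender.send(m);
                return attempt;
            } catch (Exception e) {
                // The broker may or may not have stored m, so flag the next copy.
                inDoubt = true;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(pauseMillis);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return -1;
                    }
                }
            }
        }
        return -1;
    }
}
```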

If a message consumer's connection fails over, the consequences vary with the session's acknowledge mode:

Failover Producer Example

The following code sample illustrates good coding practices for handling exceptions during a failover. It is designed to send non-transacted, persistent messages forever and to handle JMSExceptions when a failover occurs. The program can handle either a true or false setting for the imqReconnectEnabled property. To run the program, enter one of the following commands:

java dura.example.FailoverProducer
java -DimqReconnectEnabled=true dura.example.FailoverProducer
/*
 * @(#)FailoverProducer.java	1.1 06/06/09
 * Copyright 2006 Sun Microsystems, Inc. All Rights Reserved
 * SUN PROPRIETARY/CONFIDENTIAL
 * Use is subject to license terms. */

package dura.example;

import javax.jms.*;
import com.sun.messaging.ConnectionConfiguration;
import java.util.*;

public class FailoverProducer implements ExceptionListener {

	//connection factory
    private com.sun.messaging.TopicConnectionFactory factory;
    //connection
    private TopicConnection pconn = null;
    //session
    private TopicSession psession = null;
    //publisher
    private TopicPublisher publisher = null;
    //topic
    private Topic topic = null;
    //This flag indicates whether this test client is closed.
    private volatile boolean isClosed = false;
    //auto reconnection flag
    private boolean autoReconnect = false;
    //destination name for this example.
    private static final String DURA_TEST_TOPIC = "DuraTestTopic";
    //the message counter property name 
    public static final String MESSAGE_COUNTER = "MESSAGE_COUNTER";
    //the message in-doubt-bit property name
    public static final String MESSAGE_IN_DOUBT = "MESSAGE_IN_DOUBT";

    /**
     * Constructor.  Get imqReconnectEnabled property value from 
     * System property.
     */
    public FailoverProducer () {
    	
    	try {
    		autoReconnect = 
    		Boolean.getBoolean(ConnectionConfiguration.imqReconnectEnabled);
    	} catch (Exception e) {
    		this.printException(e);
    	}
    	
    }

    /**
     * Connection is broken if this handler is called.  
     * If the autoReconnect flag is true, this is called only 
     * after the MQ client runtime has used up its reconnect attempts.
     */
    public void onException (JMSException jmse) {
        this.printException (jmse);
    }

    /**
     * create MQ connection factory.
     * @throws JMSException
     */
    private void initFactory() throws JMSException {
        //get connection factory
        factory = new com.sun.messaging.TopicConnectionFactory();
    }

    /**
     * JMS setup.  Create a Connection,Session, and Producer.
     * 
     * If any of the JMS object creation fails (due to system failure),
     * it retries until it succeeds.
     *
     */
    private void initProducer() {
    	
        boolean isConnected = false;

        while ( isClosed == false && isConnected == false ) {
            
        	try {
                println("producer client creating connection ...");

                //create connection
                pconn = factory.createTopicConnection();
                
                //set connection exception listener
                pconn.setExceptionListener(this);

                //create topic session
                psession = pconn.createTopicSession(false,
                    Session.CLIENT_ACKNOWLEDGE);
                
                //get destination
                topic = psession.createTopic(DURA_TEST_TOPIC);

                //publisher
                publisher = psession.createPublisher(topic);

                //set flag to true
                isConnected = true;

                println("producer ready.");
            }
            catch (Exception e) {

                println("*** connect failed ... sleep for 5 secs.");
                
                try {
                	//close resources.
                	if ( pconn != null ) {
                		pconn.close();
                	}
                	//pause 5 secs.
                    Thread.sleep(5000);
                
                } catch (Exception e1) {
                    ;
                }
            }
        }
    }

    /**
     * Start test.  This sends JMS messages in a loop (forever).
     */
    public void run () {

        try {
        	//create MQ connection factory.
            initFactory();
            
            //create JMS connection,session, and producer
            initProducer();
            
            //send messages forever.
            sendMessages();
        } catch (Exception e) {
            this.printException(e);
        }
    }

    /**
     * Send persistent messages to a topic forever.  This shows how
     * to handle failover for a message producer.
     */
    private void sendMessages() {
    	
    	//this is set to true if send failed.
        boolean messageInDoubt = false;
        
        //message to be sent
        TextMessage m = null;
        
        //msg counter
        long msgcount = 0;

        while (isClosed == false) {
        	
            try {
            	
            	/**
            	 * create a text message 
            	 */
                m = psession.createTextMessage();
               
                /**
                 * the MESSAGE_IN_DOUBT bit is set to true if 
                 * you get an exception for the last message.
                 */
                if ( messageInDoubt == true ) {
                    m.setBooleanProperty (MESSAGE_IN_DOUBT, true);
                    messageInDoubt = false;
                    
                    println("MESSAGE_IN_DOUBT bit is set to true for msg: "
                            + msgcount);
                } else {
                    m.setBooleanProperty (MESSAGE_IN_DOUBT, false);
                }
                
                //set message counter
                m.setLongProperty(MESSAGE_COUNTER, msgcount);
                
                //set message body
                m.setText("msg: " + msgcount);
                
                //send the msg
                publisher.send(m, DeliveryMode.PERSISTENT, 4, 0);

                println("sent msg: " + msgcount);
                
                /**
                 * increase the counter, wrapping around to 0 after
                 * Long.MAX_VALUE so that the counter never overflows.
                 */
                if (msgcount == Long.MAX_VALUE) {
                	msgcount = 0;
                	
                	println ("Reset message counter to 0.");
                } else {
                    msgcount ++;
                }
                
                Thread.sleep(1000);

            } catch (Exception e) {

                if ( isClosed == false ) {
                	
                    //set in doubt bit to true.
                    messageInDoubt = true;

                    this.printException(e);
                   
                    //init producer only if auto reconnect is false.
                    if ( autoReconnect == false ) {
                        this.initProducer();
                    }
                }
            }
        }
    }

    /**
     * Close this example program.
     */
    public synchronized void close() {

        try {
            isClosed = true;
            pconn.close();

            notifyAll();
        } catch (Exception e) {
            this.printException(e);
        }
    }
    
    /**
     * print the specified exception.
     * @param e the exception to be printed.
     */
    private void printException (Exception e) {
    	System.out.println(new Date().toString());
    	e.printStackTrace();
    }
    
    /**
     * print the specified message.
     * @param msg the message to be printed.
     */
    private void println (String msg) {
    	System.out.println(new Date() + ": " + msg);
    }
    
    /**
     * Main program to start this example.
     */
    public static void main (String args[]) {
    	FailoverProducer fp = new FailoverProducer();
    	fp.run();
    }

}

Failover Consumer Example

The following code sample, FailoverConsumer, illustrates good coding practices for handling exceptions during a failover. It uses a transacted session to receive messages forever. The program sets the imqReconnectEnabled property to true, so that the Message Queue client runtime automatically attempts to reconnect when the connected broker fails or is killed. It is designed to work with dura.example.FailoverProducer, shown in the previous section.

To run this program, enter the following command:

java dura.example.FailoverConsumer
/*
 * @(#)FailoverConsumer.java	1.1 06/06/09
 * Copyright 2006 Sun Microsystems, Inc. All Rights Reserved
 * SUN PROPRIETARY/CONFIDENTIAL
 * Use is subject to license terms.
 *
 */
package dura.example;

import java.util.Date;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TransactionRolledBackException;
import com.sun.messaging.ConnectionConfiguration;

public class FailoverConsumer implements ExceptionListener, Runnable {

	//JMS connection
    private Connection conn = null;
    //JMS session
    private Session session = null;
    //JMS Message consumer
    private MessageConsumer messageConsumer = null;
    //JMS destination.
    private Destination destination = null;
    //flag indicates whether this program should continue running.
    //volatile: it is cleared by the exception listener on another thread.
    private volatile boolean isConnected = false;
    //destination name.
    private static final String DURA_TEST_TOPIC = "DuraTestTopic";
    //the commit counter, for information only.
    private long commitCounter = 0;
    
    /**
     * message counter property set by the producer.
     */
    public static final String MESSAGE_COUNTER = "MESSAGE_COUNTER";
    
    /**
     * Message in doubt bit set by the producer
     */
    public static final String MESSAGE_IN_DOUBT = "MESSAGE_IN_DOUBT";
    
    /**
     * receive timeout in milliseconds; 0 means block until a message arrives.
     */
    public static final long RECEIVE_TIMEOUT = 0;
    
    /**
     * Default constructor -   
     * Set up JMS Environment.
     */
    public FailoverConsumer() {
       setup();
    }

    /*  Connection exception listener.  This is called when the connection
     *  breaks and the MQ client runtime performs no further reconnect attempts.
     */
    public void onException (JMSException e) {

    	print ("Reconnect failed.  Shutting down the connection ...");
    	
    	/**
    	 * Set this flag to false so that the run() method will exit.
    	 */
        this.isConnected = false;
        e.printStackTrace();
    }

    /**
     * Best effort to roll back a jms session.  When a broker crashes, an
     * open-transaction should be rolled back.  But the re-started broker 
     * may not have the uncommitted transaction information due to system
     * failure.  In a situation like this, an application can just quit
     * calling rollback after retrying a few times.  The uncommitted 
     * transaction (resources) will eventually be removed by the broker.
     */
    private void rollBackJMS() {
    	
    	//rollback fail count
    	int failCount = 0;
    	
        boolean keepTrying = true;
        
        while ( keepTrying ) {

            try {

                print ("<<< rolling back JMS ...., consumer commit counter: "
                        + this.commitCounter);

                session.rollback();
                
                print("<<< JMS rolled back ...., consumer commit counter: "
                        + this.commitCounter);
                keepTrying = false;
            } catch (JMSException jmse) {
               
            	failCount ++;
                jmse.printStackTrace();

                sleep (3000); //3 secs
                
                //give up after three failed attempts, as described above.
                if ( failCount >= 3 ) {

                    print ("<<< rollback failed : total count: " + failCount);
                    keepTrying = false;
                }
            }
        }
    }

    
    /**
     * Close the JMS connection and exit the program.
     *
     */
    private void close() {
        try {
       	
            if ( conn != null ) {
                conn.close();	
            }
        
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            this.isConnected = false;
        }
    }

    /*Receive messages in a loop until closed.*/
     
    public void run () {
       
		while (isConnected) {

			try {
				
				/*receive message with specified timeout.*/
				
                Message m = messageConsumer.receive(RECEIVE_TIMEOUT);
                
                /* process the message. */
                processMessage(m);

                /* commit JMS transaction. */
                this.commit();
                
                /*increase the commit counter.*/
                this.commitCounter ++;
               
			} catch (TransactionRolledBackException trbe) {
				
				/**
				 * The transaction was rolled back, and a new
				 * transaction is automatically started.
				 */
				trbe.printStackTrace();
			} catch (JMSException jmse) {
				
				/* The transaction is in an unknown state.
				 * We need to roll back the transaction. */

                jmse.printStackTrace();
                
                /* roll back if not closed.
                 */
                if ( this.isConnected == true ) {
					this.rollBackJMS();
				}

			} catch (Exception e) {
                
                e.printStackTrace();
                
                /* Exit if this is an unexpected Exception.
                 */
                this.close();
                
            } finally {
            	;//do nothing
            }
		}
		
		print(" <<< consumer exit ...");
	}

    /* Set up connection, destination, etc. */
    protected void setup() {
        
    	try {
        	
			//create connection factory
			com.sun.messaging.ConnectionFactory factory =
			new com.sun.messaging.ConnectionFactory();
			
			//set auto reconnect to true.
			factory.setProperty(ConnectionConfiguration.imqReconnectEnabled, "true");
        	//A value of -1 will retry forever if connection is broken.
			factory.setProperty(ConnectionConfiguration.imqReconnectAttempts, "-1");
			//retry interval - every 10 seconds
			factory.setProperty(ConnectionConfiguration.imqReconnectInterval, "10000");
			//create connection
			conn = factory.createConnection();
			//set client ID
			conn.setClientID(DURA_TEST_TOPIC);
			
			//set exception listener
			conn.setExceptionListener(this);

			//create a transacted session (the acknowledge mode is ignored)
			session = conn.createSession(true, Session.SESSION_TRANSACTED);
            
			//get destination
			destination = session.createTopic(DURA_TEST_TOPIC);
			
			//message consumer
			messageConsumer = session.createDurableSubscriber((Topic)destination,
                                                       DURA_TEST_TOPIC);
			//set flag to true
            this.isConnected = true;
            //we are ready, start the connection
            conn.start();
            
            print("<<< Ready to receive on destination: " + DURA_TEST_TOPIC);

		} catch (JMSException jmse) {
            this.isConnected = false;
            jmse.printStackTrace();

            this.close();
        }
	}

    /**
     * Process the received message message. 
     *  This prints received message counter.
     * @param m the message to be processed.
     */
    private synchronized void processMessage(Message m) {
        
    	try {
    		//in this example, we do not expect a timeout, etc.
    		if ( m == null ) {
    			throw new RuntimeException ("<<< Received null message. "
                                     + "Maybe reached max time out.");
    		}
    		
    		//get message counter property
        	long msgCtr = m.getLongProperty (MESSAGE_COUNTER);
        	
        	//get message in-doubt bit
        	boolean indoubt = m.getBooleanProperty(MESSAGE_IN_DOUBT);
        	
        	if ( indoubt) {
        		print("<<< received message: " + msgCtr + ", indoubt bit is true");
        	} else {
        		print("<<< received message: " + msgCtr);
        	}
        	
        } catch (JMSException jmse) {
            jmse.printStackTrace();
        }
    }

    /**
     * Commit a JMS transaction.
     * @throws JMSException
     */
    private void commit() throws JMSException {
        session.commit();
    }
    
    /**
     * Sleep for the specified time.
     * @param millis sleep time in milli-seconds.
     */
    private void sleep (long millis) {
        try {
            Thread.sleep(millis);
        } catch (java.lang.InterruptedException inte) {
            print (inte);
        }
    }
    
    /**
     * Print the specified message.
     * @param msg the message to be printed.
     */
    private static void print (String msg) {
    	System.out.println(new Date() + ": " + msg);
    }
    
    /**
     * Print Exception stack trace.
     * @param e the exception to be printed.
     */
    private static void print (Exception e) {
    	System.out.print(e.getMessage());
    	e.printStackTrace();
    }
    
    /**
     * Start this example program.
     */
    public static void main (String args[]) {
        FailoverConsumer fc = new FailoverConsumer();
        fc.run();
    }

}
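FailoverConsumer only reports the in-doubt bit. A consumer that must not process the same message twice could combine the bit with MESSAGE_COUNTER, as in the hypothetical DuplicateFilter below. It assumes a single producer whose counter increases by one per message, as in FailoverProducer, and it ignores the wrap-around to 0. In run(), isDuplicate() would be checked before processMessage(), and recordCommitted() called only after commit() succeeds, so that a rolled-back transaction does not mark its messages as processed.

```java
/**
 * Hypothetical consumer-side companion to the MESSAGE_IN_DOUBT bit.
 * Assumes one producer with a strictly increasing MESSAGE_COUNTER and
 * ignores the counter's wrap-around to 0.
 */
final class DuplicateFilter {
    // Highest counter whose transaction has been committed; -1 means none yet.
    private long highestCommitted = -1;

    /**
     * An in-doubt message whose counter was already committed is a resent copy.
     * Messages without the in-doubt bit are always accepted.
     */
    boolean isDuplicate(long counter, boolean inDoubt) {
        return inDoubt && counter <= highestCommitted;
    }

    /** Call only after Session.commit() succeeds for the message with this counter. */
    void recordCommitted(long counter) {
        highestCommitted = Math.max(highestCommitted, counter);
    }
}
```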