Message Queue supports client connection failover. A failed connection can be automatically restored not only to the original broker, but to a different broker in a broker cluster. There are circumstances under which the client-side state cannot be restored on any broker during an automatic reconnection attempt; for example, when the client uses transacted sessions or temporary destinations. At such times the connection exception handler is called and the application code has to catch the exception and restore state.
This section explains how automatic reconnection is enabled, how the broker behaves during a reconnect, how automatic reconnection impacts producers and consumers, and how producers and consumers should handle exceptions that result from connection failover. For additional information about this feature, please see Connection Handling in Sun GlassFish Message Queue 4.4 Administration Guide.
Message Queue also provides a notification API that allows the client application to listen for closure and reconnection events and to respond to such events based on the notification type and the connection state. These notifications may be valuable in preparing the client for an impending event or for gathering diagnostic data. For more information, see Connection Event Notification.
Starting with version 4.1 of Message Queue, you can cluster brokers in either a conventional cluster or a high-availability cluster. The clustering model used may affect your client design. This section notes such design differences.
If you are using conventional clusters, you enable automatic reconnection by setting the connection factory imqReconnectEnabled attribute to true. If you are using a high availability cluster, the imqReconnectEnabled attribute is ignored; the client runtime will automatically reconnect to a backup broker if the connection is lost and not regained after no more than imqReconnectAttempts attempts. This applies to all deployment configurations: whether Message Queue is used stand alone or whether the connection is created through a resource adapter.
No matter which type of cluster you are using, you must also configure the connection factory administered object to specify the following information.
A list of message-service addresses (using the imqAddressList attribute). Independently of the clustering model used, the client runtime uses this address list when it establishes the initial connection.
When you connect to a conventional cluster, the client runtime also uses the address list when it tries to reestablish a connection to the message service: it attempts to connect to the brokers in the list until it finds (or fails to find) an available broker. If you specify only a single broker instance on the imqAddressList attribute, the configuration won’t support recovery from hardware failure.
When you specify more than one broker, you can decide whether to use parallel brokers or a broker cluster. In a parallel configuration, there is no communication between brokers, while in a broker cluster, the brokers interact to distribute message delivery loads. (Refer to Cluster Message Delivery in Sun GlassFish Message Queue 4.4 Technical Overview for more information on broker clusters.)
To enable parallel-broker reconnection, set the imqAddressListBehavior attribute to PRIORITY. Typically, you would specify no more than a pair of brokers for this type of reconnection. This way, the messages are published to one broker, and all clients fail over together from the first broker to the second.
To enable clustered-broker reconnection, set the imqAddressListBehavior attribute to RANDOM. This way, the client runtime randomizes connection attempts across the list, and client connections are distributed evenly across the broker cluster.
Each broker in a cluster uses its own separate persistent store (which means that undelivered persistent messages are unavailable until a failed broker is back online). If one broker crashes, its client connections are reestablished on other brokers.
If you use the high availability clustering model, the address list is dynamically updated to include the brokers that are connected to the highly available database serving the cluster. In this case, the client runtime and the brokers use an internal protocol to determine which broker takes over the persistent data of the failed broker. Therefore the imqAddressListBehavior property does not apply to this model.
The number of iterations to be made over the list of brokers (using the imqAddressListIterations attribute) when attempting to create a connection or to reconnect.
For high-availability clusters, the client runtime will attempt to reconnect forever (no matter what value you specify for this attribute). If the client does not want this behavior, it must explicitly close the connection.
The number of attempts to reconnect to a broker if the first connection fails (using the imqReconnectAttempts attribute).
The interval, in milliseconds, between reconnect attempts, using the imqReconnectInterval attribute. This attribute applies to both clustering models.
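To make the interplay of these attributes concrete, the following sketch models the order in which a client might try broker addresses when connecting to a conventional cluster. It is a simplified illustration, not the client runtime's implementation. The class and method names are hypothetical. It assumes that each address is tried imqReconnectAttempts times before moving to the next address, that the whole list is traversed imqAddressListIterations times, and that RANDOM picks a random starting position and then walks the list in order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

class ReconnectOrderSketch {

    // Mirrors the two imqAddressListBehavior values discussed above.
    enum Behavior { PRIORITY, RANDOM }

    /**
     * Returns the sequence of addresses that would be tried, in order,
     * under the simplified model described in the lead-in.
     */
    static List<String> attemptOrder(List<String> addressList, Behavior behavior,
                                     int iterations, int attemptsPerAddress, Random rng) {
        List<String> order = new ArrayList<>();
        if (addressList.isEmpty()) {
            return order;
        }
        // PRIORITY always starts at the head of the list. RANDOM (assumption)
        // starts at a random position and then proceeds through the list.
        int start = (behavior == Behavior.RANDOM) ? rng.nextInt(addressList.size()) : 0;
        for (int pass = 0; pass < iterations; pass++) {
            for (int i = 0; i < addressList.size(); i++) {
                String address = addressList.get((start + i) % addressList.size());
                for (int attempt = 0; attempt < attemptsPerAddress; attempt++) {
                    order.add(address);
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        List<String> brokers = Arrays.asList("mq://myhost1/jms", "mq://myhost2/jms");
        // Two passes over the list, two attempts per address.
        System.out.println(attemptOrder(brokers, Behavior.PRIORITY, 2, 2, new Random()));
    }
}
```

In this model the pause given by imqReconnectInterval would fall between consecutive entries of the returned sequence; the sketch leaves the timing out so that only the ordering is shown.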
Configure your connection-factory object as follows:
imqobjmgr add -t cf -l "cn=myConnectionFactory" \
    -o "imqAddressList=mq://jpgserv/jms" \
    -o "imqReconnectEnabled=true" \
    -o "imqReconnectAttempts=10" \
    -j "java.naming.factory.initial=com.sun.jndi.fscontext.RefFSContextFactory" \
    -j "java.naming.provider.url=file:///home/foo/imq_admin_objects"
This command creates a connection-factory object with a single address in the broker address list. If the connection fails, the client runtime will try to reconnect to the broker 10 times. If an attempt to reconnect fails, the client runtime will sleep for three seconds (the default value for the imqReconnectInterval attribute) before trying again. After 10 unsuccessful attempts, the application will receive a JMSException.
You can ensure that the broker starts automatically at system start-up time. See Starting Brokers Automatically in Sun GlassFish Message Queue 4.4 Administration Guide for information on how to configure automatic broker start-up. For example, on the Solaris platform, you can use /etc/rc.d scripts.
Configure your connection-factory objects as follows:
imqobjmgr add -t cf -l "cn=myCF" \
    -o "imqAddressList=myhost1, mqtcp://myhost2:12345/jms" \
    -o "imqReconnectEnabled=true" \
    -o "imqReconnectAttempts=5" \
    -j "java.naming.factory.initial=com.sun.jndi.fscontext.RefFSContextFactory" \
    -j "java.naming.provider.url=file:///home/foo/imq_admin_objects"
This command creates a connection factory object with two addresses in the broker list. The first address describes a broker instance running on the host myhost1 with a standard port number (7676). The second address describes a jms connection service running at a statically configured port number (12345).
Configure your connection-factory objects as follows:
imqobjmgr add -t cf -l "cn=myConnectionFactory" \
    -o "imqAddressList=mq://myhost1/ssljms, \
        mq://myhost2/ssljms, \
        mq://myhost3/ssljms, \
        mq://myhost4/ssljms" \
    -o "imqReconnectEnabled=true" \
    -o "imqReconnectAttempts=5" \
    -o "imqAddressListBehavior=RANDOM" \
    -j "java.naming.factory.initial=com.sun.jndi.fscontext.RefFSContextFactory" \
    -j "java.naming.provider.url=file:///home/foo/imq_admin_objects"
This command creates a connection factory object with four addresses in the imqAddressList. All the addresses point to jms services running on SSL transport on different hosts. Since the imqAddressListBehavior attribute is set to RANDOM, the client connections that are established using this connection factory object will be distributed randomly among the four brokers in the address list. If you are using a high availability cluster, the RANDOM attribute is ignored during a failover reconnect after losing an existing connection to a broker.
For a conventional cluster, you must configure one of the brokers in the cluster as the master broker. In the connection-factory address list, you can also specify a subset of all the brokers in the cluster.
A broker treats an automatic reconnection as it would a new connection. When the original connection is lost, all resources associated with that connection are released. For example, in a broker cluster, as soon as one broker fails, the other brokers assume that the client connections associated with the failed broker are gone. After auto-reconnect takes place, the client connections are recreated from scratch.
Sometimes the client-side state cannot be fully restored by auto-reconnect. Perhaps a resource that the client needs cannot be recreated. In this case, the client runtime calls the client’s connection exception handler and the client must take appropriate action to restore state. For additional information, see Handling Exceptions When Failover Occurs.
If the client is automatically reconnected to a different broker instance, the effects vary depending on the clustering model used.
In a conventional cluster, persistent messages produced but not yet consumed may only be delivered to the consumer after the original broker recovers. Other state information held by the failed or disconnected broker can be lost. The messages held by the original broker, once it is restored, might be delivered out of order.
In a high availability cluster, messages produced but not yet consumed continue to be delivered to the consumer without the original broker needing to recover.
A transacted session is the most reliable method of ensuring that a message isn’t lost if you are careful in coding the transaction. If auto-reconnect happens in the middle of a transaction, any attempt to produce or consume messages will cause the client runtime to throw a JMSException. In this case, applications must call Session.rollback() to roll back the transaction.
The Message Queue client runtime may throw a TransactionRolledBackException when Session.commit() is called during or after a failover. In this case, the transaction is rolled back and a new transaction is automatically started. Applications are not required to call Session.rollback() to roll back the transaction after receiving a TransactionRolledBackException.
The Message Queue client runtime may throw a JMSException when Session.commit() is called during or after a failover occurs. In this case, the transaction state is unknown (may or may not be committed). Applications should call Session.rollback() to roll back the uncommitted transaction.
If you are using a high availability cluster, the only time your transaction might wind up in an unknown state is if it is not possible to reconnect to any brokers in the cluster. This should happen rarely if ever. For additional information, see Handling Exceptions When Failover Occurs.
Automatic reconnection affects producers and consumers differently:
During reconnection, producers cannot send messages. The production of messages (or any operation that involves communication with the message broker) is blocked until the connection is reestablished.
For consumers, automatic reconnection is supported for all client acknowledgment modes. After a connection is reestablished, the broker will redeliver all unacknowledged messages it had previously delivered, marking them with a Redeliver flag. The client can examine this flag to determine whether any message has already been consumed (but not yet acknowledged). In the case of nondurable subscribers, some messages might be lost because the broker does not hold their messages once their connections have been closed. Any messages produced for nondurable subscribers while the connection is down cannot be delivered when the connection is reestablished. For additional information, see Handling Exceptions When Failover Occurs.
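Because a redelivered message may already have been processed, a consumer often needs to make its processing idempotent. The sketch below shows one possible approach in plain Java: it remembers the identifiers of messages it has finished processing and skips a redelivered message whose identifier it has already seen. The class RedeliveryFilter and its methods are hypothetical. In a real client, the identifier and the flag would typically come from Message.getJMSMessageID() and Message.getJMSRedelivered().

```java
import java.util.HashSet;
import java.util.Set;

class RedeliveryFilter {

    // Identifiers of messages whose processing has completed.
    private final Set<String> processedIds = new HashSet<>();

    /**
     * Returns true if the message should be processed, or false if it is a
     * redelivery of a message this consumer has already processed.
     */
    boolean shouldProcess(String messageId, boolean redelivered) {
        // Only a redelivered message can be a duplicate; a redelivered
        // message that was never processed here must still be handled.
        return !(redelivered && processedIds.contains(messageId));
    }

    /** Records that processing of the given message has completed. */
    void markProcessed(String messageId) {
        processedIds.add(messageId);
    }

    public static void main(String[] args) {
        RedeliveryFilter filter = new RedeliveryFilter();
        filter.markProcessed("ID:1");
        System.out.println(filter.shouldProcess("ID:1", true));
        System.out.println(filter.shouldProcess("ID:2", true));
    }
}
```

The in-memory set in this sketch grows without bound and is lost when the client restarts; an application that needs stronger guarantees would have to persist or expire the recorded identifiers.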
Notice the following points when using the auto-reconnect feature:
Messages might be redelivered to a consumer after auto-reconnect takes place. In auto-acknowledge mode, you will get no more than one redelivered message. In other session types, all unacknowledged persistent messages are redelivered.
While the client runtime is trying to reconnect, any messages sent by the broker to nondurable topic consumers are lost.
Any messages that are in queue destinations and that are unacknowledged when a connection fails are redelivered after auto-reconnect. However, in the case of queues delivering to multiple consumers, these messages cannot be guaranteed to be redelivered to the original consumers. That is, as soon as a connection fails, an unacknowledged queue message might be rerouted to other connected consumers.
In the case of a conventional broker cluster, the failure of the master broker prevents the following operations from succeeding on any other broker in the cluster:
Creating or destroying a durable subscription.
Creating or destroying a physical destination (for example, with the imqcmd create dst command).
Starting a new broker process. (However, the brokers that are already running continue to function normally even if the master broker goes down.)
You can configure the master broker to restart automatically using Message Queue broker support for rc scripts or the Windows service manager.
Auto-reconnect doesn’t work if the client uses a ConnectionConsumer to consume messages. In that case, the client runtime throws an exception.
Several kinds of exceptions can occur as a result of the client being reconnected after a failover. How the client application should handle these exceptions depends on whether a session is transacted, on the kind of exception thrown, and on the client's role--as producer or consumer. The following sections discuss the implications of these factors.
Independently of how the exception is raised, the client application must never call System.exit() to exit the application because this would prevent the Message Queue client runtime from reconnecting to an alternate or restarted broker.
When a failover occurs, exception messages may be shown on the application's console and recorded in the broker's log. These messages are for information only. They may be useful in troubleshooting, but minimizing or eliminating the impact of a failover is best handled preemptively by the application client in the ways described in the following sections.
Message Queue provides a notification API that allows the client application to listen for closure and reconnection events and to respond to such events based on the notification type and the connection state. These notifications may be valuable in preparing the client for an impending event or for gathering diagnostic data. For more information, see Connection Event Notification.
A transacted session might fail to commit (and throw an exception) either because a failover occurs while statements within the transaction are being executed or because the failover occurs during the call to Session.commit(). In the first case, the failover is said to occur during an open transaction; in the second case, the failover occurs during the commit itself.
In the case of a failover during an open transaction, when the client application calls Session.commit(), the client runtime will throw a TransactionRolledBackException and roll back the transaction causing the following to happen.
Messages that have been produced (but not committed) in the transacted session are discarded and not delivered to the consumer.
All messages that have been consumed (but not committed) in the transacted session are redelivered to the consumer with the Redeliver flag set.
A new transaction is automatically started.
If the client application itself calls Session.rollback() after a failover (before Session.commit() is executed), the same things happen as if the application had received a TransactionRolledBackException. After receiving a TransactionRolledBackException or calling Session.rollback(), the client application must retry the failed transaction. That is, it must re-send and re-consume the messages that were involved in the failed-over transaction.
In the second case, when the failover occurs during a call to Session.commit, there may be three outcomes:
The transaction is committed successfully and the call to Session.commit does not return an exception. In this case, the application client does not have to do anything.
The runtime throws a TransactionRolledBackException and does not commit the transaction. The transaction is automatically rolled back by the Message Queue runtime. In this case, the client application must retry the transaction as described for the case in which an open transaction is failed over.
A JMSException is thrown. This signals that the transaction state is unknown: it might have either succeeded or failed. A client application should handle this case by assuming failure, pausing for three seconds, calling Session.rollback(), and then retrying the operations. However, since the commit might have succeeded, when retrying the transacted operations, a producer should set application-specific properties on the messages it re-sends to signal that these might be duplicate messages. Likewise, consumers that retry receive operations should not assume that a message that is redelivered is necessarily a duplicate. In other words, to ensure once-and-only-once delivery, both producers and consumers need to do a little extra work to handle this edge case. The code samples presented next illustrate good coding practices for handling this situation.
If you are using a high availability cluster, the only time this condition might arise is when the client is unable to connect to any backup broker. This should be extremely rare.
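The three possible outcomes of a commit during failover can be summarized as a small decision routine. The sketch below is self-contained, so it uses local stand-in types (TxSession, RolledBackException, UnknownOutcomeException) in place of javax.jms.Session, TransactionRolledBackException, and JMSException. All of these names, and the NextStep values, are hypothetical and exist only for illustration.

```java
class CommitOutcomeSketch {

    /** Stand-in for TransactionRolledBackException (hypothetical). */
    static class RolledBackException extends Exception {}

    /** Stand-in for a JMSException thrown by commit (hypothetical). */
    static class UnknownOutcomeException extends Exception {}

    /** Stand-in for the transacted-session calls used here (hypothetical). */
    interface TxSession {
        void commit() throws RolledBackException, UnknownOutcomeException;
        void rollback();
    }

    enum NextStep { DONE, RETRY, RETRY_MARK_POSSIBLE_DUPLICATES }

    /** Attempts a commit and reports what the application should do next. */
    static NextStep commitAndClassify(TxSession session, long pauseMillis)
            throws InterruptedException {
        try {
            session.commit();
            return NextStep.DONE;              // committed: nothing more to do
        } catch (RolledBackException e) {
            return NextStep.RETRY;             // already rolled back: redo the work
        } catch (UnknownOutcomeException e) {
            Thread.sleep(pauseMillis);         // unknown state: assume failure, pause,
            session.rollback();                // roll back explicitly, then retry and
            return NextStep.RETRY_MARK_POSSIBLE_DUPLICATES; // flag possible duplicates
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TxSession unknown = new TxSession() {
            public void commit() throws UnknownOutcomeException {
                throw new UnknownOutcomeException();
            }
            public void rollback() {
                System.out.println("rolled back");
            }
        };
        System.out.println(commitAndClassify(unknown, 10));
    }
}
```

The pause is passed in as a parameter so that the example runs quickly; the text above suggests three seconds for a real client.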
The next two examples illustrate how stand-alone Message Queue producers and consumers should handle transactions during a failover. To run the sample programs, do the following:
Start two high availability brokers. The brokers can be on the same machine or on different machines, but they must be in the same cluster.
Start the example programs. For example:
java -DimqAddressList="localhost:777" test.jmsclient.ha.FailoverQSender
java -DimqAddressList="localhost:777" test.jmsclient.ha.FailoverQReceiver
It does not matter in what order you start the programs. The only property that you must specify is imqAddressList. The client application will be automatically failed over to a backup broker if the connection to its home broker fails. (The imqReconnectEnabled and imqAddressListIterations properties are ignored for a high availability cluster.)
Kill the broker to which the producing or consuming application is connected. The clients will reconnect, validate, and continue the failed transaction. A message produced or consumed in a transaction is either committed or rolled back after a successful failover.
You can restart the dead broker and retry the failover operation by killing the new home broker.
The following code sample shows the work that a producer in a transacted session needs to do to recover state after a failover. Note how the application tests both for rollback exceptions and for JMS exceptions. Note also the use of a counter to allow the producer and consumer to verify message order and delivery.
/*
 * @(#)FailoverQSender.java  1.2 07/04/20
 *
 * Copyright 2000 Sun Microsystems, Inc. All Rights Reserved
 * SUN PROPRIETARY/CONFIDENTIAL
 * Use is subject to license terms.
 *
 */
package test.jmsclient.ha;

import java.util.Date;
import javax.jms.*;
import com.sun.messaging.jms.Connection;
import com.sun.messaging.jms.notification.*;

/**
 *
 * This sample program uses a transacted session to send messages.
 * It is designed to run with test.jmsclient.ha.FailoverQReceiver
 * @version 1.0
 */
public class FailoverQSender
    implements ExceptionListener, EventListener, Runnable {

    //constant - commit property name
    public static final String COMMIT_PROPERTY_NAME = "COMMIT_PROPERTY";
    //constant - message counter
    public static final String MESSAGE_COUNTER = "counter";
    //constant - destination name
    public static final String TEST_DEST_NAME = "FailoverTestDest001";

    //queue connection
    QueueConnection conn = null;
    //session
    QueueSession session = null;
    //queue sender
    QueueSender sender = null;
    //queue destination
    Queue queue = null;

    //committed counter.
    private int commitCounter = 0;
    //current message counter
    private int currentCounter = 0;
    //set to true if the application is connected to the broker.
    private boolean isConnected = false;

    /**
     * Default constructor - sets up the JMS environment.
     */
    public FailoverQSender() {
        //set up JMS environment
        setup();
    }

    /**
     * Connection Exception listener.
     */
    public void onException (JMSException e) {
        //The run() method will exit.
        this.isConnected = false;
        log ("Exception listener is called. Connection is closed by MQ client runtime." );
        log (e);
    }

    /**
     * this method is called when a MQ connection event occurred.
     */
    public void onEvent (Event connectionEvent) {
        log(connectionEvent);
    }

    /**
     * Rollback the application data.
     *
     */
    private void rollBackApplication() {
        this.currentCounter = this.commitCounter;
        log ("Application rolled back., current (commit) counter: " + currentCounter);
    }

    /**
     * Roll back the current jms session.
     */
    private void rollBackJMS() {
        try {
            log("Rolling back JMS ...., commit counter: " + commitCounter);
            session.rollback();
        } catch (JMSException jmse) {
            log("Rollback failed");
            log(jmse);
            //application may decide to log and continue sending messages
            // without closing the application.
            close();
        }
    }

    /**
     * rollback application data and jms session.
     *
     */
    private void rollBackAll() {
        //rollback jms
        rollBackJMS();
        //rollback app data
        rollBackApplication();
    }

    /**
     * close JMS connection and stop the application
     *
     */
    private void close() {
        try {
            if ( conn != null ) {
                //close the connection
                conn.close();
            }
        } catch (Exception e) {
            //log exception
            log (e);
        } finally {
            //set flag to false. application thread will exit
            isConnected = false;
        }
    }

    /**
     * Send messages in a loop until the connection is closed.
     * Session is committed for each message sent.
     */
    public void run () {
        //start producing messages
        while (isConnected) {
            try {
                //reset message counter if it reaches max int value
                checkMessageCounter();
                //create a message
                Message m = session.createMessage();
                //get the current message counter value
                int messageCounter = this.getMessageCounter();
                //set message counter to message property
                m.setIntProperty(MESSAGE_COUNTER, messageCounter);
                //set commit property
                m.setBooleanProperty(COMMIT_PROPERTY_NAME, true);
                //send the message
                sender.send(m);
                log("Sending message: " + messageCounter +
                    ", current connected broker: " +
                    this.getCurrentConnectedBrokerAddress());
                //commit the message
                this.commit();
                // pause 3 seconds
                sleep(3000);
            } catch (TransactionRolledBackException trbe) {
                //rollback app data
                rollBackApplication();
            } catch (JMSException jmse) {
                if (isConnected == true) {
                    //rollback app data and JMS session
                    rollBackAll();
                }
            }
        }
    }

    /**
     * Reset all counters if integer max value is reached.
     */
    private void checkMessageCounter() {
        if ( currentCounter == Integer.MAX_VALUE ) {
            currentCounter = 0;
            commitCounter = 0;
        }
    }

    /**
     * Set up testing parameters - connection, destination, etc
     */
    protected void setup() {
        try {
            //get connection factory
            com.sun.messaging.QueueConnectionFactory factory =
                new com.sun.messaging.QueueConnectionFactory();
            //create a queue connection
            conn = factory.createQueueConnection();
            //set exception listener
            conn.setExceptionListener(this);
            //set event listener
            ( (com.sun.messaging.jms.Connection) conn).setEventListener(this);
            //get destination name
            String destName = TEST_DEST_NAME;
            //create a transacted session
            session = conn.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
            //get destination
            queue = session.createQueue(destName);
            //create queue sender
            sender = session.createSender(queue);
            //set isConnected flag to true.
            this.isConnected = true;
        } catch (JMSException jmse) {
            this.isConnected = false;
        }
    }

    /**
     * get the next message counter.
     */
    private synchronized int getMessageCounter () {
        return ++ currentCounter;
    }

    /**
     * commit the current transaction/session.
     */
    private void commit() throws JMSException {
        session.commit();
        this.commitCounter = currentCounter;
        log ("Transaction committed, commit counter: " + commitCounter);
    }

    /**
     * Get the current connected broker address.
     */
    private String getCurrentConnectedBrokerAddress() {
        return ((com.sun.messaging.jms.Connection)conn).getBrokerAddress();
    }

    /**
     * log a string message.
     * @param msg
     */
    private synchronized void log (String msg) {
        System.out.println(new Date() + ": " + msg);
    }

    /**
     * Log an exception received.
     */
    private synchronized void log (Exception e) {
        System.out.println(new Date() + ": Exception:");
        e.printStackTrace();
    }

    /**
     * Log the specified MQ event.
     */
    private synchronized void log (Event event) {
        try {
            System.out.println(new Date() + ": Received MQ event notification.");
            System.out.println("*** Event code: " + event.getEventCode() );
            System.out.println("*** Event message: " + event.getEventMessage());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * pause the specified milli seconds.
     */
    private void sleep (long millis) {
        try {
            Thread.sleep(millis);
        } catch (java.lang.InterruptedException inte) {
            log (inte);
        }
    }

    /**
     * The main program.
     */
    public static void main (String args[]) {
        FailoverQSender fp = new FailoverQSender();
        fp.run();
    }
}
The following code sample shows the work that a consumer in a transacted session needs to do in order to recover state after a failover. Note how the application tests both for rollback exceptions and JMS exceptions. Note also the use of a counter to allow the producer and consumer to verify message order and delivery.
/*
 * @(#)FailoverQReceiver.java  1.4 07/04/20
 *
 * Copyright 2000 Sun Microsystems, Inc. All Rights Reserved
 * SUN PROPRIETARY/CONFIDENTIAL
 * Use is subject to license terms.
 */
package test.jmsclient.ha;

import java.util.Date;
import java.util.Vector;
import javax.jms.*;
import com.sun.messaging.jms.notification.*;

/**
 * This sample program uses a transacted session to receive messages.
 * It is designed to run with test.jmsclient.ha.FailoverQSender.
 *
 * @version 1.0
 */
public class FailoverQReceiver
    implements ExceptionListener, EventListener, Runnable {

    //queue connection
    private QueueConnection conn = null;
    //queue session
    private QueueSession session = null;
    //qreceiver
    private QueueReceiver qreceiver = null;
    //queue destination
    private Queue queue = null;

    //committed counter.
    private int commitCounter = 0;
    //flag to indicate if the connection is connected to the broker.
    private boolean isConnected = false;
    //flag to indicate if current connection is to HA broker cluster.
    private boolean isHAConnection = false;
    //application data holder.
    private Vector<Integer> data = new Vector<Integer>();

    /**
     * Default constructor - JMS setup.
     */
    public FailoverQReceiver() {
        //set up JMS environment
        setup();
    }

    /**
     * Connection Exception listener.
     */
    public void onException (JMSException e) {
        //The run() method will exit.
        this.isConnected = false;
        log ("Exception listener is called. Connection is closed by MQ client runtime." );
        log (e);
    }

    /**
     * log the connection event.
     */
    public void onEvent (Event connectionEvent) {
        log (connectionEvent);
    }

    /**
     * Roll back application data.
     */
    private void rollBackApplication() {
        //reset application data
        this.reset();
        log ("Rolled back application data, current commit counter: " + commitCounter);
    }

    /**
     * Clear the application data for the current un-committed transaction.
     */
    private void reset() {
        data.clear();
    }

    /**
     * Roll back JMS transaction and application.
     */
    private void rollBackAll() {
        try {
            //rollback JMS
            rollBackJMS();
            //rollback application data
            rollBackApplication();
        } catch (Exception e) {
            log ("rollback failed. closing JMS connection ...");
            //application may decide NOT to close connection if rollback failed.
            close();
        }
    }

    /**
     * Roll back jms session.
     */
    private void rollBackJMS() throws JMSException {
        session.rollback();
        log("JMS session rolled back ...., commit counter: " + commitCounter);
    }

    /**
     * Close JMS connection and exit the application.
     */
    private void close() {
        try {
            if ( conn != null ) {
                conn.close();
            }
        } catch (Exception e) {
            log (e);
        } finally {
            isConnected = false;
        }
    }

    /**
     * Receive, validate, and commit messages.
     */
    public void run () {
        //receive messages
        while (isConnected) {
            try {
                //receive message
                Message m = qreceiver.receive();
                //process message -- add message to the data holder
                processMessage(m);
                //check if the commit flag is set in the message property
                if ( shouldCommit(m) ) {
                    //commit the transaction
                    commit(m);
                }
            } catch (TransactionRolledBackException trbe) {
                log ("transaction rolled back by MQ ...");
                //rollback application data
                rollBackApplication();
            } catch (JMSException jmse) {
                //The exception can happen when receiving messages
                //and the connected broker is killed.
                if ( isConnected == true ) {
                    //rollback MQ and application data
                    rollBackAll();
                }
            } catch (Exception e) {
                log (e);
                //application may decide NOT to close the connection
                //when an unexpected Exception occurred.
                close();
            }
        }
    }

    /**
     * Set up testing parameters - connection, destination, etc
     */
    protected void setup() {
        try {
            //get connection factory
            com.sun.messaging.QueueConnectionFactory factory =
                new com.sun.messaging.QueueConnectionFactory();
            //create jms connection
            conn = factory.createQueueConnection();
            //set exception listener
            conn.setExceptionListener(this);
            //set event listener
            ( (com.sun.messaging.jms.Connection) conn).setEventListener(this);
            //test if this is a HA connection
            isHAConnection =
                ( (com.sun.messaging.jms.Connection) conn).isConnectedToHABroker();
            log ("Is connected to HA broker cluster: " + isHAConnection);
            //get destination name
            String destName = FailoverQSender.TEST_DEST_NAME;
            //create a transacted session
            session = conn.createQueueSession(true, -1);
            //get destination
            queue = session.createQueue(destName);
            //create queue receiver
            qreceiver = session.createReceiver(queue);
            //set isConnected flag to true
            isConnected = true;
            //start the JMS connection
            conn.start();
            log("Ready to receive on destination: " + destName);
        } catch (JMSException jmse) {
            isConnected = false;
            log (jmse);
            close();
        }
    }

    /**
     * Check if we should commit the transaction.
     */
    private synchronized boolean shouldCommit(Message m) {
        boolean flag = false;
        try {
            //get the commit flag set by the FailoverQSender
            flag = m.getBooleanProperty(FailoverQSender.COMMIT_PROPERTY_NAME);
            if ( flag ) {
                //check if message property contains expected message counter
                validate (m);
            }
        } catch (JMSException jmse) {
            log (jmse);
        }
        return flag;
    }

    /**
     * A very simple validation only. More logic may be added to validate
     * message ordering and message content.
     * @param m Message The last message received for the current transaction.
     */
    private void validate (Message m) {
        try {
            //get message counter property
            int counter = m.getIntProperty(FailoverQSender.MESSAGE_COUNTER);
            //The counter is set sequentially and must be received in right order.
            //Each message is committed after validated.
            if (counter != (commitCounter + 1)) {
                this.printData();
                throw new RuntimeException("validation failed.");
            }
            log ("messages validated. ready to commit ...");
        } catch (JMSException jmse) {
            log (jmse);
            printData();
            throw new RuntimeException("Exception occurred during validation: " + jmse);
        }
    }

    /**
     * Get the message counter and put it in the data holder.
     * @param m the current message received
     */
    private synchronized void processMessage(Message m) throws JMSException {
        // get message counter. this value is set by the FailoverQSender.
        int ct = m.getIntProperty(FailoverQSender.MESSAGE_COUNTER);
        // log the message
        log("received message: " + ct +
            ", current connected broker: " +
            this.getCurrentConnectedBrokerAddress());
        // saved the data in data holder.
        data.addElement(Integer.valueOf(ct));
    }

    /**
     * commit the current transaction.
     * @param m the last received message to be committed.
     * @throws JMSException if commit failed.
     */
    private void commit(Message m) throws JMSException {
        //commit the transaction
        session.commit();
        //get the current message counter
        int counter = m.getIntProperty(FailoverQSender.MESSAGE_COUNTER);
        //set the commit counter
        commitCounter = counter;
        //clear app data
        this.reset();
        log ("Messages committed, commitCounter: " + commitCounter);
    }

    /**
     * log exception.
     */
    private synchronized void log (Exception e) {
        System.out.println(new Date() + ": Exception Stack Trace: ");
        e.printStackTrace();
    }

    /**
     * log connection event.
     */
    private synchronized void log (Event event) {
        try {
            System.out.println(new Date() + ": Received MQ event notification.");
            System.out.println("*** Event Code: " + event.getEventCode() );
            System.out.println("*** Event message: " + event.getEventMessage());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Log the specified message.
     */
    private void log (String msg) {
        System.out.println(new Date() + ": " + msg);
    }

    /**
     * print values stored in the data holder.
     *
     */
    private void printData() {
        for ( int i=0; i< data.size(); i++) {
            log (" *** data index " + i + " = " + data.elementAt(i) );
        }
    }

    private String getCurrentConnectedBrokerAddress() {
        return ((com.sun.messaging.jms.Connection)conn).getBrokerAddress();
    }

    /**
     * The main method. This starts the failover queue receiver.
     */
    public static void main (String args[]) {
        FailoverQReceiver fqr = new FailoverQReceiver();
        fqr.run();
    }
}
If a connection fails over while a producer is sending in a non-transacted session, the client application may receive a JMSException. The application thread that receives the exception should pause for a few seconds and then resend the messages. Because the broker may already have received a message whose send raised the exception, the client application may want to set a flag on the resent messages to indicate that they could be duplicates.
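The pause-and-resend advice can be sketched without any JMS dependency. The following is a minimal illustration only: the Sender interface and the ResendAfterFailover class are hypothetical names introduced here, not part of the Message Queue API. In a real client, Sender.send would wrap MessageProducer.send and copy the possibleDuplicate argument into a boolean message property.

```java
/** Hypothetical stand-in for a JMS producer; not part of any Message Queue API. */
interface Sender {
    void send(String body, boolean possibleDuplicate) throws Exception;
}

class ResendAfterFailover {
    /**
     * Sends one message. If a send fails, pauses for pauseMillis and resends,
     * flagging every resend as a possible duplicate.
     * Returns the number of attempts that were needed.
     */
    static int sendWithRetry(Sender sender, String body,
                             int maxAttempts, long pauseMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // Only the first attempt is known not to be a duplicate.
            boolean possibleDuplicate = attempt > 1;
            try {
                sender.send(body, possibleDuplicate);
                return attempt;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    // Give the client runtime time to reconnect before resending.
                    Thread.sleep(pauseMillis);
                }
            }
        }
        throw last;
    }
}
```

A caller might invoke ResendAfterFailover.sendWithRetry(sender, "msg: 0", 5, 3000) to allow up to five attempts with a three-second pause between them; both values are arbitrary choices for illustration.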
If a connection fails over for a message consumer, the consequences vary with the session's acknowledge mode:
In client-acknowledge mode, calling Message.acknowledge or MessageConsumer.receive during a failover will raise a JMSException. The consumer should call Session.recover to recover or re-deliver the unacknowledged messages and then call Message.acknowledge or MessageConsumer.receive.
In auto-acknowledge mode, after getting a JMSException, the synchronous consumer should pause a few seconds and then call MessageConsumer.receive to continue receiving messages. Any message that failed to be acknowledged (due to the failover) will be redelivered with the redelivered flag set to true.
In dups-OK-acknowledge mode, the synchronous consumer should pause a few seconds after getting an exception and then call MessageConsumer.receive to continue receiving messages. In this case, messages that were delivered and acknowledged before the failover may be redelivered.
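The three cases above can be summarized in a short sketch. It assumes hypothetical stand-ins (AckMode, ConsumerOps, ConsumerFailoverRecovery) rather than the real JMS types, so that it compiles on its own; in an actual client, recover would map to Session.recover and receive to MessageConsumer.receive.

```java
/** The three non-transacted acknowledge modes discussed above. */
enum AckMode { AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE }

/** Hypothetical stand-in for the JMS Session and MessageConsumer pair. */
interface ConsumerOps {
    String receive() throws Exception;  // plays the role of MessageConsumer.receive
    void recover() throws Exception;    // plays the role of Session.recover
}

class ConsumerFailoverRecovery {
    /**
     * Called after a receive or acknowledge has failed because of a failover.
     * Applies the recovery step for the given acknowledge mode, then receives again.
     */
    static String receiveAfterFailure(AckMode mode, ConsumerOps ops,
                                      long pauseMillis) throws Exception {
        switch (mode) {
            case CLIENT_ACKNOWLEDGE:
                // Ask for redelivery of the unacknowledged messages first.
                ops.recover();
                break;
            case AUTO_ACKNOWLEDGE:
            case DUPS_OK_ACKNOWLEDGE:
                // Pause briefly, then simply continue receiving.
                Thread.sleep(pauseMillis);
                break;
        }
        return ops.receive();
    }
}
```

In the auto-acknowledge and dups-OK-acknowledge cases the caller should still be prepared for redelivered messages, as described above.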
The following code sample illustrates good coding practices for handling exceptions during a failover. It is designed to send non-transacted, persistent messages forever and to handle JMSExceptions when a failover occurs. The program is able to handle either a true or false setting for the imqReconnectEnabled property. To run the program, enter one of the following commands.
java dura.example.FailoverProducer
java -DimqReconnectEnabled=true dura.example.FailoverProducer
/*
 * @(#)FailoverProducer.java 1.1 06/06/09
 * Copyright 2006 Sun Microsystems, Inc. All Rights Reserved
 * SUN PROPRIETARY/CONFIDENTIAL
 * Use is subject to license terms.
 */
package dura.example;

import javax.jms.*;
import com.sun.messaging.ConnectionConfiguration;
import java.util.*;

public class FailoverProducer implements ExceptionListener {

    //connection factory
    private com.sun.messaging.TopicConnectionFactory factory;
    //connection
    private TopicConnection pconn = null;
    //session
    private TopicSession psession = null;
    //publisher
    private TopicPublisher publisher = null;
    //topic
    private Topic topic = null;
    //This flag indicates whether this test client is closed.
    private boolean isClosed = false;
    //auto reconnection flag
    private boolean autoReconnect = false;
    //destination name for this example.
    private static final String DURA_TEST_TOPIC = "DuraTestTopic";
    //the message counter property name
    public static final String MESSAGE_COUNTER = "MESSAGE_COUNTER";
    //the message in-doubt-bit property name
    public static final String MESSAGE_IN_DOUBT = "MESSAGE_IN_DOUBT";

    /**
     * Constructor. Get imqReconnectEnabled property value from
     * System property.
     */
    public FailoverProducer () {
        try {
            autoReconnect =
                Boolean.getBoolean(ConnectionConfiguration.imqReconnectEnabled);
        } catch (Exception e) {
            this.printException(e);
        }
    }

    /**
     * Connection is broken if this handler is called.
     * If autoReconnect flag is true, this is called only
     * if no more retries from MQ.
     */
    public void onException (JMSException jmse) {
        this.printException (jmse);
    }

    /**
     * create MQ connection factory.
     * @throws JMSException
     */
    private void initFactory() throws JMSException {
        //get connection factory
        factory = new com.sun.messaging.TopicConnectionFactory();
    }

    /**
     * JMS setup. Create a Connection, Session, and Producer.
     *
     * If any of the JMS object creation fails (due to system failure),
     * it retries until it succeeds.
     *
     */
    private void initProducer() {
        boolean isConnected = false;
        while ( isClosed == false && isConnected == false ) {
            try {
                println("producer client creating connection ...");
                //create connection
                pconn = factory.createTopicConnection();
                //set connection exception listener
                pconn.setExceptionListener(this);
                //create topic session
                psession = pconn.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
                //get destination
                topic = psession.createTopic(DURA_TEST_TOPIC);
                //publisher
                publisher = psession.createPublisher(topic);
                //set flag to true
                isConnected = true;
                println("producer ready.");
            } catch (Exception e) {
                println("*** connect failed ... sleep for 5 secs.");
                try {
                    //close resources.
                    if ( pconn != null ) {
                        pconn.close();
                    }
                    //pause 5 secs.
                    Thread.sleep(5000);
                } catch (Exception e1) {
                    ;
                }
            }
        }
    }

    /**
     * Start test. This sends JMS messages in a loop (forever).
     */
    public void run () {
        try {
            //create MQ connection factory.
            initFactory();
            //create JMS connection, session, and producer
            initProducer();
            //send messages forever.
            sendMessages();
        } catch (Exception e) {
            this.printException(e);
        }
    }

    /**
     * Send persistent messages to a topic forever. This shows how
     * to handle failover for a message producer.
     */
    private void sendMessages() {
        //this is set to true if send failed.
        boolean messageInDoubt = false;
        //message to be sent
        TextMessage m = null;
        //msg counter
        long msgcount = 0;

        while (isClosed == false) {
            try {
                /**
                 * create a text message
                 */
                m = psession.createTextMessage();
                /**
                 * the MESSAGE_IN_DOUBT bit is set to true if
                 * you get an exception for the last message.
                 */
                if ( messageInDoubt == true ) {
                    m.setBooleanProperty (MESSAGE_IN_DOUBT, true);
                    messageInDoubt = false;
                    println("MESSAGE_IN_DOUBT bit is set to true for msg: " + msgcount);
                } else {
                    m.setBooleanProperty (MESSAGE_IN_DOUBT, false);
                }
                //set message counter
                m.setLongProperty(MESSAGE_COUNTER, msgcount);
                //set message body
                m.setText("msg: " + msgcount);
                //send the msg
                publisher.send(m, DeliveryMode.PERSISTENT, 4, 0);
                println("sent msg: " + msgcount);
                /**
                 * reset counter if reached max long value.
                 */
                if (msgcount == Long.MAX_VALUE) {
                    msgcount = 0;
                    println ("Reset message counter to 0.");
                }
                //increase counter
                msgcount ++;
                Thread.sleep(1000);
            } catch (Exception e) {
                if ( isClosed == false ) {
                    //set in doubt bit to true.
                    messageInDoubt = true;
                    this.printException(e);
                    //init producer only if auto reconnect is false.
                    if ( autoReconnect == false ) {
                        this.initProducer();
                    }
                }
            }
        }
    }

    /**
     * Close this example program.
     */
    public synchronized void close() {
        try {
            isClosed = true;
            pconn.close();
            notifyAll();
        } catch (Exception e) {
            this.printException(e);
        }
    }

    /**
     * print the specified exception.
     * @param e the exception to be printed.
     */
    private void printException (Exception e) {
        System.out.println(new Date().toString());
        e.printStackTrace();
    }

    /**
     * print the specified message.
     * @param msg the message to be printed.
     */
    private void println (String msg) {
        System.out.println(new Date() + ": " + msg);
    }

    /**
     * Main program to start this example.
     */
    public static void main (String args[]) {
        FailoverProducer fp = new FailoverProducer();
        fp.run();
    }
}
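The MESSAGE_COUNTER and MESSAGE_IN_DOUBT properties set by the producer above give a receiving application enough information to recognize a resent message, because a message resent after a failed send carries the same counter value with the in-doubt bit set to true. The following is one possible sketch of such a check, not something the sample programs implement. The InDoubtFilter class is a hypothetical helper, it works on plain values rather than JMS messages, and it ignores the producer's counter reset at Long.MAX_VALUE.

```java
/** Hypothetical helper that drops resent messages already processed. */
class InDoubtFilter {
    // Highest message counter processed so far; -1 means none yet.
    private long lastProcessed = -1;

    /**
     * Decides whether a message should be processed.
     * @param counter value of the MESSAGE_COUNTER property
     * @param inDoubt value of the MESSAGE_IN_DOUBT property
     * @return false if the message is a resend of one already processed
     */
    synchronized boolean accept(long counter, boolean inDoubt) {
        if (inDoubt && counter <= lastProcessed) {
            // The original send reached the broker after all; skip the resend.
            return false;
        }
        lastProcessed = Math.max(lastProcessed, counter);
        return true;
    }
}
```

A consumer would call accept with the two property values read from each message and process the message only when the result is true.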
The following code sample, FailoverConsumer, illustrates good coding practices for handling exceptions during a failover. It receives messages in a transacted session until the connection is closed. The program sets the imqReconnectEnabled attribute to true, so the Message Queue client runtime automatically attempts to reconnect when the connected broker fails or is killed. It is designed to work with dura.example.FailoverProducer, shown in the previous section.
To run this program, enter the following command.
java dura.example.FailoverConsumer
/*
 * @(#)FailoverConsumer.java 1.1 06/06/09
 * Copyright 2006 Sun Microsystems, Inc. All Rights Reserved
 * SUN PROPRIETARY/CONFIDENTIAL
 * Use is subject to license terms.
 *
 */
package dura.example;

import java.util.Date;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TransactionRolledBackException;
import com.sun.messaging.ConnectionConfiguration;

public class FailoverConsumer implements ExceptionListener, Runnable {

    //JMS connection
    private Connection conn = null;
    //JMS session
    private Session session = null;
    //JMS Message consumer
    private MessageConsumer messageConsumer = null;
    //JMS destination.
    private Destination destination = null;
    //flag indicates whether this program should continue running.
    private boolean isConnected = false;
    //destination name.
    private static final String DURA_TEST_TOPIC = "DuraTestTopic";
    //the commit counter, for information only.
    private long commitCounter = 0;

    /**
     * message counter property set by the producer.
     */
    public static final String MESSAGE_COUNTER = "MESSAGE_COUNTER";

    /**
     * Message in doubt bit set by the producer
     */
    public static final String MESSAGE_IN_DOUBT = "MESSAGE_IN_DOUBT";

    /**
     * receive time out
     */
    public static final long RECEIVE_TIMEOUT = 0;

    /**
     * Default constructor -
     * Set up JMS Environment.
     */
    public FailoverConsumer() {
        setup();
    }

    /* Connection Exception listener. This is called when connection
     * breaks and no reconnect attempts are performed by MQ client runtime.
     */
    public void onException (JMSException e) {
        print ("Reconnect failed. Shutting down the connection ...");
        /**
         * Set this flag to false so that the run() method will exit.
         */
        this.isConnected = false;
        e.printStackTrace();
    }

    /**
     * Best effort to roll back a jms session. When a broker crashes, an
     * open-transaction should be rolled back. But the re-started broker
     * may not have the uncommitted transaction information due to system
     * failure. In a situation like this, an application can just quit
     * calling rollback after retrying a few times. The uncommitted
     * transaction (resources) will eventually be removed by the broker.
     */
    private void rollBackJMS() {
        //rollback fail count
        int failCount = 0;
        boolean keepTrying = true;
        while ( keepTrying ) {
            try {
                print ("<<< rolling back JMS ...., consumer commit counter: "
                    + this.commitCounter);
                session.rollback();
                print("<<< JMS rolled back ...., consumer commit counter: "
                    + this.commitCounter);
                keepTrying = false;
            } catch (JMSException jmse) {
                failCount ++;
                jmse.printStackTrace();
                sleep (3000); //3 secs
                if ( failCount == 1 ) {
                    print ("<<< rollback failed : total count" + failCount);
                    keepTrying = false;
                }
            }
        }
    }

    /**
     * Close the JMS connection and exit the program.
     *
     */
    private void close() {
        try {
            if ( conn != null ) {
                conn.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            this.isConnected = false;
        }
    }

    /*Receive messages in a loop until closed.*/
    public void run () {
        while (isConnected) {
            try {
                /*receive message with specified timeout.*/
                Message m = messageConsumer.receive(RECEIVE_TIMEOUT);
                /* process the message. */
                processMessage(m);
                /* commit JMS transaction. */
                this.commit();
                /*increase the commit counter.*/
                this.commitCounter ++;
            } catch (TransactionRolledBackException trbe) {
                /**
                 * the transaction is rolled back
                 * a new transaction is automatically started.
                 */
                trbe.printStackTrace();
            } catch (JMSException jmse) {
                /* The transaction is in unknown state.
                 * We need to roll back the transaction.*/
                jmse.printStackTrace();
                /* roll back if not closed. */
                if ( this.isConnected == true ) {
                    this.rollBackJMS();
                }
            } catch (Exception e) {
                e.printStackTrace();
                /* Exit if this is an unexpected Exception. */
                this.close();
            } finally {
                ;//do nothing
            }
        }
        print(" <<< consumer exit ...");
    }

    /*Set up connection, destination, etc*/
    protected void setup() {
        try {
            //create connection factory
            com.sun.messaging.ConnectionFactory factory =
                new com.sun.messaging.ConnectionFactory();
            //set auto reconnect to true.
            factory.setProperty(ConnectionConfiguration.imqReconnectEnabled, "true");
            //A value of -1 will retry forever if connection is broken.
            factory.setProperty(ConnectionConfiguration.imqReconnectAttempts, "-1");
            //retry interval - every 10 seconds
            factory.setProperty(ConnectionConfiguration.imqReconnectInterval, "10000");
            //create connection
            conn = factory.createConnection();
            //set client ID
            conn.setClientID(DURA_TEST_TOPIC);
            //set exception listener
            conn.setExceptionListener(this);
            //create a transacted session
            session = conn.createSession(true, -1);
            //get destination
            destination = session.createTopic(DURA_TEST_TOPIC);
            //message consumer
            messageConsumer =
                session.createDurableSubscriber((Topic)destination, DURA_TEST_TOPIC);
            //set flag to true
            this.isConnected = true;
            //we are ready, start the connection
            conn.start();
            print("<<< Ready to receive on destination: " + DURA_TEST_TOPIC);
        } catch (JMSException jmse) {
            this.isConnected = false;
            jmse.printStackTrace();
            this.close();
        }
    }

    /**
     * Process the received message.
     * This prints received message counter.
     * @param m the message to be processed.
     */
    private synchronized void processMessage(Message m) {
        try {
            //in this example, we do not expect a timeout, etc.
            if ( m == null ) {
                throw new RuntimeException
                    ("<<< Received null message. Maybe reached max time out. ");
            }
            //get message counter property
            long msgCtr = m.getLongProperty (MESSAGE_COUNTER);
            //get message in-doubt bit
            boolean indoubt = m.getBooleanProperty(MESSAGE_IN_DOUBT);
            if ( indoubt) {
                print("<<< received message: " + msgCtr + ", indoubt bit is true");
            } else {
                print("<<< received message: " + msgCtr);
            }
        } catch (JMSException jmse) {
            jmse.printStackTrace();
        }
    }

    /**
     * Commit a JMS transaction.
     * @throws JMSException
     */
    private void commit() throws JMSException {
        session.commit();
    }

    /**
     * Sleep for the specified time.
     * @param millis sleep time in milli-seconds.
     */
    private void sleep (long millis) {
        try {
            Thread.sleep(millis);
        } catch (java.lang.InterruptedException inte) {
            print (inte);
        }
    }

    /**
     * Print the specified message.
     * @param msg the message to be printed.
     */
    private static void print (String msg) {
        System.out.println(new Date() + ": " + msg);
    }

    /**
     * Print Exception stack trace.
     * @param e the exception to be printed.
     */
    private static void print (Exception e) {
        System.out.print(e.getMessage());
        e.printStackTrace();
    }

    /**
     * Start this example program.
     */
    public static void main (String args[]) {
        FailoverConsumer fc = new FailoverConsumer();
        fc.run();
    }
}