A transacted session might fail to commit (and throw an exception) either because a failover occurs while statements within the transaction are being executed or because the failover occurs during the call to Session.commit(). In the first case, the failover is said to occur during an open transaction; in the second case, the failover occurs during the commit itself.
In the case of a failover during an open transaction, when the client application calls Session.commit(), the client runtime throws a TransactionRolledBackException and rolls back the transaction, which causes the following to happen:
Messages that have been produced (but not committed) in the transacted session are discarded and not delivered to the consumer.
All messages that have been consumed (but not committed) in the transacted session are redelivered to the consumer with the JMSRedelivered flag set.
A new transaction is automatically started.
If the client application itself calls Session.rollback() after a failover (before Session.commit() is executed), the same things happen as if the application had received a TransactionRolledBackException. After receiving a TransactionRolledBackException or calling Session.rollback(), the client application must retry the failed transaction. That is, it must re-send and re-consume the messages that were involved in the failed-over transaction.
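The retry requirement described above can be sketched as a small loop. This is only an illustration under stated assumptions: to keep it runnable without the JMS libraries, RolledBackException and TxSession are hypothetical stand-ins for javax.jms.TransactionRolledBackException and a transacted Session, and FakeSession is an in-memory fake that rolls back its first commit to simulate a failover during an open transaction.

```java
import java.util.ArrayList;
import java.util.List;

public class RolledBackRetrySketch {

    /** Hypothetical stand-in for javax.jms.TransactionRolledBackException. */
    static class RolledBackException extends Exception {
        RolledBackException(String message) { super(message); }
    }

    /** Hypothetical stand-in for the transacted-session calls this sketch needs. */
    interface TxSession {
        void send(String body);
        void commit() throws RolledBackException;
    }

    /** Sends the batch in one transaction and re-sends all of it if the commit is rolled back. */
    static int sendWithRetry(TxSession session, List<String> batch, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            for (String body : batch) {
                session.send(body);
            }
            try {
                session.commit();
                return attempt;
            } catch (RolledBackException e) {
                // The runtime has already rolled back and opened a new transaction,
                // so the uncommitted sends are gone and must be repeated.
                if (attempt >= maxAttempts) {
                    throw new IllegalStateException("gave up after " + attempt + " attempts", e);
                }
            }
        }
    }

    /** In-memory fake that rolls back a configurable number of commits. */
    static class FakeSession implements TxSession {
        final List<String> pending = new ArrayList<>();
        final List<String> delivered = new ArrayList<>();
        int failuresLeft;

        FakeSession(int failures) { this.failuresLeft = failures; }

        public void send(String body) { pending.add(body); }

        public void commit() throws RolledBackException {
            if (failuresLeft > 0) {
                failuresLeft--;
                pending.clear(); // produced but uncommitted messages are discarded
                throw new RolledBackException("failover during open transaction");
            }
            delivered.addAll(pending);
            pending.clear();
        }
    }

    public static void main(String[] args) {
        FakeSession session = new FakeSession(1);
        int attempts = sendWithRetry(session, List.of("a", "b"), 3);
        // prints "attempts=2, delivered=[a, b]"
        System.out.println("attempts=" + attempts + ", delivered=" + session.delivered);
    }
}
```

The point of the sketch is that the whole unit of work is repeated, not only the commit call: after the rollback, nothing from the failed transaction remains in the session.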
In the second case, when the failover occurs during a call to Session.commit(), there are three possible outcomes:
The transaction is committed successfully and the call to Session.commit() does not throw an exception. In this case, the client application does not have to do anything.
The runtime throws a TransactionRolledBackException and does not commit the transaction. The transaction is automatically rolled back by the Message Queue runtime. In this case, the client application must retry the transaction as described for the case of a failover during an open transaction.
A JMSException is thrown. This signals that the transaction state is unknown: the commit might have either succeeded or failed. A client application should handle this case by assuming failure, pausing for three seconds, calling Session.rollback(), and then retrying the operations. However, because the commit might have succeeded, a producer that retries the transacted operations should set application-specific properties on the messages it re-sends to signal that they might be duplicates. Likewise, consumers that retry receive operations should not assume that a redelivered message is necessarily a duplicate. In other words, to ensure once-and-only-once delivery, both producers and consumers need to do a little extra work to handle this edge case. The code samples presented next illustrate good coding practices for handling this situation.
If you are using a high availability cluster, the only time this condition might arise is when the client is unable to connect to any backup broker. This should be extremely rare.
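As a rough illustration of the unknown-outcome case, the sketch below assumes failure, pauses, rolls back, and re-sends the message with a marker property. Several names are assumptions made for the example: POSSIBLE_DUPLICATE is a hypothetical application-specific property name (Message Queue does not define it), and UnknownOutcomeException and TxSession are stand-ins for a JMSException thrown by commit and for a transacted Session, so that the example runs without the JMS libraries. FakeSession simulates the awkward case in which the commit actually succeeded but the client could not learn the result.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UnknownCommitSketch {

    /** Hypothetical application-specific property name; not defined by Message Queue. */
    static final String POSSIBLE_DUPLICATE = "POSSIBLE_DUPLICATE";

    /** Hypothetical stand-in for a JMSException from commit with an unknown outcome. */
    static class UnknownOutcomeException extends Exception {
        UnknownOutcomeException(String message) { super(message); }
    }

    /** Hypothetical stand-in for the transacted-session calls this sketch needs. */
    interface TxSession {
        void send(Map<String, Object> message);
        void commit() throws UnknownOutcomeException;
        void rollback();
    }

    /** Sends one message; on an unknown outcome, rolls back and re-sends it flagged. */
    static Map<String, Object> sendAndCommit(TxSession session, int counter, long pauseMillis) {
        Map<String, Object> message = new HashMap<>();
        message.put("counter", counter);
        session.send(message);
        try {
            session.commit();
            return message;
        } catch (UnknownOutcomeException unknown) {
            // Assume failure: pause, roll back, then retry.
            pause(pauseMillis);
            session.rollback();
            // The first commit may have succeeded, so mark the re-sent copy.
            Map<String, Object> retry = new HashMap<>(message);
            retry.put(POSSIBLE_DUPLICATE, Boolean.TRUE);
            session.send(retry);
            try {
                session.commit();
            } catch (UnknownOutcomeException again) {
                throw new IllegalStateException("commit outcome still unknown", again);
            }
            return retry;
        }
    }

    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Fake session whose first commit takes effect but still reports an unknown outcome. */
    static class FakeSession implements TxSession {
        final List<Map<String, Object>> pending = new ArrayList<>();
        final List<Map<String, Object>> delivered = new ArrayList<>();
        boolean loseFirstReply = true;

        public void send(Map<String, Object> message) { pending.add(message); }

        public void commit() throws UnknownOutcomeException {
            delivered.addAll(pending);
            pending.clear();
            if (loseFirstReply) {
                loseFirstReply = false;
                throw new UnknownOutcomeException("failover during commit");
            }
        }

        public void rollback() { pending.clear(); }
    }

    public static void main(String[] args) {
        FakeSession session = new FakeSession();
        // The text suggests a three-second pause; shortened here so the demo runs quickly.
        sendAndCommit(session, 1, 10);
        for (Map<String, Object> m : session.delivered) {
            System.out.println("counter=" + m.get("counter")
                + " possibleDuplicate=" + m.containsKey(POSSIBLE_DUPLICATE));
        }
    }
}
```

In this simulated run the consumer side ends up with two copies of counter 1, and only the second carries the marker. That is why the marker alone is not enough and the consumer also needs its own check against application data.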
The next two examples illustrate how stand-alone Message Queue producers and consumers should handle transactions during a failover. To run the sample programs, do the following:
Start two high availability brokers. The brokers can be on the same machine or on different machines, but they must be in the same cluster.
Start the example programs. For example:
java -DimqAddressList="localhost:777" test.jmsclient.ha.FailoverQSender
java -DimqAddressList="localhost:777" test.jmsclient.ha.FailoverQReceiver
It does not matter in what order you start the programs. The only property that you must specify is imqAddressList. The client application will be automatically failed over to a backup broker if the connection to its home broker fails. (The imqReconnectEnabled and imqAddressListIterations properties are ignored for a high availability cluster.)
Kill the broker to which the producing or consuming application is connected. The clients will reconnect, validate, and continue the failed transaction. A message produced or consumed in a transaction is either committed or rolled back after a successful failover.
You can restart the dead broker and retry the failover operation by killing the new home broker.
The following code sample shows the work that a producer in a transacted session needs to do to recover state after a failover. Note how the application tests both for rollback exceptions and for JMS exceptions. Note also the use of a counter to allow the producer and consumer to verify message order and delivery.
/*
 * @(#)FailoverQSender.java  1.2 07/04/20
 *
 * Copyright 2000 Sun Microsystems, Inc. All Rights Reserved
 * SUN PROPRIETARY/CONFIDENTIAL
 * Use is subject to license terms.
 *
 */
package test.jmsclient.ha;

import java.util.Date;
import javax.jms.*;
import com.sun.messaging.jms.Connection;
import com.sun.messaging.jms.notification.*;

/**
 *
 * This sample program uses a transacted session to send messages.
 * It is designed to run with test.jmsclient.ha.FailoverQReceiver
 * @version 1.0
 */
public class FailoverQSender
    implements ExceptionListener, EventListener, Runnable {

    //constant - commit property name
    public static final String COMMIT_PROPERTY_NAME = "COMMIT_PROPERTY";
    //constant - message counter
    public static final String MESSAGE_COUNTER = "counter";
    //constant - destination name
    public static final String TEST_DEST_NAME = "FailoverTestDest001";

    //queue connection
    QueueConnection conn = null;
    //session
    QueueSession session = null;
    //queue sender
    QueueSender sender = null;
    //queue destination
    Queue queue = null;

    //committed counter.
    private int commitCounter = 0;
    //current message counter
    private int currentCounter = 0;
    //set to true if the application is connected to the broker.
    private boolean isConnected = false;

    /**
     * Default constructor - sets up the JMS environment.
     */
    public FailoverQSender() {
        //set up JMS environment
        setup();
    }

    /**
     * Connection Exception listener.
     */
    public void onException (JMSException e) {
        //The run() method will exit.
        this.isConnected = false;
        log ("Exception listener is called. Connection is closed by MQ client runtime." );
        log (e);
    }

    /**
     * This method is called when an MQ connection event occurs.
     */
    public void onEvent (Event connectionEvent) {
        log(connectionEvent);
    }

    /**
     * Roll back the application data.
     *
     */
    private void rollBackApplication() {
        this.currentCounter = this.commitCounter;
        log ("Application rolled back., current (commit) counter: " + currentCounter);
    }

    /**
     * Roll back the current jms session.
     */
    private void rollBackJMS() {
        try {
            log("Rolling back JMS ...., commit counter: " + commitCounter);
            session.rollback();
        } catch (JMSException jmse) {
            log("Rollback failed");
            log(jmse);
            //application may decide to log and continue sending messages
            // without closing the application.
            close();
        }
    }

    /**
     * Roll back application data and jms session.
     *
     */
    private void rollBackAll() {
        //rollback jms
        rollBackJMS();
        //rollback app data
        rollBackApplication();
    }

    /**
     * Close the JMS connection and stop the application.
     *
     */
    private void close() {
        try {
            if ( conn != null ) {
                //close the connection
                conn.close();
            }
        } catch (Exception e) {
            //log exception
            log (e);
        } finally {
            //set flag to false. application thread will exit
            isConnected = false;
        }
    }

    /**
     * Send messages in a loop until the connection is closed.
     * Session is committed for each message sent.
     */
    public void run () {
        //start producing messages
        while (isConnected) {
            try {
                //reset message counter if it reaches max int value
                checkMessageCounter();
                //create a message
                Message m = session.createMessage();
                //get the current message counter value
                int messageCounter = this.getMessageCounter();
                //set message counter to message property
                m.setIntProperty(MESSAGE_COUNTER, messageCounter);
                //set commit property
                m.setBooleanProperty(COMMIT_PROPERTY_NAME, true);
                //send the message
                sender.send(m);
                log("Sending message: " + messageCounter +
                    ", current connected broker: " +
                    this.getCurrentConnectedBrokerAddress());
                //commit the message
                this.commit();
                // pause 3 seconds
                sleep(3000);
            } catch (TransactionRolledBackException trbe) {
                //rollback app data
                rollBackApplication();
            } catch (JMSException jmse) {
                if (isConnected == true) {
                    //rollback app data and JMS session
                    rollBackAll();
                }
            }
        }
    }

    /**
     * Reset all counters if integer max value is reached.
     */
    private void checkMessageCounter() {
        if ( currentCounter == Integer.MAX_VALUE ) {
            currentCounter = 0;
            commitCounter = 0;
        }
    }

    /**
     * Set up testing parameters - connection, destination, etc
     */
    protected void setup() {
        try {
            //get connection factory
            com.sun.messaging.QueueConnectionFactory factory =
                new com.sun.messaging.QueueConnectionFactory();
            //create a queue connection
            conn = factory.createQueueConnection();
            //set exception listener
            conn.setExceptionListener(this);
            //set event listener
            ( (com.sun.messaging.jms.Connection) conn).setEventListener(this);
            //get destination name
            String destName = TEST_DEST_NAME;
            //create a transacted session
            session = conn.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
            //get destination
            queue = session.createQueue(destName);
            //create queue sender
            sender = session.createSender(queue);
            //set isConnected flag to true.
            this.isConnected = true;
        } catch (JMSException jmse) {
            this.isConnected = false;
        }
    }

    /**
     * Get the next message counter.
     */
    private synchronized int getMessageCounter () {
        return ++ currentCounter;
    }

    /**
     * Commit the current transaction/session.
     */
    private void commit() throws JMSException {
        session.commit();
        this.commitCounter = currentCounter;
        log ("Transaction committed, commit counter: " + commitCounter);
    }

    /**
     * Get the current connected broker address.
     */
    private String getCurrentConnectedBrokerAddress() {
        return ((com.sun.messaging.jms.Connection)conn).getBrokerAddress();
    }

    /**
     * Log a string message.
     * @param msg
     */
    private synchronized void log (String msg) {
        System.out.println(new Date() + ": " + msg);
    }

    /**
     * Log an exception received.
     */
    private synchronized void log (Exception e) {
        System.out.println(new Date() + ": Exception:");
        e.printStackTrace();
    }

    /**
     * Log the specified MQ event.
     */
    private synchronized void log (Event event) {
        try {
            System.out.println(new Date() + ": Received MQ event notification.");
            System.out.println("*** Event code: " + event.getEventCode() );
            System.out.println("*** Event message: " + event.getEventMessage());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Pause for the specified number of milliseconds.
     */
    private void sleep (long millis) {
        try {
            Thread.sleep(millis);
        } catch (java.lang.InterruptedException inte) {
            log (inte);
        }
    }

    /**
     * The main program.
     */
    public static void main (String args[]) {
        FailoverQSender fp = new FailoverQSender();
        fp.run();
    }
}
The following code sample shows the work that a consumer in a transacted session needs to do in order to recover state after a failover. Note how the application tests both for rollback exceptions and JMS exceptions. Note also the use of a counter to allow the producer and consumer to verify message order and delivery.
/*
 * @(#)FailoverQReceiver.java  1.4 07/04/20
 *
 * Copyright 2000 Sun Microsystems, Inc. All Rights Reserved
 * SUN PROPRIETARY/CONFIDENTIAL
 * Use is subject to license terms.
 */
package test.jmsclient.ha;

import java.util.Date;
import java.util.Vector;
import javax.jms.*;
import com.sun.messaging.jms.notification.*;

/**
 * This sample program uses a transacted session to receive messages.
 * It is designed to run with test.jmsclient.ha.FailoverQSender.
 *
 * @version 1.0
 */
public class FailoverQReceiver
    implements ExceptionListener, EventListener, Runnable {

    //queue connection
    private QueueConnection conn = null;
    //queue session
    private QueueSession session = null;
    //qreceiver
    private QueueReceiver qreceiver = null;
    //queue destination
    private Queue queue = null;

    //committed counter.
    private int commitCounter = 0;
    //flag to indicate if the connection is connected to the broker.
    private boolean isConnected = false;
    //flag to indicate if current connection is to HA broker cluster.
    private boolean isHAConnection = false;
    //application data holder.
    private Vector data = new Vector();

    /**
     * Default constructor - JMS setup.
     */
    public FailoverQReceiver() {
        //set up JMS environment
        setup();
    }

    /**
     * Connection Exception listener.
     */
    public void onException (JMSException e) {
        //The run() method will exit.
        this.isConnected = false;
        log ("Exception listener is called. Connection is closed by MQ client runtime." );
        log (e);
    }

    /**
     * Log the connection event.
     */
    public void onEvent (Event connectionEvent) {
        log (connectionEvent);
    }

    /**
     * Roll back application data.
     */
    private void rollBackApplication() {
        //reset application data
        this.reset();
        log ("Rolled back application data, current commit counter: " + commitCounter);
    }

    /**
     * Clear the application data for the current un-committed transaction.
     */
    private void reset() {
        data.clear();
    }

    /**
     * Roll back JMS transaction and application.
     */
    private void rollBackAll() {
        try {
            //rollback JMS
            rollBackJMS();
            //rollback application data
            rollBackApplication();
        } catch (Exception e) {
            log ("rollback failed. closing JMS connection ...");
            //application may decide NOT to close connection if rollback failed.
            close();
        }
    }

    /**
     * Roll back jms session.
     */
    private void rollBackJMS() throws JMSException {
        session.rollback();
        log("JMS session rolled back ...., commit counter: " + commitCounter);
    }

    /**
     * Close JMS connection and exit the application.
     */
    private void close() {
        try {
            if ( conn != null ) {
                conn.close();
            }
        } catch (Exception e) {
            log (e);
        } finally {
            isConnected = false;
        }
    }

    /**
     * Receive, validate, and commit messages.
     */
    public void run () {
        //consume messages
        while (isConnected) {
            try {
                //receive message
                Message m = qreceiver.receive();
                //process message -- add message to the data holder
                processMessage(m);
                //check if the commit flag is set in the message property
                if ( shouldCommit(m) ) {
                    //commit the transaction
                    commit(m);
                }
            } catch (TransactionRolledBackException trbe) {
                log ("transaction rolled back by MQ ...");
                //rollback application data
                rollBackApplication();
            } catch (JMSException jmse) {
                //The exception can happen when receiving messages
                //and the connected broker is killed.
                if ( isConnected == true ) {
                    //rollback MQ and application data
                    rollBackAll();
                }
            } catch (Exception e) {
                log (e);
                //application may decide NOT to close the connection
                //when an unexpected Exception occurred.
                close();
            }
        }
    }

    /**
     * Set up testing parameters - connection, destination, etc
     */
    protected void setup() {
        try {
            //get connection factory
            com.sun.messaging.QueueConnectionFactory factory =
                new com.sun.messaging.QueueConnectionFactory();
            //create jms connection
            conn = factory.createQueueConnection();
            //set exception listener
            conn.setExceptionListener(this);
            //set event listener
            ( (com.sun.messaging.jms.Connection) conn).setEventListener(this);
            //test if this is a HA connection
            isHAConnection =
                ( (com.sun.messaging.jms.Connection) conn).isConnectedToHABroker();
            log ("Is connected to HA broker cluster: " + isHAConnection);
            //get destination name
            String destName = FailoverQSender.TEST_DEST_NAME;
            //create a transacted session
            session = conn.createQueueSession(true, -1);
            //get destination
            queue = session.createQueue(destName);
            //create queue receiver
            qreceiver = session.createReceiver(queue);
            //set isConnected flag to true
            isConnected = true;
            //start the JMS connection
            conn.start();
            log("Ready to receive on destination: " + destName);
        } catch (JMSException jmse) {
            isConnected = false;
            log (jmse);
            close();
        }
    }

    /**
     * Check if we should commit the transaction.
     */
    private synchronized boolean shouldCommit(Message m) {
        boolean flag = false;
        try {
            //get the commit flag set by the FailoverQSender
            flag = m.getBooleanProperty(FailoverQSender.COMMIT_PROPERTY_NAME);
            if ( flag ) {
                //check if message property contains expected message counter
                validate (m);
            }
        } catch (JMSException jmse) {
            log (jmse);
        }
        return flag;
    }

    /**
     * A very simple validation only. More logic may be added to validate
     * message ordering and message content.
     * @param m Message The last message received for the current transaction.
     */
    private void validate (Message m) {
        try {
            //get message counter property
            int counter = m.getIntProperty(FailoverQSender.MESSAGE_COUNTER);
            //The counter is set sequentially and must be received in right order.
            //Each message is committed after it is validated.
            if (counter != (commitCounter + 1)) {
                this.printData();
                throw new RuntimeException("validation failed.");
            }
            log ("messages validated. ready to commit ...");
        } catch (JMSException jmse) {
            log (jmse);
            printData();
            throw new RuntimeException("Exception occurred during validation: " + jmse);
        }
    }

    /**
     * Get the message counter and put it in the data holder.
     * @param m the current message received
     */
    private synchronized void processMessage(Message m) throws JMSException {
        // get message counter. this value is set by the FailoverQSender.
        int ct = m.getIntProperty(FailoverQSender.MESSAGE_COUNTER);
        // log the message
        log("received message: " + ct +
            ", current connected broker: " +
            this.getCurrentConnectedBrokerAddress());
        // save the data in data holder.
        data.addElement(Integer.valueOf(ct));
    }

    /**
     * Commit the current transaction.
     * @param m the last received message to be committed.
     * @throws JMSException if commit failed.
     */
    private void commit(Message m) throws JMSException {
        //commit the transaction
        session.commit();
        //get the current message counter
        int counter = m.getIntProperty(FailoverQSender.MESSAGE_COUNTER);
        //set the commit counter
        commitCounter = counter;
        //clear app data
        this.reset();
        log ("Messages committed, commitCounter: " + commitCounter);
    }

    /**
     * Log exception.
     */
    private synchronized void log (Exception e) {
        System.out.println(new Date() + ": Exception Stack Trace: ");
        e.printStackTrace();
    }

    /**
     * Log connection event.
     */
    private synchronized void log (Event event) {
        try {
            System.out.println(new Date() + ": Received MQ event notification.");
            System.out.println("*** Event Code: " + event.getEventCode() );
            System.out.println("*** Event message: " + event.getEventMessage());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Log the specified message.
     */
    private void log (String msg) {
        System.out.println(new Date() + ": " + msg);
    }

    /**
     * Print values stored in the data holder.
     *
     */
    private void printData() {
        for ( int i=0; i< data.size(); i++) {
            log (" *** data index " + i + " = " + data.elementAt(i) );
        }
    }

    private String getCurrentConnectedBrokerAddress() {
        return ((com.sun.messaging.jms.Connection)conn).getBrokerAddress();
    }

    /**
     * The main method. This starts the failover queue receiver.
     */
    public static void main (String args[]) {
        FailoverQReceiver fqr = new FailoverQReceiver();
        fqr.run();
    }
}
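The receiver listing treats any counter other than commitCounter + 1 as a validation failure. One possible refinement, sketched below, is to tell a duplicate apart from a genuine gap so that a re-sent message can be skipped rather than treated as an error. This is an assumption-laden illustration, not part of the sample programs: the Verdict enum and the classify method are hypothetical names, the counter is assumed to increase by one per committed message as in the sender listing, and the counter reset at Integer.MAX_VALUE is ignored. The redelivered argument is deliberately unused in the decision, reflecting the point made earlier that a redelivered message is not necessarily a duplicate.

```java
public class DuplicateCheckSketch {

    /** Hypothetical outcome of comparing an incoming counter with the last committed one. */
    enum Verdict { PROCESS, SKIP_DUPLICATE, OUT_OF_ORDER }

    /**
     * Classifies a message by its application-level counter.
     * The redelivered flag is accepted only to show that it does not drive the decision.
     */
    static Verdict classify(int counter, boolean redelivered, int lastCommitted) {
        if (counter == lastCommitted + 1) {
            // Next expected message, even if the broker marked it as redelivered.
            return Verdict.PROCESS;
        }
        if (counter <= lastCommitted) {
            // Already committed earlier, so this copy can be skipped.
            return Verdict.SKIP_DUPLICATE;
        }
        // A counter was skipped; the application must decide how to react.
        return Verdict.OUT_OF_ORDER;
    }

    public static void main(String[] args) {
        int lastCommitted = 4;
        System.out.println(classify(5, true, lastCommitted));  // prints PROCESS
        System.out.println(classify(4, true, lastCommitted));  // prints SKIP_DUPLICATE
        System.out.println(classify(7, false, lastCommitted)); // prints OUT_OF_ORDER
    }
}
```

A check of this kind would sit where the receiver currently calls validate, with the skip case committing the transaction without updating application data.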