The following code sample shows the work that a producer in a transacted session needs to do to recover state after a failover. Note how the application tests both for rollback exceptions and for JMS exceptions. Note also the use of a counter to allow the producer and consumer to verify message order and delivery.
/*
 * @(#)FailoverQSender.java 1.2 07/04/20
 *
 * Copyright 2000 Sun Microsystems, Inc. All Rights Reserved
 * SUN PROPRIETARY/CONFIDENTIAL
 * Use is subject to license terms.
 *
 */
package test.jmsclient.ha;

import java.util.Date;
import javax.jms.*;
import com.sun.messaging.jms.Connection;
import com.sun.messaging.jms.notification.*;

/**
 * This sample program uses a transacted session to send messages.
 * It is designed to run with test.jmsclient.ha.FailoverQReceiver.
 *
 * <p>Each message carries an increasing integer counter. The sender keeps
 * two counters: the value of the last message sent ({@code currentCounter})
 * and the value of the last message whose transaction was committed
 * ({@code commitCounter}). When a transaction is rolled back, the current
 * counter is reset to the committed counter so that the next message sent
 * re-uses the first uncommitted value.
 *
 * @version 1.0
 */
public class FailoverQSender
    implements ExceptionListener, EventListener, Runnable {

    /** Constant - commit property name. */
    public static final String COMMIT_PROPERTY_NAME = "COMMIT_PROPERTY";

    /** Constant - name of the message property holding the counter. */
    public static final String MESSAGE_COUNTER = "counter";

    /** Constant - destination name. */
    public static final String TEST_DEST_NAME = "FailoverTestDest001";

    /** Queue connection. */
    QueueConnection conn = null;

    /** Transacted session used for sending. */
    QueueSession session = null;

    /** Queue sender. */
    QueueSender sender = null;

    /** Queue destination. */
    Queue queue = null;

    /** Committed counter: counter value of the last committed message. */
    private int commitCounter = 0;

    /** Current message counter: counter value of the last message sent. */
    private int currentCounter = 0;

    /**
     * Set to true while the application is connected to the broker.
     * Declared volatile because it is cleared from the connection
     * exception listener callback ({@link #onException}) and polled by
     * the send loop in {@link #run}; JMS delivers connection exceptions
     * asynchronously, so the two may execute on different threads.
     */
    private volatile boolean isConnected = false;

    /**
     * Default constructor - sets up the JMS environment by calling
     * {@link #setup()}.
     *
     * <p>NOTE: {@code setup()} is protected and is invoked from this
     * constructor; a subclass that overrides it will have its override
     * run before the subclass's own field initializers.
     */
    public FailoverQSender() {
        //set up JMS environment
        setup();
    }

    /**
     * Connection exception listener. Clears the connected flag so that
     * the loop in {@link #run} exits, then logs the exception.
     *
     * @param e the exception reported for the connection
     */
    public void onException(JMSException e) {
        //The run() method will exit.
        this.isConnected = false;
        log("Exception listener is called. Connection is closed by MQ client runtime.");
        log(e);
    }

    /**
     * This method is called when a MQ connection event occurred.
     * The event is only logged.
     *
     * @param connectionEvent the connection event received
     */
    public void onEvent(Event connectionEvent) {
        log(connectionEvent);
    }

    /**
     * Roll back the application data: reset the current counter to the
     * last committed counter.
     */
    private void rollBackApplication() {
        this.currentCounter = this.commitCounter;
        log("Application rolled back., current (commit) counter: " + currentCounter);
    }

    /**
     * Roll back the current JMS session. If the rollback itself fails,
     * the failure is logged and the connection is closed, which stops
     * the send loop.
     */
    private void rollBackJMS() {
        try {
            log("Rolling back JMS ...., commit counter: " + commitCounter);
            session.rollback();
        } catch (JMSException jmse) {
            log("Rollback failed");
            log(jmse);
            //application may decide to log and continue sending messages
            //without closing the application.
            close();
        }
    }

    /**
     * Roll back both the JMS session and the application data.
     */
    private void rollBackAll() {
        //rollback jms
        rollBackJMS();
        //rollback app data
        rollBackApplication();
    }

    /**
     * Close the JMS connection (if one was created) and stop the
     * application. Any exception raised while closing is logged; the
     * connected flag is always cleared.
     */
    private void close() {
        try {
            if (conn != null) {
                //close the connection
                conn.close();
            }
        } catch (Exception e) {
            //log exception
            log(e);
        } finally {
            //set flag to false. application thread will exit
            isConnected = false;
        }
    }

    /**
     * Send messages in a loop until the connection is closed.
     * The session is committed for each message sent, followed by a
     * 3 second pause.
     *
     * <p>A {@code TransactionRolledBackException} rolls back only the
     * application counter. Any other {@code JMSException} received while
     * still connected rolls back both the JMS session and the
     * application counter. If the sending thread is interrupted, the
     * connection is closed and the loop ends.
     */
    public void run() {
        //start producing messages
        while (isConnected) {
            try {
                //reset message counter if it reaches max int value
                checkMessageCounter();
                //create a message
                Message m = session.createMessage();
                //get the current message counter value
                int messageCounter = this.getMessageCounter();
                //set message counter to message property
                m.setIntProperty(MESSAGE_COUNTER, messageCounter);
                //set commit property
                m.setBooleanProperty(COMMIT_PROPERTY_NAME, true);
                //send the message
                sender.send(m);
                log("Sending message: " + messageCounter
                    + ", current connected broker: "
                    + this.getCurrentConnectedBrokerAddress());
                //commit the message
                this.commit();
                // pause 3 seconds
                sleep(3000);
            } catch (TransactionRolledBackException trbe) {
                log(trbe);
                //rollback app data
                rollBackApplication();
            } catch (JMSException jmse) {
                //when not connected the exception listener has already
                //logged the failure and the loop is about to exit.
                if (isConnected) {
                    log(jmse);
                    //rollback app data and JMS session
                    rollBackAll();
                }
            }

            //sleep() re-asserts the interrupt flag; treat an interrupt as
            //a request to stop instead of spinning through the loop.
            if (Thread.currentThread().isInterrupted()) {
                log("Sender thread interrupted, closing connection.");
                close();
            }
        }
    }

    /**
     * Reset all counters if integer max value is reached.
     */
    private void checkMessageCounter() {
        if (currentCounter == Integer.MAX_VALUE) {
            currentCounter = 0;
            commitCounter = 0;
        }
    }

    /**
     * Set up testing parameters - connection, destination, etc.
     * On success the connected flag is set to true. On failure the
     * exception is logged, any connection that was already created is
     * closed, and the connected flag is left false.
     */
    protected void setup() {
        try {
            //get connection factory
            com.sun.messaging.QueueConnectionFactory factory =
                new com.sun.messaging.QueueConnectionFactory();
            //create a queue connection
            conn = factory.createQueueConnection();
            //set exception listener
            conn.setExceptionListener(this);
            //set event listener
            ((com.sun.messaging.jms.Connection) conn).setEventListener(this);
            //get destination name
            String destName = TEST_DEST_NAME;
            //create a transacted session
            session = conn.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
            //get destination
            queue = session.createQueue(destName);
            //create queue sender
            sender = session.createSender(queue);
            //set isConnected flag to true.
            this.isConnected = true;
        } catch (JMSException jmse) {
            //do not fail silently, and do not leave a half-initialized
            //connection open; close() also clears the connected flag.
            log(jmse);
            close();
        }
    }

    /**
     * Get the next message counter.
     *
     * @return the current counter after incrementing it by one
     */
    private synchronized int getMessageCounter() {
        return ++currentCounter;
    }

    /**
     * Commit the current transaction/session and record the current
     * counter as committed.
     *
     * @throws JMSException if the session commit fails
     */
    private void commit() throws JMSException {
        session.commit();
        this.commitCounter = currentCounter;
        log("Transaction committed, commit counter: " + commitCounter);
    }

    /**
     * Get the current connected broker address.
     *
     * @return the broker address reported by the MQ connection
     */
    private String getCurrentConnectedBrokerAddress() {
        return ((com.sun.messaging.jms.Connection) conn).getBrokerAddress();
    }

    /**
     * Log a string message, prefixed with the current date.
     *
     * @param msg the message to print
     */
    private synchronized void log(String msg) {
        System.out.println(new Date() + ": " + msg);
    }

    /**
     * Log an exception received: prints a dated header followed by the
     * stack trace.
     *
     * @param e the exception to print
     */
    private synchronized void log(Exception e) {
        System.out.println(new Date() + ": Exception:");
        e.printStackTrace();
    }

    /**
     * Log the specified MQ event (event code and event message).
     *
     * @param event the event to print
     */
    private synchronized void log(Event event) {
        try {
            System.out.println(new Date() + ": Received MQ event notification.");
            System.out.println("*** Event code: " + event.getEventCode());
            System.out.println("*** Event message: " + event.getEventMessage());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Pause the specified number of milliseconds. If the thread is
     * interrupted while sleeping, the interruption is logged and the
     * thread's interrupt flag is restored so the caller can react to it.
     *
     * @param millis the time to sleep, in milliseconds
     */
    private void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (java.lang.InterruptedException inte) {
            log(inte);
            //Thread.sleep clears the flag when it throws; restore it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * The main program. Creates a sender and runs its send loop on the
     * calling thread.
     *
     * @param args command line arguments (not used)
     */
    public static void main(String args[]) {
        FailoverQSender fp = new FailoverQSender();
        fp.run();
    }
}