If a connection is failed over for a producer in a non-transacted session, a client application may receive a JMSException. The application thread that receives the exception should pause for a few seconds and then resend the messages. The client application may want to set a flag on the resent messages to indicate that they could be duplicates.
If a connection is failed over for a message consumer, the consequences vary with the session's acknowledge mode:
In client-acknowledge mode, calling Message.acknowledge or MessageConsumer.receive during a failover will raise a JMSException. The consumer should call Session.recover to recover or re-deliver the unacknowledged messages and then call Message.acknowledge or MessageConsumer.receive.
In auto-acknowledge mode, after getting a JMSException, the synchronous consumer should pause a few seconds and then call MessageConsumer.receive to continue receiving messages. Any message that failed to be acknowledged (due to the failover) will be redelivered with the redelivered flag set to true.
In dups-OK-acknowledge mode, the synchronous consumer should pause a few seconds after getting an exception and then call MessageConsumer.receive to continue receiving messages. In this case, it's possible that messages delivered and acknowledged (before the failover) could be redelivered.
The following code sample illustrates good coding practices for handling exceptions during a failover. It is designed to send non-transacted, persistent messages forever and to handle JMSExceptions when a failover occurs. The program is able to handle either a true or false setting for the imqReconnectEnabled property. To run the program, enter one of the following commands:
java dura.example.FailoverProducer
java -DimqReconnectEnabled=true dura.example.FailoverProducer
/*
* @(#)FailoverProducer.java 1.1 06/06/09
* Copyright 2006 Sun Microsystems, Inc. All Rights Reserved
* SUN PROPRIETARY/CONFIDENTIAL
* Use is subject to license terms. */
package dura.example;
import javax.jms.*;
import com.sun.messaging.ConnectionConfiguration;
import java.util.*;
public class FailoverProducer implements ExceptionListener {
//connection factory
private com.sun.messaging.TopicConnectionFactory factory;
//connection
private TopicConnection pconn = null;
//session
private TopicSession psession = null;
//publisher
private TopicPublisher publisher = null;
//topic
private Topic topic = null;
//This flag indicates whether this test client is closed.
private boolean isClosed = false;
//auto reconnection flag
private boolean autoReconnect = false;
//destination name for this example.
private static final String DURA_TEST_TOPIC = "DuraTestTopic";
//the message counter property name
public static final String MESSAGE_COUNTER = "MESSAGE_COUNTER";
//the message in-doubt-bit property name
public static final String MESSAGE_IN_DOUBT = "MESSAGE_IN_DOUBT";
/**
* Constructor. Get imqReconnectEnabled property value from
* System property.
*/
public FailoverProducer () {
try {
autoReconnect =
Boolean.getBoolean(ConnectionConfiguration.imqReconnectEnabled);
} catch (Exception e) {
this.printException(e);
}
}
/**
* Connection is broken if this handler is called.
* If autoReconnect flag is true, this is called only
* if no more retries from MQ.
*/
public void onException (JMSException jmse) {
this.printException (jmse);
}
/**
* create MQ connection factory.
* @throws JMSException
*/
private void initFactory() throws JMSException {
//get connection factory
factory = new com.sun.messaging.TopicConnectionFactory();
}
/**
* JMS setup. Create a Connection,Session, and Producer.
*
* If any of the JMS object creation fails (due to system failure),
* it retries until it succeeds.
*
*/
private void initProducer() {
boolean isConnected = false;
while ( isClosed == false && isConnected == false ) {
try {
println("producer client creating connection ...");
//create connection
pconn = factory.createTopicConnection();
//set connection exception listener
pconn.setExceptionListener(this);
//create topic session
psession = pconn.createTopicSession(false,
Session.CLIENT_ACKNOWLEDGE);
//get destination
topic = psession.createTopic(DURA_TEST_TOPIC);
//publisher
publisher = psession.createPublisher(topic);
//set flag to true
isConnected = true;
println("producer ready.");
}
catch (Exception e) {
println("*** connect failed ... sleep for 5 secs.");
try {
//close resources.
if ( pconn != null ) {
pconn.close();
}
//pause 5 secs.
Thread.sleep(5000);
} catch (Exception e1) {
;
}
}
}
}
/**
* Start test. This sends JMS messages in a loop (forever).
*/
public void run () {
try {
//create MQ connection factory.
initFactory();
//create JMS connection,session, and producer
initProducer();
//send messages forever.
sendMessages();
} catch (Exception e) {
this.printException(e);
}
}
/**
* Send persistent messages to a topic forever. This shows how
* to handle failover for a message producer.
*/
private void sendMessages() {
//this is set to true if send failed.
boolean messageInDoubt = false;
//message to be sent
TextMessage m = null;
//msg counter
long msgcount = 0;
while (isClosed == false) {
try {
/**
* create a text message
*/
m = psession.createTextMessage();
/**
* the MESSAGE_IN_DOUBT bit is set to true if
* you get an exception for the last message.
*/
if ( messageInDoubt == true ) {
m.setBooleanProperty (MESSAGE_IN_DOUBT, true);
messageInDoubt = false;
println("MESSAGE_IN_DOUBT bit is set to true
for msg: " + msgcount);
} else {
m.setBooleanProperty (MESSAGE_IN_DOUBT, false);
}
//set message counter
m.setLongProperty(MESSAGE_COUNTER, msgcount);
//set message body
m.setText("msg: " + msgcount);
//send the msg
publisher.send(m, DeliveryMode.PERSISTENT, 4, 0);
println("sent msg: " + msgcount);
/**
* reset counetr if reached max long value.
*/
if (msgcount == Long.MAX_VALUE) {
msgcount = 0;
println ("Reset message counter to 0.");
}
//increase counter
msgcount ++;
Thread.sleep(1000);
} catch (Exception e) {
if ( isClosed == false ) {
//set in doubt bit to true.
messageInDoubt = true;
this.printException(e);
//init producer only if auto reconnect is false.
if ( autoReconnect == false ) {
this.initProducer();
}
}
}
}
}
/**
* Close this example program.
*/
public synchronized void close() {
try {
isClosed = true;
pconn.close();
notifyAll();
} catch (Exception e) {
this.printException(e);
}
}
/**
* print the specified exception.
* @param e the exception to be printed.
*/
private void printException (Exception e) {
System.out.println(new Date().toString());
e.printStackTrace();
}
/**
* print the specified message.
* @param msg the message to be printed.
*/
private void println (String msg) {
System.out.println(new Date() + ": " + msg);
}
/**
* Main program to start this example.
*/
public static void main (String args[]) {
FailoverProducer fp = new FailoverProducer();
fp.run();
}
}
The following code sample, FailoverConsumer, illustrates good coding practices for handling exceptions during a failover. The transacted session is able to receive messages forever. The program sets the auto reconnect property to true, requiring the Message Queue runtime to automatically perform a reconnect when the connected broker fails or is killed. It is designed to work with the dura.example.FailoverProducer, shown in the previous section.
To run this program, enter the following command:
java dura.example.FailoverConsumer
/*
* @(#)FailoverConsumer.java 1.1 06/06/09
* Copyright 2006 Sun Microsystems, Inc. All Rights Reserved
* SUN PROPRIETARY/CONFIDENTIAL
* Use is subject to license terms.
*
*/
package dura.example;
import java.util.Date;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TransactionRolledBackException;
import com.sun.messaging.ConnectionConfiguration;
public class FailoverConsumer implements ExceptionListener, Runnable {
//JMS connection
private Connection conn = null;
//JMS session
private Session session = null;
//JMS Message consumer
private MessageConsumer messageConsumer = null;
//JMS destination.
private Destination destination = null;
//flag indicates whether this program should continue running.
private boolean isConnected = false;
//destination name.
private static final String DURA_TEST_TOPIC = "DuraTestTopic";
//the commit counter, for information only.
private long commitCounter = 0;
/**
* message counter property set by the producer.
*/
public static final String MESSAGE_COUNTER = "MESSAGE_COUNTER";
/**
* Message in doubt bit set by the producer
*/
public static final String MESSAGE_IN_DOUBT = "MESSAGE_IN_DOUBT";
/**
* receive time out
*/
public static final long RECEIVE_TIMEOUT = 0;
/**
* Default constructor -
* Set up JMS Environment.
*/
public FailoverConsumer() {
setup();
}
/* Connection Exception listener. This is called when connection
* breaks and no reconnect attempts are performed by MQ client runtime.
*/
public void onException (JMSException e) {
print ("Reconnect failed. Shutting down the connection ...");
/**
* Set this flag to false so that the run() method will exit.
*/
this.isConnected = false;
e.printStackTrace();
}
/**
* Best effort to roll back a jms session. When a broker crashes, an
* open-transaction should be rolled back. But the re-started broker
* may not have the uncommitted tranaction information due to system
* failure. In a situation like this, an application can just quit
* calling rollback after retrying a few times The uncommitted
* transaction (resources) will eventually be removed by the broker.
*/
private void rollBackJMS() {
//rollback fail count
int failCount = 0;
boolean keepTrying = true;
while ( keepTrying ) {
try {
print ("<<< rolling back JMS ...., consumer commit counter:
" + this.commitCounter);
session.rollback();
print("<<< JMS rolled back ...., consumer commit counter:
" + this.commitCounter);
keepTrying = false;
} catch (JMSException jmse) {
failCount ++;
jmse.printStackTrace();
sleep (3000); //3 secs
if ( failCount == 1 ) {
print ("<<< rollback failed : total count" + failCount);
keepTrying = false;
}
}
}
}
/**
* Close the JMS connection and exit the program.
*
*/
private void close() {
try {
if ( conn != null ) {
conn.close();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
this.isConnected = false;
}
}
/*Receive messages in a loop until closed.*/
public void run () {
while (isConnected) {
try {
/*receive message with specified timeout.*/
Message m = messageConsumer.receive(RECEIVE_TIMEOUT);
/* process the message. */
processMessage(m);
/* commit JMS transaction. */
this.commit();
/*increase the commit counter.*/
this.commitCounter ++;
} catch (TransactionRolledBackException trbe) {
/**
* the transaction is rolled back
* a new transaction is automatically started.
*/
trbe.printStackTrace();
} catch (JMSException jmse) {
/* The transaction is in unknown state.
* We need to roll back the transaction.*/
jmse.printStackTrace();
/* roll back if not closed.
*/
if ( this.isConnected == true ) {
this.rollBackJMS();
}
} catch (Exception e) {
e.printStackTrace();
/* Exit if this is an unexpected Exception.
*/
this.close();
} finally {
;//do nothing
}
}
print(" <<< consumer exit ...");
}
/*Set up connection, destination, etc*/
/
protected void setup() {
try {
//create connection factory
com.sun.messaging.ConnectionFactory factory =
new com.sun.messaging.ConnectionFactory();
//set auto reconnect to true.
factory.setProperty(ConnectionConfiguration.imqReconnectEnabled, "true");
//A value of -1 will retry forever if connection is broken.
factory.setProperty(ConnectionConfiguration.imqReconnectAttempts, "-1");
//retry interval - every 10 seconds
factory.setProperty(ConnectionConfiguration.imqReconnectInterval, "10000");
//create connection
conn = factory.createConnection();
//set client ID
conn.setClientID(DURA_TEST_TOPIC);
//set exception listener
conn.setExceptionListener(this);
//create a transacted session
session = conn.createSession(true, -1);
//get destination
destination = session.createTopic(DURA_TEST_TOPIC);
//message consumer
messageConsumer = session.createDurableSubscriber((Topic)destination,
DURA_TEST_TOPIC);
//set flag to true
this.isConnected = true;
//we are ready, start the connection
conn.start();
print("<<< Ready to receive on destination: " + DURA_TEST_TOPIC);
} catch (JMSException jmse) {
this.isConnected = false;
jmse.printStackTrace();
this.close();
}
}
/**
* Process the received message message.
* This prints received message counter.
* @param m the message to be processed.
*/
private synchronized void processMessage(Message m) {
try {
//in this example, we do not expect a timeout, etc.
if ( m == null ) {
throw new RuntimeException ("<<< Received null message.
Maybe reached max time out. ");
}
//get message counter property
long msgCtr = m.getLongProperty (MESSAGE_COUNTER);
//get message in-doubt bit
boolean indoubt = m.getBooleanProperty(MESSAGE_IN_DOUBT);
if ( indoubt) {
print("<<< received message: " + msgCtr + ", indoubt bit is true");
} else {
print("<<< received message: " + msgCtr);
}
} catch (JMSException jmse) {
jmse.printStackTrace();
}
}
/**
* Commit a JMS transaction.
* @throws JMSException
*/
private void commit() throws JMSException {
session.commit();
}
/**
* Sleep for the specified time.
* @param millis sleep time in milli-seconds.
*/
private void sleep (long millis) {
try {
Thread.sleep(millis);
} catch (java.lang.InterruptedException inte) {
print (inte);
}
}
/**
* Print the specified message.
* @param msg the message to be printed.
*/
private static void print (String msg) {
System.out.println(new Date() + ": " + msg);
}
/**
* Print Exception stack trace.
* @param e the exception to be printed.
*/
private static void print (Exception e) {
System.out.print(e.getMessage());
e.printStackTrace();
}
/**
* Start this example program.
*/
public static void main (String args[]) {
FailoverConsumer fc = new FailoverConsumer();
fc.run();
}
}