The following code sample shows the work that a consumer in a transacted session needs to do in order to recover state after a failover. Note how the application tests both for rollback exceptions and JMS exceptions. Note also the use of a counter to allow the producer and consumer to verify message order and delivery.
/*
* @(#)FailoverQReceiver.java 1.4 07/04/20
*
* Copyright 2000 Sun Microsystems, Inc. All Rights Reserved
* SUN PROPRIETARY/CONFIDENTIAL
* Use is subject to license terms.
*/
package test.jmsclient.ha;
import java.util.Date;
import java.util.Vector;
import javax.jms.*;
import com.sun.messaging.jms.notification.*;
/**
* This sample program uses a transacted session to receive messages.
* It is designed to run with test.jmsclient.ha.FailoverQSender.
*
* @version 1.0
*/
public class FailoverQReceiver
implements ExceptionListener, EventListener, Runnable {
//queue connection
private QueueConnection conn = null;
//queue session
private QueueSession session = null;
//qreceiver
private QueueReceiver qreceiver = null;
//queue destination
private Queue queue = null;
//commmitted counter.
private int commitCounter = 0;
//flag to indicate if the connection is connected to the broker.
private boolean isConnected = false;
//flag to indicate if current connection is to HA broker cluster.
private boolean isHAConnection = false;
//application data holder.
private Vector data = new Vector();
/**
* Default constructor - JMS setup.
*/
public FailoverQReceiver() {
//set up JMS environment
setup();
}
/**
* Connection Exception listener.
*/
public void onException (JMSException e) {
//The run() method will exit.
this.isConnected = false;
log ("Exception listener is called. Connection is closed
by MQ client runtime." );
log (e);
}
/**
* log the connection event.
*/
public void onEvent (Event connectionEvent) {
log (connectionEvent);
}
/**
* Roll back application data.
*/
private void rollBackApplication() {
//reset application data
this.reset();
log ("Rolled back application data, current commit counter:
" + commitCounter);
}
/**
* Clear the application data for the current un-committed transaction.
*/
private void reset() {
data.clear();
}
/**
* Roll back JMS transaction and application.
*/
private void rollBackAll() {
try {
//rollback JMS
rollBackJMS();
//rollback application data
rollBackApplication();
} catch (Exception e) {
log ("rollback failed. closing JMS connection ...");
//application may decide NOT to close connection if rollback failed.
close();
}
}
/**
* Roll back jms session.
*/
private void rollBackJMS() throws JMSException {
session.rollback();
log("JMS session rolled back ...., commit counter:
" + commitCounter);
}
/**
* Close JMS connection and exit the application.
*/
private void close() {
try {
if ( conn != null ) {
conn.close();
}
} catch (Exception e) {
log (e);
} finally {
isConnected = false;
}
}
/**
* Receive, validate, and commit messages.
*/
public void run () {
//produce messages
while (isConnected) {
try {
//receive message
Message m = qreceiver.receive();
//process message -- add message to the data holder
processMessage(m);
//check if the commit flag is set in the message property
if ( shouldCommit(m) ) {
//commit the transaction
commit(m);
}
} catch (TransactionRolledBackException trbe) {
log ("transaction rolled back by MQ ...");
//rollback application data
rollBackApplication();
} catch (JMSException jmse) {
//The exception can happen when receiving messages
//and the connected broker is killed.
if ( isConnected == true ) {
//rollback MQ and application data
rollBackAll();
}
} catch (Exception e) {
log (e);
//application may decide NOT to close the connection
//when an unexpected Exception occurred.
close();
}
}
}
/**
* Set up testing parameters - connection, destination, etc
*/
protected void setup() {
try {
//get connection factory
com.sun.messaging.QueueConnectionFactory factory =
new com.sun.messaging.QueueConnectionFactory();
//create jms connection
conn = factory.createQueueConnection();
//set exception listener
conn.setExceptionListener(this);
//set event listener
( (com.sun.messaging.jms.Connection) conn).setEventListener(this);
//test if this is a HA connection
isHAConnection = ( (com.sun.messaging.jms.Connection)
conn).isConnectedToHABroker();
log ("Is connected to HA broker cluster: " + isHAConnection);
//get destination name
String destName = FailoverQSender.TEST_DEST_NAME;
//create a transacted session
session = conn.createQueueSession(true, -1);
//get destination
queue = session.createQueue(destName);
//create queue receiver
qreceiver = session.createReceiver(queue);
//set isConnected flag to true
isConnected = true;
//start the JMS connection
conn.start();
log("Ready to receive on destination: " + destName);
} catch (JMSException jmse) {
isConnected = false;
log (jmse);
close();
}
}
/**
* Check if we should commit the transaction.
*/
private synchronized boolean shouldCommit(Message m) {
boolean flag = false;
try {
//get the commit flag set by the FailoverQSender
flag = m.getBooleanProperty(FailoverQSender.COMMIT_PROPERTY_NAME);
if ( flag ) {
//check if message property contains expected message counter
validate (m);
}
} catch (JMSException jmse) {
log (jmse);
}
return flag;
}
/**
* A very simple validation only. More logic may be added to validate
* message ordering and message content.
* @param m Message The last message received for the current transaction.
*/
private void validate (Message m) {
try {
//get message counter property
int counter = m.getIntProperty(FailoverQSender.MESSAGE_COUNTER);
//The counter is set sequentially and must be received in right order.
//Each message is committed after validated.
if (counter != (commitCounter + 1)) {
this.printData();
throw new RuntimeException("validation failed.");
}
log ("messages validated. ready to commit ...");
} catch (JMSException jmse) {
log (jmse);
printData();
throw new RuntimeException("Exception occurred during validation:
" + jmse);
}
}
/**
* Get the message counter and put it in the data holder.
* @param m the current message received
*/
private synchronized void processMessage(Message m) throws JMSException {
// get message counter. this value is set by the FailoverQSender.
int ct = m.getIntProperty(FailoverQSender.MESSAGE_COUNTER);
// log the message
log("received message: " + ct
+", current connected broker:
" + this.getCurrentConnectedBrokerAddress());
// saved the data in data holder.
data.addElement(new Integer(ct));
}
/**
* commit the current transaction.
* @param m the last received message to be committed.
* @throws JMSException if commit failed.
*/
private void commit(Message m) throws JMSException {
//commit the transaction
session.commit();
//get the current message counter
int counter = m.getIntProperty(FailoverQSender.MESSAGE_COUNTER);
//set the commit counter
commitCounter = counter;
//clear app data
this.reset();
log ("Messages committed, commitCounter: " + commitCounter);
}
/**
* log exception.
*/
private synchronized void log (Exception e) {
System.out.println(new Date() + ": Exception Stack Trace: ");
e.printStackTrace();
}
/**
* log connection event.
*/
private synchronized void log (Event event) {
try {
System.out.println(new Date()
+ ": Received MQ event notification.");
System.out.println("*** Event Code: " + event.getEventCode() );
System.out.println("*** Event message: " + event.getEventMessage());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Log the specified message.
*/
private void log (String msg) {
System.out.println(new Date() + ": " + msg);
}
/**
* print values stored in the data holder.
*
*/
private void printData() {
for ( int i=0; i< data.size(); i++) {
log (" *** data index " + i + " = " + data.elementAt(i) );
}
}
private String getCurrentConnectedBrokerAddress() {
return ((com.sun.messaging.jms.Connection)conn).getBrokerAddress();
}
/**
* The main method. This starts the failover queue receiver.
*/
public static void main (String args[]) {
FailoverQReceiver fqr = new FailoverQReceiver();
fqr.run();
}
}