The following code sample shows the work that a producer in a transacted session needs to do to recover state after a failover. Note how the application tests both for rollback exceptions and for JMS exceptions. Note also the use of a counter to allow the producer and consumer to verify message order and delivery.
/*
* @(#)FailoverQSender.java 1.2 07/04/20
*
* Copyright 2000 Sun Microsystems, Inc. All Rights Reserved
* SUN PROPRIETARY/CONFIDENTIAL
* Use is subject to license terms.
*
*/
package test.jmsclient.ha;
import java.util.Date;
import javax.jms.*;
import com.sun.messaging.jms.Connection;
import com.sun.messaging.jms.notification.*;
/**
*
* This sample program uses a transacted session to send messages.
* It is designed to run with test.jmsclient.ha.FailoverQReceiver
* @version 1.0
*/
public class FailoverQSender
implements ExceptionListener, EventListener, Runnable {
//constant - commit property name
public static final String COMMIT_PROPERTY_NAME = "COMMIT_PROPERTY";
//constant - message counter
public static final String MESSAGE_COUNTER = "counter";
//constant - destination name
public static final String TEST_DEST_NAME = "FailoverTestDest001";
//queue connection
QueueConnection conn = null;
//session
QueueSession session = null;
//queue sender
QueueSender sender = null;
//queue destination
Queue queue = null;
//commmitted counter.
private int commitCounter = 0;
//current message counter
private int currentCounter = 0;
//set to true if the application is connected to the broker.
private boolean isConnected = false;
/**
* Default constructor - do nothing.
* Properties are passed in from init() method.
*/
public FailoverQSender() {
//set up JMS environment
setup();
}
/**
* Connection Exception listener.
*/
public void onException (JMSException e) {
//The run() method will exit.
this.isConnected = false;
log ("Exception listener is called.
Connection is closed by MQ client runtime." );
log (e);
}
/**
* this method is called when a MQ connection event occurred.
*/
public void onEvent (Event connectionEvent) {
log(connectionEvent);
}
/**
* Rollback the application data.
*
*/
private void rollBackApplication() {
this.currentCounter = this.commitCounter;
log ("Application rolled back., current (commit) counter: "
+ currentCounter);
}
/**
* Roll back the current jms session.
*/
private void rollBackJMS() {
try {
log("Rolling back JMS ...., commit counter: " + commitCounter);
session.rollback();
} catch (JMSException jmse) {
log("Rollback failed");
log(jmse);
//application may decide to log and continue sending messages
// without closing the application.
close();
}
}
/**
* rollback application data and jms session.
*
*/
private void rollBackAll() {
//rollback jms
rollBackJMS();
//rollback app data
rollBackApplication();
}
/**
* close JMS connection and stop the application
*
*/
private void close() {
try {
if ( conn != null ) {
//close the connection
conn.close();
}
} catch (Exception e) {
//log exception
log (e);
} finally {
//set flag to true. application thread will exit
isConnected = false;
}
}
/**
* Send messages in a loop until the connection is closed.
* Session is committed for each message sent.
*/
public void run () {
//start producing messages
while (isConnected) {
try {
//reset message counter if it reaches max int value
checkMessageCounter();
//create a message
Message m = session.createMessage();
//get the current message counter value
int messageCounter = this.getMessageCounter();
//set message counter to message property
m.setIntProperty(MESSAGE_COUNTER, messageCounter);
//set commit property
m.setBooleanProperty(COMMIT_PROPERTY_NAME, true);
//send the message
sender.send(m);
log("Sending message: " + messageCounter +
", current connected broker: " +
this.getCurrentConnectedBrokerAddress());
//commit the message
this.commit();
// pause 3 seconds
sleep(3000);
} catch (TransactionRolledBackException trbe) {
//rollback app data
rollBackApplication();
} catch (JMSException jmse) {
if (isConnected == true) {
//rollback app data and JMS session
rollBackAll();
}
}
}
}
/**
* Reset all counters if integer max value is reached.
*/
private void checkMessageCounter() {
if ( currentCounter == Integer.MAX_VALUE ) {
currentCounter = 0;
commitCounter = 0;
}
}
/**
* Set up testing parameters - connection, destination, etc
*/
protected void setup() {
try {
//get connection factory
com.sun.messaging.QueueConnectionFactory factory =
new com.sun.messaging.QueueConnectionFactory();
//create a queue connection
conn = factory.createQueueConnection();
//set exception listener
conn.setExceptionListener(this);
//set event listener
( (com.sun.messaging.jms.Connection) conn).setEventListener(this);
//get destination name
String destName = TEST_DEST_NAME;
//create a transacted session
session = conn.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
//get destination
queue = session.createQueue(destName);
//create queue sender
sender = session.createSender(queue);
//set isConnected flag to true.
this.isConnected = true;
} catch (JMSException jmse) {
this.isConnected = false;
}
}
/**
* get the next message counter.
*/
private synchronized int getMessageCounter () {
return ++ currentCounter;
}
/**
* commit the current transaction/session.
*/
private void commit() throws JMSException {
session.commit();
this.commitCounter = currentCounter;
log ("Transaction committed, commit counter: " +commitCounter);
}
/**
* Get the current connencted broker address.
*/
private String getCurrentConnectedBrokerAddress() {
return ((com.sun.messaging.jms.Connection)conn).getBrokerAddress();
}
/**
* log a string message.
* @param msg
*/
private synchronized void log (String msg) {
System.out.println(new Date() + ": " + msg);
}
/**
* Log an exception received.
*/
private synchronized void log (Exception e) {
System.out.println(new Date() + ": Exception:");
e.printStackTrace();
}
/**
* Log the specified MQ event.
*/
private synchronized void log (Event event) {
try {
System.out.println(new Date() + ": Received MQ event notification.");
System.out.println("*** Event code: " + event.getEventCode() );
System.out.println("*** Event message: " + event.getEventMessage());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* pause the specified milli seconds.
*/
private void sleep (long millis) {
try {
Thread.sleep(millis);
} catch (java.lang.InterruptedException inte) {
log (inte);
}
}
/**
* The main program.
*/
public static void main (String args[]) {
FailoverQSender fp = new FailoverQSender();
fp.run();
}
}