Oracle GlassFish Message Queue 4.4.2 Developer's Guide for Java Clients

Transacted Session: Failover Consumer Example

The following code sample shows the work that a consumer in a transacted session needs to do in order to recover state after a failover. Note how the application tests both for rollback exceptions and JMS exceptions. Note also the use of a counter to allow the producer and consumer to verify message order and delivery.

/*
 * @(#)	1.4 07/04/20
 * Copyright 2000 Sun Microsystems, Inc. All Rights Reserved
 * Use is subject to license terms.
 */
package test.jmsclient.ha;

import java.util.Date;
import java.util.Vector;
import javax.jms.*;
import com.sun.messaging.jms.notification.*;

/**
 * This sample program uses a transacted session to receive messages.
 * It is designed to run with test.jmsclient.ha.FailoverQSender.
 *
 * @version 1.0
 */
public class FailoverQReceiver
    implements ExceptionListener, EventListener, Runnable {
	//queue connection
    private QueueConnection conn = null;
    //queue session
    private QueueSession session = null;
    private QueueReceiver qreceiver = null;
    //queue destination
    private Queue queue = null;

    //commmitted counter.
    private int commitCounter = 0;
    //flag to indicate if the connection is connected to the broker.
    private boolean isConnected = false;
    //flag to indicate if current connection is to HA broker cluster.
    private boolean isHAConnection = false;
    //application data holder.
    private Vector data = new Vector();

     * Default constructor - JMS setup.
    public FailoverQReceiver() {
        //set up JMS environment

     * Connection Exception listener.
    public void onException (JMSException e) {

        //The run() method will exit.
        this.isConnected = false;

        log ("Exception listener is called. Connection is closed 
                               by MQ client runtime." );
        log (e);

     * log the connection event.
    public void onEvent (Event connectionEvent) {
       log (connectionEvent);

     * Roll back application data.
    private void rollBackApplication() {
        //reset application data

        log ("Rolled back application data, current commit counter:
             " + commitCounter);

     * Clear the application data for the current un-committed transaction.
    private void reset() {

     * Roll back JMS transaction and application.
    private void rollBackAll() {
        try {
           //rollback JMS
        	  //rollback application data
        } catch (Exception e) {
        	log ("rollback failed. closing JMS connection ...");
        //application may decide NOT to close connection if rollback failed.

     * Roll back jms session.
    private void rollBackJMS() throws JMSException {
		      log("JMS session rolled back ...., commit counter: 
             " + commitCounter);


	 * Close JMS connection and exit the application.
    private void close() {
        try {
            if ( conn != null ) {
        } catch (Exception e) {
            log (e);
        } finally {
            isConnected = false;

     * Receive, validate, and commit messages.
    public void run () {

		    //produce messages
		    while (isConnected) {

			     try {
            //receive message
             Message m = qreceiver.receive();
            //process message -- add message to the data holder
            //check if the commit flag is set in the message property
             if ( shouldCommit(m) ) {
                //commit the transaction

			   } catch (TransactionRolledBackException trbe) {
                log ("transaction rolled back by MQ  ...");
				        //rollback application data
			   } catch (JMSException jmse) {
				       //The exception can happen when receiving messages 
              //and the connected broker is killed.
                if ( isConnected == true ) {
					         //rollback MQ and application data
		    	} catch (Exception e) {
                log (e);
                //application may decide NOT to close the connection 
                //when an unexpected Exception occurred.

     * Set up testing parameters - connection, destination, etc
    protected void setup() {
      try {
			   //get connection factory
			   com.sun.messaging.QueueConnectionFactory factory =
				 new com.sun.messaging.QueueConnectionFactory();
			   //create jms connection
			   conn = factory.createQueueConnection();

			   //set exception listener

			   //set event listener
			   ( (com.sun.messaging.jms.Connection) conn).setEventListener(this);
			   //test if this is a HA connection
			   isHAConnection = ( (com.sun.messaging.jms.Connection) 
			   log ("Is connected to HA broker cluster: " + isHAConnection);
			   //get destination name
			   String destName = FailoverQSender.TEST_DEST_NAME;

			   //create a transacted session
			   session = conn.createQueueSession(true, -1);

			   //get destination
			   queue = session.createQueue(destName);
			   //create queue receiver
			   qreceiver = session.createReceiver(queue);
			   //set isConnected flag to true
            isConnected = true;
        //start the JMS connection
            log("Ready to receive on destination: " + destName);
		} catch (JMSException jmse) {
			   isConnected = false;
		 	   log (jmse);

     * Check if we should commit the transaction.
    private synchronized boolean shouldCommit(Message m) {

        boolean flag = false;

        try {
        	//get the commit flag set by the FailoverQSender
            flag = m.getBooleanProperty(FailoverQSender.COMMIT_PROPERTY_NAME);

            if ( flag ) {
            	//check if message property contains expected message counter
                validate (m);

        } catch (JMSException jmse) {
            log (jmse);

        return flag;

     * A very simple validation only.  More logic may be added to validate
     * message ordering and message content.
     * @param m Message  The last message received for the current transaction.
    private void validate (Message m) {

      try {
       //get message counter property
         int counter = m.getIntProperty(FailoverQSender.MESSAGE_COUNTER);
       //The counter is set sequentially and must be received in right order.
       //Each message is committed after validated.
            if (counter != (commitCounter + 1)) {
				        throw new RuntimeException("validation failed.");

            log ("messages validated.  ready to commit ...");
            } catch (JMSException jmse) {
              log (jmse);
            throw new RuntimeException("Exception occurred during validation:
                                        " + jmse);

     * Get the message counter and put it in the data holder.
     * @param m the current message received
    private synchronized void processMessage(Message m)  throws JMSException {

		// get message counter. this value is set by the FailoverQSender.
		int ct = m.getIntProperty(FailoverQSender.MESSAGE_COUNTER);
		// log the message
		log("received message: " + ct 
				 +", current connected broker: 
        " + this.getCurrentConnectedBrokerAddress());
		// saved the data in data holder.
		data.addElement(new Integer(ct));

     * commit the current transaction.
     * @param m the last received message to be committed.
     * @throws JMSException if commit failed.
    private void commit(Message m) throws JMSException {
     //commit the transaction
    	//get the current message counter
        int counter = m.getIntProperty(FailoverQSender.MESSAGE_COUNTER);
        //set the commit counter
        commitCounter = counter;
        //clear app data

        log ("Messages committed, commitCounter: " + commitCounter);

     * log exception.
    private synchronized void log (Exception e) {
    System.out.println(new Date() + ": Exception Stack Trace: ");

     * log connection event.
    private synchronized void log (Event event) {

        try {
            System.out.println(new Date() 
                 + ": Received MQ event notification.");
            System.out.println("*** Event Code: " + event.getEventCode() );
            System.out.println("*** Event message: " + event.getEventMessage());
        } catch (Exception e) {
     * Log the specified message.
    private void log (String msg) {
    System.out.println(new Date() + ": " + msg);

     * print values stored in the data holder.
    private void printData() {

        for ( int i=0; i< data.size(); i++) {
            log (" *** data index " + i + " = " + data.elementAt(i) );

    private String getCurrentConnectedBrokerAddress() {
    	return ((com.sun.messaging.jms.Connection)conn).getBrokerAddress();
     * The main method.  This starts the failover queue receiver.
    public static void main (String args[]) {
        FailoverQReceiver fqr = new FailoverQReceiver();;
