Sun GlassFish Message Queue 4.4 Developer's Guide for Java Clients

Receiving Messages

Messages are received by a message consumer, within the context of a connection and a session. Once you have created a consumer, you can use it to receive messages in either of two ways:

These two forms of message consumption are described in the sections Receiving Messages Synchronously and Receiving Messages Asynchronously.

Creating Message Consumers

The session method createConsumer creates a generic consumer that can be used to receive messages from either a (point-to-point) queue or a (publish/subscribe) topic:

MessageConsumer  myConsumer = mySession.createConsumer(myDest);

If the destination is a queue, the consumer is called a receiver for that queue; if it is a topic, the consumer is a subscriber to that topic.


Note –

The generic MessageConsumer interface also has specialized subinterfaces, QueueReceiver and TopicSubscriber, for receiving messages specifically from a point-to-point queue or a publish/subscribe topic. These types of consumer are created by the createReceiver and createSubscriber methods of the specialized session subinterfaces QueueSession and TopicSession, respectively. However, it is generally more convenient (and recommended) to use the generic form of message consumer described here, which can handle both types of destination indiscriminately.


A subscriber created for a topic destination with the createConsumer method is always nondurable, meaning that it will receive only messages that are sent (published)to the topic while the subscriber is active. If you want the broker to retain messages published to a topic while no subscriber is active and deliver them when one becomes active again, you must instead create a durable subscriber, as described in Durable Subscribers.

Table 2–15 shows the methods defined in the MessageConsumer interface, which are discussed in detail in the relevant sections below.

Table 2–15 Message Consumer Methods

Name 

Description 

getMessageSelector

Get message selector 

receive

Receive message synchronously 

receiveNoWait

Receive message synchronously without blocking 

setMessageListener

Set message listener for asynchronous reception 

getMessageListener

Get message listener for asynchronous reception 

close

Close message consumer 

Message Selectors

If appropriate, you can restrict the messages a consumer will receive from its destination by supplying a message selector as an argument when you create the consumer:

String  mySelector = "/* Text of selector here */";
MessageConsumer  myConsumer = mySession.createConsumer(myDest, mySelector);

The selector is a string whose syntax is based on a subset of the SQL92 conditional expression syntax, which allows you to filter the messages you receive based on the values of their properties (see Message Properties). See the Java Message Service Specification for a complete description of this syntax. The message consumer’s getMessageSelector method returns the consumer’s selector string (or null if no selector was specified when the consumer was created):

String  mySelector = myConsumer.getMessageSelector();

Note –

Messages whose properties do not satisfy the consumer’s selector will be retained undelivered by the destination until they are retrieved by another message consumer. The use of message selectors can thus cause messages to be delivered out of sequence from the order in which they were originally produced. Only a message consumer without a selector is guaranteed to receive messages in their original order.


In some cases, the same connection may both publish and subscribe to the same topic destination. The createConsumer method accepts an optional boolean argument that suppresses the delivery of messages published by the consumer’s own connection:

String  mySelector = "/* Text of selector here */";
MessageConsumer
        myConsumer = mySession.createConsumer(myDest, mySelector, true);

The resulting consumer will receive only messages published by a different connection.

Durable Subscribers

To receive messages delivered to a publish/subscribe topic while no message consumer is active, you must ask the message broker to create a durable subscriber for that topic. All sessions that create such subscribers for a given topic must have the same client identifier (see Using Connections ). When you create a durable subscriber, you supply a subscriber name that must be unique for that client identifier:

MessageConsumer
        myConsumer = mySession.createDurableSubscriber(myDest, "mySub");

(The object returned by the createDurableSubscriber method is actually typed as TopicSubscriber, but since that is a subinterface of MessageConsumer, you can safely assign it to a MessageConsumer variable. Note, however, that the destination myDest must be a publish/subscribe topic and not a point-to-point queue.)

You can think of a durable subscriber as a “virtual message consumer” for the specified topic, identified by the unique combination of a client identifier and subscriber name. When a message arrives for the topic and no message consumer is currently active for it, the message will be retained for later delivery. Whenever you create a consumer with the given client identifier and subscriber name, it will be considered to represent this same durable subscriber and will receive all of the accumulated messages that have arrived for the topic in the subscriber’s absence. Each message is retained until it is delivered to (and acknowledged by) such a consumer or until it expires.


Note –

Only one session at a time can have an active consumer for a given durable subscription. If another such consumer already exists, the createDurableSubscriber method will throw an exception.


Like the createConsumer method described in the preceding section (which creates nondurable subscribers), createDurableSubscriber can accept an optional message selector string and a boolean argument telling whether to suppress the delivery of messages published by the subscriber’s own connection:

String  mySelector = "/* Text of selector here */";
MessageConsumer  myConsumer
                    = mySession.createDurableSubscriber(myDest, "mySub",
                                                        mySelector, true);

You can change the terms of a durable subscription by creating a new subscriber with the same client identifier and subscription name but with a different topic, selector, or both. The effect is as if the old subscription were destroyed and a new one created with the same name. When you no longer need a durable subscription, you can destroy it with the session method unsubscribe:

mySession.unsubscribe("mySub");

Receiving Messages Synchronously

Once you have created a message consumer for a session, using either the createConsumer orcreateDurableSubscriber method, you must start the session’s connection to begin the flow of incoming messages:

myConnection.start();

(Note that it is not necessary to start a connection in order to produce messages, only to consume them.) You can then use the consumer’s receive method to receive messages synchronously from the message broker:

Message  inMsg = myConsumer.receive();

This returns the next available message for this consumer. If no message is immediately available, the receive method blocks until one arrives. You can also provide a timeout interval in milliseconds:

Message  inMsg = myConsumer.receive(1000);

In this case, if no message arrives before the specified timeout interval (1 second in the example) expires, the method will return with a null result. An alternative method, receiveNoWait, returns a null result immediately if no message is currently available:

Message  inMsg = myConsumer.receiveNoWait();

Receiving Messages Asynchronously

If you want your message consumer to receive incoming messages asynchronously, you must create a message listener to process the messages. This is a Java object that implements the JMS MessageListener interface. The procedure is as follows:

ProcedureTo Set Up a Message Queue Java Client to Receive Messages Asynchronously

  1. Define a message listener class implementing the MessageListener interface.

    The interface consists of the single method onMessage, which accepts a message as a parameter and processes it in whatever way is appropriate for your application:


    public class MyMessageListener implements MessageListener
      {
            public void onMessage (Message  inMsg)
          {
            /* Code here to process message */
          }
       }
  2. Create a message consumer.

    You can use either the createConsumer or createDurableSubscriber method of the session in which the consumer will operate: for instance,


    MessageConsumer  myConsumer = mySession.createConsumer(myDest);
  3. Create an instance of your message listener class.


    MyMessageListener  myListener = new MyMessageListener();
  4. Associate the message listener with your message consumer.

    The message consumer method setMessageListener accepts a message listener object and associates it with the given consumer:


    myConsumer.setMessageListener(myListener);
  5. Start the connection to which this consumer’s session belongs.

    The connection’s start method begins the flow of messages from the message broker to your message consumer:


    myConnection.start();

    Once the connection is started, the Message Queue client runtime will call your message listener’s onMessage method each time it has a message to deliver to this consumer.

    To ensure that no messages are lost before your consumer is ready to receive them, it is important not to start the connection until after you have created the message listener and associated it with the consumer. If the connection is already started, you should stop it before creating an asynchronous consumer, then start it again when the consumer is ready to begin processing.

    Setting a consumer’s message listener to null removes any message listener previously associated with it:


    myConsumer.setMessageListener(null);

    The consumer’s getMessageListener method returns its current message listener (or null if there is none):


    MyMessageListener  myListener = myConsumer.getMessageListener();

Acknowledging Messages

If you have specified client-acknowledge as your session’s acknowledgment mode (see Acknowledgment Modes), it is your client application’s responsibility to explicitly acknowledge each message it receives. If you have received the message synchronously, using a message consumer’s receive (or receiveNoWait) method, you should process the message first and then acknowledge it; if you have received it asynchronously, your message listener’s onMessage method should acknowledge the message after processing it. This ensures that the message broker will not delete the message from persistent storage until processing is complete.


Note –

In a transacted session (see Transacted Sessions), there is no need to acknowledge a message explicitly: the session’s acknowledgment mode is ignored and all acknowledgment processing is handled for you automatically by the Message Queue client runtime. In this case, the session’s getAcknowledgeMode method will return the special constant Session.SESSION_TRANSACTED.


Table 2–16 shows the methods available for acknowledging a message. The most general is acknowledge, defined in the standard JMS interface javax.jms.Message:

inMsg.acknowledge();

This acknowledges all unacknowledged messages consumed by the session up to the time of call. You can use this method to acknowledge each message individually as you receive it, or you can group several messages together and acknowledge them all at once by calling acknowledge on the last one in the group.

Table 2–16 Message Acknowledgment Methods

Function 

Description 

acknowledge

Acknowledge all unacknowledged messages for session 

acknowledgeThisMessage

Acknowledge this message only 

acknowledgeUpThroughThisMessage

Acknowledge all unacknowledged messages through this one 

The Message Queue version of the Message interface, defined in the package com.sun.messaging.jms, adds two more methods that provide more flexible control over which messages you acknowledge. The acknowledgeThisMessage method just acknowledges the single message for which it is called, rather than all messages consumed by the session; acknowledgeUpThroughThisMessage acknowledges the message for which it is called and all previous messages; messages received after that message remain unacknowledged.

Browsing Messages

If the destination from which you are consuming messages is a point-to-point queue, you can use a queue browser to examine the messages in the queue without consuming them. The session method createBrowser creates a browser for a specified queue:

QueueBrowser  myBrowser = mySession.createBrowser(myDest);

The method will throw an exception (InvalidDestinationException) if you try to pass it a topic destination instead of a queue. You can also supply a selector string as an optional second argument:

String        mySelector = "/* Text of selector here */";
QueueBrowser  myBrowser  = mySession.createBrowser(myDest, mySelector);

Table 2–17 shows the methods defined in the QueueBrowser interface. The getQueue and getMessageSelector methods return the browser’s queue and selector string, respectively.

Table 2–17 Queue Browser Methods

Name 

Description 

getQueue

Get queue from which this browser reads 

getMessageSelector

Get message selector 

getEnumeration

Get enumeration of all messages in the queue 

close

Close browser 

The most important queue browser method is getEnumeration, which returns a Java enumeration object that you can use to iterate through the messages in the queue, as shown in Example 2–4.


Example 2–4 Browsing a Queue


Enumeration  queueMessages = myBrowser.getEnumeration();
Message      eachMessage;

while ( queueMessages.hasMoreElements() )
  { eachMessage = queueMessages.nextElement();
    /* Do something with the message */
  }
               

The browser’s close method closes it when you’re through with it:

myBrowser.close();

Closing a Consumer

As a matter of good programming practice, you should close a message consumer when you have no further need for it. Closing a session or connection automatically closes all consumers associated with it; to close a consumer without closing the session or connection to which it belongs, you can use its close method:

myConsumer.close();

For a consumer that is a nondurable topic subscriber, this terminates the flow of messages to the consumer. However, if the consumer is a queue receiver or a durable topic subscriber, messages will continue to be accumulated for the destination and will be delivered the next time a consumer for that destination becomes active. To terminate a durable subscription permanently, call its session’s unsubscribe method with the subscriber name as an argument:

mySession.unsubscribe("mySub");