Sun Java System Message Queue 3 2005Q4 Developer's Guide for Java Clients
Chapter 2
Using the Java API
This chapter describes how to use the classes and methods of the Message Queue Java application programming interface (API) to accomplish specific tasks, and provides brief code samples to illustrate some of these tasks. (For clarity, the code samples shown in the chapter omit an exception check.) The topics covered include the following:
This chapter does not provide exhaustive information about each class and method. For detailed reference information, see the JavaDoc documentation for each individual class. For information on the practical design of Message Queue Java programs, see Chapter 3, "Message Queue Clients: Design and Features."
Messaging Domains
The Java Message Service (JMS) specification, which Message Queue implements, supports two commonly used models of interaction between message clients and message brokers, sometimes known as messaging domains:
- In the point-to-point (or PTP) messaging model, each message is delivered from a message producer to a single message consumer. The producer delivers the message to a queue, from which it is later delivered to one of the consumers registered for the queue. Any number of producers and consumers can interact with the same queue, but each message is guaranteed to be delivered to — and successfully consumed by — exactly one consumer and no more. If no consumers are registered for a queue, it holds the messages it receives and eventually delivers them when a consumer registers.
- In the publish/subscribe (or pub/sub) model, a single message can be delivered from a producer to any number of consumers. The producer publishes the message to a topic, from which it is then delivered to all active consumers that have subscribed to the topic. Any number of producers can publish messages to a given topic, and each message can be delivered to any number of subscribed consumers. The model also supports the notion of durable subscriptions, in which a consumer registered with a topic need not be active at the time a message is published; when the consumer subsequently becomes active, it will receive the message. If no active consumers are registered for a topic, the topic does not hold the messages it receives unless it has inactive consumers with durable subscriptions.
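The difference between the two delivery models can be illustrated with a small in-memory sketch. This is a toy model only, not the Message Queue API: the class names ToyQueue and ToyTopic are hypothetical, and durable subscriptions are not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy in-memory model of the two messaging domains (not the Message Queue API).
final class DomainSketch {

    // Point-to-point: messages are held until a consumer takes them,
    // and each message goes to exactly one consumer.
    static final class ToyQueue {
        private final Deque<String> held = new ArrayDeque<>();

        void send(String message) { held.addLast(message); }

        // Returns the next held message, or null if none is waiting.
        String receive() { return held.pollFirst(); }
    }

    // Publish/subscribe: each message is copied to every subscriber
    // registered at the moment of publication; nothing is retained otherwise.
    static final class ToyTopic {
        private final List<List<String>> subscribers = new ArrayList<>();

        // Registers a subscriber and returns its private inbox.
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            subscribers.add(inbox);
            return inbox;
        }

        void publish(String message) {
            for (List<String> inbox : subscribers) {
                inbox.add(message);
            }
        }
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.send("order-1");               // held: no consumer yet
        System.out.println(queue.receive()); // one consumer takes the message
        System.out.println(queue.receive()); // nothing left for a second consumer

        ToyTopic topic = new ToyTopic();
        topic.publish("early");              // no subscribers: not retained
        List<String> first = topic.subscribe();
        List<String> second = topic.subscribe();
        topic.publish("price-update");       // copied to both subscribers
        System.out.println(first + " " + second);
    }
}
```

In the queue model a message disappears once one consumer has taken it, whereas in the topic model every registered subscriber receives its own copy.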
JMS applications are free to use either of these messaging models, or even to mix them both within the same application. Historically, the JMS API provided a separate set of domain-specific object classes for each model. While these domain-specific interfaces continue to be supported for legacy purposes, client programmers are now encouraged to use the newer unified domain interface, which supports both models indiscriminately. For this reason, the discussions and code examples in this manual focus exclusively on the unified interfaces wherever possible. Table 2-1 shows the API classes for all three domains.
Table 2-1 Interface Classes for Messaging Domains

| Unified Domain | Point-to-Point Domain | Publish/Subscribe Domain |
|---|---|---|
| Destination¹ | Queue | Topic |
| ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory |
| Connection | QueueConnection | TopicConnection |
| Session | QueueSession | TopicSession |
| MessageProducer | QueueSender | TopicPublisher |
| MessageConsumer | QueueReceiver | TopicSubscriber |

¹ Depending on programming approach, you might specify a particular destination type (Queue or Topic).
Working With Connections
All messaging occurs within the context of a connection. Connections are created via a connection factory encapsulating all of the needed configuration properties for connecting to a particular JMS provider. A connection’s configuration properties are completely determined by the connection factory, and cannot be changed once the connection has been created. Thus the only way to control the properties of a connection is by setting those of the connection factory you use to create it.
Obtaining a Connection Factory
Typically, a connection factory is created for you by a Message Queue administrator and preconfigured, using the administration tools described in the Message Queue Administration Guide, with whatever property settings are appropriate for connecting to a particular JMS provider. The factory is then placed in a publicly available administered object store, where you can access it by name via the Java Naming and Directory Interface (JNDI) API. This arrangement has several benefits:
- It allows the administrator to control the properties of client connections to the provider, ensuring that they are properly configured.
- It enables the administrator to tune performance and improve throughput by adjusting configuration settings even after an application has been deployed.
- By relying on the predefined connection factory to handle the configuration details, it helps keep client code provider-independent and thus more easily portable from one JMS provider to another.
Sometimes, however, it may be more convenient to dispense with JNDI lookup and simply create your own connection factory by direct instantiation. Although hard-coding configuration values for a particular JMS provider directly into your application code sacrifices flexibility and provider-independence, this approach might make sense in some circumstances — such as in the early stages of application development and debugging, or in applications where reconfigurability and portability to other providers are not important concerns.
The following sections describe these two approaches to obtaining a connection factory: by JNDI lookup or direct instantiation.
Looking Up a Connection Factory With JNDI
Code Example 2-1 shows how to look up a connection factory object in the JNDI object store.
The procedure consists of the following steps:
To Look Up a Connection Factory With JNDI
- Create the environment for constructing the initial JNDI naming context.
How you create the initial context depends on whether you are using a file-system object store or a Lightweight Directory Access Protocol (LDAP) server for your Message Queue administered objects. The code shown here assumes a file-system store; for information about the corresponding LDAP object store attributes, see the Message Queue Administration Guide.
The constructor for the initial context accepts an environment parameter, a hash table whose entries specify the attributes for creating the context:
Hashtable env = new Hashtable();
You can also set an environment by specifying system properties on the command line, rather than programmatically. For instructions, see the README file in the JMS example applications directory.
- Store the environment attributes that tell JNDI which initial context factory to use and where to find the JMS provider.
The names of these attributes are defined as static constants in class Context:
env.put(Context.INITIAL_CONTEXT_FACTORY,
"com.sun.jndi.fscontext.RefFSContextFactory");
env.put(Context.PROVIDER_URL, "file:///C:/imq_admin_objects");
Note
The directory represented by C:/imq_admin_objects must already exist; if necessary, you must create the directory before referencing it in your code.
- Create the initial context.
Context ctx = new InitialContext(env);
If you use system properties to set the environment, omit the environment parameter when creating the context:
Context ctx = new InitialContext();
- Look up the connection factory object in the administered object store and typecast it to the appropriate class:
String CF_LOOKUP_NAME = "MyConnectionFactory";
ConnectionFactory myFactory = (ConnectionFactory) ctx.lookup(CF_LOOKUP_NAME);
The lookup name you use, CF_LOOKUP_NAME, must match the name used when the object was stored.
You can now proceed to use the connection factory to create connections to the message broker, as described under Using Connections.
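The steps above can be gathered into a pair of helper methods, as in the following sketch. It assumes a file-system object store, as in the steps; the class name JndiLookupSketch and its method names are hypothetical. The lookup method returns a plain Object so that the sketch depends only on the JNDI classes; the caller casts the result, as in the final step above.

```java
import java.util.Hashtable;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

// Hypothetical helper that gathers the JNDI lookup steps in one place.
final class JndiLookupSketch {

    // Steps 1 and 2: build the environment for a file-system object store.
    static Hashtable<String, Object> fileStoreEnvironment(String providerUrl) {
        Hashtable<String, Object> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY,
                "com.sun.jndi.fscontext.RefFSContextFactory");
        env.put(Context.PROVIDER_URL, providerUrl);
        return env;
    }

    // Steps 3 and 4: create the initial context and look up the stored object.
    // This requires the file-system context provider on the class path and
    // an object already stored under the given name.
    static Object lookup(Hashtable<String, Object> env, String name)
            throws NamingException {
        Context ctx = new InitialContext(env);
        try {
            return ctx.lookup(name);
        } finally {
            ctx.close();
        }
    }
}
```

A client would then cast the result of lookup(env, "MyConnectionFactory") to ConnectionFactory and handle NamingException, which the samples in this chapter omit for clarity.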
Overriding Configuration Settings
It is recommended that you use a connection factory just as you receive it from a JNDI lookup, with the property settings originally configured by your Message Queue administrator. However, there may be times when you need to override the preconfigured properties with different values of your own. You can do this from within your application code by calling the connection factory’s setProperty method. This method (inherited from the superclass AdministeredObject) takes two string arguments giving the name and value of the property to be set. The property names for the first argument are defined as static constants in the Message Queue class ConnectionConfiguration: for instance, the statement
myFactory.setProperty(ConnectionConfiguration.imqDefaultPassword,
    "mellon");
sets the default password for establishing broker connections. See the Message Queue Administration Guide for complete information on the available connection factory configuration properties.
It is also possible to override connection factory properties from the command line, by using the -D option to set their values when starting your client application. For example, the command line
java -DimqDefaultPassword=mellon MyMQClient
starts an application named MyMQClient with the same default password as in the preceding example. Setting a property value this way overrides any other value specified for it, whether preconfigured in the JNDI object store or set programmatically with the setProperty method.
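The precedence rule just described can be pictured with a short sketch. This only models the rule stated above and is not the client runtime's actual code; the class and method names are hypothetical. Values passed with -D appear in the JVM as system properties, which is what the sketch reads.

```java
// Hypothetical model of the stated precedence: a value supplied with -D
// wins over a value configured in the object store or set programmatically.
final class PropertyPrecedenceSketch {

    // Returns the -D value if one was given, otherwise the configured value.
    static String effectiveValue(String propertyName, String configuredValue) {
        String fromCommandLine = System.getProperty(propertyName);
        return (fromCommandLine != null) ? fromCommandLine : configuredValue;
    }
}
```

For example, with -DimqDefaultPassword=mellon on the command line, effectiveValue("imqDefaultPassword", "stored") yields the command-line value rather than "stored".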
Note
A Message Queue administrator can prevent a connection factory’s properties from being overridden by specifying that the object be read-only when placing it in the object store. The properties of such a factory cannot be changed in any way, whether with the -D option from the command line or via the setProperty method from within your client application’s code. Any attempt to override the factory’s property values will simply be ignored.
Instantiating a Connection Factory
Code Example 2-2 shows how to create a connection factory object by direct instantiation and configure its properties.
Code Example 2-2 Instantiating a Connection Factory
// Instantiate the connection factory object.
com.sun.messaging.ConnectionFactory
myFactory = new com.sun.messaging.ConnectionFactory();
// Set the connection factory's configuration properties.
myFactory.setProperty(ConnectionConfiguration.imqAddressList,
"localhost:7676,broker2:5000,broker3:9999");
The steps are as follows:
To Instantiate and Configure a Connection Factory
- Instantiate the connection factory object.
The name ConnectionFactory is defined both as a JMS interface (in package javax.jms) and as a Message Queue class (in com.sun.messaging) that implements that interface. Since only a class can be instantiated, you must use the constructor defined in com.sun.messaging to create your connection factory object. Note, however, that you cannot import the name from both packages without causing a compilation error. Hence, if you have imported the entire package javax.jms.*, you must qualify the constructor with the full package name when instantiating the object:
com.sun.messaging.ConnectionFactory
    myFactory = new com.sun.messaging.ConnectionFactory();
Notice that the type declaration for the variable myFactory, to which the instantiated connection factory is assigned, is also qualified with the full package name. This is because the setProperty method, used in Step 2, belongs to the ConnectionFactory class defined in the package com.sun.messaging, rather than to the ConnectionFactory interface defined in javax.jms. Thus in order for the compiler to recognize this method, myFactory must be typed explicitly as com.sun.messaging.ConnectionFactory rather than simply ConnectionFactory (which would resolve to javax.jms.ConnectionFactory after importing javax.jms.*).
- Set the connection factory’s configuration properties.
The most important configuration property is imqAddressList, which specifies the host names and port numbers of the message brokers to which the factory creates connections. By default, the factory returned by the ConnectionFactory constructor in Step 1 is configured to create connections to a broker on host localhost at port number 7676. If necessary, you can use the setProperty method, described in the preceding section, to change that setting:
myFactory.setProperty(ConnectionConfiguration.imqAddressList,
    "localhost:7676,broker2:5000,broker3:9999");
Of course, you can also set any other configuration properties your application may require. See the Message Queue Administration Guide for a list of the available connection factory properties.
You can now proceed to use the connection factory to create connections to the message service, as described in the next section.
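Because the address list is passed as a single string, a typo in it may not surface until connection time. The following sketch checks a list in the simple comma-separated host:port form used in the example above before it is handed to setProperty. It is an illustration only: the class names are hypothetical, and the real imqAddressList syntax may accept other forms that this parser would reject.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical validator for the simple "host:port,host:port" form only.
final class AddressListSketch {

    // One broker address in host:port form.
    static final class BrokerAddress {
        final String host;
        final int port;

        BrokerAddress(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() { return host + ":" + port; }
    }

    // Splits a comma-separated list of host:port entries.
    // Throws IllegalArgumentException on a malformed entry.
    static List<BrokerAddress> parse(String addressList) {
        List<BrokerAddress> result = new ArrayList<>();
        for (String entry : addressList.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port, got: " + entry);
            }
            String host = trimmed.substring(0, colon);
            int port;
            try {
                port = Integer.parseInt(trimmed.substring(colon + 1));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Bad port in: " + entry, e);
            }
            result.add(new BrokerAddress(host, port));
        }
        return result;
    }
}
```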
Using Connections
Once you have obtained a connection factory, you can use it to create a connection to the message service. The factory’s createConnection method takes a user name and password as arguments:
Connection myConnection = myFactory.createConnection("mithrandir", "mellon");
Before granting the connection, Message Queue authenticates the user name and password by looking them up in its user repository. As a convenience for developers who do not wish to go to the trouble of populating a user repository during application development and testing, there is also a parameterless form of the createConnection method:
Connection myConnection = myFactory.createConnection();
This creates a connection configured for the default user identity, with both user name and password set to guest.
This unified-domain createConnection method is part of the generic JMS ConnectionFactory interface, defined in package javax.jms; the Message Queue version in com.sun.messaging adds corresponding methods createQueueConnection and createTopicConnection for use specifically with the point-to-point and publish/subscribe domains.
Table 2-2 shows the methods defined in the Connection interface.
Table 2-2 Connection Methods

| Name | Description |
|---|---|
| createSession | Create session |
| setClientID | Set client identifier |
| getClientID | Get client identifier |
| setExceptionListener | Set exception listener |
| getExceptionListener | Get exception listener |
| getMetaData | Get metadata for connection |
| createConnectionConsumer | Create connection consumer |
| createDurableConnectionConsumer | Create durable connection consumer |
| start | Start incoming message delivery |
| stop | Stop incoming message delivery |
| close | Close connection |
The main purpose of a connection is to create sessions for exchanging messages with the message service:
myConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
The first argument to createSession is a boolean indicating whether the session is transacted; the second specifies its acknowledgment mode. Possible values for this second argument are AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE, and DUPS_OK_ACKNOWLEDGE, all defined as static constants in the standard JMS Session interface, javax.jms.Session; the extended Message Queue version of the interface, com.sun.messaging.jms.Session, adds another such constant, NO_ACKNOWLEDGE. See Acknowledgment Modes and Transacted Sessions for further discussion.
If your client application will be using the publish/subscribe domain to create durable topic subscriptions, it must have a client identifier to identify itself to the message service. In general, the most convenient arrangement is to configure the client runtime to provide a unique client identifier automatically for each client. However, the Connection interface also provides a method, setClientID, for setting a client identifier explicitly, and a corresponding getClientID method for retrieving its value. See Assigning Client Identifiers and the Message Queue Administration Guide for more information.
You should also use the setExceptionListener method to register an exception listener for the connection. This is an object implementing the JMS ExceptionListener interface, which consists of the single method onException:
void onException (JMSException exception)
In the event of a problem with the connection, the message broker will call this method, passing an exception object identifying the nature of the problem.
A connection’s getMetaData method returns a ConnectionMetaData object, which in turn provides methods for obtaining various items of information about the connection, such as its JMS version and the name and version of the JMS provider.
The createConnectionConsumer and createDurableConnectionConsumer methods (as well as the session methods setMessageListener and getMessageListener, listed in Table 2-3) are used for concurrent message consumption; see the Java Message Service Specification for more information.
In order to receive incoming messages, you must start the connection by calling its start method:
myConnection.start();
It is important not to do this until after you have created any message consumers you will be using to receive messages on the connection. Starting the connection before creating the consumers risks missing some incoming messages before the consumers are ready to receive them. It is not necessary to start the connection in order to send outgoing messages.
If for any reason you need to suspend the flow of incoming messages, you can do so by calling the connection’s stop method:
myConnection.stop();
To resume delivery of incoming messages, call the start method again.
Finally, when you are through with a connection, you should close it to release any resources associated with it:
myConnection.close();
This automatically closes all sessions, message producers, and message consumers associated with the connection and deletes any temporary destinations. All pending message receives are terminated and any transactions in progress are rolled back. Closing a connection does not force an acknowledgment of client-acknowledged sessions.
Working With Destinations
All Message Queue messages travel from a message producer to a message consumer by way of a destination on a message broker. Message delivery is thus a two-stage process: the message is first delivered from the producer to the destination and later from the destination to the consumer. Physical destinations on the broker are created administratively by a Message Queue administrator, using the administration tools described in the Message Queue Administration Guide; the broker provides routing and delivery services for messages sent to such a destination.
As described earlier under Messaging Domains, Message Queue supports two types of destination, depending on the messaging domain being used: queues, for the point-to-point domain, and topics, for the publish/subscribe domain.
These two types of destination are represented by the Message Queue classes Queue and Topic, respectively. These, in turn, are both subclasses of the generic class Destination, part of the unified messaging domain that subsumes both the point-to-point and publish/subscribe domains. A client program that uses the Destination superclass can thus handle both queue and topic destinations indiscriminately.
Looking Up a Destination With JNDI
Because JMS providers differ in their destination addressing conventions, Message Queue does not define a standard address syntax for obtaining access to a destination. Rather, the destination is typically placed in a publicly available administered object store by a Message Queue administrator and accessed by the client via a JNDI lookup in a manner similar to that described earlier for connection factories (see Looking Up a Connection Factory With JNDI).
Code Example 2-3 shows how to look up a destination object in the JNDI object store.
Note
If a Message Queue client is a J2EE component, JNDI resources are provided by the J2EE container. In such cases, JNDI lookup code may differ from that shown here; see your J2EE provider documentation for details.
Code Example 2-3 Looking Up a Destination
// Create the environment for constructing the initial JNDI naming context.
Hashtable env = new Hashtable();
// Store the environment attributes that tell JNDI which initial context factory to use
// and where to find the provider.
env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.fscontext.RefFSContextFactory");
env.put(Context.PROVIDER_URL, "file:///C:/imq_admin_objects");
// Create the initial context.
Context ctx = new InitialContext(env);
// Look up the destination object in the JNDI object store.
String DEST_LOOKUP_NAME = "MyDest";
Destination MyDest = (Destination) ctx.lookup(DEST_LOOKUP_NAME);
The procedure consists of the following steps:
To Look Up a Destination With JNDI
- Create the environment for constructing the initial JNDI naming context.
How you create the initial context depends on whether you are using a file-system object store or a Lightweight Directory Access Protocol (LDAP) server for your Message Queue administered objects. The code shown here assumes a file-system store; for information about the corresponding LDAP object store attributes, see the Message Queue Administration Guide.
The constructor for the initial context accepts an environment parameter, a hash table whose entries specify the attributes for creating the context:
Hashtable env = new Hashtable();
You can also set an environment by specifying system properties on the command line, rather than programmatically. For instructions, see the README file in the JMS example applications directory.
- Store the environment attributes that tell JNDI which initial context factory to use and where to find the JMS provider.
The names of these attributes are defined as static constants in class Context:
env.put(Context.INITIAL_CONTEXT_FACTORY,
"com.sun.jndi.fscontext.RefFSContextFactory");
env.put(Context.PROVIDER_URL, "file:///C:/imq_admin_objects");
Note
The directory represented by C:/imq_admin_objects must already exist; if necessary, you must create the directory before referencing it in your code.
- Create the initial context.
Context ctx = new InitialContext(env);
If you use system properties to set the environment, omit the environment parameter when creating the context:
Context ctx = new InitialContext();
- Look up the destination object in the administered object store and typecast it to the appropriate class:
String DEST_LOOKUP_NAME = "MyDest";
Destination MyDest = (Destination) ctx.lookup(DEST_LOOKUP_NAME);
The lookup name you use, DEST_LOOKUP_NAME, must match the name used when the object was stored. Note that the actual destination object returned from the object store will always be either a (point-to-point) queue or a (publish/subscribe) topic, but that either can be assigned to a variable of the generic unified-domain class Destination.
You can now proceed to send and receive messages via the destination, as described under Sending Messages and Receiving Messages.
Instantiating a Destination
As with connection factories, you may sometimes find it more convenient to dispense with JNDI lookup and simply create your own queue or topic destination objects by direct instantiation. Although a variable of type Destination can accept objects of either class, you cannot directly instantiate a Destination object; the object must always belong to one of the specific classes Queue or Topic. The constructors for both of these classes accept a string argument specifying the name of the physical destination to which the object corresponds:
Destination myDest = new com.sun.messaging.Queue("myDest");
Note, however, that this only creates a Java object representing the destination; it does not actually create a physical destination on the message broker. The physical destination itself must still be created by a Message Queue administrator, with the same name you pass to the constructor when instantiating the object.
Note
Destination names beginning with the letters mq are reserved and should not be used by client programs.
Unlike connection factories, destinations have a much more limited set of configuration properties. In fact, only two such properties are defined in the Message Queue class DestinationConfiguration: the name of the physical destination itself (imqDestinationName) and an optional descriptive string (imqDestinationDescription). Since the latter property is rarely used and the physical destination name can be supplied directly as an argument to the Queue or Topic constructor as shown above, there normally is no need (as there often is with a connection factory) to specify additional properties with the object’s setProperty method. Hence the variable to which you assign the destination object (myDest in the example above) need not be typed with the Message Queue class com.sun.messaging.Destination; the standard JMS interface javax.jms.Destination (which the Message Queue class implements) is sufficient. If you have imported the full JMS package javax.jms.*, you can simply declare the variable with the unqualified name Destination, as above, rather than with something like
com.sun.messaging.Destination
    myDest = new com.sun.messaging.Queue("myDest");
as shown earlier for connection factories.
Temporary Destinations
A temporary destination is one that exists only for the duration of the connection that created it. You may sometimes find it convenient to create such a destination to use, for example, as a reply destination for messages you send. Temporary destinations are created with the session method createTemporaryQueue or createTemporaryTopic (see Working With Sessions, below): for example,
TemporaryQueue tempQueue = mySession.createTemporaryQueue();
Although the temporary destination is created by a particular session, its scope is actually the entire connection to which that session belongs. Any of the connection’s sessions (not just the one that created the temporary destination) can create a message consumer for the destination and receive messages from it. The temporary destination is automatically deleted when its connection is closed, or you can delete it explicitly by calling its delete method:
tempQueue.delete();
Working With Sessions
A session is a single-threaded context for producing and consuming messages. You can create multiple message producers and consumers for a single session, but you are restricted to using them serially, in a single logical thread of control.
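One way to honor the single-thread restriction in a multithreaded application is to funnel all work on a session through one dedicated thread. The sketch below shows that confinement pattern using only the standard java.util.concurrent classes. It is one possible approach, not something the Message Queue API prescribes, and the class and method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical wrapper that confines all session work to one thread.
final class SessionThreadSketch {
    // A single worker thread: tasks submitted here run serially, in order.
    private final ExecutorService sessionThread = Executors.newSingleThreadExecutor();

    // Runs the task on the dedicated session thread and waits for its result.
    <T> T onSessionThread(Callable<T> task) {
        try {
            return sessionThread.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for session work", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Session work failed", e.getCause());
        }
    }

    // Stops the worker thread once the session is no longer needed.
    void shutdown() { sessionThread.shutdown(); }
}
```

Any thread in the application can call onSessionThread, but the code that touches the session's producers and consumers always executes on the same worker thread.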
Table 2-3 shows the methods defined in the Session interface; they are discussed in the relevant sections below.
Table 2-3 Session Methods

| Name | Description |
|---|---|
| createProducer | Create message producer |
| createConsumer | Create message consumer |
| createDurableSubscriber | Create durable subscriber for topic |
| unsubscribe | Delete durable subscription to topic |
| createMessage | Create null message |
| createTextMessage | Create text message |
| createStreamMessage | Create stream message |
| createMapMessage | Create map message |
| createObjectMessage | Create object message |
| createBytesMessage | Create bytes message |
| createQueue | Create queue destination |
| createTopic | Create topic destination |
| createTemporaryQueue | Create temporary queue |
| createTemporaryTopic | Create temporary topic |
| createBrowser | Create message browser |
| setMessageListener | Set distinguished message listener |
| getMessageListener | Get distinguished message listener |
| getAcknowledgeMode | Get session’s acknowledgment mode |
| getTransacted | Is session transacted? |
| commit | Commit transaction |
| rollback | Roll back transaction |
| recover | Recover unacknowledged messages |
| close | Close session |
Every session exists within the context of a particular connection. The number of sessions you can create for a single connection is limited only by system resources. As described earlier (see Using Connections), you use the connection’s createSession method to create a session:
Session mySession = myConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
The first (boolean) argument specifies whether the session is transacted; see Transacted Sessions for further discussion. The second argument is an integer constant representing the session’s acknowledgment mode, as described in the next section.
Acknowledgment Modes
A session’s acknowledgment mode determines the way your application handles the exchange of acknowledgment information when receiving messages from a broker. The JMS specification defines three possible acknowledgment modes:
- In auto-acknowledge mode, the Message Queue client runtime immediately sends a client acknowledgment for each message it delivers to the message consumer; it then blocks waiting for a return broker acknowledgment confirming that the broker has received the client acknowledgment. This acknowledgment “handshake” between client and broker is handled automatically by the client runtime, with no need for explicit action on your part.
- In client-acknowledge mode, your client application must explicitly acknowledge the receipt of all messages. This allows you to defer acknowledgment until after you have finished processing the message, ensuring that the broker will not delete it from persistent storage before processing is complete. You can either acknowledge each message individually or batch multiple messages and acknowledge them all at once; the client acknowledgment you send to the broker applies to all messages received since the previous acknowledgment. In either case, as in auto-acknowledge mode, the session thread blocks after sending the client acknowledgment, waiting for a broker acknowledgment in return to confirm that your client acknowledgment has been received.
- In dups-OK-acknowledge mode, the session automatically sends a client acknowledgment each time it has received a fixed number of messages, or when a fixed time interval has elapsed since the last acknowledgment was sent. (This fixed batch size and timeout interval are currently 10 messages and 7 seconds, respectively, and are not configurable by the client.) Unlike the first two modes described above, the broker does not acknowledge receipt of the client acknowledgment, and the session thread does not block awaiting such return acknowledgment from the broker. This means that you have no way to confirm that your acknowledgment has been received; if it is lost in transmission, the broker may redeliver the same message more than once. However, because client acknowledgments are batched and the session thread does not block, applications that can tolerate multiple delivery of the same message can achieve higher throughput in this mode than in auto-acknowledge or client-acknowledge mode.
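The batching rule for dups-OK-acknowledge mode can be modeled in a few lines. The sketch below is a simplified illustration, not the client runtime's implementation: it uses the batch size and timeout quoted above, checks the timeout only when a message arrives (the real runtime may use an independent timer), and takes the current time as a parameter so that its behavior is easy to follow. All names are hypothetical.

```java
// Hypothetical model of batched acknowledgment in dups-OK-acknowledge mode.
final class DupsOkBatchSketch {
    static final int BATCH_SIZE = 10;           // messages, as quoted in the text
    static final long TIMEOUT_MILLIS = 7_000L;  // 7 seconds, as quoted in the text

    private int unacknowledged = 0;
    private long lastAckMillis;

    DupsOkBatchSketch(long nowMillis) {
        this.lastAckMillis = nowMillis;
    }

    // Records one received message. Returns true if a batched
    // acknowledgment would be sent at this point.
    boolean onMessage(long nowMillis) {
        unacknowledged++;
        boolean batchFull = unacknowledged >= BATCH_SIZE;
        boolean timedOut = nowMillis - lastAckMillis >= TIMEOUT_MILLIS;
        if (batchFull || timedOut) {
            unacknowledged = 0;
            lastAckMillis = nowMillis;
            return true;
        }
        return false;
    }

    // Number of messages received since the last acknowledgment.
    int pending() { return unacknowledged; }
}
```

The messages counted by pending() are the ones at risk of redelivery if the client fails before the next batch is acknowledged, which is why this mode suits only applications that tolerate duplicates.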
Message Queue extends the JMS specification by adding a fourth acknowledgment mode:
- In no-acknowledge mode, your client application does not acknowledge receipt of messages, nor does the broker expect any such acknowledgment. There is thus no guarantee whatsoever that any message sent by the broker has been successfully received. This mode sacrifices all reliability for the sake of maximum throughput of message traffic.
The standard JMS Session interface, defined in package javax.jms, defines static constants for the first three acknowledgment modes (AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE, and DUPS_OK_ACKNOWLEDGE), to be used as arguments to the connection’s createSession method. The constant representing the fourth mode (NO_ACKNOWLEDGE) is defined in the extended Message Queue version of the interface, in package com.sun.messaging.jms. The session method getAcknowledgeMode returns one of these constants:
int ackMode = mySession.getAcknowledgeMode();
switch (ackMode)
{
    case Session.AUTO_ACKNOWLEDGE:
        /* Code here to handle auto-acknowledge mode */
        break;
    case Session.CLIENT_ACKNOWLEDGE:
        /* Code here to handle client-acknowledge mode */
        break;
    case Session.DUPS_OK_ACKNOWLEDGE:
        /* Code here to handle dups-OK-acknowledge mode */
        break;
    case com.sun.messaging.jms.Session.NO_ACKNOWLEDGE:
        /* Code here to handle no-acknowledge mode */
        break;
}
Note
All of the acknowledgment modes discussed above apply to message consumption. For message production, the broker’s acknowledgment behavior depends on the message’s delivery mode (persistent or nonpersistent; see Message Header). The broker acknowledges the receipt of persistent messages, but not of nonpersistent ones; this behavior is not configurable by the client.
In a transacted session (see next section), the acknowledgment mode is ignored and all acknowledgment processing is handled for you automatically by the Message Queue client runtime. In this case, the getAcknowledgeMode method returns the special constant Session.SESSION_TRANSACTED.
Transacted Sessions
Transactions allow you to group together an entire series of incoming and outgoing messages and treat them as an atomic unit. The message broker tracks the state of the transaction’s individual messages, but does not complete their delivery until you commit the transaction. In the event of failure, you can roll back the transaction, canceling all of its messages and restarting the entire series from the beginning.
Transactions always take place within the context of a single session. To use them, you must create a transacted session by passing true as the first argument to a connection’s createSession method:
Session mySession = myConnection.createSession(true, Session.SESSION_TRANSACTED);
The session’s getTransacted method tests whether it is a transacted session:
if ( mySession.getTransacted() )
{
    /* Code here to handle transacted session */
}
else
{
    /* Code here to handle non-transacted session */
}
A transacted session always has exactly one open transaction, encompassing all messages sent or received since the session was created or the previous transaction was completed. Committing or rolling back a transaction ends that transaction and automatically begins another.
Note
Because the scope of a transaction is limited to a single session, it is not possible to combine the production and consumption of a message into a single end-to-end transaction. That is, the delivery of a message from a message producer to a destination on the broker cannot be placed in the same transaction with its subsequent delivery from the destination to a consumer.
When all messages in a transaction have been successfully delivered, you call the session’s commit method to commit the transaction:
mySession.commit();
All of the session’s incoming messages are acknowledged and all of its outgoing messages are sent. The transaction is then considered complete and a new one is started.
When a send or receive operation fails, an exception is thrown. While it is possible to handle the exception by simply ignoring it or by retrying the operation, it is recommended that you roll back the transaction, using the session’s rollback method:
mySession.rollback();
All of the session’s incoming messages are recovered and redelivered, and its outgoing messages are destroyed and must be re-sent.
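The commit and rollback behavior described above can be pictured with a toy in-memory model. This is only a sketch of the semantics, not the Message Queue API: the class ToyTransactedSession and all of its methods are hypothetical, and messages are represented as plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Toy model of a transacted session (hypothetical; not a JMS class). */
class ToyTransactedSession {
    private final Deque<String> inbound;                          // messages waiting to be consumed
    private final List<String> received = new ArrayList<>();      // consumed, not yet acknowledged
    private final List<String> pendingSends = new ArrayList<>();  // produced, not yet sent
    private final List<String> delivered = new ArrayList<>();     // sent after a commit

    ToyTransactedSession(List<String> waitingMessages) {
        inbound = new ArrayDeque<>(waitingMessages);
    }

    /** Consume the next message within the current transaction (null if none). */
    String receive() {
        String msg = inbound.pollFirst();
        if (msg != null) {
            received.add(msg);
        }
        return msg;
    }

    /** Buffer an outgoing message; nothing is sent until commit. */
    void send(String msg) {
        pendingSends.add(msg);
    }

    /** Acknowledge all received messages and send all buffered ones. */
    void commit() {
        received.clear();
        delivered.addAll(pendingSends);
        pendingSends.clear();
    }

    /** Return received messages for redelivery (in order) and destroy buffered sends. */
    void rollback() {
        for (int i = received.size() - 1; i >= 0; i--) {
            inbound.addFirst(received.get(i));
        }
        received.clear();
        pendingSends.clear();
    }

    List<String> deliveredMessages() {
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        ToyTransactedSession session = new ToyTransactedSession(Arrays.asList("order-1", "order-2"));
        session.receive();
        session.send("reply-1");
        session.rollback();                    // order-1 will be redelivered; reply-1 is discarded
        System.out.println(session.receive()); // the redelivered message
        session.send("reply-1");
        session.commit();
        System.out.println(session.deliveredMessages());
    }
}
```

In the model, as in the text, each commit or rollback implicitly starts a fresh transaction: both buffers are empty afterward and the next send or receive belongs to the new transaction.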
Working With Messages
This section describes how to use the Message Queue Java API to compose, send, receive, and process messages.
Message Structure
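A message consists of the following parts:
- A header containing identifying and routing information
- Properties, each consisting of a name string and an associated value
- A body containing the actual content of the message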
The following sections discuss each of these in greater detail.
Message Header
Every message must have a header containing identifying and routing information. The header consists of a set of standard fields, which are defined in the Java Message Service Specification and summarized in Table 2-4. Some of these are set automatically by Message Queue in the course of producing and delivering a message, some depend on settings specified when a message producer sends a message, and others are set by the client on a message-by-message basis.
Table 2-4 Message Header Fields
Name
Description
JMSMessageID
Message identifier
JMSDestination
Destination to which message is sent
JMSReplyTo
Destination to which to reply
JMSCorrelationID
Link to related message
JMSDeliveryMode
Delivery mode (persistent or nonpersistent)
JMSPriority
Priority level
JMSTimestamp
Time of transmission
JMSExpiration
Expiration time
JMSType
Message type
JMSRedelivered
Has message been delivered before?
The JMS Message interface defines methods for setting the value of each header field: for instance,
outMsg.setJMSReplyTo(replyDest);
Table 2-5 lists all of the available header specification methods.
Table 2-5 Message Header Specification Methods
Name
Description
setJMSMessageID
Set message identifier
setJMSDestination
Set destination
setJMSReplyTo
Set reply destination
setJMSCorrelationID
Set correlation identifier from string
setJMSCorrelationIDAsBytes
Set correlation identifier from byte array
setJMSDeliveryMode
Set delivery mode
setJMSPriority
Set priority level
setJMSTimestamp
Set time stamp
setJMSExpiration
Set expiration time
setJMSType
Set message type
setJMSRedelivered
Set redelivered flag
The message identifier (JMSMessageID) is a string value uniquely identifying the message, assigned and set by the message broker when the message is sent. Because generating an identifier for each message adds to both the size of the message and the overhead involved in sending it, and because some client applications may not use them, the JMS interface provides a way to suppress the generation of message identifiers, using the message producer method setDisableMessageID (see Sending Messages).
The JMSDestination header field holds a Destination object representing the destination to which the message is directed, set by the message broker when the message is sent. There is also a JMSReplyTo field that you can set to specify a destination to which reply messages should be directed. Clients sending such a reply message can set its JMSCorrelationID header field to refer to the message to which they are replying. Typically this field is set to the message identifier string of the message being replied to, but client applications are free to substitute their own correlation conventions instead, using either the setJMSCorrelationID method (if the field value is a string) or the more general setJMSCorrelationIDAsBytes (if it is not).
The delivery mode (JMSDeliveryMode) specifies whether the message broker should log the message to stable storage. There are two possible values, PERSISTENT and NON_PERSISTENT, both defined as static constants of the JMS interface DeliveryMode: for example,
outMsg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
The default delivery mode is PERSISTENT, represented by the static constant Message.DEFAULT_DELIVERY_MODE.
The choice of delivery mode represents a tradeoff between performance and reliability:
- In persistent mode, the broker logs the message to stable storage, ensuring that it will not be lost in transit in the event of transmission failure; the message is guaranteed to be delivered exactly once.
- In nonpersistent mode, the message is not logged to stable storage; it will be delivered at most once, but may be lost in case of failure and not delivered at all. This mode does, however, improve performance by reducing the broker’s message-handling overhead. It may thus be appropriate for applications in which performance is at a premium and reliability is not.
The message’s priority level (JMSPriority) is expressed as an integer from 0 (lowest) to 9 (highest). Priorities from 0 to 4 are considered gradations of normal priority, those from 5 to 9 of expedited priority. The default priority level is 4, represented by the static constant Message.DEFAULT_PRIORITY.
The JMSTimestamp header field is set by the Message Queue client runtime to the time it delivers the message to the broker, expressed as a long integer in standard Java format (milliseconds since midnight, January 1, 1970 UTC). The message’s lifetime, specified when the message is sent, is added to this value and the result is stored in the JMSExpiration header field. (The default lifetime value of 0, represented by the static constant Message.DEFAULT_TIME_TO_LIVE, denotes an unlimited lifetime. In this case, the expiration time is also set to 0 to indicate that the message never expires.) As with the message identifier, client applications that do not use a message’s time stamp can improve performance by suppressing its generation with the message producer method setDisableMessageTimestamp (see Sending Messages).
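The expiration arithmetic described above can be written out as a small helper. This is a sketch only; the class ExpirationSketch and its method are hypothetical names, not part of the JMS or Message Queue API.

```java
/** Sketch of how JMSExpiration relates to JMSTimestamp and the message lifetime. */
final class ExpirationSketch {

    /**
     * Returns the expiration time in milliseconds since the epoch,
     * or 0 when the lifetime is 0 (the message never expires).
     */
    static long expirationFor(long timestampMillis, long timeToLiveMillis) {
        if (timeToLiveMillis == 0L) {
            return 0L;
        }
        return timestampMillis + timeToLiveMillis;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(expirationFor(now, 1000L)); // one second after the time stamp
        System.out.println(expirationFor(now, 0L));    // 0: unlimited lifetime
    }
}
```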
The header field JMSType can contain an optional message type identifier string supplied by the client when the message is sent. This field is intended for use with other JMS providers; Message Queue clients can simply ignore it.
When a message already delivered must be delivered again because of a failure, the broker indicates this by setting the JMSRedelivered flag in the message header to true. This can happen, for instance, when a session is recovered or a transaction is rolled back. The receiving client can check this flag to avoid duplicate processing of the same message (such as when the message has already been successfully received but the client’s acknowledgment was missed by the broker).
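One way a receiving client might use the redelivered flag is sketched below. The class RedeliveryGuard is hypothetical; in a real client the two arguments would presumably come from the message methods getJMSMessageID and getJMSRedelivered. The sketch also assumes that message identifiers have not been suppressed by the producer.

```java
import java.util.HashSet;
import java.util.Set;

/** Sketch of duplicate detection keyed on message identifier (hypothetical helper). */
final class RedeliveryGuard {
    private final Set<String> processedIds = new HashSet<>();

    /**
     * Returns true if the message should be processed, or false if it is a
     * redelivered copy of a message this client has already handled.
     */
    boolean shouldProcess(String messageId, boolean redelivered) {
        if (redelivered && processedIds.contains(messageId)) {
            return false;
        }
        processedIds.add(messageId);
        return true;
    }

    public static void main(String[] args) {
        RedeliveryGuard guard = new RedeliveryGuard();
        System.out.println(guard.shouldProcess("ID:1", false)); // first delivery
        System.out.println(guard.shouldProcess("ID:1", true));  // redelivered duplicate
    }
}
```

Checking the flag first means the identifier lookup is skipped for the common case of a message delivered for the first time.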
See the Java Message Service Specification for a more detailed discussion of all message header fields.
Message Properties
A message property consists of a name string and an associated value, which must be either a string or one of the standard Java primitive data types (int, byte, short, long, float, double, or boolean). The Message interface provides methods for setting properties of each type (see Table 2-6). There is also a setObjectProperty method that accepts a primitive value in objectified form, as a Java object of class Integer, Byte, Short, Long, Float, Double, Boolean, or String. The clearProperties method deletes all properties associated with a message; the message header and body are not affected.
Table 2-6 Message Property Specification Methods
Name
Description
setIntProperty
Set integer property
setByteProperty
Set byte property
setShortProperty
Set short integer property
setLongProperty
Set long integer property
setFloatProperty
Set floating-point property
setDoubleProperty
Set double-precision property
setBooleanProperty
Set boolean property
setStringProperty
Set string property
setObjectProperty
Set property from object
clearProperties
Clear properties
The JMS specification defines certain standard properties, listed in Table 2-7. By convention, the names of all such standard properties begin with the letters JMSX; names of this form are reserved and must not be used by a client application for its own custom message properties. Similarly, property names beginning with JMS_SUN are reserved for provider-specific properties defined by Message Queue itself; these are discussed in Chapter 3, "Message Queue Clients: Design and Features."
Table 2-7 Standard JMS Message Properties
Name
Description
JMSXUserID
Identity of user sending message
JMSXAppID
Identity of application sending message
JMSXDeliveryCount
Number of delivery attempts
JMSXGroupID
Identity of message group to which this message belongs
JMSXGroupSeq
Sequence number within message group
JMSXProducerTXID
Identifier of transaction within which message was produced
JMSXConsumerTXID
Identifier of transaction within which message was consumed
JMSXRcvTimestamp
Time message delivered to consumer
JMSXState
Message state (waiting, ready, expired, or retained)
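A client that builds property names dynamically may want to guard against the reserved prefixes mentioned above. The following is a minimal sketch; the class PropertyNames and its method are hypothetical and cover only the two prefixes named in this section.

```java
/** Sketch of a check for reserved message property names (hypothetical helper). */
final class PropertyNames {

    /** Returns true if the name begins with a prefix reserved for standard or Message Queue properties. */
    static boolean isReserved(String name) {
        return name.startsWith("JMSX") || name.startsWith("JMS_SUN");
    }

    public static void main(String[] args) {
        System.out.println(isReserved("JMSXGroupID")); // true
        System.out.println(isReserved("orderId"));     // false
    }
}
```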
Message Body
The actual content of a message is contained in the message body. JMS defines six classes (or types) of message, each with a different body format:
- A text message (interface TextMessage) contains a Java string.
- A stream message (interface StreamMessage) contains a stream of Java primitive values, written and read sequentially.
- A map message (interface MapMessage) contains a set of name-value pairs, where each name is a string and each value is a Java primitive value. The order of the entries is undefined; they can be accessed randomly by name or enumerated sequentially.
- An object message (interface ObjectMessage) contains a serialized Java object (which may in turn be a collection of other objects).
- A bytes message (interface BytesMessage) contains a stream of uninterpreted bytes.
- A null message (interface Message) consists of a header only, with no message body.
Each of these is a subinterface of the generic Message interface, extended with additional methods specific to the particular message type.
Composing Messages
The JMS Session interface provides methods for creating each type of message, as shown in Table 2-8. For instance, you can create a text message with a statement such as
TextMessage outMsg = mySession.createTextMessage();
In general, these methods create a message with an empty body; the interfaces for specific message types then provide additional methods for filling the body with content, as described in the sections that follow.
Table 2-8 Session Methods for Message Creation
Name
Description
createMessage
Create null message
createTextMessage
Create text message
createStreamMessage
Create stream message
createMapMessage
Create map message
createObjectMessage
Create object message
createBytesMessage
Create bytes message
Note
Some of the message-creation methods have an overloaded form that allows you to initialize the message body directly at creation: for example,
TextMessage outMsg = mySession.createTextMessage("Hello, World!");
These overloaded forms are pointed out in the relevant sections below.
Once a message has been delivered to a message consumer, its body is considered read-only; any attempt by the consumer to modify the message body will cause an exception (MessageNotWriteableException) to be thrown. The consumer can, however, empty the message body and place it in a writeable state by calling the message method clearBody:
outMsg.clearBody();
This places the message in the same state as if it had been newly created, ready to fill its body with new content.
Composing Text Messages
You create a text message with the session method createTextMessage. You can either initialize the message body directly at creation time
TextMessage outMsg = mySession.createTextMessage("Hello, World!");
or simply create an empty message and then use its setText method (Table 2-9) to set its content:
TextMessage outMsg = mySession.createTextMessage();
outMsg.setText("Hello, World!");
Composing Stream Messages
The session method createStreamMessage returns a new, empty stream message. You can then use the methods shown in Table 2-10 to write primitive data values into the message body, similarly to writing to a data stream: for example,
StreamMessage outMsg = mySession.createStreamMessage();
outMsg.writeString("The Meaning of Life");
outMsg.writeInt(42);
Table 2-10 Stream Message Composition Methods
Name
Description
writeInt
Write integer to message stream
writeByte
Write byte value to message stream
writeBytes
Write byte array to message stream
writeShort
Write short integer to message stream
writeLong
Write long integer to message stream
writeFloat
Write floating-point value to message stream
writeDouble
Write double-precision value to message stream
writeBoolean
Write boolean value to message stream
writeChar
Write character to message stream
writeString
Write string to message stream
writeObject
Write value of object to message stream
reset
Reset message stream
As a convenience for handling values whose types are not known until execution time, the writeObject method accepts a string or an objectified primitive value of class Integer, Byte, Short, Long, Float, Double, Boolean, or Character and writes the corresponding string or primitive value to the message stream: for example, the statements
Integer meaningOfLife = new Integer(42);
outMsg.writeObject(meaningOfLife);
are equivalent to
outMsg.writeInt(42);
This method will throw an exception (MessageFormatException) if the argument given to it is not of class String or one of the objectified primitive classes.
Once you’ve written the entire message contents to the stream, the reset method
outMsg.reset();
puts the message body in read-only mode and repositions the stream to the beginning, ready to read (see Processing Messages). When the message is in this state, any attempt to write to the message stream will throw the exception MessageNotWriteableException. A call to the clearBody method (inherited from the superinterface Message) deletes the entire message body and makes it writeable again.
Composing Map Messages
Table 2-11 shows the methods available in the MapMessage interface for adding content to the body of a map message. Each of these methods takes two arguments, a name string and a primitive or string value of the appropriate type, and adds the corresponding name-value pair to the message body: for example,
MapMessage outMsg = mySession.createMapMessage();
outMsg.setInt("The Meaning of Life", 42);
Table 2-11 Map Message Composition Methods
Name
Description
setInt
Store integer in message map by name
setByte
Store byte value in message map by name
setBytes
Store byte array in message map by name
setShort
Store short integer in message map by name
setLong
Store long integer in message map by name
setFloat
Store floating-point value in message map by name
setDouble
Store double-precision value in message map by name
setBoolean
Store boolean value in message map by name
setChar
Store character in message map by name
setString
Store string in message map by name
setObject
Store object in message map by name
Like stream messages, map messages provide a convenience method (setObject) for dealing with values whose type is determined dynamically at execution time: for example, the statements
Integer meaningOfLife = new Integer(42);
outMsg.setObject("The Meaning of Life", meaningOfLife);
are equivalent to
outMsg.setInt("The Meaning of Life", 42);
The object supplied must be either a string object (class String) or an objectified primitive value of class Integer, Byte, Short, Long, Float, Double, Boolean, or Character; otherwise an exception (MessageFormatException) will be thrown.
Composing Object Messages
The ObjectMessage interface provides just one method, setObject (Table 2-12), for setting the body of an object message:
ObjectMessage outMsg = mySession.createObjectMessage();
outMsg.setObject(bodyObject);
The argument to this method can be any serializable object (that is, an instance of any class that implements the standard Java interface Serializable). If the object is not serializable, the exception MessageFormatException will be thrown.
Table 2-12 Object Message Composition Method
Name
Description
setObject
Serialize object to message body
As an alternative, you can initialize the message body directly when you create the message, by passing an object to the session method createObjectMessage:
ObjectMessage outMsg = mySession.createObjectMessage(bodyObject);
Again, an exception will be thrown if the object is not serializable.
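To find out ahead of time whether an object can be serialized, a client could try serializing it with the standard Java I/O classes. The sketch below uses only the JDK; the class SerializableCheck is a hypothetical helper and is not part of the Message Queue API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

/** Sketch of a pre-flight serializability check (hypothetical helper). */
final class SerializableCheck {

    /** Returns true if the object, including everything it references, can be serialized. */
    static boolean canSerialize(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize("Hello, World!")); // true: String is serializable
        System.out.println(canSerialize(new Object()));    // false: Object is not
    }
}
```

Attempting the serialization, rather than testing instanceof Serializable, also catches a serializable class that holds a reference to a non-serializable object.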
Composing Bytes Messages
The body of a bytes message simply consists of a stream of uninterpreted bytes; its interpretation is entirely a matter of conventional agreement between sender and receiver. This type of message is intended primarily for encoding message formats required by other existing message systems; Message Queue clients should generally use one of the other, more specific message types instead.
Composing a bytes message is similar to composing a stream message (see Composing Stream Messages). You create the message with the session method createBytesMessage, then use the methods shown in Table 2-13 to encode primitive values into the message’s byte stream: for example,
BytesMessage outMsg = mySession.createBytesMessage();
outMsg.writeUTF("The Meaning of Life");
outMsg.writeInt(42);
Table 2-13 Bytes Message Composition Methods
Name
Description
writeInt
Write integer to message stream
writeByte
Write byte value to message stream
writeBytes
Write byte array to message stream
writeShort
Write short integer to message stream
writeLong
Write long integer to message stream
writeFloat
Write floating-point value to message stream
writeDouble
Write double-precision value to message stream
writeBoolean
Write boolean value to message stream
writeChar
Write character to message stream
writeUTF
Write UTF-8 string to message stream
writeObject
Write value of object to message stream
reset
Reset message stream
As with stream and map messages, you can use the generic object-based method writeObject to handle values whose type is unknown at compilation time: for example, the statements
Integer meaningOfLife = new Integer(42);
outMsg.writeObject(meaningOfLife);
are equivalent to
outMsg.writeInt(42);
The message’s reset method
outMsg.reset();
puts the message body in read-only mode and repositions the byte stream to the beginning, ready to read (see Processing Messages). Attempting to write further content to a message in this state will cause an exception (MessageNotWriteableException). The inherited Message method clearBody can be used to delete the entire message body and make it writeable again.
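Because the bytes are uninterpreted, the receiver must read values back in exactly the order and with exactly the types the sender wrote them. The sketch below illustrates this with the JDK classes DataOutputStream and DataInputStream as a stand-in for a bytes message. It is an analogy only: the class ByteStreamAnalogy is hypothetical, and no claim is made that a bytes message body uses the same byte layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Analogy for bytes-message composition using plain JDK data streams. */
final class ByteStreamAnalogy {

    /** Writes a string and then an integer, as in the composition example above. */
    static byte[] encode(String title, int value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(title);
            out.writeInt(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Reads the values back in the same order they were written. */
    static String decode(byte[] body) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(body))) {
            String title = in.readUTF();
            int value = in.readInt();
            return title + " = " + value;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] body = encode("The Meaning of Life", 42);
        System.out.println(decode(body)); // The Meaning of Life = 42
    }
}
```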
Sending Messages
In order to send messages to a message broker, you must create a message producer using the session method createProducer:
MessageProducer myProducer = mySession.createProducer(myDest);
The scope of the message producer is limited to the session that created it and the connection to which that session belongs. Table 2-14 shows the methods defined in the MessageProducer interface.
Table 2-14 Message Producer Methods
Name
Description
getDestination
Get default destination
setDeliveryMode
Set default delivery mode
getDeliveryMode
Get default delivery mode
setPriority
Set default priority level
getPriority
Get default priority level
setTimeToLive
Set default message lifetime
getTimeToLive
Get default message lifetime
setDisableMessageID
Set message identifier disable flag
getDisableMessageID
Get message identifier disable flag
setDisableMessageTimestamp
Set time stamp disable flag
getDisableMessageTimestamp
Get time stamp disable flag
send
Send message
close
Close message producer
The createProducer method takes a destination as an argument, which may be either a (point-to-point) queue or a (publish/subscribe) topic. The producer will then send all of its messages to the specified destination. If the destination is a queue, the producer is called a sender for that queue; if it is a topic, the producer is a publisher to that topic. The message producer’s getDestination method returns this destination.
You also have the option of leaving the destination unspecified when you create a producer
MessageProducer myProducer = mySession.createProducer(null);
in which case you must specify an explicit destination for each message. This option is typically used for producers that must send messages to a variety of destinations, such as those designated in the JMSReplyTo header fields of incoming messages (see Message Header).
Note
The generic MessageProducer interface also has specialized subinterfaces, QueueSender and TopicPublisher, for sending messages specifically to a point-to-point queue or a publish/subscribe topic. These types of producer are created by the createSender and createPublisher methods of the specialized session subinterfaces QueueSession and TopicSession, respectively. However, it is generally more convenient (and recommended) to use the generic form of message producer described here, which can handle both types of destination indiscriminately.
A producer has a default delivery mode (persistent or nonpersistent), priority level, and message lifetime, which it will apply to all messages it sends unless explicitly overridden for an individual message. You can set these properties with the message producer methods setDeliveryMode, setPriority, and setTimeToLive, and retrieve them with getDeliveryMode, getPriority, and getTimeToLive. If you don’t set them explicitly, they default to persistent delivery, priority level 4, and a lifetime value of 0, denoting an unlimited message lifetime.
The heart of the message producer interface is the send method, which is available in a variety of overloaded forms. The simplest of these just takes a message as its only argument:
myProducer.send(outMsg);
This simply sends the specified message to the producer’s default destination, using the producer’s default delivery mode, priority, and message lifetime. Alternatively, you can explicitly specify the destination
myProducer.send(myDest, outMsg);
or the delivery mode, priority, and lifetime in milliseconds
myProducer.send(outMsg, DeliveryMode.NON_PERSISTENT, 9, 1000);
or all of these at once:
myProducer.send(myDest, outMsg, DeliveryMode.NON_PERSISTENT, 9, 1000);
Recall that if you did not specify a destination when creating the message producer, you must provide an explicit destination for each message you send.
As discussed earlier under Message Header, client applications that have no need for the message identifier and time stamp fields in the message header can gain some performance improvement by suppressing the generation of these fields, using the message producer’s setDisableMessageID and setDisableMessageTimestamp methods. Note that a true value for either of these flags disables the generation of the corresponding header field, while a false value enables it. Both flags are set to false by default, meaning that the broker will generate the values of these header fields unless explicitly instructed otherwise.
When you are finished using a message producer, you should call its close method
myProducer.close();
allowing the broker and client runtime to release any resources they may have allocated on the producer’s behalf.
Receiving Messages
Messages are received by a message consumer, within the context of a connection and a session. Once you have created a consumer, you can use it to receive messages in either of two ways:
- In synchronous message consumption, you explicitly request the delivery of messages when you are ready to receive them.
- In asynchronous message consumption, you register a message listener for the consumer. The Message Queue client runtime then calls the listener whenever it has a message to deliver.
These two forms of message consumption are described in the sections Receiving Messages Synchronously and Receiving Messages Asynchronously.
Creating Message Consumers
The session method createConsumer creates a generic consumer that can be used to receive messages from either a (point-to-point) queue or a (publish/subscribe) topic:
MessageConsumer myConsumer = mySession.createConsumer(myDest);
If the destination is a queue, the consumer is called a receiver for that queue; if it is a topic, the consumer is a subscriber to that topic.
Note
The generic MessageConsumer interface also has specialized subinterfaces, QueueReceiver and TopicSubscriber, for receiving messages specifically from a point-to-point queue or a publish/subscribe topic. These types of consumer are created by the createReceiver and createSubscriber methods of the specialized session subinterfaces QueueSession and TopicSession, respectively. However, it is generally more convenient (and recommended) to use the generic form of message consumer described here, which can handle both types of destination indiscriminately.
A subscriber created for a topic destination with the createConsumer method is always nondurable, meaning that it will receive only messages that are sent (published) to the topic while the subscriber is active. If you want the broker to retain messages published to a topic while no subscriber is active and deliver them when one becomes active again, you must instead create a durable subscriber, as described in Durable Subscribers.
Table 2-15 shows the methods defined in the MessageConsumer interface, which are discussed in detail in the relevant sections below.
Table 2-15 Message Consumer Methods
Name
Description
getMessageSelector
Get message selector
receive
Receive message synchronously
receiveNoWait
Receive message synchronously without blocking
setMessageListener
Set message listener for asynchronous reception
getMessageListener
Get message listener for asynchronous reception
close
Close message consumer
Message Selectors
If appropriate, you can restrict the messages a consumer will receive from its destination by supplying a message selector as an argument when you create the consumer:
String mySelector = "/* Text of selector here */";
MessageConsumer myConsumer = mySession.createConsumer(myDest, mySelector);
The selector is a string whose syntax is based on a subset of the SQL92 conditional expression syntax, which allows you to filter the messages you receive based on the values of their properties (see Message Properties). See the Java Message Service Specification for a complete description of this syntax. The message consumer’s getMessageSelector method returns the consumer’s selector string (or null if no selector was specified when the consumer was created):
String mySelector = myConsumer.getMessageSelector();
Note
Messages whose properties do not satisfy the consumer’s selector will be retained undelivered by the destination until they are retrieved by another message consumer. The use of message selectors can thus cause messages to be delivered out of sequence from the order in which they were originally produced. Only a message consumer without a selector is guaranteed to receive messages in their original order.
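When a selector compares a property with a string value supplied at run time, the value must be written as a single-quoted literal, with any embedded single quote doubled. The following sketch builds such a selector; the class SelectorSketch and the property name customer are hypothetical, chosen only for illustration.

```java
/** Sketch of building a selector string that compares a property with a string literal. */
final class SelectorSketch {

    /** Wraps a value in single quotes, doubling any embedded single quote. */
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /** Builds a selector of the form: name = 'value'. */
    static String propertyEquals(String propertyName, String value) {
        return propertyName + " = " + quote(value);
    }

    public static void main(String[] args) {
        System.out.println(propertyEquals("customer", "O'Brien")); // customer = 'O''Brien'
    }
}
```

The resulting string could then be passed as the selector argument when creating a consumer.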
In some cases, the same connection may both publish and subscribe to the same topic destination. The createConsumer method accepts an optional boolean argument that suppresses the delivery of messages published by the consumer’s own connection:
String mySelector = "/* Text of selector here */";
MessageConsumer myConsumer = mySession.createConsumer(myDest, mySelector, true);
The resulting consumer will receive only messages published by a different connection.
Durable Subscribers
To receive messages delivered to a publish/subscribe topic while no message consumer is active, you must ask the message broker to create a durable subscriber for that topic. All sessions that create such subscribers for a given topic must have the same client identifier (see Using Connections). When you create a durable subscriber, you supply a subscriber name that must be unique for that client identifier:
MessageConsumer myConsumer = mySession.createDurableSubscriber(myDest, "mySub");
(The object returned by the createDurableSubscriber method is actually typed as TopicSubscriber, but since that is a subinterface of MessageConsumer, you can safely assign it to a MessageConsumer variable. Note, however, that the destination myDest must be a publish/subscribe topic and not a point-to-point queue.)
You can think of a durable subscriber as a “virtual message consumer” for the specified topic, identified by the unique combination of a client identifier and subscriber name. When a message arrives for the topic and no message consumer is currently active for it, the message will be retained for later delivery. Whenever you create a consumer with the given client identifier and subscriber name, it will be considered to represent this same durable subscriber and will receive all of the accumulated messages that have arrived for the topic in the subscriber’s absence. Each message is retained until it is delivered to (and acknowledged by) such a consumer or until it expires.
Note
Only one session at a time can have an active consumer for a given durable subscription. If another such consumer already exists, the createDurableSubscriber method will throw an exception.
Like the createConsumer method described in the preceding section (which creates nondurable subscribers), createDurableSubscriber can accept an optional message selector string and a boolean argument telling whether to suppress the delivery of messages published by the subscriber’s own connection:
String mySelector = "/* Text of selector here */";
MessageConsumer myConsumer = mySession.createDurableSubscriber(myDest, "mySub", mySelector, true);
You can change the terms of a durable subscription by creating a new subscriber with the same client identifier and subscription name but with a different topic, selector, or both. The effect is as if the old subscription were destroyed and a new one created with the same name. When you no longer need a durable subscription, you can destroy it with the session method unsubscribe:
mySession.unsubscribe("mySub");
Receiving Messages Synchronously
Once you have created a message consumer for a session, using either the createConsumer or createDurableSubscriber method, you must start the session’s connection to begin the flow of incoming messages:
myConnection.start();
(Note that it is not necessary to start a connection in order to produce messages, only to consume them.) You can then use the consumer’s receive method to receive messages synchronously from the message broker:
Message inMsg = myConsumer.receive();
This returns the next available message for this consumer. If no message is immediately available, the receive method blocks until one arrives. You can also provide a timeout interval in milliseconds:
Message inMsg = myConsumer.receive(1000);
In this case, if no message arrives before the specified timeout interval (1 second in the example) expires, the method will return with a null result. An alternative method, receiveNoWait, returns a null result immediately if no message is currently available:
Message inMsg = myConsumer.receiveNoWait();
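The difference between the three receive styles can be tried out without a broker. The following is a minimal JDK-only analogy, not JMS code: a java.util.concurrent.BlockingQueue stands in for the messages waiting for a consumer, and the class name ReceiveStylesSketch and the PENDING queue are hypothetical. One detail worth noting is that in JMS a timeout of zero never expires, so receive(0) behaves like receive(); the sketch mirrors that.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** JDK-only analogy for the three receive styles; this is not JMS code. */
final class ReceiveStylesSketch {
    // Hypothetical stand-in for the messages a broker holds ready for one consumer.
    static final BlockingQueue<String> PENDING = new LinkedBlockingQueue<>();

    /** Like receive(): blocks until a message is available. */
    static String receive() {
        try {
            return PENDING.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Like receive(timeout): waits at most timeoutMillis, then gives up with null. */
    static String receive(long timeoutMillis) {
        if (timeoutMillis == 0) {
            return receive(); // in JMS a timeout of zero never expires
        }
        try {
            return PENDING.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Like receiveNoWait(): returns null at once if nothing is available. */
    static String receiveNoWait() {
        return PENDING.poll();
    }

    public static void main(String[] args) {
        PENDING.add("hello");
        System.out.println(receive());       // the message that was already waiting
        System.out.println(receiveNoWait()); // null: nothing left to deliver
        System.out.println(receive(50));     // null after waiting about 50 ms
    }
}
```

In a real client, a null result from receive(timeout) or receiveNoWait should be checked before the message is used.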
Receiving Messages Asynchronously
If you want your message consumer to receive incoming messages asynchronously, you must create a message listener to process the messages. This is a Java object that implements the JMS MessageListener interface. The procedure is as follows:
To Set Up a Message Queue Java Client to Receive Messages Asynchronously
- Define a message listener class implementing the MessageListener interface.
The interface consists of the single method onMessage, which accepts a message as a parameter and processes it in whatever way is appropriate for your application:
public class MyMessageListener implements MessageListener
{
    public void onMessage (Message inMsg)
    {
        /* Code here to process message */
    }
}
- Create a message consumer.
You can use either the createConsumer or createDurableSubscriber method of the session in which the consumer will operate: for instance,
MessageConsumer myConsumer = mySession.createConsumer(myDest);
- Create an instance of your message listener class.
MyMessageListener myListener = new MyMessageListener();
- Associate the message listener with your message consumer.
The message consumer method setMessageListener accepts a message listener object and associates it with the given consumer:
myConsumer.setMessageListener(myListener);
- Start the connection to which this consumer’s session belongs.
The connection’s start method begins the flow of messages from the message broker to your message consumer:
myConnection.start();
Once the connection is started, the Message Queue client runtime will call your message listener’s onMessage method each time it has a message to deliver to this consumer.
To ensure that no messages are lost before your consumer is ready to receive them, it is important not to start the connection until after you have created the message listener and associated it with the consumer. If the connection is already started, you should stop it before creating an asynchronous consumer, then start it again when the consumer is ready to begin processing.
Setting a consumer’s message listener to null removes any message listener previously associated with it:
myConsumer.setMessageListener(null);
The consumer’s getMessageListener method returns its current message listener (or null if there is none):
MessageListener myListener = myConsumer.getMessageListener();
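Because onMessage is called by the client runtime rather than by your own code, the rest of the application often needs a way to wait until the listener has done its work. The following is one possible sketch of that pattern, using only the JDK: the Listener interface is a hypothetical stand-in for javax.jms.MessageListener (with String as the message type), and an ordinary thread plays the part of the runtime. The listener records each message and counts down a java.util.concurrent.CountDownLatch that the main thread waits on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class ListenerLatchSketch {
    /** Hypothetical stand-in for javax.jms.MessageListener, with String as the message type. */
    interface Listener {
        void onMessage(String inMsg);
    }

    /** Listener that records each message and signals when the expected count is reached. */
    static final class CollectingListener implements Listener {
        final List<String> seen = Collections.synchronizedList(new ArrayList<>());
        final CountDownLatch done;

        CollectingListener(int expected) {
            done = new CountDownLatch(expected);
        }

        @Override
        public void onMessage(String inMsg) {
            seen.add(inMsg);
            done.countDown();
        }
    }

    /** Delivers the messages on a separate thread and waits until the listener has seen them all. */
    static List<String> deliverAndWait(List<String> messages) {
        CollectingListener listener = new CollectingListener(messages.size());
        // This thread plays the role of the client runtime calling onMessage.
        Thread runtime = new Thread(() -> messages.forEach(listener::onMessage));
        runtime.start();
        try {
            if (!listener.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for messages");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(listener.seen);
    }

    public static void main(String[] args) {
        System.out.println(deliverAndWait(Arrays.asList("m1", "m2", "m3"))); // [m1, m2, m3]
    }
}
```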
Acknowledging Messages
If you have specified client-acknowledge as your session’s acknowledgment mode (see Acknowledgment Modes), it is your client application’s responsibility to explicitly acknowledge each message it receives. If you have received the message synchronously, via a message consumer’s receive (or receiveNoWait) method, you should process the message first and then acknowledge it; if you have received it asynchronously, your message listener’s onMessage method should acknowledge the message after processing it. This ensures that the message broker will not delete the message from persistent storage until processing is complete.
Note
In a transacted session (see Transacted Sessions), there is no need to acknowledge a message explicitly: the session’s acknowledgment mode is ignored and all acknowledgment processing is handled for you automatically by the Message Queue client runtime. In this case, the session’s getAcknowledgeMode method will return the special constant Session.SESSION_TRANSACTED.
Table 2-16 shows the methods available for acknowledging a message. The most general is acknowledge, defined in the standard JMS interface javax.jms.Message:
inMsg.acknowledge();
This acknowledges all unacknowledged messages consumed by the session up to the time of the call. You can use this method to acknowledge each message individually as you receive it, or you can group several messages together and acknowledge them all at once by calling acknowledge on the last one in the group.
Table 2-16 Message Acknowledgment Methods

| Function | Description |
| --- | --- |
| acknowledge | Acknowledge all unacknowledged messages for session |
| acknowledgeThisMessage | Acknowledge this message only |
| acknowledgeUpThroughThisMessage | Acknowledge all unacknowledged messages through this one |

The Message Queue version of the Message interface, defined in the package com.sun.messaging.jms, adds two more methods that provide more flexible control over which messages you acknowledge. The acknowledgeThisMessage method just acknowledges the single message for which it is called, rather than all messages consumed by the session; acknowledgeUpThroughThisMessage acknowledges the message for which it is called and all previous messages; messages received after that message remain unacknowledged.
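The difference between the three acknowledgment methods is easiest to see on a small example. The sketch below is a toy model, not the Message Queue API: the class AckModelSketch and its consume and unacknowledged methods are hypothetical, and messages are represented by plain string identifiers. The real methods are called on a message object; here the model's methods take the message identifier as an argument instead.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a client-acknowledge session; not the Message Queue API. */
final class AckModelSketch {
    // Messages consumed by the session but not yet acknowledged, oldest first.
    private final List<String> unacked = new ArrayList<>();

    /** Records that the session has consumed a message. */
    void consume(String msgId) {
        unacked.add(msgId);
    }

    /** Models acknowledge: clears every message consumed so far. */
    void acknowledge() {
        unacked.clear();
    }

    /** Models acknowledgeThisMessage: clears only the named message. */
    void acknowledgeThisMessage(String msgId) {
        unacked.remove(msgId);
    }

    /** Models acknowledgeUpThroughThisMessage: clears the named message and all earlier ones. */
    void acknowledgeUpThroughThisMessage(String msgId) {
        int index = unacked.indexOf(msgId);
        if (index >= 0) {
            unacked.subList(0, index + 1).clear();
        }
    }

    /** Returns a copy of the identifiers still awaiting acknowledgment. */
    List<String> unacknowledged() {
        return new ArrayList<>(unacked);
    }

    public static void main(String[] args) {
        AckModelSketch session = new AckModelSketch();
        for (String id : new String[] {"m1", "m2", "m3", "m4"}) {
            session.consume(id);
        }
        session.acknowledgeThisMessage("m3");
        System.out.println(session.unacknowledged()); // [m1, m2, m4]
        session.acknowledgeUpThroughThisMessage("m2");
        System.out.println(session.unacknowledged()); // [m4]
        session.acknowledge();
        System.out.println(session.unacknowledged()); // []
    }
}
```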
Browsing Messages
If the destination from which you are consuming messages is a point-to-point queue, you can use a queue browser to examine the messages in the queue without consuming them. The session method createBrowser creates a browser for a specified queue:
QueueBrowser myBrowser = mySession.createBrowser(myDest);
The method will throw an exception (InvalidDestinationException) if you try to pass it a topic destination instead of a queue. You can also supply a selector string as an optional second argument:
String mySelector = "/* Text of selector here */";
QueueBrowser myBrowser = mySession.createBrowser(myDest, mySelector);
Table 2-17 shows the methods defined in the QueueBrowser interface. The getQueue and getMessageSelector methods return the browser’s queue and selector string, respectively.
Table 2-17 Queue Browser Methods

| Name | Description |
| --- | --- |
| getQueue | Get queue from which this browser reads |
| getMessageSelector | Get message selector |
| getEnumeration | Get enumeration of all messages in the queue |
| close | Close browser |

The most important queue browser method is getEnumeration, which returns a Java enumeration object that you can use to iterate through the messages in the queue, as shown in Code Example 2-4.
Code Example 2-4 Browsing a Queue
Enumeration queueMessages = myBrowser.getEnumeration();
Message eachMessage;
while ( queueMessages.hasMoreElements() )
  { eachMessage = (Message) queueMessages.nextElement();
    /* Do something with the message */
  }
The browser’s close method closes it when you’re through with it:
myBrowser.close();
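Because getEnumeration returns an untyped Enumeration, each element has to be cast before use. If you would rather work with a typed list, one option is a small helper along the following lines. This is a sketch using only the JDK: the class name EnumerationSnapshotSketch and the snapshot method are hypothetical, and a Vector of strings stands in for the queue's messages.

```java
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Vector;

final class EnumerationSnapshotSketch {
    /** Copies the elements of an untyped Enumeration into a typed list, casting each one. */
    static <T> List<T> snapshot(Enumeration<?> source, Class<T> type) {
        List<T> copy = new ArrayList<>();
        while (source.hasMoreElements()) {
            // Class.cast throws ClassCastException if an element has an unexpected type.
            copy.add(type.cast(source.nextElement()));
        }
        return copy;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the enumeration a queue browser would return.
        Vector<Object> fakeQueue = new Vector<>();
        fakeQueue.add("first");
        fakeQueue.add("second");
        List<String> messages = snapshot(fakeQueue.elements(), String.class);
        System.out.println(messages); // [first, second]
    }
}
```

With a real browser, the corresponding call would be snapshot(myBrowser.getEnumeration(), Message.class). The same helper would apply to the enumerations returned by getPropertyNames and getMapNames.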
Closing a Consumer
As a matter of good programming practice, you should close a message consumer when you have no further need for it. Closing a session or connection automatically closes all consumers associated with it; to close a consumer without closing the session or connection to which it belongs, you can use its close method:
myConsumer.close();
For a consumer that is a nondurable topic subscriber, this terminates the flow of messages to the consumer. However, if the consumer is a queue receiver or a durable topic subscriber, messages will continue to be accumulated for the destination and will be delivered the next time a consumer for that destination becomes active. To terminate a durable subscription permanently, call its session’s unsubscribe method with the subscriber name as an argument:
mySession.unsubscribe("mySub");
Processing Messages
Processing a message after you have received it may entail examining its header fields, properties, and body. The following sections describe how this is done.
Retrieving Message Header Fields
The standard JMS message header fields are described under Message Header. Table 2-18 shows the methods provided by the JMS Message interface for retrieving the values of these fields: for instance, you can obtain a message’s reply destination with the statement
Destination replyDest = inMsg.getJMSReplyTo();
Table 2-18 Message Header Retrieval Methods

| Name | Description |
| --- | --- |
| getJMSMessageID | Get message identifier |
| getJMSDestination | Get destination |
| getJMSReplyTo | Get reply destination |
| getJMSCorrelationID | Get correlation identifier as string |
| getJMSCorrelationIDAsBytes | Get correlation identifier as byte array |
| getJMSDeliveryMode | Get delivery mode |
| getJMSPriority | Get priority level |
| getJMSTimestamp | Get time stamp |
| getJMSExpiration | Get expiration time |
| getJMSType | Get message type |
| getJMSRedelivered | Get redelivered flag |

Retrieving Message Properties
Table 2-19 lists the methods defined in the JMS Message interface for retrieving the values of a message’s properties (see Message Properties). There is a retrieval method for each of the possible primitive types that a property value can assume: for instance, you can obtain the time at which a message was delivered to the consumer, held in the JMSXRcvTimestamp property, with the statement
long timeStamp = inMsg.getLongProperty("JMSXRcvTimestamp");
Table 2-19 Message Property Retrieval Methods

| Name | Description |
| --- | --- |
| getIntProperty | Get integer property |
| getByteProperty | Get byte property |
| getShortProperty | Get short integer property |
| getLongProperty | Get long integer property |
| getFloatProperty | Get floating-point property |
| getDoubleProperty | Get double-precision property |
| getBooleanProperty | Get boolean property |
| getStringProperty | Get string property |
| getObjectProperty | Get property as object |
| getPropertyNames | Get property names |
| propertyExists | Does property exist? |

There is also a generic getObjectProperty method that returns a property value in objectified form, as a Java object of class Integer, Byte, Short, Long, Float, Double, Boolean, or String. For example, another way to obtain a message’s time stamp, equivalent to that shown above, would be
Long timeStampObject = (Long) inMsg.getObjectProperty("JMSXRcvTimestamp");
long timeStamp = timeStampObject.longValue();
If the message has no property with the requested name, getObjectProperty will return null; the message method propertyExists tests whether this is the case.
The getPropertyNames method returns a Java enumeration object for iterating through all of the property names associated with a given message; you can then use the retrieval methods shown in the table to retrieve each of the properties by name, as shown in Code Example 2-5.
Code Example 2-5 Enumerating Message Properties
Enumeration propNames = inMsg.getPropertyNames();
String eachName;
Object eachValue;
while ( propNames.hasMoreElements() )
  { eachName = (String) propNames.nextElement();
    eachValue = inMsg.getObjectProperty(eachName);
    /* Do something with the value */
  }
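When properties are enumerated this way, each value arrives as an Object whose class reveals the underlying primitive type. The following sketch shows one way to dispatch on that class. It uses only the JDK: the class name PropertyValueSketch, the describe method, and the sample property names other than JMSXRcvTimestamp are hypothetical, and a Map stands in for a received message's properties.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PropertyValueSketch {
    /** Names the primitive type behind a value of the kind getObjectProperty returns. */
    static String describe(Object value) {
        if (value == null)            return "no such property";
        if (value instanceof Boolean) return "boolean " + value;
        if (value instanceof Byte)    return "byte " + value;
        if (value instanceof Short)   return "short " + value;
        if (value instanceof Integer) return "int " + value;
        if (value instanceof Long)    return "long " + value;
        if (value instanceof Float)   return "float " + value;
        if (value instanceof Double)  return "double " + value;
        if (value instanceof String)  return "String " + value;
        return "unexpected " + value.getClass().getName();
    }

    public static void main(String[] args) {
        // Hypothetical property set standing in for a received message's properties.
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("JMSXRcvTimestamp", 1134000000000L);
        props.put("retries", 3);
        props.put("urgent", Boolean.TRUE);
        for (Map.Entry<String, Object> entry : props.entrySet()) {
            System.out.println(entry.getKey() + ": " + describe(entry.getValue()));
        }
        // A name that is not present yields null, as getObjectProperty does.
        System.out.println("missing: " + describe(props.get("missing")));
    }
}
```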
Processing the Message Body
The methods for retrieving the contents of a message’s body essentially parallel those for composing the body, as described earlier under Composing Messages. The following sections describe these methods for each of the possible message types (text, stream, map, object, and bytes).
Processing Text Messages
The text message method getText (Table 2-20) retrieves the contents of a text message’s body in the form of a string:
String textBody = inMsg.getText();
Processing Stream Messages
Reading the contents of a stream message is similar to reading from a data stream, using the access methods shown in Table 2-21: for example, the statement
int intVal = inMsg.readInt();
retrieves an integer value from the message stream.
Table 2-21 Stream Message Access Methods

| Name | Description |
| --- | --- |
| readInt | Read integer from message stream |
| readByte | Read byte value from message stream |
| readBytes | Read byte array from message stream |
| readShort | Read short integer from message stream |
| readLong | Read long integer from message stream |
| readFloat | Read floating-point value from message stream |
| readDouble | Read double-precision value from message stream |
| readBoolean | Read boolean value from message stream |
| readChar | Read character from message stream |
| readString | Read string from message stream |
| readObject | Read value from message stream as object |

The readObject method returns the next value from the message stream in objectified form, as a Java object of the class corresponding to the value’s primitive data type: for instance, if the value is of type int, readObject returns an object of class Integer. The following statements are equivalent to the one shown above:
Integer intObject = (Integer) inMsg.readObject();
int intVal = intObject.intValue();
Processing Map Messages
The MapMessage interface provides the methods shown in Table 2-22 for reading the body of a map message. Each access method takes a name string as an argument and returns the value to which that name is mapped: for instance, under the example shown in Composing Map Messages, the statement
int meaningOfLife = inMsg.getInt("The Meaning of Life");
would set the variable meaningOfLife to the value 42.
Table 2-22 Map Message Access Methods

| Name | Description |
| --- | --- |
| getInt | Get integer from message map by name |
| getByte | Get byte value from message map by name |
| getBytes | Get byte array from message map by name |
| getShort | Get short integer from message map by name |
| getLong | Get long integer from message map by name |
| getFloat | Get floating-point value from message map by name |
| getDouble | Get double-precision value from message map by name |
| getBoolean | Get boolean value from message map by name |
| getChar | Get character from message map by name |
| getString | Get string from message map by name |
| getObject | Get object from message map by name |
| itemExists | Does map contain an item with specified name? |
| getMapNames | Get enumeration of all names in map |

Like stream messages, map messages provide an access method, getObject, that returns a value from the map in objectified form, as a Java object of the class corresponding to the value’s primitive data type: for instance, if the value is of type int, getObject returns an object of class Integer. The following statements are equivalent to the one shown above:
Integer meaningObject = (Integer) inMsg.getObject("The Meaning of Life");
int meaningOfLife = meaningObject.intValue();
The itemExists method returns a boolean value indicating whether the message map contains an association for a given name string:
if ( inMsg.itemExists("The Meaning of Life") )
  { /* Life is meaningful */
  }
else
  { /* Life is meaningless */
  }
The getMapNames method returns a Java enumeration object for iterating through all of the names defined in the map; you can then use getObject to retrieve the corresponding values, as shown in Code Example 2-6.
Code Example 2-6 Enumerating Map Message Values
Enumeration mapNames = inMsg.getMapNames();
String eachName;
Object eachValue;
while ( mapNames.hasMoreElements() )
  { eachName = (String) mapNames.nextElement();
    eachValue = inMsg.getObject(eachName);
    /* Do something with the value */
  }
Processing Object Messages
The ObjectMessage interface provides just one method, getObject (Table 2-23), for retrieving the serialized object that is the body of an object message:
Object messageBody = inMsg.getObject();
You can then typecast the result to a more specific class and process it in whatever way is appropriate.
Table 2-23 Object Message Access Method

| Name | Description |
| --- | --- |
| getObject | Get serialized object from message body |

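Since getObject returns a plain Object, it is prudent to check the body's class before casting it. The sketch below illustrates this with JDK serialization alone, without a broker: the class ObjectBodySketch, the payload class Order, and the roundTrip and process methods are all hypothetical, and roundTrip merely imitates the serialize-and-deserialize step that takes place between sender and receiver.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class ObjectBodySketch {
    /** Hypothetical payload class; any Serializable class known to both sides would do. */
    static final class Order implements Serializable {
        private static final long serialVersionUID = 1L;
        final String item;
        final int quantity;

        Order(String item, int quantity) {
            this.item = item;
            this.quantity = quantity;
        }
    }

    /** Serializes a body and reads it back, imitating the trip from sender to receiver. */
    static Object roundTrip(Serializable body) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(body);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Checks the body's class before casting, then processes it. */
    static String process(Object messageBody) {
        if (messageBody instanceof Order) {
            Order order = (Order) messageBody;
            return order.quantity + " x " + order.item;
        }
        return "unexpected body: " + messageBody;
    }

    public static void main(String[] args) {
        System.out.println(process(roundTrip(new Order("widget", 3)))); // 3 x widget
        System.out.println(process(roundTrip("plain text")));           // unexpected body: plain text
    }
}
```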
Processing Bytes Messages
The body of a bytes message simply consists of a stream of uninterpreted bytes; its interpretation is entirely a matter of conventional agreement between sender and receiver. This type of message is intended primarily for decoding message formats used by other existing message systems; Message Queue clients should generally use one of the other, more specific message types instead.
Reading the body of a bytes message is similar to reading a stream message (see Processing Stream Messages): you use the methods shown in Table 2-24 to decode primitive values from the message’s byte stream. For example, the statement
int intVal = inMsg.readInt();
retrieves an integer value from the byte stream. The getBodyLength method returns the length of the entire message body in bytes:
long bodyLength = inMsg.getBodyLength();
Table 2-24 Bytes Message Access Methods

| Name | Description |
| --- | --- |
| getBodyLength | Get length of message body in bytes |
| readInt | Read integer from message stream |
| readByte | Read signed byte value from message stream |
| readUnsignedByte | Read unsigned byte value from message stream |
| readBytes | Read byte array from message stream |
| readShort | Read signed short integer from message stream |
| readUnsignedShort | Read unsigned short integer from message stream |
| readLong | Read long integer from message stream |
| readFloat | Read floating-point value from message stream |
| readDouble | Read double-precision value from message stream |
| readBoolean | Read boolean value from message stream |
| readChar | Read character from message stream |
| readUTF | Read UTF-8 string from message stream |

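The read methods in Table 2-24 share their names with those of java.io.DataInputStream, so the idea of an agreed byte layout can be tried out with the JDK alone. The following sketch is an analogy rather than JMS code: the class BytesLayoutSketch, its encode and decode methods, and the particular layout (an integer, an unsigned byte, and a UTF string) are hypothetical. It shows that the receiver must read values in exactly the order and with exactly the types the sender wrote them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class BytesLayoutSketch {
    /** Sender side: writes an int, an unsigned byte, and a UTF string in an agreed order. */
    static byte[] encode(int id, int flags, String label) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeInt(id);      // 4 bytes
            out.writeByte(flags);  // 1 byte (low-order 8 bits of flags)
            out.writeUTF(label);   // 2-byte length followed by the encoded characters
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Receiver side: reads the same three values back in the same order. */
    static String decode(byte[] body) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(body))) {
            int id = in.readInt();
            int flags = in.readUnsignedByte();
            String label = in.readUTF();
            return id + "/" + flags + "/" + label;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] body = encode(7, 200, "abc");
        System.out.println(body.length);  // 10
        System.out.println(decode(body)); // 7/200/abc
    }
}
```

Reading the flag byte with readByte instead of readUnsignedByte would yield a negative number for values above 127, which is why the table distinguishes the signed and unsigned variants.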