Sun Java System Message Queue 3.7 UR1 Developer's Guide for Java Clients

Chapter 3 Message Queue Clients: Design and Features

This chapter addresses architectural and configuration issues that depend upon Message Queue’s implementation of the Java Message Service (JMS) specification. It covers the following topics:

Client Design Considerations

The choices you make in designing a JMS client affect portability, the allocation of work between connections and sessions, reliability and performance, resource use, and ease of administration. This section discusses basic issues that you need to address in client design. It covers the following topics:

Developing Portable Clients

The Java Message Service (JMS) specification was developed to abstract access to message-oriented middleware systems (MOMs). A client written against the JMS API should be portable to any provider that implements this specification. If code portability is important to you, be sure that you do the following in developing clients:

Choosing Messaging Domains

As described in Chapter 2, Client Programming Model, in Sun Java System Message Queue 3.7 UR1 Technical Overview, JMS supports two distinct message delivery models: point-to-point and publish/subscribe. These two message delivery models can be handled using different API objects—with slightly different semantics—representing different programming domains, as shown in Table 3–1, or they can be handled by base (unified domain) types.

Table 3–1 JMS Programming Objects

Unified Domain                  Point-to-Point Domain     Publish/Subscribe Domain
Destination (Queue or Topic)    Queue                     Topic
ConnectionFactory               QueueConnectionFactory    TopicConnectionFactory
Connection                      QueueConnection           TopicConnection
Session                         QueueSession              TopicSession
MessageProducer                 QueueSender               TopicPublisher
MessageConsumer                 QueueReceiver             TopicSubscriber

Using the point-to-point or publish/subscribe domains offers the advantage of a clean API that prevents certain types of programming errors; for example, creating a durable subscriber for a queue destination. However, the non-unified domains have the disadvantage that you cannot combine point-to-point and publish/subscribe operations in the same transaction or in the same session. If you need to do that, you should choose the unified domain API.

The JMS 1.1 specification continues to support the separate JMS 1.0.2 programming domains. (The example applications included with the Message Queue product as well as the code examples provided in this book all use the separate JMS 1.0.2 programming domains.) You can choose the API that best suits your needs. The only exception is for developers who need to write clients for the Sun Java System Application Server 7 environment, as explained in the following note.


Note –

Developers of applications that run in the Sun Java System Application Server 7 environment are limited to using the JMS 1.0.2 API. This is because Sun Java System Application Server 7 complies with the J2EE 1.3 specification, which supports only JMS 1.0.2. Any JMS messaging performed in servlets and EJBs, including message-driven beans, must be based on the domain-specific JMS APIs and cannot use the JMS 1.1 unified domain APIs. Developers of J2EE applications that will run in J2EE 1.4-compliant servers can, however, use the simpler JMS 1.1 APIs.


Connections and Sessions

A connection is a relatively heavy-weight object because of the authentication and communication setup that must be done when a connection is created. For this reason, it’s a good idea to use as few connections as possible. The real allocation of work occurs in sessions, which are light-weight, single-threaded contexts for producing and consuming messages. When you are thinking about structuring your client, it is best to think of the work that is done at the session level.

A session

The requirement that sessions be operated on by a single thread at a time places some restrictions on the combination of producers and consumers that can use the same session. In particular, if a session has an asynchronous consumer, it may not have any synchronous consumers. For a discussion of the connection and session’s use of threads, see Managing Client Threads. With the exception of these restrictions, let the needs of your application determine the number of sessions, producers, and consumers.

Producers and Consumers

Aside from the reliability your client requires, the design decisions that relate to producers and consumers include the following:

Assigning Client Identifiers

A connection can have a client identifier. This identifier is used to associate a JMS client’s connection to a message service, with state information maintained by the message service for that client. The JMS provider must ensure that a client identifier is unique, and applies to only one connection at a time. Currently, client identifiers are used to maintain state for durable subscribers. In defining a client identifier, you can use a special variable substitution syntax that allows multiple connections to be created from a single ConnectionFactory object using different user name parameters to generate unique client identifiers. These connections can be used by multiple durable subscribers without naming conflicts or lack of security.

Message Queue allows client identifiers to be set in one of two ways:

Message Order and Priority

In general, all messages sent to a destination by a single session are guaranteed to be delivered to a consumer in the order they were sent. However, if they are assigned different priorities, a messaging system will attempt to deliver higher priority messages first.
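The ordering rule above can be modeled in plain Java, without a broker. The following is a minimal sketch, not Message Queue code: the class PriorityOrderSketch and its Msg type are hypothetical names introduced for illustration, and a real messaging system only attempts this ordering rather than guaranteeing it.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative model only: higher priority first, then the order in which
// messages were sent. All names here are hypothetical.
final class PriorityOrderSketch {

    /** A stand-in for a message: body, JMS-style priority (0-9), send sequence. */
    static final class Msg {
        final String body;
        final int priority;
        final long sequence;

        Msg(String body, int priority, long sequence) {
            this.body = body;
            this.priority = priority;
            this.sequence = sequence;
        }
    }

    /** Returns message bodies in the order this model would deliver them. */
    static List<String> deliveryOrder(List<Msg> sent) {
        Comparator<Msg> byPriorityThenSendOrder =
                Comparator.comparingInt((Msg m) -> m.priority).reversed()
                          .thenComparingLong(m -> m.sequence);
        PriorityQueue<Msg> pending = new PriorityQueue<>(byPriorityThenSendOrder);
        pending.addAll(sent);

        List<String> order = new ArrayList<>();
        while (!pending.isEmpty()) {
            order.add(pending.poll().body);
        }
        return order;
    }

    public static void main(String[] args) {
        List<Msg> sent = new ArrayList<>();
        sent.add(new Msg("A", 4, 1));
        sent.add(new Msg("B", 9, 2)); // sent second, but highest priority
        sent.add(new Msg("C", 4, 3));
        System.out.println(deliveryOrder(sent));
    }
}
```

In this model, messages of equal priority keep their send order, and a higher-priority message sent later can overtake them.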

Beyond this, the ordering of messages consumed by a client can have only a rough relationship to the order in which they were produced. This is because the delivery of messages to a number of destinations and the delivery from those destinations can depend on a number of issues that affect timing, such as the order in which the messages are sent, the sessions from which they are sent, whether the messages are persistent, the lifetime of the messages, the priority of the messages, the message delivery policy of queue destinations (see Chapter 15, Physical Destination Property Reference, in Sun Java System Message Queue 3.7 UR1 Administration Guide), and message service availability.

Using Selectors Efficiently

The use of selectors can have a significant impact on the performance of your application. It’s difficult to put an exact cost on the expense of using selectors since it varies with the complexity of the selector expression, but the more you can do to eliminate or simplify selectors the better.

One way to eliminate (or simplify) selectors is to use multiple destinations to sort messages. This has the additional benefit of spreading the message load over more than one producer, which can improve the scalability of your application. For those cases when it is not possible to do that, here are some techniques that you can use to improve the performance of your application when using selectors:

Balancing Reliability and Performance

Reliable messaging is implemented in a variety of ways: through the use of persistent messages, acknowledgments or transactions, durable subscriptions, and connection failover.

In general, the more reliable the delivery of messages, the more overhead and bandwidth are required to achieve it. The trade-off between reliability and performance is a significant design consideration. You can maximize performance and throughput by choosing to produce and consume nonpersistent messages. On the other hand, you can maximize reliability by producing and consuming persistent messages in a transaction using a transacted session. For a detailed discussion of design options and their impact on performance, see Factors Affecting Performance.

Managing Client Threads

Using client threads effectively requires that you balance performance, throughput, and resource needs. To do this, you need to understand JMS restrictions on thread usage, what threads Message Queue allocates for itself, and the architecture of your applications. This section addresses these issues and offers some guidelines for managing client threads.

JMS Threading Restrictions

The Java Message Service (JMS) specification mandates that a session not be operated on by more than one thread at a time. This leads to the following restrictions:

The system does not enforce the requirement that a session be single threaded. If your client application violates this requirement, you will get a javax.jms.IllegalStateException or unexpected results.

Thread Allocation for Connections

When the Message Queue client runtime creates a connection, it creates two threads: one for consuming messages from the socket, and one to manage the flow of messages for the connection. In addition, the client runtime creates a thread for each client session. Thus, at a minimum, for a connection using one session, three threads are created. For a connection using three sessions, five threads are created, and so on.
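The arithmetic above (two threads per connection plus one per session) can be captured in a small helper when you are sizing a client. This is a sketch of the rule as stated in this section only; the class and method names are hypothetical, and threads that your own application creates are not counted.

```java
// Estimates client-runtime threads from the rule described above:
// two threads per connection, plus one thread per session.
// Class and method names are illustrative, not part of any Message Queue API.
final class ThreadEstimate {
    private ThreadEstimate() {}

    /** Threads created for one connection that uses the given number of sessions. */
    static int threadsForConnection(int sessions) {
        if (sessions < 0) {
            throw new IllegalArgumentException("sessions must be >= 0");
        }
        return 2 + sessions;
    }

    /** Total for several connections that each use the same number of sessions. */
    static int threadsForClient(int connections, int sessionsPerConnection) {
        if (connections < 0) {
            throw new IllegalArgumentException("connections must be >= 0");
        }
        return connections * threadsForConnection(sessionsPerConnection);
    }

    public static void main(String[] args) {
        System.out.println(threadsForConnection(1)); // 3
        System.out.println(threadsForConnection(3)); // 5
        System.out.println(threadsForClient(2, 3));  // 10
    }
}
```

The last line shows why spreading the same sessions over more connections costs extra threads: two connections with three sessions each need ten threads, whereas one connection with six sessions would need eight.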

Managing threads in a JMS application often involves trade-offs between performance and throughput. Weigh the following considerations when dealing with threading issues.

Managing Memory and Resources

This section describes memory and performance issues that you can manage by increasing JVM heap space and by managing the size of your messages. It covers the following topics:

You can also improve performance by having the administrator set connection factory attributes to meter the message flow over the client-broker connection and to limit the message flow for a consumer. For a detailed explanation, please see Reliability and Flow Control in Sun Java System Message Queue 3.7 UR1 Administration Guide.

Managing Memory

A client application running in a JVM needs enough memory to accommodate messages that flow in from the network as well as messages the client creates. If your client gets OutOfMemoryError errors, chances are that not enough memory was provided to handle the size or the number of messages being consumed or produced.

Your client might need more than the default JVM heap space. On most systems, the default is 64 MB but you will need to check the default values for your system.
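One way to check the limit that applies to your client is to ask the JVM at run time. The sketch below uses only the standard java.lang.Runtime API; the class name HeapCheck is a hypothetical helper, not part of Message Queue.

```java
// Reports the heap limit and current heap use of the running JVM.
// HeapCheck is an illustrative helper name.
final class HeapCheck {
    private static final long MB = 1024L * 1024L;

    private HeapCheck() {}

    /** Maximum heap the JVM will attempt to use, in megabytes. */
    static long maxHeapMegabytes() {
        return Runtime.getRuntime().maxMemory() / MB;
    }

    /** Heap currently in use, in megabytes. */
    static long usedHeapMegabytes() {
        Runtime rt = Runtime.getRuntime();
        return (rt.totalMemory() - rt.freeMemory()) / MB;
    }

    public static void main(String[] args) {
        System.out.println("max heap (MB):  " + maxHeapMegabytes());
        System.out.println("used heap (MB): " + usedHeapMegabytes());
    }
}
```

If the reported maximum is too small for the messages your client handles, you can raise it with the JVM's standard -Xmx option when you start the client, for example java -Xmx256m MyClient, where MyClient is a placeholder for your client class.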

Consider the following guidelines:

Managing Message Size

In general, for better manageability, you can break large messages into smaller parts, and use sequencing to ensure that the partial messages sent are concatenated properly. You can also use a Message Queue JMS feature to compress the body of a message. This section describes the programming interface that allows you to compress messages and to compare the size of compressed and uncompressed messages.
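The idea of breaking a large payload into sequenced parts can be sketched in plain Java as follows. This is one possible approach under stated assumptions, not a Message Queue feature: the MessageChunker class and its Part type are hypothetical, and in a real client each part would travel in its own message with the sequence number and total carried as message properties.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Splits a payload into numbered parts and reassembles them in order.
// All names are illustrative.
final class MessageChunker {
    private MessageChunker() {}

    /** One part of a larger payload, tagged with its position and the part count. */
    static final class Part {
        final int sequence; // 0-based position within the payload
        final int total;    // total number of parts
        final byte[] data;

        Part(int sequence, int total, byte[] data) {
            this.sequence = sequence;
            this.total = total;
            this.data = data;
        }
    }

    /** Splits the payload into parts of at most maxPartSize bytes. */
    static List<Part> split(byte[] payload, int maxPartSize) {
        if (maxPartSize <= 0) {
            throw new IllegalArgumentException("maxPartSize must be > 0");
        }
        int total = Math.max(1, (payload.length + maxPartSize - 1) / maxPartSize);
        List<Part> parts = new ArrayList<>(total);
        for (int i = 0; i < total; i++) {
            int from = i * maxPartSize;
            int to = Math.min(payload.length, from + maxPartSize);
            parts.add(new Part(i, total, Arrays.copyOfRange(payload, from, to)));
        }
        return parts;
    }

    /** Reassembles parts received in any order; fails if a part is missing or duplicated. */
    static byte[] join(List<Part> received) {
        List<Part> sorted = new ArrayList<>(received);
        sorted.sort(Comparator.comparingInt((Part p) -> p.sequence));
        if (sorted.isEmpty() || sorted.size() != sorted.get(0).total) {
            throw new IllegalStateException("wrong number of parts");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < sorted.size(); i++) {
            Part p = sorted.get(i);
            if (p.sequence != i) {
                throw new IllegalStateException("missing or duplicate part " + i);
            }
            out.write(p.data, 0, p.data.length);
        }
        return out.toByteArray();
    }
}
```

Carrying the total alongside each sequence number lets the receiver detect an incomplete set before it concatenates anything.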

Message compression and decompression is handled entirely by the client runtime, without involving the broker. Therefore, applications can use this feature with a previous version of the broker, but they must use version 3.6 or later of the Message Queue client runtime library.

Message Compression

You can use the Message.setBooleanProperty() method to specify that the body of a message be compressed. If the JMS_SUN_COMPRESS property is set to true, the client runtime will compress the body of the message being sent. This happens after the producer’s send method is called and before the send method returns to the caller. The compressed message is automatically decompressed by the client runtime before the message is delivered to the message consumer.

For example, the following call specifies that a message be compressed:

myMessage.setBooleanProperty("JMS_SUN_COMPRESS", true);

Compression only affects the message body; the message header and properties are not compressed.

Two read-only JMS message properties are set by the client runtime after a message is sent.

Applications can test the properties (JMS_SUN_UNCOMPRESSED_SIZE and JMS_SUN_COMPRESSED_SIZE) after a send returns to determine whether compression is advantageous. That is, applications that want to use this feature do not have to explicitly receive a compressed and an uncompressed version of the message to determine whether compression is desirable.

If the consumer of a compressed message wants to resend the message in an uncompressed form, it should call Message.clearProperties() to clear the JMS_SUN_COMPRESS property. Otherwise, the message will be compressed before it is sent to its next destination. Note that clearProperties() removes all of the message’s properties, so the application must set again any properties it still needs.

Advantages and Limitations of Compression

Although message compression has been added to improve performance, such benefit is not guaranteed. Benefits vary with the size and format of messages, the number of consumers, network bandwidth, and CPU performance. For example, the cost of compression and decompression might be higher than the time saved in sending and receiving a compressed message. This is especially true when sending small messages in a high-speed network. On the other hand, applications that publish large messages to many consumers, or that publish in a slow network environment, might improve system performance by compressing messages.

Depending on the message body type, compression may also provide minimal or no benefit. An application client can use the JMS_SUN_UNCOMPRESSED_SIZE and JMS_SUN_COMPRESSED_SIZE properties to determine the benefit of compression for different message types.

Message consumers deployed with client runtime libraries that precede version 3.6 cannot handle compressed messages. Clients wishing to send compressed messages must make sure that consumers are compatible. C clients cannot currently consume compressed messages.

Compression Examples

Example 3–1 shows how you set and send a compressed message:


Example 3–1 Sending a Compressed Message


//topicSession and myTopic are assumed to have been created
TopicPublisher publisher = topicSession.createPublisher(myTopic);
BytesMessage bytesMessage = topicSession.createBytesMessage();

//byteArray is assumed to have been created
bytesMessage.writeBytes(byteArray);

//instruct the client runtime to compress this message
bytesMessage.setBooleanProperty("JMS_SUN_COMPRESS", true);

//publish message to the myTopic destination
publisher.publish(bytesMessage);

Example 3–2 shows how you examine compressed and uncompressed message body size. The bytesMessage was created as in Example 3–1:


Example 3–2 Comparing Compressed and Uncompressed Message Size


//get uncompressed body size
int uncompressed = bytesMessage.getIntProperty("JMS_SUN_UNCOMPRESSED_SIZE");

//get compressed body size
int compressed = bytesMessage.getIntProperty("JMS_SUN_COMPRESSED_SIZE");
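Once a client has the two sizes, it can decide whether compression is paying off for a given kind of message. The following is a minimal sketch of that comparison in plain Java. The class CompressionBenefit, its method names, and the 20 percent threshold used in main are assumptions made for illustration; the sizes passed in would be the values read from JMS_SUN_UNCOMPRESSED_SIZE and JMS_SUN_COMPRESSED_SIZE.

```java
// Compares compressed and uncompressed body sizes.
// CompressionBenefit and its thresholds are illustrative, not a Message Queue API.
final class CompressionBenefit {
    private CompressionBenefit() {}

    /** Fraction of the body size saved by compression; negative if the body grew. */
    static double savedFraction(int uncompressedSize, int compressedSize) {
        if (uncompressedSize <= 0) {
            return 0.0; // nothing to save on an empty body
        }
        return 1.0 - (double) compressedSize / uncompressedSize;
    }

    /** True if compression saved at least the given fraction of the body size. */
    static boolean worthCompressing(int uncompressedSize, int compressedSize,
                                    double minSavedFraction) {
        return savedFraction(uncompressedSize, compressedSize) >= minSavedFraction;
    }

    public static void main(String[] args) {
        int uncompressed = 10_000; // example values only
        int compressed = 2_500;
        System.out.printf("saved %.0f%%%n", savedFraction(uncompressed, compressed) * 100);
        System.out.println(worthCompressing(uncompressed, compressed, 0.20));
    }
}
```

Size saved is only part of the picture: the CPU cost of compressing and decompressing also matters, so treat a check like this as a first filter rather than a final answer.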

Managing the Dead Message Queue

When a message is deemed undeliverable, it is automatically placed on a special queue called the dead message queue. A message placed on this queue retains all of its original headers (including its original destination) and information is added to the message’s properties to explain why it became a dead message. An administrator or a developer can access this queue, remove a message, and determine why it was placed on the queue.

This section describes the message properties that you can set or examine programmatically to determine the following:

Message Queue 3.6 clients can set properties related to the dead message queue on messages and send those messages to clients compiled against earlier versions. However, clients receiving such messages cannot examine these properties without recompiling against 3.6 libraries.

The dead message queue is automatically created by the broker and called mq.sys.dmq. You can use the message monitoring API, described in Chapter 4, Using the Metrics Monitoring API, to determine whether that queue is growing, to examine messages on that queue, and so on.

You can set the properties described in Table 3–2 for any message to control how the broker should handle that message if it deems it to be undeliverable. Note that these message properties are needed only to override destination-based or broker-based behavior.

Table 3–2 Message Properties Relating to Dead Message Queue

Property 

Description 

JMS_SUN_PRESERVE_UNDELIVERED

A boolean whose value determines what the broker should do with the message if it is dead.  

The default value, unset, specifies that the message should be handled as specified by the useDMQ property of the destination to which the message was sent.

A value of true overrides the setting of the useDMQ property and sends the dead message to the dead message queue.

A value of false overrides the setting of the useDMQ property and prevents the dead message from being placed in the dead message queue.

JMS_SUN_LOG_DEAD_MESSAGES

A boolean value that determines how activity relating to dead messages should be logged. 

When the property is unset (the default), the broker behaves as specified by the broker configuration property imq.destination.logDeadMsgs.

A value of true overrides the setting of the imq.destination.logDeadMsgs broker property and specifies that the broker should log the action of removing a message or moving it to the dead message queue.

A value of false overrides the setting of the imq.destination.logDeadMsgs broker property and specifies that the broker should not log these actions.

JMS_SUN_TRUNCATE_MSG_BODY

A boolean value that specifies whether the body of a dead message is truncated. 

When the property is unset (the default), the broker behaves as specified by the broker property imq.destination.DMQ.truncateBody.

A value of true overrides the setting of the imq.destination.DMQ.truncateBody property and specifies that the body of the message should be discarded when the message is placed in the dead message queue.

A value of false overrides the setting of the imq.destination.DMQ.truncateBody property and specifies that the body of the message should be stored along with the message header and properties when the message is placed in the dead message queue.
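All three properties in Table 3–2 follow the same pattern: when the message property is unset, the destination or broker setting applies; when it is true or false, it overrides that setting. The sketch below models this rule in plain Java. It illustrates the rule as described in the table, not broker code; the class and method names are hypothetical, and null stands for an unset property.

```java
// Models the "unset / true / false" override rule from Table 3-2.
// DeadMessageOverride is an illustrative name, not part of Message Queue.
final class DeadMessageOverride {
    private DeadMessageOverride() {}

    /**
     * @param messageProperty   value set on the message, or null if unset
     * @param configuredSetting destination or broker setting that applies by default
     * @return the behavior that takes effect for this message
     */
    static boolean resolve(Boolean messageProperty, boolean configuredSetting) {
        return messageProperty != null ? messageProperty : configuredSetting;
    }

    public static void main(String[] args) {
        // Destination has useDMQ=true, message leaves the property unset.
        System.out.println(resolve(null, true));
        // Message sets JMS_SUN_PRESERVE_UNDELIVERED to false, overriding useDMQ=true.
        System.out.println(resolve(Boolean.FALSE, true));
    }
}
```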

The properties described in Table 3–3 are set by the broker for a message placed in the dead message queue. You can examine the properties for the message to retrieve information about why the message was placed on the queue and to gather other information about the message and about the context within which this action was taken.

Table 3–3 Dead Message Properties

Property 

Description 

JMSXDeliveryCount

An Integer that specifies the maximum number of times the message was delivered to a given consumer. This value is set only for ERROR or UNDELIVERABLE messages.

JMS_SUN_DMQ_UNDELIVERED_TIMESTAMP

A Long that specifies the time (in milliseconds) when the message was placed on the dead message queue.

JMS_SUN_DMQ_UNDELIVERED_REASON

A string that specifies one of the following values to indicate the reason why the message was placed on the dead message queue:

OLDEST

LOW_PRIORITY

EXPIRED

UNDELIVERABLE

ERROR

If the message was marked dead for multiple reasons, for example it was undeliverable and expired, only one reason will be specified by this property. 

The ERROR reason indicates that an internal error made it impossible to process the message. This is an extremely unusual condition, and the sender should just resend the message.

JMS_SUN_DMQ_PRODUCING_BROKER

A String used for message traffic in broker clusters: it specifies the broker name and port number of the broker that produced the message.

JMS_SUN_DMQ_DEAD_BROKER

A String used for message traffic in broker clusters: it specifies the broker name and port number of the broker that placed the message on the dead message queue.

JMS_SUN_DMQ_UNDELIVERED_EXCEPTION

A String that specifies the name of the exception (if the message was dead because of an exception) on either the client or the broker.

JMS_SUN_DMQ_UNDELIVERED_COMMENT

A String used to provide an optional comment when the message is marked dead.

JMS_SUN_DMQ_BODY_TRUNCATED

A Boolean: a value of true indicates that the message body was not stored. A value of false indicates that the message body was stored.

Managing Physical Destination Limits

When creating a topic or queue destination, the administrator can specify how the broker should behave when certain memory limits are reached. Specifically, when the number of messages reaching a physical destination exceeds the number specified with the maxNumMsgs property or when the total amount of memory allowed for messages exceeds the number specified with the maxTotalMsgBytes property, the broker takes one of the following actions, depending on the setting of the limitBehavior property:

If the default value REJECT_NEWEST is specified for the limitBehavior property, the broker throws out the newest messages received when memory limits are exceeded. If the message discarded is a persistent message, the producing client gets an exception which should be handled by resending the message later.

If any of the other values is selected for the limitBehavior property or if the message is not persistent, the application client is not notified if a message is discarded. Application clients should let the administrator know how they prefer this property to be set for best performance and reliability.
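Resending a rejected persistent message later can be handled with a small retry loop. The following sketch shows one possible shape for such a loop in plain Java. It does not use the JMS API: the Sender interface is a hypothetical stand-in for whatever code calls the producer's send method, and the doubling delay is an assumption, not a Message Queue recommendation.

```java
// Retries a send with an increasing delay between attempts.
// ResendLater and Sender are illustrative names, not part of any JMS API.
final class ResendLater {
    private ResendLater() {}

    /** Stand-in for a send call; throws if the broker rejects the message. */
    interface Sender {
        void send() throws Exception;
    }

    /**
     * Calls sender.send() until it succeeds or maxAttempts is reached.
     *
     * @return the number of attempts that were needed
     * @throws Exception the last failure if every attempt failed
     */
    static int sendWithRetry(Sender sender, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                sender.send();
                return attempt;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay); // give the destination time to drain
                    delay *= 2;          // wait longer before the next attempt
                }
            }
        }
        throw last;
    }
}
```

A bounded number of attempts matters here: if the destination stays full, the client should eventually report the failure rather than retry forever.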

Programming Issues for Message Consumers

This section describes two problems that consumers might need to manage: the undetected loss of a connection, or the loss of a message for distributed synchronous consumers.

Using the Client Runtime Ping Feature

Message Queue defines a connection factory attribute for a ping interval. This attribute specifies the interval at which the client runtime should check the client’s connection to the broker. The ping feature is especially useful to Message Queue clients that exclusively receive messages and might therefore not be aware that the absence of messages is due to a connection failure. This feature could also be useful to producers who don’t send messages frequently and who would want notification that a connection they’re planning to use is not available.

The connection factory attribute used to specify this interval is called imqPingInterval. Its default value is 30 seconds. A value of -1 or 0 specifies that the client runtime should not check the client connection.

Developers should set (or have the administrator set) ping intervals that are slightly more frequent than they need to send or receive messages, to allow time to recover the connection in case the ping discovers a connection failure. Note also that the ping may not occur at the exact time specified by the value you supply for the interval; the underlying operating system’s use of I/O buffers may affect the amount of time needed to detect a connection failure and trigger an exception.

A failed ping operation results in a JMSException on the subsequent method call that uses the connection. If an exception listener is registered on the connection, it will be called when a ping operation fails.

Preventing Message Loss for Synchronous Consumers

It is always possible that a message can be lost for synchronous consumers in a session using AUTO_ACKNOWLEDGE mode if the provider fails. To prevent this possibility, you should either use a transacted session or a session in CLIENT_ACKNOWLEDGE mode.

Synchronous Consumption in Distributed Applications

Because distributed applications involve greater processing time, such an application might not behave as it would if it were run locally. For example, calling the receiveNoWait method for a synchronous consumer might return null even when there is a message available to be retrieved.

If a client connects to the broker and immediately calls the receiveNoWait method, it is possible that the message queued for the consuming client is in the process of being transmitted from the broker to the client. The client runtime has no knowledge of what is on the broker, so when it sees that there is no message available on the client’s internal queue, it returns with a null, indicating no message.
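The timing issue can be reproduced without a broker. The sketch below is an analogy only: a java.util.concurrent.BlockingQueue stands in for the client's internal queue, a background thread stands in for a message in transit, the non-blocking poll() plays the role of receiveNoWait, and the timed poll plays the role of a receive call with a timeout. All class and variable names are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simulates a message that is still in transit when the consumer first checks.
// This is an analogy in plain Java, not Message Queue client code.
final class ReceiveRaceDemo {
    private ReceiveRaceDemo() {}

    /** Returns { result of the immediate check, result of the timed wait }. */
    static String[] run(long transitMillis, long timeoutMillis) throws InterruptedException {
        BlockingQueue<String> clientQueue = new LinkedBlockingQueue<>();

        // The "broker" has the message, but it takes a while to reach the client.
        Thread transit = new Thread(() -> {
            try {
                Thread.sleep(transitMillis);
                clientQueue.put("order-1");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        transit.start();

        // Like receiveNoWait: looks only at what has already arrived.
        String immediate = clientQueue.poll();

        // Like a receive with a timeout: waits for the message to arrive.
        String timed = clientQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);

        transit.join();
        return new String[] { immediate, timed };
    }

    public static void main(String[] args) throws InterruptedException {
        String[] results = run(300, 5000);
        System.out.println("immediate check: " + results[0]);
        System.out.println("timed wait:      " + results[1]);
    }
}
```

The immediate check finds nothing because the message has not yet reached the local queue, even though it exists; the timed wait picks it up once it arrives.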

You can avoid this problem by having your client do either of the following:

Factors Affecting Performance

Application design decisions can have a significant effect on overall messaging performance. The most important factors affecting performance are those that impact the reliability of message delivery; among these are the following:

Other application design factors impacting performance include the following:

The sections that follow describe the impact of each of these factors on messaging performance. As a general rule, there is a trade-off between performance and reliability: factors that increase reliability tend to decrease performance.

Table 3–4 shows how application design factors affect messaging performance. The table shows two scenarios—a high-reliability, low-performance scenario and a high-performance, low-reliability scenario—and the choice of application design factors that characterizes each. Between these extremes, there are many choices and trade-offs that affect both reliability and performance.

Table 3–4 Comparison of High Reliability and High Performance Scenarios

Application Design Factor           High Reliability, Low Performance         High Performance, Low Reliability
Delivery mode                       Persistent messages                       Nonpersistent messages
Use of transactions                 Transacted sessions                       No transactions
Acknowledgment mode                 AUTO_ACKNOWLEDGE or CLIENT_ACKNOWLEDGE    DUPS_OK_ACKNOWLEDGE or NO_ACKNOWLEDGE
Durable/nondurable subscriptions    Durable subscriptions                     Nondurable subscriptions
Use of selectors                    Message filtering                         No message filtering
Message size                        Small messages                            Large messages
Message body type                   Complex body types                        Simple body types

Delivery Mode (Persistent/Nonpersistent)

Persistent messages guarantee message delivery in case of broker failure. The broker stores these messages in a persistent store until all intended consumers acknowledge that they have consumed the message.

Broker processing of persistent messages is slower than for nonpersistent messages for the following reasons:

For both queues and topics with durable subscribers, performance was approximately 40% faster for non-persistent messages. We obtained these results using 10K-size messages and AUTO_ACKNOWLEDGE mode.

Use of Transactions

A transaction guarantees that all messages produced in a transacted session and all messages consumed in a transacted session will be either processed or not processed (rolled back) as a unit. Message Queue supports both local and distributed transactions.

Producing or acknowledging a message in a transacted session is slower than in a non-transacted session for the following reasons:

Acknowledgment Mode

Other than using transactions, you can ensure reliable delivery by having the client acknowledge receiving a message. If a session is closed without the client acknowledging the message or if the message broker fails before the acknowledgment is processed, the broker redelivers that message, setting a JMSRedelivered flag.

For a non-transacted session, the client can choose one of three acknowledgment modes, each of which has its own performance characteristics:

Performance is impacted by acknowledgment mode for the following reasons:

Durable vs. Nondurable Subscriptions

Subscribers to a topic destination have either durable or nondurable subscriptions. Durable subscriptions provide increased reliability at the cost of slower throughput for the following reasons:

We compared performance for durable and non-durable subscribers in two cases: persistent and nonpersistent 10k-sized messages. Both cases use AUTO_ACKNOWLEDGE acknowledgment mode. We found a performance impact only in the case of persistent messages, which slowed messages conveyed to durable subscribers by about 30%.

Use of Selectors (Message Filtering)

Application developers can have the messaging provider sort messages according to criteria specified in the message selector associated with a consumer and deliver to that consumer only those messages whose property value matches the message selector. For example, if an application creates a subscriber to the topic WidgetOrders and specifies the expression NumberOfOrders >1000 for the message selector, messages with a NumberOfOrders property value of 1001 or more are delivered to that subscriber.
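The behavior of the selector in this example can be modeled in plain Java. The sketch below is an illustration of the matching rule only, not how a broker evaluates selectors: the class SelectorSketch is hypothetical, and a map stands in for a message's properties. It also shows a point that follows from the JMS selector rules: a message that does not carry the property at all is not selected.

```java
import java.util.HashMap;
import java.util.Map;

// Mimics the selector "NumberOfOrders > 1000" against a map of message properties.
// SelectorSketch is an illustrative name; real selectors are evaluated by the provider.
final class SelectorSketch {
    private SelectorSketch() {}

    /** True if the properties would satisfy NumberOfOrders > 1000. */
    static boolean matches(Map<String, Object> properties) {
        Object value = properties.get("NumberOfOrders");
        if (!(value instanceof Number)) {
            return false; // missing or non-numeric property: message is not selected
        }
        return ((Number) value).doubleValue() > 1000.0;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        System.out.println(matches(props)); // property not set

        props.put("NumberOfOrders", 1000);
        System.out.println(matches(props)); // boundary value

        props.put("NumberOfOrders", 1001);
        System.out.println(matches(props)); // first value that matches
    }
}
```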

Creating consumers with selectors lowers performance (as compared to using multiple destinations) because additional processing is required to handle each message. When a selector is used, it must be parsed so that it can be matched against future messages. Additionally, the message properties of each message must be retrieved and compared against the selector as each message is routed. However, using selectors provides more flexibility in a messaging application and may lower resource requirements at the expense of speed.

Message Size

Message size affects performance because more data must be passed from producing client to broker and from broker to consuming client, and because for persistent messages a larger message must be stored.

However, by batching smaller messages into a single message, the routing and processing of individual messages can be minimized, providing an overall performance gain. In this case, information about the state of individual messages is lost.

In our tests, which compared throughput in kilobytes per second for 1K, 10K, and 100K-sized messages to a queue destination using AUTO_ACKNOWLEDGE mode, we found that non-persistent messaging was about 50% faster for 1K messages, about 20% faster for 10K messages, and about 5% faster for 100K messages. The size of the message affected performance significantly for both persistent and non-persistent messages: measured in kilobytes per second, 100K messages were about 10 times faster than 10K messages, and 10K messages were about 5 times faster than 1K messages.

Message Body Type

JMS supports five message body types, shown below roughly in the order of complexity:

While, in general, the message type is dictated by the needs of an application, the more complicated types (map and object) carry a performance cost — the expense of serializing and deserializing the data. The performance cost depends on how simple or how complicated the data is.

Client Connection Failover (Auto-Reconnect)

Message Queue supports client connection failover. A failed connection can be automatically restored not only to the original broker, but to a different broker in a broker cluster. There are circumstances under which the client-side state cannot be restored on any broker during an automatic reconnection attempt; for example, when the client uses transacted sessions or temporary destinations. At such times the connection exception handler is called and the application code has to catch the exception and restore state.

This section explains how automatic reconnection is enabled, how the broker behaves during a reconnect, how automatic reconnection impacts producers and consumers, and how producers and consumers should handle exceptions that result from connection failover. For additional information about this feature, please see Connection Handling in Sun Java System Message Queue 3.7 UR1 Administration Guide.

Enabling Auto-Reconnect

The developer or the administrator can enable automatic reconnection by setting the connection factory imqReconnectEnabled attribute to true. The connection factory administered object must also be configured to specify the following:

Single-Broker Auto-Reconnect

Configure your connection-factory object as follows:


Example 3–3 Example of Command to Configure a Single Broker


imqobjmgr add -t cf -l "cn=myConnectionFactory" \
    -o "imqAddressList=mq://jpgserv/jms" \
    -o "imqReconnectEnabled=true" \
    -o "imqReconnectAttempts=10"
               

This command creates a connection-factory object with a single address in the broker address list. If the connection fails, the client runtime will try to reconnect with the broker 10 times. If an attempt to reconnect fails, the client runtime will sleep for three seconds (the default value for the imqReconnectInterval attribute) before trying again. After 10 unsuccessful attempts, the application will receive a JMSException.

You can ensure that the broker starts automatically at system start-up time. See Sun Java System Message Queue 3.7 UR1 Installation Guide for information on how to configure automatic broker start-up. For example, on the Solaris platform, you can use /etc/rc.d scripts.

Parallel Broker Auto-Reconnect

Configure your connection-factory objects as follows:


Example 3–4 Example of Command to Configure Parallel Brokers


imqobjmgr add -t cf -l "cn=myCF" \
    -o "imqAddressList=myhost1, mqtcp://myhost2:12345/jms" \
    -o "imqReconnectEnabled=true" \
    -o "imqReconnectAttempts=5"
               

This command creates a connection factory object with two addresses in the broker list. The first address describes a broker instance running on the host myhost1 with a standard port number (7676). The second address describes a jms connection service running at a statically configured port number (12345).

Clustered-Broker Auto-Reconnect

Configure your connection-factory objects as follows:


Example 3–5 Example of Command to Configure a Broker Cluster


imqobjmgr add -t cf -l "cn=myConnectionFactory" \
    -o "imqAddressList=mq://myhost1/ssljms, \
            mq://myhost2/ssljms, \
            mq://myhost3/ssljms, \
            mq://myhost4/ssljms" \
    -o "imqReconnectEnabled=true" \
    -o "imqReconnectAttempts=5" \
    -o "imqAddressListBehavior=RANDOM"
               

This command creates a connection factory object with four addresses in the imqAddressList. All the addresses point to jms services running on SSL transport on different hosts. Since the imqAddressListBehavior attribute is set to RANDOM, the client connections that are established using this connection factory object will be distributed randomly among the four brokers in the address list.

This is a clustered broker configuration, so you must configure one of the brokers in the cluster as the master broker. In the connection-factory address list, you can also specify a subset of all the brokers in the cluster.

Auto-Reconnect Behaviors

A broker treats an automatic reconnection as it would a new connection. When the original connection is lost, all resources associated with that connection are released. For example, in a broker cluster, as soon as one broker fails, the other brokers assume that the client connections associated with the failed broker are gone. After auto-reconnect takes place, the client connections are recreated from scratch.

Sometimes the client-side state cannot be fully restored by auto-reconnect. Perhaps a resource that the client needs cannot be recreated. In this case, the client runtime calls the client’s connection exception handler and the client must take appropriate action to restore state. For additional information, see Handling Exceptions When Failover Occurs.

If the client is automatically reconnected to a different broker instance, persistent messages held by the failed or disconnected broker are unavailable until that broker recovers, and other state information it held can be lost. Once the original broker is restored, the messages it holds might be delivered out of order. This is because broker instances in a cluster do not use a shared, highly available persistent store.

A transacted session is the most reliable method of ensuring that a message isn’t lost if you are careful in coding the transaction. If auto-reconnect happens in the middle of a transaction, the client runtime throws an exception when the transaction is committed, and the transaction is rolled back. At that point, you must make sure that the client restarts the whole transaction. (This is especially important when you use a broker cluster.) For additional information, see Handling Exceptions When Failover Occurs.

Automatic reconnection affects producers and consumers differently:

Auto-Reconnect Limitations

Notice the following points when using the auto-reconnect feature:

Handling Exceptions When Failover Occurs

Several kinds of exceptions can occur as a result of the client being reconnected after a failover. How the client application should handle these exceptions depends on whether a session is transacted, on the kind of exception thrown, and on the client's role as producer or consumer. The following sections discuss the implications of these factors.

Regardless of how the exception is raised, the client application must never call System.exit() to exit the application, because this would prevent the Message Queue client runtime from reconnecting to an alternate or restarted broker.

When a failover occurs, exception messages may be shown on the application's console and recorded in the broker's log. These messages are for information only. They may be useful in troubleshooting, but minimizing or eliminating the impact of a failover is best handled preemptively by the application client in the ways described in the following sections.

Handling Exceptions in a Transacted Session

A transacted session might fail to commit (and throw an exception) either because a failover occurs while statements within the transaction are being executed or because the failover occurs during the call to Session.commit(). In the first case, the failover is said to occur during an open transaction; in the second case, the failover occurs during the commit itself.

In the case of a failover during an open transaction, when the client application calls Session.commit(), the client runtime throws a TransactionRolledBackException and rolls back the transaction, causing the following to happen.

If the client application itself calls Session.rollback() after a failover (before Session.commit() is executed), the same things happen as if the application had received a TransactionRolledBackException. After receiving a TransactionRolledBackException or calling Session.rollback(), the client application must retry the failed transaction. That is, it must re-send and re-consume the messages that were involved in the failed-over transaction.
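
The requirement to retry the whole transaction, rather than only the commit, can be sketched as a loop around one complete unit of work. The sketch below uses stand-in types so that it compiles without a JMS provider: TransactionRetrySketch, TransactionBody, and RolledBackException are hypothetical names, and RolledBackException is an unchecked placeholder for javax.jms.TransactionRolledBackException (which is a checked exception in the real API). The retry limit is also an assumption made for illustration.

```java
/** Sketch of "redo the whole transaction" after a rollback; stand-in types only. */
final class TransactionRetrySketch {

    /** Hypothetical placeholder for javax.jms.TransactionRolledBackException. */
    static final class RolledBackException extends RuntimeException {
        RolledBackException(String msg) {
            super(msg);
        }
    }

    /** One complete unit of work: every send and receive of the transaction plus the commit. */
    interface TransactionBody {
        void sendReceiveAndCommit();
    }

    /**
     * Runs the body until it commits or maxTries is exhausted.
     *
     * @return the number of tries used
     */
    static int runWithRetry(TransactionBody body, int maxTries) {
        for (int attempt = 1; ; attempt++) {
            try {
                body.sendReceiveAndCommit();
                return attempt;
            } catch (RolledBackException e) {
                // Nothing from this try survives the rollback, so the next
                // iteration starts again from the first step of the body.
                if (attempt >= maxTries) {
                    throw e;
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        int used = runWithRetry(() -> {
            // Simulate a failover during the first open transaction only.
            if (++calls[0] == 1) {
                throw new RolledBackException("failover during open transaction");
            }
        }, 3);
        System.out.println("committed after " + used + " tries");
    }
}
```

In a real client the body would contain the JMS send and receive calls followed by Session.commit(), and the catch clause would name the JMS exception.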

In the second case, when the failover occurs during a call to Session.commit(), there are three possible outcomes:

Handling Exceptions in a Non-Transacted Session

If a connection fails over for a producer, the client application may receive a JMSException. The application thread that receives the exception should pause for a few seconds and then resend the messages. The client application may want to set a flag on the resent messages to indicate that they could be duplicates.
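
One way a consumer might use such a flag is sketched below. It assumes, as in the examples later in this section, that the producer stamps each message with a MESSAGE_COUNTER property and sets a MESSAGE_IN_DOUBT property on resends. The class InDoubtFilterSketch and its accept method are hypothetical, and the logic further assumes a single producer whose counter only increases and whose messages arrive in order; it is not part of the Message Queue API.

```java
/** Sketch of consumer-side duplicate filtering based on the producer's two properties. */
final class InDoubtFilterSketch {

    /** Highest MESSAGE_COUNTER value processed so far; -1 means none yet. */
    private long lastAccepted = -1;

    /**
     * Decides whether a received message should be processed.
     *
     * @param counter value of the MESSAGE_COUNTER property
     * @param inDoubt value of the MESSAGE_IN_DOUBT property
     * @return true to process the message, false if it is a resend of a
     *         message that has already been processed
     */
    boolean accept(long counter, boolean inDoubt) {
        if (inDoubt && counter <= lastAccepted) {
            // The original send reached the broker after all; drop the resend.
            return false;
        }
        lastAccepted = Math.max(lastAccepted, counter);
        return true;
    }

    public static void main(String[] args) {
        InDoubtFilterSketch filter = new InDoubtFilterSketch();
        long[] counters = {0, 1, 1, 2};
        boolean[] inDoubt = {false, false, true, false};
        for (int i = 0; i < counters.length; i++) {
            boolean process = filter.accept(counters[i], inDoubt[i]);
            System.out.println("msg " + counters[i] + " inDoubt=" + inDoubt[i]
                    + " -> " + (process ? "process" : "drop"));
        }
    }
}
```

A resend whose original never arrived still passes the filter, because its counter is higher than any value seen so far.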

If a connection fails over for a message consumer, the consequences vary with the session's acknowledgment mode:

Failover Producer Example

The following code sample illustrates good coding practices for handling exceptions during a failover. It is designed to send non-transacted, persistent messages forever and to handle JMSExceptions when a failover occurs. The program can handle either a true or a false setting for the imqReconnectEnabled property. To run the program, enter one of the following commands:

java dura.example.FailoverProducer
java -DimqReconnectEnabled=true dura.example.FailoverProducer
/*
 * @(#)FailoverProducer.java	1.1 06/06/09
 * Copyright 2006 Sun Microsystems, Inc. All Rights Reserved
 * SUN PROPRIETARY/CONFIDENTIAL
 * Use is subject to license terms. */

package dura.example;

import javax.jms.*;
import com.sun.messaging.ConnectionConfiguration;
import java.util.*;

public class FailoverProducer implements ExceptionListener {

	//connection factory
    private com.sun.messaging.TopicConnectionFactory factory;
    //connection
    private TopicConnection pconn = null;
    //session
    private TopicSession psession = null;
    //publisher
    private TopicPublisher publisher = null;
    //topic
    private Topic topic = null;
    //This flag indicates whether this test client is closed.
    private boolean isClosed = false;
    //auto reconnection flag
    private boolean autoReconnect = false;
    //destination name for this example.
    private static final String DURA_TEST_TOPIC = "DuraTestTopic";
    //the message counter property name 
    public static final String MESSAGE_COUNTER = "MESSAGE_COUNTER";
    //the message in-doubt-bit property name
    public static final String MESSAGE_IN_DOUBT = "MESSAGE_IN_DOUBT";

    /**
     * Constructor.  Get imqReconnectEnabled property value from 
     * System property.
     */
    public FailoverProducer () {
    	
    	try {
    		autoReconnect = 
    		Boolean.getBoolean(ConnectionConfiguration.imqReconnectEnabled);
    	} catch (Exception e) {
    		this.printException(e);
    	}
    	
    }

    /**
     * Connection is broken if this handler is called.  
     * If autoReconnect flag is true, this is called only 
     * if no more retries from MQ.
     */
    public void onException (JMSException jmse) {
        this.printException (jmse);
    }

    /**
     * create MQ connection factory.
     * @throws JMSException
     */
    private void initFactory() throws JMSException {
        //get connection factory
        factory = new com.sun.messaging.TopicConnectionFactory();
    }

    /**
     * JMS setup.  Create a Connection,Session, and Producer.
     * 
     * If any of the JMS object creation fails (due to system failure),
     * it retries until it succeeds.
     *
     */
    private void initProducer() {
    	
        boolean isConnected = false;

        while ( isClosed == false && isConnected == false ) {
            
        	try {
                println("producer client creating connection ...");

                //create connection
                pconn = factory.createTopicConnection();
                
                //set connection exception listener
                pconn.setExceptionListener(this);

                //create topic session
                psession = pconn.createTopicSession(false,
                    Session.CLIENT_ACKNOWLEDGE);
                
                //get destination
                topic = psession.createTopic(DURA_TEST_TOPIC);

                //publisher
                publisher = psession.createPublisher(topic);

                //set flag to true
                isConnected = true;

                println("producer ready.");
            }
            catch (Exception e) {

                println("*** connect failed ... sleep for 5 secs.");
                
                try {
                	//close resources.
                	if ( pconn != null ) {
                		pconn.close();
                	}
                	//pause 5 secs.
                    Thread.sleep(5000);
                
                } catch (Exception e1) {
                    ;
                }
            }
        }
    }

    /**
     * Start test.  This sends JMS messages in a loop (forever).
     */
    public void run () {

        try {
        	//create MQ connection factory.
            initFactory();
            
            //create JMS connection,session, and producer
            initProducer();
            
            //send messages forever.
            sendMessages();
        } catch (Exception e) {
            this.printException(e);
        }
    }

    /**
     * Send persistent messages to a topic forever.  This shows how
     * to handle failover for a message producer.
     */
    private void sendMessages() {
    	
    	//this is set to true if send failed.
        boolean messageInDoubt = false;
        
        //message to be sent
        TextMessage m = null;
        
        //msg counter
        long msgcount = 0;

        while (isClosed == false) {
        	
            try {
            	
            	/**
            	 * create a text message 
            	 */
                m = psession.createTextMessage();
               
                /**
                 * the MESSAGE_IN_DOUBT bit is set to true if 
                 * you get an exception for the last message.
                 */
                if ( messageInDoubt == true ) {
                    m.setBooleanProperty (MESSAGE_IN_DOUBT, true);
                    messageInDoubt = false;
                    
                    println("MESSAGE_IN_DOUBT bit is set to true "
                            + "for msg: " + msgcount);
                } else {
                    m.setBooleanProperty (MESSAGE_IN_DOUBT, false);
                }
                
                //set message counter
                m.setLongProperty(MESSAGE_COUNTER, msgcount);
                
                //set message body
                m.setText("msg: " + msgcount);
                
                //send the msg
                publisher.send(m, DeliveryMode.PERSISTENT, 4, 0);

                println("sent msg: " + msgcount);
                
                /**
                 * reset counter if reached max long value.
                 */
                if (msgcount == Long.MAX_VALUE) {
                	msgcount = 0;
                	
                	println ("Reset message counter to 0.");
                }
                
                //increase counter
                msgcount ++;
                
                Thread.sleep(1000);

            } catch (Exception e) {

                if ( isClosed == false ) {
                	
                    //set in doubt bit to true.
                    messageInDoubt = true;

                    this.printException(e);
                   
                    //init producer only if auto reconnect is false.
                    if ( autoReconnect == false ) {
                        this.initProducer();
                    }
                }
            }
        }
    }

    /**
     * Close this example program.
     */
    public synchronized void close() {

        try {
            isClosed = true;
            pconn.close();

            notifyAll();
        } catch (Exception e) {
            this.printException(e);
        }
    }
    
    /**
     * print the specified exception.
     * @param e the exception to be printed.
     */
    private void printException (Exception e) {
    	System.out.println(new Date().toString());
    	e.printStackTrace();
    }
    
    /**
     * print the specified message.
     * @param msg the message to be printed.
     */
    private void println (String msg) {
    	System.out.println(new Date() + ": " + msg);
    }
    
    /**
     * Main program to start this example.
     */
    public static void main (String args[]) {
    	FailoverProducer fp = new FailoverProducer();
    	fp.run();
    }

}

Failover Consumer Example

The following code sample, FailoverConsumer, illustrates good coding practices for handling exceptions during a failover. The transacted session is able to receive messages forever. The program sets the auto reconnect property to true, requiring the Message Queue runtime to automatically perform a reconnect when the connected broker fails or is killed. It is designed to work with the dura.example.FailoverProducer, shown in the previous section.

To run this program enter the following command.

java dura.example.FailoverConsumer
/*
 * @(#)FailoverConsumer.java	1.1 06/06/09
 * Copyright 2006 Sun Microsystems, Inc. All Rights Reserved
 * SUN PROPRIETARY/CONFIDENTIAL
 * Use is subject to license terms.
 *
 */
package dura.example;

import java.util.Date;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TransactionRolledBackException;
import com.sun.messaging.ConnectionConfiguration;

public class FailoverConsumer implements ExceptionListener, Runnable {

	//JMS connection
    private Connection conn = null;
    //JMS session
    private Session session = null;
    //JMS Message consumer
    private MessageConsumer messageConsumer = null;
    //JMS destination.
    private Destination destination = null;
    //flag indicates whether this program should continue running.
    private boolean isConnected = false;
    //destination name.
    private static final String DURA_TEST_TOPIC = "DuraTestTopic";
    //the commit counter, for information only.
    private long commitCounter = 0;
    
    /**
     * message counter property set by the producer.
     */
    public static final String MESSAGE_COUNTER = "MESSAGE_COUNTER";
    
    /**
     * Message in doubt bit set by the producer
     */
    public static final String MESSAGE_IN_DOUBT = "MESSAGE_IN_DOUBT";
    
    /**
     * receive time out
     */
    public static final long RECEIVE_TIMEOUT = 0;
    
    /**
     * Default constructor -   
     * Set up JMS Environment.
     */
    public FailoverConsumer() {
       setup();
    }

    /*  Connection Exception listener.  This is called when connection
     *  breaks and no reconnect attempts are performed by MQ client runtime.
     */
    public void onException (JMSException e) {

    	print ("Reconnect failed.  Shutting down the connection ...");
    	
    	/**
    	 * Set this flag to false so that the run() method will exit.
    	 */
        this.isConnected = false;
        e.printStackTrace();
    }

    /**
     * Best effort to roll back a jms session.  When a broker crashes, an
     * open-transaction should be rolled back.  But the re-started broker 
     * may not have the uncommitted transaction information due to system
     * failure.  In a situation like this, an application can just quit
     * calling rollback after retrying a few times.  The uncommitted 
     * transaction (resources) will eventually be removed by the broker.
     */
    private void rollBackJMS() {
    	
    	//rollback fail count
    	int failCount = 0;
    	
        boolean keepTrying = true;
        
        while ( keepTrying ) {

            try {

                print ("<<< rolling back JMS ...., consumer commit counter: "
                          + this.commitCounter);

                session.rollback();
                
                print("<<< JMS rolled back ...., consumer commit counter: "
                          + this.commitCounter);
                keepTrying = false;
            } catch (JMSException jmse) {
               
            	failCount ++;
                jmse.printStackTrace();

                sleep (3000); //3 secs
                
                if ( failCount == 1 ) {

                    print ("<<< rollback failed : total count: " + failCount);
                    keepTrying = false;
                }
            }
        }
    }

    
    /**
     * Close the JMS connection and exit the program.
     *
     */
    private void close() {
        try {
       	
            if ( conn != null ) {
                conn.close();	
            }
        
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            this.isConnected = false;
        }
    }

    /*Receive messages in a loop until closed.*/
     
    public void run () {
       
		while (isConnected) {

			try {
				
				/*receive message with specified timeout.*/
				
                Message m = messageConsumer.receive(RECEIVE_TIMEOUT);
                
                /* process the message. */
                processMessage(m);

                /* commit JMS transaction. */
                this.commit();
                
                /*increase the commit counter.*/
                this.commitCounter ++;
               
			} catch (TransactionRolledBackException trbe) {
				
				/**
				 * the transaction is rolled back
				 * a new transaction is automatically started. 
				 */
				trbe.printStackTrace();
			} catch (JMSException jmse) {
				
                /* The transaction is in unknown state.
                 * We need to roll back the transaction. */

                jmse.printStackTrace();
                
                /* roll back if not closed.
                 */
                if ( this.isConnected == true ) {
					this.rollBackJMS();
				}

			} catch (Exception e) {
                
                e.printStackTrace();
                
                /* Exit if this is an unexpected Exception.
                 */
                this.close();
                
            } finally {
            	;//do nothing
            }
		}
		
		print(" <<< consumer exit ...");
	}

    /**
     * Set up connection, destination, etc.
     */
    protected void setup() {
        
    	try {
        	
			//create connection factory
			com.sun.messaging.ConnectionFactory factory =
			new com.sun.messaging.ConnectionFactory();
			
			//set auto reconnect to true.
			factory.setProperty(ConnectionConfiguration.imqReconnectEnabled, "true");
        	//A value of -1 will retry forever if connection is broken.
			factory.setProperty(ConnectionConfiguration.imqReconnectAttempts, "-1");
			//retry interval - every 10 seconds
			factory.setProperty(ConnectionConfiguration.imqReconnectInterval, "10000");
			//create connection
			conn = factory.createConnection();
			//set client ID
			conn.setClientID(DURA_TEST_TOPIC);
			
			//set exception listener
			conn.setExceptionListener(this);

			//create a transacted session
			session = conn.createSession(true, Session.SESSION_TRANSACTED);
            
			//get destination
			destination = session.createTopic(DURA_TEST_TOPIC);
			
			//message consumer
			messageConsumer = session.createDurableSubscriber((Topic)destination,
                                                       DURA_TEST_TOPIC);
			//set flag to true
            this.isConnected = true;
            //we are ready, start the connection
            conn.start();
            
            print("<<< Ready to receive on destination: " + DURA_TEST_TOPIC);

		} catch (JMSException jmse) {
            this.isConnected = false;
            jmse.printStackTrace();

            this.close();
        }
	}

    /**
     * Process the received message. 
     *  This prints received message counter.
     * @param m the message to be processed.
     */
    private synchronized void processMessage(Message m) {
        
    	try {
    		//in this example, we do not expect a timeout, etc.
    		if ( m == null ) {
    			throw new RuntimeException ("<<< Received null message. "
                                     + "Maybe reached max time out. ");
    		}
    		
    		//get message counter property
        	long msgCtr = m.getLongProperty (MESSAGE_COUNTER);
        	
        	//get message in-doubt bit
        	boolean indoubt = m.getBooleanProperty(MESSAGE_IN_DOUBT);
        	
        	if ( indoubt) {
        		print("<<< received message: " + msgCtr + ", indoubt bit is true");
        	} else {
        		print("<<< received message: " + msgCtr);
        	}
        	
        } catch (JMSException jmse) {
            jmse.printStackTrace();
        }
    }

    /**
     * Commit a JMS transaction.
     * @throws JMSException
     */
    private void commit() throws JMSException {
        session.commit();
    }
    
    /**
     * Sleep for the specified time.
     * @param millis sleep time in milli-seconds.
     */
    private void sleep (long millis) {
        try {
            Thread.sleep(millis);
        } catch (java.lang.InterruptedException inte) {
            print (inte);
        }
    }
    
    /**
     * Print the specified message.
     * @param msg the message to be printed.
     */
    private static void print (String msg) {
    	System.out.println(new Date() + ": " + msg);
    }
    
    /**
     * Print Exception stack trace.
     * @param e the exception to be printed.
     */
    private static void print (Exception e) {
    	System.out.print(e.getMessage());
    	e.printStackTrace();
    }
    
    /**
     * Start this example program.
     */
    public static void main (String args[]) {
        FailoverConsumer fc = new FailoverConsumer();
        fc.run();
    }

}

Custom Client Acknowledgment

Message Queue supports the standard JMS acknowledgment modes (auto-acknowledge, client-acknowledge, and dups-OK-acknowledge). When you create a session for a consumer, you can specify one of these modes. Your choice will affect whether acknowledgment is done explicitly (by the client application) or implicitly (by the session) and will also affect performance and reliability. This section describes additional options you can use to customize acknowledgment behavior:

The following sections explain how you program these options.

Using Client Acknowledge Mode

For more flexibility, Message Queue lets you customize the JMS client-acknowledge mode. In client-acknowledge mode, the client explicitly acknowledges message consumption by invoking the acknowledge() method of a message object. The standard behavior of this method is to cause the session to acknowledge all messages that have been consumed by any consumer in the session since the last time the method was invoked. (That is, the session acknowledges the current message and all previously unacknowledged messages, regardless of who consumed them.)

In addition to the standard behavior specified by JMS, Message Queue lets you use client-acknowledge mode to acknowledge one message at a time.

Observe the following rules when implementing custom client acknowledgment:

If a broker fails, any message that was not acknowledged successfully (that is, any message whose acknowledgment ended in a JMSException) is held by the broker for delivery to subsequent clients.

Example 3–6 demonstrates both types of custom client acknowledgment.


Example 3–6 Example of Custom Client Acknowledgment Code


...
import javax.jms.*;
...[Look up a connection factory and create a connection.]

    Session session = connection.createSession(false,
                       Session.CLIENT_ACKNOWLEDGE);

...[Create a consumer and receive messages.]

    Message message1 = consumer.receive();
    Message message2 = consumer.receive();
    Message message3 = consumer.receive();

...[Process messages.]

...[Acknowledge one individual message.
   Notice that the following acknowledges only message 2.]

    ((com.sun.messaging.jms.Message)message2).acknowledgeThisMessage();

...[Continue. Receive and process more messages.]

    Message message4 = consumer.receive();
    Message message5 = consumer.receive();
    Message message6 = consumer.receive();

...[Acknowledge all messages up through message 4. Notice that this
    acknowledges messages 1, 3, and 4, because message 2 was acknowledged 
    earlier.]

    ((com.sun.messaging.jms.Message)message4).acknowledgeUpThroughThisMessage();
...[Continue. Finally, acknowledge all messages consumed in the session.    
    Notice that this acknowledges all remaining consumed messages, that is,
    messages 5 and 6, because this is the standard behavior of the JMS API.]

    message5.acknowledge();
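

The bookkeeping implied by Example 3–6 can be traced with a toy model of the session's unacknowledged messages. This is a sketch for reasoning about which messages each call covers, not the Message Queue implementation: AckModelSketch and its methods are hypothetical names, and messages are represented only by their arrival number.

```java
import java.util.TreeSet;

/** Toy model of one session's unacknowledged messages, identified by arrival number. */
final class AckModelSketch {

    private final TreeSet<Integer> unacked = new TreeSet<>();

    /** Records that message n was consumed and is awaiting acknowledgment. */
    void consumed(int n) {
        unacked.add(n);
    }

    /** Models acknowledgeThisMessage(): only message n is acknowledged. */
    void ackThis(int n) {
        unacked.remove(n);
    }

    /** Models acknowledgeUpThroughThisMessage(): n and every earlier unacknowledged message. */
    void ackUpThrough(int n) {
        unacked.headSet(n, true).clear();
    }

    /** Models the standard acknowledge(): every message consumed so far in the session. */
    void ackAll() {
        unacked.clear();
    }

    /** Returns a copy of the messages still awaiting acknowledgment. */
    TreeSet<Integer> pending() {
        return new TreeSet<>(unacked);
    }

    public static void main(String[] args) {
        AckModelSketch session = new AckModelSketch();
        for (int n = 1; n <= 3; n++) {
            session.consumed(n);
        }
        session.ackThis(2);
        System.out.println("after ackThis(2): " + session.pending());
        for (int n = 4; n <= 6; n++) {
            session.consumed(n);
        }
        session.ackUpThrough(4);
        System.out.println("after ackUpThrough(4): " + session.pending());
        session.ackAll();
        System.out.println("after ackAll(): " + session.pending());
    }
}
```

The three calls in main follow the same order as the acknowledgments in Example 3–6, so the pending set after each step corresponds to the messages that the example's comments describe as still unacknowledged.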

            

Using No Acknowledge Mode

No-acknowledge mode is a nonstandard extension to the JMS API. Normally, the broker waits for a client acknowledgment before considering that a message has been acknowledged and discarding it. That acknowledgment must be made programmatically if the client has specified client-acknowledge mode or it can be made automatically, by the session, if the client has specified auto-acknowledge or dups-OK-acknowledge. If a consuming client specifies no-acknowledge mode, the broker discards the message as soon as it has sent it to the consuming client. This feature is intended for use by nondurable subscribers consuming nonpersistent messages, but it can be used by any consumer.

Using this feature improves performance by reducing protocol traffic and broker work involved in acknowledging a message. This feature can also improve performance for brokers dealing with misbehaving clients who do not acknowledge messages and therefore tie down broker memory resources unnecessarily. Using this mode has no effect on producers.

You use this feature by specifying NO_ACKNOWLEDGE for the acknowledgeMode parameter to the createSession, createQueueSession, or createTopicSession method. No-acknowledge mode must be used only with the connection methods defined in the com.sun.messaging.jms package. Note however that the connection itself must be created using the javax.jms package.

The following are sample variable declarations for connection, queueConnection and topicConnection:

javax.jms.Connection connection;
javax.jms.QueueConnection queueConnection;
javax.jms.TopicConnection topicConnection;

The following are sample statements to create different kinds of no-acknowledge sessions:

//to create a no ack session
Session noAckSession  =
     ((com.sun.messaging.jms.Connection)connection)
    .createSession(com.sun.messaging.jms.Session.NO_ACKNOWLEDGE);

// to create a no ack topic session
TopicSession noAckTopicSession  =
     ((com.sun.messaging.jms.TopicConnection) topicConnection)
    .createTopicSession(com.sun.messaging.jms.Session.NO_ACKNOWLEDGE);

//to create a no ack queue session
QueueSession noAckQueueSession  =
     ((com.sun.messaging.jms.QueueConnection) queueConnection)
    .createQueueSession(com.sun.messaging.jms.Session.NO_ACKNOWLEDGE);

Specifying no-acknowledge mode for a session results in the following behavior:

Communicating with C Clients

Message Queue supports C clients as message producers and consumers.

A Java client consuming messages sent by a C client faces only one restriction: a C client cannot be part of a distributed transaction, and therefore a Java client receiving a message from a C client cannot participate in a distributed transaction either.

A Java client producing messages for a consuming C client must be aware of the following differences in the Java and C interfaces because these differences will affect the C client’s ability to consume messages: C clients