This appendix provides an example client wrapper class that can be used for batching reliable messaging for WebLogic Web services using Java API for XML Web Services (JAX-WS).
For more information about batching reliable messages, see Grouping Messages into Business Units of Work (Batching).
Note:
This client wrapper class is example code only; it is not an officially supported production class.
Example B-1 Example Client Wrapper Class for Batching Reliable Messages
package example.servlet;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Date;
import java.util.SortedSet;
import java.util.Timer;
import java.util.TimerTask;
import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.Duration;
import javax.xml.ws.BindingProvider;
import weblogic.wsee.jaxws.JAXWSProperties;
import weblogic.wsee.jaxws.spi.ClientInstance;
import weblogic.wsee.reliability.MessageRange;
import weblogic.wsee.reliability2.api.WsrmClient;
import weblogic.wsee.reliability2.api.WsrmClientFactory;
import weblogic.wsee.reliability2.property.WsrmInvocationPropertyBag;
import weblogic.wsee.reliability2.tube.WsrmClientImpl;
/**
* Example wrapper class to batch reliable requests into fixed size 'batches'
* that can be sent using a single RM sequence. This class allows a client to
* send requests that have no natural common grouping or
* 'business unit of work' and not pay the costs associated with creating and
* terminating an RM sequence for every message.
* NOTE: This class is an *example* of how batching might be performed. To do
* batching correctly, you should consider error recovery and how to
* report sequence errors (reported via ReliabilityErrorListener) back
* to the clients that made the original requests.
* <p>
* If your Web service client code knows of some natural business-oriented
* grouping of requests (called a 'business unit of work'), it should make the
* RM subsystem aware of this unit of work by using the
* WsrmClient.setFinalMessage() method to demarcate the end of a unit (just
* before sending the actual final request via an invocation on
* the client instance). In some cases, notably when the client code represents
* an intermediary in the processing of messages, the client code may not be
* aware of any natural unit of work. In the past, if no business unit of work
* could be determined, clients often just created the client instance, sent the
* single current message they had, and then allowed the sequence to terminate.
* This is functionally workable, but very inefficient. These clients pay the
* cost of an RM sequence handshake and termination for every message they send.
* The BatchingRmClientWrapper class can be used to introduce an artificial
* unit of work (a batch) when no natural business unit of work is available.
* <p>
* Each instance of BatchingRmClientWrapper is a wrapper instance around a
* client instance (port or Dispatch instance). This wrapper can be used to
* obtain a Proxy instance that can be used in place of the original client
* instance. This allows this class to perform batching operations completely
* invisibly from the perspective of the client code.
* <p>
* This class is used for batching reliable requests into
* batches of a given max size that will survive for a given maximum
* duration. If a batch fills up or times out, it is ended, causing the
* RM sequence it represents to be ended/terminated. The timeout ensures that
* if the flow of incoming requests stops the batch/sequence will still
* end in a timely manner.
*/
public class BatchingRmClientWrapper<T>
implements InvocationHandler {
private Class<T> _clazz;
private int _batchSize;
private long _maxBatchLifetimeMillis;
private T _clientInstance;
private PrintWriter _out;
private WsrmClient _rmClient;
private int _numInCurrentBatch;
private int _batchNum;
private Timer _timer;
private boolean _closed;
private boolean _proxyCreated;
/**
* Create a wrapper instance for batching reliable requests into
* batches of the given max size that will survive for the given maximum
* duration. If a batch fills up or times out, it is ended, causing the
* RM sequence it represents to be ended/terminated.
* @param clientInstance The client instance that acts as the source object
* for the batching proxy created by the createProxy() method. This
* is the port/Dispatch instance returned from the call to
* getPort/createDispatch. The BatchingRmClientWrapper will take over
* responsibility for managing the interaction with and cleanup of
* the client instance via the proxy created from createProxy.
* @param clazz of the proxy instance we'll be creating in createProxy.
* This should be the class of the port/Dispatch instance you would
* use to invoke operations on the service. BatchingRmClientWrapper will
* create (via createProxy) a proxy of the given type that can be
* used in place of the original client instance.
* @param batchSize Max number of requests to put into a batch. If the
* max number of requests are sent for a given batch, that batch is
* ended (ending/terminating the sequence it represents) and a new
* batch is started.
* @param maxBatchLifetime A duration value (in the lexical form supported
* by java.util.Duration, e.g. PT30S for 30 seconds) representing
* the maximum time a batch should exist. If the batch exists longer
* than this time, it is ended and a new batch is begun.
* @param out A print stream that can be used to print diagnostic and
* status messages.
*/
public BatchingRmClientWrapper(T clientInstance, Class<T> clazz,
int batchSize, String maxBatchLifetime,
PrintStream out) {
_clazz = clazz;
_batchSize = batchSize;
try {
if (maxBatchLifetime == null) {
maxBatchLifetime = "PT5M";
}
Duration duration =
DatatypeFactory.newInstance().newDuration(maxBatchLifetime);
_maxBatchLifetimeMillis = duration.getTimeInMillis(new Date());
} catch (Exception e) {
throw new RuntimeException(e.toString(), e);
}
_clientInstance = clientInstance;
_out = new PrintWriter(out, true);
_rmClient = WsrmClientFactory.getWsrmClientFromPort(_clientInstance);
_closed = false;
_proxyCreated = false;
_timer = new Timer(true);
_timer.schedule(new TimerTask() {
@Override
public void run() {
terminateOrEndBatch();
}
}, _maxBatchLifetimeMillis);
}
/**
* Creates the dynamic proxy that should be used in place of the client
* instance used to create this BatchingRmClientWrapper instance. This method
* should be called only once per BatchingRmClientWrapper.
*/
public T createProxy() {
if (_proxyCreated) {
throw new IllegalStateException("Already created the proxy for this BatchingRmClientWrapper
instance which wraps the client instance: " + _clientInstance);
}
_proxyCreated = true;
return (T) Proxy.newProxyInstance(getClass().getClassLoader(),
new Class[] {
_clazz,
BindingProvider.class,
java.io.Closeable.class
}, this);
}
private void terminateOrEndBatch() {
synchronized(_clientInstance) {
if (_rmClient.getSequenceId() != null) {
if (terminateBatchAllowed()) {
_out.println("Terminating batch " + _batchNum + " sequence (" + _
rmClient.getSequenceId() + ") for " + _clientInstance);
try {
_rmClient.terminateSequence();
} catch (Exception e) {
e.printStackTrace(_out);
}
} else {
_out.println("Batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ")
for " + _clientInstance + " timed out but has outstanding requests to send and
cannot be terminated now");
}
}
endBatch();
}
}
/**
* Check to see if we have acks for all requests sent. If so,
* we can terminate.
*/
private boolean terminateBatchAllowed() {
try {
synchronized(_clientInstance) {
if (_rmClient.getSequenceId() != null) {
long maxMsgNum = _rmClient.getMostRecentMessageNumber();
if (maxMsgNum < 1) {
// No messages sent, go ahead and terminate.
return true;
}
SortedSet<MessageRange> ranges = _rmClient.getAckRanges();
long maxAck = -1;
boolean hasGaps = false;
long lastRangeUpper = -1;
for (MessageRange range: ranges) {
if (lastRangeUpper > 0) {
if (range.lowerBounds != lastRangeUpper + 1) {
hasGaps = true;
}
} else {
lastRangeUpper = range.upperBounds;
}
maxAck = range.upperBounds;
}
return !(hasGaps || maxAck < maxMsgNum);
}
}
} catch (Exception e) {
e.printStackTrace(_out);
}
return true;
}
private void endBatch() {
synchronized(_clientInstance) {
if (_numInCurrentBatch > 0) {
_out.println("Ending batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ")
for " + _clientInstance + "...");
}
/**
* _rmClient.reset() resets a WsrmClient instance (and the client instance it represents)
* so it can track a new WS-RM sequence for the next invoke on the client
* instance. This method effectively *disconnects* the RM sequence from the
* client instance and lets them continue/complete separately.
*/
_rmClient.reset();
_numInCurrentBatch = 0;
if (!_closed) {
_timer.schedule(new TimerTask() {
@Override
public void run() {
terminateOrEndBatch();
}
}, _maxBatchLifetimeMillis);
}
}
}
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
boolean operationInvoke = method.getDeclaringClass() == _clazz;
boolean closeableInvoke = method.getDeclaringClass() ==
java.io.Closeable.class;
boolean endOfBatch = false;
if (operationInvoke) {
synchronized(_clientInstance) {
// Check our batch size
if (_numInCurrentBatch == 0) {
_batchNum++;
}
endOfBatch = _numInCurrentBatch >= _batchSize - 1;
if (endOfBatch) {
_rmClient.setFinalMessage();
}
_out.println("Making " + (endOfBatch ? "final " : "") + "invoke " +
(_numInCurrentBatch+1) + " of batch " + _batchNum + " sequence (" + _
rmClient.getSequenceId() + ") with operation: " + method.getName());
}
} else if (closeableInvoke && method.getName().equals("close")) {
synchronized(_clientInstance) {
// Make sure we don't try to schedule the timer anymore
_closed = true;
_timer.cancel();
}
}
Object ret = method.invoke(_clientInstance, args);
if (operationInvoke) {
synchronized(_clientInstance) {
_numInCurrentBatch++;
if (endOfBatch) {
endBatch();
}
}
}
return ret;
}
}