B Example Client Wrapper Class for Batching Reliable Messages

This appendix provides an example client wrapper class that can be used for batching reliable messaging for WebLogic Web services using Java API for XML Web Services (JAX-WS).

For more information about batching reliable messages, see Grouping Messages into Business Units of Work (Batching).

Note:

This client wrapper class is example code only; it is not an officially supported production class.

Example B-1 Example Client Wrapper Class for Batching Reliable Messages

package example.servlet;
 
import java.io.PrintStream;
import java.io.PrintWriter;
import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Date;
import java.util.SortedSet;
import java.util.Timer;
import java.util.TimerTask;
 
import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.Duration;
import javax.xml.ws.BindingProvider;
 
import weblogic.wsee.jaxws.JAXWSProperties;
import weblogic.wsee.jaxws.spi.ClientInstance;
import weblogic.wsee.reliability.MessageRange;
import weblogic.wsee.reliability2.api.WsrmClient;
import weblogic.wsee.reliability2.api.WsrmClientFactory;
import weblogic.wsee.reliability2.property.WsrmInvocationPropertyBag;
import weblogic.wsee.reliability2.tube.WsrmClientImpl;
 
/**
 * Example wrapper class to batch reliable requests into fixed size 'batches'
 * that can be sent using a single RM sequence. This class allows a client to
 * send requests that have no natural common grouping or
 * 'business unit of work' and not pay the costs associated with creating and
 * terminating an RM sequence for every message.
 * NOTE: This class is an *example* of how batching might be performed. To do
 *       batching correctly, you should consider error recovery and how to
 *       report sequence errors (reported via ReliabilityErrorListener) back
 *       to the clients that made the original requests.
 * <p>
 * If your Web service client code knows of some natural business-oriented
 * grouping of requests (called a 'business unit of work'), it should make the
 * RM subsystem aware of this unit of work by using the
 * WsrmClient.setFinalMessage() method to demarcate the end of a unit (just
 * before sending the actual final request via an invocation on
 * the client instance). In some cases, notably when the client code represents
 * an intermediary in the processing of messages, the client code may not be
 * aware of any natural unit of work. In the past, if no business unit of work
 * could be determined, clients often just created the client instance, sent the
 * single current message they had, and then allowed the sequence to terminate.
 * This is functionally workable, but very inefficient. These clients pay the
 * cost of an RM sequence handshake and termination for every message they send.
 * The BatchingRmClientWrapper class can be used to introduce an artificial
 * unit of work (a batch) when no natural business unit of work is available.
 * <p>
 * Each instance of BatchingRmClientWrapper is a wrapper instance around a
 * client instance (port or Dispatch instance). This wrapper can be used to
 * obtain a Proxy instance that can be used in place of the original client
 * instance. This allows this class to perform batching operations completely
 * invisibly from the perspective of the client code.
 * <p>
 * This class is used for batching reliable requests into
 * batches of a given max size that will survive for a given maximum
 * duration. If a batch fills up or times out, it is ended, causing the
 * RM sequence it represents to be ended/terminated. The timeout ensures that
 * if the flow of incoming requests stops the batch/sequence will still
 * end in a timely manner.
 */
public class BatchingRmClientWrapper<T>
  implements InvocationHandler {
 
  private Class<T> _clazz;
  private int _batchSize;
  private long _maxBatchLifetimeMillis;
  private T _clientInstance;
  private PrintWriter _out;
  private WsrmClient _rmClient;
  private int _numInCurrentBatch;
  private int _batchNum;
  private Timer _timer;
  private boolean _closed;
  private boolean _proxyCreated;
 
  /**
   * Create a wrapper instance for batching reliable requests into
   * batches of the given max size that will survive for the given maximum
   * duration. If a batch fills up or times out, it is ended, causing the
   * RM sequence it represents to be ended/terminated.
   * @param clientInstance The client instance that acts as the source object
   *        for the batching proxy created by the createProxy() method. This
   *        is the port/Dispatch instance returned from the call to
   *        getPort/createDispatch. The BatchingRmClientWrapper will take over
   *        responsibility for managing the interaction with and cleanup of
   *        the client instance via the proxy created from createProxy.
   * @param clazz of the proxy instance we'll be creating in createProxy.
   *        This should be the class of the port/Dispatch instance you would
   *        use to invoke operations on the service. BatchingRmClientWrapper will
   *        create (via createProxy) a proxy of the given type that can be
   *        used in place of the original client instance.
   * @param batchSize Max number of requests to put into a batch. If the
   *        max number of requests are sent for a given batch, that batch is
   *        ended (ending/terminating the sequence it represents) and a new
   *        batch is started.
   * @param maxBatchLifetime A duration value (in the lexical form supported
   *        by java.util.Duration, e.g. PT30S for 30 seconds) representing
   *        the maximum time a batch should exist. If the batch exists longer
   *        than this time, it is ended and a new batch is begun.
   * @param out A print stream that can be used to print diagnostic and
   *        status messages.
   */
  public BatchingRmClientWrapper(T clientInstance, Class<T> clazz,
                                 int batchSize, String maxBatchLifetime,
                                 PrintStream out) {
    _clazz = clazz;
    _batchSize = batchSize;
    try {
      if (maxBatchLifetime == null) {
        maxBatchLifetime = "PT5M";
      }
      Duration duration =
        DatatypeFactory.newInstance().newDuration(maxBatchLifetime);
      _maxBatchLifetimeMillis = duration.getTimeInMillis(new Date());
    } catch (Exception e) {
      throw new RuntimeException(e.toString(), e);
    }
    _clientInstance = clientInstance;
    _out = new PrintWriter(out, true);
    _rmClient = WsrmClientFactory.getWsrmClientFromPort(_clientInstance);
    _closed = false;
    _proxyCreated = false;
    _timer = new Timer(true);
    _timer.schedule(new TimerTask() {
      @Override
      public void run() {
        terminateOrEndBatch();
      }
    }, _maxBatchLifetimeMillis);
  }
 
  /**
   * Creates the dynamic proxy that should be used in place of the client
   * instance used to create this BatchingRmClientWrapper instance. This method
   * should be called only once per BatchingRmClientWrapper.
   */
  public T createProxy() {
    if (_proxyCreated) {
      throw new IllegalStateException("Already created the proxy for this BatchingRmClientWrapper
        instance which wraps the client instance: " + _clientInstance);
    }
    _proxyCreated = true;
    return (T) Proxy.newProxyInstance(getClass().getClassLoader(),
                                     new Class[] {
                                       _clazz,
                                       BindingProvider.class,
                                       java.io.Closeable.class
                                     }, this);
  }
 
  private void terminateOrEndBatch() {
    synchronized(_clientInstance) {
      if (_rmClient.getSequenceId() != null) {
        if (terminateBatchAllowed()) {
          _out.println("Terminating batch " + _batchNum + " sequence (" + _
                rmClient.getSequenceId() + ") for " + _clientInstance);
          try {
            _rmClient.terminateSequence();
          } catch (Exception e) {
            e.printStackTrace(_out);
          }
        } else {
          _out.println("Batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ") 
            for " + _clientInstance + " timed out but has outstanding requests to send and 
            cannot be terminated now");
        }
      }
      endBatch();
    }
  }
 
  /**
   * Check to see if we have acks for all requests sent. If so,
   * we can terminate.
   */
  private boolean terminateBatchAllowed() {
    try {
      synchronized(_clientInstance) {
        if (_rmClient.getSequenceId() != null) {
          long maxMsgNum = _rmClient.getMostRecentMessageNumber();
          if (maxMsgNum < 1) {
            // No messages sent, go ahead and terminate.
            return true;
          }
          SortedSet<MessageRange> ranges = _rmClient.getAckRanges();
          long maxAck = -1;
          boolean hasGaps = false;
          long lastRangeUpper = -1;
          for (MessageRange range: ranges) {
            if (lastRangeUpper > 0) {
              if (range.lowerBounds != lastRangeUpper + 1) {
                hasGaps = true;
              }
            } else {
              lastRangeUpper = range.upperBounds;
            }
            maxAck = range.upperBounds;
          }
          return !(hasGaps || maxAck < maxMsgNum);
        }
      }
    } catch (Exception e) {
      e.printStackTrace(_out);
    }
    return true;
  }
 
  private void endBatch() {
    synchronized(_clientInstance) {
      if (_numInCurrentBatch > 0) {
        _out.println("Ending batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ")
           for " + _clientInstance + "...");
      }
      /**
       * _rmClient.reset() resets a WsrmClient instance (and the client instance it represents)
       * so it can track a new WS-RM sequence for the next invoke on the client
       * instance. This method effectively *disconnects* the RM sequence from the
       * client instance and lets them continue/complete separately.
       */ 
      _rmClient.reset();
      _numInCurrentBatch = 0;
      if (!_closed) {
        _timer.schedule(new TimerTask() {
          @Override
          public void run() {
            terminateOrEndBatch();
          }
        }, _maxBatchLifetimeMillis);
      }
    }
  }
 
  public Object invoke(Object proxy, Method method, Object[] args)
    throws Throwable {
    boolean operationInvoke = method.getDeclaringClass() == _clazz;
    boolean closeableInvoke = method.getDeclaringClass() ==
                              java.io.Closeable.class;
    boolean endOfBatch = false;
    if (operationInvoke) {
      synchronized(_clientInstance) {
        // Check our batch size
        if (_numInCurrentBatch == 0) {
          _batchNum++;
        }
        endOfBatch = _numInCurrentBatch >= _batchSize - 1;
        if (endOfBatch) {
          _rmClient.setFinalMessage();
        }
        _out.println("Making " + (endOfBatch ? "final " : "") + "invoke " + 
           (_numInCurrentBatch+1) + " of batch " + _batchNum + " sequence (" + _
           rmClient.getSequenceId() + ") with operation: " + method.getName());
      }
    } else if (closeableInvoke && method.getName().equals("close")) {
      synchronized(_clientInstance) {
        // Make sure we don't try to schedule the timer anymore
        _closed = true;
        _timer.cancel();
      }
    }
    Object ret = method.invoke(_clientInstance, args);
    if (operationInvoke) {
      synchronized(_clientInstance) {
        _numInCurrentBatch++;
        if (endOfBatch) {
          endBatch();
        }
      }
    }
    return ret;
  }
}