B 信頼性のあるメッセージングのバッチ化のためのクライアント・ラッパー・クラスの例
この付録では、Java API for XML Web Services (JAX-WS)を使用したWebLogic Webサービスで、信頼性のあるメッセージングのバッチ処理に使用できるクライアント・ラッパー・クラスの例を示します。
               信頼性のあるメッセージングのバッチ化の詳細は、「ビジネス作業単位へのメッセージのグループ化(バッチ化)」を参照してください。
ノート:
このクライアント・ラッパー・クラスはサンプル・コードです。正式にサポートされている製品クラスではありません。
例B-1 信頼性のあるメッセージングのバッチ化のためのクライアント・ラッパー・クラスの例
package example.servlet;
 
import java.io.PrintStream;
import java.io.PrintWriter;
import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Date;
import java.util.SortedSet;
import java.util.Timer;
import java.util.TimerTask;
 
import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.Duration;
import javax.xml.ws.BindingProvider;
 
import weblogic.wsee.jaxws.JAXWSProperties;
import weblogic.wsee.jaxws.spi.ClientInstance;
import weblogic.wsee.reliability.MessageRange;
import weblogic.wsee.reliability2.api.WsrmClient;
import weblogic.wsee.reliability2.api.WsrmClientFactory;
import weblogic.wsee.reliability2.property.WsrmInvocationPropertyBag;
import weblogic.wsee.reliability2.tube.WsrmClientImpl;
 
/**
 * Example wrapper class to batch reliable requests into fixed size 'batches'
 * that can be sent using a single RM sequence. This class allows a client to
 * send requests that have no natural common grouping or
 * 'business unit of work' and not pay the costs associated with creating and
 * terminating an RM sequence for every message.
 * NOTE: This class is an *example* of how batching might be performed. To do
 *       batching correctly, you should consider error recovery and how to
 *       report sequence errors (reported via ReliabilityErrorListener) back
 *       to the clients that made the original requests.
 * <p>
 * If your web service client code knows of some natural business-oriented
 * grouping of requests (called a 'business unit of work'), it should make the
 * RM subsystem aware of this unit of work by using the
 * WsrmClient.setFinalMessage() method to demarcate the end of a unit (just
 * before sending the actual final request via an invocation on
 * the client instance). In some cases, notably when the client code represents
 * an intermediary in the processing of messages, the client code may not be
 * aware of any natural unit of work. In the past, if no business unit of work
 * could be determined, clients often just created the client instance, sent the
 * single current message they had, and then allowed the sequence to terminate.
 * This is functionally workable, but very inefficient. These clients pay the
 * cost of an RM sequence handshake and termination for every message they send.
 * The BatchingRmClientWrapper class can be used to introduce an artificial
 * unit of work (a batch) when no natural business unit of work is available.
 * <p>
 * Each instance of BatchingRmClientWrapper is a wrapper instance around a
 * client instance (port or Dispatch instance). This wrapper can be used to
 * obtain a Proxy instance that can be used in place of the original client
 * instance. This allows this class to perform batching operations completely
 * invisibly from the perspective of the client code.
 * <p>
 * This class is used for batching reliable requests into
 * batches of a given max size that will survive for a given maximum
 * duration. If a batch fills up or times out, it is ended, causing the
 * RM sequence it represents to be ended/terminated. The timeout ensures that
 * if the flow of incoming requests stops the batch/sequence will still
 * end in a timely manner.
 */
public class BatchingRmClientWrapper<T>
  implements InvocationHandler {
 
  private Class<T> _clazz;
  private int _batchSize;
  private long _maxBatchLifetimeMillis;
  private T _clientInstance;
  private PrintWriter _out;
  private WsrmClient _rmClient;
  private int _numInCurrentBatch;
  private int _batchNum;
  private Timer _timer;
  private boolean _closed;
  private boolean _proxyCreated;
 
  /**
   * Create a wrapper instance for batching reliable requests into
   * batches of the given max size that will survive for the given maximum
   * duration. If a batch fills up or times out, it is ended, causing the
   * RM sequence it represents to be ended/terminated.
   * @param clientInstance The client instance that acts as the source object
   *        for the batching proxy created by the createProxy() method. This
   *        is the port/Dispatch instance returned from the call to
   *        getPort/createDispatch. The BatchingRmClientWrapper will take over
   *        responsibility for managing the interaction with and cleanup of
   *        the client instance via the proxy created from createProxy.
   * @param clazz of the proxy instance we'll be creating in createProxy.
   *        This should be the class of the port/Dispatch instance you would
   *        use to invoke operations on the service. BatchingRmClientWrapper will
   *        create (via createProxy) a proxy of the given type that can be
   *        used in place of the original client instance.
   * @param batchSize Max number of requests to put into a batch. If the
   *        max number of requests are sent for a given batch, that batch is
   *        ended (ending/terminating the sequence it represents) and a new
   *        batch is started.
   * @param maxBatchLifetime A duration value (in the lexical form supported
   *        by java.util.Duration, e.g. PT30S for 30 seconds) representing
   *        the maximum time a batch should exist. If the batch exists longer
   *        than this time, it is ended and a new batch is begun.
   * @param out A print stream that can be used to print diagnostic and
   *        status messages.
   */
  public BatchingRmClientWrapper(T clientInstance, Class<T> clazz,
                                 int batchSize, String maxBatchLifetime,
                                 PrintStream out) {
    _clazz = clazz;
    _batchSize = batchSize;
    try {
      if (maxBatchLifetime == null) {
        maxBatchLifetime = "PT5M";
      }
      Duration duration =
        DatatypeFactory.newInstance().newDuration(maxBatchLifetime);
      _maxBatchLifetimeMillis = duration.getTimeInMillis(new Date());
    } catch (Exception e) {
      throw new RuntimeException(e.toString(), e);
    }
    _clientInstance = clientInstance;
    _out = new PrintWriter(out, true);
    _rmClient = WsrmClientFactory.getWsrmClientFromPort(_clientInstance);
    _closed = false;
    _proxyCreated = false;
    _timer = new Timer(true);
    _timer.schedule(new TimerTask() {
      @Override
      public void run() {
        terminateOrEndBatch();
      }
    }, _maxBatchLifetimeMillis);
  }
 
  /**
   * Creates the dynamic proxy that should be used in place of the client
   * instance used to create this BatchingRmClientWrapper instance. This method
   * should be called only once per BatchingRmClientWrapper.
   */
  public T createProxy() {
    if (_proxyCreated) {
      throw new IllegalStateException("Already created the proxy for this BatchingRmClientWrapper
        instance which wraps the client instance: " + _clientInstance);
    }
    _proxyCreated = true;
    return (T) Proxy.newProxyInstance(getClass().getClassLoader(),
                                     new Class[] {
                                       _clazz,
                                       BindingProvider.class,
                                       java.io.Closeable.class
                                     }, this);
  }
 
  private void terminateOrEndBatch() {
    synchronized(_clientInstance) {
      if (_rmClient.getSequenceId() != null) {
        if (terminateBatchAllowed()) {
          _out.println("Terminating batch " + _batchNum + " sequence (" + _
                rmClient.getSequenceId() + ") for " + _clientInstance);
          try {
            _rmClient.terminateSequence();
          } catch (Exception e) {
            e.printStackTrace(_out);
          }
        } else {
          _out.println("Batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ") 
            for " + _clientInstance + " timed out but has outstanding requests to send and 
            cannot be terminated now");
        }
      }
      endBatch();
    }
  }
 
  /**
   * Check to see if we have acks for all requests sent. If so,
   * we can terminate.
   */
  private boolean terminateBatchAllowed() {
    try {
      synchronized(_clientInstance) {
        if (_rmClient.getSequenceId() != null) {
          long maxMsgNum = _rmClient.getMostRecentMessageNumber();
          if (maxMsgNum < 1) {
            // No messages sent, go ahead and terminate.
            return true;
          }
          SortedSet<MessageRange> ranges = _rmClient.getAckRanges();
          long maxAck = -1;
          boolean hasGaps = false;
          long lastRangeUpper = -1;
          for (MessageRange range: ranges) {
            if (lastRangeUpper > 0) {
              if (range.lowerBounds != lastRangeUpper + 1) {
                hasGaps = true;
              }
            } else {
              lastRangeUpper = range.upperBounds;
            }
            maxAck = range.upperBounds;
          }
          return !(hasGaps || maxAck < maxMsgNum);
        }
      }
    } catch (Exception e) {
      e.printStackTrace(_out);
    }
    return true;
  }
 
  private void endBatch() {
    synchronized(_clientInstance) {
      if (_numInCurrentBatch > 0) {
        _out.println("Ending batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ")
           for " + _clientInstance + "...");
      }
      /**
       * _rmClient.reset() resets a WsrmClient instance (and the client instance it represents)
       * so it can track a new WS-RM sequence for the next invoke on the client
       * instance. This method effectively *disconnects* the RM sequence from the
       * client instance and lets them continue/complete separately.
       */ 
      _rmClient.reset();
      _numInCurrentBatch = 0;
      if (!_closed) {
        _timer.schedule(new TimerTask() {
          @Override
          public void run() {
            terminateOrEndBatch();
          }
        }, _maxBatchLifetimeMillis);
      }
    }
  }
 
  public Object invoke(Object proxy, Method method, Object[] args)
    throws Throwable {
    boolean operationInvoke = method.getDeclaringClass() == _clazz;
    boolean closeableInvoke = method.getDeclaringClass() ==
                              java.io.Closeable.class;
    boolean endOfBatch = false;
    if (operationInvoke) {
      synchronized(_clientInstance) {
        // Check our batch size
        if (_numInCurrentBatch == 0) {
          _batchNum++;
        }
        endOfBatch = _numInCurrentBatch >= _batchSize - 1;
        if (endOfBatch) {
          _rmClient.setFinalMessage();
        }
        _out.println("Making " + (endOfBatch ? "final " : "") + "invoke " + 
           (_numInCurrentBatch+1) + " of batch " + _batchNum + " sequence (" + _
           rmClient.getSequenceId() + ") with operation: " + method.getName());
      }
    } else if (closeableInvoke && method.getName().equals("close")) {
      synchronized(_clientInstance) {
        // Make sure we don't try to schedule the timer anymore
        _closed = true;
        _timer.cancel();
      }
    }
    Object ret = method.invoke(_clientInstance, args);
    if (operationInvoke) {
      synchronized(_clientInstance) {
        _numInCurrentBatch++;
        if (endOfBatch) {
          endBatch();
        }
      }
    }
    return ret;
  }
}