ヘッダーをスキップ
Oracle® Fusion Middleware Oracle WebLogic Server JAX-WS Webサービスの高度な機能のプログラミング
11g リリース1(10.3.4)
B61633-02
  目次へ
目次

前
戻る
 
 

B 信頼性のあるメッセージングのバッチ処理のためのクライアント・ラッパー・クラスの例

次のコードは、信頼性のあるメッセージングのバッチ処理に使用できるクライアント・ラッパー・クラスの例です。信頼性のあるメッセージングのバッチ処理の詳細は、「ビジネス作業単位へのメッセージのグループ化(バッチ処理)」を参照してください。


注意:

このクライアント・ラッパー・クラスはサンプル・コードです。正式にサポートされている本番クラスではありません。

例B-1 信頼性のあるメッセージングのバッチ処理のためのクライアント・ラッパー・クラスの例

import java.io.*;
import java.lang.*;
import java.util.*;
import javax.xml.*;
 
import weblogic.wsee.jaxws.JAXWSProperties;
import weblogic.wsee.jaxws.spi.ClientInstance;
import weblogic.wsee.reliability.MessageRange;
import weblogic.wsee.reliability2.api.WsrmClient;
import weblogic.wsee.reliability2.api.WsrmClientFactory;
import weblogic.wsee.reliability2.property.WsrmInvocationPropertyBag;
import weblogic.wsee.reliability2.tube.WsrmClientImpl;
 
/**
 * Example wrapper class to batch reliable requests into fixed size 'batches'
 * that can be sent using a single RM sequence. This class allows a client to
 * send requests that have no natural common grouping or
 * 'business unit of work' and not pay the costs associated with creating and
 * terminating an RM sequence for every message.
 * NOTE: This class is an *example* of how batching might be performed. To do
 *       batching correctly, you should consider error recovery and how to
 *       report sequence errors (reported via ReliabilityErrorListener) back
 *       to the clients that made the original requests.
 * <p>
 * If your Web service client code knows of some natural business-oriented
 * grouping of requests (called a 'business unit of work'), it should make the
 * RM subsystem aware of this unit of work by using the
 * WsrmClient.setFinalMessage() method to demarcate the end of a unit (just
 * before sending the actual final request via an invocation on
 * the client instance). In some cases, notably when the client code represents
 * an intermediary in the processing of messages, the client code may not be
 * aware of any natural unit of work. In the past, if no business unit of work
 * could be determined, clients often just created the client instance, sent the
 * single current message they had, and then allowed the sequence to terminate.
 * This is functionally workable, but very inefficient. These clients pay the
 * cost of an RM sequence handshake and termination for every message they send.
 * The BatchingRmClientWrapper class can be used to introduce an artificial
 * unit of work (a batch) when no natural business unit of work is available.
 * <p>
 * Each instance of BatchingRmClientWrapper is a wrapper instance around a
 * client instance (port or Dispatch instance). This wrapper can be used to
 * obtain a Proxy instance that can be used in place of the original client
 * instance. This allows this class to perform batching operations completely
 * invisibly from the perspective of the client code.
 * <p>
 * This class is used for batching reliable requests into
 * batches of a given max size that will survive for a given maximum
 * duration. If a batch fills up or times out, it is ended, causing the
 * RM sequence it represents to be ended/terminated. The timeout ensures that
 * if the flow of incoming requests stops the batch/sequence will still
 * end in a timely manner.
 */
public class BatchingRmClientWrapper<T>
  implements InvocationHandler {
 
  private Class<T> _clazz;
  private int _batchSize;
  private long _maxBatchLifetimeMillis;
  private T _clientInstance;
  private PrintWriter _out;
  private WsrmClient _rmClient;
  private int _numInCurrentBatch;
  private int _batchNum;
  private Timer _timer;
  private boolean _closed;
  private boolean _proxyCreated;
 
  /**
   * Create a wrapper instance for batching reliable requests into
   * batches of the given max size that will survive for the given maximum
   * duration. If a batch fills up or times out, it is ended, causing the
   * RM sequence it represents to be ended/terminated.
   * @param clientInstance The client instance that acts as the source object
   *        for the batching proxy created by the createProxy() method. This
   *        is the port/Dispatch instance returned from the call to
   *        getPort/createDispatch. The BatchingRmClientWrapper will take over
   *        responsibility for managing the interaction with and cleanup of
   *        the client instance via the proxy created from createProxy.
   * @param clazz of the proxy instance created from createProxy.
   *        This should be the class of the port/Dispatch instance you would
   *        use to invoke operations on the service. BatchingRmClientWrapper will
   *        create (via createProxy) a proxy of the given type that can be
   *        used in place of the original client instance.
   * @param batchSize Max number of requests to put into a batch. If the
   *        max number of requests are sent for a given batch, that batch is
   *        ended (ending/terminating the sequence it represents) and a new
   *        batch is started.
   * @param maxBatchLifetime A duration value (in the lexical form supported
   *        by java.util.Duration, e.g. PT30S for 30 seconds) representing
   *        the maximum time a batch should exist. If the batch exists longer
   *        than this time, it is ended and a new batch is begun.
   * @param out A print stream that can be used to print diagnostic and
   *        status messages.
   */
  public BatchingRmClientWrapper(T clientInstance, Class<T> clazz,
                                 int batchSize, String maxBatchLifetime,
                                 PrintStream out) {
    _clazz = clazz;
    _batchSize = batchSize;
    try {
      if (maxBatchLifetime == null) {
        maxBatchLifetime = "PT5M";
      }
      Duration duration =
        DatatypeFactory.newInstance().newDuration(maxBatchLifetime);
      _maxBatchLifetimeMillis = duration.getTimeInMillis(new Date());
    } catch (Exception e) {
      throw new RuntimeException(e.toString(), e);
    }
    _clientInstance = clientInstance;
    _out = new PrintWriter(out, true);
    _rmClient = WsrmClientFactory.getWsrmClientFromPort(_clientInstance);
    _closed = false;
    _proxyCreated = false;
    _timer = new Timer(true);
    _timer.schedule(new TimerTask() {
      @Override
      public void run() {
        terminateOrEndBatch();
      }
    }, _maxBatchLifetimeMillis);
  }
 
  /**
   * Creates the dynamic proxy that should be used in place of the client
   * instance used to create this BatchingRmClientWrapper instance. This method
   * should be called only once per BatchingRmClientWrapper.
   */
  public T createProxy() {
    if (_proxyCreated) {
      throw new IllegalStateException("Already created the proxy for this BatchingRmClientWrapper instance which wraps the client instance: " + _clientInstance);
    }
    _proxyCreated = true;
    return (T) Proxy.newProxyInstance(getClass().getClassLoader(),
                                     new Class[] {
                                       _clazz,
                                       BindingProvider.class,
                                       java.io.Closeable.class
                                     }, this);
  }
 
  private void terminateOrEndBatch() {
    synchronized(_clientInstance) {
      if (_rmClient.getSequenceId() != null) {
        if (terminateBatchAllowed()) {
          _out.println("Terminating batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ") for " + _clientInstance);
          try {
            _rmClient.terminateSequence();
          } catch (Exception e) {
            e.printStackTrace(_out);
          }
        } else {
          _out.println("Batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ") for " + _clientInstance + " timed out but has outstanding requests to send and cannot be terminated now");
        }
      }
      endBatch();
    }
  }
 
  /**
   * Check to see if there are acknowledgements for all requests sent. If so,
   * terminate.
   */
  private boolean terminateBatchAllowed() {
    try {
      synchronized(_clientInstance) {
        if (_rmClient.getSequenceId() != null) {
 
          // TODO: Remove this workaround when getMostRecentMessageNumber is
          //       fixed.
          //       The following BUG is targeted for the next release, at which time
          //       the workaround can be removed.
          //       Bug 10382605 - WSRMCLIENT.GETMOSTRECENTMESSAGENUMBER() ALWAYS RETURNS 0
          //long maxMsgNum = _rmClient.getMostRecentMessageNumber();
 
          // --------------------------------------------------------
          // Workaround: Start
          //   *** Do *not* use this API in your own code. It is not
          //       supported for customer use ***
          WsrmInvocationPropertyBag rmProps =
            (WsrmInvocationPropertyBag)((BindingProvider)_clientInstance).
              getRequestContext().get(WsrmInvocationPropertyBag.key);
          long maxMsgNum = rmProps != null ? rmProps.getMostRecentMsgNum() : 0;
          // Workaround: End
          // --------------------------------------------------------
 
          if (maxMsgNum < 1) {
            // No messages sent, go ahead and terminate.
            return true;
          }
          SortedSet<MessageRange> ranges = _rmClient.getAckRanges();
          long maxAck = -1;
          boolean hasGaps = false;
          long lastRangeUpper = -1;
          for (MessageRange range: ranges) {
            if (lastRangeUpper > 0) {
              if (range.lowerBounds != lastRangeUpper + 1) {
                hasGaps = true;
              }
            } else {
              lastRangeUpper = range.upperBounds;
            }
            maxAck = range.upperBounds;
          }
          return !(hasGaps || maxAck < maxMsgNum);
        }
      }
    } catch (Exception e) {
      e.printStackTrace(_out);
    }
    return true;
  }
 
  private void endBatch() {
    synchronized(_clientInstance) {
      if (_numInCurrentBatch > 0) {
        _out.println("Ending batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ") for " + _clientInstance + "...");
      }
      resetWsrmClient(_rmClient, _clientInstance);
      _numInCurrentBatch = 0;
      if (!_closed) {
        _timer.schedule(new TimerTask() {
          @Override
          public void run() {
            terminateOrEndBatch();
          }
        }, _maxBatchLifetimeMillis);
      }
    }
  }
 
  /**
   * Resets a WsrmClient instance (and the client instance it represents)
   * so it can track a new WS-RM sequence for the next invoke on the client
   * instance. This method effectively *disconnects* the RM sequence from the
   * client instance and lets them continue/complete separately.
   * NOTE: You should use this method instead of WsrmClient.reset() alone due
   *       to a bug in WsrmClient.reset that does not completely reset all state
   *       stored on the client instance from the old sequence.
   */
  public static void resetWsrmClient(WsrmClient rmClient, Object clientInstance) {
    rmClient.reset();
    // TODO: Shouldn't have to do this, as _rmClient.reset should do it for
    //       The following BUG is targeted for the next release, at which time
    //       the workaround can be removed.
    //       Bug 10382543 - WSRMCLIENT.RESET() FAILS TO RESET ALL NECESSARY STATE
    // --------------------------------------------------------
    // Workaround: Start
    //   *** Do *not* use this API in your own code. It is not
    //       supported for customer use ***
    WeakReference<ClientInstance> clientInstanceRef =
      (WeakReference<ClientInstance>)
        ((BindingProvider)clientInstance).
          getRequestContext().get(JAXWSProperties.CLIENT_INSTANCE_WEAK_REF);
    ClientInstance instance = clientInstanceRef != null ?
      clientInstanceRef.get() : null;
    if (instance != null) {
      instance.getProps().
        remove(WsrmClientImpl.CLIENT_CURRENT_SEQUENCE_ID_PROP_NAME);
    }
    // Workaround: End
    // --------------------------------------------------------
  }
 
  public Object invoke(Object proxy, Method method, Object[] args)
    throws Throwable {
    boolean operationInvoke = method.getDeclaringClass() == _clazz;
    boolean closeableInvoke = method.getDeclaringClass() ==
                              java.io.Closeable.class;
    boolean endOfBatch = false;
    if (operationInvoke) {
      synchronized(_clientInstance) {
        // Check our batch size
        if (_numInCurrentBatch == 0) {
          _batchNum++;
        }
        endOfBatch = _numInCurrentBatch >= _batchSize - 1;
        if (endOfBatch) {
          _rmClient.setFinalMessage();
        }
        _out.println("Making " + (endOfBatch ? "final " : "") + "invoke " + (_numInCurrentBatch+1) + " of batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ") with operation: " + method.getName());
      }
    } else if (closeableInvoke && method.getName().equals("close")) {
      synchronized(_clientInstance) {
        // Make sure we don't try to schedule the timer anymore
        _closed = true;
        _timer.cancel();
      }
    }
    Object ret = method.invoke(_clientInstance, args);
    if (operationInvoke) {
      synchronized(_clientInstance) {
        _numInCurrentBatch++;
        if (endOfBatch) {
          endBatch();
        }
      }
    }
    return ret;
  }
}