Oracle® Fusion Middleware Oracle WebLogic Server JAX-WS Webサービスの高度な機能のプログラミング 11g リリース1(10.3.4) B61633-02
|
戻る |
次のコードは、信頼性のあるメッセージングのバッチ処理に使用できるクライアント・ラッパー・クラスの例です。信頼性のあるメッセージングのバッチ処理の詳細は、「ビジネス作業単位へのメッセージのグループ化(バッチ処理)」を参照してください。
注意: このクライアント・ラッパー・クラスはサンプル・コードです。正式にサポートされている本番クラスではありません。
例B-1 信頼性のあるメッセージングのバッチ処理のためのクライアント・ラッパー・クラスの例
import java.io.*; import java.lang.*; import java.util.*; import javax.xml.*; import weblogic.wsee.jaxws.JAXWSProperties; import weblogic.wsee.jaxws.spi.ClientInstance; import weblogic.wsee.reliability.MessageRange; import weblogic.wsee.reliability2.api.WsrmClient; import weblogic.wsee.reliability2.api.WsrmClientFactory; import weblogic.wsee.reliability2.property.WsrmInvocationPropertyBag; import weblogic.wsee.reliability2.tube.WsrmClientImpl; /** * Example wrapper class to batch reliable requests into fixed size 'batches' * that can be sent using a single RM sequence. This class allows a client to * send requests that have no natural common grouping or * 'business unit of work' and not pay the costs associated with creating and * terminating an RM sequence for every message. * NOTE: This class is an *example* of how batching might be performed. To do * batching correctly, you should consider error recovery and how to * report sequence errors (reported via ReliabilityErrorListener) back * to the clients that made the original requests. * <p> * If your Web service client code knows of some natural business-oriented * grouping of requests (called a 'business unit of work'), it should make the * RM subsystem aware of this unit of work by using the * WsrmClient.setFinalMessage() method to demarcate the end of a unit (just * before sending the actual final request via an invocation on * the client instance). In some cases, notably when the client code represents * an intermediary in the processing of messages, the client code may not be * aware of any natural unit of work. In the past, if no business unit of work * could be determined, clients often just created the client instance, sent the * single current message they had, and then allowed the sequence to terminate. * This is functionally workable, but very inefficient. These clients pay the * cost of an RM sequence handshake and termination for every message they send. 
* The BatchingRmClientWrapper class can be used to introduce an artificial * unit of work (a batch) when no natural business unit of work is available. * <p> * Each instance of BatchingRmClientWrapper is a wrapper instance around a * client instance (port or Dispatch instance). This wrapper can be used to * obtain a Proxy instance that can be used in place of the original client * instance. This allows this class to perform batching operations completely * invisibly from the perspective of the client code. * <p> * This class is used for batching reliable requests into * batches of a given max size that will survive for a given maximum * duration. If a batch fills up or times out, it is ended, causing the * RM sequence it represents to be ended/terminated. The timeout ensures that * if the flow of incoming requests stops the batch/sequence will still * end in a timely manner. */ public class BatchingRmClientWrapper<T> implements InvocationHandler { private Class<T> _clazz; private int _batchSize; private long _maxBatchLifetimeMillis; private T _clientInstance; private PrintWriter _out; private WsrmClient _rmClient; private int _numInCurrentBatch; private int _batchNum; private Timer _timer; private boolean _closed; private boolean _proxyCreated; /** * Create a wrapper instance for batching reliable requests into * batches of the given max size that will survive for the given maximum * duration. If a batch fills up or times out, it is ended, causing the * RM sequence it represents to be ended/terminated. * @param clientInstance The client instance that acts as the source object * for the batching proxy created by the createProxy() method. This * is the port/Dispatch instance returned from the call to * getPort/createDispatch. The BatchingRmClientWrapper will take over * responsibility for managing the interaction with and cleanup of * the client instance via the proxy created from createProxy. * @param clazz of the proxy instance created from createProxy. 
* This should be the class of the port/Dispatch instance you would * use to invoke operations on the service. BatchingRmClientWrapper will * create (via createProxy) a proxy of the given type that can be * used in place of the original client instance. * @param batchSize Max number of requests to put into a batch. If the * max number of requests are sent for a given batch, that batch is * ended (ending/terminating the sequence it represents) and a new * batch is started. * @param maxBatchLifetime A duration value (in the lexical form supported * by java.util.Duration, e.g. PT30S for 30 seconds) representing * the maximum time a batch should exist. If the batch exists longer * than this time, it is ended and a new batch is begun. * @param out A print stream that can be used to print diagnostic and * status messages. */ public BatchingRmClientWrapper(T clientInstance, Class<T> clazz, int batchSize, String maxBatchLifetime, PrintStream out) { _clazz = clazz; _batchSize = batchSize; try { if (maxBatchLifetime == null) { maxBatchLifetime = "PT5M"; } Duration duration = DatatypeFactory.newInstance().newDuration(maxBatchLifetime); _maxBatchLifetimeMillis = duration.getTimeInMillis(new Date()); } catch (Exception e) { throw new RuntimeException(e.toString(), e); } _clientInstance = clientInstance; _out = new PrintWriter(out, true); _rmClient = WsrmClientFactory.getWsrmClientFromPort(_clientInstance); _closed = false; _proxyCreated = false; _timer = new Timer(true); _timer.schedule(new TimerTask() { @Override public void run() { terminateOrEndBatch(); } }, _maxBatchLifetimeMillis); } /** * Creates the dynamic proxy that should be used in place of the client * instance used to create this BatchingRmClientWrapper instance. This method * should be called only once per BatchingRmClientWrapper. 
*/ public T createProxy() { if (_proxyCreated) { throw new IllegalStateException("Already created the proxy for this BatchingRmClientWrapper instance which wraps the client instance: " + _clientInstance); } _proxyCreated = true; return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] { _clazz, BindingProvider.class, java.io.Closeable.class }, this); } private void terminateOrEndBatch() { synchronized(_clientInstance) { if (_rmClient.getSequenceId() != null) { if (terminateBatchAllowed()) { _out.println("Terminating batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ") for " + _clientInstance); try { _rmClient.terminateSequence(); } catch (Exception e) { e.printStackTrace(_out); } } else { _out.println("Batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ") for " + _clientInstance + " timed out but has outstanding requests to send and cannot be terminated now"); } } endBatch(); } } /** * Check to see if there are acknowledgements for all requests sent. If so, * terminate. */ private boolean terminateBatchAllowed() { try { synchronized(_clientInstance) { if (_rmClient.getSequenceId() != null) { // TODO: Remove this workaround when getMostRecentMessageNumber is // fixed. // The following BUG is targeted for the next release, at which time // the workaround can be removed. // Bug 10382605 - WSRMCLIENT.GETMOSTRECENTMESSAGENUMBER() ALWAYS RETURNS 0 //long maxMsgNum = _rmClient.getMostRecentMessageNumber(); // -------------------------------------------------------- // Workaround: Start // *** Do *not* use this API in your own code. It is not // supported for customer use *** WsrmInvocationPropertyBag rmProps = (WsrmInvocationPropertyBag)((BindingProvider)_clientInstance). getRequestContext().get(WsrmInvocationPropertyBag.key); long maxMsgNum = rmProps != null ? 
rmProps.getMostRecentMsgNum() : 0; // Workaround: End // -------------------------------------------------------- if (maxMsgNum < 1) { // No messages sent, go ahead and terminate. return true; } SortedSet<MessageRange> ranges = _rmClient.getAckRanges(); long maxAck = -1; boolean hasGaps = false; long lastRangeUpper = -1; for (MessageRange range: ranges) { if (lastRangeUpper > 0) { if (range.lowerBounds != lastRangeUpper + 1) { hasGaps = true; } } else { lastRangeUpper = range.upperBounds; } maxAck = range.upperBounds; } return !(hasGaps || maxAck < maxMsgNum); } } } catch (Exception e) { e.printStackTrace(_out); } return true; } private void endBatch() { synchronized(_clientInstance) { if (_numInCurrentBatch > 0) { _out.println("Ending batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ") for " + _clientInstance + "..."); } resetWsrmClient(_rmClient, _clientInstance); _numInCurrentBatch = 0; if (!_closed) { _timer.schedule(new TimerTask() { @Override public void run() { terminateOrEndBatch(); } }, _maxBatchLifetimeMillis); } } } /** * Resets a WsrmClient instance (and the client instance it represents) * so it can track a new WS-RM sequence for the next invoke on the client * instance. This method effectively *disconnects* the RM sequence from the * client instance and lets them continue/complete separately. * NOTE: You should use this method instead of WsrmClient.reset() alone due * to a bug in WsrmClient.reset that does not completely reset all state * stored on the client instance from the old sequence. */ public static void resetWsrmClient(WsrmClient rmClient, Object clientInstance) { rmClient.reset(); // TODO: Shouldn't have to do this, as _rmClient.reset should do it for // The following BUG is targeted for the next release, at which time // the workaround can be removed. 
// Bug 10382543 - WSRMCLIENT.RESET() FAILS TO RESET ALL NECESSARY STATE // -------------------------------------------------------- // Workaround: Start // *** Do *not* use this API in your own code. It is not // supported for customer use *** WeakReference<ClientInstance> clientInstanceRef = (WeakReference<ClientInstance>) ((BindingProvider)clientInstance). getRequestContext().get(JAXWSProperties.CLIENT_INSTANCE_WEAK_REF); ClientInstance instance = clientInstanceRef != null ? clientInstanceRef.get() : null; if (instance != null) { instance.getProps(). remove(WsrmClientImpl.CLIENT_CURRENT_SEQUENCE_ID_PROP_NAME); } // Workaround: End // -------------------------------------------------------- } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { boolean operationInvoke = method.getDeclaringClass() == _clazz; boolean closeableInvoke = method.getDeclaringClass() == java.io.Closeable.class; boolean endOfBatch = false; if (operationInvoke) { synchronized(_clientInstance) { // Check our batch size if (_numInCurrentBatch == 0) { _batchNum++; } endOfBatch = _numInCurrentBatch >= _batchSize - 1; if (endOfBatch) { _rmClient.setFinalMessage(); } _out.println("Making " + (endOfBatch ? "final " : "") + "invoke " + (_numInCurrentBatch+1) + " of batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ") with operation: " + method.getName()); } } else if (closeableInvoke && method.getName().equals("close")) { synchronized(_clientInstance) { // Make sure we don't try to schedule the timer anymore _closed = true; _timer.cancel(); } } Object ret = method.invoke(_clientInstance, args); if (operationInvoke) { synchronized(_clientInstance) { _numInCurrentBatch++; if (endOfBatch) { endBatch(); } } } return ret; } }