Oracle® Fusion Middleware Programming Advanced Features of JAX-WS Web Services for Oracle WebLogic Server 11g Release 1 (10.3.4) B61633-02
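The following code is an example of a client wrapper class that can be used for batching reliable messaging. For more information about batching reliable messaging, see "Grouping Messages into Business Units of Work (Batching)."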
Note: This client wrapper class is sample code only. It is not an officially supported production class.
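The batching idea can be tried out without any WebLogic classes. The following is a minimal sketch, assuming a hypothetical `OrderPort` interface in place of a generated port type and a plain counter in place of the RM sequence handling. It shows only how a dynamic proxy can count service invocations and close a batch once a fixed size is reached. The names `BatchingProxySketch`, `OrderPort`, `BatchingHandler`, and `run` are illustrative and are not part of any Oracle API.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: no WebLogic or WS-RM classes are used here.
final class BatchingProxySketch {

    // Hypothetical service interface standing in for a generated port type.
    interface OrderPort {
        String submit(String order);
    }

    // Counts calls made through the proxy and closes a batch every batchSize calls.
    static final class BatchingHandler implements InvocationHandler {
        private final Object target;
        private final Class<?> portType;
        private final int batchSize;
        private final List<String> log = new ArrayList<>();
        private int numInCurrentBatch;
        private int batchNum;

        BatchingHandler(Object target, Class<?> portType, int batchSize) {
            this.target = target;
            this.portType = portType;
            this.batchSize = batchSize;
        }

        @Override
        public synchronized Object invoke(Object proxy, Method method, Object[] args)
                throws Throwable {
            if (method.getDeclaringClass() != portType) {
                // Not a service operation (for example toString()); pass it through.
                return method.invoke(target, args);
            }
            if (numInCurrentBatch == 0) {
                batchNum++; // first call of a new batch
            }
            boolean endOfBatch = numInCurrentBatch >= batchSize - 1;
            Object result = method.invoke(target, args);
            numInCurrentBatch++;
            log.add("batch " + batchNum + " call " + numInCurrentBatch
                    + (endOfBatch ? " (final)" : ""));
            if (endOfBatch) {
                // A real wrapper would end the RM sequence at this point.
                numInCurrentBatch = 0;
            }
            return result;
        }

        synchronized List<String> log() {
            return new ArrayList<>(log);
        }
    }

    // Sends the given number of calls through a batching proxy and returns the log.
    static List<String> run(int calls, int batchSize) {
        OrderPort real = order -> "accepted:" + order;
        BatchingHandler handler = new BatchingHandler(real, OrderPort.class, batchSize);
        OrderPort proxy = (OrderPort) Proxy.newProxyInstance(
                OrderPort.class.getClassLoader(),
                new Class<?>[] {OrderPort.class},
                handler);
        for (int i = 1; i <= calls; i++) {
            proxy.submit("order-" + i);
        }
        return handler.log();
    }

    public static void main(String[] args) {
        for (String line : run(5, 2)) {
            System.out.println(line);
        }
    }
}
```

The caller only ever sees an `OrderPort`, so the batching stays invisible to client code, which is the same design choice the wrapper class makes. The sketch deliberately omits the batch lifetime timer and all error recovery.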
Example B-1 Example Client Wrapper Class for Batching Reliable Messaging
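import java.io.PrintStream;
import java.io.PrintWriter;
import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Date;
import java.util.SortedSet;
import java.util.Timer;
import java.util.TimerTask;
import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.Duration;
import javax.xml.ws.BindingProvider;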
import weblogic.wsee.jaxws.JAXWSProperties;
import weblogic.wsee.jaxws.spi.ClientInstance;
import weblogic.wsee.reliability.MessageRange;
import weblogic.wsee.reliability2.api.WsrmClient;
import weblogic.wsee.reliability2.api.WsrmClientFactory;
import weblogic.wsee.reliability2.property.WsrmInvocationPropertyBag;
import weblogic.wsee.reliability2.tube.WsrmClientImpl;
/**
* Example wrapper class to batch reliable requests into fixed size 'batches'
* that can be sent using a single RM sequence. This class allows a client to
* send requests that have no natural common grouping or
* 'business unit of work' and not pay the costs associated with creating and
* terminating an RM sequence for every message.
* NOTE: This class is an *example* of how batching might be performed. To do
* batching correctly, you should consider error recovery and how to
* report sequence errors (reported via ReliabilityErrorListener) back
* to the clients that made the original requests.
* <p>
* If your Web service client code knows of some natural business-oriented
* grouping of requests (called a 'business unit of work'), it should make the
* RM subsystem aware of this unit of work by using the
* WsrmClient.setFinalMessage() method to demarcate the end of a unit (just
* before sending the actual final request via an invocation on
* the client instance). In some cases, notably when the client code represents
* an intermediary in the processing of messages, the client code may not be
* aware of any natural unit of work. In the past, if no business unit of work
* could be determined, clients often just created the client instance, sent the
* single current message they had, and then allowed the sequence to terminate.
* This is functionally workable, but very inefficient. These clients pay the
* cost of an RM sequence handshake and termination for every message they send.
* The BatchingRmClientWrapper class can be used to introduce an artificial
* unit of work (a batch) when no natural business unit of work is available.
* <p>
* Each instance of BatchingRmClientWrapper is a wrapper instance around a
* client instance (port or Dispatch instance). This wrapper can be used to
* obtain a Proxy instance that can be used in place of the original client
* instance. This allows this class to perform batching operations completely
* invisibly from the perspective of the client code.
* <p>
* This class is used for batching reliable requests into
* batches of a given max size that will survive for a given maximum
* duration. If a batch fills up or times out, it is ended, causing the
* RM sequence it represents to be ended/terminated. The timeout ensures that
* if the flow of incoming requests stops the batch/sequence will still
* end in a timely manner.
*/
public class BatchingRmClientWrapper<T>
implements InvocationHandler {
private Class<T> _clazz;
private int _batchSize;
private long _maxBatchLifetimeMillis;
private T _clientInstance;
private PrintWriter _out;
private WsrmClient _rmClient;
private int _numInCurrentBatch;
private int _batchNum;
private Timer _timer;
private boolean _closed;
private boolean _proxyCreated;
/**
* Create a wrapper instance for batching reliable requests into
* batches of the given max size that will survive for the given maximum
* duration. If a batch fills up or times out, it is ended, causing the
* RM sequence it represents to be ended/terminated.
* @param clientInstance The client instance that acts as the source object
* for the batching proxy created by the createProxy() method. This
* is the port/Dispatch instance returned from the call to
* getPort/createDispatch. The BatchingRmClientWrapper will take over
* responsibility for managing the interaction with and cleanup of
* the client instance via the proxy created from createProxy.
* @param clazz Class of the proxy instance created from createProxy.
* This should be the class of the port/Dispatch instance you would
* use to invoke operations on the service. BatchingRmClientWrapper will
* create (via createProxy) a proxy of the given type that can be
* used in place of the original client instance.
* @param batchSize Max number of requests to put into a batch. If the
* max number of requests are sent for a given batch, that batch is
* ended (ending/terminating the sequence it represents) and a new
* batch is started.
* @param maxBatchLifetime A duration value (in the lexical form supported
* by javax.xml.datatype.Duration, e.g. PT30S for 30 seconds) representing
* the maximum time a batch should exist. If the batch exists longer
* than this time, it is ended and a new batch is begun.
* @param out A print stream that can be used to print diagnostic and
* status messages.
*/
public BatchingRmClientWrapper(T clientInstance, Class<T> clazz,
int batchSize, String maxBatchLifetime,
PrintStream out) {
_clazz = clazz;
_batchSize = batchSize;
try {
if (maxBatchLifetime == null) {
maxBatchLifetime = "PT5M";
}
Duration duration =
DatatypeFactory.newInstance().newDuration(maxBatchLifetime);
_maxBatchLifetimeMillis = duration.getTimeInMillis(new Date());
} catch (Exception e) {
throw new RuntimeException(e.toString(), e);
}
_clientInstance = clientInstance;
_out = new PrintWriter(out, true);
_rmClient = WsrmClientFactory.getWsrmClientFromPort(_clientInstance);
_closed = false;
_proxyCreated = false;
_timer = new Timer(true);
_timer.schedule(new TimerTask() {
@Override
public void run() {
terminateOrEndBatch();
}
}, _maxBatchLifetimeMillis);
}
/**
* Creates the dynamic proxy that should be used in place of the client
* instance used to create this BatchingRmClientWrapper instance. This method
* should be called only once per BatchingRmClientWrapper.
*/
public T createProxy() {
if (_proxyCreated) {
throw new IllegalStateException("Already created the proxy for this BatchingRmClientWrapper instance which wraps the client instance: " + _clientInstance);
}
_proxyCreated = true;
return (T) Proxy.newProxyInstance(getClass().getClassLoader(),
new Class[] {
_clazz,
BindingProvider.class,
java.io.Closeable.class
}, this);
}
private void terminateOrEndBatch() {
synchronized(_clientInstance) {
if (_rmClient.getSequenceId() != null) {
if (terminateBatchAllowed()) {
_out.println("Terminating batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ") for " + _clientInstance);
try {
_rmClient.terminateSequence();
} catch (Exception e) {
e.printStackTrace(_out);
}
} else {
_out.println("Batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ") for " + _clientInstance + " timed out but has outstanding requests to send and cannot be terminated now");
}
}
endBatch();
}
}
/**
* Checks whether every request sent on the current sequence has been
* acknowledged. Returns true if the sequence can be terminated now.
*/
private boolean terminateBatchAllowed() {
try {
synchronized(_clientInstance) {
if (_rmClient.getSequenceId() != null) {
// TODO: Remove this workaround when getMostRecentMessageNumber is
// fixed.
// The following BUG is targeted for the next release, at which time
// the workaround can be removed.
// Bug 10382605 - WSRMCLIENT.GETMOSTRECENTMESSAGENUMBER() ALWAYS RETURNS 0
//long maxMsgNum = _rmClient.getMostRecentMessageNumber();
// --------------------------------------------------------
// Workaround: Start
// *** Do *not* use this API in your own code. It is not
// supported for customer use ***
WsrmInvocationPropertyBag rmProps =
(WsrmInvocationPropertyBag)((BindingProvider)_clientInstance).
getRequestContext().get(WsrmInvocationPropertyBag.key);
long maxMsgNum = rmProps != null ? rmProps.getMostRecentMsgNum() : 0;
// Workaround: End
// --------------------------------------------------------
if (maxMsgNum < 1) {
// No messages sent, go ahead and terminate.
return true;
}
SortedSet<MessageRange> ranges = _rmClient.getAckRanges();
long maxAck = -1;
boolean hasGaps = false;
long lastRangeUpper = -1;
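for (MessageRange range: ranges) {
// A gap exists if this range does not start immediately after the
// previous one. Track the previous upper bound on every iteration.
if (lastRangeUpper > 0 && range.lowerBounds != lastRangeUpper + 1) {
hasGaps = true;
}
lastRangeUpper = range.upperBounds;
maxAck = range.upperBounds;
}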
return !(hasGaps || maxAck < maxMsgNum);
}
}
} catch (Exception e) {
e.printStackTrace(_out);
}
return true;
}
private void endBatch() {
synchronized(_clientInstance) {
if (_numInCurrentBatch > 0) {
_out.println("Ending batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ") for " + _clientInstance + "...");
}
resetWsrmClient(_rmClient, _clientInstance);
_numInCurrentBatch = 0;
if (!_closed) {
_timer.schedule(new TimerTask() {
@Override
public void run() {
terminateOrEndBatch();
}
}, _maxBatchLifetimeMillis);
}
}
}
/**
* Resets a WsrmClient instance (and the client instance it represents)
* so it can track a new WS-RM sequence for the next invoke on the client
* instance. This method effectively *disconnects* the RM sequence from the
* client instance and lets them continue/complete separately.
* NOTE: You should use this method instead of WsrmClient.reset() alone due
* to a bug in WsrmClient.reset that does not completely reset all state
* stored on the client instance from the old sequence.
*/
public static void resetWsrmClient(WsrmClient rmClient, Object clientInstance) {
rmClient.reset();
// TODO: This should not be necessary; rmClient.reset() should clear this
// state itself.
// The following BUG is targeted for the next release, at which time
// the workaround can be removed.
// Bug 10382543 - WSRMCLIENT.RESET() FAILS TO RESET ALL NECESSARY STATE
// --------------------------------------------------------
// Workaround: Start
// *** Do *not* use this API in your own code. It is not
// supported for customer use ***
WeakReference<ClientInstance> clientInstanceRef =
(WeakReference<ClientInstance>)
((BindingProvider)clientInstance).
getRequestContext().get(JAXWSProperties.CLIENT_INSTANCE_WEAK_REF);
ClientInstance instance = clientInstanceRef != null ?
clientInstanceRef.get() : null;
if (instance != null) {
instance.getProps().
remove(WsrmClientImpl.CLIENT_CURRENT_SEQUENCE_ID_PROP_NAME);
}
// Workaround: End
// --------------------------------------------------------
}
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
boolean operationInvoke = method.getDeclaringClass() == _clazz;
boolean closeableInvoke = method.getDeclaringClass() ==
java.io.Closeable.class;
boolean endOfBatch = false;
if (operationInvoke) {
synchronized(_clientInstance) {
// Check our batch size
if (_numInCurrentBatch == 0) {
_batchNum++;
}
endOfBatch = _numInCurrentBatch >= _batchSize - 1;
if (endOfBatch) {
_rmClient.setFinalMessage();
}
_out.println("Making " + (endOfBatch ? "final " : "") + "invoke " + (_numInCurrentBatch+1) + " of batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ") with operation: " + method.getName());
}
} else if (closeableInvoke && method.getName().equals("close")) {
synchronized(_clientInstance) {
// Make sure we don't try to schedule the timer anymore
_closed = true;
_timer.cancel();
}
}
Object ret = method.invoke(_clientInstance, args);
if (operationInvoke) {
synchronized(_clientInstance) {
_numInCurrentBatch++;
if (endOfBatch) {
endBatch();
}
}
}
return ret;
}
}
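The acknowledgement check in terminateBatchAllowed() can be studied in isolation. The following is a minimal sketch, assuming a hypothetical `Range` class with inclusive bounds in place of `MessageRange`, and assuming message numbers start at 1 as they do in WS-ReliableMessaging. It is slightly stricter than the listing above because it also treats a first range that does not start at 1 as a gap. The names `AckRangeSketch`, `Range`, and `allAcked` are illustrative only.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: Range stands in for an acknowledged range.
final class AckRangeSketch {

    // Inclusive range of acknowledged message numbers.
    static final class Range {
        final long lower;
        final long upper;

        Range(long lower, long upper) {
            this.lower = lower;
            this.upper = upper;
        }
    }

    // Returns true when the ranges, sorted by ascending lower bound,
    // cover every message number from 1 to maxMsgNum without a gap.
    static boolean allAcked(List<Range> sortedRanges, long maxMsgNum) {
        if (maxMsgNum < 1) {
            return true; // nothing was sent, so nothing is outstanding
        }
        long nextExpected = 1;
        for (Range range : sortedRanges) {
            if (range.lower > nextExpected) {
                return false; // at least one message before this range is unacknowledged
            }
            nextExpected = Math.max(nextExpected, range.upper + 1);
        }
        return nextExpected > maxMsgNum;
    }

    public static void main(String[] args) {
        List<Range> contiguous = Arrays.asList(new Range(1, 2), new Range(3, 5));
        List<Range> gapped = Arrays.asList(new Range(1, 2), new Range(4, 5));
        System.out.println(allAcked(contiguous, 5));
        System.out.println(allAcked(gapped, 5));
    }
}
```

Tracking the next expected message number, rather than separate gap and maximum flags, keeps the loop to a single comparison per range and makes the early exit on the first gap explicit.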