39 Coherence Request Timeouts

Coherence API calls, particularly NamedCache APIs, are typically remote calls that can take some time to run. Depending on the load on the storage-enabled members, or on whether the cluster is recovering from a member departure or a member joining, requests can take much longer than normal to run. Sometimes you want a guaranteed SLA for request times and need a Coherence API call to time out if it exceeds that SLA.

There are a few different ways to implement request timeouts in Coherence, each with different advantages and some with significant downsides. It is important that you understand what request timeouts actually do and how different approaches work, as they are not always suitable as an SLA guarantee. If used improperly, they can cause application issues, inconsistent cache data, and put more load on a system.

When you use request timeouts, the application must have a valid way to handle the timeout exceptions. Blindly retrying is certainly not a suitable strategy. When a request times out, it is only the caller that receives an exception; the request will still be queued and run to completion on the server. Typically, timeouts occur because the system is under some load and requests are taking longer than normal. If an application just catches and ignores a timeout, retries the request immediately, or carries on as normal without any sensible back-off strategy, it can put even more load on the servers at a time when they are already under more load than normal.
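
A hedged sketch of such a back-off strategy is shown below, written in plain Java against a generic Callable. The attempt count and delay values are illustrative assumptions, and java.util.concurrent.TimeoutException stands in for whichever timeout exception the call actually throws (Coherence calls throw RequestTimeoutException).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

public class BackoffRetry
    {
    // Run the task, retrying on timeout with exponential back-off.
    // After maxAttempts failures, the last timeout is rethrown so the
    // caller can apply its own failure handling.
    public static <T> T callWithBackoff(Callable<T> task, int maxAttempts, long initialDelayMillis)
            throws Exception
        {
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++)
            {
            try
                {
                return task.call();
                }
            catch (TimeoutException e)
                {
                if (attempt >= maxAttempts)
                    {
                    throw e; // give up and surface the failure
                    }
                Thread.sleep(delay); // back off before retrying
                delay *= 2;          // double the delay each attempt
                }
            }
        }
    }
```

Capping the number of attempts and growing the delay between them avoids hammering servers that are already struggling; the right values depend on the application's SLA.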

The smaller the value used for request timeouts, the more likely it is that an application may have exceptions when the system is under heavy load. It is important that the application is tested with the configured timeouts and with the expected production loads to ensure that it behaves as expected. Even then, timeouts may be seen only during failure scenarios or rolling upgrades, when requests typically take longer than normal, so ideally these scenarios should be tested too.

Configure Global Request Timeouts

Coherence configuration files allow the request timeout to be configured in different places depending on the type of cache scheme being configured.

Although configuring request timeouts in the cache configuration is the first option described here, it is not generally recommended. The other approaches listed are better choices.

Note:

A request timeout in configuration applies only to the sender of a request: the caller receives an exception if the request takes longer to run than the timeout. It is important to note, however, that the request has still been sent to the server, where it will be queued and run. If the request timeout is exceeded, the caller has no way of knowing whether the request ran successfully on the server or whether it failed, because any response is lost.

Configure Remote Clients

Coherence Extend and gRPC clients can have a request timeout configured in the <remote-cache-scheme> or <remote-grpc-cache-scheme> elements in the cache configuration file. If no timeout is configured for a client, then the default used by Coherence is 30 seconds.

Note:

When configuring a request timeout for a client in the cache configuration file, the timeout applies to all requests made by the client. This includes not just simple requests, such as cache.get() or cache.put(), but also internal requests. For example, the first time a client requests a cache, this is a remote call to the server to ensure the cache exists. If the cache does not exist on the server, then this call could result in the cache needing to be initialized across the whole cluster. If the request timeout is set to a small value (for example, in milliseconds), this can easily fail when a cluster is under load, recovering from member departure, or new members joining.

It is important that a sensible value is set for the request timeout. Application code must be written to expect and to handle RequestTimeoutExceptions that may be thrown by any Coherence API call, especially if the value is very low.

In a Coherence Extend client <remote-cache-scheme>, the request timeout is configured as shown:
<remote-cache-scheme>
  <scheme-name>remote-cache</scheme-name>
  <service-name>RemoteCache</service-name>
  <initiator-config>
    <outgoing-message-handler>
      <request-timeout>20s</request-timeout>
    </outgoing-message-handler>
  </initiator-config>
</remote-cache-scheme>
In a Coherence gRPC client <remote-grpc-cache-scheme>, the request timeout is configured as shown:
<remote-grpc-cache-scheme>
    <scheme-name>remote-grpc</scheme-name>
    <service-name>RemoteGrpcCache</service-name>
    <request-timeout>20s</request-timeout>
</remote-grpc-cache-scheme>

Configure Cluster Members

A request timeout can be configured in cache schemes in the cache configuration file for cluster members.

For example, the following configuration shows a <distributed-scheme> configured with a request timeout of 20 seconds.
<distributed-scheme>
    <scheme-name>storage</scheme-name>
    <service-name>StorageService</service-name>
    <request-timeout>20s</request-timeout>
    <backing-map-scheme>
        <local-scheme/>
    </backing-map-scheme>
    <autostart>true</autostart>
</distributed-scheme>

There is no default timeout for cluster members, so if a request timeout is not configured, the default is infinite. This does not mean that Coherence will block forever when making a Coherence API call on a cluster member; Coherence has other mechanisms to detect issues with the cluster and with requests that receive no response.

Note:

The request timeout applies to all requests; these could be application API calls but, more importantly, they could also be internal Coherence requests.

For example, when a service starts on a cluster member and that service forms a cluster with other cluster members, it goes through a whole cluster join protocol. These cluster join messages are all requests that will have the same timeout applied. A cluster join requires responses from all existing cluster members; this can take time, especially when the cluster is under a heavy load, or when other members are leaving or joining. Failure of cluster join messages can have an extreme effect on the stability of the cluster member and cause it to not start properly.

Configuring request timeouts on cluster members is not recommended unless there is a very good reason to do so and the application is thoroughly tested under production-level loads.

Use Coherence PriorityTask With a Request Timeout

When using the invoke or aggregate methods on a NamedMap or NamedCache, Coherence has a PriorityTask interface that you can use to set a request timeout for individual method calls.

If the invoke or invokeAll methods are called with an entry processor that is a PriorityTask (for example, a PriorityProcessor), then any request timeout value supplied by the processor will override the default timeout configured for the cache. The same applies to calling the aggregate method with an aggregator that is a PriorityTask, such as a PriorityAggregator.

Most Coherence cache API methods can be implemented with an entry processor or aggregator, so it would be possible to replace almost any cache API call with an equivalent invoke or aggregate using a priority task.

Using invoke or aggregate as a replacement for some NamedCache API methods, just to be able to specify a request timeout, may result in minor performance differences. For example, the NamedCache.get() method is very efficient, but replacing it with something like an ExtractorProcessor or ReducerAggregator would be less efficient due to additional serialization. The use of invoke or aggregate also negates any benefit of a NearCache, because these methods always go to the server. You would need to weigh the pros and cons of the different approaches.

Using PriorityProcessor

The PriorityProcessor class is a PriorityTask and InvocableMap.EntryProcessor implementation that wraps and delegates to another entry processor. This allows any existing entry processor to be wrapped and called with a specific request timeout.

For example, an application could use an ExtractorProcessor to extract a value from a cache entry and return that value. If the cache entry was a Person with a method getName() that returned the person’s name, the following code could be used to extract a name from an entry in a cache.

public String getName(String key)
    {
    NamedCache<String, Person> cache = session.getCache("people");
    InvocableMap.EntryProcessor<String, Person, String> processor 
        = Processors.extract(ValueExtractor.of(Person::getName));
    return cache.invoke(key, processor);
    }

If an application wanted the previous method to time out if the request takes longer than a required amount of time, then the entry processor can be wrapped in a PriorityProcessor and its request timeout set.

For example, the following code now configures the processor with a request timeout of 500 milliseconds. If the request does not return within the timeout, then a RequestTimeoutException will be thrown.

public String getName(String key)
    {
    NamedCache<String, Person> cache = session.getCache("people");
    InvocableMap.EntryProcessor<String, Person, String> processor 
        = Processors.extract(ValueExtractor.of(Person::getName));
    PriorityProcessor<String, Person, String> priority 
        = Processors.priority(processor);
    priority.setRequestTimeoutMillis(500L);
    return cache.invoke(key, priority);
    }

As with other request timeout implementations, the timeout only applies to the caller. The request will still be sent to the server where it is queued and run. If the request exceeds the timeout, there will be no feedback to the caller about whether the processor succeeded or failed on the server.

Using PriorityAggregator

The PriorityAggregator class is a PriorityTask and InvocableMap.StreamingAggregator implementation that wraps and delegates to another aggregator. This allows any existing aggregator to be wrapped and called with a specific request timeout.

For example, an application could use a ReducerAggregator to extract a value from a cache entry and return that value. If the cache entry was a Person with a method getName() that returned the person’s name, then you could use the following code to extract the keys and corresponding names for all the cache entries that match a Filter.

public Map<String, String> getNames(Filter<?> filter)
    {
    NamedCache<String, Person> cache = session.getCache("people");
    ReducerAggregator<String, Person, ?, String> aggregator 
        = Aggregators.reduce(ValueExtractor.of(Person::getName));
    return cache.aggregate(filter, aggregator);
    }

If an application wanted the previous method to time out if the request takes longer than a required amount of time, the aggregator can be wrapped in a PriorityAggregator and its request timeout set.

For example, the following code now configures the aggregator with a request timeout of 750 milliseconds. If the request does not return within the timeout, then a RequestTimeoutException will be thrown.

public Map<String, String> getNames(Filter<?> filter)
    {
    NamedCache<String, Person> cache = session.getCache("people");
    ReducerAggregator<String, Person, ?, String> aggregator 
        = Aggregators.reduce(ValueExtractor.of(Person::getName));
    PriorityAggregator<String, Person, ?, Map<String, String>> priority   
        = new PriorityAggregator<>(aggregator);
    priority.setRequestTimeoutMillis(750L);
    return cache.aggregate(filter, priority);
    }

As with other request timeout implementations, the timeout only applies to the caller. The request will still be sent to the server where it is queued and the aggregation will be executed. If the request exceeds the timeout, then there will be no feedback to the caller about whether the aggregation succeeded or failed on the server.

Use AsyncNamedCache

When using the Coherence AsyncNamedCache APIs, the methods on a cache return a CompletableFuture.

The result of the CompletableFuture can be obtained using a timeout. This can be used by application code to wait for a set amount of time before giving up. This gives the same behavior as configuring a request timeout, but the timeout can be controlled for individual calls.

For example, the following code will perform a get on a cache and wait a specified amount of time for the result.

public <K, V> V getWithTimeout(NamedCache<K, V> cache, K key, 
        long timeout, TimeUnit unit) 
        throws TimeoutException, InterruptedException, ExecutionException
    {
    CompletableFuture<V> future = cache.async().get(key);
    return future.get(timeout, unit);
    }

Just like other methods, the request is still sent to the server where it will be queued and run. An advantage of using this method is that it is also possible to handle any errors thrown by the server if the call fails.

In the following example, an error handler is added to the future to log any exception. If the timeout is exceeded, then the client will receive a TimeoutException but the call will still run to completion, so the handler code will log any exceptions thrown by the server.

public <K, V> V getWithTimeout(NamedCache<K, V> cache, K key, long timeout, TimeUnit unit)
        throws TimeoutException, InterruptedException, ExecutionException
    {
    CompletableFuture<V> future = cache.async().get(key);
    future.handle((v, error) ->
        {
        if (error != null)
            {
            Logger.err(error);
            }
        return v;
        });
    return future.get(timeout, unit);
    }
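
On Java 9 and later, the standard CompletableFuture.orTimeout() method is a non-blocking alternative to calling future.get(timeout, unit): it completes the future exceptionally with a TimeoutException if no result arrives in time, which composes with handlers such as handle(). The following is a minimal sketch using only the standard library; the helper method is an illustrative assumption, and in real code the future would come from cache.async().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OrTimeoutExample
    {
    // Returns true if the future failed to produce a result within
    // timeoutMillis; false if a result (or another error) arrived first.
    public static boolean timedOut(CompletableFuture<?> future, long timeoutMillis)
        {
        return future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .handle((v, error) -> error instanceof TimeoutException)
                .join();
        }
    }
```

As with future.get(timeout, unit), a timeout observed this way only affects the caller; any request already sent to the server still runs to completion.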

Note:

Although application code can call cancel() on a CompletableFuture returned by AsyncNamedCache, this will have no effect on the request. Calling cancel() does not stop the request from running, but any application code waiting on the result of the future will get an exception.

Use the Java CompletableFuture API

Instead of using AsyncNamedCache, you can make asynchronous calls using the Java CompletableFuture API.

For example, you can use the CompletableFuture.supplyAsync() method to asynchronously run a Coherence cache call on the Java fork-join thread pool. The following code will wait the specified amount of time for the call to return.

public <K, V> V getWithTimeout(NamedCache<K, V> cache, K key, long timeout, TimeUnit unit)
        throws TimeoutException, InterruptedException, ExecutionException
    {
    CompletableFuture<V> future = CompletableFuture.supplyAsync(() -> cache.get(key));
    return future.get(timeout, unit);
    }

Instead of using the fork join pool, it is also possible to supply a different executor:

public <K, V> V getWithTimeout(NamedCache<K, V> cache, K key, long timeout, TimeUnit unit)
        throws TimeoutException, InterruptedException, ExecutionException
    {
    // a real application should reuse this executor and shut it down when finished
    Executor executor = Executors.newSingleThreadExecutor();
    CompletableFuture<V> future = CompletableFuture.supplyAsync(() -> cache.get(key), executor);
    return future.get(timeout, unit);
    }

As with using AsyncNamedCache, it is advisable to have some sort of error handler to log exceptions from the server.

public <K, V> V getWithTimeout(NamedCache<K, V> cache, K key, long timeout, TimeUnit unit)
        throws TimeoutException, InterruptedException, ExecutionException
    {
    Executor executor = Executors.newSingleThreadExecutor();
    CompletableFuture<V> future = CompletableFuture.supplyAsync(() ->
        cache.get(key), executor);
    future.handle((v, error) ->
        {
        if (error != null)
            {
            Logger.err(error);
            }
        return v;
        });
    return future.get(timeout, unit);
    }

Unlike when using AsyncNamedCache, calling cancel() on the future may or may not stop the request from running. The lambda or java.util.function.Supplier passed to the CompletableFuture.supplyAsync() method is queued for execution on the fork-join pool, or on whichever Executor is being used. If cancel() is called before the executor runs the lambda, the request is canceled and never sent to the server. If cancel() is called after the cache request has been sent to the server, the request will still be queued and run there.
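
This difference can be demonstrated with the standard library alone. In the following sketch (illustrative only; no Coherence APIs are involved), a single-thread executor is first occupied by a blocking task, so cancel() is called while the lambda is still queued and the supplier is never invoked:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelBeforeRun
    {
    // Returns true if the supplier passed to supplyAsync() was ever invoked.
    public static boolean supplierRan() throws InterruptedException
        {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch blocker = new CountDownLatch(1);
        AtomicBoolean ran = new AtomicBoolean(false);

        // occupy the executor's only thread so the next task stays queued
        executor.submit(() -> { blocker.await(); return null; });
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() ->
            {
            ran.set(true);
            return "value";
            }, executor);

        future.cancel(true);   // cancel while the lambda is still queued
        blocker.countDown();   // now let the executor thread move on
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return ran.get();      // false: the canceled supplier was skipped
        }
    }
```

If cancel() had been called after the executor started the lambda, the cache request would already be on its way to the server and would run to completion there.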

Use the Coherence Timeout Class

Internally Coherence uses an interruptible lock for blocking operations, so these operations can be interrupted after a specified timeout interval. You can use the com.oracle.coherence.common.base.Timeout API to interrupt and error out of blocking operations after a specified timeout interval.

The following is an example of how to use the Timeout API to interrupt a cache operation if it takes longer than a given 500-millisecond timeout interval. If the code inside the try block takes longer than 500 milliseconds to run, then the thread will be interrupted.

NamedCache<String, Person> cache = session.getCache("people");

try (Timeout t = Timeout.after(500, TimeUnit.MILLISECONDS))
    {
    Person p = cache.get(key);
    // handle result
    }
catch (InterruptedException e)
    {
    // handle the error caused by the timeout
    }

As with other timeout methods, this will only interrupt the calling thread. Any request sent to the server will still be queued and run. Also, like some of the other methods, there will be no feedback to the client about whether the remote call succeeded or failed.
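
Conceptually, a rough standard-library analogue of this behavior (illustrative only; this is not how the Coherence Timeout class is implemented) schedules an interrupt of the calling thread and cancels that interrupt if the blocking work finishes first:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class InterruptAfter
    {
    // A blocking action that may be interrupted, such as a remote call.
    public interface InterruptibleAction
        {
        void run() throws InterruptedException;
        }

    // Runs the action, interrupting the calling thread if it blocks for
    // longer than timeoutMillis. Returns true if the action timed out.
    public static boolean timedOut(long timeoutMillis, InterruptibleAction action)
        {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        Thread caller = Thread.currentThread();
        ScheduledFuture<?> interrupter =
            scheduler.schedule(caller::interrupt, timeoutMillis, TimeUnit.MILLISECONDS);
        try
            {
            action.run();
            return false;
            }
        catch (InterruptedException e)
            {
            return true; // the action blocked past the deadline
            }
        finally
            {
            interrupter.cancel(false); // the work finished; stop the interrupt
            scheduler.shutdown();
            Thread.interrupted();      // clear any late interrupt flag
            }
        }
    }
```

Like the Timeout class, this only unblocks the caller; it does nothing to stop a request that has already reached the server.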