9 Perform Continuous Query for C++ Clients

Coherence lets you obtain a point-in-time query result from a cache, and it lets you receive the events that would change the result of that query. It also provides a feature that combines the two: a query result together with a continuous stream of related events, which keeps the result up to date in real time. This capability is called Continuous Query because it has the same effect as if the desired query had zero latency and were being executed several times every millisecond.

A continuous query cache is similar to a materialized view in the Oracle database. A materialized view copies data queried from the database tables into the view. If there are any changes to the data in the database, then the data in the view is automatically updated. This enables you to see changes to the result set. In continuous query, a local copy of the cache is created on the client. Filters allow you to limit the size and content of the cache. Combined with an event listener, the cache can be updated in real time.

For example, assume that you want to monitor, in real time, all sales orders for a particular customer. To do this, you can create a continuous query cache and set up an event listener that listens for any events pertaining to that customer. Coherence queries the grid for all of the data objects that pertain to the customer and copies them to a local cache. The event listener on the query listens for any inserts, updates, or deletes that take place on the grid for the customer. When an event occurs, the local copy of the customer data is updated.
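The sales order scenario can be modeled in a few lines of standard C++. The following is a minimal sketch only: it does not use the Coherence API, it is single-threaded, and every name in it (Order, ToyGridCache, ToyCustomerView) is hypothetical. It shows the two ingredients described above: an initial query that copies the matching entries, followed by a listener that applies each later insert, update, or delete to the local copy.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <vector>

// Toy stand-in for a sales order; not a Coherence type.
struct Order
    {
    std::string sCustomer;
    double      dAmount;
    };

// Toy stand-in for the cache on the grid: a map that reports every change.
class ToyGridCache
    {
    public:
        // pOrder points to the new value, or is null when the entry was deleted.
        typedef std::function<void(int nKey, const Order* pOrder)> Listener;

        void addListener(const Listener& listener) { m_vecListeners.push_back(listener); }

        void put(int nKey, const Order& order)
            {
            m_mapEntries[nKey] = order;
            notify(nKey, &m_mapEntries[nKey]);
            }

        void remove(int nKey)
            {
            if (m_mapEntries.erase(nKey) > 0)
                {
                notify(nKey, nullptr);
                }
            }

        const std::map<int, Order>& entries() const { return m_mapEntries; }

    private:
        void notify(int nKey, const Order* pOrder) const
            {
            for (const Listener& listener : m_vecListeners)
                {
                listener(nKey, pOrder);
                }
            }

        std::map<int, Order>  m_mapEntries;
        std::vector<Listener> m_vecListeners;
    };

// Local copy of one customer's orders, kept current by a listener.
// The view must outlive the grid's use of the listener it registers.
class ToyCustomerView
    {
    public:
        ToyCustomerView(ToyGridCache& grid, const std::string& sCustomer)
            : m_sCustomer(sCustomer)
            {
            // Initial query: copy the entries that match the customer.
            for (const auto& entry : grid.entries())
                {
                apply(entry.first, &entry.second);
                }
            // Event stream: apply every later change to the local copy.
            grid.addListener([this](int nKey, const Order* pOrder) { apply(nKey, pOrder); });
            }

        ToyCustomerView(const ToyCustomerView&) = delete;
        ToyCustomerView& operator=(const ToyCustomerView&) = delete;

        const std::map<int, Order>& orders() const { return m_mapLocal; }

    private:
        void apply(int nKey, const Order* pOrder)
            {
            if (pOrder != nullptr && pOrder->sCustomer == m_sCustomer)
                {
                m_mapLocal[nKey] = *pOrder;   // insert or update
                }
            else
                {
                m_mapLocal.erase(nKey);       // deleted, or no longer matches
                }
            }

        std::string          m_sCustomer;
        std::map<int, Order> m_mapLocal;
    };
```

Because the sketch runs on a single thread, no event can arrive between the initial query and the listener registration. A real implementation has to handle that window explicitly.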

9.1 Uses of Continuous Query Caching

There are several different general use categories for Continuous Query Caching:

  • It is an ideal building block for Complex Event Processing (CEP) systems and event correlation engines.

  • It is ideal for situations in which an application repeats a particular query and would benefit from always having instant access to the up-to-date result of that query.

  • A Continuous Query Cache is analogous to a materialized view and is useful for accessing and manipulating the results of a query using the standard NamedCache API, and receiving an ongoing stream of events related to that query.

  • A Continuous Query Cache can be used in a manner similar to a Near Cache because it maintains an up-to-date set of data locally where it is being used, for example, on a particular server node or on a client. Note that while a Near Cache is invalidation-based, a Continuous Query Cache actually maintains its data in an up-to-date manner.
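The difference between an invalidation-based local cache and one that maintains its data can be illustrated with a small model. This is a sketch in standard C++ under simplifying assumptions, not the Coherence Near Cache or Continuous Query Cache implementation; ToyBackingCache, ToyInvalidatingCache, and ToyUpdatingCache are hypothetical names. On a change event, the first cache only discards its local copy and fetches again on the next read, while the second applies the new value immediately.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>

// Toy backing cache that counts how often a client has to fetch from it.
struct ToyBackingCache
    {
    std::map<std::string, int> mapEntries;
    int                        cFetches = 0;

    int fetch(const std::string& sKey)
        {
        ++cFetches;
        return mapEntries.at(sKey);
        }
    };

// Invalidation-based: a change event only evicts the local copy.
class ToyInvalidatingCache
    {
    public:
        explicit ToyInvalidatingCache(ToyBackingCache& backing)
            : m_backing(backing)
            {
            }

        void onChange(const std::string& sKey, int /* nNewValue */)
            {
            m_mapLocal.erase(sKey);
            }

        int get(const std::string& sKey)
            {
            std::map<std::string, int>::iterator iter = m_mapLocal.find(sKey);
            if (iter == m_mapLocal.end())
                {
                // Local miss: go back to the backing cache.
                iter = m_mapLocal.insert(
                        std::make_pair(sKey, m_backing.fetch(sKey))).first;
                }
            return iter->second;
            }

    private:
        ToyBackingCache&           m_backing;
        std::map<std::string, int> m_mapLocal;
    };

// Update-based: a change event carries the new value, which is applied locally.
class ToyUpdatingCache
    {
    public:
        void onChange(const std::string& sKey, int nNewValue)
            {
            m_mapLocal[sKey] = nNewValue;
            }

        int get(const std::string& sKey) const
            {
            return m_mapLocal.at(sKey);
            }

    private:
        std::map<std::string, int> m_mapLocal;
    };
```

In this model, a read that follows a change costs the invalidating cache one extra fetch from the backing cache, whereas the updating cache answers from local data.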

By combining the Coherence*Extend functionality with Continuous Query Caching, an application can support tens of thousands of concurrent users.

Note:

Continuous Query Caches are useful in almost every type of application, including both client-based and server-based applications, because they make it easy to efficiently maintain an up-to-date local copy of a specified subset of a much larger and potentially distributed cached data set.

9.2 The Coherence Continuous Query Cache

The Coherence implementation of Continuous Query is found in the ContinuousQueryCache class. This class, like all Coherence caches, implements the standard NamedCache interface, which includes the following capabilities:

  • Cache access and manipulation using the Map interface: NamedCache extends the Map interface, which is based on the Map interface from the Java Collections Framework.

  • Events for all object modifications that occur within the cache: NamedCache extends the ObservableMap interface.

  • Identity-based clusterwide locking of objects in the cache: NamedCache extends the ConcurrentMap interface.

  • Querying the objects in the cache: NamedCache extends the QueryMap interface.

  • Distributed Parallel Processing and Aggregation of objects in the cache: NamedCache extends the InvocableMap interface.

Since the ContinuousQueryCache implements the NamedCache interface, which is the same API provided by all Coherence caches, it is extremely simple to use, and it can be easily substituted for another cache when its functionality is called for.

9.3 Defining a Continuous Query Cache

There are two features that define a Continuous Query Cache:

  • The underlying cache that the Continuous Query is based on.

  • A query of the underlying cache that produces the subset that the Continuous Query Cache will cache.

The underlying cache can be any Coherence cache, including another Continuous Query Cache. The most straightforward way of obtaining a cache is by using the CacheFactory class. This class enables you to create a cache simply by specifying its name. The cache is created automatically, and its configuration is based on the application's cache configuration elements. For example, the following line of code creates a cache named orders:

NamedCache::Handle hCache = CacheFactory::getCache("orders");

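The query is the same type of query that would be used to query any other cache. Example 9-1 illustrates how you can use filters to find the orders placed by a given trader that have a given status. The vTraderId and vStatus arguments are assumed to hold the trader identifier and the status to match; they are not declared in the example: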

Example 9-1 Using Filters for Querying

ValueExtractor::Handle hTraderExtractor = ReflectionExtractor::create("getTrader");
ValueExtractor::Handle hStatusExtractor = ReflectionExtractor::create("getStatus");

Filter::Handle hFilter = AndFilter::create(EqualsFilter::create(hTraderExtractor, vTraderId),
                              EqualsFilter::create(hStatusExtractor, vStatus));

Normally, to query a cache, you would use one of the methods of the QueryMap interface. For example, to obtain a snapshot of all open trades for this trader:

Set::View vSetOpenTrades = hCache->entrySet(hFilter);

In contrast, a Continuous Query Cache is constructed by calling the ContinuousQueryCache::create method and passing the cache and the filter:

ContinuousQueryCache::Handle hCacheOpenTrades  = ContinuousQueryCache::create(hCache, hFilter);

9.4 Cleaning up Resources Associated with a Continuous Query Cache

A Continuous Query Cache places one or more event listeners on its underlying cache. If the Continuous Query Cache is used for the duration of the application, then the resources are cleaned up when the node is shut down or otherwise stops. However, if the Continuous Query Cache is used only for a limited period, then the application must call the release() method on the Continuous Query Cache when it has finished using it.
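One way to make sure that release() is called when a short-lived Continuous Query Cache goes out of use is a scope guard. The following is a minimal sketch in standard C++, not part of the Coherence API: ReleaseGuard is a hypothetical helper, and ToyQueryCache is a stand-in used only to make the example self-contained. With a Coherence handle, the call in the destructor would be made through the handle (for example, hCacheOpenTrades->release()).

```cpp
#include <cassert>

// Hypothetical scope guard: calls release() on the guarded object when the
// enclosing scope ends, including when it ends because of an exception.
template <class Cache>
class ReleaseGuard
    {
    public:
        explicit ReleaseGuard(Cache& cache)
            : m_cache(cache)
            {
            }

        ~ReleaseGuard()
            {
            m_cache.release();
            }

        ReleaseGuard(const ReleaseGuard&) = delete;
        ReleaseGuard& operator=(const ReleaseGuard&) = delete;

    private:
        Cache& m_cache;
    };

// Toy stand-in for a cache that holds listeners on an underlying cache.
struct ToyQueryCache
    {
    int  cListeners;   // listeners currently registered on the underlying cache
    bool fReleased;

    ToyQueryCache()
        : cListeners(1), fReleased(false)
        {
        }

    void release()
        {
        cListeners = 0;
        fReleased  = true;
        }
    };
```

A typical use is to declare the guard immediately after creating the cache, so that the listeners are removed no matter how the block is exited.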

9.5 Caching Only Keys, or Caching Both Keys and Values

When constructing a Continuous Query Cache, you can specify that the cache should keep track of only the keys that result from the query and obtain the values from the underlying cache only when they are requested. This feature can be useful for a Continuous Query Cache that represents a very large query result set, or when the values are never or rarely requested. To specify that only the keys should be cached, pass false as the third argument when creating the ContinuousQueryCache; for example:

ContinuousQueryCache::Handle hCacheOpenTrades  = 
        ContinuousQueryCache::create(hCache, hFilter, false);

If necessary, the CacheValues property can be modified after the cache has been instantiated; for example:

hCacheOpenTrades->setCacheValues(true);
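The trade-off behind the CacheValues property can be seen in a small model. This is a sketch in standard C++ and not the Coherence implementation: ToyUnderlyingCache and ToyQueryView are hypothetical, the view has no filter (every entry matches), and it does not process events. In keys-only mode, each get() goes to the underlying cache; once values are cached, get() is answered locally.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <set>
#include <stdexcept>
#include <string>

// Toy underlying cache that counts single-entry lookups made against it.
struct ToyUnderlyingCache
    {
    std::map<int, std::string> mapEntries;
    int                        cLookups = 0;

    std::string get(int nKey)
        {
        ++cLookups;
        return mapEntries.at(nKey);
        }
    };

// Toy view that always tracks keys and optionally keeps the values as well.
class ToyQueryView
    {
    public:
        ToyQueryView(ToyUnderlyingCache& underlying, bool fCacheValues)
            : m_underlying(underlying), m_fCacheValues(false)
            {
            for (const auto& entry : underlying.mapEntries)
                {
                m_setKeys.insert(entry.first);
                }
            setCacheValues(fCacheValues);
            }

        void setCacheValues(bool fCacheValues)
            {
            if (fCacheValues && !m_fCacheValues)
                {
                // Bulk-load the values for the keys already being tracked.
                for (int nKey : m_setKeys)
                    {
                    m_mapValues[nKey] = m_underlying.mapEntries.at(nKey);
                    }
                }
            else if (!fCacheValues)
                {
                m_mapValues.clear();
                }
            m_fCacheValues = fCacheValues;
            }

        std::string get(int nKey)
            {
            if (m_setKeys.count(nKey) == 0)
                {
                throw std::out_of_range("key is not in the view");
                }
            // Keys-only mode defers to the underlying cache on every read.
            return m_fCacheValues ? m_mapValues.at(nKey) : m_underlying.get(nKey);
            }

        std::size_t size() const { return m_setKeys.size(); }

    private:
        ToyUnderlyingCache&        m_underlying;
        bool                       m_fCacheValues;
        std::set<int>              m_setKeys;
        std::map<int, std::string> m_mapValues;
    };
```

Keys-only mode keeps the local footprint small at the cost of a lookup per read, which is why it suits large result sets whose values are rarely needed.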

9.5.1 CacheValues Property and Event Listeners

If the Continuous Query Cache has any standard (non-lite) event listeners, or if any of the event listeners are filtered, then the CacheValues property will automatically be set to true. This is because the Continuous Query Cache uses the locally cached values to filter events and to supply the old and new values for the events that it raises.

9.5.2 Using ReflectionExtractor with Continuous Query Caches

When the Continuous Query Cache is configured to cache values, the use of the ReflectionExtractor is not supported. This is because C++ does not support reflection, so the ReflectionExtractor cannot perform the extraction on the client. In this case, you must provide a custom extractor. When the Continuous Query Cache is not caching values locally, the ReflectionExtractor can be used because it does not perform the extraction on the client; instead, it passes the necessary extraction information to the cluster, which performs the query.

9.6 Listening to the Continuous Query Cache

Since the Continuous Query Cache is itself observable, it is possible for the client to place one or more event listeners onto it. For example:

Example 9-2 Placing a Listener into a Continuous Query Cache

ContinuousQueryCache::Handle hCacheOpenTrades  = ContinuousQueryCache::create(hCache, hFilter);
hCacheOpenTrades->addFilterListener(hListener);

If your application has to perform some processing against every item that is already in the cache and every item added to the cache, then provide the listener during construction. The listener will receive one event for each item that is in the Continuous Query Cache, whether the item was there to begin with (because it was in the query result) or was added during or after the construction of the cache. One form of the ContinuousQueryCache::create factory method enables you to specify a cache, a filter, and a listener:

Example 9-3 Creating a Continuous Query Cache with a Filter and a Listener

ContinuousQueryCache::Handle hCacheOpenTrades = ContinuousQueryCache::create(
        hCache, hFilter, true, hListener);

9.6.1 Avoiding Unexpected Results

There are two alternative approaches to processing the items in the Continuous Query Cache, both of which could yield unexpected and unwanted results. First, if you perform the processing and then add the listener to handle any later additions, then events that occur in the split second after the iteration and before the listener is added will be missed. This is illustrated in Example 9-4:

Example 9-4 Processing the Data, then Adding the Listener

ContinuousQueryCache::Handle hCacheOpenTrades  = ContinuousQueryCache::create(hCache, hFilter);

for (Iterator::Handle hIter = hCacheOpenTrades->entrySet()->iterator(); hIter->hasNext(); )
    {
    Map::Entry::View vEntry = cast<Map::Entry::View>(hIter->next());
    // .. process the cache entry
    }
hCacheOpenTrades->addFilterListener(hListener);

The second approach is to add a listener first, so that no events are missed, and then do the processing. In this case, it is possible that the same entry will show up both in an event and in the Iterator. The events can be asynchronous, so the sequence of operations cannot be guaranteed.

Example 9-5 Adding the Listener, then Processing the Data

ContinuousQueryCache::Handle hCacheOpenTrades =
        ContinuousQueryCache::create(hCache, hFilter);

hCacheOpenTrades->addFilterListener(hListener);
for (Iterator::Handle hIter = hCacheOpenTrades->entrySet()->iterator(); hIter->hasNext(); )
    {
    Map::Entry::View vEntry = cast<Map::Entry::View>(hIter->next());
    // .. process the cache entry
    }

9.6.2 Achieving a Stable Materialized View

The Continuous Query Cache implementation faced the same challenge: how to assemble an exact point-in-time snapshot of an underlying cache while receiving a stream of modification events from that same cache. The solution has several parts. First, Coherence supports an option for synchronous events, which provides a set of ordering guarantees. Second, the Continuous Query Cache has a two-phase implementation of its initial population that allows it to first query the underlying cache and then resolve all of the events that came in during the first phase. Because achieving these guarantees of data visibility without any missing or repeated events is fairly complex, the ContinuousQueryCache allows a developer to pass a listener during construction, which avoids exposing these complexities to the application developer.
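The two-phase idea can be sketched in standard C++ as follows. This is an illustrative model only, not the ContinuousQueryCache implementation: ToyEvent, ToySource, and ToyTwoPhaseView are hypothetical, the model is single-threaded, and the fnConcurrentChanges callback is an assumption used to simulate changes made by other clients while the initial query is in flight. The view registers its listener first and only buffers events, then takes the snapshot, and finally replays the buffered events in order on top of the snapshot.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <vector>

// One change to the underlying cache; fDeleted distinguishes a removal.
struct ToyEvent
    {
    int         nKey;
    bool        fDeleted;
    std::string sValue;
    };

// Toy underlying cache that delivers events synchronously and in order.
class ToySource
    {
    public:
        typedef std::function<void(const ToyEvent&)> Listener;

        void addListener(const Listener& listener) { m_vecListeners.push_back(listener); }

        void put(int nKey, const std::string& sValue)
            {
            m_mapEntries[nKey] = sValue;
            notify(ToyEvent{nKey, false, sValue});
            }

        void remove(int nKey)
            {
            if (m_mapEntries.erase(nKey) > 0)
                {
                notify(ToyEvent{nKey, true, std::string()});
                }
            }

        const std::map<int, std::string>& entries() const { return m_mapEntries; }

    private:
        void notify(const ToyEvent& evt) const
            {
            for (const Listener& listener : m_vecListeners)
                {
                listener(evt);
                }
            }

        std::map<int, std::string> m_mapEntries;
        std::vector<Listener>      m_vecListeners;
    };

// The view must outlive the source's use of the listener it registers.
class ToyTwoPhaseView
    {
    public:
        ToyTwoPhaseView(ToySource& source, const std::function<void()>& fnConcurrentChanges)
            : m_fBuffering(true)
            {
            // Phase 1: listen first (buffering only), then run the query.
            source.addListener([this](const ToyEvent& evt) { onEvent(evt); });
            std::map<int, std::string> mapSnapshot = source.entries();
            fnConcurrentChanges();   // simulated changes that race with the query
            m_mapLocal = mapSnapshot;

            // Phase 2: replay, in order, everything that arrived during phase 1.
            m_fBuffering = false;
            for (const ToyEvent& evt : m_vecBuffered)
                {
                apply(evt);
                }
            m_vecBuffered.clear();
            }

        ToyTwoPhaseView(const ToyTwoPhaseView&) = delete;
        ToyTwoPhaseView& operator=(const ToyTwoPhaseView&) = delete;

        const std::map<int, std::string>& entries() const { return m_mapLocal; }

    private:
        void onEvent(const ToyEvent& evt)
            {
            if (m_fBuffering) { m_vecBuffered.push_back(evt); }
            else              { apply(evt); }
            }

        void apply(const ToyEvent& evt)
            {
            if (evt.fDeleted) { m_mapLocal.erase(evt.nKey); }
            else              { m_mapLocal[evt.nKey] = evt.sValue; }
            }

        bool                       m_fBuffering;
        std::map<int, std::string> m_mapLocal;
        std::vector<ToyEvent>      m_vecBuffered;
    };
```

The replay is safe only because the toy source delivers events in order: applying the buffered puts and removes in sequence always ends at the latest state, even if some of them were already reflected in the snapshot.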

9.6.3 Support for Synchronous and Asynchronous Listeners

By default, listeners to the Continuous Query Cache will have their events delivered asynchronously. However, the ContinuousQueryCache implementation does respect the option for synchronous events as provided by the SynchronousListener interface.

9.7 Making the Continuous Query Cache Read-Only

The Continuous Query Cache can be made read-only by passing true to the setReadOnly method of the ContinuousQueryCache class; for example:

hCacheOpenTrades->setReadOnly(true);

A read-only Continuous Query Cache will not allow objects to be added to, changed in, removed from or locked in the cache.

Once a Continuous Query Cache has been set to read-only, it cannot be changed back to read/write.
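The one-way nature of the read-only setting can be modeled as follows. This is a sketch in standard C++, not the ContinuousQueryCache class: ToyReadOnlyView is hypothetical, locking is omitted, and the use of std::logic_error to reject an operation is an assumption, because this chapter does not state how the real class reports these conditions.

```cpp
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>

// Toy view whose read-only flag can be switched on but never off again.
class ToyReadOnlyView
    {
    public:
        ToyReadOnlyView()
            : m_fReadOnly(false)
            {
            }

        bool isReadOnly() const { return m_fReadOnly; }

        void setReadOnly(bool fReadOnly)
            {
            if (m_fReadOnly && !fReadOnly)
                {
                // Assumed error type; the flag is a one-way switch.
                throw std::logic_error("a read-only view cannot be made read/write again");
                }
            m_fReadOnly = fReadOnly;
            }

        void put(int nKey, const std::string& sValue)
            {
            checkWritable();
            m_mapEntries[nKey] = sValue;
            }

        void remove(int nKey)
            {
            checkWritable();
            m_mapEntries.erase(nKey);
            }

        std::string get(int nKey) const
            {
            return m_mapEntries.at(nKey);   // reads are always allowed
            }

    private:
        void checkWritable() const
            {
            if (m_fReadOnly)
                {
                throw std::logic_error("view is read-only");
                }
            }

        bool                       m_fReadOnly;
        std::map<int, std::string> m_mapEntries;
    };
```

In this model, reads continue to work after the switch, every mutating call is rejected, and an attempt to call setReadOnly(false) is rejected as well.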