12 Deliver Events for Changes as they Occur (C++)

Coherence provides cache events. An application can receive the events that it needs, where it needs them, regardless of where in the cluster the changes actually occur.

12.1 Listener Interface and Event Object

In the event model, there is an EventListener interface that all listeners must extend. Coherence provides a MapListener interface, which allows application logic to receive events when data in a Coherence cache is added, modified or removed. Example 12-1 illustrates a segment of the MapListener API.

Example 12-1 Excerpt from the coherence::util::MapListener Class File

class MapListener
    : public interface_spec<MapListener,
        implements<EventListener> >
    {
    // ----- handle definitions ---------------------------------------------

    public:
        /**
        * Handle definition.
        */
        typedef TypedHandle<MapListener> Handle;
 
        /**
        * View definition.
        */
        typedef TypedHandle<const MapListener> View;
 
        /**
        * MapEvent View definition.
        */
        typedef TypedHandle<const MapEvent> MapEventView;
  
 
    // ----- MapListener interface ------------------------------------------
 
    public:
        /**
        * Invoked when a map entry has been inserted.
        *
        * @param vEvent  the MapEvent carrying the insert information
        */
        virtual void entryInserted(MapEventView vEvent) = 0;
 
        /**
        * Invoked when a map entry has been updated.
        *
        * @param vEvent  the MapEvent carrying the update information
        */
        virtual void entryUpdated(MapEventView vEvent) = 0;
 
        /**
        * Invoked when a map entry has been removed.
        *
        * @param vEvent  the MapEvent carrying the delete information
        */
        virtual void entryDeleted(MapEventView vEvent) = 0;
    };

An application object that implements the MapListener interface can sign up for events from any Coherence cache or class that implements the ObservableMap interface, simply by passing an instance of the application's MapListener implementation to one of the add*Listener methods (addKeyListener or addFilterListener).

The MapEvent object that is passed to the MapListener carries all of the necessary information about the event that has occurred, including the source (ObservableMap) that raised the event, the identity (key) that the event is related to, what the action was against that identity (insert, update or delete), what the old value was and what the new value is. Example 12-2 illustrates a segment of the MapEvent API.

Example 12-2 Excerpt from coherence::util::MapEvent

class MapEvent
     : public class_spec<MapEvent,
         extends<EventObject> >
     {
     friend class factory<MapEvent>;
 
      
     // ----- MapEvent interface ---------------------------------------------
 
     public:
         /**
         * Return an ObservableMap object on which this event has actually
         * occurred.
         *
         * @return an ObservableMap object
         */
         virtual ObservableMap::Handle getMap() const;
 
         /**
         * Return this event's id. The event id is one of the ENTRY_*
         * enumerated constants.
         *
         * @return an id
         */
         virtual int32_t getId() const;
 
         /**
         * Return a key associated with this event.
         *
         * @return a key
         */
         virtual Object::View getKey() const;
 
         /**
         * Return an old value associated with this event.
         * <p>
         * The old value represents a value deleted from or updated in a map.
         * It is always NULL for "insert" notifications.
         *
         * @return an old value
         */
         virtual Object::View getOldValue() const;
 
         /**
         * Return a new value associated with this event.
         * <p>
         * The new value represents a new value inserted into or updated in
         * a map. It is always NULL for "delete" notifications.
         *
         * @return a new value
         */
         virtual Object::View getNewValue() const;
 
 
     // ----- Object interface ----------------------------------------------
 
     public:
         /**
         * {@inheritDoc}
         */
         virtual void toStream(std::ostream& out) const;
 
 
     // ----- helper methods -------------------------------------------------
 
     public:
         /**
         * Dispatch this event to the specified listeners collection.
         * <p>
         * This call is equivalent to
         * <pre>
         *   dispatch(listeners, true);
         * </pre>
         *
         * @param vListeners the listeners collection
         *
         * @throws ClassCastException if any of the targets is not
         *         an instance of MapListener interface
         */
         virtual void dispatch(Listeners::View vListeners) const;
 
         /**
         * Dispatch this event to the specified listeners collection.
         *
         * @param vListeners the listeners collection
         * @param fStrict    if true then any RuntimeException thrown by event
         *                   handlers stops all further event processing and
         *                   the exception is re-thrown; if false then all
         *                   exceptions are logged and the process continues
         *
         * @throws ClassCastException if any of the targets is not
         *         an instance of MapListener interface
         */
         virtual void dispatch(Listeners::View vListeners,
                 bool fStrict) const;
 
         /**
         * Dispatch this event to the specified MapListener.
         *
         * @param hListener  the listener
         */
         virtual void dispatch(MapListener::Handle hListener) const;
 
         /**
         * Get the event's description.
         *
         * @return this event's description
         */
         virtual String::View getDescription() const;
 
         /**
         * Convert an event ID into a human-readable string.
         *
         * @param nId  an event ID, one of the ENTRY_* enumerated values
         *
         * @return a corresponding human-readable string, for example
         *         "inserted"
         */
         static String::View getDescription(int32_t nId);
 
 
     // ----- constants ------------------------------------------------------
 
     public:
         /**
         * This event indicates that an entry has been added to the map.
         */
         static const int32_t ENTRY_INSERTED = 1;
 
         /**
         * This event indicates that an entry has been updated in the map.
         */
         static const int32_t ENTRY_UPDATED  = 2;
 
         /**
         * This event indicates that an entry has been removed from the map.
         */
         static const int32_t ENTRY_DELETED  = 3;
     };

12.2 Caches and Classes that Support Events

All Coherence caches implement ObservableMap; in fact, the NamedCache interface that is implemented by all Coherence caches extends the ObservableMap interface. That means that an application can sign up to receive events from any cache, regardless of whether that cache is local, partitioned, near, replicated, using read-through, write-through, write-behind, overflow, disk storage, and so on.

Note:

Regardless of the cache topology and the number of servers, and even if the modifications are being made by other servers, the events will be delivered to the application's listeners.

In addition to the Coherence caches (those objects obtained through a Coherence cache factory), several other supporting classes in Coherence also implement the ObservableMap interface:

  • ObservableHashMap

  • LocalCache

  • OverflowMap

  • NearCache

  • ReadWriteBackingMap

  • AbstractSerializationCache, SerializationCache, and SerializationPagedCache

  • WrapperObservableMap, WrapperConcurrentMap, and WrapperNamedCache

For a full list of published implementing classes, see the Coherence API for ObservableMap.

12.3 Signing Up for all Events

To sign up for events, pass an object that implements the MapListener interface to one of the add*Listener methods on ObservableMap:

Example 12-3 ObservableMap methods

virtual void addKeyListener(MapListener::Handle hListener, Object::View vKey, bool fLite) = 0;
virtual void removeKeyListener(MapListener::Handle hListener, Object::View vKey) = 0;
virtual void addFilterListener(MapListener::Handle hListener, Filter::View vFilter = NULL, bool fLite = false) = 0;
virtual void removeFilterListener(MapListener::Handle hListener, Filter::View vFilter = NULL) = 0;

Let's create an example MapListener implementation:

Example 12-4 Example MapListener implementation

#include "coherence/util/MapEvent.hpp"
#include "coherence/util/MapListener.hpp"

#include <iostream>

using coherence::util::MapEvent;
using coherence::util::MapListener;
using namespace std;

/**
* A MapListener implementation that prints each event as it is
* received.
*/
class EventPrinter 
    : public class_spec<EventPrinter,
        extends<Object>,
        implements<MapListener> >
    {    
    friend class factory<EventPrinter>;

    public:
        virtual void entryInserted(MapEventView vEvent)
            {
            cout << vEvent << endl;
            }

        virtual void entryUpdated(MapEventView vEvent)
            {
            cout << vEvent << endl;
            }

        virtual void entryDeleted(MapEventView vEvent)
            {
            cout << vEvent << endl;
            }
    };

Using this implementation, it is extremely simple to print out all events from any given cache (since all caches implement the ObservableMap interface):

Example 12-5 Printing Events

NamedCache::Handle hCache;
...
hCache->addFilterListener(EventPrinter::create());

Of course, to be able to later remove the listener, it is necessary to hold on to a reference to the listener:

Example 12-6 Holding a Reference to a Listener

MapListener::Handle hListener = EventPrinter::create();
hCache->addFilterListener(hListener);
m_hListener = hListener; // store the listener in a member field

Later, to remove the listener:

Example 12-7 Removing a Reference to a Listener

MapListener::Handle hListener = m_hListener;
if (hListener != NULL)
    {
    hCache->removeFilterListener(hListener);
    m_hListener = NULL; // clean up the listener field
    }

Each add*Listener method on the ObservableMap interface has a corresponding remove*Listener method. To remove a listener, use the remove*Listener method that corresponds to the add*Listener method that was used to add the listener.
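The pairing of add and remove calls can be illustrated without a cluster. The following is a minimal standalone sketch that uses only the C++ standard library; TinyObservableMap, CountingListener, and eventsSeenBeforeRemoval are hypothetical names invented for this illustration and are not part of the Coherence API. It shows why the application must hold on to the same listener reference that it registered: removal matches on identity.

```cpp
#include <algorithm>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for a listener: counts the events it receives.
struct CountingListener
    {
    int cEvents = 0;
    void onEvent(const std::string& /*sKey*/) { ++cEvents; }
    };

// Hypothetical stand-in for an observable map (not the Coherence class).
class TinyObservableMap
    {
    public:
        void addListener(std::shared_ptr<CountingListener> pListener)
            {
            m_listeners.push_back(pListener);
            }

        // Removal matches on identity, so the caller must keep the same
        // pointer that was passed to addListener().
        void removeListener(const std::shared_ptr<CountingListener>& pListener)
            {
            m_listeners.erase(
                std::remove(m_listeners.begin(), m_listeners.end(), pListener),
                m_listeners.end());
            }

        void put(const std::string& sKey, int nValue)
            {
            m_data[sKey] = nValue;
            for (auto& p : m_listeners)
                {
                p->onEvent(sKey);
                }
            }

    private:
        std::map<std::string, int>                     m_data;
        std::vector<std::shared_ptr<CountingListener>> m_listeners;
    };

// Registers a listener, writes twice, removes the listener, writes once
// more, and returns how many events the listener saw.
int eventsSeenBeforeRemoval()
    {
    TinyObservableMap map;
    auto pListener = std::make_shared<CountingListener>();
    map.addListener(pListener);
    map.put("a", 1);
    map.put("a", 2);
    map.removeListener(pListener);
    map.put("a", 3); // not observed: the listener is no longer registered
    return pListener->cEvents;
    }
```

In this model, the third put is not observed because the listener was removed with the same pointer that was used to add it.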

12.4 MultiplexingMapListener

Another helpful base class for creating a MapListener is the MultiplexingMapListener, which routes all events to a single method for handling. Example 12-8 illustrates a simplified version of the EventPrinter example:

Example 12-8 Using MultiplexingMapListener to Route Events

#include "coherence/util/MultiplexingMapListener.hpp"

#include <iostream>

using coherence::util::MultiplexingMapListener;


class EventPrinter 
    : public class_spec<EventPrinter,
        extends<MultiplexingMapListener> >
    {
    public:
        virtual void onMapEvent(MapEventView vEvent)
            {
            std::cout << vEvent << std::endl;
            }
    };
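The routing idea behind a multiplexing listener can be sketched in plain C++. The following standalone example uses only the standard library; Event, Listener, MultiplexingListener, and RecordingListener are hypothetical names for this illustration and do not reproduce the Coherence implementation. The base class overrides all three callbacks and forwards each one to a single method.

```cpp
#include <string>
#include <vector>

// Hypothetical event record; the real MapEvent carries more information.
struct Event
    {
    int         nId;   // 1 = inserted, 2 = updated, 3 = deleted
    std::string sKey;
    };

// Hypothetical three-callback listener interface, mirroring the shape
// of MapListener.
class Listener
    {
    public:
        virtual ~Listener() {}
        virtual void entryInserted(const Event& evt) = 0;
        virtual void entryUpdated(const Event& evt) = 0;
        virtual void entryDeleted(const Event& evt) = 0;
    };

// Base class that funnels all three callbacks into onEvent().
class MultiplexingListener : public Listener
    {
    public:
        void entryInserted(const Event& evt) override { onEvent(evt); }
        void entryUpdated(const Event& evt) override  { onEvent(evt); }
        void entryDeleted(const Event& evt) override  { onEvent(evt); }

    protected:
        virtual void onEvent(const Event& evt) = 0;
    };

// A subclass only needs to supply one method.
class RecordingListener : public MultiplexingListener
    {
    public:
        std::vector<int> vecIds; // ids of the events received, in order

    protected:
        void onEvent(const Event& evt) override { vecIds.push_back(evt.nId); }
    };
```

A caller that holds only a Listener reference still invokes the three specific callbacks; the subclass sees every event through its single onEvent method.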

12.5 Configuring a MapListener for a Cache

If the listener should always be on a particular cache, then place it into the cache configuration using the <listener> element and Coherence will automatically add the listener when it configures the cache.

12.6 Signing Up for Events on Specific Identities

Signing up for events that occur against specific identities (keys) is just as simple. The C++ code in Example 12-9 prints all events that occur against the Integer key 5:

Example 12-9 Printing Events that Occur Against a Specified Integer Key

hCache->addKeyListener(EventPrinter::create(), Integer32::create(5), false);

With that listener registered, the code in Example 12-10 triggers an event only when the entry with the Integer key 5 is inserted or updated:

Example 12-10 Triggering an Event for a Specified Integer Key Value

for (int32_t i = 0; i < 10; ++i)
    {
    Integer32::View vKey   = Integer32::create(i);
    Integer32::View vValue = vKey;
    hCache->put(vKey, vValue);
    }
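The effect of a key listener on the loop in Example 12-10 can be modeled without a cache. The following standalone sketch uses only the C++ standard library; KeyObservableMap and keysObservedForKeyFive are hypothetical names for this illustration, and the class is a simplified stand-in, not the Coherence ObservableMap.

```cpp
#include <functional>
#include <map>
#include <vector>

// Hypothetical stand-in for a map that supports per-key listeners.
class KeyObservableMap
    {
    public:
        typedef std::function<void(int nKey, int nValue)> Callback;

        void addKeyListener(int nKey, Callback fn)
            {
            m_keyListeners[nKey].push_back(fn);
            }

        void put(int nKey, int nValue)
            {
            m_data[nKey] = nValue;
            // only listeners registered for this exact key are notified
            auto it = m_keyListeners.find(nKey);
            if (it != m_keyListeners.end())
                {
                for (auto& fn : it->second)
                    {
                    fn(nKey, nValue);
                    }
                }
            }

    private:
        std::map<int, int>                   m_data;
        std::map<int, std::vector<Callback>> m_keyListeners;
    };

// Mirrors the loop in Example 12-10 and returns the keys that produced
// an event for a listener registered on key 5.
std::vector<int> keysObservedForKeyFive()
    {
    KeyObservableMap map;
    std::vector<int> vecSeen;
    map.addKeyListener(5, [&vecSeen](int nKey, int) { vecSeen.push_back(nKey); });
    for (int i = 0; i < 10; ++i)
        {
        map.put(i, i);
        }
    return vecSeen;
    }
```

Of the ten writes in this model, only the write to key 5 reaches the listener.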

12.7 Filtering Events

Similar to listening to a particular key, it is possible to listen to particular events. In Example 12-11, a listener is added to the cache with a filter that allows the listener to only receive delete events.

Example 12-11 Adding a Listener with a Filter that Allows only Deleted Events

// Filters used with partitioned caches must implement coherence::io::pof::PortableObject

#include "coherence/io/pof/PofReader.hpp"
#include "coherence/io/pof/PofWriter.hpp"
#include "coherence/io/pof/PortableObject.hpp"
#include "coherence/util/Filter.hpp"
#include "coherence/util/MapEvent.hpp"

using coherence::io::pof::PofReader;
using coherence::io::pof::PofWriter;
using coherence::io::pof::PortableObject;
using coherence::util::Filter;
using coherence::util::MapEvent;

class DeletedFilter
    : public class_spec<DeletedFilter,
        extends<Object>,
        implements<Filter, PortableObject> >
    {
    public:
        // Filter interface
        virtual bool evaluate(Object::View v) const
            {
            MapEvent::View vEvt = cast<MapEvent::View>(v);
            return MapEvent::ENTRY_DELETED == vEvt->getId();
            }

        // PortableObject interface
        virtual void readExternal(PofReader::Handle hIn)
            {
            }

        virtual void writeExternal(PofWriter::Handle hOut) const
            {
            }
    };

hCache->addFilterListener(EventPrinter::create(), DeletedFilter::create(), false);

For example, if the following sequence of calls were made:

Example 12-12 Inserting and Removing Data from the Cache

hCache->put(String::create("hello"), String::create("world"));
hCache->put(String::create("hello"), String::create("again"));
hCache->remove(String::create("hello"));

The result would be:

CacheEvent{LocalCache deleted: key=hello, value=again}

For more information, see "Advanced: Listening to Queries".
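The put, put, remove sequence in Example 12-12 can be traced with a small standalone model. The sketch below uses only the C++ standard library; Event, FilteredMap, and isDeleted are hypothetical names for this illustration and are not Coherence classes. The point it demonstrates is that the filter is evaluated against the event, not against the cached value.

```cpp
#include <functional>
#include <map>
#include <string>
#include <vector>

// Hypothetical event record; the id values follow the ENTRY_* constants.
struct Event
    {
    int         nId;        // 1 = inserted, 2 = updated, 3 = deleted
    std::string sKey;
    std::string sOldValue;  // empty when there is no old value
    std::string sNewValue;  // empty when there is no new value
    };

// Hypothetical stand-in for a map with one filtered listener.
class FilteredMap
    {
    public:
        typedef std::function<bool(const Event&)> EventFilter;

        explicit FilteredMap(EventFilter filter) : m_filter(filter) {}

        void put(const std::string& sKey, const std::string& sValue)
            {
            auto it = m_data.find(sKey);
            if (it == m_data.end())
                {
                raise(Event{1, sKey, "", sValue});
                }
            else
                {
                raise(Event{2, sKey, it->second, sValue});
                }
            m_data[sKey] = sValue;
            }

        void remove(const std::string& sKey)
            {
            auto it = m_data.find(sKey);
            if (it != m_data.end())
                {
                raise(Event{3, sKey, it->second, ""});
                m_data.erase(it);
                }
            }

        std::vector<Event> delivered; // events that passed the filter

    private:
        // the filter sees the event itself, not the cached value
        void raise(const Event& evt)
            {
            if (m_filter(evt))
                {
                delivered.push_back(evt);
                }
            }

        EventFilter                        m_filter;
        std::map<std::string, std::string> m_data;
    };

// Accepts only delete events, like the DeletedFilter in Example 12-11.
bool isDeleted(const Event& evt)
    {
    return evt.nId == 3;
    }
```

Running the three calls from Example 12-12 against a FilteredMap constructed with isDeleted leaves a single delivered event: the delete for key "hello", carrying the old value "again".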

Filtering Events Versus Filtering Cached Data

When building a Filter for querying, the object that will be passed to the evaluate method of the Filter will be a value from the cache, or, if the Filter implements the EntryFilter interface, the entire Map::Entry from the cache. When building a Filter for filtering events for a MapListener, the object that will be passed to the evaluate method of the Filter will always be of type MapEvent.

For more information on how to use a query filter to listen to cache events, see Advanced: Listening to Queries.

12.8 "Lite" Events

By default, Coherence provides both the old and the new value as part of an event. Consider the following example:

Example 12-13 Inserting, Updating, and Removing a Value

MapListener::Handle hListener = EventPrinter::create();
// add listener with the default "lite" value of false
hCache->addFilterListener(hListener);

// insert a 1KB value
String::View vKey = String::create("test");
hCache->put(vKey, Array<octet_t>::create(1024));

// update with a 2KB value
hCache->put(vKey, Array<octet_t>::create(2048));

// remove the value
hCache->remove(vKey);

When the above code is run, the insert event carries the new 1KB value, the update event carries both the old 1KB value and the new 2KB value and the remove event carries the removed 2KB value.

When an application does not require the old and the new value to be included in the event, it can indicate that by requesting only "lite" events. When adding a listener, you can request lite events by using either the addFilterListener or the addKeyListener method that takes an additional boolean fLite parameter. In the above example, the only change would be:

Example 12-14 Requesting Only "Lite" Events

hCache->addFilterListener(hListener, (Filter::View) NULL, true);

Note:

A lite event's old value and new value may be NULL. However, even if you request lite events, the old and the new value may be included if there is no additional cost to generate and deliver the event. In other words, requesting that a MapListener receive lite events is simply a hint to the system that the MapListener does not need to know the old and new values for the event.
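The saving that lite events offer can be made concrete by counting the value bytes that events carry. The following standalone sketch uses only the C++ standard library; Event, LiteAwareMap, and payloadBytes are hypothetical names for this illustration. As a simplifying assumption, the model always omits values from lite events, whereas Coherence treats the lite flag only as a hint.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical event record for this model.
struct Event
    {
    std::string sKey;
    bool        fHasValues;  // false for a lite event in this model
    std::string sOldValue;
    std::string sNewValue;
    };

// Hypothetical stand-in for a map with one listener that was registered
// with or without the lite flag.
class LiteAwareMap
    {
    public:
        explicit LiteAwareMap(bool fLite) : m_fLite(fLite) {}

        void put(const std::string& sKey, const std::string& sValue)
            {
            std::string sOld = m_data[sKey]; // empty if the key is new
            m_data[sKey] = sValue;
            if (m_fLite)
                {
                // lite: identify the key, but carry no values
                events.push_back(Event{sKey, false, "", ""});
                }
            else
                {
                events.push_back(Event{sKey, true, sOld, sValue});
                }
            }

        std::vector<Event> events; // events raised so far

    private:
        bool                               m_fLite;
        std::map<std::string, std::string> m_data;
    };

// Total number of value bytes carried by the recorded events.
std::size_t payloadBytes(const LiteAwareMap& map)
    {
    std::size_t cb = 0;
    for (const Event& evt : map.events)
        {
        cb += evt.sOldValue.size() + evt.sNewValue.size();
        }
    return cb;
    }
```

For a 1KB insert followed by a 2KB update, the non-lite listener in this model receives 4096 bytes of values (1KB, then 1KB plus 2KB), while the lite listener receives the same two events with no value bytes.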

12.9 Advanced: Listening to Queries

All Coherence caches support querying by any criteria. When an application queries for data from a cache, the result is a point-in-time snapshot, either as a set of identities (keySet) or a set of identity/value pairs (entrySet). The mechanism for determining the contents of the resulting set is referred to as filtering, and it allows an application developer to construct queries of arbitrary complexity using a rich set of out-of-the-box filters (for example, equals, less-than, like, between, and so on), or to provide their own custom filters (for example, XPath).

The same filters that are used to query a cache can be used to listen to events from a cache. For example, in a trading system it is possible to query for all open Order objects for a particular trader.

Note:

Executing Queries in the Cluster: Example 12-15 uses the coherence::util::extractor::ReflectionExtractor class. While the C++ client doesn't support reflection, the ReflectionExtractor can be used for queries which are executed in the cluster. In this case, the ReflectionExtractor simply passes the necessary extraction information to the cluster to perform the query. In cases where the ReflectionExtractor would extract the data on the client, such as the ContinuousQueryCache when caching values locally, the use of the ReflectionExtractor is not supported. For these cases, you must provide a custom extractor.

Example 12-15 Filtering for Cache Events

NamedCache::Handle hMapTrades = ...
Filter::Handle hFilter = AndFilter::create(
        EqualsFilter::create(ReflectionExtractor::create("getTrader"), vTraderId),
        EqualsFilter::create(ReflectionExtractor::create("getStatus"), Status::OPEN));
Set::View vSetOpenTrades = hMapTrades->entrySet(hFilter);

To receive notifications of new trades being opened for that trader, closed by that trader or reassigned to or from another trader, the application can use the same filter:

Example 12-16 Filtering for Specialized Events

// receive events for all trade IDs that this trader is interested in
hMapTrades->addFilterListener(hListener, MapEventFilter::create(hFilter), true);

The MapEventFilter converts a query filter into an event filter.

The MapEventFilter converts a Filter that is used to do a query into a Filter that is used to filter events for a MapListener. In other words, the MapEventFilter is constructed from a Filter that queries a cache, and the resulting MapEventFilter is a filter that evaluates MapEvent objects by converting them into the objects that a query Filter would expect.

The MapEventFilter has several very powerful options, allowing an application listener to receive only the events that it is specifically interested in. More importantly for scalability and performance, only the desired events have to be communicated over the network, and they are communicated only to the servers and clients that have expressed interest in those specific events. For example:

Example 12-17 Communicating Only Specialized Events over the Network

// receive all events for all trades that this trader is interested in
int32_t nMask = MapEventFilter::E_ALL;
hMapTrades->addFilterListener(hListener, MapEventFilter::create(nMask, hFilter), true);

// receive events for all this trader's trades that are closed or
// re-assigned to a different trader
nMask = MapEventFilter::E_UPDATED_LEFT | MapEventFilter::E_DELETED;
hMapTrades->addFilterListener(hListener, MapEventFilter::create(nMask, hFilter), true);

// receive events for all trades as they are assigned to this trader
nMask = MapEventFilter::E_INSERTED | MapEventFilter::E_UPDATED_ENTERED;
hMapTrades->addFilterListener(hListener, MapEventFilter::create(nMask, hFilter), true);

// receive events only for new trades assigned to this trader
nMask = MapEventFilter::E_INSERTED;
hMapTrades->addFilterListener(hListener, MapEventFilter::create(nMask, hFilter), true);

For more information on the various options supported, see the API documentation for MapEventFilter.
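The meaning of the mask bits used in Example 12-17 can be sketched as a small classification function. The example below is standalone C++; the bit values, the ChangeKind enum, and the classify and accepts functions are assumptions made for this illustration, so consult the MapEventFilter API documentation for the actual constants and evaluation rules. The idea shown is that an update is categorized by whether the old and the new value satisfy the query filter.

```cpp
// Illustrative bit values chosen for this sketch; consult the
// MapEventFilter API documentation for the actual constants.
const int E_INSERTED        = 0x01;
const int E_DELETED         = 0x04;
const int E_UPDATED_ENTERED = 0x08;
const int E_UPDATED_LEFT    = 0x10;
const int E_UPDATED_WITHIN  = 0x20;

// Kind of change applied to a cache entry (hypothetical enum).
enum ChangeKind { CHANGE_INSERT, CHANGE_UPDATE, CHANGE_DELETE };

// Returns the single mask bit that describes the change, or 0 if the
// entry does not relate to the query filter before or after the change.
//   fOldMatches: the old value satisfied the query filter
//   fNewMatches: the new value satisfies the query filter
int classify(ChangeKind kind, bool fOldMatches, bool fNewMatches)
    {
    switch (kind)
        {
        case CHANGE_INSERT:
            return fNewMatches ? E_INSERTED : 0;
        case CHANGE_DELETE:
            return fOldMatches ? E_DELETED : 0;
        case CHANGE_UPDATE:
            if (fOldMatches && fNewMatches)  return E_UPDATED_WITHIN;
            if (!fOldMatches && fNewMatches) return E_UPDATED_ENTERED;
            if (fOldMatches && !fNewMatches) return E_UPDATED_LEFT;
            return 0;
        }
    return 0;
    }

// True if a listener registered with nMask would receive the change.
bool accepts(int nMask, ChangeKind kind, bool fOldMatches, bool fNewMatches)
    {
    return (classify(kind, fOldMatches, fNewMatches) & nMask) != 0;
    }
```

With the mask E_UPDATED_LEFT | E_DELETED, this model accepts a trade that is reassigned away from the trader (old value matches, new value does not) and a matching trade that is deleted, but it rejects a newly inserted trade.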

12.10 Advanced: Synthetic Events

Events usually reflect the changes being made to a cache. For example, one server may be modifying one entry in a cache while another server is adding several items to the cache and a third server is removing an item from the same cache, all while fifty threads on each server in the cluster are accessing data from the same cache. All the modifying actions produce events that any server within the cluster can choose to receive. These actions are referred to as client actions, and the events as being dispatched to clients, even though the "clients" in this case are actually servers. This is a natural concept in a true peer-to-peer architecture, such as a Coherence cluster: each peer is both a client and a server, both consuming services from its peers and providing services to its peers. In a typical Java Enterprise application, a "peer" is an application server instance that is acting as a container for the application, and the "client" is that part of the application that is directly accessing and modifying the caches and listening to events from the caches.

Some events originate from within a cache itself. There are many examples, but the most common cases are:

  • When entries automatically expire from a cache;

  • When entries are evicted from a cache because the maximum size of the cache has been reached;

  • When entries are transparently added to a cache as the result of a Read-Through operation;

  • When entries in a cache are transparently updated as the result of a Read-Ahead or Refresh-Ahead operation.

Each of these represents a modification, but the modifications represent natural (and typically automatic) operations from within a cache. These events are referred to as synthetic events.

When necessary, an application can differentiate between client-induced and synthetic events simply by asking the event if it is synthetic. This information is carried on a sub-class of the MapEvent, called CacheEvent. Using the previous EventPrinter example, it is possible to print only the synthetic events:

Example 12-18 Differentiating Between Client-Induced and Synthetic Events

class EventPrinter
    : public class_spec<EventPrinter,
        extends<MultiplexingMapListener> >
    {
    friend class factory<EventPrinter>;

    public:
        void onMapEvent(MapEvent::View vEvt)
            {
            if (instanceof<CacheEvent::View>(vEvt) &&
                (cast<CacheEvent::View>(vEvt)->isSynthetic()))
                {
                std::cout << vEvt;
                }
            }
    };

For more information on this feature, see the API documentation for CacheEvent.
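The distinction between client-induced and synthetic events can be modeled with a size-limited cache. The following standalone sketch uses only the C++ standard library; Event, BoundedCache, and syntheticOnly are hypothetical names for this illustration, and the eviction policy (evict the smallest key) is an arbitrary simplification rather than Coherence behavior.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical event record carrying a synthetic flag.
struct Event
    {
    std::string sKey;
    bool        fDeleted;    // true for a removal, false for a write
    bool        fSynthetic;  // true if the cache itself caused the change
    };

// Hypothetical size-limited cache that evicts to make room.
class BoundedCache
    {
    public:
        explicit BoundedCache(std::size_t cMax) : m_cMax(cMax) {}

        void put(const std::string& sKey, int nValue)
            {
            // make room first; the eviction is the cache's own doing
            bool fNewKey = m_data.find(sKey) == m_data.end();
            if (fNewKey && !m_data.empty() && m_data.size() >= m_cMax)
                {
                std::string sVictim = m_data.begin()->first;
                m_data.erase(m_data.begin());
                events.push_back(Event{sVictim, true, true});
                }
            m_data[sKey] = nValue;
            events.push_back(Event{sKey, false, false});
            }

        void remove(const std::string& sKey)
            {
            if (m_data.erase(sKey) > 0)
                {
                events.push_back(Event{sKey, true, false});
                }
            }

        std::vector<Event> events; // every event raised so far

    private:
        std::size_t                m_cMax;
        std::map<std::string, int> m_data;
    };

// Keeps only the events that the cache raised on its own behalf.
std::vector<Event> syntheticOnly(const std::vector<Event>& vecAll)
    {
    std::vector<Event> vecResult;
    for (const Event& evt : vecAll)
        {
        if (evt.fSynthetic)
            {
            vecResult.push_back(evt);
            }
        }
    return vecResult;
    }
```

In a cache limited to two entries, writing a third key evicts an existing entry and raises a synthetic delete, while an explicit remove call raises a delete that is not synthetic.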

12.11 Advanced: Backing Map Events

While it is possible to listen to events from Coherence caches, each of which presents a local view of distributed, partitioned, replicated, near-cached, continuously-queried, read-through/write-through and/or write-behind data, it is also possible to peek behind the curtains, so to speak.

For some advanced use cases, it may be necessary to peek behind the curtain—or more correctly, to "listen to" the "map" behind the "service". Replication, partitioning and other approaches to managing data in a distributed environment are all distribution services. The service still has to have something in which to actually manage the data, and that something is called a "backing map".

Backing maps are configurable. If all the data for a particular cache should be kept in object form on the heap, then use an unlimited and non-expiring LocalCache (or a SafeHashMap if statistics are not required). If only a small number of items should be kept in memory, use a LocalCache. If data are to be read on demand from a database, then use a ReadWriteBackingMap (which knows how to read and write through an application's DAO implementation), and in turn give the ReadWriteBackingMap a backing map such as a SafeHashMap or a LocalCache to store its data in.

Some backing maps are observable. The events coming from these backing maps are not usually of direct interest to the application. Instead, Coherence translates them into actions that must be taken (by Coherence) to keep data synchronized and properly backed up, and it also translates them when appropriate into clustered events that are delivered throughout the cluster as requested by application listeners. For example, if a partitioned cache has a LocalCache as its backing map, and the local cache expires an entry, that event causes Coherence to expire all of the backup copies of that entry. Furthermore, if any listeners have been registered on the partitioned cache, and if the event matches their event filter(s), then that event will be delivered to those listeners on the servers where those listeners were registered.

In some advanced use cases, an application must process events on the server where the data are being maintained, and it must do so on the structure (backing map) that is actually managing the data. In these cases, if the backing map is an observable map, a listener can be configured on the backing map or one can be programmatically added to the backing map. (If the backing map is not observable, it can be made observable by wrapping it in a WrapperObservableMap.)

For more information on this feature, see the API documentation for BackingMapManager.

12.12 Advanced: Synchronous Event Listeners

Some events are delivered asynchronously, so that application listeners do not disrupt the cache services that are generating the events. In some rare scenarios, asynchronous delivery can cause ambiguity of the ordering of events compared to the results of ongoing operations. To guarantee that the cache API operations and the events are ordered as if the local view of the clustered system were single-threaded, a MapListener must implement the SynchronousListener marker interface.

One example in Coherence itself that uses synchronous listeners is the Near Cache, which can use events to invalidate locally cached data ("Seppuku").

For more information on this feature, see the API documentation for SynchronousListener.
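The marker-interface idea can be sketched independently of Coherence. The following standalone example uses only the C++ standard library; Listener, SynchronousMarker, Dispatcher, LogListener, and SyncLogListener are hypothetical names for this illustration, and the queue-based dispatcher is an assumed model of asynchronous delivery, not a description of how Coherence services are implemented. A listener that also derives from the marker type is invoked inline; all other listeners are queued and invoked later.

```cpp
#include <functional>
#include <queue>
#include <string>
#include <vector>

// Hypothetical listener interface.
class Listener
    {
    public:
        virtual ~Listener() {}
        virtual void onEvent(const std::string& sEvent) = 0;
    };

// Hypothetical marker type: no methods, only identity.
class SynchronousMarker
    {
    public:
        virtual ~SynchronousMarker() {}
    };

// Hypothetical dispatcher that checks for the marker at dispatch time.
class Dispatcher
    {
    public:
        void dispatch(Listener& listener, const std::string& sEvent)
            {
            if (dynamic_cast<SynchronousMarker*>(&listener) != nullptr)
                {
                listener.onEvent(sEvent); // delivered before dispatch() returns
                }
            else
                {
                // deferred; the listener must outlive the queued call
                m_queue.push([&listener, sEvent]() { listener.onEvent(sEvent); });
                }
            }

        // Delivers all deferred events in the order they were queued.
        void drain()
            {
            while (!m_queue.empty())
                {
                m_queue.front()();
                m_queue.pop();
                }
            }

    private:
        std::queue<std::function<void()>> m_queue;
    };

// Appends "<name>:<event>" to a shared log on every event.
class LogListener : public Listener
    {
    public:
        LogListener(std::vector<std::string>& vecLog, const std::string& sName)
            : m_vecLog(vecLog), m_sName(sName) {}

        void onEvent(const std::string& sEvent) override
            {
            m_vecLog.push_back(m_sName + ":" + sEvent);
            }

    private:
        std::vector<std::string>& m_vecLog;
        std::string               m_sName;
    };

// Same behavior, but tagged for synchronous delivery.
class SyncLogListener : public LogListener, public SynchronousMarker
    {
    public:
        using LogListener::LogListener;
    };
```

In this model, an event dispatched to a SyncLogListener appears in the log immediately, whereas the same event dispatched to a plain LogListener appears only after drain() runs.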

12.13 Summary

Coherence provides an extremely rich event model for caches, providing the means for an application to request the specific events it requires, and the means to have those events delivered only to those parts of the application that require them.