25 Using Map Events

This chapter provides instructions for using map event listeners to receive cache events and events from any class in Coherence that implements the ObservableMap interface.

25.1 Overview of Map Events

Coherence provides cache events using the JavaBean Event model. The implementation allows applications to receive the events when and where they are needed, regardless of where the changes are actually occurring in the cluster. Developers who are familiar with the JavaBean model should have no difficulty working with events, even in a complex cluster.

Note:

Coherence also includes the live event programming model. Live events provide support for common event types and can be used instead of map events. For details about using live events, see "Using Live Events".

25.1.1 Listener Interface and Event Object

In the JavaBeans Event model, there is an EventListener interface that all listeners must extend. Coherence provides a MapListener interface, which allows application logic to receive events when data in a Coherence cache is added, modified or removed.

An application object that implements the MapListener interface can sign up for events from any Coherence cache or class that implements the ObservableMap interface, simply by passing an instance of the application's MapListener implementation to an addMapListener() method.

The MapEvent object that is passed to the MapListener carries all of the necessary information about the event that has occurred, including the source (ObservableMap) that raised the event, the identity (key) that the event is related to, what the action was against that identity (insert, update or delete), what the old value was and what the new value is.

For details about the MapListener interface and the MapEvent class, see the Java API Reference for Oracle Coherence.
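The listener and event pattern described above can be sketched with the JDK alone. The following is a minimal illustration, not the Coherence API: the SketchEvent, SketchListener, and SketchObservableMap names are hypothetical stand-ins for MapEvent, MapListener, and ObservableMap, and the sketch assumes a single-threaded, in-memory map.

```java
import java.util.ArrayList;
import java.util.EventListener;
import java.util.EventObject;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a map event: source, key, action, old value, new value.
class SketchEvent extends EventObject {
    enum Action { INSERTED, UPDATED, DELETED }

    final Object key;
    final Action action;
    final Object oldValue;
    final Object newValue;

    SketchEvent(Object source, Object key, Action action, Object oldValue, Object newValue) {
        super(source);
        this.key = key;
        this.action = action;
        this.oldValue = oldValue;
        this.newValue = newValue;
    }
}

// Hypothetical listener interface; as in the JavaBeans model, it extends EventListener.
interface SketchListener extends EventListener {
    void onEvent(SketchEvent evt);
}

// A tiny observable map that raises one event per modification.
class SketchObservableMap {
    private final Map<Object, Object> data = new HashMap<>();
    private final List<SketchListener> listeners = new ArrayList<>();

    void addListener(SketchListener listener) {
        listeners.add(listener);
    }

    void put(Object key, Object value) {
        boolean existed = data.containsKey(key);
        Object old = data.put(key, value);
        SketchEvent.Action action =
                existed ? SketchEvent.Action.UPDATED : SketchEvent.Action.INSERTED;
        fire(new SketchEvent(this, key, action, old, value));
    }

    void remove(Object key) {
        if (data.containsKey(key)) {
            Object old = data.remove(key);
            fire(new SketchEvent(this, key, SketchEvent.Action.DELETED, old, null));
        }
    }

    private void fire(SketchEvent evt) {
        for (SketchListener listener : listeners) {
            listener.onEvent(evt);
        }
    }
}
```

In this sketch, a put of a new key, a put of the same key, and a remove produce an insert, an update, and a delete event in that order, each carrying the old and new value where one exists.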

25.1.2 Understanding Event Guarantees

The partitioned cache service guarantees that under normal circumstances an event is delivered only once. However, there are two scenarios that could break this guarantee:

  • A catastrophic cluster failure that causes data loss (for example, the simultaneous crash of two machines holding data). In this case, the PARTITION_LOST event is emitted to all registered PartitionListener instances on the server side.

  • Client disconnect. In this case, the MEMBER_LEFT event is emitted to all registered MemberListener instances on the client side.

25.1.3 Caches and Classes that Support Events

All Coherence caches implement ObservableMap; in fact, the NamedCache interface that is implemented by all Coherence caches extends the ObservableMap interface. That means that an application can sign up to receive events from any cache, regardless of whether that cache is local, partitioned, near, replicated, using read-through, write-through, write-behind, overflow, disk storage, and so on.

Note:

Regardless of the cache topology and the number of servers, and even if the modifications are being made by other servers, the events are delivered to the application's listeners.

In addition to the Coherence caches (those objects obtained through a Coherence cache factory), several other supporting classes in Coherence also implement the ObservableMap interface:

  • ObservableHashMap

  • LocalCache

  • OverflowMap

  • NearCache

  • ReadWriteBackingMap

  • AbstractSerializationCache, SerializationCache, and SerializationPagedCache

  • WrapperObservableMap, WrapperConcurrentMap, and WrapperNamedCache

25.2 Signing Up for All Events

To sign up for events, simply pass an object that implements the MapListener interface to an addMapListener method on ObservableMap. The addMapListener methods are illustrated in Example 25-1.

Example 25-1 Methods on the ObservableMap API

public void addMapListener(MapListener listener);
public void addMapListener(MapListener listener, Object oKey, boolean fLite);
public void addMapListener(MapListener listener, Filter filter, boolean fLite);

Example 25-2 illustrates a sample MapListener implementation that prints each event as it is received.

Example 25-2 Sample MapListener Implementation

/**
* A MapListener implementation that prints each event as it is
* received.
*/
public static class EventPrinter
        extends Base
        implements MapListener
    {
    public void entryInserted(MapEvent evt)
        {
        out(evt);
        }

    public void entryUpdated(MapEvent evt)
        {
        out(evt);
        }

    public void entryDeleted(MapEvent evt)
        {
        out(evt);
        }
    }

Using this implementation, it is extremely simple to print all events from any given cache (since all caches implement the ObservableMap interface):

cache.addMapListener(new EventPrinter());

Of course, to be able to later remove the listener, it is necessary to hold on to a reference to the listener:

Example 25-3 Holding a Reference to a Listener

MapListener listener = new EventPrinter();
cache.addMapListener(listener);
m_listener = listener; // store the listener in a field

Later, to remove the listener:

Example 25-4 Removing a Listener

MapListener listener = m_listener;
if (listener != null)
    {
    cache.removeMapListener(listener);
    m_listener = null; // clean up the listener field
    }

Each addMapListener method on the ObservableMap interface has a corresponding removeMapListener method. To remove a listener, use the removeMapListener method that corresponds to the addMapListener method that was used to add the listener.

25.3 Using an Inner Class as a MapListener

When creating an inner class to use as a MapListener, or when implementing a MapListener that only listens to one or two types of events (inserts, updates or deletes), you can use the AbstractMapListener base class. For example, the anonymous inner class in Example 25-5 prints out only the insert events for the cache.

Example 25-5 Inner Class that Prints Only Cache Insert Events

cache.addMapListener(new AbstractMapListener()
    {
    public void entryInserted(MapEvent evt)
        {
        out(evt);
        }
    });

Another helpful base class for creating a MapListener is the MultiplexingMapListener, which routes all events to a single method for handling. This class would allow you to simplify the EventPrinter example to the code illustrated in Example 25-6. Since only one method must be implemented to capture all events, the MultiplexingMapListener can also be very useful when creating an inner class to use as a MapListener.

Example 25-6 Routing All Events to a Single Method for Handling

public static class EventPrinter
        extends MultiplexingMapListener
    {
    public void onMapEvent(MapEvent evt)
        {
        out(evt);
        }
    }

25.4 Configuring a MapListener For a Cache

If the listener should always be on a particular cache, then place it into the cache configuration using the <listener> element and Coherence automatically adds the listener when it configures the cache.
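As a rough illustration, a scheme definition with a configured listener might look like the following fragment. The scheme name and the com.example.EventPrinter class name are hypothetical; the sketch assumes the listener is a public top-level class with a public no-argument constructor that is available on the class path of the cluster member that configures the cache.

```xml
<distributed-scheme>
   <scheme-name>example-distributed</scheme-name>
   <service-name>DistributedCache</service-name>
   <backing-map-scheme>
      <local-scheme/>
   </backing-map-scheme>
   <listener>
      <class-scheme>
         <!-- hypothetical MapListener implementation -->
         <class-name>com.example.EventPrinter</class-name>
      </class-scheme>
   </listener>
   <autostart>true</autostart>
</distributed-scheme>
```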

25.5 Signing Up For Events On Specific Identities

Signing up for events that occur against specific identities (keys) is just as simple. For example, to print all events that occur against the Integer key 5:

cache.addMapListener(new EventPrinter(), Integer.valueOf(5), false);

Thus, the code in Example 25-7 would only trigger an event when the Integer key 5 is inserted or updated:

Example 25-7 Triggering an Event when a Specific Integer Key is Inserted or Updated

for (int i = 0; i < 10; ++i)
    {
    Integer key   = Integer.valueOf(i);
    String  value = "test value for key " + i;
    cache.put(key, value);
    }
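The effect of a key-specific registration can be modeled with the JDK alone. This is a simplified sketch, not how Coherence implements key listeners: KeyedNotifier is a hypothetical class that keeps a separate listener list for each key and notifies only the listeners registered for the key being written.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical stand-in for a cache that supports key-specific listeners.
class KeyedNotifier {
    private final Map<Object, Object> data = new HashMap<>();
    // Listeners registered per key; each one receives (key, newValue).
    private final Map<Object, List<BiConsumer<Object, Object>>> listenersByKey = new HashMap<>();

    void addListener(Object key, BiConsumer<Object, Object> listener) {
        listenersByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(listener);
    }

    void put(Object key, Object value) {
        data.put(key, value);
        // Only the listeners registered for this exact key are notified.
        List<BiConsumer<Object, Object>> interested =
                listenersByKey.getOrDefault(key, Collections.emptyList());
        for (BiConsumer<Object, Object> listener : interested) {
            listener.accept(key, value);
        }
    }
}
```

With a listener registered for the Integer key 5, running the same ten-iteration loop against this sketch invokes the listener once, for the value written under key 5.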

25.6 Filtering Events

Similar to listening to a particular key, it is possible to listen to particular events. In Example 25-8 a listener is added to the cache with a filter that allows the listener to only receive delete events.

Example 25-8 Adding a Listener with Filter for Deleted Events

// Filters used with partitioned caches must be 
// Serializable, Externalizable or ExternalizableLite
public class DeletedFilter
        implements Filter, Serializable
    {
    public boolean evaluate(Object o)
        {
        MapEvent evt = (MapEvent) o;
        return evt.getId() == MapEvent.ENTRY_DELETED;
        }
    }

cache.addMapListener(new EventPrinter(), new DeletedFilter(), false);

Note:

Filtering events versus filtering cached data:

When building a filter for querying, the object that is passed to the evaluate method of the Filter is a value from the cache, or - if the filter implements the EntryFilter interface - the entire Map.Entry from the cache. When building a filter for filtering events for a MapListener, the object that is passed to the evaluate method of the filter is of type MapEvent.

See "Listening to Queries" for more information on how to use a query filter to listen to cache events.

If you then make the following sequence of calls:

cache.put("hello", "world");
cache.put("hello", "again");
cache.remove("hello");

The result would be:

CacheEvent{LocalCache deleted: key=hello, value=again}

For more information, see "Listening to Queries".
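The point that an event filter is evaluated against the event itself, not against a cached value, can be illustrated with a JDK-only sketch. FilterableEvent and FilteringDispatcher are hypothetical stand-ins, and java.util.function.Predicate plays the role that Filter plays in Coherence.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for an event: just the action and the key.
class FilterableEvent {
    enum Action { INSERTED, UPDATED, DELETED }

    final Action action;
    final Object key;

    FilterableEvent(Action action, Object key) {
        this.action = action;
        this.key = key;
    }
}

// Delivers each event only to the registrations whose filter accepts that event.
class FilteringDispatcher {
    private static final class Registration {
        final Consumer<FilterableEvent> listener;
        final Predicate<FilterableEvent> filter;

        Registration(Consumer<FilterableEvent> listener, Predicate<FilterableEvent> filter) {
            this.listener = listener;
            this.filter = filter;
        }
    }

    private final List<Registration> registrations = new ArrayList<>();

    void addListener(Consumer<FilterableEvent> listener, Predicate<FilterableEvent> filter) {
        registrations.add(new Registration(listener, filter));
    }

    void dispatch(FilterableEvent evt) {
        for (Registration r : registrations) {
            // The filter sees the event object, not the cached value.
            if (r.filter.test(evt)) {
                r.listener.accept(evt);
            }
        }
    }
}
```

A listener registered with the predicate evt -> evt.action == FilterableEvent.Action.DELETED receives only the delete event from an insert, update, delete sequence, which mirrors the single line of output shown above.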

25.7 Using Lite Events

By default, Coherence provides both the old and the new value as part of an event. Consider the following example:

Example 25-9 Inserting, Updating, and Removing a Value from the Cache

MapListener listener = new MultiplexingMapListener()
    {
    public void onMapEvent(MapEvent evt)
        {
        out("event has occurred: " + evt);
        out("(the wire-size of the event would have been "
            + ExternalizableHelper.toBinary(evt).length()
            + " bytes.)");
        }
    };
cache.addMapListener(listener);

// insert a 1KB value
cache.put("test", new byte[1024]);

// update with a 2KB value
cache.put("test", new byte[2048]);

// remove the 2KB value
cache.remove("test");

The output from running the test, illustrated in Example 25-10, shows that the first event carries the 1KB inserted value, the second event carries both the replaced 1KB value and the new 2KB value, and the third event carries the removed 2KB value.

Example 25-10 Sample Output

event has occurred: CacheEvent{LocalCache added: key=test, value=[B@a470b8}
(the wire-size of the event would have been 1283 bytes.)
event has occurred: CacheEvent{LocalCache updated: key=test, old value=[B@a470b8, new value=[B@1c6f579}
(the wire-size of the event would have been 3340 bytes.)
event has occurred: CacheEvent{LocalCache deleted: key=test, value=[B@1c6f579}
(the wire-size of the event would have been 2307 bytes.)

When an application does not require the old and the new value to be included in the event, it can indicate that by requesting only "lite" events. When adding a listener, you can request lite events by using an addMapListener method that takes an additional boolean fLite parameter. In Example 25-9, the only change would be:

cache.addMapListener(listener, (Filter) null, true);

Note:

Obviously, a lite event's old value and new value may be null. However, even if you request lite events, the old and the new value may be included if there is no additional cost to generate and deliver the event. In other words, requesting that a MapListener receive lite events is simply a hint to the system that the MapListener does not have to know the old and new values for the event.
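The size difference that motivates lite events can be approximated with a JDK-only sketch. SizedEvent is a hypothetical class, and the sketch uses standard Java serialization rather than the Coherence wire format, so the byte counts it produces are not comparable to the sample output above; only the relative difference is meaningful.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for an update event; not the Coherence MapEvent class.
class SizedEvent implements Serializable {
    private static final long serialVersionUID = 1L;

    final String key;
    final byte[] oldValue; // null in a lite event
    final byte[] newValue; // null in a lite event

    SizedEvent(String key, byte[] oldValue, byte[] newValue) {
        this.key = key;
        this.oldValue = oldValue;
        this.newValue = newValue;
    }

    // Returns a copy that keeps the key but drops both values.
    SizedEvent toLite() {
        return new SizedEvent(key, null, null);
    }

    // Serializes this event with Java serialization and returns the byte count.
    int serializedSize() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(this);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }
}
```

For an update that replaces a 1KB value with a 2KB value, the full event in this sketch serializes to more than 3KB, while the lite copy carries only the key and a small amount of class metadata.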

25.8 Listening to Queries

All Coherence caches support querying by any criteria. When an application queries for data from a cache, the result is a point-in-time snapshot, either as a set of identities (keySet) or a set of identity/value pairs (entrySet). The mechanism for determining the contents of the resulting set is referred to as filtering, and it allows an application developer to construct queries of arbitrary complexity using a rich set of out-of-the-box filters (for example, equals, less-than, like, between, and so on), or to provide their own custom filters (for example, XPath).

The same filters that are used to query a cache can listen to events from a cache. For example, in a trading system it is possible to query for all open Order objects for a particular trader:

Example 25-11 Listening for Events from a Cache

NamedCache mapTrades = ...
Filter filter = new AndFilter(new EqualsFilter("getTrader", traderid),
                              new EqualsFilter("getStatus", Status.OPEN));
Set setOpenTrades = mapTrades.entrySet(filter);

To receive notifications of new trades being opened for that trader, closed by that trader or reassigned to or from another trader, the application can use the same filter:

Example 25-12 Listening for Events on an Object

// receive events for all trade IDs that this trader is interested in
mapTrades.addMapListener(listener, new MapEventFilter(filter), true);

The MapEventFilter converts a query filter into an event filter.

The MapEventFilter has several very powerful options, allowing an application listener to receive only the events that it is specifically interested in. More importantly for scalability and performance, only the desired events have to be communicated over the network, and they are communicated only to the servers and clients that have expressed interest in those specific events. Example 25-13 illustrates these scenarios.

Example 25-13 Using MapEventFilter to Filter on Various Events

// receive all events for all trades that this trader is interested in
int nMask = MapEventFilter.E_ALL;
mapTrades.addMapListener(listener, new MapEventFilter(nMask, filter), true);

// receive events for all this trader's trades that are closed or
// re-assigned to a different trader
nMask = MapEventFilter.E_UPDATED_LEFT | MapEventFilter.E_DELETED;
mapTrades.addMapListener(listener, new MapEventFilter(nMask, filter), true);

// receive events for all trades as they are assigned to this trader
nMask = MapEventFilter.E_INSERTED | MapEventFilter.E_UPDATED_ENTERED;
mapTrades.addMapListener(listener, new MapEventFilter(nMask, filter), true);

// receive events only for new trades assigned to this trader
nMask = MapEventFilter.E_INSERTED;
mapTrades.addMapListener(listener, new MapEventFilter(nMask, filter), true);

For more information on the various options supported, see the API documentation for MapEventFilter.
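The idea behind the "entered" and "left" masks can be sketched as follows: a change is classified by evaluating the query filter against both the old and the new value. This is a simplified JDK-only model, not the MapEventFilter implementation. The MaskModel class, its constant values, and the use of null to mean "no value" are assumptions made for illustration.

```java
import java.util.function.Predicate;

// Simplified model of mask-based event filtering; NOT the Coherence MapEventFilter.
class MaskModel {
    // Illustrative bit values chosen for this sketch only.
    static final int INSERTED        = 1;
    static final int DELETED         = 2;
    static final int UPDATED_ENTERED = 4;  // old value did not match, new value does
    static final int UPDATED_LEFT    = 8;  // old value matched, new value does not
    static final int UPDATED_WITHIN  = 16; // both old and new value match

    // Classifies a change against a query filter; returns 0 if the change is irrelevant.
    // A null oldValue models an insert and a null newValue models a delete.
    static <V> int classify(V oldValue, V newValue, Predicate<V> query) {
        boolean before = oldValue != null && query.test(oldValue);
        boolean after  = newValue != null && query.test(newValue);
        if (oldValue == null) {
            return after ? INSERTED : 0;
        }
        if (newValue == null) {
            return before ? DELETED : 0;
        }
        if (!before && after) {
            return UPDATED_ENTERED;
        }
        if (before && !after) {
            return UPDATED_LEFT;
        }
        return before ? UPDATED_WITHIN : 0;
    }

    // An event is delivered when its classification bit is present in the mask.
    static <V> boolean accepts(int mask, V oldValue, V newValue, Predicate<V> query) {
        return (mask & classify(oldValue, newValue, query)) != 0;
    }
}
```

In this model, if the values are trader names and the query is "alice"::equals, a change from "alice" to "bob" is classified as UPDATED_LEFT, so a listener registered with the mask UPDATED_LEFT | DELETED would be told that the trade was reassigned away from that trader.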

25.8.1 Filtering Events Versus Filtering Cached Data

When building a Filter for querying, the object that is passed to the evaluate method of the Filter is a value from the cache, or if the Filter implements the EntryFilter interface, the entire Map.Entry from the cache. When building a Filter for filtering events for a MapListener, the object that is passed to the evaluate method of the Filter is of type MapEvent.

The MapEventFilter converts a Filter that is used to do a query into a Filter that is used to filter events for a MapListener. In other words, the MapEventFilter is constructed from a Filter that queries a cache, and the resulting MapEventFilter is a filter that evaluates MapEvent objects by converting them into the objects that a query Filter would expect.

25.9 Using Synthetic Events

Events usually reflect the changes being made to a cache. For example, one server is modifying one entry in a cache while another server is adding several items to a cache while a third server is removing an item from the same cache, all while fifty threads on each and every server in the cluster are accessing data from the same cache. All of the modifying actions produce events that any server within the cluster can choose to receive. These actions are referred to as client actions, and the events as being dispatched to clients, even though the "clients" in this case are actually servers. This is a natural concept in a true peer-to-peer architecture, such as a Coherence cluster: each and every peer is both a client and a server, both consuming services from its peers and providing services to its peers. In a typical Java Enterprise application, a "peer" is an application server instance that is acting as a container for the application, and the "client" is that part of the application that is directly accessing and modifying the caches and listening to events from the caches.

Some events originate from within a cache itself. There are many examples, but the most common cases are:

  • When entries automatically expire from a cache;

  • When entries are evicted from a cache because the maximum size of the cache has been reached;

  • When entries are transparently added to a cache as the result of a Read-Through operation;

  • When entries in a cache are transparently updated as the result of a Read-Ahead or Refresh-Ahead operation.

Each of these represents a modification, but the modifications represent natural (and typically automatic) operations from within a cache. These events are referred to as synthetic events.

When necessary, an application can differentiate between client-induced and synthetic events simply by asking the event if it is synthetic. This information is carried on a sub-class of the MapEvent, called CacheEvent. Using the previous EventPrinter example, it is possible to print only the synthetic events:

Example 25-14 Determining Synthetic Events

public static class EventPrinter
        extends MultiplexingMapListener
    {
    public void onMapEvent(MapEvent evt)
        {
        if (evt instanceof CacheEvent && ((CacheEvent) evt).isSynthetic())
            {
            out(evt);
            }
        }
    }

For more information on this feature, see the API documentation for CacheEvent.
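The distinction between client-induced and synthetic events can be modeled with a size-limited map. The following JDK-only sketch is not Coherence code: RemovalEvent and BoundedMapSketch are hypothetical names, and the eviction policy (remove the oldest inserted entry) is an assumption chosen to keep the example short.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a delete event that records whether the cache itself caused it.
class RemovalEvent {
    final Object key;
    final boolean synthetic;

    RemovalEvent(Object key, boolean synthetic) {
        this.key = key;
        this.synthetic = synthetic;
    }
}

// A size-limited map: explicit removals are client actions, evictions are synthetic.
class BoundedMapSketch {
    private final int maxSize;
    private final Map<Object, Object> data = new LinkedHashMap<>(); // keeps insertion order
    final List<RemovalEvent> events = new ArrayList<>();

    BoundedMapSketch(int maxSize) {
        this.maxSize = maxSize;
    }

    void put(Object key, Object value) {
        data.put(key, value);
        if (data.size() > maxSize) {
            // Evict the oldest entry; the map, not the caller, initiated this removal.
            Iterator<Object> keys = data.keySet().iterator();
            Object eldest = keys.next();
            keys.remove();
            events.add(new RemovalEvent(eldest, true));
        }
    }

    void remove(Object key) {
        if (data.containsKey(key)) {
            data.remove(key);
            events.add(new RemovalEvent(key, false));
        }
    }
}
```

With a maximum size of two, a third put evicts the first key and records a synthetic removal, whereas an explicit call to remove records a removal that is not synthetic.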

25.10 Using Backing Map Events

While it is possible to listen to events from Coherence caches, each of which presents a local view of distributed, partitioned, replicated, near-cached, continuously-queried, read-through/write-through and write-behind data, it is also possible to peek behind the curtains, so to speak.

For some advanced use cases, it may be necessary to "listen to" the "map" behind the "service". Replication, partitioning and other approaches to managing data in a distributed environment are all distribution services. The service still has to have something in which to actually manage the data, and that something is called a "backing map".

Backing maps can be configured. If all the data for a particular cache should be kept in object form on the heap, then use an unlimited and non-expiring LocalCache (or a SafeHashMap if statistics are not required). If only a small number of items should be kept in memory, use a LocalCache. If data are to be read on demand from a database, then use a ReadWriteBackingMap (which knows how to read and write through an application's DAO implementation), and in turn give the ReadWriteBackingMap a backing map such as a SafeHashMap or a LocalCache to store its data in.

Some backing maps are observable. The events coming from these backing maps are not usually of direct interest to the application. Instead, Coherence translates them into actions that must be taken (by Coherence) to keep data synchronous and properly backed up, and it also translates them when appropriate into clustered events that are delivered throughout the cluster as requested by application listeners. For example, if a partitioned cache has a LocalCache as its backing map, and the local cache expires an entry, that event causes Coherence to expire all of the backup copies of that entry. Furthermore, if any listeners have been registered on the partitioned cache, and if the event matches their event filter(s), then that event is delivered to those listeners on the servers where those listeners were registered.

In some advanced use cases, an application must process events on the server where the data are being maintained, and it must do so on the structure (backing map) that is actually managing the data. In these cases, if the backing map is an observable map, a listener can be configured on the backing map or one can be programmatically added to the backing map. (If the backing map is not observable, it can be made observable by wrapping it in a WrapperObservableMap.)

Each backing map event is dispatched once and only once. However, multiple backing map events could be generated from a single put. For example, if the entry from put has to be redistributed, then distributed events (deleted from original node, and inserted in a new node) are created. In this case, the backing map listener is called multiple times for the single put.

Lastly, backing map listeners are always synchronous; they are fired on a thread that is doing the modification operation while holding the synchronization monitor for the backing map itself. For internal backing map listeners, events are often not processed immediately, but are queued and processed later asynchronously.

For more information on this feature, see the API documentation for BackingMapManager.
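Because a synchronous listener runs on the thread that is performing the modification, a common approach is to do as little as possible in the callback and hand the event to another thread. The following JDK-only sketch shows one way to do this; DeferringListener is a hypothetical class and is not part of Coherence. It assumes that events can be handled in arrival order on a single worker thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical listener wrapper: the synchronous callback only enqueues work.
class DeferringListener<E> {
    // A single worker thread preserves the order in which events were queued.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Consumer<E> handler;

    DeferringListener(Consumer<E> handler) {
        this.handler = handler;
    }

    // Called on the thread performing the modification; returns without running the handler.
    void onEvent(E evt) {
        worker.execute(() -> handler.accept(evt));
    }

    // Stops accepting events and waits for the queued ones to be handled.
    void close() {
        worker.shutdown();
        try {
            if (!worker.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("queued events were not processed in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for queued events", e);
        }
    }
}
```

The trade-off in this design is that the handler no longer observes the map in the exact state that produced the event, so it should rely only on the information carried by the event itself.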

25.10.1 Producing Readable Backing MapListener Events from Distributed Caches

Backing MapListener events are returned from replicated caches in readable Java format. However, backing MapListener events returned from distributed caches are in internal Coherence format. The Coherence Incubator Common project provides an AbstractMultiplexingBackingMapListener class that enables you to obtain readable backing MapListener events from distributed caches. See http://coherence.oracle.com/display/INCUBATOR/Coherence+Common to download Coherence Common libraries.

To produce readable backing MapListener events from distributed caches:

  1. Create a class that extends the AbstractMultiplexingBackingMapListener class.

  2. Register the implementation in the <listener> section of the backing-map-scheme in the cache-config file.

  3. Start the cache server application file and the client file with the cacheconfig Java property:

    -Dtangosol.coherence.cacheconfig="cache-config.xml"
    

The AbstractMultiplexingBackingMapListener class provides an onBackingMapEvent method which you can override to specify how you would like the event returned.

The following listing of the VerboseBackingMapListener class is a sample implementation of AbstractMultiplexingBackingMapListener. The onBackingMapEvent method has been overridden to send the results to standard output.

Example 25-15 An AbstractMultiplexingBackingMapListener Implementation

import com.tangosol.net.BackingMapManagerContext;
import com.tangosol.util.MapEvent;


public class VerboseBackingMapListener extends AbstractMultiplexingBackingMapListener {

        public VerboseBackingMapListener(BackingMapManagerContext context) {
                super(context);
        }
        
        @Override
        protected void onBackingMapEvent(MapEvent mapEvent, Cause cause) {
                
                System.out.printf("Thread: %s Cause: %s Event: %s\n",
                Thread.currentThread().getName(), cause, mapEvent);
        }
}

Example 25-16 is an example distributed scheme definition. In the <listener> section of the file, the VerboseBackingMapListener class is registered, and its constructor parameter is identified as being of type com.tangosol.net.BackingMapManagerContext.

Example 25-16 Distributed Scheme Specifying a Verbose Backing Map Listener

<distributed-scheme>
   <scheme-name>my-dist-scheme</scheme-name>
   <service-name>DistributedCache</service-name>
   <backing-map-scheme>
      <read-write-backing-map-scheme>
         <internal-cache-scheme>
            <local-scheme>
               <high-units>0</high-units>
               <expiry-delay>0</expiry-delay>
            </local-scheme>
         </internal-cache-scheme>
         <cachestore-scheme>
            <class-scheme>
               <class-name>CustomCacheStore</class-name>
               <init-params>
                  <init-param>
                     <param-type>java.lang.String</param-type>
                     <param-value>{cache-name}</param-value>
                  </init-param>
               </init-params>
            </class-scheme>
         </cachestore-scheme>
         <listener>
            <class-scheme>
               <class-name>VerboseBackingMapListener</class-name>
               <init-params>
                  <init-param>
                     <param-type>com.tangosol.net.BackingMapManagerContext</param-type>
                     <param-value>{manager-context}</param-value>
                  </init-param>
               </init-params>
            </class-scheme>
         </listener>
      </read-write-backing-map-scheme>
   </backing-map-scheme>
   <autostart>true</autostart>
</distributed-scheme>

25.11 Using Synchronous Event Listeners

Some events are delivered asynchronously, so that application listeners do not disrupt the cache services that are generating the events. In some rare scenarios, asynchronous delivery can cause ambiguity of the ordering of events compared to the results of ongoing operations. To guarantee that the cache API operations and the events are ordered as if the local view of the clustered system were single-threaded, a MapListener must implement the SynchronousListener marker interface.

One example in Coherence itself that uses synchronous listeners is the Near Cache, which can use events to invalidate locally cached data ("Seppuku").

For more information on this feature, see the API documentation for MapListenerSupport.SynchronousListener.