11 Working with Live Events

In this exercise you learn how to work with entry and EntryProcessor events, and how to be notified of events using event interceptors. This exercise provides instructions for creating a project where you create event interceptors and cause events to exercise the features of the Live Events framework.

A brief overview of the Live Events framework is also provided. For a more detailed description of the framework and the API discussed in this chapter, see "Using Live Events" in Oracle Fusion Middleware Developing Applications with Oracle Coherence and Oracle Fusion Middleware Java API Reference for Oracle Coherence.

This chapter contains the following sections:

11.1 Introduction

Coherence provides an event framework that allows your applications to react to operations performed in the data grid. The framework uses an event-based model where events represent observable occurrences of cluster operations. The supported events include partitioned service, cache, and application events. These events can be consumed by registering event interceptors (classes that implement EventInterceptor) either programmatically or by using the cache configuration.

11.1.1 About Event Interceptors

Applications can react to Live Events by registering event interceptors (EventInterceptor). The interceptors explicitly define which events to receive and what action, if any, to take. Any number of event interceptors can be created and registered for a specific cache or for all caches managed by a specific partitioned service. Multiple interceptors that are registered for the same event type are automatically chained together and executed in the context of a single event.

Event interceptors are created by implementing the EventInterceptor interface. The interface is defined using generics and allows you to filter the events of interest by specifying the generic type of the event as a type parameter. The inherited onEvent method provides the ability to perform any necessary processing upon receiving an event. For details on the EventInterceptor API, see Oracle Fusion Middleware Java API Reference for Oracle Coherence.
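The idea of filtering by generic type can be modeled without Coherence on the classpath. The following sketch is a simplified illustration only: the Event, CacheEvent, EntryEvent, EntryProcessorEvent, TransferEvent, and Interceptor types are hypothetical stand-ins, not the Coherence classes, and reading the type argument by reflection is one plausible mechanism rather than a description of how Coherence resolves it internally.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

// Simplified model: every type here is a hypothetical stand-in, not a Coherence class.
class GenericFilterSketch {
    interface Event {}
    interface CacheEvent extends Event {}
    static class EntryEvent implements CacheEvent {}
    static class EntryProcessorEvent implements CacheEvent {}
    static class TransferEvent implements Event {}

    interface Interceptor<E extends Event> {
        void onEvent(E event);
    }

    /** Records the simple class name of every event it is given. */
    static class CacheEventRecorder implements Interceptor<CacheEvent> {
        final List<String> seen = new ArrayList<>();

        @Override
        public void onEvent(CacheEvent event) {
            seen.add(event.getClass().getSimpleName());
        }
    }

    /** Reads the type argument that the class declared on Interceptor. */
    static Class<?> declaredEventType(Interceptor<?> interceptor) {
        for (Type t : interceptor.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType pt = (ParameterizedType) t;
                Type arg = pt.getActualTypeArguments()[0];
                if (pt.getRawType() == Interceptor.class && arg instanceof Class) {
                    return (Class<?>) arg;
                }
            }
        }
        return Event.class; // no usable type argument: accept every event
    }

    /** Delivers the event only if it is an instance of the declared type. */
    @SuppressWarnings("unchecked")
    static boolean dispatch(Interceptor<?> interceptor, Event event) {
        if (declaredEventType(interceptor).isInstance(event)) {
            ((Interceptor<Event>) interceptor).onEvent(event);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CacheEventRecorder recorder = new CacheEventRecorder();
        dispatch(recorder, new EntryEvent());
        dispatch(recorder, new TransferEvent());       // filtered out by type
        dispatch(recorder, new EntryProcessorEvent());
        System.out.println(recorder.seen);             // prints [EntryEvent, EntryProcessorEvent]
    }
}
```

In this model, declaring Interceptor<CacheEvent> is enough to exclude the transfer event; no explicit filter is written in onEvent.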

The @Interceptor annotation is used to restrict the events to specific event types and also provides further configuration of the interceptor. The @Interceptor annotation includes the following attributes:

  • identifier—Specifies a unique identifier for the interceptor. This value can be overridden when registering an interceptor class in the cache configuration file.

  • entryEvents—Specifies an array of entry event types to which the interceptor wants to subscribe.

  • entryProcessorEvents—Specifies an array of entry processor event types to which the interceptor wants to subscribe.

  • transferEvents—Specifies an array of transfer event types to which the interceptor wants to subscribe.

  • transactionEvents—Specifies an array of transaction event types to which the interceptor wants to subscribe.

  • order—Specifies whether the interceptor is placed at the front of a chain of interceptors. The legal values are HIGH and LOW. A value of HIGH indicates that the interceptor is placed at the front in the chain of interceptors. A value of LOW indicates no order preference. The default value is LOW. This value can be overridden when registering an interceptor class in the cache configuration file.

For more information on the @Interceptor annotation, see "Creating Event Interceptors" in Oracle Fusion Middleware Developing Applications with Oracle Coherence.
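The effect of the order attribute can be pictured with a small stand-alone model. This is a sketch under stated assumptions, not the Coherence implementation: the InterceptorChainSketch class and its methods are hypothetical, LOW is assumed to append to the end of the chain, and a later HIGH registration is assumed to go ahead of an earlier one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of an interceptor chain; not a Coherence class.
class InterceptorChainSketch {
    enum Order { HIGH, LOW }

    private final Deque<String> chain = new ArrayDeque<>();

    /** HIGH goes to the front of the chain; LOW (assumed) is appended. */
    void register(String identifier, Order order) {
        if (order == Order.HIGH) {
            chain.addFirst(identifier);
        } else {
            chain.addLast(identifier);
        }
    }

    /** Returns the identifiers in the order they would be invoked. */
    List<String> executionOrder() {
        return new ArrayList<>(chain);
    }

    public static void main(String[] args) {
        InterceptorChainSketch sketch = new InterceptorChainSketch();
        sketch.register("audit", Order.LOW);
        sketch.register("trace", Order.HIGH);
        sketch.register("metrics", Order.LOW);
        System.out.println(sketch.executionOrder()); // prints [trace, audit, metrics]
    }
}
```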

11.1.2 About Cache Events

Cache events are raised due to some operation performed against one or many entries in a cache. Cache events include entry events and entry processor events. An entry event (EntryEvent) can represent one of several operations (inserting, updating, and removing) performed against entries in a cache. Entry events can be divided into precommit events (INSERTING, UPDATING, and REMOVING), which are raised before the operation is performed to allow modification to an entry, and post-commit events (INSERTED, UPDATED, and REMOVED) which are raised after an operation has completed and in the same order as the events occurred.

Entry processor (EntryProcessor) events represent the execution of an EntryProcessor on a set of entries in a cache. Entry processor events can be divided into precommit events (EXECUTING), which are raised before an entry processor is executed to allow modification to the entry processor implementation, and post-commit events (EXECUTED), which are raised after an entry processor is executed and in the same order that the events occurred.

11.1.3 About Partitioned Service Events

Partitioned service (PartitionedService) events comprise transfer events, which represent partition transfers between storage-enabled members, and transaction events. Transfer events are dispatched in the context of a partition being transferred; however, the contents belonging to the partition are immutable.

11.1.4 About Event Interceptor Registration

You register an event interceptor either in a cache configuration file or programmatically. An event interceptor is registered either for one or many caches, or for a specific partitioned service. An event interceptor that is registered for a specific cache only receives events that pertain to that cache. An event interceptor that is registered for a specific partitioned service receives events for all caches that are managed by the service.

In the cache configuration file, the full class name of the event interceptor is specified in an <interceptor> element, which is nested in an <interceptors> element within the <cache-mapping> stanza. The interceptor is associated with the cache specified in the <cache-name> element of that mapping. An event interceptor can also be registered for a partitioned service in the <caching-schemes> stanza. To do this, include an <interceptors> element within a <distributed-scheme> element; the <interceptors> element can contain any number of <interceptor> subelements.

Instead of using the cache configuration file, event interceptors can be registered programmatically. The key classes and methods to register event interceptors are the getInterceptorRegistry method on the ConfigurableCacheFactory interface and the getEventInterceptor and registerEventInterceptor methods on the InterceptorRegistry interface. For example, the following code registers the TimedTraceInterceptor, which is an EventInterceptor introduced later in this chapter:

ConfigurableCacheFactory ccf = CacheFactory.getConfigurableCacheFactory();
InterceptorRegistry reg = ccf.getInterceptorRegistry();
 
reg.registerEventInterceptor(new TimedTraceInterceptor(), RegistrationBehavior.FAIL);

A detailed description and examples of registering event interceptors programmatically are beyond the scope of this documentation. For more information, see "Using Live Events" in Oracle Fusion Middleware Developing Applications with Oracle Coherence and Oracle Fusion Middleware Java API Reference for Oracle Coherence.

11.2 Creating, Registering, and Executing an Event Interceptor

The following sections describe how to create, register, and execute an event interceptor. In this exercise, you will work with an event interceptor that will measure the timing between pre- and post-commit events.

To complete this exercise, follow these steps:

  1. Create an Event Interceptor to Measure the Time Between a Pre- and a Post-commit Event

  2. Create a Class to Delay the Processing of Events

  3. Register the Timed Events Event Interceptor

  4. Create a POF Configuration File for the Lazy Processor Class

  5. Create a Class to Exercise the Timed Events Event Interceptor

  6. Create a Driver File for Timed Events Example

  7. Create a Cache Server Startup Configuration

  8. Create a Startup Configuration for the Timed Events Driver

  9. Run the Timed Events Example

11.2.1 Create an Event Interceptor to Measure the Time Between a Pre- and a Post-commit Event

Create an event interceptor named TimedTraceInterceptor to measure the timings between precommit and post-commit events (that is, INSERTING/INSERTED, UPDATING/UPDATED, and REMOVING/REMOVED) for each type of event.

To create the TimedTraceInterceptor event interceptor class:

  1. Create a new Application Client Project in Eclipse named UEMEvents. Ensure that CoherenceConfig is selected in the Configuration field on the opening page and that the Create a default main option is not selected on the Application Client module page.

    See "Creating a New Project in the Eclipse IDE" for detailed information.

  2. Create a new Java class called TimedTraceInterceptor. Ensure that the Default Package is com.oracle.handson. Do not select the Main Method check box.

    See "Creating a Java Class" for more information.

  3. Write an event interceptor that implements the EventInterceptor interface. The interceptor should provide timings between pre- and post-commit events for each type of event: inserts, updates, removes, and an entry processor. You can write your own interceptor or use the code that is provided in Example 11-1.

Example 11-1 illustrates the event interceptor TimedTraceInterceptor. The interceptor implements the EventInterceptor interface. The @Interceptor annotation provides the unique name of the interceptor with the identifier attribute and the order in which it should be executed (Order.HIGH) with the order attribute.

The interceptor also contains a protected EventTimer inner-class. This class times the elapsed time for each event it is notified of. The interceptor tracks the time between a pre- and post-commit event for each entry and the respective event types (INSERT, UPDATE, REMOVE). The timings are sent to the Coherence log in batches displaying sample and cumulative statistics.
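The correlation between a pre-commit and a post-commit event comes down to remembering a start time per key and removing it when the matching completion arrives. The following stand-alone sketch shows that pattern with plain String keys and caller-supplied timestamps; the PrePostTimerSketch class and its method names are hypothetical and are not part of Example 11-1.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-alone model of per-key pre/post-commit timing.
class PrePostTimerSketch {
    /** Start time in nanoseconds for every key that has a pending operation. */
    private final Map<String, Long> startTimes = new ConcurrentHashMap<>();

    /** Called for a pre-commit event: remember when work on the key began. */
    void preCommit(String key, long nowNanos) {
        startTimes.put(key, nowNanos);
    }

    /**
     * Called for a post-commit event: remove the start time so that no
     * reference to the key is retained, and return the elapsed time.
     * Returns empty if no pre-commit event was seen for the key.
     */
    OptionalLong postCommit(String key, long nowNanos) {
        Long start = startTimes.remove(key);
        return start == null ? OptionalLong.empty() : OptionalLong.of(nowNanos - start);
    }

    /** Number of keys that have started but not yet completed. */
    int pending() {
        return startTimes.size();
    }

    public static void main(String[] args) {
        PrePostTimerSketch timer = new PrePostTimerSketch();
        timer.preCommit("key-1", System.nanoTime());
        OptionalLong elapsed = timer.postCommit("key-1", System.nanoTime());
        System.out.println("elapsed nanos present: " + elapsed.isPresent());
        System.out.println("pending keys: " + timer.pending());
    }
}
```

Removing the entry on completion keeps the map from growing with every key that is ever touched, which is the same reason the interceptor drops its reference to the binary key after the post-commit event.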

Because the generic argument is com.tangosol.net.events.partition.cache.Event, the interceptor receives only events that are subtypes of that event type, that is, EntryEvent and EntryProcessorEvent, without specifying any additional filtering.

Example 11-1 Class to Provide Timings Between Pre- and Post-commit Events

package com.oracle.handson;
 
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
 
import com.tangosol.net.events.EventInterceptor;
import com.tangosol.net.events.annotation.Interceptor;
import com.tangosol.net.events.annotation.Interceptor.Order;
import com.tangosol.net.events.partition.cache.EntryEvent;
import com.tangosol.net.events.partition.cache.EntryEvent.Type;
import com.tangosol.net.events.partition.cache.EntryProcessorEvent;
import com.tangosol.net.events.partition.cache.Event;
 
import com.tangosol.util.Binary;
import com.tangosol.util.BinaryEntry;
 
import java.util.HashMap;
import java.util.Map;
 
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
 
/**
 * TimedTraceInterceptor provides timings between pre- and post-commit events
 * for each type of event i.e. inserts, updates, removes, and entry processor
 * execution.
 * <p>
 * These timings are collected and averaged at a sample rate defined by
 * parameter <tt>cSample</tt>. Additionally they are output to the log
 * at the same time. This implementation does maintain a strong reference to
 * each binary key; however, this is removed upon receiving the post-commit
 * event for the same key.
 *
 * @since Coherence 12.1.2
 */
@Interceptor(identifier = "trace", order = Order.HIGH)
public class TimedTraceInterceptor
        implements EventInterceptor<Event<? extends Enum<?>>>
    {
 
    // ----- constructors ---------------------------------------------------
 
    /**
     * Default no-arg constructor.
     */
    public TimedTraceInterceptor()
        {
        this(DEFAULT_SAMPLE_RATE);
        }
 
    /**
     * Construct a TimedTraceInterceptor with the specified sample size.
     *
     * @param cSample   sample size to calculate mean and output statistics
     */
    public TimedTraceInterceptor(int cSample)
        {
        Map<Enum, EventTimer> mapTimedEvents = m_mapTimedEvents = new HashMap<Enum, EventTimer>(3);
        EventTimer insertTimer = new EventTimer(Type.INSERTED, cSample);
        EventTimer updateTimer = new EventTimer(Type.UPDATED, cSample);
        EventTimer removeTimer = new EventTimer(Type.REMOVED, cSample);
        EventTimer invocationTimer = new EventTimer(EntryProcessorEvent.Type.EXECUTED, cSample);
 
        mapTimedEvents.put(Type.INSERTED,  insertTimer);
        mapTimedEvents.put(Type.INSERTING, insertTimer);
        mapTimedEvents.put(Type.UPDATED,   updateTimer);
        mapTimedEvents.put(Type.UPDATING,  updateTimer);
        mapTimedEvents.put(Type.REMOVED,   removeTimer);
        mapTimedEvents.put(Type.REMOVING,  removeTimer);
        mapTimedEvents.put(EntryProcessorEvent.Type.EXECUTED,  invocationTimer);
        mapTimedEvents.put(EntryProcessorEvent.Type.EXECUTING, invocationTimer);
        }
 
    // ----- EventInterceptor methods ---------------------------------------
 
    /**
     * {@inheritDoc}
     */
    public void onEvent(Event event)
        {
        if (event instanceof EntryEvent)
            {
            process((EntryEvent) event);
            }
        else if (event instanceof EntryProcessorEvent)
            {
            process((EntryProcessorEvent) event);
            }
        }
 
    /**
     * This method will be invoked upon execution of an entry processor and
     * will time its execution from prior to post execution, including any
     * backup requests that need to be made as a result.
     *
     * @param event  the {@link EntryProcessorEvent} that encompasses the
     *               requested event
     */
    protected void process(EntryProcessorEvent event)
        {
        EventTimer mapTimedEvents = m_mapTimedEvents.get(event.getType());
 
        for (BinaryEntry binEntry : event.getEntrySet())
            {
            if (event.getType() == EntryProcessorEvent.Type.EXECUTING)
                {
                mapTimedEvents.starting(binEntry);
                }
            else if (event.getType() == EntryProcessorEvent.Type.EXECUTED)
                {
                mapTimedEvents.started(binEntry);
                }
            }
        }
 
    /**
     * This method will be invoked upon execution of a data mutating request
     * and will time its execution from prior to post execution, including
     * any backup requests that need to be made as a result.
     *
     * @param event  the {@link EntryEvent} that encompasses the
     *               requested event
     */
    protected void process(EntryEvent event)
        {
        EventTimer mapTimedEvents = m_mapTimedEvents.get(event.getType());
 
        switch ((Type) event.getType())
            {
            case INSERTING:
            case UPDATING:
            case REMOVING:
                for (BinaryEntry binEntry : event.getEntrySet())
                    {
                    mapTimedEvents.starting(binEntry);
                    }
                break;
            case INSERTED:
            case UPDATED:
            case REMOVED:
                for (BinaryEntry binEntry : event.getEntrySet())
                    {
                    mapTimedEvents.started(binEntry);
                    }
                break;
            }
        }
 
    // ----- inner class: EventTimer ----------------------------------------
 
    /**
     * The EventTimer times the elapsed time for each event it is notified
     * of. It correlates the completion event based on equality comparisons
     * of the Binary provided. Additionally it calculates the mean based on a
     * sample set of <tt>cSample</tt> size. When reaching this sample set
     * a log will be made of the current sample set mean and the cumulative
     * mean.
     */
    protected class EventTimer
        {
 
        // ----- constructors -----------------------------------------------
 
        /**
         * Construct an EventTimer with the event type and sample size provided.
         *
         * @param eventType  the type of event this timer will be timing
         * @param cSample    sample size to calculate mean and output statistics
         */
        protected EventTimer(Enum eventType, int cSample)
            {
            m_eventType = eventType;
            m_cSampleSize = cSample;
            }
 
        /**
         * Notifies the timer that processing of the provided entry will
         * imminently commence.
         *
         * @param binEntry  the event will commence for this <tt>binEntry</tt>
         */
        public void starting(BinaryEntry binEntry)
            {
            m_mapElapsedTimes.put(binEntry.getBinaryKey(), System.nanoTime());
            }
 
        /**
         * Notifies the timer of the completion of the event for the provided
         * key.
         *
         * @param binEntry  the event has completed for this <tt>binEntry</tt>
         */
        public void started(BinaryEntry binEntry)
            {
            Long lStart = m_mapElapsedTimes.remove(binEntry.getBinaryKey());
            if (lStart == null)
                {
                return;
                }
 
            add(System.nanoTime() - lStart);
            }
 
        /**
         * Regardless of the specific data item add the elapsed time taken to
         * process the data item. Upon reaching the sample set size of events
         * calculate the mean, reset timings and continue.
         *
         * @param lElapsed  the number of nanos taken for a data item to
         *                  process
         */
        protected void add(long lElapsed)
            {
            AtomicInteger nEvents           = m_nEvents;
            AtomicLong    lTotalElapsed     = m_lTotalElapsed;
            int           nCurrEvents       = nEvents.incrementAndGet();
            long          lCurrTotalElapsed = lTotalElapsed.addAndGet(lElapsed);
 
            if (nCurrEvents % m_cSampleSize == 0)
                {
                nEvents.set(0);
                lTotalElapsed.set(0L);
                ++m_cSamples;
 
                long lMean   = lCurrTotalElapsed / nCurrEvents;
                // average the previous running mean with the new sample mean
                     m_lMean = m_lMean == 0 ? lMean : (lMean + m_lMean) / 2;
 
                String sStats = String.format("EventStats[name = %s, sampleMean = %fms, mean = %fms]",
                        m_eventType, (double) lMean / 1000000, (double) m_lMean / 1000000);
 
                CacheFactory.log(sStats, CacheFactory.LOG_INFO);
 
                NamedCache cacheResults = CacheFactory.getCache("events-results");
                int        nMemberId    = CacheFactory.getCluster().getLocalMember().getId();
 
                cacheResults.put(
                        String.format("%d-%s-%d", nMemberId, m_eventType.name(), m_cSamples),
                        sStats);
                }
            }
 
        // ----- data members -----------------------------------------------
 
        /**
         * Sample size to calculate mean and output statistics.
         */
        private int               m_cSampleSize;
 
        /**
         * The start times for a number of Binary keys.
         */
        private Map<Binary, Long> m_mapElapsedTimes = new ConcurrentHashMap<Binary, Long>();
 
        /**
         * A counter of the total elapsed time.
         */
        private AtomicLong        m_lTotalElapsed = new AtomicLong();
 
        /**
         * A counter of the number of events processed
         */
        private AtomicInteger     m_nEvents = new AtomicInteger();
 
        /**
         * An average over time.
         */
        private long              m_lMean;
 
        /**
         * The number of samples taken.
         */
        private int               m_cSamples;
 
        /**
         * The type of event being timed.
         */
        private Enum              m_eventType;
        }
 
    // ----- constants ------------------------------------------------------
 
    /**
     * The sample size for elapsed times.
     */
    protected static final int DEFAULT_SAMPLE_RATE = 100;
 
    // ----- data members ---------------------------------------------------
 
    /**
     * A map of event types to their timers.
     */
    private Map<Enum, EventTimer> m_mapTimedEvents;
    }
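The batching performed by the add method can be studied in isolation. The following sketch is a single-threaded model, assuming that the cumulative figure is meant to be the average of the previous running value and the newest sample mean; the SampleMeanSketch class and its method names are hypothetical.

```java
// Hypothetical single-threaded model of sample batching; not thread-safe.
class SampleMeanSketch {
    private final int sampleSize;
    private int events;            // events collected in the current sample
    private long totalElapsed;     // elapsed nanoseconds in the current sample
    private long lastSampleMean;   // mean of the most recently completed sample
    private long runningMean;      // smoothed mean across completed samples
    private int samples;           // number of completed samples

    SampleMeanSketch(int sampleSize) {
        if (sampleSize <= 0) {
            throw new IllegalArgumentException("sampleSize must be positive");
        }
        this.sampleSize = sampleSize;
    }

    /** Adds one elapsed time; returns true when this call completed a sample. */
    boolean add(long elapsedNanos) {
        events++;
        totalElapsed += elapsedNanos;
        if (events < sampleSize) {
            return false;
        }
        lastSampleMean = totalElapsed / events;
        // Parentheses matter: average the two means, then reset the batch.
        runningMean = samples == 0 ? lastSampleMean : (runningMean + lastSampleMean) / 2;
        samples++;
        events = 0;
        totalElapsed = 0;
        return true;
    }

    long lastSampleMean() { return lastSampleMean; }

    long runningMean() { return runningMean; }

    public static void main(String[] args) {
        SampleMeanSketch stats = new SampleMeanSketch(2);
        long[] elapsed = {100, 300, 400, 800};
        for (long e : elapsed) {
            if (stats.add(e)) {
                // prints "200 200" and then "600 400"
                System.out.println(stats.lastSampleMean() + " " + stats.runningMean());
            }
        }
    }
}
```

Note that this smoothing weights recent samples more heavily than older ones; it is not an arithmetic mean over all events.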

11.2.2 Create a Class to Delay the Processing of Events

Create a class named LazyProcessor to introduce an artificial delay into the processing of events. The class should accept the number of milliseconds that the processor sleeps each time it processes an entry. This class will be used by the EventsTimingExample inner class of the EventsExamples class. You will create the EventsExamples class in a later step.

The data that the LazyProcessor class produces will be sent across the wire, so the class should use POF (Portable Object Format). You will add the LazyProcessor class to the POF configuration file in a later step.

To create the LazyProcessor class:

  1. Create a new Java class called LazyProcessor. Do not include a main method.

    See "Creating a Java Class" for more information.

  2. Create the code for the LazyProcessor class. Because the class uses POF for data serialization, it must implement the PortableObject interface. The class must also extend the AbstractProcessor class. Import the Base, InvocableMap.Entry, AbstractProcessor, PortableObject, PofReader, and PofWriter classes and interfaces. You can write your own code for the LazyProcessor class or use the code that is provided in Example 11-2.

Example 11-2 Class to Delay the Processing of Events

package com.oracle.handson;
 
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.util.Base;
import com.tangosol.util.InvocableMap.Entry;
import com.tangosol.util.processor.AbstractProcessor;
 
import java.io.IOException;
 
/**
 * LazyProcessor will sleep for a specified period of time.
 *
 *
 * @since 12.1.2
 */
public class LazyProcessor
        extends AbstractProcessor
        implements PortableObject
    {
    // ----- constructors ---------------------------------------------------
 
    /**
     * Serialization version identifier.
     */
    private static final long serialVersionUID = 1L;
 
    /**
     * Default no-arg constructor.
     */
    public LazyProcessor()
        {
        }
 
    /**
     * Constructs a LazyProcessor with a specified time to sleep.
     *
     * @param lSleepTime  the number of milliseconds this processor should
     *                    sleep
     */
    public LazyProcessor(long lSleepTime)
        {
        m_lSleepTime = lSleepTime;
        }
 
    /**
     * {@inheritDoc}
     */
    public Object process(Entry entry)
        {
        try
            {
            Thread.sleep(m_lSleepTime);
            }
        catch (InterruptedException e)
            {
            throw Base.ensureRuntimeException(e);
            }
        return null;
        }
 
    // ----- PortableObject members -----------------------------------------
 
    /**
     * {@inheritDoc}
     */
    public void readExternal(PofReader in) throws IOException
        {
        m_lSleepTime = in.readLong(0);
        }
 
    /**
     * {@inheritDoc}
     */
    public void writeExternal(PofWriter out) throws IOException
        {
        out.writeLong(0, m_lSleepTime);
        }
 
    // ----- data members ---------------------------------------------------
 
    /**
     * The number of milliseconds this processor should sleep.
     */
    private long m_lSleepTime;
    }
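The important property of the readExternal and writeExternal pair is symmetry: whatever one writes, the other must read back in the same form. The following sketch demonstrates that round trip with the JDK's java.io.Externalizable interface as an analogy only. It does not use POF, and the SleepSettingSketch class is hypothetical; POF additionally identifies each property by an index, which standard Java serialization does not.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;

// Analogy using JDK serialization, not POF; the class name is hypothetical.
class SleepSettingSketch implements Externalizable {
    private static final long serialVersionUID = 1L;

    private long sleepMillis;

    /** Public no-arg constructor, required so the object can be re-created. */
    public SleepSettingSketch() {
    }

    SleepSettingSketch(long sleepMillis) {
        this.sleepMillis = sleepMillis;
    }

    long sleepMillis() {
        return sleepMillis;
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeLong(sleepMillis);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
        sleepMillis = in.readLong(); // must mirror writeExternal exactly
    }

    /** Serializes the object to bytes and reads it back as a new instance. */
    static SleepSettingSketch roundTrip(SleepSettingSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SleepSettingSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SleepSettingSketch copy = roundTrip(new SleepSettingSketch(5L));
        System.out.println("sleepMillis after round trip: " + copy.sleepMillis());
    }
}
```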

11.2.3 Register the Timed Events Event Interceptor

In the UEMEvents project, the interceptors are registered in the cache configuration file. The fully qualified name of the event interceptor class is specified in an <interceptor> element, which is nested in the <interceptors> element that follows the <cache-name> and <scheme-name> elements in the <cache-mapping> stanza. The interceptor is associated with the cache specified in the <cache-name> element. In this example, TimedTraceInterceptor is the event interceptor on the events cache.

To create a cache configuration file which defines the event interceptors:

  1. Open the coherence-cache-config.xml file from the Project Explorer window. You can find the file under UEMEvents/appClientModule.

  2. Save the file as uem-cache-config.xml.

  3. Write the cache configuration that calls the event interceptors. The following list highlights some key elements:

    • Under the <cache-mapping> element, the <interceptor> element associates the com.oracle.handson.TimedTraceInterceptor class with the events cache named in the <cache-name> element. The events cache uses the events-distributed-scheme scheme. This scheme uses the PartitionedPofCache service, which is defined in a <distributed-scheme> element under <caching-schemes>.

    • There is a mapping between the events-results cache and the dist-events-results scheme. In the <distributed-scheme> section, there is an association between the dist-events-results scheme and the PartitionedEventsResults service.

Example 11-3 illustrates a possible implementation for the uem-cache-config.xml file.

Example 11-3 Cache Configuration File That Registers the TimedTraceInterceptor

<?xml version="1.0"?>
 
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
              xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd">
  <defaults>
    <serializer>pof</serializer>
  </defaults>
 
  <caching-scheme-mapping>
    <cache-mapping>
        <cache-name>events</cache-name>
        <scheme-name>events-distributed-scheme</scheme-name>
        <interceptors>
            <interceptor>
                <instance>
                    <class-name>com.oracle.handson.TimedTraceInterceptor</class-name>
                    <init-params>
                        <init-param>
                            <param-type>int</param-type>
                            <param-value>100</param-value>
                        </init-param>
                    </init-params>
                </instance>
            </interceptor>
        </interceptors>
    </cache-mapping>
    <cache-mapping>
        <cache-name>events-results</cache-name>
        <scheme-name>dist-events-results</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>
 
  <caching-schemes>
    <distributed-scheme>
      <scheme-name>events-distributed-scheme</scheme-name>
      <service-name>PartitionedPofCache</service-name>
      <thread-count>5</thread-count>
      <backing-map-scheme>
        <local-scheme>
          <!-- each node will be limited to 32MB -->
          <high-units>32M</high-units>
          <unit-calculator>binary</unit-calculator>
        </local-scheme>
      </backing-map-scheme>
      <autostart>true</autostart>
    </distributed-scheme>
 
    <!-- A PartitionedCache service used to store results for events examples
      -->
    <distributed-scheme>
        <scheme-name>dist-events-results</scheme-name>
        <service-name>PartitionedEventsResults</service-name>
        <thread-count>5</thread-count>
        <backing-map-scheme>
            <local-scheme/>
        </backing-map-scheme>
        <autostart>true</autostart>
    </distributed-scheme>
 
    <!--
    Invocation Service scheme.
    -->
    <invocation-scheme>
      <scheme-name>examples-invocation</scheme-name>
      <service-name>InvocationService</service-name>
 
      <autostart system-property="tangosol.coherence.invocation.autostart">true</autostart>
    </invocation-scheme>
 
 </caching-schemes>
</cache-config>

11.2.4 Create a POF Configuration File for the Lazy Processor Class

All of the information produced by the TimedTraceInterceptor class is exclusive to its local member. The LazyProcessor class, however, is transmitted with its state to storage-enabled members and executed there, so it must be added to the POF configuration file. In the Eclipse Project Explorer, right-click the pof-config.xml file and rename it uem-pof-config.xml. Open the uem-pof-config.xml file in the editor and replace its contents with the code in Example 11-4. The example illustrates a POF configuration file containing this class.

Example 11-4 POF Configuration File for the LazyProcessor Class

<?xml version="1.0"?>
<pof-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://xmlns.oracle.com/coherence/coherence-pof-config"
    xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-pof-config http://xmlns.oracle.com/coherence/coherence-pof-config/1.2/coherence-pof-config.xsd">
   <user-type-list>
   <!-- include all "standard" Coherence POF user types -->
   <include>coherence-pof-config.xml</include>
    <user-type>
      <type-id>1008</type-id>
      <class-name>com.oracle.handson.LazyProcessor</class-name>
    </user-type>
   </user-type-list>
</pof-config>

11.2.5 Create a Class to Exercise the Timed Events Event Interceptor

Create a class named EventsExamples to trigger actions to be performed by the TimedTraceInterceptor class. The class should illustrate how the elapsed time between pre- and post-commit events can be measured and inserted into a results cache. The entries inserted into the results cache are sent to standard output by the process executing this class.

To create the EventsExamples class:

  1. Create a new Java class called EventsExamples. Do not include a main method.

    See "Creating a Java Class" for more information.

  2. Write the code for the EventsExamples class. Import the LazyProcessor, CacheFactory, NamedCache, PartitionedService, MapEvent, MapListener, MultiplexingMapListener, and Callable classes and interfaces. You can write your own code or use the code supplied in Example 11-5.

Example 11-5 illustrates a sample implementation of the EventsExamples class. The example contains the EventsTimingExample inner class. This inner class accesses the events and events-results caches and obtains the number of ownership-enabled members from the cache service. It reads the member ID, the name of the triggering event, and the sample number from the keys that the TimedTraceInterceptor class writes to the events-results cache. The calculation of the time for the sample to be processed and the mean time for all of the samples to be processed are provided by the TimedTraceInterceptor class.

The EventsTimingExample inner class sets the total amount of time for entry processor execution and the number of threads on which the events can run. From these values it calculates the sleep time that it passes to each LazyProcessor instance.
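The arithmetic in Example 11-5 is worth working through, because the sleep time is an integer division of the total time (3000 ms) by the product of the thread count (5), the set size (110), and the member count. The following sketch reproduces that calculation and the resulting operation counts; the TimingPlanSketch class and its method names are hypothetical and exist only for this illustration.

```java
// Hypothetical helper that mirrors the arithmetic used by Example 11-5.
class TimingPlanSketch {
    /** Sleep time per processor invocation, in milliseconds (integer division). */
    static int sleepMillis(int totalMillis, int threads, int setSize, int members) {
        return totalMillis / (threads * setSize * members);
    }

    /** Total put calls: the outer loop runs once per member over setSize * members keys. */
    static int putCount(int setSize, int members) {
        return members * setSize * members;
    }

    /** Total entry processor invocations: one per key. */
    static int invokeCount(int setSize, int members) {
        return setSize * members;
    }

    public static void main(String[] args) {
        int totalMillis = 3000;
        int threads = 5;
        int setSize = 110;
        for (int members : new int[] {1, 2, 6}) {
            System.out.printf("members=%d sleepMs=%d puts=%d invokes=%d%n",
                    members,
                    sleepMillis(totalMillis, threads, setSize, members),
                    putCount(setSize, members),
                    invokeCount(setSize, members));
        }
    }
}
```

With one member the sleep time is 5 ms, with two members it is 2 ms, and with six or more members the division truncates to 0, so the processor no longer introduces a delay. On the first pass through the keys every put is an insert; any later passes produce updates.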

Example 11-5 Class to Exercise the TimedTraceInterceptor Event Interceptor

package com.oracle.handson;
 
import com.oracle.handson.LazyProcessor;
 
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.net.PartitionedService;
import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;
import com.tangosol.util.MultiplexingMapListener;
 
import java.util.concurrent.Callable;
 
/**
 * EventsExamples illustrates various features within the Live Events
 * Model. This includes providing mean elapsed times split by event type.
 *
 * @since Coherence 12.1.2
 */
@SuppressWarnings("unchecked")
public class EventsExamples
    {
 
    // ----- inner-class: EventsTimingExample -------------------------------
 
    /**
     * The EventsTimingExample is a catalyst for action to be performed by
     * {@link TimedTraceInterceptor}. This illustrates how the elapsed time
     * between pre- and post-commit events can be measured and inserted
     * into a results cache. The entries inserted into the results cache are
     * displayed via the stdout of the process executing this class.
     */
     */
    public static class EventsTimingExample
            implements Callable<Boolean>
        {
        // ----- Callable methods -------------------------------------------
 
        /**
         * {@inheritDoc}
         */
        public Boolean call() throws Exception
            {
            NamedCache cacheEvents  = CacheFactory.getCache("events");
            NamedCache cacheResults = CacheFactory.getCache("events-results");
            int        cFrequency   = ((PartitionedService) cacheEvents.getCacheService()).getOwnershipEnabledMembers().size();
            int        cSet         = 110;
 
            MapListener ml = new MultiplexingMapListener()
                {
                @Override
                protected void onMapEvent(MapEvent evt)
                    {
                    String[] asKey = ((String) evt.getKey()).split("-");
 
                    System.out.printf("Received stats [memberId=%s, eventType=%s, sample=%s] = %s\n",
                            asKey[0], asKey[1], asKey[2], evt.getNewValue());
                    }
                };
 
            try
                {
                cacheResults.addMapListener(ml);
 
                // execute inserts and updates
                for (int i = cFrequency; i > 0; --i)
                    {
                    for (int j = 1, cMax = cSet * cFrequency; j <= cMax; ++j)
                        {
                        cacheEvents.put(j, "value " + j);
                        }
                    }
 
                // execute processors
                int nTotalTime = 3000;
                int cThreads   = 5;
                int nSleepTime = nTotalTime / (cThreads * cSet * cFrequency);
                for (int i = 1, cMax = cSet * cFrequency; i <= cMax; ++i)
                    {
                    cacheEvents.invoke(i, new LazyProcessor(nSleepTime));
                    }
                }
            finally
                {
                cacheEvents.clear();
                cacheResults.removeMapListener(ml);
                cacheResults.clear();
                }
            return true;
            }
        }
    }
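
The arithmetic in EventsTimingExample can be checked in isolation. The following is a minimal sketch and not part of the tutorial project: the class name SleepTimeSketch and its methods are hypothetical, and the member counts in the loop are assumed values. It reproduces the integer division that derives nSleepTime and counts the puts issued by the nested loops, where the first pass over the keys inserts entries and every later pass updates them.

```java
class SleepTimeSketch {

    /** Mirrors the integer arithmetic used for nSleepTime in EventsTimingExample.call(). */
    public static int sleepTimeMillis(int nTotalTime, int cThreads, int cSet, int cFrequency) {
        return nTotalTime / (cThreads * cSet * cFrequency);
    }

    /** Number of puts that create new entries: one per key on the first pass. */
    public static int insertCount(int cSet, int cFrequency) {
        return cSet * cFrequency;
    }

    /** Number of puts that overwrite entries: one per key on each remaining pass. */
    public static int updateCount(int cSet, int cFrequency) {
        return (cFrequency - 1) * cSet * cFrequency;
    }

    public static void main(String[] args) {
        int cSet = 110;
        // cFrequency is the member count; 1 to 3 are assumed values for illustration
        for (int cFrequency = 1; cFrequency <= 3; cFrequency++) {
            System.out.printf("members=%d inserts=%d updates=%d sleep=%dms%n",
                    cFrequency,
                    insertCount(cSet, cFrequency),
                    updateCount(cSet, cFrequency),
                    sleepTimeMillis(3000, 5, cSet, cFrequency));
        }
    }
}
```

With three ownership-enabled members the sketch yields 330 inserts, 660 updates, and a sleep time of 1 ms, because integer division truncates 3000 / 1650.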

11.2.6 Create a Driver File for Timed Events Example

Create a driver file to run the EventsTimingExample example defined in the EventsExamples class.

To create a driver file:

  1. Create a new Java class called Driver in the UEMEvents project. Ensure that it includes a main method.

    See "Creating a Java Class" for detailed information.

  2. Write the code to run the EventsTimingExample example defined in the EventsExamples class. You can write your own driver or use the code supplied in Example 11-6.

Example 11-6 Driver File for Timed Events Example

package com.oracle.handson;
 
import com.oracle.handson.EventsExamples.EventsTimingExample;
 
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
 
/**
 * Driver executes all the Coherence Events examples.
 * <p>
 * <strong>Timed Events Example</strong> - In this example we time
 * the elapsed time between pre- and post-commit events for each of the events
 * that occur.
 *
 * @since Coherence 12.1.2
 */
public class Driver
    {
 
    // ----- static methods -------------------------------------------------
 
    /**
    * Execute Events examples.
    *
    * @param asArg  command line arguments
    */
    public static void main(String[] asArg)
        {
        System.out.println("------ events examples begin ------");
 
        // Run examples
        for (Map.Entry<String, Callable<Boolean>> example : EVENTS_EXAMPLES.entrySet())
            {
            String sExample = example.getKey();
 
            try
                {
                System.out.printf("------   %s begin\n\n", sExample);
                boolean fSuccess = example.getValue().call();
                System.out.printf("\n------   %s completed %ssuccessfully\n", sExample, fSuccess ? "" : "un");
                }
            catch(Exception e)
                {
                System.err.printf("----------%s completed unsuccessfully with the following exception:\n", sExample);
                e.printStackTrace();
                }
            }
 
        System.out.println("------ events examples completed------");
        }
 
    // ----- constants ------------------------------------------------------
 
    /**
     * All the examples to be executed in insertion order.
     */
    protected static final Map<String, Callable<Boolean>> EVENTS_EXAMPLES = new LinkedHashMap<String, Callable<Boolean>>();
 
    /**
     * Default examples.
     */
    static
        {
        EVENTS_EXAMPLES.put("timing interceptor", new EventsTimingExample());
        }
    }
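
The driver relies on LinkedHashMap to run its examples in the order in which they were registered. The following stand-alone sketch illustrates that pattern without any Coherence dependency. The class DriverOrderSketch, its runAll method, and the three placeholder callables are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

class DriverOrderSketch {

    /** Runs every registered example in insertion order and records its outcome. */
    public static List<String> runAll(Map<String, Callable<Boolean>> examples) {
        List<String> results = new ArrayList<>();
        for (Map.Entry<String, Callable<Boolean>> example : examples.entrySet()) {
            String status;
            try {
                status = example.getValue().call() ? "ok" : "failed";
            } catch (Exception e) {
                // as in the driver, an exception fails this example only
                status = "exception";
            }
            results.add(example.getKey() + ":" + status);
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, Callable<Boolean>> examples = new LinkedHashMap<>();
        examples.put("first", () -> true);
        examples.put("second", () -> false);
        examples.put("third", () -> { throw new IllegalStateException("boom"); });
        System.out.println(runAll(examples));
    }
}
```

Because each call is wrapped in its own try block, an example that throws does not prevent the examples registered after it from running.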

11.2.7 Create a Cache Server Startup Configuration

Create a configuration to start the cache server for the UEMEvents project.

  1. Right-click the project and select Run As then Run Configurations. Double-click the Oracle Coherence icon in the Run Configurations dialog box to create a new launch configuration.

  2. In the Main tab, enter UEMEventsServer in the Name field. Click Browse in the Project field and select the UEMEvents project in the Project Selection dialog box. Select the Include system libraries when searching for a main class checkbox and click Search. Enter DefaultCacheServer in the Select Type field and select com.tangosol.net.DefaultCacheServer. Click Apply. The Main tab should look similar to Figure 11-1.

    Figure 11-1 Main Tab for the Events Server Startup Configuration


  3. In the General tab of the Coherence tab, identify the path to the cache configuration file under Cache configuration descriptor. Click the Browse button to navigate to the Absolute file path of the cache configuration file C:\home\oracle\workspace\UEMEvents\appClientModule\uem-cache-config.xml. Select Enabled (cache server) under Local storage. Enter a unique value, such as 3155, for the Cluster port.

    In the Other tab, set the tangosol.pof.config item to uem-pof-config.xml.

  4. In the Common tab, select Shared file and browse to the \UEMEvents project.

  5. The Classpath tab should look similar to Figure 11-2. The UEMEvents project appears below User Entries. The JRE System Library and Coherence12.1.3 library appear in the Bootstrap Entries section.

    Figure 11-2 Classpath Tab for the Events Server Startup Configuration
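
Outside the IDE, the same server settings are usually supplied as JVM system properties. The following sketch only assembles such a property list and does not start Coherence. The class ServerLaunchPropertiesSketch and its methods are hypothetical, and the mapping of each dialog field to a tangosol.* property is an assumption based on the documented Coherence 12.1.x system property overrides. Only tangosol.pof.config is named in the steps above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class ServerLaunchPropertiesSketch {

    /** Settings from the cache server launch configuration, expressed as system properties (assumed mapping). */
    public static Map<String, String> serverProperties() {
        Map<String, String> props = new LinkedHashMap<>();
        // Cache configuration descriptor
        props.put("tangosol.coherence.cacheconfig",
                "C:\\home\\oracle\\workspace\\UEMEvents\\appClientModule\\uem-cache-config.xml");
        // Local storage: Enabled (cache server)
        props.put("tangosol.coherence.distributed.localstorage", "true");
        // Cluster port
        props.put("tangosol.coherence.clusterport", "3155");
        // POF configuration set in the Other tab
        props.put("tangosol.pof.config", "uem-pof-config.xml");
        return props;
    }

    /** Formats the properties as -Dname=value arguments in insertion order. */
    public static String toJvmArgs(Map<String, String> props) {
        return props.entrySet().stream()
                .map(e -> "-D" + e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(" "));
    }

    public static void main(String[] args) {
        System.out.println(toJvmArgs(serverProperties()));
    }
}
```

For the client driver, the only value that would differ under this assumed mapping is the local storage setting, which would be false.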


11.2.8 Create a Startup Configuration for the Timed Events Driver

Create a configuration to start the client driver for the UEMEvents project.

  1. Right-click the project and select Run As then Run Configurations. Double-click the Oracle Coherence icon in the Run Configurations dialog box to create a new launch configuration.

  2. In the Main tab, enter UEMEventDriver in the Name field. Click Browse in the Project field and select the UEMEvents project in the Project Selection dialog box. Select the Include system libraries when searching for a main class checkbox and click Search. Enter Driver in the Select Type field and select com.oracle.handson.Driver. Click Apply. The Main tab should look similar to Figure 11-3.

    Figure 11-3 Main Tab for the Events Client Startup Configuration


  3. In the General tab of the Coherence tab, identify the path to the cache configuration file under Cache configuration descriptor. Click the Browse button to navigate to the Absolute file path of the cache configuration file C:\home\oracle\workspace\UEMEvents\appClientModule\uem-cache-config.xml. Select Disabled (cache client) under Local storage. Enter a unique value, such as 3155, for the Cluster port.

    In the Other tab, set the tangosol.pof.config item to uem-pof-config.xml.

  4. In the Common tab, select Shared file and browse to the \UEMEvents project.

  5. The Classpath tab should look similar to Figure 11-4. The UEMEvents project appears below User Entries. The JRE System Library and Coherence12.1.3 library appear in the Bootstrap Entries section.

    Figure 11-4 Classpath Tab for the Events Client Startup Configuration


11.2.9 Run the Timed Events Example

Right-click the UEMEvents project in the Project Explorer and select Run As, then Run Configurations. In the Run Configurations dialog box, select the UEMEventsServer launch configuration and click Run to start the cache server. After the cache server starts, select UEMEventsServer and click Run a second and a third time to start a total of three cache servers.

After the third cache server starts, select the UEMEventDriver configuration and click Run. The output from the cache client should look similar to Example 11-7.

Example 11-7 Output from the Cache Client

------ events examples begin ------
------   timing interceptor begin
2014-01-02 15:43:20.284/0.340 Oracle Coherence 12.1.3.0.0 <Info> (thread=main, member=n/a): Loaded operational configuration from "jar:file:/C:/Oracle/coherence/lib/coherence.jar!/tangosol-coherence.xml"
2014-01-02 15:43:20.384/0.440 Oracle Coherence 12.1.3.0.0 <Info> (thread=main, member=n/a): Loaded operational overrides from "jar:file:/C:/Oracle/coherence/lib/coherence.jar!/tangosol-coherence-override-dev.xml"
2014-01-02 15:43:20.464/0.520 Oracle Coherence 12.1.3.0.0 <Info> (thread=main, member=n/a): Loaded operational overrides from "file:/C:/home/oracle/workspace/UEMEvents/build/classes/tangosol-coherence-override.xml"
2014-01-02 15:43:20.474/0.530 Oracle Coherence 12.1.3.0.0 <D5> (thread=main, member=n/a): Optional configuration override "cache-factory-config.xml" is not specified
2014-01-02 15:43:20.474/0.530 Oracle Coherence 12.1.3.0.0 <D5> (thread=main, member=n/a): Optional configuration override "cache-factory-builder-config.xml" is not specified
2014-01-02 15:43:20.474/0.530 Oracle Coherence 12.1.3.0.0 <D5> (thread=main, member=n/a): Optional configuration override "/custom-mbeans.xml" is not specified
...
... 
2014-01-02 15:43:24.650/4.706 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=DistributedCache:PartitionedPofCache, member=5): Service PartitionedPofCache joined the cluster with senior service member 1
2014-01-02 15:43:24.699/4.755 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=DistributedCache:PartitionedEventsResults, member=5): Service PartitionedEventsResults joined the cluster with senior service member 1
Received stats [memberId=2, eventType=INSERTED, sample=1] = EventStats[name = INSERTED, sampleMean = 0.294040ms, mean = 0.294040ms]
Received stats [memberId=3, eventType=INSERTED, sample=1] = EventStats[name = INSERTED, sampleMean = 0.397855ms, mean = 0.397855ms]
Received stats [memberId=1, eventType=INSERTED, sample=1] = EventStats[name = INSERTED, sampleMean = 0.373270ms, mean = 0.373270ms]
Received stats [memberId=3, eventType=UPDATED, sample=1] = EventStats[name = UPDATED, sampleMean = 0.187132ms, mean = 0.187132ms]
Received stats [memberId=2, eventType=UPDATED, sample=1] = EventStats[name = UPDATED, sampleMean = 0.234314ms, mean = 0.234314ms]
Received stats [memberId=1, eventType=UPDATED, sample=1] = EventStats[name = UPDATED, sampleMean = 0.237622ms, mean = 0.237622ms]
Received stats [memberId=2, eventType=UPDATED, sample=2] = EventStats[name = UPDATED, sampleMean = 1.315323ms, mean = 1.432480ms]
Received stats [memberId=3, eventType=UPDATED, sample=2] = EventStats[name = UPDATED, sampleMean = 0.417201ms, mean = 0.510767ms]
Received stats [memberId=1, eventType=UPDATED, sample=2] = EventStats[name = UPDATED, sampleMean = 0.190555ms, mean = 0.309366ms]
Received stats [memberId=2, eventType=EXECUTED, sample=1] = EventStats[name = EXECUTED, sampleMean = 1.766313ms, mean = 1.766313ms]
Received stats [memberId=3, eventType=EXECUTED, sample=1] = EventStats[name = EXECUTED, sampleMean = 1.672603ms, mean = 1.672603ms]
Received stats [memberId=1, eventType=EXECUTED, sample=1] = EventStats[name = EXECUTED, sampleMean = 1.676003ms, mean = 1.676003ms]
 
------   timing interceptor completed successfully
------ events examples completed------
2014-01-02 15:43:28.344/8.400 Oracle Coherence GE 12.1.3.0.0 <D4> (thread=ShutdownHook, member=5): ShutdownHook: stopping cluster node

11.3 Vetoing Pre- and Post-commit Events Using an Event Interceptor

In this exercise you will create an event interceptor to detect and veto events based on a specified key. To complete this exercise, follow these steps:

  1. Create an Event Interceptor to Detect and Veto Events

  2. Register the Veto Events Event Interceptor

  3. Create a Class to Exercise the Veto Events Event Interceptor

  4. Edit the Driver File for the Veto Events Example

  5. Run the Veto Events Example

11.3.1 Create an Event Interceptor to Detect and Veto Events

To exercise the ability of Live Events to accept or veto events, create an event interceptor named CantankerousInterceptor. The interceptor will throw exceptions based on events that correspond to a specified key.

  1. Create a new Java class called CantankerousInterceptor. Ensure that the Default Package is com.oracle.handson. Do not select the Main Method check box.

    See "Creating a Java Class" for more information.

  2. Write the code for the event interceptor. Import the Event, EventInterceptor, Interceptor, EntryEvent, and EntryEvent.Type classes and interfaces. You can write your own interceptor code or use the code that is provided in Example 11-8.

Example 11-8 illustrates an event interceptor that throws a runtime exception during pre- or post-commit events, based on the key of the entry being inserted or updated. If the exception is thrown at pre-commit time, then a rollback occurs and the exception is propagated to the client. If the exception is thrown at post-commit time, then the exception is only logged. The keys that trigger the exceptions are VETO and NON-VETO. INSERTING and UPDATING are pre-commit events that can be vetoed, whereas INSERTED and UPDATED are post-commit events that cannot be vetoed.

Example 11-8 Class to Detect and Veto Events

package com.oracle.handson;
 
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
 
import com.tangosol.net.events.Event;
import com.tangosol.net.events.EventInterceptor;
import com.tangosol.net.events.annotation.Interceptor;
import com.tangosol.net.events.partition.cache.EntryEvent;
import com.tangosol.net.events.partition.cache.EntryEvent.Type;
 
import com.tangosol.util.BinaryEntry;
 
/**
 * A CantankerousInterceptor is an {@link EventInterceptor} implementation
 * that is argumentative in nature, hence the event of inserting certain keys
 * will result in {@link RuntimeException}s at either pre- or post-commit
 * phases. Throwing a {@link RuntimeException} during the precommit phase
 * will result in a rollback and related exception being propagated to the
 * client. A post-commit exception will result in a log event. A precommit
 * event is considered an *ING event with a post-commit event being a
 * *ED event.
 * <p>
 * This interceptor assumes it will be working against a cache with strings
 * as keys with the following items that will be considered objectionable.
 * <table>
 *     <tr><td>Key</td><td>Exception Thrown During Event</td></tr>
 *     <tr><td>{@value #VETO}</td><td>{@link Type#INSERTING} ||
 *             {@link Type#UPDATING}</td></tr>
 *     <tr><td>{@value #NON_VETO}</td><td>{@link Type#INSERTED} ||
 *             {@link Type#UPDATED}</td></tr>
 * </table>
 *
 *
 * @since Coherence 12.1.2
 */
@Interceptor(identifier = "cantankerous",
        entryEvents = {Type.INSERTING, Type.INSERTED, Type.UPDATING, Type.UPDATED})
public class CantankerousInterceptor
        implements EventInterceptor<EntryEvent>
    {
    // ----- EventInterceptor methods ---------------------------------------
 
    /**
     * Throws {@link RuntimeException} iff the key used for this event is
     * {@code #VETO} or {@code #NON_VETO}.
     *
     * @param event  the {@link Event} to be processed
     *
     * @throws RuntimeException iff {@code #VETO} || {@code #NON_VETO} are
     *         keys of the event
     */
    public void onEvent(EntryEvent event)
        {
        for (BinaryEntry binEntry : event.getEntrySet())
            {
            if (VETO.equals(binEntry.getKey()))
                {
                throw new RuntimeException("Objection! value = " + binEntry.getValue());
                }
            else if (NON_VETO.equals(binEntry.getKey())
                    && (event.getType() == Type.INSERTED || event.getType() == Type.UPDATED))
                {
 
                NamedCache cacheResults = CacheFactory.getCache("events-results");
                int        nMemberId    = CacheFactory.getCluster().getLocalMember().getId();
                String     sMessage     = "Objection falls on deaf ears! value = " + binEntry.getValue();
 
                cacheResults.put(
                        String.format("%d-NON_VETO-%d", nMemberId, ++m_cNonVetoableEvents),
                        sMessage);
 
                throw new RuntimeException(sMessage);
                }
            }
        }
 
    // ----- constants ------------------------------------------------------
 
    /**
     * String used to determine whether the event should be VETO'd during the
     * precommit phase.
     */
    public static final String VETO     = "VETO";
 
    /**
     * String used to determine whether the event should be VETO'd during the
     * post-commit phase.
     */
    public static final String NON_VETO = "NON-VETO";
 
    // ----- data members ---------------------------------------------------
 
    /**
     * A counter of the number of non-vetoable exceptions raised.
     */
    private int m_cNonVetoableEvents;
    }
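
The difference between the two phases can be illustrated without a cluster. The following is a plain Java simulation, not Coherence code: the class VetoSemanticsSketch, its store map, its log list, and its intercept and put methods are hypothetical stand-ins for the cache, the server log, and the interceptor dispatch. It assumes only the behavior described above, namely that a pre-commit exception prevents the change and reaches the caller, and that a post-commit exception is recorded while the change stands.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class VetoSemanticsSketch {
    public static final String VETO     = "VETO";
    public static final String NON_VETO = "NON-VETO";

    public final Map<String, String> store = new HashMap<>();
    public final List<String>        log   = new ArrayList<>();

    /** Stand-in interceptor: objects to VETO in any phase, and to NON-VETO only after the commit. */
    void intercept(String key, boolean committed) {
        if (VETO.equals(key)) {
            throw new RuntimeException("Objection! key = " + key);
        }
        if (NON_VETO.equals(key) && committed) {
            throw new RuntimeException("Objection falls on deaf ears! key = " + key);
        }
    }

    public void put(String key, String value) {
        intercept(key, false);      // pre-commit: an exception propagates and nothing is stored
        store.put(key, value);      // commit
        try {
            intercept(key, true);   // post-commit: too late to veto
        } catch (RuntimeException e) {
            log.add(e.getMessage()); // the exception is only recorded
        }
    }

    public static void main(String[] args) {
        VetoSemanticsSketch cache = new VetoSemanticsSketch();
        try {
            cache.put(VETO, "value: 22");
        } catch (RuntimeException e) {
            System.out.println("Vetoed: " + e.getMessage());
        }
        cache.put(NON_VETO, "value: 11");
        System.out.println("VETO stored: " + cache.store.containsKey(VETO));
        System.out.println("NON-VETO stored: " + cache.store.containsKey(NON_VETO));
        System.out.println("Logged: " + cache.log);
    }
}
```

In the simulation the VETO entry is never stored and the caller sees the exception, while the NON-VETO entry is stored and its objection appears only in the log.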

11.3.2 Register the Veto Events Event Interceptor

Open the uem-cache-config.xml cache configuration file and add the code to register the CantankerousInterceptor event interceptor and its cache. List the cache it references, vetod-events, in the <cache-name> element and its associated scheme events-distributed-scheme in the <scheme-name> element. List the fully-qualified class name of the CantankerousInterceptor class in the <class-name> subelement of the <interceptors> stanza. The added code is the <cache-mapping> element for the vetod-events cache.

Example 11-9 Cache Configuration to Register the CantankerousInterceptor Class

<?xml version="1.0"?>
 
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
              xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd">
  <defaults>
    <serializer>pof</serializer>
  </defaults>
 
  <caching-scheme-mapping>
    <cache-mapping>
        <cache-name>events</cache-name>
        <scheme-name>events-distributed-scheme</scheme-name>
        <interceptors>
            <interceptor>
                <instance>
                    <class-name>com.oracle.handson.TimedTraceInterceptor</class-name>
                    <init-params>
                        <init-param>
                            <param-type>int</param-type>
                            <param-value>100</param-value>
                        </init-param>
                    </init-params>
                </instance>
            </interceptor>
        </interceptors>
    </cache-mapping>
    <cache-mapping>
        <cache-name>vetod-events</cache-name>
        <scheme-name>events-distributed-scheme</scheme-name>
        <interceptors>
            <interceptor>
                <instance>
                    <class-name>com.oracle.handson.CantankerousInterceptor</class-name>
                </instance>
            </interceptor>
        </interceptors>
    </cache-mapping>
    <cache-mapping>
        <cache-name>events-results</cache-name>
        <scheme-name>dist-events-results</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>
 
  <caching-schemes>
    <distributed-scheme>
      <scheme-name>events-distributed-scheme</scheme-name>
      <service-name>PartitionedPofCache</service-name>
      <thread-count>5</thread-count>
      <backing-map-scheme>
        <local-scheme>
          <!-- each node will be limited to 32MB -->
          <high-units>32M</high-units>
          <unit-calculator>binary</unit-calculator>
        </local-scheme>
      </backing-map-scheme>
      <autostart>true</autostart>
    </distributed-scheme>
 
    <!-- A PartitionedCache service used to store results for events examples
      -->
    <distributed-scheme>
        <scheme-name>dist-events-results</scheme-name>
        <service-name>PartitionedEventsResults</service-name>
        <thread-count>5</thread-count>
        <backing-map-scheme>
            <local-scheme/>
        </backing-map-scheme>
        <autostart>true</autostart>
    </distributed-scheme>
 
    <!--
    Invocation Service scheme.
    -->
    <invocation-scheme>
      <scheme-name>examples-invocation</scheme-name>
      <service-name>InvocationService</service-name>
 
      <autostart system-property="tangosol.coherence.invocation.autostart">true</autostart>
    </invocation-scheme>
 
 </caching-schemes>
</cache-config>

11.3.3 Create a Class to Exercise the Veto Events Event Interceptor

Create a class named VetoExample to exercise the CantankerousInterceptor event interceptor. Within the VetoExample class, create an inner class, VetoedEventsExample. The VetoedEventsExample inner class initiates the action to be performed by CantankerousInterceptor. The code illustrates the semantics of throwing exceptions in pre- and post-commit events. The exceptions that are expected only to be logged are inserted into a results cache. The entries inserted into the results cache are displayed by using the standard output of the process executing this class.

Example 11-10 Class to Exercise the CantankerousInterceptor Event Interceptor

package com.oracle.handson;
 
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;
import com.tangosol.util.MultiplexingMapListener;
 
import java.util.concurrent.Callable;
 
/**
 * VetoExample illustrates the different semantics of throwing exceptions in
 * pre-commit events compared to post-commit events.
 *
 * @since Coherence 12.1.2
 */
@SuppressWarnings("unchecked")
public class VetoExample
    {
 
    /**
     * The VetoedEventsExample is a catalyst for action to be performed by
     * {@link CantankerousInterceptor}. This illustrates the semantics of
     * throwing exceptions in pre- and post-commit events. The exceptions that are
     * expected to only be logged are inserted into a results cache. The
     * entries inserted into the results cache are
     * displayed via the stdout of the process executing this class.
     */
    public static class VetoedEventsExample
            implements Callable<Boolean>
        {
 
        // ----- Callable methods -------------------------------------------
 
        /**
         * {@inheritDoc}
         */
        public Boolean call() throws Exception
            {
            // perform events to cause interceptors to veto said event
            NamedCache  cacheVetoEvents = CacheFactory.getCache("vetod-events");
            NamedCache  cacheResults    = CacheFactory.getCache("events-results");
            MapListener ml              = new MultiplexingMapListener()
                {
                @Override
                protected void onMapEvent(MapEvent evt)
                    {
                    String[] asKey = ((String) evt.getKey()).split("-");
 
                    System.out.printf("Received event [memberId=%s, eventType=%s, count=%s] = %s\n",
                            asKey[0], asKey[1], asKey[2], evt.getNewValue());
                    }
                };
            try
                {
                int cSet      = 110;
                int cVetos    = 5;
                int cNonVetos = 10;
                int cVetod    = 0;
 
                cacheResults.addMapListener(ml);
 
                for (int i = 1; i <= cSet; ++i)
                    {
                    boolean fVetod = false;
 
                    if (i % (cSet / cVetos) == 0)
                        {
                        try
                            {
                            cacheVetoEvents.put(CantankerousInterceptor.VETO, "value: " + i);
                            }
                        catch(Throwable e)
                            {
                            fVetod = true;
                            ++cVetod;
                            }
                        }
                    if (i % (cSet / cNonVetos) == 0)
                        {
                        cacheVetoEvents.put(CantankerousInterceptor.NON_VETO, "value: " + i);
                        fVetod = true;
                        }
 
                    if (!fVetod)
                        {
                        cacheVetoEvents.put(String.valueOf(i), "value: " + i);
                        }
                    }
                System.out.printf("Number of veto'd events: %d\n", cVetod);
                }
            finally
                {
                cacheVetoEvents.clear();
                cacheResults.removeMapListener(ml);
                cacheResults.clear();
                }
            return true;
            }
        }
   } 
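
The modulo tests in the loop determine which iterations use the VETO and NON-VETO keys. The following stand-alone sketch, in which the class VetoLoopSketch and its methods are hypothetical, lists those iterations for the constants used in Example 11-10. It assumes that every put with the VETO key is rejected.

```java
import java.util.ArrayList;
import java.util.List;

class VetoLoopSketch {

    /** Iterations i in 1..cSet for which i % (cSet / cDivisions) == 0, as tested in the loop. */
    public static List<Integer> triggers(int cSet, int cDivisions) {
        int step = cSet / cDivisions;
        List<Integer> result = new ArrayList<>();
        for (int i = 1; i <= cSet; ++i) {
            if (i % step == 0) {
                result.add(i);
            }
        }
        return result;
    }

    /** Iterations that fall through to the ordinary put, assuming every VETO put is rejected. */
    public static int plainPuts(int cSet, int cVetos, int cNonVetos) {
        int count = 0;
        for (int i = 1; i <= cSet; ++i) {
            boolean fVetod = i % (cSet / cVetos) == 0 || i % (cSet / cNonVetos) == 0;
            if (!fVetod) {
                ++count;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        int cSet = 110, cVetos = 5, cNonVetos = 10;
        System.out.println("VETO puts at i = " + triggers(cSet, cVetos));
        System.out.println("NON-VETO puts at i = " + triggers(cSet, cNonVetos));
        System.out.println("ordinary puts = " + plainPuts(cSet, cVetos, cNonVetos));
    }
}
```

The VETO key is used at every multiple of 22 (five attempts) and the NON-VETO key at every multiple of 11 (ten puts), which leaves 100 ordinary puts. Iterations that are multiples of 22 issue both a VETO put and a NON-VETO put.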

11.3.4 Edit the Driver File for the Veto Events Example

Edit the driver file that you created in "Create a Driver File for Timed Events Example" to run the VetoedEventsExample example defined in the VetoExample class.

To edit the driver file:

  1. Replace the import statement for the Events Timing Example:

    import com.oracle.handson.EventsExamples.EventsTimingExample; 
    

    with an import statement for the VetoedEventsExample inner class:

    import com.oracle.handson.VetoExample.VetoedEventsExample;
    
  2. Replace the command which calls the Timed Events example:

    EVENTS_EXAMPLES.put("timing interceptor", new EventsTimingExample());
    

    with the following command to run the VetoedEventsExample example defined in the VetoExample class:

    EVENTS_EXAMPLES.put("veto interceptor", new VetoedEventsExample());
    

11.3.5 Run the Veto Events Example

Right-click the UEMEvents project in the Project Explorer and select Run As, then Run Configurations. In the Run Configurations dialog box, select the UEMEventsServer launch configuration that you created in "Create a Cache Server Startup Configuration". Click Run to start the cache server. After the cache server starts, select UEMEventsServer and click Run a second and a third time to start a total of three cache servers.

After the third cache server starts, select the UEMEventDriver launch configuration and click Run. The output from the veto events client should look similar to Example 11-11.

Example 11-11 Output from the Veto Events Client

------ events examples begin ------
------   veto interceptor begin
2014-01-02 16:07:28.606/0.340 Oracle Coherence 12.1.3.0.0 <Info> (thread=main, member=n/a): Loaded operational configuration from "jar:file:/C:/Oracle/coherence/lib/coherence.jar!/tangosol-coherence.xml"
2014-01-02 16:07:28.716/0.450 Oracle Coherence 12.1.3.0.0 <Info> (thread=main, member=n/a): Loaded operational overrides from "jar:file:/C:/Oracle/coherence/lib/coherence.jar!/tangosol-coherence-override-dev.xml" 
...
2014-01-02 16:07:33.018/4.752 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=DistributedCache:PartitionedEventsResults, member=4): Service PartitionedEventsResults joined the cluster with senior service member 1 
Received event [memberId=3, eventType=NON_VETO, count=1] = Objection falls on deaf ears! value = value: 11
Received event [memberId=3, eventType=NON_VETO, count=2] = Objection falls on deaf ears! value = value: 22
Received event [memberId=3, eventType=NON_VETO, count=3] = Objection falls on deaf ears! value = value: 33
Received event [memberId=3, eventType=NON_VETO, count=4] = Objection falls on deaf ears! value = value: 44
Received event [memberId=3, eventType=NON_VETO, count=5] = Objection falls on deaf ears! value = value: 55
Received event [memberId=3, eventType=NON_VETO, count=6] = Objection falls on deaf ears! value = value: 66
Received event [memberId=3, eventType=NON_VETO, count=7] = Objection falls on deaf ears! value = value: 77
Received event [memberId=3, eventType=NON_VETO, count=8] = Objection falls on deaf ears! value = value: 88
Received event [memberId=3, eventType=NON_VETO, count=9] = Objection falls on deaf ears! value = value: 99 
Number of veto'd events: 5
Received event [memberId=3, eventType=NON_VETO, count=10] = Objection falls on deaf ears! value = value: 110
------   veto interceptor completed successfully
------ events examples completed------
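
Each "Received event" line is derived from a results-cache key that CantankerousInterceptor builds with the format %d-NON_VETO-%d and that the map listener splits on the hyphen. The following stand-alone sketch reproduces that round trip. The class ResultKeySketch and its methods are hypothetical, and the member ID, count, and value are sample inputs taken from the output above.

```java
class ResultKeySketch {

    /** Builds a key the way the interceptor does: memberId-NON_VETO-count. */
    public static String buildKey(int nMemberId, int cEvents) {
        return String.format("%d-NON_VETO-%d", nMemberId, cEvents);
    }

    /** Formats a line the way the map listener does, splitting the key on '-'. */
    public static String describe(String sKey, Object oValue) {
        String[] asKey = sKey.split("-");
        return String.format("Received event [memberId=%s, eventType=%s, count=%s] = %s",
                asKey[0], asKey[1], asKey[2], oValue);
    }

    public static void main(String[] args) {
        String sKey = buildKey(3, 1);
        System.out.println(describe(sKey, "Objection falls on deaf ears! value = value: 11"));

        // A hyphenated event name would split into four parts instead of three
        System.out.println("3-NON-VETO-1".split("-").length);
    }
}
```

The key uses the literal NON_VETO with an underscore rather than the NON-VETO cache key. A hyphen inside the event name would add a fourth element to the split result, and the listener would then report the wrong fields.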

Notice the output from the third cache server illustrated in Example 11-12. The output displays the exceptions caused by the vetoed events.

Example 11-12 Output from the Cache Server

Started DefaultCacheServer...
 
2014-01-02 16:06:50.262/5.785 Oracle Coherence GE 12.1.3.0.0 <D4> (thread=DistributedCache:PartitionedPofCache, member=2): Asking member 1 for primary ownership of PartitionSet{0..127}
...
...
2014-01-02 16:07:33.178/48.701 Oracle Coherence GE 12.1.3.0.0 <Error> (thread=DistributedCache:PartitionedPofCache:EventDispatcher, member=2): Exception caught while dispatching to "<cantankerous, com.oracle.handson.CantankerousInterceptor>": java.lang.RuntimeException: Objection falls on deaf ears! value = value: 11 
     at com.oracle.handson.CantankerousInterceptor.onEvent(CantankerousInterceptor.java:74)
     at com.oracle.handson.CantankerousInterceptor.onEvent(CantankerousInterceptor.java:1)
     at com.tangosol.net.events.internal.NamedEventInterceptor.onEvent(NamedEventInterceptor.java:240)
     at com.tangosol.net.events.internal.AbstractEvent.nextInterceptor(AbstractEvent.java:116)
     at com.tangosol.net.events.internal.AbstractEvent.dispatch(AbstractEvent.java:154)
     at com.tangosol.net.events.internal.AbstractEventDispatcher$1.proceed(AbstractEventDispatcher.java:254)
     at com.tangosol.coherence.component.util.daemon.queueProcessor.service.grid.PartitionedService$Continuations$Task.run(PartitionedService.CDB:6)
     at com.tangosol.coherence.component.util.daemon.queueProcessor.Service$EventDispatcher.onNotify(Service.CDB:26)
     at com.tangosol.coherence.component.util.Daemon.run(Daemon.CDB:51)
     at java.lang.Thread.run(Thread.java:722)
 
2014-01-02 16:07:33.238/48.761 Oracle Coherence GE 12.1.3.0.0 <Error> (thread=DistributedCache:PartitionedPofCache:EventDispatcher, member=3): Exception caught while dispatching to "<cantankerous, com.oracle.handson.CantankerousInterceptor>": Objection falls on deaf ears! value = value: 22
     at com.oracle.handson.CantankerousInterceptor.onEvent(CantankerousInterceptor.java:74)
     at com.oracle.handson.CantankerousInterceptor.onEvent(CantankerousInterceptor.java:1)
     at com.tangosol.net.events.internal.NamedEventInterceptor.onEvent(NamedEventInterceptor.java:240)
     at com.tangosol.net.events.internal.AbstractEvent.nextInterceptor(AbstractEvent.java:116)
     at com.tangosol.net.events.internal.AbstractEvent.dispatch(AbstractEvent.java:154)
     at com.tangosol.net.events.internal.AbstractEventDispatcher$1.proceed(AbstractEventDispatcher.java:254)
     at com.tangosol.coherence.component.util.daemon.queueProcessor.service.grid.PartitionedService$Continuations$Task.run(PartitionedService.CDB:6)
     at com.tangosol.coherence.component.util.daemon.queueProcessor.Service$EventDispatcher.onNotify(Service.CDB:26)
     at com.tangosol.coherence.component.util.Daemon.run(Daemon.CDB:51)
     at java.lang.Thread.run(Thread.java:722)

...

2014-01-02 16:07:33.238/48.761 Oracle Coherence GE 12.1.3.0.0 <Error> (thread=DistributedCache:PartitionedPofCache:EventDispatcher, member=3): Exception caught while dispatching to "<cantankerous, com.oracle.handson.CantankerousInterceptor>": Objection falls on deaf ears! value = value: 110
     at com.oracle.handson.CantankerousInterceptor.onEvent(CantankerousInterceptor.java:74)
     at com.oracle.handson.CantankerousInterceptor.onEvent(CantankerousInterceptor.java:1)
     at com.tangosol.net.events.internal.NamedEventInterceptor.onEvent(NamedEventInterceptor.java:240)
     at com.tangosol.net.events.internal.AbstractEvent.nextInterceptor(AbstractEvent.java:116)
     at com.tangosol.net.events.internal.AbstractEvent.dispatch(AbstractEvent.java:154)
     at com.tangosol.net.events.internal.AbstractEventDispatcher$1.proceed(AbstractEventDispatcher.java:254)
     at com.tangosol.coherence.component.util.daemon.queueProcessor.service.grid.PartitionedService$Continuations$Task.run(PartitionedService.CDB:6)
     at com.tangosol.coherence.component.util.daemon.queueProcessor.Service$EventDispatcher.onNotify(Service.CDB:26)
     at com.tangosol.coherence.component.util.Daemon.run(Daemon.CDB:51)
     at java.lang.Thread.run(Thread.java:722)
 
...
2014-01-02 16:07:33.649/49.172 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=Cluster, member=2): Member(Id=4, Timestamp=2014-01-02 16:07:33.649, Address=10.159.154.103:8094, MachineId=47251, Location=site:,machine:TPFAEFFL-LAP,process:8152, Role=OracleHandsonDriver) left Cluster with senior member 1 
...

11.4 Logging Partition Activity Using an Event Interceptor

In this exercise you will create an event interceptor to log partition events for a partitioned service. To complete this exercise, follow these steps:

  1. Create a Class to Terminate a JVM and to Enable and Disable Logging

  2. Create an Event Interceptor to Log Partition Activity

  3. Create a Class to Exercise the Log Partition Activity Example

  4. Register the Log Partition Activity Event Interceptor

  5. Edit the POF Configuration File

  6. Edit the Driver File for the Log Partition Activity Example

  7. Run the Log Partition Activity Example

11.4.1 Create a Class to Terminate a JVM and to Enable and Disable Logging

Create a class named RedistributionInvocable that defines three actionable states that will be executed on various members of the cluster. For this example, define the states as follows:

  • DISABLE: Disable the logging performed by the RedistributionInterceptor event interceptor.

  • ENABLE: Enable the logging performed by the RedistributionInterceptor event interceptor.

  • KILL: Terminate the JVM that this invocable (RedistributionInvocable) is executed on.

You will create the RedistributionInterceptor event interceptor in a later step.

For the variable that determines whether logging is enabled or disabled, use the AtomicBoolean class. For example:

public static final AtomicBoolean ENABLED = new AtomicBoolean(false);

To terminate the JVM on which the invocable runs, the KILL state can simply call System.exit.
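The mapping from state to action can be tried out in isolation before any Coherence classes are involved. The following is a minimal sketch, not the class used in this exercise: StateDispatchSketch and the exitAction parameter are hypothetical names introduced only for illustration. The exit action is passed in so that the KILL branch can be exercised without actually stopping the JVM.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Stand-alone sketch: maps three states to actions without Coherence classes. */
class StateDispatchSketch
    {
    enum State { DISABLE, ENABLE, KILL }

    /** Shared flag that a logger would consult before writing anything. */
    static final AtomicBoolean ENABLED = new AtomicBoolean(false);

    /**
     * Applies the given state. The exit action is a hypothetical parameter
     * (an assumption of this sketch) standing in for System.exit.
     */
    static void apply(State state, Runnable exitAction)
        {
        switch (state)
            {
            case DISABLE:
                ENABLED.set(false);
                break;
            case ENABLE:
                ENABLED.set(true);
                break;
            case KILL:
                exitAction.run();
                break;
            }
        }

    public static void main(String[] args)
        {
        Runnable exit = () -> System.exit(1);

        apply(State.ENABLE, exit);
        System.out.println("logging enabled: " + ENABLED.get());

        apply(State.DISABLE, exit);
        System.out.println("logging enabled: " + ENABLED.get());
        }
    }
```

Because the flag is a static field, it is local to each JVM. This is why the exercise sends the state to cluster members through an invocable rather than setting the flag directly.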

The data that the class produces will be sent across the wire, so the class should use POF (Portable Object Format). You will add the class to the POF configuration file in a later step.

To create the RedistributionInvocable class:

  1. Create a new Java class called RedistributionInvocable. Ensure that the Default Package is com.oracle.handson. Do not select the Main Method check box.

    See "Creating a Java Class" for more information.

  2. Write the class to define three different states that can be assigned to an invocable object. Import the AbstractInvocable, AtomicBoolean, PortableObject, PofReader, and PofWriter classes and interfaces. The RedistributionInvocable class should extend the AbstractInvocable class and implement the PortableObject interface. You can write your own RedistributionInvocable class or use the code provided in Example 11-13.

Example 11-13 Class to Terminate a JVM and to Enable or Disable Logging

package com.oracle.handson;
 
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
 
import com.tangosol.net.AbstractInvocable;
 
import java.io.IOException;
 
import java.util.concurrent.atomic.AtomicBoolean;
 
/**
 * RedistributionInvocable has three states in which appropriate action is
 * taken:
 * <ol>
 *     <li><strong>{@link State#DISABLE}</strong> - Disables the logging
 *     performed by {@link com.oracle.handson.RedistributionInterceptor}.</li>
 *     <li><strong>{@link State#ENABLE}</strong> - Enables the logging
 *     performed by {@link com.oracle.handson.RedistributionInterceptor}.</li>
 *     <li><strong>{@link State#KILL}</strong> - Kills the JVM this invocable
 *     is executed on.</li>
 * </ol>
 *
 *
 * @since 12.1.2
 */
public class RedistributionInvocable
        extends AbstractInvocable
        implements PortableObject
    {
    // ----- constructors ---------------------------------------------------
 
    /**
     * Version identifier for Java serialization.
     */
    private static final long serialVersionUID = 1L;
 
    /**
     * Default no-arg constructor.
     */
    public RedistributionInvocable()
        {
        this(State.DISABLE);
        }
 
    /**
     * Constructs a RedistributionInvocable with the specified state.
     *
     * @param state  the state indicating the action to be performed
     */
    public RedistributionInvocable(State state)
        {
        m_state = state;
        }
 
    // ----- Invocable methods ----------------------------------------------
 
    /**
     * {@inheritDoc}
     */
    public void run()
        {
        switch (m_state)
            {
            case DISABLE:
                ENABLED.set(false);
                break;
            case ENABLE:
                ENABLED.set(true);
                break;
            case KILL:
                System.exit(1);
            }
        }
 
    // ----- PortableObject methods -----------------------------------------
 
    /**
     * {@inheritDoc}
     */
    public void readExternal(PofReader in) throws IOException
        {
        m_state = State.values()[in.readInt(0)];
        }
 
    /**
     * {@inheritDoc}
     */
    public void writeExternal(PofWriter out) throws IOException
        {
        out.writeInt(0, m_state.ordinal());
        }
 
    // ----- inner class: State ---------------------------------------------
 
    /**
     * Representation of the action to be performed when
     * {@link RedistributionInvocable#run()} is invoked.
     */
    public enum State
        {
        /**
         * Disables the logging performed by
         * {@link com.oracle.handson.RedistributionInterceptor}
         */
        DISABLE,
        /**
         * Enables the logging performed by
         * {@link com.oracle.handson.RedistributionInterceptor}
         */
        ENABLE,
        /**
         * Terminates the JVM process in which the
         * {@link RedistributionInvocable} is executed.
         */
        KILL
        }
 
    // ----- constants ------------------------------------------------------
 
    /**
     * Flag used to determine whether to log partition events.
     */
    public static final AtomicBoolean ENABLED = new AtomicBoolean(false);
 
    // ----- data members ---------------------------------------------------
 
    /**
     * The state used to determine which action to perform.
     */
    private State m_state;
    }
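The readExternal and writeExternal methods in Example 11-13 send the state as the ordinal of the enum constant and rebuild it by indexing into State.values(). The following sketch shows the same round trip using only the JDK. It is an illustration under stated assumptions: DataOutputStream and DataInputStream stand in for PofWriter and PofReader, and OrdinalRoundTripSketch is a hypothetical class name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Stand-alone sketch: writes an enum as its ordinal and reads it back. */
class OrdinalRoundTripSketch
    {
    enum State { DISABLE, ENABLE, KILL }

    /** Encodes the state as a single int, the ordinal of the constant. */
    static byte[] write(State state)
        {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes))
            {
            out.writeInt(state.ordinal());
            }
        catch (IOException e)
            {
            throw new UncheckedIOException(e);
            }
        return bytes.toByteArray();
        }

    /** Decodes the int and uses it as an index into State.values(). */
    static State read(byte[] data)
        {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)))
            {
            return State.values()[in.readInt()];
            }
        catch (IOException e)
            {
            throw new UncheckedIOException(e);
            }
        }

    public static void main(String[] args)
        {
        for (State state : State.values())
            {
            System.out.println(state + " -> " + read(write(state)));
            }
        }
    }
```

An ordinal-based encoding depends on the declaration order of the constants. If the constants are reordered, or a new constant is inserted before an existing one, members running different versions of the class will decode different states from the same value.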

11.4.2 Create an Event Interceptor to Log Partition Activity

Create an event interceptor named RedistributionInterceptor to log partition events for a partitioned service.

To create the RedistributionInterceptor event interceptor:

  1. Create a new Java class called RedistributionInterceptor. Ensure that the Default Package is com.oracle.handson. Do not select the Main Method check box.

    See "Creating a Java Class" for more information.

  2. Write the RedistributionInterceptor class to log partition events. Import the RedistributionInvocable, CacheFactory, EventInterceptor, Interceptor, and TransferEvent classes and interfaces. The RedistributionInterceptor class should implement the EventInterceptor<TransferEvent> interface. You can write your own class to log partition events or use the code provided in Example 11-14.

Example 11-14 illustrates an event interceptor to log partition events. A name can be assigned to the interceptor by using the optional identifier attribute in the @Interceptor annotation. The event interceptor determines whether the partition event should be logged by referencing the value of the RedistributionInvocable.ENABLED constant.

Example 11-14 Class to Log Partition Events

package com.oracle.handson;
 
import com.oracle.handson.RedistributionInvocable;
 
import com.tangosol.net.CacheFactory;
 
import com.tangosol.net.events.EventInterceptor;
import com.tangosol.net.events.annotation.Interceptor;
import com.tangosol.net.events.partition.TransferEvent;
 
/**
 * RedistributionInterceptor is an {@link  
 * com.tangosol.net.events.EventInterceptor}
 * that logs partition activity when enabled. Logging can be enabled via
 * setting the {@link RedistributionInvocable#ENABLED} constant.
 *
 * @since Coherence 12.1.2
 */
@Interceptor(identifier = "redist")
public class RedistributionInterceptor
        implements EventInterceptor<TransferEvent>
    {
 
    // ----- EventInterceptor methods ---------------------------------------
 
    /**
     * {@inheritDoc}
     */
    public void onEvent(TransferEvent event)
        {
        if (RedistributionInvocable.ENABLED.get())
            {
            CacheFactory.log(String.format("Discovered event %s for partition-id %d from remote member %s\n",
                event.getType(), event.getPartitionId(), event.getRemoteMember()),
                CacheFactory.LOG_INFO);
            }
        }
    }

11.4.3 Create a Class to Exercise the Log Partition Activity Example

Create a class called LogExample to trigger actions to be performed by the RedistributionInterceptor class.

To create the LogExample class:

  1. Create a new Java class called LogExample. Do not include a main method.

    See "Creating a Java Class" for more information.

  2. Write the code for the LogExample class. Import the RedistributionInvocable, RedistributionInvocable.State, CacheFactory, InvocationService, Member, ArrayList, Collections, Random, Set, and Callable classes and interfaces. You can write your own code or use the code supplied in Example 11-15.

Example 11-15 illustrates a sample implementation of the LogExample class. The example contains the nested class RedistributionEventsExample, which triggers actions to be performed by the RedistributionInterceptor class. The nested class illustrates how partition redistribution events can be logged. The code requires at least three members of the invocation service, counting the client that runs the example, so at least two other cluster members must be running.

Example 11-15 Sample Class to Exercise the Log Partition Activity Example

package com.oracle.handson;
 
import com.oracle.handson.RedistributionInvocable;
import com.oracle.handson.RedistributionInvocable.State;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.InvocationService;
import com.tangosol.net.Member;
 
import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
 
/**
 * LogExample illustrates logging of partition movement when enabled.
 *
 * @since Coherence 12.1.2
 */
@SuppressWarnings("unchecked")
public class LogExample
    {
 
    /**
     * The RedistributionEventsExample is a catalyst for action to be
     * performed by {@link RedistributionInterceptor}. This illustrates how
     * partition redistribution events can be logged.
     */
    public static class RedistributionEventsExample
            implements Callable<Boolean>
        {
 
        // ----- Callable methods -------------------------------------------
 
        /**
         * {@inheritDoc}
         */
        public Boolean call() throws Exception
            {
            // transfer events
            try
                {
                InvocationService is       = (InvocationService) CacheFactory.getService("InvocationService");
                Random            rnd      = new Random();
                int               cMembers = is.getInfo().getServiceMembers().size();
 
                if (cMembers < 3)
                    {
                    System.err.println("<Error> At least two members must exist for the RedistributionEvent example");
                    return false;
                    }
 
                // enable the logging of transfer event
                is.query(new RedistributionInvocable(State.ENABLE), null);
 
                Set<Member> isMembers = is.getInfo().getServiceMembers();
                isMembers.remove(is.getCluster().getLocalMember());
 
                Member memChosen = new ArrayList<Member>(isMembers).get(rnd.nextInt(isMembers.size()));
 
                System.out.printf("Choosing to kill member %s\n", memChosen);
                is.query(new RedistributionInvocable(State.KILL), Collections.singleton(memChosen));
                }
            finally
                {
                }
 
            return true;
            }
        }
    }
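The member selection in Example 11-15 removes the local member from the set of service members and then picks one of the remaining members at random. The following sketch isolates that step using plain strings in place of Member objects. VictimSelectionSketch and chooseOther are hypothetical names, and unlike the example, the sketch works on a copy so that the caller's set is left unchanged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

/** Stand-alone sketch: picks a random member other than the local one. */
class VictimSelectionSketch
    {
    /**
     * Returns a randomly chosen element of members that is not localMember.
     * The input set is copied first and is never modified.
     */
    static String chooseOther(Set<String> members, String localMember, Random rnd)
        {
        List<String> candidates = new ArrayList<>(members);
        candidates.remove(localMember);

        if (candidates.isEmpty())
            {
            throw new IllegalStateException("no remote member to choose from");
            }
        return candidates.get(rnd.nextInt(candidates.size()));
        }

    public static void main(String[] args)
        {
        // member names are placeholders, not real cluster members
        Set<String> members = new LinkedHashSet<>(
                Arrays.asList("server-1", "server-2", "driver"));

        String chosen = chooseOther(members, "driver", new Random());
        System.out.printf("Choosing to kill member %s%n", chosen);
        }
    }
```

Excluding the local member ensures that the client never selects itself, so it stays alive to observe the departure of the chosen member.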

11.4.4 Register the Log Partition Activity Event Interceptor

The event interceptors can be registered either programmatically or by including references to them in the cache configuration file.

In the UEMEvents project, the interceptors are registered in the cache configuration file. The fully-qualified class name of the event interceptor is specified in the <interceptor> element. The interceptor is associated with the cache specified in the <cache-name> element.

For the log partition events example, the event interceptor, RedistributionInterceptor, is registered on the partitioned cache service under the <distributed-scheme> element.

To edit the cache configuration file to define the RedistributionInterceptor event interceptor:

  1. Open the uem-cache-config.xml file from the Project Explorer window. You can find the file under Events/appClientModule.

  2. Write the cache configuration that calls the event interceptors. Under the <distributed-scheme> element, there should be a reference to the fully-qualified RedistributionInterceptor class in the <interceptor> element.

Example 11-16 illustrates a possible implementation for the uem-cache-config.xml file. The configuration for the RedistributionInterceptor event interceptor is illustrated in bold font.

Example 11-16 Cache Configuration File with Event Interceptors

<?xml version="1.0"?>
 
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
              xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd">
  <defaults>
    <serializer>pof</serializer>
  </defaults>
 
  <caching-scheme-mapping>
    <cache-mapping>
        <cache-name>events</cache-name>
        <scheme-name>events-distributed-scheme</scheme-name>
        <interceptors>
            <interceptor>
                <instance>
                    <class-name>com.oracle.handson.TimedTraceInterceptor</class-name>
                    <init-params>
                        <init-param>
                            <param-type>int</param-type>
                            <param-value>100</param-value>
                        </init-param>
                    </init-params>
                </instance>
            </interceptor>
        </interceptors>
    </cache-mapping>
    <cache-mapping>
        <cache-name>vetod-events</cache-name>
        <scheme-name>events-distributed-scheme</scheme-name>
        <interceptors>
            <interceptor>
                <instance>
                    <class-name>com.oracle.handson.CantankerousInterceptor</class-name>
                </instance>
            </interceptor>
        </interceptors>
    </cache-mapping>
    <cache-mapping>
        <cache-name>events-results</cache-name>
        <scheme-name>dist-events-results</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>
 
  <caching-schemes>
    <distributed-scheme>
      <scheme-name>events-distributed-scheme</scheme-name>
      <service-name>PartitionedPofCache</service-name>
      <thread-count>5</thread-count>
      <backing-map-scheme>
        <local-scheme>
          <!-- each node will be limited to 32MB -->
          <high-units>32M</high-units>
          <unit-calculator>binary</unit-calculator>
        </local-scheme>
      </backing-map-scheme>
      <autostart>true</autostart>
      <interceptors>
          <interceptor>
              <instance>
                  <class-name>com.oracle.handson.RedistributionInterceptor</class-name>
              </instance>
          </interceptor>
      </interceptors>
    </distributed-scheme>
 
    <!-- A PartitionedCache service used to store results for events examples
      -->
    <distributed-scheme>
        <scheme-name>dist-events-results</scheme-name>
        <service-name>PartitionedEventsResults</service-name>
        <thread-count>5</thread-count>
        <backing-map-scheme>
            <local-scheme/>
        </backing-map-scheme>
        <autostart>true</autostart>
    </distributed-scheme>
 
    <!--
    Invocation Service scheme.
    -->
    <invocation-scheme>
      <scheme-name>examples-invocation</scheme-name>
      <service-name>InvocationService</service-name>
 
      <autostart system-property="tangosol.coherence.invocation.autostart">true</autostart>
    </invocation-scheme>
 
 </caching-schemes>
</cache-config>

11.4.5 Edit the POF Configuration File

With the exception of the RedistributionInvocable class, all of the information produced by the classes in the log partition activity exercise remains on the cluster member that produced it. The information produced by the RedistributionInvocable class, however, is sent across the wire to other cluster members. Thus, the class must be added to the POF configuration file.

To edit the POF configuration file for the RedistributionInvocable data type:

  1. Open the uem-pof-config.xml file. You can find the file under UEMEvents/appClientModule/META-INF.

  2. Define a <user-type> element for the com.oracle.handson.RedistributionInvocable class and assign type ID 1009 to it. The file must include the coherence-pof-config.xml file, which reserves the first 1000 IDs for Coherence data types.

Example 11-17 illustrates a sample uem-pof-config.xml file. The configuration for the RedistributionInvocable class is illustrated in bold font.

Example 11-17 POF Configuration File for the Log Partition Events Example

<?xml version="1.0"?>
<pof-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://xmlns.oracle.com/coherence/coherence-pof-config"
    xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-pof-config http://xmlns.oracle.com/coherence/coherence-pof-config/1.2/coherence-pof-config.xsd">
    <user-type-list>
    <!-- include all "standard" Coherence POF user types -->
    <include>coherence-pof-config.xml</include>
    <user-type>
      <type-id>1008</type-id>
      <class-name>com.oracle.handson.LazyProcessor</class-name>
    </user-type>
    <user-type>
      <type-id>1009</type-id>
      <class-name>com.oracle.handson.RedistributionInvocable</class-name>
    </user-type>
    </user-type-list>
</pof-config>
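Because the first 1000 type IDs are reserved, each user type in the file needs an ID of 1000 or higher, and no two user types can share an ID. The following sketch checks both conditions using only the JDK XML parser. It is not part of Coherence: PofTypeIdCheckSketch and its methods are hypothetical names, the XML in main is a trimmed stand-in for the real file, and the check looks only at the IDs in the given document, not at those in the included coherence-pof-config.xml file.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import javax.xml.parsers.DocumentBuilderFactory;

import org.w3c.dom.Document;
import org.w3c.dom.NodeList;

/** Stand-alone sketch: sanity-checks the type IDs in a POF configuration. */
class PofTypeIdCheckSketch
    {
    /** IDs below this value are reserved for Coherence data types. */
    static final int FIRST_USER_TYPE_ID = 1000;

    /** Extracts the content of every type-id element, in document order. */
    static List<Integer> typeIds(String xml)
        {
        try
            {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            NodeList nodes = doc.getElementsByTagName("type-id");

            List<Integer> ids = new ArrayList<>();
            for (int i = 0; i < nodes.getLength(); i++)
                {
                ids.add(Integer.valueOf(nodes.item(i).getTextContent().trim()));
                }
            return ids;
            }
        catch (Exception e)
            {
            throw new IllegalArgumentException("could not parse POF configuration", e);
            }
        }

    /** Returns true if every ID is outside the reserved range and unique. */
    static boolean allUserIdsValid(List<Integer> ids)
        {
        Set<Integer> seen = new HashSet<>();
        for (Integer id : ids)
            {
            if (id < FIRST_USER_TYPE_ID || !seen.add(id))
                {
                return false;
                }
            }
        return true;
        }

    public static void main(String[] args)
        {
        String xml = "<pof-config><user-type-list>"
                + "<user-type><type-id>1008</type-id></user-type>"
                + "<user-type><type-id>1009</type-id></user-type>"
                + "</user-type-list></pof-config>";

        List<Integer> ids = typeIds(xml);
        System.out.println(ids + " valid: " + allUserIdsValid(ids));
        }
    }
```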

11.4.6 Edit the Driver File for the Log Partition Activity Example

Edit the driver file that you created in "Create a Driver File for Timed Events Example" to run the RedistributionEventsExample example defined in the LogExample class.

To edit the driver file:

  1. Replace the import statement for the Vetoed Events example:

    import com.oracle.handson.VetoExample.VetoedEventsExample; 
    

    with an import statement for the RedistributionEventsExample subclass of the LogExample class.

    import com.oracle.handson.LogExample.RedistributionEventsExample;
    
  2. Replace the command that calls the Vetoed Events example:

    EVENTS_EXAMPLES.put("veto interceptor", new VetoedEventsExample());
    

    with the following command to run the RedistributionEventsExample example defined in the LogExample class:

    EVENTS_EXAMPLES.put("redistribution interceptor", new RedistributionEventsExample());
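The driver keeps its examples in a map from a display name to a Callable, so swapping one example for another only changes which entry is registered. The following sketch shows how such a map might be iterated. It is not the driver file from this chapter: DriverLoopSketch and runAll are hypothetical names, the begin and completed messages are modeled on the client output shown in this exercise, and the "failed" message is an assumption of the sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;

/** Stand-alone sketch: runs named examples and reports each result. */
class DriverLoopSketch
    {
    /**
     * Runs every example in insertion order and returns the number that
     * failed, either by returning false or by throwing an exception.
     */
    static int runAll(Map<String, Callable<Boolean>> examples)
        {
        int cFailed = 0;

        System.out.println("------ events examples begin ------");
        for (Map.Entry<String, Callable<Boolean>> entry : examples.entrySet())
            {
            String name = entry.getKey();
            System.out.println("------   " + name + " begin");

            boolean fSuccess;
            try
                {
                fSuccess = Boolean.TRUE.equals(entry.getValue().call());
                }
            catch (Exception e)
                {
                fSuccess = false;
                }

            if (fSuccess)
                {
                System.out.println("------   " + name + " completed successfully");
                }
            else
                {
                // hypothetical failure message, not taken from the real driver
                System.out.println("------   " + name + " failed");
                cFailed++;
                }
            }
        System.out.println("------ events examples completed------");

        return cFailed;
        }

    public static void main(String[] args)
        {
        Map<String, Callable<Boolean>> examples = new LinkedHashMap<>();

        // placeholder example that always succeeds
        examples.put("redistribution interceptor", () -> true);

        runAll(examples);
        }
    }
```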
    

11.4.7 Run the Log Partition Activity Example

Right-click the UEMEvents project in the Project Explorer and select Run As, then Run Configurations. In the Run Configurations dialog box, select the UEMEventsServer launch configuration that you created in "Create a Cache Server Startup Configuration". Click Run to start the cache server. After the cache server starts, select UEMEventsServer and click Run a second and a third time to start a total of three cache servers.

After the third cache server starts, select the UEMEventDriver configuration and click Run. The output from the cache client should look similar to Example 11-18.

Example 11-18 Output from the Cache Client

------ events examples begin ------
------   redistribution interceptor begin
2014-01-02 16:38:38.901/0.350 Oracle Coherence 12.1.3.0.0 <Info> (thread=main, member=n/a): Loaded operational configuration from "jar:file:/C:/Oracle/coherence/lib/coherence.jar!/tangosol-coherence.xml" 

...

2014-01-02 16:38:43.249/4.698 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=Invocation:InvocationService, member=4): Service InvocationService joined the cluster with senior service member 1
Choosing to kill member Member(Id=3, Timestamp=2014-01-02 16:38:17.942, Address=10.159.154.103:8092, MachineId=47251, Location=site:,machine:TPFAEFFL-LAP,process:8168, Role=CoherenceServer) 
Choosing to kill member Member(Id=3, Timestamp=2014-01-02 16:38:17.942, Address=10.159.154.103:8092, MachineId=47251, Location=site:,machine:TPFAEFFL-LAP,process:8168, Role=CoherenceServer)
------   redistribution interceptor completed successfully
------ events examples completed------
2014-01-02 16:38:43.276/4.725 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=Cluster, member=4): TcpRing disconnected from Member(Id=3, Timestamp=2014-01-02 16:38:17.942, Address=10.159.154.103:8092, MachineId=47251, Location=site:,machine:TPFAEFFL-LAP,process:8168, Role=CoherenceServer) due to a peer departure; removing the member. 

...

Example 11-19 displays the output from the cache server that was terminated. The output illustrates the client joining the cluster (Member 4) and the shutdown of the current cache server.

Example 11-19 Output from First Cache Server

...
2014-01-02 16:38:20.701/6.178 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=DistributedCache:PartitionedEventsResults, member=3): Transferring 1B of backup[1] for PartitionSet{0} to member 2
2014-01-02 16:38:42.299/27.776 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=Cluster, member=3): Member(Id=4, Timestamp=2014-01-02 16:38:42.09, Address=10.159.154.103:8094, MachineId=47251, Location=site:,machine:TPFAEFFL-LAP,process:1620, Role=OracleHandsonDriver) joined Cluster with senior member 1
2014-01-02 16:38:42.495/27.972 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=Invocation:Management, member=3): Member 4 joined Service Management with senior member 1
2014-01-02 16:38:43.254/28.731 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=Invocation:InvocationService, member=3): Member 4 joined Service InvocationService with senior member 1
2014-01-02 16:38:43.254/28.731 Oracle Coherence GE 12.1.3.0.0 <D4> (thread=ShutdownHook, member=1): ShutdownHook: stopping cluster node