7 Listening for Changes and Modifying Data

In this exercise, you set up listeners to observe data changes within a Coherence cache. It highlights the use of the Coherence ObservableMap, MapEvent, EventListener, EntryProcessor, and AbstractMapListener APIs. You also learn how entry processors can be used to modify and process entries in the Coherence cache.

This chapter contains the following sections:

  • Introduction

  • Creating a Cache Listener

  • Responding to Changes in the Cache

7.1 Introduction

The com.tangosol.util.ObservableMap interface enables you to observe and act on the changes made to cache entries. It extends java.util.Map and uses the standard JavaBeans event model. All types of NamedCache instances implement this interface. To listen for an event, you register a MapListener (com.tangosol.util.MapListener) instance on the cache; MapListener extends java.util.EventListener. MapListener instances are called on the client; that is, the listener code is executed in your client process.

There are multiple ways to listen for events:

  • Listen for all events

  • Listen for all events that satisfy a filter

  • Listen for events on a particular object key

These listener tasks can be performed on a NamedCache by the addMapListener methods listed in Example 7-1.

Example 7-1 Listener Methods on a NamedCache

void addMapListener(MapListener listener) 

void addMapListener(MapListener listener, Filter filter, boolean fLite) 

void addMapListener(MapListener listener, Object oKey, boolean fLite)

The com.tangosol.util.MapEvent class captures the object key, and the old and new values. You can specify a Lite event, in which the new and old values might not be present. Example 7-2 shows a pattern for registering a listener on a NamedCache, here written as an anonymous class. Within each callback, you can use the getKey, getOldValue, and getNewValue methods of the MapEvent class to get the key and values of the entry for which the event was fired.

Example 7-2 Code Pattern for Registering an Event

namedCache.addMapListener(new MapListener() {
    public void entryDeleted(MapEvent mapEvent)
        {
        // TODO... handle deletion event
        }
    public void entryInserted(MapEvent mapEvent)
        {
        // TODO... handle inserted event
        }
    public void entryUpdated(MapEvent mapEvent)
        {
        // TODO... handle updated event
        }
    });
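To make the event model more concrete, the following sketch models it in plain Java. It is not Coherence code: ObservableMapSketch, Event, and Listener are hypothetical stand-ins that loosely mirror ObservableMap, MapEvent, and MapListener. The sketch shows which of the old and new values are available for each kind of event, how a key-based registration narrows what a listener sees, and how a lite registration can leave the values out. It assumes that a lite event never carries values, whereas the text above only says that they might not be present.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for an observable cache; NOT the Coherence API. */
final class ObservableMapSketch {

    /** Hypothetical event type, loosely mirroring MapEvent. */
    static final class Event {
        final String type;      // "inserted", "updated", or "deleted"
        final String key;
        final String oldValue;  // null for inserts and for lite events
        final String newValue;  // null for deletes and for lite events

        Event(String type, String key, String oldValue, String newValue) {
            this.type = type;
            this.key = key;
            this.oldValue = oldValue;
            this.newValue = newValue;
        }

        @Override
        public String toString() {
            return type + " " + key + ": " + oldValue + " -> " + newValue;
        }
    }

    /** Hypothetical single-method listener. */
    interface Listener {
        void onEvent(Event event);
    }

    private static final class Registration {
        final Listener listener;
        final String key;    // null means "all keys"
        final boolean lite;

        Registration(Listener listener, String key, boolean lite) {
            this.listener = listener;
            this.key = key;
            this.lite = lite;
        }
    }

    private final Map<String, String> data = new HashMap<>();
    private final List<Registration> registrations = new ArrayList<>();

    /** Registers a listener for one key, or for all keys when key is null. */
    void addListener(Listener listener, String key, boolean lite) {
        registrations.add(new Registration(listener, key, lite));
    }

    void put(String key, String value) {
        boolean existed = data.containsKey(key);
        String old = data.put(key, value);
        fire(existed ? "updated" : "inserted", key, old, value);
    }

    void remove(String key) {
        if (data.containsKey(key)) {
            fire("deleted", key, data.remove(key), null);
        }
    }

    private void fire(String type, String key, String oldValue, String newValue) {
        for (Registration r : registrations) {
            if (r.key != null && !r.key.equals(key)) {
                continue; // this listener watches a different key
            }
            r.listener.onEvent(r.lite
                    ? new Event(type, key, null, null)
                    : new Event(type, key, oldValue, newValue));
        }
    }

    /** Runs a fixed scenario and returns what each listener observed. */
    static List<String> demo() {
        ObservableMapSketch cache = new ObservableMapSketch();
        List<String> seen = new ArrayList<>();
        cache.addListener(e -> seen.add("all: " + e), null, false);
        cache.addListener(e -> seen.add("lite(k2): " + e), "k2", true);
        cache.put("k1", "a");   // insert
        cache.put("k1", "b");   // update
        cache.put("k2", "c");   // insert, also seen by the key listener
        cache.remove("k1");     // delete
        return seen;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model, the listener registered for all keys receives four events, and the lite listener registered on k2 receives one event that carries only the event type and the key.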

7.2 Creating a Cache Listener

This section describes how to create a Java class that listens on a NamedCache and responds to any changes it detects.

  1. Create a Class to Listen for Changes in the Cache

  2. Run the Cache Listener Example

7.2.1 Create a Class to Listen for Changes in the Cache

In the Loading project, create the class that will listen for a new Contact object entry. Name the class ObserverExample and ensure that it has a main method. See "Creating a Java Class" for detailed information.

Within this class, add a listener to display a message whenever a Contact is inserted into or updated in the cache. Keep the Java process running until you read from the console; otherwise, your program will immediately exit. For example, use the following code:

BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
String text = console.readLine();

Within the class, create an inner class that extends AbstractMapListener. Implement the methods that handle insert, update, and delete events for the cache values. In this case, most of the work should be done in the entryUpdated method, based on the old and new values contained in a MapEvent.

Example 7-3 illustrates a possible implementation of a listener class.

Example 7-3 Sample Listener Class

package com.oracle.handson;
 
import com.tangosol.net.NamedCache;
 
import com.tangosol.util.AbstractMapListener;
import com.tangosol.util.MapEvent;
 
import com.oracle.handson.Contact;
 
import com.tangosol.net.CacheFactory;
 
import java.io.IOException;
 
 
/**
 * ObserverExample observes changes to contacts.
 */
public class ObserverExample
    {
    
    public ObserverExample() 
    {
    }
    // ----- ObserverExample methods -------------------------------------

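    public static void main(String[] args) {
        NamedCache cache = CacheFactory.getCache("ContactsCache");
        new ObserverExample().observe(cache);
        try {
            // Block until input arrives so that the listener keeps receiving events
            System.in.read();
        } catch (IOException e) {
            // Ignored: the program exits whether or not the read succeeds
        }
    }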
    /**
    * Observe changes to the contacts.
    *
    * @param cache  target cache
    */
    public void observe(NamedCache cache)
        {
        cache.addMapListener(new ContactChangeListener());
        }
 
    // ----- inner class: ContactChangeListener -------------------------
 
    public class ContactChangeListener
            extends AbstractMapListener
        {
        // ----- MapListener interface ------------------------------------------

        public void entryInserted(MapEvent event)
            {
            System.out.println(event);
            }
 
        public void entryUpdated(MapEvent event)
            {
            Contact contactOld = (Contact)event.getOldValue();
            Contact contactNew = (Contact)event.getNewValue();
            StringBuffer sb  = new StringBuffer();
 
            if (!contactOld.getHomeAddress().equals(
                    contactNew.getHomeAddress()))
                {
                sb.append("Home address ");
                }
            if (!contactOld.getWorkAddress().equals(
                    contactNew.getWorkAddress()))
                {
                sb.append("Work address ");
                }
            if (!contactOld.getTelephoneNumbers().equals(
                    contactNew.getTelephoneNumbers()))
                {
                sb.append("Telephone ");
                }
            if (contactOld.getAge() != contactNew.getAge())
                {
                sb.append("Birthdate ");
                }
            sb.append("was updated for ").append(event.getKey());
            System.out.println(sb);
            }
 
        public void entryDeleted(MapEvent event)
            {
            System.out.println(event.getKey());
            }
        }
    }
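The entryUpdated method in Example 7-3 calls equals directly on the old values, which throws a NullPointerException if one of those fields is null. The following sketch shows a null-safe variant of the same comparison in plain Java. The Contact class here is a hypothetical, cut-down stand-in with string addresses, not the tutorial's Contact class, and whether null addresses can occur in your data is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Null-safe change summary; a sketch, not part of the tutorial code. */
final class ChangeSummarySketch {

    /** Hypothetical, cut-down stand-in for the tutorial's Contact class. */
    static final class Contact {
        final String homeAddress;
        final String workAddress;
        final int age;

        Contact(String homeAddress, String workAddress, int age) {
            this.homeAddress = homeAddress;
            this.workAddress = workAddress;
            this.age = age;
        }
    }

    /** Lists the tracked fields that differ between the old and new values. */
    static String describeChanges(Object key, Contact oldC, Contact newC) {
        List<String> changed = new ArrayList<>();
        // Objects.equals treats two nulls as equal and never throws
        if (!Objects.equals(oldC.homeAddress, newC.homeAddress)) {
            changed.add("Home address");
        }
        if (!Objects.equals(oldC.workAddress, newC.workAddress)) {
            changed.add("Work address");
        }
        if (oldC.age != newC.age) {
            changed.add("Age");
        }
        if (changed.isEmpty()) {
            return "No tracked field changed for " + key;
        }
        return String.join(", ", changed) + " updated for " + key;
    }

    public static void main(String[] args) {
        Contact before = new Contact("1 Main St", null, 40);
        Contact after = new Contact("1 Main St", "200 Newbury St.", 40);
        System.out.println(describeChanges("John Doe", before, after));
    }
}
```

The same idea carries over to the listener in Example 7-3 by replacing each a.equals(b) call with Objects.equals(a, b).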

7.2.2 Run the Cache Listener Example

To run the Cache Listener example:

  1. Create a run configuration for ObserverExample. Right-click ObserverExample in the Project Explorer and select Run As. In the Run Configurations dialog box, select Oracle Coherence and click the New Configuration icon.

    1. In the Name field, enter ObserverExample.

    2. In the Project field in the Main tab, enter Loading. In the Main class field, enter com.oracle.handson.ObserverExample.

    3. In the General tab of the Coherence tab, browse to the c:\home\oracle\workspace\Contacts\appClientModule\contacts-cache-config.xml file in the Cache configuration descriptor field. Select the Disabled (cache client) button. Enter 3155 in the Cluster port field. Click Apply.

      In the Other tab, scroll down to the tangosol.pof.config field. Enter the absolute path to the POF configuration file contacts-pof-config.xml. Click Apply.

    4. In the Common tab, select Shared file and browse for the Loading directory.

  2. Stop any running cache servers. See "Stopping Cache Servers" for more information.

  3. Start the ContactsCacheServer.

  4. Load the cache by running the LoaderExample program from Eclipse. If you now run the ObserverExample, the program waits for input, as illustrated in Example 7-4.

    In "Responding to Changes in the Cache", you create a program that modifies entries in the cache and returns the changed records.

    Example 7-4 Listener Program Waiting for Events

    ...
    MasterMemberSet(
      ThisMember=Member(Id=3, Timestamp=2012-08-17 13:51:15.468, Address=10.159.220.86:8090, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:3964, Role=OracleHandsonObserverExample)
      OldestMember=Member(Id=1, Timestamp=2012-08-17 13:50:44.093, Address=10.159.220.86:8088, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:3344, Role=CoherenceServer)
      ActualMemberSet=MemberSet(Size=2
        Member(Id=1, Timestamp=2012-08-17 13:50:44.093, Address=10.159.220.86:8088, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:3344, Role=CoherenceServer)
        Member(Id=3, Timestamp=2012-08-17 13:51:15.468, Address=10.159.220.86:8090, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:3964, Role=OracleHandsonObserverExample)
        )
      MemberId|ServiceVersion|ServiceJoined|MemberState
        1|12.1.2|2012-08-17 13:50:44.093|JOINED,
        3|12.1.2|2012-08-17 13:51:15.468|JOINED
      RecycleMillis=1200000
      RecycleSet=MemberSet(Size=0
        )
      )
     
    TcpRing{Connections=[1]}
    IpMonitor{Addresses=0}
     
    2012-08-17 13:51:15.890/2.031 Oracle Coherence GE 12.1.2.0 <D5> (thread=Invocation:Management, member=3): Service Management joined the cluster with senior service member 1
    2012-08-17 13:51:15.921/2.062 Oracle Coherence GE 12.1.2.0 <Info> (thread=main, member=3): Loaded Reporter configuration from "jar:file:/C:/oracle/Middleware/Oracle_Home/coherence/lib/coherence.jar!/reports/report-group.xml"
    2012-08-17 13:51:16.046/2.187 Oracle Coherence GE 12.1.2.0 <Info> (thread=DistributedCache:PartitionedPofCache, member=3): Loaded POF configuration from "file:/C:/home/oracle/workspace/Contacts/build/classes/contacts-pof-config.xml"
    2012-08-17 13:51:16.109/2.250 Oracle Coherence GE 12.1.2.0 <Info> (thread=DistributedCache:PartitionedPofCache, member=3): Loaded included POF configuration from "jar:file:/C:/oracle/Middleware/Oracle_Home/coherence/lib/coherence.jar!/coherence-pof-config.xml"
    2012-08-17 13:51:16.187/2.328 Oracle Coherence GE 12.1.2.0 <D5> (thread=DistributedCache:PartitionedPofCache, member=3): Service PartitionedPofCache joined the cluster with senior service member 1
    

7.3 Responding to Changes in the Cache

In this section, you will create a Java class to modify entries in the cache and return the changed records.

Until now, to perform actions on the entries in a cache, you used the put and get operations. However, there is a better way to perform operations on data, one that ensures consistent behavior when concurrent data access is required. Entry processors (com.tangosol.util.InvocableMap.EntryProcessor) are agents that perform processing against entries. The entries are processed directly where the data is being held. The processing you perform can change the data: it can create, update, or remove data, or only perform calculations. The processing can occur in parallel in a partitioned cache with multiple nodes, so it is scalable. Processing in the cache also saves I/O expense because data is not pulled to the client for processing.

Entry processors that work on the same key are logically queued. This allows lock-free (high performance) processing. The com.tangosol.util.InvocableMap interface (which the NamedCache implements) has the following methods for operating on data:

  • Object invoke(Object oKey, InvocableMap.EntryProcessor processor), which invokes the passed EntryProcessor against an individual object and returns the result of the invocation.

  • Map invokeAll(Collection keys, InvocableMap.EntryProcessor processor), which invokes the EntryProcessor against the collection of keys and returns the result for each invocation.

  • Map invokeAll(Filter filter, InvocableMap.EntryProcessor processor), which invokes the EntryProcessor against the entries that match the filter and returns the result for each invocation.

Note:

EntryProcessor classes must be available in the class path for each cluster node.
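The benefit of applying an update where the entry is held, rather than reading the value, changing it, and writing it back, can be pictured with a single-JVM analogy. The following sketch uses java.util.concurrent.ConcurrentHashMap.compute, which applies a function to one key atomically. It is only an analogy for per-key processing: it is not Coherence code, it says nothing about how Coherence queues entry processors internally, and the class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Single-JVM analogy for per-key processing; NOT Coherence code. */
final class PerKeyUpdateSketch {

    /** Increments one counter from several threads and returns the final value. */
    static int incrementConcurrently(int threads, int incrementsPerThread) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("counter", 0);

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    // The read-modify-write runs atomically for this key,
                    // so no update is lost and the caller manages no lock.
                    map.compute("counter", (key, value) -> value + 1);
                }
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return map.get("counter");
    }

    public static void main(String[] args) {
        System.out.println(incrementConcurrently(4, 10_000)); // prints 40000
    }
}
```

If each thread instead called get, added one, and then called put, concurrent threads could overwrite each other's updates. Passing the update logic to the map avoids this, which is the same reason an entry processor is preferable to a get followed by a put on a cache.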

To create an entry processor, you can extend com.tangosol.util.processor.AbstractProcessor and implement the process() method. For example, the following code creates an EntryProcessor instance to change the work address of employees in the Contacts data set:

public static class OfficeUpdater extends AbstractProcessor
        implements PortableObject
  ...
    public Object process(InvocableMap.Entry entry)  
        {
        Contact contact = (Contact) entry.getValue();
        contact.setWorkAddress(m_addrWork);
        entry.setValue(contact);
        return null;
        }

To invoke the OfficeUpdater class, you can use the invokeAll method with an instance of the OfficeUpdater class as one of its arguments:

cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"),
             new OfficeUpdater(addrWork));
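The string "getHomeAddress.getState" passed to EqualsFilter names a chain of getter calls: getHomeAddress() on the cached value, then getState() on the result. The following plain-Java sketch illustrates that idea under the assumption that the chain is resolved reflectively, one method at a time. It is not the Coherence implementation; ChainedGetterSketch and its nested Address and Contact classes are hypothetical stand-ins.

```java
import java.lang.reflect.Method;
import java.util.Objects;

/** Illustrates a dotted getter chain; NOT the Coherence implementation. */
final class ChainedGetterSketch {

    /** Hypothetical stand-in for the tutorial's Address class. */
    public static final class Address {
        private final String state;

        public Address(String state) {
            this.state = state;
        }

        public String getState() {
            return state;
        }
    }

    /** Hypothetical stand-in for the tutorial's Contact class. */
    public static final class Contact {
        private final Address homeAddress;

        public Contact(Address homeAddress) {
            this.homeAddress = homeAddress;
        }

        public Address getHomeAddress() {
            return homeAddress;
        }
    }

    /** Calls each no-argument method in the dotted chain, left to right. */
    static Object extract(Object target, String methodChain) {
        Object current = target;
        for (String name : methodChain.split("\\.")) {
            if (current == null) {
                return null; // nothing left to call the next getter on
            }
            try {
                Method method = current.getClass().getMethod(name);
                current = method.invoke(current);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Cannot call " + name, e);
            }
        }
        return current;
    }

    /** Returns true if the extracted value equals the expected value. */
    static boolean matches(Object target, String methodChain, Object expected) {
        return Objects.equals(extract(target, methodChain), expected);
    }

    public static void main(String[] args) {
        Contact contact = new Contact(new Address("MA"));
        System.out.println(matches(contact, "getHomeAddress.getState", "MA")); // prints true
        System.out.println(matches(contact, "getHomeAddress.getState", "NY")); // prints false
    }
}
```

Because the chain is given as a string, a misspelled method name is not caught by the compiler and only fails when the filter is evaluated.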

In this exercise, you create a Java class with EntryProcessor instances that update entries in the cache. The ObserverExample class created in "Create a Class to Listen for Changes in the Cache" will detect these changes and display the changed records.

  1. Create a Class to Update Entries in the Cache

  2. Edit the POF Configuration File

  3. Run the Cache Update Example

7.3.1 Create a Class to Update Entries in the Cache

To create a file to update entries in the cache:

  1. Create a class that updates entries in the cache.

    In the Loading project, create a class called ProcessorExample with a main method that updates the address of a Contact object in the cache. See "Creating a Java Class" for detailed information.

  2. Write code to find the Contact entries whose home addresses are in Massachusetts and update their work addresses to an in-state office.

    Include a nested class that extends AbstractProcessor, so that it acts as the EntryProcessor that sets the work addresses, and that implements the PortableObject interface, so that it can be serialized and deserialized when it is sent to the cache. Use an EqualsFilter to isolate the Contact entries whose home addresses are in Massachusetts.

    Example 7-5 illustrates a possible implementation of the ProcessorExample class.

    Example 7-5 Sample Program to Update an Object in the Cache

    package com.oracle.handson;
    
    import com.tangosol.net.NamedCache;
    
    import com.tangosol.util.filter.EqualsFilter;
    import com.tangosol.util.processor.AbstractProcessor;
    import com.tangosol.util.InvocableMap;
    
    import com.tangosol.io.pof.PortableObject;
    import com.tangosol.io.pof.PofReader;
    import com.tangosol.io.pof.PofWriter;
    
    import com.oracle.handson.Address;
    import com.oracle.handson.Contact;
    
    import com.tangosol.net.CacheFactory;
    
    import java.io.IOException;
    
    /**
    * ProcessorExample executes an example EntryProcessor.
    *
    */
    public class ProcessorExample
        {
        public ProcessorExample() 
        {
        }
            public static void main(String[] args) 
              {
              NamedCache cache = CacheFactory.getCache("ContactsCache");
              new ProcessorExample().execute(cache);
              }
        // ----- ProcessorExample methods -----------------------------------
    
        public void execute(NamedCache cache)
            {
            // People who live in Massachusetts moved to an in-state office
            Address addrWork = new Address("200 Newbury St.", "Yoyodyne, Ltd.",
                    "Boston", "MA", "02116", "US");
    
            cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"),
                    new OfficeUpdater(addrWork));
            }
        // ----- nested class: OfficeUpdater ------------------------------------
    
        /**
        * OfficeUpdater updates a contact's office address.
        */
        public static class OfficeUpdater
                extends AbstractProcessor
                implements PortableObject
            {
            // ----- constructors -------------------------------------------
    
            /**
            * Default constructor (necessary for PortableObject implementation).
            */
            public OfficeUpdater() 
                {
                }
    
            public OfficeUpdater(Address addrWork) 
               {
                m_addrWork = addrWork;
                }
    
            // ----- InvocableMap.EntryProcessor interface ------------------
    
            public Object process(InvocableMap.Entry entry)
                {
                Contact contact = (Contact) entry.getValue();
    
                contact.setWorkAddress(m_addrWork);
                entry.setValue(contact);
                System.out.println("Work address was updated for " + contact.getFirstName() + " " + contact.getLastName());
                return null;
                }
    
            // ----- PortableObject interface -------------------------------
    
            public void readExternal(PofReader reader)
                    throws IOException
                {
                m_addrWork = (Address) reader.readObject(0);
                }
    
            public void writeExternal(PofWriter writer)
                    throws IOException
                {
                writer.writeObject(0, m_addrWork);
                }
    
            // ----- data members -------------------------------------------
    
            private Address m_addrWork;
            }
        }
    

7.3.2 Edit the POF Configuration File

Edit the contacts-pof-config.xml file to add a user type ID for the OfficeUpdater entries. In this case, add the type ID 1006 for the ProcessorExample$OfficeUpdater class.

...
<user-type>
      <type-id>1006</type-id>
      <class-name>com.oracle.handson.ProcessorExample$OfficeUpdater</class-name>
</user-type>
...

7.3.3 Run the Cache Update Example

To run the cache update example:

  1. Create a run configuration for ProcessorExample. Right-click ProcessorExample in the Project Explorer and select Run As. In the Run Configurations dialog box, select Oracle Coherence and click the New Configuration icon.

    • In the Name field, enter ProcessorExample.

    • In the Project field in the Main tab, enter Loading. In the Main class field, enter com.oracle.handson.ProcessorExample.

    • In the General tab of the Coherence tab, browse to the c:\home\oracle\workspace\Contacts\appClientModule\contacts-cache-config.xml file in the Cache configuration descriptor field. Select the Disabled (cache client) button. Enter 3155 in the Cluster port field. Click Apply.

      In the Other tab, scroll down to the tangosol.pof.config field. Enter the absolute path to the POF configuration file contacts-pof-config.xml. Click Apply.

    • In the Common tab, select Shared file and browse for the Loading directory.

  2. Perform the following steps to test the ObserverExample and ProcessorExample classes.

    1. Stop any running cache servers. See "Stopping Cache Servers" for more information.

    2. Restart the ContactsCacheServer.

    3. Run the LoaderExample class to load the cache.

    4. Run the ObserverExample class.

    5. Run the ProcessorExample to update records in the cache.

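      You should see messages in the cache server console window that are similar to Example 7-6, indicating that the work addresses for the specified employees were updated. The messages appear on the cache server because the OfficeUpdater entry processor runs where the data is held.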

      Example 7-6 Output from the ObserverExample and ProcessorExample Classes

      ...
      Started DefaultCacheServer...
       
      2012-08-17 14:25:40.531/31.953 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member(Id=2, Timestamp=2012-08-17 14:25:40.366, Address=10.159.220.86:8090, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:4568, Role=OracleHandsonLoaderExample) joined Cluster with senior member 1
      2012-08-17 14:25:40.765/32.187 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 2 joined Service Management with senior member 1
      2012-08-17 14:25:41.078/32.500 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 2 joined Service PartitionedPofCache with senior member 1
      2012-08-17 14:25:41.421/32.843 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 2 left service Management with senior member 1
      2012-08-17 14:25:41.437/32.859 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 2 left service PartitionedPofCache with senior member 1
      2012-08-17 14:25:41.453/32.875 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): TcpRing disconnected from Member(Id=2, Timestamp=2012-08-17 14:25:40.366, Address=10.159.220.86:8090, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:4568, Role=OracleHandsonLoaderExample) due to a peer departure; removing the member.
      2012-08-17 14:25:41.453/32.875 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member(Id=2, Timestamp=2012-08-17 14:25:41.453, Address=10.159.220.86:8090, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:4568, Role=OracleHandsonLoaderExample) left Cluster with senior member 1
      2012-08-17 14:25:58.875/50.297 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member(Id=3, Timestamp=2012-08-17 14:25:58.695, Address=10.159.220.86:8090, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:4576, Role=OracleHandsonObserverExample) joined Cluster with senior member 1
      2012-08-17 14:25:59.109/50.531 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 3 joined Service Management with senior member 1
      2012-08-17 14:25:59.390/50.812 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 3 joined Service PartitionedPofCache with senior member 1
      2012-08-17 14:26:16.656/68.078 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member(Id=4, Timestamp=2012-08-17 14:26:16.453, Address=10.159.220.86:8092, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:416, Role=OracleHandsonProcessorExample) joined Cluster with senior member 1
      2012-08-17 14:26:16.906/68.328 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 4 joined Service Management with senior member 1
      2012-08-17 14:26:17.218/68.640 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 4 joined Service PartitionedPofCache with senior member 1
      Work address was updated for John Lfyovf
      Work address was updated for John Jptrpajked
      Work address was updated for John Mtqdln
      Work address was updated for John Mstaugiw
      Work address was updated for John Olfezqse
      Work address was updated for John Qjefjgtgj
      Work address was updated for John Kuhgkzn
      Work address was updated for John Jpby
      Work address was updated for John Cekuea
      Work address was updated for John Guhkam
      Work address was updated for John Ijwj
      Work address was updated for John Trlb
      Work address was updated for John Hnfcwxjq
      Work address was updated for John Kizifh
      Work address was updated for John Rqlhgboi
      Work address was updated for John Ipphab
      2012-08-17 14:26:17.312/68.734 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): TcpRing disconnected from Member(Id=4, Timestamp=2012-08-17 14:26:16.453, Address=10.159.220.86:8092, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:416, Role=OracleHandsonProcessorExample) due to a peer departure; removing the member.
      2012-08-17 14:26:17.312/68.734 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 4 left service Management with senior member 1
      2012-08-17 14:26:17.312/68.734 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 4 left service PartitionedPofCache with senior member 1
      2012-08-17 14:26:17.312/68.734 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member(Id=4, Timestamp=2012-08-17 14:26:17.312, Address=10.159.220.86:8092, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:416, Role=OracleHandsonProcessorExample) left Cluster with senior member 1