Oracle® Coherence Tutorial for Oracle Coherence
Release 3.5

Part Number E14527-01

5 Listening for Changes and Modifying Data

In this chapter, you set up listeners to observe data changes within a NamedCache. You also learn how EntryProcessors can be used to modify and perform processing on entries in the Coherence cache. This chapter contains the following sections:

Introduction

Creating a Cache Listener

Responding to Changes in the Cache

5.1 Introduction

The com.tangosol.util.ObservableMap interface enables you to observe and take action on the changes made to cache entries. It extends java.util.Map and uses the standard JavaBeans event model. All types of NamedCaches implement this interface. To listen for an event, you register a MapListener (com.tangosol.util.MapListener, which extends java.util.EventListener) on the cache. MapListeners are called on the client; that is, the listener code is executed in your client process.

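There are three ways to listen for events:

Listen for all events

Listen for all events that satisfy a filter

Listen for events on a particular object key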

The methods listed in Example 5-1 (which implement the preceding list) can be used on a NamedCache:

Example 5-1 Listener Methods on a NamedCache

void addMapListener(MapListener listener) 

void addMapListener(MapListener listener, Filter filter, boolean fLite) 

void addMapListener(MapListener listener, Object oKey, boolean fLite)

The com.tangosol.util.MapEvent class captures the object key, and the old and new values. You can request a "Lite" event (the fLite argument), in which the new and old values may not be present. Example 5-2 shows a pattern for registering a listener on a NamedCache, with the MapListener implemented as an anonymous class.

Example 5-2 Code Pattern for Registering an Event

namedCache.addMapListener(new MapListener() {
    public void entryDeleted(MapEvent mapEvent) {
        // TODO... handle deletion event
    }
    public void entryInserted(MapEvent mapEvent) {
        // TODO... handle inserted event
    }
    public void entryUpdated(MapEvent mapEvent) {
        // TODO... handle updated event
    }
});

You can use the getOldValue() and getNewValue() methods of the MapEvent class to get the entry's value before and after the change that fired the event, and getKey() to get the entry's key.
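
To make the relationship between the three callbacks and the old and new values concrete, the following is a minimal sketch that uses only JDK classes. TinyObservableMap and its Listener interface are hypothetical stand-ins written for this illustration; they are not Coherence classes, and they only mimic the callback pattern described above under the assumption that an insert has no old value and a delete has no new value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-JDK stand-in for an observable cache (hypothetical; not a Coherence class). */
public class TinyObservableMap {

    /** Hypothetical listener with the same three callback names as MapListener. */
    public interface Listener {
        void entryInserted(String key, Object oldValue, Object newValue);
        void entryUpdated(String key, Object oldValue, Object newValue);
        void entryDeleted(String key, Object oldValue, Object newValue);
    }

    private final Map<String, Object> data = new HashMap<>();
    private final List<Listener> listeners = new ArrayList<>();

    public void addListener(Listener listener) {
        listeners.add(listener);
    }

    /** Stores a value and reports either an insert or an update. */
    public void put(String key, Object value) {
        boolean existed = data.containsKey(key);
        Object old = data.put(key, value);
        for (Listener l : listeners) {
            if (existed) {
                l.entryUpdated(key, old, value);
            } else {
                l.entryInserted(key, null, value);
            }
        }
    }

    /** Removes a value and reports a delete if the key was present. */
    public void remove(String key) {
        if (!data.containsKey(key)) {
            return;
        }
        Object old = data.remove(key);
        for (Listener l : listeners) {
            l.entryDeleted(key, old, null);
        }
    }

    /** Runs put, put, remove on one key and returns the events that were observed. */
    public static List<String> demo() {
        final List<String> log = new ArrayList<>();
        TinyObservableMap map = new TinyObservableMap();
        map.addListener(new Listener() {
            public void entryInserted(String k, Object o, Object n) {
                log.add("inserted " + k + " old=" + o + " new=" + n);
            }
            public void entryUpdated(String k, Object o, Object n) {
                log.add("updated " + k + " old=" + o + " new=" + n);
            }
            public void entryDeleted(String k, Object o, Object n) {
                log.add("deleted " + k + " old=" + o + " new=" + n);
            }
        });
        map.put("k1", "a");   // first put on a key: insert
        map.put("k1", "b");   // second put on the same key: update
        map.remove("k1");     // removal: delete
        return log;
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

In a real Coherence listener the same information arrives in a single MapEvent argument rather than as three separate parameters.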

5.2 Creating a Cache Listener

This section describes how to create a Java class that listens on a NamedCache and responds to any changes it detects.

  1. In the Loading project, create a new class that listens for changes to Contact entries in the cache.

    Name the class ObserverExample and ensure that it has a main method. See "Creating a Java Class" if you need detailed information.

  2. Within this class, add a listener to display a message whenever a Contact is inserted into, updated in, or deleted from the cache.

    Hint: Use the following code to keep the Java process running until you read from the console; otherwise your program exits immediately.

    BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
    String text = console.readLine();
    

    Example 5-3 illustrates a possible solution.

    Example 5-3 Sample Listener Class

    package com.oracle.coherence.handson;
     
    import com.tangosol.net.NamedCache;
     
    import com.tangosol.util.AbstractMapListener;
    import com.tangosol.util.MapEvent;
     
    import com.oracle.coherence.handson.Contact;
     
    import com.tangosol.net.CacheFactory;
     
    import java.io.IOException;
     
     
    /**
     * ObserverExample observes changes to contacts.
     */
    public class ObserverExample
        {
        
        public ObserverExample() {
        }
        // ----- ObserverExample methods -------------------------------------
    
        public static void main(String[] args) {
          NamedCache cache = CacheFactory.getCache("ContactsCache");
          new ObserverExample().observe(cache);
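            try {
                // Block until input arrives so the listener keeps receiving events
                System.in.read();
            } catch (IOException e) {
                // Reading failed; fall through and let the program exit
            }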
        }
        /**
        * Observe changes to the contacts.
        *
        * @param cache  target cache
        */
        public void observe(NamedCache cache)
            {
            cache.addMapListener(new ContactChangeListener());
            }
     
        // ----- inner class: ContactChangeListener -------------------------
     
        public class ContactChangeListener
                extends AbstractMapListener
            {
            // ----- MapListener interface ------------------------------------------
    
            public void entryInserted(MapEvent event)
                {
                System.out.println(event);
                }
     
            public void entryUpdated(MapEvent event)
                {
                Contact contactOld = (Contact)event.getOldValue();
                Contact contactNew = (Contact)event.getNewValue();
                StringBuffer sb  = new StringBuffer();
     
                if (!contactOld.getHomeAddress().equals(
                        contactNew.getHomeAddress()))
                    {
                    sb.append("Home address ");
                    }
                if (!contactOld.getWorkAddress().equals(
                        contactNew.getWorkAddress()))
                    {
                    sb.append("Work address ");
                    }
                if (!contactOld.getTelephoneNumbers().equals(
                        contactNew.getTelephoneNumbers()))
                    {
                    sb.append("Telephone ");
                    }
                if (contactOld.getAge() != contactNew.getAge())
                    {
                    sb.append("Birthdate ");
                    }
                sb.append("was updated for ").append(event.getKey());
                System.out.println(sb);
                }
     
            public void entryDeleted(MapEvent event)
                {
                System.out.println(event.getKey());
                }
            }
        }
    
  3. To enable the console input, you must perform the following:

    —Right-click the Loading project and select Project Properties.

    —Select Run/Debug/Profile at the left.

    —Click the Edit button at the right and click Tool Settings. Ensure that the Allow Program Input check box in the Edit Run Configuration dialog box is selected.

    —Click OK in the Edit Run Configuration dialog box and in the Project Properties dialog box to save your changes.

  4. Turn off local storage if it is not already disabled.

    —Right-click the Loading project and select Project Properties.

    —Select Run/Debug/Profile at the left.

    —Select the Default run configuration and click Edit. In the Java Options field, add the following option to disable local storage.

    -Dtangosol.coherence.distributed.localstorage=false
    
  5. Edit the contacts-cache-server.cmd file to add the classes for the Loading project to the classpath if they are not already there.

    C:\home\oracle\labs\Loading\classes
    
  6. Edit the contacts-pof-config.xml file to add a user type ID for the OfficeUpdater class.

    ...
    <user-type>
        <type-id>1006</type-id>
        <class-name>com.oracle.coherence.handson.ProcessorExample$OfficeUpdater</class-name>
    </user-type>
    ...
    
  7. Start the cache server if it is not already running.

  8. Load the cache by running the LoaderExample program from JDeveloper. If you now run the ObserverExample, you will see the program wait for input.

    In the next section, you will create a program that modifies entries in the cache and returns the changed records.

    Figure 5-1 Listener Program Waiting for Events

5.3 Responding to Changes in the Cache

In this section, you create a Java class to modify entries in the cache and return the changed records.

Until now, to perform actions on the entries in a cache, you used the put and get operations. However, there is a better way to operate on data, one that ensures consistent behavior when concurrent data access is required. EntryProcessors (com.tangosol.util.InvocableMap.EntryProcessor) are agents that perform processing against entries. The entries are processed directly where the data is held. The processing may change the data (it may create, update, or remove entries) or only perform calculations. The processing can occur in parallel in a partitioned cache with multiple nodes, so it is scalable. Processing in the cache also saves the expense of I/O because data does not have to be retrieved to the client for processing.

EntryProcessors that work against the same key are logically queued. This allows lock-free (high performance) processing. The com.tangosol.util.InvocableMap interface (which the NamedCache implements) has the following methods for operating on data:

Object invoke(Object oKey, InvocableMap.EntryProcessor processor)—Invokes the passed EntryProcessor against an individual object and returns the result of the invocation.

Map invokeAll(Collection keys, InvocableMap.EntryProcessor processor)—Invokes the EntryProcessor against the collection of keys and returns the result for each invocation.

Map invokeAll(Filter filter, InvocableMap.EntryProcessor processor)—Invokes the EntryProcessor against the entries that match the filter and returns the result for each invocation.

Note:

EntryProcessor classes must be available in the classpath for each cluster node.
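
The idea that updates to the same key are queued and applied one at a time, without the caller taking a lock, has a rough local analogy in the JDK: ConcurrentHashMap.compute performs its remapping function atomically for a given key. The sketch below is only that analogy, under the assumption that a single in-process map stands in for the cache; PerKeyUpdateSketch is a hypothetical name and nothing in it is Coherence API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** JDK-only analogy for per-key, in-place processing (hypothetical; not Coherence). */
public class PerKeyUpdateSketch {

    /**
     * Starts several threads that all increment the same key and
     * returns the final value once every thread has finished.
     */
    public static int run(int threads, int updatesPerThread) {
        final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("counter", 0);

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < updatesPerThread; i++) {
                    // The function runs atomically for this key, so the
                    // read-modify-write cannot interleave with another update.
                    map.compute("counter", (key, value) -> value + 1);
                }
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("updates did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return map.get("counter");
    }

    public static void main(String[] args) {
        System.out.println("final value: " + run(4, 1000));
    }
}
```

Doing the same work with a separate get followed by a put could lose updates when two threads read the same value. Sending the logic to the entry avoids that race, which is the property the text attributes to EntryProcessors.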

To create an EntryProcessor, you can extend com.tangosol.util.processor.AbstractProcessor and implement the process() method. For example, the following code creates an EntryProcessor to change the work address of employees in the Contacts data set:

public static class OfficeUpdater extends AbstractProcessor
        implements PortableObject
  ...
    public Object process(InvocableMap.Entry entry)  {
        Contact contact = (Contact) entry.getValue();
        contact.setWorkAddress(m_addrWork);
        entry.setValue(contact);
        return null;
        }

To invoke the OfficeUpdater class, you perform the following:

cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"),
             new OfficeUpdater(addrWork));
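
As a rough picture of what a filter-based invokeAll call does (select the matching entries, run the processor on each, and collect one result per key), here is a sketch that uses only JDK collections. The invokeAll helper, the simplified Contact class, and the sample data are all hypothetical and written for this illustration; the real call runs inside the cluster against the nodes that hold the data, which this local loop does not attempt to model.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

/** JDK-only sketch of filter-then-process semantics (hypothetical; not Coherence). */
public class InvokeAllSketch {

    /** Hypothetical, simplified contact holding only what this sketch needs. */
    public static class Contact {
        final String homeState;
        String workCity;

        Contact(String homeState, String workCity) {
            this.homeState = homeState;
            this.workCity = workCity;
        }
    }

    /**
     * Applies the processor to every entry whose value matches the filter
     * and returns one result per processed key.
     */
    public static <K, V, R> Map<K, R> invokeAll(Map<K, V> cache,
                                                Predicate<V> filter,
                                                Function<Map.Entry<K, V>, R> processor) {
        Map<K, R> results = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : cache.entrySet()) {
            if (filter.test(entry.getValue())) {
                results.put(entry.getKey(), processor.apply(entry));
            }
        }
        return results;
    }

    /** Moves Massachusetts residents to a Boston office; returns each previous work city. */
    public static Map<String, String> demo() {
        Map<String, Contact> cache = new LinkedHashMap<>();
        cache.put("ann", new Contact("MA", "Austin"));
        cache.put("bob", new Contact("CA", "Austin"));
        cache.put("cy", new Contact("MA", "Denver"));

        return invokeAll(cache,
                contact -> "MA".equals(contact.homeState),   // plays the role of the filter
                entry -> {                                   // plays the role of the processor
                    Contact contact = entry.getValue();
                    String previous = contact.workCity;
                    contact.workCity = "Boston";
                    return previous;
                });
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The OfficeUpdater shown above returns null from process(), so its result map carries no useful values. Returning the previous value, as this sketch does, is one option when the caller needs to know what changed.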

In this exercise, you create a Java class with EntryProcessors that updates entries in the cache. The ObserverExample class created in the previous exercise will detect these changes and display the changed records.

  1. Create a class that updates entries in the cache.

    In the Loading project, create a class called ProcessorExample with a main method that updates the address of a Contact object in the cache. See "Creating a Java Class" if you need detailed information.

  2. Write code to find the records of Contacts who live in Massachusetts and update their work address to an in-state office.

    Hint: include a nested class that extends AbstractProcessor to set the work addresses and implements PortableObject (so that it can be serialized and sent to the cache servers). Use a filter, such as EqualsFilter, to isolate the Contacts whose home address is in Massachusetts.

    Example 5-4 illustrates possible code for the ProcessorExample class.

    Example 5-4 Sample Program to Update an Object in the Cache

    package com.oracle.coherence.handson;
    
    import com.tangosol.net.NamedCache;
    
    import com.tangosol.util.filter.EqualsFilter;
    import com.tangosol.util.processor.AbstractProcessor;
    import com.tangosol.util.InvocableMap;
    
    import com.tangosol.io.pof.PortableObject;
    import com.tangosol.io.pof.PofReader;
    import com.tangosol.io.pof.PofWriter;
    
    import com.oracle.coherence.handson.Address;
    import com.oracle.coherence.handson.Contact;
    
    import com.tangosol.net.CacheFactory;
    
    import java.io.IOException;
    
    /**
    * ProcessorExample executes an example EntryProcessor.
    *
    */
    public class ProcessorExample
        {
        public ProcessorExample() {
            
        }
            public static void main(String[] args) {
              NamedCache cache = CacheFactory.getCache("ContactsCache");
              new ProcessorExample().execute(cache);
            }
        // ----- ProcessorExample methods -----------------------------------
    
        public void execute(NamedCache cache)
            {
            // People who live in Massachusetts moved to an in-state office
            Address addrWork = new Address("200 Newbury St.", "Yoyodyne, Ltd.",
                    "Boston", "MA", "02116", "US");
    
            cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"),
                    new OfficeUpdater(addrWork));
            }
        // ----- nested class: OfficeUpdater ------------------------------------
    
        /**
        * OfficeUpdater updates a contact's office address.
        */
        public static class OfficeUpdater
                extends AbstractProcessor
                implements PortableObject
            {
            // ----- constructors -------------------------------------------
    
            /**
            * Default constructor (necessary for PortableObject implementation).
            */
            public OfficeUpdater() {
                }
    
            public OfficeUpdater(Address addrWork) {
                m_addrWork = addrWork;
                }
    
            // ----- InvocableMap.EntryProcessor interface ------------------
    
            public Object process(InvocableMap.Entry entry)
                {
                Contact contact = (Contact) entry.getValue();
    
                contact.setWorkAddress(m_addrWork);
                entry.setValue(contact);
                return null;
                }
    
            // ----- PortableObject interface -------------------------------
    
            public void readExternal(PofReader reader)
                    throws IOException
                {
                m_addrWork = (Address) reader.readObject(0);
                }
    
            public void writeExternal(PofWriter writer)
                    throws IOException
                {
                writer.writeObject(0, m_addrWork);
                }
    
            // ----- data members -------------------------------------------
    
            private Address m_addrWork;
            }
        }
    
  3. Perform the following steps to test the ObserverExample and ProcessorExample classes.

    —Restart your cache server.

    —Run the LoaderExample class to load the cache.

    —Run the ObserverExample class. Do not input any value through the Input area at the bottom of the messages window.

    —Run the ProcessorExample to update records in the cache. You should see messages in JDeveloper indicating that addresses have been updated. This is illustrated in Figure 5-2.

    Figure 5-2 Output from the ObserverExample and ProcessorExample Classes