Oracle® Coherence Tutorial for Oracle Coherence Release 3.5, Part Number E14527-01
In this chapter, you set up listeners to observe data changes within a NamedCache. You also learn how EntryProcessors can be used to modify and perform processing on entries in the Coherence cache.
The com.tangosol.util.ObservableMap interface enables you to observe and take action on the changes made to cache entries. Its listeners extend java.util.EventListener and follow the standard JavaBeans event model. All types of NamedCaches implement this interface. To listen for an event, you register a MapListener (com.tangosol.util.MapListener) on the cache. MapListeners are called on the client; that is, the listener code is executed in your client process.
There are three ways to listen for events:
Listen for all events
Listen for all events that satisfy a filter
Listen for events on a particular object key
The methods listed in Example 5-1 (which implement the preceding list) can be used on a NamedCache:
Example 5-1 Listener Methods on a NamedCache
void addMapListener(MapListener listener)
void addMapListener(MapListener listener, Filter filter, boolean fLite)
void addMapListener(MapListener listener, Object oKey, boolean fLite)
The com.tangosol.util.MapEvent class captures the object key, and the old and new values. You can specify a "Lite" event, in which the new and old values may not be present. Example 5-2 describes a pattern for registering a listener on a NamedCache. The listener is written as an anonymous class.
Example 5-2 Code Pattern for Registering an Event
namedCache.addMapListener(new MapListener() {
    public void entryDeleted(MapEvent mapEvent) {
        // TODO... handle deletion event
    }

    public void entryInserted(MapEvent mapEvent) {
        // TODO... handle inserted event
    }

    public void entryUpdated(MapEvent mapEvent) {
        // TODO... handle updated event
    }
});
You can use the getOldValue() or getNewValue() methods of the MapEvent class to get the old or new value of the entry for which the event was fired.
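The event flow described above can be modeled without a cluster. The following is a minimal plain-JDK sketch, not Coherence code: ObservableSketch, TinyObservableMap, Event, and Listener are hypothetical names invented for illustration. It mimics the three registration styles (all events, filtered events, events for one key) and shows how an event carries the key with the old and new values. Stripping both values from a "lite" event is a simplifying assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Plain-JDK model of map events; none of these types are Coherence classes. */
public class ObservableSketch {
    public enum Kind { INSERTED, UPDATED, DELETED }

    /** Hypothetical event: carries the key plus the old and new values. */
    public static final class Event {
        public final Kind kind;
        public final String key;
        public final Object oldValue;
        public final Object newValue;

        Event(Kind kind, String key, Object oldValue, Object newValue) {
            this.kind = kind;
            this.key = key;
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
    }

    public interface Listener {
        void onEvent(Event event);
    }

    public static final class TinyObservableMap {
        private static final class Registration {
            final Listener listener;
            final Predicate<Event> filter;
            final boolean lite;

            Registration(Listener listener, Predicate<Event> filter, boolean lite) {
                this.listener = listener;
                this.filter = filter;
                this.lite = lite;
            }
        }

        private final Map<String, Object> data = new HashMap<>();
        private final List<Registration> registrations = new ArrayList<>();

        /** Listen for all events. */
        public void addListener(Listener listener) {
            registrations.add(new Registration(listener, e -> true, false));
        }

        /** Listen for events that satisfy a filter. */
        public void addListener(Listener listener, Predicate<Event> filter, boolean lite) {
            registrations.add(new Registration(listener, filter, lite));
        }

        /** Listen for events on one particular key. */
        public void addListener(Listener listener, String key, boolean lite) {
            registrations.add(new Registration(listener, e -> e.key.equals(key), lite));
        }

        public void put(String key, Object value) {
            boolean existed = data.containsKey(key);
            Object old = data.put(key, value);
            fire(new Event(existed ? Kind.UPDATED : Kind.INSERTED, key, old, value));
        }

        public void remove(String key) {
            if (data.containsKey(key)) {
                fire(new Event(Kind.DELETED, key, data.remove(key), null));
            }
        }

        private void fire(Event event) {
            for (Registration r : registrations) {
                if (r.filter.test(event)) {
                    // Assumption of this sketch: a lite event drops both values.
                    r.listener.onEvent(r.lite
                            ? new Event(event.kind, event.key, null, null)
                            : event);
                }
            }
        }
    }

    /** Registers a key listener on "k1" and records every event it receives. */
    public static List<String> demo() {
        List<String> log = new ArrayList<>();
        TinyObservableMap map = new TinyObservableMap();
        map.addListener(
                e -> log.add(e.kind + " " + e.key + " " + e.oldValue + "->" + e.newValue),
                "k1", false);
        map.put("k1", "a");   // insert on the watched key
        map.put("k2", "b");   // different key, listener is not called
        map.put("k1", "c");   // update on the watched key
        map.remove("k1");     // delete on the watched key
        return log;
    }
}
```

Calling demo() records three events for key k1 (an insert, an update, and a delete) and none for k2, because the listener was registered for a single key.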
This section describes how to create a Java class that listens on a NamedCache
and responds to any changes it detects.
In the Loading
project, create a new class that listens for a new Contact
object entry.
Name the class ObserverExample
and ensure that it has a main
method. See "Creating a Java Class" if you need detailed information.
Within this class, add a listener to display a message whenever a Contact is inserted into or updated in the cache.
Hint: Use the following code to keep the Java process running until you read from the console; otherwise your program exits immediately.
BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
String text = console.readLine();
Example 5-3 illustrates a possible solution.
Example 5-3 Sample Listener Class
package com.oracle.coherence.handson;

import com.tangosol.net.NamedCache;
import com.tangosol.util.AbstractMapListener;
import com.tangosol.util.MapEvent;
import com.oracle.coherence.handson.Contact;
import com.tangosol.net.CacheFactory;
import java.io.IOException;

/**
 * ObserverExample observes changes to contacts.
 */
public class ObserverExample {

    public ObserverExample() {
    }

    // ----- ObserverExample methods -------------------------------------

    public static void main(String[] args) {
        NamedCache cache = CacheFactory.getCache("ContactsCache");
        new ObserverExample().observe(cache);
        try {
            System.in.read();
        } catch (IOException e) {
        }
    }

    /**
     * Observe changes to the contacts.
     *
     * @param cache target cache
     */
    public void observe(NamedCache cache) {
        cache.addMapListener(new ContactChangeListener());
    }

    // ----- inner class: ContactChangeListener -------------------------

    public class ContactChangeListener extends AbstractMapListener {

        // ----- MapListener interface ------------------------------------------

        public void entryInserted(MapEvent event) {
            System.out.println(event);
        }

        public void entryUpdated(MapEvent event) {
            Contact contactOld = (Contact) event.getOldValue();
            Contact contactNew = (Contact) event.getNewValue();
            StringBuffer sb = new StringBuffer();

            if (!contactOld.getHomeAddress().equals(contactNew.getHomeAddress())) {
                sb.append("Home address ");
            }
            if (!contactOld.getWorkAddress().equals(contactNew.getWorkAddress())) {
                sb.append("Work address ");
            }
            if (!contactOld.getTelephoneNumbers().equals(contactNew.getTelephoneNumbers())) {
                sb.append("Telephone ");
            }
            if (contactOld.getAge() != contactNew.getAge()) {
                sb.append("Birthdate ");
            }
            sb.append("was updated for ").append(event.getKey());
            System.out.println(sb);
        }

        public void entryDeleted(MapEvent event) {
            System.out.println(event.getKey());
        }
    }
}
To enable the console input, you must perform the following:
—Right-click the Loading
project and select Project Properties.
—Select Run/Debug/Profile at the left.
—Click the Edit button at the right and click Tool Settings. Ensure that the Allow Program Input check box in the Edit Run Configuration dialog box is selected.
—Click OK in the Edit Run Configuration dialog box and in the Project Properties dialog box to save your changes.
Turn off local storage if it is not already disabled.
—Right-click the Loading
project and select Project Properties.
—Select Run/Debug/Profile at the left.
—Select the Default run configuration and click Edit. In the Java Options field, add the following option to disable local storage:
-Dtangosol.coherence.distributed.localstorage=false
Edit the contacts-cache-server.cmd file to add the classes for the Loading project to the classpath if they are not already there.
C:\home\oracle\labs\Loading\classes
Edit the contacts-pof-config.xml file to add a user type ID for the OfficeUpdater entries:
...
<user-type>
  <type-id>1006</type-id>
  <class-name>com.oracle.coherence.handson.ProcessorExample$OfficeUpdater</class-name>
</user-type>
...
Start the cache server if it is not already running.
Load the cache by running the LoaderExample
program from JDeveloper. If you now run the ObserverExample
, you will see the program wait for input.
In the next section, you will create a program that modifies entries in the cache and returns the changed records.
In this section, you create a Java class to modify entries in the cache and return the changed records.
Until now, to perform actions on the entries in a cache, you used the put and get operations. However, there is a better way to perform operations on data, one that ensures consistent behavior when concurrent data access is required. EntryProcessors (com.tangosol.util.InvocableMap.EntryProcessor) are agents that perform processing against entries. The entries are processed directly where the data is held. The processing may change the data (it can create, update, or remove entries) or only perform calculations. In a partitioned cache with multiple nodes, the processing can occur in parallel, so it is scalable. Processing in the cache also saves the expense of I/O because data does not have to be retrieved to the client for processing.
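The benefit of processing an entry in place, rather than reading it with get and writing it back with put, can be illustrated with a plain-JDK analogy. The sketch below is not Coherence code: java.util.concurrent.ConcurrentHashMap.compute stands in for an entry processor, because it also applies a function to one entry atomically where the entry is stored. The class name InPlaceUpdateSketch and the method runAtomicIncrements are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Plain-JDK analogy for in-place entry processing; not the Coherence API. */
public class InPlaceUpdateSketch {

    /**
     * Several threads increment the same entry. Each increment runs inside
     * compute(), so updates to that key are applied one at a time and no
     * increment is lost. A separate get() followed by put() could interleave
     * between threads and overwrite a concurrent update.
     */
    public static int runAtomicIncrements(int threads, int perThread) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("counter", 0);

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    map.compute("counter", (key, value) -> value + 1);
                }
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return map.get("counter");
    }

    public static void main(String[] args) {
        System.out.println(runAtomicIncrements(4, 1000)); // prints 4000
    }
}
```

The analogy covers only the atomicity aspect. It does not model the distribution of work across cluster nodes or the network savings described above.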
EntryProcessors
that work against the same key are logically queued. This allows lock-free (high performance) processing. The com.tangosol.util.InvocableMap
interface (which the NamedCache
implements) has the following methods for operating on data:
• Object invoke(Object oKey, InvocableMap.EntryProcessor processor)
—Invokes the passed EntryProcessor
against an individual object and returns the result of the invocation.
• Map invokeAll(Collection keys, InvocableMap.EntryProcessor processor)
—Invokes the EntryProcessor
against the collection of keys and returns the result for each invocation.
• Map invokeAll(Filter filter, InvocableMap.EntryProcessor processor)
—Invokes the EntryProcessor
against the entries that match the filter and returns the result for each invocation.
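The shape of these three methods can be sketched over an ordinary java.util.Map. This is a single-process model written for illustration only: InvokeSketch and its Processor interface are hypothetical stand-ins, not the Coherence InvocableMap or EntryProcessor types, and a java.util.function.Predicate plays the role of the filter. It shows one result per invocation and a result map keyed by entry key for the bulk variants.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

/** Single-process model of invoke/invokeAll; not the Coherence API. */
public class InvokeSketch {

    /** Hypothetical stand-in for an entry processor. */
    public interface Processor<K, V, R> {
        R process(Map.Entry<K, V> entry);
    }

    /** Runs the processor against one key and returns its result. */
    public static <K, V, R> R invoke(Map<K, V> map, K key, Processor<K, V, R> processor) {
        Map.Entry<K, V> entry = new AbstractMap.SimpleEntry<>(key, map.get(key));
        R result = processor.process(entry);
        // Simplification: write the (possibly changed) value back after processing.
        if (entry.getValue() != null) {
            map.put(key, entry.getValue());
        }
        return result;
    }

    /** Runs the processor against each key and returns one result per key. */
    public static <K, V, R> Map<K, R> invokeAll(
            Map<K, V> map, Collection<? extends K> keys, Processor<K, V, R> processor) {
        Map<K, R> results = new LinkedHashMap<>();
        for (K key : keys) {
            results.put(key, invoke(map, key, processor));
        }
        return results;
    }

    /** Runs the processor against every entry whose value matches the filter. */
    public static <K, V, R> Map<K, R> invokeAll(
            Map<K, V> map, Predicate<? super V> filter, Processor<K, V, R> processor) {
        Collection<K> matching = new ArrayList<>();
        for (Map.Entry<K, V> e : map.entrySet()) {
            if (filter.test(e.getValue())) {
                matching.add(e.getKey());
            }
        }
        return invokeAll(map, matching, processor);
    }
}
```

For example, with a map of {a=1, b=2, c=3}, a filter that accepts values of 2 or more, and a processor that multiplies the value by 10 and returns the old value, the model leaves a untouched, changes b to 20 and c to 30, and returns {b=2, c=3}.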
Note: EntryProcessor classes must be available in the classpath for each cluster node.
To create an entry processor, you can extend com.tangosol.util.processor.AbstractProcessor and implement the process() method. For example, the following code creates an EntryProcessor to change the work address of employees in the Contacts data set:
public static class OfficeUpdater extends AbstractProcessor implements PortableObject
...
    public Object process(InvocableMap.Entry entry) {
        Contact contact = (Contact) entry.getValue();
        contact.setWorkAddress(m_addrWork);
        entry.setValue(contact);
        return null;
    }
To invoke the OfficeUpdater
class, you perform the following:
cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"), new OfficeUpdater(addrWork));
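The filter argument "getHomeAddress.getState" names a chain of accessor methods: the first is called on the cached value and the second on the result. The following reflection-based sketch illustrates that idea in plain Java. It is not how Coherence implements its extractors, and ChainedGetterSketch, SampleContact, and SampleAddress are hypothetical stand-ins for the tutorial's classes.

```java
import java.lang.reflect.Method;
import java.util.Objects;

/** Illustrates evaluating a dotted chain of no-argument accessors by reflection. */
public class ChainedGetterSketch {

    /** Minimal stand-in for the tutorial's Address class. */
    public static class SampleAddress {
        private final String state;

        public SampleAddress(String state) {
            this.state = state;
        }

        public String getState() {
            return state;
        }
    }

    /** Minimal stand-in for the tutorial's Contact class. */
    public static class SampleContact {
        private final SampleAddress homeAddress;

        public SampleContact(SampleAddress homeAddress) {
            this.homeAddress = homeAddress;
        }

        public SampleAddress getHomeAddress() {
            return homeAddress;
        }
    }

    /** Calls each method named in the dotted chain on the previous result. */
    public static Object extract(Object target, String chain) {
        Object current = target;
        for (String methodName : chain.split("\\.")) {
            if (current == null) {
                return null; // nothing left to call the next accessor on
            }
            try {
                Method method = current.getClass().getMethod(methodName);
                current = method.invoke(current);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Cannot call " + methodName, e);
            }
        }
        return current;
    }

    /** Equality test against the extracted value, in the spirit of an equals filter. */
    public static boolean matches(Object target, String chain, Object expected) {
        return Objects.equals(extract(target, chain), expected);
    }
}
```

With these stand-ins, a SampleContact whose home address has the state "MA" matches the chain "getHomeAddress.getState" against "MA", and a contact in any other state does not.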
In this exercise, you create a Java class with EntryProcessors
that updates entries in the cache. The ObserverExample
class created in the previous exercise will detect these changes and display the changed records.
Create a class that updates entries in the cache.
In the Loading
project, create a class called ProcessorExample
with a main
method that updates the address of a Contact
object in the cache. See "Creating a Java Class" if you need detailed information.
Write code to find the records of Contacts
that live in Massachusetts and update their work address to an in-state office.
Hint: Include a nested class that implements PortableObject (for serializing and deserializing data from the cache) and acts as an EntryProcessor to set the work addresses. Use a Filter implementation, such as EqualsFilter, to isolate the Contacts whose home address is in Massachusetts.
Example 5-4 illustrates possible code for the ProcessorExample class.
Example 5-4 Sample Program to Update an Object in the Cache
package com.oracle.coherence.handson;

import com.tangosol.net.NamedCache;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.processor.AbstractProcessor;
import com.tangosol.util.InvocableMap;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.oracle.coherence.handson.Address;
import com.oracle.coherence.handson.Contact;
import com.tangosol.net.CacheFactory;
import java.io.IOException;

/**
 * ProcessorExample executes an example EntryProcessor.
 *
 */
public class ProcessorExample {

    public ProcessorExample() {
    }

    public static void main(String[] args) {
        NamedCache cache = CacheFactory.getCache("ContactsCache");
        new ProcessorExample().execute(cache);
    }

    // ----- ProcessorExample methods -----------------------------------

    public void execute(NamedCache cache) {
        // People who live in Massachusetts moved to an in-state office
        Address addrWork = new Address("200 Newbury St.", "Yoyodyne, Ltd.",
                "Boston", "MA", "02116", "US");
        cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"),
                new OfficeUpdater(addrWork));
    }

    // ----- nested class: OfficeUpdater ------------------------------------

    /**
     * OfficeUpdater updates a contact's office address.
     */
    public static class OfficeUpdater extends AbstractProcessor
            implements PortableObject {

        // ----- constructors -------------------------------------------

        /**
         * Default constructor (necessary for PortableObject implementation).
         */
        public OfficeUpdater() {
        }

        public OfficeUpdater(Address addrWork) {
            m_addrWork = addrWork;
        }

        // ----- InvocableMap.EntryProcessor interface ------------------

        public Object process(InvocableMap.Entry entry) {
            Contact contact = (Contact) entry.getValue();
            contact.setWorkAddress(m_addrWork);
            entry.setValue(contact);
            return null;
        }

        // ----- PortableObject interface -------------------------------

        public void readExternal(PofReader reader) throws IOException {
            m_addrWork = (Address) reader.readObject(0);
        }

        public void writeExternal(PofWriter writer) throws IOException {
            writer.writeObject(0, m_addrWork);
        }

        // ----- data members -------------------------------------------

        private Address m_addrWork;
    }
}
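The need for a default constructor and for matching indexes in readExternal and writeExternal can be seen in a small round-trip model. The sketch below is plain Java written for illustration and does not use the POF classes: IndexedRoundTripSketch, Portable, and Updater are hypothetical names, and a Map keyed by property index stands in for the serialized form.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Models index-based write/read of an object's state; not the POF API. */
public class IndexedRoundTripSketch {

    /** Hypothetical stand-in for a portable type: state is written and read by index. */
    public interface Portable {
        void writeTo(Map<Integer, Object> out);

        void readFrom(Map<Integer, Object> in);
    }

    /** Hypothetical processor-like object that carries one piece of state. */
    public static class Updater implements Portable {
        private String workCity;

        /** Needed so that an empty instance can be created before readFrom runs. */
        public Updater() {
        }

        public Updater(String workCity) {
            this.workCity = workCity;
        }

        public String getWorkCity() {
            return workCity;
        }

        public void writeTo(Map<Integer, Object> out) {
            out.put(0, workCity); // index 0 on write ...
        }

        public void readFrom(Map<Integer, Object> in) {
            workCity = (String) in.get(0); // ... must match index 0 on read
        }
    }

    /** Writes the original, creates a blank copy, and restores state into it. */
    public static <T extends Portable> T roundTrip(T original, Supplier<T> noArgConstructor) {
        Map<Integer, Object> wire = new HashMap<>();
        original.writeTo(wire);
        T copy = noArgConstructor.get(); // the receiving side starts from an empty object
        copy.readFrom(wire);
        return copy;
    }
}
```

For example, roundTrip(new Updater("Boston"), Updater::new) returns a distinct Updater whose getWorkCity() is again "Boston".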
Perform the following steps to test the ObserverExample
and ProcessorExample
classes.
—Restart your cache server.
—Run the LoaderExample
class to load the cache.
—Run the ObserverExample
class. Do not input any value through the Input area at the bottom of the messages window.
—Run the ProcessorExample
to update records in the cache. You should see messages in JDeveloper indicating that addresses have been updated. This is illustrated in Figure 5-2.