In this exercise, you set up listeners to observe data changes within a Coherence cache. It highlights the use of the Coherence ObservableMap, MapEvent, EventListener, EntryProcessor, and AbstractMapListener APIs. You also learn how entry processors can be used to modify and process entries in the Coherence cache.
The com.tangosol.util.ObservableMap interface enables you to observe and act on the changes made to cache entries. It extends java.util.Map, and its listeners extend java.util.EventListener, following the standard JavaBeans event model. All types of NamedCache instances implement this interface. To listen for an event, you register a MapListener (com.tangosol.util.MapListener) instance on the cache. MapListener instances are called on the client; that is, the listener code is executed in your client process.
There are multiple ways to listen for events:
• Listen for all events
• Listen for all events that satisfy a filter
• Listen for events on a particular object key
These listener tasks can be performed on a NamedCache by the addMapListener methods listed in Example 7-1.
Example 7-1 Listener Methods on a NamedCache
    void addMapListener(MapListener listener)
    void addMapListener(MapListener listener, Filter filter, boolean fLite)
    void addMapListener(MapListener listener, Object oKey, boolean fLite)
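The three registration scopes can be pictured with a small single-JVM model. The sketch below uses only the JDK. MiniObservableMap, its Listener interface, and the addFilterListener and addKeyListener methods are hypothetical names invented for this illustration; they are not the Coherence API, which uses overloaded addMapListener methods, takes the fLite flag, and delivers MapEvent objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Predicate;

// Hypothetical JDK-only model of the three listener scopes; this is not the Coherence API.
class MiniObservableMap {
    /** Callback that receives the key plus the old and new values of a changed entry. */
    interface Listener {
        void onChange(String key, String oldValue, String newValue);
    }

    /** Pairs a listener with the test (on key and new value) that decides whether it fires. */
    private static final class Registration {
        final Listener listener;
        final BiPredicate<String, String> scope;

        Registration(Listener listener, BiPredicate<String, String> scope) {
            this.listener = listener;
            this.scope = scope;
        }
    }

    private final Map<String, String> data = new HashMap<>();
    private final List<Registration> registrations = new ArrayList<>();

    /** Scope 1: fire for every change. */
    void addListener(Listener listener) {
        registrations.add(new Registration(listener, (key, value) -> true));
    }

    /** Scope 2: fire only when the new value satisfies the filter. */
    void addFilterListener(Listener listener, Predicate<String> filter) {
        registrations.add(new Registration(listener, (key, value) -> filter.test(value)));
    }

    /** Scope 3: fire only for changes to one key. */
    void addKeyListener(Listener listener, String watchedKey) {
        registrations.add(new Registration(listener, (key, value) -> key.equals(watchedKey)));
    }

    /** Stores the value, then notifies every registration whose scope matches. */
    void put(String key, String value) {
        String oldValue = data.put(key, value);
        for (Registration r : registrations) {
            if (r.scope.test(key, value)) {
                r.listener.onChange(key, oldValue, value);
            }
        }
    }

    public static void main(String[] args) {
        MiniObservableMap map = new MiniObservableMap();
        map.addListener((k, o, n) -> System.out.println("all events:  " + k));
        map.addFilterListener((k, o, n) -> System.out.println("filter (MA): " + k),
                v -> v.startsWith("MA"));
        map.addKeyListener((k, o, n) -> System.out.println("key c2 only: " + k), "c2");
        map.put("c1", "MA, Boston");
        map.put("c2", "NY, Albany");
    }
}
```

In this model, the first put reaches the unscoped listener and the filter listener, while the second reaches the unscoped listener and the key listener. The narrower the scope, the fewer events a client has to receive and discard.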
The com.tangosol.util.MapEvent class captures the object key, and the old and new values. You can specify a Lite event, in which the new and old values might not be present. Example 7-2 shows a pattern for registering a listener against a NamedCache, in this case as an anonymous class. Within each callback, you can use the getOldValue or getNewValue methods of the MapEvent class to get the value of the entry for which the event was fired.
Example 7-2 Code Pattern for Registering an Event
    namedCache.addMapListener(new MapListener() {
        public void entryDeleted(MapEvent mapEvent) {
            // TODO... handle deletion event
        }

        public void entryInserted(MapEvent mapEvent) {
            // TODO... handle inserted event
        }

        public void entryUpdated(MapEvent mapEvent) {
            // TODO... handle updated event
        }
    });
This section describes how to create a Java class that listens on a NamedCache and responds to any changes it detects.
In the Loading project, create the class that will listen for a new Contact object entry. Name the class ObserverExample and ensure that it has a main method. See "Creating a Java Class" for detailed information.
Within this class, add a listener that displays a message whenever a Contact is updated in the cache. Because the listener is called only when an event arrives, the Java process must be kept running; otherwise, your program will exit immediately. For example, use the following code to keep the process running until you read from the console.
    BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
    String text = console.readLine();
Within the class, create an inner class that extends AbstractMapListener. Override the methods that handle insert, update, and delete events. In this case, most of the work should be done in the entryUpdated method, based on the old and new values contained in a MapEvent.
Example 7-3 illustrates a possible implementation of a listener class.
Example 7-3 Sample Listener Class
    package com.oracle.handson;

    import com.tangosol.net.NamedCache;
    import com.tangosol.util.AbstractMapListener;
    import com.tangosol.util.MapEvent;
    import com.oracle.handson.Contact;
    import com.tangosol.net.CacheFactory;
    import java.io.IOException;

    /**
     * ObserverExample observes changes to contacts.
     */
    public class ObserverExample {

        public ObserverExample() {
        }

        // ----- ObserverExample methods -------------------------------------

        public static void main(String[] args) {
            NamedCache cache = CacheFactory.getCache("ContactsCache");
            new ObserverExample().observe(cache);
            try {
                System.in.read();
            } catch (IOException e) {
            }
        }

        /**
         * Observe changes to the contacts.
         *
         * @param cache target cache
         */
        public void observe(NamedCache cache) {
            cache.addMapListener(new ContactChangeListener());
        }

        // ----- inner class: ContactChangeListener -------------------------

        public class ContactChangeListener extends AbstractMapListener {

            // ----- MapListener interface ------------------------------------------

            public void entryInserted(MapEvent event) {
                System.out.println(event);
            }

            public void entryUpdated(MapEvent event) {
                Contact contactOld = (Contact)event.getOldValue();
                Contact contactNew = (Contact)event.getNewValue();
                StringBuffer sb = new StringBuffer();

                if (!contactOld.getHomeAddress().equals(
                        contactNew.getHomeAddress())) {
                    sb.append("Home address ");
                }
                if (!contactOld.getWorkAddress().equals(
                        contactNew.getWorkAddress())) {
                    sb.append("Work address ");
                }
                if (!contactOld.getTelephoneNumbers().equals(
                        contactNew.getTelephoneNumbers())) {
                    sb.append("Telephone ");
                }
                if (contactOld.getAge() != contactNew.getAge()) {
                    sb.append("Birthdate ");
                }
                sb.append("was updated for ").append(event.getKey());
                System.out.println(sb);
            }

            public void entryDeleted(MapEvent event) {
                System.out.println(event.getKey());
            }
        }
    }
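One caveat about the comparisons in entryUpdated: each one calls equals on a field of the old value, so if a field (or the old value itself, for example in a lite event) were null, the listener would throw a NullPointerException. A null-tolerant variant can be sketched with java.util.Objects.equals. The ChangeDescriber class, its nested Person class, and the describeChanges method are hypothetical stand-ins for the listener body and the Contact class, reduced to three fields so that the example runs without Coherence.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical helper; Person is a reduced stand-in for the tutorial's Contact class.
class ChangeDescriber {
    static class Person {
        final String homeAddress;
        final String workAddress;
        final int age;

        Person(String homeAddress, String workAddress, int age) {
            this.homeAddress = homeAddress;
            this.workAddress = workAddress;
            this.age = age;
        }
    }

    /** Lists the fields that differ between the old and new value, tolerating nulls. */
    static List<String> describeChanges(Person oldValue, Person newValue) {
        List<String> changed = new ArrayList<>();
        if (oldValue == null || newValue == null) {
            // One of the values is missing, so no field-by-field comparison is possible.
            changed.add("value unavailable");
            return changed;
        }
        // Objects.equals treats two nulls as equal and never dereferences a null.
        if (!Objects.equals(oldValue.homeAddress, newValue.homeAddress)) {
            changed.add("Home address");
        }
        if (!Objects.equals(oldValue.workAddress, newValue.workAddress)) {
            changed.add("Work address");
        }
        if (oldValue.age != newValue.age) {
            changed.add("Age");
        }
        return changed;
    }

    public static void main(String[] args) {
        Person before = new Person(null, "1 Main St.", 40);
        Person after = new Person("9 Elm St.", "1 Main St.", 40);
        System.out.println(describeChanges(before, after) + " was updated");
    }
}
```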
To run the Cache Listener example:
Create a run configuration for ObserverExample. Right-click ObserverExample in the Project Explorer and select Run As. In the Run Configurations dialog box, select Oracle Coherence and click the New Configuration icon.
In the Name field, enter ObserverExample.
In the Project field in the Main tab, enter Loading. In the Main class field, enter com.oracle.handson.ObserverExample.
In the General tab of the Coherence tab, browse to the c:\home\oracle\workspace\Contacts\appClientModule\contacts-cache-config.xml file in the Cache configuration descriptor field. Select the Disabled (cache client) button. Enter 3155 in the Cluster port field. Click Apply.
In the Other tab, scroll down to the tangosol.pof.config field. Enter the absolute path to the POF configuration file contacts-pof-config.xml. Click Apply.
In the Common tab, select Shared file and browse for the Loading directory.
Stop any running cache servers. See "Stopping Cache Servers" for more information.
Start the ContactsCacheServer.
Load the cache by running the LoaderExample program from Eclipse. If you now run the ObserverExample, the program waits for input, as illustrated in Example 7-4.
In "Responding to Changes in the Cache", you create a program that modifies entries in the cache and returns the changed records.
Example 7-4 Listener Program Waiting for Events
    ...
    MasterMemberSet(
      ThisMember=Member(Id=3, Timestamp=2012-08-17 13:51:15.468, Address=10.159.220.86:8090, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:3964, Role=OracleHandsonObserverExample)
      OldestMember=Member(Id=1, Timestamp=2012-08-17 13:50:44.093, Address=10.159.220.86:8088, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:3344, Role=CoherenceServer)
      ActualMemberSet=MemberSet(Size=2
        Member(Id=1, Timestamp=2012-08-17 13:50:44.093, Address=10.159.220.86:8088, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:3344, Role=CoherenceServer)
        Member(Id=3, Timestamp=2012-08-17 13:51:15.468, Address=10.159.220.86:8090, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:3964, Role=OracleHandsonObserverExample)
        )
      MemberId|ServiceVersion|ServiceJoined|MemberState
        1|12.1.2|2012-08-17 13:50:44.093|JOINED,
        3|12.1.2|2012-08-17 13:51:15.468|JOINED
      RecycleMillis=1200000
      RecycleSet=MemberSet(Size=0
        )
      )
    TcpRing{Connections=[1]}
    IpMonitor{Addresses=0}
    2012-08-17 13:51:15.890/2.031 Oracle Coherence GE 12.1.2.0 <D5> (thread=Invocation:Management, member=3): Service Management joined the cluster with senior service member 1
    2012-08-17 13:51:15.921/2.062 Oracle Coherence GE 12.1.2.0 <Info> (thread=main, member=3): Loaded Reporter configuration from "jar:file:/C:/oracle/Middleware/Oracle_Home/coherence/lib/coherence.jar!/reports/report-group.xml"
    2012-08-17 13:51:16.046/2.187 Oracle Coherence GE 12.1.2.0 <Info> (thread=DistributedCache:PartitionedPofCache, member=3): Loaded POF configuration from "file:/C:/home/oracle/workspace/Contacts/build/classes/contacts-pof-config.xml"
    2012-08-17 13:51:16.109/2.250 Oracle Coherence GE 12.1.2.0 <Info> (thread=DistributedCache:PartitionedPofCache, member=3): Loaded included POF configuration from "jar:file:/C:/oracle/Middleware/Oracle_Home/coherence/lib/coherence.jar!/coherence-pof-config.xml"
    2012-08-17 13:51:16.187/2.328 Oracle Coherence GE 12.1.2.0 <D5> (thread=DistributedCache:PartitionedPofCache, member=3): Service PartitionedPofCache joined the cluster with senior service member 1
In this section, you will create a Java class to modify entries in the cache and return the changed records.
Until now, to perform actions on the entries in a cache, you used the put and get operations. However, there is a better way to perform operations on data, one that ensures consistent behavior when concurrent data access is required. Entry processors (com.tangosol.util.InvocableMap.EntryProcessor) are agents that perform processing against entries. The entries are processed directly where the data is being held. The processing you perform can change the data: it can create, update, or remove data, or only perform calculations. The processing can occur in parallel in a partitioned cache with multiple nodes, so it is scalable. Processing in the cache also saves I/O expense because data is not pulled to the client for processing.
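The difference between a get followed by a put and processing an entry in place can be illustrated without a cluster. The sketch below uses java.util.concurrent.ConcurrentHashMap, whose compute method applies a function to a single entry atomically. It is offered only as a single-JVM analogy for processing an entry where it is held; it is not Coherence code, and the InPlaceUpdateDemo class and its method names are invented for the example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Single-JVM analogy only: compute() stands in for processing an entry in place.
class InPlaceUpdateDemo {
    private static final String KEY = "counter";

    /** Each worker increments the entry in place with an atomic per-key operation. */
    static int incrementInPlace(int threads, int perThread) {
        return run(threads, perThread, map -> map.compute(KEY, (k, v) -> v + 1));
    }

    /** Each worker reads the value and writes it back; concurrent workers can overwrite each other. */
    static int incrementWithGetPut(int threads, int perThread) {
        return run(threads, perThread, map -> map.put(KEY, map.get(KEY) + 1));
    }

    /** Runs the given step perThread times on each of the worker threads and returns the final count. */
    private static int run(int threads, int perThread,
                           Consumer<ConcurrentHashMap<String, Integer>> step) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put(KEY, 0);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    step.accept(map);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return map.get(KEY);
    }

    public static void main(String[] args) {
        System.out.println("in place: " + incrementInPlace(4, 10000));
        System.out.println("get/put:  " + incrementWithGetPut(4, 10000));
    }
}
```

The in-place variant always reaches the full count, because each read-modify-write is applied to the entry as one step. The get/put variant can fall short, because two workers may read the same value before either writes it back.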
Entry processors that work on the same key are logically queued. This allows lock-free (high performance) processing. The com.tangosol.util.InvocableMap interface (which the NamedCache interface extends) has the following methods for operating on data:
• Object invoke(Object oKey, InvocableMap.EntryProcessor processor), which invokes the passed EntryProcessor against an individual object and returns the result of the invocation.
• Map invokeAll(Collection keys, InvocableMap.EntryProcessor processor), which invokes the EntryProcessor against the collection of keys and returns the result for each invocation.
• Map invokeAll(Filter filter, InvocableMap.EntryProcessor processor), which invokes the EntryProcessor against the entries that match the filter and returns the result for each invocation.
Note: EntryProcessor classes must be available in the class path for each cluster node.
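The three invocation shapes differ mainly in how entries are selected and in what is returned: a single result for one key, or a map from key to result for many entries. The following single-JVM sketch models only that shape. MiniInvocableMap is a hypothetical class, java.util.function.Function stands in for EntryProcessor, and java.util.function.Predicate stands in for Filter; nothing here is the Coherence API, and it does no distribution or queuing.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical single-JVM model of invoke and invokeAll; this is not the Coherence API.
class MiniInvocableMap<K, V> {
    private final Map<K, V> data = new LinkedHashMap<>();

    void put(K key, V value) { data.put(key, value); }

    V get(K key) { return data.get(key); }

    /** Runs the processor against one entry and returns its result. */
    <R> R invoke(K key, Function<Map.Entry<K, V>, R> processor) {
        for (Map.Entry<K, V> entry : data.entrySet()) {
            if (entry.getKey().equals(key)) {
                return processor.apply(entry);
            }
        }
        return null; // simplification of this model: an absent key is skipped
    }

    /** Runs the processor against each listed key and returns key -> result. */
    <R> Map<K, R> invokeAll(Collection<K> keys, Function<Map.Entry<K, V>, R> processor) {
        return invokeMatching(entry -> keys.contains(entry.getKey()), processor);
    }

    /** Runs the processor against each entry whose value matches the filter. */
    <R> Map<K, R> invokeAll(Predicate<V> filter, Function<Map.Entry<K, V>, R> processor) {
        return invokeMatching(entry -> filter.test(entry.getValue()), processor);
    }

    private <R> Map<K, R> invokeMatching(Predicate<Map.Entry<K, V>> selector,
                                         Function<Map.Entry<K, V>, R> processor) {
        Map<K, R> results = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : data.entrySet()) {
            if (selector.test(entry)) {
                results.put(entry.getKey(), processor.apply(entry));
            }
        }
        return results;
    }

    public static void main(String[] args) {
        MiniInvocableMap<String, String> contacts = new MiniInvocableMap<>();
        contacts.put("c1", "MA");
        contacts.put("c2", "NY");
        contacts.put("c3", "MA");
        // The processor may modify the entry in place and may also return a result.
        System.out.println(contacts.invoke("c2", e -> e.getValue().length()));
        System.out.println(contacts.invokeAll(Arrays.asList("c1", "c2"), e -> e.getValue()));
        System.out.println(contacts.invokeAll(v -> v.equals("MA"), e -> e.setValue("moved")));
    }
}
```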
To create an entry processor, you can extend com.tangosol.util.processor.AbstractProcessor and implement the process() method. For example, the following code creates an EntryProcessor instance to change the work address of employees in the Contacts data set:
    public static class OfficeUpdater extends AbstractProcessor implements PortableObject
    ...
        public Object process(InvocableMap.Entry entry) {
            Contact contact = (Contact) entry.getValue();
            contact.setWorkAddress(m_addrWork);
            entry.setValue(contact);
            return null;
        }
To invoke the OfficeUpdater class, you can use the invokeAll method, passing an instance of the OfficeUpdater class as one of its arguments.
cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"), new OfficeUpdater(addrWork));
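The first argument to EqualsFilter in the call above is a dot-separated chain of method names: getHomeAddress is called on each cached value, getState is called on the result, and the final value is compared with "MA". The sketch below shows that idea with plain Java reflection. It illustrates the concept only and does not reproduce how Coherence implements extraction; ChainedCall, Home, and Resident are hypothetical names.

```java
import java.lang.reflect.Method;

// Concept sketch: resolve a dot-separated chain of no-argument methods by reflection.
class ChainedCall {
    // Hypothetical stand-ins for the tutorial's Address and Contact classes.
    static class Home {
        private final String state;
        Home(String state) { this.state = state; }
        public String getState() { return state; }
    }

    static class Resident {
        private final Home home;
        Resident(Home home) { this.home = home; }
        public Home getHomeAddress() { return home; }
    }

    /** Invokes each method named in the chain, left to right, on the previous result. */
    static Object extract(Object target, String chain) {
        Object current = target;
        for (String name : chain.split("\\.")) {
            if (current == null) {
                return null; // nothing left to call the next method on
            }
            try {
                Method method = current.getClass().getMethod(name);
                method.setAccessible(true); // the declaring class here is not public
                current = method.invoke(current);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Cannot call " + name, e);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        Resident resident = new Resident(new Home("MA"));
        Object state = extract(resident, "getHomeAddress.getState");
        System.out.println("MA".equals(state) ? "matches the filter" : "does not match");
    }
}
```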
In this exercise, you create a Java class with EntryProcessor instances that update entries in the cache. The ObserverExample class created in "Create a Class to Listen for Changes in the Cache" will detect these changes and display the changed records.
To create a file to update entries in the cache:
Create a class that updates entries in the cache.
In the Loading project, create a class called ProcessorExample with a main method that updates the address of a Contact object in the cache. See "Creating a Java Class" for detailed information.
Write code to find the Contact records whose home address is in Massachusetts and update their work addresses to an in-state office.
Include a nested class that extends AbstractProcessor to set the work addresses and that implements the PortableObject interface (so that the processor itself can be serialized and deserialized). Use a filter, such as EqualsFilter, to isolate the Contact entries whose home addresses are in Massachusetts.
Example 7-5 illustrates a possible implementation of the ProcessorExample class.
Example 7-5 Sample Program to Update an Object in the Cache
    package com.oracle.handson;

    import com.tangosol.net.NamedCache;
    import com.tangosol.util.filter.EqualsFilter;
    import com.tangosol.util.processor.AbstractProcessor;
    import com.tangosol.util.InvocableMap;
    import com.tangosol.io.pof.PortableObject;
    import com.tangosol.io.pof.PofReader;
    import com.tangosol.io.pof.PofWriter;
    import com.oracle.handson.Address;
    import com.oracle.handson.Contact;
    import com.tangosol.net.CacheFactory;
    import java.io.IOException;

    /**
     * ProcessorExample executes an example EntryProcessor.
     *
     */
    public class ProcessorExample {

        public ProcessorExample() {
        }

        public static void main(String[] args) {
            NamedCache cache = CacheFactory.getCache("ContactsCache");
            new ProcessorExample().execute(cache);
        }

        // ----- ProcessorExample methods -----------------------------------

        public void execute(NamedCache cache) {
            // People who live in Massachusetts moved to an in-state office
            Address addrWork = new Address("200 Newbury St.", "Yoyodyne, Ltd.",
                    "Boston", "MA", "02116", "US");
            cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"),
                    new OfficeUpdater(addrWork));
        }

        // ----- nested class: OfficeUpdater ------------------------------------

        /**
         * OfficeUpdater updates a contact's office address.
         */
        public static class OfficeUpdater extends AbstractProcessor
                implements PortableObject {

            // ----- constructors -------------------------------------------

            /**
             * Default constructor (necessary for PortableObject implementation).
             */
            public OfficeUpdater() {
            }

            public OfficeUpdater(Address addrWork) {
                m_addrWork = addrWork;
            }

            // ----- InvocableMap.EntryProcessor interface ------------------

            public Object process(InvocableMap.Entry entry) {
                Contact contact = (Contact) entry.getValue();
                contact.setWorkAddress(m_addrWork);
                entry.setValue(contact);
                System.out.println("Work address was updated for "
                        + contact.getFirstName() + " " + contact.getLastName());
                return null;
            }

            // ----- PortableObject interface -------------------------------

            public void readExternal(PofReader reader) throws IOException {
                m_addrWork = (Address) reader.readObject(0);
            }

            public void writeExternal(PofWriter writer) throws IOException {
                writer.writeObject(0, m_addrWork);
            }

            // ----- data members -------------------------------------------

            private Address m_addrWork;
        }
    }
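The comment on the default constructor of OfficeUpdater reflects a general pattern: when an object arrives in serialized form, the receiver first creates an empty instance and then calls readExternal to populate it. The JDK's java.io.Externalizable interface follows the same pattern and can demonstrate it without a cluster. The sketch below is an analogy only. It uses standard Java serialization rather than POF, and ExternalizableDemo and Office are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;

// Analogy using java.io.Externalizable; this is not POF and not the Coherence API.
class ExternalizableDemo {
    static class Office implements Externalizable {
        private String city;

        /** Public no-argument constructor: the deserializer instantiates through it. */
        public Office() {
        }

        Office(String city) {
            this.city = city;
        }

        String getCity() {
            return city;
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeObject(city);
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            // Called on the empty instance created by the no-argument constructor.
            city = (String) in.readObject();
        }
    }

    /** Serializes the object to bytes and reads it back as a new instance. */
    static Office roundTrip(Office original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Office) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Office copy = roundTrip(new Office("Boston"));
        System.out.println(copy.getCity());
    }
}
```

With Externalizable, removing the public no-argument constructor makes deserialization fail, which is why OfficeUpdater declares one even though the program itself only calls the constructor that takes an Address.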
Edit the contacts-pof-config.xml file to add a user type ID for the OfficeUpdater entries. In this case, add the type ID 1006 for the ProcessorExample$OfficeUpdater class.
    ...
    <user-type>
      <type-id>1006</type-id>
      <class-name>com.oracle.handson.ProcessorExample$OfficeUpdater</class-name>
    </user-type>
    ...
To run the cache update example:
Create a run configuration for ProcessorExample. Right-click ProcessorExample in the Project Explorer and select Run As. In the Run Configurations dialog box, select Oracle Coherence and click the New Configuration icon.
In the Name field, enter ProcessorExample.
In the Project field in the Main tab, enter Loading. In the Main class field, enter com.oracle.handson.ProcessorExample.
In the General tab of the Coherence tab, browse to the c:\home\oracle\workspace\Contacts\appClientModule\contacts-cache-config.xml file in the Cache configuration descriptor field. Select the Disabled (cache client) button. Enter 3155 in the Cluster port field. Click Apply.
In the Other tab, scroll down to the tangosol.pof.config field. Enter the absolute path to the POF configuration file contacts-pof-config.xml. Click Apply.
In the Common tab, select Shared file and browse for the Loading directory.
Perform the following steps to test the ObserverExample and ProcessorExample classes.
Stop any running cache servers. See "Stopping Cache Servers" for more information.
Restart the ContactsCacheServer.
Run the LoaderExample class to load the cache.
Run the ObserverExample class.
Run the ProcessorExample to update records in the cache.
You should see messages in the cache server console window that are similar to Example 7-6, indicating that the work addresses for the specified employees were updated.
Example 7-6 Output from the ObserverExample and ProcessorExample Classes
    ...
    Started DefaultCacheServer...
    2012-08-17 14:25:40.531/31.953 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member(Id=2, Timestamp=2012-08-17 14:25:40.366, Address=10.159.220.86:8090, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:4568, Role=OracleHandsonLoaderExample) joined Cluster with senior member 1
    2012-08-17 14:25:40.765/32.187 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 2 joined Service Management with senior member 1
    2012-08-17 14:25:41.078/32.500 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 2 joined Service PartitionedPofCache with senior member 1
    2012-08-17 14:25:41.421/32.843 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 2 left service Management with senior member 1
    2012-08-17 14:25:41.437/32.859 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 2 left service PartitionedPofCache with senior member 1
    2012-08-17 14:25:41.453/32.875 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): TcpRing disconnected from Member(Id=2, Timestamp=2012-08-17 14:25:40.366, Address=10.159.220.86:8090, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:4568, Role=OracleHandsonLoaderExample) due to a peer departure; removing the member.
    2012-08-17 14:25:41.453/32.875 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member(Id=2, Timestamp=2012-08-17 14:25:41.453, Address=10.159.220.86:8090, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:4568, Role=OracleHandsonLoaderExample) left Cluster with senior member 1
    2012-08-17 14:25:58.875/50.297 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member(Id=3, Timestamp=2012-08-17 14:25:58.695, Address=10.159.220.86:8090, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:4576, Role=OracleHandsonObserverExample) joined Cluster with senior member 1
    2012-08-17 14:25:59.109/50.531 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 3 joined Service Management with senior member 1
    2012-08-17 14:25:59.390/50.812 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 3 joined Service PartitionedPofCache with senior member 1
    2012-08-17 14:26:16.656/68.078 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member(Id=4, Timestamp=2012-08-17 14:26:16.453, Address=10.159.220.86:8092, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:416, Role=OracleHandsonProcessorExample) joined Cluster with senior member 1
    2012-08-17 14:26:16.906/68.328 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 4 joined Service Management with senior member 1
    2012-08-17 14:26:17.218/68.640 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 4 joined Service PartitionedPofCache with senior member 1
    Work address was updated for John Lfyovf
    Work address was updated for John Jptrpajked
    Work address was updated for John Mtqdln
    Work address was updated for John Mstaugiw
    Work address was updated for John Olfezqse
    Work address was updated for John Qjefjgtgj
    Work address was updated for John Kuhgkzn
    Work address was updated for John Jpby
    Work address was updated for John Cekuea
    Work address was updated for John Guhkam
    Work address was updated for John Ijwj
    Work address was updated for John Trlb
    Work address was updated for John Hnfcwxjq
    Work address was updated for John Kizifh
    Work address was updated for John Rqlhgboi
    Work address was updated for John Ipphab
    2012-08-17 14:26:17.312/68.734 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): TcpRing disconnected from Member(Id=4, Timestamp=2012-08-17 14:26:16.453, Address=10.159.220.86:8092, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:416, Role=OracleHandsonProcessorExample) due to a peer departure; removing the member.
    2012-08-17 14:26:17.312/68.734 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 4 left service Management with senior member 1
    2012-08-17 14:26:17.312/68.734 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 4 left service PartitionedPofCache with senior member 1
    2012-08-17 14:26:17.312/68.734 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member(Id=4, Timestamp=2012-08-17 14:26:17.312, Address=10.159.220.86:8092, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:416, Role=OracleHandsonProcessorExample) left Cluster with senior member 1