In this chapter, you observe data changes within a NamedCache
.
The com.tangosol.util.ObservableMap interface enables you to observe and take action on the changes made to cache entries. It extends java.util.Map and follows the standard JavaBeans event model; its listeners extend java.util.EventListener. All types of NamedCache implement this interface. To listen for an event, you register a MapListener (com.tangosol.util.MapListener) on the cache.
There are three ways to listen for events:
Listen for all events
Listen for all events that satisfy a filter
Listen for events on a particular object key
The methods listed in Example 5-1 (which implement the preceding list) can be used on a NamedCache
:
Example 5-1 Listener Methods on a NamedCache
void addMapListener(MapListener listener)
void addMapListener(MapListener listener, Filter filter, boolean fLite)
void addMapListener(MapListener listener, Object oKey, boolean fLite)
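The three registration scopes can be pictured with a small in-memory model. This is only a sketch that uses plain java.util classes: MiniObservableMap, its Listener interface, and the addListener, addFilterListener, and addKeyListener methods are hypothetical names invented for illustration and are not part of the Coherence API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for an observable cache; NOT the Coherence API.
class MiniObservableMap<K, V> {
    /** Callback that receives the event type, key, old value and new value. */
    interface Listener<K, V> {
        void onChange(String type, K key, V oldValue, V newValue);
    }

    /** One registered listener plus its optional scope restrictions. */
    private static final class Registration<K, V> {
        final Listener<K, V> listener;
        final Predicate<V> valueFilter; // null means "no filter"
        final K key;                    // null means "any key"

        Registration(Listener<K, V> listener, Predicate<V> valueFilter, K key) {
            this.listener = listener;
            this.valueFilter = valueFilter;
            this.key = key;
        }
    }

    private final Map<K, V> data = new HashMap<>();
    private final List<Registration<K, V>> registrations = new ArrayList<>();

    /** Listen for all events. */
    void addListener(Listener<K, V> l) {
        registrations.add(new Registration<>(l, null, null));
    }

    /** Listen for events whose value satisfies a filter. */
    void addFilterListener(Listener<K, V> l, Predicate<V> filter) {
        registrations.add(new Registration<>(l, filter, null));
    }

    /** Listen for events on one particular key. */
    void addKeyListener(Listener<K, V> l, K key) {
        registrations.add(new Registration<>(l, null, key));
    }

    V put(K key, V value) {
        boolean existed = data.containsKey(key);
        V old = data.put(key, value);
        fire(existed ? "updated" : "inserted", key, old, value);
        return old;
    }

    V remove(K key) {
        if (!data.containsKey(key)) {
            return null;
        }
        V old = data.remove(key);
        fire("deleted", key, old, null);
        return old;
    }

    private void fire(String type, K key, V oldValue, V newValue) {
        for (Registration<K, V> r : registrations) {
            if (r.key != null && !r.key.equals(key)) {
                continue; // key-scoped listener, different key
            }
            // In this sketch the filter sees the new value, or the old
            // value when the entry was deleted.
            V candidate = (newValue != null) ? newValue : oldValue;
            if (r.valueFilter != null && !r.valueFilter.test(candidate)) {
                continue;
            }
            r.listener.onChange(type, key, oldValue, newValue);
        }
    }
}

class ListenerScopesDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservableMap<String, Integer> ages = new MiniObservableMap<>();
        ages.addListener((t, k, o, n) -> log.add("all:" + t + ":" + k));
        ages.addFilterListener((t, k, o, n) -> log.add("filter:" + t + ":" + k), age -> age >= 40);
        ages.addKeyListener((t, k, o, n) -> log.add("key:" + t + ":" + k), "fred");

        ages.put("fred", 29);
        ages.put("barney", 44);
        ages.put("fred", 30);
        ages.remove("barney");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model, the unscoped listener sees every change, the filtered listener sees only changes to entries whose value is 40 or more, and the key listener sees only changes to the fred entry.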
The com.tangosol.util.MapEvent class captures the object key, and the old and new values. You can request a "Lite" event, in which the old and new values may not be present. Example 5-2 shows a pattern for registering a listener on a NamedCache, implemented as an anonymous class.
Example 5-2 Code Pattern for Registering an Event
namedCache.addMapListener(new MapListener() {
    public void entryDeleted(MapEvent mapEvent) {
        // TODO... handle deletion event
    }

    public void entryInserted(MapEvent mapEvent) {
        // TODO... handle inserted event
    }

    public void entryUpdated(MapEvent mapEvent) {
        // TODO... handle updated event
    }
});
You can use the getOldValue() and getNewValue() methods of the MapEvent passed to each handler to get the value of the entry before and after the change that fired the event.
This section describes how to create a Java class that listens on a NamedCache
and responds to any changes it detects.
To create a Java class that listens to the cache and responds to changes:
In the Lab6
project, create a new class that listens for a new Person
object entry.
Create a new Java class called ListenForNewPerson
. Ensure that it has a main
method.
Within this class, add a listener to print out a message whenever a new Person
is added to the cache.
Hint: Use the following code to keep the Java process running until you read from the console; otherwise your program exits immediately.
BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
String text = console.readLine();
Example 5-3 illustrates a possible solution.
Example 5-3 Sample Listener Class
package com.oracle.coherence.handson;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class ListenForNewPerson {
    public ListenForNewPerson() {
    }

    public static void main(String[] args) throws IOException {
        // connect to named cache
        NamedCache person = CacheFactory.getCache("person");

        // listen for insert events on Person
        // This can be done in an easier way by using a new AbstractMapListener()
        // and then overriding only the method you want to
        person.addMapListener(new MapListener() {
            public void entryDeleted(MapEvent mapEvent) {
                // ignore
            }

            public void entryInserted(MapEvent mapEvent) {
                Person p = (Person) mapEvent.getNewValue();
                System.out.println("New person added: " + p.getFirstname() + " " + p.getLastname());
            }

            public void entryUpdated(MapEvent mapEvent) {
                // ignore
            }
        });

        System.out.println("waiting for events");
        // block until a line is read so that the process keeps running
        BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
        String text = console.readLine();
    }
}
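The comment in Example 5-3 mentions overriding only the method you need. The general adapter idea behind that remark can be sketched in plain Java as follows. EntryListener and EntryListenerAdapter are hypothetical types written for this illustration; they only mirror the shape of a three-method listener and are not Coherence classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical three-method listener, shaped like the one in Example 5-3.
interface EntryListener {
    void entryInserted(String key, String value);
    void entryUpdated(String key, String value);
    void entryDeleted(String key);
}

// Adapter with empty default bodies, so subclasses override only what they need.
abstract class EntryListenerAdapter implements EntryListener {
    @Override public void entryInserted(String key, String value) { }
    @Override public void entryUpdated(String key, String value) { }
    @Override public void entryDeleted(String key) { }
}

class AdapterDemo {
    static List<String> run() {
        List<String> seen = new ArrayList<>();

        // Only the insert handler is overridden; the others stay no-ops.
        EntryListener insertsOnly = new EntryListenerAdapter() {
            @Override
            public void entryInserted(String key, String value) {
                seen.add("New person added: " + value);
            }
        };

        insertsOnly.entryInserted("1", "Tim Middleton");
        insertsOnly.entryUpdated("1", "Timothy Middleton"); // ignored
        insertsOnly.entryDeleted("1");                      // ignored
        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The anonymous class shrinks to a single method, which is the same saving that Example 5-5 gets from AbstractMapListener.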
To enable the console input, you must perform the following:
—Right-click the Lab6
project and select Project Properties.
—Select Run/Debug/Profile at the left.
—Click the Edit button at the right and click Tool Settings. Ensure that the Allow Program Input check box in the Edit Run Configuration dialog box is selected.
—Click OK in the Edit Run Configuration dialog box and in the Project Properties dialog box to save your changes.
Start the cache server if it is not already running.
C:\oracle\product\coherence\bin>cache-server.cmd
Run the ListenForNewPerson
Java class. You should see the Input area at the bottom of the messages window. This is where you can input information from the console.
Figure 5-2 Listener Program Waiting for Events
In Lab6
, create a class that adds and removes entries in a cache.
In Lab6
, create a class called PersonEventTester
that puts a new Person
object in the cache. Ensure that it has a main
method.
Figure 5-3 Creating a PersonEventTester Class
Example 5-4 illustrates possible code for the PersonEventTester
class.
Example 5-4 Sample Program to Put a New Object in the Cache
package com.oracle.coherence.handson;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

public class PersonEventTester {
    public PersonEventTester() {
    }

    public static void main(String[] args) {
        NamedCache person = CacheFactory.getCache("person");

        // insert a new entry; this fires an insert event
        Person p1 = new Person(1, "Middleton", "Tim",
                               "Level 2, 66 Kings Park Road, West Perth",
                               39, Person.MALE);
        System.out.println("put person");
        person.put(p1.getId(), p1);

        // change the entry and put it back; this fires an update event
        Person p2 = (Person) person.get(p1.getId());
        p2.setFirstname("Timothy");
        System.out.println("Update person");
        person.put(p2.getId(), p2);
    }
}
Perform the following steps to test the ListenForNewPerson
and PersonEventTester
classes.
—Restart your cache server.
—Run your ListenForNewPerson
class. Do not input any value through the Input area at the bottom of the messages window.
Figure 5-4 Output from the ListenForNewPerson Class
—Run the PersonEventTester
to create a new record in the cache. What happens?
Figure 5-5 Output from the PersonEventTester Class
—You should see a message in the ListenForNewPerson
messages window indicating that a new record has been added.
Figure 5-6 New Record Detected by the ListenForNewPerson Class
In Lab6
, create a class that listens for an update to the Person
object.
a) In Lab6
, create a new class called ListenForUpdatedPerson
. Ensure that it has a main
method.
Figure 5-7 Creating a ListenForUpdatedPerson Class
In the class, add a MapListener
to the person cache that prints out the new and old values of a male Person
when that person object is updated.
Hint: You must supply two new parameters to the addMapListener
method: a new MapEventFilter()
parameter and a parameter indicating whether you want a lite event.
Example 5-5 illustrates a possible solution.
Example 5-5 Sample Code that Listens for an Update to an Object
package com.oracle.coherence.handson;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.AbstractMapListener;
import com.tangosol.util.MapEvent;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.filter.MapEventFilter;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class ListenForUpdatedPerson {
    public ListenForUpdatedPerson() {
    }

    public static void main(String[] args) throws IOException {
        // connect to named cache
        NamedCache person = CacheFactory.getCache("person");

        // listen for update events on male Person entries;
        // AbstractMapListener lets us override only entryUpdated()
        person.addMapListener(new AbstractMapListener() {
            public void entryUpdated(MapEvent mapEvent) {
                Person oldPerson = (Person) mapEvent.getOldValue();
                Person newPerson = (Person) mapEvent.getNewValue();
                // better to implement toString() on the person object to display it.. :)
                System.out.println("Old person is " + oldPerson.getFirstname() + " " + oldPerson.getLastname());
                System.out.println("New person is " + newPerson.getFirstname() + " " + newPerson.getLastname());
            }
        },
        new MapEventFilter(MapEventFilter.E_UPDATED,
                           new EqualsFilter("getGender", Person.MALE)),
        false // not a lite event
        );

        System.out.println("waiting for events");
        BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
        String text = console.readLine();
    }
}
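Example 5-5 combines an event-type mask with a filter on the entry value. As a rough mental model only, that combination can be sketched in plain Java as shown below. SketchEventFilter and its constants are hypothetical and simplified; the real MapEventFilter offers more event masks than this sketch models.

```java
import java.util.function.Predicate;

// Hypothetical sketch of an event filter; NOT com.tangosol.util.filter.MapEventFilter.
class SketchEventFilter<V> {
    // One bit per event type, so several types can be OR-ed into one mask.
    static final int INSERTED = 1;
    static final int UPDATED = 2;
    static final int DELETED = 4;

    private final int mask;
    private final Predicate<V> valueFilter;

    SketchEventFilter(int mask, Predicate<V> valueFilter) {
        this.mask = mask;
        this.valueFilter = valueFilter;
    }

    /** True when the event type is in the mask and the value passes the filter. */
    boolean accepts(int eventType, V value) {
        return (mask & eventType) != 0 && valueFilter.test(value);
    }
}

class SketchEventFilterDemo {
    static boolean[] run() {
        // Accept only updates to entries whose gender value is "M".
        SketchEventFilter<String> updatedMales =
            new SketchEventFilter<>(SketchEventFilter.UPDATED, gender -> "M".equals(gender));

        return new boolean[] {
            updatedMales.accepts(SketchEventFilter.UPDATED, "M"),
            updatedMales.accepts(SketchEventFilter.INSERTED, "M"),
            updatedMales.accepts(SketchEventFilter.UPDATED, "F")
        };
    }

    public static void main(String[] args) {
        for (boolean accepted : run()) {
            System.out.println(accepted);
        }
    }
}
```

Only the first call is accepted: the second has the wrong event type and the third fails the value test, which matches the behavior you should observe when PersonEventTester inserts and then updates a male Person.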
Next, perform the following:
—Restart your cache server.
—Run the ListenForNewPerson
class. Do not input any value through the Input area at the bottom of the messages window.
Figure 5-8 Output of the ListenForNewPerson Class
—Run the ListenForUpdatedPerson
class. Do not input any value through the Input area at the bottom of the messages window.
Figure 5-9 Output of the ListenForUpdatedPerson Class
—Run the PersonEventTester
class.
Figure 5-10 Output from PersonEventTester Class
—You should see appropriate messages in the correct windows.
Figure 5-11 Output from the ListenForNewPerson Class
Figure 5-12 Output from the ListenForUpdatedPerson Class
In this exercise, you create your own scalable and robust chat program, using the elements introduced in the preceding sections.
To create a chat program:
In Lab6
, create a ChatMessage
Java class to store the chat messages.
Example 5-6 illustrates a possible solution.
Example 5-6 Sample Chat Program
package com.oracle.coherence.handson;

import java.io.Serializable;

public class ChatMessage implements Serializable {
    private String from;
    private long entryTime;
    private String message;

    public ChatMessage() {
    }

    public ChatMessage(String from, String message) {
        this.from = from;
        this.message = message;
        this.entryTime = System.currentTimeMillis();
    }

    public void setFrom(String from) {
        this.from = from;
    }

    public String getFrom() {
        return from;
    }

    public void setEntryTime(long entryTime) {
        this.entryTime = entryTime;
    }

    public long getEntryTime() {
        return entryTime;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
}
Stop all running cache servers. Navigate to the /oracle/product/coherence/bin
directory and edit cache-server.cmd
. Modify the CLASSPATH
environment variable to include /home/oracle/labs/Lab6/classes
. Restart the cache server.
Create a ChatClient
Java class. Ensure that it has a main
method.
Figure 5-14 Creating a Chat Client Program
Write the client program so that it performs the following:
Gets a person's name.
Sets up a new MapListener
to receive messages that are posted in the chat.
Loops and reads a message from the command line and posts this to the messages
named cache (Exit when the user enters bye
.)
Run multiple copies of the client and observe the behavior.
You can add some extra features if you wish. For example:
Add a facility to list all users in the chat. (You would need a second named cache.)
Send a private message to a named individual.
Example 5-7 illustrates a possible solution:
Example 5-7 Sample Chat Client Program
package com.oracle.coherence.handson;

import com.tangosol.net.*;
import com.tangosol.util.AbstractMapListener;
import com.tangosol.util.MapEvent;
import com.tangosol.util.UUID;
import com.tangosol.util.filter.*;

import java.io.*;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;

public class ChatClient {
    public static void main(String[] args) {
        String userName = null;
        String message;
        NamedCache chatmembers = null;
        try {
            System.out.println("Welcome to the Coherence Chat Client");
            System.out.println("------------------------------------");
            BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
            System.out.print("User Name:");
            userName = console.readLine();

            // get the chatroom and chatmembers named caches and join the chat
            NamedCache cache = CacheFactory.getCache("chatroom");
            chatmembers = CacheFactory.getCache("chatmembers");
            chatmembers.put(userName, userName);

            // register a listener to display messages posted by other users
            cache.addMapListener(new AbstractMapListener() {
                public void entryInserted(MapEvent event) {
                    ChatMessage msg = (ChatMessage) event.getNewValue();
                    System.out.println("From: " + msg.getFrom());
                    System.out.println("Time: " + new Date(msg.getEntryTime()));
                    System.out.println("Mesg: " + msg.getMessage());
                    System.out.println();
                }
            },
            new MapEventFilter(MapEventFilter.E_INSERTED,
                               new NotEqualsFilter("getFrom", userName)),
            false);

            // register a listener to announce users joining and leaving
            chatmembers.addMapListener(new AbstractMapListener() {
                public void entryDeleted(MapEvent event) {
                    String who = (String) event.getOldValue();
                    System.out.println(who + " has left the chat");
                }

                public void entryInserted(MapEvent event) {
                    String who = (String) event.getNewValue();
                    System.out.println(who + " has entered the chat");
                }
            });

            do {
                System.out.print("\nEnter message or bye to quit: ");
                message = console.readLine();
                if ("bye".equals(message)) {
                    break;
                } else if ("help".equals(message)) {
                    System.out.println("HELP:");
                    System.out.println("bye - quit");
                    System.out.println("who - list of users in the chat\n");
                } else if ("who".equals(message)) {
                    System.out.println("Current chat members");
                    System.out.println("====================");
                    for (Iterator<Map.Entry> entries = chatmembers.entrySet().iterator(); entries.hasNext();) {
                        Map.Entry entry = entries.next();
                        String member = (String) entry.getValue();
                        System.out.println(member);
                    }
                } else {
                    // anything else is a chat message: add it to the chat
                    cache.put(new UUID(), new ChatMessage(userName, message));
                }
            } while (true);
            System.out.println("Bye");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // leave the chat, unless setup failed before the user joined
            if (chatmembers != null && userName != null) {
                chatmembers.remove(userName);
            }
        }
    }
}
Figure 5-15 illustrates the output of the chat program.
In this exercise, you extend objects to have composite keys, rather than a single value. Using JDeveloper, you perform the following:
Create a new Person
class and implement a Person.Key
inner class
Use this new Person
class to put and get values
Enable data affinity—Keep related data together in the same partition.
In all the examples so far, you have used single attributes for the key, such as person ID or name. If you want to store different versions of objects or have composite keys, there are several poor ways to do this, such as storing versions as Fred-1 and Fred-2, or strings such as Fred-Jones. Extracting the key for a particular individual from such strings requires string manipulation, which can get messy and may perform poorly.
Rather than using a single key, as in the preceding case, you can create a class that encapsulates the business logic of that key and implement this as your key. This helps you to future-proof your objects against changes. For example, you can create an inner class that provides you a key.
Person.Key(int id, int version)
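To make the idea concrete, here is a minimal sketch of a composite key used with an ordinary java.util.HashMap rather than a NamedCache. VersionedKey is a hypothetical class written for this illustration; it assumes that two keys are equal exactly when both the ID and the version match.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical composite key of id and version, for illustration only.
final class VersionedKey {
    private final int id;
    private final int version;

    VersionedKey(int id, int version) {
        this.id = id;
        this.version = version;
    }

    int getId() {
        return id;
    }

    int getVersion() {
        return version;
    }

    @Override
    public boolean equals(Object object) {
        if (this == object) {
            return true;
        }
        if (!(object instanceof VersionedKey)) {
            return false;
        }
        VersionedKey other = (VersionedKey) object;
        return id == other.id && version == other.version;
    }

    @Override
    public int hashCode() {
        // Must agree with equals(): equal keys produce equal hash codes.
        return Objects.hash(id, version);
    }
}

class VersionedKeyDemo {
    static Map<VersionedKey, String> build() {
        Map<VersionedKey, String> people = new HashMap<>();
        people.put(new VersionedKey(1, 1), "Tim Middleton");
        people.put(new VersionedKey(1, 2), "Timothy Middleton");
        return people;
    }

    public static void main(String[] args) {
        // A freshly built key finds the entry; no string parsing is needed.
        System.out.println(build().get(new VersionedKey(1, 2)));
    }
}
```

Because the ID and version are separate typed fields, code that needs either one reads it directly instead of splitting a string.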
An interesting side-effect of this is that you can then make your key implement the KeyAssociation
interface, and use the getAssociatedKey()
method to provide you with partition affinity on an attribute of a key. For example, you can potentially store all people with the same last name or any common field within the same partition. The code in Example 5-8 defines an inner class called Key
for the Person
class based on the person ID and a version number.
Example 5-8 Inner Class to Implement a Key Association for Data Affinity
public static class Key implements PortableObject, KeyAssociation {
    // define a key of id and version
    private int id;
    private int version;
    private String lastname;

    public Key() {
        // no-argument constructor required for deserialization
    }

    public Object getAssociatedKey() {
        return lastname;
    }

    public Key(int id, int version) {
        this.id = id;
        this.version = version;
    }

    public Key(Person p) {
        this.id = p.getId();
        this.version = 1; // default
        this.lastname = p.getLastname();
    }

    public void writeExternal(PofWriter dataOutput) throws IOException {
        dataOutput.writeInt(0, this.id);
        dataOutput.writeInt(1, this.version);
        dataOutput.writeString(2, this.lastname);
    }

    public void readExternal(PofReader dataInput) throws IOException {
        this.id = dataInput.readInt(0);
        this.version = dataInput.readInt(1);
        this.lastname = dataInput.readString(2);
    }

    @Override
    public boolean equals(Object object) {
        . . .
    }

    @Override
    public int hashCode() {
        final int PRIME = 37;
        int result = 1;
        // include every field that takes part in equality
        result = PRIME * result + id;
        result = PRIME * result + version;
        result = PRIME * result + ((lastname == null) ? 0 : lastname.hashCode());
        return result;
    }
}
Example 5-9 illustrates how to create and use the inner class within Coherence:
Example 5-9 Sample Code to Create and Use an Inner Class within Coherence
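...
NamedCache person = CacheFactory.getCache("person");
Person p = new Person(1, "Middleton", "Tim", "Address", 29, Person.MALE);
person.put(p.getKey(), p);
// Look up with a key built the same way as the stored one. A Key(1, 1)
// carries no last name, so it would not equal the key used in put().
Person p2 = (Person) person.get(new Person.Key(p));
System.out.println("Person is " + p2.getLastname());
...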
You can then modify the inner class to implement KeyAssociation
, and then provide an implementation of the getAssociatedKey
method. If getAssociatedKey()
returns a last name, this ensures that all people with the same last name reside in the same partition and, therefore, on the same member.
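The effect of an associated key can be pictured with a simplified model in which the partition is derived from the associated key alone. The sketch below is an assumption made for illustration: AffinitySketch, partitionFor, and the modulo scheme are hypothetical and do not reproduce how Coherence actually assigns partitions.

```java
// Hypothetical model of partition assignment; it does not reproduce
// the algorithm that Coherence uses internally.
class AffinitySketch {
    // Assumed partition count for this sketch.
    static final int PARTITION_COUNT = 257;

    /** Maps an associated key (here, a last name) to a partition number. */
    static int partitionFor(Object associatedKey) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(associatedKey.hashCode(), PARTITION_COUNT);
    }

    public static void main(String[] args) {
        String[][] people = {
            {"Fred", "Flinstone"},
            {"Wilma", "Flinstone"},
            {"Barney", "Rubble"},
            {"Betty", "Rubble"}
        };
        for (String[] person : people) {
            // Only the last name feeds the calculation, so people who share
            // a last name always map to the same partition.
            System.out.println(person[0] + " " + person[1]
                + " -> partition " + partitionFor(person[1]));
        }
    }
}
```

Whatever the real placement function is, the point is the same: entries whose keys return the same associated key are placed together, regardless of their ID or version.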
To create partitions and keys on data for use with Coherence:
Copy the existing Person
class from Lab4
into a new project.
Create a new project called Lab6a
.
Make sure that you change the default local storage properties in the Java Options field in the Edit Run Configuration dialog box.
-Dtangosol.coherence.distributed.localstorage=false -Dtangosol.coherence.log.level=3
Figure 5-17 Turning Off Local Storage and Setting Log Level for the Runtime Configuration
Create a new class called Person
.
Copy and paste the source from the Person.java
file from Lab4
into this class. Modify the Person.java
file as shown in the given code snippet for Person class.
Implement the Key
class.
Ensure that you implement the getKey
method in the Person
class.
public Key getKey() {
    return new Key(this);
}

public void setId(int param) {
    this.id = param;
}
Implement the KeyAssociation
interface for data affinity.
Modify your Person.Key
class to implement the KeyAssociation
interface. You must do the following:
—Implement the getAssociatedKey
method within the Person.Key
class
—Add a new attribute to the Key
class called lastname
—Initialize lastname
in the constructor
Compile your new Person
class.
Restart your cache servers because the definition of the Person
object has changed.
Example 5-10 illustrates a possible implementation of the Person
class.
Example 5-10 Implementation of the Person Object with a Key Inner Class
package com.oracle.coherence.handson;

import java.io.IOException;

import com.tangosol.net.cache.KeyAssociation;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;

public class Person implements PortableObject {
    private int id;
    private String lastname;
    private String firstname;
    private String address;
    private int age;
    private String gender;

    public static String MALE = "M";
    public static String FEMALE = "F";

    public Person() {
    }

    public Person(int id1, String lastname1, String firstname1,
                  String address1, int age1, String gender1) {
        super();
        this.id = id1;
        this.lastname = lastname1;
        this.firstname = firstname1;
        this.address = address1;
        this.age = age1;
        this.gender = gender1;
    }

    public void setId(int param) { this.id = param; }

    public Key getKey() { return new Key(this); }

    public int getId() { return id; }

    public void setLastname(String param) { this.lastname = param; }

    public String getLastname() { return lastname; }

    public void setFirstname(String param) { this.firstname = param; }

    public String getFirstname() { return firstname; }

    public void setAddress(String param) { this.address = param; }

    public String getAddress() { return address; }

    public void setAge(int param) { this.age = param; }

    public int getAge() { return age; }

    public void setGender(String param) { this.gender = param; }

    public String getGender() { return gender; }

    public double getAgeDouble() { return this.age; }

    @Override
    public boolean equals(Object object) {
        if (this == object) {
            return true;
        }
        if (!(object instanceof Person)) {
            return false;
        }
        final Person other = (Person) object;
        if (id != other.id) {
            return false;
        }
        if (!(lastname == null ? other.lastname == null : lastname.equals(other.lastname))) {
            return false;
        }
        if (!(firstname == null ? other.firstname == null : firstname.equals(other.firstname))) {
            return false;
        }
        if (!(address == null ? other.address == null : address.equals(other.address))) {
            return false;
        }
        if (age != other.age) {
            return false;
        }
        if (!(gender == null ? other.gender == null : gender.equals(other.gender))) {
            return false;
        }
        return true;
    }

    @Override
    public int hashCode() {
        final int PRIME = 37;
        int result = 1;
        result = PRIME * result + ((lastname == null) ? 0 : lastname.hashCode());
        result = PRIME * result + ((firstname == null) ? 0 : firstname.hashCode());
        result = PRIME * result + ((address == null) ? 0 : address.hashCode());
        result = PRIME * result + ((gender == null) ? 0 : gender.hashCode());
        return result;
    }

    public void readExternal(PofReader pofReader) throws IOException {
        this.id = pofReader.readInt(0);
        this.lastname = pofReader.readString(1);
        this.firstname = pofReader.readString(2);
        this.address = pofReader.readString(3);
        this.age = pofReader.readInt(4);
        this.gender = pofReader.readString(5);
    }

    public void writeExternal(PofWriter pofWriter) throws IOException {
        pofWriter.writeInt(0, this.id);
        pofWriter.writeString(1, this.lastname);
        pofWriter.writeString(2, this.firstname);
        pofWriter.writeString(3, this.address);
        pofWriter.writeInt(4, this.age);
        pofWriter.writeString(5, this.gender);
    }

    public static class Key implements PortableObject, KeyAssociation {
        // let's define a key of id and version
        private int id;
        private int version;
        private String lastname;

        public Key() {
            // no-argument constructor required for deserialization
        }

        public Object getAssociatedKey() {
            return lastname;
        }

        public Key(int id, int version) {
            this.id = id;
            this.version = version;
        }

        public Key(Person p) {
            this.id = p.getId();
            this.version = 1; // default
            this.lastname = p.getLastname();
        }

        public void writeExternal(PofWriter dataOutput) throws IOException {
            dataOutput.writeInt(0, this.id);
            dataOutput.writeInt(1, this.version);
            dataOutput.writeString(2, this.lastname);
        }

        public void readExternal(PofReader dataInput) throws IOException {
            this.id = dataInput.readInt(0);
            this.version = dataInput.readInt(1);
            this.lastname = dataInput.readString(2);
        }

        @Override
        public boolean equals(Object object) {
            if (this == object) {
                return true;
            }
            if (!(object instanceof Person.Key)) {
                return false;
            }
            final Person.Key other = (Person.Key) object;
            if (id != other.id) {
                return false;
            }
            if (version != other.version) {
                return false;
            }
            // compare the strings by content, not by reference
            if (!(lastname == null ? other.lastname == null : lastname.equals(other.lastname))) {
                return false;
            }
            return true;
        }

        @Override
        public int hashCode() {
            final int PRIME = 37;
            int result = 1;
            // include every field that takes part in equals()
            result = PRIME * result + id;
            result = PRIME * result + version;
            result = PRIME * result + ((lastname == null) ? 0 : lastname.hashCode());
            return result;
        }
    }
}
Create a KeyTester
class in your project to test your new key.
In the KeyTester
class, implement the code to create a new Person
object, and put and get the object from the cache. The KeyTester
class must have a main
method.
Example 5-11 illustrates a possible implementation.
Example 5-11 Sample Code to Create an Object and Put and Get it From the Cache
package com.oracle.coherence.handson;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

public class KeyTester {
    public KeyTester() {
    }

    public static void main(String[] args) {
        NamedCache person = CacheFactory.getCache("person");

        // store the person under its composite key
        Person p = new Person(1, "Middleton", "Tim", "Address", 29, Person.MALE);
        person.put(p.getKey(), p);

        // read it back with an equal key
        Person p2 = (Person) person.get(new Person.Key(p));
        System.out.println("Person is " + p2.getFirstname() + " " + p2.getLastname());
    }
}
When you run the KeyTester
class you should see output similar to the following:
Person is Tim Middleton
Process exited with exit code 0.
Create a new class called KeyTester2
with a main
method and add some new Person
objects.
Figure 5-20 Creating the KeyTester2 Class
Hint: You can use the code in Example 5-12 to add entries:
Example 5-12 Code to Add Entries to KeyTester2
...
LinkedList peopleList = new LinkedList();

// build the list
peopleList.add(new Person(1, "Flinstone", "Fred", "Address", 29, Person.MALE));
peopleList.add(new Person(2, "Flinstone", "Wilma", "Address", 29, Person.FEMALE));
peopleList.add(new Person(3, "Rubble", "Barney", "Address", 44, Person.MALE));
peopleList.add(new Person(4, "Rubble", "Betty", "Address", 44, Person.FEMALE));

for (java.util.Iterator iterator = peopleList.iterator(); iterator.hasNext();) {
    Person p = (Person) iterator.next();
    person.put(p.getKey(), p);
}
...
Now that you have added the entries, how do you check whether your data affinity has worked? There are several ways of doing this:
—You can use an EntryProcessor
to do a System.out.println
on all the Person
entries.
—You can use the CacheService
.getKeyOwner()
method to find out who owns the entry. The getKeyOwner()
returns the member that owns the key. Note that the entry does not have to exist. For example, you can find out which member will own a key before you create it. You can use the following:
Member m = ((DistributedCacheService)namedCache.getCacheService()).getKeyOwner(pp.getKey());
Add the code to get the member that owns each of the keys. After you get the member, you can print information such as the person's first name and last name together with the ID of the member that owns the entry.
Example 5-13 illustrates a possible implementation of KeyTester2
.
Example 5-13 Sample Code for an Alternate KeyTester Class
package com.oracle.coherence.handson;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.DistributedCacheService;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;

import java.util.LinkedList;

public class KeyTester2 {
    public KeyTester2() {
    }

    public static void main(String[] args) {
        NamedCache person = CacheFactory.getCache("person");
        LinkedList peopleList = new LinkedList();

        // build the list
        peopleList.add(new Person(1, "Flinstone", "Fred", "Address", 29, Person.MALE));
        peopleList.add(new Person(2, "Flinstone", "Wilma", "Address", 29, Person.FEMALE));
        peopleList.add(new Person(3, "Rubble", "Barney", "Address", 44, Person.MALE));
        peopleList.add(new Person(4, "Rubble", "Betty", "Address", 44, Person.FEMALE));
        peopleList.add(new Person(5, "Rubble", "Dino", "Address", 44, Person.FEMALE));

        for (java.util.Iterator iterator = peopleList.iterator(); iterator.hasNext();) {
            Person p = (Person) iterator.next();
            person.put(p.getKey(), p);

            // ask the cache service which member owns this key
            Member m = ((DistributedCacheService) person.getCacheService()).getKeyOwner(p.getKey());
            System.out.println("Person " + p.getFirstname() + " " + p.getLastname()
                               + " is owned by member " + m.getId());
        }
    }
}
Run KeyTester2
with one cache server running. You should get an output similar to the following:
Person Fred Flinstone is owned by member 1
Person Wilma Flinstone is owned by member 1
Person Barney Rubble is owned by member 1
Person Betty Rubble is owned by member 1
Person Dino Rubble is owned by member 1
Start up a second cache server and rerun KeyTester2
. What happens?
You see that all people with the same last name are owned by the same member. If all the people are still owned by a single member, add another cache server (and optionally more person objects). This should distribute the data, and you should see something like the following:
Person Fred Flinstone is owned by member 3
Person Wilma Flinstone is owned by member 3
Person Barney Rubble is owned by member 1
Person Betty Rubble is owned by member 1
Person Dino Rubble is owned by member 1
Process exited with exit code 0.