In this chapter, you learn how EntryProcessors can be used to modify and perform processing on entries in the Coherence cache.
Until now, to perform actions on the entries in a cache, you used the put and get operations. If you want to control concurrent access to the data, you have the option of locking and unlocking keys. However, there is a better way to perform operations on data that ensures consistent behavior when concurrent data access is required. EntryProcessors (com.tangosol.util.InvocableMap.EntryProcessor) are agents that perform processing against entries, and they process entries directly where the data is held. The processing may change the data (for example, create, update, or remove it) or may only perform calculations on it. EntryProcessors that work against the same key are logically queued, so you can achieve lock-free, high-performance processing. In Lab5, you used an EntryAggregator, a related kind of agent that is also invoked through InvocableMap, to aggregate data across the grid.
The com.tangosol.util.InvocableMap interface (which NamedCache implements) has the following methods for operating on data:
• Object invoke(Object oKey, InvocableMap.EntryProcessor processor): Invokes the passed EntryProcessor against an individual object and returns the result of the invocation
• Map invokeAll(Collection keys, InvocableMap.EntryProcessor processor): Invokes the EntryProcessor against the collection of keys and returns the result for each invocation
• Map invokeAll(Filter filter, InvocableMap.EntryProcessor processor): Invokes the EntryProcessor against the entries that match the filter and returns the result for each invocation
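The three call shapes can be tried out without a cluster. The following is a minimal plain-Java sketch, not the Coherence API: MiniInvocableMap is a hypothetical in-memory stand-in, a java.util.function.Function plays the role of the EntryProcessor, and a Predicate on the value plays the role of the Filter. It only illustrates what each method receives and returns.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical in-memory stand-in for InvocableMap; NOT part of Coherence.
class MiniInvocableMap<K, V> {
    private final ConcurrentHashMap<K, V> store = new ConcurrentHashMap<>();

    void put(K key, V value) { store.put(key, value); }

    V get(K key) { return store.get(key); }

    // Like invoke(key, processor): runs the processor against one entry,
    // stores whatever value the processor left in the entry, returns its result.
    <R> R invoke(K key, Function<Map.Entry<K, V>, R> processor) {
        List<R> result = new ArrayList<>(1);
        store.compute(key, (k, v) -> {
            Map.Entry<K, V> entry = new AbstractMap.SimpleEntry<>(k, v);
            result.add(processor.apply(entry));
            return entry.getValue();
        });
        return result.get(0);
    }

    // Like invokeAll(keys, processor): one result per key.
    <R> Map<K, R> invokeAll(Collection<? extends K> keys,
                            Function<Map.Entry<K, V>, R> processor) {
        Map<K, R> results = new LinkedHashMap<>();
        for (K key : keys) {
            results.put(key, invoke(key, processor));
        }
        return results;
    }

    // Like invokeAll(filter, processor): one result per matching entry.
    <R> Map<K, R> invokeAll(Predicate<V> filter,
                            Function<Map.Entry<K, V>, R> processor) {
        List<K> matching = new ArrayList<>();
        for (Map.Entry<K, V> e : store.entrySet()) {
            if (filter.test(e.getValue())) {
                matching.add(e.getKey());
            }
        }
        return invokeAll(matching, processor);
    }
}

class MiniInvocableMapDemo {
    public static void main(String[] args) {
        MiniInvocableMap<Integer, Double> salaries = new MiniInvocableMap<>();
        salaries.put(1, 5000.0);
        salaries.put(2, 10000.0);

        // Single key: add 500 and return the previous salary.
        Double before = salaries.invoke(1, e -> {
            Double old = e.getValue();
            e.setValue(old + 500.0);
            return old;
        });
        System.out.println(before + " -> " + salaries.get(1));

        // Collection of keys: read-only processor that returns each value.
        System.out.println(salaries.invokeAll(Arrays.asList(1, 2), e -> e.getValue()));

        // Filter: only salaries of at least 10000 are processed.
        System.out.println(salaries.invokeAll(v -> v >= 10000.0, e -> {
            e.setValue(e.getValue() + 1000.0);
            return e.getValue();
        }));
    }
}
```

In Coherence itself the processor object is sent to the member that holds each entry, which this single-JVM sketch does not attempt to model.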
To create an entry processor, you can extend com.tangosol.util.processor.AbstractProcessor and implement the process() method. For example, the following code creates an EntryProcessor that raises the salary of each employee it is invoked against by 10%:
class RaiseSalary extends AbstractProcessor {
    ...
    public Object process(Entry entry) {
        Employee emp = (Employee) entry.getValue();
        emp.setSalary(emp.getSalary() * 1.10);
        entry.setValue(emp);
        return null;
    }
}
To invoke RaiseSalary against every entry in the cache, you perform the following:
empCache.invokeAll(AlwaysFilter.INSTANCE, new RaiseSalary());
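To illustrate why queuing updates per key gives consistent results without explicit locks, the following plain-Java sketch imitates the idea with ConcurrentHashMap.compute, which applies each update to a key atomically. It is an analogy only and does not use Coherence; the class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Analogy only: per-key atomic updates in one JVM, not Coherence code.
class PerKeyQueueingSketch {

    // Several threads each apply `raisesPerThread` one-unit raises to key 1.
    // Every raise is a read-modify-write performed atomically for that key,
    // so no update is lost and no explicit lock/unlock calls are needed.
    static long applyRaises(int threads, int raisesPerThread) {
        ConcurrentHashMap<Integer, Long> salaries = new ConcurrentHashMap<>();
        salaries.put(1, 0L);

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < raisesPerThread; i++) {
                    salaries.compute(1, (key, salary) -> salary + 1);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return salaries.get(1);
    }

    public static void main(String[] args) {
        // 4 threads x 1000 raises: the final value equals the number of raises.
        System.out.println(applyRaises(4, 1000));
    }
}
```

Had each thread used a separate get followed by a put, two threads could read the same salary and one raise would overwrite the other.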
This section describes how to use EntryProcessors to modify and perform processing on entries in the Coherence cache.
In this exercise, you will use JDeveloper to create the following:
• An Employee class to hold your employees
• A RaiseSalary class to be invoked on all entries
• An InvokeTest class to insert employees and run RaiseSalary on them
• A class that shows where data is held in a cluster
To create Java classes to modify and perform processing on data entries:
Create a new class for the Employee object.
Create a new project called Lab7.
Make sure that you change the default local storage properties to the following values.
-Dtangosol.coherence.distributed.localstorage=false -Dtangosol.coherence.log.level=3
Figure 6-2 Edit the Runtime Configuration
Create an Employee class that implements PortableObject. Add the following attributes to the class:
—private int empId
—private String lastname
—private String firstname
—private double salary
Ensure that you implement all of the required methods. (Use the Source menu for this).
Example 6-1 illustrates a possible implementation of the Employee class.
Example 6-1 Sample Employee Class
package com.oracle.coherence.handson;

import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;

import java.io.IOException;
import java.math.BigDecimal;

public class Employee implements PortableObject {
    private int empId;
    private String lastname;
    private String firstname;
    private double salary;

    // Public no-argument constructor, needed when the object is deserialized.
    public Employee() {
    }

    public Employee(int empId1, String lastname1, String firstname1, double salary1) {
        super();
        this.empId = empId1;
        this.lastname = lastname1;
        this.firstname = firstname1;
        this.salary = salary1;
    }

    public void setEmpId(int param) {
        this.empId = param;
    }

    public int getEmpId() {
        return empId;
    }

    public void setLastname(String param) {
        this.lastname = param;
    }

    public String getLastname() {
        return lastname;
    }

    public void setFirstname(String param) {
        this.firstname = param;
    }

    public String getFirstname() {
        return firstname;
    }

    public void setSalary(double param) {
        this.salary = param;
    }

    public double getSalary() {
        return salary;
    }

    @Override
    public boolean equals(Object object) {
        if (this == object) {
            return true;
        }
        if (!(object instanceof Employee)) {
            return false;
        }
        final Employee other = (Employee) object;
        if (empId != other.empId) {
            return false;
        }
        if (!(lastname == null ? other.lastname == null : lastname.equals(other.lastname))) {
            return false;
        }
        if (!(firstname == null ? other.firstname == null : firstname.equals(other.firstname))) {
            return false;
        }
        if (Double.compare(salary, other.salary) != 0) {
            return false;
        }
        return true;
    }

    @Override
    public int hashCode() {
        final int PRIME = 37;
        int result = 1;
        result = PRIME * result + ((lastname == null) ? 0 : lastname.hashCode());
        result = PRIME * result + ((firstname == null) ? 0 : firstname.hashCode());
        long temp = Double.doubleToLongBits(salary);
        result = PRIME * result + (int) (temp ^ (temp >>> 32));
        return result;
    }

    // Reads the fields using the same property indexes (0-3) that writeExternal uses.
    public void readExternal(PofReader dataInput) throws IOException {
        this.empId = dataInput.readInt(0);
        this.lastname = dataInput.readString(1);
        this.firstname = dataInput.readString(2);
        this.salary = dataInput.readBigDecimal(3).doubleValue();
    }

    // Writes each field under a fixed property index.
    public void writeExternal(PofWriter dataOutput) throws IOException {
        dataOutput.writeInt(0, this.empId);
        dataOutput.writeString(1, this.lastname);
        dataOutput.writeString(2, this.firstname);
        dataOutput.writeBigDecimal(3, new BigDecimal(this.salary));
    }
}
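The Employee class writes its double salary as a BigDecimal and converts it back when reading. The following standalone sketch, which uses only java.math.BigDecimal and no Coherence classes, suggests why that conversion does not alter a finite salary: new BigDecimal(double) captures the exact binary value, so doubleValue() returns the original double. The class name SalaryRoundTripSketch is hypothetical.

```java
import java.math.BigDecimal;

// Standalone illustration of the double -> BigDecimal -> double conversion
// that Employee.writeExternal and Employee.readExternal perform on salary.
class SalaryRoundTripSketch {

    // Finite doubles survive this round trip unchanged. NaN and infinite
    // values would make the BigDecimal constructor throw NumberFormatException.
    static double roundTrip(double salary) {
        BigDecimal written = new BigDecimal(salary);
        return written.doubleValue();
    }

    public static void main(String[] args) {
        double raised = 5000 * 1.10;
        System.out.println(roundTrip(raised) == raised);

        // The exact binary expansion of 0.1 versus its shortest decimal form.
        System.out.println(new BigDecimal(0.1));
        System.out.println(BigDecimal.valueOf(0.1));
    }
}
```

PofWriter and PofReader also provide writeDouble and readDouble, which would avoid the conversion altogether; the example above keeps the BigDecimal form only because that is what the sample class uses.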
Create a class to increase the salary of employees by 10%.
Create a class called RaiseSalary that extends AbstractProcessor.
Implement the process() method to raise the salary of an employee by 10%.
Example 6-2 illustrates a possible solution.
Example 6-2 Sample Raise Salary Class
package com.oracle.coherence.handson;

import com.tangosol.util.processor.AbstractProcessor;
import com.tangosol.util.InvocableMap.Entry;

public class RaiseSalary extends AbstractProcessor {
    public RaiseSalary() {
    }

    // Runs on the member that holds the entry.
    public Object process(Entry entry) {
        Employee emp = (Employee) entry.getValue();
        emp.setSalary(emp.getSalary() * 1.10);
        // Write the modified value back so that the cache is updated.
        entry.setValue(emp);
        return null;
    }
}
Create a class to test whether the RaiseSalary class works.
Create a class called InvokeTest that contains a main method and performs the following:
• Creates several Employee objects and puts them in the employees cache.
• Invokes the RaiseSalary processor on them.
• Prints out the new salaries to confirm the changes that have been made.
Figure 6-5 Creating the Invoke Test Class
Example 6-3 illustrates a possible solution.
Example 6-3 Sample Program to Test RaiseSalary Class
package com.oracle.coherence.handson;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.filter.AlwaysFilter;

public class InvokeTest {
    public InvokeTest() {
    }

    public static void main(String[] args) {
        NamedCache empCache = CacheFactory.getCache("employees");

        Employee e1 = new Employee(1, "Middleton", "Tim", 5000);
        empCache.put(e1.getEmpId(), e1);

        Employee e2 = new Employee(2, "Jones", "Chris", 10000);
        empCache.put(e2.getEmpId(), e2);

        empCache.invokeAll(AlwaysFilter.INSTANCE, new RaiseSalary());

        e1 = (Employee) empCache.get(e1.getEmpId());
        e2 = (Employee) empCache.get(e2.getEmpId());

        System.out.println("Salary for emp 1 is now: " + e1.getSalary());
        System.out.println("Salary for emp 2 is now: " + e2.getSalary());
    }
}
When you first try to run InvokeTest, you get a "class not found" error. Why does this happen and how do you fix it?
When you invoke an EntryProcessor, the request is processed on the storage-enabled member that contains the entry.
The Coherence cache server does not yet know about the RaiseSalary or Employee classes that you have created. You must add these to the CLASSPATH of the Coherence cache server.
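One way to confirm that a given classpath can see the lab classes is a small check such as the following sketch. It is a hypothetical helper, not part of the lab or of Coherence: it only asks the JVM whether a class name can be loaded, which is the same lookup that fails on the cache server when the classes are missing.

```java
// Hypothetical helper: reports whether the named classes can be loaded
// from the classpath of the JVM that runs it.
class ClasspathCheckSketch {

    static boolean isLoadable(String className) {
        try {
            // 'false' means: locate the class but do not run its static initializers.
            Class.forName(className, false, ClasspathCheckSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] required = {
            "com.oracle.coherence.handson.Employee",
            "com.oracle.coherence.handson.RaiseSalary"
        };
        for (String name : required) {
            System.out.println(name + " loadable: " + isLoadable(name));
        }
    }
}
```

Running it with the same -cp value that the cache server uses should report both classes as loadable once the Lab7 classes directory has been added.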
Edit the cache-server.cmd file. Modify the -cp entry on the line beginning with "%java_exec%" -server... so that the classpath includes the Lab7 classes directory, /home/oracle/labs/Lab7/classes (written in Windows form as C:\home\oracle\labs\Lab7\classes). The "%java_exec%" line should look similar to the following:
"%java_exec%" -server -showversion "%java_opts%" -cp "%coherence_home%\lib\coherence.jar;C:\home\oracle\labs\Lab4\classes;C:\home\oracle\labs\Lab7\classes;C:\home\oracle\labs" com.tangosol.net.DefaultCacheServer %1
Restart the cache server.
Run InvokeTest again to ensure that the RaiseSalary class works.
Figure 6-7 A Successful Run of the Invoke Test Class
Create a class to show where data is held within a cluster.
Create a class called SayHelloProcessor, which extends AbstractProcessor.
Figure 6-8 Creating the Say Hello Processor Class
Implement the process() method to say "hello".
Example 6-4 illustrates a possible solution.
Example 6-4 Sample Program to Show Where Data is Held
package com.oracle.coherence.handson;

import com.tangosol.util.processor.AbstractProcessor;
import com.tangosol.util.InvocableMap.Entry;

public class SayHelloProcessor extends AbstractProcessor {
    public SayHelloProcessor() {
    }

    public Object process(Entry entry) {
        Employee emp = (Employee) entry.getValue();
        System.out.println("\nHello from " + emp.getFirstname() + " " + emp.getLastname() + "\n");
        return null;
    }
}
Create a new class called WhereAreMyEmployees. Ensure that this class has a main method.
Figure 6-9 Creating the Where Are My Employees Class
Example 6-5 illustrates a possible solution.
Example 6-5 Sample Program to Trace Member Location of Employees
package com.oracle.coherence.handson;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.filter.AlwaysFilter;

public class WhereAreMyEmployees {
    public WhereAreMyEmployees() {
    }

    public static void main(String[] args) {
        NamedCache empCache = CacheFactory.getCache("employees");
        empCache.invokeAll(AlwaysFilter.INSTANCE, new SayHelloProcessor());
    }
}
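Run the WhereAreMyEmployees class to invoke SayHelloProcessor on all the entries in the cache. The "hello" messages appear on the cache server console, because that is where the processor runs.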
Example 6-6 Output of WhereAreMyEmployees Class with One Cache Server Console
...
2008-12-30 18:01:04.411/25.985 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluster, member=1): Member(Id=3, Timestamp=2008-12-30 18:01:04.231, Address=130.35.99.248:8089, MachineId=49912, Location=site:us.oracle.com,machine:tpfaeffl-pc,process:5988, Role=OracleWhereAreMyEmployees) joined Cluster with senior member 1

Hello from Tim Middleton

Hello from Chris Jones

2008-12-30 18:01:04.848/26.422 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluster, member=1): Member 3 joined Service DistributedCache with senior member 1
2008-12-30 18:01:04.848/26.422 Oracle Coherence GE 3.4.1/407 <D5> (thread=DistributedCache, member=1): Service DistributedCache: sending ServiceConfigSync containing 259 entries to Member 3
2008-12-30 18:01:04.942/26.516 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluster, member=1): TcpRing: disconnected from member 2 due to the peer departure
Add some more Employees and a second cache server. Run the WhereAreMyEmployees class again. Observe the messages on the cache server console.
Example 6-7 illustrates the response from one cache server console.
Example 6-7 Response on Cache Server Console 1
2008-12-30 18:04:34.582/236.156 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluster, member=1): Member(Id=3, Timestamp=2008-12-30 18:04:34.433, Address=130.35.99.248:8090, MachineId=49912, Location=site:us.oracle.com,machine:tpfaeffl-pc,process:3560, Role=OracleWhereAreMyEmployees) joined Cluster with senior member 1

Hello from Chris Jones

2008-12-30 18:04:35.239/236.813 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluster, member=1): Member 3 joined Service DistributedCache with senior member 1
2008-12-30 18:04:35.254/236.828 Oracle Coherence GE 3.4.1/407 <D5> (thread=DistributedCache, member=1): Service DistributedCache: sending ServiceConfigSync containing 259 entries to Member 3
Example 6-8 illustrates the response from the second cache server console.
Example 6-8 Response on Cache Server Console 2
2008-12-30 18:04:35.239/27.047 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluster, member=2): Member 3 joined Service DistributedCache with senior member 1

Hello from Tim Middleton

2008-12-30 18:05:07.707/59.515 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluster, member=2): MemberLeft request for Member 3 received from Member(Id=1, Timestamp=2008-12-30 18:00:40.286, Address=130.35.99.248:8088, MachineId=49912, Location=site:us.oracle.com,machine:tpfaeffl-pc,process:4460, Role=CoherenceServer)
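Each greeting appears on exactly one console because each entry is owned by exactly one storage-enabled member. The following plain-Java sketch models that idea in a deliberately simplified way. It is an assumption-laden illustration, not Coherence's actual distribution algorithm: it hashes the key object directly, uses a partition count of 257, and assigns partitions to members round-robin, whereas a real cluster decides ownership itself. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of key placement; NOT Coherence's real algorithm.
class KeyPlacementSketch {

    // Assumption: key -> partition by hash, partition -> member round-robin.
    // Member ids start at 1 in this sketch.
    static int ownerOf(Object key, int partitionCount, int memberCount) {
        int partition = Math.floorMod(key.hashCode(), partitionCount);
        return (partition % memberCount) + 1;
    }

    // Groups keys by the member that would own them under the model above.
    static Map<Integer, List<Object>> groupByOwner(List<?> keys, int partitionCount, int memberCount) {
        Map<Integer, List<Object>> byOwner = new TreeMap<>();
        for (Object key : keys) {
            int owner = ownerOf(key, partitionCount, memberCount);
            byOwner.computeIfAbsent(owner, id -> new ArrayList<>()).add(key);
        }
        return byOwner;
    }

    public static void main(String[] args) {
        List<Integer> empIds = Arrays.asList(1, 2, 3, 4, 5, 6);
        // One cache server: every key has the same owner.
        System.out.println(groupByOwner(empIds, 257, 1));
        // Two cache servers: the keys are split between the owners.
        System.out.println(groupByOwner(empIds, 257, 2));
    }
}
```

Under this model, adding a second member changes the owner of some keys, which corresponds to the greetings being split across the two consoles in Examples 6-7 and 6-8.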