6 In-Place Processing of Data

In this chapter, you learn how to use EntryProcessors to modify and process entries in the Coherence cache.

6.1 Introduction

Until now, you have used the put and get operations to act on the entries in a cache. To control concurrent access to the data, you can lock and unlock keys. However, there is a better way to perform operations that behave consistently under concurrent data access. EntryProcessors (com.tangosol.util.InvocableMap.EntryProcessor) are agents that perform processing against entries directly where the data is held. The processing can change the data (for example, create, update, or remove it) or can only perform calculations on it. EntryProcessors that work against the same key are logically queued, which means that you can achieve lock-free (high performance) processing. In Lab5, you used an EntryAggregator, a related kind of agent, to aggregate data across the grid.

The com.tangosol.util.InvocableMap interface (which NamedCache implements) has the following methods for operating on data:

Object invoke(Object oKey, InvocableMap.EntryProcessor processor)—Invokes the passed EntryProcessor against an individual object and returns the result of the invocation

Map invokeAll(Collection keys, InvocableMap.EntryProcessor processor)—Invokes the EntryProcessor against the collection of keys and returns the result for each invocation

Map invokeAll(Filter filter, InvocableMap.EntryProcessor processor)—Invokes the EntryProcessor against the entries that match the filter and returns the result for each invocation
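The three call shapes above can be sketched with a small local model. This is only an illustration under stated assumptions: MiniInvocableMap is a hypothetical in-memory stand-in, not the Coherence API, and it uses java.util.function.Function and Predicate in place of EntryProcessor and Filter. The raise processor adds a flat 100 so that the printed values are exact.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical in-memory stand-in for an invocable cache; NOT the Coherence API.
final class MiniInvocableMap<K, V> {
    private final Map<K, V> data = new LinkedHashMap<>();

    void put(K key, V value) { data.put(key, value); }

    V get(K key) { return data.get(key); }

    // Shared helper: run the processor on every selected entry and collect the results by key.
    private <R> Map<K, R> run(Predicate<Map.Entry<K, V>> selector,
                              Function<Map.Entry<K, V>, R> processor) {
        Map<K, R> results = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : data.entrySet()) {
            if (selector.test(entry)) {
                results.put(entry.getKey(), processor.apply(entry));
            }
        }
        return results;
    }

    // Shape of invoke(key, processor): one entry, one result.
    <R> R invoke(K key, Function<Map.Entry<K, V>, R> processor) {
        return run(e -> e.getKey().equals(key), processor).get(key);
    }

    // Shape of invokeAll(keys, processor): one result per listed key that is present.
    <R> Map<K, R> invokeAll(Collection<K> keys, Function<Map.Entry<K, V>, R> processor) {
        return run(e -> keys.contains(e.getKey()), processor);
    }

    // Shape of invokeAll(filter, processor): one result per matching entry.
    <R> Map<K, R> invokeAll(Predicate<Map.Entry<K, V>> filter,
                            Function<Map.Entry<K, V>, R> processor) {
        return run(filter, processor);
    }

    public static void main(String[] args) {
        MiniInvocableMap<Integer, Double> salaries = new MiniInvocableMap<>();
        salaries.put(1, 5000.0);
        salaries.put(2, 10000.0);
        salaries.put(3, 7000.0);

        // Processor: update the entry in place and return the new value.
        Function<Map.Entry<Integer, Double>, Double> raise = e -> {
            e.setValue(e.getValue() + 100.0);
            return e.getValue();
        };

        System.out.println(salaries.invoke(1, raise));                          // 5100.0
        System.out.println(salaries.invokeAll(List.of(2, 3), raise));           // {2=10100.0, 3=7100.0}
        System.out.println(salaries.invokeAll(e -> e.getValue() >= 7000.0, raise)); // {2=10200.0, 3=7200.0}
    }
}
```

In the real API the processor object is sent to the member that holds each entry; this model only mirrors the way results come back, one per key.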

To create an entry processor, you can extend com.tangosol.util.processor.AbstractProcessor and implement the process() method. For example, the following code creates an EntryProcessor that increases an employee's salary by 10%:

class RaiseSalary extends AbstractProcessor {
    ...
    public Object process(Entry entry) {
        Employee emp = (Employee)entry.getValue();
        emp.setSalary(emp.getSalary() * 1.10);
        entry.setValue(emp);
        return null;
    }
}

To run the RaiseSalary processor against every entry in the cache, call invokeAll with a filter that matches all entries:

empCache.invokeAll(AlwaysFilter.INSTANCE, new RaiseSalary());
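To see why queueing the work per key matters, here is a minimal local sketch. It is an analogy only: it uses java.util.concurrent.ConcurrentHashMap, whose compute method applies a function atomically to one key, as a stand-in for running an EntryProcessor. The class LostUpdateSketch and its method names are hypothetical. The first method replays an unlucky get-then-put interleaving by hand, so the lost update is deterministic.

```java
import java.util.concurrent.ConcurrentHashMap;

// Local analogy only; this is NOT the Coherence API.
final class LostUpdateSketch {

    // Two clients each add 100 using get-then-put, interleaved so that
    // both read the old value before either writes.
    static double interleavedGetPut(ConcurrentHashMap<Integer, Double> cache, int key) {
        double readByA = cache.get(key);
        double readByB = cache.get(key);
        cache.put(key, readByA + 100.0);
        cache.put(key, readByB + 100.0); // overwrites client A's raise
        return cache.get(key);
    }

    // The same two raises expressed as per-key atomic updates: each update
    // sees the result of the previous one, so nothing is lost.
    static double atomicPerKey(ConcurrentHashMap<Integer, Double> cache, int key) {
        cache.compute(key, (k, v) -> v + 100.0);
        cache.compute(key, (k, v) -> v + 100.0);
        return cache.get(key);
    }

    // Many threads raising the same key concurrently without explicit locks.
    static double concurrentRaises(int threads, int raisesPerThread) throws InterruptedException {
        ConcurrentHashMap<Integer, Double> cache = new ConcurrentHashMap<>();
        cache.put(1, 0.0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < raisesPerThread; j++) {
                    cache.compute(1, (k, v) -> v + 1.0);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return cache.get(1);
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<Integer, Double> a = new ConcurrentHashMap<>();
        a.put(1, 5000.0);
        System.out.println(interleavedGetPut(a, 1)); // 5100.0: one raise was lost

        ConcurrentHashMap<Integer, Double> b = new ConcurrentHashMap<>();
        b.put(1, 5000.0);
        System.out.println(atomicPerKey(b, 1));      // 5200.0: both raises applied

        System.out.println(concurrentRaises(4, 1000)); // 4000.0
    }
}
```

The EntryProcessor model gives the same guarantee across a cluster: the read, the change, and the write for one key happen as a single step on the member that owns the entry.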

6.2 Modifying and Processing Data Entries

This section describes how to use EntryProcessors to modify and perform processing on entries in the Coherence cache.

In this exercise, you will use JDeveloper to create the following:

  • An Employee class to hold your employees

  • A RaiseSalary class to be invoked on all entries

  • An InvokeTest class to insert employees and run RaiseSalary on them

  • A class that shows where data is held in a cluster

To create Java classes to modify and perform processing on data entries:

  1. Create a new class for the Employee object.

    1. Create a new project called Lab7.

      Figure 6-1 Creating a New Project


    2. In the project's runtime configuration, set the following Java options so that this client process does not store cache data locally and logs at level 3.

      -Dtangosol.coherence.distributed.localstorage=false -Dtangosol.coherence.log.level=3
      

      Figure 6-2 Edit the Runtime Configuration


    3. Create an Employee class that implements PortableObject. Add the following attributes to the class:

      private int empId

      private String lastname

      private String firstname

      private double salary

      Figure 6-3 Creating the Employee Class


    4. Ensure that you implement all of the required methods. (Use the Source menu for this).

      Example 6-1 illustrates a possible implementation of the Employee class.

      Example 6-1 Sample Employee Class

      package com.oracle.coherence.handson;
      
      import com.tangosol.io.pof.PofReader;
      import com.tangosol.io.pof.PofWriter;
      import com.tangosol.io.pof.PortableObject;
      
      import java.io.IOException;
      
      import java.math.BigDecimal;
      
      public class Employee implements PortableObject {
          private int empId;
          private String lastname;
          private String firstname;
          private double salary;
          
              
          public Employee() {
          }
      
          public Employee(int empId1, String lastname1, String firstname1, 
                          double salary1) {
              super();
              this.empId = empId1;
              this.lastname = lastname1;
              this.firstname = firstname1;
              this.salary = salary1;
          }
      
          public void setEmpId(int param) {
              this.empId = param;
          }
      
          public int getEmpId() {
              return empId;
          }
      
          public void setLastname(String param) {
              this.lastname = param;
          }
      
          public String getLastname() {
              return lastname;
          }
      
          public void setFirstname(String param) {
              this.firstname = param;
          }
      
          public String getFirstname() {
              return firstname;
          }
      
          public void setSalary(double param) {
              this.salary = param;
          }
      
          public double getSalary() {
              return salary;
          }
      
          @Override
          public boolean equals(Object object) {
              if (this == object) {
                  return true;
              }
              if (!(object instanceof Employee)) {
                  return false;
              }
              final Employee other = (Employee)object;
              if (empId != other.empId) {
                  return false;
              }
              if (!(lastname == null ? other.lastname == null : lastname.equals(other.lastname))) {
                  return false;
              }
              if (!(firstname == null ? other.firstname == null : firstname.equals(other.firstname))) {
                  return false;
              }
              if (Double.compare(salary, other.salary) != 0) {
                  return false;
              }
              return true;
          }
      
          @Override
          public int hashCode() {
              final int PRIME = 37;
              int result = 1;
              result = PRIME * result + ((lastname == null) ? 0 : lastname.hashCode());
              result = PRIME * result + ((firstname == null) ? 0 : firstname.hashCode());
              long temp = Double.doubleToLongBits(salary);
              result = PRIME * result + (int) (temp ^ (temp >>> 32));
              return result;
          }
          
          public void readExternal(PofReader dataInput) throws IOException {
              this.empId = dataInput.readInt(0);    
              this.lastname = dataInput.readString(1);
              this.firstname = dataInput.readString(2);
              this.salary  = dataInput.readBigDecimal(3).doubleValue();
      
          }
      
          public void writeExternal(PofWriter dataOutput) throws IOException {
              dataOutput.writeInt(0, this.empId);
              dataOutput.writeString(1, this.lastname);
              dataOutput.writeString(2, this.firstname);
              dataOutput.writeBigDecimal(3, new BigDecimal(this.salary));
              
          }
      }
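The sample above writes the double salary as a BigDecimal and converts it back on read. The following sketch, which uses only java.math.BigDecimal and no Coherence classes, illustrates what that conversion does. SalaryEncodingSketch and its methods are hypothetical names introduced here for illustration.

```java
import java.math.BigDecimal;

// Illustrates the double <-> BigDecimal conversion used by the sample
// readExternal/writeExternal methods. No Coherence classes are involved.
final class SalaryEncodingSketch {

    // new BigDecimal(double) captures the exact binary value of the double,
    // so converting back with doubleValue() returns the same double.
    static double roundTrip(double salary) {
        BigDecimal encoded = new BigDecimal(salary);
        return encoded.doubleValue();
    }

    // new BigDecimal(double) throws NumberFormatException for NaN and
    // infinities, so such values cannot be written this way.
    static boolean isEncodable(double salary) {
        return !Double.isNaN(salary) && !Double.isInfinite(salary);
    }

    public static void main(String[] args) {
        // The exact expansion of the double nearest to 0.1 (a long digit string).
        System.out.println(new BigDecimal(0.1));
        // valueOf goes through Double.toString and yields the short form.
        System.out.println(BigDecimal.valueOf(0.1)); // 0.1
        System.out.println(roundTrip(5500.10) == 5500.10); // true
        System.out.println(isEncodable(Double.NaN));       // false
    }
}
```

Because the round trip is lossless for finite values, the sample behaves correctly. The encoded value can be much longer than the eight bytes of a double, which is worth keeping in mind if you adapt the class.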
      
  2. Create a class to increase the salary of employees by 10%.

    1. Create a class called RaiseSalary that extends AbstractProcessor.

      Figure 6-4 Create Raise Salary Class


    2. Implement the process() method to raise the salary of an employee by 10%.

      Example 6-2 illustrates a possible solution.

      Example 6-2 Sample Raise Salary Class

      package com.oracle.coherence.handson;
      
      import com.tangosol.util.processor.AbstractProcessor;
      import com.tangosol.util.InvocableMap.Entry;
      
      public class RaiseSalary extends AbstractProcessor {
          public RaiseSalary() {
          }
          
          public Object process(Entry entry ) {
              Employee emp = (Employee)entry.getValue();
              emp.setSalary(emp.getSalary() * 1.10);
              entry.setValue(emp);
              return null;
          }
      }
      
  3. Create a class to test whether the RaiseSalary class works.

    1. Create a class called InvokeTest that contains a main method and performs the following:

      • Creates several Employee objects and puts them in the employees cache.

      • Invokes the RaiseSalary processor on them.

      • Prints out the new salaries to confirm the changes that have been made.

      Figure 6-5 Creating the Invoke Test Class


      Example 6-3 illustrates a possible solution.

      Example 6-3 Sample Program to Test RaiseSalary Class

      package com.oracle.coherence.handson;
      
      import com.tangosol.net.CacheFactory;
      import com.tangosol.net.NamedCache;
      import com.tangosol.util.filter.AlwaysFilter;
      
      public class InvokeTest {
          public InvokeTest() {
          }
      
          public static void main(String[] args) {
              
              NamedCache empCache = CacheFactory.getCache("employees");
              
              Employee e1 = new Employee(1,"Middleton","Tim",5000);
              empCache.put(e1.getEmpId(), e1);
              
              Employee e2 = new Employee(2,"Jones","Chris",10000);
              empCache.put(e2.getEmpId(), e2);
              
              empCache.invokeAll(AlwaysFilter.INSTANCE, new RaiseSalary());
              
              e1 = (Employee)empCache.get(e1.getEmpId());
              e2 = (Employee)empCache.get(e2.getEmpId());
              
              System.out.println("Salary for emp 1 is now: " + e1.getSalary());
              System.out.println("Salary for emp 2 is now: " + e2.getSalary());
              
          }
      }
      
    2. When you first try to invoke InvokeTest, you get a "class not found" error. Why does this happen and how do you fix it?

      Figure 6-6 Class Not Found Error


      When you invoke an EntryProcessor, the request is processed on the storage-enabled member that contains the entry.

      The Coherence cache server does not yet know about the RaiseSalary or Employee objects that you have created. You must add these to the CLASSPATH of the Coherence cache server.

      Edit the cache-server.cmd file. On the line beginning with "%java_exec%" -server..., modify the -cp entry so that the classpath includes the Lab7 classes directory (C:\home\oracle\labs\Lab7\classes in the command below). The "%java_exec%" line should look similar to the following:

      "%java_exec%" -server -showversion "%java_opts%" -cp "%coherence_home%\lib\coherence.jar;C:\home\oracle\labs\Lab4\classes;C:\home\oracle\labs\Lab7\classes;C:\home\oracle\labs" com.tangosol.net.DefaultCacheServer %1
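One way to confirm that a given classpath can see the lab classes is a small diagnostic like the sketch below. ClasspathCheck is a hypothetical helper, not part of Coherence or of the lab files. It only reports whether each named class can be found by the JVM it runs in, so run it with the same -cp value that the cache server uses.

```java
// Hypothetical diagnostic helper; not part of Coherence or the lab files.
final class ClasspathCheck {

    // Returns true if the class can be located on this JVM's classpath.
    // The class is located but not initialized (second argument is false).
    static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the classes created in this lab; pass other names as arguments.
        String[] names = args.length > 0 ? args : new String[] {
            "com.oracle.coherence.handson.Employee",
            "com.oracle.coherence.handson.RaiseSalary"
        };
        for (String name : names) {
            System.out.println(name + " -> "
                    + (isLoadable(name) ? "found" : "NOT found on classpath"));
        }
    }
}
```

If either class is reported as not found, the cache server started with that classpath cannot deserialize the entries or run the processor.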
      
    3. Restart the cache server.

    4. Run InvokeTest again to ensure that the RaiseSalary class works.

      Figure 6-7 A Successful Run of the Invoke Test Class


  4. Create a class to show where data is held within a cluster.

    1. Create a class called SayHelloProcessor, which extends AbstractProcessor.

      Figure 6-8 Creating the Say Hello Processor Class


    2. Implement the process() method to say "hello".

      Example 6-4 illustrates a possible solution.

      Example 6-4 Sample Program to Show Where Data is Held

      package com.oracle.coherence.handson;
      
      import com.tangosol.util.processor.AbstractProcessor;
      import com.tangosol.util.InvocableMap.Entry;
      
      public class SayHelloProcessor extends AbstractProcessor {
          public SayHelloProcessor() {
          
          }
          
          public Object process(Entry entry ) {
              Employee emp = (Employee)entry.getValue();
              System.out.println("\nHello from " + emp.getFirstname() + " " + emp.getLastname() + "\n");
              return null;
          }
      }
      
    3. Create a new class called WhereAreMyEmployees. Ensure that this class has a main method.

      Figure 6-9 Creating the Where Are My Employees Class


      Example 6-5 illustrates a possible solution.

      Example 6-5 Sample Program to Trace Member Location of Employees

      package com.oracle.coherence.handson;
      
      import com.tangosol.net.CacheFactory;
      import com.tangosol.net.NamedCache;
      import com.tangosol.util.filter.AlwaysFilter;
      
      public class WhereAreMyEmployees {
          public WhereAreMyEmployees() {
          }
      
          public static void main(String[] args) {
              NamedCache empCache = CacheFactory.getCache("employees");
              empCache.invokeAll(AlwaysFilter.INSTANCE, new SayHelloProcessor());
          }
      }
      
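    4. Run the WhereAreMyEmployees class to invoke SayHelloProcessor on all the entries in the cache. The greetings appear on the cache server console, not on the client, because the processor runs where the data is held.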

      Example 6-6 Output of WhereAreMyEmployees Class with One Cache Server Console

      ...
      2008-12-30 18:01:04.411/25.985 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluster, member=1): Member(Id=3, Timestamp=2008-12-30 18:01:04.231, Address=130.35.99.248:8089, MachineId=49912, Location=site:us.oracle.com,machine:tpfaeffl-pc,process:5988, Role=OracleWhereAreMyEmployees) joined Cluster with senior member 1
      
      Hello from Tim Middleton
      
      Hello from Chris Jones
      
      2008-12-30 18:01:04.848/26.422 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluster, member=1): Member 3 joined Service DistributedCache with senior member 1
      2008-12-30 18:01:04.848/26.422 Oracle Coherence GE 3.4.1/407 <D5> (thread=DistributedCache, member=1): Service DistributedCache: sending ServiceConfigSync containing 259 entries to Member 3
      2008-12-30 18:01:04.942/26.516 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluster, member=1): TcpRing: disconnected from member 2 due to the peer departure
      
    5. Add some more Employees and a second cache server. Run the WhereAreMyEmployees class again. Observe the messages on the cache server console.

      Example 6-7 illustrates the response from one cache server console.

      Example 6-7 Response on Cache Server Console 1

      2008-12-30 18:04:34.582/236.156 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluster, member=1): Member(Id=3, Timestamp=2008-12-30 18:04:34.433, Address=130.35.99.248:8090, MachineId=49912, Location=site:us.oracle.com,machine:tpfaeffl-pc,process:3560, Role=OracleWhereAreMyEmployees) joined Cluster with senior member 1
      
      Hello from Chris Jones
      
      2008-12-30 18:04:35.239/236.813 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluster, member=1): Member 3 joined Service DistributedCache with senior member 1
      2008-12-30 18:04:35.254/236.828 Oracle Coherence GE 3.4.1/407 <D5> (thread=DistributedCache, member=1): Service DistributedCache: sending ServiceConfigSync containing 259 entries to Member 3
      

      Example 6-8 illustrates the response from the second cache server console.

      Example 6-8 Response on Cache Server Console 2

      2008-12-30 18:04:35.239/27.047 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluster, member=2): Member 3 joined Service DistributedCache with senior member 1
      
      Hello from Tim Middleton
      
      2008-12-30 18:05:07.707/59.515 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluster, member=2): MemberLeft request for Member 3 received from Member(Id=1, Timestamp=2008-12-30 18:00:40.286, Address=130.35.99.248:8088, MachineId=49912, Location=site:us.oracle.com,machine:tpfaeffl-pc,process:4460, Role=CoherenceServer)
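
The two consoles show different employees because a distributed cache spreads its entries across the storage-enabled members. The sketch below is a deliberately simplified model of that idea, not how Coherence assigns ownership. It assumes a fixed partition count of 257, the key's hashCode as the partitioning input, and a round-robin mapping of partitions to members. All three are assumptions made for illustration, and PartitionSketch is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified, hypothetical model of key distribution; NOT Coherence's algorithm.
final class PartitionSketch {

    // Assumed partition count, chosen for illustration.
    static final int PARTITION_COUNT = 257;

    // Map a key to a partition using its hashCode (an assumption of this sketch).
    static int partitionFor(Object key) {
        return Math.floorMod(key.hashCode(), PARTITION_COUNT);
    }

    // Hypothetical round-robin ownership: members are numbered 1..storageMembers.
    static int ownerFor(int partition, int storageMembers) {
        return (partition % storageMembers) + 1;
    }

    // Group keys by the member that would own them in this model.
    static Map<Integer, List<Object>> keysByMember(List<?> keys, int storageMembers) {
        Map<Integer, List<Object>> result = new TreeMap<>();
        for (Object key : keys) {
            int member = ownerFor(partitionFor(key), storageMembers);
            result.computeIfAbsent(member, m -> new ArrayList<>()).add(key);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> empIds = List.of(1, 2, 3, 4);
        System.out.println(keysByMember(empIds, 1)); // {1=[1, 2, 3, 4]}
        System.out.println(keysByMember(empIds, 2)); // {1=[2, 4], 2=[1, 3]}
    }
}
```

With one storage member every key lands in the same place, which corresponds to Example 6-6. With two members the keys split, which corresponds to each console printing only some of the greetings.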