ヘッダーをスキップ
Oracle Coherenceチュートリアル
リリース3.4.1
B53324-01
  目次
目次
索引
索引

戻る
戻る
 
次へ
次へ
 

6 インプレース・データ処理

この章では、EntryProcessorsを使用して、Coherenceキャッシュのエントリを変更および処理する方法について習得します。この章の内容は次のとおりです。

6.1 概要

これまでは、キャッシュ・エントリに対するアクションの実行に、putとgetの各操作を使用してきました。データの並行性を制御するには、キーのロックおよびロック解除のオプションを使用します。しかし、同時データ・アクセスが必要な場合に動作の一貫性を保証する、より優れたデータ操作方法も存在します。EntryProcessorscom.tangosol.util.InvocableMap.EntryProcessor)はエントリに対し処理を行うエージェントで、データが保持されているエントリを直接処理します。実行処理の種類によっては、データの作成、更新、削除など、データを変更する場合があります。または、データの計算処理のみを実行する場合もあります。同一キーに対して動作するEntryProcessorsは、論理的にキューに入れられます。これは、ロックフリー(高パフォーマンス)の処理が実現可能なことを意味します。Lab5では、EntryProcessorのタイプである EntryAggregatorを使用して、グリッド全体でデータを集計しました。com.tangosol.util.InvocableMapインタフェース(NamedCacheを実装する)には、データ操作用の次のメソッドが用意されています。

Object invoke(Object oKey, InvocableMap.EntryProcessor processor): 個々のオブジェクトについて、渡されたEntryProcessorを起動してその結果を返します。

Map invokeAll(Collection keys, InvocableMap.EntryProcessor processor): キーのコレクションについて、EntryProcessorを起動してその結果を返します。

Map invokeAll(Filter filter, InvocableMap.EntryProcessor processor): フィルタ条件に一致するエントリについて、EntryProcessorを起動してその結果を返します。

エントリ・クラスを作成するには、com.tangosol.util.processes.AbstractProcessorを拡張し、process()メソッドを実装できます。たとえば、次のコードは、全従業員の給与を10%増加させるEntryProcessorを作成します。

class RaiseSalary extents AbstractProcessor {
...
public Object process (Entry entry) {
Employee emp = (Employee)entry.getValue();
emp.setSalary(emp.getSalary() * 1.10);
entry.setValue(emp);
return null;
}

RaiseSalaryクラスを起動するには、次のコードを実行します。

empCache.invokeAll(AlwaysFilter.INSTANCE, new RaiseSalary());

6.2 データ・エントリの変更と処理

この項では、EntryProcessorsを使用して、Coherenceキャッシュのエントリを変更および処理する方法について説明します。

この演習では、JDeveloperを使用して次のクラスを作成します。

データ・エントリの変更、および処理を行うJavaクラスを作成する手順は次のとおりです。

  1. Employeeオブジェクトに新規クラスを作成します。

    1. Lab7という新規プロジェクトを作成します。

      図6-1 新規プロジェクトの作成

      新規プロジェクトの作成
      「図6-1 新規プロジェクトの作成」の説明

    2. デフォルトのローカル記憶域プロパティを次の値に変更します。

      -Dtangosol.coherence.distributed.localstorage=false -Dtangosol.coherence.log.level=3
      

      図6-2 実行構成の編集

      実行構成の編集
      「図6-2 実行構成の編集」の説明

    3. PortableObjectを実装するEmployeeクラスを作成します。そのクラスに次の属性を追加します。

      private int empId

      private String lastname

      private String firstname

      private double salary

      図6-3 Employeeクラスの作成

      Employeeクラスの作成
      「図6-3 Employeeクラスの作成」の説明

    4. 必要なメソッドがすべて実装されたことを確認します(「ソース」メニューを使用する)。

      例6-1に、Employeeクラスの可能な実装を示します。

      例6-1 Employeeクラスのサンプル

      package com.oracle.coherence.handson;
      
      import com.tangosol.io.pof.PofReader;
      import com.tangosol.io.pof.PofWriter;
      import com.tangosol.io.pof.PortableObject;
      
      import java.io.DataInput;
      import java.io.DataOutput;
      import java.io.IOException;
      
      import java.math.BigDecimal;
      
      public class Employee implements PortableObject {
          private int empId;
          private String lastname;
          private String firstname;
          private double salary;
      
      
          public Employee() {
          }
      
          public Employee(int empId1, String lastname1, String firstname1,
                          double salary1) {
              super();
              this.empId = empId1;
              this.lastname = lastname1;
              this.firstname = firstname1;
              this.salary = salary1;
          }
      
          public void setEmpId(int param) {
              this.empId = param;
          }
      
          public int getEmpId() {
              return empId;
          }
      
          public void setLastname(String param) {
              this.lastname = param;
          }
      
          public String getLastname() {
              return lastname;
          }
      
          public void setFirstname(String param) {
              this.firstname = param;
          }
      
          public String getFirstname() {
              return firstname;
          }
      
          public void setSalary(double param) {
              this.salary = param;
          }
      
          public double getSalary() {
              return salary;
          }
      
          @Override
          public boolean equals(Object object) {
              if (this == object) {
                  return true;
              }
              if (!(object instanceof Employee)) {
                  return false;
              }
              final Employee other = (Employee)object;
              if (empId != other.empId) {
                  return false;
              }
              if (!(lastname == null ? other.lastname == null : lastname.equals(other.lastname))) {
                  return false;
              }
              if (!(firstname == null ? other.firstname == null : firstname.equals(other.firstname))) {
                  return false;
              }
              if (Double.compare(salary, other.salary) != 0) {
                  return false;
              }
              return true;
          }
      
          @Override
          public int hashCode() {
              final int PRIME = 37;
              int result = 1;
              result = PRIME * result + ((lastname == null) ? 0 : lastname.hashCode());
              result = PRIME * result + ((firstname == null) ? 0 : firstname.hashCode());
              long temp = Double.doubleToLongBits(salary);
              result = PRIME * result + (int) (temp ^ (temp >>> 32));
              return result;
          }
      
          public void readExternal(PofReader dataInput) throws IOException {
              this.empId = dataInput.readInt(0);
              this.lastname = dataInput.readString(1);
              this.firstname = dataInput.readString(2);
              this.salary  = dataInput.readBigDecimal(3).doubleValue();
      
          }
      
          public void writeExternal(PofWriter dataOutput) throws IOException {
              dataOutput.writeInt(0, this.empId);
              dataOutput.writeString(1, this.lastname);
              dataOutput.writeString(2, this.firstname);
              dataOutput.writeBigDecimal(3, new BigDecimal(this.salary));
      
          }
      }
      
  2. 従業員の給与を10%増加させるクラスを作成します。

    1. AbstractProcessorを拡張するRaiseSalaryというクラスを作成します。

      図6-4 RaiseSalaryクラスの作成

      RaiseSalaryクラスの作成
      「図6-4 RaiseSalaryクラスの作成」の説明

    2. 従業員の給与を10%増加させるprocess()メソッドを実装します。

      例6-2に、可能な解決策を示します。

      例6-2 RaiseSalaryクラスのサンプル

      package com.oracle.coherence.handson;
      
      import com.tangosol.util.processor.AbstractProcessor;
      import com.tangosol.util.InvocableMap.Entry;
      
      import java.util.Map;
      
      public class RaiseSalary extends AbstractProcessor {
          public RaiseSalary() {
          }
      
          public Object process(Entry entry ) {
              Employee emp = (Employee)entry.getValue();
              emp.setSalary(emp.getSalary() * 1.10);
              entry.setValue(emp);
              return null;
          }
      }
      
  3. RaiseSalaryクラスの動作をテストするクラスを作成します。

    1. mainメソッドを含み、次のように実行されるInvokeTestというクラスを作成します。

      —複数のEmployeeオブジェクトを作成し、employeesキャッシュに追加します。

      —作成したオブジェクトでRaiseSalaryメソッドを起動します。

      —新しい給与を出力し、その変更内容を確認します。

      図6-5 InvokeTestクラスの作成

      InvokeTestクラスの作成
      「図6-5 InvokeTestクラスの作成」の説明

      例6-3に、可能な解決策を示します。

      例6-3 RaiseSalaryクラスをテストするサンプル・プログラム

      package com.oracle.coherence.handson;
      
      import com.tangosol.net.CacheFactory;
      import com.tangosol.net.NamedCache;
      import com.tangosol.util.filter.AlwaysFilter;
      
      public class InvokeTest {
          public InvokeTest() {
          }
      
          public static void main(String[] args) {
      
              NamedCache empCache = CacheFactory.getCache("employees");
      
              Employee e1 = new Employee(1,"Middleton","Tim",5000);
              empCache.put(e1.getEmpId(), e1);
      
              Employee e2 = new Employee(2,"Jones","Chris",10000);
              empCache.put(e2.getEmpId(), e2);
      
              empCache.invokeAll(AlwaysFilter.INSTANCE, new RaiseSalary());
      
              e1 = (Employee)empCache.get(e1.getEmpId());
              e2 = (Employee)empCache.get(e2.getEmpId());
      
              System.out.println("Salary for emp 1 is now: " + e1.getSalary());
              System.out.println("Salary for emp 2 is now: " + e2.getSalary());
      
          }
      }
      
    2. InvokeTestを最初に起動しようとすると、「class not found」エラーが発生します。 その原因と解決方法について示します。

      図6-6 Class Not Foundエラー

      Class Not Foundエラー
      「図6-6 Class Not Foundエラー」の説明

      EntryProcessorを起動する場合、そのリクエストは、エントリを含む記憶域が有効なメンバー上で処理されます。

      Coherenceキャッシュ・サーバーは、作成したRaiseSalaryオブジェクトやEmployeeオブジェクトをまだ認識していません。これらのオブジェクトは、Coherenceキャッシュ・サーバーのCLASSPATHに追加する必要があります。

      cache-server.cmdファイルを編集します。"%java_exec%" -server...で始まる行の-cpエントリを変更し、/home/oracle/labs/Lab7/classesを含めるようにCLASSPATH環境変数を設定します。"%java_exec%"の行は、次のようになります。

      "%java_exec%" -server -showversion "%java_opts%" -cp "%coherence_home%\lib\coherence.jar;C:\home\oracle\labs\Lab4\classes;C:\home\oracle\labs\Lab7\classes;C:\home\oracle\labs" com.tangosol.net.DefaultCacheServer %1
      
    3. キャッシュ・サーバーを再起動します。

    4. InvokeTestを再実行し、RaiseSalaryクラスが機能していることを確認します。

      図6-7 InvokeTestクラスの正常な実行

      InvokeTestクラスの正常な実行
      「図6-7 InvokeTestクラスの正常な実行」の説明

  4. クラスタ内のデータの格納先を示すクラスを作成します。

    1. AbstractProcessorを拡張するSayHelloProcessorというクラスを作成します。

      図6-8 SayHelloProcessorクラスの作成

      SayHelloProcessorクラスの作成
      「図6-8 SayHelloProcessorクラスの作成」の説明

    2. hello」と表示するprocess()メソッドを実装します。

      例6-4に、可能な解決策を示します。

      例6-4 データの格納先を示すサンプル・プログラム

      package com.oracle.coherence.handson;
      
      import com.tangosol.util.processor.AbstractProcessor;
      import com.tangosol.util.InvocableMap.Entry;
      
      public class SayHelloProcessor extends AbstractProcessor {
          public SayHelloProcessor() {
      
          }
      
          public Object process(Entry entry ) {
              Employee emp = (Employee)entry.getValue();
              System.out.println("\nHello from " + emp.getFirstname() + " " + emp.getLastname() + "\n");
              return null;
          }
      }
      
    3. WhereAreMyEmployeesという新規クラスを作成します。このクラスにmainメソッドが存在することを確認します。

      図6-9 WhereAreMyEmployeesクラスの作成

      WhereAreMyEmployeesクラスの作成
      「図6-9 WhereAreMyEmployeesクラスの作成」の説明

      例6-5に、可能な解決策を示します。

      例6-5 Employeesのメンバーの場所を追跡するサンプル・プログラム

      package com.oracle.coherence.handson;import com.tangosol.net.CacheFactory;import com.tangosol.net.NamedCache;import com.tangosol.util.filter.AlwaysFilter;public class WhereAreMyEmployees {    public WhereAreMyEmployees() {    }    public static void main(String[] args) {                NamedCache empCache = CacheFactory.getCache("employees");                empCache.invokeAll(AlwaysFilter.INSTANCE, new SayHelloProcessor());    }}
      
    4. キャッシュのすべてのエントリで、WhereAreMyEmployeesを起動します。

      例6-6 1つのキャッシュ・サーバー・コンソールにおけるWhereAreMyEmployeesクラスの出力

      ...
      2008-12-30 18:01:04.411/25.985 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluste
      r, member=1): Member(Id=3, Timestamp=2008-12-30 18:01:04.231, Address=130.35.99.
      248:8089, MachineId=49912, Location=site:us.oracle.com,machine:tpfaeffl-pc,proce
      ss:5988, Role=OracleWhereAreMyEmployees) joined Cluster with senior member 1
      
      Hello from Tim Middleton
      
      Hello from Chris Jones
      
      2008-12-30 18:01:04.848/26.422 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluste
      r, member=1): Member 3 joined Service DistributedCache with senior member 1
      2008-12-30 18:01:04.848/26.422 Oracle Coherence GE 3.4.1/407 <D5> (thread=Distri
      butedCache, member=1): Service DistributedCache: sending ServiceConfigSync conta
      ining 259 entries to Member 3
      2008-12-30 18:01:04.942/26.516 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluste
      r, member=1): TcpRing: disconnected from member 2 due to the peer departure
      
    5. その他のEmployeesと、2つ目のキャッシュ・サーバーを追加します。WhereAreMyEmployeesクラスを再実行します。キャッシュ・サーバー・コンソール上のメッセージを監視します。

      例6-7に、1つ目のキャッシュ・サーバー・コンソールからの応答を示します。

      例6-7 キャッシュ・サーバー・コンソール1の応答

      2008-12-30 18:04:34.582/236.156 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluster, member=1): Member(Id=3, Timestamp=2008-12-30 18:04:34.433, Address=130.35.99.248:8090, MachineId=49912, Location=site:us.oracle.com,machine:tpfaeffl-pc,process:3560, Role=OracleWhereAreMyEmployees) joined Cluster with senior member 1Hello from Chris Jones2008-12-30 18:04:35.239/236.813 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluster, member=1): Member 3 joined Service DistributedCache with senior member 12008-12-30 18:04:35.254/236.828 Oracle Coherence GE 3.4.1/407 <D5> (thread=DistributedCache, member=1): Service DistributedCache: sending ServiceConfigSync containing 259 entries to Member 3
      

      例6-8に、2つ目のキャッシュ・サーバー・コンソールからの応答を示します。

      例6-8 キャッシュ・サーバー・コンソール2の応答

      2008-12-30 18:04:35.239/27.047 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluste
      r, member=2): Member 3 joined Service DistributedCache with senior member 1
      
      Hello from Tim Middleton
      
      2008-12-30 18:05:07.707/59.515 Oracle Coherence GE 3.4.1/407 <D5> (thread=Cluste
      r, member=2): MemberLeft request for Member 3 received from Member(Id=1, Timesta
      mp=2008-12-30 18:00:40.286, Address=130.35.99.248:8088, MachineId=49912, Locatio
      n=site:us.oracle.com,machine:tpfaeffl-pc,process:4460, Role=CoherenceServer)