ヘッダーをスキップ
Oracle® Coherenceチュートリアル
リリース3.7.1
B65030-01
  ドキュメント・ライブラリへ移動
ライブラリ
製品リストへ移動
製品
目次へ移動
目次
索引へ移動
索引

前
 
次
 

7 変更のリスニングとデータの変更

この章では、Coherenceキャッシュ内のデータの変更を監視するリスナーを設定します。また、エントリ・プロセッサを使用してCoherenceキャッシュ内のエントリを変更したり処理したりする方法を学習します。

この章では次の項について説明します。

7.1 概要

com.tangosol.util.ObservableMapインタフェースを使用すると、キャッシュ・エントリに対する変更を監視して、変更に対応できます。このインタフェースはjava.util.EventListenerを拡張し、標準のJavaBeansイベント・モデルを使用します。すべてのタイプのNamedCacheインスタンスがこのインタフェースを実装します。イベントをリスニングするには、MapListener(com.tangosol.util.MapListener)インスタンスをキャッシュに登録します。MapListenerインスタンスがクライアントでコールされます。つまり、リスナー・コードがクライアント・プロセスで実行されます。

イベントをリスニングするには、次のような方法があります。

これらのリスナー・タスクは、例7-1に示したaddMapListenerメソッドを使用してNamedCacheで実行できます。

例7-1 NamedCacheのリスナー・メソッド

void addMapListener(MapListener listener) 

void addMapListener(MapListener listener, Filter filter, boolean fLite) 

void addMapListener(MapListener listener, Object oKey, boolean fLite)

com.tangosol.util.MapEvent クラスは、オブジェクト・キーおよび以前の値と新しい値を取得します。Liteイベントを指定することができますが、このイベントには新しい値や以前の値が存在しない可能性があります。例7-2では、これらのメソッドをNamedCacheに登録するパターンを示しています。これは、無名クラスとして実行されています。MapEventクラスのgetOldValueメソッドやgetNewValueメソッドを使用して、イベントを起動する対象となるエントリを取得できます。

例7-2 イベント登録のコード・パターン

namedCache.addMapListener(new MapListener() {
public void entryDeleted(MapEvent mapEvent) 
    {
    // TODO... handle deletion event 
    } 
public void entryInserted(MapEvent mapEvent) 
    {
    // TODO... handle inserted event 
    } 
public void entryUpdated(MapEvent mapEvent) 
    { 
     // TODO... handle updated event } }
    )

7.2 キャッシュ・リスナーの作成

この項では、NamedCacheでリスニングして、検出したあらゆる変更に対応する、Javaクラスの作成方法を説明します。

  1. キャッシュ内の変更をリスニングするクラスの作成

  2. キャッシュ・リスナー・サンプルの実行

7.2.1 キャッシュ内の変更をリスニングするクラスの作成

Loadingプロジェクトでは、新規Contactオブジェクトのエントリをリスニングするクラスを作成します。このクラスをObserverExampleと名付け、そのクラスにmainメソッドが存在することを確認します。詳細は、「Javaクラスの作成」を参照してください。

このクラス内に、新しいContactがキャッシュを更新されるたびにメッセージを表示するリスナーを追加します。たとえば、次のコードを使用すると、コンソールから読み取るまでJavaプロセスを実行させたままにできます。そうしないと、プログラムはただちに終了します。

BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); String text = console.readLine();

このクラス内に、内部クラスを作成してAbstractMapListnerを拡張します。メソッドを実装して、キャッシュの値を挿入、更新および削除します。この場合、大半の処理はMapEventに含まれる新旧の値に基づいて、entryUpdatedメソッドで実行されます。

例7-3は、リスナー・クラスの実装例を示しています。

例7-3 リスナー・クラスのサンプル

package com.oracle.handson;
 
import com.tangosol.net.NamedCache;
 
import com.tangosol.util.AbstractMapListener;
import com.tangosol.util.MapEvent;
 
import com.oracle.handson.Contact;
 
import com.tangosol.net.CacheFactory;
 
import java.io.IOException;
 
 
/**
 * ObserverExample observes changes to contacts.
 */
public class ObserverExample
    {
    
    public ObserverExample() 
    {
    }
    // ----- ObserverExample methods -------------------------------------

    public static void main(String[] args) {
      NamedCache cache = CacheFactory.getCache("ContactsCache");
      new ObserverExample().observe(cache);
        try {
            System.in.read();
        } catch (IOException e) {
        }
    }
    /**
    * Observe changes to the contacts.
    *
    * @param cache  target cache
    */
    public void observe(NamedCache cache)
        {
        cache.addMapListener(new ContactChangeListener());
        }
 
    // ----- inner class: ContactChangeListener -------------------------
 
    public class ContactChangeListener
            extends AbstractMapListener
        {
        // ----- MapListener interface ------------------------------------------

        public void entryInserted(MapEvent event)
            {
            System.out.println(event);
            }
 
        public void entryUpdated(MapEvent event)
            {
            Contact contactOld = (Contact)event.getOldValue();
            Contact contactNew = (Contact)event.getNewValue();
            StringBuffer sb  = new StringBuffer();
 
            if (!contactOld.getHomeAddress().equals(
                    contactNew.getHomeAddress()))
                {
                sb.append("Home address ");
                }
            if (!contactOld.getWorkAddress().equals(
                    contactNew.getWorkAddress()))
                {
                sb.append("Work address ");
                }
            if (!contactOld.getTelephoneNumbers().equals(
                    contactNew.getTelephoneNumbers()))
                {
                sb.append("Telephone ");
                }
            if (contactOld.getAge() != contactNew.getAge())
                {
                sb.append("Birthdate ");
                }
            sb.append("was updated for ").append(event.getKey());
            System.out.println(sb);
            }
 
        public void entryDeleted(MapEvent event)
            {
            System.out.println(event.getKey());
            }
        }
    }

7.2.2 キャッシュ・リスナー・サンプルの実行

キャッシュ・リスナー・サンプルを実行するには:

  1. ObserverExampleの実行構成を作成します。「Project Explorer」のObserverExampleを右クリックし、「Run As」を選択します。「Run Configurations」ダイアログ・ボックスで、「Oracle Coherence」を選択して「New Configuration」アイコンをクリックします。

    1. Name」フィールドにObserverExampleと入力します。

    2. Main」タブの「Project」フィールドにLoadingと入力します。「Main class」フィールドにcom.oracle.handson.ObserverExampleと入力します。

    3. Coherence」タブの「General」タブで、「Cache configuration descriptor」フィールドのc:\home\oracle\workspace\Contacts\appClientModule\coherence-cache-config.xmlファイルを参照します。「Disabled (cache client)」ボタンを選択します。「Cluster port」フィールドに3155と入力します。「Apply」をクリックします。

      Other」タブで、「tangosol.pof.config」フィールドまで下方へスクロールします。POF構成ファイルcontacts-pof-config.xmlへの絶対パスを入力します。「Apply」をクリックします。

    4. Common」タブで、「Shared file」を選択してLoadingディレクトリを参照します。

  2. Loadingプロジェクトのクラス(C:\home\oracle\workspace\Loading\build\classes)と構成ファイル(C:\home\oracle\workspace)へのパスがcontacts-cache-server.cmdファイル内に存在することを確認します。

  3. 実行中のキャッシュ・サーバーがあれば停止します。詳細は、「キャッシュ・サーバーの停止」を参照してください。

  4. ContactsCacheServerを起動します。

  5. EclipseからLoaderExampleプログラムを実行してキャッシュをロードします。ObserverExampleを実行していれば、例7-4に示すようにプログラムは入力を待機しています。

    キャッシュ内の変更への対応」では、キャッシュのエントリを変更して変更済の記録を返すプログラムを作成します。

    例7-4 イベントを待機するリスナー・プログラム

    ...
    MasterMemberSet
      (
      ThisMember=Member(Id=3, Timestamp=2011-03-15 11:57:03.569, Address=130.35.99.213:8090, MachineId=49877, Location=site:us.oracle.com,machine:tpfaeffl-lap7,process:792, Role=OracleHandsonObserverExample)
      OldestMember=Member(Id=1, Timestamp=2011-03-15 11:56:32.959, Address=130.35.99.213:8088, MachineId=49877, Location=site:us.oracle.com,machine:tpfaeffl-lap7,process:4864, Role=CoherenceServer)
      ActualMemberSet=MemberSet(Size=2, BitSetCount=2
        Member(Id=1, Timestamp=2011-03-15 11:56:32.959, Address=130.35.99.213:8088, MachineId=49877, Location=site:us.oracle.com,machine:tpfaeffl-lap7,process:4864, Role=CoherenceServer)
        Member(Id=3, Timestamp=2011-03-15 11:57:03.569, Address=130.35.99.213:8090, MachineId=49877, Location=site:us.oracle.com,machine:tpfaeffl-lap7,process:792, Role=OracleHandsonObserverExample)
        )
      RecycleMillis=1200000
      RecycleSet=MemberSet(Size=0, BitSetCount=0
        )
      )
     
    TcpRing{Connections=[1]}
    IpMonitor{AddressListSize=0}
     
    2011-03-15 11:57:03.803/1.297 Oracle Coherence GE 3.7.0.0 <D5> (thread=Invocation:Management, member=3): Service Management joined the cluster with senior service member 1
    2011-03-15 11:57:03.897/1.391 Oracle Coherence GE 3.7.0.0 <Info> (thread=DistributedCache:PartitionedPofCache, member=3): Loaded POF configuration from "file:/C:/home/oracle/workspace/Contacts/appClientModule/contacts-pof-config.xml"
    2011-03-15 11:57:03.913/1.407 Oracle Coherence GE 3.7.0.0 <Info> (thread=DistributedCache:PartitionedPofCache, member=3): Loaded included POF configuration from "jar:file:/C:/oracle/product/coherence/lib/coherence.jar!/coherence-pof-config.xml"
    2011-03-15 11:57:03.959/1.453 Oracle Coherence GE 3.7.0.0 <D5> (thread=DistributedCache:PartitionedPofCache, member=3): Service PartitionedPofCache joined the cluster with senior service member 1
    

7.3 キャッシュ内の変更への対応

この項では、キャッシュ内のエントリを変更して変更済の記録を返すJavaクラスを作成します。

これまで、キャッシュ内のエントリにおけるアクションを実行するために、put操作とget操作を使用しました。しかし、データへの同時アクセが必要な場合に確実に動作の一貫性が保たれる、より優れたデータ操作方法があります。エントリ・プロセッサ(com.tangosol.util.InvocableMap.EntryProcessor)はエントリに対する処理を実行するエージェントです。エントリは、データが保存されている場所で直接処理されます。実行する処理によりデータが変更されることがあり、データが作成、更新および削除されたりする場合も、計算のみが実行されたりする場合もあります。複数のノードを持つパーティション化されたキャッシュ内で並行して処理が実行できるため、スケーラブルになります。また、処理のためにデータがクライアントに取得されないため、キャッシュ内の処理でI/Oの消費を節約できます。

同じキーで機能するエントリ・プロセッサは、論理的にはキューに入れられます。これにより、ロックされない(高性能)処理が可能になります。com.tangosol.util.InvocableMapインタフェース(NamedCacheの実装による)には、データ操作に対して次のメソッドがあります。

Object invoke(Object oKey, InvocableMap.EntryProcessor processor)。個々のオブジェクトに対して渡されたEntryProcessorを起動し、その起動の結果を返します。

Map invokeAll(Collection keys, InvocableMap.EntryProcessor processor)。キーのコレクションに対してEntryProcessorを起動し、各起動の結果を返します。

Map invokeAll(Filter filter, InvocableMap.EntryProcessor processor)。フィルタに一致するエントリに対してEntryProcessorを起動し、各起動の結果を返します。


注意:

各クラスタ・ノードのクラス・パスで、EntryProcessorクラスを使用できる必要があります。

エントリ・プロセスを作成するには、com.tangosol.util.processes.AbstractProcessorを拡張してprocess()メソッドを実装します。たとえば、次のコードはContactsデータ・セット内の従業員の勤務先アドレスを変更するためのEntryProcessorインスタンスを作成します。

public static class OfficeUpdater extends AbstractProcessor
        implements PortableObject
  ...
    public Object process(InvocableMap.Entry entry)  
        {
        Contact contact = (Contact) entry.getValue();
        contact.setWorkAddress(m_addrWork);
        entry.setValue(contact);
        return null;
        }

OfficeUpdaterクラスを起動するには、OfficeUpdaterクラスの名前を持つinvokeAllメソッドをその引数の1つとして使用します。

cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"),
             new OfficeUpdater(addrWork));

この演習では、キャッシュ内のエントリを更新するEntryProcessorインスタンスを備えたJavaクラスを作成します。「キャッシュ内の変更をリスニングするクラスの作成」で作成したObserverExampleクラスは、これらの変更を検出して変更済の記録を表示します。

  1. キャッシュ内のエントリを更新するクラスの作成

  2. POF構成ファイルの編集

  3. キャッシュ更新サンプルの実行

7.3.1 キャッシュ内のエントリを更新するクラスの作成

キャッシュ内のエントリを更新するためのファイルを作成するには:

  1. キャッシュ内のエントリを更新するクラスを作成します。

    Loadingプロジェクトで、キャッシュ内のContactオブジェクトの住所を更新するmainメソッドを備えたProcessorExampleという名前のクラスを作成します。詳細は、「Javaクラスの作成」を参照してください。

  2. マサチューセッツ州に住み、勤務先の住所を州内のオフィスに更新するContactsオブジェクトの記録を検索するコードを記述します。

    PortableObjectインタフェース(キャッシュからのデータのシリアライズおよぎデシリアライズ用)を実装し、勤務先住所を設定するためのEntryProcessorインスタンスを持つ内部クラスを追加します。Filterクラスのメソッドを使用して、マサチューセッツ州内に自宅住所があるContactsメンバーを分離します。

    例7-5は、考えられるProcessorExampleクラスの実装を示しています。

    例7-5 キャッシュ内のオブジェクトを更新するプログラムのサンプル

    package com.oracle.handson;
    
    import com.tangosol.net.NamedCache;
    
    import com.tangosol.util.filter.EqualsFilter;
    import com.tangosol.util.processor.AbstractProcessor;
    import com.tangosol.util.InvocableMap;
    
    import com.tangosol.io.pof.PortableObject;
    import com.tangosol.io.pof.PofReader;
    import com.tangosol.io.pof.PofWriter;
    
    import com.oracle.handson.Address;
    import com.oracle.handson.Contact;
    
    import com.tangosol.net.CacheFactory;
    
    import java.io.IOException;
    
    /**
    * ProcessorExample executes an example EntryProcessor.
    *
    */
    public class ProcessorExample
        {
        public ProcessorExample() 
        {
        }
            public static void main(String[] args) 
              {
              NamedCache cache = CacheFactory.getCache("ContactsCache");
              new ProcessorExample().execute(cache);
              }
        // ----- ProcessorExample methods -----------------------------------
    
        public void execute(NamedCache cache)
            {
            // People who live in Massachusetts moved to an in-state office
            Address addrWork = new Address("200 Newbury St.", "Yoyodyne, Ltd.",
                    "Boston", "MA", "02116", "US");
    
            cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"),
                    new OfficeUpdater(addrWork));
            }
        // ----- nested class: OfficeUpdater ------------------------------------
    
        /**
        * OfficeUpdater updates a contact's office address.
        */
        public static class OfficeUpdater
                extends AbstractProcessor
                implements PortableObject
            {
            // ----- constructors -------------------------------------------
    
            /**
            * Default constructor (necessary for PortableObject implementation).
            */
            public OfficeUpdater() 
                {
                }
    
            public OfficeUpdater(Address addrWork) 
               {
                m_addrWork = addrWork;
                }
    
            // ----- InvocableMap.EntryProcessor interface ------------------
    
            public Object process(InvocableMap.Entry entry)
                {
                Contact contact = (Contact) entry.getValue();
    
                contact.setWorkAddress(m_addrWork);
                entry.setValue(contact);
                System.out.println("Work address was updated for " + contact.getFirstName() + " " + contact.getLastName());
                return null;
                }
    
            // ----- PortableObject interface -------------------------------
    
            public void readExternal(PofReader reader)
                    throws IOException
                {
                m_addrWork = (Address) reader.readObject(0);
                }
    
            public void writeExternal(PofWriter writer)
                    throws IOException
                {
                writer.writeObject(0, m_addrWork);
                }
    
            // ----- data members -------------------------------------------
    
            private Address m_addrWork;
            }
        }
    

7.3.2 POF構成ファイルの編集

contacts-pof-config.xmlファイルを編集して、OfficeUpdaterエントリのユーザー定義型IDを追加します。ここでは、ProcessorExample$OfficeUpdaterクラスの定義型ID106を追加します。

...
<user-type>
       <type-id>1006</type-id>
      <class-name>com.oracle.handson.
 ProcessorExample$OfficeUpdater</class-name>
</user-type>
...

7.3.3 キャッシュ更新サンプルの実行

キャッシュ更新サンプルを実行するには:

  1. ProcessorExampleの実行構成を作成します。「Project Explorer」のObserverExampleを右クリックし、「Run As」を選択します。「Run Configurations」ダイアログ・ボックスで、Oracle Coherenceを選択して「New Configuration」アイコンをクリックします。

    • Name」フィールドにProcessorExampleと入力します。

    • Main」タブの「Project」フィールドにLoadingと入力します。「Main class」フィールドにcom.oracle.handson.ProcessorExampleと入力します。

    • Coherence」タブの「General」タブで、「Cache configuration descriptor」フィールドのc:\home\oracle\workspace\Contacts\appClientModule\coherence-cache-config.xmlファイルを参照します。「Disabled (cache client)」ボタンを選択します。「Cluster port」フィールドに3155と入力します。「Apply」をクリックします。

      Other」タブで、「tangosol.pof.config」フィールドまで下方へスクロールします。POF構成ファイルcontacts-pof-config.xmlへの絶対パスを入力します。「Apply」をクリックします。

    • Common」タブで、「Shared file」を選択してLoadingディレクトリを参照します。

  2. 次の手順を実行して、ObserverExampleクラスとProcessorExampleクラスをテストします。

    1. 実行中のキャッシュ・サーバーがあれば停止します。詳細は、「キャッシュ・サーバーの停止」を参照してください。

    2. ContactsCacheServerを再起動します。

    3. LoaderExampleクラスを実行して、キャッシュをロードします。

    4. ObserverExampleクラスを実行します。

    5. ProcessorExampleを実行して、キャッシュ内の記録を更新します。

      キャッシュ・サーバーのコンソール・ウィンドウに表示される例7-6のようなメッセージを確認してください。そのメッセージには、特定の従業員の勤務先住所が更新されたことについて記載されています。

      例7-6 ObserverExampleクラスとProcessorExampleクラスからの出力

      ...
      Group{Address=224.3.7.0, Port=3155, TTL=4}
       
      MasterMemberSet
        (
        ThisMember=Member(Id=3, Timestamp=2011-03-15 12:20:17.538, Address=130.35.99.213:8090, MachineId=49877, Location=site:us.oracle.com,machine:tpfaeffl-lap7,process:4396, Role=OracleHandsonObserverExample)
        OldestMember=Member(Id=1, Timestamp=2011-03-15 12:17:47.491, Address=130.35.99.213:8088, MachineId=49877, Location=site:us.oracle.com,machine:tpfaeffl-lap7,process:1980, Role=CoherenceServer)
        ActualMemberSet=MemberSet(Size=2, BitSetCount=2
          Member(Id=1, Timestamp=2011-03-15 12:17:47.491, Address=130.35.99.213:8088, MachineId=49877, Location=site:us.oracle.com,machine:tpfaeffl-lap7,process:1980, Role=CoherenceServer)
          Member(Id=3, Timestamp=2011-03-15 12:20:17.538, Address=130.35.99.213:8090, MachineId=49877, Location=site:us.oracle.com,machine:tpfaeffl-lap7,process:4396, Role=OracleHandsonObserverExample)
          )
        RecycleMillis=1200000
        RecycleSet=MemberSet(Size=0, BitSetCount=0
          )
        )
       
      TcpRing{Connections=[1]}
      IpMonitor{AddressListSize=0}
       
      2011-03-15 12:20:17.772/1.313 Oracle Coherence GE 3.7.0.0 <D5> (thread=Invocation:Management, member=3): Service Management joined the cluster with senior service member 1
      2011-03-15 12:20:17.866/1.407 Oracle Coherence GE 3.7.0.0 <Info> (thread=DistributedCache:PartitionedPofCache, member=3): Loaded POF configuration from "file:/C:/home/oracle/workspace/Contacts/appClientModule/contacts-pof-config.xml"
      2011-03-15 12:20:17.881/1.422 Oracle Coherence GE 3.7.0.0 <Info> (thread=DistributedCache:PartitionedPofCache, member=3): Loaded included POF configuration from "jar:file:/C:/oracle/product/coherence/lib/coherence.jar!/coherence-pof-config.xml"
      2011-03-15 12:20:17.928/1.469 Oracle Coherence GE 3.7.0.0 <D5> (thread=DistributedCache:PartitionedPofCache, member=3): Service PartitionedPofCache joined the cluster with senior service member 1
      2011-03-15 12:20:33.850/17.391 Oracle Coherence GE 3.7.0.0 <D5> (thread=Cluster, member=3): Member(Id=4, Timestamp=2011-03-15 12:20:33.647, Address=130.35.99.213:8092, MachineId=49877, Location=site:us.oracle.com,machine:tpfaeffl-lap7,process:4400, Role=OracleHandsonProcessorExample) joined Cluster with senior member 1
      2011-03-15 12:20:33.913/17.454 Oracle Coherence GE 3.7.0.0 <D5> (thread=Cluster, member=3): Member 4 joined Service Management with senior member 1
      2011-03-15 12:20:34.084/17.625 Oracle Coherence GE 3.7.0.0 <D5> (thread=Cluster, member=3): Member 4 joined Service PartitionedPofCache with senior member 1
      Work address was updated for John Yuoo
      Work address was updated for John Dikkx
      Work address was updated for John Kwtkvp
      Work address was updated for John Tdqpyncf
      Work address was updated for John Bophtnbxig
      Work address was updated for John Sqyiohnpcj
      Work address was updated for John Vjfdqs
      Work address was updated for John Cudpahnugc
      Work address was updated for John Wovzeja
      Work address was updated for John Woyy
      Work address was updated for John Peladt
      Work address was updated for John Nuetsjd
      Work address was updated for John Oueywut
      Work address was updated for John Uanwypjz
      Work address was updated for John Xfazifx
      Work address was updated for John Qdnod
      Work address was updated for John Sgiephelfq
      Work address was updated for John Oajpabav
      2011-03-15 12:20:34.272/17.813 Oracle Coherence GE 3.7.0.0 <D5> (thread=Cluster, member=3): MemberLeft notification for Member(Id=4, Timestamp=2011-03-15 12:20:33.647, Address=130.35.99.213:8092, MachineId=49877, Location=site:us.oracle.com,machine:tpfaeffl-lap7,process:4400, Role=OracleHandsonProcessorExample) received from Member(Id=1, Timestamp=2011-03-15 12:17:47.491, Address=130.35.99.213:8088, MachineId=49877, Location=site:us.oracle.com,machine:tpfaeffl-lap7,process:1980, Role=CoherenceServer)
      2011-03-15 12:20:34.272/17.813 Oracle Coherence GE 3.7.0.0 <D5> (thread=Cluster, member=3): Member 4 left service Management with senior member 1
      2011-03-15 12:20:34.272/17.813 Oracle Coherence GE 3.7.0.0 <D5> (thread=Cluster, member=3): Member 4 left service PartitionedPofCache with senior member 1
      2011-03-15 12:20:34.272/17.813 Oracle Coherence GE 3.7.0.0 <D5> (thread=Cluster, member=3): Member(Id=4, Timestamp=2011-03-15 12:20:34.272, Address=130.35.99.213:8092, MachineId=49877, Location=site:us.oracle.com,machine:tpfaeffl-lap7,process:4400, Role=OracleHandsonProcessorExample) left Cluster with senior member 1