ヘッダーをスキップ
Oracle® Fusion Middleware Oracle Coherenceチュートリアル
12c (12.1.3)
E56204-01
  ドキュメント・ライブラリへ移動
ライブラリ
製品リストへ移動
製品
目次へ移動
目次

前
 
次
 

7 変更のリスニングとデータの変更

この演習では、リスナーを設定してCoherenceキャッシュ内のデータ変更を監視します。ここでは、Coherence APIのObservableMapMapEventEventListenerEntryProcessorおよびAbstractMapListenerの使用について取り上げます。また、エントリ・プロセッサを使用してCoherenceキャッシュ内のエントリを変更したり処理したりする方法を学習します。

この章には次の項が含まれます:

7.1 概要

com.tangosol.util.ObservableMapインタフェースを使用すると、キャッシュ・エントリに対する変更を監視して、変更に対応できます。このインタフェースはjava.util.EventListenerを拡張し、標準のJavaBeansイベント・モデルを使用します。すべてのタイプのNamedCacheインスタンスがこのインタフェースを実装します。イベントをリスニングするには、MapListener (com.tangosol.util.MapListener)インスタンスをキャッシュに登録します。MapListenerインスタンスがクライアントでコールされます。つまり、リスナー・コードがクライアント・プロセスで実行されます。

イベントをリスニングするには、次のような方法があります。

これらのリスナー・タスクは、例7-1に示したaddMapListenerメソッドを使用してNamedCacheで実行できます。

例7-1 NamedCacheのリスナー・メソッド

void addMapListener(MapListener listener) 

void addMapListener(MapListener listener, Filter filter, boolean fLite) 

void addMapListener(MapListener listener, Object oKey, boolean fLite)

com.tangosol.util.MapEventクラスは、オブジェクト・キーおよび以前の値と新しい値を取得します。Liteイベントを指定することができますが、このイベントには新しい値や以前の値が存在しない可能性があります。例7-2では、これらのメソッドをNamedCacheに登録するパターンを示しています。これは、無名クラスとして実行されています。MapEventクラスのgetOldValueメソッドやgetNewValueメソッドを使用して、イベントを起動する対象となるエントリを取得できます。

例7-2 イベント登録のコード・パターン

namedCache.addMapListener(new MapListener() {
public void entryDeleted(MapEvent mapEvent) 
    {
    // TODO... handle deletion event 
    } 
public void entryInserted(MapEvent mapEvent) 
    {
    // TODO... handle inserted event 
    } 
public void entryUpdated(MapEvent mapEvent) 
    { 
     // TODO... handle updated event } }
    )

7.2 キャッシュ・リスナーの作成

この項では、NamedCacheでリスニングして、検出したあらゆる変更に対応する、Javaクラスの作成方法を説明します。

  1. キャッシュ内の変更をリスニングするクラスの作成

  2. キャッシュ・リスナー・サンプルの実行

7.2.1 キャッシュ内の変更をリスニングするクラスの作成

Loadingプロジェクトで、新しいContactオブジェクト・エントリをリスニングするクラスを作成します。クラスにObserverExampleという名前を付け、mainメソッドがあることを確認します。詳細は、「Javaクラスの作成」を参照してください。

このクラス内に、新しいContactがキャッシュを更新されるたびにメッセージを表示するリスナーを追加します。たとえば、次のコードを使用すると、コンソールから読み取るまでJavaプロセスを実行させたままにできます。そうしないと、プログラムはただちに終了します。

BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); String text = console.readLine();

このクラス内に、内部クラスを作成してAbstractMapListnerを拡張します。メソッドを実装して、キャッシュの値を挿入、更新および削除します。この場合、大半の処理はMapEventに含まれる新旧の値に基づいて、entryUpdatedメソッドで実行されます。

例7-3は、リスナー・クラスの実装例を示しています。

例7-3 リスナー・クラスのサンプル

package com.oracle.handson;
 
import com.tangosol.net.NamedCache;
 
import com.tangosol.util.AbstractMapListener;
import com.tangosol.util.MapEvent;
 
import com.oracle.handson.Contact;
 
import com.tangosol.net.CacheFactory;
 
import java.io.IOException;
 
 
/**
 * ObserverExample observes changes to contacts.
 */
public class ObserverExample
    {
    
    public ObserverExample() 
    {
    }
    // ----- ObserverExample methods -------------------------------------

    public static void main(String[] args) {
      NamedCache cache = CacheFactory.getCache("ContactsCache");
      new ObserverExample().observe(cache);
        try {
            System.in.read();
        } catch (IOException e) {
        }
    }
    /**
    * Observe changes to the contacts.
    *
    * @param cache  target cache
    */
    public void observe(NamedCache cache)
        {
        cache.addMapListener(new ContactChangeListener());
        }
 
    // ----- inner class: ContactChangeListener -------------------------
 
    public class ContactChangeListener
            extends AbstractMapListener
        {
        // ----- MapListener interface ------------------------------------------

        public void entryInserted(MapEvent event)
            {
            System.out.println(event);
            }
 
        public void entryUpdated(MapEvent event)
            {
            Contact contactOld = (Contact)event.getOldValue();
            Contact contactNew = (Contact)event.getNewValue();
            StringBuffer sb  = new StringBuffer();
 
            if (!contactOld.getHomeAddress().equals(
                    contactNew.getHomeAddress()))
                {
                sb.append("Home address ");
                }
            if (!contactOld.getWorkAddress().equals(
                    contactNew.getWorkAddress()))
                {
                sb.append("Work address ");
                }
            if (!contactOld.getTelephoneNumbers().equals(
                    contactNew.getTelephoneNumbers()))
                {
                sb.append("Telephone ");
                }
            if (contactOld.getAge() != contactNew.getAge())
                {
                sb.append("Birthdate ");
                }
            sb.append("was updated for ").append(event.getKey());
            System.out.println(sb);
            }
 
        public void entryDeleted(MapEvent event)
            {
            System.out.println(event.getKey());
            }
        }
    }

7.2.2 キャッシュ・リスナー・サンプルの実行

キャッシュ・リスナー・サンプルを実行するには:

  1. ObserverExampleの実行構成を作成します。「Project Explorer」ObserverExampleを右クリックし、「Run As」を選択します。「Run Configurations」ダイアログ・ボックスで、「Oracle Coherence」を選択して「New Configuration」アイコンをクリックします。

    1. 「Name」フィールドにObserverExampleと入力します。

    2. 「Main」タブの「Project」フィールドにLoadingと入力します。「Main class」フィールドにcom.oracle.handson.ObserverExampleと入力します。

    3. 「Coherence」タブの「Generalタブで、「Cache configuration descriptor」フィールドのc:\home\oracle\workspace\Contacts\appClientModule\contacts-cache-config.xmlファイルを参照します。「Disabled (cache client)」ボタンを選択します。「Cluster port」フィールドに3155と入力します。「Apply」をクリックします。

      「Other」タブで、「tangosol.pof.config」フィールドまで下方へスクロールします。POF構成ファイルcontacts-pof-config.xmlへの絶対パスを入力します。「Apply」をクリックします。

    4. 「Common」タブで、「Shared file」を選択してLoadingディレクトリを参照します。

  2. 実行中のキャッシュ・サーバーがあれば停止します。詳細は、「キャッシュ・サーバーの停止」を参照してください。

  3. ContactsCacheServerを起動します。

  4. EclipseからLoaderExampleプログラムを実行してキャッシュをロードします。ObserverExampleを実行している場合、例7-4に示すようにプログラムは入力を待機しています。

    「キャッシュ内の変更への対応」では、キャッシュのエントリを変更して変更済の記録を返すプログラムを作成します。

    例7-4 イベントを待機するリスナー・プログラム

    ...
    MasterMemberSet(
      ThisMember=Member(Id=3, Timestamp=2013-11-21 11:29:45.272, Address=130.35.99.6:8090, MachineId=47251, Location=site:,machine:TPFAEFFL-LAP,process:4752, Role=OracleHandsonObserverExample)
      OldestMember=Member(Id=1, Timestamp=2013-11-21 11:29:02.843, Address=130.35.99.6:8088, MachineId=47251, Location=site:,machine:TPFAEFFL-LAP,process:6736, Role=CoherenceServer)
      ActualMemberSet=MemberSet(Size=2
        Member(Id=1, Timestamp=2013-11-21 11:29:02.843, Address=130.35.99.6:8088, MachineId=47251, Location=site:,machine:TPFAEFFL-LAP,process:6736, Role=CoherenceServer)
        Member(Id=3, Timestamp=2013-11-21 11:29:45.272, Address=130.35.99.6:8090, MachineId=47251, Location=site:,machine:TPFAEFFL-LAP,process:4752, Role=OracleHandsonObserverExample)
        )
      MemberId|ServiceVersion|ServiceJoined|MemberState
        1|12.1.3|2013-11-21 11:29:02.843|JOINED,
        3|12.1.3|2013-11-21 11:29:45.272|JOINED
      RecycleMillis=1200000
      RecycleSet=MemberSet(Size=1
        Member(Id=2, Timestamp=2013-11-21 11:29:23.048, Address=130.35.99.6:8090, MachineId=47251)
        )
      )
     
    TcpRing{Connections=[1]}
    IpMonitor{Addresses=0}
     
    2013-11-21 11:29:45.713/4.273 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=Invocation:Management, member=3): Service Management joined the cluster with senior service member 1
    2013-11-21 11:29:45.734/4.304 Oracle Coherence GE 12.1.3.0.0 <Info> (thread=main, member=3): Loaded Reporter configuration from "jar:file:/C:/Oracle/coherence/lib/coherence.jar!/reports/report-group.xml"
    2013-11-21 11:29:46.071/4.631 Oracle Coherence GE 12.1.3.0.0 <Info> (thread=NameService:TcpAcceptor, member=3): TcpAcceptor now listening for connections on 130.35.99.6:8090.3
    2013-11-21 11:29:46.395/4.955 Oracle Coherence GE 12.1.3.0.0 <Info> (thread=DistributedCache:PartitionedPofCache, member=3): Loaded POF configuration from "file:/C:/home/oracle/workspace/Contacts/appClientModule/contacts-pof-config.xml"
    2013-11-21 11:29:46.422/4.982 Oracle Coherence GE 12.1.3.0.0 <Info> (thread=DistributedCache:PartitionedPofCache, member=3): Loaded included POF configuration from "jar:file:/C:/Oracle/coherence/lib/coherence.jar!/coherence-pof-config.xml"
    2013-11-21 11:29:46.459/5.019 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=DistributedCache:PartitionedPofCache, member=3): Service PartitionedPofCache joined the cluster with senior service member 1
    

7.3 キャッシュ内の変更への対応

この項では、キャッシュ内のエントリを変更して変更済の記録を返すJavaクラスを作成します。

これまで、キャッシュ内のエントリにおけるアクションを実行するために、put操作とget操作を使用しました。しかし、データへの同時アクセが必要な場合に確実に動作の一貫性が保たれる、より優れたデータ操作方法があります。エントリ・プロセッサ(com.tangosol.util.InvocableMap.EntryProcessor)はエントリに対する処理を実行するエージェントです。エントリは、データが保存されている場所で直接処理されます。実行する処理によりデータが変更されることがあり、データが作成、更新および削除されたりする場合も、計算のみが実行されたりする場合もあります。複数のノードを持つパーティション化されたキャッシュ内で並行して処理が実行できるため、スケーラブルになります。また、処理のためにデータがクライアントに取得されないため、キャッシュ内の処理でI/Oの消費を節約できます。

同じキーで機能するエントリ・プロセッサは、論理的にはキューに入れられます。これにより、ロックされない(高パフォーマンス)処理が可能になります。com.tangosol.util.InvocableMapインタフェース(NamedCacheの実装による)には、データ操作に対して次のメソッドがあります。

Object invoke(Object oKey, InvocableMap.EntryProcessor processor)。個々のオブジェクトに対して渡されたEntryProcessorを起動し、その起動の結果を返します。

Map invokeAll(Collection keys, InvocableMap.EntryProcessor processor)。キーのコレクションに対してEntryProcessorを起動し、各起動の結果を返します。

Map invokeAll(Filter filter, InvocableMap.EntryProcessor processor)。フィルタに一致するエントリに対してEntryProcessorを起動し、各起動の結果を返します。


注意:

各クラスタ・ノードのクラス・パスで、EntryProcessorクラスを使用できる必要があります。


エントリ・プロセスを作成するには、com.tangosol.util.processes.AbstractProcessorを拡張してprocess()メソッドを実装します。たとえば、次のコードはContactsデータ・セット内の従業員の勤務先アドレスを変更するためのEntryProcessorインスタンスを作成します。

public static class OfficeUpdater extends AbstractProcessor
        implements PortableObject
  ...
    public Object process(InvocableMap.Entry entry)  
        {
        Contact contact = (Contact) entry.getValue();
        contact.setWorkAddress(m_addrWork);
        entry.setValue(contact);
        return null;
        }

OfficeUpdaterクラスを起動するには、OfficeUpdaterクラスの名前を持つinvokeAllメソッドをその引数の1つとして使用します。

cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"),
             new OfficeUpdater(addrWork));

この演習では、キャッシュ内のエントリを更新するEntryProcessorインスタンスを備えたJavaクラスを作成します。「キャッシュ内の変更をリスニングするクラスの作成」で作成したObserverExampleクラスは、これらの変更を検出して変更済の記録を表示します。

  1. キャッシュ内のエントリを更新するクラスの作成

  2. POF構成ファイルの編集

  3. キャッシュ更新サンプルの実行

7.3.1 キャッシュ内のエントリを更新するクラスの作成

キャッシュ内のエントリを更新するためのファイルを作成するには:

  1. キャッシュ内のエントリを更新するクラスを作成します。

    Loadingプロジェクトで、キャッシュ内のContactオブジェクトの住所を更新するmainメソッドを備えたProcessorExampleという名前のクラスを作成します。詳細は、「Javaクラスの作成」を参照してください。

  2. マサチューセッツ州に住み、勤務先の住所を州内のオフィスに更新するContactsオブジェクトの記録を検索するコードを記述します。

    PortableObjectインタフェース(キャッシュからのデータのシリアライズおよびデシリアライズ用)を実装し、勤務先住所を設定するためのEntryProcessorインスタンスを持つ内部クラスを追加します。Filterクラスのメソッドを使用して、マサチューセッツ州内に自宅住所があるContactsメンバーを分離します。

    例7-5は、考えられるProcessorExampleクラスの実装を示しています。

    例7-5 キャッシュ内のオブジェクトを更新するプログラムのサンプル

    package com.oracle.handson;
    
    import com.tangosol.net.NamedCache;
    
    import com.tangosol.util.filter.EqualsFilter;
    import com.tangosol.util.processor.AbstractProcessor;
    import com.tangosol.util.InvocableMap;
    
    import com.tangosol.io.pof.PortableObject;
    import com.tangosol.io.pof.PofReader;
    import com.tangosol.io.pof.PofWriter;
    
    import com.oracle.handson.Address;
    import com.oracle.handson.Contact;
    
    import com.tangosol.net.CacheFactory;
    
    import java.io.IOException;
    
    /**
    * ProcessorExample executes an example EntryProcessor.
    *
    */
    public class ProcessorExample
        {
        public ProcessorExample() 
        {
        }
            public static void main(String[] args) 
              {
              NamedCache cache = CacheFactory.getCache("ContactsCache");
              new ProcessorExample().execute(cache);
              }
        // ----- ProcessorExample methods -----------------------------------
    
        public void execute(NamedCache cache)
            {
            // People who live in Massachusetts moved to an in-state office
            Address addrWork = new Address("200 Newbury St.", "Yoyodyne, Ltd.",
                    "Boston", "MA", "02116", "US");
    
            cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"),
                    new OfficeUpdater(addrWork));
            }
        // ----- nested class: OfficeUpdater ------------------------------------
    
        /**
        * OfficeUpdater updates a contact's office address.
        */
        public static class OfficeUpdater
                extends AbstractProcessor
                implements PortableObject
            {
            // ----- constructors -------------------------------------------
    
            /**
            * Default constructor (necessary for PortableObject implementation).
            */
            public OfficeUpdater() 
                {
                }
    
            public OfficeUpdater(Address addrWork) 
               {
                m_addrWork = addrWork;
                }
    
            // ----- InvocableMap.EntryProcessor interface ------------------
    
            public Object process(InvocableMap.Entry entry)
                {
                Contact contact = (Contact) entry.getValue();
    
                contact.setWorkAddress(m_addrWork);
                entry.setValue(contact);
                System.out.println("Work address was updated for " + contact.getFirstName() + " " + contact.getLastName());
                return null;
                }
    
            // ----- PortableObject interface -------------------------------
    
            public void readExternal(PofReader reader)
                    throws IOException
                {
                m_addrWork = (Address) reader.readObject(0);
                }
    
            public void writeExternal(PofWriter writer)
                    throws IOException
                {
                writer.writeObject(0, m_addrWork);
                }
    
            // ----- data members -------------------------------------------
    
            private Address m_addrWork;
            }
        }
    

7.3.2 POF構成ファイルの編集

contacts-pof-config.xmlファイルを編集して、OfficeUpdaterエントリのユーザー定義型IDを追加します。ここでは、ProcessorExample$OfficeUpdaterクラスの定義型ID106を追加します。

...
<user-type>
       <type-id>1006</type-id>
      <class-name>com.oracle.handson.
 ProcessorExample$OfficeUpdater</class-name>
</user-type>
...

7.3.3 キャッシュ更新サンプルの実行

キャッシュ更新サンプルを実行するには:

  1. ProcessorExampleの実行構成を作成します。「Project Explorer」ProcessorExampleを右クリックし、「Run As」を選択します。「Run Configurations」ダイアログ・ボックスで、Oracle Coherenceを選択して「New Configuration」アイコンをクリックします。

    • 「Name」フィールドにProcessorExampleと入力します。

    • 「Main」タブの「Project」フィールドにLoadingと入力します。「Main class」フィールドにcom.oracle.handson.ProcessorExampleと入力します。

    • 「Coherence」タブの「Generalタブで、「Cache configuration descriptor」フィールドのc:\home\oracle\workspace\Contacts\appClientModule\contacts-cache-config.xmlファイルを参照します。「Disabled (cache client)」ボタンを選択します。「Cluster port」フィールドに3155と入力します。「Apply」をクリックします。

      「Other」タブで、「tangosol.pof.config」フィールドまで下方へスクロールします。POF構成ファイルcontacts-pof-config.xmlへの絶対パスを入力します。「Apply」をクリックします。

    • 「Common」タブで、「Shared file」を選択してLoadingディレクトリを参照します。

  2. 次の手順を実行して、ObserverExampleクラスとProcessorExampleクラスをテストします。

    1. 実行中のキャッシュ・サーバーがあれば停止します。詳細は、「キャッシュ・サーバーの停止」を参照してください。

    2. ContactsCacheServerを再起動します。

    3. LoaderExampleクラスを実行して、キャッシュをロードします。

    4. ObserverExampleクラスを実行します。

    5. ProcessorExampleを実行して、キャッシュ内の記録を更新します。

      キャッシュ・サーバーのコンソール・ウィンドウに表示される例7-6のようなメッセージを確認してください。そのメッセージには、特定の従業員の勤務先住所が更新されたことについて記載されています。

      例7-6 ObserverExampleクラスとProcessorExampleクラスからの出力

      ...
      Started DefaultCacheServer...
      ...
      2013-11-21 15:44:33.425/69.017 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=DistributedCache:PartitionedPofCache, member=1): Member 4 joined Service PartitionedPofCache with senior member 1
      Work address was updated for John Lfyovf
      Work address was updated for John Jptrpajked
      Work address was updated for John Mtqdln
      Work address was updated for John Mstaugiw
      Work address was updated for John Olfezqse
      Work address was updated for John Qjefjgtgj
      Work address was updated for John Kuhgkzn
      Work address was updated for John Jpby
      Work address was updated for John Cekuea
      Work address was updated for John Guhkam
      Work address was updated for John Ijwj
      Work address was updated for John Trlb
      Work address was updated for John Hnfcwxjq
      Work address was updated for John Kizifh
      Work address was updated for John Rqlhgboi
      Work address was updated for John Ipphab
      2013-11-21 15:44:33.547/69.139 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=Cluster, member=1): TcpRing disconnected from Member(Id=4, Timestamp=2013-11-21 15:44:32.073, Address=130.35.99.28:8092, MachineId=47251, Location=site:,machine:TPFAEFFL-LAP,process:7872, Role=OracleHandsonProcessorExample) due to a peer departure; removing the member.
      ...