この演習では、リスナーを設定してCoherenceキャッシュ内のデータ変更を監視します。ここでは、Coherence APIのObservableMap
、MapEvent
、EventListener
、EntryProcessor
およびAbstractMapListener
の使用について取り上げます。また、エントリ・プロセッサを使用してCoherenceキャッシュ内のエントリを変更したり処理したりする方法を学習します。
この章には次の項が含まれます:
com.tangosol.util.ObservableMap
インタフェースを使用すると、キャッシュ・エントリに対する変更を監視して、変更に対応できます。このインタフェースはjava.util.EventListener
を拡張し、標準のJavaBeansイベント・モデルを使用します。すべてのタイプのNamedCache
インスタンスがこのインタフェースを実装します。イベントをリスニングするには、MapListener
(com.tangosol.util.MapListener
)インスタンスをキャッシュに登録します。MapListener
インスタンスがクライアントでコールされます。つまり、リスナー・コードがクライアント・プロセスで実行されます。
イベントをリスニングするには、次のような方法があります。
すべてのイベントのリスニング
フィルタの条件を満たす、すべてのイベントのリスニング
特定のオブジェクト・キーのイベントのリスニング
これらのリスナー・タスクは、例7-1に示したaddMapListener
メソッドを使用してNamedCache
で実行できます。
例7-1 NamedCacheのリスナー・メソッド
void addMapListener(MapListener listener) void addMapListener(MapListener listener, Filter filter, boolean fLite) void addMapListener(MapListener listener, Object oKey, boolean fLite)
com.tangosol.util.MapEvent
クラスは、オブジェクト・キーおよび以前の値と新しい値を取得します。Liteイベントを指定することができますが、このイベントには新しい値や以前の値が存在しない可能性があります。例7-2では、これらのメソッドをNamedCache
に登録するパターンを示しています。これは、無名クラスとして実行されています。MapEvent
クラスのgetOldValue
メソッドやgetNewValue
メソッドを使用して、イベントを起動する対象となるエントリを取得できます。
例7-2 イベント登録のコード・パターン
namedCache.addMapListener(new MapListener() { public void entryDeleted(MapEvent mapEvent) { // TODO... handle deletion event } public void entryInserted(MapEvent mapEvent) { // TODO... handle inserted event } public void entryUpdated(MapEvent mapEvent) { // TODO... handle updated event } } )
この項では、NamedCache
でリスニングして、検出したあらゆる変更に対応する、Javaクラスの作成方法を説明します。
Loading
プロジェクトでは、新規Contact
オブジェクトのエントリをリスニングするクラスを作成します。このクラスをObserverExample
と名付け、そのクラスにmain
メソッドが存在することを確認します。詳細は、「Javaクラスの作成」を参照してください。
このクラス内に、新しいContact
がキャッシュを更新されるたびにメッセージを表示するリスナーを追加します。たとえば、次のコードを使用すると、コンソールから読み取るまでJavaプロセスを実行させたままにできます。そうしないと、プログラムはただちに終了します。
BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); String text = console.readLine();
このクラス内に、内部クラスを作成してAbstractMapListner
を拡張します。メソッドを実装して、キャッシュの値を挿入、更新および削除します。この場合、大半の処理はMapEvent
に含まれる新旧の値に基づいて、entryUpdated
メソッドで実行されます。
例7-3は、リスナー・クラスの実装例を示しています。
例7-3 リスナー・クラスのサンプル
package com.oracle.handson; import com.tangosol.net.NamedCache; import com.tangosol.util.AbstractMapListener; import com.tangosol.util.MapEvent; import com.oracle.handson.Contact; import com.tangosol.net.CacheFactory; import java.io.IOException; /** * ObserverExample observes changes to contacts. */ public class ObserverExample { public ObserverExample() { } // ----- ObserverExample methods ------------------------------------- public static void main(String[] args) { NamedCache cache = CacheFactory.getCache("ContactsCache"); new ObserverExample().observe(cache); try { System.in.read(); } catch (IOException e) { } } /** * Observe changes to the contacts. * * @param cache target cache */ public void observe(NamedCache cache) { cache.addMapListener(new ContactChangeListener()); } // ----- inner class: ContactChangeListener ------------------------- public class ContactChangeListener extends AbstractMapListener { // ----- MapListener interface ------------------------------------------ public void entryInserted(MapEvent event) { System.out.println(event); } public void entryUpdated(MapEvent event) { Contact contactOld = (Contact)event.getOldValue(); Contact contactNew = (Contact)event.getNewValue(); StringBuffer sb = new StringBuffer(); if (!contactOld.getHomeAddress().equals( contactNew.getHomeAddress())) { sb.append("Home address "); } if (!contactOld.getWorkAddress().equals( contactNew.getWorkAddress())) { sb.append("Work address "); } if (!contactOld.getTelephoneNumbers().equals( contactNew.getTelephoneNumbers())) { sb.append("Telephone "); } if (contactOld.getAge() != contactNew.getAge()) { sb.append("Birthdate "); } sb.append("was updated for ").append(event.getKey()); System.out.println(sb); } public void entryDeleted(MapEvent event) { System.out.println(event.getKey()); } } }
キャッシュ・リスナー・サンプルを実行するには:
ObserverExample
の実行構成を作成します。「Project Explorer」のObserverExample
を右クリックし、「Run As」を選択します。「Run Configurations」ダイアログ・ボックスで、「Oracle Coherence」を選択して「New Configuration」アイコンをクリックします。
「Name」フィールドにObserverExample
と入力します。
「Main」タブの「Project」フィールドにLoading
と入力します。「Main class」フィールドにcom.oracle.handson.ObserverExample
と入力します。
「Coherence」タブの「Generalタブで、「Cache configuration descriptor」フィールドのc:\home\oracle\workspace\Contacts\appClientModule\contacts-cache-config.xml
ファイルを参照します。「Disabled (cache client)」ボタンを選択します。「Cluster port」フィールドに3155
と入力します。「Apply」をクリックします。
「Other」タブで、「tangosol.pof.config」フィールドまで下方へスクロールします。POF構成ファイルcontacts-pof-config.xml
への絶対パスを入力します。「Apply」をクリックします。
「Common」タブで、「Shared file」を選択してLoadingディレクトリを参照します。
実行中のキャッシュ・サーバーがあれば停止します。詳細は、「キャッシュ・サーバーの停止」を参照してください。
ContactsCacheServer
を起動します。
EclipseからLoaderExample
プログラムを実行してキャッシュをロードします。ObserverExample
を実行していれば、例7-4に示すようにプログラムは入力を待機しています。
「キャッシュ内の変更への対応」では、キャッシュのエントリを変更して変更済の記録を返すプログラムを作成します。
例7-4 イベントを待機するリスナー・プログラム
... MasterMemberSet( ThisMember=Member(Id=3, Timestamp=2012-08-17 13:51:15.468, Address=10.159.220.86:8090, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:3964, Role=OracleHandsonObserverExample) OldestMember=Member(Id=1, Timestamp=2012-08-17 13:50:44.093, Address=10.159.220.86:8088, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:3344, Role=CoherenceServer) ActualMemberSet=MemberSet(Size=2 Member(Id=1, Timestamp=2012-08-17 13:50:44.093, Address=10.159.220.86:8088, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:3344, Role=CoherenceServer) Member(Id=3, Timestamp=2012-08-17 13:51:15.468, Address=10.159.220.86:8090, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:3964, Role=OracleHandsonObserverExample) ) MemberId|ServiceVersion|ServiceJoined|MemberState 1|12.1.2|2012-08-17 13:50:44.093|JOINED, 3|12.1.2|2012-08-17 13:51:15.468|JOINED RecycleMillis=1200000 RecycleSet=MemberSet(Size=0 ) ) TcpRing{Connections=[1]} IpMonitor{Addresses=0} 2012-08-17 13:51:15.890/2.031 Oracle Coherence GE 12.1.2.0 <D5> (thread=Invocation:Management, member=3): Service Management joined the cluster with senior service member 1 2012-08-17 13:51:15.921/2.062 Oracle Coherence GE 12.1.2.0 <Info> (thread=main, member=3): Loaded Reporter configuration from "jar:file:/C:/oracle/Middleware/Oracle_Home/coherence/lib/coherence.jar!/reports/report-group.xml" 2012-08-17 13:51:16.046/2.187 Oracle Coherence GE 12.1.2.0 <Info> (thread=DistributedCache:PartitionedPofCache, member=3): Loaded POF configuration from "file:/C:/home/oracle/workspace/Contacts/build/classes/contacts-pof-config.xml" 2012-08-17 13:51:16.109/2.250 Oracle Coherence GE 12.1.2.0 <Info> (thread=DistributedCache:PartitionedPofCache, member=3): Loaded included POF configuration from "jar:file:/C:/oracle/Middleware/Oracle_Home/coherence/lib/coherence.jar!/coherence-pof-config.xml" 2012-08-17 13:51:16.187/2.328 Oracle Coherence GE 12.1.2.0 <D5> (thread=DistributedCache:PartitionedPofCache, member=3): Service PartitionedPofCache joined the cluster with senior service member 1
この項では、キャッシュ内のエントリを変更して変更済の記録を返すJavaクラスを作成します。
これまで、キャッシュ内のエントリにおけるアクションを実行するために、put
操作とget
操作を使用しました。しかし、データへの同時アクセが必要な場合に確実に動作の一貫性が保たれる、より優れたデータ操作方法があります。エントリ・プロセッサ(com.tangosol.util.InvocableMap.EntryProcessor
)はエントリに対する処理を実行するエージェントです。エントリは、データが保存されている場所で直接処理されます。実行する処理によりデータが変更されることがあり、データが作成、更新および削除されたりする場合も、計算のみが実行されたりする場合もあります。複数のノードを持つパーティション化されたキャッシュ内で並行して処理が実行できるため、スケーラブルになります。また、処理のためにデータがクライアントに取得されないため、キャッシュ内の処理でI/Oの消費を節約できます。
同じキーで機能するエントリ・プロセッサは、論理的にはキューに入れられます。これにより、ロックされない(高パフォーマンス)処理が可能になります。com.tangosol.util.InvocableMap
インタフェース(NamedCache
の実装による)には、データ操作に対して次のメソッドがあります。
• Object invoke(Object oKey, InvocableMap.EntryProcessor processor)
。個々のオブジェクトに対して渡されたEntryProcessor
を起動し、その起動の結果を返します。
• Map invokeAll(Collection keys, InvocableMap.EntryProcessor processor)
。キーのコレクションに対してEntryProcessor
を起動し、各起動の結果を返します。
• Map invokeAll(Filter filter, InvocableMap.EntryProcessor processor)
。フィルタに一致するエントリに対してEntryProcessor
を起動し、各起動の結果を返します。
注意: 各クラスタ・ノードのクラス・パスで、 |
エントリ・プロセスを作成するには、com.tangosol.util.processes.AbstractProcessor
を拡張してprocess()
メソッドを実装します。たとえば、次のコードはContacts
データ・セット内の従業員の勤務先アドレスを変更するためのEntryProcessor
インスタンスを作成します。
public static class OfficeUpdater extends AbstractProcessor implements PortableObject ... public Object process(InvocableMap.Entry entry) { Contact contact = (Contact) entry.getValue(); contact.setWorkAddress(m_addrWork); entry.setValue(contact); return null; }
OfficeUpdater
クラスを起動するには、OfficeUpdater
クラスの名前を持つinvokeAll
メソッドをその引数の1つとして使用します。
cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"), new OfficeUpdater(addrWork));
この演習では、キャッシュ内のエントリを更新するEntryProcessor
インスタンスを備えたJavaクラスを作成します。「キャッシュ内の変更をリスニングするクラスの作成」で作成したObserverExample
クラスは、これらの変更を検出して変更済の記録を表示します。
キャッシュ内のエントリを更新するためのファイルを作成するには:
キャッシュ内のエントリを更新するクラスを作成します。
Loading
プロジェクトで、キャッシュ内のContact
オブジェクトの住所を更新するmain
メソッドを備えたProcessorExample
という名前のクラスを作成します。詳細は、「Javaクラスの作成」を参照してください。
マサチューセッツ州に住み、勤務先の住所を州内のオフィスに更新するContacts
オブジェクトの記録を検索するコードを記述します。
PortableObject
インタフェース(キャッシュからのデータのシリアライズおよびデシリアライズ用)を実装し、勤務先住所を設定するためのEntryProcessor
インスタンスを持つ内部クラスを追加します。Filter
クラスのメソッドを使用して、マサチューセッツ州内に自宅住所があるContacts
メンバーを分離します。
例7-5は、考えられるProcessorExample
クラスの実装を示しています。
例7-5 キャッシュ内のオブジェクトを更新するプログラムのサンプル
package com.oracle.handson; import com.tangosol.net.NamedCache; import com.tangosol.util.filter.EqualsFilter; import com.tangosol.util.processor.AbstractProcessor; import com.tangosol.util.InvocableMap; import com.tangosol.io.pof.PortableObject; import com.tangosol.io.pof.PofReader; import com.tangosol.io.pof.PofWriter; import com.oracle.handson.Address; import com.oracle.handson.Contact; import com.tangosol.net.CacheFactory; import java.io.IOException; /** * ProcessorExample executes an example EntryProcessor. * */ public class ProcessorExample { public ProcessorExample() { } public static void main(String[] args) { NamedCache cache = CacheFactory.getCache("ContactsCache"); new ProcessorExample().execute(cache); } // ----- ProcessorExample methods ----------------------------------- public void execute(NamedCache cache) { // People who live in Massachusetts moved to an in-state office Address addrWork = new Address("200 Newbury St.", "Yoyodyne, Ltd.", "Boston", "MA", "02116", "US"); cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"), new OfficeUpdater(addrWork)); } // ----- nested class: OfficeUpdater ------------------------------------ /** * OfficeUpdater updates a contact's office address. */ public static class OfficeUpdater extends AbstractProcessor implements PortableObject { // ----- constructors ------------------------------------------- /** * Default constructor (necessary for PortableObject implementation). */ public OfficeUpdater() { } public OfficeUpdater(Address addrWork) { m_addrWork = addrWork; } // ----- InvocableMap.EntryProcessor interface ------------------ public Object process(InvocableMap.Entry entry) { Contact contact = (Contact) entry.getValue(); contact.setWorkAddress(m_addrWork); entry.setValue(contact); System.out.println("Work address was updated for " + contact.getFirstName() + " " + contact.getLastName()); return null; } // ----- PortableObject interface ------------------------------- public void readExternal(PofReader reader) throws IOException { m_addrWork = (Address) reader.readObject(0); } public void writeExternal(PofWriter writer) throws IOException { writer.writeObject(0, m_addrWork); } // ----- data members ------------------------------------------- private Address m_addrWork; } }
contacts-pof-config.xml
ファイルを編集して、OfficeUpdater
エントリのユーザー定義型IDを追加します。ここでは、ProcessorExample$OfficeUpdater
クラスの定義型ID106
を追加します。
... <user-type> <type-id>1006</type-id> <class-name>com.oracle.handson. ProcessorExample$OfficeUpdater</class-name> </user-type> ...
キャッシュ更新サンプルを実行するには:
ProcessorExample
の実行構成を作成します。「Project Explorer」のObserverExample
を右クリックし、「Run As」を選択します。「Run Configurations」ダイアログ・ボックスで、Oracle Coherenceを選択して「New Configuration」アイコンをクリックします。
「Name」フィールドにProcessorExample
と入力します。
「Main」タブの「Project」フィールドにLoading
と入力します。「Main class」フィールドにcom.oracle.handson.ProcessorExample
と入力します。
「Coherence」タブの「Generalタブで、「Cache configuration descriptor」フィールドのc:\home\oracle\workspace\Contacts\appClientModule\contacts-cache-config.xml
ファイルを参照します。「Disabled (cache client)」ボタンを選択します。「Cluster port」フィールドに3155
と入力します。「Apply」をクリックします。
「Other」タブで、「tangosol.pof.config」フィールドまで下方へスクロールします。POF構成ファイルcontacts-pof-config.xml
への絶対パスを入力します。「Apply」をクリックします。
「Common」タブで、「Shared file」を選択してLoadingディレクトリを参照します。
次の手順を実行して、ObserverExample
クラスとProcessorExample
クラスをテストします。
実行中のキャッシュ・サーバーがあれば停止します。詳細は、「キャッシュ・サーバーの停止」を参照してください。
ContactsCacheServer
を再起動します。
LoaderExample
クラスを実行して、キャッシュをロードします。
ObserverExample
クラスを実行します。
ProcessorExample
を実行して、キャッシュ内の記録を更新します。
キャッシュ・サーバーのコンソール・ウィンドウに表示される例7-6のようなメッセージを確認してください。そのメッセージには、特定の従業員の勤務先住所が更新されたことについて記載されています。
例7-6 ObserverExampleクラスとProcessorExampleクラスからの出力
... Started DefaultCacheServer... 2012-08-17 14:25:40.531/31.953 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member(Id=2, Timestamp=2012-08-17 14:25:40.366, Address=10.159.220.86:8090, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:4568, Role=OracleHandsonLoaderExample) joined Cluster with senior member 1 2012-08-17 14:25:40.765/32.187 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 2 joined Service Management with senior member 1 2012-08-17 14:25:41.078/32.500 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 2 joined Service PartitionedPofCache with senior member 1 2012-08-17 14:25:41.421/32.843 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 2 left service Management with senior member 1 2012-08-17 14:25:41.437/32.859 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 2 left service PartitionedPofCache with senior member 1 2012-08-17 14:25:41.453/32.875 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): TcpRing disconnected from Member(Id=2, Timestamp=2012-08-17 14:25:40.366, Address=10.159.220.86:8090, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:4568, Role=OracleHandsonLoaderExample) due to a peer departure; removing the member. 2012-08-17 14:25:41.453/32.875 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member(Id=2, Timestamp=2012-08-17 14:25:41.453, Address=10.159.220.86:8090, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:4568, Role=OracleHandsonLoaderExample) left Cluster with senior member 1 2012-08-17 14:25:58.875/50.297 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member(Id=3, Timestamp=2012-08-17 14:25:58.695, Address=10.159.220.86:8090, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:4576, Role=OracleHandsonObserverExample) joined Cluster with senior member 1 2012-08-17 14:25:59.109/50.531 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 3 joined Service Management with senior member 1 2012-08-17 14:25:59.390/50.812 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 3 joined Service PartitionedPofCache with senior member 1 2012-08-17 14:26:16.656/68.078 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member(Id=4, Timestamp=2012-08-17 14:26:16.453, Address=10.159.220.86:8092, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:416, Role=OracleHandsonProcessorExample) joined Cluster with senior member 1 2012-08-17 14:26:16.906/68.328 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 4 joined Service Management with senior member 1 2012-08-17 14:26:17.218/68.640 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 4 joined Service PartitionedPofCache with senior member 1 Work address was updated for John Lfyovf Work address was updated for John Jptrpajked Work address was updated for John Mtqdln Work address was updated for John Mstaugiw Work address was updated for John Olfezqse Work address was updated for John Qjefjgtgj Work address was updated for John Kuhgkzn Work address was updated for John Jpby Work address was updated for John Cekuea Work address was updated for John Guhkam Work address was updated for John Ijwj Work address was updated for John Trlb Work address was updated for John Hnfcwxjq Work address was updated for John Kizifh Work address was updated for John Rqlhgboi Work address was updated for John Ipphab 2012-08-17 14:26:17.312/68.734 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): TcpRing disconnected from Member(Id=4, Timestamp=2012-08-17 14:26:16.453, Address=10.159.220.86:8092, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:416, Role=OracleHandsonProcessorExample) due to a peer departure; removing the member. 2012-08-17 14:26:17.312/68.734 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 4 left service Management with senior member 1 2012-08-17 14:26:17.312/68.734 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member 4 left service PartitionedPofCache with senior member 1 2012-08-17 14:26:17.312/68.734 Oracle Coherence GE 12.1.2.0 <D5> (thread=Cluster, member=1): Member(Id=4, Timestamp=2012-08-17 14:26:17.312, Address=10.159.220.86:8092, MachineId=18578, Location=site:,machine:tpfaeffl-lap7,process:416, Role=OracleHandsonProcessorExample) left Cluster with senior member 1