この章では、リスナーを設定してNamedCache
内でのデータ変更を監視します。また、EntryProcessors
を使用して、Coherenceキャッシュのエントリを変更および処理する方法について習得します。
この章の内容は次のとおりです。
com.tangosol.util.ObservableMap
インタフェースを使用すると、キャッシュ・エントリに加えられた変更を監視して、それに対応できます。このインタフェースはjava.util.EventListener
を拡張し、Javaの標準的なBeanイベント・モデルを使用します。すべてのタイプのNamedCache
は、このインタフェースを実装します。イベントをリスニングするには、キャッシュにMapListener
(com.tangosol.util.MapListener
)を登録します。MapListeners
はクライアントでコールされます。つまり、リスナー・コードはクライアント・プロセスで実行されます。
イベントをリスニングするには、次の3つの方法があります。
すべてのイベントのリスニング
フィルタの条件を満たす、すべてのイベントのリスニング
特定のオブジェクト・キーにおけるイベントのリスニング
例7-1の各メソッド(実装して前述のリストのタスクを実行できます)は、NamedCache
で使用できます。
例7-1 NamedCacheのリスナー・メソッド
void addMapListener(MapListener listener) void addMapListener(MapListener listener, Filter filter, boolean fLite) void addMapListener(MapListener listener, Object oKey, boolean fLite)
com.tangosol.util.MapEvent
クラスは、オブジェクト・キーおよび古い値と新しい値を取得します。Liteイベントを指定できますが、新しい値と古い値が存在しない可能性があります。例7-2では、NamedCache
にこれらのメソッドを登録するパターンを説明します。これは、無名クラスとして実行されています。MapEvent
クラスのgetOldValue()
メソッドまたはgetNewValue()
メソッドを使用して、イベントを起動したエントリを取得できます。
例7-2 イベントを登録するコード・パターン
namedCache.addMapListener(new MapListener() { public void entryDeleted(MapEvent mapEvent) { // TODO... handle deletion event } public void entryInserted(MapEvent mapEvent) { // TODO... handle inserted event } public void entryUpdated(MapEvent mapEvent) { // TODO... handle updated event } });
この項では、NamedCache
をリスニングし、検出した任意の変更に応答するJavaクラスの作成方法について説明します。
Loading
プロジェクトで、新規Contact
オブジェクトのエントリをリスニングする新規クラスを作成します。このクラスにObserverExample
という名前を付け、これにmain
メソッドが存在することを確認します。詳細は、「Javaクラスの作成」を参照してください。
このクラスでは、リスナーを追加し、新しいContact
がキャッシュに更新されるたびにメッセージを表示します(ヒント: 次のコードを使用して、コンソールから読み取るまでJavaプロセスの実行を維持します。そうしない場合、プログラムはただちに終了します)。
BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); String text = console.readLine();
例7-3に、可能な解決策を示します。
例7-3 リスナー・クラスのサンプル
package com.oracle.handson; import com.tangosol.net.NamedCache; import com.tangosol.util.AbstractMapListener; import com.tangosol.util.MapEvent; import com.oracle.handson.Contact; import com.tangosol.net.CacheFactory; import java.io.IOException; /** * ObserverExample observes changes to contacts. */ public class ObserverExample { public ObserverExample() { } // ----- ObserverExample methods ------------------------------------- public static void main(String[] args) { NamedCache cache = CacheFactory.getCache("ContactsCache"); new ObserverExample().observe(cache); try { System.in.read(); } catch (IOException e) { } } /** * Observe changes to the contacts. * * @param cache target cache */ public void observe(NamedCache cache) { cache.addMapListener(new ContactChangeListener()); } // ----- inner class: ContactChangeListener ------------------------- public class ContactChangeListener extends AbstractMapListener { // ----- MapListener interface ------------------------------------------ public void entryInserted(MapEvent event) { System.out.println(event); } public void entryUpdated(MapEvent event) { Contact contactOld = (Contact)event.getOldValue(); Contact contactNew = (Contact)event.getNewValue(); StringBuffer sb = new StringBuffer(); if (!contactOld.getHomeAddress().equals( contactNew.getHomeAddress())) { sb.append("Home address "); } if (!contactOld.getWorkAddress().equals( contactNew.getWorkAddress())) { sb.append("Work address "); } if (!contactOld.getTelephoneNumbers().equals( contactNew.getTelephoneNumbers())) { sb.append("Telephone "); } if (contactOld.getAge() != contactNew.getAge()) { sb.append("Birthdate "); } sb.append("was updated for ").append(event.getKey()); System.out.println(sb); } public void entryDeleted(MapEvent event) { System.out.println(event.getKey()); } } }
次の手順に従い、キャッシュ・リスナー例を実行します。
コンソールの入力を有効にするには、プロジェクト・プロファイルを編集します。
Loading
プロジェクトを右クリックし、「プロジェクト・プロパティ」を選択します。
左側で「実行/デバッグ/プロファイル」を選択します。
右側で「編集」ボタンをクリックし、「ツール設定」をクリックします。「実行構成の編集」ダイアログ・ボックスの「プログラムの入力を許可」チェック・ボックスが選択されていることを確認してください。
「実行構成の編集」ダイアログ・ボックスおよび「プロジェクト・プロパティ」ダイアログ・ボックスで「OK」をクリックし、変更を保存します。
ローカル記憶域が無効になっていない場合は、これを無効にします。
Loading
プロジェクトを右クリックし、「プロジェクト・プロパティ」を選択します。
左側で「実行/デバッグ/プロファイル」を選択します。
「デフォルト」実行構成を選択し、「編集」をクリックします。「Javaオプション」フィールドで、ローカル記憶域を無効にするコマンドを追加します。
-Dtangosol.coherence.distributed.localstorage=false
Loading
プロジェクトのクラス(C:\home\oracle\labs\Loading\classes
)および構成ファイルのパス(C:\home\oracle\labs
)がcontacts-cache-server.cmd
ファイルにあることを確認します。
稼働しているキャッシュ・サーバーがあれば停止します。contacts-cache-server.cmd
を実行してキャッシュ・サーバーを起動します。
JDeveloperからLoaderExample
プログラムを実行してキャッシュをロードします。ObserverExample
が実行されていれば、図7-1に示すように、プログラムが入力を待機していることを確認できます。
次の項では、キャッシュのエントリを変更し、変更されたレコードを返すプログラムを作成します。
この項では、キャッシュのエントリを変更し、変更されたレコードを返すJavaクラスを作成します。
これまでは、キャッシュ・エントリに対するアクションの実行に、putとgetの各操作を使用してきました。しかし、同時データ・アクセスが必要な場合に動作の一貫性を保証する、より優れたデータ操作方法も存在します。EntryProcessors
(com.tangosol.util.InvocableMap.EntryProcessor
)は、エントリに対して処理を行うエージェントです。このエントリは、データが保持されている場所で直接処理されます。実行する処理によってデータが変更されることもあります。この処理では、データの作成、更新、削除が行われる場合も、単に計算のみが行われる場合もあります。処理は、ノードが複数あるパーティション・キャッシュでパラレルに実行でき、スケーラブルです。キャッシュ内の処理では、処理のためにクライアントがデータを取得する必要がないため、I/Oを削減することもできます。
同一キーに対して動作するEntryProcessors
は、論理的にキューに入れられます。これにより、ロックフリー(高パフォーマンス)の処理が可能です。com.tangosol.util.InvocableMap
インタフェース(NamedCache
を実装する)には、データ操作用の次のメソッドが用意されています。
• Object invoke(Object oKey, InvocableMap.EntryProcessor processor)
: 個々のオブジェクトについて、渡されたEntryProcessor
を起動してその結果を返します。
• Map invokeAll(Collection keys, InvocableMap.EntryProcessor processor)
: キーのコレクションについて、EntryProcessor
を起動してその結果を返します。
• Map invokeAll(Filter filter, InvocableMap.EntryProcessor processor)
: フィルタ条件に一致するエントリについて、EntryProcessor
を起動してその結果を返します。
注意: 各クラスタ・ノードのクラスパスでEntryProcessor クラスを使用できる必要があります。 |
エントリ・クラスを作成するには、com.tangosol.util.processes.AbstractProcessor
を拡張し、process()
メソッドを実装できます。たとえば、次のコードは、Contacts
データセット内の従業員の勤務先住所を変更するEntryProcessor
を作成します。
public static class OfficeUpdater extends AbstractProcessor implements PortableObject ... public Object process(InvocableMap.Entry entry) { Contact contact = (Contact) entry.getValue(); contact.setWorkAddress(m_addrWork); entry.setValue(contact); return null; }
OfficeUpdater
クラスを起動するには、次のコードを実行します。
cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"), new OfficeUpdater(addrWork));
この演習では、キャッシュのエントリを更新するEntryProcessors
を含むJavaクラスを作成します。前の演習で作成したObserverExample
クラスは、これらの変更を検出し、変更されたレコードを表示します。
次の手順に従い、キャッシュ内のエントリを更新するファイルを作成します。
キャッシュ内のエントリを更新するクラスを作成します。
Loading
プロジェクトで、キャッシュ内のContact
オブジェクトの住所を更新するmain
メソッドを含むProcessorExample
というクラスを作成します。詳細は、「Javaクラスの作成」を参照してください。
マサチューセッツに住むContacts
のレコードを検索し、勤務先住所を州内のオフィスに更新するコードを記述します。
ヒント: PortableObject
を実装し(キャッシュからのデータのシリアライズおよびデシリアライズ用)、勤務先住所を設定するEntryProcessor
を含む内部クラスを含めます。Filter
クラスからのメソッドを使用し、自宅住所がマサチューセッツにあるContacts
を分離します。
例7-4に、ProcessorExample
クラスの可能なコードを示します。
例7-4 キャッシュ内のオブジェクトを更新するプログラムのサンプル
package com.oracle.handson; import com.tangosol.net.NamedCache; import com.tangosol.util.filter.EqualsFilter; import com.tangosol.util.processor.AbstractProcessor; import com.tangosol.util.InvocableMap; import com.tangosol.io.pof.PortableObject; import com.tangosol.io.pof.PofReader; import com.tangosol.io.pof.PofWriter; import com.oracle.handson.Address; import com.oracle.handson.Contact; import com.tangosol.net.CacheFactory; import java.io.IOException; /** * ProcessorExample executes an example EntryProcessor. * */ public class ProcessorExample { public ProcessorExample() { } public static void main(String[] args) { NamedCache cache = CacheFactory.getCache("ContactsCache"); new ProcessorExample().execute(cache); } // ----- ProcessorExample methods ----------------------------------- public void execute(NamedCache cache) { // People who live in Massachusetts moved to an in-state office Address addrWork = new Address("200 Newbury St.", "Yoyodyne, Ltd.", "Boston", "MA", "02116", "US"); cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"), new OfficeUpdater(addrWork)); } // ----- nested class: OfficeUpdater ------------------------------------ /** * OfficeUpdater updates a contact's office address. */ public static class OfficeUpdater extends AbstractProcessor implements PortableObject { // ----- constructors ------------------------------------------- /** * Default constructor (necessary for PortableObject implementation). */ public OfficeUpdater() { } public OfficeUpdater(Address addrWork) { m_addrWork = addrWork; } // ----- InvocableMap.EntryProcessor interface ------------------ public Object process(InvocableMap.Entry entry) { Contact contact = (Contact) entry.getValue(); contact.setWorkAddress(m_addrWork); entry.setValue(contact); System.out.println("Work address was updated for " + contact.getFirstName() + " " + contact.getLastName()); return null; } // ----- PortableObject interface ------------------------------- public void readExternal(PofReader reader) throws IOException { m_addrWork = (Address) reader.readObject(0); } public void writeExternal(PofWriter writer) throws IOException { writer.writeObject(0, m_addrWork); } // ----- data members ------------------------------------------- private Address m_addrWork; } }
contacts-pof-config.xml
ファイルを編集して、OfficeUpdater
エントリのユーザー定義型IDを追加します。
... <user-type> <type-id>1006</type-id> <class-name>com.oracle.handson. ProcessorExample$OfficeUpdater</class-name> </user-type> ...
次の手順に従い、キャッシュ更新例を実行します。
JDeveloperで、Loadingプロジェクトのファイルをコンパイルします。
次の手順を実行し、ObserverExample
クラスおよびProcessorExample
クラスをテストします。
キャッシュ・サーバーを再起動します。
キャッシュをロードするLoaderExample
クラスを実行します。
ObserverExample
クラスを実行します。メッセージ・ウィンドウの下部にある「入力」領域には、値を入力しないでください。
キャッシュのレコードを更新するProcessorExample
を実行します。JDeveloperには、図7-2のような住所が更新されたことを示すメッセージが表示されます。