この章では、リスナーを設定してNamedCache内でのデータ変更を監視します。また、EntryProcessorsを使用して、Coherenceキャッシュのエントリを変更および処理する方法について習得します。
この章の内容は次のとおりです。
com.tangosol.util.ObservableMapインタフェースを使用すると、キャッシュ・エントリに加えられた変更を監視して、それに対応できます。このインタフェースはjava.util.EventListenerを拡張し、Javaの標準的なBeanイベント・モデルを使用します。すべてのタイプのNamedCacheは、このインタフェースを実装します。イベントをリスニングするには、キャッシュにMapListener(com.tangosol.util.MapListener)を登録します。MapListenersはクライアントでコールされます。つまり、リスナー・コードはクライアント・プロセスで実行されます。
イベントをリスニングするには、次の3つの方法があります。
すべてのイベントのリスニング
フィルタの条件を満たす、すべてのイベントのリスニング
特定のオブジェクト・キーにおけるイベントのリスニング
例7-1の各メソッド(実装して前述のリストのタスクを実行できます)は、NamedCacheで使用できます。
例7-1 NamedCacheのリスナー・メソッド
void addMapListener(MapListener listener) void addMapListener(MapListener listener, Filter filter, boolean fLite) void addMapListener(MapListener listener, Object oKey, boolean fLite)
com.tangosol.util.MapEventクラスは、オブジェクト・キーおよび古い値と新しい値を取得します。Liteイベントを指定できますが、新しい値と古い値が存在しない可能性があります。例7-2では、NamedCacheにこれらのメソッドを登録するパターンを説明します。これは、無名クラスとして実行されています。MapEventクラスのgetOldValue()メソッドまたはgetNewValue()メソッドを使用して、イベントを起動したエントリを取得できます。
例7-2 イベントを登録するコード・パターン
namedCache.addMapListener(new MapListener() {
public void entryDeleted(MapEvent mapEvent) {
// TODO... handle deletion event
}
public void entryInserted(MapEvent mapEvent) {
// TODO... handle inserted event
}
public void entryUpdated(MapEvent mapEvent)
{
// TODO... handle updated event } });
この項では、NamedCacheをリスニングし、検出した任意の変更に応答するJavaクラスの作成方法について説明します。
Loadingプロジェクトで、新規Contactオブジェクトのエントリをリスニングする新規クラスを作成します。このクラスにObserverExampleという名前を付け、これにmainメソッドが存在することを確認します。詳細は、「Javaクラスの作成」を参照してください。
このクラスでは、リスナーを追加し、新しいContactがキャッシュに更新されるたびにメッセージを表示します(ヒント: 次のコードを使用して、コンソールから読み取るまでJavaプロセスの実行を維持します。そうしない場合、プログラムはただちに終了します)。
BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); String text = console.readLine();
例7-3に、可能な解決策を示します。
例7-3 リスナー・クラスのサンプル
package com.oracle.handson;
import com.tangosol.net.NamedCache;
import com.tangosol.util.AbstractMapListener;
import com.tangosol.util.MapEvent;
import com.oracle.handson.Contact;
import com.tangosol.net.CacheFactory;
import java.io.IOException;
/**
* ObserverExample observes changes to contacts.
*/
public class ObserverExample
{
public ObserverExample() {
}
// ----- ObserverExample methods -------------------------------------
public static void main(String[] args) {
NamedCache cache = CacheFactory.getCache("ContactsCache");
new ObserverExample().observe(cache);
try {
System.in.read();
} catch (IOException e) {
}
}
/**
* Observe changes to the contacts.
*
* @param cache target cache
*/
public void observe(NamedCache cache)
{
cache.addMapListener(new ContactChangeListener());
}
// ----- inner class: ContactChangeListener -------------------------
public class ContactChangeListener
extends AbstractMapListener
{
// ----- MapListener interface ------------------------------------------
public void entryInserted(MapEvent event)
{
System.out.println(event);
}
public void entryUpdated(MapEvent event)
{
Contact contactOld = (Contact)event.getOldValue();
Contact contactNew = (Contact)event.getNewValue();
StringBuffer sb = new StringBuffer();
if (!contactOld.getHomeAddress().equals(
contactNew.getHomeAddress()))
{
sb.append("Home address ");
}
if (!contactOld.getWorkAddress().equals(
contactNew.getWorkAddress()))
{
sb.append("Work address ");
}
if (!contactOld.getTelephoneNumbers().equals(
contactNew.getTelephoneNumbers()))
{
sb.append("Telephone ");
}
if (contactOld.getAge() != contactNew.getAge())
{
sb.append("Birthdate ");
}
sb.append("was updated for ").append(event.getKey());
System.out.println(sb);
}
public void entryDeleted(MapEvent event)
{
System.out.println(event.getKey());
}
}
}
次の手順に従い、キャッシュ・リスナー例を実行します。
コンソールの入力を有効にするには、プロジェクト・プロファイルを編集します。
Loadingプロジェクトを右クリックし、「プロジェクト・プロパティ」を選択します。
左側で「実行/デバッグ/プロファイル」を選択します。
右側で「編集」ボタンをクリックし、「ツール設定」をクリックします。「実行構成の編集」ダイアログ・ボックスの「プログラムの入力を許可」チェック・ボックスが選択されていることを確認してください。
「実行構成の編集」ダイアログ・ボックスおよび「プロジェクト・プロパティ」ダイアログ・ボックスで「OK」をクリックし、変更を保存します。
ローカル記憶域が無効になっていない場合は、これを無効にします。
Loadingプロジェクトを右クリックし、「プロジェクト・プロパティ」を選択します。
左側で「実行/デバッグ/プロファイル」を選択します。
「デフォルト」実行構成を選択し、「編集」をクリックします。「Javaオプション」フィールドで、ローカル記憶域を無効にするコマンドを追加します。
-Dtangosol.coherence.distributed.localstorage=false
Loadingプロジェクトのクラス(C:\home\oracle\labs\Loading\classes)および構成ファイルのパス(C:\home\oracle\labs)がcontacts-cache-server.cmdファイルにあることを確認します。
稼働しているキャッシュ・サーバーがあれば停止します。contacts-cache-server.cmdを実行してキャッシュ・サーバーを起動します。
JDeveloperからLoaderExampleプログラムを実行してキャッシュをロードします。ObserverExampleが実行されていれば、図7-1に示すように、プログラムが入力を待機していることを確認できます。
次の項では、キャッシュのエントリを変更し、変更されたレコードを返すプログラムを作成します。
この項では、キャッシュのエントリを変更し、変更されたレコードを返すJavaクラスを作成します。
これまでは、キャッシュ・エントリに対するアクションの実行に、putとgetの各操作を使用してきました。しかし、同時データ・アクセスが必要な場合に動作の一貫性を保証する、より優れたデータ操作方法も存在します。EntryProcessors(com.tangosol.util.InvocableMap.EntryProcessor)は、エントリに対して処理を行うエージェントです。このエントリは、データが保持されている場所で直接処理されます。実行する処理によってデータが変更されることもあります。この処理では、データの作成、更新、削除が行われる場合も、単に計算のみが行われる場合もあります。処理は、ノードが複数あるパーティション・キャッシュでパラレルに実行でき、スケーラブルです。キャッシュ内の処理では、処理のためにクライアントがデータを取得する必要がないため、I/Oを削減することもできます。
同一キーに対して動作するEntryProcessorsは、論理的にキューに入れられます。これにより、ロックフリー(高パフォーマンス)の処理が可能です。com.tangosol.util.InvocableMapインタフェース(NamedCacheを実装する)には、データ操作用の次のメソッドが用意されています。
• Object invoke(Object oKey, InvocableMap.EntryProcessor processor): 個々のオブジェクトについて、渡されたEntryProcessorを起動してその結果を返します。
• Map invokeAll(Collection keys, InvocableMap.EntryProcessor processor): キーのコレクションについて、EntryProcessorを起動してその結果を返します。
• Map invokeAll(Filter filter, InvocableMap.EntryProcessor processor): フィルタ条件に一致するエントリについて、EntryProcessorを起動してその結果を返します。
|
注意: 各クラスタ・ノードのクラスパスでEntryProcessorクラスを使用できる必要があります。 |
エントリ・クラスを作成するには、com.tangosol.util.processes.AbstractProcessorを拡張し、process()メソッドを実装できます。たとえば、次のコードは、Contactsデータセット内の従業員の勤務先住所を変更するEntryProcessorを作成します。
public static class OfficeUpdater extends AbstractProcessor
implements PortableObject
...
public Object process(InvocableMap.Entry entry) {
Contact contact = (Contact) entry.getValue();
contact.setWorkAddress(m_addrWork);
entry.setValue(contact);
return null;
}
OfficeUpdaterクラスを起動するには、次のコードを実行します。
cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"),
new OfficeUpdater(addrWork));
この演習では、キャッシュのエントリを更新するEntryProcessorsを含むJavaクラスを作成します。前の演習で作成したObserverExampleクラスは、これらの変更を検出し、変更されたレコードを表示します。
次の手順に従い、キャッシュ内のエントリを更新するファイルを作成します。
キャッシュ内のエントリを更新するクラスを作成します。
Loadingプロジェクトで、キャッシュ内のContactオブジェクトの住所を更新するmainメソッドを含むProcessorExampleというクラスを作成します。詳細は、「Javaクラスの作成」を参照してください。
マサチューセッツに住むContactsのレコードを検索し、勤務先住所を州内のオフィスに更新するコードを記述します。
ヒント: PortableObjectを実装し(キャッシュからのデータのシリアライズおよびデシリアライズ用)、勤務先住所を設定するEntryProcessorを含む内部クラスを含めます。Filterクラスからのメソッドを使用し、自宅住所がマサチューセッツにあるContactsを分離します。
例7-4に、ProcessorExampleクラスの可能なコードを示します。
例7-4 キャッシュ内のオブジェクトを更新するプログラムのサンプル
package com.oracle.handson;
import com.tangosol.net.NamedCache;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.processor.AbstractProcessor;
import com.tangosol.util.InvocableMap;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.oracle.handson.Address;
import com.oracle.handson.Contact;
import com.tangosol.net.CacheFactory;
import java.io.IOException;
/**
* ProcessorExample executes an example EntryProcessor.
*
*/
public class ProcessorExample
{
public ProcessorExample() {
}
public static void main(String[] args) {
NamedCache cache = CacheFactory.getCache("ContactsCache");
new ProcessorExample().execute(cache);
}
// ----- ProcessorExample methods -----------------------------------
public void execute(NamedCache cache)
{
// People who live in Massachusetts moved to an in-state office
Address addrWork = new Address("200 Newbury St.", "Yoyodyne, Ltd.",
"Boston", "MA", "02116", "US");
cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"),
new OfficeUpdater(addrWork));
}
// ----- nested class: OfficeUpdater ------------------------------------
/**
* OfficeUpdater updates a contact's office address.
*/
public static class OfficeUpdater
extends AbstractProcessor
implements PortableObject
{
// ----- constructors -------------------------------------------
/**
* Default constructor (necessary for PortableObject implementation).
*/
public OfficeUpdater() {
}
public OfficeUpdater(Address addrWork) {
m_addrWork = addrWork;
}
// ----- InvocableMap.EntryProcessor interface ------------------
public Object process(InvocableMap.Entry entry)
{
Contact contact = (Contact) entry.getValue();
contact.setWorkAddress(m_addrWork);
entry.setValue(contact);
System.out.println("Work address was updated for " + contact.getFirstName() + " " + contact.getLastName());
return null;
}
// ----- PortableObject interface -------------------------------
public void readExternal(PofReader reader)
throws IOException
{
m_addrWork = (Address) reader.readObject(0);
}
public void writeExternal(PofWriter writer)
throws IOException
{
writer.writeObject(0, m_addrWork);
}
// ----- data members -------------------------------------------
private Address m_addrWork;
}
}
contacts-pof-config.xmlファイルを編集して、OfficeUpdaterエントリのユーザー定義型IDを追加します。
...
<user-type>
<type-id>1006</type-id>
<class-name>com.oracle.handson.
ProcessorExample$OfficeUpdater</class-name>
</user-type>
...
次の手順に従い、キャッシュ更新例を実行します。
JDeveloperで、Loadingプロジェクトのファイルをコンパイルします。
次の手順を実行し、ObserverExampleクラスおよびProcessorExampleクラスをテストします。
キャッシュ・サーバーを再起動します。
キャッシュをロードするLoaderExampleクラスを実行します。
ObserverExampleクラスを実行します。メッセージ・ウィンドウの下部にある「入力」領域には、値を入力しないでください。
キャッシュのレコードを更新するProcessorExampleを実行します。JDeveloperには、図7-2のような住所が更新されたことを示すメッセージが表示されます。