この章では、マップ・イベント・リスナーを使用して、キャッシュ・イベントおよびCoherenceのObservableMap
インタフェースを実装するいずれかのクラスからのイベントを受信する方法について説明します。
この章には次の項が含まれます:
Coherenceでは、JavaBeanイベント・モデルを使用してキャッシュ・イベントが提供されます。この実装によって、実際にクラスタ内のどこで変更が行われたのかに関係なく、アプリケーションはそれらが必要としているタイミングと場所でイベントを受信できます。JavaBeanモデルに精通している開発者であれば、複雑なクラスタであってもイベントに難無く対応できるはずです。
注意: Coherenceには、ライブ・イベント・プログラミング・モデルも用意されています。ライブ・イベントでは、一般的なイベント・タイプのサポートが提供され、マップ・イベントのかわりに使用できます。ライブ・イベントの使用の詳細は、「ライブ・イベントの使用」を参照してください。 |
JavaBeansイベント・モデルには、すべてのリスナーで拡張する必要があるEventListener
インタフェースがあります。Coherenceには、MapListener
インタフェースが用意されており、これによりCoherenceキャッシュのデータが追加、変更または削除されたときにアプリケーション・ロジックでイベントを受信できます。
MapListener
インタフェースを実装するアプリケーション・オブジェクトは、アプリケーションのMapListener
実装のインスタンスをaddMapListener()
メソッドに渡すだけで、ObservableMap
インタフェースを実装する任意のCoherenceキャッシュまたはクラスからイベントにサインアップできます。
MapListener
に渡されるMapEvent
オブジェクトは、イベントの発生元になったソース(ObservableMap
)、イベントに関連付けられたID(キー)、そのIDに対するアクション(挿入、更新、削除)、古い値と新しい値など、発生したイベントについて必要なすべての情報を保持します。
MapListener
インタフェースとMapEvent
クラスの詳細は、Oracle Coherence Java APIリファレンスを参照してください。
パーティション・キャッシュ・サービスは、通常の状況ではイベントが1回だけ配信されることを保証します。ただし、これが保証されない可能性のあるシナリオが2つあります。
データ損失を引き起こすクラスタの突発障害(データを保持している2台のマシンの同時クラッシュなど)。この場合、PARTITION_LOST
イベントが、サーバー側のすべての登録済PartitionListener
インスタンスに出力されます。
クライアントの切断。この場合、MEMBER_LEFT
イベントが、クライアント側のすべての登録済MemberListener
インスタンスに出力されます。
すべてのCoherenceキャッシュには、ObservableMap
が実装されます。実際に、すべてのCoherenceキャッシュで実装されるNamedCache
インタフェースは、ObservableMap
インタフェースを拡張します。つまり、キャッシュが、ローカル、パーティション、ニア、レプリケートであるかどうか、また、リードスルー、ライトスルー、ライトビハインド、オーバーフロー、ディスク記憶域などを使用しているかどうかに関係なく、アプリケーションはサインアップして任意のキャッシュからイベントを受信できます。
注意: キャッシュ・トポロジやサーバーの数に関係なく、また他のサーバーが変更している場合でも、これらのイベントはアプリケーションのリスナーに配信されます。 |
Coherenceキャッシュ(Coherenceキャッシュ・ファクトリを介して取得されたオブジェクト)に加えて、次のように、Coherenceでサポートされる他のいくつかのクラスでもObservableMap
インタフェースが実装されます。
ObservableHashMap
LocalCache
OverflowMap
NearCache
ReadWriteBackingMap
AbstractSerializationCache
、SerializationCache
およびSerializationPagedCache
WrapperObservableMap
、WrapperConcurrentMap
およびWrapperNamedCache
イベントにサインアップするには、MapListener
インタフェースを実装するオブジェクトをObservableMap
のaddMapListener
メソッドに渡します。例25-1
は、addMapListenerメソッドを示しています。
例25-1 ObservableMap APIのメソッド
public void addMapListener(MapListener listener); public void addMapListener(MapListener listener, Object oKey, boolean fLite); public void addMapListener(MapListener listener, Filter filter, boolean fLite);
例25-2は、イベントを受信したときにそれぞれのイベントを出力するMapListener
実装のサンプルを示しています。
例25-2 MapListener実装のサンプル
/** * A MapListener implementation that prints each event as it receives * them. */ public static class EventPrinter extends Base implements MapListener { public void entryInserted(MapEvent evt) { out(evt); } public void entryUpdated(MapEvent evt) { out(evt); } public void entryDeleted(MapEvent evt) { out(evt); } }
この実装を使用すると、任意のキャッシュからすべてのイベントを出力できます(すべてのキャッシュでObservableMap
インタフェースが実装されているため)。
cache.addMapListener(new EventPrinter());
ただし、後でリスナーを削除できるように、リスナーへの参照を保持する必要があります。
例25-3 リスナーへの参照の保持
Listener listener = new EventPrinter(); cache.addMapListener(listener); m_listener = listener; // store the listener in a field
後でリスナーを削除するには、次のように処理します。
例25-4 リスナーの削除
Listener listener = m_listener; if (listener != null) { cache.removeMapListener(listener); m_listener = null; // clean up the listener field }
ObservableMap
インタフェースの各addMapListener
メソッドには、対応するremoveMapListener
メソッドがあります。リスナーを削除するには、そのリスナーの追加に使用したaddMapListener
メソッドに対応するremoveMapListener
メソッドを使用します。
MapListener
として使用する内部クラスを作成する場合、または1つか2つのイベント・タイプ(挿入、更新、または削除)のみをリスニングするMapListener
を実装する場合、AbstractMapListener
ベース・クラスを使用できます。たとえば、例25-5の無名の内部クラスでは、キャッシュの挿入イベントのみを出力します。
例25-5 キャッシュの挿入イベントのみを出力する内部クラス
cache.addMapListener(new AbstractMapListener() { public void entryInserted(MapEvent evt) { out(evt); } });
MapListener
の作成に役立つもう1つのベース・クラスは、MultiplexingMapListener
です。これは、すべてのイベントを1つのメソッドにルーティングして処理します。このクラスを使用すると、EventPrinter
のサンプルを例25-6のコードのように簡略化できます。1つのメソッドを実装するだけですべてのイベントを取得できるため、MultiplexingMapListener
は、MapListener
として使用する内部クラスを作成する際にも非常に便利です。
ラムダ式を使用すると、MapListener<K, V>
実装を追加できます。次の例では、ラムダ式を使用して、Coherenceで提供されるSimpleMapListener<K, V>
実装を追加します。この実装では、イベント・タイプに基づいて適切なイベント・ハンドラに委任します。
MapListener<ContactId, Contact> listener = new SimpleMapListener<ContactId, Contact>().addInsertHandler((event) -> System.out.println("\ninserted:\n" + event.getNewValue())); cache.addMapListener(listener);
特定のキャッシュにリスナーを常駐させる必要がある場合、<listener
要素を使用して、それをキャッシュ構成に配置します。キャッシュ構成の際に、リスナーは自動的に追加されます。
特定のID(キー)に対して発生したイベントのサインアップは非常に簡単です。たとえば、整数キー5
に対して発生したすべてのイベントを出力するには、次のように処理します。
cache.addMapListener(new EventPrinter(), new Integer(5), false);
そのため、例25-7のコードでは、整数キー5
が挿入または更新された場合にのみイベントがトリガーされます。
特定のキーのリスニングと同様に、特定のイベントをリスニングすることもできます。例25-8では、削除イベントのみを受信できるフィルタが指定されたリスナーがキャッシュに追加されます。
例25-8 削除されたイベントのフィルタを指定したリスナーの追加
// Filters used with partitioned caches must be // Serializable, Externalizable or ExternalizableLite public class DeletedFilter implements Filter, Serializable { public boolean evaluate(Object o) { MapEvent evt = (MapEvent) o; return evt.getId() == MapEvent.ENTRY_DELETED; } } cache.addMapListener(new EventPrinter(), new DeletedFilter(), false);
注意: イベントのフィルタリングとキャッシュされたデータのフィルタリングの比較問合せのフィルタを作成する場合、 問合せフィルタを使用してキャッシュ・イベントをリスニングする方法の詳細は、「問合せのリスニング」を参照してください。 |
次の一連のコールを実行すると、
cache.put("hello", "world"); cache.put("hello", "again"); cache.remove("hello");
次のような結果になります。
CacheEvent{LocalCache deleted: key=hello, value=again}
詳細は、「問合せのリスニング」を参照してください。
デフォルトでは、イベントの一部として古い値と新しい値が両方とも提供されます。次の例があります。
例25-9 キャッシュにおける値の挿入、更新、削除
MapListener listener = new MultiplexingMapListener() { public void onMapEvent(MapEvent evt) { out("event has occurred: " + evt); out("(the wire-size of the event would have been " + ExternalizableHelper.toBinary(evt).length() + " bytes.)"); } }; cache.addMapListener(listener); // insert a 1KB value cache.put("test", new byte[1024]); // update with a 2KB value cache.put("test", new byte[2048]); // remove the 2KB value cache.remove("test");
例25-10に示すテストの実行結果の出力では、1つ目のイベントは挿入される値(1KB)を保持し、2つ目のイベントは置換前の値(1KB)と新しい値(2KB)を保持しています。3つ目のイベントは、削除される値(2KB)を保持しています。
例25-10 サンプル出力
event has occurred: CacheEvent{LocalCache added: key=test, value=[B@a470b8} (the wire-size of the event would have been 1283 bytes.) event has occurred: CacheEvent{LocalCache updated: key=test, old value=[B@a470b8, new value=[B@1c6f579} (the wire-size of the event would have been 3340 bytes.) event has occurred: CacheEvent{LocalCache deleted: key=test, value=[B@1c6f579} (the wire-size of the event would have been 2307 bytes.)
アプリケーションでイベントに古い値と新しい値を両方とも含める必要がない場合、Liteイベントのみをリクエストして、そのように指定することができます。リスナーの追加時にLiteイベントをリクエストするには、追加のブール・パラメータfLite
を使用するaddMapListener
メソッドを使用できます。例25-9の場合は、次の箇所のみを変更します。
cache.addMapListener(listener, (Filter) null, true);
注意: Liteイベントの古い値と新しい値は、NULLにすることができます。ただし、Liteイベントをリクエストしても、イベントの生成と配信に余分な負荷がかからない場合は、古い値と新しい値が含まれることがあります。つまり、MapListener でLiteイベントを受信するようにリクエストしても、それは単にシステムにMapListener がイベントの古い値と新しい値を認識する必要がないことを知らせているだけになります。 |
すべてのCoherenceキャッシュはどの基準による問合せもサポートしています。アプリケーションがキャッシュのデータを問い合せると、その結果は一連の識別子(keySet
)または一連の識別子と値の組合せ(entrySet
)のいずれかとして、ポイント・イン・タイム・スナップショットで返されます。結果セットの内容を判断するメカニズムは、フィルタリングと呼ばれ、これによりアプリケーションの開発者は即時利用可能なフィルタ(equals、less-than、like、betweenなど)の豊富なセットを使用してどのように複雑な問合せも構築でき、また独自のカスタム・フィルタ(XPath
など)を提供できるようになります。
キャッシュの問合せに使用するフィルタと同じフィルタで、キャッシュからのイベントをリスニングできます。たとえば、取引システムにおいて、特定のトレーダーが受け持つ未決済のOrder
オブジェクトをすべて問い合せることができます。
例25-11 キャッシュからのイベントのリスニング
NamedCache mapTrades = ... Filter filter = new AndFilter(new EqualsFilter("getTrader", traderid), new EqualsFilter("getStatus", Status.OPEN)); Set setOpenTrades = mapTrades.entrySet(filter);
そのトレーダーが新しいポジションを建てた、そのトレーダーがポジションを決済した、トレーダー間でポジションを付け替えた、という情報を受信するために、アプリケーションで同じフィルタを使用できます。
例25-12 オブジェクトのイベントのリスニング
// receive events for all trade IDs that this trader is interested in mapTrades.addMapListener(listener, new MapEventFilter(filter), true);
MapEventFilter
は、問合せフィルタをイベント・フィルタに変換します。
MapEventFilter
には、いくつもの非常に強力なオプションがあります。これにより、アプリケーション・リスナーは特に関心のあるイベントのみを受信できます。スケーラビリティとパフォーマンスの観点からさらに重要なこととして、必要なイベントのみがネットワーク上で通信されるため、それらの特定イベントを対象とするサーバーとクライアントにのみ配信されます。例25-13は、これらのシナリオを示しています。
例25-13 MapEventFilterを使用した様々なイベントのフィルタリング
// receive all events for all trades that this trader is interested in nMask = MapEventFilter.E_ALL; mapTrades.addMapListener(listener, new MapEventFilter(nMask, filter), true); // receive events for all this trader's trades that are closed or // re-assigned to a different trader nMask = MapEventFilter.E_UPDATED_LEFT | MapEventFilter.E_DELETED; mapTrades.addMapListener(listener, new MapEventFilter(nMask, filter), true); // receive events for all trades as they are assigned to this trader nMask = MapEventFilter.E_INSERTED | MapEventFilter.E_UPDATED_ENTERED; mapTrades.addMapListener(listener, new MapEventFilter(nMask, filter), true); // receive events only fornew trades assigned to this trader nMask = MapEventFilter.E_INSERTED; mapTrades.addMapListener(listener, new MapEventFilter(nMask, filter), true);
サポートされる各種オプションの詳細は、MapEventFilter
のAPIドキュメントを参照してください。
問合せのFilter
を作成する場合、Filter
のevaluateメソッドに渡すオブジェクトはキャッシュからの値です。または、Filter
でEntryFilter
インタフェースを実装する場合は、キャッシュからのMap.Entry
全体です。MapListener
のイベントをフィルタリングするFilter
を作成する場合、Filter
のevaluateメソッドに渡すオブジェクトはMapEvent
型です。
MapEventFilter
は、問合せに使用するFilter
をMapListener
のイベントのフィルタリングに使用するFilter
に変換します。つまり、MapEventFilter
はキャッシュを問い合せるFilter
から作成されますが、作成されたMapEventFilter
は、問合せFilterが期待するオブジェクトに変換することによってMapEvent
オブジェクトを評価するフィルタになります。
イベントは通常、キャッシュに対する変更を反映します。たとえば、あるサーバーがキャッシュ内の1つのエントリを変更し、別のサーバーがキャッシュに複数のアイテムを追加し、3つ目のサーバーが同じキャッシュから1つのアイテムを削除している間に、クラスタ内の各サーバーで50個のスレッドが同じキャッシュのデータにアクセスすることがあります。すべての変更操作によりイベントが生成されますが、それをクラスタ内の任意のサーバーで受信するように選択できます。これらの操作をクライアント・アクションと呼び、そのイベントは、クライアントにディスパッチされた状態にあります(この場合の「クライアント」が実際には「サーバー」であっても)。これはCoherenceクラスタなどの、真のpeer-to-peerアーキテクチャでは自然の概念です。いずれのピアもクライアントでありサーバーでもあるので、他のピアからサービスを受け、他のピアにサービスを提供します。一般的なJava Enterpriseアプリケーションでは、ピアはアプリケーションのコンテナとして動作するアプリケーション・サーバー・インスタンスで、クライアントは直接キャッシュに対してアクセスおよび変更を行い、キャッシュからのイベントをリスニングするアプリケーションの一部です。
イベントの中には、キャッシュ内部で起動されるものもあります。多くの例がありますが、最も一般的なケースは次のとおりです。
キャッシュのエントリが自動失効した場合
キャッシュの最大サイズに達したため、エントリがキャッシュから削除された場合
リードスルー操作の結果としてエントリが透過的にキャッシュに追加された場合
リードアヘッドまたはリフレッシュアヘッド操作の結果としてキャッシュ内のエントリが透過的に更新された場合
これらはそれぞれ変更の例ですが、この場合の変更とはキャッシュ内部での自然な(通常は自動的な)操作を表します。このようなイベントは統合イベントと呼ばれます。
必要に応じて、アプリケーションはイベントに問い合せて、それがクライアントに起因するイベントか、または統合イベントかを区別できます。この情報は、CacheEvent
というMapEvent
のサブクラスで保持されます。前述のEventPrinter
の例を使用して、統合イベントのみを出力できます。
例25-14 統合イベントの判断
public static class EventPrinter extends MultiplexingMapListener { public void onMapEvent(MapEvent evt) { if (evt instanceof CacheEvent && ((CacheEvent) evt).isSynthetic()) { out(evt); ) } }
この機能の詳細は、CacheEvent
のAPIドキュメントを参照してください。
Coherenceキャッシュのイベントをリスニングし、データに関して分散、パーティション、レプリケート、ニア・キャッシュ、連続問合せ、リードスルー/ライトスルー、ライトビハインドのローカル表示を確認できる一方で、いわば「こっそりのぞき見る」こともできます。
いくつかの応用例の中には、サービスの背後でマップをリスニングする必要がある場合があります。分散環境でのレプリケーション、パーティション、およびその他のデータ管理方式は、いずれも分散サービスです。このサービスにはさらに実際にデータを管理するための機能が必要です。それが「バッキング・マップ」と呼ばれるものです。
バッキング・マップを構成できます。特定のキャッシュのすべてのデータを、ヒープのオブジェクト形式で保持する必要がある場合、無制限で失効期限のないLocalCache
(統計が不要な場合はSafeHashMap
)を使用します。メモリーに保持するアイテムの数が少ない場合は、LocalCache
を使用します。データを必要に応じてデータベースから読み取る場合は、ReadWriteBackingMap
を使用します(アプリケーションのDAO実装を介した読取りおよび書込み方法を指定します)。ReadWriteBackingMap
に、SafeHashMap
やLocalCache
などのバッキング・マップを提供してデータを保存します。
バッキング・マップの中には監視可能なものがあります。これらのバッキング・マップから取得されたイベントは、一般的にはアプリケーションに直接関係がありません。かわりに、Coherenceではそれらを(Coherenceで)実行する必要があるアクションに変換してデータを同期的に保持し正しくバックアップを行い、またアプリケーション・リスナーでのリクエストに応じてそれをクラスタ全体に配信されるクラスタ化されたイベントに変換します。たとえば、パーティション・キャッシュにバッキング・マップとしてLocalCache
があり、ローカル・キャッシュでエントリが失効した場合、そのイベントによってエントリのすべてのバックアップ・コピーが失効します。さらに、リスナーがパーティション・キャッシュに登録されている場合、イベントがイベント・フィルタに適合すると、そのイベントはそのリスナーが登録されているサーバーのリスナーに配信されます。
応用例の中には、データが保持されているサーバー上のイベントをアプリケーションで処理する必要があるものや、それを実際にデータを管理している構造(バッキング・マップ)で行う必要があるものがあります。そのような場合、バッキング・マップが監視可能なマップであれば、リスナーをバッキング・マップに構成したり、プログラムによってリスナーをバッキング・マップに追加することができます。(バッキング・マップが監視可能でない場合、WrapperObservableMap
でラップすることにより監視可能になります。)
各バッキング・マップ・イベントは、1回だけディスパッチされます。ただし、1つのput
から複数のバッキング・マップ・イベントが生成されることがあります。たとえば、put
からのエントリを再分散する必要がある場合、分散イベント(元のノードから削除され、新しいノードに挿入されるイベント)が作成されます。この場合、1つのput
に対してバッキング・マップ・リスナーが複数回コールされます。
最後に、バッキング・マップ・リスナーは常に同期されます。バッキング・マップ・リスナーは、バッキング・マップ自身の同期化モニターを保持しつつ、変更操作を行うスレッドで起動されます。内部バッキング・マップ・リスナーでは多くの場合、イベントは即座には処理されませんが、キューに配置され、後で非同期に処理されます。
この機能の詳細は、BackingMapManager
のAPIドキュメントを参照してください。
バッキングMapListener
イベントは、読取り可能なJavaフォーマットでレプリケート・キャッシュから返されます。ただし、分散キャッシュから返されたバッキングMapListener
イベントは、Coherenceの内部フォーマットになります。Coherence Incubator Commonプロジェクトには、分散キャッシュから読取り可能なバッキングMapListener
イベントから取得できるAbstractMultiplexingBackingMapListener
クラスが用意されています。Coherence Commonライブラリをダウンロードするには、http://coherence.oracle.com/display/INCUBATOR/Coherence+Common
を参照してください。
分散キャッシュからの読取り可能なバッキングMapListener
イベントを生成するには:
AbstractMultiplexingBackingMapListener
クラスを実装します。
cache-config
ファイルのbacking-map-scheme
の<listener>
セクションに実装を登録します。
Javaプロパティのcacheconfig
で、キャッシュ・サーバー・アプリケーション・ファイルおよびクライアント・ファイルを開始します。
-Dcoherence.cacheconfig="cache-config.xml"
AbstractMultiplexingBackingMapListener
クラスには、onBackingMapEvent
メソッドが用意されています。これをオーバーライドしてイベントが返される方法を指定できます。
次のVerboseBackingMapListener
クラスの一覧は、AbstractMultiplexingBackingMapListener
のサンプルの実装です。onBackingMapEvent
メソッドは、結果を標準出力に送信するように上書きされています。
例25-15 AbstractMultiplexingBackingMapListener実装
import com.tangosol.net.BackingMapManagerContext; import com.tangosol.util.MapEvent; public class VerboseBackingMapListener extends AbstractMultiplexingBackingMapListener { public VerboseBackingMapListener(BackingMapManagerContext context) { super(context); } @Override protected void onBackingMapEvent(MapEvent mapEvent, Cause cause) { System.out.printf("Thread: %s Cause: %s Event: %s\n", Thread.currentThread().getName(), cause, mapEvent); } }
例25-16は、分散スキーム定義の例です。ファイルの<listener>
セクションで、VerboseBackingMapListener
は、com.tangosol.net.BackingMapManagerContext
タイプとして識別されます。
例25-16 詳細バッキング・マップ・リスナーを指定する分散スキーム
<distributed-scheme> <scheme-name>my-dist-scheme</scheme-name> <service-name>DistributedCache</service-name> <backing-map-scheme> <read-write-backing-map-scheme> <internal-cache-scheme> <local-scheme> <high-units>0</high-units> <expiry-delay>0</expiry-delay> </local-scheme> </internal-cache-scheme> <cachestore-scheme> <class-scheme> <class-name>CustomCacheStore</class-name> <init-params> <init-param> <param-type>java.lang.String</param-type> <param-value>{cache-name}</param-value> </init-param> </init-params> </class-scheme> </cachestore-scheme> <listener> <class-scheme> <class-name>VerboseBackingMapListener</class-name> <init-params> <init-param> <param-type>com.tangosol.net.BackingMapManagerContext </param-type> <param-value>{manager-context}</param-value> </init-param> </init-params> </class-scheme> </listener> </read-write-backing-map-scheme> </backing-map-scheme> <autostart>true</autostart> </distributed-scheme>
イベントの中には、アプリケーション・リスナーがイベントを生成するキャッシュ・サービスを妨害しないように、非同期に配信されるものがあります。まれなケースですが、非同期配信では進行している処理の結果に比べてイベントの順序があいまいになる場合があります。クラスタ化システムのローカル表示が単一スレッドであるかのようにキャッシュAPI操作およびイベントの順序を保証するには、MapListener
でSynchronousListener
マーカー・インタフェースを実装する必要があります。
Coherence自体で同期リスナーを使用する例にニア・キャッシュがあります。これにより、ローカルでキャッシュされたデータを、イベントを使用して無効化できます(通称、Seppuku)。
この機能の詳細は、MapListenerSupport.SynchronousListener
のAPIドキュメントを参照してください。