ObservableMap
インタフェースを実装するCoherence内の任意のクラスからキャッシュ・イベントおよびイベントを受信できます。
この章の内容は次のとおりです。
MapListener
インタフェースが用意されており、これによりCoherenceキャッシュのデータが追加、変更または削除されたときにアプリケーション・ロジックでイベントを受信できます。ObservableMap
インタフェースが実装されています。MapListener
インタフェースを実装するオブジェクトをObservableMap
のaddMapListener
メソッドに渡します。MultiplexingMapListener
クラスは、すべてのイベントを1つのメソッドにルーティングして処理します。<listener>
要素を使用して、リスナーをキャッシュに登録できます。MapListener
実装では、SynchronousListener
マーカー・インタフェースを使用して、クラスタ化されたシステムのローカル・ビューがシングル・スレッドであるかのように、キャッシュAPIの操作およびイベントが順序付けされることを保証できます。親トピック: C++ Extendクライアントの作成
EventListener
インタフェースから成り、すべてのリスナーでこれを拡張する必要があります。Coherenceには、MapListener
インタフェースが用意されており、これによりCoherenceキャッシュのデータが追加、変更または削除されたときにアプリケーション・ロジックでイベントを受信できます。
MapListener
インタフェースを実装するアプリケーション・オブジェクトは、アプリケーションのMapListener
実装のインスタンスをaddMapListener()
メソッドに渡すことで、ObservableMap
インタフェースを実装する任意のCoherenceキャッシュまたはクラスからイベントにサインアップできます。
MapListener
に渡されるMapEvent
オブジェクトは、イベントの発生元になったソース(ObservableMap
)、イベントに関連付けられたID(キー)、そのIDに対するアクション(挿入、更新、削除)、古い値と新しい値など、発生したイベントについて必要なすべての情報を保持します。
ObservableMap
インタフェースが実装されています。すべてのキャッシュは、そのキャッシュが、ローカル、パーティション、ニア、レプリケートであるかどうか、また、リードスルー、ライトスルー、ライトビハインド、オーバーフロー、ディスク記憶域などを使用しているかどうかに関係なく、イベントを受信できます。注意:
キャッシュ・トポロジやサーバーの数に関係なく、また他のサーバーが変更している場合でも、これらのイベントはアプリケーションのリスナーに配信されます。
Coherenceキャッシュ(Coherenceキャッシュ・ファクトリを介して取得されたオブジェクト)に加えて、次のように、Coherenceでサポートされる他のいくつかのクラスでもObservableMap
インタフェースが実装されます。
ObservableHashMap
LocalCache
OverflowMap
NearCache
ReadWriteBackingMap
AbstractSerializationCache
、SerializationCache
およびSerializationPagedCache
WrapperObservableMap
、WrapperConcurrentMap
およびWrapperNamedCache
公開されている実装クラスの一覧は、Coherence APIの「ObservableMap」を参照してください。
MapListener
インタフェースを実装するオブジェクトをObservableMap
のaddMapListener
メソッドに渡します。
次に例を示します。
virtual void addKeyListener(MapListener::Handle hListener, Object::View vKey, bool fLite) = 0; virtual void removeKeyListener(MapListener::Handle hListener, Object::View vKey) = 0; virtual void addFilterListener(MapListener::Handle hListener, Filter::View vFilter = NULL, bool fLite = false) = 0; virtual void removeFilterListener(MapListener::Handle hListener, Filter::View vFilter = NULL) = 0;
MapListener
実装のサンプルを作成してみましょう。
#include "coherence/util/MapEvent.hpp" #include "coherence/util/MapListener.hpp" #include <iostream> using coherence::util::MapEvent; using coherence::util::MapListener; using namespace std; /** * A MapListener implementation that prints each event as it receives * them. */ class EventPrinter : public class_spec<EventPrinter, extends<Object>, implements<MapListener> > { friend class factory<EventPrinter>; public: virtual void entryInserted(MapEventView vEvent) { cout << vEvent << endl; } virtual void entryUpdated(MapEventView vEvent) { cout << vEvent << endl; } virtual void entryDeleted(MapEventView vEvent) { cout << vEvent << endl; } };
この実装を使用すると、任意のキャッシュからすべてのイベントを簡単に出力できます(すべてのキャッシュでObservableMap
インタフェースが実装されているため)。
NamedCache::Handle hCache; ... hCache->addFilterListener(EventPrinter::create());
ただし、後でリスナーを削除できるように、リスナーへの参照を保持する必要があります。
MapListener::Handle hListener = EventPrinter::create(); hCache->addFilterListener(hListener); m_hListener = hListener; // store the listener in a member field
後でリスナーを削除するには、次のように処理します。
MapListener::Handle hListener = m_hListener; if (hListener != NULL) { hCache->removeFilterListener(hListener); m_hListener = NULL; // clean up the listener field }
ObservableMap
インタフェースの各add*Listener
メソッドには、対応するremove*Listener
メソッドがあります。リスナーを削除するには、そのリスナーの追加に使用したadd*Listener
メソッドに対応するremove*Listener
メソッドを使用します。
MultiplexingMapListener
クラスは、すべてのイベントを1つのメソッドにルーティングして処理します。次の例は、単純なEventPrinter
クラスを示しています。#include "coherence/util/MultiplexingMapListener.hpp" #include <iostream> using coherence::util::MultiplexingMapListener; class EventPrinter : public class_spec<EventPrinter, extends<MultiplexingMapListener> > { public: virtual void onMapEvent(MapEventView vEvent) { std::cout << vEvent << std::endl; } };
<listener>
要素を使用して、リスナーをキャッシュに登録できます。構成されている場合、Coherenceでキャッシュを構成するときにリスナーが自動的に追加されます。リスナーを構成内に登録すると、特定のキャッシュが常にリスナーを必要とする場合に便利です。Integer
キー5
に対して発生したすべてのイベントが出力されます。hCache->addKeyListener(EventPrinter::create(), Integer32::create(5), false);
次のコードでは、Integer
キー5
が挿入または更新された場合にのみイベントがトリガーされます。
for (int32_t i = 0; i < 10; ++i) { Integer32::View vKey = Integer32::create(i); Integer32::View vValue = vKey; hCache->put(vKey, vValue); }
// Filters used with partitioned caches must implement coherence::io::pof::PortableObject #include "coherence/io/pof/PofReader.hpp" #include "coherence/io/pof/PofWriter.hpp" #include "coherence/io/pof/PortableObject.hpp" #include "coherence/util/Filter.hpp" #include "coherence/util/MapEvent.hpp" using coherence::io::pof::PofReader; using coherence::io::pof::PofWriter; using coherence::io::pof::PortableObject; using coherence::util::Filter; using coherence::util::MapEvent; class DeletedFilter : public class_spec<DeletedFilter, extends<Object>, implements<Filter, PortableObject> > { public: // Filter interface virtual bool evaluate(Object::View v) const { MapEvent::View vEvt = cast<MapEvent::View>(v); return MapEvent::entry_deleted == vEvt->getId(); } // PortableObject interface virtual void readExternal(PofReader::Handle hIn) { } virtual void writeExternal(PofWriter::Handle hOut) const { } }; hCache->addFilterListener(EventPrinter::create(), DeletedFilter::create(), false);
たとえば、次の一連のコールが実行されたとします。
cache::put(String::create("hello"), String::create("world")); cache::put(String::create("hello"), String::create("again")); cache::remove(String::create("hello"));
次のような結果になります。
CacheEvent{LocalCache deleted: key=hello, value=again}
問合せのリスニングを参照してください。
イベントのフィルタリングとキャッシュされたデータのフィルタリングの比較
問合せのFilter
を作成する場合、Filter
のevaluateメソッドに渡すオブジェクトはキャッシュからの値になります。または、Filter
でEntryFilter
インタフェースを実装する場合は、キャッシュからのMap::Entry
全体になります。MapListener
のイベントをフィルタリングするFilter
を作成する場合、Filter
のevaluate
メソッドに渡すオブジェクトは常にMapEvent
型になります。
MapListener::Handle hListener = EventPrinter::create(); // add listener with the default"lite" value of falsehCache->addFilterListener(hListener); // insert a 1KB value String::View vKey = String::create("test"); hCache->put(vKey, Array<octet_t>::create(1024)); // update with a 2KB value hCache->put(vKey, Array<octet_t>::create(2048)); // remove the value hCache->remove(vKey);
このコードを実行すると、挿入イベントには新しい1KBの値、更新イベントには古い1KBの値と新しい2KBの値、削除イベントには削除された2KBの値がそれぞれ保持されます。
リスナーの追加時にLiteイベントをリクエストするには、追加のブール・パラメータfLite
を使用するaddFilterListener
メソッドとaddKeyListener
メソッドのいずれかを使用できます。前述の例の場合、変更は次の箇所のみです。
cache->addFilterListener(hListener, (Filter::View) NULL, true);
注意:
Liteイベントの古い値と新しい値はNULL
にできます。ただし、Liteイベントをリクエストしても、イベントの生成と配信に余分な負荷がかからない場合は、古い値と新しい値が含まれることがあります。つまり、MapListener
受信Liteイベントを要求することは、MapListener
がイベントについて古い値と新しい値の情報を必要としないことを単純にシステムに通知することになります。
keySet
)のセットまたはID/値のペア(entrySet
)のセットとしてのポイント・イン・タイム・スナップショットになります。結果セットの内容を判断するメカニズムは、フィルタリングと呼ばれ、これによりアプリケーション開発者は即時利用可能なフィルタ(equals、less-than、like、betweenなど)の豊富なセットを使用してどのような複雑な問合せをも構築でき、また独自のカスタム・フィルタ(XPathなど)を提供できます。キャッシュの問合せに使用するフィルタと同じフィルタを、キャッシュからのイベントのリスニングに使用します。たとえば、トレーディング・システムにおいて、特定のトレーダーの未決済Order
オブジェクトをすべて問い合せることができます。
注意:
次の例では、coherence::util::extractor::ReflectionExtractor
クラスを使用しています。C++クライアントではリフレクションがサポートされませんが、クラスタ内で実行される問合せにはReflectionExtractor
を使用できます。この場合、ReflectionExtractor
は、問合せを実行するクラスタに、必要な抽出情報を渡すのみです。ReflectionExtractor
がクライアントでデータを抽出することになる場合(値をローカルにキャッシュするときのContinuousQueryCache
など)は、ReflectionExtractor
の使用はサポートされません。この場合は、カスタム・エクストラクタを用意する必要があります。
NamedCache::Handle hMapTrades = ... Filter::Handle hFilter = AndFilter::create( EqualsFilter::create(ReflectionExtractor::create("getTrader"), vTraderId), EqualsFilter::create(ReflectionExtractor::create("getStatus"), Status::OPEN)); Set::View vSetOpenTrades = hMapTrades->entrySet(hFilter);
そのトレーダーが新しいポジションを建てた、そのトレーダーがポジションを決済した、トレーダー間でポジションを付け替えた、という情報を受信するために、アプリケーションで同じフィルタを使用できます。
// receive events for all trade IDs that this trader is interested in hMapTrades->addFilterListener(hListener, MapEventFilter::create(hFilter), true);
MapEventFilter
は、問合せフィルタをイベント・フィルタに変換します。
注意:
イベントのフィルタリングとキャッシュされたデータのフィルタリングの比較: 問合せのFilter
を作成する場合、Filterのevaluate
メソッドに渡すオブジェクトはキャッシュからの値になります。または、Filter
でEntryFilter
インタフェースを実装する場合は、キャッシュからのMap::Entry
全体になります。MapListener
のイベントをフィルタリングするFilter
を作成する場合、Filter
のevaluate
メソッドに渡すオブジェクトは常にMapEvent
型になります。
MapEventFilter
は、問合せに使用するFilter
をMapListener
のイベントのフィルタリングに使用するFilter
に変換します。つまり、MapEventFilter
はキャッシュを問い合せるFilter
から作成されますが、作成されたMapEventFilter
は、問合せFilter
が期待するオブジェクトに変換することによってMapEvent
オブジェクトを評価するフィルタになります。
MapEventFilter
には、いくつもの非常に強力なオプションがあります。これにより、アプリケーション・リスナーは特に関心のあるイベントのみを受信できます。スケーラビリティとパフォーマンスの観点からさらに重要なこととして、必要なイベントのみがネットワーク上で通信されるため、それらの特定イベントを対象とするサーバーとクライアントにのみ配信されます。次に例を示します。
// receive all events for all trades that this trader is interested in int32_t nMask = MapEventFilter::e_all; hMapTrades->addFilterListener(hListener, MapEventFilter::create(nMask, hFilter), true); // receive events for all this trader's trades that are closed or // re-assigned to a different trader nMask = MapEventFilter::e_updated_left | MapEventFilter::e_deleted; hMapTrades->addFilterListener(hListener, MapEventFilter::create(nMask, hFilter), true); // receive events for all trades as they are assigned to this trader nMask = MapEventFilter::e_inserted | MapEventFilter::e_updated_entered; hMapTrades->addFilterListener(hListener, MapEventFilter::create(nMask, hFilter), true); // receive events only for new trades assigned to this trader nMask = MapEventFilter::e_inserted; hMapTrades->addFilterListener(hListener, MapEventFilter::create(nMask, hFilter), true);
イベントは通常、キャッシュで発生している変更を反映します。たとえば、あるサーバーがキャッシュ内の1つのエントリを変更し、別のサーバーがキャッシュに複数のアイテムを追加し、3つ目のサーバーが同じキャッシュから1つのアイテムを削除している間に、クラスタ内の各サーバーで50個のスレッドが同じキャッシュのデータにアクセスすることがあります。すべての変更操作によりイベントが生成されますが、それをクラスタ内の任意のサーバーで受信するように選択できます。これらの操作をクライアント・アクションと呼び、そのイベントは、クライアントにディスパッチされた状態にあります(この場合のクライアントは実際にはサーバー)。これはCoherenceクラスタなどの、真のpeer-to-peerアーキテクチャでは自然の概念です。いずれのピアもクライアントでありサーバーでもあるので、他のピアからサービスを受け、他のピアにサービスを提供します。一般的なJava Enterpriseアプリケーションでは、ピアはアプリケーションのコンテナとして動作するアプリケーション・サーバー・インスタンスで、クライアントはキャッシュに対して直接アクセスおよび変更を行い、キャッシュからのイベントをリスニングするアプリケーションの一部です。
イベントの中には、キャッシュ内部で起動されるものもあります。多くの例がありますが、最も一般的なケースは次のとおりです。
キャッシュのエントリが自動失効した場合
キャッシュの最大サイズに達したため、エントリがキャッシュから削除された場合
リードスルー操作の結果としてエントリが透過的にキャッシュに追加された場合
リードアヘッドまたはリフレッシュアヘッド操作の結果としてキャッシュ内のエントリが透過的に更新された場合
これらは変更の各例ですが、変更とはキャッシュ内部での自然な(通常は自動的な)各操作を表します。これらのイベントを統合イベントと呼びます。
必要に応じて、アプリケーションはイベントに問い合せて、それがクライアントに起因するイベントか、または統合イベントかを区別できます。この情報は、CacheEvent
というMapEvent
のサブクラスで保持されます。前述のEventPrinter
の例を使用して、統合イベントのみを出力できます。
class EventPrinter : public class_spec<EventPrinter, extends<MultiplexingMapListener> > { friend class factory<EventPrinter>; public: void onMapEvent(MapEvent::View vEvt) { if (instanceof<CacheEvent::View>(vEvt) && (cast<CacheEvent::View>(vEvt)->isSynthetic())) { std::cout << vEvt; } } };
この機能の詳細は、CacheEvent
のAPIドキュメントを参照してください。
バッキング・マップは構成可能です。特定のキャッシュのすべてのデータを、ヒープのオブジェクト形式で保持する必要がある場合、無制限で失効期限のないLocalCache
(統計が不要な場合はSafeHashMap
)を使用します。メモリーに保持するアイテムの数が少ない場合は、LocalCache
を使用します。データを必要に応じてデータベースから読み取る場合は、ReadWriteBackingMap
を使用します(アプリケーションのDAO実装を介した読取りおよび書込み方法を指定します)。ReadWriteBackingMap
に、SafeHashMap
やLocalCache
などのバッキング・マップを提供してデータを保存します。
バッキング・マップの中には観測可能なものがあります。これらのバッキング・マップから取得されたイベントは、一般的にはアプリケーションに直接関係がありません。かわりに、Coherenceではそれらを(Coherenceで)実行する必要があるアクションに変換してデータを同期的に保持し正しくバックアップを行い、またアプリケーション・リスナーでのリクエストに応じてそれをクラスタ全体に配信されるクラスタ化されたイベントに変換します。たとえば、パーティション・キャッシュにバッキング・マップとしてLocalCache
があり、ローカル・キャッシュでエントリが失効した場合、そのイベントによってエントリのすべてのバックアップ・コピーが失効します。さらに、リスナーがパーティション・キャッシュに登録されている場合、イベントがイベント・フィルタに適合すると、そのイベントはそのリスナーが登録されているサーバーのリスナーに配信されます。
応用例の中には、データが保持されているサーバー上のイベントをアプリケーションで処理する必要があるものや、それを実際にデータを管理している構造(バッキング・マップ)で行う必要があるものがあります。そのような場合、バッキング・マップが観測可能マップであれば、リスナーをバッキング・マップに構成したり、プログラムによってリスナーをバッキング・マップに追加することができます。(バッキング・マップが監視可能でない場合、WrapperObservableMap
でラップすることにより監視可能になります。)
MapListener
実装では、SynchronousListener
マーカー・インタフェースを使用して、クラスタ化されたシステムのローカル・ビューがシングル・スレッドであるかのように、キャッシュAPIの操作およびイベントが順序付けされることを保証できます。Coherence自体で同期リスナーを使用する例にニア・キャッシュがあります。これにより、ローカルでキャッシュされたデータを、イベントを使用して無効化できます。