Coherenceでは、キャッシュ・イベントが提供されています。変更が実際にはクラスタ内のどこで発生していても、とても簡単な方法で必要なイベントを必要なときに受信できます。
イベント・モデルには、すべてのリスナーで拡張する必要があるEventListenerインタフェースがあります。Coherenceには、MapListenerインタフェースが用意されており、これによりCoherenceキャッシュのデータが追加、変更または削除されたときにアプリケーション・ロジックでイベントを受信できます。例12-1に、MapListener APIの一部を示します。
例12-1 coherence::util::MapListenerクラス・ファイルからの抜粋
class MapListener
: public interface_spec<MapListener,
implements<EventListener> >
{
// ----- handle definitions ---------------------------------------------
public:
/**
* Handle definition.
*/
typedef TypedHandle<MapListener> Handle;
/**
* View definition.
*/
typedef TypedHandle<const MapListener> View;
/**
* MapEvent View definition.
*/
typedef TypedHandle<const MapEvent> MapEventView;
// ----- MapListener interface ------------------------------------------
public:
/**
* Invoked when a map entry has been inserted.
*
* @param vEvent the MapEvent carrying the insert information
*/
virtual void entryInserted(MapEventView vEvent) = 0;
/**
* Invoked when a map entry has been updated.
*
* @param vEvent the MapEvent carrying the update information
*/
virtual void entryUpdated(MapEventView vEvent) = 0;
/**
* Invoked when a map entry has been removed.
*
* @param vEvent the MapEvent carrying the delete information
*/
virtual void entryDeleted(MapEventView vEvent) = 0;
};
MapListenerインタフェースを実装するアプリケーション・オブジェクトは、アプリケーションのMapListener実装のインスタンスをaddMapListener()メソッドのいずれかに渡すだけで、ObservableMapインタフェースを実装する任意のCoherenceキャッシュまたはクラスからイベントにサインアップできます。
MapListenerに渡されるMapEventオブジェクトは、イベントの発生元になったソース(ObservableMap)、イベントに関連付けられたID(キー)、そのIDに対するアクション(挿入、更新、削除)、古い値と新しい値など、発生させたイベントについて必要なすべての情報を保持します。例12-2に、MapEvent APIの一部を示します。
例12-2 coherence::util::MapEventからの抜粋
class MapEvent
: public class_spec<MapEvent,
extends<EventObject> >
{
friend class factory<MapEvent>;
// ----- MapEvent interface ---------------------------------------------
public:
/**
* Return an ObservableMap object on which this event has actually
* occurred.
*
* @return an ObservableMap object
*/
virtual ObservableMap::Handle getMap() const;
/**
* Return this event's id. The event id is one of the ENTRY_*
* enumerated constants.
*
* @return an id
*/
virtual int32_t getId() const;
/**
* Return a key associated with this event.
*
* @return a key
*/
virtual Object::View getKey() const;
/**
* Return an old value associated with this event.
* <p>
* The old value represents a value deleted from or updated in a map.
* It is always NULL for "insert" notifications.
*
* @return an old value
*/
virtual Object::View getOldValue() const;
/**
* Return a new value associated with this event.
* <p>
* The new value represents a new value inserted into or updated in
* a map. It is always NULL for "delete" notifications.
*
* @return a new value
*/
virtual Object::View getNewValue() const;
// ----- Objectinterface -----------------------------------------------
public:
/**
* {@inheritDoc}
*/
virtual void toStream(std::ostream& out) const;
// ----- helper methods -------------------------------------------------
public:
/**
* Dispatch this event to the specified listeners collection.
* <p>
* This call is equivalent to
* <pre>
* dispatch(listeners, true);
* </pre>
*
* @param vListeners the listeners collection
*
* @throws ClassCastException if any of the targets is not
* an instance of MapListener interface
*/
virtual void dispatch(Listeners::View vListeners) const;
/**
* Dispatch this event to the specified listeners collection.
*
* @param vListeners the listeners collection
* @param fStrict if true then any RuntimeException thrown by event
* handlers stops all further event processing and
* the exception is re-thrown; if false then all
* exceptions are logged and the process continues
*
* @throws ClassCastException if any of the targets is not
* an instance of MapListener interface
*/
virtual void dispatch(Listeners::View vListeners,
bool fStrict) const;
/**
* Dispatch this event to the specified MapListener.
*
* @param hListener the listener
*/
virtual void dispatch(MapListener::Handle hListener) const;
/**
* Get the event's description.
*
* @return this event's description
*/
virtual String::View getDescription() const;
/**
* Convert an event ID into a human-readable string.
*
* @param nId an event ID, one of the ENTRY_* enumerated values
*
* @return a corresponding human-readable string, for example
* "inserted"
*/
static String::View getDescription(int32_t nId);
// ----- constants ------------------------------------------------------
public:
/**
* This event indicates that an entry has been added to the map.
*/
static const int32_t ENTRY_INSERTED = 1;
/**
* This event indicates that an entry has been updated in the map.
*/
static const int32_t ENTRY_UPDATED = 2;
/**
* This event indicates that an entry has been removed from the map.
*/
static const int32_t ENTRY_DELETED = 3;
};
すべてのCoherenceキャッシュには、ObservableMapが実装されます。実際に、すべてのCoherenceキャッシュで実装されるNamedCacheインタフェースは、ObservableMapインタフェースを拡張します。つまり、キャッシュが、ローカル、パーティション、ニア、レプリケーションであるかどうか、また、リードスルー、ライトスルー、ライトビハインド、オーバーフロー、ディスク記憶域などを使用しているかどうかに関係なく、アプリケーションはサインアップして任意のキャッシュからイベントを受信できます。
|
注意: キャッシュ・トポロジやサーバーの数に関係なく、また他のサーバーが変更している場合でも、これらのイベントはアプリケーションのリスナーに配信されます。 |
Coherenceキャッシュ(Coherenceキャッシュ・ファクトリを介して取得されたオブジェクト)に加えて、次のように、Coherenceでサポートされる他のいくつかのクラスでもObservableMapインタフェースが実装されます。
ObservableHashMap
LocalCache
OverflowMap
NearCache
ReadWriteBackingMap
AbstractSerializationCache、SerializationCacheおよびSerializationPagedCache
WrapperObservableMap、WrapperConcurrentMapおよびWrapperNamedCache
公開されている実装クラスの一覧は、Coherence APIの「ObservableMap」を参照してください。
イベントにサインアップするには、MapListenerインタフェースを実装するオブジェクトをObservableMapのaddMapListenerメソッドの1つに渡します。
例12-3 ObservableMapのメソッド
virtual void addKeyListener(MapListener::Handle hListener, Object::View vKey, bool fLite) = 0; virtual void removeKeyListener(MapListener::Handle hListener, Object::View vKey) = 0; virtual void addFilterListener(MapListener::Handle hListener, Filter::View vFilter = NULL, bool fLite = false) = 0; virtual void removeFilterListener(MapListener::Handle hListener, Filter::View vFilter = NULL) = 0;
MapListener実装のサンプルを作成してみましょう。
例12-4 MapListener実装のサンプル
#include "coherence/util/MapEvent.hpp"
#include "coherence/util/MapListener.hpp"
#include <iostream>
using coherence::util::MapEvent;
using coherence::util::MapListener;
using namespace std;
/**
* A MapListener implementation that prints each event as it receives
* them.
*/
class EventPrinter
: public class_spec<EventPrinter,
extends<Object>,
implements<MapListener> >
{
friend class factory<EventPrinter>;
public:
virtual void entryInserted(MapEventView vEvent)
{
cout << vEvent << endl;
}
virtual void entryUpdated(MapEventView vEvent)
{
cout << vEvent << endl;
}
virtual void entryDeleted(MapEventView vEvent)
{
cout << vEvent << endl;
}
};
この実装を使用すると、任意のキャッシュからすべてのイベントを非常に簡単に出力できます(すべてのキャッシュでObservableMapインタフェースが実装されているため)。
ただし、後でリスナーを削除できるように、リスナーへの参照を保持する必要があります。
例12-6 リスナーへの参照の保持
MapListener::Handle hListener = EventPrinter::create(); hCache->addFilterListener(hListener); m_hListener = hListener; // store the listener in a member field
後でリスナーを削除するには、次のように処理します。
例12-7 リスナーへの参照の削除
MapListener::Handle hListener = m_hListener;
if (hListener != NULL)
{
hCache->removeFilterListener(hListener);
m_hListener = NULL; // clean up the listener field
}
ObservableMapインタフェースの各add*Listenerメソッドには、対応するremove*Listenerメソッドがあります。リスナーを削除するには、そのリスナーの追加に使用したadd*Listenerメソッドに対応するremove*Listenerメソッドを使用します。
MapListenerの作成に役立つもう1つのベース・クラスは、MultiplexingMapListenerです。これは、すべてのイベントを1つのメソッドにルーティングして処理します。例12-8に、EventPrinterの例を簡略化したものを示します。
例12-8 MultiplexingMapListenerを使用したイベントのルーティング
#include "coherence/util/MultiplexingMapListener.hpp"
#include <iostream>
using coherence::util::MultiplexingMapListener;
class EventPrinter
: public class_spec<EventPrinter,
extends<MultiplexingMapListener> >
{
public:
virtual void onMapEvent(MapEventView vEvent)
{
std::cout << vEvent << std::endl;
}
};
特定のキャッシュにリスナーを常駐させる必要がある場合、<listener>要素を使用して、それをキャッシュ・コンフィギュレーションに配置します。キャッシュが構成される際に、リスナーは自動的に追加されます。
特定のID(キー)に対して発生したイベントのサインアップは非常に簡単です。例12-9のC++コードでは、Integerキー5に対して発生したすべてのイベントが出力されます。
例12-9 指定したIntegerキーに対して発生したイベントの出力
hCache->addKeyListener(EventPrinter::create(), Integer32::create(5), false);
例12-10のコードでは、Integerキー5が挿入または更新された場合にのみイベントがトリガーされます。
特定のキーのリスニングと同様に、特定のイベントをリスニングすることもできます。例12-11では、削除イベントのみを受信できるフィルタを指定したリスナーが、キャッシュに追加されます。
例12-11 削除されたイベントのみを許可するフィルタを指定したリスナーの追加
// Filters used with partitioned caches must implement coherence::io::pof::PortableObject
#include "coherence/io/pof/PofReader.hpp"
#include "coherence/io/pof/PofWriter.hpp"
#include "coherence/io/pof/PortableObject.hpp"
#include "coherence/util/Filter.hpp"
#include "coherence/util/MapEvent.hpp"
using coherence::io::pof::PofReader;
using coherence::io::pof::PofWriter;
using coherence::io::pof::PortableObject;
using coherence::util::Filter;
using coherence::util::MapEvent;
class DeletedFilter
: public class_spec<DeletedFilter,
extends<Object>,
implements<Filter, PortableObject> >
{
public:
// Filter interface virtual bool evaluate(Object::View v) const
{
MapEvent::View vEvt = cast<MapEvent::View>(v);
return MapEvent::ENTRY_DELETED == vEvt->getId();
}
// PortableObject interface virtual void readExternal(PofReader::Handle hIn)
{
}
virtual void writeExternal(PofWriter::Handle hOut) const
{
}
};
hCache->addFilterListener(EventPrinter::create(), DeletedFilter::create(), false);
たとえば、次の一連のコールが実行されたとします。
例12-12 キャッシュに対するデータの挿入および削除
cache::put(String::create("hello"), String::create("world"));
cache::put(String::create("hello"), String::create("again"));
cache::remove(String::create("hello"));
次のような結果になります。
CacheEvent{LocalCache deleted: key=hello, value=again}
詳細は、「応用: 問合せのリスニング」を参照してください。
イベントのフィルタリングとキャッシュされたデータのフィルタリングの比較
問合せのFilterを作成する場合、Filterのevaluateメソッドに渡すオブジェクトはキャッシュからの値になります。または、FilterでEntryFilterインタフェースを実装する場合は、キャッシュからのMap::Entry全体になります。MapListenerのイベントをフィルタリングするFilterを作成する場合、Filterのevaluateメソッドに渡すオブジェクトは常にMapEvent型になります。
問合せフィルタを使用してキャッシュ・イベントをリスニングする方法の詳細は、「応用: 問合せのリスニング」を参照してください。
デフォルトでは、イベントの一部として古い値と新しい値が両方とも提供されます。次の例があります。
例12-13 値の挿入、更新、削除
MapListener::Handle hListener = EventPrinter::create();
// add listener with the default"lite" value of falsehCache->addFilterListener(hListener);
// insert a 1KB value
String::View vKey = String::create("test");
hCache->put(vKey, Array<octet_t>::create(1024));
// update with a 2KB value
hCache->put(vKey, Array<octet_t>::create(2048));
// remove the value
hCache->remove(vKey);
このコードを実行すると、挿入イベントには新しい1KBの値、更新イベントには古い1KBの値と新しい2KBの値、削除イベントには削除された2KBの値がそれぞれ保持されます。
アプリケーションでイベントに古い値と新しい値を両方とも含める必要がない場合、Liteイベントのみをリクエストして、そのように指定することができます。リスナーの追加時にLiteイベントをリクエストするには、追加のブール・パラメータfLiteを使用するaddFilterListenerメソッドとaddKeyListenerメソッドのいずれかを使用できます。前述の例の場合、変更は次の箇所のみです。
|
注意: Liteイベントの古い値と新しい値は、NULLにすることができます。ただし、Liteイベントをリクエストしても、イベントの生成と配信に余分な負荷がかからない場合は、古い値と新しい値が含まれることがあります。つまり、MapListenerでLiteイベントを受信するようにリクエストしても、それは単にシステムにMapListenerがイベントの古い値と新しい値を認識する必要がないことを知らせているだけになります。 |
すべてのCoherenceキャッシュでは、任意の基準による問合せがサポートされます。アプリケーションでキャッシュからのデータを問い合せる場合、その結果はID(keySet)のセットまたはID/値のペア(entrySet)のセットとしてのポイント・イン・タイム・スナップショットになります。結果セットの内容を決定するためのメカニズムをフィルタリングと呼びます。これによりアプリケーション開発者は、すぐに使える豊富なフィルタのセット(equals、less-than、like、betweenなど)を使用して任意の複雑さの問合せを作成したり、独自のカスタム・フィルタ(XPathなど)を提供することができます。
キャッシュの問合せに使用するフィルタと同じフィルタを、キャッシュからのイベントのリスニングに使用することもできます。たとえば、トレーディング・システムにおいて、特定のトレーダーの未決済Orderオブジェクトをすべて問い合せることができます。
|
注意: クラスタでの問合せの実行: 例12-15では、coherence::util::extractor::ReflectionExtractorクラスが使用されています。C++クライアントではリフレクションがサポートされませんが、クラスタ内で実行される問合せにはReflectionExtractorを使用できます。この場合、ReflectionExtractorは、問合せを実行するクラスタに、必要な抽出情報を渡すのみです。ReflectionExtractorがクライアントでデータを抽出することになる場合(値をローカルにキャッシュするときのContinuousQueryCacheなど)は、ReflectionExtractorの使用はサポートされません。この場合は、カスタム・エクストラクタを用意する必要があります。 |
例12-15 キャッシュ・イベントのフィルタリング
NamedCache::Handle hMapTrades = ...
Filter::Handle hFilter = AndFilter::create(
EqualsFilter::create(ReflectionExtractor::create("getTrader"), vTraderId),
EqualsFilter::create(ReflectionExtractor::create("getStatus"), Status::OPEN));
Set::View vSetOpenTrades = hMapTrades->entrySet(hFilter);
そのトレーダーが新しいポジションを建てた、そのトレーダーがポジションを決済した、トレーダー間でポジションを付け替えた、という情報を受信するために、アプリケーションで同じフィルタを使用できます。
例12-16 特化されたイベントのフィルタリング
// receive events for all trade IDs that this trader is interested in hMapTrades->addFilterListener(hListener, MapEventFilter::create(hFilter), true);
MapEventFilterは、問合せフィルタをイベント・フィルタに変換します。
|
注意: イベントのフィルタリングとキャッシュされたデータのフィルタリングの比較: 問合せのFilterを作成する場合、Filterのevaluateメソッドに渡すオブジェクトはキャッシュからの値になります。または、FilterでEntryFilterインタフェースを実装する場合は、キャッシュからのMap::Entry全体になります。MapListenerのイベントをフィルタリングするFilterを作成する場合、Filterのevaluateメソッドに渡すオブジェクトは常にMapEvent型になります。
|
MapEventFilterには、いくつもの非常に強力なオプションがあります。これにより、アプリケーション・リスナーは特に関心のあるイベントのみを受信できます。スケーラビリティとパフォーマンスの観点からさらに重要なこととして、必要なイベントのみがネットワーク上で通信されるため、それらの特定イベントを対象とするサーバーとクライアントにのみ配信されます。次に例を示します。
例12-17 ネットワークを介した、特化されたイベントのみの伝達
// receive all events for all trades that this trader is interested in int32_t nMask = MapEventFilter::E_ALL; hMapTrades->addFilterListener(hListener, MapEventFilter::create(nMask, hFilter), true); // receive events for all this trader's trades that are closed or // re-assigned to a different trader nMask = MapEventFilter::E_UPDATED_LEFT | MapEventFilter::E_DELETED; hMapTrades->addFilterListener(hListener, MapEventFilter::create(nMask, hFilter), true); // receive events for all trades as they are assigned to this trader nMask = MapEventFilter::E_INSERTED | MapEventFilter::E_UPDATED_ENTERED; hMapTrades->addFilterListener(hListener, MapEventFilter::create(nMask, hFilter), true); // receive events only for new trades assigned to this trader nMask = MapEventFilter::E_INSERTED; hMapTrades->addFilterListener(hListener, MapEventFilter::create(nMask, hFilter), true);
サポートされる各種オプションの詳細は、MapEventFilterのAPIドキュメントを参照してください。
イベントは通常、キャッシュで発生している変更を反映します。たとえば、あるサーバーがキャッシュ内の1つのエントリを変更し、別のサーバーがキャッシュに複数のアイテムを追加し、3つ目のサーバーが同じキャッシュから1つのアイテムを削除している間に、クラスタ内の各サーバーで50個のスレッドが同じキャッシュのデータにアクセスすることがあります。すべての変更操作によりイベントが生成されますが、それをクラスタ内の任意のサーバーで受信するように選択できます。これらの操作をクライアント・アクションと呼び、そのイベントは、「クライアントにディスパッチされた」状態にあります(この場合の「クライアント」は実際には「サーバー」)。これはCoherenceクラスタなどの、真のpeer-to-peerアーキテクチャでは自然の概念です。いずれのピアもクライアントでありサーバーでもあるので、他のピアからサービスを受け、他のピアにサービスを提供します。一般的なJava Enterpriseアプリケーションでは、ピアはアプリケーションのコンテナとして動作するアプリケーション・サーバー・インスタンスで、クライアントは直接キャッシュに対してアクセスおよび変更を行い、キャッシュからのイベントをリスニングするアプリケーションの一部です。
イベントの中には、キャッシュ内部で起動されるものもあります。多くの例がありますが、最も一般的なケースは次のとおりです。
キャッシュのエントリが自動失効した場合
キャッシュの最大サイズに達したため、エントリがキャッシュから削除された場合
リードスルー操作の結果としてエントリが透過的にキャッシュに追加された場合
リードアヘッドまたはリフレッシュアヘッド操作の結果としてキャッシュ内のエントリが透過的に更新された場合
これらは変更の各例ですが、変更とはキャッシュ内部での自然な(通常は自動的な)各操作を表します。これらのイベントを、統合イベントと呼びます。
必要に応じて、アプリケーションはイベントに問い合せて、それがクライアントに起因するイベントか、または統合イベントかを区別できます。この情報は、CacheEventというMapEventのサブクラスで保持されます。前述のEventPrinterの例を使用して、統合イベントのみを出力できます。
例12-18 クライアントに起因するイベントと統合イベントの区別
class EventPrinter
: public class_spec<EventPrinter,
extends<MultiplexingMapListener> >
{
friend class factory<EventPrinter>;
public:
void onMapEvent(MapEvent::View vEvt)
{
if (instanceof<CacheEvent::View>(vEvt) &&
(cast<CacheEvent::View>(vEvt)->isSynthetic()))
{
std::cout << vEvt;
}
}
};
この機能の詳細は、CacheEventのAPIドキュメントを参照してください。
Coherenceキャッシュのイベントをリスニングし、データに関して分散、パーティション、レプリケーション、ニア・キャッシュ、連続問合せ、リードスルー/ライトスルー、ライトビハインドのローカル表示を確認できる一方で、いわば「こっそりのぞき見る」こともできます。
一部の応用例では、こっそりのぞき見る、つまりサービスの背後でマップをリスニングすることが必要になる場合があります。分散環境でのレプリケーション、パーティション、およびその他のデータ管理方式は、いずれも分散サービスです。このサービスにはさらに実際にデータを管理するための機能が必要です。それが「バッキング・マップ」と呼ばれるものです。
バッキング・マップは構成可能です。特定のキャッシュのすべてのデータを、ヒープのオブジェクト形式で保持する必要がある場合、無制限で失効期限のないLocalCache(統計が不要な場合はSafeHashMap)を使用します。メモリーに保持するアイテムの数が少ない場合は、LocalCacheを使用します。データを必要に応じてデータベースから読み取る場合は、ReadWriteBackingMapを使用します(アプリケーションのDAO実装を介した読取りおよび書込み方法を指定する)。ReadWriteBackingMapに、SafeHashMapやLocalCacheなどのバッキング・マップを提供してデータを保存します。
バッキング・マップの中には監視可能なものがあります。これらのバッキング・マップから取得されたイベントは、一般的にはアプリケーションに直接関係がありません。かわりに、Coherenceではそれらを(Coherenceで)実行する必要があるアクションに変換してデータを同期的に保持し正しくバックアップを行い、またアプリケーション・リスナーでのリクエストに応じてそれをクラスタ全体に配信されるクラスタ化されたイベントに変換します。たとえば、パーティション・キャッシュにバッキング・マップとしてLocalCacheがあり、ローカル・キャッシュでエントリが失効した場合、そのイベントによってエントリのすべてのバックアップ・コピーが失効します。さらに、リスナーがパーティション・キャッシュに登録されている場合、イベントがイベント・フィルタに適合すると、そのイベントはそのリスナーが登録されているサーバーのリスナーに配信されます。
応用例の中には、データが保持されているサーバー上のイベントをアプリケーションで処理する必要があるものや、それを実際にデータを管理している構造(バッキング・マップ)で行う必要があるものがあります。そのような場合、バッキング・マップが監視可能なマップであれば、リスナーをバッキング・マップに構成したり、プログラムによってリスナーをバッキング・マップに追加することができます(バッキング・マップが監視可能でない場合、WrapperObservableMapでラップすることにより監視可能になる)。
この機能の詳細は、BackingMapManagerのAPIドキュメントを参照してください。
イベントの中には、アプリケーション・リスナーがイベントを生成するキャッシュ・サービスを妨害しないように、非同期に配信されるものがあります。まれなケースですが、非同期配信では進行している処理の結果に比べてイベントの順序があいまいになる場合があります。クラスタ・システムのローカル表示が単一スレッドであるかのようにキャッシュAPI操作およびイベントの順序を保証するには、MapListenerでSynchronousListenerマーカー・インタフェースを実装する必要があります。
Coherence自体で同期リスナーを使用する例にニア・キャッシュがあります。これにより、ローカルでキャッシュされたデータを、イベントを使用して無効化できます(通称、Seppuku)。
この機能の詳細は、SynchronousListenerのAPIドキュメントを参照してください。