この章では、.NETクライアントで連続問合せキャッシュを使用して、常に問合せがキャッシュからリアルタイムに最新の結果を取得する手順を説明します。
この章には次の項が含まれます:
問合せは、Coherenceキャッシュから特定の時点の問合せ結果を取得でき、さらに問合せの結果が変更された場合のイベントを受信できます。また、連続問合せ機能では問合せ結果を関連するイベントの連続的なストリームと結合し、リアルタイムに最新の問合せ結果を維持することが可能です。目的の問合せの待機時間が0である場合と同じ効果を持ち、問合せが1ミリ秒ごとに数回繰り返されるため、この機能は連続問合せと呼ばれています。
Coherence for .NETでは、問合せ結果をマテリアライズして連続問合せキャッシュに格納し、問合せに対するイベント・リスナーを使用してそのキャッシュをリアルタイムに最新の状態に維持することで、連続問合せ機能を実装します。つまり、Coherence for .NETの連続問合せでは、キャッシュされた問合せ結果が古くなることはありません。
連続問合せキャッシュには、いくつかの異なる一般的な用途があります。
これは、複合イベント処理(CEP: Complex Event Processing)システムやイベント相関エンジンにおける有用な構成要素です。
これは、アプリケーションが特定の問合せを繰り返し実行し、その問合せの最新の結果に常に即座にアクセスできることが有益である状況では、効果的です。
連続問合せキャッシュはマテリアライズド・ビューに類似しており、標準のINamedCache
APIを使用して問合せの結果にアクセスして操作する場合、およびその問合せに関連するイベントの現行ストリームを受信する場合に役立ちます。
連続問合せキャッシュは、最新のデータセットを使用場所(特定のサーバー・ノードやクライアント・デスクトップなど)でローカルに保持できるため、ニア・キャッシュと同様の方法で使用できます。ニア・キャッシュは無効化ベースですが、連続問合せキャッシュでは実際にデータが最新の状態で保持されます。
使用例の1つにトレーディング・システム・デスクトップがあります。このデスクトップには、トレーダーの未処理の注文とすべての関連情報が常に最新の状態で保持されている必要があります。Coherence*Extend機能を連続問合せキャッシュと組み合せることにより、アプリケーションで文字どおり何万もの同時ユーザーをサポートできます。
注意: 連続問合せキャッシュは、分散される可能性のあるより大きなキャッシュ・データセットの特定のサブセットについて、その最新のローカル・コピーを非常に簡単かつ効果的に維持する機能を提供します。そのため、クライアントベースおよびサーバーベースのアプリケーションを含め、ほとんどすべてのタイプのアプリケーションに有用です。 |
Coherence for .NETの連続問合せの実装には、Tangosol.Net.Cache.ContinuousQueryCache
クラスを使用します。このクラスは、Coherence for .NETのすべてのキャッシュ同様、次の機能を備えた標準のINamedCache
インタフェースを実装します。
IDictionary
インタフェースを使用したキャッシュのアクセスと操作: INamedCache
は、.NETコレクション・フレームワークの標準のIDictionary
インタフェース(.NET Hashtable
クラスで実装されるインタフェースと同じ)を拡張します。
キャッシュ内で発生するすべてのオブジェクト変更に対するイベント発行: INamedCache
はIObservableCache
インタフェースを拡張します。
キャッシュ内オブジェクトの問合せ: INamedCache
はIQueryCache
インタフェースを拡張します。
キャッシュ内オブジェクトの分散パラレル処理および集約: INamedCache
はIInvocableCache
インタフェースを拡張します。
ContinuousQueryCache
クラスは、Coherence for .NETのすべてのキャッシュで提供されるAPIであるINamedCache
インタフェースを実装しているため、非常に使いやすく、その機能が必要になった場合に別のキャッシュと簡単に置き換えることができます。
連続問合せキャッシュは、次の2つの項目で定義されます。
基礎となるキャッシュ
その基礎となるキャッシュの問合せ(連続問合せキャッシュにキャッシュされるサブセットを生成)
基礎となるキャッシュには、別の連続問合せキャッシュを含めた任意のCoherence for .NETキャッシュを使用できます。キャッシュは一般にCacheFactory
で取得されます。これにより開発者は、次の例に示すように、キャッシュの名前を指定するだけでアプリケーションのキャッシュ構成情報に基づいて自動的にキャッシュを構成できます。
INamedCache cache = CacheFactory.GetCache("orders");
この問合せは、他のキャッシュの問合せに使用される問合せと同じタイプです。例:
Filter filter = new AndFilter(new EqualsFilter("getTrader", traderid), new EqualsFilter("getStatus", Status.OPEN));
通常、キャッシュを問い合せるには、IQueryCache
のメソッドを使用します。たとえば、このトレーダーの未決済ポジションすべてのスナップショットを取得するには、次のコードを使用します。
ICollection setOpenTrades = cache.GetEntries(filter);
同様に、連続問合せキャッシュは、次に示す同じ2つの要素で構成されます。
ContinuousQueryCache cacheOpenTrades = new ContinuousQueryCache(cache, filter);
すべてのINamedCache
実装のインスタンスは、ContinuousQueryCache
を含め、不要になった時点でINamedCache.Release()
メソッドをコールして明示的に解放し、これらのインスタンスで保持されているリソースをすべて解放する必要があります。
特定のINamedCache
がアプリケーションの継続期間を通して使用される場合、リソースはそのアプリケーションがシャットダウンされたとき、または停止したときにクリーンアップされます。ただし、わずかの間だけ使用される場合は、使い終わった時点でアプリケーションからRelease()
メソッドをコールする必要があります。
または、INamedCache
がIDisposable
を拡張し、すべてのキャッシュ実装がIDisposable.Dispose()
のコールをINamedCache.Release()
に委任しているという事実を利用する方法もあります。単一メソッド内でキャッシュ・インスタンスを取得および解放する必要がある場合、次のusingブロックを使用してそれを実現できます。
例19-1 連続問合せキャッシュへの参照の取得と解放
using (INamedCache cache = CacheFactory.GetCache("my-cache")) { // use cache as usual }
usingブロックが終了すると、INamedCache
インスタンスでIDisposable.Dispose()
がコールされ、そのインスタンスに関連付けられているすべてのリソースが解放されます。
連続問合せキャッシュを構成する際、問合せで取得されたキーだけをキャッシュで追跡し、要求された場合にのみ、その値を基礎となるキャッシュから取得するように指定することができます。この機能は、問合せの結果セットが非常に膨大になる連続問合せキャッシュを作成する場合、または値をリクエストされることがないか、ほとんどない場合に役立ちます。キーのみをキャッシュするように指定するには、次の例に示すように、IsCacheValues
プロパティを構成できるコンストラクタを使用します。
例19-2 連続問合せキャッシュへのキーのみのキャッシュ
ContinuousQueryCache cacheOpenTrades = new ContinuousQueryCache(cache, filter, false);
次の例に示すように、必要に応じて、キャッシュがインスタンス化された後にIsCacheValues
プロパティを変更することもできます。
cacheOpenTrades.IsCacheValues = true;
IsCacheValuesプロパティとイベント・リスナー
連続問合せキャッシュに標準の(非Lite)イベント・リスナーがある場合、またはイベント・リスナーのいずれかがフィルタリングされる場合は、IsCacheValues
プロパティが自動的にtrue
に設定されます。これは、イベントをフィルタリングする際、および発生させたイベントに古い値と新しい値を提供する際に、ローカルにキャッシュされた値を連続問合せキャッシュが使用するためです。
連続問合せキャッシュ自体は監視可能であるため、クライアントが連続問合せキャッシュに1つ以上のイベント・リスナーを配置することが可能です。例:
例19-3 連続問合せキャッシュでのリスナーの配置
ContinuousQueryCache cacheOpenTrades = new ContinuousQueryCache(cache, filter); cacheOpenTrades.AddCacheListener(listener);
キャッシュに格納されているすべてのアイテムとキャッシュに追加されるすべてのアイテムに対してなんらかの処理を実行する必要がある場合、その方法は2つあります。1つは、処理を実行した後、以降の追加に対処するリスナーを追加する方法です。
例19-4 データを処理してからリスナーを配置する場合
ContinuousQueryCache cacheOpenTrades = new ContinuousQueryCache(cache, filter); foreach (ICacheEntry entry in cacheOpenTrades.Entries) { // .. process the cache entry } cacheOpenTrades.AddCacheListener(listener);
ただし、このコードは正しくありません。これでは、反復処理が終了してリスナーが追加されるまでのわずかの時間に発生したイベントを見逃すことになります。別の方法では、イベントを見逃さないように、まずリスナーを追加し、その後で処理を実行します。
例19-5 リスナーを配置してからデータを処理する場合
ContinuousQueryCache cacheOpenTrades = new ContinuousQueryCache(cache, filter); cacheOpenTrades.AddCacheListener(listener); foreach (ICacheEntry entry in cacheOpenTrades.Entries) { // .. process the cache entry }
ただし、イベントとIEnumerator
の両方に同じエントリが出現する可能性があります。また、イベントは非同期の可能性があるため、操作の順序は保証されません。
これを解決するには、構成時にリスナーを提供します。これで、連続問合せキャッシュ内のアイテムが(問合せに含まれていたため)最初から存在していたか、キャッシュの作成中または作成後に追加されたかに関係なく、連続問合せキャッシュはアイテムごとに1つのイベントを受信します。
例19-6 連続問合せキャッシュの構成時にリスナーを提供する場合
ContinuousQueryCache cacheOpenTrades = new ContinuousQueryCache(cache, filter, listener);
連続問合せキャッシュの実装にも同じ課題がありました。同じキャッシュから変更イベントのストリームを受信しながら、基礎となるキャッシュの正確なポイント・イン・タイム・スナップショットをいかにアセンブルするかということです。その解決法は、いくつかの部分に分かれています。第1に、Coherence for .NETでは同期イベントのオプションがサポートされ、それによって一連の順序保証が実現します。第2に、連続問合せキャッシュでは、その初期移入の実装が2つのフェーズに分かれているため、まず基礎となるキャッシュに対して問合せを実行し、その後、最初のフェーズ中に発生したイベントをすべて解決できます。このようにイベントを見逃したり重複させたりすることなくデータの可視性を保証する操作はかなり複雑であるため、連続問合せキャッシュでは、アプリケーション開発者が作成時にリスナーを渡すことで、この複雑さに対処しなくても済むようにしています。