Coherenceでは、 Coherenceキャッシュからポイント・イン・タイムの問合せ結果を取得 でき、 その問合せ結果を変更するイベントを受信できる一方、問合せ結果と連続的な関連するイベント・ストリームとを組み合せて、最新の問合せ結果をリアルタイムに維持することもできます。目的の問合せの待機時間が0である場合と同じ効果を持ち、問合せがミリ秒ごとに数回繰り返されるため、この機能は連続問合せと呼ばれています。
連続問合せキャッシュは、Oracleデータベースのマテリアライズド・ビューに似ています。マテリアライズド・ビューでは、データベース表に問い合せたデータがビューにコピーされます。データベース内のデータが変更されると、ビュー内のデータが自動的に更新されます。マテリアライズド・ビューによって、結果セットへの変更を参照できます。連続問合せでは、クライアント上にキャッシュのローカル・コピーが作成されます。フィルタを使用することで、キャッシュのサイズと内容を制限できます。イベント・リスナーを組み合せることで、キャッシュをリアルタイムに更新できます。
たとえば、複数の顧客のすべての発注をリアルタイムに監視するとします。連続問合せキャッシュを作成し、対象の顧客に関係するあらゆるイベントをリスニングするイベント・リスナーを設定します。Coherenceは、グリッド上で個々の顧客に関係するデータ・オブジェクトをすべて問い合せ、そのオブジェクトをローカル・キャッシュにコピーします。この問合せのイベント・リスナーでは、その顧客に対してグリッドで行われるあらゆる挿入、更新または削除がリスニングされます。イベントが発生すると、顧客データのローカル・コピーが更新されます。
この章の内容は次のとおりです。
連続問合せキャッシュには、いくつかの異なる一般的な用途があります。
これは、複合イベント処理(CEP: Complex Event Processing)システムやイベント相関エンジンにおける有用な構成要素です。
これは、アプリケーションが特定の問合せを繰り返し実行し、その問合せの最新の結果に常に即座にアクセスできることが有益である状況では、効果的です。
連続問合せキャッシュはマテリアライズド・ビューに類似しており、標準のNamedCache
APIを使用して問合せの結果にアクセスし、操作する場合、およびその問合せに関連するイベントの現行ストリームを受信する場合に役立ちます。
連続問合せキャッシュは、最新のデータセットを使用場所(特定のサーバー・ノードやクライアントなど)でローカルに保持できるため、ニア・キャッシュと同様の方法で使用できます。ただし、ニア・キャッシュは無効化ベースですが、連続問合せキャッシュではデータを実際に最新の状態で保持します。
Coherence*Extend機能を連続問合せキャッシュと組み合せることにより、アプリケーションで文字どおり何万もの同時ユーザーをサポートできます。
注意: 連続問合せキャッシュは、分散される可能性のあるより大きなキャッシュ・データセットの特定のサブセットについて、その最新のローカル・コピーを非常に簡単かつ効果的に維持する機能を提供します。そのため、クライアントベースおよびサーバーベースのアプリケーションを含め、ほとんどすべてのタイプのアプリケーションに有用です。 |
Coherenceでの連続問合せの実装には、ContinuousQueryCache
クラスを使用します。このクラスは、Coherenceのすべてのキャッシュと同様、次の機能を備えた標準のNamedCache
インタフェースを実装します。
Map
インタフェースを使用したキャッシュのアクセスと操作: NamedCache
は、Javaコレクション・フレームワークのMap
インタフェースに基づくMap
インタフェースを拡張します。
キャッシュ内で発生するすべてのオブジェクト変更に対するイベント: NamedCache
はObservableMap
インタフェースを拡張します。
IDをベースとしたクラスタ全体でのキャッシュ内オブジェクトのロック: NamedCache
はConcurrentMap
インタフェースを拡張します。
キャッシュ内オブジェクトの問合せ: NamedCache
はQueryMap
インタフェースを拡張します。
キャッシュ内オブジェクトの分散パラレル処理および集約: NamedCache
はInvocableMap
インタフェースを拡張します。
ContinuousQueryCache
は、Coherenceのすべてのキャッシュで提供されるAPIであるNamedCache
インタフェースを実装しているため、非常に使いやすく、その機能が必要になった場合に別のキャッシュと簡単に置き換えることができます。
連続問合せキャッシュは、次の2つの機能で定義されます。
連続問合せの基礎となるキャッシュ
その基礎となるキャッシュの問合せ(連続問合せキャッシュにキャッシュされるサブセットを生成)
基礎となるキャッシュには、別の連続問合せキャッシュを含めた任意のCoherenceキャッシュを使用できます。キャッシュを取得する最も簡単な方法は、CacheFactory
クラスを使用することです。このクラスを使用すると、名前を指定するだけでキャッシュを作成できます。キャッシュは自動的に作成され、アプリケーションのキャッシュ構成要素に基づいて構成されます。たとえば、次のコード行ではordersという名前のキャッシュが作成されます。
NamedCache::Handle hCache = CacheFactory::getCache("orders");
この問合せは、 他のキャッシュの問合せに使用される問合せと同じタイプです。例11-1は、コード・フィルタを使用して、特定の注文ステータスにある特定のトレーダーを検出する方法を示しています。
例11-1 フィルタを使用した問合せ
ValueExtractor::Handle hTraderExtractor = ReflectionExtractor::create("getTrader"); ValueExtractor::Handle hStatusExtractor = ReflectionExtractor::create("getStatus"); Filter::Handle hFilter = AndFilter::create(EqualsFilter::create(hTraderExtractor, vTraderId), EqualsFilter::create(hStatusExtractor, vStatus));
通常、キャッシュを問い合せるには、QueryMap
クラスのメソッドを使用します。たとえば、このトレーダーの未決済ポジションすべてのスナップショットを取得するには、次のコードを使用します。
Set::View vSetOpenTrades = hCache->entrySet(hFilter);
これに対し、連続問合せキャッシュはContinuousQueryCache::create
メソッドでキャッシュとフィルタを渡すことによって構成されます。
ContinuousQueryCache::Handle hCacheOpenTrades = ContinuousQueryCache::create(hCache, hFilter);
連続問合せキャッシュでは、その基礎となるキャッシュに1つ以上のイベント・リスナーが配置されます。連続問合せキャッシュがアプリケーションの継続期間を通して使用される場合、リソースはそのノードがシャットダウンされたとき、または停止したときにクリーンアップされます。ただし、連続問合せキャッシュがわずかの間だけ使用される場合は、使い終わった時点で連続問合せキャッシュに対してアプリケーションからrelease()メソッドをコールする必要があります。
連続問合せキャッシュを構成する際、問合せで取得されたキーだけをキャッシュで追跡し、要求された場合にのみ、その値を基礎となるキャッシュから取得するように指定することができます。この機能は、問合せの結果セットが非常に膨大になる連続問合せキャッシュを作成する場合、または値をリクエストされることがないか、ほとんどない場合に役立ちます。キーのみをキャッシュするように指定するには、ContinuousQueryCache
を作成する際に、次の例のようにfalseを渡します。
ContinuousQueryCache::Handle hCacheOpenTrades = ContinuousQueryCache::create(hCache, hFilter, false);
次の例に示すように、必要に応じて、キャッシュがインスタンス化された後にCacheValues
プロパティを変更することもできます。
hCacheOpenTrades->setCacheValues(true);
連続問合せキャッシュに標準の(非Lite)イベント・リスナーがある場合、またはイベント・リスナーのいずれかがフィルタリングされる場合は、CacheValues
プロパティが自動的にtrue
に設定されます。これは、イベントをフィルタリングする際、および発生させたイベントに古い値と新しい値を提供する際に、ローカルにキャッシュされた値を連続問合せキャッシュが使用するためです。
連続問合せキャッシュが値をキャッシュするように構成されている場合は、ReflectionExtractor
の使用がサポートされません。これは、ReflectionExtractor
ではC++でのリフレクションがサポートされていないためです。この場合は、カスタム・エクストラクタを用意する必要があります。連続問合せキャッシュで値がローカルにキャッシュされていない場合は、ReflectionExtractor
を使用できます。これは、このエクストラクタがクライアントで抽出を実行せず、かわりに必要な抽出情報をクラスタに渡して問合せを実行するためです。
連続問合せキャッシュ自体は監視可能であるため、クライアントが連続問合せキャッシュに1つ以上のイベント・リスナーを配置することが可能です。例:
例11-2 連続問合せキャッシュへのリスナーの配置
ContinuousQueryCache::Handle hCacheOpenTrades = ContinuousQueryCache::create(hCache, hFilter); hCacheOpenTrades->addFilterListener(hListener);
キャッシュに格納されているすべてのアイテム、およびキャッシュに追加されるすべてのアイテムに対して、アプリケーションでなんらかの処理を実行する必要がある場合は、作成中にリスナーを設定します。連続問合せキャッシュ内のアイテムが(問合せに含まれていたため)最初から存在していたか、キャッシュの作成中または作成後に追加されたかに関係なく、作成されたキャッシュはアイテムごとに1つのイベントを受信します。ContinuousQueryCache
のファクトリ作成メソッドには、キャッシュ、フィルタおよびリスナーを指定できる形式があります。
例11-3 フィルタとリスナーを使用する連続問合せキャッシュの作成
ContinuousQueryCache::Handle hCacheOpenTrades = ContinuousQueryCache::create( hRemoteCache, hFilter, true, hListener);
連続問合せキャッシュ内のアイテムを処理するには2つのアプローチがありますが、どちらも予期しない結果や望ましくない結果をもたらす場合があります。まず、処理を実行した後、その後の追加に対応するためにリスナーを追加すると、反復処理が終了してリスナーが追加されるまでのわずかの時間に発生したイベントを見逃すことになります。これを例11-4に示します。
例11-4 データを処理してからリスナーを追加する場合
ContinuousQueryCache::Handle hCacheOpenTrades = ContinuousQueryCache::create(hCache, hFilter); for (Iterator::Handle hIter = hCacheOpenTrades->entrySet()->iterator(); hIter->hasNext(); ) { Map::Entry::View vEntry = cast<Map::Entry::View>(hIter->next()); // .. process the cache entry } hCacheOpenTrades->addFilterListener(hListener);
2番目のアプローチでは、イベントを見逃さないように、まずリスナーを追加し、その後で処理を実行します。ただし、イベントとイテレータ
の両方に同じエントリが出現する可能性があります。イベントは非同期の可能性があるため、操作の順序は保証されません。
例11-5 リスナーを追加してからデータを処理する場合
ContinuousQueryCache::Handle hCacheOpenTrades = ContinuousQueryCache::create(hRemoteCache, hFilter); hCacheOpenTrades->addFilterListener(hListener); for (Iterator::Handle hIter = hCacheOpenTrades->entrySet()->iterator(); hIter->hasNext(); ) { Map::Entry::View vEntry = cast<Map::Entry::View>(hIter->next()); // .. process the cache entry }
連続問合せキャッシュの実装にも同じ課題がありました。同じキャッシュから変更イベントのストリームを受信しながら、基礎となるキャッシュの正確なポイント・イン・タイム・スナップショットをいかにアセンブルするかということです。その解決法は、いくつかの部分に分かれています。まず、Coherenceでは同期イベントのオプションがサポートされており、これによって順序付けが保証されます。第2に、連続問合せキャッシュでは、その初期移入の実装が2つのフェーズに分かれているため、まず基礎となるキャッシュに対して問合せを実行し、その後、最初のフェーズ中に発生したイベントをすべて解決できます。このようにイベントを見逃したり重複させたりすることなくデータの可視性を保証する操作はかなり複雑であるため、ContinuousQueryCache
では、アプリケーション開発者が作成時にリスナーを渡すことで、この複雑さに対処しなくても済むようにしています。
連続問合せキャッシュは読取り専用キャッシュにすることができます。そのためには、次の例のように、ContinuousQueryCache
クラスのブール・メソッドsetReadOnly
を使用します。
hCacheOpenTrades->setReadOnly(true);
読取り専用の連続問合せキャッシュでは、オブジェクトの追加、変更、削除またはロックができません。
連続問合せキャッシュを読取り専用に設定した後は、読取り/書込みに戻すことはできません。