Coherenceでは、データ・グリッド・サービス、およびデータ・グリッドを使用するクライアントベースおよびサーバーベースのアプリケーションの構築に適したインフラストラクチャを提供します。Coherenceの基本的なレベルでは、グリッド内の多数のサーバー間の膨大なデータ量を管理できます。また、そのデータをほぼ待機時間なしでアクセスできます。さらに、データ間のパラレル問合せや、そのデータの記録システムとして動作するデータベースおよびEISシステムとの統合もサポートされます。Coherenceには、効果的なデータ・グリッドの構築に適した各種サービスも備わっています。
Coherenceのデータ・グリッド機能のインフラストラクチャの詳細は、第3章「問合せ可能なデータ・ファブリックの提供」を参照してください。
注意: 次に説明するデータ・グリッド機能は、すべてCoherence Enterprise Edition以上の機能です。 |
Coherenceでは、データ・グリッドの管理対象である任意のデータ・マップ・エントリに対してエージェントを実行する機能を提供します。
map.invoke(key, agent);
パーティション化されたデータの場合、エージェントの実行データを所有しているグリッド・ノード上でエージェントが実行されます。つまり、キューイング、並行性の管理、エージェントの実行、エージェントによるデータ・アクセス、およびエージェントによるデータ変更は、すべてそのグリッド・ノード上で発生します(結果のデータ変更の同期バックアップがある場合にのみ、追加的なネットワーク・トラフィックが必要になる)。多くの処理用途では、分散された並行処理制御、一貫性、およびデータ更新を管理するよりも、シリアライズされた形式のエージェントを移動させるほうがはるかに効果的です(通常は、数百バイト程度のメモリー消費で済む)。
リクエスト/レスポンス処理の場合、エージェントは次のような結果を返します。
Object oResult = map.invoke(key, agent);
つまり、データ・グリッドとしてのCoherenceは、データ・トポロジ構成に基づいてエージェントの実行場所を決定し、その場所にエージェントを移動してから、そのエージェントを実行し(エージェントを実行しながら、そのアイテムの並行処理制御を自動的に処理する)、変更があればそれをバックアップして結果を返します。
Coherenceでは、エントリのコレクション全体に対してエージェントを実行する機能も提供します。パーティション化されたデータ・グリッドでは、パラレル実行が行われます。つまり、グリッド内のノード数が多いほど、データ・グリッド全体に作業がロード・バランシングされます。
map.invokeAll(collectionKeys, agent);
リクエスト/レスポンス処理の場合、エージェントは各キーで1つの結果を返します。
Map mapResults = map.invokeAll(collectionKeys, agent);
つまり、Coherenceは、データ・トポロジ構成に基づいてエージェントの最適な実行場所を決定し、その場所にエージェントを移動してから、そのエージェントを実行し(エージェントを実行しながら、そのアイテムの並行処理制御を自動的に処理する)、変更があればそれをバックアップして結合された結果を返します。
第3章「問合せ可能なデータ・ファブリックの提供」の説明のとおり、Coherenceではデータ・グリッド全体に問い合せる機能がサポートされています。たとえば、取引システムにおいて、特定のトレーダーが受け持つ未決済のOrder
オブジェクトをすべて問い合せることができます。
例2-1 データ・グリッド間の問合せ
NamedCache map = CacheFactory.getCache("trades"); Filter filter = new AndFilter(new EqualsFilter("getTrader", traderid), new EqualsFilter("getStatus", Status.OPEN)); Set setOpenTradeIds = mapTrades.keySet(filter);
この機能をデータ・グリッドのパラレル実行と組み合せることで、問合せに対するエージェントの実行機能を実現します。前項のとおり、実行はパラレルで行われるため、問合せに適合するIDまたはエントリを返すかわりに、エントリに対するエージェントが実行されます。
map.invokeAll(filter, agent);
リクエスト/レスポンス処理の場合、エージェントは各キーで1つの結果を返します。
Map mapResults = map.invokeAll(filter, agent);
つまり、パラレル問合せとパラレル実行を組み合せることで、データ・グリッドに対する問合せベースのエージェント起動が実現されます。
AlwaysFilter
のインスタンス(またはNULL)をinvokeAll
メソッドに渡すと、渡されたエージェントがInvocableMap
のすべてのエントリに対して実行されます。
map.invokeAll((Filter) null, agent);
その他のタイプのエージェント起動と同様の方法で、リクエスト/レスポンス処理がサポートされます。
Map mapResults = map.invokeAll((Filter) null, agent);
アプリケーションは、1行のコードでデータ・グリッド内の特定のマップで展開されているすべてのデータを処理できます。
エージェントは、通常、AbstractProcessor
クラスを拡張してEntryProcessor
インタフェースを実装します。
Coherenceには、次のようなエージェントが含まれます。
AbstractProcessor
: EntryProcessor
を作成するための抽象ベース・クラス
ExtractorProcessor
: InvocableMap
に保存されたオブジェクトから特定の値(プロパティ値など)を抽出して返します
CompositeProcessor
: 同じエントリに対して順番に起動されるEntryProcessor
オブジェクトのコレクションをまとめてバンドルします
ConditionalProcessor
: エントリ-プロセスに対するFilter
がtrueに評価された場合、EntryProcessor
を起動します
PropertyProcessor
: PropertyManipulator
に依存するEntryProcessor
実装の抽象ベース・クラス
NumberIncrementor
: 基本形の整数型や、Byte
、Short
、Integer
、Long
、Float
、Double
、BigInteger
、BigDecimal
の任意のプロパティを実行前後に増分します
NumberMultiplier
: 基本形の整数型や、Byte
、Short
、Integer
、Long
、Float
、Double
、BigInteger
、BigDecimal
の任意のプロパティを乗算した後で前の値または新しい値を返します
InvocableMap
インタフェース内のEntryProcessor
インタフェースには、次の2つのメソッドのみが含まれます。
例2-2 EntryProcessorインタフェースのメソッド
/** * An invocable agent that operates against the Entry objects within a * Map. */ public interface EntryProcessor extends Serializable { /** * Process a Map Entry. * * @param entry the Entry to process * * @return the result of the processing, if any */ public Object process(Entry entry); /** * Process a Set of InvocableMap Entry objects. This method is * semantically equivalent to: * <pre> * Map mapResults = new ListMap(); * for (Iterator iter = setEntries.iterator(); iter.hasNext(); ) * { * Entry entry = (Entry) iter.next(); * mapResults.put(entry.getKey(), process(entry)); * } * return mapResults; * </pre> * * @param setEntries a read-only Set of InvocableMap Entry objects to * process * * @return a Map containing the results of the processing, up to one * entry for each InvocableMap Entry that was processed, keyed * by the keys of the Map that were processed, with a * corresponding value being the result of the processing for * each key */ public Map processAll(Set setEntries); }
前述の例で示したとおり、AbstractProcessor
はprocessAll
メソッドを実装します。
EntryProcessor
に渡されるInvocableMap.Entry
は、Map.Entry
インタフェースを拡張したものです。これにより、EntryProcessor
実装でエントリに関する必要な情報を取得して、必要な変更を非常に効果的に実行できます。
例2-3 InvocableMap.Entry API
/** * An InvocableMap Entry contains additional information and exposes * additional operations that the basic Map Entry does not. It allows * non-existent entries to be represented, thus allowing their optional * creation. It allows existent entries to be removed from the Map. It * supports several optimizations that can ultimately be mapped * through to indexes and other data structures of the underlying Map. */ public interface Entry extends Map.Entry { // ----- Map Entry interface ------------------------------------ /** * Return the key corresponding to this entry. The resultant key does * not necessarily exist within the containing Map, which is to say * that <tt>InvocableMap.this.containsKey(getKey)</tt> could return * false. To test for the presence of this key within the Map, use * {@link #isPresent}, and to create the entry for the key, use * {@link #setValue}. * * @return the key corresponding to this entry; may be null if the * underlying Map supports null keys */ public Object getKey(); /** * Return the value corresponding to this entry. If the entry does * not exist, then the value will be null. To differentiate between * a null value and a non-existent entry, use {@link #isPresent}. * <p/> * <b>Note:</b> any modifications to the value retrieved using this * method are not guaranteed to persist unless followed by a * {@link #setValue} or {@link #update} call. * * @return the value corresponding to this entry; may be null if the * value is null or if the Entry does not exist in the Map */ public Object getValue(); /** * Store the value corresponding to this entry. If the entry does * not exist, then the entry will be created by invoking this method, * even with a null value (assuming the Map supports null values). * * @param oValue the new value for this Entry * * @return the previous value of this Entry, or null if the Entry did * not exist */ public Object setValue(Object oValue); // ----- InvocableMap Entry interface --------------------------- /** * Store the value corresponding to this entry. If the entry does * not exist, then the entry will be created by invoking this method, * even with a null value (assuming the Map supports null values). * <p/> * Unlike the other form of {@link #setValue(Object) setValue}, this * form does not return the previous value, and consequently may be * significantly less expensive (in terms of cost of execution) for * certain Map implementations. * * @param oValue the new value for this Entry * @param fSynthetic pass true only if the insertion into or * modification of the Map should be treated as a * synthetic event */ public void setValue(Object oValue, boolean fSynthetic); /** * Extract a value out of the Entry's value. Calling this method is * semantically equivalent to * <tt>extractor.extract(entry.getValue())</tt>, but this method may * be significantly less expensive because the resultant value may be * obtained from a forward index, for example. * * @param extractor a ValueExtractor to apply to the Entry's value * * @return the extracted value */ public Object extract(ValueExtractor extractor); /** * Update the Entry's value. Calling this method is semantically * equivalent to: * <pre> * Object oTarget = entry.getValue(); * updater.update(oTarget, oValue); * entry.setValue(oTarget, false); * </pre> * The benefit of using this method is that it may allow the Entry * implementation to significantly optimize the operation, such as * for purposes of delta updates and backup maintenance. * * @param updater a ValueUpdater used to modify the Entry's value */ public void update(ValueUpdater updater, Object oValue); /** * Determine if this Entry exists in the Map. If the Entry is not * present, it can be created by calling {@link #setValue} or * {@link #setValue}. If the Entry is present, it can be destroyed by * calling {@link #remove}. * * @return true iff this Entry is existent in the containing Map */ public boolean isPresent(); /** * Remove this Entry from the Map if it is present in the Map. * <p/> * This method supports both the operation corresponding to * {@link Map#remove} and synthetic operations such as * eviction. If the containing Map does not differentiate between * the two, then this method will always be identical to * <tt>InvocableMap.this.remove(getKey())</tt>. * * @param fSynthetic pass true only if the removal from the Map * should be treated as a synthetic event */ public void remove(boolean fSynthetic); }
前項で説明したエージェントは、スカラー・エージェントに相当しますが、InvocableMap
インタフェースでも集計がサポートされます。
例2-4 InvocableMap APIの集計
/** * Perform an aggregating operation against the entries specified by the * passed keys. * * @param collKeys the Collection of keys that specify the entries within * this Map to aggregate across * @param agent the EntryAggregator that is used to aggregate across * the specified entries of this Map * * @return the result of the aggregation */ public Object aggregate(Collection collKeys, EntryAggregator agent); /** * Perform an aggregating operation against the set of entries that are * selected by the given Filter. * <p/> * <b>Note:</b> calling this method on partitioned caches requires a * Coherence Enterprise Edition (or higher) license. * * @param filter the Filter that is used to select entries within this * Map to aggregate across * @param agent the EntryAggregator that is used to aggregate across * the selected entries of this Map * * @return the result of the aggregation */ public Object aggregate(Filter filter, EntryAggregator agent);
単純なEntryAggregator
は、InvocableMap
.Entry
オブジェクトのセットを処理して結果を得ます。
例2-5 EntryAggregator API
/** * An EntryAggregator represents processing that can be directed to occur * against some subset of the entries in an InvocableMap, resulting in a * aggregated result. Common examples of aggregation include functions * such as min(), max() and avg(). However, the concept of aggregation * applies to any process that must evaluate a group of entries to * come up with a single answer. */ public interface EntryAggregator extends Serializable { /** * Process a set of InvocableMap Entry objects to produce an * aggregated result. * * @param setEntries a Set of read-only InvocableMap Entry objects to * aggregate * * @return the aggregated result from processing the entries */ public Object aggregate(Set setEntries); }
データ・グリッド内での効果的な実行には、集計処理のパラレル実行の設計が不可欠です。
例2-6 集計をパラレル実行するParallelAwareAggregator API
/** * A ParallelAwareAggregator is an advanced extension to EntryAggregator * that is explicitly capable of being run in parallel, for example in a * distributed environment. */ public interface ParallelAwareAggregator extends EntryAggregator { /** * Get an aggregator that can take the place of this aggregator in * situations in which the InvocableMap can aggregate in parallel. * * @return the aggregator that will be run in parallel */ public EntryAggregator getParallelAggregator(); /** * Aggregate the results of the parallel aggregations. * * @return the aggregation of the parallel aggregation results */ public Object aggregateResults(Collection collResults); }
Coherenceには、次のような通常の集計関数がすべて備わっています。
Count
DistinctValues
DoubleAverage
DoubleMax
DoubleMin
DoubleSum
LongMax
LongMin
LongSum
注意: Coherenceに付属するすべてのアグリゲータは、パラレル実行に対応しています。 |
Coherenceアグリゲータの一覧は、com.tangosol.util.aggregator
パッケージを参照してください。独自のアグリゲータを実装する場合、AbstractAggregator
抽象ベース・クラスを参照してください。
Coherenceには、グリッド内の任意の場所で単一パスのエージェント(起動可能なオブジェクトと呼ばれる)を実行できる起動サービスが用意されています。エージェントは、グリッドの特定のノードで実行したり、グリッドの特定のノード・セットでパラレル実行することができます。また、グリッドのすべてのノードでパラレル実行することもできます。
起動サービスは、キャッシュ構成ファイル<invocation-scheme>
要素を使用して構成します。サービスの名前を使用すると、アプリケーションはサービスへの参照を容易に取得できます。
InvocationService service = CacheFactory.getInvocationService("agents");
エージェントは、アプリケーションの一部である簡単な実行可能クラスです。単純なエージェントの例には、JVMからGCをリクエストするように設計されたものがあります。
例2-7 ガベージ・コレクションをリクエストする単純なエージェント
/** * Agent that issues a garbage collection. */ public class GCAgent extends AbstractInvocable { public void run() { System.gc(); } }
次のように、クラスタ全体でそのエージェントを実行するコードは1行で済みます。
service.execute(new GCAgent(), null, null);
次に、グリッド全体のリクエスト/レスポンス・モデルをサポートするエージェントの例を示します。
例2-8 グリッド全体のリクエスト・モデルおよびレスポンス・モデルをサポートするエージェント
/** * Agent that determines how much free memory a grid node has. */ public class FreeMemAgent extends AbstractInvocable { public void run() { Runtime runtime = Runtime.getRuntime(); int cbFree = runtime.freeMemory(); int cbTotal = runtime.totalMemory(); setResult(new int[] {cbFree, cbTotal}); } }
グリッド全体でそのエージェントを実行して、すべての結果を取得するのに必要なのも、次の1行のコードのみです。
Map map = service.query(new FreeMemAgent(), null);
グリッド全体のリクエストまたはレスポンスを実行するのは容易ですが、その結果を出力するには、より多くのコーディングが必要です。
例2-9 グリッド全体のリクエストまたはレスポンスからの結果の出力
Iterator iter = map.entrySet().iterator(); while (iter.hasNext()) { Map.Entry entry = (Map.Entry) iter.next(); Member member = (Member) entry.getKey(); int[] anInfo = (int[]) entry.getValue(); if (anInfo != null) // nullif member died System.out.println("Member " + member + " has " + anInfo[0] + " bytes free out of " + anInfo[1] + " bytes total"); }
エージェントの操作はステートフルにすることができます。つまり、その起動状態をシリアライズして、エージェントを実行するグリッド・ノードに転送します。
例2-10 ステートフル・エージェントの操作
/** * Agent that carries some state with it. */ public class StatefulAgent extends AbstractInvocable { public StatefulAgent(String sKey) { m_sKey = sKey; } public void run() { // the agent has the key that it was constructed with String sKey = m_sKey; // ... } private String m_sKey; }
Coherenceには、CommonJ Work Managerのグリッド対応実装が用意されています。Work Managerを使用すると、アプリケーションで実行する必要がある作業コレクションを送信できます。Work Managerは、その作業がパラレル実行されるように分散します(通常は、グリッド全体に分散する)。つまり、送信された作業アイテムが10個あり、グリッドに10台のサーバーがあるとすると、各サーバーは1つの作業アイテムを処理することになります。さらに、グリッド内の作業アイテムの分散処理は調整できます。そのため、データの効率性および局所性を考慮して、特定の作業が指定したサーバー(たとえば、特定のメインフレーム・サービスのゲートウェイとして動作するサーバーなど)で最初に実行されるように選択できます。
アプリケーションは作業の完了を待機しますが、その待機時間のタイムアウトを設定することもできます。そのためのAPIは非常に強力なため、アプリケーションで最初の作業アイテムが完了するのを待機したり、指定された作業アイテム・セットが完了するのを待機することができます。このAPIのメソッドを組み合せると、たとえば「実行するアイテムが10個あり、その中の7個は重要でないため最大5秒間待機し、残りの3個の重要なアイテムは最大30秒間待機する」という処理が可能になります。
例2-11 Work Managerの使用
Work[] aWork = ... Collection collBigItems = new ArrayList(); Collection collAllItems = new ArrayList(); for (int i = 0, c = aWork.length; i < c; ++i) { WorkItem item = manager.schedule(aWork[i]); if (i < 3) { // the first three work items are the important ones collBigItems.add(item); } collAllItems.add(item); } Collection collDone = manager.waitForAll(collAllItems, 5000L); if (!collDone.containsAll(collBigItems)) { // wait the remainder of 30 seconds for the important work to finish manager.waitForAll(collBigItems, 25000L); }
Work Managerの主な使用例として、標準ベースのブレード・インフラストラクチャを使用した、アプリケーションにおける大まかなサービス・リクエストの処理があります。この場合、「この家族の情報が欲しい」などの一見単純なリクエストが多く発生します。しかし、実際には、このリクエストによって、Webサービス、RDMBSコールなどで構成される複数の異なるバックエンド・データソースに対する多数のリクエストが展開されます。この使用例では、それぞれ異なるが関連性のある2つの問題に展開されるため、Work Managerの分散機能での解決が要求されます。
コール元に不相応な待機時間を強いないために、大まかなリクエストをきめ細かな複数のリクエストに切り分けてパラレル実行するにはどうすればよいでしょうか。前述の例では、100以上のコール先に問い合せて情報を取得する必要があります。Java EEには正式なスレッド・モデルがなく、それに対応するメッセージベースのアプローチは不十分であるため、Coherence Work Managerを実装しています。
低コストのブレードを活用しながら多数の外部システム・コールをパラレル実行するため、必要な作業を多くのデュアル・プロセッサ(ハイパースレッド機能を採用しているため、論理的には4プロセッサ構成になる)マシンに展開させることで、本来は垂直方向のスケーラビリティの問題を水平方向のハードウェア・レベルでのスケーラビリティで解決できます。リモートのWork Managerインスタンスへのリクエスト配信にかかる負荷は、サービスを実行する際の負荷に比べて小さく、通常は数十〜数百ミリ秒しかかからないため妥当であると判断しました。