Coherenceでは、データ・グリッド・サービス、およびデータ・グリッドを使用するクライアントベースおよびサーバーベースのアプリケーションの構築に適したインフラストラクチャを提供します。Coherenceの基本的なレベルでは、グリッド内の多数のサーバー間の膨大なデータ量を管理できます。また、そのデータをほぼ待機時間なしでアクセスできます。さらに、map-reduceな方法でのデータ間のパラレル問合せや、そのデータの記録システムとして動作するデータベースおよびEISシステムとの統合もサポートされます。Coherenceには、効果的なデータ・グリッドの構築に適した各種サービスも備わっています。
この章の内容は次のとおりです。
Coherenceでは、データ・グリッドの管理対象である任意のデータ・マップ・エントリに対してエージェントを実行する機能を提供します。
map.invoke(key, agent);
パーティション化されたデータの場合、エージェントの実行データを所有しているグリッド・ノード上でエージェントが実行されます。キューイング、並行性の管理、エージェントの実行、エージェントによるデータ・アクセス、およびエージェントによるデータ変更は、すべてそのグリッド・ノード上で発生します(結果のデータ変更の同期バックアップがある場合にのみ、追加的なネットワーク・トラフィックが必要になります)。様々な処理用途で、分散された並行処理制御、整合性およびデータ更新を処理するよりも、エージェントのシリアライズされた形式(通常、最大で数百バイト程度)を移動するほうがずっと効率的です。
リクエスト/レスポンス処理の場合、エージェントは次のような結果を返します。
Object oResult = map.invoke(key, agent);
つまり、データ・グリッドとしてのCoherenceは、データ・トポロジ構成に基づいてエージェントの実行場所を決定し、その場所にエージェントを移動してから、そのエージェントを実行し(エージェントを実行しながら、そのアイテムの並行処理制御を自動的に処理する)、変更があればそれをバックアップして結果を返します。
Coherenceでは、グリッド内のすべてのノードにわたって、エントリのコレクションに対してエージェントをパラレル実行できるようにするmap-reduce機能が提供されます。パラレル実行により、グリッド全体で作業のバランスを取ることで大量のデータを処理することが可能になります。invokeAllメソッドは、次のように使用されます。
map.invokeAll(collectionKeys, agent);
リクエスト/レスポンス処理の場合、エージェントは各キーで1つの結果を返します。
Map mapResults = map.invokeAll(collectionKeys, agent);
Coherenceは、データ・トポロジ構成に基づいてエージェントの最適な実行場所を決定し、その場所にエージェントを移動してから、そのエージェントを実行し(エージェントを実行しながら、そのアイテムの並行処理制御を自動的に処理する)、変更があればそれをバックアップして結合された結果を返します。結果セットに対して集計を実行する手順については、「データ・グリッドの集計」を参照してください。
Coherenceは、データ・グリッド全体にわたる問合せ機能をサポートしています。たとえば、取引システムにおいて、特定のトレーダーが受け持つ未決済のOrderオブジェクトをすべて問い合せることができます。
例24-1 データ・グリッド全域の問合せ
NamedCache map = CacheFactory.getCache("trades");
Filter filter = new AndFilter(new EqualsFilter("getTrader", traderid),
new EqualsFilter("getStatus", Status.OPEN));
Set setOpenTradeIds = mapTrades.keySet(filter);
この機能をデータ・グリッドのパラレル実行と組み合せることで、問合せに対するエージェントの実行機能を実現します。前項のとおり、実行はパラレルで行われるため、問合せに適合するIDまたはエントリを返すかわりに、エントリに対するエージェントが実行されます。
map.invokeAll(filter, agent);
リクエスト/レスポンス処理の場合、エージェントは各キーで1つの結果を返します。
Map mapResults = map.invokeAll(filter, agent);
つまり、パラレル問合せとパラレル実行を組み合せることで、データ・グリッドに対する問合せベースのエージェント起動が実現されます。
AlwaysFilterのインスタンス(またはNULL)をinvokeAllメソッドに渡すと、渡されたエージェントがInvocableMapのすべてのエントリに対して実行されます。
map.invokeAll((Filter) null, agent);
その他のタイプのエージェント起動と同様の方法で、リクエスト/レスポンス処理がサポートされます。
Map mapResults = map.invokeAll((Filter) null, agent);
アプリケーションでは、データ・グリッド内の特定のマップで展開されているすべてのデータを1行のコードで処理できます。
エージェントは、通常、AbstractProcessorクラスを拡張してEntryProcessorインタフェースを実装します。
Coherenceには、次のようなエージェントが含まれます。
AbstractProcessor: EntryProcessorを作成するための抽象ベース・クラス
ExtractorProcessor: InvocableMapに保存されたオブジェクトから値(プロパティ値など)を抽出して返します
CompositeProcessor: 同じエントリに対して順番に起動されるEntryProcessorオブジェクトのコレクションをバンドルします
ConditionalProcessor: エントリ-プロセスに対するFilterがtrueに評価された場合、EntryProcessorを起動します
PropertyProcessor: PropertyManipulatorに依存するEntryProcessor実装の抽象ベース・クラス
NumberIncrementor: 基本形の整数型や、Byte、Short、Integer、Long、Float、Double、BigInteger、BigDecimalの任意のプロパティを実行前後に増分します
NumberMultiplier: 基本形の整数型や、Byte、Short、Integer、Long、Float、Double、BigInteger、BigDecimalの任意のプロパティを乗算した後で前の値または新しい値を返します
InvocableMapインタフェース内のEntryProcessorインタフェースには、次の2つのメソッドのみが含まれます。
例24-2 EntryProcessorインタフェースのメソッド
/**
* An invocable agent that operates against the Entry objects within a
* Map.
*/
public interface EntryProcessor
extends Serializable
{
/**
* Process a Map Entry.
*
* @param entry the Entry to process
*
* @return the result of the processing, if any
*/
public Object process(Entry entry);
/**
* Process a Set of InvocableMap Entry objects. This method is
* semantically equivalent to:
* <pre>
* Map mapResults = new ListMap();
* for (Iterator iter = setEntries.iterator(); iter.hasNext(); )
* {
* Entry entry = (Entry) iter.next();
* mapResults.put(entry.getKey(), process(entry));
* }
* return mapResults;
* </pre>
*
* @param setEntries a read-only Set of InvocableMap Entry objects to
* process
*
* @return a Map containing the results of the processing, up to one
* entry for each InvocableMap Entry that was processed, keyed
* by the keys of the Map that were processed, with a
* corresponding value being the result of the processing for
* each key
*/
public Map processAll(Set setEntries);
}
前述の例で示したとおり、AbstractProcessorはprocessAllメソッドを実装します。
EntryProcessorに渡されるInvocableMap.Entryは、Map.Entryインタフェースを拡張したものです。これにより、EntryProcessor実装でエントリに関する必要な情報を取得して、必要な変更を非常に効果的に実行できます。
例24-3 InvocableMap.Entry API
/**
* An InvocableMap Entry contains additional information and exposes
* additional operations that the basic Map Entry does not. It allows
* non-existent entries to be represented, thus allowing their optional
* creation. It allows existent entries to be removed from the Map. It
* supports several optimizations that can ultimately be mapped
* through to indexes and other data structures of the underlying Map.
*/
public interface Entry
extends Map.Entry
{
// ----- Map Entry interface ------------------------------------
/**
* Return the key corresponding to this entry. The resultant key does
* not necessarily exist within the containing Map, which is to say
* that <tt>InvocableMap.this.containsKey(getKey)</tt> could return
* false. To test for the presence of this key within the Map, use
* {@link #isPresent}, and to create the entry for the key, use
* {@link #setValue}.
*
* @return the key corresponding to this entry; may be null if the
* underlying Map supports null keys
*/
public Object getKey();
/**
* Return the value corresponding to this entry. If the entry does
* not exist, then the value is null. To differentiate between
* a null value and a non-existent entry, use {@link #isPresent}.
* <p/>
* <b>Note:</b> any modifications to the value retrieved using this
* method are not guaranteed to persist unless followed by a
* {@link #setValue} or {@link #update} call.
*
* @return the value corresponding to this entry; may be null if the
* value is null or if the Entry does not exist in the Map
*/
public Object getValue();
/**
* Store the value corresponding to this entry. If the entry does
* not exist, then the entry is created by invoking this method,
* even with a null value (assuming the Map supports null values).
*
* @param oValue the new value for this Entry
*
* @return the previous value of this Entry, or null if the Entry did
* not exist
*/
public Object setValue(Object oValue);
// ----- InvocableMap Entry interface ---------------------------
/**
* Store the value corresponding to this entry. If the entry does
* not exist, then the entry is created by invoking this method,
* even with a null value (assuming the Map supports null values).
* <p/>
* Unlike the other form of {@link #setValue(Object) setValue}, this
* form does not return the previous value, and consequently may be
* significantly less expensive (in terms of cost of execution) for
* certain Map implementations.
*
* @param oValue the new value for this Entry
* @param fSynthetic pass true only if the insertion into or
* modification of the Map should be treated as a
* synthetic event
*/
public void setValue(Object oValue, boolean fSynthetic);
/**
* Extract a value out of the Entry's value. Calling this method is
* semantically equivalent to
* <tt>extractor.extract(entry.getValue())</tt>, but this method may
* be significantly less expensive because the resultant value may be
* obtained from a forward index, for example.
*
* @param extractor a ValueExtractor to apply to the Entry's value
*
* @return the extracted value
*/
public Object extract(ValueExtractor extractor);
/**
* Update the Entry's value. Calling this method is semantically
* equivalent to:
* <pre>
* Object oTarget = entry.getValue();
* updater.update(oTarget, oValue);
* entry.setValue(oTarget, false);
* </pre>
* The benefit of using this method is that it may allow the Entry
* implementation to significantly optimize the operation, such as
* for purposes of delta updates and backup maintenance.
*
* @param updater a ValueUpdater used to modify the Entry's value
*/
public void update(ValueUpdater updater, Object oValue);
/**
* Determine if this Entry exists in the Map. If the Entry is not
* present, it can be created by calling {@link #setValue} or
* {@link #setValue}. If the Entry is present, it can be destroyed by
* calling {@link #remove}.
*
* @return true iff this Entry is existent in the containing Map
*/
public boolean isPresent();
/**
* Remove this Entry from the Map if it is present in the Map.
* <p/>
* This method supports both the operation corresponding to
* {@link Map#remove} and synthetic operations such as
* eviction. If the containing Map does not differentiate between
* the two, then this method must be identical to
* <tt>InvocableMap.this.remove(getKey())</tt>.
*
* @param fSynthetic pass true only if the removal from the Map
* should be treated as a synthetic event
*/
public void remove(boolean fSynthetic);
}
scalarエージェントに加えて、InvocableMapインタフェースも、エントリのサブセットに対して単一の結果を取得するための操作を実行するエントリ集計をサポートしています。エントリ集計は、グリッド全体でパラレルに行われ、大量のデータを処理する際のmap-reduceサポートを提供します。
例24-4 InvocableMap APIの集計
/** * Perform an aggregating operation against the entries specified by the * passed keys. * * @param collKeys the Collection of keys that specify the entries within * this Map to aggregate across * @param agent the EntryAggregator that is used to aggregate across * the specified entries of this Map * * @return the result of the aggregation */ public Object aggregate(Collection collKeys, EntryAggregator agent); /** * Perform an aggregating operation against the set of entries that are * selected by the given Filter. * <p/> * <b>Note:</b> calling this method on partitioned caches requires a * Coherence Enterprise Edition (or higher) license. * * @param filter the Filter that is used to select entries within this * Map to aggregate across * @param agent the EntryAggregator that is used to aggregate across * the selected entries of this Map * * @return the result of the aggregation */ public Object aggregate(Filter filter, EntryAggregator agent);
単純なEntryAggregatorは、InvocableMap.Entryオブジェクトのセットを処理して結果を得ます。
例24-5 EntryAggregator API
/**
* An EntryAggregator represents processing that can be directed to occur
* against some subset of the entries in an InvocableMap, resulting in a
* aggregated result. Common examples of aggregation include functions
* such as min(), max() and avg(). However, the concept of aggregation
* applies to any process that must evaluate a group of entries to
* come up with a single answer.
*/
public interface EntryAggregator
extends Serializable
{
/**
* Process a set of InvocableMap Entry objects to produce an
* aggregated result.
*
* @param setEntries a Set of read-only InvocableMap Entry objects to
* aggregate
*
* @return the aggregated result from processing the entries
*/
public Object aggregate(Set setEntries);
}
データ・グリッド内での効果的な実行には、集計処理のパラレル実行の設計が不可欠です。
例24-6 集計をパラレル実行するParallelAwareAggregator API
/**
* A ParallelAwareAggregator is an advanced extension to EntryAggregator
* that is explicitly capable of being run in parallel, for example in a
* distributed environment.
*/
public interface ParallelAwareAggregator
extends EntryAggregator
{
/**
* Get an aggregator that can take the place of this aggregator in
* situations in which the InvocableMap can aggregate in parallel.
*
* @return the aggregator that is run in parallel
*/
public EntryAggregator getParallelAggregator();
/**
* Aggregate the results of the parallel aggregations.
*
* @return the aggregation of the parallel aggregation results
*/
public Object aggregateResults(Collection collResults);
}
Coherenceには、次のような通常の集計関数がすべて備わっています。
Count
DistinctValues
DoubleAverage
DoubleMax
DoubleMin
DoubleSum
LongMax
LongMin
LongSum
|
注意: Coherenceに付属するすべてのアグリゲータは、パラレル実行に対応しています。 |
Coherenceアグリゲータの一覧は、com.tangosol.util.aggregatorパッケージを参照してください。独自のアグリゲータを実装する場合、AbstractAggregator抽象ベース・クラスを参照してください。
Coherenceには、グリッド内の任意の場所で単一パスのエージェント(起動可能なオブジェクトと呼ばれる)を実行できる起動サービスが用意されています。エージェントは、グリッドの特定のノードで実行したり、グリッドの特定のノード・セットでパラレル実行することができます。また、グリッドのすべてのノードでパラレル実行することもできます。
起動サービスは、キャッシュ構成ファイル<invocation-scheme>要素を使用して構成します。サービスの名前を使用すると、アプリケーションはサービスへの参照を容易に取得できます。
InvocationService service = (InvocationService)CacheFactory.getService
("MyService");
エージェントは、アプリケーションの一部である簡単な実行可能クラスです。単純なエージェントの例は、JVMからGCをリクエストするように設計されたものです。
例24-7 ガベージ・コレクションをリクエストする単純なエージェント
/**
* Agent that issues a garbage collection.
*/
public class GCAgent
extends AbstractInvocable
{
public void run()
{
System.gc();
}
}
次のように、クラスタ全体でそのエージェントを実行するコードは1行で済みます。
service.execute(new GCAgent(), null, null);
次に、グリッド全体のリクエスト/レスポンス・モデルをサポートするエージェントの例を示します。
例24-8 グリッド全体のリクエスト・モデルおよびレスポンス・モデルをサポートするエージェント
/**
* Agent that determines how much free memory a grid node has.
*/
public class FreeMemAgent
extends AbstractInvocable
{
public void run()
{
Runtime runtime = Runtime.getRuntime();
int cbFree = runtime.freeMemory();
int cbTotal = runtime.totalMemory();
setResult(new int[] {cbFree, cbTotal});
}
}
グリッド全体でそのエージェントを実行して、すべての結果を取得するのに必要なのも、次の1行のコードのみです。
Map map = service.query(new FreeMemAgent(), null);
グリッド全体のリクエストまたはレスポンスを実行するのは容易ですが、その結果を出力するには、より多くのコーディングが必要です。
例24-9 グリッド全体のリクエストまたはレスポンスからの結果の出力
Iterator iter = map.entrySet().iterator();
while (iter.hasNext())
{
Map.Entry entry = (Map.Entry) iter.next();
Member member = (Member) entry.getKey();
int[] anInfo = (int[]) entry.getValue();
if (anInfo != null) // nullif member died
System.out.println("Member " + member + " has "
+ anInfo[0] + " bytes free out of "
+ anInfo[1] + " bytes total");
}
エージェントの操作はステートフルにすることができます。つまり、その起動状態をシリアライズして、エージェントを実行するグリッド・ノードに転送します。
例24-10 ステートフル・エージェントの操作
/**
* Agent that carries some state with it.
*/
public class StatefulAgent
extends AbstractInvocable
{
public StatefulAgent(String sKey)
{
m_sKey = sKey;
}
public void run()
{
// the agent has the key that it was constructed with
String sKey = m_sKey;
// ...
}
private String m_sKey;
}
Coherenceは、CommonJワーク・マネージャのグリッド対応の実装を提供します。ワーク・マネージャを使用すると、アプリケーションで実行する必要がある作業コレクションを送信できます。ワーク・マネージャは、その作業がパラレル実行されるように分散します(通常は、グリッド全体に分散します)。つまり、送信された作業アイテムが10個あり、グリッドに10台のサーバーがあるとすると、各サーバーは1つの作業アイテムを処理することになります。さらに、グリッド内の作業アイテムの分散処理は調整できます。そのため、データの効率性および局所性を考慮して、特定の作業が指定したサーバー(たとえば、特定のメインフレーム・サービスのゲートウェイとして動作するサーバーなど)で最初に実行されるように選択できます。
アプリケーションは作業の完了を待機できますが、その待機時間のタイムアウトを設定することもできます。そのためのAPIは非常に強力なため、アプリケーションで最初の作業アイテムが完了するのを待機したり、指定された作業アイテム・セットが完了するのを待機することができます。このAPIのメソッドを組み合せると、たとえば「実行するアイテムが10個あり、その中の7個は重要でないため最大5秒間待機し、残りの3個の重要なアイテムは最大30秒間待機する」という処理が可能になります。
例24-11 ワーク・マネージャの使用
Work[] aWork = ...
Collection collBigItems = new ArrayList();
Collection collAllItems = new ArrayList();
for (int i = 0, c = aWork.length; i < c; ++i)
{
WorkItem item = manager.schedule(aWork[i]);
if (i < 3)
{
// the first three work items are the important ones
collBigItems.add(item);
}
collAllItems.add(item);
}
Collection collDone = manager.waitForAll(collAllItems, 5000L);
if (!collDone.containsAll(collBigItems))
{
// wait the remainder of 30 seconds for the important work to finish
manager.waitForAll(collBigItems, 25000L);
}