ヘッダーをスキップ
Oracle® Coherenceスタート・ガイド
リリース3.7.1
B65028-01
  ドキュメント・ライブラリへ移動
ライブラリ
製品リストへ移動
製品
目次へ移動
目次

前
 
次
 

2 データ・グリッドの提供

Coherenceでは、データ・グリッド・サービス、およびデータ・グリッドを使用するクライアントベースおよびサーバーベースのアプリケーションの構築に適したインフラストラクチャを提供します。Coherenceの基本的なレベルでは、グリッド内の多数のサーバー間の膨大なデータ量を管理できます。また、そのデータをほぼ待機時間なしでアクセスできます。さらに、データ間のパラレル問合せや、そのデータの記録システムとして動作するデータベースおよびEISシステムとの統合もサポートされます。Coherenceには、効果的なデータ・グリッドの構築に適した各種サービスも備わっています。

Coherenceのデータ・グリッド機能のインフラストラクチャの詳細は、第3章「問合せ可能なデータ・ファブリックの提供」を参照してください。

2.1 ターゲット実行

Coherenceでは、データ・グリッドの管理対象である任意のデータ・マップ・エントリに対してエージェントを実行する機能を提供します。

map.invoke(key, agent);

パーティション化されたデータの場合、エージェントの実行データを所有しているグリッド・ノード上でエージェントが実行されます。キューイング、並行性の管理、エージェントの実行、エージェントによるデータ・アクセス、およびエージェントによるデータ変更は、すべてそのグリッド・ノード上で発生します。(結果のデータ変更の同期バックアップがある場合にのみ、追加的なネットワーク・トラフィックが必要になります)。多くの処理用途では、分散された並行処理制御、一貫性およびデータ更新を管理するよりも、シリアライズされた形式のエージェントを移動させる方がはるかに効果的です(通常は数百バイト程度のメモリー消費で済みます)。

リクエスト/レスポンス処理の場合、エージェントは次のような結果を返します。

Object oResult = map.invoke(key, agent);

つまり、データ・グリッドとしてのCoherenceは、データ・トポロジ構成に基づいてエージェントの実行場所を決定し、その場所にエージェントを移動してから、そのエージェントを実行し(エージェントを実行しながら、そのアイテムの並行処理制御を自動的に処理する)、変更があればそれをバックアップして結果を返します。

2.2 パラレル実行

Coherenceでは、エントリのコレクション全体に対してエージェントを実行する機能も提供します。パーティション化されたデータ・グリッドでは、パラレル実行が行われます。つまり、グリッド内のノード数が多いほど、データ・グリッド全体に作業がロード・バランシングされます。

map.invokeAll(collectionKeys, agent);

リクエスト/レスポンス処理の場合、エージェントは各キーで1つの結果を返します。

Map mapResults = map.invokeAll(collectionKeys, agent);

つまり、Coherenceは、データ・トポロジ構成に基づいてエージェントの最適な実行場所を決定し、その場所にエージェントを移動してから、そのエージェントを実行し(エージェントを実行しながら、そのアイテムの並行処理制御を自動的に処理する)、変更があればそれをバックアップして結合された結果を返します。

2.3問合せベースの実行

第3章「問合せ可能なデータ・ファブリックの提供」の説明のとおり、Coherenceではデータ・グリッド全体に問い合せる機能がサポートされています。たとえば、取引システムにおいて、特定のトレーダーが受け持つ未決済のOrderオブジェクトをすべて問い合せることができます。

例2-1 データ・グリッド間の問合せ

NamedCache map    = CacheFactory.getCache("trades");
Filter     filter = new AndFilter(new EqualsFilter("getTrader", traderid),
                                  new EqualsFilter("getStatus", Status.OPEN));
Set setOpenTradeIds = mapTrades.keySet(filter);

この機能をデータ・グリッドのパラレル実行と組み合せることで、問合せに対するエージェントの実行機能を実現します。前項のとおり、実行はパラレルで行われるため、問合せに適合するIDまたはエントリを返すかわりに、エントリに対するエージェントが実行されます。

map.invokeAll(filter, agent);

リクエスト/レスポンス処理の場合、エージェントは各キーで1つの結果を返します。

Map mapResults = map.invokeAll(filter, agent);

つまり、パラレル問合せとパラレル実行を組み合せることで、データ・グリッドに対する問合せベースのエージェント起動が実現されます。

2.4 データ・グリッド全体での実行

AlwaysFilterのインスタンス(またはNULL)をinvokeAllメソッドに渡すと、渡されたエージェントがInvocableMapのすべてのエントリに対して実行されます。

map.invokeAll((Filter) null, agent);

その他のタイプのエージェント起動と同様の方法で、リクエスト/レスポンス処理がサポートされます。

Map mapResults = map.invokeAll((Filter) null, agent);

アプリケーションでは、データ・グリッド内の特定のマップで展開されているすべてのデータを1行のコードで処理できます。

2.5 ターゲット、パラレルおよび問合せベースの実行のエージェント

エージェントは、通常、AbstractProcessorクラスを拡張してEntryProcessorインタフェースを実装します。

Coherenceには、次のようなエージェントが含まれます。

InvocableMapインタフェース内のEntryProcessorインタフェースには、次の2つのメソッドのみが含まれます。

例2-2 EntryProcessorインタフェースのメソッド

/**
* An invocable agent that operates against the Entry objects within a
* Map.
*/
public interface EntryProcessor
        extends Serializable
    {
    /**
    * Process a Map Entry.
    *
    * @param entry  the Entry to process
    *
    * @return the result of the processing, if any
    */
    public Object process(Entry entry);

    /**
    * Process a Set of InvocableMap Entry objects. This method is
    * semantically equivalent to:
    * <pre>
    *   Map mapResults = new ListMap();
    *   for (Iterator iter = setEntries.iterator(); iter.hasNext(); )
    *       {
    *       Entry entry = (Entry) iter.next();
    *       mapResults.put(entry.getKey(), process(entry));
    *       }
    *   return mapResults;
    * </pre>
    *
    * @param setEntries  a read-only Set of InvocableMap Entry objects to
    *                    process
    *
    * @return a Map containing the results of the processing, up to one
    *         entry for each InvocableMap Entry that was processed, keyed
    *         by the keys of the Map that were processed, with a
    *         corresponding value being the result of the processing for
    *         each key
    */
    public Map processAll(Set setEntries);
    }

前述の例で示したとおり、AbstractProcessorprocessAllメソッドを実装します。

EntryProcessorに渡されるInvocableMap.Entryは、Map.Entryインタフェースを拡張したものです。これにより、EntryProcessor実装でエントリに関する必要な情報を取得して、必要な変更を非常に効果的に実行できます。

例2-3 InvocableMap.Entry API

/**
* An InvocableMap Entry contains additional information and exposes
* additional operations that the basic Map Entry does not. It allows
* non-existent entries to be represented, thus allowing their optional
* creation. It allows existent entries to be removed from the Map. It
* supports several optimizations that can ultimately be mapped
* through to indexes and other data structures of the underlying Map.
*/
public interface Entry
        extends Map.Entry
    {
    // ----- Map Entry interface ------------------------------------

    /**
    * Return the key corresponding to this entry. The resultant key does
    * not necessarily exist within the containing Map, which is to say
    * that <tt>InvocableMap.this.containsKey(getKey)</tt> could return
    * false. To test for the presence of this key within the Map, use
    * {@link #isPresent}, and to create the entry for the key, use
    * {@link #setValue}.
     *
    * @return the key corresponding to this entry; may be null if the
    *         underlying Map supports null keys
    */
    public Object getKey();

    /**
    * Return the value corresponding to this entry. If the entry does
    * not exist, then the value is null. To differentiate between
    * a null value and a non-existent entry, use {@link #isPresent}.
    * <p/>
    * <b>Note:</b> any modifications to the value retrieved using this
    * method are not guaranteed to persist unless followed by a
    * {@link #setValue} or {@link #update} call.
    *
    * @return the value corresponding to this entry; may be null if the
    *         value is null or if the Entry does not exist in the Map
    */
    public Object getValue();

    /**
    * Store the value corresponding to this entry. If the entry does
    * not exist, then the entry is created by invoking this method,
    * even with a null value (assuming the Map supports null values).
    *
    * @param oValue  the new value for this Entry
    *
    * @return the previous value of this Entry, or null if the Entry did
    *         not exist
    */
    public Object setValue(Object oValue);

    // ----- InvocableMap Entry interface ---------------------------

    /**
    * Store the value corresponding to this entry. If the entry does
    * not exist, then the entry is created by invoking this method,
    * even with a null value (assuming the Map supports null values).
    * <p/>
    * Unlike the other form of {@link #setValue(Object) setValue}, this
    * form does not return the previous value, and consequently may be
    * significantly less expensive (in terms of cost of execution) for
    * certain Map implementations.
    *
    * @param oValue      the new value for this Entry
    * @param fSynthetic  pass true only if the insertion into or
    *                    modification of the Map should be treated as a
    *                    synthetic event
    */
    public void setValue(Object oValue, boolean fSynthetic);

    /**
    * Extract a value out of the Entry's value. Calling this method is
    * semantically equivalent to
    * <tt>extractor.extract(entry.getValue())</tt>, but this method may
    * be significantly less expensive because the resultant value may be
    * obtained from a forward index, for example.
    *
    * @param extractor  a ValueExtractor to apply to the Entry's value
    *
    * @return the extracted value
    */
    public Object extract(ValueExtractor extractor);

    /**
    * Update the Entry's value. Calling this method is semantically
    * equivalent to:
    * <pre>
    *   Object oTarget = entry.getValue();
    *   updater.update(oTarget, oValue);
    *   entry.setValue(oTarget, false);
    * </pre>
    * The benefit of using this method is that it may allow the Entry
    * implementation to significantly optimize the operation, such as
    * for purposes of delta updates and backup maintenance.
    *
    * @param updater  a ValueUpdater used to modify the Entry's value
    */
    public void update(ValueUpdater updater, Object oValue);

    /**
    * Determine if this Entry exists in the Map. If the Entry is not
    * present, it can be created by calling {@link #setValue} or
    * {@link #setValue}. If the Entry is present, it can be destroyed by
    * calling {@link #remove}.
    *
    * @return true iff this Entry is existent in the containing Map
    */
    public boolean isPresent();

    /**
    * Remove this Entry from the Map if it is present in the Map.
    * <p/>
    * This method supports both the operation corresponding to
    * {@link Map#remove} and synthetic operations such as
    * eviction. If the containing Map does not differentiate between
    * the two, then this method is always identical to
    * <tt>InvocableMap.this.remove(getKey())</tt>.
    *
    * @param fSynthetic  pass true only if the removal from the Map
    *                    should be treated as a synthetic event
    */
    public void remove(boolean fSynthetic);
    }

2.6 データ・グリッドの集計

前の項で説明したエージェントがスカラー・エージェントに対応していれば、InvocableMapインタフェースも集計をサポートします。

例2-4 InvocableMap APIの集計

/**
* Perform an aggregating operation against the entries specified by the
* passed keys.
*
* @param collKeys  the Collection of keys that specify the entries within
*                  this Map to aggregate across
* @param agent     the EntryAggregator that is used to aggregate across
*                  the specified entries of this Map
*
* @return the result of the aggregation
*/
public Object aggregate(Collection collKeys, EntryAggregator agent);

/**
* Perform an aggregating operation against the set of entries that are
* selected by the given Filter.
* <p/>
* <b>Note:</b> calling this method on partitioned caches requires a
* Coherence Enterprise Edition (or higher) license.
*
* @param filter  the Filter that is used to select entries within this
*                Map to aggregate across
* @param agent   the EntryAggregator that is used to aggregate across
*                the selected entries of this Map
*
* @return the result of the aggregation
*/
public Object aggregate(Filter filter, EntryAggregator agent);

単純なEntryAggregatorは、InvocableMap.Entryオブジェクトのセットを処理して結果を得ます。

例2-5 EntryAggregator API

/**
* An EntryAggregator represents processing that can be directed to occur
* against some subset of the entries in an InvocableMap, resulting in a
* aggregated result. Common examples of aggregation include functions
* such as min(), max() and avg(). However, the concept of aggregation
* applies to any process that must evaluate a group of entries to
* come up with a single answer.
*/
public interface EntryAggregator
        extends Serializable
    {
    /**
    * Process a set of InvocableMap Entry objects to produce an
    * aggregated result.
    *
    * @param setEntries  a Set of read-only InvocableMap Entry objects to
    *                    aggregate
    *
    * @return the aggregated result from processing the entries
    */
    public Object aggregate(Set setEntries);
    }

データ・グリッド内での効果的な実行には、集計処理のパラレル実行の設計が不可欠です。

例2-6 集計をパラレル実行するParallelAwareAggregator API

/**
* A ParallelAwareAggregator is an advanced extension to EntryAggregator
* that is explicitly capable of being run in parallel, for example in a
* distributed environment.
*/
public interface ParallelAwareAggregator
        extends EntryAggregator
    {
    /**
    * Get an aggregator that can take the place of this aggregator in
    * situations in which the InvocableMap can aggregate in parallel.
    *
    * @return the aggregator that is run in parallel
    */
    public EntryAggregator getParallelAggregator();

    /**
    * Aggregate the results of the parallel aggregations.
    *
    * @return the aggregation of the parallel aggregation results
    */
    public Object aggregateResults(Collection collResults);
    }

Coherenceには、次のような通常の集計関数がすべて備わっています。

2.7 ノードベースの実行

Coherenceには、グリッド内の任意の場所で単一パスのエージェント(起動可能なオブジェクトと呼ばれる)を実行できる起動サービスが用意されています。エージェントは、グリッドの特定のノードで実行したり、グリッドの特定のノード・セットでパラレル実行することができます。また、グリッドのすべてのノードでパラレル実行することもできます。

起動サービスは、キャッシュ構成ファイル<invocation-scheme>要素を使用して構成します。サービスの名前を使用すると、アプリケーションはサービスへの参照を容易に取得できます。

InvocationService service = CacheFactory.getInvocationService("agents");

エージェントは、アプリケーションの一部である簡単な実行可能クラスです。単純なエージェントの例は、JVMからGCをリクエストするように設計されたものです。

例2-7 ガベージ・コレクションをリクエストする単純なエージェント

/**
* Agent that issues a garbage collection.
*/
public class GCAgent
        extends AbstractInvocable
    {
    public void run()
        {
        System.gc();
        }
    }

次のように、クラスタ全体でそのエージェントを実行するコードは1行で済みます。

service.execute(new GCAgent(), null, null);

次に、グリッド全体のリクエスト/レスポンス・モデルをサポートするエージェントの例を示します。

例2-8 グリッド全体のリクエスト・モデルおよびレスポンス・モデルをサポートするエージェント

/**
* Agent that determines how much free memory a grid node has.
*/
public class FreeMemAgent
        extends AbstractInvocable
    {
    public void run()
        {
        Runtime runtime = Runtime.getRuntime();
        int cbFree  = runtime.freeMemory();
        int cbTotal = runtime.totalMemory();
        setResult(new int[] {cbFree, cbTotal});
        }
    }

グリッド全体でそのエージェントを実行して、すべての結果を取得するのに必要なのも、次の1行のコードのみです

Map map = service.query(new FreeMemAgent(), null);

グリッド全体のリクエストまたはレスポンスを実行するのは容易ですが、その結果を出力するには、より多くのコーディングが必要です。

例2-9 グリッド全体のリクエストまたはレスポンスからの結果の出力

Iterator iter = map.entrySet().iterator();
while (iter.hasNext())
    {
    Map.Entry entry  = (Map.Entry) iter.next();
    Member    member = (Member) entry.getKey();
    int[]     anInfo = (int[]) entry.getValue();
    if (anInfo != null) // nullif member died
        System.out.println("Member " + member + " has "
            + anInfo[0] + " bytes free out of "
            + anInfo[1] + " bytes total");
    }

エージェントの操作はステートフルにすることができます。つまり、その起動状態をシリアライズして、エージェントを実行するグリッド・ノードに転送します。

例2-10 ステートフル・エージェントの操作

/**
* Agent that carries some state with it.
*/
public class StatefulAgent
        extends AbstractInvocable
    {
    public StatefulAgent(String sKey)
        {
        m_sKey = sKey;
        }

    public void run()
        {
        // the agent has the key that it was constructed with
        String sKey = m_sKey;
        // ...
        }

    private String m_sKey;
    }

2.8 ワーク・マネージャ

Coherenceは、CommonJワーク・マネージャのグリッド対応の実装を提供します。ワーク・マネージャを使用すると、アプリケーションで実行する必要がある作業コレクションを送信できます。ワーク・マネージャは、その作業がパラレル実行されるように分散します(通常は、グリッド全体に分散します)。つまり、送信された作業アイテムが10個あり、グリッドに10台のサーバーがあるとすると、各サーバーは1つの作業アイテムを処理することになります。さらに、グリッド内の作業アイテムの分散処理は調整できます。そのため、データの効率性および局所性を考慮して、特定の作業が指定したサーバー(たとえば、特定のメインフレーム・サービスのゲートウェイとして動作するサーバーなど)で最初に実行されるように選択できます。

アプリケーションは作業の完了を待機しますが、その待機時間のタイムアウトを設定することもできます。そのためのAPIは非常に強力なため、アプリケーションで最初の作業アイテムが完了するのを待機したり、指定された作業アイテム・セットが完了するのを待機することができます。このAPIのメソッドを組み合せると、たとえば「実行するアイテムが10個あり、その中の7個は重要でないため最大5秒間待機し、残りの3個の重要なアイテムは最大30秒間待機する」という処理が可能になります。

例2-11 ワーク・マネージャの使用

Work[] aWork = ...
Collection collBigItems = new ArrayList();
Collection collAllItems = new ArrayList();
for (int i = 0, c = aWork.length; i < c; ++i)
    {
    WorkItem item = manager.schedule(aWork[i]);

    if (i < 3)
        {
        // the first three work items are the important ones
        collBigItems.add(item);
        }

    collAllItems.add(item);
    }

Collection collDone = manager.waitForAll(collAllItems, 5000L);
if (!collDone.containsAll(collBigItems))
    {
    // wait the remainder of 30 seconds for the important work to finish
    manager.waitForAll(collBigItems, 25000L);
    }

2.8.1 Oracle Coherenceワーク・マネージャ: 主要金融機関からのフィードバック

ワーク・マネージャの主な使用例として、標準ベースのブレード・インフラストラクチャを使用した、アプリケーションにおける大まかなサービス・リクエストの処理があります。この場合、「この家族の情報が欲しい」などの一見単純なリクエストが多く発生します。しかし、実際には、このリクエストによって、Webサービス、RDBMSコールなどで構成される複数の異なるバックエンド・データソースに対する多数のリクエストが展開されます。この使用例は、ワーク・マネージャの分散機能での解決が要求される、それぞれ異なるが関連性のある2つの問題に展開されます。

  • コール元に不相応な待機時間を強いないために、大まかなリクエストをきめ細かな複数のリクエストに切り分けてパラレル実行するにはどうすればよいでしょうか。前述の例では、100以上のコール先に問い合せて情報を取得しました。Java EEには正式なスレッド・モデルがなく、それに対応するメッセージベースのアプローチは不十分であるため、Coherenceワーク・マネージャを実装しています。

  • 低コストのブレードを活用しながら多数の外部システム・コールをパラレル実行するため、必要な作業を多くのデュアル・プロセッサ(ハイパースレッド機能を採用しているため、論理的には4プロセッサ構成になる)コンピュータに展開させることで、本来は垂直方向のスケーラビリティの問題を水平方向のハードウェア・レベルでのスケーラビリティで解決できます。リモートのワーク・マネージャ・インスタンスへのリクエスト配信にかかるコストは、サービスを実行する際のコスト(通常は数十から数百ミリ秒)に比べて小さくなります。