プライマリ・コンテンツに移動
Oracle® Fusion Middleware Oracle Coherenceでのアプリケーションの開発
12c (12.2.1)
E69903-02
  ドキュメント・ライブラリへ移動
ライブラリ
製品リストへ移動
製品
目次へ移動
目次

前
 
次
 

24 キャッシュ内のデータの処理

この章では、クラスタ全体にわたってデータ・グリッド処理を実行するためのエントリ・プロセッサおよびアグリゲータの使用方法について説明します。これらのデータ・グリッド機能は、他のmap-reduceパターンと類似した方法で実行され、非常に短い待機時間で大量のデータを処理できます。

この章には次の項が含まれます:

24.1 キャッシュ内のデータの処理の概要

Coherenceでは、データ・グリッド・サービスおよびデータ・グリッドを使用するクライアントベースおよびサーバーベースのアプリケーションの構築に適したインフラストラクチャを提供します。Coherenceの基本的なレベルでは、グリッド内の多数のサーバーにわたる膨大なデータ量を管理できます。また、そのデータをほぼ待機時間なしでアクセスできます。さらに、マップ・リデュースな方法でのデータ間のパラレル問合せや、そのデータの記録システムとして動作するデータベースおよびEISシステムとの統合もサポートされます。

24.1.1 ターゲット処理の実行

Coherenceでは、データ・グリッドの管理対象である任意のデータ・マップ・エントリに対してエージェントを実行する機能を提供します。

map.invoke(key, agent);

パーティション化されたデータの場合、データを所有しているグリッド・ノード上でエージェントが実行されます。キューイング、並行性の管理、エージェントの実行、エージェントによるデータ・アクセス、およびエージェントによるデータ変更は、すべてそのグリッド・ノード上で発生します。(結果のデータ変更の同期バックアップがある場合にのみ、追加的なネットワーク・トラフィックが必要になります。)様々な処理用途で、分散された並行処理制御、整合性およびデータ更新を処理するよりも、エージェントのシリアライズされた形式(最大で数百バイト)を移動するほうがずっと効率的です。

リクエストおよびレスポンス処理の場合、エージェントは次のような結果を返します。

Object oResult = map.invoke(key, agent);

Coherenceはデータ・グリッドとして、データ・トポロジの構成に基づいてエージェントを実行する場所を決定します。決定した場所にエージェントを移動して、エージェントを実行し(エージェントの実行中は項目の並行処理制御を自動的に処理します)、変更をバックアップして(存在する場合)、結果を返します。

24.1.2 パラレル処理の実行

Coherenceでは、グリッド内のすべてのノードにわたって、エントリのコレクションに対してエージェントをパラレル実行できるようにするmap-reduce機能が提供されます。パラレル実行により、グリッド全体で作業のバランスを取ることで大量のデータを処理することが可能になります。invokeAllメソッドは、次のように使用されます。

map.invokeAll(collectionKeys, agent);

リクエストおよびレスポンス処理の場合、エージェントは各キーで1つの結果を返します。

Map mapResults = map.invokeAll(collectionKeys, agent);

Coherenceは、データ・トポロジの構成に基づいてエージェントを実行する最適な場所を決定します。次に、決定した場所にエージェントを移動して、エージェントを実行し(エージェントの実行中は項目の並行処理制御を自動的に処理します)、変更をバックアップして(存在する場合)、結合した結果を返します。結果セットに対して集計を実行する手順については、「データ・グリッドの集計の実行」を参照してください。

24.1.3 問合せベースの処理の実行

Coherenceは、データ・グリッド全体にわたる問合せ機能をサポートしています。問合せの作成の詳細は、第22章「キャッシュ内のデータの問合せ」を参照してください。たとえば、トレーディング・システムでは、特定のトレーダーのすべての未決済のOrderオブジェクトについて問い合せることができます。

例24-1 データ・グリッド全域の問合せ

NamedCache map    = CacheFactory.getCache("trades");
Filter     filter = new AndFilter(new EqualsFilter("getTrader", traderid),
                                  new EqualsFilter("getStatus", Status.OPEN));
Set setOpenTradeIds = mapTrades.keySet(filter);

この機能をデータ・グリッドのパラレル実行の使用と組み合せることで、問合せに対するエージェントの実行機能を実現します。前項のとおり、実行はパラレルで行われるため、問合せに適合するIDまたはエントリを返すかわりに、エントリに対するエージェントが実行されます。

map.invokeAll(filter, agent);

リクエストおよびレスポンス処理の場合、エージェントは各キーで1つの結果を返します。

Map mapResults = map.invokeAll(filter, agent);

パラレル問合せとパラレル実行を組み合せることで、データ・グリッドに対する問合せベースのエージェント起動が実現されます。

24.1.4 データ・グリッド全体の処理の実行

AlwaysFilterのインスタンス(またはnull)をinvokeAllメソッドに渡すと、渡されたエージェントがInvocableMapのすべてのエントリに対して実行されます。

map.invokeAll((Filter) null, agent);

その他のタイプのエージェント起動と同様の方法で、リクエストおよびレスポンス処理がサポートされます。

Map mapResults = map.invokeAll((Filter) null, agent);

アプリケーションでは、データ・グリッド内の特定のマップで展開されているすべてのデータを1行のコードで処理できます。

24.2 ターゲット、パラレルおよび問合せベースの処理のためのエージェントの使用

エージェントは、通常、AbstractProcessorクラスを拡張してEntryProcessorインタフェースを実装します。Coherenceには、一般的な多くの操作の実行に使用可能な、事前定義済の多くのエントリ・プロセッサが用意されています。プロセッサはcom.tangosol.util.processorパッケージに定義されています。

Coherenceには、次のEntryProcessor実装が含まれます。

  • AbstractProcessor - EntryProcessorを作成するための抽象ベース・クラス

  • AsynchronousProcessor - 基礎となるエントリ・プロセッサを非同期で起動可能なラッパー・クラス。詳細は、「エントリの非同期処理」を参照してください。

  • CompositeProcessor - 同じエントリに対して順番に起動されるEntryProcessorオブジェクトのコレクションをバンドルします

  • ConditionalProcessor - エントリ-プロセスに対するFiltertrueに評価された場合、EntryProcessorを起動します

  • ConditionalPut - 指定した条件が満たされている場合に、Entry.setValue操作を実行します

  • ConditionalPutAll - 指定した条件を満たす複数のエントリに対してEntry.setValue操作を実行します

  • ConditionalRemove - 指定した条件が満たされている場合に、Entry.remove操作を実行します

  • ExtractorProcessor - InvocableMapに保存されたオブジェクトから値(プロパティ値など)を抽出して返します

  • NumberIncrementor - 基本形の整数型や、ByteShortIntegerLongFloatDoubleBigIntegerBigDecimalの任意のプロパティを実行前後に増分します

  • NumberMultiplier - 基本形の整数型や、ByteShortIntegerLongFloatDoubleBigIntegerBigDecimalの任意のプロパティを乗算した後で前の値または新しい値を返します

  • PreloadRequest - Entry.getValueコールを実行します。結果はコール元にレポートされません。プロセッサは、ネットワークで値を送信するコストを発生させることなく、ローダーを使用してエントリまたはエントリのコレクションをキャッシュにロードする手段となります。対応するエントリ(または複数のエントリ)がキャッシュにすでに存在する場合、またはキャッシュにローダーがない場合、このプロセッサを含めても影響はありません。

  • PriorityProcessor - Entryprocessorメソッドの実行のスケジュールの優先度とタイムアウトを明示的に制御します。

  • PropertyProcessor - PropertyManipulatorに依存するEntryProcessor実装の抽象ベース・クラスNumberIncrementorおよびNumberMultiplierエントリ・プロセッサは、このプロセッサを拡張します。

  • UpdaterProcessor - InvocableMapにキャッシュされているオブジェクトの属性を更新します。

  • VersionedPut - 指定した値のバージョンが現在の値のバージョンと一致する場合に、Entry.setValue操作を実行します。一致する場合、プロセッサは、値が更新される前にバージョン・インジケータを増分します。エントリ値はVersionableインタフェースを実装する必要があります。

  • VersionedPutAll - 対応する現在値のバージョンとバージョンが一致するエントリに対してのみ、Entry.setValue操作を実行します。一致する場合、プロセッサは、各値が更新される前にバージョン・インジケータを増分します。エントリ値はVersionableインタフェースを実装する必要があります。

InvocableMapインタフェース内のEntryProcessorインタフェースには、processprocessAllの2つのメソッドのみが含まれます。AbstractProcessorは、processAllメソッドのデフォルト実装を提供します。複数のキーを処理する場合は、すべてのキーに単一のEntryProcessorオブジェクトが再利用され、その状態は変化しません。


注意:

processAllのコールで例外がスローされた場合、変更は、setEntriesから削除されたエントリの基礎となるMapに対してのみ行われます。残りのエントリに対して行う変更は処理されません。

EntryProcessorに渡されるInvocableMap.Entryは、Map.Entryインタフェースを拡張したものです。これにより、EntryProcessor実装でエントリに関する必要な情報を取得して、必要な変更を最も効率的な方法で実行できます。

24.2.1 ラムダ式を使用したエントリの処理

ラムダ式は、エントリ・プロセッサとして使用可能で、より簡潔なクライアント・コードになり、POF構成ファイルでプロセッサをシリアライズまたは登録する必要がなくなります。次の例では、エントリ・プロセッサをラムダ式として作成し、エントリ・プロセッサをinvokeAllメソッド内で使用します。

InvocableMap.EntryProcessor<ContactId, Contact, Void> processor = (entry) ->
   {
      Contact contact = entry.getValue();
      contact.setFirstName(contact.getFirstName().toUpperCase());
      entry.setValue(contact);
      return null;
   };
 
cache.invokeAll(processor);

次の例では、エントリ・プロセッサをラムダ式としてinvokeAllメソッド内に直接作成します。

Address addrWork = new Address("201 Newbury St.", "Yoyodyne, Ltd.",
   "Boston", "MA", "02116", "US");

ValueExtractor extractor =
   Lambda.extractor(Contact::getHomeAddress).andThen(Address::getState);
Filter filter = equal(extractor, "MA");

addrWork.setStreet1("200 Newbury St.");

cache.invokeAll(filter, entry ->
   {
   Contact contact = entry.getValue();
   contact.setWorkAddress(addrWork);
   entry.setValue(contact);
   return null;
   });

ラムダ式はネストできないことに注意してください。例:

cache.invoke(filter, entry -> {Runnable r = () -> System.out.println("");
r.run();}

分散環境のラムダの理解

ラムダ式は静的な性質のため、分散環境での実行には問題がある可能性があります。ラムダを記述するメタデータのみがネットワークで送信されます。そのため、クライアントとサーバー・クラスパスの両方で同じコンパイル済のコードが必要です。クライアントで新しいラムダ式を変更または追加する場合は、クライアントとサーバーの両方を再デプロイして、再起動する必要があります。さらに、統合ラムダ・メソッド名は、クラス・バージョン間で安定していません。そのため、すべてのクラスタ・メンバーには、正確なバージョンのクラスを含める必要があり、拡張クライアントを含め、同時に更新する必要があります。

これらの制限を回避するために、ラムダの動的な実装が提供されています。動的実装では、実行する実際のバイト・コードとラムダ・メタデータの両方を送信します。クライアント側のバイト・コードが解析され、そこから新しいラムダ・クラスが生成されます。サーバー側では、クライアントから受信したバイト・コードに基づいてラムダ・クラスが作成され、実行されます。動的実装:

  • 再デプロイまたはサーバーを再起動する必要なく、既存の(または新しく導入した)動作の変更を可能にします

  • ラムダ命名の不安定さに関する問題を解決します

  • クラスタ全体で複数の異なるバージョンのクラスを可能にします

メソッドおよびコンストラクタの参照は静的なラムダとして常に扱われるため、動的実装が適切に動作するには、指定したメソッドおよびコンストラクタをラムダ式内で参照しないでください。さらに、動的実装では、囲んでいるメソッド引数とローカル変数のみを取得します。

24.2.2 複数のキャッシュのエントリの処理

エントリ・プロセッサでは、1つのprocessまたはprocessAll操作で複数のキャッシュ内のキャッシュ・エントリを更新できます。これらのキャッシュは同じサービスによって管理され、エントリは同じパーティションに存在する必要があります。エントリを同じパーティションに存在させる方法の詳細は、「データ・アフィニティの指定」を参照してください。

processおよびprocessAllの操作は、キャッシュ・エントリのアクセス、挿入、更新、変更または削除の際に暗黙的ロックを使用するというトランザクションに類似した方法で実行されます。エントリの処理中に例外がスローされた場合、エントリはロール・バックされ、基礎となる値はいずれも変更されません。processAllの操作は、個々のエントリやリクエスト全体ではなく、単一のパーティション内のすべてのエントリ(または、サービス・スレッドが構成されていない場合のメンバー)に対して原子性を持ちます。


注意:

別のスレッドでの順序の競合でエントリがロックされている場合、暗黙的ロックによってデッドロックが作成されることがあります。アプリケーションでは、デッドロックが発生しない方式でキャッシュ・エントリにアクセス(をロック)するようにしておく必要があります。デッドロックが検出された場合、例外がスローされますが、基礎となるサービスは停止しません。

com.tangosol.net.BackingMapContext APIは、複数のキャッシュ内のエントリを処理するために使用され、キャッシュのバッキング・マップのエントリに直接アクセスする手段を提供します。パッキング・マップは(アプリケーションによって通常使用されるキャッシュの論理表現とは反対に)エントリが格納されている実際のMap実装です。パッキング・マップ内のエントリはバイナリ形式で格納されるため、アプリケーションではエントリをシリアライズ形式で処理することが必要です。パッキング・マップの詳細は、第13章「記憶域およびバッキング・マップの実装」を参照してください。

com.tangosol.util.BinaryEntry APIは、パッキング・マップ・コンテキストへの容易なアクセスを提供し、アプリケーションによって通常使用されます。次のコード・サンプルは、BinaryEntry APIを使用してエントリ・プロセッサのprocessメソッド内で2つの異なるキャッシュ内のエントリを更新する方法を示しています。

public Object process(Entry entry) {
   BinaryEntry binEntry = (BinaryEntry) entry;
   Binary binKey = binEntry.getBinaryKey();
   Trade trade = (Trade) binEntry.getValue();
       
   // Update a Trade object in cache1

   trade.setPrice(trade.getPrice() + factor);
   binEntry.setValue(trade);  
 
   // update a Trade object in cache2

   BackingMapManagerContext ctx = binEntry.getContext();
   BinaryEntry binEntry2 = 
      (BinaryEntry) ctx.getBackingMapContext("cache2").getBackingMapEntry(binKey);
   Trade trade2 = (Trade) binEntry2.getValue();
   trade2.setPrice(trade2.getPrice() + factor);
   binEntry2.setValue(trade2);

   return null;
}

注意:

getBackingMapEntryメソッドは、エントリ・プロセッサ呼出しのコンテキスト内でのみコールできます。エントリに対して行われる変更はすべて、外側の呼出しによって行われたものと同じライフサイクルで保持されます。返されるエントリは、包含呼出しの期間のみ有効であり、同じ呼出しコンテキスト内におけるこのメソッドへの複数のコールは同じエントリ・オブジェクトを返します。

24.2.3 エントリ・プロセッサの結果の無視

AbstractProcessorクラスのprocessAllメソッドは、結果のマップをクライアント・アプリケーションに返します。マップには、処理されたすべてのエントリのキーと値が含まれています。多くの場合、エントリ・プロセッサはクライアントが使用する結果を返します。ただし、いくつかの結果をクライアントが使用できない場合もあります。さらに重要なことには、プロセッサがキャッシュ内のすべてのエントリを評価することが必要な場合があります。そのようなときは、返されたマップにキャッシュ内のすべてのキーが含まれています。どちらの場合も、不要な結果を無視するようにエージェントを設計する必要があります。

次のような理由により、必要な結果のみを返すようにエージェントを設計することは、優れたパターンであり、ベスト・プラクティスです。

  • クライアントのメモリー・フットプリントが、キャッシュのサイズに依存しなくなります。

  • クライアントへの影響を受けるすべてのキーの転送(キャッシュが大きすぎる場合にクライアント上でOutOfMemoryError例外が発生する可能性がある)が回避されます。

  • クライアントにおけるすべてのキーのデシリアライズ・コストが回避されます。

  • プロキシ・ノードを介したマップおよびすべてのキーの転送が回避されます(拡張クライアントの場合)。

エントリ・プロセッサの結果を無視するには、プロセッサのprocessAllメソッドをオーバーライドし、空のMapまたはnullを返すようにします。次の例は、エントリを処理した後に常にnullを返す単純なエントリ・プロセッサ・エージェントを示しています。この例は、あまり現実的ではありませんが、processAllメソッドのオーバーライド方法を示しています。

public static class Agent
   implements InvocableMap.EntryProcessor
   {
   private static final long serialVersionUID = 1L;
   
   @Override
   public Object process(Entry entry)
      {
      return null;
      }
    
   @Override
   public Map processAll(Set set)
      {
      for (Entry entry : (Set<Entry>) set)
         {
         process(entry);
         }
      return null;
      }

24.2.4 統合操作の実行

エントリ・プロセッサは、エントリで統合操作を実行します。trueパラメータをメソッド・コールに含めることで、エントリのremoveおよびsetValueメソッドの両方を統合として宣言できます。例:

entry.setValue(value, true)

統合操作のsetValueメソッドは、以前の値を返しません。さらに、統合操作は、キャッシュ・ストアまたはバイナリ・エントリ・ストアに伝搬されず、すべての処理の完了後にアプリケーションで変更をコミットできます。

アプリケーションでは、一般に統合操作を使用してキャッシュを準備し、特に、リスナーおよびキャッシュ・ストアへの通知により、ネットワーク、メモリーおよびCPU使用率を大きく消費する場合は、この通知が不要な操作(状態変更)を実行します。アプリケーションでは、キャッシュを事前警告する場合に統合操作を使用します。キャッシュ・データをロードするアプリケーションでは、特に、キャッシュにロード中のエントリがバックエンド・データ・ソースからロードされている場合に、キャッシュ・ストアがコールされないようにする必要が生じることがあります。

アプリケーションは、CacheEventクラスのisSyntheticメソッドを使用して、クライアント・ドリブン(自然)イベントとキャッシュ内部(統合)イベントを区別できます。詳細は、「統合イベントの使用」を参照してください。

24.2.5 エントリの非同期処理

エントリ・プロセッサは、AsynchronousProcessorクラスを使用して非同期に起動できます。クラスはJava標準のFutureインタフェースを実装し、大量のバックログを回避するCoherence固有のフロー制御メカニズムも含んでいます。


注意:

エントリは、AsyncNameCache<K, V>インタフェースを使用して非同期で処理することもできます。詳細は、「NameCache操作の非同期実行」を参照してください。

AsynchronousProcessorクラスは、エントリ・プロセッサの実装をラップするために使用されます。例:

UpdaterProcessor up = new UpdaterProcessor(null, value);
AsynchronousProcessor ap = new AsynchronousProcessor(up);

cache.invokeAll(filter, ap);
ap.getResult();

前述の例は、基礎となるエントリ・プロセッサを起動し、(基礎となるサービスのフロー制御ロジックで定義されているように)自動フロー制御および(スレッドのリクエストは当然順番に実行されることが期待されるため、コール元スレッドのhashCodeに割り当てられた)デフォルトの順序単位IDを使用します。順番は、フェイルオーバー中である場合でも各パーティションに対して保証されます。手動でリクエスト・フローを制御し、必要に応じて順序単位IDを割り当てるために、追加のコンストラクタが使用可能です。

高度なユースケースのために、AsynchronousProcessorクラスを拡張してカスタム非同期機能を定義することが可能です。次の例では、AsynchrounousProcessorクラスを拡張し、onResultonCompleteおよびonExceptionメソッドをオーバーライドしています。AsynchrounousProcessorクラスおよびそのメソッドの詳細は、Oracle Coherence Java APIリファレンスを参照してください。


注意:

onCompleteonResultおよびonExceptionメソッド呼出しはクライアントのサービス・スレッドで実行され、他のスレッドのレスポンスの処理をブロックするため、これらのメソッドのオーバーライドの実装は、非ブロックで短時間で完了する必要があります。

AsynchronousProcessor processor = new AsynchronousProcessor(null)
   {

   public synchronized void onResult(Entry entry)
      {
      super.onResult(entry);

      // process the result
      }

   public void onComplete()
      {
      super.onComplete();
            
      if (m_eReason == null)
         {

         // process the result
         }
      else
         {

         // process the (potentially partial) failure
         }
      }

   public void onException(Throwable eReason)
      {
         super.onException(eReason);
         
         // process the observed exception
      }
};

24.3 データ・グリッドの集計の実行

scalarエージェントに加えて、InvocableMapインタフェースも、エントリのサブセットに対して単一の結果を取得するための操作を実行するエントリ集計をサポートしています。エントリ集計は、グリッド全体でパラレルに行われ、大量のデータを処理する際のmap-reduceサポートを提供します。

例24-2 InvocableMap APIの集計

/**
* Perform an aggregating operation against the entries specified by the
* passed keys.
*
* @param collKeys  the Collection of keys that specify the entries within
*                  this Map to aggregate across
* @param agent     the EntryAggregator that is used to aggregate across
*                  the specified entries of this Map
*
* @return the result of the aggregation
*/
public Object aggregate(Collection collKeys, EntryAggregator agent);

/**
* Perform an aggregating operation against the set of entries that are
* selected by the given Filter.
* <p/>
* <b>Note:</b> calling this method on partitioned caches requires a
* Coherence Enterprise Edition (or higher) license.
*
* @param filter  the Filter that is used to select entries within this
*                Map to aggregate across
* @param agent   the EntryAggregator that is used to aggregate across
*                the selected entries of this Map
*
* @return the result of the aggregation
*/
public Object aggregate(Filter filter, EntryAggregator agent);

単純なEntryAggregatorは、InvocableMap.Entryオブジェクトのセットを処理して結果を得ます。

例24-3 EntryAggregator API

/**
* An EntryAggregator represents processing that can be directed to occur
* against some subset of the entries in an InvocableMap, resulting in a
* aggregated result. Common examples of aggregation include functions
* such as min(), max() and avg(). However, the concept of aggregation
* applies to any process that must evaluate a group of entries to
* come up with a single answer.
*/
public interface EntryAggregator
        extends Serializable
    {
    /**
    * Process a set of InvocableMap Entry objects to produce an
    * aggregated result.
    *
    * @param setEntries  a Set of read-only InvocableMap Entry objects to
    *                    aggregate
    *
    * @return the aggregated result from processing the entries
    */
    public Object aggregate(Set setEntries);
    }

データ・グリッド内での効果的な実行には、集計処理のパラレル実行の設計が不可欠です。

例24-4 集計をパラレル実行するParallelAwareAggregator API

/**
* A ParallelAwareAggregator is an advanced extension to EntryAggregator
* that is explicitly capable of being run in parallel, for example in a
* distributed environment.
*/
public interface ParallelAwareAggregator
        extends EntryAggregator
    {
    /**
    * Get an aggregator that can take the place of this aggregator in
    * situations in which the InvocableMap can aggregate in parallel.
    *
    * @return the aggregator that is run in parallel
    */
    public EntryAggregator getParallelAggregator();

    /**
    * Aggregate the results of the parallel aggregations.
    *
    * @return the aggregation of the parallel aggregation results
    */
    public Object aggregateResults(Collection collResults);
    }

Coherenceには、次のような通常の集計関数がすべて備わっています。

  • Count

  • DistinctValues

  • DoubleAverage

  • DoubleMax

  • DoubleMin

  • DoubleSum

  • LongMax

  • LongMin

  • LongSum


    注意:

    Coherenceに付属するすべてのアグリゲータは、パラレル実行に対応しています。

    Coherenceアグリゲータの一覧は、com.tangosol.util.aggregatorパッケージを参照してください。独自のアグリゲータを実装する場合、AbstractAggregator抽象ベース・クラスを参照してください。

24.4 ストリームを使用したデータ・グリッドの集計の実行

データ・グリッド集計は、Javaストリームを使用して実行できます。ストリームを使用すると、特にJavaラムダ式と組み合せた場合に、プログラミング・モデルを簡素化できます。例:

double avgAge = cache.stream()
   .mapToInt(entry -> entry.extract(Contact::getAge))
   .average()
   .getAsDouble();
System.out.println("\nThe average age of all contacts is: " + avgAge);

Coherenceフィルタを使用している場合は、フィルタ・オブジェクトをストリームのソースとして渡すことができます。例:

int max = cache.stream(filter)
   .mapToInt(Person::getAge)
   .max();

Javaストリーム実装は、クラスタ全体で集計できるようにCoherenceで拡張されています。APIでは、InvocableMap.StreamingAggregatorインタフェースを実装することで、ストリームをサポートする一連の集計を定義します。さらに、Coherenceには、分散環境でパラレルに実行可能な便利なコレクタが多く含まれています。コレクタは、RemoteCollectorsクラスを使用してコールされます。例:

avgAge = cache.stream()
   .map(Map.Entry::getValue)
   .collect(RemoteCollectors.averagingInt(Contact::getAge));
System.out.println("\nThe average age of all contacts using collect() is: " +
   avgAge);

ストリームもサポートするカスタム集計を定義するには、CollectorAggregatorクラスを拡張できます。このクラスおよびサポートされるコレクションの詳細は、Oracle Coherence Java APIリファレンスcom.tangosol.internal.util.streamパッケージに関する項を参照してください。

24.5 ノードベースの処理の実行

Coherenceには、グリッド内の任意の場所で単一パスのエージェント(起動可能なオブジェクトと呼ばれる)を実行できる起動サービスが用意されています。エージェントは、グリッドの特定のノードで実行したり、グリッドの特定のノード・セットでパラレル実行することができます。また、グリッドのすべてのノードでパラレル実行することもできます。

起動サービスは、キャッシュ構成ファイル<invocation-scheme要素を使用して構成します。サービスの名前を使用すると、アプリケーションはサービスへの参照を容易に取得できます。

InvocationService service = (InvocationService)CacheFactory.getService
("MyService");

エージェントは、アプリケーションの一部である簡単な実行可能クラスです。単純なエージェントの例は、JVMからGCをリクエストするように設計されたものです。

例24-5 ガベージ・コレクションをリクエストする単純なエージェント

/**
* Agent that issues a garbage collection.
*/
public class GCAgent
        extends AbstractInvocable
    {
    public void run()
        {
        System.gc();
        }
    }

次のように、クラスタ全体でそのエージェントを実行するコードは1行で済みます。

service.execute(new GCAgent(), null, null);

次に、グリッド全体のリクエスト/レスポンス・モデルをサポートするエージェントの例を示します。

例24-6 グリッド全体のリクエスト・モデルおよびレスポンス・モデルをサポートするエージェント

/**
* Agent that determines how much free memory a grid node has.
*/
public class FreeMemAgent
        extends AbstractInvocable
    {
    public void run()
        {
        Runtime runtime = Runtime.getRuntime();
        int cbFree  = runtime.freeMemory();
        int cbTotal = runtime.totalMemory();
        setResult(new int[] {cbFree, cbTotal});
        }
    }

グリッド全体でそのエージェントを実行して、すべての結果を取得するのに必要なのも、次の1行のコードのみです

Map map = service.query(new FreeMemAgent(), null);

グリッド全体のリクエストまたはレスポンスを実行するのは容易ですが、その結果を出力するには、より多くのコーディングが必要です。

例24-7 グリッド全体のリクエストまたはレスポンスからの結果の出力

Iterator iter = map.entrySet().iterator();
while (iter.hasNext())
    {
    Map.Entry entry  = (Map.Entry) iter.next();
    Member    member = (Member) entry.getKey();
    int[]     anInfo = (int[]) entry.getValue();
    if (anInfo != null) // nullif member died
        System.out.println("Member " + member + " has "
            + anInfo[0] + " bytes free out of "
            + anInfo[1] + " bytes total");
    }

エージェントの操作はステートフルにすることができます。つまり、その起動状態をシリアライズして、エージェントを実行するグリッド・ノードに転送します。

例24-8 ステートフル・エージェントの操作

/**
* Agent that carries some state with it.
*/
public class StatefulAgent
        extends AbstractInvocable
    {
    public StatefulAgent(String sKey)
        {
        m_sKey = sKey;
        }

    public void run()
        {
        // the agent has the key that it was constructed with
        String sKey = m_sKey;
        // ...
        }

    private String m_sKey;
    }

24.6 ワーク・マネージャの使用

Coherenceは、CommonJワーク・マネージャのグリッド対応の実装を提供します。ワーク・マネージャを使用すると、アプリケーションで実行する必要がある作業コレクションを送信できます。ワーク・マネージャは、その作業がパラレル実行されるように分散します(通常は、グリッド全体に分散します)。つまり、送信された作業アイテムが10個あり、グリッドに10台のサーバーがあるとすると、各サーバーは1つの作業アイテムを処理することになります。さらに、グリッド内の作業アイテムの分散処理は調整できます。そのため、データの効率性および局所性を考慮して、特定の作業が指定したサーバー(たとえば、特定のメインフレーム・サービスのゲートウェイとして動作するサーバーなど)で最初に実行されるように選択できます。

アプリケーションは作業の完了を待機できますが、その待機時間のタイムアウトを設定することもできます。そのためのAPIは非常に強力なため、アプリケーションで最初の作業アイテムが完了するのを待機したり、指定された作業アイテム・セットが完了するのを待機することができます。このAPIのメソッドを組み合せると、たとえば「実行するアイテムが10個あり、その中の7個は重要でないため最大5秒間待機し、残りの3個の重要なアイテムは最大30秒間待機する」という処理が可能になります。

例24-9 ワーク・マネージャの使用

Work[] aWork = ...
Collection collBigItems = new ArrayList();
Collection collAllItems = new ArrayList();
for (int i = 0, c = aWork.length; i < c; ++i)
    {
    WorkItem item = manager.schedule(aWork[i]);

    if (i < 3)
        {
        // the first three work items are the important ones
        collBigItems.add(item);
        }

    collAllItems.add(item);
    }

Collection collDone = manager.waitForAll(collAllItems, 5000L);
if (!collDone.containsAll(collBigItems))
    {
    // wait the remainder of 30 seconds for the important work to finish
    manager.waitForAll(collBigItems, 25000L);
    }