24 キャッシュ内のデータの処理
この章の内容は次のとおりです。
- キャッシュ内のデータの処理の概要
Coherenceにより、データ・グリッド・サービスとそのクライアント、およびデータ・グリッドを使用するサーバーベースのアプリケーションの構築に最適なインフラストラクチャが提供されます。 - ターゲット、パラレルおよび問合せベースの処理のためのエージェントの使用
一般にエントリ・プロセッサと呼ばれるエージェントを使用して、キャッシュ内のデータを処理できます。 - データ・グリッドの集計の実行
InvocableMap
インタフェースは、エントリのサブセットに対して操作を実行し、単一の結果を取得するエントリ・アグリゲータをサポートします。 - ストリームを使用したデータ・グリッドの集計の実行
ストリームを使用すると、特にJavaラムダ式と組み合せた場合に、プログラミング・モデルを簡素化できます。 - ノードベースの処理の実行
Coherenceには、グリッド内の任意の場所で単一パスのエージェント(起動可能なオブジェクトと呼ばれる)を実行できる起動サービスが用意されています。 - ワーク・マネージャの使用
Coherenceでは、CommonJワーク・マネージャのグリッド対応の実装が提供されます。
親トピック: データ・グリッド操作の実行
キャッシュ内のデータの処理の概要
この項には次のトピックが含まれます:
ターゲット処理の実行
Coherenceでは、データ・グリッドの管理対象である任意のデータ・マップ・エントリに対してエージェントを実行する機能を提供します。
map.invoke(key, agent);
パーティション化されたデータの場合、データを所有しているグリッド・ノード上でエージェントが実行されます。キューイング、並行性の管理、エージェントの実行、エージェントによるデータ・アクセス、およびエージェントによるデータ変更は、すべてそのグリッド・ノード上で発生します。(結果のデータ変更の同期バックアップがある場合にのみ、追加的なネットワーク・トラフィックが必要になります。)様々な処理用途で、分散された並行処理制御、整合性およびデータ更新を処理するよりも、エージェントのシリアライズされた形式(最大で数百バイト)を移動するほうがずっと効率的です。
リクエストおよびレスポンス処理の場合、エージェントは次のような結果を返します。
Object oResult = map.invoke(key, agent);
Coherenceはデータ・グリッドとして、データ・トポロジの構成に基づいてエージェントを実行する場所を決定します。決定した場所にエージェントを移動して、エージェントを実行し(エージェントの実行中は項目の並行処理制御を自動的に処理します)、変更をバックアップして(存在する場合)、結果を返します。
親トピック: キャッシュ内のデータの処理の概要
パラレル処理の実行
Coherenceでは、グリッド内のすべてのノードにわたって、エントリのコレクションに対してエージェントをパラレル実行できるようにするmap-reduce機能が提供されます。パラレル実行により、グリッド全体で作業のバランスを取ることで大量のデータを処理することが可能になります。invokeAll
メソッドは、次のように使用されます。
map.invokeAll(collectionKeys, agent);
リクエストおよびレスポンス処理の場合、エージェントは各キーで1つの結果を返します。
Map mapResults = map.invokeAll(collectionKeys, agent);
Coherenceは、データ・トポロジの構成に基づいてエージェントを実行する最適な場所を決定します。次に、決定した場所にエージェントを移動して、エージェントを実行し(エージェントの実行中は項目の並行処理制御を自動的に処理します)、変更をバックアップして(存在する場合)、結合した結果を返します。データ・グリッドの集計の実行を参照してください。
親トピック: キャッシュ内のデータの処理の概要
問合せベースの処理の実行
Coherenceは、データ・グリッド全体にわたる問合せ機能をサポートしています。キャッシュ内のデータの問合せを参照してください。たとえば、取引システムにおいて、特定のトレーダーが受け持つ未決済のOrder
オブジェクトをすべて問い合せることができます。
NamedCache map = CacheFactory.getCache("trades"); Filter filter = new AndFilter(new EqualsFilter("getTrader", traderid), new EqualsFilter("getStatus", Status.OPEN)); Set setOpenTradeIds = mapTrades.keySet(filter);
この機能をデータ・グリッドのパラレル実行の使用と組み合せることで、問合せに対するエージェントの実行機能を実現します。前項のとおり、実行はパラレルで行われるため、問合せに適合するIDまたはエントリを返すかわりに、エントリに対するエージェントが実行されます。
map.invokeAll(filter, agent);
リクエストおよびレスポンス処理の場合、エージェントは各キーで1つの結果を返します。
Map mapResults = map.invokeAll(filter, agent);
パラレル問合せとパラレル実行を組み合せることで、データ・グリッドに対する問合せベースのエージェント起動が実現されます。
親トピック: キャッシュ内のデータの処理の概要
データ・グリッド全体の処理の実行
AlwaysFilter
のインスタンス(またはnull
)をinvokeAll
メソッドに渡すと、渡されたエージェントがInvocableMap
のすべてのエントリに対して実行されます。
map.invokeAll((Filter) null, agent);
その他のタイプのエージェント起動と同様の方法で、リクエストおよびレスポンス処理がサポートされます。
Map mapResults = map.invokeAll((Filter) null, agent);
アプリケーションでは、データ・グリッド内の特定のマップで展開されているすべてのデータを1行のコードで処理できます。
親トピック: キャッシュ内のデータの処理の概要
ターゲット、パラレルおよび問合せベースの処理のためのエージェントの使用
この項には次のトピックが含まれます:
親トピック: キャッシュ内のデータの処理
エントリ・プロセッサ・エージェントの概要
エージェントは、通常、AbstractProcessor
クラスを拡張してEntryProcessor
インタフェースを実装します。Coherenceには、com.tangosol.util.processor
パッケージに含まれている次の事前定義済のEntryProcessor
実装があります。
-
AbstractProcessor
-EntryProcessor
を作成するための抽象ベース・クラス -
AsynchronousProcessor
- 基礎となるエントリ・プロセッサを非同期で起動可能なラッパー・クラス。エントリの非同期処理を参照してください。 -
CompositeProcessor
- 同じエントリに対して順番に起動されるEntryProcessor
オブジェクトのコレクションをバンドルします -
ConditionalProcessor
- エントリ-プロセスに対するFilter
がtrue
に評価された場合、EntryProcessor
を起動します -
ConditionalPut
- 指定した条件が満たされている場合に、Entry.setValue
操作を実行します -
ConditionalPutAll
- 指定した条件を満たす複数のエントリに対してEntry.setValue
操作を実行します -
ConditionalRemove
- 指定した条件が満たされている場合に、Entry.remove
操作を実行します -
ExtractorProcessor
-InvocableMap
に保存されたオブジェクトから値(プロパティ値など)を抽出して返します -
NumberIncrementor
- 基本形の整数型や、Byte
、Short
、Integer
、Long
、Float
、Double
、BigInteger
、BigDecimal
の任意のプロパティを実行前後に増分します -
NumberMultiplier
- 基本形の整数型や、Byte
、Short
、Integer
、Long
、Float
、Double
、BigInteger
、BigDecimal
の任意のプロパティを乗算した後で前の値または新しい値を返します -
PreloadRequest
-Entry.getValue
コールを実行します。結果はコール元にレポートされません。プロセッサは、ネットワークで値を送信するコストを発生させることなく、ローダーを使用してエントリまたはエントリのコレクションをキャッシュにロードする手段となります。対応するエントリ(または複数のエントリ)がキャッシュにすでに存在する場合、またはキャッシュにローダーがない場合、このプロセッサを含めても影響はありません。 -
PriorityProcessor
-Entryprocessor
メソッドの実行のスケジュールの優先度とタイムアウトを明示的に制御します。 -
PropertyProcessor
-PropertyManipulator
に依存するEntryProcessor
実装の抽象ベース・クラスNumberIncrementor
およびNumberMultiplier
エントリ・プロセッサは、このプロセッサを拡張します。 -
UpdaterProcessor
-InvocableMap
にキャッシュされているオブジェクトの属性を更新します。 -
VersionedPut
- 指定した値のバージョンが現在の値のバージョンと一致する場合に、Entry.setValue
操作を実行します。一致する場合、プロセッサは、値が更新される前にバージョン・インジケータを増分します。エントリ値はVersionable
インタフェースを実装する必要があります。 -
VersionedPutAll
- 対応する現在値のバージョンとバージョンが一致するエントリに対してのみ、Entry.setValue
操作を実行します。一致する場合、プロセッサは、各値が更新される前にバージョン・インジケータを増分します。エントリ値はVersionable
インタフェースを実装する必要があります。
InvocableMap
インタフェース内のEntryProcessor
インタフェースには、process
とprocessAll
の2つのメソッドのみが含まれます。AbstractProcessor
は、processAll
メソッドのデフォルト実装を提供します。複数のキーを処理する場合は、すべてのキーに単一のEntryProcessor
オブジェクトが再利用され、その状態は変化しません。
ノート:
processAll
のコールで例外がスローされた場合、変更は、setEntries
から削除されたエントリの基礎となるMap
に対してのみ行われます。残りのエントリに対して行う変更は処理されません。
EntryProcessor
に渡されるInvocableMap.Entry
は、Map.Entry
インタフェースを拡張したものです。これにより、EntryProcessor
実装でエントリに関する必要な情報を取得して、必要な変更を最も効率的な方法で実行できます。
ラムダ式を使用したエントリの処理
ラムダ式は、エントリ・プロセッサとして使用可能で、より簡潔なクライアント・コードになり、POF構成ファイルでプロセッサをシリアライズまたは登録する必要がなくなります。次の例では、エントリ・プロセッサをラムダ式として作成し、エントリ・プロセッサをinvokeAll
メソッド内で使用します。
InvocableMap.EntryProcessor<ContactId, Contact, Void> processor = (entry) -> { Contact contact = entry.getValue(); contact.setFirstName(contact.getFirstName().toUpperCase()); entry.setValue(contact); return null; }; cache.invokeAll(processor);
次の例では、エントリ・プロセッサをラムダ式としてinvokeAll
メソッド内に直接作成します。
Address addrWork = new Address("201 Newbury St.", "Yoyodyne, Ltd.", "Boston", "MA", "02116", "US"); ValueExtractor extractor = Lambda.extractor(Contact::getHomeAddress).andThen(Address::getState); Filter filter = equal(extractor, "MA"); addrWork.setStreet1("200 Newbury St."); cache.invokeAll(filter, entry -> { Contact contact = entry.getValue(); contact.setWorkAddress(addrWork); entry.setValue(contact); return null; });
ラムダ式はネストできないことに注意してください。たとえば:
cache.invoke(filter, entry -> {Runnable r = () -> System.out.println(""); r.run();}
分散環境のラムダの理解
ラムダ式は静的な性質のため、分散環境での実行には問題がある可能性があります。ラムダを記述するメタデータのみがネットワークで送信されます。そのため、クライアントとサーバー・クラスパスの両方で同じコンパイル済のコードが必要です。クライアントで新しいラムダ式を変更または追加する場合は、クライアントとサーバーの両方を再デプロイして、再起動する必要があります。さらに、統合ラムダ・メソッド名は、クラス・バージョン間で安定していません。そのため、すべてのクラスタ・メンバーには、正確なバージョンのクラスを含める必要があり、拡張クライアントを含め、同時に更新する必要があります。
これらの制限を回避するために、ラムダの動的な実装が提供されています。動的実装では、実行する実際のバイト・コードとラムダ・メタデータの両方を送信します。クライアント側のバイト・コードが解析され、そこから新しいラムダ・クラスが生成されます。サーバー側では、クライアントから受信したバイト・コードに基づいてラムダ・クラスが作成され、実行されます。動的実装:
-
再デプロイまたはサーバーを再起動する必要なく、既存の(または新しく導入した)動作の変更を可能にします
-
ラムダ命名の不安定さに関する問題を解決します
-
クラスタ全体で複数の異なるバージョンのクラスを可能にします
メソッドおよびコンストラクタの参照は静的なラムダとして常に扱われるため、動的実装が適切に動作するには、指定したメソッドおよびコンストラクタをラムダ式内で参照しないでください。さらに、動的実装では、囲んでいるメソッド引数とローカル変数のみを取得します。
複数のキャッシュのエントリの処理
エントリ・プロセッサでは、1つのprocess
またはprocessAll
操作で複数のキャッシュ内のキャッシュ・エントリを更新できます。これらのキャッシュは同じサービスによって管理され、エントリは同じパーティションに存在する必要があります。データ・アフィニティの指定を参照してください。
process
およびprocessAll
の操作は、キャッシュ・エントリのアクセス、挿入、更新、変更または削除の際に暗黙的ロックを使用するというトランザクションに類似した方法で実行されます。エントリの処理中に例外がスローされた場合、エントリはロール・バックされ、基礎となる値はいずれも変更されません。processAll
の操作は、個々のエントリやリクエスト全体ではなく、単一のパーティション内のすべてのエントリ(または、サービス・スレッドが構成されていない場合のメンバー)に対して原子性を持ちます。
ノート:
別のスレッドでの順序の競合でエントリがロックされている場合、暗黙的ロックによってデッドロックが作成されることがあります。アプリケーションでは、デッドロックが発生しない方式でキャッシュ・エントリにアクセス(をロック)するようにしておく必要があります。デッドロックが検出された場合、例外がスローされますが、基礎となるサービスは停止しません。
com.tangosol.net.BackingMapContext
APIは、複数のキャッシュ内のエントリを処理するために使用され、キャッシュのバッキング・マップのエントリに直接アクセスする手段を提供します。パッキング・マップは(アプリケーションによって通常使用されるキャッシュの論理表現とは反対に)エントリが格納されている実際のMap
実装です。パッキング・マップ内のエントリはバイナリ形式で格納されるため、アプリケーションではエントリをシリアライズ形式で処理することが必要です。記憶域およびバッキング・マップの実装を参照してください。
com.tangosol.util.BinaryEntry
APIは、パッキング・マップ・コンテキストへの容易なアクセスを提供し、アプリケーションによって通常使用されます。次のコード・サンプルは、BinaryEntry
APIを使用してエントリ・プロセッサのprocess
メソッド内で2つの異なるキャッシュ内のエントリを更新する方法を示しています。
public Object process(Entry entry) { BinaryEntry binEntry = (BinaryEntry) entry; Binary binKey = binEntry.getBinaryKey(); Trade trade = (Trade) binEntry.getValue(); // Update a Trade object in cache1 trade.setPrice(trade.getPrice() + factor); binEntry.setValue(trade); // update a Trade object in cache2 BackingMapManagerContext ctx = binEntry.getContext(); BinaryEntry binEntry2 = (BinaryEntry) ctx.getBackingMapContext("cache2").getBackingMapEntry(binKey); Trade trade2 = (Trade) binEntry2.getValue(); trade2.setPrice(trade2.getPrice() + factor); binEntry2.setValue(trade2); return null; }
ノート:
getBackingMapEntry
メソッドは、エントリ・プロセッサ呼出しのコンテキスト内でのみコールできます。エントリに対して行われる変更はすべて、外側の呼出しによって行われたものと同じライフサイクルで保持されます。返されるエントリは、包含呼出しの期間のみ有効であり、同じ呼出しコンテキスト内におけるこのメソッドへの複数のコールは同じエントリ・オブジェクトを返します。
エントリ・プロセッサの結果の無視
AbstractProcessor
クラスのprocessAll
メソッドは、結果のマップをクライアント・アプリケーションに返します。マップには、処理されたすべてのエントリのキーと値が含まれています。多くの場合、エントリ・プロセッサはクライアントが使用する結果を返します。ただし、いくつかの結果をクライアントが使用できない場合もあります。さらに重要なことには、プロセッサがキャッシュ内のすべてのエントリを評価することが必要な場合があります。そのようなときは、返されたマップにキャッシュ内のすべてのキーが含まれています。どちらの場合も、不要な結果を無視するようにエージェントを設計する必要があります。
次のような理由により、必要な結果のみを返すようにエージェントを設計することは、優れたパターンであり、ベスト・プラクティスです。
-
クライアントのメモリー・フットプリントが、キャッシュのサイズに依存しなくなります。
-
クライアントへの影響を受けるすべてのキーの転送(キャッシュが大きすぎる場合にクライアント上で
OutOfMemoryError
例外が発生する可能性がある)が回避されます。 -
クライアントにおけるすべてのキーのデシリアライズ・コストが回避されます。
-
プロキシ・ノードを介したマップおよびすべてのキーの転送が回避されます(拡張クライアントの場合)。
エントリ・プロセッサの結果を無視するには、プロセッサのprocessAll
メソッドをオーバーライドし、空のMap
またはnull
を返すようにします。次の例は、エントリを処理した後に常にnull
を返す単純なエントリ・プロセッサ・エージェントを示しています。この例は、あまり現実的ではありませんが、processAll
メソッドのオーバーライド方法を示しています。
public static class Agent implements InvocableMap.EntryProcessor { private static final long serialVersionUID = 1L; @Override public Object process(Entry entry) { return null; } @Override public Map processAll(Set set) { for (Entry entry : (Set<Entry>) set) { process(entry); } return null; }
統合操作の実行
エントリ・プロセッサは、エントリで統合操作を実行します。true
パラメータをメソッド・コールに含めることで、エントリのremove
およびsetValue
メソッドの両方を統合として宣言できます。たとえば:
entry.setValue(value, true)
統合操作のsetValue
メソッドは、以前の値を返しません。さらに、統合操作は、キャッシュ・ストアまたはバイナリ・エントリ・ストアに伝搬されず、すべての処理の完了後にアプリケーションで変更をコミットできます。
アプリケーションでは、一般に統合操作を使用してキャッシュを準備し、特に、リスナーおよびキャッシュ・ストアへの通知により、ネットワーク、メモリーおよびCPU使用率を大きく消費する場合は、この通知が不要な操作(状態変更)を実行します。アプリケーションでは、キャッシュを事前警告する場合に統合操作を使用します。キャッシュ・データをロードするアプリケーションでは、特に、キャッシュにロード中のエントリがバックエンド・データ・ソースからロードされている場合に、キャッシュ・ストアがコールされないようにする必要が生じることがあります。
アプリケーションは、CacheEvent
クラスのisSynthetic
メソッドを使用して、クライアント・ドリブン(自然)イベントとキャッシュ内部(統合)イベントを区別できます。統合イベントの使用を参照してください。
エントリの非同期処理
エントリ・プロセッサは、AsynchronousProcessor
クラスを使用して非同期に起動できます。クラスはJava標準のFuture
インタフェースを実装し、大量のバックログを回避するCoherence固有のフロー制御メカニズムも含んでいます。
ノート:
-
エントリは、
AsyncNameCache<K, V>
インタフェースを使用して非同期で処理することもできます。NameCache操作の非同期実行を参照してください。 -
この機能はCoherence*Extendクライアントでは使用できません。
AsynchronousProcessor
クラスは、エントリ・プロセッサの実装をラップするために使用されます。たとえば:
UpdaterProcessor up = new UpdaterProcessor(null, value); AsynchronousProcessor ap = new AsynchronousProcessor(up); cache.invokeAll(filter, ap); ap.getResult();
前述の例は、基礎となるエントリ・プロセッサを起動し、(基礎となるサービスのフロー制御ロジックで定義されているように)自動フロー制御および(スレッドのリクエストは当然順番に実行されることが期待されるため、コール元スレッドのhashCode
に割り当てられた)デフォルトの順序単位IDを使用します。順番は、フェイルオーバー中である場合でも各パーティションに対して保証されます。手動でリクエスト・フローを制御し、必要に応じて順序単位IDを割り当てるために、追加のコンストラクタが使用可能です。
高度なユースケースのために、AsynchronousProcessor
クラスを拡張してカスタム非同期機能を定義することが可能です。次の例では、AsynchrounousProcessor
クラスを拡張し、onResult
、onComplete
およびonException
メソッドをオーバーライドしています。
ノート:
onComplete
、onResult
およびonException
メソッド呼出しはクライアントのサービス・スレッドで実行され、他のスレッドのレスポンスの処理をブロックするため、これらのメソッドのオーバーライドの実装は、非ブロックで短時間で完了する必要があります。
AsynchronousProcessor processor = new AsynchronousProcessor(null) { public synchronized void onResult(Entry entry) { super.onResult(entry); // process the result } public void onComplete() { super.onComplete(); if (m_eReason == null) { // process the result } else { // process the (potentially partial) failure } } public void onException(Throwable eReason) { super.onException(eReason); // process the observed exception } };
データ・グリッドの集計の実行
InvocableMap
インタフェースは、エントリのサブセットに対して操作を実行し、単一の結果を取得するエントリ・アグリゲータをサポートします。エントリの集計はグリッド間でパラレルで行われ、大量のデータを使用して作業する際にmap-reduceのサポートが提供されます。詳細は、aggregate
メソッドを参照してください。
また、EntryAggregator
インタフェースを使用してInvocableMap
.Entry
オブジェクトのセットを処理し、集計結果を得ることができます。
データ・グリッド内での効果的な実行には、集計処理のパラレル実行の設計が不可欠です。StreamingAggregator
インタフェースは、分散環境で明示的にパラレル実行できるEntryAggregator
インタフェースに対する高度な拡張機能です。
ノート:
ParallelAwareAggregator
インタフェースは非推奨になっているため、使用しないでください。カスタム・アグリゲータを実装するには、アプリケーションでStreamingAggregator
インタフェースを使用してください。ストリームを使用したデータ・グリッドの集計の実行を参照してください。
Coherenceには、標準で多くのアグリゲータ機能が含まれています。それらの機能は次のようなものです。
-
Count
-
DistinctValues
-
DoubleAverage
-
DoubleMax
-
DoubleMin
-
DoubleSum
-
LongMax
-
LongMin
-
LongSum
Coherenceアグリゲータの一覧は、com.tangosol.util.aggregator
パッケージを参照してください。独自のアグリゲータを実装するには、StreamingAggregator
インタフェースを参照してください。
親トピック: キャッシュ内のデータの処理
ストリームを使用したデータ・グリッドの集計の実行
ValueExtractor<Person, Integer> ageExtractor = Person::getAge; double avgAge = cache.stream() .mapToInt(entry -> entry.extract(ageExtractor)) .average() .getAsDouble();
Coherenceフィルタを使用している場合は、フィルタ・オブジェクトをストリームのソースとして渡します。たとえば:
ValueExtractor<Person, Integer> ageExtractor = Person::getAge; int max = personCache.stream(filter) .mapToInt(entry -> entry.extract(ageExtractor)) .max();
別の方法として、Coherence Stream
API拡張機能を使用して、ストリームの作成時にエクストラクタを指定し、最適化に拡張機能を利用することができます。たとえば:
int max = personCache.stream(filter, Person::getAge) .mapToInt(Number::intValue) .max();
この場合、mapToInt(Number::intValue)
を使用してStream<Integer>
をIntStream
に変換する必要があることに注意してください。この変換は内部的にも行えます。たとえば:
int max = RemoteStream.toIntStream(personCache.stream(filter, Person::getAge)).max();
Javaストリーム実装は、クラスタ全体で集計できるようにCoherenceで拡張されています。APIでは、InvocableMap.StreamingAggregator
インタフェースを実装することで、ストリームをサポートする一連の集計を定義します。さらに、Coherenceには、分散環境でパラレルに実行可能な便利なコレクタが多く含まれています。コレクタは、RemoteCollectors
クラスを使用してコールされます。たとえば:
avgAge = cache.stream() .map(Map.Entry::getValue) .collect(RemoteCollectors.averagingInt(Contact::getAge)); System.out.println("\nThe average age of all contacts using collect() is: " + avgAge);
ストリームもサポートするカスタム集計を定義するには、CollectorAggregator
クラスを拡張できます。
親トピック: キャッシュ内のデータの処理
ノードベースの処理の実行
起動サービスは、キャッシュ構成ファイル<invocation-scheme>
要素を使用して構成します。invocation-scheme
を参照してください。サービスの名前を使用すると、アプリケーションはサービスへの参照を容易に取得できます。
InvocationService service = (InvocationService)CacheFactory.getService ("MyService");
エージェントは、アプリケーションの一部である簡単な実行可能クラスです。単純なエージェントの例は、JVMからGCをリクエストするように設計されたものです。
/** * Agent that issues a garbage collection. */ public class GCAgent extends AbstractInvocable { public void run() { System.gc(); } }
次のように、クラスタ全体でそのエージェントを実行するコードは1行で済みます。
service.execute(new GCAgent(), null, null);
次に、グリッド全体のリクエスト/レスポンス・モデルをサポートするエージェントの例を示します。
/** * Agent that determines how much free memory a grid node has. */ public class FreeMemAgent extends AbstractInvocable { public void run() { Runtime runtime = Runtime.getRuntime(); int cbFree = runtime.freeMemory(); int cbTotal = runtime.totalMemory(); setResult(new int[] {cbFree, cbTotal}); } }
グリッド全体でそのエージェントを実行してすべての結果を取得するのに必要なコードは、次の1行のみです。
Map map = service.query(new FreeMemAgent(), null);
グリッド全体のリクエストまたはレスポンスを実行するのは容易ですが、その結果を出力するには、より多くのコーディングが必要です。
Iterator iter = map.entrySet().iterator(); while (iter.hasNext()) { Map.Entry entry = (Map.Entry) iter.next(); Member member = (Member) entry.getKey(); int[] anInfo = (int[]) entry.getValue(); if (anInfo != null) // nullif member died System.out.println("Member " + member + " has " + anInfo[0] + " bytes free out of " + anInfo[1] + " bytes total"); }
エージェントの操作はステートフルにすることができます。つまり、その起動状態をシリアライズして、エージェントを実行するグリッド・ノードに転送します。
/** * Agent that carries some state with it. */ public class StatefulAgent extends AbstractInvocable { public StatefulAgent(String sKey) { m_sKey = sKey; } public void run() { // the agent has the key that it was constructed with String sKey = m_sKey; // ... } private String m_sKey; }
親トピック: キャッシュ内のデータの処理
ワーク・マネージャの使用
アプリケーションは作業の完了を待機できますが、その待機時間のタイムアウトを設定することもできます。そのためのAPIは非常に強力なため、アプリケーションで最初の作業アイテムが完了するのを待機したり、指定された作業アイテム・セットが完了するのを待機することができます。このAPIのメソッドを組み合せると、たとえば「実行するアイテムが10個あり、その中の7個は重要でないため最大5秒間待機し、残りの3個の重要なアイテムは最大30秒間待機する」という処理が可能になります。
例24-1 ワーク・マネージャの使用
Work[] aWork = ... Collection collBigItems = new ArrayList(); Collection collAllItems = new ArrayList(); for (int i = 0, c = aWork.length; i < c; ++i) { WorkItem item = manager.schedule(aWork[i]); if (i < 3) { // the first three work items are the important ones collBigItems.add(item); } collAllItems.add(item); } Collection collDone = manager.waitForAll(collAllItems, 5000L); if (!collDone.containsAll(collBigItems)) { // wait the remainder of 30 seconds for the important work to finish manager.waitForAll(collBigItems, 25000L); }
親トピック: キャッシュ内のデータの処理