Coherenceには、ロック、トランザクションおよびデータへの同時アクセスをサポートする様々なオプションがあります。
Coherenceには、データへの同時アクセスを管理する様々なオプションがあります。
表2-1 Coherenceの同時アクセスのオプション
| オプション名 | 説明 |
|---|---|
|
|
|
|
|
|
|
|
Java EEコンテナでのトランザクション管理について、CoherenceではJCAリソース・アダプタが用意されており、JTAを使用してトランザクションを管理できます。現時点では、XAトランザクションはサポートされていませんが、最後のリソースとしてXAトランザクションに参加できます。 |
|
|
Coherenceでは、 |
|
|
ローカル(XA以外)のデータ・リソースを含むキャッシュを維持する際のガイドラインです。 |
標準的なNamedCacheインタフェースは、基本的なロック・メソッドを含むConcurrentMapインタフェースを拡張します。ロック操作は、エントリ・レベルで、NamedCache内の特定のキーに対してロックをリクエストすることで適用されます。
例2-1 キャッシュでのロック操作の適用
...
NamedCache cache = CacheFactory.getCache("dist-cache");
Object key = "example_key";
cache.lock(key, -1);
try
{
Object value = cache.get(key);
// application logic
cache.put(key, value);
}
finally
{
// Always unlock in a "finally" block
// to ensure that uncaught exceptions
// don't leave data locked
cache.unlock(key);
}
...
Coherenceのロック機能では、JavaのsynchronizedキーワードおよびC#のlockキーワードと同様、ブロック・ロックのみをロックします。複数のスレッドが共同で、ロックを適切に使用してデータへのアクセスを調整する必要があります。スレッドが、ある項目のキーをロックしていても、別のスレッドはロックせずにその項目を読み取ることができます。
ロックはサーバー障害の影響を受けず、バックアップ・サーバーにフェイルオーバーします。ロック所有者(クライアント)に障害が発生すると、ロックはただちに解除されます。
ロックの動作は、リクエストされたタイムアウトとキャッシュのタイプによって異なります。タイムアウトが-1の場合はロックを取得できるまで無限にブロックされ、0の場合はただちに返され、0より大きい値の場合は指定した時間(ミリ秒単位)待機してからタイムアウトします。コール元が実際にロックを取得したことを確認するには、返されたブール値を調べる必要があります。詳細は、ConcurrentMap.lock()を参照してください。タイムアウト値がlock()に渡されない場合、デフォルトは0になります。レプリケーション・キャッシュでは、ConcurrentMap.LOCK_ALLをキーとして使用してキャッシュ全体をロックできますが、通常はお薦めしません。この操作は、パーティション・キャッシュではサポートされていません。
レプリケーション・キャッシュとパーティション・キャッシュの両方で、getはロックされているキーで許可されます。putは、レプリケーション・キャッシュではブロックされますが、パーティション・キャッシュではブロックされません。ロックが行われている場合、ロックの解放はコール元の責任です(lease-granularityの構成によって、同じスレッド内または同じクラスタ・ノード内で実行される)。このような理由から、ロックは常にfinally句(または同等のもの)で解放する必要があります。そうでない場合、未処理の例外があると無限にロックされることがあります。lease-granularity構成の詳細は、「DistributedCacheサービスのパラメータ」を参照してください。
TransactionMapを使用すると、トランザクションにおいて、1つのCoherenceキャッシュで複数の項目を更新できます。TransactionMapを使用してトランザクションを実行するには、Java Transaction API(JTA)ライブラリがクラスパスに存在する必要があります。TransactionMapは、CacheFactoryを使用して作成します。
NamedCache cache = CacheFactory.getCache("dist-cache");
TransactionMap tmap = CacheFactory.getLocalTransaction(cache);
TransactionMapを使用する前に、並行性レベルと分離レベルを、必要なレベルに設定する必要があります。並行性の高い、短時間で実行されるほとんどのトランザクションでは、並行性レベルをCONCUR_PESSIMISTIC、分離レベルをTRANSACTION_REPEATABLE_GETにする必要があります。それほど頻繁に発生しない長時間のトランザクションの場合は、デフォルト設定をCONCUR_OPTIMISTICとTRANSACTION_REPEATABLE_GETにすることをお薦めします。並行性レベルをCONCUR_PESSIMISTICに、分離レベルをTRANSACTION_SERIALIZABLEにすると、キャッシュ全体がロックされます。前述のとおり、パーティション・キャッシュではキャッシュ全体をロックできないため、このモードはパーティション・キャッシュでは機能しません(一般的に、信頼性の高いトランザクション処理では、この分離レベルは不要です)。キャッシュに対する問合せ(keySet(Filter)またはentrySet(Filter)のコール)は、常にREAD_COMMITTEDの分離レベルで実行されます。並行性レベルおよび分離レベルの詳細は、TransactionMap APIを参照してください。
分離レベルと並行性レベルを設定する方法は次のとおりです。
tmap.setTransactionIsolation(TransactionMap.TRANSACTION_REPEATABLE_GET); tmap.setConcurrency(TransactionMap.CONCUR_PESSIMISTIC);
ここで、TransactionMapを使用して、トランザクションでキャッシュを更新できます。
例2-2 トランザクションでのキャッシュの更新
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.Base;
import com.tangosol.util.TransactionMap;
import java.util.Collection;
import java.util.Collections;
public class TxMapSample
extends Base
{
public static void main(String[] args)
{
// populate the cache
NamedCache cache = CacheFactory.getCache("dist-cache");
String key1 = "key1";
String key2 = "key2";
cache.put(key1, new Integer(1));
cache.put(key2, new Integer(1));
out("Initial value for key 1: " + cache.get(key1));
out("Initial value for key 2: " + cache.get(key2));
// create one TransactionMap per NamedCache
TransactionMap mapTx = CacheFactory.getLocalTransaction(cache);
mapTx.setTransactionIsolation(TransactionMap.TRANSACTION_REPEATABLE_GET);
mapTx.setConcurrency(TransactionMap.CONCUR_PESSIMISTIC);
// gather the cache(s) into a Collection
Collection txnCollection = Collections.singleton(mapTx);
boolean fTxSucceeded = false;
try
{
// start the transaction
mapTx.begin();
int i1 = ((Integer)mapTx.get(key1)).intValue();
mapTx.put(key1, new Integer(++i1));
int i2 = ((Integer)mapTx.get(key2)).intValue();
mapTx.put(key2, new Integer(++i2));
// commit the changes
fTxSucceeded = CacheFactory.commitTransactionCollection(txnCollection, 1);
}
catch (Throwable t)
{
// rollback
CacheFactory.rollbackTransactionCollection(txnCollection);
}
int v1 = ((Integer) cache.get(key1)).intValue();
int v2 = ((Integer) cache.get(key2)).intValue();
out("Transaction " + (fTxSucceeded ? "succeeded" : "did not succeed"));
out("Updated value for key 1: " + v1);
out("Updated value for key 2: " + v2);
azzert(v1 == 2, "Expected value for key1 == 2");
azzert(v2 == 2, "Expected value for key2 == 2");
}
}
CacheFactory APIには、TransactionMapインスタンスのコレクションをコミットおよびロールバックするためのヘルパー・メソッドがあります。commitメソッドは、従来の2フェーズ・コミット(2PC)プロトコルを使用します。ただし、他の2PC実装と同様、第2フェーズ(commit)で一方のリソースのコミットが失敗すると、トランザクションは部分的にコミットされた状態になる場合があります。従来の2PC実装とは異なり、Coherenceでは、所定のサーバーがコミット・フェーズにおいてインダウト状態にならないことが保証されていますが、その他の障害が発生する可能性があります(たとえば、ライトスルー・キャッシュによって永続的な障害が発生する場合がある)。また、トランザクション・ログはクライアントにのみ保存されるため、コミット・フェーズにおいてクライアント側で障害が発生すると、部分的コミットになる場合があります。コミット・フェーズは非ブロック化モードであるため(必要なロックはすべて、コミット・フェーズの開始前に獲得される)、コミット・フェーズは準備フェーズよりもかなり短時間(通常は数ミリ秒程度)で実行されます。そのため、通常のワークロードでデータが公開される時間は、ゼロではないにしても、最小限に抑えられます。
トランザクションに参加するキャッシュに登録されているMapListenersは、各項目がコミットされるたびに、MapEventを受信します。イベントが、トランザクション内の順序どおりに起動する保証はありません。また、トランザクションが1つの項目を複数回更新する場合、その項目の最終的な状態を反映した1つのイベントのみがディスパッチされます。
Coherenceには、Java EEコンテナでのトランザクション管理に使用できる、JCA 1.0準拠のリソース・アダプタが用意されています。リソース・アダプタ・アーカイブ(RAR)ファイルにパッケージ化されており、JCA 1.0と互換性のあるすべてのJava EEコンテナにデプロイできます。デプロイした後は、JTAを使用してトランザクションを実行できます。
例2-3 JCAコンテナ用の構成
String key = "key";
Context ctx = new InitialContext();
UserTransaction tx = null;
try
{
// the transaction manager from container
tx = (UserTransaction) ctx.lookup("java:comp/UserTransaction");
tx.begin();
// the try-catch-finally block below is the block of code
// that could be on an EJB and therefore automatically within
// a transactional context
CacheAdapter adapter = null;
try
{
adapter = new CacheAdapter(ctx, "tangosol.coherenceTx",
CacheAdapter.CONCUR_OPTIMISTIC,
CacheAdapter.TRANSACTION_GET_COMMITTED, 0);
NamedCache cache = adapter.getNamedCache("dist-test",
getClass().getClassLoader());
int n = ((Integer)cache.get(key)).intValue();
cache.put(key, new Integer(++n));
}
catch (Throwable t)
{
String sMsg = "Failed to connect: " + t;
System.err.println(sMsg);
t.printStackTrace(System.err);
}
finally
{
try
{
adapter.close();
}
catch (Throwable ex)
{
System.err.println("SHOULD NOT HAPPEN: " + ex);
}
}
}
finally
{
try
{
tx.commit();
}
catch (Throwable t)
{
String sMsg = "Failed to commit: " + t;
System.err.println(sMsg);
}
}
Coherenceは、最後のリソースとしてXAトランザクションに参加できます。この機能は、ほとんどのトランザクション・マネージャでサポートされており、最後のリソース・コミットや最後の参加者など、様々な名前で知られています。このシナリオでトランザクションを完了するには、次の手順を実行します。
すべてのXAリソースでprepareをコールします。
Coherenceトランザクションでcommitをコールします。
正常にコミットされると、トランザクションのその他のXA参加者がcommitをコールします。
この方法の詳細は、XAの最後のリソース構成に関するトランザクション・マネージャのドキュメントを参照してください。
NamedCacheのInvocableMapスーパーインタフェースは、キャッシュ内の処理コードをロックなしで同時実行できます。この処理は、EntryProcessorによって実行されます。より汎用的なTransactionMapおよびConcurrentMapといった明示的ロックAPIと比べて柔軟性は劣るものの、EntryProcessorsでは、データの信頼性を損なうことなく、最高レベルの効率性が得られます。
EntryProcessorsは、処理中のエントリに対して暗黙的な低レベルのロックをかけるため、エンド・ユーザーは、EntryProcessorに処理コードを配置する際に並列処理制御に煩わされずに済みます。これは、ConcurrentMapで提供されている明示的なlock(key)機能とは異なります。
Caching Editionで動作しているレプリケーション・キャッシュやパーティション・キャッシュでは、開始クライアントでローカルに実行されます。Enterprise Edition以上で動作しているパーティション・キャッシュでは、データのプライマリ・ストレージを提供するノードで実行されます。
InvocableMapでは、次の3つの方法でEntryProcessorを起動できます。
特定のキーでEntryProcessorを起動します。EntryProcessorを起動するために、キャッシュ内にキーを置いておく必要はありません。
キーのコレクションでEntryProcessorを起動します。
任意のFilterでEntryProcessorを起動します。この場合、Filterがキャッシュ・エントリに対して実行されます。Filter基準に一致するエントリごとに、EntryProcessorが実行されます。Filterの詳細は、第6章「キャッシュの問合せ」を参照してください。
Enterprise Edition以上で動作しているパーティション・キャッシュでは、EntryProcessorsは、クラスタ全体で並列実行されます(個々のエントリを所有するノード上で)。これは、クライアントで影響を受けるキーをすべてロックし、キャッシュから必要なデータをすべて取得した後、データを処理してキャッシュにデータを戻してからキーをロック解除するのに比べて大きな利点があります。この処理は、1台のマシンで順次実行されるかわりに、複数のマシンで並列実行されるため、ロックの獲得や解放に伴うネットワークのオーバーヘッドがなくなります。
|
注意: 各クラスタ・ノードのクラスパスでEntryProcessorクラスを使用できる必要があります。 |
高レベルの並列処理制御のサンプルは次のとおりです。ネットワーク・アクセスが必要なコードにはコメントが付いています。
例2-4 EntryProcessorを使用しない並列処理制御
final NamedCache cache = CacheFactory.getCache("dist-test");
final String key = "key";
cache.put(key, new Integer(1));
// begin processing
// *requires network access*
if (cache.lock(key, 0))
{
try
{
// *requires network access*
Integer i = (Integer) cache.get(key);
// *requires network access*
cache.put(key, new Integer(i.intValue() + 1));
}
finally
{
// *requires network access*
cache.unlock(key);
}
}
// end processing
次に、EntryProcessorを使用した同等の方法を示します。ここでも、ネットワーク・アクセスにはコメントが付いています。
例2-5 EntryProcessorを使用する並列処理制御
final NamedCache cache = CacheFactory.getCache("dist-test");
final String key = "key";
cache.put(key, new Integer(1));
// begin processing
// *requires network access*
cache.invoke(key, new MyCounterProcessor());
// end processing
...
public static class MyCounterProcessor
extends AbstractProcessor
{
// this is executed on the node that owns the data,
// no network access required
public Object process(InvocableMap.Entry entry)
{
Integer i = (Integer) entry.getValue();
entry.setValue(new Integer(i.intValue() + 1));
return null;
}
}
EntryProcessorsは個別に自動実行されますが、InvocableMap.invokeAll()を使用して複数のEntryProcessorを起動しても、1つのアトミック単位としては実行されません。個々のEntryProcessorが完了すると、他のEntryProcessorsが実行中であっても、キャッシュに対して行われたすべての更新がただちに表示されます。さらに、EntryProcessorでキャッチされない例外によって、他のEntryProcessorの実行が妨げられることはありません。EntryProcessorの実行中に、あるエントリの1次ノードに障害が発生した場合は、かわりにバックアップ・ノードが実行します。ただし、EntryProcessorの完了後にノードに障害が発生した場合、そのEntryProcessorはバックアップでは起動されません。
一般的に、EntryProcessorsは短時間で実行する必要があります。EntryProcessorsを長時間実行しているアプリケーションでは、分散サービスのスレッド・プール・サイズを増やして、分散サービスが実行するその他の操作が、長時間実行されているEntryProcessorによってブロックされないようにする必要があります。分散サービスのスレッド・プールの詳細は、「DistributedCacheサービスのパラメータ」を参照してください。
Coherenceには、ほとんどの状況で使用可能な複数のEntryProcessor実装が用意されています。これらのEntryProcessorsの詳細と、並列データ処理の追加情報は、「データ・グリッドの提供」を参照してください。
Coherenceキャッシュ内のデータベースでライトビハインドとライトスルーを使用する場合は、トランザクション動作を考慮する必要があります。ライトスルーを有効にすると、データベースに項目が正常に保存されるときにputが成功します。そうでない場合は、CacheStoreで発生した例外がクライアントに再度スローされます(注意: この動作を有効にするには、<rollback-cachestore-failures>をtrueに設定します。詳細は、「read-write-backing-map-scheme」を参照してください)。これは、一度に1つのキャッシュ項目を更新する場合にのみ当てはまります。一度に2つのキャッシュ項目を更新すると、それぞれのCacheStore操作は個別のデータベース・トランザクションになります。この制限は、Coherenceの将来のリリースで解決される予定です。
ライトビハインド・キャッシュを使用すると、スループットとパフォーマンスが大きく向上します。ただし、データベースへの書込みは、キャッシュが更新された後に実行されます。したがって、データベースへの書込みが失敗しないように注意する必要があります。ライトビハインドは、次のようなアプリケーションでのみ使用してください。
データ制約が、データベースではなくアプリケーションによって管理されている
それ以外にデータベースを更新するアプリケーションがない
キャッシュ・ストアの詳細は、「リードスルー、ライトスルー、ライトビハインド・キャッシュおよびリフレッシュアヘッド」を参照してください。
トランザクションにおいて、キャッシュ・エントリに対する複数の更新をデータベースに書き込む必要がある場合は、データベースとキャッシュの更新をクライアントが担当するキャッシュアサイド・パターンを実装するほうがよいことがあります。データソースからのキャッシュ・ミスのロードにCacheLoaderを使用することは、依然として可能です。