Coherenceには、ロック、トランザクションおよびデータへの同時アクセスをサポートする様々なオプションがあります。
Coherenceには、データへの同時アクセスを管理する様々なオプションがあります。
表2-1 Coherenceの同時アクセスのオプション
オプション名 | 説明 |
---|---|
|
|
|
|
|
Java EEコンテナでのトランザクション管理について、CoherenceではJCAリソース・アダプタが用意されており、JTAを使用してトランザクションを管理できます。現時点では、XAトランザクションはサポートされていませんが、最後のリソースとしてXAトランザクションに参加できます。 |
|
Coherenceでは、 |
|
ローカル(XA以外)のデータ・リソースを含むキャッシュを維持する際のガイドラインです。 |
標準的なNamedCache
インタフェースは、基本的なロック・メソッドを含むConcurrentMap
インタフェースを拡張します。ロック操作は、エントリ・レベルで、NamedCache
内の特定のキーに対してロックをリクエストすることで適用されます。
例2-1 キャッシュでのロック操作の適用
... NamedCache cache = CacheFactory.getCache("dist-cache"); Object key = "example_key"; cache.lock(key, -1); try { Object value = cache.get(key); // application logic cache.put(key, value); } finally { // Always unlock in a "finally" block // to ensure that uncaught exceptions // don't leave data locked cache.unlock(key); } ...
Coherenceのロック機能では、Javaのsynchronized
キーワードおよびC#のlock
キーワードと同様、ブロック・ロックのみをロックします。複数のスレッドが共同で、ロックを適切に使用してデータへのアクセスを調整する必要があります。スレッドが、ある項目のキーをロックしていても、別のスレッドはロックせずにその項目を読み取ることができます。
ロックはサーバー障害の影響を受けず、バックアップ・サーバーにフェイルオーバーします。ロック所有者(クライアント)に障害が発生すると、ロックはただちに解除されます。
ロックの動作は、リクエストされたタイムアウトとキャッシュのタイプによって異なります。タイムアウトが-1の場合はロックを取得できるまで無限にブロックされ、0の場合はただちに返され、0より大きい値の場合は指定した時間(ミリ秒単位)待機してからタイムアウトします。コール元が実際にロックを取得したことを確認するには、返されたブール値を調べる必要があります。詳細は、ConcurrentMap.lock()を参照してください。タイムアウト値がlock()
に渡されない場合、デフォルトは0になります。レプリケーション・キャッシュでは、ConcurrentMap.LOCK_ALL
をキーとして使用してキャッシュ全体をロックできますが、通常はお薦めしません。この操作は、パーティション・キャッシュではサポートされていません。
レプリケーション・キャッシュとパーティション・キャッシュの両方で、getはロックされているキーで許可されます。putは、レプリケーション・キャッシュではブロックされますが、パーティション・キャッシュではブロックされません。ロックが行われている場合、ロックの解放はコール元の責任です(lease-granularity
の構成によって、同じスレッド内または同じクラスタ・ノード内で実行される)。このような理由から、ロックは常にfinally句(または同等のもの)で解放する必要があります。そうでない場合、未処理の例外があると無限にロックされることがあります。lease-granularity
構成の詳細は、「DistributedCacheサービスのパラメータ」を参照してください。
TransactionMap
を使用すると、トランザクションにおいて、1つのCoherenceキャッシュで複数の項目を更新できます。TransactionMap
を使用してトランザクションを実行するには、Java Transaction API(JTA)ライブラリがクラスパスに存在する必要があります。TransactionMap
は、CacheFactory
を使用して作成します。
NamedCache cache = CacheFactory.getCache("dist-cache"); TransactionMap tmap = CacheFactory.getLocalTransaction(cache);
TransactionMap
を使用する前に、並行性レベルと分離レベルを、必要なレベルに設定する必要があります。並行性の高い、短時間で実行されるほとんどのトランザクションでは、並行性レベルをCONCUR_PESSIMISTIC
、分離レベルをTRANSACTION_REPEATABLE_GET
にする必要があります。それほど頻繁に発生しない長時間のトランザクションの場合は、デフォルト設定をCONCUR_OPTIMISTIC
とTRANSACTION_REPEATABLE_GET
にすることをお薦めします。並行性レベルをCONCUR_PESSIMISTIC
に、分離レベルをTRANSACTION_SERIALIZABLE
にすると、キャッシュ全体がロックされます。前述のとおり、パーティション・キャッシュではキャッシュ全体をロックできないため、このモードはパーティション・キャッシュでは機能しません(一般的に、信頼性の高いトランザクション処理では、この分離レベルは不要です)。キャッシュに対する問合せ(keySet(Filter
)
またはentrySet(Filter)
のコール)は、常にREAD_COMMITTED
の分離レベルで実行されます。並行性レベルおよび分離レベルの詳細は、TransactionMap
APIを参照してください。
分離レベルと並行性レベルを設定する方法は次のとおりです。
tmap.setTransactionIsolation(TransactionMap.TRANSACTION_REPEATABLE_GET); tmap.setConcurrency(TransactionMap.CONCUR_PESSIMISTIC);
ここで、TransactionMap
を使用して、トランザクションでキャッシュを更新できます。
例2-2 トランザクションでのキャッシュの更新
import com.tangosol.net.CacheFactory; import com.tangosol.net.NamedCache; import com.tangosol.util.Base; import com.tangosol.util.TransactionMap; import java.util.Collection; import java.util.Collections; public class TxMapSample extends Base { public static void main(String[] args) { // populate the cache NamedCache cache = CacheFactory.getCache("dist-cache"); String key1 = "key1"; String key2 = "key2"; cache.put(key1, new Integer(1)); cache.put(key2, new Integer(1)); out("Initial value for key 1: " + cache.get(key1)); out("Initial value for key 2: " + cache.get(key2)); // create one TransactionMap per NamedCache TransactionMap mapTx = CacheFactory.getLocalTransaction(cache); mapTx.setTransactionIsolation(TransactionMap.TRANSACTION_REPEATABLE_GET); mapTx.setConcurrency(TransactionMap.CONCUR_PESSIMISTIC); // gather the cache(s) into a Collection Collection txnCollection = Collections.singleton(mapTx); boolean fTxSucceeded = false; try { // start the transaction mapTx.begin(); int i1 = ((Integer)mapTx.get(key1)).intValue(); mapTx.put(key1, new Integer(++i1)); int i2 = ((Integer)mapTx.get(key2)).intValue(); mapTx.put(key2, new Integer(++i2)); // commit the changes fTxSucceeded = CacheFactory.commitTransactionCollection(txnCollection, 1); } catch (Throwable t) { // rollback CacheFactory.rollbackTransactionCollection(txnCollection); } int v1 = ((Integer) cache.get(key1)).intValue(); int v2 = ((Integer) cache.get(key2)).intValue(); out("Transaction " + (fTxSucceeded ? "succeeded" : "did not succeed")); out("Updated value for key 1: " + v1); out("Updated value for key 2: " + v2); azzert(v1 == 2, "Expected value for key1 == 2"); azzert(v2 == 2, "Expected value for key2 == 2"); } }
CacheFactory
APIには、TransactionMap
インスタンスのコレクションをコミットおよびロールバックするためのヘルパー・メソッドがあります。commit
メソッドは、従来の2フェーズ・コミット(2PC)プロトコルを使用します。ただし、他の2PC実装と同様、第2フェーズ(commit)で一方のリソースのコミットが失敗すると、トランザクションは部分的にコミットされた状態になる場合があります。従来の2PC実装とは異なり、Coherenceでは、所定のサーバーがコミット・フェーズにおいてインダウト状態にならないことが保証されていますが、その他の障害が発生する可能性があります(たとえば、ライトスルー・キャッシュによって永続的な障害が発生する場合がある)。また、トランザクション・ログはクライアントにのみ保存されるため、コミット・フェーズにおいてクライアント側で障害が発生すると、部分的コミットになる場合があります。コミット・フェーズは非ブロック化モードであるため(必要なロックはすべて、コミット・フェーズの開始前に獲得される)、コミット・フェーズは準備フェーズよりもかなり短時間(通常は数ミリ秒程度)で実行されます。そのため、通常のワークロードでデータが公開される時間は、ゼロではないにしても、最小限に抑えられます。
トランザクションに参加するキャッシュに登録されているMapListeners
は、各項目がコミットされるたびに、MapEvent
を受信します。イベントが、トランザクション内の順序どおりに起動する保証はありません。また、トランザクションが1つの項目を複数回更新する場合、その項目の最終的な状態を反映した1つのイベントのみがディスパッチされます。
Coherenceには、Java EEコンテナでのトランザクション管理に使用できる、JCA 1.0準拠のリソース・アダプタが用意されています。リソース・アダプタ・アーカイブ(RAR)ファイルにパッケージ化されており、JCA 1.0と互換性のあるすべてのJava EEコンテナにデプロイできます。デプロイした後は、JTAを使用してトランザクションを実行できます。
例2-3 JCAコンテナ用の構成
String key = "key"; Context ctx = new InitialContext(); UserTransaction tx = null; try { // the transaction manager from container tx = (UserTransaction) ctx.lookup("java:comp/UserTransaction"); tx.begin(); // the try-catch-finally block below is the block of code // that could be on an EJB and therefore automatically within // a transactional context CacheAdapter adapter = null; try { adapter = new CacheAdapter(ctx, "tangosol.coherenceTx", CacheAdapter.CONCUR_OPTIMISTIC, CacheAdapter.TRANSACTION_GET_COMMITTED, 0); NamedCache cache = adapter.getNamedCache("dist-test", getClass().getClassLoader()); int n = ((Integer)cache.get(key)).intValue(); cache.put(key, new Integer(++n)); } catch (Throwable t) { String sMsg = "Failed to connect: " + t; System.err.println(sMsg); t.printStackTrace(System.err); } finally { try { adapter.close(); } catch (Throwable ex) { System.err.println("SHOULD NOT HAPPEN: " + ex); } } } finally { try { tx.commit(); } catch (Throwable t) { String sMsg = "Failed to commit: " + t; System.err.println(sMsg); } }
Coherenceは、最後のリソースとしてXAトランザクションに参加できます。この機能は、ほとんどのトランザクション・マネージャでサポートされており、最後のリソース・コミットや最後の参加者など、様々な名前で知られています。このシナリオでトランザクションを完了するには、次の手順を実行します。
すべてのXAリソースでprepareをコールします。
Coherenceトランザクションでcommitをコールします。
正常にコミットされると、トランザクションのその他のXA参加者がcommitをコールします。
この方法の詳細は、XAの最後のリソース構成に関するトランザクション・マネージャのドキュメントを参照してください。
NamedCache
のInvocableMap
スーパーインタフェースは、キャッシュ内の処理コードをロックなしで同時実行できます。この処理は、EntryProcessor
によって実行されます。より汎用的なTransactionMap
およびConcurrentMap
といった明示的ロックAPIと比べて柔軟性は劣るものの、EntryProcessors
では、データの信頼性を損なうことなく、最高レベルの効率性が得られます。
EntryProcessors
は、処理中のエントリに対して暗黙的な低レベルのロックをかけるため、エンド・ユーザーは、EntryProcessor
に処理コードを配置する際に並列処理制御に煩わされずに済みます。これは、ConcurrentMapで提供されている明示的なlock(key)
機能とは異なります。
Caching Editionで動作しているレプリケーション・キャッシュやパーティション・キャッシュでは、開始クライアントでローカルに実行されます。Enterprise Edition以上で動作しているパーティション・キャッシュでは、データのプライマリ・ストレージを提供するノードで実行されます。
InvocableMap
では、次の3つの方法でEntryProcessorを起動できます。
特定のキーでEntryProcessor
を起動します。EntryProcessorを起動するために、キャッシュ内にキーを置いておく必要はありません。
キーのコレクションでEntryProcessor
を起動します。
任意のFilter
でEntryProcessor
を起動します。この場合、Filter
がキャッシュ・エントリに対して実行されます。Filter
基準に一致するエントリごとに、EntryProcessor
が実行されます。Filterの詳細は、第6章「キャッシュの問合せ」を参照してください。
Enterprise Edition以上で動作しているパーティション・キャッシュでは、EntryProcessors
は、クラスタ全体で並列実行されます(個々のエントリを所有するノード上で)。これは、クライアントで影響を受けるキーをすべてロックし、キャッシュから必要なデータをすべて取得した後、データを処理してキャッシュにデータを戻してからキーをロック解除するのに比べて大きな利点があります。この処理は、1台のマシンで順次実行されるかわりに、複数のマシンで並列実行されるため、ロックの獲得や解放に伴うネットワークのオーバーヘッドがなくなります。
注意: 各クラスタ・ノードのクラスパスでEntryProcessorクラスを使用できる必要があります。 |
高レベルの並列処理制御のサンプルは次のとおりです。ネットワーク・アクセスが必要なコードにはコメントが付いています。
例2-4 EntryProcessorを使用しない並列処理制御
final NamedCache cache = CacheFactory.getCache("dist-test"); final String key = "key"; cache.put(key, new Integer(1)); // begin processing // *requires network access* if (cache.lock(key, 0)) { try { // *requires network access* Integer i = (Integer) cache.get(key); // *requires network access* cache.put(key, new Integer(i.intValue() + 1)); } finally { // *requires network access* cache.unlock(key); } } // end processing
次に、EntryProcessorを使用した同等の方法を示します。ここでも、ネットワーク・アクセスにはコメントが付いています。
例2-5 EntryProcessorを使用する並列処理制御
final NamedCache cache = CacheFactory.getCache("dist-test"); final String key = "key"; cache.put(key, new Integer(1)); // begin processing // *requires network access* cache.invoke(key, new MyCounterProcessor()); // end processing ... public static class MyCounterProcessor extends AbstractProcessor { // this is executed on the node that owns the data, // no network access required public Object process(InvocableMap.Entry entry) { Integer i = (Integer) entry.getValue(); entry.setValue(new Integer(i.intValue() + 1)); return null; } }
EntryProcessors
は個別に自動実行されますが、InvocableMap.invokeAll()
を使用して複数のEntryProcessor
を起動しても、1つのアトミック単位としては実行されません。個々のEntryProcessor
が完了すると、他のEntryProcessors
が実行中であっても、キャッシュに対して行われたすべての更新がただちに表示されます。さらに、EntryProcessor
でキャッチされない例外によって、他のEntryProcessor
の実行が妨げられることはありません。EntryProcessor
の実行中に、あるエントリの1次ノードに障害が発生した場合は、かわりにバックアップ・ノードが実行します。ただし、EntryProcessor
の完了後にノードに障害が発生した場合、そのEntryProcessor
はバックアップでは起動されません。
一般的に、EntryProcessors
は短時間で実行する必要があります。EntryProcessors
を長時間実行しているアプリケーションでは、分散サービスのスレッド・プール・サイズを増やして、分散サービスが実行するその他の操作が、長時間実行されているEntryProcessor
によってブロックされないようにする必要があります。分散サービスのスレッド・プールの詳細は、「DistributedCacheサービスのパラメータ」を参照してください。
Coherenceには、ほとんどの状況で使用可能な複数のEntryProcessor
実装が用意されています。これらのEntryProcessors
の詳細と、並列データ処理の追加情報は、「データ・グリッドの提供」を参照してください。
Coherenceキャッシュ内のデータベースでライトビハインドとライトスルーを使用する場合は、トランザクション動作を考慮する必要があります。ライトスルーを有効にすると、データベースに項目が正常に保存されるときにputが成功します。そうでない場合は、CacheStoreで発生した例外がクライアントに再度スローされます(注意: この動作を有効にするには、<rollback-cachestore-failures>
をtrue
に設定します。詳細は、「read-write-backing-map-scheme」を参照してください)。これは、一度に1つのキャッシュ項目を更新する場合にのみ当てはまります。一度に2つのキャッシュ項目を更新すると、それぞれのCacheStore
操作は個別のデータベース・トランザクションになります。この制限は、Coherenceの将来のリリースで解決される予定です。
ライトビハインド・キャッシュを使用すると、スループットとパフォーマンスが大きく向上します。ただし、データベースへの書込みは、キャッシュが更新された後に実行されます。したがって、データベースへの書込みが失敗しないように注意する必要があります。ライトビハインドは、次のようなアプリケーションでのみ使用してください。
データ制約が、データベースではなくアプリケーションによって管理されている
それ以外にデータベースを更新するアプリケーションがない
キャッシュ・ストアの詳細は、「リードスルー、ライトスルー、ライトビハインド・キャッシュおよびリフレッシュアヘッド」を参照してください。
トランザクションにおいて、キャッシュ・エントリに対する複数の更新をデータベースに書き込む必要がある場合は、データベースとキャッシュの更新をクライアントが担当するキャッシュアサイド・パターンを実装するほうがよいことがあります。データソースからのキャッシュ・ミスのロードにCacheLoaderを使用することは、依然として可能です。