15 データ・ソースのキャッシュ

Coherenceは、データ・ソースをキャッシュし、そのキャッシュを一時的な記録システムにするために一般的に使用されます。

この章の内容は次のとおりです。

データ・ソースのキャッシュの概要

Coherenceでは、データベース、Webサービス、パッケージ化されたアプリケーション、ファイル・システムなど、任意のデータ・ソースの透過的な読取りまたは書込みのキャッシングがサポートされますが、データベースが最も一般的に使用されます。
ここでは、任意のバックエンド・データ・ソースのことを、簡単にデータベースと呼びます。効率的なキャッシュでは、集中的な読取り専用操作および読取り/書込み操作をサポートし、読取り/書込み操作の際には、キャッシュとデータベースは完全に同期化されている必要があります。データ・ソースのキャッシュを実現するために、Coherenceではリードスルー、ライトスルー、リフレッシュアヘッドおよびライトビハインド・キャッシュをサポートします。

ノート:

リードスルー/ライトスルー・キャッシング(および他の類似するキャッシング形態)は、パーティション(分散)キャッシュ(ニア・キャッシュも含む)トポロジで使用することのみを目的としています。ローカル・キャッシュでは、この機能のサブセットがサポートされます。レプリケート・キャッシュとオプティミスティック・キャッシュは使用しないでください。

この項には次のトピックが含まれます:

プラガブル・キャッシュ・ストア

キャッシュ・ストアは、キャッシュと基礎となるデータ・ソースの接続に使用されるアプリケーション固有のアダプタです。キャッシュ・ストア実装では、データ・アクセス・メカニズム(たとえば、Toplink/EclipseLink、JPA、Hibernate、アプリケーション固有のJDBCコール、他のアプリケーション、メインフレーム、他のキャッシュなど)を使用してデータ・ソースにアクセスします。キャッシュ・ストアでは、データ・ソースからの取得データを使用したJavaオブジェクトの作成方法、データ・ソースへのオブジェクトのマッピング方法や書込み方法、およびデータ・ソースからのオブジェクトの削除方法が認識されます。

データ・ソース接続戦略およびデータ・ソース-アプリケーション・オブジェクトのマッピング情報は、データ・ソース・スキーマ、アプリケーション・クラス・レイアウト、および操作環境に固有です。そのため、このマッピング情報は、アプリケーション開発者がキャッシュ・ストアの実装で用意する必要があります。キャッシュ・ストアの実装の作成を参照してください。

事前定義済のキャッシュ・ストア実装

Coherenceには、JPAキャッシュ・ストアの実装がいくつか含まれています。

  • Generic JPA - 任意のJPAプロバイダで使用できるキャッシュ・ストア実装。JPAとCoherenceの併用を参照してください。

  • EclipseLink - EclipseLink JPAプロバイダ用に最適化されたキャッシュ・ストア実装。JPAとCoherenceの併用を参照してください。

  • Hibernate - Hibernate JPAプロバイダ用のキャッシュ・ストア実装。HibernateとCoherenceの統合を参照してください。

リードスルー・キャッシング

アプリケーションから、たとえばkey Xのキャッシュ・エントリが要求されたときに、キャッシュにXがなかった場合、CoherenceはCacheStoreへの委任を自動的に実行し、基礎となるデータソースからXをロードするようにCacheStoreに要求します。データ・ソースにXが存在している場合、CacheStoreはそれをロードしてCoherenceに返し、Coherenceは将来の使用に備えてそれをキャッシュに置き、最終的にXを要求したアプリケーションに返します。これをリードスルー・キャッシングと呼びます。リフレッシュアヘッド・キャッシュ機能を使用すると、(認識されている待機時間を削減することにより)読取りパフォーマンスがさらに向上します。リフレッシュアヘッド・キャッシングを参照してください。

図15-1 リードスルー・キャッシング


「図15-1 リードスルー・キャッシング」の説明

ライトスルー・キャッシング

Coherenceでは、2つの異なる方法でデータ・ソースの更新を処理できます。1つ目の方法は、ライトスルーです。この場合、アプリケーションでキャッシュのデータが更新されると(つまり、put(...)をコールしてキャッシュ・エントリを変更すると)、キャッシュ・ストアを介して基礎となるデータ・ソースにデータが正常に保存されるまで操作は完了しません(つまり、putは返されません)。このままでは、データ・ソースへの書込みにおいて待機時間が生じるため、書込みパフォーマンスはまったく向上しません。書き込みパフォーマンスを向上させるには、ライトビハインド・キャッシュ機能を使用します。ライトビハインド・キャッシュを参照してください。

図15-2 ライトスルー・キャッシング


「図15-2 ライトスルー・キャッシング」の説明

ライトビハインド・キャッシング

ライトビハインドのシナリオでは、構成された遅延(10秒、20分、1日、1週間、またはそれ以上)の経過後に、変更されたキャッシュ・エントリが非同期でデータ・ソースに書き込まれます。これは、キャッシュの挿入と更新にのみ適用されることに注意する必要があります。キャッシュ・エントリの削除は、データ・ソースと同期して実行されます。ライトビハインド・キャッシングでは、データ・ソースで更新する必要のあるデータのライトビハインド・キューが保持されます。アプリケーションでキャッシュのXが更新されると、Xはライトビハインド・キューに追加され(Xがない場合。ある場合は置き換えられる)、指定されたライトビハインド遅延の経過後、CoherenceはCacheStoreをコールして基礎となるデータ・ソースのXを最新の状態に更新します。ライトビハインド遅延は、一連の変更が始まる時点を基準としています。つまり、データ・ソースのデータは、キャッシュのライトビハインド遅延を超えて遅れることはありません。

このため、一度読み取って構成された間隔で書き込む(つまり、更新頻度が少ない)シナリオになります。このアーキテクチャ・タイプには、主に4つの利点があります。

  • 基礎となるデータ・ソースへのデータ書込みを待つ必要がないため、アプリケーションのパフォーマンスが向上します。(データは後で、別の実行スレッドによって書き込まれます。)

  • アプリケーションにおけるデータベース負荷が大幅に削減されます。読取り操作と書込み操作の量が両方とも減るため、データベース負荷も削減されます。他のキャッシング・アプローチと同様、キャッシングによって読取りが削減されます。一般的に、書込みはより負荷の高い操作ですが、多くの場合、その回数が減ります。ライトビハインド間隔の間における同一オブジェクトへの複数の変更操作が結合され、基礎となるデータ・ソースに一度だけ書き込まれるからです(書込み結合)。また、複数のキャッシュ・エントリへの書込みは、CacheStore.storeAll()メソッドを使用して単一のデータベース・トランザクションにまとめることができます(書込み組合せ)。

  • アプリケーションは、データベース障害からある程度保護されます。書込みの失敗時にオブジェクトが書込みキューに再度配置されるように、ライトビハインド機能を構成できるからです。アプリケーションの使用データがCoherenceキャッシュにある場合、データベースを稼働せずに操作を継続できます。これは、Coherenceパーティション・キャッシュを使用すると、容易に実現できます。Coherenceパーティション・キャッシュでは、(ローカル記憶域が有効な)すべての参加クラスタ・ノードでキャッシュ全体をパーティション化できるため、巨大なキャッシュ・メモリーを使用できます。

  • 線形スケーラビリティ: アプリケーションでより多くの同時ユーザー数を処理するには、クラスタ内のノード数を増加するだけで済みます。データベース負荷への影響は、ライトビハインド間隔を長くすることで調整できます。

図15-3 ライトビハインド・キャッシング


「図15-3 ライトビハインド・キャッシング」の説明

ライトビハインドの要件

ライトビハインド・キャッシングは、1つの構成設定を調整するだけで有効化できますが、ライトビハインドを期待どおりに動作させるのはより困難です。特に、アプリケーション設計では、いくつかの設計上の問題に前もって対処しておく必要があります。

データベースの更新がキャッシュ・トランザクションの外側で発生するため、そこにライトビハインド・キャッシングが最も作用します。つまり、キャッシュ・トランザクションはほとんどの場合、データベース・トランザクションが開始される前に完了します。これは、データベース・トランザクションが失敗してはならないことを意味します。これが保証できない場合、ロールバック機能を採用する必要があります。

ライトビハインドはデータベース更新の順序を並べ替えることができるため、参照整合性制約で順不同の更新をできるようにする必要があります。概念的には、これはデータベースをISAMスタイル(更新競合がないことが保証された主キー・ベースのアクセス)の記憶域として使用するようなものです。他のアプリケーションとデータベースを共有すると、新たな困難に直面します。つまり、ライトビハインド・トランザクションが外部の更新と競合しないことを保証する手段がなくなります。これは、ライトビハインドの競合がヒューリスティックに処理されるか、オペレータが手動で調整する必要があることを意味します。

通常は、論理データベース・トランザクションに各キャッシュ・エントリの更新をマッピングすると、最も単純なデータベース・トランザクションが保証されるため理想的です。

ライトビハインドは、(ライトビハインド・キューがディスクに書き込まれるまで)キャッシュを効率的に記録システムで保持するため、ビジネス規則ではクラスタに永続可能な(ディスクへの永続可能ではなく)データおよびトランザクションの記憶域を許可する必要があります。

リフレッシュアヘッド・キャッシング

リフレッシュアヘッドのシナリオでは、キャッシュ・ローダーから最近アクセスされた任意のキャッシュ・エントリを、失効する前に自動的かつ非同期的にリロード(リフレッシュ)するようにキャッシュを構成できます。その結果、アクセス頻度の高いエントリがキャッシュに入ると、失効後にエントリがリロードされるときに、アプリケーションは潜在的に低速なキャッシュ・ストアに対する読取りの影響を受けません。非同期リフレッシュは、有効期限が近いオブジェクトへのアクセスが発生したときにのみトリガーされます。オブジェクトへのアクセスが期限切れ後の場合、キャッシュ・ストアからの読取りが同期的に行われ、その値がリフレッシュされます。

リフレッシュアヘッド期限は、エントリの有効期間の割合として表されます。たとえば、キャッシュ・エントリの有効期間が60秒に設定されており、リフレッシュアヘッド係数が0.5に設定されているとします。キャッシュされたオブジェクトを60秒後にアクセスすると、キャッシュ・ストアから同期読取りが実行され、その値がリフレッシュされます。ただし、30秒を超える60秒未満のエントリに対するリクエストを実行すると、キャッシュの現在の値が返され、キャッシュ・ストアから非同期のリロードがスケジュールされます。ただし、これにより、レスポンスが返される前にキャッシュ全体がリフレッシュされることはありません。リフレッシュはバックグラウンドで実行されます。

リフレッシュアヘッドは、多数のユーザーがオブジェクトにアクセスする場合に有用です。値はキャッシュで最新の状態に維持され、キャッシュ・ストアの大量のリロードに起因する待機時間が回避されます。

リフレッシュアヘッド係数の値は<refresh-ahead-factor>サブ要素により指定されます。read-write-backing-map-schemeを参照してください。リフレッシュアヘッドでは、キャッシュのエントリの有効期間(<expiry-delay>)も設定されていることを前提としています。

例15-1では、ローカル・キャッシュのエントリのリフレッシュアヘッド係数を0.5で、有効期間を20秒で構成しています。エントリが有効期間の10秒以内にアクセスされると、キャッシュ・ストアから非同期にリロードされるようにスケジュールされます。

例15-1 リフレッシュアヘッド係数の指定

<distributed-scheme>
   <scheme-name>categories-cache-all-scheme</scheme-name>
   <service-name>DistributedCache</service-name>
   <backing-map-scheme>

      <read-write-backing-map-scheme>
         <scheme-name>categoriesLoaderScheme</scheme-name>
         <internal-cache-scheme>
            <local-scheme>
               <scheme-ref>categories-eviction</scheme-ref>
            </local-scheme>
         </internal-cache-scheme>

         <cachestore-scheme>
            <class-scheme>
               <class-name>
                  com.demo.cache.coherence.categories.CategoryCacheLoader
               </class-name>
            </class-scheme>
         </cachestore-scheme>
         <refresh-ahead-factor>0.5</refresh-ahead-factor>
      </read-write-backing-map-scheme>
   </backing-map-scheme>
   <autostart>true</autostart>
</distributed-scheme>
<local-scheme>
   <scheme-name>categories-eviction</scheme-name>
   <expiry-delay>20s</expiry-delay>
</local-scheme> 

HotCacheとのデータベース更新の同期

Oracle Coherence GoldenGate HotCache (HotCache)の統合により、アプリケーション外部のソースからデータベースへの変更をCoherenceキャッシュ内のオブジェクトに伝播できます。HotCacheの統合により、失効している可能性があったり、期限切れのキャッシュ・データをアプリケーションが使用しないことが保証されます。HotCacheには、失効したデータのみを処理する効率的なプッシュ・モデルが採用されています。データベースに変更が行われる際にデータがプッシュされるため、待機時間の短さが保証されます。HotCacheの統合には、CoherenceとGoldenGateの両方をインストールする必要があります。Oracle Coherence GoldenGate HotCacheとの統合を参照してください。

データ・ソースの非ブロッキング

Coherenceは、リードスルー、ライトスルー、ライトビハインド、リフレッシュアヘッドなど、多数の既存の戦略を使用して、基礎となるデータ・ソースと統合する手段を提供します。データ・ソースのキャッシュを参照してください。

Coherence 14.1.1.2206.1では、非ブロッキングAPIを提供するデータ・ソースと統合するためのNonBlockingEntryStoreインタフェースが導入されています。Interface NonBlockingEntryStoreを参照してください。この戦略は、元の変更とは非同期であるため、ライトビハインドに類似しています。ただし、ストアへのコールを延期するためにキューを必要とせず、ストアへのインテントをすぐに実装者に渡します。実装者は、データ・ソースの非ブロッキングAPIを即座にコールし、成功または失敗時に、Futureは、その情報をそれぞれonNextまたはonErrorを介して指定されたStoreObserverに渡すことができます。NonBlockingEntryStoreの主なメソッドは次のとおりです。
public interface NonBlockingEntryStore<K, V>
    {
    public void load(BinaryEntry<K, V> binEntry, StoreObserver<K, V> observer);

    public void store(BinaryEntry<K, V> binEntry, StoreObserver<K, V> observer);

    public void erase(BinaryEntry<K, V> binEntry);
    }

CacheStore (Interface CacheStoreを参照)およびBinaryEntryStore (Interface BinaryEntryStoreを参照)インタフェースには、他の同様のメソッドがあります。ただし、NonBlockingEntryStoreインタフェースでは、コールは非ブロッキングであるため、Coherenceは制御が返されたときに操作が完了することを想定していません。実装者がCoherenceに操作の完了を通知できるようにするために、StoreObserverが提供されており、成功または失敗時にコールする必要があります。これは、Coherenceが操作の結果を処理するのに役立ちます。

失敗してプライマリ・パーティションがリストアされるときのライトビハインド戦略と同様に、Coherenceは、成功またはエラーの通知を受信しなかったエントリに対してNonBlockingEntryStore.storeをコールするという点を指摘しておきます。これにより、少なくとも1回はセマンティックが提供され、必要に応じて、実装者は非ブロッキング・データ・ソースをコールできます。

次の図は、初期リクエストから記憶域が有効なノードでのNonBlockingEntryStoreの呼出しまでのフローを示しています。

図15-4 get()操作によるロードの場合


get()操作によるロードの場合。

フローの説明:
  • 番号1 アプリケーションは、まだキャッシュに存在しないエントリAに対してget()をコールします。
  • 番号2 リクエストは、エントリを所有する記憶域メンバー(この例ではJVM2)に移動します。エントリの所有権(つまりパーティションの所有権)は、キーのraw (またはバイナリ)値と、関連付けられたパーティション・サービスで構成されたパーティションの数に基づいてアルゴリズムで決定されます。リクエストはまだアクセスされていないか有効期限が切れているため、ミスが発生し、コールは構成エントリ・ストアにリレーされます。
  • 番号3 NonBlockingEntryStoreを実装するエントリ・ストアのload()操作がコールされます。カスタム・ロジックには、初期値がnullのBinaryEntry (Interface BinaryEntryStoreを参照)およびStoreObserver (Interface StoreObserverを参照)が提供されます。実装者は、キャッシュ・エントリを移入するために必要なデータストア操作を実行します。
  • 番号4 基礎となるデータ・ソースに対する操作が完了すると、実装ではobserver.onNextまたはobserver.onErrorをコールして、値が正常にロードされたかどうかを判断します。実装者は、onNextをコールする前に、setValueまたはupdateBinaryValueを使用してBinaryEntryを更新します。これにより、Coherenceではプライマリ・パーティション所有者(JVM2)にデータが挿入され、それに応じてバックアップされるようになります。
  • 番号5 プライマリ・パーティション所有者は、バックアップの目的で、クラスタ内の別の記憶域メンバーに値を送信します。
  • 番号6 エントリ値は、一時参照が保持されているコール元アプリケーションに返されます。データ・ソース操作は非同期で実行でき、load()へのコールは完了が返されるまで待機する必要はありませんが、get()の呼出しはコール元の観点から同期されます。

図15-5 put()操作の場合

()操作の場合。
フローの説明:
  • 番号1 アプリケーションは、値Aを持つエントリAに対してput()をコールします。
  • 番号2 エントリは所有メンバーに格納されます。
  • 番号3 キャッシュはNonBlockingEntryStoreで構成されているため、store()操作がコールされます。store()には、BinaryEntryおよびStoreObserverが提供されます。実装者は、キャッシュ・エントリをデータストアに保存するために必要なデータストア操作を実行します。
  • 番号4 この時点で、NonBlockingEntryStorestore()コールは返され、put()はコール元のアプリケーションに制御を戻します。
  • 番号5 データストアは、キャッシュ・エントリをデータストアに保存するために必要なデータストア操作を非同期に実行し、通常の操作にはobserver.onNext()メソッド(または問題の場合はobserver.onError())をコールします。必要に応じて(たとえば、BinaryEntryの値が更新された場合)、値はキャッシュに戻されます。
  • 番号6 この値は、保管のためにバックアップ所有メンバーに送信されます。

getAll()

getAll()は、get()と同等に機能しますが、getAll()はエントリのセットを処理します。これにより、実装者はデータソースに対するバッチ操作(マルチエントリ)を最適化できるため、データソースとの通信オーバーヘッドが軽減されます。関連付けられたエントリが正常に書き込まれた後、実装者はStoreObserver.onNextをコールして関連するエントリを渡す必要があります(この特定のエントリの処理中にエラーが発生した場合はonError()です)。

ノート:

Coherenceでは、終了する前にすべてのエントリが処理されることを想定しています。

putAll()

また、putAll()は、エントリのセットを除き、put()と同等に機能します。ここでも同じことが想定されます。すべてのエントリがonNext()/onError()を使用して処理されるか、onComplete()を使用して操作を中断できます。putAll()での違いは、コール元は完了を待機しないため、例外はスローされず、ログに出力されることです。

アプリケーションの観点からは、remove()操作はCacheStoreまたはBinaryEntryStoreと同様に機能します。

このモデルは、非ブロッキング・データ・ストアと統合する自然な方法を提供するだけでなく、パフォーマンスとスケーラビリティの観点から、そのようなストアの利点を活用します。

NonBlockingEntryStoreについて

特定のデータ・ソース・ライブラリには、コール元が別の処理を行う前に結果が返されるのを待機する必要がないAPIがあります。たとえば、HTTPコールを行うと、データを格納するリクエストが送信されてからレスポンスが返されるまでの待機時間が比較的長くなる可能性があります。非ブロッキングAPIを実装することで、コール元は実際のストア操作が完了するのを待たずに、他の作業をすぐに行うことができます。

NonBlockingEntryStoreインタフェースを実装することで、ストア実装者は非ブロッキングAPIをより自然な方法で使用できます。

NonBlockingEntryStoreは、プラガブル・データ・ストアのコンテキストで提供されています。これを使用するには、実装クラスを指定および構成する必要があります。このクラスは、ReadWriteBackingMapを使用してデータ・ソースからデータをロード、格納または削除します。このバッキング・マップには、次の2つの要素があります。
  • データをキャッシュするための内部マップ。
  • データベースとやりとりするためのデータ・ソース・アクセス部分。

NonBlockingEntryStoreインタフェースには、loadstoreまたはerase操作を表すBinaryEntryが用意されています。これにより、実装者は必要に応じてデシリアライズを回避できます。これはBinaryEntryStoreに類似しています。キーや値全体をデシリアライズするのではなく、rawバイナリがダウンストリーム・システムに格納されている場合や、バイナリをナビゲートして関連する部分を抽出できる場合は、デシリアライズを回避できます。

ノート:

getKeygetValueおよびgetOriginalValueは、最初のコールのデシリアライズを発生させます。
データ・ソースの非ブロッキングの使用

この項では、この機能を使用するために必要なタスクのサマリーを示します。開始するには、Cache StoresのNon Blocking Entry Store Exampleを参照してください。

NonBlockingEntryStoreの構成
非ブロッキング・キャッシュ・ストアの実装を指定するには、次に示すように、read-write-backing-map-scheme内に実装クラス名を指定します。
...
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
   xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config
   coherence-cache-config.xsd">
    <cache-mapping>
...
        <cache-name>myCache</cache-name>
        <scheme-name>distributed-rwbm-nonblocking</scheme-name>
...
    </cache-mapping>

    <distributed-scheme>
...
        <scheme-name>distributed-rwbm-nonblocking</scheme-name>
        <backing-map-scheme>
            <read-write-backing-map-scheme>

                <cachestore-scheme>
                    <class-scheme>
                        <class-name>com.company.NonBlockingStoreImpl</class-name>
                    </class-scheme>
                </cachestore-scheme>

            </read-write-backing-map-scheme>
        </backing-map-scheme>
        <autostart>true</autostart>
...
    </distributed-scheme>

</cache-config>
NonBlockingEntryStoreの実装

構成後、NonBlockingEntryStoreインタフェースを実装するクラスを、記憶域が有効なメンバーのクラスパスに追加します。次のサンプル・コードを参照してください。

クラスを配置すると、次の同等性が確立されます。
  • get() - 呼出し → load()

    ノート:

    データがすでにキャッシュ内にある場合、load()はコールされません。また、get()をコールすると、onNext()/onError()がリクエストを完了するまで待機してから返されます。
  • getAll() - 呼出し → loadAll()
  • put() - 呼出し → store()
  • putAll() - 呼出し → storeAll()
  • remove() - 呼出し → erase()
  • removeAll() - 呼出し → eraseAll()

次のコード・スニペットは、リアクティブAPIを使用したデータ・ソースへのアクセスを示しています。

...
/**
 * An example NonBlockingEntryStore implementation
 */
public class ExampleNonBlockingEntryStore<K, V>
    {
    @Override
    public void load(BinaryEntry<K, V> binEntry, StoreObserver<K, V> observer)
        {
        K key = binEntry.getKey();

        Flux.from(getConnection())
                .flatMap(connection -> connection.createStatement(LOAD_STMT)
                        .bind("$1", key)
                        .execute())
                .flatMap(result ->
                         result.map((row, meta) ->
                                 {
                                 returnnew Student(
                                         (String) row.get("name"),
                                         (String) row.get("address"));
                                 }
                         ))
                .collectList()
                .doOnNext(s ->
                          {
                          binEntry.setValue((V) s.get(0));
                          observer.onNext(binEntry);
                          })
                .doOnError(t ->
                           {
                           if (t instanceof IndexOutOfBoundsException)
                               {
                               CacheFactory.log("Could not find row for key: " + key);
                               }
                           else
                               {
                               CacheFactory.log("Error: " + t);
                               }
                           observer.onError(binEntry, new Exception(t));
                           })
                .subscribe();
        }
...
    @Override
    public void store(BinaryEntry<K, V> binEntry, StoreObserver<K, V> observer)
        {
        K       key      = binEntry.getKey();
        Student oStudent = (Student) binEntry.getValue();

        Flux.from(getConnection())
                .flatMap(connection -> connection.createStatement(STORE_STMT)
                        .bind("$1", key)
                        .bind("$2", oStudent.getName())
                        .bind("$3", oStudent.getAddress())
                        .execute())
                .flatMap(Result::getRowsUpdated)
                .doOnNext((s) ->
                          {
                          CacheFactory.log("store done, rows updated: " + s);
                          observer.onNext(binEntry);
                          })
                .doOnError(t -> new Exception(t))
                .subscribe();
        }
...
    privatestaticfinal String STORE_STMT = "INSERT INTO student VALUES ($1, $2, $3) ON conflict (id) DO UPDATE SET name=$2, address=$2";
    privatestaticfinal String LOAD_STMT = "SELECT NAME, ADDRESS FROM student WHERE id=$1";

データ・ソースのエントリ・ストアを実装する際には、必ずベスト・プラクティスを使用してください。実装に関する考慮事項を参照してください。

キャッシュ戦略の選択

Coherenceがサポートしている様々なデータ・ソースのキャッシュ戦略を比較し、対照します。

この項には次のトピックが含まれます:

リードスルー/ライトスルーとキャッシュアサイドの比較

クラスタ化環境のキャッシュアサイド・パターンには、一般的に2つのアプローチがあります。1つ目のアプローチでは、キャッシュ・ミスをチェックし、データベースに問い合せ、キャッシュに移入して、アプリケーション処理を継続します。この場合、様々なアプリケーション・スレッドがこの処理を同時に実行すると、多数のデータベース・アクセスが発生することになります。もう1つのアプローチでは、アプリケーションでロックを再確認します(これは、チェックがキャッシュ・エントリに対する原子性を持っているため機能します)。ただし、キャッシュ・ミスまたはデータベース更新の際に大量のオーバーヘッドが生じます(クラスタ化されたロック、追加の読取り、およびクラスタ化されたロック解除 - 最大10の追加ネットワーク・ホップに加え、追加処理のオーバーヘッドおよびキャッシュ・エントリのロック継続期間の増加)。

インライン・キャッシングを使用すると、(フォルト・トレランスを確保するためにデータがバックアップ・サーバーにコピーされている間)エントリは2つのネットワーク・ホップでのみロックされます。また、ロックはパーティション所有者によりローカルで維持されます。さらに、アプリケーション・コードは完全にキャッシュ・サーバーで管理されます。つまり、ノードの管理下にあるサブセットのみが直接データベースにアクセスします(そのため、負荷とセキュリティの予測可能性が向上します)。加えて、これによりデータベース・ロジックからキャッシュ・クライアントが切り離されます。

リフレッシュアヘッドとリードスルーの比較

リフレッシュアヘッドでは、リードスルーと比べて待機時間が減りますが、それは将来必要とされるキャッシュ・アイテムをキャッシュが正確に予測できる場合のみです。これらの予測が完全に正確であれば、リフレッシュアヘッドにより待機時間が削減され、余分なオーバーヘッドはありません。予測の正確性が低下すると、データベースに送信される不要なリクエストが増加するため、スループットへの影響も大きくなります。データベースのリクエスト処理が遅れ始めると、待機時間が長くなる可能性もあります。

ライトビハインドとライトスルーの比較

ライトビハインド・キャッシングの要件が満たされると、ライトスルー・キャッシュと比較して非常に高いスループットが得られ、待機時間が削減されます。また、ライトビハインド・キャッシングによりデータベース(書込みの減少)およびキャッシュ・サーバー(キャッシュ値のデシリアライズの削減)の負荷が低減されます。

キャッシュ・ストアの実装の作成

Coherenceでは、キャッシュがデータ・ソースを使用する方法に応じて使用できるいくつかのキャッシュ・ストア・インタフェースが提供されます。キャッシュ・ストアを作成するには、次のいずれかのインタフェースを実装します。
  • CacheLoader - 読取り専用キャッシュ

  • CacheStore - 読取り/書込みキャッシュ

  • BinaryEntryStore - バイナリ・エントリ・オブジェクトの読取り/書込み。

これらのインタフェースは、com.tangosol.net.cacheパッケージにあります。CacheLoaderインタフェースには、load(Object key)およびloadAll(Collection keys)という2つの主要なメソッドがあります。CacheStoreインタフェースでは、store(Object key, Object value)storeAll(Map mapEntries)erase(Object key)およびeraseAll(Collection colKeys)メソッドが追加されます。BinaryEntryStoreインタフェースでは他のインタフェースと同じメソッドが提供されますが、このインタフェースは直接バイナリ・オブジェクトに対して機能します。

キャッシュ・ストアの実装のサンプルおよび制御可能なキャッシュ・ストアの実装のサンプルを参照してください。

キャッシュ・ストアの実装のプラグイン

キャッシュ・ストアの実装をプラグインするには、distributed-schemebacking-map-schemecachestore-schemeまたはread-write-backing-map-schemeを指定します。

read-write-backing-map-schemeでは、ReadWriteBackingMapの実装を構成します。このバッキング・マップは、実際にデータをキャッシュする内部マップ(internal-cache-scheme)と、データベースと相互作用するキャッシュ・ストア実装(cachestore-scheme)という2つの主要な要素で構成されます。read-write-backing-map-schemeを参照し、write-batch-factorrefresh-ahead-factorwrite-requeue-thresholdおよびrollback-cachestore-failures要素に注目してください。

例15-2は、キャッシュ・ストアの実装を指定するキャッシュ構成を示しています。<init-params>要素には、コンストラクタに渡されるパラメータの順序付けられたリストが含まれます。{cache-name}構成マクロを使用してキャッシュ名を実装に渡すことにより、これがデータベース表にマップされます。パラメータ・マクロの使用を参照してください。

例15-2 Cachestoreモジュールの例

<?xml version="1.0"?>

<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
   xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config
   coherence-cache-config.xsd">
   <caching-scheme-mapping>
      <cache-mapping>
         <cache-name>com.company.dto.*</cache-name>
         <scheme-name>distributed-rwbm</scheme-name>
      </cache-mapping>
   </caching-scheme-mapping>

   <caching-schemes>
      <distributed-scheme>
         <scheme-name>distributed-rwbm</scheme-name>
         <backing-map-scheme>
            <read-write-backing-map-scheme>

            <internal-cache-scheme>
               <local-scheme/>
            </internal-cache-scheme>

            <cachestore-scheme>
               <class-scheme>
                  <class-name>com.example.MyCacheStore</class-name>
                     <init-params>
                        <init-param>
                           <param-type>java.lang.String</param-type>
                           <param-value>{cache-name}</param-value>
                        </init-param>
                     </init-params>
                  </class-scheme>
               </cachestore-scheme>
            </read-write-backing-map-scheme>
         </backing-map-scheme>
      </distributed-scheme>
   </caching-schemes>
</cache-config>

キャッシュ・ストアでのフェデレート・キャッシュの使用

デフォルトでは、キャッシュ・ストアにロードされるデータはフェデレーション参加者に統合されません。フェデレーテッド・キャッシュを有効にする場合は、ローカル参加者のすべてのリードスルー・リクエストがリモート参加者にレプリケートされ、リモート・サイトのデータ・ソースが更新されるため、注意が必要です。これは、タイミングにセンシティブなアプリケーションで特に問題になる可能性があります。さらに、読取り量の多いアプリケーションでは、リモート・サイトのデータ・ソースへの不要な書込みを引き起こす可能性があります。

リードスルー・キャッシングの使用時にキャッシュにロードされるエントリを統合するには、キャッシュ・ストアの定義で<federated-loading>要素をtrueに設定します。たとえば:

<cachestore-scheme>
   <class-scheme>
      <class-name>com.example.MyCacheStore</class-name>
      <init-params>
         <init-param>
            <param-type>java.lang.String</param-type>
            <param-value>{cache-name}</param-value>
         </init-param>
      </init-params>
   </class-scheme>
   <federated-loading>true</federated-loading>
</cachestore-scheme>

キャッシュ・ストアの実装のサンプル

キャッシュ・ストアの実装を作成する前に、com.tangosol.net.cache.CacheStoreインタフェースのきわめて基本的なサンプル実装を確認します。
例15-3の実装では、JDBCを使用する単一のデータベース接続が使用され、バルク操作は使用されません。完全な実装では接続プールが使用されます。また、ライトビハインドが使用される場合は、CacheStore.storeAll()を実装してJDBCのバルク挿入およびバルク更新を行います。データベースのキャッシュ構成のサンプルについては、データベースのキャッシュを参照してください。

ヒント:

キャッシュをバルク・ロードすることによって、処理作業を軽減できます。次の例では、putメソッドを使用して、キャッシュ・ストアに値を書き込みます。多くの場合、putAllメソッドを使用してバルク・ロードを実行すると、処理作業およびネットワーク・トラフィックが軽減されます。キャッシュの事前ロードを参照してください。

例15-3 CacheStoreインタフェースの実装

package com.tangosol.examples.coherence;
​
import com.tangosol.net.AbstractCacheStore;
​
import java.sql.DriverManager;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
​
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
​
/**
* An example CacheStore implementation.
*/
public class DBCacheStore
        extends AbstractCacheStore
    {
    // ----- constructors ---------------------------------------------------
​
    /**
    * Constructs DBCacheStore for a given database table.
    *
    * @param sTableName the db table name
    */
    public DBCacheStore(String sTableName)
        {
        m_sTableName = sTableName;
        configureConnection();
        }
​
    /**
    * Set up the DB connection.
    */
    protected void configureConnection()
        {
        try
            {
            Class.forName("org.gjt.mm.mysql.Driver");
            m_con = DriverManager.getConnection(DB_URL, DB_USERNAME, DB_PASSWORD);
            m_con.setAutoCommit(true);
            }
        catch (Exception e)
            {
            throw ensureRuntimeException(e, "Connection failed");
            }
        }
​
    // ---- accessors -------------------------------------------------------
​
    /**
    * Obtain the name of the table this CacheStore is persisting to.
    *
    * @return the name of the table this CacheStore is persisting to
    */
    public String getTableName()
        {
        return m_sTableName;
        }
​
    /**
    * Obtain the connection being used to connect to the database.
    *
    * @return the connection used to connect to the database
    */
    public Connection getConnection()
        {
        return m_con;
        }
​
    // ----- CacheStore Interface --------------------------------------------
​
    /**
    * Return the value associated with the specified key, or null if the
    * key does not have an associated value in the underlying store.
    *
    * @param oKey  key whose associated value is to be returned
    *
    * @return the value associated with the specified key, or
    *         <tt>null</tt> if no value is available for that key
    */
    public Object load(Object oKey)
        {
        Object     oValue = null;
        Connection con    = getConnection();
        String     sSQL   = "SELECT id, value FROM " + getTableName()
                          + " WHERE id = ?";
        try
            {
            PreparedStatement stmt = con.prepareStatement(sSQL);
​
            stmt.setString(1, String.valueOf(oKey));
​
            ResultSet rslt = stmt.executeQuery();
            if (rslt.next())
                {
                oValue = rslt.getString(2);
                if (rslt.next())
                    {
                    throw new SQLException("Not a unique key: " + oKey);
                    }
                }
            stmt.close();
            }
        catch (SQLException e)
            {
            throw ensureRuntimeException(e, "Load failed: key=" + oKey);
            }
        return oValue;
        }
​
    /**
    * Store the specified value under the specific key in the underlying
    * store. This method is intended to support both key/value creation
    * and value update for a specific key.
    *
    * @param oKey    key to store the value under
    * @param oValue  value to be stored
    *
    * @throws UnsupportedOperationException  if this implementation or the
    *         underlying store is read-only
    */
    public void store(Object oKey, Object oValue)
        {
        Connection con     = getConnection();
        String     sTable  = getTableName();
        String     sSQL;
​
        // the following is very inefficient; it is recommended to use DB
        // specific functionality that is, REPLACE for MySQL or MERGE for Oracle
        if (load(oKey) != null)
            {
            // key exists - update
            sSQL = "UPDATE " + sTable + " SET value = ? where id = ?";
            }
        else
             {
             // new key - insert
             sSQL = "INSERT INTO " + sTable + " (value, id) VALUES (?,?)";
             }
        try
                {
                PreparedStatement stmt = con.prepareStatement(sSQL);
                int i = 0;
                stmt.setString(++i, String.valueOf(oValue));
                stmt.setString(++i, String.valueOf(oKey));
                stmt.executeUpdate();
                stmt.close();
                }
        catch (SQLException e)
                {
                throw ensureRuntimeException(e, "Store failed: key=" + oKey);
                }
        }
​
    /**
    * Remove the specified key from the underlying store if present.
    *
    * @param oKey key whose mapping is to be removed from the map
    *
    * @throws UnsupportedOperationException  if this implementation or the
    *         underlying store is read-only
    */
    public void erase(Object oKey)
        {
        Connection con  = getConnection();
        String     sSQL = "DELETE FROM " + getTableName() + " WHERE id=?";
        try
            {
            PreparedStatement stmt = con.prepareStatement(sSQL);
​
            stmt.setString(1, String.valueOf(oKey));
            stmt.executeUpdate();
            stmt.close();
            }
        catch (SQLException e)
            {
            throw ensureRuntimeException(e, "Erase failed: key=" + oKey);
            }
        }
​
    /**
    * Iterate all keys in the underlying store.
    *
    * @return a read-only iterator of the keys in the underlying store
    */
    public Iterator keys()
        {
        Connection con  = getConnection();
        String     sSQL = "SELECT id FROM " + getTableName();
        List       list = new LinkedList();
​
        try
            {
            PreparedStatement stmt = con.prepareStatement(sSQL);
            ResultSet         rslt = stmt.executeQuery();
            while (rslt.next())
                {
                Object oKey = rslt.getString(1);
                list.add(oKey);
                }
            stmt.close();
            }
        catch (SQLException e)
            {
            throw ensureRuntimeException(e, "Iterator failed");
            }
​
        return list.iterator();
        }
​
    // ----- data members ---------------------------------------------------
​
    /**
    * The connection.
    */
    protected Connection m_con;
​
    /**
    * The db table name.
    */
    protected String m_sTableName;
​
    /**
    * Driver class name.
    */
    private static final String DB_DRIVER = "org.gjt.mm.mysql.Driver";
​
    /**
    * Connection URL.
    */
    private static final String DB_URL = "jdbc:mysql://localhost:3306/CacheStore";
​
    /**
    * User name.
    */
    private static final String DB_USERNAME = "root";
​
    /**
    * Password.
    */
    private static final String DB_PASSWORD = null;
    }

制御可能なキャッシュ・ストアの実装のサンプル

更新された値をデータ・ストアに書き込むタイミングをアプリケーションで制御できるように、制御可能なキャッシュ・ストアを実装できます。このシナリオの最も一般的なユースケースは、起動時のデータ・ストアからのキャッシュの初期移入中です。起動時には、キャッシュの値を元のデータ・ストアに書き込む必要がありません。このような書込みはリソースの浪費になります。

例15-4Main.javaファイルは、次のような制御可能なキャッシュ・ストアの2種類の操作方法を示しています。

  • 制御可能なキャッシュ(別のサービスに配置されている必要があります)を使用して、キャッシュ・ストアの有効化または無効化を行います。これは、ControllableCacheStore1クラスで示されます。

  • CacheStoreAwareインタフェースを使用して、キャッシュに追加されたオブジェクトで記憶域が不要なことを示します。これは、ControllableCacheStore2クラスで示されます。

ControllableCacheStore1ControllableCacheStore2は両方とも、com.tangosol.net.cache.AbstractCacheStoreクラスを拡張します。このヘルパー・クラスによって、storeAll操作およびeraseAll操作の非最適実装が行われます。

CacheStoreAwareインタフェースを使用して、キャッシュに追加されるオブジェクトをデータベースに格納する必要がないことを示すことができます。データベースのキャッシュを参照してください。

例15-4は、Main.javaインタフェースの一覧を示しています。

例15-4 Main.java - 制御可能なCacheStoreとの相互作用

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.net.cache.AbstractCacheStore;
import com.tangosol.util.Base;

import java.io.Serializable;
import java.util.Date;

public class Main extends Base
    {

    /**
     * A cache controlled CacheStore implementation
     */
    public static class ControllableCacheStore1 extends AbstractCacheStore
        {
        public static final String CONTROL_CACHE = "cachestorecontrol";

        String m_sName;

        public static void enable(String sName)
            {
            CacheFactory.getCache(CONTROL_CACHE).put(sName, Boolean.TRUE);
            }

        public static void disable(String sName)
            {
            CacheFactory.getCache(CONTROL_CACHE).put(sName, Boolean.FALSE);
            }

        public void store(Object oKey, Object oValue)
            {
            Boolean isEnabled = (Boolean) CacheFactory.getCache(CONTROL_CACHE).get(m_sName);
            if (isEnabled != null && isEnabled.booleanValue())
                {
                log("controllablecachestore1: enabled " + oKey + " = " + oValue);
                }
            else
                {
                log("controllablecachestore1: disabled " + oKey + " = " + oValue);
                }
            }

        public Object load(Object oKey)
            {
            log("controllablecachestore1: load:" + oKey);
            return new MyValue1(oKey);
            }

        public ControllableCacheStore1(String sName)
            {
            m_sName = sName;
            }

        }

    /**
     * a valued controlled CacheStore implementation that 
     * implements the CacheStoreAware interface
     */
    public static class ControllableCacheStore2 extends AbstractCacheStore
        {

        public void store(Object oKey, Object oValue)
            {
            boolean isEnabled = oValue instanceof CacheStoreAware ? !((CacheStoreAware) oValue).isSkipStore() : true;
            if (isEnabled)
                {
                log("controllablecachestore2: enabled " + oKey + " = " + oValue);
                }
            else
                {
                log("controllablecachestore2: disabled " + oKey + " = " + oValue);
                }
            }

        public Object load(Object oKey)
            {
            log("controllablecachestore2: load:" + oKey);
            return new MyValue2(oKey);
            }

        }

    public static class MyValue1 implements Serializable
        {
        String m_sValue;

        public String getValue()
            {
            return m_sValue;
            }

        public String toString()
            {
            return "MyValue1[" + getValue() + "]";
            }

        public MyValue1(Object obj)
            {
            m_sValue = "value:" + obj;
            }
        }

    public static class MyValue2 extends MyValue1 implements CacheStoreAware
        {
        boolean m_isSkipStore = false;

        public boolean isSkipStore()
            {
            return m_isSkipStore;
            }

        public void skipStore()
            {
            m_isSkipStore = true;
            }

        public String toString()
            {
            return "MyValue2[" + getValue() + "]";
            }

        public MyValue2(Object obj)
            {
            super(obj);
            }

        }

    public static void main(String[] args)
        {
        try
            {

            // example 1

            NamedCache cache1 = CacheFactory.getCache("cache1");

            // disable cachestore
            ControllableCacheStore1.disable("cache1");
            for(int i = 0; i < 5; i++)
                {
                cache1.put(new Integer(i), new MyValue1(new Date()));
                }

            // enable cachestore
            ControllableCacheStore1.enable("cache1");
            for(int i = 0; i < 5; i++)
                {
                cache1.put(new Integer(i), new MyValue1(new Date()));
                }

            // example 2

            NamedCache cache2 = CacheFactory.getCache("cache2");

            // add some values with cachestore disabled
            for(int i = 0; i < 5; i++)
                {
                MyValue2 value = new MyValue2(new Date());
                value.skipStore();
                cache2.put(new Integer(i), value);
                }

            // add some values with cachestore enabled
            for(int i = 0; i < 5; i++)
                {
                cache2.put(new Integer(i), new MyValue2(new Date()));
                }


            }
        catch(Throwable oops)
            {
            err(oops);
            }
        finally
            {
            CacheFactory.shutdown();
            }
        }

    }

実装に関する考慮事項

キャッシュ・ストア実装を作成する場合のベスト・プラクティスを検討します。

この項には次のトピックが含まれます:

冪等性

すべての操作は、冪等になるように(つまり、望ましくない副作用なしに繰り返し処理できるように)設計する必要があります。ライトスルーおよびライトビハインド・キャッシュの場合、フェイルオーバー処理時にキャッシュ更新のデータベース部分を再試行することにより、低コストのフォルト・トレランスを提供できます。ライトビハインド・キャッシングの場合、冪等性によって、データ整合性に影響を与えずに、複数のキャッシュ更新を単一の起動にまとめることができます。

アプリケーションにライトビハインド・キャッシングの要件があっても、書込み組合せを回避する必要がある場合(たとえば、監査のため)、バージョニングされたキャッシュ・キーを作成する必要があります(たとえば、シーケンスIDを持つ通常の主キーを組み合せて)。

ライトスルーの制限

Coherenceでは、複数のキャッシュ・ストア・インスタンス間での2フェーズの操作はサポートされません。つまり、2つのキャッシュ・エントリが更新され、別々のキャッシュ・サーバーにあるキャッシュ・ストア実装へのコールがトリガーされる場合、片方のデータベース更新は成功し、もう一方は失敗する可能性があります。この場合、アプリケーション・サーバーのトランザクション・マネージャで、キャッシュアサイド・アーキテクチャ(キャッシュとデータベースを、単一トランザクション内の2つの別個のコンポーネントとして更新する)を使用することをお薦めします。多くの場合、論理コミットの失敗がないように(当然、サーバー障害が発生しないようにもする)データベース・スキーマを設計することは可能です。ライトビハインド・キャッシングでは、データベースの動作によりputが影響を受けないため、この問題は回避されます(根本的な問題は設計プロセスの初期段階で解決しておく必要があります)。

キャッシュ問合せ

キャッシュ問合せでは、キャッシュに格納されているデータのみを操作し、欠落(または欠落する可能性のある)データをロードするためにキャッシュ・ストアの実装をトリガーすることはありません。そのため、キャッシュ・ストアでバックアップされたキャッシュに問い合せるアプリケーションは、その問合せに必要なすべてのデータがあらかじめロードされていることを確認する必要があります。「キャッシュ内のデータの問合せ」を参照してください。効率性を考慮して、アプリケーションの起動時にデータ・セットをデータベースから直接キャッシュに送信して、ほとんどのバルク・ロード操作を実行しておく必要があります(NamedCache.putAll()を使用して、データのブロックをキャッシュにまとめる)。ローダー処理は、制御可能なキャッシュ・ストア・パターンを使用して、データベースにバックアップされた循環更新を無効にする必要があります。制御可能なキャッシュ・ストアの実装のサンプルを参照してください。キャッシュ・ストアを制御するには、起動サービスを使用するか(クラスタ全体にエージェントを送信して各JVMのローカル・フラグを変更します)、レプリケート・キャッシュ(別のキャッシュ・サービス)に値を設定して、すべてのキャッシュ・ストア実装のメソッド呼出しでその値を読み取ります(一般的なデータベース操作に比べて最小限のオーバーヘッドで済みます)。Coherenceのクラスタ化されたJMX機能により、カスタムMBeanも簡単に実装できます。

リエントラント・コール

キャッシュ・ストアの実装では、ホスティングしているキャッシュ・サービスをコールバックしないでください。これには、内部的にCoherenceキャッシュ・サービスを参照するORMソリューションも含まれます。別のキャッシュ・サービス・インスタンスをコールすることも可能ですが、深くネストされたコールは避けるように注意する必要があります。コールのそれぞれがキャッシュ・サービス・スレッドを消費するため、キャッシュ・サービス・スレッド・プールを使用し尽くすと、デッドロックが発生する可能性があります。

キャッシュ・サーバー・クラスパス

キャッシュ・エントリのクラス(値オブジェクト、データ転送オブジェクトとも呼ばれる)は、キャッシュ・サーバー・クラスパスに置く必要があります(キャッシュ・サーバーは、キャッシュ・エントリをシリアライズ/デシリアライズしてキャッシュ・ストアと相互作用する必要があるため)。

CacheStoreのコレクション操作

キャッシュがライトビハインドとして構成され、<write-batch-factor>が構成されている場合に使用される可能性が最も高いのは、CacheStore.storeAllメソッドです。Coherenceでは、CacheLoader.loadAllメソッドも使用されます。同様の理由により、その最初の使用にはリフレッシュアヘッドを有効にする必要がある場合があります。

接続プール

データベース接続は、コンテナ接続プール(またはサード・パーティの接続プール)から取得するか、スレッドローカルの遅延初期化パターンを使用して取得する必要があります。専用キャッシュ・サーバーは、多くの場合、管理コンテナなしでデプロイされるため、後者が最も魅力的な選択になります。ただし、過度な同時データベース接続を回避するように、キャッシュ・サービスのスレッド・プール・サイズは制限する必要があります。