この章では、バルク・ロードおよび分散ロード・パターンを使用してキャッシュにデータを事前ロードする方法について説明します。
この章には次の項が含まれます:
Coherenceを使用する際の一般的なシナリオでは、データはアプリケーションで使用される前にキャッシュに事前移入します。例21-1は、put
メソッドを使用したデータのロードを示しています。このテクニックは有益ですが、特にパーティション・キャッシュおよびレプリケート・キャッシュでは、put
をコールするたびにネットワーク・トラフィックが発生する場合があります。また、put
をコールするたびにキャッシュで置換されたばかりのオブジェクトが返され(java.util.Map
インタフェースで定義されている)、不要なオーバーヘッドが追加されます。
例21-1 キャッシュの事前ロード
public static void bulkLoad(NamedCache cache, Connection conn) { Statement s; ResultSet rs; try { s = conn.createStatement(); rs = s.executeQuery("select key, value from table"); while (rs.next()) { Integer key = new Integer(rs.getInt(1)); String value = rs.getString(2); cache.put(key, value); } ... } catch (SQLException e) {...} }
かわりに、ConcurrentMap.putAll
メソッドを使用すると、キャッシュのロードを大幅に効率化できます。これを例21-2に示します。
例21-2 ConcurrentMap.putAllを使用したキャッシュの事前ロード
public static void bulkLoad(NamedCache cache, Connection conn) { Statement s; ResultSet rs; Map buffer = new HashMap(); try { int count = 0; s = conn.createStatement(); rs = s.executeQuery("select key, value from table"); while (rs.next()) { Integer key = new Integer(rs.getInt(1)); String value = rs.getString(2); buffer.put(key, value); // this loads 1000 items at a time into the cache if ((count++ % 1000) == 0) { cache.putAll(buffer); buffer.clear(); } } if (!buffer.isEmpty()) { cache.putAll(buffer); } ... } catch (SQLException e) {...} }
Coherenceのパーティション・キャッシュに大量のデータ・セットを事前移入する際は、Coherenceのクラスタ・メンバーに作業を分散すると効率が向上する可能性があります。分散ロードを使用すると、クラスタのネットワーク帯域幅およびCPU処理能力の集積を活用することによって、データ・スループット率を高めることができます。分散ロードの実行時には、アプリケーションで次の事項を決定する必要があります。
ロードを実行するクラスタのメンバー
メンバー間でのデータ・セットの分割方法
メンバーの選択および作業の分割時は、基礎となるデータ・ソース(データベースやファイル・システムなど)に対する負荷をアプリケーションで考慮する必要があります。たとえば、問合せを同時に実行するメンバーが多すぎると、1つのデータベースでは対応しきれなくなる場合があります。
この項では、簡単な分散ロードを実行する一般的な手順の概要について説明します。この例では、データがファイルに保存されていて、クラスタ内で記憶域が有効なすべてのメンバーに分散されることを前提としています。
記憶域が有効なメンバーのセットを取得します。たとえば、次の方法ではgetStorageEnabledMembers
メソッドを使用して、分散キャッシュ内で記憶域が有効なメンバーを取得します。
記憶域が有効なクラスタのメンバー間で作業を分割します。たとえば、次のルーチンでは、メンバーに割り当てられたファイルの一覧が含まれるマップが、メンバーをキーとして返されます。
例21-4 キャッシュのメンバーに割り当てられたファイルの一覧を取得するルーチン
protected Map<Member, List<String>> divideWork(Set members, List<String> fileNames) { Iterator i = members.iterator(); Map<Member, List<String>> mapWork = new HashMap(members.size()); for (String sFileName : fileNames) { Member member = (Member) i.next(); List<String> memberFileNames = mapWork.get(member); if (memberFileNames == null) { memberFileNames = new ArrayList(); mapWork.put(member, memberFileNames); } memberFileNames.add(sFileName); // recycle through the members if (!i.hasNext()) { i = members.iterator(); } } return mapWork; }
各メンバーへのロードを実行するタスクを起動します。たとえば、タスクの起動にはCoherenceのInvocationService
などを使用します。この場合、LoaderInvocable
の実装では、memberFileNames
を反復して各ファイルを処理し、その内容をキャッシュにロードする必要があります。通常、クライアント上で実行されるキャッシュ処理は、LoaderInvocable
を使用して実行する必要があります。
例21-5 キャッシュの各メンバーをロードするクラス
public void load() { NamedCache cache = getCache(); Set members = getStorageMembers(cache); List<String> fileNames = getFileNames(); Map<Member, List<String>> mapWork = divideWork(members, fileNames); InvocationService service = (InvocationService) CacheFactory.getService("InvocationService"); for (Map.Entry<Member, List<String>> entry : mapWork.entrySet()) { Member member = entry.getKey(); List<String> memberFileNames = entry.getValue(); LoaderInvocable task = new LoaderInvocable(memberFileNames, cache.getCacheName()); service.execute(task, Collections.singleton(member), this); } }