25 マップ・イベントの使用

マップ・イベント・リスナーを使用して、ObservableMapインタフェースを実装するCoherenceの任意のクラスからキャッシュ・イベントおよびイベントを受信できます。

この章の内容は次のとおりです。

マップ・イベントの概要

Coherenceでは、JavaBeanイベント・モデルを使用してキャッシュ・イベントが提供されます。
この実装によって、実際にクラスタ内のどこで変更が行われたのかに関係なく、アプリケーションはそれらが必要としているタイミングと場所でイベントを受信できます。JavaBeanモデルに精通している開発者であれば、複雑なクラスタであってもイベントに難無く対応できるはずです。

ノート:

Coherenceには、ライブ・イベント・プログラミング・モデルも用意されています。ライブ・イベントでは、一般的なイベント・タイプのサポートが提供され、マップ・イベントのかわりに使用できます。ライブ・イベントの使用を参照してください。

この項には次のトピックが含まれます:

リスナー・インタフェースおよびイベント・オブジェクト

JavaBeansイベント・モデルには、すべてのリスナーで拡張する必要があるEventListenerインタフェースがあります。Coherenceには、MapListenerインタフェースが用意されており、これによりCoherenceキャッシュのデータが追加、変更または削除されたときにアプリケーション・ロジックでイベントを受信できます。

MapListenerインタフェースを実装するアプリケーション・オブジェクトは、アプリケーションのMapListener実装のインスタンスをaddMapListener()メソッドに渡すだけで、ObservableMapインタフェースを実装する任意のCoherenceキャッシュまたはクラスからイベントにサインアップできます。

MapListenerに渡されるMapEventオブジェクトは、イベントの発生元になったソース(ObservableMap)、イベントに関連付けられたID(キー)、そのIDに対するアクション(挿入、更新、削除)、古い値と新しい値など、発生したイベントについて必要なすべての情報を保持します。

イベントの保証の理解

パーティション・キャッシュ・サービスは、通常の状況ではイベントが1回だけ配信されることを保証します。ただし、これが保証されない可能性のあるシナリオが2つあります。

  • データ損失を引き起こすクラスタの突発障害(データを保持している2台のマシンの同時クラッシュなど)。この場合、PARTITION_LOSTイベントが、サーバー側のすべての登録済PartitionListenerインスタンスに出力されます。

  • クライアントの切断。この場合、MEMBER_LEFTイベントが、クライアント側のすべての登録済MemberListenerインスタンスに出力されます。

イベントをサポートするキャッシュとクラス

すべてのCoherenceキャッシュには、ObservableMapが実装されます。実際に、すべてのCoherenceキャッシュで実装されるNamedCacheインタフェースは、ObservableMapインタフェースを拡張します。つまり、キャッシュが、ローカル、パーティション、ニア、レプリケーションであるかどうか、また、リードスルー、ライトスルー、ライトビハインド、オーバーフロー、ディスク記憶域などを使用しているかどうかに関係なく、アプリケーションはサインアップして任意のキャッシュからイベントを受信できます。

ノート:

キャッシュ・トポロジやサーバーの数に関係なく、また他のサーバーが変更している場合でも、これらのイベントはアプリケーションのリスナーに配信されます。

Coherenceキャッシュ(Coherenceキャッシュ・ファクトリを介して取得されたオブジェクト)に加えて、次のように、Coherenceでサポートされる他のいくつかのクラスでもObservableMapインタフェースが実装されます。

  • ObservableHashMap

  • LocalCache

  • OverflowMap

  • NearCache

  • ReadWriteBackingMap

  • AbstractSerializationCacheSerializationCacheおよびSerializationPagedCache

  • WrapperObservableMapWrapperConcurrentMapおよびWrapperNamedCache

すべてのイベントのサインアップ

イベントにサインアップするには、MapListenerインタフェースを実装するオブジェクトをObservableMapaddMapListenerメソッドに渡します。次の例は、受信する各イベントを出力するサンプルのMapListener実装を示しています。
/**
* A MapListener implementation that prints each event as it receives
* them.
*/
public static class EventPrinter
        extends Base
        implements MapListener
    {
    public void entryInserted(MapEvent evt)
        {
        out(evt);
        }

    public void entryUpdated(MapEvent evt)
        {
        out(evt);
        }

    public void entryDeleted(MapEvent evt)
        {
        out(evt);
        }
    }

この実装を使用すると、任意のキャッシュからすべてのイベントを出力できます(すべてのキャッシュでObservableMapインタフェースが実装されているため)。

cache.addMapListener(new EventPrinter());

後でリスナーを削除できるように、リスナーへの参照を保持する必要があります。

Listener listener = new EventPrinter();
cache.addMapListener(listener);
m_listener = listener; // store the listener in a field

これでリスナーを削除できます。

Listener listener = m_listener;
if (listener != null)
    {
    cache.removeMapListener(listener);
    m_listener = null; // clean up the listener field
    }

ObservableMapインタフェースの各addMapListenerメソッドには、対応するremoveMapListenerメソッドがあります。リスナーを削除するには、そのリスナーの追加に使用したaddMapListenerメソッドに対応するremoveMapListenerメソッドを使用します。

MapListenerとしての内部クラスの使用

MapListenerとして使用する内部クラスを作成する場合、または1つか2つのイベント・タイプ(挿入、更新または削除)のみをリスニングするMapListenerを実装する場合は、AbstractMapListenerベース・クラスを使用できます。次の例では、キャッシュの挿入イベントのみを出力します。
cache.addMapListener(new AbstractMapListener()
    {
    public void entryInserted(MapEvent evt)
        {
        out(evt);
        }
    });

MapListener実装の作成に役立つもう1つのベース・クラスは、MultiplexingMapListenerです。これは、すべてのイベントを1つのメソッドにルーティングして処理します。1つのメソッドを実装するだけですべてのイベントを取得できるため、MultiplexingMapListenerは、MapListenerとして使用する内部クラスを作成する際にも非常に便利です。

public static class EventPrinter
        extends MultiplexingMapListener
    {
    public void onMapEvent(MapEvent evt)
        {
        out(evt);
        }
    }

ラムダ式を使用したマップ・リスナーの追加

ラムダ式を使用すると、MapListener<K, V>実装を追加できます。次の例では、ラムダ式を使用して、Coherenceで提供されるSimpleMapListener<K, V>実装を追加します。この実装では、イベント・タイプに基づいて適切なイベント・ハンドラに委任します。

MapListener<ContactId, Contact> listener = new SimpleMapListener<ContactId,
   Contact>().addInsertHandler((event) -> System.out.println("\ninserted:\n" +
   event.getNewValue()));

cache.addMapListener(listener);

キャッシュ用のMapListenerの構成

特定のキャッシュに常にリスナーを必要とする場合は、<listener>要素を使用してリスナーをキャッシュ構成に配置すると、それが自動的に追加されます。listenerを参照してください。

特定のIDのイベントのサインアップ

特定のID (キー)に対して発生したイベントにサインアップできます。たとえば、整数キー5に対して発生したすべてのイベントを出力するには、次のように処理します。
cache.addMapListener(new EventPrinter(), new Integer(5), false);

したがって、次のコードではIntegerキー5が挿入または更新された場合にのみイベントがトリガーされます。

for (int i = 0; i < 10; ++i)
    {
    Integer key   = new Integer(i);
    String  value = "test value for key " + i;
    cache.put(key, value);
    }

イベントのフィルタリング

フィルタを使用して、特定のイベントをリスニングできます。次の例では、リスナーに削除イベントのみ受信するフィルタを設定したキャッシュにリスナーを追加します。
// Filters used with partitioned caches must be 
// Serializable, Externalizable or ExternalizableLite
public class DeletedFilter
        implements Filter, Serializable
    {
    public boolean evaluate(Object o)
        {
        MapEvent evt = (MapEvent) o;
        return evt.getId() == MapEvent.ENTRY_DELETED;
        }
    }

cache.addMapListener(new EventPrinter(), new DeletedFilter(), false);

ノート:

イベントのフィルタリングとキャッシュされたデータのフィルタリングの比較

問合せのフィルタを作成する場合、Filterのevaluateメソッドに渡すオブジェクトはキャッシュからの値です。または、フィルタでEntryFilterインタフェースを実装する場合は、キャッシュからのMap.Entry全体です。MapListenerのイベントをフィルタリングするフィルタを作成する場合、フィルタのevaluateメソッドに渡すオブジェクトはMapEvent型です。問合せのリスニングを参照してください。

次の一連のコールを実行すると、

cache.put("hello", "world");
cache.put("hello", "again");
cache.remove("hello");

次のような結果になります。

CacheEvent{LocalCache deleted: key=hello, value=again}

Liteイベントの使用

イベントに新規の値のみを含める場合は、Liteイベントを使用できます。デフォルトでは、イベントの一部として古い値と新しい値が両方とも提供されます。次に例を示します。
MapListener listener = new MultiplexingMapListener()
    {
    public void onMapEvent(MapEvent evt)
        {
        out("event has occurred: " + evt);
        out("(the wire-size of the event would have been "
            + ExternalizableHelper.toBinary(evt).length()
            + " bytes.)");
        }
    };
cache.addMapListener(listener);

// insert a 1KB value
cache.put("test", new byte[1024]);

// update with a 2KB value
cache.put("test", new byte[2048]);

// remove the 2KB value
cache.remove("test");

テストの実行結果の出力では、1つ目のイベントは挿入される値(1KB)を保持し、2つ目のイベントは置換前の値(1KB)と新しい値(2KB)を保持しています。3つ目のイベントは、削除される値(2KB)を保持しています。

event has occurred: CacheEvent{LocalCache added: key=test, value=[B@a470b8}
(the wire-size of the event would have been 1283 bytes.)
event has occurred: CacheEvent{LocalCache updated: key=test, old value=[B@a470b8, new value=[B@1c6f579}
(the wire-size of the event would have been 3340 bytes.)
event has occurred: CacheEvent{LocalCache deleted: key=test, value=[B@1c6f579}
(the wire-size of the event would have been 2307 bytes.)

アプリケーションでイベントに古い値と新しい値を両方とも含める必要がない場合、Liteイベントのみをリクエストして、そのように指定することができます。リスナーの追加時にLiteイベントをリクエストするには、追加のブール・パラメータfLiteを使用するaddMapListenerメソッドを使用できます。

cache.addMapListener(listener, (Filter) null, true);

ノート:

Liteイベントの古い値と新しい値は、NULLにすることができます。ただし、Liteイベントをリクエストしても、イベントの生成と配信に余分な負荷がかからない場合は、古い値と新しい値が含まれることがあります。つまり、MapListenerでLiteイベントを受信するようにリクエストしても、それは単にシステムにMapListenerがイベントの古い値と新しい値を認識する必要がないことを知らせているだけになります。

問合せのリスニング

キャッシュの問合せに使用するフィルタと同じフィルタで、キャッシュからのイベントをリスニングできます。

この項には次のトピックが含まれます:

問合せのリスニングの概要

すべてのCoherenceキャッシュはどの基準による問合せもサポートしています。アプリケーションでキャッシュからのデータを問い合せる場合、その結果はID (keySet)のセットまたはID/値のペア(entrySet)のセットとしてのポイント・イン・タイム・スナップショットになります。結果セットの内容を判断するメカニズムは、フィルタリングと呼ばれ、これによりアプリケーションの開発者は即時利用可能なフィルタ(equals、less-than、like、betweenなど)の豊富なセットを使用してどのように複雑な問合せも構築でき、また独自のカスタム・フィルタ(XPathなど)を提供できるようになります。

たとえば、取引システムにおいて、特定のトレーダーが受け持つ未決済のOrderオブジェクトをすべて問い合せることができます。

NamedCache mapTrades = ...
Filter filter = new AndFilter(new EqualsFilter("getTrader", traderid),
                              new EqualsFilter("getStatus", Status.OPEN));
Set setOpenTrades = mapTrades.entrySet(filter);

そのトレーダーが新しいポジションを建てた、そのトレーダーがポジションを決済した、トレーダー間でポジションを付け替えた、という情報を受信するために、アプリケーションで同じフィルタを使用できます。

// receive events for all trade IDs that this trader is interested in
mapTrades.addMapListener(listener, new MapEventFilter(filter), true);

MapEventFilterは、問合せフィルタをイベント・フィルタに変換します。

MapEventFilterには、いくつもの非常に強力なオプションがあります。これにより、アプリケーション・リスナーは特に関心のあるイベントのみを受信できます。スケーラビリティとパフォーマンスの観点からさらに重要なこととして、必要なイベントのみがネットワーク上で通信されるため、それらの特定イベントを対象とするサーバーとクライアントにのみ配信されます。たとえば:

// receive all events for all trades that this trader is interested in
nMask = MapEventFilter.E_ALL;
mapTrades.addMapListener(listener, new MapEventFilter(nMask, filter), true);

// receive events for all this trader's trades that are closed or
// re-assigned to a different trader
nMask = MapEventFilter.E_UPDATED_LEFT | MapEventFilter.E_DELETED;
mapTrades.addMapListener(listener, new MapEventFilter(nMask, filter), true);

// receive events for all trades as they are assigned to this trader
nMask = MapEventFilter.E_INSERTED | MapEventFilter.E_UPDATED_ENTERED;
mapTrades.addMapListener(listener, new MapEventFilter(nMask, filter), true);

// receive events only for new trades assigned to this trader
nMask = MapEventFilter.E_INSERTED;
mapTrades.addMapListener(listener, new MapEventFilter(nMask, filter), true);

イベントのフィルタリングとキャッシュされたデータのフィルタリングの比較

問合せのFilterを作成する場合、Filterのevaluateメソッドに渡すオブジェクトはキャッシュからの値です。または、FilterEntryFilterインタフェースを実装する場合は、キャッシュからのMap.Entry全体です。MapListenerのイベントをフィルタリングするFilterを作成する場合、Filterのevaluateメソッドに渡すオブジェクトはMapEvent型です。

MapEventFilterは、問合せに使用するFilterMapListenerのイベントのフィルタリングに使用するFilterに変換します。つまり、MapEventFilterはキャッシュを問い合せるFilterから作成されますが、作成されたMapEventFilterは、問合せFilterが期待するオブジェクトに変換することによってMapEventオブジェクトを評価するフィルタになります。

統合イベントの使用

統合イベントをモニターできます。イベントは通常、キャッシュへの変更を反映します。たとえば、あるサーバーがキャッシュ内の1つのエントリを変更し、別のサーバーがキャッシュに複数のアイテムを追加し、3つ目のサーバーが同じキャッシュから1つのアイテムを削除している間に、クラスタ内の各サーバーで50個のスレッドが同じキャッシュのデータにアクセスすることがあります。すべての変更操作によりイベントが生成されますが、それをクラスタ内の任意のサーバーで受信するように選択できます。これらの操作をクライアント・アクションと呼び、そのイベントは、クライアントにディスパッチされた状態にあります(この場合の「クライアント」が実際には「サーバー」であっても)。これはCoherenceクラスタなどの、真のpeer-to-peerアーキテクチャでは自然の概念です。いずれのピアもクライアントでありサーバーでもあるので、他のピアからサービスを受け、他のピアにサービスを提供します。一般的なJava Enterpriseアプリケーションでは、ピアはアプリケーションのコンテナとして動作するアプリケーション・サーバー・インスタンスで、クライアントは直接キャッシュに対してアクセスおよび変更を行い、キャッシュからのイベントをリスニングするアプリケーションの一部です。

イベントの中には、キャッシュ内部で起動されるものもあります。多くの例がありますが、最も一般的なケースは次のとおりです。

  • キャッシュのエントリが自動失効した場合

  • キャッシュの最大サイズに達したため、エントリがキャッシュから削除された場合

  • リードスルー操作の結果としてエントリが透過的にキャッシュに追加された場合

  • リードアヘッドまたはリフレッシュアヘッド操作の結果としてキャッシュ内のエントリが透過的に更新された場合

これらは変更の各例ですが、変更とはキャッシュ内部での自然な(通常は自動的な)各操作を表します。これらのイベントを統合イベントと呼びます。

必要に応じて、アプリケーションはイベントに問い合せて、それがクライアントに起因するイベントか、または統合イベントかを区別できます。この情報は、CacheEventというMapEventのサブクラスで保持されます。前述のEventPrinterの例を使用して、統合イベントのみを出力できます。

public static class EventPrinter
        extends MultiplexingMapListener
    {
    public void onMapEvent(MapEvent evt)
        {
        if (evt instanceof CacheEvent && ((CacheEvent) evt).isSynthetic())
            {
            out(evt);
            )
        }
    }

バッキング・マップ・イベントのリスニング

キャッシュをバッキングするマップのイベント(パーティション、レプリケート、ニア、連続問合せ、リードスルー/ライトスルー、およびライトビハインド)をリスニングできます。

この項には次のトピックが含まれます:

バッキング・マップ・イベントのリスニングの概要

いくつかの応用例の中には、サービスの背後でマップをリスニングする必要がある場合があります。分散環境でのレプリケーション、パーティション、およびその他のデータ管理方式は、いずれも分散サービスです。このサービスにはさらに実際にデータを管理するための機能が必要です。それが「バッキング・マップ」と呼ばれるものです。

バッキング・マップを構成できます。特定のキャッシュのすべてのデータを、ヒープのオブジェクト形式で保持する必要がある場合、無制限で失効期限のないLocalCache(統計が不要な場合はSafeHashMap)を使用します。メモリーに保持するアイテムの数が少ない場合は、LocalCacheを使用します。データを必要に応じてデータベースから読み取る場合は、ReadWriteBackingMapを使用します(アプリケーションのDAO実装を介した読取りおよび書込み方法を指定します)。ReadWriteBackingMapに、SafeHashMapLocalCacheなどのバッキング・マップを提供してデータを保存します。

バッキング・マップの中には観測可能なものがあります。これらのバッキング・マップから取得されたイベントは、一般的にはアプリケーションに直接関係がありません。かわりに、Coherenceではそれらを(Coherenceで)実行する必要があるアクションに変換してデータを同期的に保持し正しくバックアップを行い、またアプリケーション・リスナーでのリクエストに応じてそれをクラスタ全体に配信されるクラスタ化されたイベントに変換します。たとえば、パーティション・キャッシュにバッキング・マップとしてLocalCacheがあり、ローカル・キャッシュでエントリが失効した場合、そのイベントによってエントリのすべてのバックアップ・コピーが失効します。さらに、リスナーがパーティション・キャッシュに登録されている場合、イベントがイベント・フィルタに適合すると、そのイベントはそのリスナーが登録されているサーバーのリスナーに配信されます。

応用例の中には、データが保持されているサーバー上のイベントをアプリケーションで処理する必要があるものや、それを実際にデータを管理している構造(バッキング・マップ)で行う必要があるものがあります。そのような場合、バッキング・マップが観測可能マップであれば、リスナーをバッキング・マップに構成したり、プログラムによってリスナーをバッキング・マップに追加することができます。(バッキング・マップが監視可能でない場合、WrapperObservableMapでラップすることにより監視可能になります。)

各バッキング・マップ・イベントは、1回だけディスパッチされます。ただし、1つのputから複数のバッキング・マップ・イベントが生成されることがあります。たとえば、putからのエントリを再分散する必要がある場合、分散イベント(元のノードから削除され、新しいノードに挿入されるイベント)が作成されます。この場合、1つのputに対してバッキング・マップ・リスナーが複数回コールされます。

最後に、バッキング・マップ・リスナーは常に同期されます。バッキング・マップ・リスナーは、バッキング・マップ自身の同期化モニターを保持しつつ、変更操作を行うスレッドで起動されます。内部バッキング・マップ・リスナーでは多くの場合、イベントは即座には処理されませんが、キューに配置され、後で非同期に処理されます。

分散キャッシュからの読取り可能なバッキングMapListenerイベントの生成

バッキングMapListenerイベントは、読取り可能なJavaフォーマットでレプリケート・キャッシュから返されます。ただし、分散キャッシュから返されたバッキングMapListenerイベントは、Coherenceの内部フォーマットになります。Coherence Incubator Commonプロジェクトには、分散キャッシュから読取り可能なバッキングMapListenerイベントから取得できるAbstractMultiplexingBackingMapListenerクラスが用意されています。Coherence Commonライブラリをダウンロードするには、https://coherence.java.net/を参照してください。

分散キャッシュからの読取り可能なバッキングMapListenerイベントを生成するには:

  1. AbstractMultiplexingBackingMapListenerクラスを実装します。
  2. cache-configファイルのbacking-map-scheme<listener>セクションに実装を登録します。
  3. Javaプロパティのcacheconfigで、キャッシュ・サーバー・アプリケーション・ファイルおよびクライアント・ファイルを開始します。
    -Dcoherence.cacheconfig="cache-config.xml"
    

AbstractMultiplexingBackingMapListenerクラスには、onBackingMapEventメソッドが用意されています。これをオーバーライドしてイベントが返される方法を指定できます。

次のVerboseBackingMapListenerクラスの一覧は、AbstractMultiplexingBackingMapListenerのサンプルの実装です。onBackingMapEventメソッドは、結果を標準出力に送信するように上書きされています。

import com.tangosol.net.BackingMapManagerContext;
import com.tangosol.util.MapEvent;


public class VerboseBackingMapListener extends AbstractMultiplexingBackingMapListener {

        public VerboseBackingMapListener(BackingMapManagerContext context) {
                super(context);
        }
        
        @Override
        protected void onBackingMapEvent(MapEvent mapEvent, Cause cause) {
                
                System.out.printf("Thread: %s Cause: %s Event: %s\n",
                Thread.currentThread().getName(), cause, mapEvent);
        }
}

次の例では、分散キャッシュ・スキームにおいて<listener>要素の設定を行い、VerboseBackingMapListener実装をcom.tangosol.net.BackingMapManagerContextタイプとして識別します。

<distributed-scheme>
   <scheme-name>my-dist-scheme</scheme-name>
   <service-name>DistributedCache</service-name>
      <backing-map-scheme>
         <read-write-backing-map-scheme>
            <internal-cache-scheme>
               <local-scheme>
                  <high-units>0</high-units>
                  <expiry-delay>0</expiry-delay>
               </local-scheme>
            </internal-cache-scheme>
            <cachestore-scheme>
               <class-scheme>
                  <class-name>CustomCacheStore</class-name>
                  <init-params>
                     <init-param>
                        <param-type>java.lang.String</param-type>
                        <param-value>{cache-name}</param-value>
                     </init-param>
                  </init-params>
               </class-scheme>
            </cachestore-scheme>
            <listener> 
               <class-scheme>
                  <class-name>VerboseBackingMapListener</class-name>
                     <init-params>
                        <init-param>
                           <param-type>com.tangosol.net.BackingMapManagerContext
                           </param-type>
                           <param-value>{manager-context}</param-value>
                        </init-param>
                     </init-params>
                  </class-scheme>
            </listener> 
         </read-write-backing-map-scheme>                                            
      </backing-map-scheme>
   <autostart>true</autostart>
</distributed-scheme>

同期イベント・リスナーの使用

イベントの中には、アプリケーション・リスナーがイベントを生成するキャッシュ・サービスを妨害しないように、非同期に配信されるものがあります。まれなケースですが、非同期配信では進行している処理の結果に比べてイベントの順序があいまいになる場合があります。クラスタ化システムのローカル表示が単一スレッドであるかのようにキャッシュAPI操作およびイベントの順序を保証するには、MapListenerSynchronousListenerマーカー・インタフェースを実装する必要があります。

Coherence自体で同期リスナーを使用する例にニア・キャッシュがあります。これにより、ローカルでキャッシュされたデータを、イベントを使用して無効化できます。つまり、ニア・キャッシュへのput操作では、エントリのローカル・コピーならびに分散されたプライマリおよびバックアップ・コピーは同期操作として更新されます。この更新が完了すると、リスニングしているその他すべてのニア・キャッシュに非同期イベントが送信されます。これにより、次のget操作でバック・キャッシュからエントリが取得されるように、ローカル・コピーが無効になります。