ヘッダーをスキップ
Oracle® Fusion Middleware Oracle Coherenceチュートリアル
12c (12.1.3)
E56204-01
  ドキュメント・ライブラリへ移動
ライブラリ
製品リストへ移動
製品
目次へ移動
目次

前
 
次
 

11 ライブ・イベントの使用

この演習では、エントリとEntryProcessorイベントの使用方法、およびイベント・インターセプタを使用したイベントの通知方法を学習します。この演習では、プロジェクトを作成するための指示が提供され、このプロジェクトで、イベント・インターセプタと原因イベントを作成して、ライブ・イベント・フレームワークの機能を実行します。

ライブ・イベント・フレームワークの簡単な概要も示します。この章で説明するフレームワークおよびAPIの詳細は、『Oracle Fusion Middleware Oracle Coherenceでのアプリケーションの開発』のライブ・イベントに関する項およびOracle Fusion Middleware Oracle Coherence Java APIリファレンスを参照してください。

この章には次の項が含まれます:

11.1 概要

Coherenceは、データ・グリッドで実行された操作にアプリケーションで対応できるイベント・フレームワークを提供します。このフレームワークは、イベントがクラスタ操作の監視可能な発生を表すイベントベースのモデルを使用します。サポートされているイベントには、パーティション・サービス、キャッシュおよびアプリケーション・イベントがあります。これらのイベントは、プログラムで実行するか、またはキャッシュ構成を使用してイベント・インターセプタ(EventInterceptorを実装するクラス)を登録することで処理できます。

11.1.1 イベント・インターセプタについて

アプリケーションは、イベント・インターセプタ(EventInterceptor)を登録することによってライブ・イベントに対応できます。インターセプタは、どのイベントを受信するか、どのアクションを実行するのか(実行するアクションがある場合)を明示的に定義します。特定のキャッシュに対して、または特定のパーティション・サービスによって管理されるすべてのキャッシュに対して、任意の数のイベント・インターセプタを作成し、登録できます。同じイベント・タイプに対して登録されている複数のインターセプタは、自動的に連鎖され、1つのイベントのコンテキストで実行されます。

イベント・インターセプタは、EventInterceptorインタフェースを実装することによって作成されます。インタフェースは汎用を使用して定義され、インタフェースを使用すると、イベントの汎用タイプをタイプ・パラメータとして指定して、対象となるイベントをフィルタ処理できます。継承されたonEventメソッドは、イベントの受信時に必要な処理を実行する機能を提供します。EventInterceptor APIの詳細は、Oracle Fusion Middleware Oracle Coherence Java APIリファレンスを参照してください。

@Interceptor注釈は、イベントを特定のイベント・タイプに制限するために使用され、インターセプタの詳細構成も提供します。@Interceptor注釈には次の属性が含まれます。

  • identifier—インターセプタの一意の識別子を指定します。この値は、キャッシュ構成ファイルでインターセプタ・クラスを登録するときにオーバーライドできます。

  • entryEvents—インターセプタがサブスクライブするエントリ・イベント・タイプの配列を指定します。

  • entryProcessorEvents—インターセプタがサブスクライブするエントリ・プロセッサ・イベント・タイプの配列を指定します。

  • transferEvents—インターセプタがサブスクライブする転送イベント・タイプの配列を指定します。

  • transactionEvents—インターセプタがサブスクライブするトランザクション・イベント・タイプの配列を指定します。

  • order—このインターセプタを、インターセプタの連鎖の前に配置するかどうかを指定します。有効な値はHIGHLOWです。HIGHの値は、そのインターセプタを、インターセプタの連鎖の前に配置することを示します。LOWの値は、順序付けのプリファレンスがないことを示します。デフォルト値はLOWです。この値は、キャッシュ構成ファイルでインターセプタ・クラスを登録するときにオーバーライドできます。

@Interceptor注釈の詳細は、『Oracle Fusion Middleware Oracle Coherenceでのアプリケーションの開発』のイベント・インターセプタの作成に関する項を参照してください。

11.1.2 キャッシュ・イベントについて

キャッシュ・イベントは、キャッシュ内の1つ以上のエントリに対して実行された一部の操作によって発生します。キャッシュ・イベントには、エントリ・イベントおよびエントリ・プロセッサ・イベントがあります。エントリ・イベント(EntryEvent)は、キャッシュ内のエントリに対して実行されるいくつかの操作(挿入、更新および削除)のいずれかを表すことができます。エントリ・イベントは、エントリに対する変更を許可するために操作の実行前に発生するコミット前イベント(INSERTINGUPDATINGおよびREMOVING)と、操作の完了後にイベントの発生順序と同じ順序で発生するコミット後イベント(INSERTEDUPDATEDおよびREMOVED)に分けることができます。

エントリ・プロセッサ(EntryProcessor)イベントは、キャッシュ内の一連のエントリに対するEntryProcessorの実行を表します。エントリ・プロセッサ・イベントは、エントリ・プロセッサ実装に対する変更を許可するためにエントリ・プロセッサの実行前に発生するコミット前イベント(EXECUTING)と、エントリ・プロセッサの実行後にイベントの発生順序と同じ順序で発生するコミット後イベント(EXECUTED)に分けることができます。

11.1.3 パーティション・サービス・イベントについて

パーティション・サービス(PartitionedService)イベントは、記憶域が有効なメンバー間のパーティション転送を表す転送イベントと、トランザクション・イベントで構成されます。転送イベントは、転送対象のパーティションのコンテキストでディスパッチされますが、パーティションに属するコンテンツは変更不可です。

11.1.4 イベント・インターセプタの登録について

イベント・インターセプタは、キャッシュ構成ファイルまたはプログラムのいずれかで登録します。イベント・インターセプタは、1つ以上のキャッシュまたは特定のパーティション・サービスのいずれかに対して登録されます。特定のキャッシュに対して登録されたイベント・インターセプタは、そのキャッシュに関連するイベントのみを受信します。特定のパーティション・サービスに対して登録されたイベント・インターセプタは、そのサービスによって管理されるキャッシュすべてのイベントを受信します。

キャッシュ構成ファイルで、イベント・インターセプタの完全クラス名は<interceptor>要素で指定され、この要素は<cache-mapping>スタンザの<cache-name>の下に表示されます。インターセプタは、<cache-name>要素で指定されるキャッシュと関連付けられます。イベント・インターセプタは、<caching-schemes>スタンザのパーティション・サービスに対しても登録できます。これを実行するには、任意の数の<interceptor>サブ要素を含む<interceptors>要素を<distributed-scheme>要素内に含めます。

キャッシュ構成ファイルを使用するかわりに、イベント・インターセプタをプログラムで登録できます。イベント・インターセプタを登録するためのキー・クラスおよびメソッドは、ConfigurableCacheFactoryインタフェースのgetInterceptorRegistryメソッド、およびInterceptorRegistryインタフェースのgetEventInterceptorメソッドとregisterEventInterceptorメソッドです。たとえば、次のコードではTimedTraceInterceptorが登録されますが、これは、この章で後述するEventInterceptorです。

ConfigurableCacheFactory ccf = CacheFactory.getConfigurableCacheFactory();
InterceptorRegistry reg = ccf.getInterceptorRegistry();
 
reg.registerEventInterceptor(new TimedTraceInterceptor(), RegistrationBehavior.FAIL);

プログラムによるイベント・インターセプタの登録の詳細説明および例は、このドキュメントの対象範囲ではありません。詳細は、『Oracle Fusion Middleware Oracle Coherenceでのアプリケーションの開発』のライブ・イベントの使用に関する項およびOracle Fusion Middleware Oracle Coherence Java APIリファレンスを参照してください。

11.2 イベント・インターセプタの作成、登録および実行

次の各項では、イベント・インターセプタを作成、登録および実行する方法を説明します。この演習では、コミット前イベントとコミット後イベントの間のタイミングを測定するイベント・インターセプタを使用します。

この演習を完了するには:

  1. コミット前イベントとコミット後イベントの間の時間を測定するイベント・インターセプタの作成

  2. イベントの処理を遅延するクラスの作成

  3. イベント時間測定イベント・インターセプタの登録

  4. 遅延プロセッサ・クラスのPOF構成ファイルの作成

  5. イベント時間測定イベント・インターセプタを実行するクラスの作成

  6. イベント時間測定サンプルのドライバ・ファイルの作成

  7. キャッシュ・サーバー起動の構成の作成

  8. イベント時間測定ドライバの起動構成の作成

  9. イベント時間測定サンプルの実行

11.2.1 コミット前イベントとコミット後イベントの間の時間を測定するイベント・インターセプタの作成

イベントの各タイプに対して、コミット前イベントとコミット後イベント(INSERTING/INSERTEDUPDATING/UPDATEDおよびREMOVING/REMOVED)のタイミングを測定する、TimedTraceInterceptorというイベント・インターセプタを作成します。

TimedTraceInterceptorイベント・インターセプタ・クラスを作成するには:

  1. Eclipseで、UEMEventsという新しいアプリケーション・クライアント・プロジェクトを作成します。開始ページの「Configuration」フィールドでCoherenceConfigが選択されていることと、「Application Client module」ページで「Create a default main」選択されていないことを確認します。

    詳細は、「Eclipse IDEでの新規プロジェクトの作成」を参照してください。

  2. TimedTraceInterceptorという新しいJavaクラスを作成します。「Default Package」com.oracle.handsonであることを確認します。「Main Method」チェック・ボックスは選択しないでください。

    詳細は、「Javaクラスの作成」を参照してください。

  3. EventInterceptorインタフェースを実装するイベント・インターセプタを作成します。インターセプタは、イベントの各タイプ(挿入、更新、削除およびエントリ・プロセッサ)に対するコミット前イベントとコミット後イベントの間のタイミングを提供する必要があります。独自のインターセプタを作成するか、または例11-1で提供されているコードを使用できます。

例11-1は、イベント・インターセプタTimedTraceInterceptorを示しています。インターセプタは、EventInterceptorインタフェースを実装します。@Interceptor注釈は、identifier属性でインターセプタの一意名を示し、order属性で実行順序(Order.HIGH)を示します。

インターセプタには、保護されたEventTimer内部クラスも含まれています。このクラスでは、通知対象の各イベントの経過時間が測定されます。インターセプタは、各エントリに対するコミット前イベントとコミット後イベントの間の時間、およびそれぞれのイベント・タイプ(INSERTUPDATEREMOVE)を追跡します。タイミングは、サンプル統計と累積統計を表示するCoherenceログにまとめて送信されます。

汎用引数がcom.tangosol.net.events.partition.cache.Eventであるため、ユーザーは、フィルタリングを指定しなくても、そのイベントのコンシューマであるイベント(EntryEventおよびEntryProcessorEvent)のみを取得します。

例11-1 コミット前イベントとコミット後イベントの間のタイミングを提供するクラス

package com.oracle.handson;
 
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
 
import com.tangosol.net.events.EventInterceptor;
import com.tangosol.net.events.annotation.Interceptor;
import com.tangosol.net.events.annotation.Interceptor.Order;
import com.tangosol.net.events.partition.cache.EntryEvent;
import com.tangosol.net.events.partition.cache.EntryEvent.Type;
import com.tangosol.net.events.partition.cache.EntryProcessorEvent;
import com.tangosol.net.events.partition.cache.Event;
 
import com.tangosol.util.Binary;
import com.tangosol.util.BinaryEntry;
 
import java.util.HashMap;
import java.util.Map;
 
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
 
/**
 * TimedTraceInterceptor provides timings between pre- and post-commit events
 * for each type of event i.e. inserts, updates, removes, and entry processor
 * execution.
 * <p>
 * These timings are collected and averaged at a sample rate defined by
 * parameter <tt>cSample</tt>. Additionally they are output to the log
 * at the same time. This implementation does maintain a strong reference to
 * the each binary key however this is removed upon receiving the post-commit event
 * for the same key.
 *
 * @since Coherence 12.1.2
 */
@Interceptor(identifier = "trace", order = Order.HIGH)
public class TimedTraceInterceptor
        implements EventInterceptor<Event<? extends Enum<?>>>
    {
 
    // ----- constructors ---------------------------------------------------
 
    /**
     * Default no-arg constructor.
     */
    public TimedTraceInterceptor()
        {
        this(DEFAULT_SAMPLE_RATE);
        }
 
    /**
     * Construct an TimedTraceInterceptor with specified {@link Event}
     * types and chain position.
     *
     * @param cSample   sample size to calculate mean and output statistics
     */
    public TimedTraceInterceptor(int cSample)
        {
        Map<Enum, EventTimer> mapTimedEvents = m_mapTimedEvents = new HashMap<Enum, EventTimer>(3);
        EventTimer insertTimer = new EventTimer(Type.INSERTED, cSample);
        EventTimer updateTimer = new EventTimer(Type.UPDATED, cSample);
        EventTimer removeTimer = new EventTimer(Type.REMOVED, cSample);
        EventTimer invocationTimer = new EventTimer(EntryProcessorEvent.Type.EXECUTED, cSample);
 
        mapTimedEvents.put(Type.INSERTED,  insertTimer);
        mapTimedEvents.put(Type.INSERTING, insertTimer);
        mapTimedEvents.put(Type.UPDATED,   updateTimer);
        mapTimedEvents.put(Type.UPDATING,  updateTimer);
        mapTimedEvents.put(Type.REMOVED,   removeTimer);
        mapTimedEvents.put(Type.REMOVING,  removeTimer);
        mapTimedEvents.put(EntryProcessorEvent.Type.EXECUTED,  invocationTimer);
        mapTimedEvents.put(EntryProcessorEvent.Type.EXECUTING, invocationTimer);
        }
 
    // ----- EventInterceptor methods ---------------------------------------
 
    /**
     * {@inheritDoc}
     */
    public void onEvent(Event event)
        {
        if (event instanceof EntryEvent)
            {
            process((EntryEvent) event);
            }
        else if (event instanceof EntryProcessorEvent)
            {
            process((EntryProcessorEvent) event);
            }
        }
 
    /**
     * This method will be invoked upon execution of an entry processor and
     * will time its execution from prior to post execution, including any
     * backup requests that need to be made as a result.
     *
     * @param event  the {@link EntryProcessorEvent} that encompasses the
     *               requested event
     */
    protected void process(EntryProcessorEvent event)
        {
        EventTimer mapTimedEvents = m_mapTimedEvents.get(event.getType());
 
        for (BinaryEntry binEntry : event.getEntrySet())
            {
            if (event.getType() == EntryProcessorEvent.Type.EXECUTING)
                {
                mapTimedEvents.starting(binEntry);
                }
            else if (event.getType() == EntryProcessorEvent.Type.EXECUTED)
                {
                mapTimedEvents.started(binEntry);
                }
            }
        }
 
    /**
     * This method will be invoked upon execution of a data mutating request
     * and will time its execution from prior to post execution, including
     * any backup requests that need to be made as a result.
     *
     * @param event  the {@link EntryEvent} that encompasses the
     *               requested event
     */
    protected void process(EntryEvent event)
        {
        EventTimer mapTimedEvents = m_mapTimedEvents.get(event.getType());
 
        switch ((Type) event.getType())
            {
            case INSERTING:
            case UPDATING:
            case REMOVING:
                for (BinaryEntry binEntry : event.getEntrySet())
                    {
                    mapTimedEvents.starting(binEntry);
                    }
                break;
            case INSERTED:
            case UPDATED:
            case REMOVED:
                for (BinaryEntry binEntry : event.getEntrySet())
                    {
                    mapTimedEvents.started(binEntry);
                    }
                break;
            }
        }
 
    // ----- inner class: EventTimer ----------------------------------------
 
    /**
     * The EventTimer times the elapsed time for each event it is notified
     * of. It correlates the completion event based on equality comparisons
     * of the Binary provided. Additionally it calculates the mean based on a
     * sample set of <tt>cSample</tt> size. When reaching this sample set
     * a log will be made of the current sample set mean and the cumulative
     * mean.
     */
    protected class EventTimer
        {
 
        // ----- constructors -----------------------------------------------
 
        /**
         * Construct an EventTimer with the event type provided.
         *
         * @param eventType  the type of event this timer will be timing
         */
        protected EventTimer(Enum eventType, int cSample)
            {
            m_eventType = eventType;
            m_cSampleSize = cSample;
            }
 
        /**
         * Notifies the timer of the execution of the provided key will
         * imminently commence.
         *
         * @param binEntry  the event will commence for this <tt>binEntry</tt>
         */
        public void starting(BinaryEntry binEntry)
            {
            m_mapElapsedTimes.put(binEntry.getBinaryKey(), System.nanoTime());
            }
 
        /**
         * Notifies the timer of the completion of the event for the provided
         * key.
         *
         * @param binEntry  the event has completed for this <tt>binEntry</tt>
         */
        public void started(BinaryEntry binEntry)
            {
            Long lStart = m_mapElapsedTimes.remove(binEntry.getBinaryKey());
            if (lStart == null)
                {
                return;
                }
 
            add(System.nanoTime() - lStart);
            }
 
        /**
         * Regardless of the specific data item add the elapsed time taken to
         * process the data item. Upon reaching the sample set size of events
         * calculate the mean, reset timings and continue.
         *
         * @param lElapsed  the number of nanos taken for a data item to
         *                  process
         */
        protected void add(long lElapsed)
            {
            AtomicInteger nEvents           = m_nEvents;
            AtomicLong    lTotalElapsed     = m_lTotalElapsed;
            int           nCurrEvents       = nEvents.incrementAndGet();
            long          lCurrTotalElapsed = lTotalElapsed.addAndGet(lElapsed);
 
            if (nCurrEvents % m_cSampleSize == 0)
                {
                nEvents.set(0);
                lTotalElapsed.set(0L);
                ++m_cSamples;
 
                long lMean   = lCurrTotalElapsed / nCurrEvents;
                     m_lMean = m_lMean == 0 ? lMean : lMean + m_lMean / 2;
 
                String sStats = String.format("EventStats[name = %s, sampleMean = %fms, mean = %fms]",
                        m_eventType, (double) lMean / 1000000, (double) m_lMean / 1000000);
 
                CacheFactory.log(sStats, CacheFactory.LOG_INFO);
 
                NamedCache cacheResults = CacheFactory.getCache("events-results");
                int        nMemberId    = CacheFactory.getCluster().getLocalMember().getId();
 
                cacheResults.put(
                        String.format("%d-%s-%d", nMemberId, m_eventType.name(), m_cSamples),
                        sStats);
                }
            }
 
        // ----- data members -----------------------------------------------
 
        /**
         * Sample size to calculate mean and output statistics.
         */
        private int               m_cSampleSize;
 
        /**
         * The start times for a number of Binary keys.
         */
        private Map<Binary, Long> m_mapElapsedTimes = new ConcurrentHashMap<Binary, Long>();
 
        /**
         * A counter of the total elapsed time.
         */
        private AtomicLong        m_lTotalElapsed = new AtomicLong();
 
        /**
         * A counter of the number of events processed
         */
        private AtomicInteger     m_nEvents = new AtomicInteger();
 
        /**
         * An average over time.
         */
        private long              m_lMean;
 
        /**
         * The number of samples taken.
         */
        private int               m_cSamples;
 
        /**
         * The type of event being timed.
         */
        private Enum              m_eventType;
        }
 
    // ----- constants ------------------------------------------------------
 
    /**
     * The sample size for elapsed times.
     */
    protected static final int DEFAULT_SAMPLE_RATE = 100;
 
    // ----- data members ---------------------------------------------------
 
    /**
     * A map of event types to their timers.
     */
    private Map<Enum, EventTimer> m_mapTimedEvents;
    }

11.2.2 イベントの処理を遅延するクラスの作成

LazyProcessorというクラスを作成して、イベントの処理間の実体のない遅延を作成します。クラスでは、このプロセッサがイベントの処理間でスリープする必要があるミリ秒数を指定できる必要があります。このクラスは、EventsExamplesクラスのEventsTimingExampleサブクラスで使用されます。EventsExamplesクラスは後の手順で作成します。

LazyProcessorクラスが生成するデータはワイヤ経由で送信されるため、クラスでPOF (Portable Object Format)を使用する必要があります。POF構成ファイルへのLazyProcessorクラスの追加は後の手順で実行します。

LazyProcessorクラスを作成するには:

  1. LazyProcessorという新しいJavaクラスを作成します。mainメソッドは含めないでください。

    詳細は、「Javaクラスの作成」を参照してください。

  2. LazyProcessorクラスのコードを作成します。このクラスではデータのシリアライズにPortableObjectインタフェースを使用するため、LazyProcessorクラスはPortableObjectインタフェースを実装する必要があります。クラスは、AbstractProcessorクラスを拡張する必要もあります。BaseInvocableMap.EntryAbstractProcessorPortableObjectPofReaderおよびPofWriterのクラスとインタフェースをインポートします。LazyProcessorクラスの独自のコードを作成するか、または例11-2で提供されているコードを使用できます。

例11-2 イベントの処理を遅延するクラス

package com.oracle.handson;
 
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.util.Base;
import com.tangosol.util.InvocableMap.Entry;
import com.tangosol.util.processor.AbstractProcessor;
 
import java.io.IOException;
 
/**
 * LazyProcessor will sleep for a specified period of time.
 *
 *
 * @since 12.1.2
 */
public class LazyProcessor
        extends AbstractProcessor
        implements PortableObject
    {
    // ----- constructors ---------------------------------------------------
 
    /**
     * 
     */
    private static final long serialVersionUID = 1L;
 
    /**
     * Default no-arg constructor.
     */
    public LazyProcessor()
        {
        }
 
    /**
     * Constructs a LazyProcessor with a specified time to sleep.
     *
     * @param lSleepTime  the number of milliseconds this processor should
     *                    sleep
     */
    public LazyProcessor(long lSleepTime)
        {
        m_lSleepTime = lSleepTime;
        }
 
    /**
     * {@inheritDoc}
     */
    public Object process(Entry entry)
        {
        try
            {
            Thread.sleep(m_lSleepTime);
            }
        catch (InterruptedException e)
            {
            throw Base.ensureRuntimeException(e);
            }
        return null;
        }
 
    // ----- PortableObject members -----------------------------------------
 
    /**
     * {@inheritDoc}
     */
    public void readExternal(PofReader in) throws IOException
        {
        m_lSleepTime = in.readLong(0);
        }
 
    /**
     * {@inheritDoc}
     */
    public void writeExternal(PofWriter out) throws IOException
        {
        out.writeLong(0, m_lSleepTime);
        }
 
    // ----- data members ---------------------------------------------------
 
    /**
     * The number of milliseconds this processor should sleep.
     */
    private long m_lSleepTime;
    }

11.2.3 イベント時間測定イベント・インターセプタの登録

UEMEventsプロジェクトで、インターセプタはキャッシュ構成ファイルに登録されます。イベント・インターセプタの完全修飾名はinterceptor要素で指定され、この要素は<cache-mapping>スタンザの<cache-name>の下に表示されます。インターセプタは、<cache-name>要素で指定されるキャッシュと関連付けられます。この例で、TimedTraceInterceptoreventsキャッシュ上のイベント・インターセプタです。

イベント・インターセプタを定義するキャッシュ構成ファイルを作成するには:

  1. 「Project Explorer」ウィンドウから、coherence-cache-config.xmlファイルを開きます。ファイルはEvents/appClientModuleの下にあります。

  2. ファイルをuem-cache-config.xmlという名前で保存します。

  3. イベント・インターセプタをコールするキャッシュ構成を作成します。次のリストは、主な要素の一部を取り上げています。

    • <cache-mapping>要素の下に、<interceptor>要素のcom.oracle.handson.TimedTraceInterceptorクラスから<cache-name>要素のeventsキャッシュへの参照があります。eventsキャッシュではevents-distributed-schemeが使用されます。このスキームでは、<distributed-schemes>の下にリストされているPartitionedPofCacheサービスが使用されます。

    • events-resultsキャッシュとdist-events-resultsスキーム間のマッピングがあります。<distributed-scheme>セクションに、dist-events-resultsスキームとPartitionedEventsResultsサービスの間のアソシエーションがあります。

例11-3は、考えられるuem-cache-config.xmlファイルの実装を示しています。

例11-3 TimedTraceInterceptorを登録するキャッシュ構成ファイル

<?xml version="1.0"?>
 
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
              xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd">
  <defaults>
    <serializer>pof</serializer>
  </defaults>
 
  <caching-scheme-mapping>
    <cache-mapping>
        <cache-name>events</cache-name>
        <scheme-name>events-distributed-scheme</scheme-name>
        <interceptors>
            <interceptor>
                <instance>
                    <class-name>com.oracle.handson.TimedTraceInterceptor</class-name>
                    <init-params>
                        <init-param>
                            <param-type>int</param-type>
                            <param-value>100</param-value>
                        </init-param>
                    </init-params>
                </instance>
            </interceptor>
        </interceptors>
    </cache-mapping>
    <cache-mapping>
        <cache-name>events-results</cache-name>
        <scheme-name>dist-events-results</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>
 
  <caching-schemes>
    <distributed-scheme>
      <scheme-name>events-distributed-scheme</scheme-name>
      <service-name>PartitionedPofCache</service-name>
      <thread-count>5</thread-count>
      <backing-map-scheme>
        <local-scheme>
          <!-- each node will be limited to 32MB -->
          <high-units>32M</high-units>
          <unit-calculator>binary</unit-calculator>
        </local-scheme>
      </backing-map-scheme>
      <autostart>true</autostart>
    </distributed-scheme>
 
    <!-- A PartitionedCache service used to store results for events examples
      -->
    <distributed-scheme>
        <scheme-name>dist-events-results</scheme-name>
        <service-name>PartitionedEventsResults</service-name>
        <thread-count>5</thread-count>
        <backing-map-scheme>
            <local-scheme/>
        </backing-map-scheme>
        <autostart>true</autostart>
    </distributed-scheme>
 
    <!--
    Invocation Service scheme.
    -->
    <invocation-scheme>
      <scheme-name>examples-invocation</scheme-name>
      <service-name>InvocationService</service-name>
 
      <autostart system-property="tangosol.coherence.invocation.autostart">true</autostart>
    </invocation-scheme>
 
 </caching-schemes>
</cache-config>

11.2.4 遅延プロセッサ・クラスのPOF構成ファイルの作成

TimedTraceInterceptorクラスによって生成される情報はすべて、そのローカル・メンバー専用です。LazyProcessorとその状態は、記憶域が有効なメンバーに送信されて実行されるため、POF構成ファイルに追加される必要があります。Eclipseの「Project Explorer」で、pof-config.xmlファイルを右クリックし、名前をuem-pof-config.xmlに変更します。エディタでuem-pof-config.xmlファイルを開き、その内容を例11-4のコードに置換します。例は、このクラスを含むPOF構成ファイルを示しています。

例11-4 LazyProcessorクラスのPOF構成ファイル

<?xml version="1.0"?>
<pof-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://xmlns.oracle.com/coherence/coherence-pof-config"
    xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-pof-config http://xmlns.oracle.com/coherence/coherence-pof-config/1.2/coherence-pof-config.xsd">
   <user-type-list>
   <!-- include all "standard" Coherence POF user types -->
   <include>coherence-pof-config.xml</include>
    <user-type>
      <type-id>1008</type-id>
      <class-name>com.oracle.handson.LazyProcessor</class-name>
    </user-type>
   </user-type-list>
</pof-config>

11.2.5 イベント時間測定イベント・インターセプタを実行するクラスの作成

TimedTraceInterceptorクラスによって実行されるアクションをトリガーするEventsExamplesというクラスを作成します。クラスは、結果キャッシュに挿入される、コミット前イベントとコミット後イベントの間の経過時間を測定できる方法を示す必要があります。結果キャッシュに挿入されたエントリは、このクラスを実行するプロセスによって標準出力に送信されます。

EventsExamplesクラスを作成するには:

  1. EventsExamplesという新しいJavaクラスを作成します。mainメソッドは含めないでください。

    詳細は、「Javaクラスの作成」を参照してください。

  2. EventsExamplesクラスのコードを作成します。LazyProcessorCacheFactoryNamedCachePartitionedServiceMapEventMapListenerMultiplexListenerおよびCallableのクラスとインタフェースをインポートします。独自のコードを作成するか、または例11-5で提供されているコードを使用できます。

例11-5は、EventsExamplesクラスのサンプル実装を示しています。例には、EventsTimingExample内部クラスが含まれています。この内部クラスは、eventsおよびevent-resultsキャッシュにアクセスし、TimedTraceInterceptorクラスのログから、キャッシュ・クラスタ・メンバーの数、トリガーしているイベントの名前およびサンプル・サイズを取得します。処理対象のサンプルの時間、および処理対象の全サンプルの平均時間の計算が、TimedTraceInterceptorクラスによって提供されます。

EventsTimingExampleサブクラスでは、イベント処理の合計時間、およびイベントが実行できるスレッド数の値が提供されます。また、イベント処理の間の時間(スリープ時間)を計算するLazyProcessorクラスもコールされます。

例11-5 TimedTraceInterceptorイベント・インターセプタを実行するクラス

package com.oracle.handson;
 
import com.oracle.handson.LazyProcessor;
 
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.net.PartitionedService;
import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;
import com.tangosol.util.MultiplexingMapListener;
 
import java.util.concurrent.Callable;
 
/**
 * EventsExamples illustrates various features within the Live Events
 * Model. This includes providing mean elapsed times split by event type.
 *
 * @since Coherence 12.1.2
 */
@SuppressWarnings("unchecked")
public class EventsExamples
    {
 
    // ----- inner-class: EventsTimingExample -------------------------------
 
    /**
     * The EventsTimingExample is a catalyst for action to be performed by
     * {@link TimedTraceInterceptor}. This illustrates how the elapsed time
     * between pre- and post-commit events can be measured which are inserted 
     * into a results cache. The entries inserted into the results cache are
     * displayed via the stdout of the process executing this class.
     */
    public static class EventsTimingExample
            implements Callable<Boolean>
        {
        // ----- Callable methods -------------------------------------------
 
        /**
         * {@inheritDoc}
         */
        public Boolean call() throws Exception
            {
            NamedCache cacheEvents  = CacheFactory.getCache("events");
            NamedCache cacheResults = CacheFactory.getCache("events-results");
            int        cFrequency   = ((PartitionedService) cacheEvents.getCacheService()).getOwnershipEnabledMembers().size();
            int        cSet         = 110;
 
            MapListener ml = new MultiplexingMapListener()
                {
                @Override
                protected void onMapEvent(MapEvent evt)
                    {
                    String[] asKey = ((String) evt.getKey()).split("-");
 
                    System.out.printf("Received stats [memberId=%s, eventType=%s, sample=%s] = %s\n",
                            asKey[0], asKey[1], asKey[2], evt.getNewValue());
                    }
                };
 
            try
                {
                cacheResults.addMapListener(ml);
 
                // execute inserts and updates
                for (int i = cFrequency; i > 0; --i)
                    {
                    for (int j = 1, cMax = cSet * cFrequency; j <= cMax; ++j)
                        {
                        cacheEvents.put(j, "value " + j);
                        }
                    }
 
                // execute processors
                int nTotalTime = 3000;
                int cThreads   = 5;
                int nSleepTime = nTotalTime / (cThreads * cSet * cFrequency);
                for (int i = 1, cMax = cSet * cFrequency; i <= cMax; ++i)
                    {
                    cacheEvents.invoke(i, new LazyProcessor(nSleepTime));
                    }
                }
            finally
                {
                cacheEvents.clear();
                cacheResults.removeMapListener(ml);
                cacheResults.clear();
                }
            return true;
            }
        }
    }

11.2.6 イベント時間測定サンプルのドライバ・ファイルの作成

EventsExamplesクラスで定義したEventsTimingExampleサンプルを実行するドライバ・ファイルを作成します。

ドライバ・ファイルを作成するには:

  1. UEMEventsプロジェクト内にDriverという新しいJavaクラスを作成します。mainメソッドを含むようにしてください。

    詳細は、「Javaクラスの作成」を参照してください。

  2. EventsExamplesクラスで定義したEventsTimingExampleサンプルを実行するコードを作成します。独自のドライバを作成するか、または例11-6で提供されているコードを使用できます。

例11-6 イベント時間測定サンプルのドライバ・ファイル

package com.oracle.handson;
 
import com.oracle.handson.EventsExamples.EventsTimingExample;
 
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
 
/**
 * Driver executes all the Coherence Events examples.
 * <p>
 * <strong>Timed Events Example</strong> - In this example we time
 * the elapsed time between pre- and post-commit events for each of the events
 * that occur.
 *
 * @since Coherence 12.1.2
 */
public class Driver
    {
 
    // ----- static methods -------------------------------------------------
 
    /**
    * Execute Events examples.
    *
    * @param asArg  command line arguments
    */
    public static void main(String[] asArg)
        {
        System.out.println("------ events examples begin ------");
 
        // Run examples
        for (Map.Entry<String, Callable<Boolean>> example : EVENTS_EXAMPLES.entrySet())
            {
            String sExample = example.getKey();
 
            try
                {
                System.out.printf("------   %s begin\n\n", sExample);
                boolean fSuccess = example.getValue().call();
                System.out.printf("\n------   %s completed %ssuccessfully\n", sExample, fSuccess ? "" : "un");
                }
            catch(Exception e)
                {
                System.err.printf("----------%s completed unsuccessfully with the following exception:\n", sExample);
                e.printStackTrace();
                }
            }
 
        System.out.println("------ events examples completed------");
        }
 
    // ----- constants ------------------------------------------------------
 
    /**
     * All the examples to be executed in insertion order.
     */
    protected static final Map<String, Callable<Boolean>> EVENTS_EXAMPLES = new LinkedHashMap<String, Callable<Boolean>>();
 
    /**
     * Default examples.
     */
    static
        {
        EVENTS_EXAMPLES.put("timing interceptor", new EventsTimingExample());
        }
    }

11.2.7 キャッシュ・サーバー起動の構成の作成

UEMEventsプロジェクト用キャッシュ・サーバーを起動するための構成を作成します。

  1. プロジェクトを右クリックして、「Run As」「Run Configurations」を選択します。「Run Configurations」ダイアログ・ボックスで「Oracle Coherence」アイコンをダブルクリックして、新しい起動構成を作成します。

  2. 「Main」タブで、「Name」フィールドにUEMEventsServerと入力します。「Project」フィールドの「Browse」をクリックし、「Project Selection」ダイアログ・ボックスでUEMEventsプロジェクトを選択します。「Include system libraries when searching for a main class」チェック・ボックスを選択し、「Search」をクリックします。「Select Type」フィールドにDefaultCacheServerと入力して、com.tangosol.net.DefaultCacheServerを選択します。「Apply」をクリックします。「Main」タブは図11-1のようになります。

    図11-1 イベント・サーバー起動構成の「Main」タブ

    図11-1の説明が続きます
    「図11-1 イベント・サーバー起動構成の「Main」タブ」の説明

  3. 「Coherence」タブの「General」タブで、「Cache configuration descriptor」の下にあるキャッシュ構成ファイルへのパスを指定します。「Browse」ボタンをクリックして、キャッシュ構成ファイルの絶対ファイル・パスC:\home\oracle\workspace\UEMEvents\appClientModule\uem-cache-config.xmlに移動します。「Local storage」の下の「Enabled (cache server)」を選択します。「Cluster port」には3155などの一意の値を入力します。

    「Other」タブで、tangosol.pof.configの項目をuem-pof-config.xmlに設定します。

  4. 「Common」タブで、「Shared file」を選択して\UEMEventsプロジェクトを参照します。

  5. 「Classpath」タブは例11-2のようになります。UEMEventsプロジェクトが「User Entries」の下に表示されます。JRE System LibraryCoherence12.1.3ライブラリが「Bootstrap Entries」セクションに表示されます。

    図11-2 イベント・サーバー起動構成の「Classpath」タブ

    図11-2の説明が続きます
    「図11-2 イベント・サーバー起動構成の「Classpath」タブ」の説明

11.2.8 イベント時間測定ドライバの起動構成の作成

UEMEventsプロジェクト用クライアント・ドライバを起動するための構成を作成します。

  1. プロジェクトを右クリックして、「Run As」「Run Configurations」を選択します。「Run Configurations」ダイアログ・ボックスで「Oracle Coherence」アイコンをダブルクリックして、新しい起動構成を作成します。

  2. 「Main」タブで、「Name」フィールドにUEMEventDriverと入力します。「Project」フィールドの「Browse」をクリックし、「Project Selection」ダイアログ・ボックスでUEMEventsプロジェクトを選択します。「Include system libraries when searching for a main class」チェック・ボックスを選択し、「Search」をクリックします。「Select Type」フィールドにDriverと入力し、com.oracle.handson.Driverを選択します。「Apply」をクリックします。「Main」タブは図11-0のようになります。

    図11-3 イベント・クライアント起動構成の「Main」タブ

    図11-3の説明が続きます
    「図11-3 イベント・クライアント起動構成の「Main」タブ」の説明

  3. 「Coherence」タブの「General」タブで、「Cache configuration descriptor」の下にあるキャッシュ構成ファイルへのパスを指定します。「Browse」ボタンをクリックして、キャッシュ構成ファイルの絶対ファイル・パスC:\home\oracle\workspace\UEMEvents\appClientModule\uem-cache-config.xmlに移動します。「Local storage」「Disabled (cache client)」を選択します。「Cluster port」には3155などの一意の値を入力します。

    「Other」タブで、tangosol.pof.configの項目をuem-pof-config.xmlに設定します。

  4. 「Common」タブで、「Shared file」を選択して\UEMEventsプロジェクトを参照します。

  5. 「Classpath」タブは例11-4のようになります。UEMEventsプロジェクトが「User Entries」の下に表示されます。JRE System LibraryCoherence12.1.3ライブラリが「Bootstrap Entries」セクションに表示されます。

    図11-4 イベント・クライアント起動構成の「Classpath」タブ

    図11-4の説明が続きます
    「図11-4 イベント・クライアント起動構成の「Classpath」タブ」の説明

11.2.9 イベント時間測定サンプルの実行

「Project Explorer」でUEMEventsプロジェクトを右クリックして、「Run As」「Run Configurations」を選択します。「Run Configurations」ダイアログ・ボックスで、UEMEventsServer起動構成を選択し、「Run」をクリックしてキャッシュ・サーバーを起動します。キャッシュ・サーバーの起動後、UEMEventsServerを選択し、「Run」をもう一度、さらにもう一度クリックして、合計で3つのキャッシュ・サーバーを起動します。

3番目のキャッシュ・サーバーの起動後、UEMEventDriver構成を選択し、「Run」をクリックします。キャッシュ・クライアントからの出力は例11-7のようになります。

例11-7 キャッシュ・クライアントからの出力

------ events examples begin ------
------   timing interceptor begin
2014-01-02 15:43:20.284/0.340 Oracle Coherence 12.1.3.0.0 <Info> (thread=main, member=n/a): Loaded operational configuration from "jar:file:/C:/Oracle/coherence/lib/coherence.jar!/tangosol-coherence.xml"
2014-01-02 15:43:20.384/0.440 Oracle Coherence 12.1.3.0.0 <Info> (thread=main, member=n/a): Loaded operational overrides from "jar:file:/C:/Oracle/coherence/lib/coherence.jar!/tangosol-coherence-override-dev.xml"
2014-01-02 15:43:20.464/0.520 Oracle Coherence 12.1.3.0.0 <Info> (thread=main, member=n/a): Loaded operational overrides from "file:/C:/home/oracle/workspace/UEMEvents/build/classes/tangosol-coherence-override.xml"
2014-01-02 15:43:20.474/0.530 Oracle Coherence 12.1.3.0.0 <D5> (thread=main, member=n/a): Optional configuration override "cache-factory-config.xml" is not specified
2014-01-02 15:43:20.474/0.530 Oracle Coherence 12.1.3.0.0 <D5> (thread=main, member=n/a): Optional configuration override "cache-factory-builder-config.xml" is not specified
2014-01-02 15:43:20.474/0.530 Oracle Coherence 12.1.3.0.0 <D5> (thread=main, member=n/a): Optional configuration override "/custom-mbeans.xml" is not specified
...
... 
2014-01-02 15:43:24.650/4.706 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=DistributedCache:PartitionedPofCache, member=5): Service PartitionedPofCache joined the cluster with senior service member 1
2014-01-02 15:43:24.699/4.755 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=DistributedCache:PartitionedEventsResults, member=5): Service PartitionedEventsResults joined the cluster with senior service member 1
Received stats [memberId=2, eventType=INSERTED, sample=1] = EventStats[name = INSERTED, sampleMean = 0.294040ms, mean = 0.294040ms]
Received stats [memberId=3, eventType=INSERTED, sample=1] = EventStats[name = INSERTED, sampleMean = 0.397855ms, mean = 0.397855ms]
Received stats [memberId=1, eventType=INSERTED, sample=1] = EventStats[name = INSERTED, sampleMean = 0.373270ms, mean = 0.373270ms]
Received stats [memberId=3, eventType=UPDATED, sample=1] = EventStats[name = UPDATED, sampleMean = 0.187132ms, mean = 0.187132ms]
Received stats [memberId=2, eventType=UPDATED, sample=1] = EventStats[name = UPDATED, sampleMean = 0.234314ms, mean = 0.234314ms]
Received stats [memberId=1, eventType=UPDATED, sample=1] = EventStats[name = UPDATED, sampleMean = 0.237622ms, mean = 0.237622ms]
Received stats [memberId=2, eventType=UPDATED, sample=2] = EventStats[name = UPDATED, sampleMean = 1.315323ms, mean = 1.432480ms]
Received stats [memberId=3, eventType=UPDATED, sample=2] = EventStats[name = UPDATED, sampleMean = 0.417201ms, mean = 0.510767ms]
Received stats [memberId=1, eventType=UPDATED, sample=2] = EventStats[name = UPDATED, sampleMean = 0.190555ms, mean = 0.309366ms]
Received stats [memberId=2, eventType=EXECUTED, sample=1] = EventStats[name = EXECUTED, sampleMean = 1.766313ms, mean = 1.766313ms]
Received stats [memberId=3, eventType=EXECUTED, sample=1] = EventStats[name = EXECUTED, sampleMean = 1.672603ms, mean = 1.672603ms]
Received stats [memberId=1, eventType=EXECUTED, sample=1] = EventStats[name = EXECUTED, sampleMean = 1.676003ms, mean = 1.676003ms]
 
------   timing interceptor completed successfully
------ events examples completed------
2014-01-02 15:43:28.344/8.400 Oracle Coherence GE 12.1.3.0.0 <D4> (thread=ShutdownHook, member=5): ShutdownHook: stopping cluster node

11.3 イベント・インターセプタを使用したコミット前イベントおよびコミット後イベントの拒否

この演習では、指定したキーに基づいてイベントを検出し、拒否するイベント・インターセプタを作成します。この演習を完了するには:

  1. イベントを検出および拒否するイベント・インターセプタの作成

  2. イベント拒否イベント・インターセプタの登録

  3. イベント拒否イベント・インターセプタを実行するクラスの作成

  4. イベント拒否サンプルのドライバ・ファイルの編集

  5. イベント拒否サンプルの実行

11.3.1 イベントを検出および拒否するイベント・インターセプタの作成

イベントを受け入れるまたは拒否するライブ・イベントの機能を実行するには、CantankerousInterceptorというイベント・インターセプタを作成します。インターセプタは、指定したキーに対応するイベントに基づいて例外をスローします。

  1. CantankerousInterceptorという新しいJavaクラスを作成します。「Default Package」com.oracle.handsonであることを確認します。「Main Method」チェック・ボックスは選択しないでください。

    詳細は、「Javaクラスの作成」を参照してください。

  2. イベント・インターセプタのコードを作成します。EventEventInterceptorInterceptorEntryEventおよびEntryEvent.Typeのクラスとインタフェースをインポートします。独自のインターセプタ・コードを作成するか、または例11-8で提供されているコードを使用できます。

例11-8は、挿入を試行しているキーに基づいて、コミット前イベントまたはコミット後イベント時に実行時例外をスローするイベント・インターセプタを示しています。例外がコミット前にスローされた場合は、ロールバックが発生し、例外がクライアントに伝播されます。例外がコミット後に発生した場合は、ログ・イベントが記録されます。例外に使用されるキーは、VETOおよびNON-VETOです。INSERTINGおよびUPDATINGは拒否できるイベントであるのに対して、INSERTEDおよびUPDATEDイベントは拒否できません。

例11-8 イベントを検出および拒否するクラス

package com.oracle.handson;
 
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
 
import com.tangosol.net.events.Event;
import com.tangosol.net.events.EventInterceptor;
import com.tangosol.net.events.annotation.Interceptor;
import com.tangosol.net.events.partition.cache.EntryEvent;
import com.tangosol.net.events.partition.cache.EntryEvent.Type;
 
import com.tangosol.util.BinaryEntry;
 
/**
 * A CantankerousInterceptor is an {@link EventInterceptor} implementation
 * that is argumentative in nature, hence the event of inserting certain keys
 * will result in {@link RuntimeException}s at either pre- or post-commit
 * phases. Throwing a {@link RuntimeException} during the precommit phase
 * will result in a rollback and related exception being propagated to the
 * client. A post-commit exception will result in a log event. A precommit
 * event is considered an *ING event with a post-commit event being a
 * *ED event.
 * <p>
 * This interceptor assumes it will be working against a cache with strings
 * as keys with the following items that will be considered objectionable.
 * <table>
 *     <tr><td>Key</td><td>Exception Thrown During Event</td></tr>
 *     <tr><td>{@value #VETO}</td><td>{@link Type#INSERTING} ||
 *             {@link Type#UPDATING}</td></tr>
 *     <tr><td>{@value #NON_VETO}</td><td>{@link Type#INSERTED} ||
 *             {@link Type#UPDATED}</td></tr>
 * </table>
 *
 *
 * @since Coherence 12.1.2
 */
@Interceptor(identifier = "cantankerous",
        entryEvents = {Type.INSERTING, Type.INSERTED, Type.UPDATING, Type.UPDATED})
public class CantankerousInterceptor
        implements EventInterceptor<EntryEvent>
    {
    // ----- EventInterceptor methods ---------------------------------------
 
    /**
     * Throws {@link RuntimeException} iff the key used for this event is
     * {@code #VETO} or {@code #NON_VETO}.
     *
     * @param event  the {@link Event} to be processed
     *
     * @throws RuntimeException iff {@code #VETO} || {@code #NON_VETO} are
     *         keys of the event
     */
    public void onEvent(EntryEvent event)
        {
        for (BinaryEntry binEntry : event.getEntrySet())
            {
            if (VETO.equals(binEntry.getKey()))
                {
                throw new RuntimeException("Objection! value = " + binEntry.getValue());
                }
            else if (NON_VETO.equals(binEntry.getKey())
                    && (event.getType() == Type.INSERTED || event.getType() == Type.UPDATED))
                {
 
                NamedCache cacheResults = CacheFactory.getCache("events-results");
                int        nMemberId    = CacheFactory.getCluster().getLocalMember().getId();
                String     sMessage     = "Objection falls on deaf ears! value = " + binEntry.getValue();
 
                cacheResults.put(
                        String.format("%d-NON_VETO-%d", nMemberId, ++m_cNonVetoableEvents),
                        sMessage);
 
                throw new RuntimeException(sMessage);
                }
            }
        }
 
    // ----- constants ------------------------------------------------------
 
    /**
     * String used to determine whether the event should be VETO'd during the
     * precommit phase.
     */
    public static final String VETO     = "VETO";
 
    /**
     * String used to determine whether the event should be VETO'd during the
     * post-commit phase.
     */
    public static final String NON_VETO = "NON-VETO";
 
    // ----- data members ---------------------------------------------------
 
    /**
     * A counter of the number of non-vetoable exceptions raised.
     */
    private int m_cNonVetoableEvents;
    }

11.3.2 イベント拒否イベント・インターセプタの登録

uem-cache-config.xmlキャッシュ構成ファイルを開き、CantankerousInterceptorイベント・インターセプタとそのキャッシュを登録するコードを追加します。参照するキャッシュであるvetod-events<cache-name>要素にリストし、その関連スキームであるevents-distributed-scheme<scheme-name>要素にリストします。CantankerousInterceptorクラスの完全修飾クラス名を、<interceptors>スタンザの<class-name>サブ要素にリストします。追加したコードは太字で示されています。

例11-9 CantankerousInterceptorクラスを登録するキャッシュ構成

<?xml version="1.0"?>
 
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
              xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd">
  <defaults>
    <serializer>pof</serializer>
  </defaults>
 
  <caching-scheme-mapping>
    <cache-mapping>
        <cache-name>events</cache-name>
        <scheme-name>events-distributed-scheme</scheme-name>
        <interceptors>
            <interceptor>
                <instance>
                    <class-name>com.oracle.handson.TimedTraceInterceptor</class-name>
                    <init-params>
                        <init-param>
                            <param-type>int</param-type>
                            <param-value>100</param-value>
                        </init-param>
                    </init-params>
                </instance>
            </interceptor>
        </interceptors>
    </cache-mapping>
    <cache-mapping>
        <cache-name>vetod-events</cache-name>
        <scheme-name>events-distributed-scheme</scheme-name>
        <interceptors>
            <interceptor>
                <instance>
                    <class-name>com.oracle.handson.CantankerousInterceptor</class-name>
                </instance>
            </interceptor>
        </interceptors>
    </cache-mapping>
    <cache-mapping>
        <cache-name>events-results</cache-name>
        <scheme-name>dist-events-results</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>
 
  <caching-schemes>
    <distributed-scheme>
      <scheme-name>events-distributed-scheme</scheme-name>
      <service-name>PartitionedPofCache</service-name>
      <thread-count>5</thread-count>
      <backing-map-scheme>
        <local-scheme>
          <!-- each node will be limited to 32MB -->
          <high-units>32M</high-units>
          <unit-calculator>binary</unit-calculator>
        </local-scheme>
      </backing-map-scheme>
      <autostart>true</autostart>
    </distributed-scheme>
 
    <!-- A PartitionedCache service used to store results for events examples
      -->
    <distributed-scheme>
        <scheme-name>dist-events-results</scheme-name>
        <service-name>PartitionedEventsResults</service-name>
        <thread-count>5</thread-count>
        <backing-map-scheme>
            <local-scheme/>
        </backing-map-scheme>
        <autostart>true</autostart>
    </distributed-scheme>
 
    <!--
    Invocation Service scheme.
    -->
    <invocation-scheme>
      <scheme-name>examples-invocation</scheme-name>
      <service-name>InvocationService</service-name>
 
      <autostart system-property="tangosol.coherence.invocation.autostart">true</autostart>
    </invocation-scheme>
 
 </caching-schemes>
</cache-config>

11.3.3 イベント拒否イベント・インターセプタを実行するクラスの作成

CantankerousInterceptorイベント・インターセプタを実行するVetoExampleという名前のクラスを作成します。VetoExampleクラス内に、サブクラスVetoedEventsExampleを作成します。VetoedEventsExampleサブクラスは、CantankerousInterceptorによって実行されるアクションを開始します。コードは、コミット前イベントおよびコミット後イベントにおける例外スローのセマンティクスを示しています。記録する必要がある例外のみが結果キャッシュに挿入されます。結果キャッシュに挿入されたエントリは、このクラスを実行するプロセスの標準出力を使用して表示されます。

例11-10 TimedTraceInterceptorイベント・インターセプタを実行するクラス

package com.oracle.handson;
 
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;
import com.tangosol.util.MultiplexingMapListener;
 
import java.util.concurrent.Callable;
 
/**
 * VetoExample illustrates the different semantics in throwing exceptions in pre
 *  events compared to post events.
 *
 * @since Coherence 12.1.2
 */
@SuppressWarnings("unchecked")
public class VetoExample
    {
 
    /**
     * The VetoExample is a catalyst for action to be performed by
     * {@link CantankerousInterceptor}. This illustrates the semantics of
     * throwing exceptions in pre- and post-commit events. The exceptions that are
     * expected to only be logged are inserted into a results cache. The
     * entries inserted into the results cache are
     * displayed via the stdout of the process executing this class.
     */
    public static class VetoedEventsExample
            implements Callable<Boolean>
        {
 
        // ----- Callable methods -------------------------------------------
 
        /**
         * {@inheritDoc}
         */
        public Boolean call() throws Exception
            {
            // perform events to cause interceptors to veto said event
            NamedCache  cacheVetoEvents = CacheFactory.getCache("vetod-events");
            NamedCache  cacheResults    = CacheFactory.getCache("events-results");
            MapListener ml              = new MultiplexingMapListener()
                {
                @Override
                protected void onMapEvent(MapEvent evt)
                    {
                    String[] asKey = ((String) evt.getKey()).split("-");
 
                    System.out.printf("Received event [memberId=%s, eventType=%s, count=%s] = %s\n",
                            asKey[0], asKey[1], asKey[2], evt.getNewValue());
                    }
                };
            try
                {
                int cSet      = 110;
                int cVetos    = 5;
                int cNonVetos = 10;
                int cVetod    = 0;
 
                cacheResults.addMapListener(ml);
 
                for (int i = 1; i <= cSet; ++i)
                    {
                    boolean fVetod = false;
 
                    if (i % (cSet / cVetos) == 0)
                        {
                        try
                            {
                            cacheVetoEvents.put(CantankerousInterceptor.VETO, "value: " + i);
                            }
                        catch(Throwable e)
                            {
                            fVetod = true;
                            ++cVetod;
                            }
                        }
                    if (i % (cSet / cNonVetos) == 0)
                        {
                        cacheVetoEvents.put(CantankerousInterceptor.NON_VETO, "value: " + i);
                        fVetod = true;
                        }
 
                    if (!fVetod)
                        {
                        cacheVetoEvents.put(String.valueOf(i), "value: " + i);
                        }
                    }
                System.out.printf("Number of veto'd events: %d\n", cVetod);
                }
            finally
                {
                cacheVetoEvents.clear();
                cacheResults.removeMapListener(ml);
                cacheResults.clear();
                }
            return true;
            }
        }
   } 

11.3.4 イベント拒否サンプルのドライバ・ファイルの編集

「イベント時間測定サンプルのドライバ・ファイルの作成」で作成したドライバ・ファイルを、VetoExampleクラスで定義したVetoedEventsExampleサンプルを実行するように編集します。

ドライバ・ファイルを編集するには:

  1. イベント・タイミング・サンプルの次のインポート文を置換します。

    import com.oracle.handson.EventsExamples.EventsTimingExample; 
    

    VetoedEventsExampleサブクラスの置換後のインポート文は、次のとおりです。

    import com.oracle.handson.VetoExample.VetoedEventsExample;
    
  2. イベント時間測定サンプルをコールする次のコマンドを置換します。

    EVENTS_EXAMPLES.put("timing interceptor", new EventsTimingExample());
    

    VetoExampleクラスで定義したVetoedEventsExampleサンプルを実行する置換後のコマンドは、次のとおりです。

    EVENTS_EXAMPLES.put("veto interceptor", new VetoedEventsExample());
    

11.3.5 イベント拒否サンプルの実行

「Project Explorer」でUEMEventsプロジェクトを右クリックして、「Run As」「Run Configurations」を選択します。「Run Configurations」ダイアログ・ボックスで、「キャッシュ・サーバー起動の構成の作成」で作成したUEMEventsServer起動構成を選択します。「Run」をクリックして、キャッシュ・サーバーを起動します。キャッシュ・サーバーの起動後、UEMEventsServerを選択し、「Run」をもう一度、さらにもう一度クリックして、合計で3つのキャッシュ・サーバーを起動します。

3番目のキャッシュ・サーバーの起動後、UEMEventDriver起動構成を選択し、「Run」をクリックします。イベント拒否クライアントからの出力は例11-11のようになります。

例11-11 イベント拒否クライアントからの出力

------ events examples begin ------
------   veto interceptor begin
2014-01-02 16:07:28.606/0.340 Oracle Coherence 12.1.3.0.0 <Info> (thread=main, member=n/a): Loaded operational configuration from "jar:file:/C:/Oracle/coherence/lib/coherence.jar!/tangosol-coherence.xml"
2014-01-02 16:07:28.716/0.450 Oracle Coherence 12.1.3.0.0 <Info> (thread=main, member=n/a): Loaded operational overrides from "jar:file:/C:/Oracle/coherence/lib/coherence.jar!/tangosol-coherence-override-dev.xml" 
...
2014-01-02 16:07:33.018/4.752 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=DistributedCache:PartitionedEventsResults, member=4): Service PartitionedEventsResults joined the cluster with senior service member 1 
Received event [memberId=3, eventType=NON_VETO, count=1] = Objection falls on deaf ears! value = value: 11
Received event [memberId=3, eventType=NON_VETO, count=2] = Objection falls on deaf ears! value = value: 22
Received event [memberId=3, eventType=NON_VETO, count=3] = Objection falls on deaf ears! value = value: 33
Received event [memberId=3, eventType=NON_VETO, count=4] = Objection falls on deaf ears! value = value: 44
Received event [memberId=3, eventType=NON_VETO, count=5] = Objection falls on deaf ears! value = value: 55
Received event [memberId=3, eventType=NON_VETO, count=6] = Objection falls on deaf ears! value = value: 66
Received event [memberId=3, eventType=NON_VETO, count=7] = Objection falls on deaf ears! value = value: 77
Received event [memberId=3, eventType=NON_VETO, count=8] = Objection falls on deaf ears! value = value: 88
Received event [memberId=3, eventType=NON_VETO, count=9] = Objection falls on deaf ears! value = value: 99 
Number of veto'd events: 5
Received event [memberId=3, eventType=NON_VETO, count=10] = Objection falls on deaf ears! value = value: 110
------   veto interceptor completed successfully
------ events examples completed------

例11-12に示されている3番目のキャッシュ・サーバーからの出力に注意してください。出力には、拒否したイベントによって発生する例外が表示されます。

例11-12 キャッシュ・サーバーからの出力

Started DefaultCacheServer...
 
2014-01-02 16:06:50.262/5.785 Oracle Coherence GE 12.1.3.0.0 <D4> (thread=DistributedCache:PartitionedPofCache, member=2): Asking member 1 for primary ownership of PartitionSet{0..127}
...
...
2014-01-02 16:07:33.178/48.701 Oracle Coherence GE 12.1.3.0.0 <Error> (thread=DistributedCache:PartitionedPofCache:EventDispatcher, member=2): Exception caught while dispatching to "<cantankerous, com.oracle.handson.CantankerousInterceptor>": java.lang.RuntimeException: Objection falls on deaf ears! value = value: 11 
     at com.oracle.handson.CantankerousInterceptor.onEvent(CantankerousInterceptor.java:74)
     at com.oracle.handson.CantankerousInterceptor.onEvent(CantankerousInterceptor.java:1)
     at com.tangosol.net.events.internal.NamedEventInterceptor.onEvent(NamedEventInterceptor.java:240)
     at com.tangosol.net.events.internal.AbstractEvent.nextInterceptor(AbstractEvent.java:116)
     at com.tangosol.net.events.internal.AbstractEvent.dispatch(AbstractEvent.java:154)
     at com.tangosol.net.events.internal.AbstractEventDispatcher$1.proceed(AbstractEventDispatcher.java:254)
     at com.tangosol.coherence.component.util.daemon.queueProcessor.service.grid.PartitionedService$Continuations$Task.run(PartitionedService.CDB:6)
     at com.tangosol.coherence.component.util.daemon.queueProcessor.Service$EventDispatcher.onNotify(Service.CDB:26)
     at com.tangosol.coherence.component.util.Daemon.run(Daemon.CDB:51)
     at java.lang.Thread.run(Thread.java:722)
 
2014-01-02 16:07:33.238/48.761 Oracle Coherence GE 12.1.3.0.0 <Error> (thread=DistributedCache:PartitionedPofCache:EventDispatcher, member=3): Exception caught while dispatching to "<cantankerous, com.oracle.handson.CantankerousInterceptor>": Objection falls on deaf ears! value = value: 22
     at com.oracle.handson.CantankerousInterceptor.onEvent(CantankerousInterceptor.java:74)
     at com.oracle.handson.CantankerousInterceptor.onEvent(CantankerousInterceptor.java:1)
     at com.tangosol.net.events.internal.NamedEventInterceptor.onEvent(NamedEventInterceptor.java:240)
     at com.tangosol.net.events.internal.AbstractEvent.nextInterceptor(AbstractEvent.java:116)
     at com.tangosol.net.events.internal.AbstractEvent.dispatch(AbstractEvent.java:154)
     at com.tangosol.net.events.internal.AbstractEventDispatcher$1.proceed(AbstractEventDispatcher.java:254)
     at com.tangosol.coherence.component.util.daemon.queueProcessor.service.grid.PartitionedService$Continuations$Task.run(PartitionedService.CDB:6)
     at com.tangosol.coherence.component.util.daemon.queueProcessor.Service$EventDispatcher.onNotify(Service.CDB:26)
     at com.tangosol.coherence.component.util.Daemon.run(Daemon.CDB:51)
     at java.lang.Thread.run(Thread.java:722)

...

2014-01-02 16:07:33.238/48.761 Oracle Coherence GE 12.1.3.0.0 <Error> (thread=DistributedCache:PartitionedPofCache:EventDispatcher, member=3): Exception caught while dispatching to "<cantankerous, com.oracle.handson.CantankerousInterceptor>": Objection falls on deaf ears! value = value: 110
     at com.oracle.handson.CantankerousInterceptor.onEvent(CantankerousInterceptor.java:74)
     at com.oracle.handson.CantankerousInterceptor.onEvent(CantankerousInterceptor.java:1)
     at com.tangosol.net.events.internal.NamedEventInterceptor.onEvent(NamedEventInterceptor.java:240)
     at com.tangosol.net.events.internal.AbstractEvent.nextInterceptor(AbstractEvent.java:116)
     at com.tangosol.net.events.internal.AbstractEvent.dispatch(AbstractEvent.java:154)
     at com.tangosol.net.events.internal.AbstractEventDispatcher$1.proceed(AbstractEventDispatcher.java:254)
     at com.tangosol.coherence.component.util.daemon.queueProcessor.service.grid.PartitionedService$Continuations$Task.run(PartitionedService.CDB:6)
     at com.tangosol.coherence.component.util.daemon.queueProcessor.Service$EventDispatcher.onNotify(Service.CDB:26)
     at com.tangosol.coherence.component.util.Daemon.run(Daemon.CDB:51)
     at java.lang.Thread.run(Thread.java:722)
 
...
2014-01-02 16:07:33.649/49.172 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=Cluster, member=2): Member(Id=4, Timestamp=2014-01-02 16:07:33.649, Address=10.159.154.103:8094, MachineId=47251, Location=site:,machine:TPFAEFFL-LAP,process:8152, Role=OracleHandsonDriver) left Cluster with senior member 1 
...

11.4 イベント・インターセプタを使用したパーティション・アクティビティのロギング

この演習では、パーティション・サービスのパーティション・イベントを記録するイベント・インターセプタを作成します。この演習を完了するには:

  1. JVMの終了およびロギングの有効化と無効化を実行するクラスの作成

  2. パーティション・アクティビティを記録するイベント・インターセプタの作成

  3. パーティション・アクティビティ記録サンプルを実行するクラスの作成

  4. パーティション・アクティビティ記録イベント・インターセプタの登録

  5. POF構成ファイルの編集

  6. パーティション・アクティビティ記録サンプルのドライバ・ファイルの編集

  7. パーティション・アクティビティ記録サンプルの実行

11.4.1 JVMの終了およびロギングの有効化と無効化を実行するクラスの作成

クラスタの様々なメンバーで実行されるアクション可能な3つの状態を定義するRedistributionInvocableというクラスを作成します。この例では、状態を次のように定義します。

  • DISABLE: RedistributionInterceptorイベント・インターセプタによって実行されるロギングを無効にします。

  • ENABLE: RedistributionInterceptorイベント・インターセプタによって実行されるロギングを有効にします。

  • KILL: この起動可能(RedistributionInvocable)が実行されるJVMを終了します。

RedistributionInterceptorイベント・インターセプタは後の手順で作成します。

ロギングが有効であるか無効であるかを判別する変数には、AtomicBooleanクラスを使用します。次に例を示します。

public static final AtomicBoolean ENABLED = new AtomicBoolean(false) 

起動可能を終了するには、KILL状態で単にSystem.exitをコールできます。

クラスが生成するデータはワイヤ経由で送信されるため、クラスでPOF (Portable Object Format)を使用する必要があります。POF構成ファイルへのクラスの追加は後の手順で実行します。

RedistributionInvocableクラスを作成するには:

  1. RedistributionInvocableという新しいJavaクラスを作成します。「Default Package」com.oracle.handsonであることを確認します。「Main Method」チェック・ボックスは選択しないでください。

    詳細は、「Javaクラスの作成」を参照してください。

  2. インターセプタ・オブジェクトに割当て可能な3つの異なる状態を定義するクラスを作成します。AbstractInvocableAtomicBooleanPortableObjectPofReaderおよびPofWriterのクラスとインタフェースをインポートします。RedistributionInvocableクラスは、AbstractInvocableクラスを拡張し、PortableObjectインタフェースを実装する必要があります。独自のRedistributionInvocableクラスを作成するか、または例11-13で提供されているコードを使用します。

例11-13 JVMの終了およびロギングの有効化または無効化を実行するクラス

package com.oracle.handson;
 
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
 
import com.tangosol.net.AbstractInvocable;
 
import java.io.IOException;
 
import java.util.concurrent.atomic.AtomicBoolean;
 
/**
 * RedistributionInvocable has three states in which appropriate action is
 * taken:
 * <ol>
 *     <li><strong>{@link State#DISABLE}</strong> - Disables the logging
 *     performed by {@link com.oracle.handson.RedistributionInterceptor}.</li>
 *     <li><strong>{@link State#ENABLE}</strong> - Enables the logging
 *     performed by {@link com.oracle.handson.RedistributionInterceptor}.</li>
 *     <li><strong>{@link State#KILL}</strong> - Kills the JVM this invocable
 *     is executed on.</li>
 * </ol>
 *
 *
 * @since 12.1.2
 */
public class RedistributionInvocable
        extends AbstractInvocable
        implements PortableObject
    {
    // ----- constructors ---------------------------------------------------
 
    /**
     * 
     */
    private static final long serialVersionUID = 1L;
 
    /**
     * Default no-arg constructor.
     */
    public RedistributionInvocable()
        {
        this(State.DISABLE);
        }
 
    /**
     * Constructs a RedistributionInvocable with the specified state.
     *
     * @param state  the state indicating the action to be performed
     */
    public RedistributionInvocable(State state)
        {
        m_state = state;
        }
 
    // ----- Invocable methods ----------------------------------------------
 
    /**
     * {@inheritDoc}
     */
    public void run()
        {
        switch (m_state)
            {
            case DISABLE:
                ENABLED.set(false);
                break;
            case ENABLE:
                ENABLED.set(true);
                break;
            case KILL:
                System.exit(1);
            }
        }
 
    // ----- PortableObject methods -----------------------------------------
 
    /**
     * {@inheritDoc}
     */
    public void readExternal(PofReader in) throws IOException
        {
        m_state = State.values()[in.readInt(0)];
        }
 
    /**
     * {@inheritDoc}
     */
    public void writeExternal(PofWriter out) throws IOException
        {
        out.writeInt(0, m_state.ordinal());
        }
 
    // ----- inner class: State ---------------------------------------------
 
    /**
     * Representation of the action to be performed when
     * {@link RedistributionInvocable#run()}.
     */
    public enum State
        {
        /**
         * Disables the logging performed by
         * {@link com.oracle.handson.RedistributionInterceptor}
         */
        DISABLE,
        /**
         * Enables the logging performed by
         * {@link com.oracle.handson.RedistributionInterceptor}
         */
        ENABLE,
        /**
         * Terminates the JVM process in which the
         * {@link RedistributionInvocable} is executed.
         */
        KILL
        }
 
    // ----- constants ------------------------------------------------------
 
    /**
     * Flag used to determine whether to log partition events.
     */
    public static final AtomicBoolean ENABLED = new AtomicBoolean(false);
 
    // ----- data members ---------------------------------------------------
 
    /**
     * The state used to determine which action to perform.
     */
    private State m_state;
    }

11.4.2 パーティション・アクティビティを記録するイベント・インターセプタの作成

パーティション・サービスのパーティション・イベントを記録するRedistributionInterceptorというイベント・インターセプタを作成します。

RedistributionInterceptorイベント・インターセプタ・クラスを作成するには:

  1. RedistributionInterceptorという新しいJavaクラスを作成します。「Default Package」com.oracle.handsonであることを確認します。「Main Method」チェック・ボックスは選択しないでください。

    詳細は、「Javaクラスの作成」を参照してください。

  2. パーティション・イベントを記録するRedistributionInterceptorクラスを作成します。RedistributionInvocableCacheFactoryEventInterceptorInterceptorTransferEventのクラスとインタフェースをインポートします。RedistributionInterceptorクラスはEventInterceptor<TransferEvent>インタフェースを実装する必要があります。パーティション・イベントを記録する独自のクラスを作成するか、または例11-14で提供されるコードを使用します。

例11-14は、パーティション・イベントを記録するイベント・インターセプタを示しています。@Interceptor注釈にあるオプションのidentifier属性を使用して、インターセプタに名前を割り当てることができます。イベント・インターセプタでは、RedistributionInvocable.ENABLED定数の値を参照して、パーティション・イベントを記録するかどうかを決定します。

例11-14 パーティション・イベントを記録するクラス

package com.oracle.handson;
 
import com.oracle.handson.RedistributionInvocable;
 
import com.tangosol.net.CacheFactory;
 
import com.tangosol.net.events.EventInterceptor;
import com.tangosol.net.events.annotation.Interceptor;
import com.tangosol.net.events.partition.TransferEvent;
 
/**
 * RedistributionInterceptor is an {@link  
 * com.tangosol.net.events.EventInterceptor}
 * that logs partition activity when enabled. Logging can be enabled via
 * setting the {@link RedistributionInvocable#ENABLED} constant.
 *
 * @since Coherence 12.1.2
 */
@Interceptor(identifier = "redist")
public class RedistributionInterceptor
        implements EventInterceptor<TransferEvent>
    {
 
    // ----- EventInterceptor methods ---------------------------------------
 
    /**
     * {@inheritDoc}
     */
    public void onEvent(TransferEvent event)
        {
        if (RedistributionInvocable.ENABLED.get())
            {
            CacheFactory.log(String.format("Discovered event %s for partition-id %d from remote member %s\n",
                event.getType(), event.getPartitionId(), event.getRemoteMember()),
                CacheFactory.LOG_INFO);
            }
        }
    }

11.4.3 パーティション・アクティビティ記録サンプルを実行するクラスの作成

RedistributionInterceptorクラスによって実行されるアクションをトリガーするLogExampleというクラスを作成します。

LogExampleクラスを作成するには:

  1. LogExampleという新しいJavaクラスを作成します。mainメソッドは含めないでください。

    詳細は、「Javaクラスの作成」を参照してください。

  2. LogExampleクラスのコードを作成します。RedistributionInvocableRedistributionInvocable.StateCacheFactoryInvocationServiceMemberArrayListCollectionRandomSetおよびCallableのクラスとインタフェースをインポートします。独自のコードを作成するか、または例11-15で提供されているコードを使用できます。

例11-15は、LogExampleクラスのサンプル実装を示しています。例には、RedistributionInterceptorクラスによって実行されるアクションをトリガーするサブクラスであるRedistributionEventsExampleサブクラスが含まれています。サブクラスは、パーティション再配布イベントの記録方法を示しています。このサンプルを実行するには、少なくとも2つのクラスタ・メンバーを実行している必要があります。

例11-15 パーティション・アクティビティ記録サンプルを実行するサンプル・クラス

package com.oracle.handson;
 
import com.oracle.handson.RedistributionInvocable;
import com.oracle.handson.RedistributionInvocable.State;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.InvocationService;
import com.tangosol.net.Member;
 
import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
 
/**
 * LogExample illustrates logging of partition movement when enabled.
 *
 * @since Coherence 12.1.2
 */
@SuppressWarnings("unchecked")
public class LogExample
    {
 
    /**
     * The RedistributionEventsExample is a catalyst for action to be
     * performed by {@link RedistributionInterceptor}. This illustrates how
     * partition redistribution events can be logged.
     */
    public static class RedistributionEventsExample
            implements Callable<Boolean>
        {
 
        // ----- Callable methods -------------------------------------------
 
        /**
         * {@inheritDoc}
         */
        public Boolean call() throws Exception
            {
            // transfer events
            try
                {
                InvocationService is       = (InvocationService) CacheFactory.getService("InvocationService");
                Random            rnd      = new Random();
                int               cMembers = is.getInfo().getServiceMembers().size();
 
                if (cMembers < 3)
                    {
                    System.err.println("<Error> At least two members must exist for the RedistributionEvent example");
                    return false;
                    }
 
                // enable the logging of transfer event
                is.query(new RedistributionInvocable(State.ENABLE), null);
 
                Set<Member> isMembers = is.getInfo().getServiceMembers();
                isMembers.remove(is.getCluster().getLocalMember());
 
                Member memChosen = new ArrayList<Member>(isMembers).get(rnd.nextInt(isMembers.size()));
 
                System.out.printf("Choosing to kill member %s\n", memChosen);
                is.query(new RedistributionInvocable(State.KILL), Collections.singleton(memChosen));
                }
            finally
                {
                }
 
            return true;
            }
        }
    }

11.4.4 パーティション・アクティビティ記録イベント・インターセプタの登録

イベント・インターセプタは、プログラムで、またはキャッシュ構成ファイルにそれらに対する参照を含めることによって登録できます。

UEMEventsプロジェクトで、インターセプタはキャッシュ構成ファイルに登録されます。イベント・インターセプタの完全修飾クラス名は、<interceptor>要素で指定されます。インターセプタは、<cache-name>要素で指定されるキャッシュと関連付けられます。

パーティション記録イベント・サンプルの場合、イベント・インターセプタRedistributionInterceptorは、<distributed-scheme>要素の下のパーティション・キャッシュ・サービスに登録されます。

RedistributionInterceptorイベント・インターセプタを定義するキャッシュ構成ファイルを編集するには:

  1. 「Project Explorer」ウィンドウから、uem-cache-config.xmlファイルを開きます。ファイルはEvents/appClientModuleの下にあります。

  2. イベント・インターセプタをコールするキャッシュ構成を作成します。<distributed-scheme>要素の下で、<interceptor>要素に完全修飾RedistributionInterceptorクラスへの参照があることを確認してください。

例11-16は、考えられるuem-cache-config.xmlファイルの実装を示しています。RedistributionInterceptorイベント・インターセプタの構成は、太字で示されています。

例11-16 イベント・インターセプタを含むキャッシュ構成ファイル

<?xml version="1.0"?>
 
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
              xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd">
  <defaults>
    <serializer>pof</serializer>
  </defaults>
 
  <caching-scheme-mapping>
    <cache-mapping>
        <cache-name>events</cache-name>
        <scheme-name>events-distributed-scheme</scheme-name>
        <interceptors>
            <interceptor>
                <instance>
                    <class-name>com.oracle.handson.TimedTraceInterceptor</class-name>
                    <init-params>
                        <init-param>
                            <param-type>int</param-type>
                            <param-value>100</param-value>
                        </init-param>
                    </init-params>
                </instance>
            </interceptor>
        </interceptors>
    </cache-mapping>
    <cache-mapping>
        <cache-name>vetod-events</cache-name>
        <scheme-name>events-distributed-scheme</scheme-name>
        <interceptors>
            <interceptor>
                <instance>
                    <class-name>com.oracle.handson.CantankerousInterceptor</class-name>
                </instance>
            </interceptor>
        </interceptors>
    </cache-mapping>
    <cache-mapping>
        <cache-name>events-results</cache-name>
        <scheme-name>dist-events-results</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>
 
  <caching-schemes>
    <distributed-scheme>
      <scheme-name>events-distributed-scheme</scheme-name>
      <service-name>PartitionedPofCache</service-name>
      <thread-count>5</thread-count>
      <backing-map-scheme>
        <local-scheme>
          <!-- each node will be limited to 32MB -->
          <high-units>32M</high-units>
          <unit-calculator>binary</unit-calculator>
        </local-scheme>
      </backing-map-scheme>
      <autostart>true</autostart>
      <interceptors>
          <interceptor>
              <instance>
                  <class-name>com.oracle.handson.RedistributionInterceptor</class-name>
              </instance>
          </interceptor>
      </interceptors>
    </distributed-scheme>
 
    <!-- A PartitionedCache service used to store results for events examples
      -->
    <distributed-scheme>
        <scheme-name>dist-events-results</scheme-name>
        <service-name>PartitionedEventsResults</service-name>
        <thread-count>5</thread-count>
        <backing-map-scheme>
            <local-scheme/>
        </backing-map-scheme>
        <autostart>true</autostart>
    </distributed-scheme>
 
    <!--
    Invocation Service scheme.
    -->
    <invocation-scheme>
      <scheme-name>examples-invocation</scheme-name>
      <service-name>InvocationService</service-name>
 
      <autostart system-property="tangosol.coherence.invocation.autostart">true</autostart>
    </invocation-scheme>
 
 </caching-schemes>
</cache-config>

11.4.5 POF構成ファイルの編集

RedistributionInvocableクラスを除いて、パーティション・アクティビティ記録演習のクラスによって生成された情報はすべて、その固有のクラスタ・メンバー上に保持されます。ただし、RedistributionInvocableクラスによって生成された情報は、ワイヤ経由で他のクラスタ・メンバーに送信されます。したがって、POF構成ファイルに追加する必要があります。

RedistributionInvocableデータ型用のPOF構成ファイルを編集するには:

  1. uem-pof-config.xmlファイルを開きます。ファイルはUEMEvents/appClientModule/META-INFの下にあります。

  2. com.oracle.handson.RedistributionInvocableクラスに対する<user-type>要素を定義し、タイプID 1009を割り当てます。このファイルには、Coherenceデータ型の最初の1000個のIDを予約するcoherence-pof-config.xmlファイルが含まれている必要があります。

例11-17は、uem-pof-config.xmlファイルのサンプルを示しています。RedistributionInvocableクラスの構成は、太字で示されています。

例11-17 パーティション記録イベント・サンプルのPOF構成ファイル

<?xml version="1.0"?>
<pof-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://xmlns.oracle.com/coherence/coherence-pof-config"
    xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-pof-config http://xmlns.oracle.com/coherence/coherence-pof-config/1.2/coherence-pof-config.xsd">
    <user-type-list>
    <!-- include all "standard" Coherence POF user types -->
    <include>coherence-pof-config.xml</include>
    <user-type>
      <type-id>1008</type-id>
      <class-name>com.oracle.handson.LazyProcessor</class-name>
    </user-type>
    <user-type>
      <type-id>1009</type-id>
      <class-name>com.oracle.handson.RedistributionInvocable</class-name>
    </user-type>
    </user-type-list>
</pof-config>

11.4.6 パーティション・アクティビティ記録サンプルのドライバ・ファイルの編集

「イベント時間測定サンプルのドライバ・ファイルの作成」で作成したドライバ・ファイルを、LogExampleクラスで定義したRedistributionEventsExampleサンプルを実行するように編集します。

ドライバ・ファイルを編集するには:

  1. イベント拒否サンプルの次のインポート文を置換します。

    import com.oracle.handson.VetoExample.VetoedEventsExample; 
    

    LogExampleクラスのRedistributionEventsExampleサブクラスの置換後のインポート文は、次のとおりです。

    import com.oracle.handson.LogExample.RedistributionEventsExample;
    
  2. イベント拒否サンプルをコールする次のコマンドを置換します。

    EVENTS_EXAMPLES.put("veto interceptor", new VetoedEventsExample());
    

    LogExampleクラスで定義したRedistributionEventsExampleサンプルを実行する置換後のコマンドは、次のとおりです。

    EVENTS_EXAMPLES.put("redistribution interceptor", new RedistributionEventsExample());
    

11.4.7 パーティション・アクティビティ記録サンプルの実行

「Project Explorer」でUEMEventsプロジェクトを右クリックして、「Run As」「Run Configurations」を選択します。「Run Configurations」ダイアログ・ボックスで、「キャッシュ・サーバー起動の構成の作成」で作成したUEMEventsServer起動構成を選択します。「Run」をクリックして、キャッシュ・サーバーを起動します。キャッシュ・サーバーの起動後、UEMEventsServerを選択し、「Run」をもう一度、さらにもう一度クリックして、合計で3つのキャッシュ・サーバーを起動します。

3番目のキャッシュ・サーバーの起動後、UEMEventDriver構成を選択し、「Run」をクリックします。キャッシュ・クライアントからの出力は例11-18のようになります。

例11-18 キャッシュ・クライアントからの出力

------ events examples begin ------
------   redistribution interceptor begin
2014-01-02 16:38:38.901/0.350 Oracle Coherence 12.1.3.0.0 <Info> (thread=main, member=n/a): Loaded operational configuration from "jar:file:/C:/Oracle/coherence/lib/coherence.jar!/tangosol-coherence.xml" 

...

2014-01-02 16:38:43.249/4.698 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=Invocation:InvocationService, member=4): Service InvocationService joined the cluster with senior service member 1
Choosing to kill member Member(Id=3, Timestamp=2014-01-02 16:38:17.942, Address=10.159.154.103:8092, MachineId=47251, Location=site:,machine:TPFAEFFL-LAP,process:8168, Role=CoherenceServer) 
Choosing to kill member Member(Id=3, Timestamp=2014-01-02 16:38:17.942, Address=10.159.154.103:8092, MachineId=47251, Location=site:,machine:TPFAEFFL-LAP,process:8168, Role=CoherenceServer)
------   redistribution interceptor completed successfully
------ events examples completed------
2014-01-02 16:38:43.276/4.725 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=Cluster, member=4): TcpRing disconnected from Member(Id=3, Timestamp=2014-01-02 16:38:17.942, Address=10.159.154.103:8092, MachineId=47251, Location=site:,machine:TPFAEFFL-LAP,process:8168, Role=CoherenceServer) due to a peer departure; removing the member. 

...

例11-19は、終了したキャッシュ・サーバーからの出力を示しています。出力は、クラスタに参加しているクライアント(Member 4)および現在のキャッシュ・サーバーの停止を示しています。

例11-19 最初のキャッシュ・サーバーからの出力

...
2014-01-02 16:38:20.701/6.178 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=DistributedCache:PartitionedEventsResults, member=3): Transferring 1B of backup[1] for PartitionSet{0} to member 2
2014-01-02 16:38:42.299/27.776 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=Cluster, member=3): Member(Id=4, Timestamp=2014-01-02 16:38:42.09, Address=10.159.154.103:8094, MachineId=47251, Location=site:,machine:TPFAEFFL-LAP,process:1620, Role=OracleHandsonDriver) joined Cluster with senior member 1
2014-01-02 16:38:42.495/27.972 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=Invocation:Management, member=3): Member 4 joined Service Management with senior member 1
2014-01-02 16:38:43.254/28.731 Oracle Coherence GE 12.1.3.0.0 <D5> (thread=Invocation:InvocationService, member=3): Member 4 joined Service InvocationService with senior member 1
2014-01-02 16:38:43.254/28.731 Oracle Coherence GE 12.1.3.0.0 <D4> (thread=ShutdownHook, member=1): ShutdownHook: stopping cluster node