プライマリ・コンテンツに移動
Oracle® Fusion Middleware Oracle Stream Analyticsによるイベント処理用アプリケーションの開発
12c (12.2.1.2.0)
E82664-01
目次へ移動
目次

前
前へ
次
次へ

7 イベントBean

Javaは、EPNに追加するイベントBeanコンポーネントおよびSpring Beanコンポーネント用のロジックを作成するために使用する言語です。イベントBeanは、EPN内でイベント・データ上で機能するアプリケーション・ロジックを定義するために使用します。Spring Beanは、EPN内で使用するデプロイメント・コンテキストおよび機能がSpringに基づいているときに使用します。

すべてのBeanアプリケーション・ロジックは、イベント・シンク、イベント・ソースまたはその両方として機能します。イベント・シンクは、大量のイベント・データを受信してこれらを処理します。イベント・ソースは、大量のイベント・データを送信します。EPNでは、ロジックを使用してイベントBeanおよびアダプタを構成することにより、これらがイベント・ソース、イベント・シンク、またはその両方として動作するようにできます。イベントBeanの場合、イベント・シンクおよびイベント・ソースのロジックは関連するJavaBeanから採用されます。アダプタの場合、イベント・シンクおよびイベント・ソースのロジックはJavaBeanイベント・タイプから採用されます。JavaBeanイベント・タイプの作成の詳細は、イベントおよびイベント・タイプを参照してください。

イベントBeanのロジックではJAXBを使用できます。詳細は、JAXBのサポートを参照してください。

この章の内容は次のとおりです。

7.1 イベントBeanおよびSpring Bean

イベントBeanおよびSpring Beanは、Javaクラスに基づいています。イベントBeanおよびSpring Beanに使用するJavaクラスは、アプリケーション要件に応じてJavaBean仕様に準拠している場合としていない場合があります。

イベントBeanは、通常のSpringベースのBeanに対するOracle拡張機能です。

イベントBeanは、イベント・シンク、イベント・ソース、またはイベント・シンクとイベント・ソースの両方である場合があります。イベント・シンクおよびイベント・ソースはアダプタおよびイベントBeanに追加できます。

  • イベント・シンクは、イベントをリスニングして処理するJavaBeanまたはJavaクラスです。イベント・シンクは、イベントを受信し、イベントからデータを取得し、データから新しいイベントを作成してダウンストリーム・コンポーネントに送信できます。

  • イベント・ソースは、イベントを送信するJavaBeanまたはJavaクラスです。

使用するデプロイメント・コンテキストおよび機能がSpringに基づいている場合は、Spring Beanを使用します。それ以外の場合は、イベントBeanを使用します。表7-1にイベントBeanとSpring Beanの機能を示します。

表7-1 イベントBeanおよびSpring Beanの比較

Beanタイプ 説明

イベントBean

Oracle Stream Analyticsサーバー・コンテナの機能を積極的に使用するEPNステージとして役立ちます。イベントBeanの機能は次のとおりです。

  • タイプはOracle Stream Analytics EPNステージです。

  • Oracle Stream Analytics監視フレームワークによって監視できます。

  • 構成メタデータの注釈を利用できます。

  • 通過するイベントを記録および再生するよう設定できます。

  • Oracle Stream AnalyticsサーバーAPIインタフェースを実装するのではなく、そのXML宣言にメソッドを指定することにより、Oracle Stream AnalyticsサーバーBeanライフサイクルをサポートできます。

Spring Bean

Springへのレガシー統合に役立ちます。Spring Beanの機能は次のとおりです。

  • EPNに追加するSpring Beanがある場合に役立ちます。

  • タイプはOracle Stream Analytics EPNステージではありません。

  • Oracle Stream Analytics監視フレームワークによって監視することはできません。

  • 構成メタデータの注釈を使用できません。

  • 通過するイベントを記録および再生するよう設定できません。

7.1.1 スレッドの動作

イベントBeanは、RunnableまたはRunnableBeanインタフェースが実装されている場合、パラレルに実行されます。このインフラストラクチャでは、スレッドを生成するためのアプリケーションに関連付けられたワーク・マネージャが使用されます。

ワーク・マネージャをアプリケーションに関連付けるには、アプリケーションと同じ名前をワーク・マネージャに付けます。アプリケーションに対してワーク・マネージャを明示的に指定しない場合、Oracle Stream Analyticsでは、最小数(MIN)および最大数(MAX)のスレッドに対して、デフォルト値を使用してワーク・マネージャが作成されます。

スレッドをより詳細に制御する必要がある場合、カスタムイベントBeanを使用してcom.bea.wlevs.ede.spi.WorkManagerAwareインタフェースを実装できます。この場合、イベントBeanは初期化中にアプリケーションのワーク・マネージャを使用してインジェクトされます。ワーク・マネージャを使用すると、イベントBeanインスタンスのスレッドを明示的に管理できます。

7.1.2 ハートビート・イベントの受信

イベントBeanがハートビート・イベントを受信するようにするには、com.bea.wlevs.ede.api.HeartbeatAwareインタフェースを実装します。ハートビート・イベントは、時間の進行をモデリングするために使用できるheartbeatタイプのイベントです。このインタフェースには、実装対象としてonHeartbeat(long timestamp)コールバック・メソッドがあります。

7.1.3 イベントBeanの作成

イベントBeanは、通過するイベントにロジックを適用するEPNコンポーネントです。イベントBeanのロジックは、JavaBeanイベント・タイプによって定義されます。

この章では、いくつかのアセンブリおよび構成ファイルのイベントBean設定について説明します。すべてのリファレンスは、『Oracle Stream Analyticsスキーマ・リファレンス』のevent-beanに関する項を参照してください。

アセンブリ・ファイル

次のイベントBeanのアセンブリ・ファイル・エントリは、イベントBeanのid、関連するclass、およびイベントBeanがアップストリームBeanOutputChannelコンポーネントからイベントをリスニングすることを示します。

<wlevs:event-bean id="eventBean" class="tradereport.TradeEvent" >
   <wlevs:listener ref="BeanOutputChannel"/>
<wlevs:event-bean>

構成ファイル

次のイベントBeanの構成ファイル・エントリは、record-parameters子要素を使用して構成されたイベントBeanを示します。

 <event-bean>
    <name>eventBean</name>
      <record-parameters>
        <dataset-name>tradereport_sample</dataset-name>
          <event-type-list>
            <event-type>TradeEvent</event-type>
          </event-type-list>
          <batch-size>1</batch-size>
          <batch-time-out>10</batch-time-out>
        </record-parameters>
  </event-bean>

7.1.4 Spring Beanの作成

イベント処理ネットワークにクラスを含めるためにSpring BeanとしてJavaクラスを構成できます。これは、EPNに組み込む既存のSpring Beanがある場合、またはSpringの機能をJavaコードに組み込む場合の適切なオプションです。

EPNに追加するSpring Beanでは、様々なライフサイクル・インタフェースを実装できます。これらには、InitializingBeanDisposableBeanおよびRunnableBeanなどのアクティブなインタフェースが含まれます。Spring Beanイベント・ソースでは、@Prepare@Rollback@Activateなどの構成メタデータの注釈も利用できます。

Spring BeanはSpringフレームワークで管理されるJavaクラスです。標準のbean要素を使用してEPNアセンブリ・ファイルで構成して、Spring Beanとしてクラスを追加します。

Spring Beanは、Oracle Stream Analyticsステージではありません。Spring BeanはOracle Stream Analytics監視フレームワークによって監視できず、Spring Beanでは構成メタデータの注釈を使用できず、通過するイベントを記録および再生するようSpring Beanを設定することもできません。

アセンブリ・ファイル

アセンブリ・ファイルでは、bean要素を使用してカスタムSpring Beanをイベント・プロセッサ・ネットワークのコンポーネントとして宣言します。次に例を示します。

<bean id="TradeListenerBean"
  class="com.oracle.cep.example.tradereport.TradeListener">
</bean>

7.2 イベント・シンクのインタフェース

イベント・シンクを作成することにより、EPN内でイベントを受信し、イベント・データに反応するロジックを適用します。イベント・シンクのJavaクラスは、この項に記載されているインタフェースのいずれかを実装します。

各インタフェースは、通常チャネルであるJavaクラスからEPNステージに接続されたアップストリームをイベントが終了する場合にイベントをクラスに渡すためにOracle Event Processingサーバーで使用するメソッドを提供します。

ここで示されるインタフェースは、ストリームまたはリレーションとして到着するイベントのサポートを提供します。ただし、リレーションのインタフェースは、ストリームとして到着するイベントの受信もサポートします。次の表に示すように、インタフェースは階層的に関連します。

インタフェース 説明

com.bea.wlevs.ede.api.StreamSink

イベントをストリームとして順番に受信するために実装します。

com.bea.wlevs.ede.api.RelationSink

イベントをリレーションとして順番に受信するために実装します。イベントをストリームとして受信するためにStreamSinkを拡張します。

com.bea.wlevs.ede.api.BatchStreamSink

バッチ処理されたイベントをストリームで受信するために実装します。アップストリーム・チャネルでバッチ処理が許可されている場合、イベントはタイムスタンプによってバッチ処理されて到着する場合があります。バッチ処理されていないイベントの受信をサポートするためにStreamSinkを拡張します。

com.bea.wlevs.ede.api.BatchRelationSink

バッチ処理されたイベントをリレーションとして受信するために実装します。アップストリーム・チャネルでバッチ処理が許可されている場合、イベントはタイムスタンプによってバッチ処理されて到着する場合があります。バッチ処理されていないイベントのストリームまたはリレーションとしての受信をサポートするためにRelationSinkを拡張します。

onInsertEvent実装でのEventRejectedExceptionの動作

削除しない例外についてはonInsertEvent実装でEventRejectedExceptionを明示的にスローする必要があります。EventProcessingExceptionを発生させると、CQLプロセッサを介してエラーのソースにまでこの例外が伝播されます。例外が複数存在する場合、EventRejectedExceptionによって例外をダウンストリーム・リスナーからチェーンできます。EventRejectedExceptionはCQLプロセッサによってソフト例外に変換されます。詳細は、フォルト処理を参照してください。

7.2.1 StreamSinkの実装

ストリームとしてイベントを受信するクラスは、Oracle Stream Analyticsの観点からは、挿入されるイベントのみ受信します。そのため、ストリームでは、イベントは常にシーケンスの最後に追加されます。また、ストリームのイベントは、あるイベントから次のイベントに移る際にタイムスタンプの値が古くならないよう、常に時間の昇順で受信します。値が古くならないタイムスタンプを使用すると、あるイベントのタイムスタンプを、先行するイベントのタイムスタンプと同じかそれより大きな値にすることができます。タイムスタンプの値は同じかそれより新しくなります。

その結果、ストリームとしてのイベントの受信をサポートするインタフェースは、受信イベントごとに1つのメソッドを持ちます。リレーションとしてイベントを受信するインタフェースは、複数の種類のイベントの受信をサポートしています。

バッチ処理されていないイベントをストリームとして受信するクラスの場合、StreamSinkインタフェースを実装します。StreamSinkインタフェースには、クラスに接続されるアップストリーム・ステージをイベントが離れる場合にストリームから各イベントに渡すためにOracle Stream Analyticsサーバーが呼び出す1つのメソッドonInsertEventがあります。

例7-1では、株式取引イベントを受信する単純なStreamSink実装は、Objectインスタンスとして各イベントを受信し、イベントが特定のイベント・タイプのインスタンスかどうかをテストして確認します。そうである場合、コードはそのタイプのメンバーであるプロパティの値を取得します。

ストリームとしてバッチ処理されたイベントを受信するクラスの場合、BatchStreamSinkインタフェースを実装します。インタフェースには、アップストリーム・ステージから受信するイベントの集合に渡すためにOracle Stream Analyticsサーバーが呼び出す1つのメソッドonInsertEventsがあります。BatchStreamSinkインタフェースはStreamSinkを拡張するため、バッチ処理されていないイベントも受信できます。

イベントのバッチ処理の詳細は、バッチ処理チャネルを参照してください。

例7-1 StreamSinkインタフェースの実装

public class TradeListener implements StreamSink {

    public void onInsertEvent(Object event) throws EventRejectedException {
        if (event instanceof TradeEvent){
            String symbolProp = ((TradeEvent) event).getSymbol();
            Integer volumeProp = ((TradeEvent) event).getVolume();
            // Code to do something with the property values.
        }
    }
}

7.2.2 RelationSinkの実装

リレーションとしてイベントを受信するクラスは、リレーションにある可能性があるすべての種類のイベント(挿入イベント、削除イベントおよび更新イベント)を受信できます。ストリームと異なり、リレーションのイベントは順序付けされず、リレーションを作成または操作したコードで更新または削除されたイベントを含みます。

そのため、リレーションとしてのイベントの受信をサポートするインタフェースには、クラスで挿入イベント、削除イベントまたは更新イベントを受信できるメソッドがあります。

バッチ処理されていないイベントをリレーションとして受信するクラスの場合、RelationSinkインタフェースを実装します。RelationSinkインタフェースには、onInsertEventonDeleteEventおよびonUpdateEventの3つのメソッドがあり、このうち1つはStreamSinkインタフェースから継承されたものです。実行時、Oracle Stream Analyticsサーバーは、クラスに接続されるアップストリーム・チャネルから受信するイベントのタイプに応じて適切なメソッドを呼び出します。

public class TradeListener implements RelationSink {

    public void onInsertEvent(Object event) throws EventRejectedException {
        if (event instanceof TradeEvent){
            String symbolProp = ((TradeEvent) event).getSymbol();
            Integer volumeProp = ((TradeEvent) event).getVolume();
            // Do something with the inserted event.
        }
    }

    @Override
    public void onDeleteEvent(Object event) throws EventRejectedException {
        if (event instanceof TradeEvent){
            // Do something with the deleted event.
        }
    }

    @Override
    public void onUpdateEvent(Object event) throws EventRejectedException {
        if (event instanceof TradeEvent){
            // Do something with the updated event.
        }
    }
}

バッチ処理されたイベントをリレーションとして受信するクラスの場合、BatchRelationSinkインタフェースを実装します。java.util.Collectionインスタンスのバッチから3つのタイプすべてのイベントを受信するよう設計されたonEventsメソッドがあります。

onEvents(insertEvents, deleteEvents, updateEvents)

また、BatchRelationSinkインタフェースはRelationSinkインタフェースを拡張して、バッチ処理されていないイベントの受信をサポートします。

実行時、Oracle Stream Analyticsサーバーは、適切なメソッドを呼び出して、クラスに接続されたアップストリーム・ステージから受信するイベントに渡します。

イベントのバッチ処理の詳細は、バッチ処理チャネルを参照してください。

この項で説明しているOracle Stream Analytics APIの完全なAPIリファレンス情報は、Oracle Stream Analytics Java APIリファレンスを参照してください。

7.3 イベント・ソースのインタフェース

イベント処理ネットワークのダウンストリーム・ステージにイベントを送信するJavaクラスを作成できます。イベント・ソースなどを作成することにより、たとえば、JavaコードによってEPNを通過するイベント・データから作成または変更されたイベントを送信できます。

イベント・ソースのJavaクラスは、この項に記載されているインタフェースのいずれかを実装します。これらの各インタフェースは、Oracle Stream Analyticsサーバーで使用されるメソッドを提供して、クラスにセンダー・クラスのインスタンスを渡します。

イベント・ソースで受信するセンダー・インスタンスは、この項に記載されているセンダー・インタフェースのいずれかを実装します。センダー・インタフェースは、バッチ処理の有無にかかわらずストリームまたはリレーションとしてイベントをチャネルなどの後続のダウンストリームEPNステージに送信するためにコードで呼び出すことができるメソッドを提供します。

ここで示されるインタフェースは、ストリームまたはリレーションとしてのイベントの送信をサポートします。リレーションのインタフェースも、ストリームとしてのイベントの送信をサポートします。

表7-2 イベント・ソースを実装するインタフェース

インタフェース 説明

com.bea.wlevs.ede.api.StreamSource

ストリームとしてイベントを送信するためにこのインタフェースを実装します。実行時、Oracle Stream Analyticsサーバーはストリーム・センダー・クラスのインスタンスをインジェクトします。

com.bea.wlevs.ede.api.RelationSource

ストリームまたはリレーションとしてイベントを送信するためにこのインタフェースを実装します。実行時、Oracle Stream Analyticsサーバーはリレーション・センダー・クラスのインスタンスをインジェクトします。ストリーム・イベントもサポートするよう、StreamSourceを拡張します。

表7-3に示すインタフェースは、イベント・ソース・クラスがOracle Stream Analyticsサーバーから受信するセンダー・クラスによって実装されます。

表7-3 センダー・クラスで実装されるインタフェース

インタフェース 説明

com.bea.wlevs.ede.api.StreamSender

ストリームとしてイベントを送信するためのメソッドを提供します。

com.bea.wlevs.ede.api.RelationSender

リレーションとしてイベントを送信するためのメソッドを提供します。ストリーム・イベントもサポートするよう、StreamSenderを拡張します。

com.bea.wlevs.ede.api.BatchStreamSender

コードでストリームとしてバッチ処理されたイベントを送信できるメソッドを提供します。送信するダウンストリーム・ステージがバッチ処理されたイベントのために構成されたチャネルである場合、タイムスタンプでバッチ処理されたイベントを送信できます。バッチ処理されていないイベントを送信するサポートも提供するよう、StreamSenderを拡張します。

com.bea.wlevs.ede.api.BatchRelationSender

バッチ処理されたイベントをリレーションとして送信するためのメソッドを提供します。ダウンストリーム・ステージがバッチ処理されたイベントのために構成されたチャネルである場合、タイムスタンプでバッチ処理されたイベントを送信できます。バッチ処理されていないイベントをサポートするためにRelationSenderを拡張します。

7.3.1 StreamSenderの実装

ストリーム・イベントのソースであるクラスは、Oracle Stream Analyticsの観点からは、挿入されるイベントのみ送信します。挿入されたイベントのみ送信すると、リレーションではなくストリームがモデル化されます。ストリーム・ソースから送信されるイベントでは、あるイベントから後続のイベントに移る際にタイムスタンプの値が古くならないようにする必要があります。後続のイベントのタイムスタンプは、直前のイベントと同じか後である必要があります。

StreamSourceを実装する場合、コードによってバッチ処理の有無にかかわらずイベントを送信できます。StreamSource setEventSenderメソッドの実装により、表7-3に示されているタイプのいずれかにキャストできるセンダー・インスタンスを受信します。コードでセンダー・インスタンスを使用して、ダウンストリーム・ステージで期待されるとおりにイベントを送信します。

コードでバッチ処理を有効にするチャネルにイベントを送信する場合、送信する前にバッチ処理されたイベント・センダーのいずれかを使用して、タイムスタンプでイベントをバッチ処理します。詳細は、バッチ処理チャネルを参照してください。

センダー・インスタンスは、アプリケーションのタイムスタンプを付加するように受信チャネルを構成する場合にハートビートを送信するsendHeartbeatメソッドも提供します。

7.3.2 RelationSenderの実装

リレーションとしてイベントのソースであるクラスは、ダウンストリーム・ステージに挿入イベント、削除イベントおよび更新イベントを送信できます。RelationSourceインタフェースを実装する場合、コードでバッチ処理の有無にかかわらずイベントを送信できます。RelationSource setEventSenderメソッドの実装により、表7-3に示されているタイプのいずれかにキャストできるセンダー・インスタンスを受信します。センダー・インスタンスを使用して、イベントをダウンストリーム・ステージに送信します。

クラスで受信するセンダー・インスタンスの処理に関して次の制約に留意してください。

  • sendDeleteEventについては、チャネル用に構成されたのと同じイベント・タイプのインスタンスを送信する必要があります。

  • sendInsertEventについては、同じ主キーを持つイベントがすでにリレーション内に含まれる場合、一意制約違反例外が発生し、入力イベントが破棄されます。

  • sendUpdateEventについては、指定の主キーを持つイベントがリレーション内に含まれていない場合、無効な更新タプル例外が発生し、入力イベントが破棄されます。

次の例では、単純なRelationSource実装はStreamSenderを受信し、リレーションとしてイベントを送信するためにセンダーをRelationSenderにキャストします。このクラスはリポジトリで構成されたイベント・タイプから新しいTradeEventインスタンスを作成しますが、sendEventsメソッドはコードの別の部分からパラメータと同じように簡単にインスタンスを受信できます。

package com.oracle.cep.example.tradereport;

import com.bea.wlevs.ede.api.EventType;
import com.bea.wlevs.ede.api.EventTypeRepository;
import com.bea.wlevs.ede.api.RelationSender;
import com.bea.wlevs.ede.api.RelationSource; 
import com.bea.wlevs.ede.api.StreamSender;
import com.bea.wlevs.util.Service;

public class TradeEventSource implements RelationSource {

    // Variables for event type respository and event sender. Both
    // will be set by the server.
    EventTypeRepository m_repos = null;	
    RelationSender m_sender = null;

    // Called by the server to set the repository instance.
    @Service
    public void setEventTypeRepository(EventTypeRepository repos) {
        m_repos = repos;
    }

    // Called by the server to set the sender instance.
    @Override
    public void setEventSender(StreamSender sender) {
        // Cast the received StreamSender to a RelationSender    
        m_sender = (RelationSender)sender;
    }

    /**
     * Sends events to the next EPN stage using the sender 
     * received from the server. This code assumes that an event
     * instance isn't received from another part of the class, 
     * instead creating a new instance from the repository.
     */
    private void sendEvents(){
        EventType eventType = m_repos.getEventType("TradeEvent");
        TradeEvent tradeEvent = (TradeEvent)eventType.createEvent();
        m_sender.sendDeleteEvent(tradeEvent);
    }
}