ヘッダーをスキップ
Oracle® Fusion Middleware Oracle Event Processing EPL言語リファレンス
11g リリース1 (11.1.1.7)
B55579-06
  目次へ移動
目次

前
 
次
 

1 イベント処理言語(EPL)の概要

この章では、ストリーミング・データ用の問合せ言語であるOracle Event Processing Language (EPL)の概要について説明します。EPLを使用すると、Oracle Event Processingを使用したデータ・ストリームに対して問合せを表現できます。EPLは非推奨になったことに注意してください。新たな開発ではOracle Continuous Query Language (Oracle CQL)を使用する必要があります。

この章には次の項が含まれます:

1.1 EPL言語の概要

イベント・プロセッサ・モジュールは、イベント表現、処理モデル、プログラミング・インタフェースおよび言語仕様という機能コンポーネントに分けることができます。

イベントはJavaBeans規則に準拠するPOJOとして表現されます。イベント・プロパティはPOJOのgetterメソッドを通じて公開されます。可能な場合は、EPL文の実行結果もPOJOとして返されます。ただし、イベント・ストリームが結合された場合など、型のないイベントが返される場合があります。この場合、Mapコレクション・インタフェースのインスタンスが返されます。詳細は、1.2項「イベント表現」を参照してください。

EPL処理モデルは連続的です。文の制約を満たす着信イベントを受信すると結果がすぐに出力されます。出力時には、出力ウィンドウに入れられる新しいイベントの挿入イベント、および出力ウィンドウから出される古いイベントの削除イベントという2種類のイベントが生成されます。これらのいずれかまたは両方のイベントが発生すると、リスナーにアタッチおよび通知されます。詳細は、1.3項「処理モデル」を参照してください。

着信イベントは、スライディング・ウィンドウまたはバッチ・ウィンドウで処理されます。スライディング・ウィンドウでは、1件の増分データに対して徐々にウィンドウが変化してイベントが処理されます。これに対し、バッチ・ウィンドウでは別個のデータ・チャンクに対してウィンドウが変化してイベントが処理されます。ウィンドウのサイズは、保持されるイベントの最大数またはイベントが保持される最大時間によって定義されます。

EPLプログラミング・インタフェースでは、文を個別にコンパイルするか、URLを通じて一括でロードできます。文の繰り返し、取得、開始、および停止が可能です。リスナーは文にアタッチされ、挿入イベントまたは削除イベントあるいはその両方が発生した場合に通知されます。

イベント処理言語はSELECTFROMWHEREGROUP BYHAVINGおよびORDER BY句を使用するSQLに似た言語です。ストリームはデータ・ソースとしての表に相当し、イベントはデータの基本単位としての行に相当します。イベントはデータで構成されるため、結合による相関、副問合せによるフィルタ処理、グループ化による集約などのSQLの概念が効果的に利用されます。INSERT INTO句は、下流処理が行われるイベントを他のストリームに転送する手段として作り直されています。JDBCを通じてアクセス可能な外部データは、ストリーム・データで照会および結合されます。SQLには含まれない、イベント処理に固有の言語構造を提供するRETAINMATCHINGOUTPUT句などの追加の句も使用できます。

RETAIN句は、本質的にはストリーム・データに対する仮想ウィンドウを定義することで、問合せが実行されるデータ量に制約を付与します。データの範囲が表で制限されるリレーショナル・データベース・システムとは異なり、イベント処理システムでは問合せのデータをより動的に制限する代替手段が提供されます。

MATCHING句は固有のパターンに一致するイベントのシーケンスを検出します。ANDORFOLLOWED BYなどの時相演算子や論理演算子を使用することで、任意の複雑な式を通じてイベントの発生またはイベントの欠落の両方を検出できます。

OUTPUT句は下流プロセッサの過負荷を回避するために文の実行結果を抑制します。結果的に生成された最初または最後のイベントの全体または一部が、時間ベースまたは行ベースのバッチで渡されます。

最後の節では、実際的なシナリオのもとで言語機能を例示するいくつかの使用例を示します。

1.2 イベント表現

EPLを使用すると、イベントをイベント・オブジェクトとして表現できます。詳細については、以下を参照。

すべてのイベント・オブジェクトにはプロパティが含まれます。詳細については、以下を参照。

イベントはイベント・シンクによって消費されます。詳細は、1.2.7項「イベント・シンク」を参照してください。

1.2.1 イベント・オブジェクト

イベントは、過去に発生したアクションまたは状態変更の不変レコードです。イベント・オブジェクトの状態情報はイベント・プロパティで取得されます。イベントは、POJOまたはjava.util.Mapを拡張するcom.bea.wlevs.cep.event.MapEventObjectのいずれかで表現されます。

表1-1 イベント表現

Javaクラス 説明
java.lang.Object

JavaBeans規則に準拠し、getterメソッドを持つ任意のJava POJO。

com.bea.wlevs.ede.api.MapEventObject

Mapイベントはキーと値のペアです。


1.2.2 Plain Old Java Objectイベント

Plain old Java object (POJO)イベントは、JavaBeans形式のgetterメソッドを通じてイベント・プロパティを公開するオブジェクト・インスタンスです。イベント・クラスまたはインタフェースは必ずしも完全にJavaBeans仕様に準拠している必要はありませんが、EPLエンジンがイベント・プロパティを取得するためには必須のJavaBeans getterメソッドが存在している必要があります。

EPLでは、スーパー・クラスを拡張したり、1つまたは複数のインタフェースを実装したりするJavaBeans形式のイベント・クラスがサポートされています。また、EPL文ではJavaインタフェース・クラスおよび抽象クラスを参照できます。

イベントを表現するクラスは不変的に作成する必要があります。イベントは過去に発生した状態変更やアクションの記録であるため、関連するイベント・プロパティは変更不能である必要があります。ただし、これは厳密な要件ではなく、EPLエンジンでは可変イベントも受諾されます。

1.2.3 Mapイベント

イベントはjava.util.Mapインタフェースを拡張するcom.bea.wlevs.ede.api.MapEventObjectインタフェースを実装するオブジェクトによっても表現されます。Mapイベントのイベント・プロパティは、java.util.Mapインタフェースで公開されるgetメソッドを通じてアクセスできる各エントリの値です。

Map内のエントリはイベント・プロパティを表現します。EPL文で指定されたイベント・プロパティ名をエンジンが参照できるようするために、キーの型はjava.util.Stringである必要があります。値には任意の型を使用できます。MapではPOJOも値として示されます。

1.2.4 イベント・プロパティ

EPL式では、simple、indexed、mapped、およびnestedのイベント・プロパティを使用できます。以下の表に、異なる型のプロパティ、およびイベント式での構文の概要を示します。この構文を使用すると、JavaBeansオブジェクト・グラフ、XML構造、およびMapイベントを深い所まで照会できます。

表1-2 イベント・プロパティ

タイプ 説明 構文

Simple

取得可能な1つの値を持つプロパティ。プロパティ型にはプリミティブ型(intjava.lang.Stringなど)または他の複合型を使用できます。

name
sensorId

Nested

nestedプロパティは、イベントの他のプロパティ内に存在するプロパティです。Mapとして表現されるイベントには他のPOJOイベントのみをネストでき、他のMapイベントはネストできません。

name.nestedname
sensor.value

Indexed

indexedプロパティにはオブジェクトの順序付きコレクションが格納され(すべて同一の型)、整数の値を持つ正の数の索引(添字)によって個別にアクセスされます。Mapとして表現されるイベントではindexedプロパティはサポートされません。

name[index]
sensor[0]

Mapped

mappedプロパティはキーを持つオブジェクトのコレクションを格納します(すべて同一の型)。標準のJavaBeans APIに対する拡張機能として、EPLは文字列値のキーを受け入れるプロパティをmappedプロパティと見なします。マップとして表現されるイベントではindexedプロパティはサポートされません。

name('key')
sensor('light')

次のようなEmployeeEventイベント・クラスがあると想定します。この例のmappedプロパティとindexedプロパティではJavaオブジェクトが返されますが、Java言語のプリミティブ型(intStringなど)を返すこともできます。AddressオブジェクトおよびEmployeeオブジェクト自身は、Addressオブジェクトの町名やEmployeeオブジェクトの従業員名など内部にネストするプロパティを持つことができます。

  public class EmployeeEvent {
      public String getFirstName();
      public Address getAddress(String type);
      public Employee getSubordinate(int index);
      public Employee[] getAllSubordinates();
  }

simpleイベント・プロパティにはプロパティ値を返すgetterメソッドが必要です。上の例では、getFirstName getterメソッドによって型StringfirstNameイベント・プロパティが返されます。

indexedイベント・プロパティには次のいずれかのgetterメソッドが必要です。

  • getSubordinateメソッドのように、整数型のキー値を受け取り、プロパティ値を返すメソッド。

  • Employeeの配列を返すgetAllSubordinates getterメソッドのように、配列型を返すメソッド。

EPL文では、indexedプロパティへのアクセス時に構文property[index]を使用します。

mappedイベント・プロパティでは、getAddressメソッドのように、String型のキー値を受け取りプロパティ値を返すgetterメソッドが必要です。EPL文またはイベント・パターン文では、mappedプロパティへのアクセス時に構文property ('key')を使用します。

nestedイベント・プロパティにはネストするオブジェクトを返すgetterメソッドが必要です。getAddressおよびgetSubordinateメソッドは、ネストするオブジェクトを返すmappedプロパティとindexedプロパティです。EPL文では、nestedプロパティへのアクセス時に構文property.nestedPropertyを使用します。

すべてのEPL文では、1つまたは複数のイベント・プロパティ名が予期される任意の場所で、indexedプロパティ、mappedプロパティ、nestedプロパティ、またはこれらの組み合わせを使用できます。次の例では、indexed、mapped、およびnestedプロパティの異なる組み合わせを示します。

  address('home').streetName
  subordinate[0].name='anotherName'
  allSubordinates[1].name
  subordinate[0].address('home').streetName

同様に、この構文は、SELECTリスト、WHERE句、JOIN条件などEPL文でイベント・プロパティ名が予期されるすべての場所で使用できます。

  SELECT firstName, address('work'), subordinate[0].name, subordinate[1].name
  FROM EmployeeEvent RETAIN ALL
  WHERE address('work').streetName = 'Park Ave'

1.2.5 動的なイベント・プロパティ

動的な(つまり、チェックされていない)プロパティは、文のコンパイル時に認識される必要のないイベント・プロパティです。Oracle Event Processingではこれらの動的プロパティを実行時に解決します。

動的プロパティの背景にある概念としては、基になるイベント表現で必ずしも事前にプロパティを認識する必要がないということがあります。文のコンパイル時に認識されない追加のプロパティが基になるイベントに含まれていて、EPL文を使用してこれらのプロパティを照会する必要がある場合があります。この概念は、豊富な機能を持つオブジェクト指向のドメイン・モデルを表現するイベントで特に便利です。

動的プロパティの構文は、プロパティ名と疑問符から成ります。indexed、mapped、およびnestedプロパティを動的プロパティにすることもできます。以下の表に、動的イベント・プロパティの型、およびそれらの識別のために使用される構文を示します。

表1-3動的プロパティの構文

イベント・プロパティ型 構文

動的なSimple

name?

動的なIndexed

name[index]?

動的なMapped

name('key')?

動的なNested

name?.nestedPropertyName

動的プロパティでは常にjava.lang.Object型が返されます。実行時に処理されるイベントに動的プロパティが存在しない場合、動的プロパティではnull値が返されます。

たとえば、itemプロパティを指定するOrderEventイベントを考えます。itemプロパティの型はObjectであり、ServiceまたはProductのいずれかのインスタンスへの参照が保持されます。どちらになるかは実行時に明らかになります。さらに、ServiceおよびProductクラスでpriceというプロパティが提供されると想定します。動的プロパティを使用すると、以下のように、いずれかのオブジェクト(ServiceまたはProduct)からpriceプロパティを取得する問合せを指定できます。

  SELECT item.price? 
  FROM OrderEvent RETAIN ALL EVENTS

2つ目の例として、Serviceクラスに含まれているserviceNameプロパティがProductクラスには含まれていないと想定します。実行時、以下の問合せはServiceオブジェクトの場合にserviceNameプロパティの値を返します。しかし、Productオブジェクトに対して問合せを実行すると、ProductsにはserviceNameプロパティが含まれないためnull値が返されます。

  SELECT item.serviceName? 
  FROM OrderEvent RETAIN ALL EVENTS

次に、OrderEventに複数の実装クラスがあり、そのうちのいくつかにのみtimestampプロパティが含まれている場合を考えます。以下の問合せは、プロパティが含まれているOrderEventインタフェースで実装されているtimestampプロパティを返します。

  SELECT timestamp? 
  FROM OrderEvent RETAIN ALL EVENTS

前述の問合せは、型java.lang.Objecttimestamp?という1つの列を返します。

動的プロパティをネストした場合は、動的プロパティの下にあるすべてのプロパティも動的プロパティと見なされます。次の例は、detail動的プロパティで返されたオブジェクトのdirectionプロパティをたずねています。

  SELECT detail?.direction 
  FROM OrderEvent RETAIN ALL EVENTS

上の問合せは、以下と同等です。

  SELECT detail?.direction? 
  FROM OrderEvent RETAIN ALL EVENTS

以下は、動的プロパティと共に使用すると便利な機能です。

  • CAST関数は動的プロパティの値(または式の値)を指定の型にキャストします。4.1.7項「CAST関数」を参照してください。

  • EXISTS関数は動的プロパティが存在するかどうかをチェックします。イベントにその名前のプロパティが存在する場合はtrueを返し、イベントにプロパティが存在しない場合はfalseを返します。4.1.8項「EXISTS関数」を参照してください。

  • INSTANCEOF関数は動的プロパティの値(または式の値)が指定の型であるかどうかをチェックします。4.1.6項「INSTANCEOF関数」を参照してください。

1.2.6 ユーザー定義関数を使用した他のイベント・プロパティの処理

EPLでサポートされていないデータ型をイベントが使用している場合は、ユーザー定義関数を作成してEPL問合せでそのデータ型を評価できます。

例1-1のようなenumデータ型があるとします。例1-2に示すイベントでは、このenumデータ型を使用しています。EPLではenumデータ型がサポートされていません。

例1-1 列挙データ型ProcessStatus

package com.oracle.app;

public enum ProcessStatus {
    OPEN(1), 
    CLOSED(0)}
}

例1-2 列挙データ型ProcessStatusを使用するイベント

package com.oracle.app;

import com.oracle.capp.ProcessStatus;

public class ServiceOrder {
    private String serviceOrderId;
    private String electronicSerialNumber;
    private ProcessStatus status;
... 
}

ユーザー定義関数を作成し(例1-3を参照)、アプリケーション・アセンブリ・ファイルに関数を登録することで(例1-4を参照)、このenumデータ型をEPL問合せで評価できます(例1-5を参照)。

例1-3 列挙データ型を評価するユーザー定義関数

package com.oracle.app;

import com.oracle.capp.ProcessStatus;
public class CheckIfStatusClosed {
    public boolean execute(Object[] args) {
        ProcessStatus arg0 = (ProcessStatus)args[0];
        if (arg0 == ProcessStatus.OPEN)
            return Boolean.FALSE;
        else
            return Boolean.TRUE;
    }
}

例1-4 アプリケーション・アセンブリ・ファイルへのユーザー定義関数の登録

<wlevs:processor id="testProcessor">
    <wlevs:listener ref="providerCache"/>
    <wlevs:listener ref="outputCache"/>
    <wlevs:cache-source ref="testCache"/>
    <wlevs:function function-name="statusClosed" exec-method=”execute” />
        <bean class="com.oracle.app.CheckIfStatusClosed"/>
    </wlevs:function>
</wlevs:processor>

例1-5 ユーザー定義関数を使用したEPL問合せでの列挙データ型の評価

<query id="rule-04"><![CDATA[
    SELECT
        meter.electronicSerialNumber, 
        meter.exceptionKind
    FROM 
        MeterLogEvent AS meter, 
        ServiceOrder AS svco
    WHERE 
        meter.electronicSerialNumber = svco.electronicSerialNumber and
        svco.serviceOrderId IS NULL OR statusClosed(svco.status)
]]></query>

詳細は、第4章「EPLリファレンス関数」を参照してください。

1.2.7 イベント・シンク

イベント・シンクは、EPL文で指定された条件に一致するイベントが発生した場合に、プログラムによる通知を受信する手段を提供します。以下の場合に、シンクへの通知が行われます。

  • EPL文で指定された条件に一致する新しいイベントが発生した場合。これらはISTREAMイベントと呼ばれます。

  • 以前EPL文で指定された条件と一致していた古いイベントが、期限切れになったか、または新たに発生した着信イベントがこれらに取って代わったために、出力ウィンドウからプッシュされた場合。これらはRSTREAMイベントと呼ばれます。

これらの通知が発生した場合の詳しい例は、1.3項「処理モデル」で説明しています。

ISTREAMイベントを受信するには、com.bea.wlevs.ede.api.EventSinkインタフェースを使用します。実装では、結果が使用可能になった場合にエンジンによって呼び出される単一のonEventメソッドを指定する必要があります。このインタフェースでは、新しいイベントのみがリスナーに送信されます。

public interface EventSink extends EventListener {
   void onEvent(List<Object> newEvents) 
   throws RejectEventException;
}

エンジンでは、文の結果をPOJOまたはMapEventObjectインスタンスのリストとしてイベント・シンクに提供します。ワイルドカードでの選択の場合、結果はエンジンに送信された元のイベント・オブジェクト・タイプに一致します。式のjoinsおよびselect句の場合、結果のオブジェクトはcom.bea.wlevs.ede.api.MapEventObjectインタフェースを実装します。

1.3 処理モデル

EPL処理モデルは、1.3.1項「イベント・ストリーム」で説明するイベント・ストリームに基づいています。

イベント・ストリームで操作する問合せは、次の項に従って表現します。

1.3.1 イベント・ストリーム

EPL処理モデルは継続的です。文へのリスナーはエンジンでその文のイベントが処理されるとすぐに、文で選択されているイベント・ストリーム、RETAIN句の制約、フィルタ、および出力レートに応じて更新されたデータを受信します。

この節では、非常に単純なEPL文の出力を示します。この文はデータ・ウィンドウを使用せず、いずれのフィルタも適用せずにイベント・ストリームを選択します。

  SELECT * FROM Withdrawal RETAIN ALL

この文はすべてのWithdrawalイベントを選択します。エンジンでWithdrawal型またはWithdrawalの下位の型のイベントが処理されるたびに、すべての更新リスナーが呼び出され、文の各リスナーに新しいイベントが渡されます。

挿入ストリームという用語は、新しいイベントが到着し、データ・ウィンドウまたは集約に入れられることを示します。この例の挿入ストリームは、到着するWithdrawalイベントのストリームであり、更新リスナーに新しいイベントとしてポストされます。

次の図は、時間の経過に沿って到着する一連のWithdrawalイベント1 - 6を示しています。この図を含め、この節に示す図のカッコ内の数字はWithdrawalイベントのamountプロパティの値を示します。

図1-1 イベント・ストリームの例

図1-1の説明が続きます
「図1-1 イベント・ストリームの例」の説明

前述の例の文ではRETAIN句が指定されていないため、新しいイベントのみがエンジンによって文のリスナーにポストされ、古いイベントはポストされません。

1.3.2 スライディング・ウィンドウ

スライディング・ウィンドウには、行ベースおよび時間ベースの2種類があります。以下の節では、これらの各項目について示します。

1.3.2.1 行ベースのスライディング・ウィンドウ

行ベースのスライディング・ウィンドウでは、ストリームの直前のN件のイベントのみを保持するようエンジンに通知します。次の文は、Withdrawalイベント・ストリームに長さウィンドウを適用します。この文は、データ・ウィンドウ、およびデータ・ウィンドウに入るまたはウィンドウから出るイベントの概念を示しています。

  SELECT * FROM Withdrawal RETAIN 5 EVENTS

この文のウィンドウのサイズは5件のイベントです。エンジンにより到着するすべてのWithdrawalイベントがウィンドウに入れられます。ウィンドウが一杯になると、最も古いWithdrawalイベントがウィンドウからプッシュされます。エンジンは更新リスナーに対し、ウィンドウに入れられるすべてのイベントを新しいイベントとして示し、ウィンドウから出されるすべてのイベントを古いイベントとして示します。

挿入ストリームという用語が到着する新しいイベントを示すのに対し、削除ストリームという用語はデータ・ウィンドウから出されるイベント、または集約値を変更するイベントを示します。この例の削除ストリームは、長さウィンドウから退去するWithdrawalイベントのストリームであり、これらのイベントは古いイベントとして更新リスナーにポストされます。

次の図は、イベントの到着に応じて変化する長さウィンドウの内容と、更新リスナーにポストされるイベントを示します。

図1-2 行ベースのスライディング・ウィンドウの例

図1-2の説明が続きます
「図1-2 行ベースのスライディング・ウィンドウの例」の説明

以前と同様、到着するすべてのイベントは新しいイベントとして更新リスナーにポストされます。また、イベントW1は、イベントW6の到着時に長さウィンドウから出され、古いイベントとして更新リスナーにポストされます。

長さウィンドウと同様、時間ウィンドウでは指定の期間内で最新のイベントが保持されます。たとえば、5秒の時間ウィンドウでは直前の5秒間のイベントが保持されます。秒の経過につれて、時間ウィンドウでは最も古いイベントがウィンドウから実際にプッシュされ、その結果1つまたは複数の古いイベントが更新リスナーにポストされます。

EPLでは、省略可能なISTREAMおよびRSTREAMキーワードがSELECT句およびINSERT INTO句でサポートされています。これにより、データ・ウィンドウに入れられるイベントまたは出されるイベントのみを転送したり、現在または以前の集約値のみを選択したりするなど、挿入ストリームまたは削除ストリームに基づいた操作を行うようエンジンに通知できます。

1.3.2.2 時間ベースのスライディング・ウィンドウ

時間ベースのスライディング・ウィンドウは、システム時間に基づいて、過去の指定された時間間隔までの範囲を示す変化するウィンドウです。行ベースのスライディング・ウィンドウと同様、時間ベースのスライディング・ウィンドウでは問合せで処理されるイベント数を制限できます。

次の図は、時間ウィンドウの動作を示しています。この図では、イベントのグループ化やフィルタ処理を行わず、単にイベントを選択する問合せを想定します。

  SELECT * FROM Withdrawal RETAIN 4 SECONDS

この図は指定の時間tから開始し、t+4秒、t+5秒、およびそれ以降の時点における時間ウィンドウの内容を示しています。

図1-3 時間ベースのスライディング・ウィンドウの例

図1-3の説明が続きます
「図1-3 時間ベースのスライディング・ウィンドウの例」の説明

図では、以下の動作を示しています。

  1. 時間t + 4秒の時点でイベントW1が到着し、時間ウィンドウに入ります。エンジンにより新しいイベントが更新リスナーに報告されます。

  2. 時間t + 5秒の時点でイベントW2が到着し、時間ウィンドウに入ります。エンジンにより新しいイベントが更新リスナーに報告されます。

  3. 時間t + 6.5秒の時点でイベントW3が到着し、時間ウィンドウに入ります。エンジンにより新しいイベントが更新リスナーに報告されます。

  4. 時間t + 8秒の時点でW1が時間ウィンドウから離れます。エンジンによりイベントは古いイベントとして更新リスナーに報告されます。

実際的な例として、直前の4秒間の各口座の平均引出し額が1000を超えるすべての口座を決定する必要がある場合を考えます。この問題を解決する文は、以下のとおりです。

  SELECT account, AVG(amount)
  FROM Withdrawal RETAIN 4 SECONDS
  GROUP BY account
  HAVING amount > 1000

1.3.3 バッチ・ウィンドウ

行ベースおよび時間ベースのウィンドウは両方ともバッチ処理に対応できます。次の節では、これらの概念について示します。

1.3.3.1 時間ベースのバッチ・ウィンドウ

時間ベースのバッチ・ウィンドウではイベントがバッファ・リングされ、指定された時間間隔ごとにそれらのイベントが1回の更新で解放されます。長さベースのバッチ・ウィンドウと同様、時間ベースのバッチ・ウィンドウではイベントの評価が制御されます。

次の図は、時間バッチ・ビューの動作を示しています。この図では、以下の単純な問合せを想定しています。

  SELECT * FROM Withdrawal RETAIN BATCH OF 4 SECONDS

この図は指定の時間tから開始し、t + 4秒、t + 5秒、およびそれ以降の時点における時間ウィンドウの内容を示しています。

図1-4 時間ベースのバッチ・ウィンドウの例

図1-4の説明が続きます
「図1-4 時間ベースのバッチ・ウィンドウの例」の説明

図では、以下の動作を示しています。

  1. 時間t + 1秒の時点でイベントW1が到着し、バッチに入ります。更新リスナーへの通知の呼出しは発生しません。

  2. 時間t + 3秒の時点でイベントW2が到着し、バッチに入ります。更新リスナーへの通知の呼出しは発生しません。

  3. 時間t + 4秒の時点でエンジンによりバッチ内のイベントが処理され、新しいバッチが開始されます。エンジンによりイベントW1およびW2が更新リスナーに報告されます。

  4. 時間t + 6.5秒の時点でイベントW3が到着し、バッチに入ります。更新リスナーへの通知の呼出しは発生しません。

  5. 時間t + 8秒の時点でエンジンによりバッチ内のイベントが処理され、新しいバッチが開始されます。エンジンによりイベントW3が新しいデータとして更新リスナーに報告されます。エンジンによりイベントW1およびW2が古いデータ(前回のバッチ)として更新リスナーに報告されます。

1.3.3.2 行ベースのバッチ・ウィンドウ

行ベースのウィンドウでもバッチ処理を行えます。たとえば、次の問合せは5件のイベントが受信されるまで何も処理せずに待機します。

  SELECT * FROM Withdrawal RETAIN BATCH OF 5 EVENTS

5件のイベントを受信すると問合せが実行され、再び新しい5件のイベントを受信するまで処理を待機します。

1.3.4 副問合せとWHERE句

イベント・ストリームに対するフィルタは副問合せ式内で使用され、イベントがデータ・ウィンドウ内に入る前にイベントを指定のストリームからフィルタで除外します。このフィルタ処理はWHERE句の実行前に発生します。残りのEPL文で処理されるデータ量が軽減され、パフォーマンスが向上するため、可能な場合は、WHERE句に対し、副問合せ内でフィルタ処理を完了します。

次の文は、金額が200以上のWithdrawalイベントを選択する副問合せを示します。

  SELECT * FROM (SELECT * FROM Withdrawal WHERE amount >= 200) RETAIN 5 EVENTS

この副問合せでは、金額が200未満のWithdrawalイベントは外部問合せのウィンドウに入らず、そのため更新リスナーには渡されません。

図1-5 副問合せとWHERE句の例

図1-5の説明が続きます
「図1-5 副問合せとWHERE句の例」の説明

文中のWHERE句およびHAVING句では、文のデータ・ウィンドウや他のビュー内でイベントが処理された後、処理の後半で可能性のある結果行が消去されます。

次の文は、副問合せを使用せず、WithdrawalイベントにWHERE句を適用します。

  SELECT * FROM Withdrawal RETAIN 5 EVENTS WHERE amount >= 200

WHERE句は新しいイベントと古いイベントの両方に適用されます。次の図に示すとおり、到着するイベントは"amount"プロパティの値にかかわらずウィンドウに入れられます。ただし、WHERE句を満たすイベントのみが更新リスナーに渡されます。また、イベントはデータ・ウィンドウを離れますが、WHERE句の条件を満たすイベントのみが古いイベントとして更新リスナーにポストされます。

図1-6 行ベースのスライディング・ウィンドウに到達するイベント

図1-6の説明が続きます
「図1-6 行ベースのスライディング・ウィンドウに到達するイベント」の説明

イベント・ストリームのフィルタは指定可能なフィルタ・タイプの中でより限定的ですが、WHERE句には複雑な条件を含めることができます。次の文のWHERE句はWHERE句内のjava.lang.Math Javaライブラリ・クラスのceil関数に適用されます。INSERT INTO句により、最初の文の結果が2番目の文で使用可能になります。

  INSERT INTO BigWithdrawal 
    SELECT * FROM Withdrawal RETAIN ALL WHERE Math.ceil(amount) >= 200
  SELECT * FROM BigWithdrawal RETAIN ALL 

1.3.5 集約

集約関数を介してイベントを集約する文では、集約された値の変化に伴い、削除ストリーム・イベントもポストされます。2つのWithdrawalイベントを受信した場合にアラートを発生する以下の文について考えます。

  SELECT COUNT(*) AS mycount 
  FROM Withdrawal RETAIN ALL 
  HAVING COUNT(*) = 2

エンジンで2つ目のwithdrawalイベントが検出されると、エンジンにより新しいイベントが更新リスナーにポストされます。その新しいイベントのmycountプロパティの値は2です。さらに、エンジンで3つ目のWithdrawalイベントが検出されると、以前のカウントの値を含む古いイベントがエンジンにより更新リスナーにポストされます。その古いイベントのmycountプロパティの値も2です。

ISTREAMまたはRSTREAMキーワードを使用すると、更新リスナーにポストされる新しいイベントまたは古いイベントを除去できます。次の文では、ISTREAMキーワードを使用した結果、2つ目のWithdrawalイベントを受信した際にエンジンで更新リスナーが1回のみ呼び出されます。

  SELECT ISTREAM COUNT(*) AS mycount 
  FROM Withdrawal RETAIN ALL 
  HAVING COUNT(*) = 2

1.4 使用例

次の使用例では、言語のさまざまな機能の使用方法の例を示します。

1.4.1 フィードごとのレートの計算

スループットの統計および急速な減衰の検出のために、各マーケット・データ・フィードごとに毎秒のティック数を計算します。

マーケット・データ・イベント・ストリームのソースからの1秒間のイベントをバッチにまとめるEPL文を使用できます。フィードおよびフィードごとのイベント数を出力値として指定します。このデータをさらに処理するために、出力イベントをTicksPerSecondイベント・ストリームに挿入します。

  INSERT INTO TicksPerSecond
  SELECT feed, COUNT(*) AS cnt 
    FROM MarketDataEvent
    RETAIN BATCH OF 1 SECOND
    GROUP BY feed

1.4.2 最高値の株価の計算

最高値の株価を計算するため、固有の各銘柄記号に対し、トランザクションのブロック・サイズが10を超える100件のイベントを保持するスライディング・ウィンドウを定義します。たとえば、5000の銘柄記号がある場合、5,000 x 100の5,000,000件のイベントが保持されます。ブロック・サイズが10を越えるMarketTradeイベントのみがウィンドウに入れられ、価格の高い上位100件のイベントのみが保持されます。

結果は銘柄記号別にグループ化され、平均価格が100未満の銘柄記号は出力から除外され、アルファベット順に並べられます。

   SELECT symbol, AVG(price) 
   FROM (SELECT * FROM MarketTrade WHERE blockSize > 10)
   RETAIN 100 EVENTS WITH LARGEST price PARTITION BY symbol 
   GROUP BY symbol
   HAVING AVG(price) >= 100
   ORDER BY symbol

1.4.3 位置データの分類

高速道路の自動車の位置および方向に関する情報が含まれる自動車位置のイベント・データに基づいて、自動車が進むルートを検出します。まずcarIdによってデータを分類することで特定の自動車に関する情報を特定し、次に高速道路、方向で分類し、方向をプロットします。これらの情報に基づいて自動車の速度が計算されます。

最初のPARTITION BY carIdでは自動車位置イベントが自動車ごとにグループ化され、さらにその後のPARTITION BY expressway PARTITION BY directionではより詳細な位置および方向のプロパティ値によって、データがさらに分類されます。この問合せで保持されるイベント数は4ですが、これは最後のPARTITION BY句で保持された最大数に適用されます。このため、個別の分類プロパティ値に対し、最大4件のイベントが保持されます。

  SELECT carId, expressway, direction, 
    SUM(segment)/(MAX(timestamp)-MIN(timestamp)) AS speed
   FROM CarLocationEvent
   RETAIN 4 events 
   PARTITION BY carId PARTITION BY expressway PARTITION BY direction

1.4.4 急速な減衰の検出

任意のタイミングの毎秒ティック数が直前の10秒間の毎秒ティック数の75%を下回った場合にアラートを発することにより、急速な減衰を定義します。

直前の10秒間の毎秒ティック数は、最初の文で計算されたTicksPerSecondイベントを使用し、直前の10秒間の平均を取ることで計算できます。次に、現在のレートを変化する平均値と比べ、平均値の75%に満たないレートをフィルタ処理します。

  SELECT feed, AVG(cnt) AS avgCnt, cnt AS feedCnt 
    FROM TicksPerSecond
    RETAIN 10 seconds
    GROUP BY feed 
    HAVING cnt < AVG(cnt) * 0.75

1.4.5 ネットワークの異常性の検出

顧客側でのチェック・インの作業中に、端末でハードウェアの問題が検出されたり、ネットワークが停止したりする場合があります。このような状況では、顧客を支援するチーム・メンバーに警告する必要があります。端末で問題が検出された場合はOutOfOrderイベントが発行されます。パターンでは端末でout-of-orderが通知され、顧客がチェック・イン処理中である状況が検出されます。

  SELECT ci.term 
  MATCHING ci:=Checkin FOLLOWED BY 
        ( OutOfOrder (term.id=ci.term.id) AND NOT
          (Cancelled (term.id=ci.term.id) OR 
           Completed (term.id=ci.term.id) ) WITHIN 3 MINUTES )

セルフサービスの各端末では、以下の4つのうちいずれかのイベントを発行できます。

  • Checkin - 顧客がチェック・イン・ダイアログを開始したことを示します。

  • Cancelled - 顧客がチェック・イン・ダイアログを取り消したことを示します。

  • Completed - 顧客がチェック・イン・ダイアログを完了したことを示します。

  • OutOfOrder - 端末でハードウェアの問題が検出されたことを示します。

すべてのイベントでは、イベントをパブリッシュした端末の情報と、タイム・スタンプが提供されます。端末の情報はtermというプロパティに格納され、端末のidが指定されます。すべてのイベントは類似の情報を含んでいるため、各イベントを基本クラスTerminalEventのサブタイプとしてモデル化し、すべてのイベントで共有される端末情報がこのクラスで提供されます。これにより、すべての端末イベントを多態的に扱うことができ、派生したイベント・タイプを親イベント・タイプと同様に扱うことができるため問合せが簡素化されます。

1.4.6> 欠落したイベントの検出

Statusイベントは定期的に60秒間隔で到着するため、MATCHING句を使用した一時的なパターン照合を利用して、時間通りに到着しなかったイベントを検出できます。WITHIN演算子を使用して送信や処理による遅延の可能性を考慮した65秒のウィンドウを保持し、NOT演算子を使用してterm.idT1に等しいStatusイベントの欠落を検出します。

  SELECT 'terminal 1 is offline' 
  MATCHING NOT Status(term.id = 'T1') WITHIN 65 SECONDS
  OUTPUT FIRST EVERY 5 MINUTES

1.4.7 端末アクティビティ・データの概要

端末アクティビティの統計情報をスタッフ側にリアル・タイムに提示することで、システムをモニターし、問題を特定することが可能になります。次のサンプル問合せでは、イベント・タイプごとのカウントが毎分取得されます。CountPerTypeイベント・ストリームを通じて使用可能なこのデータをさらに利用して、記録された使用法パターンに対して結合および比較したり、単にアクティビティをリアル・タイムで要約できます。

  INSERT INTO CountPerType
  SELECT type, COUNT(*) AS countPerType 
  FROM TerminalEvent
  RETAIN 10 MINUTES
  GROUP BY type
  OUTPUT ALL EVERY 1 MINUTE

1.4.8 センサー・データの読取り

このサンプルでは、いずれかのリーダーの範囲内にパレットが含まれている場合に、RFIDリーダーのアレイがRFIDタグを検知します。リーダーは、リーダーのセンサーID、調査時刻、調査したタグなどの調査情報が含まれるXMLドキュメントを生成します。リーダーのセンサーIDごとに直前の60秒間のタグの総数が計算されます。

  SELECT ID AS sensorId, SUM(countTags) AS numTagsPerSensor
  FROM AutoIdRFIDExample
  RETAIN 60 SECONDS
  WHERE Observation[0].Command = 'READ_PALLET_TAGS_ONLY'
  GROUP BY ID

1.4.9 トランザクション・イベントの組合せ

このサンプルでは、トランザクションの各コンポーネントが含まれている組み合わせイベントを検出するEPL文を作成します。イベント照合は、直前の30分間に到着したイベントに制限されます。この文はINSERT INTO構文を使用してCombinedEventイベント・ストリームを生成します。

  INSERT INTO CombinedEvent(transactionId, customerId, supplierId, 
    latencyAC, latencyBC, latencyAB)
  SELECT C.transactionId, customerId, supplierId, 
    C.timestamp - A.timestamp, 
    C.timestamp - B.timestamp, 
    B.timestamp - A.timestamp 
  FROM TxnEventA A, TxnEventB B, TxnEventC C
  RETAIN 30 MINUTES
  WHERE A.transactionId = B.transactionId AND
    B.transactionId = C.transactionId

1.4.10 リアル・タイム・パフォーマンスのモニター

過去30分間のイベントの最小、最大、および平均の合計レイテンシ(AおよびCの間の時間差)を導くには、以下のEPLを使用できます。また、イベント・サーバーをモニターするため、ダッシュボードUIはイベントのサブセットをサブスクライブして、サーバーとエンド・ツー・エンドのレイテンシなどのシステム・パフォーマンスを計測します。システム全体を流れる各イベントがUIでモニターされることは期待できないため、モニター・アプリケーションでの処理が可能なイベントのサブセットに出力を制限する手段が必要になります。最後の1つのイベントのみ、またはすべてのイベントを出力できます。

  SELECT MIN(latencyAC) as minLatencyAC, 
    MAX(latencyAC) as maxLatencyAC,
    AVG(latencyAC) as avgLatencyAC
  FROM CombinedEvent
  RETAIN 30 MINUTES
  GROUP BY customerId
  OUTPUT LAST 50 EVERY 1 SECOND

1.4.11 削除されたトランザクション・イベントの検出

OUTER JOINを使用すると、3つのすべてのイベントを通じて成功しなかったトランザクションを検出できます。TxnEventAまたはTxnEventBイベントが、直前の30分間のイベントで構成されるそれぞれの時間ウィンドウを離れた場合、EPLによりEventC行が見つからない行がフィルタで除外されます。

  SELECT * 
    FROM TxnEventA A 
         FULL OUTER JOIN TxnEventC C ON A.transactionId = C.transactionId
         FULL OUTER JOIN TxnEventB B ON B.transactionId = C.transactionId
    RETAIN 30 MINUTES
  WHERE C.transactionId is null