ヘッダーをスキップ
Oracle® CEP EPL 言語リファレンス
リリース 11gR1 (11.1.1)
B55579-01
  目次
目次
索引
索引

戻る
戻る
 
次へ
次へ
 

1 イベント処理言語 (EPL) の概要

この節では、以下の項目について説明します。

1.1 EPL 言語の概要

複合イベント プロセッサ モジュールは、イベント表現、処理モデル、プログラミング インタフェース、および言語仕様という機能コンポーネントに分けることができます。

イベントは JavaBeans 規則に準拠する POJO として表現されます。イベント プロパティは POJO の getter メソッドを通じて公開されます。可能な場合は、EPL 文の実行結果も POJO として返されます。ただし、イベント ストリームが結合された場合など、型のないイベントが返される場合があります。この場合、Map コレクション インタフェースのインスタンスが返されます。詳細については、節 1.2「イベント表現」を参照してください。

EPL 処理モデルは連続的です。文の制約を満たす着信イベントを受信すると結果がすぐに出力されます。出力時には、出力枠に入れられる新しいイベントの「挿入イベント」、および出力枠から出される古いイベントの「削除イベント」という 2 種類のイベントが生成されます。これらのいずれかまたは両方のイベントが発生すると、リスナにアタッチおよび通知されます。詳細については、節 1.3「処理モデル」を参照してください。

着信イベントは、スライド枠またはバッチ枠で処理されます。スライド枠では、1 件の増分データに対して徐々に枠が変化してイベントが処理されます。これに対し、バッチ枠では別個のデータ チャンクに対して枠が変化してイベントが処理されます。枠のサイズは、保持されるイベントの最大数またはイベントが保持される最大時間によって定義されます。

EPL プログラミング インタフェースでは、文を個別にコンパイルするか、URL を通じて一括でロードできます。文の繰り返し、取得、開始、および停止が可能です。リスナは文にアタッチされ、挿入イベントまたは削除イベントあるいはその両方が発生した場合に通知されます。

イベント処理言語は SELECTFROMWHEREGROUP BYHAVING および ORDER BY 句を使用する SQL に似た言語です。ストリームはデータ ソースとしてのテーブルに相当し、イベントはデータの基本単位としての行に相当します。イベントはデータで構成されるため、結合による相関、サブクエリによるフィルタ処理、グループ化による集約などの SQL の概念が効果的に利用されます。INSERT INTO 句は、下流処理が行われるイベントを他のストリームに転送する手段として作り直されています。JDBC を通じてアクセス可能な外部データは、ストリーム データで照会および結合されます。SQL には含まれない、イベント処理に固有の言語構造を提供する RETAINMATCHINGOUTPUT 句などの追加の句も使用できます。

RETAIN 句は、本質的にはストリーム データに対する仮想枠を定義することで、クエリが実行されるデータ量に制約を付与します。データの範囲がテーブルで制限されるリレーショナル データベース システムとは異なり、イベント処理システムではクエリのデータをより動的に制限する代替手段が提供されます。

MATCHING 句は固有のパターンに一致するイベントのシーケンスを検出します。ANDORFOLLOWED BY などの時相演算子や論理演算子を使用することで、任意の複雑な式を通じてイベントの発生またはイベントの欠落の両方を検出できます。

OUTPUT 句は下流プロセッサの過負荷を回避するために文の実行結果を抑制します。結果的に生成された最初または最後のイベントの全体または一部が、時間ベースまたは行ベースのバッチで渡されます。

最後の節では、実際的なシナリオのもとで言語機能を例示するいくつかの使用例を示します。

1.2 イベント表現

EPL を使用して、イベントをイベント オブジェクトとして表現します。詳細については、以下を参照してください。

すべてのイベント オブジェクトにはプロパティが含まれています。詳細については、以下を参照してください。

イベントはイベント シンクによって使用されます。詳細については、節 1.2.7「イベント シンク」を参照してください。

1.2.1 イベント オブジェクト

イベントは、過去に発生したアクションまたは状態変更の不変レコードです。イベント オブジェクトの状態情報はイベント プロパティで取得されます。イベントは、POJO または java.util.Map を拡張する com.bea.wlevs.cep.event.MapEventObject のいずれかで表現されます。

表 1-1 イベント表現

Java クラス 説明
java.lang.Object

JavaBeans 規則に準拠し、getter メソッドを持つ任意の Java POJO。

com.bea.wlevs.ede.api.MapEventObject

Map イベントはキーと値のペアです。


1.2.2 Plain Old Java Object イベント

Plain old Java object (POJO) イベントは、JavaBeans 形式の getter メソッドを通じてイベント プロパティを公開するオブジェクト インスタンスです。イベント クラスまたはインタフェースは必ずしも完全に JavaBeans 仕様に準拠している必要はありませんが、EPL エンジンがイベント プロパティを取得するためには必須の JavaBeans getter メソッドが存在している必要があります。

EPL では、スーパー クラスを拡張したり、1 つまたは複数のインタフェースを実装したりする JavaBeans 形式のイベント クラスがサポートされています。また、EPL 文では Java インタフェース クラスおよび抽象クラスを参照できます。

イベントを表現するクラスは不変的に作成する必要があります。イベントは過去に発生した状態変更やアクションの記録であるため、関連するイベント プロパティは変更不能である必要があります。ただし、これは厳密な要件ではなく、EPL エンジンでは可変イベントも受諾されます。

1.2.3 Map イベント

イベントは java.util.Map インタフェースを拡張する com.bea.wlevs.ede.api.MapEventObject インタフェースを実装するオブジェクトによっても表現されます。Map イベントのイベント プロパティは、java.util.Map インタフェースで公開される get メソッドを通じてアクセスできる各エントリの値です。

Map 内のエントリはイベント プロパティを表現します。EPL 文で指定されたイベント プロパティ名をエンジンが参照できるようするために、キーの型は java.util.String である必要があります。値には任意の型を使用できます。Map では POJO も値として示されます。

1.2.4 イベント プロパティ

EPL 式では単純イベント プロパティのほか、インデックス、マップおよびネストされたイベント プロパティを使用できます。以下の表に、異なる型のプロパティ、およびイベント式での構文の概要を示します。この構文を使用すると、JavaBeans オブジェクト グラフ、XML 構造、および Map イベントを深い所まで照会できます。

表 1-2 イベント プロパティ

説明 構文

単純

取得可能な 1 つの値を持つプロパティ。プロパティ型にはプリミティブ型 (intjava.lang.String など) または他の複合型を使用できます。

name
sensorId

ネスト

ネストされたプロパティは、イベントの他のプロパティ内に存在するプロパティです。Map として表現されるイベントには他の POJO イベントのみをネストでき、他の Map イベントはネストできません。

name.nestedname
sensor.value

インデックス

インデックス プロパティにはオブジェクトの順序付きコレクションが格納され (すべて同一の型)、整数の値を持つ正の数のインデックス (添字) によって個別にアクセスされます。Map として表現されるイベントではインデックス プロパティはサポートされません。

name[index]
sensor[0]

マップ

マップ プロパティはキーを持つオブジェクトのコレクションを格納します (すべて同一の型)。標準の JavaBeans API に対する拡張機能として、EPL は文字列値のキーを受け入れるプロパティをマップ プロパティと見なします。Map として表現されるイベントではインデックス プロパティはサポートされません。

name('key')
sensor('light')

以下のような EmployeeEvent イベント クラスがあると想定します。この例のマップ プロパティとインデックス プロパティでは Java オブジェクトが返されますが、Java 言語のプリミティブ型 (intString など) を返すこともできます。Address オブジェクトおよび Employee オブジェクト自身は、Address オブジェクトの町名や Employee オブジェクトの従業員名など内部にネストするプロパティを持つことができます。

  public class EmployeeEvent {
      public String getFirstName();
      public Address getAddress(String type);
      public Employee getSubordinate(int index);
      public Employee[] getAllSubordinates();
  }

単純イベント プロパティにはプロパティ値を返す getter メソッドが必要です。上の例では、getFirstName getter メソッドによって型 StringfirstName イベント プロパティが返されます。

インデックス イベント プロパティには以下のいずれかの getter メソッドが必要です。

  • getSubordinate メソッドのように、整数型のキー値を受け取り、プロパティ値を返すメソッド。

  • Employee の配列を返す getAllSubordinates getter メソッドのように、配列型を返すメソッド。

EPL 文では、インデックス プロパティへのアクセス時に構文 property[index] を使用します。

マップ イベント プロパティでは、getAddress メソッドのように、String 型のキー値を受け取りプロパティ値を返す getter メソッドが必要です。EPL 文またはイベント パターン文では、マップ プロパティへのアクセス時に構文 property ('key') を使用します。

ネスト イベント プロパティにはネストするオブジェクトを返す getter メソッドが必要です。getAddress および getSubordinate メソッドは、ネストするオブジェクトを返すマップ プロパティとインデックス プロパティです。EPL 文では、ネストされたプロパティへのアクセス時に構文 property.nestedProperty を使用します。

すべての EPL 文では、1 つまたは複数のイベント プロパティ名が予期される任意の場所で、インデックス プロパティ、マップ プロパティ、ネストされたプロパティ、またはこれらの組み合わせを使用できます。次の例では、インデックス プロパティ、マップ プロパティ、およびネストされたプロパティの異なる組み合わせを示します。

  address('home').streetName
  subordinate[0].name='anotherName'
  allSubordinates[1].name
  subordinate[0].address('home').streetName

同様に、この構文は、SELECT リスト、WHERE 句、JOIN 条件など EPL 文でイベント プロパティ名が予期されるすべての場所で使用できます。

  SELECT firstName, address('work'), subordinate[0].name, subordinate[1].name
  FROM EmployeeEvent RETAIN ALL
  WHERE address('work').streetName = 'Park Ave'

1.2.5 動的なイベント プロパティ

動的な (つまり、チェックされていない) プロパティは、文のコンパイル時に認識される必要のないイベント プロパティです。Oracle CEP ではこれらの動的プロパティを実行時に解決します。

動的プロパティの背景にある概念として、基になるイベント表現で必ずしも事前にプロパティを認識する必要がないということがあります。文のコンパイル時に認識されない追加のプロパティが基になるイベントに含まれていて、EPL 文を使用してこれらのプロパティを照会する必要がある場合があります。この概念は、豊富な機能を持つオブジェクト指向のドメイン モデルを表現するイベントで特に便利です。

動的プロパティの構文は、プロパティ名と疑問符から成ります。インデックス プロパティ、マップ プロパティ、およびネストされたプロパティを動的プロパティにすることもできます。以下の表に、動的イベント プロパティの型、およびそれらの識別のために使用される構文を示します。

表 1-3 動的プロパティの構文

イベント プロパティ型 構文

単純 (動的)

name?

インデックス (動的)

name[index]?

マップ (動的)

name('key')?

ネスト (動的)

name?.nestedPropertyName

動的プロパティでは常に java.lang.Object 型が返されます。実行時に処理されるイベントに動的プロパティが存在しない場合、動的プロパティでは null 値が返されます。

たとえば、item プロパティを指定する OrderEvent イベントを考えます。item プロパティの型は Object であり、Service または Product のいずれかのインスタンスへの参照が保持されます。どちらになるかは実行時に明らかになります。さらに、Service および Product クラスで price というプロパティが提供されると想定します。動的プロパティを使用すると、以下のように、いずれかのオブジェクト (Service または Product) から price プロパティを取得するクエリを指定できます。

  SELECT item.price? 
  FROM OrderEvent RETAIN ALL EVENTS

2 つ目の例として、Service クラスに含まれている serviceName プロパティが Product クラスには含まれていないと想定します。実行時、以下のクエリは Service オブジェクトの場合に serviceName プロパティの値を返します。しかし、Product オブジェクトに対してクエリを実行すると、Products には serviceName プロパティが含まれないため null 値が返されます。

  SELECT item.serviceName? 
  FROM OrderEvent RETAIN ALL EVENTS

次に、OrderEvent に複数の実装クラスがあり、そのうちのいくつかにのみ timestamp プロパティが含まれている場合を考えます。以下のクエリは、プロパティが含まれている OrderEvent インタフェースで実装されている timestamp プロパティを返します。

  SELECT timestamp? 
  FROM OrderEvent RETAIN ALL EVENTS

上のクエリは、型 java.lang.Objecttimestamp? という 1 つのカラムを返します。

動的プロパティをネストした場合は、動的プロパティの下にあるすべてのプロパティも動的プロパティと見なされます。次の例は、detail 動的プロパティで返されたオブジェクトの direction プロパティをたずねています。

  SELECT detail?.direction 
  FROM OrderEvent RETAIN ALL EVENTS

上のクエリは、以下と同等です。

  SELECT detail?.direction? 
  FROM OrderEvent RETAIN ALL EVENTS

以下は、動的プロパティと共に使用すると便利な機能です。

  • CAST 関数は動的プロパティの値 (または式の値) を指定の型にキャストします。節 4.1.7「CAST 関数」を参照してください。

  • EXISTS 関数は動的プロパティが存在するかどうかをチェックします。イベントにその名前のプロパティが存在する場合は true を返し、イベントにプロパティが存在しない場合は false を返します。節 4.1.8「EXISTS 関数」を参照してください。

  • INSTANCEOF 関数は動的プロパティの値 (または式の値) が指定の型であるかどうかをチェックします。節 4.1.6「INSTANCEOF 関数」を参照してください。

1.2.6 ユーザ定義関数を使用した他のイベント プロパティの処理

EPL でサポートされていないデータ型をイベントが使用している場合は、ユーザ定義関数を作成して EPL クエリでそのデータ型を評価できます。

例 1-1 のような enum データ型を考えます。例 1-2 に示すイベントでは、この enum データ型を使用しています。EPL では enum データ型がサポートされていません。

例 1-1 列挙データ型 ProcessStatus

package com.oracle.app;

public enum ProcessStatus {
    OPEN(1), 
    CLOSED(0)}
}

例 1-2 列挙データ型 ProcessStatus を使用するイベント

package com.oracle.app;

import com.oracle.capp.ProcessStatus;

public class ServiceOrder {
    private String serviceOrderId;
    private String electronicSerialNumber;
    private ProcessStatus status;
... 
}

ユーザ定義関数を作成し (例 1-3 を参照)、アプリケーション アセンブリ ファイルに関数を登録することで (例 1-4 を参照)、この enum データ型を EPL クエリで評価できます (例 1-5 を参照)。

例 1-3 列挙データ型を評価するユーザ定義関数

package com.oracle.app;

import com.oracle.capp.ProcessStatus;
public class CheckIfStatusClosed {
    public boolean execute(Object[] args) {
        ProcessStatus arg0 = (ProcessStatus)args[0];
        if (arg0 == ProcessStatus.OPEN)
            return Boolean.FALSE;
        else
            return Boolean.TRUE;
    }
}

例 1-4 アプリケーション アセンブリ ファイルへのユーザ定義関数の登録

<wlevs:processor id="testProcessor">
    <wlevs:listener ref="providerCache"/>
    <wlevs:listener ref="outputCache"/>
    <wlevs:cache-source ref="testCache"/>
    <wlevs:function function-name="statusClosed" exec-method=”execute” />
        <bean class="com.oracle.app.CheckIfStatusClosed"/>
    </wlevs:function>
</wlevs:processor>

例 1-5 ユーザ定義関数を使用した EPL クエリでの列挙データ型の評価

<query id="rule-04"><![CDATA[
    SELECT
        meter.electronicSerialNumber, 
        meter.exceptionKind
    FROM 
        MeterLogEvent AS meter, 
        ServiceOrder AS svco
    WHERE 
        meter.electronicSerialNumber = svco.electronicSerialNumber and
        svco.serviceOrderId IS NULL OR statusClosed(svco.status)
]]></query>

詳細については、第 4 章「EPL リファレンス : 関数」を参照してください。

1.2.7 イベント シンク

イベント シンクは、EPL 文で指定された条件に一致するイベントが発生した場合に、プログラムによる通知を受信する手段を提供します。以下の場合に、シンクへの通知が行われます。

  • EPL 文で指定された条件に一致する新しいイベントが発生した場合。これらは ISTREAM イベントと呼ばれます。

  • 以前 EPL 文で指定された条件と一致していた古いイベントが、期限切れになったか、または新たに発生した着信イベントがこれらに取って代わったために、出力枠からプッシュされた場合。これらは RSTREAM イベントと呼ばれます。

これらの通知が発生した場合の詳しい例については、節 1.3「処理モデル」で説明しています。

ISTREAM イベントを受信するには、com.bea.wlevs.ede.api.EventSink インタフェースを使用します。実装では、結果が使用可能になった場合にエンジンによって呼び出される単一の onEvent メソッドを指定する必要があります。このインタフェースでは、新しいイベントのみがリスナに送信されます。

public interface EventSink extends EventListener {
   void onEvent(List<Object> newEvents) 
   throws RejectEventException;
}

エンジンでは、文の結果を POJO または MapEventObject インスタンスのリストとしてイベント シンクに提供します。ワイルドカードでの選択の場合、結果はエンジンに送信された元のイベント オブジェクト タイプに一致します。式の joins および select 句の場合、結果のオブジェクトは com.bea.wlevs.ede.api.MapEventObject インタフェースを実装します。

1.3 処理モデル

節 1.3.1「イベント ストリーム」で説明しているように、EPL 処理モデルはイベント ストリームに基づいています。

以下の説明に従って、イベント ストリームを操作するクエリを記述します。

1.3.1 イベント ストリーム

EPL 処理モデルは継続的です。文へのリスナはエンジンでその文のイベントが処理されるとすぐに、文で選択されているイベント ストリーム、RETAIN 句の制約、フィルタ、および出力レートに応じて更新されたデータを受信します。

この節では、非常に単純な EPL 文の出力を示します。この文はデータ枠を使用せず、いずれのフィルタも適用せずにイベント ストリームを選択します。

  SELECT * FROM Withdrawal RETAIN ALL

この文はすべての Withdrawal イベントを選択します。エンジンで Withdrawal 型または Withdrawal の下位の型のイベントが処理されるたびに、すべての更新リスナが呼び出され、文の各リスナに新しいイベントが渡されます。

挿入ストリームという用語は、新しいイベントが到着し、データ枠または集約に入れられることを示します。この例の挿入ストリームは、到着する Withdrawal イベントのストリームであり、更新リスナに新しいイベントとしてポストされます。

次の図は、時間の経過に沿って到着する一連の Withdrawal イベント 1 - 6 を示しています。この図を含め、この節に示す図のかっこ内の数字は Withdrawal イベントの amount プロパティの値を示します。

図 1-1 イベント ストリームの例

図 1-1 の説明
「図 1-1 イベント ストリームの例」の説明

上の例の文では RETAIN 句が指定されていないため、新しいイベントのみがエンジンによって文のリスナにポストされ、古いイベントはポストされません。

1.3.2 スライド枠

スライド枠には、行ベースおよび時間ベースの 2 種類があります。以下の節では、これらの各項目について示します。

1.3.2.1 行ベースのスライド枠

行ベースのスライド枠では、ストリームの直前の N 件のイベントのみを保持するようエンジンに通知します。次の文は、Withdrawal イベント ストリームに長さ枠を適用します。この文は、データ枠、およびデータ枠に入るまたは枠から出るイベントの概念を示しています。

  SELECT * FROM Withdrawal RETAIN 5 EVENTS

この文の枠のサイズは 5 件のイベントです。エンジンにより到着するすべての Withdrawal イベントが枠に入れられます。枠が一杯になると、最も古い Withdrawal イベントが枠からプッシュされます。エンジンは更新リスナに対し、枠に入れられるすべてのイベントを新しいイベントとして示し、枠から出されるすべてのイベントを古いイベントとして示します。

「挿入ストリーム」という用語が到着する新しいイベントを示すのに対し、「削除ストリーム」という用語はデータ枠から出されるイベント、または集約値を変更するイベントを示します。この例の削除ストリームは、長さ枠から退去する Withdrawal イベントのストリームであり、これらのイベントは古いイベントとして更新リスナにポストされます。

次の図は、イベントの到着に応じて変化する長さ枠の内容と、更新リスナにポストされるイベントを示します。

図 1-2 行ベースのスライド枠の例

図 1-2 の説明
「図 1-2 行ベースのスライド枠の例」の説明

以前と同様、到着するすべてのイベントは新しいイベントとして更新リスナにポストされます。また、イベント W1 は、イベント W6 の到着時に長さ枠から出され、古いイベントとして更新リスナにポストされます。

長さ枠と同様、時間枠では指定の期間内で最新のイベントが保持されます。たとえば、5 秒の時間枠では直前の 5 秒間のイベントが保持されます。秒の経過につれて、時間枠では最も古いイベントが枠から実際にプッシュされ、その結果 1 つまたは複数の古いイベントが更新リスナにポストされます。

EPL では、省略可能な ISTREAM および RSTREAM キーワードが SELECT 句および INSERT INTO 句でサポートされています。これにより、データ枠に入れられるイベントまたは出されるイベントのみを転送したり、現在または以前の集約値のみを選択したりするなど、挿入ストリームまたは削除ストリームに基づいた操作を行うようエンジンに通知できます。

1.3.2.2 時間ベースのスライド枠

時間ベースのスライド枠は、システム時間に基づいて、過去の指定された時間間隔までの範囲を示す変化する枠です。行ベースのスライド枠と同様、時間ベースのスライド枠ではクエリで処理されるイベント数を制限できます。

次の図は、時間枠の動作を示しています。この図では、イベントのグループ化やフィルタ処理を行わず、単にイベントを選択するクエリを想定します。

  SELECT * FROM Withdrawal RETAIN 4 SECONDS

この図は指定の時間 t から開始し、t+4 秒、t+5 秒、およびそれ以降の時点における時間枠の内容を示しています。

図 1-3 時間ベースのスライド枠の例

図 1-3 の説明
「図 1-3 時間ベースのスライド枠の例」の説明

図では、以下の動作を示しています。

  1. 時間 t + 4 秒の時点でイベント W1 が到着し、時間枠に入ります。エンジンにより新しいイベントが更新リスナに報告されます。

  2. 時間 t + 5 秒の時点でイベント W2 が到着し、時間枠に入ります。エンジンにより新しいイベントが更新リスナに報告されます。

  3. 時間 t + 6.5 秒の時点でイベント W3 が到着し、時間枠に入ります。エンジンにより新しいイベントが更新リスナに報告されます。

  4. 時間 t + 8 秒の時点で W1 が時間枠から離れます。エンジンによりイベントは古いイベントとして更新リスナに報告されます。

実際的な例として、直前の 4 秒間の各口座の平均引き出し額が 1000 を超えるすべての口座を決定する必要がある場合を考えます。この問題を解決する文は、以下のとおりです。

  SELECT account, AVG(amount)
  FROM Withdrawal RETAIN 4 SECONDS
  GROUP BY account
  HAVING amount > 1000

1.3.3 バッチ枠

行ベースおよび時間ベースの枠は両方ともバッチ処理に対応できます。次の節では、これらの概念について示します。

1.3.3.1 時間ベースのバッチ枠

時間ベースのバッチ枠ではイベントがバッファリングされ、指定された時間間隔ごとにそれらのイベントが 1 回の更新で解放されます。長さベースのバッチ枠と同様、時間ベースのバッチ枠ではイベントの評価が制御されます。

次の図は、時間バッチ ビューの動作を示しています。この図では、以下の単純なクエリを想定しています。

  SELECT * FROM Withdrawal RETAIN BATCH OF 4 SECONDS

この図は指定の時間 t から開始し、t + 4 秒、t + 5 秒、およびそれ以降の時点における時間枠の内容を示しています。

図 1-4 時間ベースのバッチ枠の例

図 1-4 の説明
「図 1-4 時間ベースのバッチ枠の例」の説明

図では、以下の動作を示しています。

  1. 時間 t + 1 秒の時点でイベント W1 が到着し、バッチに入ります。更新リスナへの通知の呼び出しは発生しません。

  2. 時間 t + 3 秒の時点でイベント W2 が到着し、バッチに入ります。更新リスナへの通知の呼び出しは発生しません。

  3. 時間 t + 4 秒の時点でエンジンによりバッチ内のイベントが処理され、新しいバッチが開始されます。エンジンによりイベント W1 および W2 が更新リスナに報告されます。

  4. 時間 t + 6.5 秒の時点でイベント W3 が到着し、バッチに入ります。更新リスナへの通知の呼び出しは発生しません。

  5. 時間 t + 8 秒の時点でエンジンによりバッチ内のイベントが処理され、新しいバッチが開始されます。エンジンによりイベント W3 が新しいデータとして更新リスナに報告されます。エンジンによりイベント W1 および W2 が古いデータ (前回のバッチ) として更新リスナに報告されます。

1.3.3.2 行ベースのバッチ枠

行ベースの枠でもバッチ処理を行えます。たとえば、次のクエリは 5 件のイベントが受信されるまで何も処理せずに待機します。

  SELECT * FROM Withdrawal RETAIN BATCH OF 5 EVENTS

5 件のイベントを受信するとクエリが実行され、再び新しい 5 件のイベントを受信するまで処理を待機します。

1.3.4 サブクエリと WHERE 句

イベント ストリームに対するフィルタはサブクエリ式内で使用され、イベントがデータ枠内に入る前にイベントを指定のストリームからフィルタで除外します。このフィルタ処理は WHERE 句の実行前に発生します。可能な場合は、WHERE 句に対し、サブクエリ内でフィルタ処理を完了します。これは、残りの EPL 文で処理されるデータ量が軽減され、パフォーマンスが向上するためです。

次の文は、金額が 200 以上の Withdrawal イベントを選択するサブクエリを示します。

  SELECT * FROM (SELECT * FROM Withdrawal WHERE amount >= 200) RETAIN 5 EVENTS

このサブクエリでは、金額が 200 未満の Withdrawal イベントは外部クエリの枠に入らず、そのため更新リスナには渡されません。

図 1-5 サブクエリと WHERE 句の例

図 1-5 の説明
「図 1-5 サブクエリと WHERE 句の例」の説明

文中の WHERE 句および HAVING 句では、文のデータ枠や他のビュー内でイベントが処理された後、処理の後半で可能性のある結果行が消去されます。

次の文は、サブクエリを使用せず、Withdrawal イベントに WHERE 句を適用します。

  SELECT * FROM Withdrawal RETAIN 5 EVENTS WHERE amount >= 200

WHERE 句は新しいイベントと古いイベントの両方に適用されます。次の図に示すとおり、到着するイベントは "amount" プロパティの値にかかわらず枠に入れられます。ただし、WHERE 句を満たすイベントのみが更新リスナに渡されます。また、イベントはデータ枠を離れますが、WHERE 句の条件を満たすイベントのみが古いイベントとして更新リスナにポストされます。

図 1-6 行ベースのスライド枠に到着するイベント

図 1-6 の説明
「図 1-6 行ベースのスライド枠に到着するイベント」の説明

イベント ストリームのフィルタは指定可能なフィルタ タイプの中でより限定的ですが、WHERE 句には複雑な条件を含めることができます。次の文の WHERE 句は WHERE 句内の java.lang.Math Java ライブラリ クラスの ceil 関数に適用されます。INSERT INTO 句により、最初の文の結果が 2 番目の文で使用可能になります。

  INSERT INTO BigWithdrawal 
    SELECT * FROM Withdrawal RETAIN ALL WHERE Math.ceil(amount) >= 200
  SELECT * FROM BigWithdrawal RETAIN ALL 

1.3.5 集約

集約関数を介してイベントを集約する文では、集約された値の変化に伴い、削除ストリーム イベントもポストされます。2 つの Withdrawal イベントを受信した場合にアラートを発生する以下の文について考えます。

  SELECT COUNT(*) AS mycount 
  FROM Withdrawal RETAIN ALL 
  HAVING COUNT(*) = 2

エンジンで 2 つ目の withdrawal イベントが検出されると、エンジンにより新しいイベントが更新リスナにポストされます。新しいイベントの mycount プロパティの値は 2 です。また、エンジンで 3 つ目の Withdrawal イベントが検出されると、前のカウント値を含む古いイベントが更新リスナにポストされます。その古いイベントの mycount プロパティの値も 2 です。

ISTREAM または RSTREAM キーワードを使用すると、更新リスナにポストされる新しいイベントまたは古いイベントを除去できます。次の文では、ISTREAM キーワードを使用した結果、2 つ目の Withdrawal イベントを受信した際にエンジンで更新リスナが 1 回のみ呼び出されます。

  SELECT ISTREAM COUNT(*) AS mycount 
  FROM Withdrawal RETAIN ALL 
  HAVING COUNT(*) = 2

1.4 使用例

以下の使用例では、言語のさまざまな機能の使用法の例を示します。

1.4.1 フィードごとのレートの計算

スループットの統計および急速な減衰の検出のために、各マーケット データ フィードごとに毎秒のチック数を計算します。

マーケット データ イベント ストリームのソースからの 1 秒間のイベントをバッチにまとめる EPL 文を使用できます。フィードおよびフィードごとのイベント数を出力値として指定します。このデータをさらに処理するために、出力イベントを TicksPerSecond イベント ストリームに挿入します。

  INSERT INTO TicksPerSecond
  SELECT feed, COUNT(*) AS cnt 
    FROM MarketDataEvent
    RETAIN BATCH OF 1 SECOND
    GROUP BY feed

1.4.2 最高値の株価の計算

最高値の株価を計算するため、固有の各銘柄記号に対し、取引のブロック サイズが 10 を超える 100 件のイベントを保持するスライド枠を定義します。たとえば、5,000 の銘柄記号がある場合、5,000 x 100 の 5,000,000 件のイベントが保持されます。ブロック サイズが 10 を越える MarketTrade イベントのみが枠に入れられ、価格の高い上位 100 件のイベントのみが保持されます。

結果は銘柄記号別にグループ化され、平均価格が 100 未満の銘柄記号は出力から除外され、アルファベット順に並べられます。

   SELECT symbol, AVG(price) 
   FROM (SELECT * FROM MarketTrade WHERE blockSize > 10)
   RETAIN 100 EVENTS WITH LARGEST price PARTITION BY symbol 
   GROUP BY symbol
   HAVING AVG(price) >= 100
   ORDER BY symbol

1.4.3 位置データの分類

高速道路の自動車の位置および方向に関する情報が含まれる自動車位置のイベント データに基づいて、自動車が進むルートを検出します。まず carId によってデータを分類することで特定の自動車に関する情報を特定し、次に高速道路、方向で分類し、方向をプロットします。これらの情報に基づいて自動車の速度が計算されます。

最初の PARTITION BY carId では自動車位置イベントが自動車ごとにグループ化され、さらにその後の PARTITION BY expressway PARTITION BY direction ではより詳細な位置および方向のプロパティ値によって、データがさらに分類されます。このクエリで保持されるイベント数は 4 ですが、これは最後の PARTITION BY 句で保持された最大数に適用されます。このため、個別の分類プロパティ値に対し、最大 4 件のイベントが保持されます。

  SELECT carId, expressway, direction, 
    SUM(segment)/(MAX(timestamp)-MIN(timestamp)) AS speed
   FROM CarLocationEvent
   RETAIN 4 events 
   PARTITION BY carId PARTITION BY expressway PARTITION BY direction

1.4.4 急速な減衰の検出

任意のタイミングの毎秒チック数が直前の 10 秒間の毎秒チック数の 75% を下回った場合にアラートを発することにより、急速な減衰を定義します。

直前の 10 秒間の毎秒チック数は、最初の文で計算された TicksPerSecond イベントを使用し、直前の 10 秒間の平均を取ることで計算できます。次に、現在のレートを変化する平均値と比べ、平均値の 75% に満たないレートをフィルタ処理します。

  SELECT feed, AVG(cnt) AS avgCnt, cnt AS feedCnt 
    FROM TicksPerSecond
    RETAIN 10 seconds
    GROUP BY feed 
    HAVING cnt < AVG(cnt) * 0.75

1.4.5 ネットワークの異常性の検出

顧客側でのチェックインの作業中に、端末でハードウェアの問題が検出されたり、ネットワークが停止したりする場合があります。このような状況では、顧客を支援するチーム メンバーに警告する必要があります。端末で問題が検出された場合は OutOfOrder イベントが発行されます。パターンでは端末で out-of-order が通知され、顧客がチェックイン処理中である状況が検出されます。

  SELECT ci.term 
  MATCHING ci:=Checkin FOLLOWED BY 
        ( OutOfOrder (term.id=ci.term.id) AND NOT
          (Cancelled (term.id=ci.term.id) OR 
           Completed (term.id=ci.term.id) ) WITHIN 3 MINUTES )

セルフサービスの各端末では、以下の 4 つのうちいずれかのイベントを発行できます。

  • Checkin - 顧客がチェックイン ダイアログを開始したことを示します。

  • Cancelled - 顧客がチェックイン ダイアログを取り消したことを示します。

  • Completed - 顧客がチェックイン ダイアログを完了したことを示します。

  • OutOfOrder - 端末でハードウェアの問題が検出されたことを示します。

すべてのイベントでは、イベントをパブリッシュした端末の情報と、タイムスタンプが提供されます。端末の情報は term というプロパティに格納され、端末の id が指定されます。すべてのイベントは類似の情報を含んでいるため、各イベントを基本クラス TerminalEvent のサブタイプとしてモデル化し、すべてのイベントで共有される端末情報がこのクラスで提供されます。これにより、すべての端末イベントを多態的に扱うことができ、派生したイベント タイプを親イベント タイプと同様に扱うことができるためクエリが簡素化されます。

1.4.6 欠落したイベントの検出

Status イベントは定期的に 60 秒間隔で到着するため、MATCHING 句を使用した一時的なパターン照合を利用して、時間通りに到着しなかったイベントを検出できます。WITHIN 演算子を使用して送信や処理による遅延の可能性を考慮した 65 秒の枠を保持し、NOT 演算子を使用して term.idT1 に等しい Status イベントの欠落を検出します。

  SELECT 'terminal 1 is offline' 
  MATCHING NOT Status(term.id = 'T1') WITHIN 65 SECONDS
  OUTPUT FIRST EVERY 5 MINUTES

1.4.7 端末アクティビティ データの概要

端末アクティビティの統計情報をスタッフ側にリアルタイムに提示することで、システムをモニタし、問題を特定することが可能になります。次のサンプル クエリでは、イベント タイプごとのカウントが毎分取得されます。CountPerType イベント ストリームを通じて使用可能なこのデータをさらに利用して、記録された使用法パターンに対して結合および比較したり、単にアクティビティをリアルタイムで要約できます。

  INSERT INTO CountPerType
  SELECT type, COUNT(*) AS countPerType 
  FROM TerminalEvent
  RETAIN 10 MINUTES
  GROUP BY type
  OUTPUT ALL EVERY 1 MINUTE

1.4.8 センサー データの読み取り

このサンプルでは、いずれかのリーダーの範囲内にパレットが含まれている場合に、RFID リーダーのアレイが RFID タグを検知します。リーダーは、リーダーのセンサー ID、調査時刻、調査したタグなどの調査情報が含まれる XML ドキュメントを生成します。リーダーのセンサー ID ごとに直前の 60 秒間のタグの総数が計算されます。

  SELECT ID AS sensorId, SUM(countTags) AS numTagsPerSensor
  FROM AutoIdRFIDExample
  RETAIN 60 SECONDS
  WHERE Observation[0].Command = 'READ_PALLET_TAGS_ONLY'
  GROUP BY ID

1.4.9 トランザクション イベントの組み合わせ

このサンプルでは、トランザクションの各コンポーネントが含まれている組み合わせイベントを検出する EPL 文を作成します。イベント照合は、直前の 30 分間に到着したイベントに制限されます。この文は INSERT INTO 構文を使用して CombinedEvent イベント ストリームを生成します。

  INSERT INTO CombinedEvent(transactionId, customerId, supplierId, 
    latencyAC, latencyBC, latencyAB)
  SELECT C.transactionId, customerId, supplierId, 
    C.timestamp - A.timestamp, 
    C.timestamp - B.timestamp, 
    B.timestamp - A.timestamp 
  FROM TxnEventA A, TxnEventB B, TxnEventC C
  RETAIN 30 MINUTES
  WHERE A.transactionId = B.transactionId AND
    B.transactionId = C.transactionId

1.4.10 リアルタイム パフォーマンスのモニタ

過去 30 分間のイベントの最小、最大、および平均の合計レイテンシ (A および C の間の時間差) を導くには、以下の EPL を使用できます。また、イベント サーバをモニタするため、ダッシュボード UI はイベントのサブセットをサブスクライブして、サーバとエンドツーエンドのレイテンシなどのシステム性能を計測します。システム全体を流れる各イベントが UI でモニタされることは期待できないため、モニタ アプリケーションでの処理が可能なイベントのサブセットに出力を制限する手段が必要になります。最後の 1 つのイベントのみ、またはすべてのイベントを出力できます。

  SELECT MIN(latencyAC) as minLatencyAC, 
    MAX(latencyAC) as maxLatencyAC,
    AVG(latencyAC) as avgLatencyAC
  FROM CombinedEvent
  RETAIN 30 MINUTES
  GROUP BY customerId
  OUTPUT LAST 50 EVERY 1 SECOND

1.4.11 削除されたトランザクション イベントの検出

OUTER JOIN を使用すると、3 つのすべてのイベントを通じて成功しなかったトランザクションを検出できます。TxnEventA または TxnEventB イベントが、直前の 30 分間のイベントで構成されるそれぞれの時間枠を離れた場合、EPL により EventC 行が見つからない行がフィルタで除外されます。

  SELECT * 
    FROM TxnEventA A 
         FULL OUTER JOIN TxnEventC C ON A.transactionId = C.transactionId
         FULL OUTER JOIN TxnEventB B ON B.transactionId = C.transactionId
    RETAIN 30 MINUTES
  WHERE C.transactionId is null