この節では、以下の項目について説明します。
複合イベント プロセッサ モジュールは、イベント表現、処理モデル、プログラミング インタフェース、および言語仕様という機能コンポーネントに分けることができます。
イベントは JavaBeans 規則に準拠する POJO として表現されます。イベント プロパティは POJO の getter メソッドを通じて公開されます。可能な場合は、EPL 文の実行結果も POJO として返されます。ただし、イベント ストリームが結合された場合など、型のないイベントが返される場合があります。この場合、Map コレクション インタフェースのインスタンスが返されます。詳細については、節 1.2「イベント表現」を参照してください。
EPL 処理モデルは連続的です。文の制約を満たす着信イベントを受信すると結果がすぐに出力されます。出力時には、出力枠に入れられる新しいイベントの「挿入イベント」、および出力枠から出される古いイベントの「削除イベント」という 2 種類のイベントが生成されます。これらのいずれかまたは両方のイベントが発生すると、リスナにアタッチおよび通知されます。詳細については、節 1.3「処理モデル」を参照してください。
着信イベントは、スライド枠またはバッチ枠で処理されます。スライド枠では、1 件の増分データに対して徐々に枠が変化してイベントが処理されます。これに対し、バッチ枠では別個のデータ チャンクに対して枠が変化してイベントが処理されます。枠のサイズは、保持されるイベントの最大数またはイベントが保持される最大時間によって定義されます。
EPL プログラミング インタフェースでは、文を個別にコンパイルするか、URL を通じて一括でロードできます。文の繰り返し、取得、開始、および停止が可能です。リスナは文にアタッチされ、挿入イベントまたは削除イベントあるいはその両方が発生した場合に通知されます。
イベント処理言語は SELECT
、FROM
、WHERE
、GROUP BY
、HAVING
および ORDER BY
句を使用する SQL に似た言語です。ストリームはデータ ソースとしてのテーブルに相当し、イベントはデータの基本単位としての行に相当します。イベントはデータで構成されるため、結合による相関、サブクエリによるフィルタ処理、グループ化による集約などの SQL の概念が効果的に利用されます。INSERT INTO
句は、下流処理が行われるイベントを他のストリームに転送する手段として作り直されています。JDBC を通じてアクセス可能な外部データは、ストリーム データで照会および結合されます。SQL には含まれない、イベント処理に固有の言語構造を提供する RETAIN
、MATCHING
、OUTPUT
句などの追加の句も使用できます。
RETAIN
句は、本質的にはストリーム データに対する仮想枠を定義することで、クエリが実行されるデータ量に制約を付与します。データの範囲がテーブルで制限されるリレーショナル データベース システムとは異なり、イベント処理システムではクエリのデータをより動的に制限する代替手段が提供されます。
MATCHING
句は固有のパターンに一致するイベントのシーケンスを検出します。AND
、OR
、FOLLOWED BY
などの時相演算子や論理演算子を使用することで、任意の複雑な式を通じてイベントの発生またはイベントの欠落の両方を検出できます。
OUTPUT
句は下流プロセッサの過負荷を回避するために文の実行結果を抑制します。結果的に生成された最初または最後のイベントの全体または一部が、時間ベースまたは行ベースのバッチで渡されます。
最後の節では、実際的なシナリオのもとで言語機能を例示するいくつかの使用例を示します。
EPL を使用して、イベントをイベント オブジェクトとして表現します。詳細については、以下を参照してください。
すべてのイベント オブジェクトにはプロパティが含まれています。詳細については、以下を参照してください。
イベントはイベント シンクによって使用されます。詳細については、節 1.2.7「イベント シンク」を参照してください。
イベントは、過去に発生したアクションまたは状態変更の不変レコードです。イベント オブジェクトの状態情報はイベント プロパティで取得されます。イベントは、POJO または java.util.Map
を拡張する com.bea.wlevs.cep.event.MapEventObject
のいずれかで表現されます。
Plain old Java object (POJO) イベントは、JavaBeans 形式の getter メソッドを通じてイベント プロパティを公開するオブジェクト インスタンスです。イベント クラスまたはインタフェースは必ずしも完全に JavaBeans 仕様に準拠している必要はありませんが、EPL エンジンがイベント プロパティを取得するためには必須の JavaBeans getter メソッドが存在している必要があります。
EPL では、スーパー クラスを拡張したり、1 つまたは複数のインタフェースを実装したりする JavaBeans 形式のイベント クラスがサポートされています。また、EPL 文では Java インタフェース クラスおよび抽象クラスを参照できます。
イベントを表現するクラスは不変的に作成する必要があります。イベントは過去に発生した状態変更やアクションの記録であるため、関連するイベント プロパティは変更不能である必要があります。ただし、これは厳密な要件ではなく、EPL エンジンでは可変イベントも受諾されます。
イベントは java.util.Map
インタフェースを拡張する com.bea.wlevs.ede.api.MapEventObject
インタフェースを実装するオブジェクトによっても表現されます。Map
イベントのイベント プロパティは、java.util.Map
インタフェースで公開される get メソッドを通じてアクセスできる各エントリの値です。
Map
内のエントリはイベント プロパティを表現します。EPL 文で指定されたイベント プロパティ名をエンジンが参照できるようするために、キーの型は java.util.String
である必要があります。値には任意の型を使用できます。Map
では POJO も値として示されます。
EPL 式では単純イベント プロパティのほか、インデックス、マップおよびネストされたイベント プロパティを使用できます。以下の表に、異なる型のプロパティ、およびイベント式での構文の概要を示します。この構文を使用すると、JavaBeans オブジェクト グラフ、XML 構造、および Map イベントを深い所まで照会できます。
表 1-2 イベント プロパティ
以下のような EmployeeEvent
イベント クラスがあると想定します。この例のマップ プロパティとインデックス プロパティでは Java オブジェクトが返されますが、Java 言語のプリミティブ型 (int
や String
など) を返すこともできます。Address
オブジェクトおよび Employee
オブジェクト自身は、Address
オブジェクトの町名や Employee
オブジェクトの従業員名など内部にネストするプロパティを持つことができます。
public class EmployeeEvent { public String getFirstName(); public Address getAddress(String type); public Employee getSubordinate(int index); public Employee[] getAllSubordinates(); }
単純イベント プロパティにはプロパティ値を返す getter メソッドが必要です。上の例では、getFirstName
getter メソッドによって型 String
の firstName
イベント プロパティが返されます。
インデックス イベント プロパティには以下のいずれかの getter メソッドが必要です。
getSubordinate
メソッドのように、整数型のキー値を受け取り、プロパティ値を返すメソッド。
Employee
の配列を返す getAllSubordinates
getter メソッドのように、配列型を返すメソッド。
EPL 文では、インデックス プロパティへのアクセス時に構文 property[index]
を使用します。
マップ イベント プロパティでは、getAddress
メソッドのように、String
型のキー値を受け取りプロパティ値を返す getter メソッドが必要です。EPL 文またはイベント パターン文では、マップ プロパティへのアクセス時に構文 property ('key')
を使用します。
ネスト イベント プロパティにはネストするオブジェクトを返す getter メソッドが必要です。getAddress
および getSubordinate
メソッドは、ネストするオブジェクトを返すマップ プロパティとインデックス プロパティです。EPL 文では、ネストされたプロパティへのアクセス時に構文 property.nestedProperty
を使用します。
すべての EPL 文では、1 つまたは複数のイベント プロパティ名が予期される任意の場所で、インデックス プロパティ、マップ プロパティ、ネストされたプロパティ、またはこれらの組み合わせを使用できます。次の例では、インデックス プロパティ、マップ プロパティ、およびネストされたプロパティの異なる組み合わせを示します。
address('home').streetName subordinate[0].name='anotherName' allSubordinates[1].name subordinate[0].address('home').streetName
同様に、この構文は、SELECT リスト、WHERE 句、JOIN 条件など EPL 文でイベント プロパティ名が予期されるすべての場所で使用できます。
SELECT firstName, address('work'), subordinate[0].name, subordinate[1].name FROM EmployeeEvent RETAIN ALL WHERE address('work').streetName = 'Park Ave'
動的な (つまり、チェックされていない) プロパティは、文のコンパイル時に認識される必要のないイベント プロパティです。Oracle CEP ではこれらの動的プロパティを実行時に解決します。
動的プロパティの背景にある概念として、基になるイベント表現で必ずしも事前にプロパティを認識する必要がないということがあります。文のコンパイル時に認識されない追加のプロパティが基になるイベントに含まれていて、EPL 文を使用してこれらのプロパティを照会する必要がある場合があります。この概念は、豊富な機能を持つオブジェクト指向のドメイン モデルを表現するイベントで特に便利です。
動的プロパティの構文は、プロパティ名と疑問符から成ります。インデックス プロパティ、マップ プロパティ、およびネストされたプロパティを動的プロパティにすることもできます。以下の表に、動的イベント プロパティの型、およびそれらの識別のために使用される構文を示します。
動的プロパティでは常に java.lang.Object
型が返されます。実行時に処理されるイベントに動的プロパティが存在しない場合、動的プロパティでは null 値が返されます。
たとえば、item
プロパティを指定する OrderEvent
イベントを考えます。item
プロパティの型は Object
であり、Service
または Product
のいずれかのインスタンスへの参照が保持されます。どちらになるかは実行時に明らかになります。さらに、Service
および Product
クラスで price
というプロパティが提供されると想定します。動的プロパティを使用すると、以下のように、いずれかのオブジェクト (Service
または Product
) から price プロパティを取得するクエリを指定できます。
SELECT item.price? FROM OrderEvent RETAIN ALL EVENTS
2 つ目の例として、Service
クラスに含まれている serviceName
プロパティが Product
クラスには含まれていないと想定します。実行時、以下のクエリは Service
オブジェクトの場合に serviceName
プロパティの値を返します。しかし、Product
オブジェクトに対してクエリを実行すると、Products
には serviceName
プロパティが含まれないため null 値が返されます。
SELECT item.serviceName? FROM OrderEvent RETAIN ALL EVENTS
次に、OrderEvent
に複数の実装クラスがあり、そのうちのいくつかにのみ timestamp
プロパティが含まれている場合を考えます。以下のクエリは、プロパティが含まれている OrderEvent
インタフェースで実装されている timestamp
プロパティを返します。
SELECT timestamp? FROM OrderEvent RETAIN ALL EVENTS
上のクエリは、型 java.lang.Object
の timestamp?
という 1 つのカラムを返します。
動的プロパティをネストした場合は、動的プロパティの下にあるすべてのプロパティも動的プロパティと見なされます。次の例は、detail
動的プロパティで返されたオブジェクトの direction
プロパティをたずねています。
SELECT detail?.direction FROM OrderEvent RETAIN ALL EVENTS
上のクエリは、以下と同等です。
SELECT detail?.direction? FROM OrderEvent RETAIN ALL EVENTS
以下は、動的プロパティと共に使用すると便利な機能です。
CAST
関数は動的プロパティの値 (または式の値) を指定の型にキャストします。節 4.1.7「CAST 関数」を参照してください。
EXISTS
関数は動的プロパティが存在するかどうかをチェックします。イベントにその名前のプロパティが存在する場合は true
を返し、イベントにプロパティが存在しない場合は false
を返します。節 4.1.8「EXISTS 関数」を参照してください。
INSTANCEOF
関数は動的プロパティの値 (または式の値) が指定の型であるかどうかをチェックします。節 4.1.6「INSTANCEOF 関数」を参照してください。
EPL でサポートされていないデータ型をイベントが使用している場合は、ユーザ定義関数を作成して EPL クエリでそのデータ型を評価できます。
例 1-1 のような enum
データ型を考えます。例 1-2 に示すイベントでは、この enum
データ型を使用しています。EPL では enum
データ型がサポートされていません。
例 1-1 列挙データ型 ProcessStatus
package com.oracle.app; public enum ProcessStatus { OPEN(1), CLOSED(0)} }
例 1-2 列挙データ型 ProcessStatus を使用するイベント
package com.oracle.app; import com.oracle.capp.ProcessStatus; public class ServiceOrder { private String serviceOrderId; private String electronicSerialNumber; private ProcessStatus status; ... }
ユーザ定義関数を作成し (例 1-3 を参照)、アプリケーション アセンブリ ファイルに関数を登録することで (例 1-4 を参照)、この enum
データ型を EPL クエリで評価できます (例 1-5 を参照)。
例 1-3 列挙データ型を評価するユーザ定義関数
package com.oracle.app; import com.oracle.capp.ProcessStatus; public class CheckIfStatusClosed { public boolean execute(Object[] args) { ProcessStatus arg0 = (ProcessStatus)args[0]; if (arg0 == ProcessStatus.OPEN) return Boolean.FALSE; else return Boolean.TRUE; } }
例 1-4 アプリケーション アセンブリ ファイルへのユーザ定義関数の登録
<wlevs:processor id="testProcessor"> <wlevs:listener ref="providerCache"/> <wlevs:listener ref="outputCache"/> <wlevs:cache-source ref="testCache"/> <wlevs:function function-name="statusClosed" exec-method=”execute” /> <bean class="com.oracle.app.CheckIfStatusClosed"/> </wlevs:function> </wlevs:processor>
例 1-5 ユーザ定義関数を使用した EPL クエリでの列挙データ型の評価
<query id="rule-04"><![CDATA[ SELECT meter.electronicSerialNumber, meter.exceptionKind FROM MeterLogEvent AS meter, ServiceOrder AS svco WHERE meter.electronicSerialNumber = svco.electronicSerialNumber and svco.serviceOrderId IS NULL OR statusClosed(svco.status) ]]></query>
詳細については、第 4 章「EPL リファレンス : 関数」を参照してください。
イベント シンクは、EPL 文で指定された条件に一致するイベントが発生した場合に、プログラムによる通知を受信する手段を提供します。以下の場合に、シンクへの通知が行われます。
以前 EPL 文で指定された条件と一致していた古いイベントが、期限切れになったか、または新たに発生した着信イベントがこれらに取って代わったために、出力枠からプッシュされた場合。これらは RSTREAM
イベントと呼ばれます。
これらの通知が発生した場合の詳しい例については、節 1.3「処理モデル」で説明しています。
ISTREAM
イベントを受信するには、com.bea.wlevs.ede.api.EventSink
インタフェースを使用します。実装では、結果が使用可能になった場合にエンジンによって呼び出される単一の onEvent
メソッドを指定する必要があります。このインタフェースでは、新しいイベントのみがリスナに送信されます。
public interface EventSink extends EventListener { void onEvent(List<Object> newEvents) throws RejectEventException; }
エンジンでは、文の結果を POJO または MapEventObject
インスタンスのリストとしてイベント シンクに提供します。ワイルドカードでの選択の場合、結果はエンジンに送信された元のイベント オブジェクト タイプに一致します。式の joins および select 句の場合、結果のオブジェクトは com.bea.wlevs.ede.api.MapEventObject
インタフェースを実装します。
節 1.3.1「イベント ストリーム」で説明しているように、EPL 処理モデルはイベント ストリームに基づいています。
以下の説明に従って、イベント ストリームを操作するクエリを記述します。
EPL 処理モデルは継続的です。文へのリスナはエンジンでその文のイベントが処理されるとすぐに、文で選択されているイベント ストリーム、RETAIN 句の制約、フィルタ、および出力レートに応じて更新されたデータを受信します。
この節では、非常に単純な EPL 文の出力を示します。この文はデータ枠を使用せず、いずれのフィルタも適用せずにイベント ストリームを選択します。
SELECT * FROM Withdrawal RETAIN ALL
この文はすべての Withdrawal
イベントを選択します。エンジンで Withdrawal
型または Withdrawal
の下位の型のイベントが処理されるたびに、すべての更新リスナが呼び出され、文の各リスナに新しいイベントが渡されます。
挿入ストリームという用語は、新しいイベントが到着し、データ枠または集約に入れられることを示します。この例の挿入ストリームは、到着する Withdrawal
イベントのストリームであり、更新リスナに新しいイベントとしてポストされます。
次の図は、時間の経過に沿って到着する一連の Withdrawal
イベント 1 - 6 を示しています。この図を含め、この節に示す図のかっこ内の数字は Withdrawal
イベントの amount プロパティの値を示します。
上の例の文では RETAIN
句が指定されていないため、新しいイベントのみがエンジンによって文のリスナにポストされ、古いイベントはポストされません。
スライド枠には、行ベースおよび時間ベースの 2 種類があります。以下の節では、これらの各項目について示します。
行ベースのスライド枠では、ストリームの直前の N 件のイベントのみを保持するようエンジンに通知します。次の文は、Withdrawal
イベント ストリームに長さ枠を適用します。この文は、データ枠、およびデータ枠に入るまたは枠から出るイベントの概念を示しています。
SELECT * FROM Withdrawal RETAIN 5 EVENTS
この文の枠のサイズは 5 件のイベントです。エンジンにより到着するすべての Withdrawal
イベントが枠に入れられます。枠が一杯になると、最も古い Withdrawal
イベントが枠からプッシュされます。エンジンは更新リスナに対し、枠に入れられるすべてのイベントを新しいイベントとして示し、枠から出されるすべてのイベントを古いイベントとして示します。
「挿入ストリーム」という用語が到着する新しいイベントを示すのに対し、「削除ストリーム」という用語はデータ枠から出されるイベント、または集約値を変更するイベントを示します。この例の削除ストリームは、長さ枠から退去する
Withdrawal
イベントのストリームであり、これらのイベントは古いイベントとして更新リスナにポストされます。
次の図は、イベントの到着に応じて変化する長さ枠の内容と、更新リスナにポストされるイベントを示します。
以前と同様、到着するすべてのイベントは新しいイベントとして更新リスナにポストされます。また、イベント W1 は、イベント W6 の到着時に長さ枠から出され、古いイベントとして更新リスナにポストされます。
長さ枠と同様、時間枠では指定の期間内で最新のイベントが保持されます。たとえば、5 秒の時間枠では直前の 5 秒間のイベントが保持されます。秒の経過につれて、時間枠では最も古いイベントが枠から実際にプッシュされ、その結果 1 つまたは複数の古いイベントが更新リスナにポストされます。
EPL では、省略可能な ISTREAM
および RSTREAM
キーワードが SELECT
句および INSERT INTO
句でサポートされています。これにより、データ枠に入れられるイベントまたは出されるイベントのみを転送したり、現在または以前の集約値のみを選択したりするなど、挿入ストリームまたは削除ストリームに基づいた操作を行うようエンジンに通知できます。
時間ベースのスライド枠は、システム時間に基づいて、過去の指定された時間間隔までの範囲を示す変化する枠です。行ベースのスライド枠と同様、時間ベースのスライド枠ではクエリで処理されるイベント数を制限できます。
次の図は、時間枠の動作を示しています。この図では、イベントのグループ化やフィルタ処理を行わず、単にイベントを選択するクエリを想定します。
SELECT * FROM Withdrawal RETAIN 4 SECONDS
この図は指定の時間 t
から開始し、t+4
秒、t+5
秒、およびそれ以降の時点における時間枠の内容を示しています。
図では、以下の動作を示しています。
時間 t + 4
秒の時点でイベント W1 が到着し、時間枠に入ります。エンジンにより新しいイベントが更新リスナに報告されます。
時間 t + 5
秒の時点でイベント W2 が到着し、時間枠に入ります。エンジンにより新しいイベントが更新リスナに報告されます。
時間 t + 6.5
秒の時点でイベント W3 が到着し、時間枠に入ります。エンジンにより新しいイベントが更新リスナに報告されます。
時間 t + 8
秒の時点で W1 が時間枠から離れます。エンジンによりイベントは古いイベントとして更新リスナに報告されます。
実際的な例として、直前の 4 秒間の各口座の平均引き出し額が 1000 を超えるすべての口座を決定する必要がある場合を考えます。この問題を解決する文は、以下のとおりです。
SELECT account, AVG(amount) FROM Withdrawal RETAIN 4 SECONDS GROUP BY account HAVING amount > 1000
行ベースおよび時間ベースの枠は両方ともバッチ処理に対応できます。次の節では、これらの概念について示します。
時間ベースのバッチ枠ではイベントがバッファリングされ、指定された時間間隔ごとにそれらのイベントが 1 回の更新で解放されます。長さベースのバッチ枠と同様、時間ベースのバッチ枠ではイベントの評価が制御されます。
次の図は、時間バッチ ビューの動作を示しています。この図では、以下の単純なクエリを想定しています。
SELECT * FROM Withdrawal RETAIN BATCH OF 4 SECONDS
この図は指定の時間 t から開始し、t + 4
秒、t + 5
秒、およびそれ以降の時点における時間枠の内容を示しています。
図では、以下の動作を示しています。
時間 t + 1
秒の時点でイベント W1 が到着し、バッチに入ります。更新リスナへの通知の呼び出しは発生しません。
時間 t + 3
秒の時点でイベント W2 が到着し、バッチに入ります。更新リスナへの通知の呼び出しは発生しません。
時間 t + 4
秒の時点でエンジンによりバッチ内のイベントが処理され、新しいバッチが開始されます。エンジンによりイベント W1 および W2 が更新リスナに報告されます。
時間 t + 6.5
秒の時点でイベント W3 が到着し、バッチに入ります。更新リスナへの通知の呼び出しは発生しません。
時間 t + 8
秒の時点でエンジンによりバッチ内のイベントが処理され、新しいバッチが開始されます。エンジンによりイベント W3 が新しいデータとして更新リスナに報告されます。エンジンによりイベント W1 および W2 が古いデータ (前回のバッチ) として更新リスナに報告されます。
イベント ストリームに対するフィルタはサブクエリ式内で使用され、イベントがデータ枠内に入る前にイベントを指定のストリームからフィルタで除外します。このフィルタ処理は WHERE
句の実行前に発生します。可能な場合は、WHERE
句に対し、サブクエリ内でフィルタ処理を完了します。これは、残りの EPL 文で処理されるデータ量が軽減され、パフォーマンスが向上するためです。
次の文は、金額が 200 以上の Withdrawal イベントを選択するサブクエリを示します。
SELECT * FROM (SELECT * FROM Withdrawal WHERE amount >= 200) RETAIN 5 EVENTS
このサブクエリでは、金額が 200 未満の Withdrawal イベントは外部クエリの枠に入らず、そのため更新リスナには渡されません。
文中の WHERE
句および HAVING
句では、文のデータ枠や他のビュー内でイベントが処理された後、処理の後半で可能性のある結果行が消去されます。
次の文は、サブクエリを使用せず、Withdrawal
イベントに WHERE
句を適用します。
SELECT * FROM Withdrawal RETAIN 5 EVENTS WHERE amount >= 200
WHERE
句は新しいイベントと古いイベントの両方に適用されます。次の図に示すとおり、到着するイベントは "amount" プロパティの値にかかわらず枠に入れられます。ただし、WHERE
句を満たすイベントのみが更新リスナに渡されます。また、イベントはデータ枠を離れますが、WHERE
句の条件を満たすイベントのみが古いイベントとして更新リスナにポストされます。
イベント ストリームのフィルタは指定可能なフィルタ タイプの中でより限定的ですが、WHERE
句には複雑な条件を含めることができます。次の文の WHERE
句は WHERE 句内の java.lang.Math
Java ライブラリ クラスの ceil
関数に適用されます。INSERT INTO
句により、最初の文の結果が 2 番目の文で使用可能になります。
INSERT INTO BigWithdrawal SELECT * FROM Withdrawal RETAIN ALL WHERE Math.ceil(amount) >= 200 SELECT * FROM BigWithdrawal RETAIN ALL
集約関数を介してイベントを集約する文では、集約された値の変化に伴い、削除ストリーム イベントもポストされます。2 つの Withdrawal
イベントを受信した場合にアラートを発生する以下の文について考えます。
SELECT COUNT(*) AS mycount FROM Withdrawal RETAIN ALL HAVING COUNT(*) = 2
エンジンで 2 つ目の withdrawal イベントが検出されると、エンジンにより新しいイベントが更新リスナにポストされます。新しいイベントの mycount
プロパティの値は 2 です。また、エンジンで 3 つ目の Withdrawal
イベントが検出されると、前のカウント値を含む古いイベントが更新リスナにポストされます。その古いイベントの mycount プロパティの値も 2 です。
ISTREAM
または RSTREAM
キーワードを使用すると、更新リスナにポストされる新しいイベントまたは古いイベントを除去できます。次の文では、ISTREAM
キーワードを使用した結果、2 つ目の Withdrawal
イベントを受信した際にエンジンで更新リスナが 1 回のみ呼び出されます。
SELECT ISTREAM COUNT(*) AS mycount FROM Withdrawal RETAIN ALL HAVING COUNT(*) = 2
以下の使用例では、言語のさまざまな機能の使用法の例を示します。
スループットの統計および急速な減衰の検出のために、各マーケット データ フィードごとに毎秒のチック数を計算します。
マーケット データ イベント ストリームのソースからの 1 秒間のイベントをバッチにまとめる EPL 文を使用できます。フィードおよびフィードごとのイベント数を出力値として指定します。このデータをさらに処理するために、出力イベントを TicksPerSecond イベント ストリームに挿入します。
INSERT INTO TicksPerSecond SELECT feed, COUNT(*) AS cnt FROM MarketDataEvent RETAIN BATCH OF 1 SECOND GROUP BY feed
最高値の株価を計算するため、固有の各銘柄記号に対し、取引のブロック サイズが 10 を超える 100 件のイベントを保持するスライド枠を定義します。たとえば、5,000 の銘柄記号がある場合、5,000 x 100 の 5,000,000 件のイベントが保持されます。ブロック サイズが 10 を越える MarketTrade
イベントのみが枠に入れられ、価格の高い上位 100 件のイベントのみが保持されます。
結果は銘柄記号別にグループ化され、平均価格が 100 未満の銘柄記号は出力から除外され、アルファベット順に並べられます。
SELECT symbol, AVG(price) FROM (SELECT * FROM MarketTrade WHERE blockSize > 10) RETAIN 100 EVENTS WITH LARGEST price PARTITION BY symbol GROUP BY symbol HAVING AVG(price) >= 100 ORDER BY symbol
高速道路の自動車の位置および方向に関する情報が含まれる自動車位置のイベント データに基づいて、自動車が進むルートを検出します。まず carId
によってデータを分類することで特定の自動車に関する情報を特定し、次に高速道路、方向で分類し、方向をプロットします。これらの情報に基づいて自動車の速度が計算されます。
最初の PARTITION BY
carId
では自動車位置イベントが自動車ごとにグループ化され、さらにその後の PARTITION BY expressway PARTITION BY direction
ではより詳細な位置および方向のプロパティ値によって、データがさらに分類されます。このクエリで保持されるイベント数は 4 ですが、これは最後の PARTITION BY
句で保持された最大数に適用されます。このため、個別の分類プロパティ値に対し、最大 4 件のイベントが保持されます。
SELECT carId, expressway, direction, SUM(segment)/(MAX(timestamp)-MIN(timestamp)) AS speed FROM CarLocationEvent RETAIN 4 events PARTITION BY carId PARTITION BY expressway PARTITION BY direction
任意のタイミングの毎秒チック数が直前の 10 秒間の毎秒チック数の 75% を下回った場合にアラートを発することにより、急速な減衰を定義します。
直前の 10 秒間の毎秒チック数は、最初の文で計算された TicksPerSecond
イベントを使用し、直前の 10 秒間の平均を取ることで計算できます。次に、現在のレートを変化する平均値と比べ、平均値の 75% に満たないレートをフィルタ処理します。
SELECT feed, AVG(cnt) AS avgCnt, cnt AS feedCnt FROM TicksPerSecond RETAIN 10 seconds GROUP BY feed HAVING cnt < AVG(cnt) * 0.75
顧客側でのチェックインの作業中に、端末でハードウェアの問題が検出されたり、ネットワークが停止したりする場合があります。このような状況では、顧客を支援するチーム メンバーに警告する必要があります。端末で問題が検出された場合は OutOfOrder
イベントが発行されます。パターンでは端末で out-of-order が通知され、顧客がチェックイン処理中である状況が検出されます。
SELECT ci.term MATCHING ci:=Checkin FOLLOWED BY ( OutOfOrder (term.id=ci.term.id) AND NOT (Cancelled (term.id=ci.term.id) OR Completed (term.id=ci.term.id) ) WITHIN 3 MINUTES )
セルフサービスの各端末では、以下の 4 つのうちいずれかのイベントを発行できます。
Checkin
- 顧客がチェックイン ダイアログを開始したことを示します。
Cancelled
- 顧客がチェックイン ダイアログを取り消したことを示します。
Completed
- 顧客がチェックイン ダイアログを完了したことを示します。
OutOfOrder
- 端末でハードウェアの問題が検出されたことを示します。
すべてのイベントでは、イベントをパブリッシュした端末の情報と、タイムスタンプが提供されます。端末の情報は term
というプロパティに格納され、端末の id
が指定されます。すべてのイベントは類似の情報を含んでいるため、各イベントを基本クラス TerminalEvent
のサブタイプとしてモデル化し、すべてのイベントで共有される端末情報がこのクラスで提供されます。これにより、すべての端末イベントを多態的に扱うことができ、派生したイベント タイプを親イベント タイプと同様に扱うことができるためクエリが簡素化されます。
Status
イベントは定期的に 60 秒間隔で到着するため、MATCHING
句を使用した一時的なパターン照合を利用して、時間通りに到着しなかったイベントを検出できます。WITHIN
演算子を使用して送信や処理による遅延の可能性を考慮した 65 秒の枠を保持し、NOT
演算子を使用して term.id
が T1
に等しい Status
イベントの欠落を検出します。
SELECT 'terminal 1 is offline' MATCHING NOT Status(term.id = 'T1') WITHIN 65 SECONDS OUTPUT FIRST EVERY 5 MINUTES
端末アクティビティの統計情報をスタッフ側にリアルタイムに提示することで、システムをモニタし、問題を特定することが可能になります。次のサンプル クエリでは、イベント タイプごとのカウントが毎分取得されます。CountPerType
イベント ストリームを通じて使用可能なこのデータをさらに利用して、記録された使用法パターンに対して結合および比較したり、単にアクティビティをリアルタイムで要約できます。
INSERT INTO CountPerType SELECT type, COUNT(*) AS countPerType FROM TerminalEvent RETAIN 10 MINUTES GROUP BY type OUTPUT ALL EVERY 1 MINUTE
このサンプルでは、いずれかのリーダーの範囲内にパレットが含まれている場合に、RFID リーダーのアレイが RFID タグを検知します。リーダーは、リーダーのセンサー ID、調査時刻、調査したタグなどの調査情報が含まれる XML ドキュメントを生成します。リーダーのセンサー ID ごとに直前の 60 秒間のタグの総数が計算されます。
SELECT ID AS sensorId, SUM(countTags) AS numTagsPerSensor FROM AutoIdRFIDExample RETAIN 60 SECONDS WHERE Observation[0].Command = 'READ_PALLET_TAGS_ONLY' GROUP BY ID
このサンプルでは、トランザクションの各コンポーネントが含まれている組み合わせイベントを検出する EPL 文を作成します。イベント照合は、直前の 30 分間に到着したイベントに制限されます。この文は INSERT INTO
構文を使用して CombinedEvent
イベント ストリームを生成します。
INSERT INTO CombinedEvent(transactionId, customerId, supplierId, latencyAC, latencyBC, latencyAB) SELECT C.transactionId, customerId, supplierId, C.timestamp - A.timestamp, C.timestamp - B.timestamp, B.timestamp - A.timestamp FROM TxnEventA A, TxnEventB B, TxnEventC C RETAIN 30 MINUTES WHERE A.transactionId = B.transactionId AND B.transactionId = C.transactionId
過去 30 分間のイベントの最小、最大、および平均の合計レイテンシ (A および C の間の時間差) を導くには、以下の EPL を使用できます。また、イベント サーバをモニタするため、ダッシュボード UI はイベントのサブセットをサブスクライブして、サーバとエンドツーエンドのレイテンシなどのシステム性能を計測します。システム全体を流れる各イベントが UI でモニタされることは期待できないため、モニタ アプリケーションでの処理が可能なイベントのサブセットに出力を制限する手段が必要になります。最後の 1 つのイベントのみ、またはすべてのイベントを出力できます。
SELECT MIN(latencyAC) as minLatencyAC, MAX(latencyAC) as maxLatencyAC, AVG(latencyAC) as avgLatencyAC FROM CombinedEvent RETAIN 30 MINUTES GROUP BY customerId OUTPUT LAST 50 EVERY 1 SECOND
OUTER JOIN
を使用すると、3 つのすべてのイベントを通じて成功しなかったトランザクションを検出できます。TxnEventA
または TxnEventB
イベントが、直前の 30 分間のイベントで構成されるそれぞれの時間枠を離れた場合、EPL により EventC
行が見つからない行がフィルタで除外されます。
SELECT * FROM TxnEventA A FULL OUTER JOIN TxnEventC C ON A.transactionId = C.transactionId FULL OUTER JOIN TxnEventB B ON B.transactionId = C.transactionId RETAIN 30 MINUTES WHERE C.transactionId is null