この章では、ストリーミング・データ用の問合せ言語であるOracle Event Processing Language (EPL)の概要について説明します。EPLを使用すると、Oracle Event Processingを使用したデータ・ストリームに対して問合せを表現できます。EPLは非推奨になったことに注意してください。新たな開発ではOracle Continuous Query Language (Oracle CQL)を使用する必要があります。
この章には次の項が含まれます:
イベント・プロセッサ・モジュールは、イベント表現、処理モデル、プログラミング・インタフェースおよび言語仕様という機能コンポーネントに分けることができます。
イベントはJavaBeans規則に準拠するPOJOとして表現されます。イベント・プロパティはPOJOのgetterメソッドを通じて公開されます。可能な場合は、EPL文の実行結果もPOJOとして返されます。ただし、イベント・ストリームが結合された場合など、型のないイベントが返される場合があります。この場合、Mapコレクション・インタフェースのインスタンスが返されます。詳細は、1.2項「イベント表現」を参照してください。
EPL処理モデルは連続的です。文の制約を満たす着信イベントを受信すると結果がすぐに出力されます。出力時には、出力ウィンドウに入れられる新しいイベントの挿入イベント、および出力ウィンドウから出される古いイベントの削除イベントという2種類のイベントが生成されます。これらのいずれかまたは両方のイベントが発生すると、リスナーにアタッチおよび通知されます。詳細は、1.3項「処理モデル」を参照してください。
着信イベントは、スライディング・ウィンドウまたはバッチ・ウィンドウで処理されます。スライディング・ウィンドウでは、1件の増分データに対して徐々にウィンドウが変化してイベントが処理されます。これに対し、バッチ・ウィンドウでは別個のデータ・チャンクに対してウィンドウが変化してイベントが処理されます。ウィンドウのサイズは、保持されるイベントの最大数またはイベントが保持される最大時間によって定義されます。
EPLプログラミング・インタフェースでは、文を個別にコンパイルするか、URLを通じて一括でロードできます。文の繰り返し、取得、開始、および停止が可能です。リスナーは文にアタッチされ、挿入イベントまたは削除イベントあるいはその両方が発生した場合に通知されます。
イベント処理言語はSELECT、FROM、WHERE、GROUP BY、HAVINGおよびORDER BY句を使用するSQLに似た言語です。ストリームはデータ・ソースとしての表に相当し、イベントはデータの基本単位としての行に相当します。イベントはデータで構成されるため、結合による相関、副問合せによるフィルタ処理、グループ化による集約などのSQLの概念が効果的に利用されます。INSERT INTO句は、下流処理が行われるイベントを他のストリームに転送する手段として作り直されています。JDBCを通じてアクセス可能な外部データは、ストリーム・データで照会および結合されます。SQLには含まれない、イベント処理に固有の言語構造を提供するRETAIN、MATCHING、OUTPUT句などの追加の句も使用できます。
RETAIN句は、本質的にはストリーム・データに対する仮想ウィンドウを定義することで、問合せが実行されるデータ量に制約を付与します。データの範囲が表で制限されるリレーショナル・データベース・システムとは異なり、イベント処理システムでは問合せのデータをより動的に制限する代替手段が提供されます。
MATCHING句は固有のパターンに一致するイベントのシーケンスを検出します。AND、OR、FOLLOWED BYなどの時相演算子や論理演算子を使用することで、任意の複雑な式を通じてイベントの発生またはイベントの欠落の両方を検出できます。
OUTPUT句は下流プロセッサの過負荷を回避するために文の実行結果を抑制します。結果的に生成された最初または最後のイベントの全体または一部が、時間ベースまたは行ベースのバッチで渡されます。
最後の節では、実際的なシナリオのもとで言語機能を例示するいくつかの使用例を示します。
EPLを使用すると、イベントをイベント・オブジェクトとして表現できます。詳細については、以下を参照。
すべてのイベント・オブジェクトにはプロパティが含まれます。詳細については、以下を参照。
イベントはイベント・シンクによって消費されます。詳細は、1.2.7項「イベント・シンク」を参照してください。
イベントは、過去に発生したアクションまたは状態変更の不変レコードです。イベント・オブジェクトの状態情報はイベント・プロパティで取得されます。イベントは、POJOまたはjava.util.Mapを拡張するcom.bea.wlevs.cep.event.MapEventObjectのいずれかで表現されます。
Plain old Java object (POJO)イベントは、JavaBeans形式のgetterメソッドを通じてイベント・プロパティを公開するオブジェクト・インスタンスです。イベント・クラスまたはインタフェースは必ずしも完全にJavaBeans仕様に準拠している必要はありませんが、EPLエンジンがイベント・プロパティを取得するためには必須のJavaBeans getterメソッドが存在している必要があります。
EPLでは、スーパー・クラスを拡張したり、1つまたは複数のインタフェースを実装したりするJavaBeans形式のイベント・クラスがサポートされています。また、EPL文ではJavaインタフェース・クラスおよび抽象クラスを参照できます。
イベントを表現するクラスは不変的に作成する必要があります。イベントは過去に発生した状態変更やアクションの記録であるため、関連するイベント・プロパティは変更不能である必要があります。ただし、これは厳密な要件ではなく、EPLエンジンでは可変イベントも受諾されます。
イベントはjava.util.Mapインタフェースを拡張するcom.bea.wlevs.ede.api.MapEventObjectインタフェースを実装するオブジェクトによっても表現されます。Mapイベントのイベント・プロパティは、java.util.Mapインタフェースで公開されるgetメソッドを通じてアクセスできる各エントリの値です。
Map内のエントリはイベント・プロパティを表現します。EPL文で指定されたイベント・プロパティ名をエンジンが参照できるようするために、キーの型はjava.util.Stringである必要があります。値には任意の型を使用できます。MapではPOJOも値として示されます。
EPL式では、simple、indexed、mapped、およびnestedのイベント・プロパティを使用できます。以下の表に、異なる型のプロパティ、およびイベント式での構文の概要を示します。この構文を使用すると、JavaBeansオブジェクト・グラフ、XML構造、およびMapイベントを深い所まで照会できます。
表1-2 イベント・プロパティ
| タイプ | 説明 | 構文 | 例 |
|---|---|---|---|
|
Simple |
取得可能な1つの値を持つプロパティ。プロパティ型にはプリミティブ型( |
name |
sensorId |
|
Nested |
nestedプロパティは、イベントの他のプロパティ内に存在するプロパティです。Mapとして表現されるイベントには他のPOJOイベントのみをネストでき、他のMapイベントはネストできません。 |
name.nestedname |
sensor.value |
|
Indexed |
indexedプロパティにはオブジェクトの順序付きコレクションが格納され(すべて同一の型)、整数の値を持つ正の数の索引(添字)によって個別にアクセスされます。Mapとして表現されるイベントではindexedプロパティはサポートされません。 |
name[index] |
sensor[0] |
|
Mapped |
mappedプロパティはキーを持つオブジェクトのコレクションを格納します(すべて同一の型)。標準のJavaBeans APIに対する拡張機能として、EPLは文字列値のキーを受け入れるプロパティをmappedプロパティと見なします。マップとして表現されるイベントではindexedプロパティはサポートされません。 |
name('key')
|
sensor('light')
|
次のようなEmployeeEventイベント・クラスがあると想定します。この例のmappedプロパティとindexedプロパティではJavaオブジェクトが返されますが、Java言語のプリミティブ型(intやStringなど)を返すこともできます。AddressオブジェクトおよびEmployeeオブジェクト自身は、Addressオブジェクトの町名やEmployeeオブジェクトの従業員名など内部にネストするプロパティを持つことができます。
public class EmployeeEvent {
public String getFirstName();
public Address getAddress(String type);
public Employee getSubordinate(int index);
public Employee[] getAllSubordinates();
}
simpleイベント・プロパティにはプロパティ値を返すgetterメソッドが必要です。上の例では、getFirstName getterメソッドによって型StringのfirstNameイベント・プロパティが返されます。
indexedイベント・プロパティには次のいずれかのgetterメソッドが必要です。
getSubordinateメソッドのように、整数型のキー値を受け取り、プロパティ値を返すメソッド。
Employeeの配列を返すgetAllSubordinates getterメソッドのように、配列型を返すメソッド。
EPL文では、indexedプロパティへのアクセス時に構文property[index]を使用します。
mappedイベント・プロパティでは、getAddressメソッドのように、String型のキー値を受け取りプロパティ値を返すgetterメソッドが必要です。EPL文またはイベント・パターン文では、mappedプロパティへのアクセス時に構文property ('key')を使用します。
nestedイベント・プロパティにはネストするオブジェクトを返すgetterメソッドが必要です。getAddressおよびgetSubordinateメソッドは、ネストするオブジェクトを返すmappedプロパティとindexedプロパティです。EPL文では、nestedプロパティへのアクセス時に構文property.nestedPropertyを使用します。
すべてのEPL文では、1つまたは複数のイベント・プロパティ名が予期される任意の場所で、indexedプロパティ、mappedプロパティ、nestedプロパティ、またはこれらの組み合わせを使用できます。次の例では、indexed、mapped、およびnestedプロパティの異なる組み合わせを示します。
address('home').streetName
subordinate[0].name='anotherName'
allSubordinates[1].name
subordinate[0].address('home').streetName
同様に、この構文は、SELECTリスト、WHERE句、JOIN条件などEPL文でイベント・プロパティ名が予期されるすべての場所で使用できます。
SELECT firstName, address('work'), subordinate[0].name, subordinate[1].name
FROM EmployeeEvent RETAIN ALL
WHERE address('work').streetName = 'Park Ave'
動的な(つまり、チェックされていない)プロパティは、文のコンパイル時に認識される必要のないイベント・プロパティです。Oracle Event Processingではこれらの動的プロパティを実行時に解決します。
動的プロパティの背景にある概念としては、基になるイベント表現で必ずしも事前にプロパティを認識する必要がないということがあります。文のコンパイル時に認識されない追加のプロパティが基になるイベントに含まれていて、EPL文を使用してこれらのプロパティを照会する必要がある場合があります。この概念は、豊富な機能を持つオブジェクト指向のドメイン・モデルを表現するイベントで特に便利です。
動的プロパティの構文は、プロパティ名と疑問符から成ります。indexed、mapped、およびnestedプロパティを動的プロパティにすることもできます。以下の表に、動的イベント・プロパティの型、およびそれらの識別のために使用される構文を示します。
表1-3動的プロパティの構文
| イベント・プロパティ型 | 構文 |
|---|---|
|
動的なSimple |
name? |
|
動的なIndexed |
name[index]? |
|
動的なMapped |
name('key')?
|
|
動的なNested |
name?.nestedPropertyName |
動的プロパティでは常にjava.lang.Object型が返されます。実行時に処理されるイベントに動的プロパティが存在しない場合、動的プロパティではnull値が返されます。
たとえば、itemプロパティを指定するOrderEventイベントを考えます。itemプロパティの型はObjectであり、ServiceまたはProductのいずれかのインスタンスへの参照が保持されます。どちらになるかは実行時に明らかになります。さらに、ServiceおよびProductクラスでpriceというプロパティが提供されると想定します。動的プロパティを使用すると、以下のように、いずれかのオブジェクト(ServiceまたはProduct)からpriceプロパティを取得する問合せを指定できます。
SELECT item.price? FROM OrderEvent RETAIN ALL EVENTS
2つ目の例として、Serviceクラスに含まれているserviceNameプロパティがProductクラスには含まれていないと想定します。実行時、以下の問合せはServiceオブジェクトの場合にserviceNameプロパティの値を返します。しかし、Productオブジェクトに対して問合せを実行すると、ProductsにはserviceNameプロパティが含まれないためnull値が返されます。
SELECT item.serviceName? FROM OrderEvent RETAIN ALL EVENTS
次に、OrderEventに複数の実装クラスがあり、そのうちのいくつかにのみtimestampプロパティが含まれている場合を考えます。以下の問合せは、プロパティが含まれているOrderEventインタフェースで実装されているtimestampプロパティを返します。
SELECT timestamp? FROM OrderEvent RETAIN ALL EVENTS
前述の問合せは、型java.lang.Objectのtimestamp?という1つの列を返します。
動的プロパティをネストした場合は、動的プロパティの下にあるすべてのプロパティも動的プロパティと見なされます。次の例は、detail動的プロパティで返されたオブジェクトのdirectionプロパティをたずねています。
SELECT detail?.direction FROM OrderEvent RETAIN ALL EVENTS
上の問合せは、以下と同等です。
SELECT detail?.direction? FROM OrderEvent RETAIN ALL EVENTS
以下は、動的プロパティと共に使用すると便利な機能です。
CAST関数は動的プロパティの値(または式の値)を指定の型にキャストします。4.1.7項「CAST関数」を参照してください。
EXISTS関数は動的プロパティが存在するかどうかをチェックします。イベントにその名前のプロパティが存在する場合はtrueを返し、イベントにプロパティが存在しない場合はfalseを返します。4.1.8項「EXISTS関数」を参照してください。
INSTANCEOF関数は動的プロパティの値(または式の値)が指定の型であるかどうかをチェックします。4.1.6項「INSTANCEOF関数」を参照してください。
EPLでサポートされていないデータ型をイベントが使用している場合は、ユーザー定義関数を作成してEPL問合せでそのデータ型を評価できます。
例1-1のようなenumデータ型があるとします。例1-2に示すイベントでは、このenumデータ型を使用しています。EPLではenumデータ型がサポートされていません。
例1-2 列挙データ型ProcessStatusを使用するイベント
package com.oracle.app;
import com.oracle.capp.ProcessStatus;
public class ServiceOrder {
private String serviceOrderId;
private String electronicSerialNumber;
private ProcessStatus status;
...
}
ユーザー定義関数を作成し(例1-3を参照)、アプリケーション・アセンブリ・ファイルに関数を登録することで(例1-4を参照)、このenumデータ型をEPL問合せで評価できます(例1-5を参照)。
例1-3 列挙データ型を評価するユーザー定義関数
package com.oracle.app;
import com.oracle.capp.ProcessStatus;
public class CheckIfStatusClosed {
public boolean execute(Object[] args) {
ProcessStatus arg0 = (ProcessStatus)args[0];
if (arg0 == ProcessStatus.OPEN)
return Boolean.FALSE;
else
return Boolean.TRUE;
}
}
例1-4 アプリケーション・アセンブリ・ファイルへのユーザー定義関数の登録
<wlevs:processor id="testProcessor">
<wlevs:listener ref="providerCache"/>
<wlevs:listener ref="outputCache"/>
<wlevs:cache-source ref="testCache"/>
<wlevs:function function-name="statusClosed" exec-method=”execute” />
<bean class="com.oracle.app.CheckIfStatusClosed"/>
</wlevs:function>
</wlevs:processor>
例1-5 ユーザー定義関数を使用したEPL問合せでの列挙データ型の評価
<query id="rule-04"><![CDATA[
SELECT
meter.electronicSerialNumber,
meter.exceptionKind
FROM
MeterLogEvent AS meter,
ServiceOrder AS svco
WHERE
meter.electronicSerialNumber = svco.electronicSerialNumber and
svco.serviceOrderId IS NULL OR statusClosed(svco.status)
]]></query>
詳細は、第4章「EPLリファレンス関数」を参照してください。
イベント・シンクは、EPL文で指定された条件に一致するイベントが発生した場合に、プログラムによる通知を受信する手段を提供します。以下の場合に、シンクへの通知が行われます。
EPL文で指定された条件に一致する新しいイベントが発生した場合。これらはISTREAMイベントと呼ばれます。
以前EPL文で指定された条件と一致していた古いイベントが、期限切れになったか、または新たに発生した着信イベントがこれらに取って代わったために、出力ウィンドウからプッシュされた場合。これらはRSTREAMイベントと呼ばれます。
これらの通知が発生した場合の詳しい例は、1.3項「処理モデル」で説明しています。
ISTREAMイベントを受信するには、com.bea.wlevs.ede.api.EventSinkインタフェースを使用します。実装では、結果が使用可能になった場合にエンジンによって呼び出される単一のonEventメソッドを指定する必要があります。このインタフェースでは、新しいイベントのみがリスナーに送信されます。
public interface EventSink extends EventListener {
void onEvent(List<Object> newEvents)
throws RejectEventException;
}
エンジンでは、文の結果をPOJOまたはMapEventObjectインスタンスのリストとしてイベント・シンクに提供します。ワイルドカードでの選択の場合、結果はエンジンに送信された元のイベント・オブジェクト・タイプに一致します。式のjoinsおよびselect句の場合、結果のオブジェクトはcom.bea.wlevs.ede.api.MapEventObjectインタフェースを実装します。
EPL処理モデルは、1.3.1項「イベント・ストリーム」で説明するイベント・ストリームに基づいています。
イベント・ストリームで操作する問合せは、次の項に従って表現します。
EPL処理モデルは継続的です。文へのリスナーはエンジンでその文のイベントが処理されるとすぐに、文で選択されているイベント・ストリーム、RETAIN句の制約、フィルタ、および出力レートに応じて更新されたデータを受信します。
この節では、非常に単純なEPL文の出力を示します。この文はデータ・ウィンドウを使用せず、いずれのフィルタも適用せずにイベント・ストリームを選択します。
SELECT * FROM Withdrawal RETAIN ALL
この文はすべてのWithdrawalイベントを選択します。エンジンでWithdrawal型またはWithdrawalの下位の型のイベントが処理されるたびに、すべての更新リスナーが呼び出され、文の各リスナーに新しいイベントが渡されます。
挿入ストリームという用語は、新しいイベントが到着し、データ・ウィンドウまたは集約に入れられることを示します。この例の挿入ストリームは、到着するWithdrawalイベントのストリームであり、更新リスナーに新しいイベントとしてポストされます。
次の図は、時間の経過に沿って到着する一連のWithdrawalイベント1 - 6を示しています。この図を含め、この節に示す図のカッコ内の数字はWithdrawalイベントのamountプロパティの値を示します。
前述の例の文ではRETAIN句が指定されていないため、新しいイベントのみがエンジンによって文のリスナーにポストされ、古いイベントはポストされません。
スライディング・ウィンドウには、行ベースおよび時間ベースの2種類があります。以下の節では、これらの各項目について示します。
行ベースのスライディング・ウィンドウでは、ストリームの直前のN件のイベントのみを保持するようエンジンに通知します。次の文は、Withdrawalイベント・ストリームに長さウィンドウを適用します。この文は、データ・ウィンドウ、およびデータ・ウィンドウに入るまたはウィンドウから出るイベントの概念を示しています。
SELECT * FROM Withdrawal RETAIN 5 EVENTS
この文のウィンドウのサイズは5件のイベントです。エンジンにより到着するすべてのWithdrawalイベントがウィンドウに入れられます。ウィンドウが一杯になると、最も古いWithdrawalイベントがウィンドウからプッシュされます。エンジンは更新リスナーに対し、ウィンドウに入れられるすべてのイベントを新しいイベントとして示し、ウィンドウから出されるすべてのイベントを古いイベントとして示します。
挿入ストリームという用語が到着する新しいイベントを示すのに対し、削除ストリームという用語はデータ・ウィンドウから出されるイベント、または集約値を変更するイベントを示します。この例の削除ストリームは、長さウィンドウから退去するWithdrawalイベントのストリームであり、これらのイベントは古いイベントとして更新リスナーにポストされます。
次の図は、イベントの到着に応じて変化する長さウィンドウの内容と、更新リスナーにポストされるイベントを示します。
以前と同様、到着するすべてのイベントは新しいイベントとして更新リスナーにポストされます。また、イベントW1は、イベントW6の到着時に長さウィンドウから出され、古いイベントとして更新リスナーにポストされます。
長さウィンドウと同様、時間ウィンドウでは指定の期間内で最新のイベントが保持されます。たとえば、5秒の時間ウィンドウでは直前の5秒間のイベントが保持されます。秒の経過につれて、時間ウィンドウでは最も古いイベントがウィンドウから実際にプッシュされ、その結果1つまたは複数の古いイベントが更新リスナーにポストされます。
EPLでは、省略可能なISTREAMおよびRSTREAMキーワードがSELECT句およびINSERT INTO句でサポートされています。これにより、データ・ウィンドウに入れられるイベントまたは出されるイベントのみを転送したり、現在または以前の集約値のみを選択したりするなど、挿入ストリームまたは削除ストリームに基づいた操作を行うようエンジンに通知できます。
時間ベースのスライディング・ウィンドウは、システム時間に基づいて、過去の指定された時間間隔までの範囲を示す変化するウィンドウです。行ベースのスライディング・ウィンドウと同様、時間ベースのスライディング・ウィンドウでは問合せで処理されるイベント数を制限できます。
次の図は、時間ウィンドウの動作を示しています。この図では、イベントのグループ化やフィルタ処理を行わず、単にイベントを選択する問合せを想定します。
SELECT * FROM Withdrawal RETAIN 4 SECONDS
この図は指定の時間tから開始し、t+4秒、t+5秒、およびそれ以降の時点における時間ウィンドウの内容を示しています。
図では、以下の動作を示しています。
時間t + 4秒の時点でイベントW1が到着し、時間ウィンドウに入ります。エンジンにより新しいイベントが更新リスナーに報告されます。
時間t + 5秒の時点でイベントW2が到着し、時間ウィンドウに入ります。エンジンにより新しいイベントが更新リスナーに報告されます。
時間t + 6.5秒の時点でイベントW3が到着し、時間ウィンドウに入ります。エンジンにより新しいイベントが更新リスナーに報告されます。
時間t + 8秒の時点でW1が時間ウィンドウから離れます。エンジンによりイベントは古いイベントとして更新リスナーに報告されます。
実際的な例として、直前の4秒間の各口座の平均引出し額が1000を超えるすべての口座を決定する必要がある場合を考えます。この問題を解決する文は、以下のとおりです。
SELECT account, AVG(amount) FROM Withdrawal RETAIN 4 SECONDS GROUP BY account HAVING amount > 1000
行ベースおよび時間ベースのウィンドウは両方ともバッチ処理に対応できます。次の節では、これらの概念について示します。
時間ベースのバッチ・ウィンドウではイベントがバッファ・リングされ、指定された時間間隔ごとにそれらのイベントが1回の更新で解放されます。長さベースのバッチ・ウィンドウと同様、時間ベースのバッチ・ウィンドウではイベントの評価が制御されます。
次の図は、時間バッチ・ビューの動作を示しています。この図では、以下の単純な問合せを想定しています。
SELECT * FROM Withdrawal RETAIN BATCH OF 4 SECONDS
この図は指定の時間tから開始し、t + 4秒、t + 5秒、およびそれ以降の時点における時間ウィンドウの内容を示しています。
図では、以下の動作を示しています。
時間t + 1秒の時点でイベントW1が到着し、バッチに入ります。更新リスナーへの通知の呼出しは発生しません。
時間t + 3秒の時点でイベントW2が到着し、バッチに入ります。更新リスナーへの通知の呼出しは発生しません。
時間t + 4秒の時点でエンジンによりバッチ内のイベントが処理され、新しいバッチが開始されます。エンジンによりイベントW1およびW2が更新リスナーに報告されます。
時間t + 6.5秒の時点でイベントW3が到着し、バッチに入ります。更新リスナーへの通知の呼出しは発生しません。
時間t + 8秒の時点でエンジンによりバッチ内のイベントが処理され、新しいバッチが開始されます。エンジンによりイベントW3が新しいデータとして更新リスナーに報告されます。エンジンによりイベントW1およびW2が古いデータ(前回のバッチ)として更新リスナーに報告されます。
イベント・ストリームに対するフィルタは副問合せ式内で使用され、イベントがデータ・ウィンドウ内に入る前にイベントを指定のストリームからフィルタで除外します。このフィルタ処理はWHERE句の実行前に発生します。残りのEPL文で処理されるデータ量が軽減され、パフォーマンスが向上するため、可能な場合は、WHERE句に対し、副問合せ内でフィルタ処理を完了します。
次の文は、金額が200以上のWithdrawalイベントを選択する副問合せを示します。
SELECT * FROM (SELECT * FROM Withdrawal WHERE amount >= 200) RETAIN 5 EVENTS
この副問合せでは、金額が200未満のWithdrawalイベントは外部問合せのウィンドウに入らず、そのため更新リスナーには渡されません。
文中のWHERE句およびHAVING句では、文のデータ・ウィンドウや他のビュー内でイベントが処理された後、処理の後半で可能性のある結果行が消去されます。
次の文は、副問合せを使用せず、WithdrawalイベントにWHERE句を適用します。
SELECT * FROM Withdrawal RETAIN 5 EVENTS WHERE amount >= 200
WHERE句は新しいイベントと古いイベントの両方に適用されます。次の図に示すとおり、到着するイベントは"amount"プロパティの値にかかわらずウィンドウに入れられます。ただし、WHERE句を満たすイベントのみが更新リスナーに渡されます。また、イベントはデータ・ウィンドウを離れますが、WHERE句の条件を満たすイベントのみが古いイベントとして更新リスナーにポストされます。
イベント・ストリームのフィルタは指定可能なフィルタ・タイプの中でより限定的ですが、WHERE句には複雑な条件を含めることができます。次の文のWHERE句はWHERE句内のjava.lang.Math Javaライブラリ・クラスのceil関数に適用されます。INSERT INTO句により、最初の文の結果が2番目の文で使用可能になります。
INSERT INTO BigWithdrawal
SELECT * FROM Withdrawal RETAIN ALL WHERE Math.ceil(amount) >= 200
SELECT * FROM BigWithdrawal RETAIN ALL
集約関数を介してイベントを集約する文では、集約された値の変化に伴い、削除ストリーム・イベントもポストされます。2つのWithdrawalイベントを受信した場合にアラートを発生する以下の文について考えます。
SELECT COUNT(*) AS mycount FROM Withdrawal RETAIN ALL HAVING COUNT(*) = 2
エンジンで2つ目のwithdrawalイベントが検出されると、エンジンにより新しいイベントが更新リスナーにポストされます。その新しいイベントのmycountプロパティの値は2です。さらに、エンジンで3つ目のWithdrawalイベントが検出されると、以前のカウントの値を含む古いイベントがエンジンにより更新リスナーにポストされます。その古いイベントのmycountプロパティの値も2です。
ISTREAMまたはRSTREAMキーワードを使用すると、更新リスナーにポストされる新しいイベントまたは古いイベントを除去できます。次の文では、ISTREAMキーワードを使用した結果、2つ目のWithdrawalイベントを受信した際にエンジンで更新リスナーが1回のみ呼び出されます。
SELECT ISTREAM COUNT(*) AS mycount FROM Withdrawal RETAIN ALL HAVING COUNT(*) = 2
次の使用例では、言語のさまざまな機能の使用方法の例を示します。
スループットの統計および急速な減衰の検出のために、各マーケット・データ・フィードごとに毎秒のティック数を計算します。
マーケット・データ・イベント・ストリームのソースからの1秒間のイベントをバッチにまとめるEPL文を使用できます。フィードおよびフィードごとのイベント数を出力値として指定します。このデータをさらに処理するために、出力イベントをTicksPerSecondイベント・ストリームに挿入します。
INSERT INTO TicksPerSecond
SELECT feed, COUNT(*) AS cnt
FROM MarketDataEvent
RETAIN BATCH OF 1 SECOND
GROUP BY feed
最高値の株価を計算するため、固有の各銘柄記号に対し、トランザクションのブロック・サイズが10を超える100件のイベントを保持するスライディング・ウィンドウを定義します。たとえば、5000の銘柄記号がある場合、5,000 x 100の5,000,000件のイベントが保持されます。ブロック・サイズが10を越えるMarketTradeイベントのみがウィンドウに入れられ、価格の高い上位100件のイベントのみが保持されます。
結果は銘柄記号別にグループ化され、平均価格が100未満の銘柄記号は出力から除外され、アルファベット順に並べられます。
SELECT symbol, AVG(price) FROM (SELECT * FROM MarketTrade WHERE blockSize > 10) RETAIN 100 EVENTS WITH LARGEST price PARTITION BY symbol GROUP BY symbol HAVING AVG(price) >= 100 ORDER BY symbol
高速道路の自動車の位置および方向に関する情報が含まれる自動車位置のイベント・データに基づいて、自動車が進むルートを検出します。まずcarIdによってデータを分類することで特定の自動車に関する情報を特定し、次に高速道路、方向で分類し、方向をプロットします。これらの情報に基づいて自動車の速度が計算されます。
最初のPARTITION BY carIdでは自動車位置イベントが自動車ごとにグループ化され、さらにその後のPARTITION BY expressway PARTITION BY directionではより詳細な位置および方向のプロパティ値によって、データがさらに分類されます。この問合せで保持されるイベント数は4ですが、これは最後のPARTITION BY句で保持された最大数に適用されます。このため、個別の分類プロパティ値に対し、最大4件のイベントが保持されます。
SELECT carId, expressway, direction,
SUM(segment)/(MAX(timestamp)-MIN(timestamp)) AS speed
FROM CarLocationEvent
RETAIN 4 events
PARTITION BY carId PARTITION BY expressway PARTITION BY direction
任意のタイミングの毎秒ティック数が直前の10秒間の毎秒ティック数の75%を下回った場合にアラートを発することにより、急速な減衰を定義します。
直前の10秒間の毎秒ティック数は、最初の文で計算されたTicksPerSecondイベントを使用し、直前の10秒間の平均を取ることで計算できます。次に、現在のレートを変化する平均値と比べ、平均値の75%に満たないレートをフィルタ処理します。
SELECT feed, AVG(cnt) AS avgCnt, cnt AS feedCnt
FROM TicksPerSecond
RETAIN 10 seconds
GROUP BY feed
HAVING cnt < AVG(cnt) * 0.75
顧客側でのチェック・インの作業中に、端末でハードウェアの問題が検出されたり、ネットワークが停止したりする場合があります。このような状況では、顧客を支援するチーム・メンバーに警告する必要があります。端末で問題が検出された場合はOutOfOrderイベントが発行されます。パターンでは端末でout-of-orderが通知され、顧客がチェック・イン処理中である状況が検出されます。
SELECT ci.term
MATCHING ci:=Checkin FOLLOWED BY
( OutOfOrder (term.id=ci.term.id) AND NOT
(Cancelled (term.id=ci.term.id) OR
Completed (term.id=ci.term.id) ) WITHIN 3 MINUTES )
セルフサービスの各端末では、以下の4つのうちいずれかのイベントを発行できます。
Checkin - 顧客がチェック・イン・ダイアログを開始したことを示します。
Cancelled - 顧客がチェック・イン・ダイアログを取り消したことを示します。
Completed - 顧客がチェック・イン・ダイアログを完了したことを示します。
OutOfOrder - 端末でハードウェアの問題が検出されたことを示します。
すべてのイベントでは、イベントをパブリッシュした端末の情報と、タイム・スタンプが提供されます。端末の情報はtermというプロパティに格納され、端末のidが指定されます。すべてのイベントは類似の情報を含んでいるため、各イベントを基本クラスTerminalEventのサブタイプとしてモデル化し、すべてのイベントで共有される端末情報がこのクラスで提供されます。これにより、すべての端末イベントを多態的に扱うことができ、派生したイベント・タイプを親イベント・タイプと同様に扱うことができるため問合せが簡素化されます。
Statusイベントは定期的に60秒間隔で到着するため、MATCHING句を使用した一時的なパターン照合を利用して、時間通りに到着しなかったイベントを検出できます。WITHIN演算子を使用して送信や処理による遅延の可能性を考慮した65秒のウィンドウを保持し、NOT演算子を使用してterm.idがT1に等しいStatusイベントの欠落を検出します。
SELECT 'terminal 1 is offline' MATCHING NOT Status(term.id = 'T1') WITHIN 65 SECONDS OUTPUT FIRST EVERY 5 MINUTES
端末アクティビティの統計情報をスタッフ側にリアル・タイムに提示することで、システムをモニターし、問題を特定することが可能になります。次のサンプル問合せでは、イベント・タイプごとのカウントが毎分取得されます。CountPerTypeイベント・ストリームを通じて使用可能なこのデータをさらに利用して、記録された使用法パターンに対して結合および比較したり、単にアクティビティをリアル・タイムで要約できます。
INSERT INTO CountPerType SELECT type, COUNT(*) AS countPerType FROM TerminalEvent RETAIN 10 MINUTES GROUP BY type OUTPUT ALL EVERY 1 MINUTE
このサンプルでは、いずれかのリーダーの範囲内にパレットが含まれている場合に、RFIDリーダーのアレイがRFIDタグを検知します。リーダーは、リーダーのセンサーID、調査時刻、調査したタグなどの調査情報が含まれるXMLドキュメントを生成します。リーダーのセンサーIDごとに直前の60秒間のタグの総数が計算されます。
SELECT ID AS sensorId, SUM(countTags) AS numTagsPerSensor FROM AutoIdRFIDExample RETAIN 60 SECONDS WHERE Observation[0].Command = 'READ_PALLET_TAGS_ONLY' GROUP BY ID
このサンプルでは、トランザクションの各コンポーネントが含まれている組み合わせイベントを検出するEPL文を作成します。イベント照合は、直前の30分間に到着したイベントに制限されます。この文はINSERT INTO構文を使用してCombinedEventイベント・ストリームを生成します。
INSERT INTO CombinedEvent(transactionId, customerId, supplierId,
latencyAC, latencyBC, latencyAB)
SELECT C.transactionId, customerId, supplierId,
C.timestamp - A.timestamp,
C.timestamp - B.timestamp,
B.timestamp - A.timestamp
FROM TxnEventA A, TxnEventB B, TxnEventC C
RETAIN 30 MINUTES
WHERE A.transactionId = B.transactionId AND
B.transactionId = C.transactionId
過去30分間のイベントの最小、最大、および平均の合計レイテンシ(AおよびCの間の時間差)を導くには、以下のEPLを使用できます。また、イベント・サーバーをモニターするため、ダッシュボードUIはイベントのサブセットをサブスクライブして、サーバーとエンド・ツー・エンドのレイテンシなどのシステム・パフォーマンスを計測します。システム全体を流れる各イベントがUIでモニターされることは期待できないため、モニター・アプリケーションでの処理が可能なイベントのサブセットに出力を制限する手段が必要になります。最後の1つのイベントのみ、またはすべてのイベントを出力できます。
SELECT MIN(latencyAC) as minLatencyAC,
MAX(latencyAC) as maxLatencyAC,
AVG(latencyAC) as avgLatencyAC
FROM CombinedEvent
RETAIN 30 MINUTES
GROUP BY customerId
OUTPUT LAST 50 EVERY 1 SECOND
OUTER JOINを使用すると、3つのすべてのイベントを通じて成功しなかったトランザクションを検出できます。TxnEventAまたはTxnEventBイベントが、直前の30分間のイベントで構成されるそれぞれの時間ウィンドウを離れた場合、EPLによりEventC行が見つからない行がフィルタで除外されます。
SELECT *
FROM TxnEventA A
FULL OUTER JOIN TxnEventC C ON A.transactionId = C.transactionId
FULL OUTER JOIN TxnEventB B ON B.transactionId = C.transactionId
RETAIN 30 MINUTES
WHERE C.transactionId is null