6 Oracle CQLプロセッサ

Oracle CQLプロセッサは、様々な入力チャネルおよび他のデータ・ソースからの着信イベントを処理します。Oracle CQLを使用して、着信イベントを処理する連続した問合せの形式でビジネス・ロジックを作成します。Oracle CQLは、イベントをリアルタイムでフィルタ、集計、関連付けおよび処理します。

注意:

Oracle CQLで使用可能な機能を高めるメソッドを使用してJavaクラスを作成できます。Oracle CQL内で、コンパイルされたクラスを名前別に参照し、そのメソッドをSELECT文からコールします。

この章の内容は次のとおりです。

この章では、基本概念を理解しやすいように、例を示しながらOracle CQLの概要を示します。キャッシュに対するCQL問合せの実行の詳細は、キャッシュ済イベント・データを参照してください。

Oracle JDeveloperには、共通のOracle CQL問合せを作成するためのテンプレートを提供するOracle CQLパターン・コンポーネントが用意されています。

この章では、いくつかのアセンブリおよび構成ファイルのOracle CQLプロセッサ設定について説明します。

6.1 プロセッサのデータ・ソース

Oracle CQLの問合せにより、1つ以上の入力ソースからの着信イベント・データを処理する文、および1つ以上の出力チャネルへの送信イベント・データを送信する文を1つ以上定義できます。

各チャネル(入力または出力)およびデータ・ソースにはイベント・タイプが関連付けられています。

たとえば、1つの入力をチャネルにし、別の入力をCoherenceキャッシュにすることができます。チャネルとCoherenceキャッシュではイベント・タイプが異なります。これは、Coherenceキャッシュが、入力チャネルからの着信イベント・データに関連するがこれと同じではない追加情報をOracle CQLプロセッサの問合せに提供するためです。

複数の問合せを使用してOracle CQLプロセッサを構成する場合、デフォルトではすべての問合せの結果がすべての出力チャネルに出力されます。どの問合せの結果をどの出力チャネルに出力するかは、1つ以上のダウンストリーム・チャネルにselector要素を設定することによって制御できます。selector要素を使用して、結果をこのチャネルに出力できる1つ以上の問合せ名のスペース区切りリストを指定します。出力チャネルに割り当てられたOracle CQL問合せには、出力チャネルに定義されているイベント・タイプと一致する正しい属性があります。詳細は、ダウンストリーム・チャネルに出力する問合せの制御を参照してください。

6.2 アセンブリ・ファイルおよび構成ファイル

Oracle CQLプロセッサをEPNに追加すると、アセンブリ・ファイルに次のエントリが表示されます。

<wlevs:processor id="processor"/>

平均化ルールなどのOracle CQLパターンをOracle JDeveloperでEPNに追加すると、アセンブリ・ファイルに次のエントリが表示されます。

<wlevs:processor id="processor"/>
<wlevs:processor id="averaging-rule"/>

構成ファイル

Oracle CQLプロセッサをEPNに追加すると、構成ファイルに次のエントリが表示されます。デフォルトでは、1つの問合せ用のテンプレートが含まれるルール用のテンプレートを入手できます。

 <processor>
   <name>processor</name>
   <rules>
     <query id="ExampleQuery"><![CDATA[ 
       select * from MyChannel [now] >
     </query>
   </rules>
  </processor>
  • rules要素は、このプロセッサが実行するOracle CQL文をグループ化します。

  • query要素には、Oracle CQLのselect文が含まれます。query要素のid属性は問合せ名を定義します。

  • XML CDATAタイプは、Oracle CQLルールを配置する場所を示します。

  • select文は実際の問合せです。このテンプレートには、NOWウィンドウおよび最終イベント・ウィンドウに示されているようにnow操作を実行できるよう[now]演算子が用意されています。

6.3 問合せ

株式取引イベントに対して基本的なOracle CQLプロセッサの問合せを実行する方法を説明します。

目標

この項の目的は、Oracle CQL問合せでウィンドウ、スライドおよびビューを使用する方法について理解することです。

  • ウィンドウを使用すると、イベント・ストリームを時間ベースのイベント・リレーションに変換し、イベントに対してOracle CQL操作を実行できるようになります。時間ベースのリレーション(ウィンドウ)を参照してください。

  • スライドを使用すると、イベントをバッチ処理し、CQLプロセッサがイベントを出力する割合を制御できます。プロセッサ出力制御(スライド)を参照してください。

  • ビューを使用すると、他のOracle CQL問合せで再使用できるOracle CQL文を作成できます。「ビュー」を参照してください。

イベント・タイプ定義

この項の例で使用される株式取引イベントは、次のフィールドおよび型の定義を持つStockTradeEventTypeタイプです。

  • tickerSymbol: String

  • price: Double

  • dailyHigh: Double

  • dailyLow: Double

  • closingValue: Double

6.3.1 ストリーム・チャネル

ストリーム・チャネルは、コレクションにイベントを挿入し、次のEPNステージにストリームを送信します。ストリーム内のイベントは連続してフローし、ストリームから削除できず、終わりがありません。アプリケーション内にフローするイベントの連続ストリームに対して問合せを実行できます。

Oracleのティッカ・シンボルを持つすべての株式取引イベントを取得するために、入力ストリーム・チャネルStockTradeIChannelに対する問合せが後に続きます。

SELECT tickerSymbol
FROM StockTradeIStreamChannel
WHERE tickerSymbol = ORCL

次の構成ファイル・エントリはこの問合せを示しています。ISTREAMは、リレーションからストリームへの演算子で説明されているリレーションからストリームへの演算子です。

<processor>
  <rules>
    <query id=rule1 <![CDATA[ISTREAM (SELECT tickerSymbol
       FROM StockTradeIStreamChannel WHERE tickerSymbol = ORCL)>
    </query>
  </rules>
</processor>

6.3.2 時間ベースのリレーション(ウィンドウ)

リレーション・チャネルは、コレクションにイベントを挿入し、次のEPNステージにリレーションを送信します。リレーションは、最初と最後があるストリーム上の時間のウィンドウです。リレーション内のイベントは、リレーションに挿入、リレーションから削除、およびリレーションで更新することができます。挿入、削除および更新操作の場合、操作が正しいイベントに対して行われるようにリレーション内のイベントが特定の時点を参照している必要があります。リレーションに対する操作はすべて時間ベースで行われます。

ほとんどのアプリケーションではリレーション・チャネルは使用されません。ストリーム・チャネルからの着信イベントに対して時間のウィンドウを設定することにより、時間ベースの処理操作に対してリレーションを作成できます。特定の株式の平均価格を検索するには、平均を計算する時間枠(ウィンドウ)を決定する必要があります。ストリームに対してウィンドウを定義すると、データのコレクションが設定されますが、これらのデータはフローしておらず、ストリームとは異なり、最初と最後があります。このウィンドウはインメモリー・リレーションです。このウィンドウには、AVGなどの関数を適用したり、挿入、更新および削除操作を実行することもできます。

ストリームに対して時間のウィンドウを設定する演算子は、ストリームからリレーションへの演算子と呼ばれます。ストリームからリレーションへの演算子の出力はリレーションです。リレーションからストリームへの演算子を使用して、リレーションをストリームに再変換することにより、すべてのイベント、挿入されたイベントのみ、または削除されたイベントのみが含まれるストリームを出力します。

通常、Oracle CQLプロセッサの出力はストリーム・チャネルに移動するとともに、EPN内の次のステージに移動します。

6.3.2.1 ストリームからリレーションへの演算子

ストリームからリレーションへの演算子は、RANGEおよびROWです。

RANGE演算子

次のように、時間ベースのウィンドウ演算子RANGEを使用して時間のウィンドウを指定できます。

SELECT AVG(price) 
FROM StockTradeIStreamChannel [RANGE 1 MINUTE]

この例では、例を理解しやすくするために、範囲は1分(秒単位のティック)に設定されており、1秒ごとに1つの入力イベントが受信されます。問合せにより、ゼロ秒でイベントに含まれる価格の平均化が開始され、値0が出力されます。これは、ゼロ秒のリレーションにはイベントがないためです。次のイベントが1秒で到着した場合、平均価格は最初のイベントの価格になります。次のイベントが2秒で到着した場合、平均価格はリレーション内の2つのイベントの平均になります。これが、59秒(1分)に到達するまで続きます。

時間ベースのウィンドウ操作の重要な概念は、ウィンドウが時間に基づいてイベント・ストリーム上で移動するということです。60秒が経過すると、ウィンドウは1秒移動して1秒から60秒までのイベントの価格が平均化され、さらに60秒が経過すると、ウィンドウはもう1秒移動して2秒から61秒までのイベントの価格が平均化されます。ウィンドウがリレーション上で移動するこの動作は、アプリケーションが動作しているかぎり続きます。

次の構成ファイル・エントリはこの問合せを示しています。ISTREAMは、リレーションからストリームへの演算子で説明されているリレーションからストリームへの演算子です。

<processor>
  <rules>
    <query id=rule2 <![CDATA[ISTREAM (SELECT AVG(price)
       FROM StockTradeIStreamChannel [RANGE 1 MIN >
    </query>
  </rules>
</processor>

注意:

非常に大きな数には接尾辞を付ける必要があります。接頭辞がない場合、Javaでは、非常に大きな数が整数として処理され、値が整数の範囲から外れる可能性があります。この場合、エラーがスローされます。

接頭辞は次のように追加します。

lまたはL: LongfまたはF: floatdまたはD: doublenまたはN: big decimal

例: SELECT * FROM channel0[RANGE 1368430107027000000l nanoseconds]

ROW演算子

次のように、時間ベースのROWS演算子を使用してタプルベースのウィンドウを指定できます。

SELECT AVG(price)
FROM StockTradeIStreamChannel [ROWS 3]

タプルはイベントであるため、ROWS 3操作は、最初のイベントが到着したときにリレーション内の3つのイベントの価格を平均化することを意味します。この場合、リレーション内に入った最初のイベントに対して平均操作が実行されます。2番目のイベントがリレーション内に入ると、2つのイベントに対して平均操作が実行されます。3番目のイベントがリレーション内に入ると、3つのイベントに対して平均操作が実行されます。4番目のイベントがリレーションに入るまで平均化が再度行われることはありません。4番目のイベントがリレーション内に入ると、2番目、3番目および4番目のイベントが平均化されます。同様に、5番目のイベントがリレーション内に入ると、3番目、4番目および5番目のイベントが平均化されます。

前述の例では、すべての株式の価格が平均化されています。ストリーム内の特定の株式の平均を計算するために、次の問合せではパーティション化されたウィンドウが使用されています。

SELECT AVG(price), tickerSymbol 
FROM StockTradeIStreamChannel [PARTITION by tickerSymbol ROWS 3]
GROUP BY tickerSymbol

パーティション化されたウィンドウにより、パーティションごとに個別のリレーション・ウィンドウが作成されます。このため、この例ではPARTITION by tickerSymbol句を使用して、同じティッカ・シンボルの株式が3つのイベントによってグループ化されて平均化されています。パーティションなしでGROUP BY句のみを使用する場合、タプルによって最後の3つのイベントが予想どおりに保持されますが、タプル内のティッカ・シンボルが常に一致するとはかぎらず、その場合は平均化エラーが発生します。

この問合せの構成ファイル・エントリは次のとおりです。ISTREAMは、リレーションからストリームへの演算子で説明されているリレーションからストリームへの演算子です。

<procesor>
  <rules>
    <query id="Example"><![CDATA[ISTREAM select tickerSymbol, AVG(price) 
         from StockTradeIStream 
         [PARTITION by tickerSymbol ROWS 3] 
         GROUP BY tickerSymbol) >
    </query>
  </rules>
</processor>
6.3.2.2 リレーションからストリームへの演算子

リレーションからストリームへの演算子は、ISTREAMDSTREAMおよびRSTREAMです。

ISTREAM演算子

ISTREAM演算子は、挿入イベントをリレーションから出力ストリームに配置します。リレーション内で削除または更新されたイベントは無視されます。平均が変更されると、問合せによりdeleteイベントがリレーションに送信されて前の平均が削除され、insertイベントがリレーションに送信されて新しい平均がリレーションに追加されます。次の例では、ISTREAM演算子を使用して、新しい平均の計算時に出力ストリームが更新されます。

ISTREAM (SELECT AVG(price) 
FROM StockTradeIStreamChannel [RANGE 1 MINUTE])

次の構成ファイル・エントリは、ISTREAM演算子を示します。

<processor>
  <rules>
    <query id=rule2 <![CDATA[ISTREAM (SELECT AVG(price)
       FROM StockTradeIStreamChannel [RANGE 1 MIN >
    </query>
  </rules>
</processor>

DSTREAM演算子

DSTREAM演算子を使用すると、株式が交換のリストから外された場合など、有効ではなくなった状況を検出できます。次の例では、DSTREAM演算子を使用して、リレーション内の新しい平均の計算後に古い平均を持つ出力ストリームが更新されます。

DSTREAM (SELECT AVG(price) 
FROM StockTradeIStreamChannel [RANGE 1 MINUTE])

次の構成ファイル・エントリは、DSTREAM演算子を示します。

<processor>
  <rules>
    <query id=rule2 <![CDATA[DSTREAM (SELECT AVG(price)
       FROM StockTradeIStreamChannel [RANGE 1 MIN >
    </query>
  </rules>
</processor>

RSTREAM演算子

RSTREAM演算子は、イベントが削除または更新されたかどうかに関係なく、すべてのイベントを出力ストリームに挿入します。この演算子は、すべての出力に対してダウンストリーム・アクションを実行する必要がある場合に使用します。次の例では、RSTREAM演算子を使用して、入力ストリーム内のすべてのイベントを選択し、2つのイベントがリレーションに到着するのを待機し、2つのイベントをリレーションから出力ストリームに配置します。

RSTREAM (SELECT * 
FROM StockTradeIStreamChannel [ROWS 2])

次の構成ファイル・エントリは、RSTREAM演算子を示します。

<processor>
  <rules>
    <query id=rule2 <![CDATA[RSTREAM (SELECT *
       FROM StockTradeIStreamChannel [ROWS 2 >
    </query>
  </rules>
</processor>
6.3.2.3 NOWウィンドウおよび最終イベント・ウィンドウ

NOWウィンドウには、システムの最後のティックで発生したイベントが含まれます。NOW演算子を使用すると、最後の入力イベントが次回のタイム・ティック(新しいNOW)で削除される場合があるため、目的の結果が得られない可能性があります。実際に最後の入力イベントが必要な場合は、最終イベント・ウィンドウを使用してください。次の例は、NOWウィンドウを構築する方法を示します。

SELECT * FROM StockTradeIStream[NOW]

次の構成ファイル・エントリは、NOW演算子を示します。

<processor>
  <rules>
    <query id=rule2 <![CDATA[ISTREAM (SELECT *
       FROM StockTradeIStreamChannel [NOW>
    </query>
  </rules>
</processor>

最終イベント・ウィンドウでは、受信した最終イベントが捕捉されます。次の例は、最終イベント・ウィンドウを構築する方法を示します。

SELECT * FROM StockTradeIStream[ROWS 1]

次の構成ファイル・エントリは最終イベント・ウィンドウを示します。

<processor>
  <rules>
    <query id=rule2 <![CDATA[ISTREAM (SELECT *
       FROM StockTradeIStreamChannel [ROWS 1 >
    </query>
  </rules>
</processor>

6.3.3 プロセッサ出力制御(スライド)

問合せ結果を発生するたびに出力するかわりに、サブ句でSLIDE演算子を使用して出力イベントをバッチ処理できます。イベントは、ROW演算子を使用する場合はイベントの数に基づいてバッチ処理でき、RANGE演算子を使用する場合は時間の長さ(時間ウィンドウ)に基づいてバッチ処理できます。

注意:

スライド値が指定されていない場合、問合せでは、タプルベースのウィンドウの場合はデフォルト値の1、時間ベースのウィンドウの場合は1タイム・ティックが想定されます。

イベント数別のバッチ処理

次の例では、2イベントごと(2、4、6、8、...)に出力しています。

SELECT * FROM StockTradeIStreamChannel[ROWS 3 SLIDE 2]

SLIDE演算子からの出力には、削除されたイベントが含まれます。最初の2つのイベントがリレーションに到着すると、問合せにより両方のイベントがストリームに出力されます。次のイベントが到着すると、リレーション内には3つのイベントが存在しますが、出力が次に行われるのは4番目のイベントのときです。4番目のイベントが到着すると、最初のイベントが削除され、3番目と4番目のイベントとともに出力が行われます。

次の例は、RSTREAM演算子とともにスライドを使用する方法を示します。この場合、4番目のイベントが到着すると、イベント2、3および4が出力ストリームに送信されます。RSTREAM演算子は、イベントが削除または更新されたかどうかとは関係なく、すべてのイベントを出力ストリームに送信します。

RSTREAM(SELECT * FROM StockTradeIStreamChannel[ROWS 3 SLIDE 2])

次の構成ファイル・エントリは、RSTREAMを使用して数別にバッチ処理しています。

<processor>
  <rules>
    <query id=rule2 <![CDATA[RSTREAM (SELECT *
       FROM StockTradeIStreamChannel [ROWS 3 SLIDE 2 >
    </query>
  </rules>
</processor>

時間ウィンドウ別のバッチ処理

時間ウィンドウの場合、時間間隔(RANGE演算子)別にイベントがOracle Stream Analyticsによってバッチ処理されます。時間間隔を指定すると、Oracle CQLにより指定した数の倍数ごとにイベントがまとめて出力ストリームに送信されます。たとえば、5秒を指定すると、イベントは5、10、15、20、...秒ごとに送信されます。最初のイベントがこの間隔内で1、2または3秒に到着した場合、最初の出力は他の出力より小さくなります。

次の例では、5分間の範囲を30秒ごとのスライドを使用して指定しています。

SELECT * FROM StockTradeIStream[RANGE 5 MIN SLIDE 30 SECONDS]

次の構成ファイル・エントリは、時間ベースのスライドを示しています。

<processor>
  <rules>
    <query id=rule2 <![CDATA[RSTREAM (SELECT *
       FROM StockTradeIStreamChannel [RANGE 5 MIN 30 SECONDS >
    </query>
  </rules>
</processor>

6.3.4 ビュー

ビューを使用すると、他のOracle CQL問合せで再使用できるOracle CQL文を作成できます。view要素には、Oracle CQLのサブ問合せ文が含まれます。view要素のid属性はビュー名を定義します。view要素で作成した最上位のSELECT文はビューと呼ばれます。

注意:

サブ問合せは、UNION、UNION ALLおよびMINUSなどのバイナリ・セット演算子とともに使用されます。サブ問合せでは、問合せに正しい優先順位が適用されるようカッコを使用する必要があります。

次の例は、ビューv1、およびビューに対する問合せq1を示します。このビューは、xmltypeストリーム要素を持つストリームS1から選択します。ビューv1は、XMLTABLE句でXPath式を使用して、xmltypeストリーム要素のデータを解析します。問合せq1は他の任意のデータ・ソースの場合と同様に、ビューv1から選択します。XMLTABLE句はXMLネームスペースもサポートしています。

xmltypeストリームにはXMLデータが含まれます。Oracle CQLのXMLTABLE句を使用すると、XPath式を使用してxmltypeストリームのデータを列に解析し、列名でデータにアクセスできます。XPath式を使用すると、XMLドキュメント内の要素および属性間をナビゲートできます。

注意:

ビューのスキーマのデータ型は、COLUMNS句で解析されたデータのデータ型と一致します。

<view id="v1" schema="orderId LastShares LastPrice"><![CDATA[
  SELECT
    X.OrderId,
    X.LastShares,
    X.LastPrice
  FROM S1, XMLTABLE (
   "FILL"
   PASSING BY VALUE
   S1.c1 as "."
   COLUMNS
      OrderId char(16) PATH "fn:data(../@ID)",
      LastShares integer PATH "fn:data(@LastShares)",
      LastPrice float PATH "fn:data(@LastPx)"
   ) as X
></view>

<query id="q1"><![CDATA[
  IStream(
    select
      orderId,
      sum(LastShares * LastPrice),
      sum(LastShares * LastPrice) / sum(LastShares)
    from
      v1[now]
    group by orderId
  )
></query>

6.4 CQL集計

Oracle CQLは、増分形式計算用のAVGCOUNTSUMなどの集計関数、および増分形式計算には関連しないMAXおよびMINをサポートしています。

集計関数によってイベントがJavaコレクションに集計されるため、コレクションAPIを使用してイベントを操作できます。

集計された結果に関する条件を確認するには、HAVING句を使用します。次の例では、50を超える平均のみが出力されます。

SELECT AVG(price) FROM StockTradeIStreamChannel [RANGE 1 HOUR]
HAVING AVG(price) > 50

Oracle CQLには、Coltオープン・ソース・ライブラリに基づいた、高性能な科学技術計算のための様々な組込みの単一行関数および集計関数が用意されています。Coltライブラリの一部として使用可能な関数は、Big Decimalデータ型またはNULL入力値はサポートしません。また、関数の値計算は増分形式ではありません。詳細は、COLTのWebサイトを参照してください。

6.5 表ソースの構成

関連付けられたデータ・ソースを使用して表コンポーネントを作成することにより、Oracle CQL問合せからリレーショナル・データベース表にアクセスできます。Oracle Stream Analyticsのリレーション・データベース表イベント・ソースはプル・データ・ソースです。つまり、Oracle Stream Analyticsにより、イベント・ソースが定期的にポーリングされます。

  • NOWウィンドウのあるストリームのみを、単一のデータベース表にのみ結合できます。

    表ソースの変更とストリーム・データはすぐには連係されないため、表ソースはNowウィンドウを使用してイベント・ストリームにのみ結合でき、また単一のデータベース表にのみ結合できます。

  • Oracle JDBCデータ・カートリッジを使用して、任意の複雑なSQL問合せおよび複数の表とデータソースをOracle CQL問合せに統合できます。

    注意:

    Oracle CQL文からリレーショナル・データベース表にアクセスするにはOracle JDBCデータ・カートリッジを使用することをお薦めします。

NOWウィンドウとデータ・カートリッジのどちらを使用する場合でも、Oracle Stream Analyticsサーバー・ファイルにデータ・ソースを定義する必要があります。

6.5.1 アセンブリ・ファイル

次のアセンブリ・ファイル・エントリは、id属性がStockである表ソースの設定を示します。

<wlevs:table id="Stock" event-type="TradeEvent" data-source="StockDataSource"/>

Oracle Stream Analyticsでは、イベント・タイプおよびデータ・ソースを使用して、リレーショナル表の行をイベント・タイプにマップします。TradeEventイベント・タイプは、リレーショナル・データベース内の列にマップされる5つのプライベート・フィールド(symbolpricelastPricepercChangeおよびvolume)があるJavaクラスから作成されます。

注意:

XMLTYPEプロパティは表ソースに対してサポートされていません。

6.5.2 構成ファイル

  <data-source>
    <name>StockDs</name>
    <connection-pool-params>
      <initial-capacity>1</initial-capacity>
      <max-capacity>10</max-capacity>     
    </connection-pool-params>
    <driver-params>
      <url>jdbc:derby:</url>
      <driver-name>org.apache.derby.jdbc.EmbeddedDriver</driver-name>
      <properties>
        <element>
          <name>databaseName</name>
          <value>db</value>
        </element>
        <element>
          <name>create</name>
          <value>true</value>
        </element>
      </properties>
    </driver-params>
    <data-source-params>
      <jndi-names>
        <element>StockDs</element>
      </jndi-names>
      <global-transactions-protocol>None</global-transactions-protocol>
    </data-source-params>
  </data-source>

構成後、別のイベント・ストリームであるかのようにStock表にアクセスするOracle CQL問合せを定義できます。

次の例では、問合せはStockTradeIStreamChannelイベント・ストリームをStock表と結合しています。

SELECT StockTradeIStreamChannel.symbol, StockTradeIStreamChannel.price,
         StockTradeIStream.lastPrice, StockTradeIStream.percChange,
         StockTradeIStream.volume, Stock
FROM   StockTraceIStreamChannel [Now], Stock
WHERE  StockTradeIStreamChannel.symbol = Stock.symbol

表ソースの変更とストリーム・データはすぐには連係されないため、表ソースはNowウィンドウを使用してイベント・ストリームにのみ結合でき、また単一のデータベース表にのみ結合できます。

6.6 パラレル問合せ実行用のOracle CQLプロセッサの構成

パフォーマンス向上のため、CQL問合せを、デフォルトで行われているシリアル実行ではなく、パラレル実行できるようにすることが可能です。

CQLコードがこれをサポートしている場合、CQLプロセッサで複数のスレッドが使用可能なときは、着信イベントをパラレルに実行するように問合せを構成できます。

問合せ出力イベントの相対的な順序が問合せのダウンストリーム・クライアントにとって重要ではない場合のみ、パラレル問合せ実行を有効にします。たとえば、トランザクション・シーケンスが無関係な、特定の会社に関係する一連の株式取引を顧客に伝えるなど、問合せの主な目的がイベントのフィルタ処理である場合、イベントの順序付けは重要ではないと考えられます。

デフォルト(パラレル実行を有効にしない)では、問合せはチャネルからイベントをシリアルに処理します。システム・タイムスタンプを使用するチャネルを経由してルーティングされるイベントの場合、イベント順序は、イベントが受信される順序です。アプリケーションによってタイムスタンプが付加されるチャネルを経由する場合、イベント順序は、イベントに含まれるタイムスタンプの値によって決定された順序です。全体順序の制約を緩和すると、構成された問合せは、可能な場合はイベントをパラレルに処理しながら、その問合せのイベント順序を考慮しないようにできます。

6.6.1 パラレル問合せ実行のサポートの設定

パラレル問合せ実行のサポートの指定が主に単純な構成タスクである場合は、機能を最大限に活用できるように、次の手順に必ず従ってください。

  • パラレル実行をサポートするように、ordering-constraint属性を使用します。

  • プロセッサに呼び出すスレッドが、パフォーマンスの目標を達成するのに十分であることを確認します。パラレル問合せ実行の最大量は、CQLプロセッサで使用できるスレッドの数によって制約されます。たとえば、プロセッサのアダプタ・アップストリームが必要なスレッドの数をサポートし、アダプタとプロセッサの間にチャネルがある場合、パススルーとして機能するように、max-threadsの数を0にしてチャネルを構成してください。

    パススルーを行わない場合、必ずmax-threadsの値を1より大きくして、問合せのアップストリーム・チャネルを構成してください。(max-threadsの値の設定を有用にするには、max-size属性を0より大きい値に設定する必要もあります。)詳細は、チャネルを参照してください。

  • max-threads属性値の設定に関連するその他のガイドラインに従います。たとえば、max-threadsの値の設定を有用にするには、max-size属性を0より大きい値に設定する必要もあります。

  • 必要に応じて、同期化されたブロックの使用などによって、問合せ結果を受け取るBeanがスレッドを認識していることを確認します。たとえば、Beanのコードによって、複数のスレッドで実行された問合せから受け取った結果のリストを作成する場合に、これを行う必要がある可能性があります。

6.6.2 ordering-constraint属性

イベントがシリアルに処理されることを保証する、デフォルトの順序付けの制約を緩和することで、パラレル問合せ実行を有効にします。これを行うには、queryまたはview要素でordering-constraint属性を設定します。

次の例では、可能であれば常に、問合せをパラレルに実行するように、ordering-constraint属性をUNORDEREDに設定します。

<query id="myquery" ordering-constraint="UNORDERED">
    SELECT symbol FROM S WHERE price > 10
</query>

ordering-constraint属性は、次の3つの値をサポートします。

  • ORDEREDは、出力イベントの順序は(入力イベントの順序で暗示されるように)重要であることを意味します。CQLエンジンはイベントをシリアルに処理します。これはデフォルトの動作です。

  • UNORDEREDは、出力イベントの順序は出力イベントのコンシューマにとって重要ではないことを意味します。これにより、CQLプロセッサはイベントを複数のスレッドでパラレルに処理できるようになります。可能な場合、問合せは複数のスレッドでパラレルに実行して、イベントを処理します。

  • PARTITION_ORDEREDは、異なるパーティション間の出力イベントの順序が出力イベントのコンシューマにとって重要ではない場合に、パーティション内の出力イベントの順序が(入力イベントの順序で暗示されるように)保持されるよう指定していることを意味します。このような緩和により、CQLエンジンは、パーティション間のイベントを複数のスレッドでパラレルに(可能な場合)処理できるようになります。

指定したパーティションに従っているイベントをシリアルに処理するが、パーティション間の順序は無視し、異なるパーティションに属するイベントをパラレルに処理するよう指定する場合に、Use the PARTITION_ORDEREDの値を使用します。PARTITION_ORDEREDの値を使用する場合、partition-expression属性を追加して、パーティション化用のどの式をパーティション間の順序付けの制約を緩和するための基準とするかも指定する必要があります。

次の例では、GROUP BY句はsymbolの値に基づいて出力をパーティション化します。partition-expression属性は、特定のsymbolの値に対応するイベントの指定されたサブセット内のイベントをシリアルに処理するよう指定します。一方、パーティション間では順序を無視できます。

<query id="myquery" ordering-constraint="PARTITION_ORDERED"
    partitioning-expression="symbol">
    SELECT
        COUNT(*) as c, symbol
    FROM
        S[RANGE 1 minute]
    GROUP BY
        symbol
</query>

6.6.3 問合せのパーティション化でのpartition-order-capacityの使用

一般的に、より多くのスレッドを使用可能にして、可能な場合はパラレルで実行できるようにordering-constraint属性を設定することによって、問合せのパフォーマンスが向上すると考えられます。多くのパフォーマンス・チューニング技術と同様、これらの設定で少しずつ試行錯誤を重ねて、より良い結果となる組合せを見つける必要があります。

しかし、問合せでパーティション化を使用し、ordering-constraint属性をPARTITION_ORDEREDに設定した場合でも、期待するスケーリングの量にならない可能性があります。たとえば、4つのスレッドで実行しても2つのスレッドで実行する場合に比べてパフォーマンスがあまり向上しない場合を考えます。このような場合、パーティションを含む問合せの操作でCQLエンジンの特性を最大限に活用するために、partition-order-capacityの値を使用できます。

partition-order-capacityの値は、PARTITION_ORDEREDの問合せを処理する際に指定したプロセッサ・インスタンス内で許可されている並列の最大量を指定します。使用可能なスレッドが異なるパーティションに属するイベントを処理している場合、値は問合せ内で同時に実行できるスレッドの最大数を設定します。

他のパフォーマンス・チューニングの側面と同様、partition-order-capacityを最大限に活用するために、少し実験を行うことがあります。partition-order-capacityを調整する場合、最初に行うこととして適切なのは、CQLプロセッサ・インスタンスでアクティブにするスレッドの最大数にそれを設定することです。場合によっては(たとえば、データが高速である、CQLプロセッサからダウンストリームを処理することが困難など)、partition-order-capacityの値を使用可能なスレッド数より大きく設定しても、有用なことがあります。ただし、パフォーマンス・テストが指定したアプリケーションおよび負荷にとって有用であることを確認する場合、これのみを行う必要があります。

partition-order-capacityの値は4つの場所のうち1つから設定され、2つはユーザーが明示的に設定しない場合のフォールバックとなります。

次に、優先順位を示します。

  1. チャネル構成に設定されたpartition-order-capacity要素。プロセッサの入力チャネルでこれを指定した場合、そのプロセッサのPARTITION_ORDERED問合せに対して有効になります。

  2. サーバー構成に設定されたpartition-order-capacityプロパティ。チャネルに値が設定されない場合、サーバー上で実行中のすべてのPARTITION_ORDERED問合せに対してこの値が使用されます。

  3. チャネル構成に設定されたmax-threadsの値。プロセッサの入力チャネルでこれを指定した場合、そのプロセッサのPARTITION_ORDERED問合せに対して有効になります。

  4. partition-order-capacityの値とmax-threadsの値のいずれも指定しない場合、またはmax-threadsの値を0に設定した(つまりパススルー・チャネルである)場合に、システムのデフォルト値(現在は4に設定)が使用されます。

partition-order-capacityを使用する場合は、次の点に留意してください。

  • partition-order-capacityの値は、ordering-constraint属性をPARTITION_ORDEREDに設定している場合のみ有用になります。

  • partition-order-capacityを増やすと、通常、並列度およびスケーリングが増加します。たとえば、プロファイリングにロック競合のボトルネックが発生した場合、partition-order-capacityを増やして競合が減少しているかどうかを確認することが有用である可能性があります。

  • パーティション化をCQLプロセッサで行う特定の方法により、partition-order-capacityを使用可能なスレッド数より大きく設定しても、場合によっては有用なことがあります。

  • 非常に高い値を指定すると、使用するメモリーでリソース・コストがかかります。

  • このパラメータの調整は、アプリケーションおよび入力率の詳細に大きく依存します。実験によって調整することは、最適な値を決定するのに必要な場合があります。

6.6.4 制限事項

可能であれば常にCQLプロセッサによって使用できるように、サポートを指定するパフォーマンス強化機能として、パラレル問合せ実行を検討してください。すべての問合せをパラレルで実行できるわけではありません。これには、特定のCQL言語機能を使用した問合せが含まれています。

たとえば、問合せで集計のフォーム(値の範囲から最大値を検索するなど)を使用する場合、CQLプロセッサは問合せを完全にはパラレルで実行できない可能性があります(これには、順序付けの制約を考慮して正しい結果を保証する必要があります)。一部の問合せのセマンティクスがそれ自体でも順序付け処理への問合せを制限しています。パラレル実行のサポートを指定しているかどうかに関係なく、このような問合せはシリアルで実行されます。

また、IStreamRStreamおよびDStream演算子は、処理のためにそのオペランドの状態を維持し、問合せを実行するために、CQLプロセッサがスレッドを同期化する必要があるようにします。

CQLプロセッサは、常に問合せのセマンティク目的に配慮していることに注意してください。ordering-constraint属性によってこの目的を変更する場合、この属性は目的を元のままに残す値に強制変換されます。

partitioning-expression属性を使用している場合、この属性は単一の式のみをサポートしていることに留意してください。値への複数のプロパティ名の入力はサポートされていません。

6.7 フォルト処理

固有のフォルト処理メカニズムを持たないコードで発生するフォルトを処理するコードを書き込むことができます。これには、Oracle CQLコードおよびマルチスレッドEPNチャネルが含まれます。

デフォルトでは、try/catch構造を使用するJavaと同様に、CQL言語には発生するエラーを処理するメカニズムがありません。CQLで発生するフォルトを処理するには、フォルト・ハンドラを書き込み、ハンドラをOracle CQLプロセッサなどのフォルトを処理するEPNステージに接続します。

フォルト・ハンドラとマルチスレッド・チャネル(つまり、max-threads設定が0より大きいチャネル)を関連付けることもできます。これにより、チャネルのダウンストリームであるステージからチャネルにスローされる例外が発生した場合のフォルト処理が提供されます。max-threads設定が0であるチャネルはアップストリーム・ステージに例外をすでに再スローしているパススルー・チャネルであることに注意してください。チャネルのフォルト・ハンドラに固有の追加情報は、フォルト処理を参照してください。

フォルト・ハンドラは、com.bea.wlevs.ede.api.FaultHandlerインタフェースを実装するJavaクラスです。OSGiサービスとしてフォルト・ハンドラを登録してステージに関連付けることによって、クラスをEPNステージに接続します。OSGiの詳細は、Springフレームワークを参照してください。

カスタム・フォルト・ハンドラを使用しない場合、次のデフォルトのフォルト処理動作を取得します。

  • 例外がOracle CQLで発生する場合、CQLエンジンは例外を捕捉し、問合せプロセッサを停止します。

  • プロセッサのダウンストリームであるステージで例外が発生する場合、そのステージはリスナーとして削除されます。

  • 例外がログに記録され(CQLServerカテゴリの下)、例外句の一部であるイベントが破棄されます。

  • アップストリーム・ステージは障害が通知されません。

書き込むカスタム・フォルト・ハンドラを使用する場合、次の操作を実行できます。

  • ステージのフォルトをハンドラに例外としてスローするよう、フォルト・ハンドラとOracle CQLプロセッサまたはマルチスレッド・チャネルを関連付けます。そのため、例外を処理または再スローできます。

  • コードで例外を処理するか、次のアップストリームのステージに再スローする場合、問合せ処理を続行できます。

  • フォルトを処理する場合にイベント・データの損失を防ぎます。たとえば、データ・ソースへの接続を構成した場合、そこでイベント・データを保存できます。

  • フォルトが発生した場合にフォルトおよびイベント情報をログに記録します。

  • アップストリームでスローされた例外が他のOracle CQLプロセッサおよびチャネルに到達する場合にこれらが処理されるよう、EPNで必要な場合に複数のフォルト・ハンドラを使用します。

Oracle CQLプロセッサおよびマルチスレッド・チャネルを含むフォルトに対応する固有のメカニズムがないステージにフォルト・ハンドラを関連付けることを検討してください。固有の例外処理モデルを持つカスタム・アダプタなどの他のステージには、フォルト・ハンドラの利点がありません。

Oracle CQLプロセッサのアップストリームにあるステージへのフォルトの再スローを含む、実行するアクションを決定するためにフォルト処理コードでフォルトが評価される場合、問合せを続行できます。

たとえば、Oracle CQLプロセッサのアップストリーム・ステージは、JMSトランザクションをロールバックして(セッションがトランザクションである場合)イベントを再送信できるJMSサブスクライバ・アダプタである可能性があります。イベントがすでに再送信され、問題を解決できないことがわかった場合、トランザクションをコミットすることもできます。

カスタム・フォルト・ハンドラを使用する場合、問合せが停止して再起動したかのように、問合せ状態がフォルトの後にリセットされます。対照的に、デフォルトの動作の場合は、問合せが停止し、すべての後続のイベントが削除されます。

6.7.1 フォルト・ハンドラ・クラスの実装

com.bea.wlevs.ede.api.FaultHandlerインタフェースを実装して、フォルト・ハンドラ・クラスを作成します。クラスを書き込んだ後、OSGiサービスとして登録してフォルトを処理するステージと関連付けます。詳細は、フォルト・ハンドラの登録を参照してください。

handleFaultメソッドの実装は、ハンドラを関連付けるEPNステージの例外を受信します。例外自体は、com.bea.wlevs.ede.api.EventProcessingExceptionのインスタンスまたはJVMエラーが発生した場合のjava.lang.Errorのインスタンスです。

このメソッドは、コードで再スローする場合に例外が通過するアップストリーム・ステージの名前(捕捉コード)を含む文字配列も受信します。配列に複数の捕捉コードがある場合、再スローされた例外はそれらすべてを通過します。捕捉コードの配列が空になる場合は、一時的な問合せの実行中に例外が発生する場合と例外がチャネルのフォルト・ハンドラにスローされる場合の2つがあります。これらの場合、フォルト・ハンドラはバックグラウンド・スレッドのコンテキストで実行され、アップストリーム・ステージとのリンクはありません。

フォルト・ハンドラから再スローされる例外は、捕捉されるか、または捕捉できないステージ(関連付けられたフォルト・ハンドラがないプロセッサまたはマルチスレッド・チャネルなど)に到達するまで、アップストリームEPNステージ内を移動します。例外を再スローする場合、捕捉コード・リスト内のチャネルは例外を捕捉するために関連付けられたフォルト・ハンドラを持つ必要があるので注意してください。

EventProcessingExceptionインスタンスは、このクラスを拡張する例外タイプの1つ(CQLExecutionExceptionArithmeticExecutionExceptionおよびその他を含む)でもある場合があります。Oracle Stream Analytics Java APIリファレンスを参照してください。EventProcessingExceptionインスタンスは、コードでフォルトの生成に含まれたイベントの挿入、削除および更新を取得できるメソッドを提供します。

メソッドの実装は、次のいずれかを実行する必要があります。

  • Javaのtryおよびcatch文のようにフォルトを使用します。実装でフォルトを再スローしない場合、イベント処理は後続のイベントを続行します。ただし、問合せを再起動したかのように、問合せ処理は状態をリセットして続行します。処理状態が失われ、フォルトを発生させたイベントの後のイベントから処理を新たに開始します。

  • アップストリーム・ステージ(またはフォルト・ハンドラ)で受信されるよう、フォルトを再スローします。フォルトを使用する場合のように、問合せ状態が後続のイベントでリセットされますが、問合せはイベントの処理を続行します。フォルトを受信するアップストリーム・ステージには、CQLプロセッサのMBeanインタフェースを使用して、問題のある問合せを明示的に停止するオプションが常にあります。

    注意:

    MBeanでOracle CQL問合せを更新する場合、更新手順中はイベントを送信しないでください。いくつかの問合せの最中にイベントを送信した場合、出力ストリーム中のイベントの順序は保証されません。たとえば、Oracle CQLのパラレル実行でOracle CQL問合せを順序付けなしから順序付けありに更新する場合です。

次の例では、コードでフォルトを処理する上位レベルの説明を示します。

package com.example.faulthandler;

import com.bea.wlevs.ede.api.FaultHandler;

public class SimpleFaultHandler implements FaultHandler
{
    private String suppress;

    // Called by the server to pass in fault information.
    @Override
    public void handleFault(Throwable fault, String[] catchers) throws Throwable
    {
        // Log the fault.
        return;
    }
}

6.7.2 フォルト・ハンドラの登録

フォルト処理クラスを書き込んだ後、OSGiサービスとして登録して、EPNステージに関連付けることができます。これを実行する最も単純な方法は、EPNアセンブリ・ファイルで宣言的にハンドラを登録することです。

注意:

固有のOSGi動作のため、構成からランタイム・フォルト・ハンドラ登録が非同期的に発生します。つまり、ハンドラがフォルトを受信する前に少しのウォームアップ時間が必要になる場合があります。ネットワークに届く最初のイベントに対してハンドラの準備が整っているようにするには、アプリケーションでイベントの受信を開始する前に待機期間を追加します。

次の例では、EPNアセンブリ・ファイルの引用は、idexampleProcessorであるOracle CQLプロセッサのフォルト・ハンドラとしてSimpleFaultHandlerクラスを登録するservice要素節を示します。

<osgi:service interface="com.bea.wlevs.ede.api.FaultHandler">
    <osgi:service-properties>
        <entry key="application.identity" value="myapp"/>
        <entry key="stage.identity" value="exampleProcessor"/>
    </osgi:service-properties>
    <bean class="com.example.faulthandler.SimpleFaultHandler"/>
</osgi:service>

<!-- A processor with a user-defined function. -->
<wlevs:processor id="exampleProcessor" >
    ...
</wlevs:processor>

OSGiサービスを登録するスキーマの詳細は、http://static.springsource.org/osgi/docs/1.1.x/reference/html/appendix-schema.htmlを参照してください。OSGiの詳細は、http://en.wikipedia.org/wiki/OSGiを参照してください。