この節では、次の項目について説明します。
Oracle CEPアプリケーションには、1つ以上のチャネル・コンポーネントが含まれます。チャネルは、アダプタとプロセッサ間やプロセッサとイベントBean(ビジネス・ロジックPOJO)間のような、他のコンポーネント同士をイベントが往来する物理的なパイプを表します。
チャネルはストリームとリレーションの両方に使用できます。詳細は、8.1.2項「ストリームおよびリレーションを表すチャネル」を参照してください。
イベント処理ネットワーク(EPN)でチャネルを作成する場合、デフォルト構成があります。完全な詳細は、C.10項「wlevs:channel」を参照してください。
デフォルトのチャネル構成は、通常、大半のアプリケーションに適しています。
ただし、この構成を変更する場合は、コンポーネント構成ファイル内にchannel要素を作成する必要があります。このchannel要素では、デフォルト構成をオーバーライドするチャネル構成を指定できます。
コンポーネント構成ファイルのchannel要素のname要素は、EPNアセンブリ・ファイルのchannel要素のid属性に一致する必要があります。たとえば、例8-1に示すようにEPNアセンブリ・ファイルのchannel要素の場合、対応するコンポーネント構成ファイルのchannel要素は例8-2で示されます。
例8-1 EPNアセンブリ・ファイルのチャネルID: priceStream
<wlevs:channel id="priceStream" event-type="PriceEvent">
<wlevs:listener ref="filterFanoutProcessor" />
<wlevs:source ref="PriceAdapter" />
</wlevs:channel>
例8-2 コンポーネント構成ファイルのチャネル名: priceStream
<channel>
<name>priceStream</name>
<max-size>10000</max-size>
<max-threads>4</max-threads>
</channel>
次のコンポーネント構成ファイルのいずれかでchannel要素を作成できます。
デフォルトのOracle CEPアプリケーションの構成ファイル(デフォルトでは、META-INF/wlevs/config.xml)。
個別の構成ファイル。
アプリケーションが1つ以上のチャネルを持つ場合、デフォルトのconfig.xmlファイル内でそれぞれのチャネルのchannel要素を作成したり、それぞれのMETA-INF/wlevsで個別のXMLファイルを作成したり、すべてのチャネルの構成またはアプリケーションのすべてのコンポーネント(アダプタ、プロセッサ、およびチャネル)さえも含むMETA-INF/wlevsで単一のXMLファイルを作成したりできます。開発環境に最適なメソッドを選択します。
デフォルトでは、Oracle CEP IDE for Eclipseは1つのコンポーネント構成ファイルと1つのEPNアセンブリ・ファイルを作成します。
コンポーネント構成ファイルは、Oracle CEPアプリケーション・バンドルの一部としてデプロイされます。Oracle CEP Visualizer、wlevs.Adminユーティリティを使用するか、または適切なJMX Mbeanを直接操作して、後で実行時にこの構成を更新できます。
詳細は、次を参照してください:
『Oracle CEP Visualizerユーザーズ・ガイド』
『Oracle CEP管理者ガイド』のwlevs.Adminコマンドライン・リファレンス
『Oracle CEP管理者ガイド』のOracle CEP用JMXの構成に関する項
Oracle CQLプロセッサをダウンストリーム・ステージに接続するとき、チャネルは必須です。
プッシュ・ソース・ストリームまたはリレーションをプロセッサに接続するとき、チャネルは必須です。
チャネルは、Oracle CQLプロセッサがその形状を認識される必要があり(つまり、DDLが必須)、チャネルも仲介者として機能する必要があるため、プッシュ・ソースに必須です。
外部リレーション、またはキャッシュや表のソースのようなプル・ソースをプロセッサに接続するとき、チャネルはオプションです。
チャネルは、キャッシュや表などのプル・ソースとプロセッサの間では、プル・ソースが外部リレーションを示すため必要ありません。外部リレーションでは、有効な操作はストリームとNOWウィンドウ演算子間の結合のみとなるため、プル・ソースとみなされます。つまり、結合は実際には、Oracle CQLプロセッサの外部で発生します。これはプルであるため、Oracle CQLプロセッサはその形状を認識される必要はありません(つまり、DDLは必要ありません)。そのため、チャネルを仲介者として機能させる必要もありません。
通常、コンポーネント間のチャネルは次の場合に使用します。
バッファリングは、発行するコンポーネントと受信側の間で必要です。
受信側のコンポーネントにはキューイングまたは同時実行性が必要です。
カスタム・アダプタが使用される場合は、スレッド制御が必要です。
EPNにチャネルを含めることは、今後の設計ライフサイクルでパフォーマンス・チューニング(バッファリング、キューイング、および同時実行性のオプションを使用)の柔軟性を提供するため、設計上よく行われます。チャネルの属性max-threadsを0に設定すると、パススルー・モードでチャネルを設置し、パフォーマンス・ペナルティを起こしません。詳細は、表C-9「wlevs:channelアプリケーション・アセンブリ要素の属性」を参照してください。
チャネルは、ストリームまたはリレーションのいずれかを示します。
ストリームは追加のみサポートします。EPNアセンブリ要素のwlevs:channel属性のis-relationをfalse(デフォルト)に設定することによって、チャネルをストリームとして指定できます。
リレーションは、挿入、削除、および更新をサポートします。EPNアセンブリ要素のwlevs:channel属性のis-relationをtrueに設定することによって、チャネルをリレーションとして指定できます。
詳細は、1.1.3項「EPNにおけるイベントの送信: ストリームおよびリレーションのソースとシンク」を参照してください。
デフォルトでは、チャネルはシステムにタイムスタンプを付けられます。この場合、Oracle CEPは2つの条件下でCPUクロックから新しい時間を割り当てます。新規イベントの到着時、および構成可能なハートビートのタイムアウトが期限切れになったとき。
詳細は、次を参照してください:
オプションで、チャネルをアプリケーションがタイムスタンプを付けるように構成できます。この場合、イベントのタイムスタンプは、構成可能なwlevs:expressionタグによって判定されます。式のよくある例は、イベントでのプロパティへの参照です。式が指定されていない場合、タイムスタンプは前のイベントから伝播されます。たとえば、別のダウンストリームOracle CQLプロセッサのアプリケーション・タイムスタンプ・チャネルにイベントを送信する1つのOracle CQLプロセッサからのシステム・タイムスタンプ・チャネルを持つ場合などです。
また、アプリケーションはStreamSender.sendHeartbeatメソッドを使用して、イベント型heart-beatダウンストリームをEPNのStreamSinkリスナーに送信することが可能です。
詳細は、次を参照してください:
チャネルを手動で構成するか、またはOracle CEP IDE for Eclipseを使用して構成できます。
チャネル・コンポート構成ファイルを記述する完全なXSDスキーマについては、B.2項「コンポーネント構成スキーマwlevs_application_config.xsd」を参照してください。
チャネル構成ファイルの完全な例については、8.3項「チャネル構成ファイルの例」を参照してください。
この節では、次のトピックについて説明します。
8.2.1項「Oracle CEP IDE for Eclipseを使用してシステム・タイムスタンプ・チャネルを構成する方法」
8.2.2項「Oracle CEP IDE for Eclipseを使用してアプリケーション・タイムスタンプ・チャネルを構成する方法」
この項では、システム・タイムスタンプ・チャネルの作成方法を説明します。
デフォルトのコンポーネント構成ファイルでチャネル構成を作成・編集するための最も効率的でエラーの発生が少ない方法は、Oracle CEP IDE for Eclipseを使用することです。オプションで、チャネル構成ファイルを手動で作成できます(8.2.3項「手動によるチャネル・コンポーネント構成ファイルの作成方法」を参照)。
詳細は、次を参照してください:
Oracle CEP IDE for Eclipseを使用してチャネルを構成するには:
Oracle CEP IDE for Eclipseを使用してチャネルを作成します。
5.4.1.1項「基本ノードの作成方法」を参照してください。
オプションで、wlevs:channel属性や子要素を追加することによって、デフォルトのチャネル・アセンブリ・ファイルの構成をオーバーライドします。
チャネル・ノードを右クリックし、「アセンブリのソースに移動」を選択します。
適切なwlevs:channel属性を追加します。
特に、is-relation属性を構成することによって、このチャネルがストリームまたはリレーションかどうかを指定します。
このチャネルをストリームに指定するには、is-relationをfalse(デフォルト)に設定します。
このチャネルをリレーションに指定するには、is-relationをtrueに設定します。
表C-9「wlevs:channelアプリケーション・アセンブリ要素の属性」を参照してください。
適切なwlevs:channel子要素を追加します。
プロジェクト・エクスプローラで、META-INF/wlevsディレクトリを展開します。
使用するコンポーネント構成ファイルを選択します。
デフォルトのコンポーネント構成ファイルを使用するには、META-INF/wlevs/config.xmlファイルを右クリックし、次で開く>XMLエディタを選択します。
ファイルがXMLエディタで開きます。
新しいコンポーネント構成ファイルを作成するには:
wlevsディレクトリを右クリックし、「新規」>「ファイル」を選択します。
新規ファイルダイアログが表示されます。
ファイル名を入力します。
名前は任意で付けることができますが、ファイル名の最後は.xmlにする必要がります。
「終了」をクリックします。
Oracle CEP IDE for Eclipseは、コンポーネント構成ファイルをwlevsディレクトリに追加します。
使用するように選択したコンポーネント構成ファイルを右クリックし、次で開く>XMLエディタを選択します。
ファイルがXMLエディタで開きます。
新しいコンポーネント構成ファイルを作成した場合、例8-3で示されているヘッダーおよびconfig要素を追加します。それ以外の場合は、ステップ7に進みます。
例8-3 コンポーネント構成ファイルのヘッダーおよびconfig要素
<?xml version="1.0" encoding="UTF-8"?> <n1:config xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd" xmlns:n1="http://www.bea.com/ns/wlevs/config/application" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> </config>
チャネルのchannel要素を例8-4で示されるように追加します。
例8-4 コンポーネント構成ファイルのチャネル要素
<?xml version="1.0" encoding="UTF-8"?>
<n1:config xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd"
xmlns:n1="http://www.bea.com/ns/wlevs/config/application"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<processor>
...
</processor>
...
<channel>
</channel>
</config>
name子要素をchannel要素に追加します。
name要素の値は、対応するEPNアセンブリ・ファイルのchannel要素のid属性に一致する必要があります。
たとえば、例8-5に示すようにEPNアセンブリ・ファイルのchannel要素の場合、対応するコンポーネント構成ファイルのchannel要素は例8-6で示されます。
例8-5 EPNアセンブリ・ファイルのチャネルID: priceStream
<wlevs:channel id="priceStream" event-type="PriceEvent">
<wlevs:listener ref="filterFanoutProcessor" />
<wlevs:source ref="PriceAdapter" />
</wlevs:channel>
|
注意: XMLファイルの識別子および名前は大/小文字が区別されます。EPNアセンブリ・ファイル内のコンポーネントの識別子を参照するときは、必ず大/小文字を同じにして指定します。 |
オプションで、channel子要素を追加することによって、デフォルトのチャネル構成をオーバーライドします。
max-threads子要素を追加すると、Oracle CEPサーバーがこのチャネルのイベントを処理するために使用するスレッドの最大数を指定できます。
max-sizeが0の場合は、この値の設定に影響はありません。デフォルト値は0です。
<channel>
<name>priceStream</name>
<max-threads>2</size>
</channel>
0に指定する場合、チャネルは受渡しのように機能します。max-threads > 0の場合、チャネルは従来のブロッキング・キューのように機能し、アップストリーム・コンポーネントがイベントのプロデューサ、ダウンストリーム・コンポーネントがイベントのコンシューマになります。キューのサイズはmax-sizeの構成によって定義されます。キューからイベントを消費するスレッドは、最大でmax-threadsの数までとなります。
max-size子要素を追加すると、チャネルの最大サイズを指定できます。
サイズがゼロのチャネルは、同期的にイベントを受け渡します。
サイズがゼロでないチャネルはイベントを非同期的に処理し、要求されたサイズに従ってイベントをバッファリングします。デフォルト値は0です。
<channel>
<name>priceStream</name>
<max-size>10000</size>
</channel>
heartbeat子要素を追加すると、Oracle CEPが時間を進めるためにハートビート・イベントを生成する前にチャネルが待機できる時間をナノ秒数で指定できます。
heartbeatの子要素は、プロセッサに送信を行うイベント・チャネルにイベントが到着しない場合のみ、システムがタイムスタンプを付けた関係またはストリームに適用されます。プロセッサは、時間ベースのウィンドウや期間のパターン照合などの、一時的な演算子をいくつか含むステートメントを用いて構成されています。
<channel>
<name>MatchOutputChannel</name>
<heartbeat>10000</heartbeat>
</channel>
selector子要素を追加すると、どのアップストリームOracle CQLプロセッサの問合せがチャネルに結果を出力することを許可されるかを指定できます。
図8-1は、アップストリームOracle CQLプロセッサfilteredFanoutProcessorに接続されたチャネルfilteredStreamを備えたEPNを示します。
例8-7は、Oracle CQLプロセッサに構成された問合せを示します。
例8-7 filterFanoutProcessor Oracle CQL問合せ
<processor>
<name>filterFanoutProcessor</name>
<rules>
<query id="Yr3Sector"><![CDATA[
select cusip, bid, srcId, bidQty, ask, askQty, seq
from priceStream where sector="3_YEAR"
]]></query>
<query id="Yr2Sector"><![CDATA[
select cusip, bid, srcId, bidQty, ask, askQty, seq
from priceStream where sector="2_YEAR"
]]></query>
<query id="Yr1Sector"><![CDATA[
select cusip, bid, srcId, bidQty, ask, askQty, seq
from priceStream where sector="1_YEAR"
]]></query>
</rules>
</processor>
例8-7に示すように、Oracle CQLプロセッサに1つ以上の問合せを指定する場合、デフォルトでは、すべての問合せ結果がプロセッサのアウトバウンド・チャネルに出力されます(図8-1のfilteredStream)。オプションで、コンポーネント構成ソースでchannel要素のselector属性を使用し、例8-8が示すようにチャネルに結果を出力する1つ以上のOracle CQL問合せ名のスペース区切りのリストを指定できます。この例では、問合せYr3SectorおよびYr2Sectorの問合せ結果はfilteredStreamに出力されますが、問合せYr1Sectorの問合せ結果は出力されません。
例8-8 どの問合せ結果を出力するか制御するためのselectorの使用
<channel>
<name>filteredStream</name>
<selector>Yr3Sector Yr2Sector</selector>
</channel>
詳細は、D.75項「selector」を参照してください。
「ファイル」>「保存」を選択します。
EPNエディタは、図8-2で示すように構成バッジをチャネルに追加します。詳細は、5.2.7項「バッジの構成」を参照してください。
この項では、アプリケーション・タイムスタンプ・チャネルの作成方法を説明します。
デフォルトのコンポーネント構成ファイルでチャネル構成を作成・編集するための最も効率的でエラーの発生が少ない方法は、Oracle CEP IDE for Eclipseを使用することです。オプションで、チャネル構成ファイルを手動で作成できます(8.2.3項「手動によるチャネル・コンポーネント構成ファイルの作成方法」を参照)。
詳細は、次を参照してください:
Oracle CEP IDE for Eclipseを使用してチャネルを構成するには:
Oracle CEP IDE for Eclipseを使用してチャネルを作成します。
5.4.1.1項「基本ノードの作成方法」を参照してください。
オプションで、wlevs:channel属性や子要素を追加することによって、デフォルトのチャネル・アセンブリ・ファイルの構成をオーバーライドします。
チャネル・ノードを右クリックし、「アセンブリのソースに移動」を選択します。
適切なwlevs:channel属性を追加します。
特に、is-relation属性を構成することによって、このチャネルがストリームまたはリレーションかどうかを指定します。
このチャネルをストリームに指定するには、is-relationをfalse(デフォルト)に設定します。
このチャネルをリレーションに指定するには、is-relationをtrueに設定します。
表C-9「wlevs:channelアプリケーション・アセンブリ要素の属性」を参照してください。
wlevs:application-timestamped子要素を追加します。
この要素を使用すると、Oracle CEPがタイムスタンプ値を出力するために使用するwlevs:expression子要素を指定できます。
オプションで、wlevs:application-timestamped属性を構成します。
詳細は、付録C「wlevs:application-timestamped」を参照してください。
他の適切なwlevs:channel子要素を追加します。
プロジェクト・エクスプローラで、META-INF/wlevsディレクトリを展開します。
使用するコンポーネント構成ファイルを選択します。
デフォルトのコンポーネント構成ファイルを使用するには、META-INF/wlevs/config.xmlファイルを右クリックし、次で開く>XMLエディタを選択します。
ファイルがXMLエディタで開きます。
新しいコンポーネント構成ファイルを作成するには:
wlevsディレクトリを右クリックし、「新規」>「ファイル」を選択します。
新規ファイルダイアログが表示されます。
ファイル名を入力します。
名前は任意で付けることができますが、ファイル名の最後は.xmlにする必要がります。
「終了」をクリックします。
Oracle CEP IDE for Eclipseは、コンポーネント構成ファイルをwlevsディレクトリに追加します。
使用するように選択したコンポーネント構成ファイルを右クリックし、次で開く>XMLエディタを選択します。
ファイルがXMLエディタで開きます。
新しいコンポーネント構成ファイルを作成した場合、例8-3で示されているヘッダーおよびconfig要素を追加します。それ以外の場合は、ステップ7に進みます。
例8-9 コンポーネント構成ファイルのヘッダーおよびconfig要素
<?xml version="1.0" encoding="UTF-8"?> <n1:config xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd" xmlns:n1="http://www.bea.com/ns/wlevs/config/application" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> </config>
チャネルのchannel要素を例8-4で示されるように追加します。
例8-10 コンポーネント構成ファイルのチャネル要素
<?xml version="1.0" encoding="UTF-8"?>
<n1:config xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd"
xmlns:n1="http://www.bea.com/ns/wlevs/config/application"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<processor>
...
</processor>
...
<channel>
</channel>
</config>
name子要素をchannel要素に追加します。
name要素の値は、対応するEPNアセンブリ・ファイルのchannel要素のid属性に一致する必要があります。
たとえば、例8-5に示すようにEPNアセンブリ・ファイルのchannel要素の場合、対応するコンポーネント構成ファイルのchannel要素は例8-6で示されます。
例8-11 EPNアセンブリ・ファイルのチャネルID: priceStream
<wlevs:channel id="priceStream" event-type="PriceEvent">
<wlevs:listener ref="filterFanoutProcessor" />
<wlevs:source ref="PriceAdapter" />
</wlevs:channel>
|
注意: XMLファイルの識別子および名前は大/小文字が区別されます。EPNアセンブリ・ファイル内のコンポーネントの識別子を参照するときは、必ず大/小文字を同じにして指定します。 |
オプションで、channel子要素を追加することによって、デフォルトのチャネル構成をオーバーライドします。
max-threads子要素を追加すると、Oracle CEPサーバーがこのチャネルのイベントを処理するために使用するスレッドの最大数を指定できます。
max-sizeが0の場合は、この値の設定に影響はありません。デフォルト値は0です。
<channel>
<name>priceStream</name>
<max-threads>2</size>
</channel>
0に指定する場合、チャネルは受渡しのように機能します。max-threads > 0の場合、チャネルは従来のブロッキング・キューのように機能し、アップストリーム・コンポーネントがイベントのプロデューサ、ダウンストリーム・コンポーネントがイベントのコンシューマになります。キューのサイズはmax-sizeの構成によって定義されます。キューからイベントを消費するスレッドは、最大でmax-threadsの数までとなります。
max-size子要素を追加すると、チャネルの最大サイズを指定できます。
サイズがゼロのチャネルは、同期的にイベントを受け渡します。
サイズがゼロでないチャネルはイベントを非同期的に処理し、要求されたサイズに従ってイベントをバッファリングします。デフォルト値は0です。
<channel>
<name>priceStream</name>
<max-size>10000</size>
</channel>
selector子要素を追加すると、どのアップストリームOracle CQLプロセッサの問合せがチャネルに結果を出力することを許可されるかを指定できます。
図8-1は、アップストリームOracle CQLプロセッサfilteredFanoutProcessorに接続されたチャネルfilteredStreamを備えたEPNを示します。
例8-7は、Oracle CQLプロセッサに構成された問合せを示します。
例8-13 filterFanoutProcessor Oracle CQL問合せ
<processor>
<name>filterFanoutProcessor</name>
<rules>
<query id="Yr3Sector"><![CDATA[
select cusip, bid, srcId, bidQty, ask, askQty, seq
from priceStream where sector="3_YEAR"
]]></query>
<query id="Yr2Sector"><![CDATA[
select cusip, bid, srcId, bidQty, ask, askQty, seq
from priceStream where sector="2_YEAR"
]]></query>
<query id="Yr1Sector"><![CDATA[
select cusip, bid, srcId, bidQty, ask, askQty, seq
from priceStream where sector="1_YEAR"
]]></query>
</rules>
</processor>
例8-7に示すように、Oracle CQLプロセッサに1つ以上の問合せを指定する場合、デフォルトでは、すべての問合せ結果がプロセッサのアウトバウンド・チャネルに出力されます(図8-1のfilteredStream)。オプションで、コンポーネント構成ソースでchannel要素のselector属性を使用し、例8-8が示すようにチャネルに結果を出力する1つ以上のOracle CQL問合せ名のスペース区切りのリストを指定できます。この例では、問合せYr3SectorおよびYr2Sectorの問合せ結果はfilteredStreamに出力されますが、問合せYr1Sectorの問合せ結果は出力されません。
例8-14 どの問合せ結果を出力するか制御するためのselectorの使用
<channel>
<name>filteredStream</name>
<selector>Yr3Sector Yr2Sector</selector>
</channel>
詳細は、D.75項「selector」を参照してください。
「ファイル」>「保存」を選択します。
EPNエディタは、図8-2で示すように構成バッジをチャネルに追加します。詳細は、5.2.7項「バッジの構成」を参照してください。
Oracle CEP IDE for Eclipseは、チャネル構成ファイルを作成するのに最も効率的でエラーの発生が少ない方法ですが(8.2.1項「Oracle CEP IDE for Eclipseを使用してシステム・タイムスタンプ・チャネルを構成する方法」を参照)、別の方法として、チャネル構成ファイルを手動で作成・維持することもできます。
簡略化するために、次の手順では、単一のXMLファイルでアプリケーションのすべてのコンポーネントを構成すると仮定しています。
手動でチャネル・コンポーネント構成ファイルを作成するには:
EPNアセンブリ・ファイルを作成し、アプリケーションの各チャネルのwlevs:channel要素を追加します。
id属性を持つそれぞれのwlevs:channelを一意で識別します。
詳細は、3.3項「EPNアセンブリ・ファイルの作成」を参照してください。
オプションで、wlevs:channel属性や子要素を追加することによって、デフォルトのチャネル・アセンブリ・ファイルの構成をオーバーライドします。
適切なwlevs:channel属性を追加します。
表C-9「wlevs:channelアプリケーション・アセンブリ要素の属性」を参照してください。
適切なwlevs:channel子要素を追加します。
お気に入りのXMLエディタを使用してXMLファイルを作成します。
このXMLファイルの名前は自分で決定しますが、その拡張は.xmlで終了することが必要です。
構成ファイルのルート要素はconfigであり、次のステップに示すネームスペース定義を使用します。
アプリケーション内の各チャネルでは、configのchannel子要素を追加します。
name子要素を持つ各チャネルを一意で識別します。このnameは、アプリケーションのイベント処理ネットワークを定義するEPNアセンブリ・ファイルのchannel要素のid属性の値と同一にする必要があります。これが、このチャネル構成が適用される対象の、EPNアセンブリ・ファイル内の特定のチャネル・コンポーネントをOracle CEPが認識する仕組みです。詳細は、3.3項「EPNアセンブリ・ファイルの作成」を参照してください。
たとえば、アプリケーションに2つのストリームがある場合、構成ファイルは最初に以下のようになります。
<?xml version="1.0" encoding="UTF-8"?>
<n1:config xmlns:n1="http://www.bea.com/ns/wlevs/config/application"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<helloworld:config
xmlns:helloworld="http://www.bea.com/xml/ns/wlevs/example/helloworld">
<processor>
...
</processor>
<channel>
<name>firstStream</name>
...
</channel>
<channel>
<name>secondStream</name>
...
</channel>
</helloworld:config>
この例では、構成ファイルにfirstStreamおよびsecondStreamと呼ばれる2つのチャネルが含まれます。つまり、EPNアセンブリ・ファイルには同一の識別子を持つ2つ以上のチャネル登録が含まれることになります。
<wlevs:channel id="firstStream" ...> ... </wlevs:channel> <wlevs:channel id="secondStream" ...> ... </wlevs:channel>
|
注意: XMLファイルの識別子と名前は大/小文字が区別されるため、EPNアセンブリ・ファイルでコンポーネントの識別子を参照するときは、必ず大/小文字を同一にします。 |
オプションで、channel子要素を追加することによって、デフォルトのチャネル構成をオーバーライドします。
max-threads子要素を追加すると、Oracle CEPサーバーがこのチャネルのイベントを処理するために使用するスレッドの最大数を指定できます。
max-sizeが0の場合は、この値の設定に影響はありません。デフォルト値は0です。
<channel>
<name>priceStream</name>
<max-threads>2</size>
</channel>
0に指定する場合、チャネルは受渡しのように機能します。max-threads > 0の場合、チャネルは従来のブロッキング・キューのように機能し、アップストリーム・コンポーネントがイベントのプロデューサ、ダウンストリーム・コンポーネントがイベントのコンシューマになります。キューのサイズはmax-sizeの構成によって定義されます。キューからイベントを消費するスレッドは、最大でmax-threadsの数までとなります。
max-size子要素を追加すると、チャネルの最大サイズを指定できます。
サイズがゼロのチャネルは、同期的にイベントを受け渡します。
サイズがゼロでないチャネルはイベントを非同期的に処理し、要求されたサイズに従ってイベントをバッファリングします。デフォルト値は0です。
<channel>
<name>priceStream</name>
<max-size>10000</size>
</channel>
heartbeat子要素を追加すると、Oracle CEPが時間を進めるためにハートビート・イベントを生成する前にチャネルが待機できる時間をナノ秒数で指定できます。
heartbeatの子要素は、プロセッサに送信を行うイベント・チャネルにイベントが到着しない場合のみ、システムがタイムスタンプを付けた関係またはストリームに適用されます。プロセッサは、時間ベースのウィンドウや期間のパターン照合などの、一時的な演算子をいくつか含むステートメントを用いて構成されています。
<channel>
<name>MatchOutputChannel</name>
<heartbeat>10000</heartbeat>
</channel>
selector子要素を追加すると、どのアップストリームOracle CQLプロセッサの問合せがチャネルに結果を出力することを許可されるかを指定できます。
図8-1は、アップストリームOracle CQLプロセッサfilteredFanoutProcessorに接続されたチャネルfilteredStreamを備えたEPNを示します。
例8-7は、Oracle CQLプロセッサに構成された問合せを示します。
例8-15 filterFanoutProcessor Oracle CQL問合せ
<processor>
<name>filterFanoutProcessor</name>
<rules>
<query id="Yr3Sector"><![CDATA[
select cusip, bid, srcId, bidQty, ask, askQty, seq
from priceStream where sector="3_YEAR"
]]></query>
<query id="Yr2Sector"><![CDATA[
select cusip, bid, srcId, bidQty, ask, askQty, seq
from priceStream where sector="2_YEAR"
]]></query>
<query id="Yr1Sector"><![CDATA[
select cusip, bid, srcId, bidQty, ask, askQty, seq
from priceStream where sector="1_YEAR"
]]></query>
</rules>
</processor>
例8-7に示すように、Oracle CQLプロセッサに1つ以上の問合せを指定する場合、デフォルトでは、すべての問合せ結果がプロセッサのアウトバウンド・チャネルに出力されます(図8-1のfilteredStream)。オプションで、コンポーネント構成ソースでchannel要素のselector属性を使用し、例8-8が示すようにチャネルに結果を出力する1つ以上のOracle CQL問合せ名のスペース区切りのリストを指定できます。この例では、問合せYr3SectorおよびYr2Sectorの問合せ結果はfilteredStreamに出力されますが、問合せYr1Sectorの問合せ結果は出力されません。
例8-16 どの問合せ結果を出力するか制御するためのselectorの使用
<channel>
<name>filteredStream</name>
<selector>Yr3Sector Yr2Sector</selector>
</channel>
詳細は、D.75項「selector」を参照してください。
構成ファイルを保存して閉じます。
図8-6は、2つのチャネルを含むEPNの一部を示します: priceStreamおよびfilteredStream。priceStreamチャネルは、PriceAdapterイベント・ソースとそのPriceEventイベントをOracle CQLプロセッサのfilterFanoutProcessorに接続するインバウンド・チャネルです。filteredStreamチャネルは、Oracle CQLプロセッサの問合せ結果(FilteredPriceEventイベント)をダウンストリーム・コンポーネント(図8-6では示されていません)に接続するアウトバウンド・チャネルです。
この項では、次を含むチャネル構成ファイルの例を提供します。
例8-17は、図8-6で示されるように2つのチャネルを構成するサンプルのコンポーネント構成ファイルを示します。
例8-17 サンプルのチャネル・コンポーネント構成ファイル
<?xml version="1.0" encoding="UTF-8"?>
<n1:config xmlns:n1="http://www.bea.com/ns/wlevs/config/application"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<processor>
<name>filterFanoutProcessor</name>
<rules>
<query id="Yr3Sector"><![CDATA[
select cusip, bid, srcId, bidQty, ask, askQty, seq
from priceStream where sector="3_YEAR"
]]></query>
</rules>
</processor>
<channel>
<name>priceStream</name>
<max-size>10000</max-size>
<max-threads>4</max-threads>
</channel>
<channel>
<name>filteredStream</name>
<max-size>5000</max-size>
<max-threads>2</max-threads>
</channel>
</n1:config>
例8-18は、図8-6で示されるように2つのチャネルを構成するEPNアセンブリ・ファイルを示します。
例8-18 チャネルEPNアセンブリ・ファイル
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:osgi="http://www.springframework.org/schema/osgi"
xmlns:wlevs="http://www.bea.com/ns/wlevs/spring"
xmlns:cqlx="http://www.oracle.com/schema/cqlx"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/osgi
http://www.springframework.org/schema/osgi/spring-osgi.xsd
http://www.bea.com/ns/wlevs/spring
http://www.bea.com/ns/wlevs/spring/spring-wlevs-v11_1_1_3.xsd">
<wlevs:event-type-repository>
<wlevs:event-type type-name="PriceEvent">
<wlevs:properties>
<wlevs:property name="cusip" type="java.lang.String" />
<wlevs:property name="bid" type="java.lang.Double" />
<wlevs:property name="srcId" type="java.lang.String" />
<wlevs:property name="bidQty" type="java.lang.Integer" />
<wlevs:property name="ask" type="java.lang.Double" />
<wlevs:property name="askQty" type="java.lang.Integer" />
<wlevs:property name="seq" type="java.lang.Long" />
<wlevs:property name="sector" type="java.lang.String" />
</wlevs:properties>
</wlevs:event-type>
<wlevs:event-type type-name="FilteredPriceEvent">
<wlevs:properties>
<wlevs:property name="cusip" type="java.lang.String" />
<wlevs:property name="bid" type="java.lang.Double" />
<wlevs:property name="srcId" type="java.lang.String" />
<wlevs:property name="bidQty" type="java.lang.Integer" />
<wlevs:property name="ask" type="java.lang.Double" />
<wlevs:property name="askQty" type="java.lang.Integer" />
<wlevs:property name="seq" type="java.lang.Long" />
</wlevs:properties>
</wlevs:event-type>
<wlevs:event-type type-name="BidAskEvent">
<wlevs:properties>
<wlevs:property name="cusip" type="java.lang.String" />
<wlevs:property name="bidseq" type="java.lang.Long" />
<wlevs:property name="bidSrcId" type="java.lang.String" />
<wlevs:property name="bid" type="java.lang.Double" />
<wlevs:property name="askseq" type="java.lang.Long" />
<wlevs:property name="askSrcId" type="java.lang.String" />
<wlevs:property name="ask" type="java.lang.Double" />
<wlevs:property name="bidQty" type="java.lang.Integer" />
<wlevs:property name="askQty" type="java.lang.Integer" />
<wlevs:property name="intermediateStrategy" type="java.lang.String" />
<wlevs:property name="correlationId" type="java.lang.Long" />
<wlevs:property name="priority" type="java.lang.Integer" />
</wlevs:properties>
</wlevs:event-type>
<wlevs:event-type type-name="FinalOrderEvent">
<wlevs:properties>
<wlevs:property name="cusip" type="java.lang.String" />
<wlevs:property name="bidseq" type="java.lang.Long" />
<wlevs:property name="bidSrcId" type="java.lang.String" />
<wlevs:property name="bid" type="java.lang.Double" />
<wlevs:property name="bidQty" type="java.lang.Integer" />
<wlevs:property name="bidSourceStrategy" type="java.lang.String" />
<wlevs:property name="askseq" type="java.lang.Long" />
<wlevs:property name="askSrcId" type="java.lang.String" />
<wlevs:property name="ask" type="java.lang.Double" />
<wlevs:property name="askQty" type="java.lang.Integer" />
<wlevs:property name="askSourceStrategy" type="java.lang.String" />
<wlevs:property name="correlationId" type="java.lang.Long" />
</wlevs:properties>
</wlevs:event-type>
</wlevs:event-type-repository>
<!-- Assemble EPN (event processing network) -->
<wlevs:adapter advertise="true" id="PriceAdapter"
provider="csvgen">
<wlevs:instance-property name="port" value="9008" />
<wlevs:instance-property name="eventTypeName"
value="PriceEvent" />
<wlevs:instance-property name="eventPropertyNames"
value="srcId,sector,cusip,bid,ask,bidQty,askQty,seq" />
</wlevs:adapter>
<wlevs:channel id="priceStream" event-type="PriceEvent">
<wlevs:listener ref="filterFanoutProcessor" />
<wlevs:source ref="PriceAdapter" />
</wlevs:channel>
<!-- By default, CQL is used for OCEP 11.0 -->
<wlevs:processor id="filterFanoutProcessor" >
</wlevs:processor>
<wlevs:channel id="filteredStream"
event-type="FilteredPriceEvent">
<wlevs:listener ref="bbaProcessor" />
<wlevs:listener ref="analyticsProcessor" />
<wlevs:source ref="filterFanoutProcessor" />
</wlevs:channel>
<!-- Explicitly specify provider CQL -->
<wlevs:processor id="bbaProcessor" provider="cql">
<wlevs:listener ref="bidAskBBAStream" />
</wlevs:processor>
<wlevs:processor id="analyticsProcessor">
<wlevs:listener ref="bidAskAnalyticsStream" />
</wlevs:processor>
<wlevs:channel id="bidAskBBAStream" event-type="BidAskEvent">
<wlevs:listener ref="selectorProcessor" />
</wlevs:channel>
<wlevs:channel id="bidAskAnalyticsStream" event-type="BidAskEvent">
<wlevs:listener ref="selectorProcessor" />
</wlevs:channel>
<wlevs:processor id="selectorProcessor">
<wlevs:listener ref="citipocOut" />
</wlevs:processor>
<wlevs:channel id="citipocOut" event-type="FinalOrderEvent" advertise="true">
<wlevs:listener>
<!-- Create business object -->
<bean id="outputBean"
class="com.bea.wlevs.POC.citi.OutputBean"
autowire="byName" />
</wlevs:listener>
</wlevs:channel>
</beans>