この章では、イベント処理ネットワークを介してOracle Event Processingのチャネルおよびイベントのパイプを構成する方法について説明します。
この章の内容は次のとおりです。
Oracle Event Processingアプリケーションは、1つ以上のチャネル・コンポーネントを含みます。チャネルは、アダプタとプロセッサ間やプロセッサとイベントBean(ビジネス・ロジックPOJO)間のような、他のコンポーネント同士をイベントが往来する物理的なパイプを表します。
チャネルはストリームとリレーションの両方に使用できます。詳細は、10.1.2項「ストリームおよびリレーションを表すチャネル」を参照してください。
イベント処理ネットワーク(EPN)でチャネルを作成する場合、デフォルト構成があります。完全な詳細は、C.10項「wlevs:channel」を参照してください。
デフォルトのチャネル構成は、通常、大半のアプリケーションに適しています。
ただし、この構成を変更する場合は、コンポーネント構成ファイル内にchannel要素を作成する必要があります。このchannel要素では、デフォルト構成をオーバーライドするチャネル構成を指定できます。
コンポーネント構成ファイルのchannel要素のname要素は、EPNアセンブリ・ファイルのchannel要素のid属性に一致する必要があります。たとえば、例10-1に示すようにEPNアセンブリ・ファイルのchannel要素の場合、対応するコンポーネント構成ファイルのchannel要素は例10-2で示されます。
例10-1 EPNアセンブリ・ファイルのチャネルID: priceStream
<wlevs:channel id="priceStream" event-type="PriceEvent">
<wlevs:listener ref="filterFanoutProcessor" />
<wlevs:source ref="PriceAdapter" />
</wlevs:channel>
例10-2 コンポーネント構成ファイルのチャネル名: priceStream
<channel>
<name>priceStream</name>
<max-size>10000</max-size>
<max-threads>4</max-threads>
</channel>
次のコンポーネント構成ファイルのいずれかでchannel要素を作成できます。
デフォルトのOracle Event Processingアプリケーション構成ファイル(デフォルトでは、META-INF/wlevs/config.xml)。
個別の構成ファイル。
アプリケーションが1つ以上のチャネルを持つ場合、デフォルトのconfig.xmlファイル内でそれぞれのチャネルのchannel要素を作成したり、それぞれのMETA-INF/wlevsで個別のXMLファイルを作成したり、すべてのチャネルの構成またはアプリケーションのすべてのコンポーネント(アダプタ、プロセッサ、およびチャネル)さえも含むMETA-INF/wlevsで単一のXMLファイルを作成したりできます。開発環境に最も適した方法を選択します。
デフォルトでは、Oracle Event Processing IDE for Eclipseは、1つのコンポーネント構成ファイルおよび1つのEPNアセンブリ・ファイルを作成します。
コンポーネント構成ファイルは、Oracle Event Processingアプリケーション・バンドルの一部としてデプロイされます。Oracle Event Processing Visualizer、wlevs.Adminユーティリティを使用するか、または適切なJMX Mbeanを直接操作して、後で実行時にこの構成を更新できます。
この項では次について説明します:
詳細は、次を参照してください:
『Oracle Fusion Middleware Oracle Event Processing Visualizerユーザーズ・ガイド』
『Oracle Fusion Middleware Oracle Event Processing管理者ガイド』のwlevs.Adminコマンドライン・リファレンスに関する項
『Oracle Fusion Middleware Oracle Event Processing管理者ガイド』のOracle Event ProcessingのJMXの構成に関する項
EPNを構成するとき、次のルールを考慮してください。
Oracle CQLプロセッサをダウンストリーム・ステージに接続するとき、チャネルは必須です。
プッシュ・ソース・ストリームまたはリレーションをプロセッサに接続するとき、チャネルは必須です。
チャネルは、Oracle CQLプロセッサがその形状を認識される必要があり(つまり、DDLが必須)、チャネルも仲介者として機能する必要があるため、プッシュ・ソースに必須です。
外部リレーション、またはキャッシュや表のソースのようなプル・ソースをプロセッサに接続するとき、チャネルはオプションです。
チャネルは、キャッシュや表などのプル・ソースとプロセッサの間では、プル・ソースが外部リレーションを示すため必要ありません。外部リレーションでは、有効な操作はストリームとNOWウィンドウ演算子間の結合のみとなるため、プル・ソースとみなされます。つまり、結合は実際には、Oracle CQLプロセッサの外部で発生します。これはプルであるため、Oracle CQLプロセッサはその形状を認識される必要はありません(つまり、DDLは必要ありません)。そのため、チャネルを仲介者として機能させる必要もありません。
通常、コンポーネント間のチャネルは次の場合に使用します。
バッファリングは、発行するコンポーネントと受信側の間で必要です。
受信側のコンポーネントにはキューイングまたは同時実行性が必要です。
カスタム・アダプタが使用される場合は、スレッド制御が必要です。
EPNにチャネルを含めることは、今後の設計ライフサイクルでパフォーマンス・チューニング(バッファリング、キューイング、および同時実行性のオプションを使用)の柔軟性を提供するため、設計上よく行われます。チャネルの属性max-threadsを0に設定すると、パススルー・モードでチャネルを設置し、パフォーマンス・ペナルティを起こしません。
詳細は、次を参照してください:
チャネルは、ストリームまたはリレーションのいずれかを示します。
詳細は、次を参照してください:
ストリームは追加のみサポートします。EPNアセンブリ要素のwlevs:channel属性のis-relationをfalse(デフォルト)に設定することによって、チャネルをストリームとして指定できます。
リレーションは、挿入、削除、および更新をサポートします。EPNアセンブリ要素のwlevs:channel属性のis-relationをtrueに設定することによって、チャネルをリレーションとして指定できます。
チャネルがリレーションである場合は、例10-3に示すように、wlevs:channel属性primary-keyを使用してイベント・アイデンティティを定義する、1つ以上のイベント・プロパティを指定する必要があります。
例10-3 リレーションとしてのチャネル: primary-key属性
...
<wlevs:channel id="priceStream" event-type="PriceEvent" primary-key="stock,broker">
<wlevs:listener ref="filterFanoutProcessor" />
<wlevs:source ref="PriceAdapter" />
</wlevs:channel>
...
例10-4 PriceEvent
<wlevs:event-type-repository>
<wlevs:event-type type-name="PriceEvent">
<wlevs:property>
<entry key="stock" value="java.lang.String"/>
<entry key="broker" value="java.lang.String"/>
<entry key="high" value="float"/>
<entry key="low" value="float"/>
</wlevs:property>
</wlevs:event-type>
</wlevs:event-type-repository>
詳細は、表C-9「wlevs:channelアプリケーション・アセンブリ要素の属性」のprimary-keyを参照してください。
デフォルトでは、チャネルはシステムにタイムスタンプを付けられます。この場合、Oracle Event Processingは2つの条件下でCPUクロックから新しい時間を割り当てます。新規イベントの到着時、および構成可能なハートビートのタイムアウトが期限切れになったとき。
詳細は、次を参照してください:
オプションで、チャネルをアプリケーションがタイムスタンプを付けるように構成できます。この場合、イベントのタイムスタンプは、構成可能なwlevs:expression要素によって判定されます。式のよくある例は、イベントでのプロパティへの参照です。式が指定されていない場合、タイムスタンプは前のイベントから伝播されます。たとえば、別のダウンストリームOracle CQLプロセッサのアプリケーション・タイムスタンプ・チャネルにイベントを送信する1つのOracle CQLプロセッサからのシステム・タイムスタンプ・チャネルを持つ場合などです。
また、アプリケーションはStreamSender.sendHeartbeatメソッドを使用して、イベント型heart-beatダウンストリームをEPNのStreamSinkリスナーに送信することが可能です。
詳細は、次を参照してください:
1つ以上の問合せを使用してOracle CQLプロセッサを構成する場合、デフォルトではすべての問合せの結果がダウンストリーム・チャネルに出力されます。
どの問合せの結果をダウンストリーム・チャネルに出力するかは、チャネルselector子要素を使用して制御できます。プロセッサのダウンストリーム側に接続される複数のチャネルがある場合、セレクタを使用する必要があります。これは、プロセッサの問合せの出力に特定のタイプがあるからです。特定の問合せの選択によって、問合せ出力がチャネルで受け入れられることを確認します。
図10-1はアップストリームOracle CQLプロセッサfilteredFanoutProcessorに接続されたチャネルfilteredStreamを備えたEPNを示します。
例10-5は、Oracle CQLプロセッサに構成された問合せを示します。
例10-5 filterFanoutProcessor Oracle CQL問合せ
<processor>
<name>filterFanoutProcessor</name>
<rules>
<query id="Yr3Sector"><![CDATA[
select cusip, bid, srcId, bidQty, ask, askQty, seq
from priceStream where sector="3_YEAR"
]]></query>
<query id="Yr2Sector"><![CDATA[
select cusip, bid, srcId, bidQty, ask, askQty, seq
from priceStream where sector="2_YEAR"
]]></query>
<query id="Yr1Sector"><![CDATA[
select cusip, bid, srcId, bidQty, ask, askQty, seq
from priceStream where sector="1_YEAR"
]]></query>
</rules>
</processor>
例10-5に示すように、Oracle CQLプロセッサに1つ以上の問合せを指定する場合、デフォルトでは、すべての問合せ結果がプロセッサのアウトバウンド・チャネルに出力されます(図10-1のfilteredStream)。オプションで、コンポーネント構成ソースでchannel要素のselector子要素を使用し、例10-6が示すようにチャネルに結果を出力する1つ以上のOracle CQL問合せ名のスペース区切りのリストを指定できます。この例では、問合せYr3SectorおよびYr2Sectorの問合せ結果はfilteredStreamに出力されますが、問合せYr1Sectorの問合せ結果は出力されません。
例10-6 どの問合せ結果を出力するか制御するためのselectorの使用
<channel>
<name>filteredStream</name>
<selector>Yr3Sector Yr2Sector</selector>
</channel>
アップストリーム・プロセッサに問合せを作成する前に、selectorでchannel要素を構成することもできます。この場合、selector内の名前と一致する問合せ名を指定する必要があります。
|
注意: selector子要素は、アップストリーム・ノードがOracle CQLプロセッサである場合のだけ適用できます。詳細は、第17章「Oracle CQLでのイベント・ストリームの問合せ」を参照してください。 |
詳細は、付録D「selector」を参照してください。
デフォルトでは、チャネルはイベントが到達した時点でイベントを処理します。あるいは、例10-7に示すように、wlevs:channel属性batchingをtrueに設定することで、同じタイムスタンプを持ち、同じ問合せからの出力のイベントをまとめてバッチ処理するように構成できます。
例10-7 バッチ処理チャネル
...
<wlevs:channel id="priceStream" event-type="PriceEvent" batching="true">
<wlevs:listener ref="filterFanoutProcessor" />
<wlevs:source ref="PriceAdapter" />
</wlevs:channel>
...
詳細は、次を参照してください:
表C-9「wlevs:channelアプリケーション・アセンブリ要素の属性」のbatching
デフォルトでは、チャネルは各リスナーに各イベントをブロードキャストします。
EventPartitionerを使用するようにチャネルを構成した場合、受信イベントが到着するたびにチャネルはリスナーを選択し、各リスナーに各イベントをブロードキャストするかわりに、そのリスナーにイベントをディスパッチします。
チャネルでEventPartitionerを使用してスケーラビリティを向上できます。
詳細は、25.1.2.1項「EventPartitioner」を参照してください。
チャネルのダウンストリームでチャネルにスローされるステージで発生する例外を処理するコードを書き込むことができます。デフォルトでは、チャネルのフォルト処理動作は次のとおりです。
チャネルのmax-threads設定が0である場合(パススルー・チャネルとも呼ばれます)、例外がEPNの次のアップストリーム・ステージに再スローされます。
チャネルのmax-threads設定が0より大きい場合、フォルトに関連付けられているイベントとともに例外がログに記録および削除されます。
フォルト処理クラスを書き込み、ハンドラとmax-threads値が0より大きいチャネルを関連付けることができます。フォルト・ハンドラをチャネルに関連付けると、チャネルにスローされる例外は、コードでフォルトを処理したり再スローできるハンドラで受信されます。フォルト処理コードが例外を再スローする場合、例外はログに記録されますが例外に関連するイベントは失われることに留意してください。これらの例外に含まれるイベントを追跡する場合、イベント・データをEPNに接続されるデータ・ソースに書き込むことなどによって、コードを使用して永続化する必要があります。
フォルト・ハンドラの書込みの詳細は、17.6項「フォルトの処理」を参照してください。
チャネルを手動で構成するか、またはOracle Event Processing IDE for Eclipseを使用して構成できます。
チャネル・コンポート構成ファイルを記述する完全なXSDスキーマについては、B.2項「コンポーネント構成スキーマwlevs_application_config.xsd」を参照してください。
チャネル構成ファイルの完全な例については、10.3項「チャネル構成ファイルの例」を参照してください。
この節では、次のトピックについて説明します。
10.2.1項「Oracle Event Processing IDE for Eclipseを使用してシステム・タイムスタンプ・チャネルを構成する方法」
10.2.2項「Oracle Event Processing IDE for Eclipseを使用してアプリケーション・タイムスタンプ・チャネルを構成する方法」
この項では、システム・タイムスタンプ・チャネルの作成方法を説明します。
デフォルトのコンポーネント構成ファイルでチャネル構成を作成・編集するための最も効率的でエラーの発生が少ない方法は、Oracle Event Processing IDE for Eclipseを使用することです。オプションで、チャネル構成ファイルを手動で作成できます(10.2.3項「手動によるチャネル・コンポーネント構成ファイルの作成方法」を参照)。
詳細は、次を参照してください:
Oracle Event Processing IDE for Eclipseを使用してチャネルを構成するには:
Oracle Event Processing IDE for Eclipseを使用してチャネルを作成します。
7.4.1.1項「基本ノードの作成方法」を参照してください。
オプションで、wlevs:channel属性や子要素を追加することによって、デフォルトのチャネル・アセンブリ・ファイルの構成をオーバーライドします。
チャネル・ノードを右クリックし、「アセンブリのソースに移動」を選択します。
適切なwlevs:channel属性を追加します。
必須の属性には次のものがあります。
id
event-type
特に、is-relation属性を構成することによって、このチャネルがストリームまたはリレーションかどうかを指定します。
このチャネルをストリームに指定するには、is-relationをfalse(デフォルト)に設定します。
このチャネルをリレーションに指定するには、is-relationをtrueに設定します。
このチャネルをリレーションとして指定する場合は、チャネル属性primary-keyも指定する必要があります。
表C-9「wlevs:channelアプリケーション・アセンブリ要素の属性」を参照してください。
適切なwlevs:channel子要素を追加します。
プロジェクト・エクスプローラで、META-INF/wlevsディレクトリを展開します。
使用するコンポーネント構成ファイルを選択します。
デフォルトのコンポーネント構成ファイルを使用するには、META-INF/wlevs/config.xmlファイルを右クリックし、次で開く>XMLエディタを選択します。
ファイルがXMLエディタで開きます。
新しいコンポーネント構成ファイルを作成するには:
wlevsディレクトリを右クリックし、「新規」>「ファイル」を選択します。
新規ファイルダイアログが表示されます。
ファイル名を入力します。
名前は任意で付けることができますが、ファイル名の最後は.xmlにする必要がります。
「終了」をクリックします。
Oracle Event Processing IDE for Eclipseは、コンポーネント構成ファイルをwlevsディレクトリに追加します。
使用するように選択したコンポーネント構成ファイルを右クリックし、次で開く>XMLエディタを選択します。
ファイルがXMLエディタで開きます。
新しいコンポーネント構成ファイルを作成した場合、例10-8で示されているヘッダーおよびconfig要素を追加します。それ以外の場合は、ステップ7に進みます。
例10-8 コンポーネント構成ファイルのヘッダーおよびconfig要素
<?xml version="1.0" encoding="UTF-8"?> <n1:config xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd" xmlns:n1="http://www.bea.com/ns/wlevs/config/application" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> </config>
チャネルのchannel要素を例10-9で示されるように追加します。
例10-9 コンポーネント構成ファイルのチャネル要素
<?xml version="1.0" encoding="UTF-8"?>
<n1:config xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd"
xmlns:n1="http://www.bea.com/ns/wlevs/config/application"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<processor>
...
</processor>
...
<channel>
</channel>
</config>
name子要素をchannel要素に追加します。
name要素の値は、対応するEPNアセンブリ・ファイルのchannel要素のid属性に一致する必要があります。
たとえば、例10-10に示すようにEPNアセンブリ・ファイルのchannel要素の場合、対応するコンポーネント構成ファイルのchannel要素は例10-11で示されます。
例10-10 EPNアセンブリ・ファイルのチャネルID: priceStream
<wlevs:channel id="priceStream" event-type="PriceEvent">
<wlevs:listener ref="filterFanoutProcessor" />
<wlevs:source ref="PriceAdapter" />
</wlevs:channel>
|
注意: XMLファイルの識別子および名前は大/小文字が区別されます。EPNアセンブリ・ファイル内のコンポーネントの識別子を参照するときは、必ず大/小文字を同じにして指定します。 |
オプションで、channel子要素を追加することによって、デフォルトのチャネル構成をオーバーライドします。
max-threads子要素を追加すると、Oracle Event Processingサーバーがこのチャネルのイベントを処理するために使用するスレッドの最大数を指定できます。
max-sizeが0の場合は、この値の設定に影響はありません。デフォルト値は0です。
<channel>
<name>priceStream</name>
<max-threads>2</size>
</channel>
0に指定する場合、チャネルは受渡しのように機能します。max-threads > 0の場合、チャネルは従来のブロッキング・キューのように機能し、アップストリーム・コンポーネントがイベントのプロデューサ、ダウンストリーム・コンポーネントがイベントのコンシューマになります。キューのサイズはmax-sizeの構成によって定義されます。キューからイベントを消費するスレッドは、最大でmax-threadsの数までとなります。
複数のスレッドが有効な場合、チャネルに関連付けられたフォルト・ハンドラがないかぎり、EPNのダウンストリーム・コンポーネントからスローされるcom.bea.wlevs.ede.api.EventRejectedExceptionインスタンスは、チャネルがイベントを受信しているアダプタに伝播されません。詳細は、10.1.8項「チャネルのフォルトの処理」を参照してください。
max-size子要素を追加すると、チャネルの最大サイズを指定できます。
サイズがゼロのチャネルは、同期的にイベントを受け渡します。
サイズがゼロでないチャネルはイベントを非同期的に処理し、リクエストされたサイズに従ってイベントをバッファリングします。デフォルト値は0です。
<channel>
<name>priceStream</name>
<max-size>10000</size>
</channel>
キューに入れられたチャネルがいっぱいの場合、デフォルトの動作によってキューがいっぱいのためにキューに挿入できなかったイベントが破棄されます。かわりにアップストリーム・ステージにEventProcessingExceptionを呼び出すよう、チャネルを構成できます。例外は、キューに挿入できなかったイベントを含みます。この動作をサポートするには、fail-when-rejected設定をtrueに設定します。イベントが削除されるか、例外が呼び出されるまでのタイムアウト値も指定できます。構成の詳細なリファレンスは、0「fail-when-rejected」および0「offer-timeout」を参照してください。
heartbeat子要素を追加すると、Oracle Event Processingが時間を進めるためにハートビート・イベントを生成する前にチャネルが待機できる時間をナノ秒数で指定できます。デフォルトは100,000,000ナノ秒です。
heartbeatの子要素は、プロセッサに送信を行うイベント・チャネルにイベントが到着しない場合のみ、システムがタイムスタンプを付けた関係またはストリームに適用されます。プロセッサは、時間ベースのウィンドウや期間のパターン照合などの、一時的な演算子をいくつか含む文を用いて構成されています。
<channel>
<name>MatchOutputChannel</name>
<heartbeat>10000</heartbeat>
</channel>
selector子要素を追加すると、どのアップストリームOracle CQLプロセッサの問合せがチャネルに結果を出力することを許可されるかを指定できます。
アップストリーム・プロセッサに問合せを作成する前に、selectorでchannel要素を構成することもできます。この場合、selector内の名前と一致する問合せ名を指定する必要があります。
詳細は、10.1.5項「ダウンストリーム・チャネルに対する問合せ出力の制御: selector」を参照してください。
|
注意: selector属性はアップストリーム・ノードがOracle CQLプロセッサの場合のみ該当します。詳細は、第17章「Oracle CQLでのイベント・ストリームの問合せ」を参照してください。 |
「ファイル」>「保存」を選択します。
EPNエディタは、図10-2で示すように構成バッジをチャネルに追加します。詳細は、7.2.7項「バッジの構成」を参照してください。
この項では、アプリケーション・タイムスタンプ・チャネルの作成方法を説明します。
デフォルトのコンポーネント構成ファイルでチャネル構成を作成・編集するための最も効率的でエラーの発生が少ない方法は、Oracle Event Processing IDE for Eclipseを使用することです。オプションで、チャネル構成ファイルを手動で作成できます(10.2.3項「手動によるチャネル・コンポーネント構成ファイルの作成方法」を参照)。
詳細は、次を参照してください:
Oracle Event Processing IDE for Eclipseを使用してチャネルを構成するには:
Oracle Event Processing IDE for Eclipseを使用してチャネルを作成します。
7.4.1.1項「基本ノードの作成方法」を参照してください。
オプションで、wlevs:channel属性や子要素を追加することによって、デフォルトのチャネル・アセンブリ・ファイルの構成をオーバーライドします。
チャネル・ノードを右クリックし、「アセンブリのソースに移動」を選択します。
適切なwlevs:channel属性を追加します。
特に、is-relation属性を構成することによって、このチャネルがストリームまたはリレーションかどうかを指定します。
このチャネルをストリームに指定するには、is-relationをfalse(デフォルト)に設定します。
このチャネルをリレーションに指定するには、is-relationをtrueに設定します。
表C-9「wlevs:channelアプリケーション・アセンブリ要素の属性」を参照してください。
wlevs:application-timestamped子要素を追加します。
この要素を使用すると、Oracle Event Processingがタイムスタンプ値を出力するために使用するwlevs:expression子要素を指定できます。
オプションで、wlevs:application-timestamped属性を構成します。
is-total-order: パブリッシュされたアプリケーション時間が常に前回使用された値よりも大きくなる場合に指定します。
有効な値は、trueまたはfalseです。デフォルト: false。
詳細は、付録C「wlevs:application-timestamped」を参照してください。
他の適切なwlevs:channel子要素を追加します。
プロジェクト・エクスプローラで、META-INF/wlevsディレクトリを展開します。
使用するコンポーネント構成ファイルを選択します。
デフォルトのコンポーネント構成ファイルを使用するには、META-INF/wlevs/config.xmlファイルを右クリックし、次で開く>XMLエディタを選択します。
ファイルがXMLエディタで開きます。
新しいコンポーネント構成ファイルを作成するには:
wlevsディレクトリを右クリックし、「新規」>「ファイル」を選択します。
新規ファイルダイアログが表示されます。
ファイル名を入力します。
名前は任意で付けることができますが、ファイル名の最後は.xmlにする必要がります。
「終了」をクリックします。
Oracle Event Processing IDE for Eclipseは、コンポーネント構成ファイルをwlevsディレクトリに追加します。
使用するように選択したコンポーネント構成ファイルを右クリックし、次で開く>XMLエディタを選択します。
ファイルがXMLエディタで開きます。
新しいコンポーネント構成ファイルを作成した場合、例10-8で示されているヘッダーおよびconfig要素を追加します。それ以外の場合は、ステップ7に進みます。
例10-12 コンポーネント構成ファイルのヘッダーおよびconfig要素
<?xml version="1.0" encoding="UTF-8"?> <n1:config xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd" xmlns:n1="http://www.bea.com/ns/wlevs/config/application" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> </config>
チャネルのchannel要素を例10-9で示されるように追加します。
例10-13 コンポーネント構成ファイルのチャネル要素
<?xml version="1.0" encoding="UTF-8"?>
<n1:config xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd"
xmlns:n1="http://www.bea.com/ns/wlevs/config/application"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<processor>
...
</processor>
...
<channel>
</channel>
</config>
name子要素をchannel要素に追加します。
name要素の値は、対応するEPNアセンブリ・ファイルのchannel要素のid属性に一致する必要があります。
たとえば、例10-10に示すようにEPNアセンブリ・ファイルのchannel要素の場合、対応するコンポーネント構成ファイルのchannel要素は例10-11で示されます。
例10-14 EPNアセンブリ・ファイルのチャネルID: priceStream
<wlevs:channel id="priceStream" event-type="PriceEvent">
<wlevs:listener ref="filterFanoutProcessor" />
<wlevs:source ref="PriceAdapter" />
</wlevs:channel>
|
注意: XMLファイルの識別子および名前は大/小文字が区別されます。EPNアセンブリ・ファイル内のコンポーネントの識別子を参照するときは、必ず大/小文字を同じにして指定します。 |
オプションで、channel子要素を追加することによって、デフォルトのチャネル構成をオーバーライドします。
max-threads子要素を追加すると、Oracle Event Processingサーバーがこのチャネルのイベントを処理するために使用するスレッドの最大数を指定できます。
max-sizeが0の場合は、この値の設定に影響はありません。デフォルト値は0です。
<channel>
<name>priceStream</name>
<max-threads>2</size>
</channel>
0に指定する場合、チャネルは受渡しのように機能します。max-threads > 0の場合、チャネルは従来のブロッキング・キューのように機能し、アップストリーム・コンポーネントがイベントのプロデューサ、ダウンストリーム・コンポーネントがイベントのコンシューマになります。キューのサイズはmax-sizeの構成によって定義されます。キューからイベントを消費するスレッドは、最大でmax-threadsの数までとなります。
複数のスレッドが有効な場合、EPNのダウンストリーム・コンポーネントからスローされるcom.bea.wlevs.ede.api.EventRejectedExceptionインスタンスは、チャネルがイベントを受信しているアダプタに伝播されないことに注意してください。
max-size子要素を追加すると、チャネルの最大サイズを指定できます。
サイズがゼロのチャネルは、同期的にイベントを受け渡します。
サイズがゼロでないチャネルはイベントを非同期的に処理し、リクエストされたサイズに従ってイベントをバッファリングします。デフォルト値は0です。
<channel>
<name>priceStream</name>
<max-size>10000</size>
</channel>
キューに入れられたチャネルがいっぱいの場合、デフォルトの動作によってキューがいっぱいのためにキューに挿入できなかったイベントが破棄されます。かわりにアップストリーム・ステージにEventProcessingExceptionを呼び出すよう、チャネルを構成できます。例外は、キューに挿入できなかったイベントを含みます。この動作をサポートするには、fail-when-rejected設定をtrueに設定します。イベントが削除されるか、例外が呼び出されるまでのタイムアウト値も指定できます。構成の詳細なリファレンスは、0「fail-when-rejected」および0「offer-timeout」を参照してください。
selector子要素を追加すると、どのアップストリームOracle CQLプロセッサの問合せがチャネルに結果を出力することを許可されるかを指定できます。
アップストリーム・プロセッサに問合せを作成する前に、selectorでchannel要素を構成することもできます。この場合、selector内の名前と一致する問合せ名を指定する必要があります。
詳細は、10.1.5項「ダウンストリーム・チャネルに対する問合せ出力の制御: selector」を参照してください。
|
注意: selector属性はアップストリーム・ノードがOracle CQLプロセッサの場合のみ該当します。詳細は、第17章「Oracle CQLでのイベント・ストリームの問合せ」を参照してください。 |
詳細は、D.87項「selector」を参照してください。
「ファイル」>「保存」を選択します。
EPNエディタは、図10-3で示すように構成バッジをチャネルに追加します。詳細は、7.2.7項「バッジの構成」を参照してください。
Oracle Event Processing IDE for Eclipseは、チャネル構成ファイルを作成するのに最も効率的でエラーの発生が少ない方法ですが(10.2.1項「Oracle Event Processing IDE for Eclipseを使用してシステム・タイムスタンプ・チャネルを構成する方法」を参照)、別の方法として、チャネル構成ファイルを手動で作成・維持することもできます。
簡略化するために、次の手順では、単一のXMLファイルでアプリケーションのすべてのコンポーネントを構成すると仮定しています。
手動でチャネル・コンポーネント構成ファイルを作成するには:
EPNアセンブリ・ファイルを作成し、アプリケーションの各チャネルのwlevs:channel要素を追加します。
id属性を持つそれぞれのwlevs:channelを一意で識別します。
詳細は、5.3項「EPNアセンブリ・ファイルの作成」を参照してください。
オプションで、wlevs:channel属性や子要素を追加することによって、デフォルトのチャネル・アセンブリ・ファイルの構成をオーバーライドします。
適切なwlevs:channel属性を追加します。
表C-9「wlevs:channelアプリケーション・アセンブリ要素の属性」を参照してください。
適切なwlevs:channel子要素を追加します。
お気に入りのXMLエディタを使用してXMLファイルを作成します。
このXMLファイルの名前は自分で決定しますが、その拡張は.xmlで終了することが必要です。
構成ファイルのルート要素はconfigであり、次のステップに示すネームスペース定義を使用します。
アプリケーション内の各チャネルでは、configのchannel子要素を追加します。
name子要素を持つ各チャネルを一意で識別します。このnameは、アプリケーションのイベント処理ネットワークを定義するEPNアセンブリ・ファイルのchannel要素のid属性の値と同一にする必要があります。これが、このチャネル構成が適用される対象の、EPNアセンブリ・ファイル内の特定のチャネル・コンポーネントをOracle Event Processingが認識する仕組みです。詳細は、5.3項「EPNアセンブリ・ファイルの作成」を参照してください。
たとえば、アプリケーションに2つのストリームがある場合、構成ファイルは最初に以下のようになります。
<?xml version="1.0" encoding="UTF-8"?>
<n1:config xmlns:n1="http://www.bea.com/ns/wlevs/config/application"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<helloworld:config
xmlns:helloworld="http://www.bea.com/xml/ns/wlevs/example/helloworld">
<processor>
...
</processor>
<channel>
<name>firstStream</name>
...
</channel>
<channel>
<name>secondStream</name>
...
</channel>
</helloworld:config>
この例では、構成ファイルにfirstStreamおよびsecondStreamと呼ばれる2つのチャネルが含まれます。つまり、EPNアセンブリ・ファイルには同一の識別子を持つ2つ以上のチャネル登録が含まれることになります。
<wlevs:channel id="firstStream" ...> ... </wlevs:channel> <wlevs:channel id="secondStream" ...> ... </wlevs:channel>
|
注意: XMLファイルの識別子と名前は大/小文字が区別されるため、EPNアセンブリ・ファイルでコンポーネントの識別子を参照するときは、必ず大/小文字を同一にします。 |
オプションで、channel子要素を追加することによって、デフォルトのチャネル構成をオーバーライドします。
max-threads子要素を追加すると、Oracle Event Processingサーバーがこのチャネルのイベントを処理するために使用するスレッドの最大数を指定できます。
max-sizeが0の場合は、この値の設定に影響はありません。デフォルト値は0です。
<channel>
<name>priceStream</name>
<max-threads>2</size>
</channel>
0に指定する場合、チャネルは受渡しのように機能します。max-threads > 0の場合、チャネルは従来のブロッキング・キューのように機能し、アップストリーム・コンポーネントがイベントのプロデューサ、ダウンストリーム・コンポーネントがイベントのコンシューマになります。キューのサイズはmax-sizeの構成によって定義されます。キューからイベントを消費するスレッドは、最大でmax-threadsの数までとなります。
複数のスレッドが有効な場合、EPNのダウンストリーム・コンポーネントからスローされるcom.bea.wlevs.ede.api.EventRejectedExceptionインスタンスは、チャネルがイベントを受信しているアダプタに伝播されないことに注意してください。
max-size子要素を追加すると、チャネルの最大サイズを指定できます。
サイズがゼロのチャネルは、同期的にイベントを受け渡します。
サイズがゼロでないチャネルはイベントを非同期的に処理し、リクエストされたサイズに従ってイベントをバッファリングします。デフォルト値は0です。
<channel>
<name>priceStream</name>
<max-size>10000</size>
</channel>
キューに入れられたチャネルがいっぱいの場合、デフォルトの動作によってキューがいっぱいのためにキューに挿入できなかったイベントが破棄されます。かわりにアップストリーム・ステージにEventProcessingExceptionを呼び出すよう、チャネルを構成できます。例外は、キューに挿入できなかったイベントを含みます。この動作をサポートするには、fail-when-rejected設定をtrueに設定します。イベントが削除されるか、例外が呼び出されるまでのタイムアウト値も指定できます。構成の詳細なリファレンスは、0「fail-when-rejected」および0「offer-timeout」を参照してください。
heartbeat子要素を追加すると、Oracle Event Processingが時間を進めるためにハートビート・イベントを生成する前にチャネルが待機できる時間を新しいナノ秒数で指定できます。デフォルトは100,000,000ナノ秒です。
heartbeatの子要素は、プロセッサに送信を行うイベント・チャネルにイベントが到着しない場合のみ、システムがタイムスタンプを付けた関係またはストリームに適用されます。プロセッサは、時間ベースのウィンドウや期間のパターン照合などの、一時的な演算子をいくつか含む文を用いて構成されています。
<channel>
<name>MatchOutputChannel</name>
<heartbeat>10000</heartbeat>
</channel>
selector子要素を追加すると、どのアップストリームOracle CQLプロセッサの問合せがチャネルに結果を出力することを許可されるかを指定できます。
アップストリーム・プロセッサに問合せを作成する前に、selectorでchannel要素を構成することもできます。この場合、selector内の名前と一致する問合せ名を指定する必要があります。
詳細は、10.1.5項「ダウンストリーム・チャネルに対する問合せ出力の制御: selector」を参照してください。
|
注意: selector属性はアップストリーム・ノードがOracle CQLプロセッサの場合のみ該当します。詳細は、第17章「Oracle CQLでのイベント・ストリームの問合せ」を参照してください。 |
詳細は、D.87項「selector」を参照してください。
構成ファイルを保存して閉じます。
図10-4は、2つのチャネルを含むEPNの一部を示します: priceStreamおよびfilteredStream。priceStreamチャネルは、PriceAdapterイベント・ソースとそのPriceEventイベントをOracle CQLプロセッサのfilterFanoutProcessorに接続するインバウンド・チャネルです。filteredStreamチャネルは、Oracle CQLプロセッサの問合せ結果(FilteredPriceEventイベント)をダウンストリーム・コンポーネント(図10-4では示されていません)に接続するアウトバウンド・チャネルです。
この項では、次を含むチャネル構成ファイルの例を提供します。
例10-16は、図10-4で示されるように2つのチャネルを構成するサンプルのコンポーネント構成ファイルを示します。
例10-16 サンプルのチャネル・コンポーネント構成ファイル
<?xml version="1.0" encoding="UTF-8"?>
<n1:config xmlns:n1="http://www.bea.com/ns/wlevs/config/application"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<processor>
<name>filterFanoutProcessor</name>
<rules>
<query id="Yr3Sector"><![CDATA[
select cusip, bid, srcId, bidQty, ask, askQty, seq
from priceStream where sector="3_YEAR"
]]></query>
</rules>
</processor>
<channel>
<name>priceStream</name>
<max-size>10000</max-size>
<max-threads>4</max-threads>
</channel>
<channel>
<name>filteredStream</name>
<max-size>5000</max-size>
<max-threads>2</max-threads>
</channel>
</n1:config>
例10-17は、図10-4で示されるように2つのチャネルを構成するEPNアセンブリ・ファイルを示します。
例10-17 チャネルEPNアセンブリ・ファイル
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:osgi="http://www.springframework.org/schema/osgi"
xmlns:wlevs="http://www.bea.com/ns/wlevs/spring"
xmlns:cqlx="http://www.oracle.com/schema/cqlx"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/osgi
http://www.springframework.org/schema/osgi/spring-osgi.xsd
http://www.bea.com/ns/wlevs/spring
http://www.bea.com/ns/wlevs/spring/spring-wlevs-v11_1_1_6.xsd">
<wlevs:event-type-repository>
<wlevs:event-type type-name="PriceEvent">
<wlevs:properties>
<wlevs:property name="cusip" type="java.lang.String" />
<wlevs:property name="bid" type="java.lang.Double" />
<wlevs:property name="srcId" type="java.lang.String" />
<wlevs:property name="bidQty" type="java.lang.Integer" />
<wlevs:property name="ask" type="java.lang.Double" />
<wlevs:property name="askQty" type="java.lang.Integer" />
<wlevs:property name="seq" type="java.lang.Long" />
<wlevs:property name="sector" type="java.lang.String" />
</wlevs:properties>
</wlevs:event-type>
<wlevs:event-type type-name="FilteredPriceEvent">
<wlevs:properties>
<wlevs:property name="cusip" type="java.lang.String" />
<wlevs:property name="bid" type="java.lang.Double" />
<wlevs:property name="srcId" type="java.lang.String" />
<wlevs:property name="bidQty" type="java.lang.Integer" />
<wlevs:property name="ask" type="java.lang.Double" />
<wlevs:property name="askQty" type="java.lang.Integer" />
<wlevs:property name="seq" type="java.lang.Long" />
</wlevs:properties>
</wlevs:event-type>
<wlevs:event-type type-name="BidAskEvent">
<wlevs:properties>
<wlevs:property name="cusip" type="java.lang.String" />
<wlevs:property name="bidseq" type="java.lang.Long" />
<wlevs:property name="bidSrcId" type="java.lang.String" />
<wlevs:property name="bid" type="java.lang.Double" />
<wlevs:property name="askseq" type="java.lang.Long" />
<wlevs:property name="askSrcId" type="java.lang.String" />
<wlevs:property name="ask" type="java.lang.Double" />
<wlevs:property name="bidQty" type="java.lang.Integer" />
<wlevs:property name="askQty" type="java.lang.Integer" />
<wlevs:property name="intermediateStrategy" type="java.lang.String" />
<wlevs:property name="correlationId" type="java.lang.Long" />
<wlevs:property name="priority" type="java.lang.Integer" />
</wlevs:properties>
</wlevs:event-type>
<wlevs:event-type type-name="FinalOrderEvent">
<wlevs:properties>
<wlevs:property name="cusip" type="java.lang.String" />
<wlevs:property name="bidseq" type="java.lang.Long" />
<wlevs:property name="bidSrcId" type="java.lang.String" />
<wlevs:property name="bid" type="java.lang.Double" />
<wlevs:property name="bidQty" type="java.lang.Integer" />
<wlevs:property name="bidSourceStrategy" type="java.lang.String" />
<wlevs:property name="askseq" type="java.lang.Long" />
<wlevs:property name="askSrcId" type="java.lang.String" />
<wlevs:property name="ask" type="java.lang.Double" />
<wlevs:property name="askQty" type="java.lang.Integer" />
<wlevs:property name="askSourceStrategy" type="java.lang.String" />
<wlevs:property name="correlationId" type="java.lang.Long" />
</wlevs:properties>
</wlevs:event-type>
</wlevs:event-type-repository>
<!-- Assemble EPN (event processing network) -->
<wlevs:adapter advertise="true" id="PriceAdapter"
provider="csvgen">
<wlevs:instance-property name="port" value="9008" />
<wlevs:instance-property name="eventTypeName"
value="PriceEvent" />
<wlevs:instance-property name="eventPropertyNames"
value="srcId,sector,cusip,bid,ask,bidQty,askQty,seq" />
</wlevs:adapter>
<wlevs:channel id="priceStream" event-type="PriceEvent">
<wlevs:listener ref="filterFanoutProcessor" />
<wlevs:source ref="PriceAdapter" />
</wlevs:channel>
<!-- By default, CQL is used for OCEP 11.0 -->
<wlevs:processor id="filterFanoutProcessor" >
</wlevs:processor>
<wlevs:channel id="filteredStream"
event-type="FilteredPriceEvent">
<wlevs:listener ref="bbaProcessor" />
<wlevs:listener ref="analyticsProcessor" />
<wlevs:source ref="filterFanoutProcessor" />
</wlevs:channel>
<!-- Explicitly specify provider CQL -->
<wlevs:processor id="bbaProcessor" provider="cql">
<wlevs:listener ref="bidAskBBAStream" />
</wlevs:processor>
<wlevs:processor id="analyticsProcessor">
<wlevs:listener ref="bidAskAnalyticsStream" />
</wlevs:processor>
<wlevs:channel id="bidAskBBAStream" event-type="BidAskEvent">
<wlevs:listener ref="selectorProcessor" />
</wlevs:channel>
<wlevs:channel id="bidAskAnalyticsStream" event-type="BidAskEvent">
<wlevs:listener ref="selectorProcessor" />
</wlevs:channel>
<wlevs:processor id="selectorProcessor">
<wlevs:listener ref="citipocOut" />
</wlevs:processor>
<wlevs:channel id="citipocOut" event-type="FinalOrderEvent" advertise="true">
<wlevs:listener>
<!-- Create business object -->
<bean id="outputBean"
class="com.bea.wlevs.POC.citi.OutputBean"
autowire="byName" />
</wlevs:listener>
</wlevs:channel>
</beans>