Oracle® Fusion Middleware Oracle Stream Analyticsによるイベント処理用アプリケーションの開発 12c (12.2.1.2.0) E82664-01 |
|
前へ |
次へ |
チャネルは、他のタイプのコンポーネント(ステージ)間をイベントが往来する論理的なパイプを表します。たとえば、アダプタとOracle CQLプロセッサ間やOracle CQLプロセッサとイベントBean間などがあります。
この章の内容は次のとおりです。
チャネルには、バッファリング機能、キューイング機能および同時実行性機能が用意されており、これにより、設計ライフサイクルの後半でアプリケーションのパフォーマンスをチューニングできるようになります。
デフォルトでは、チャネルのmax-threads
属性は0に設定されており、これは、チャネルがパススルー・モードであり、パフォーマンス・ペナルティが起きないことを意味します。
EPNを構成するとき、次のルールを考慮してください。
Oracle CQLプロセッサをダウンストリーム・ステージに接続する場合、チャネルは必須です。
ストリームまたはリレーションをOracle CQLプロセッサに接続する場合、チャネルは必須です。
前述の2点に基づいて、アダプタとプロセッサ間でチャネルを設定することは必須であることに注意してください。Oracle JDeveloperを使用してアダプタをプロセッサに接続する場合、チャネルを作成するためのウィザードが表示されます。『Oracle Stream Analyticsイベント処理スタート・ガイド』のチャネルの作成に関する項を参照してください。
外部リレーション、キャッシュまたは表ソースのいずれかのコンポーネントをOracle CQLプロセッサに接続する場合、チャネルはオプションです。
チャネルは、キャッシュや表などのプル・ソースとプロセッサの間では、プル・ソースが外部リレーションを示すため必要ありません。外部リレーションでは、有効な操作はストリームとNOWウィンドウ演算子間の結合のみとなるため、プル・ソースとみなされます。結合は実際には、Oracle CQLプロセッサの外部で発生します。これはプルであるため、Oracle CQLプロセッサはその状態を認識される必要はなく(つまり、DDLは必要ありません)、チャネルで仲介する必要もありません。
通常、コンポーネント間のチャネルは次の場合に使用します。
バッファリングは、発行するコンポーネントと受信側の間で必要です。
受信側のコンポーネントにはキューイングまたは同時実行性が必要です。
カスタム・アダプタが使用される場合は、スレッド制御が必要です。
イベント処理ネットワーク(EPN)にチャネルを追加する場合は、デフォルト構成を使用できます。デフォルトのチャネルは、名前およびIDを持ち、システム・タイムスタンプが付加されたストリーム・チャネルです。デフォルトのハートビートのタイムアウトは100ミリ秒または100,000,000ナノ秒です。
ほとんどのアプリケーションにはデフォルトの構成が適しています。構成を変更するには、アプリケーションのアセンブリ・ファイルを編集するか、コンポーネントの構成ファイルを編集します。
システムによってチャネルにタイムスタンプが付加されている場合、新規イベントが到着したとき、および構成可能なハートビートのタイムアウトが期限切れになったときに、Oracle Stream AnalyticsによりCPUクロックから新しい時間が割り当てられます。
アプリケーションによってチャネルにタイムスタンプが付加されている場合、イベントのタイムスタンプはwlevs:expression
要素によって決定されます。標準的な式の例として、イベントでのプロパティへの参照があります。式が指定されていない場合、タイムスタンプは前のイベントから伝播されます。たとえば、1つのOracle CQLプロセッサのシステムによりタイムスタンプが付加されたチャネルが、別のダウンストリームOracle CQLプロセッサのアプリケーションによりタイムスタンプが付加されたチャネルにイベントを送信する場合などです。また、アプリケーションはStreamSender.sendHeartbeat
メソッドを使用して、イベント型heart-beat
ダウンストリームをEPNのStreamSink
リスナーに送信することが可能です。
注:
チャネルがアプリケーションによってタイムスタンプが付加されていると同時にマップベースである(ハッシュ・マップ・イベント・タイプを使用する)場合、Oracle Stream Analyticsによりタイムスタンプが追加されます。キーのない削除または更新操作はこの構成のチャネルに対しては機能しません。これは、アプリケーションによりタイムスタンプが付加されたイベントによって常にtimestamp
プロパティの変更が回避されるためです。
この章では、いくつかのアセンブリおよび構成ファイルのチャネル設定について説明します。
アセンブリ・ファイルは、helloworldInputChannelのチャネル設定を示します。この設定は、helloworldProcessorがイベントのチャネルをリスニングし、このイベントがhelloworldAdapterからチャネルにフローすることを示します。
<wlevs:channel id="helloworldInputChannel" event-type="HelloWorldEvent" > <wlevs:listener ref="helloworldProcessor"/> <wlevs:source ref="helloworldAdapter"/> </wlevs:channel>
チャネルをリレーションとして構成するには、次のように、is-relation
設定をアセンブリ・ファイルに追加します。
<wlevs:channel id="helloworldInputChannel" event-type="HelloWorldEvent" is-relation="true" primary-key="myprimarykey" />
このチャネルをリレーションとして指定する場合は、primary-key
属性も構成する必要があります。主キーは、各イベントを一意に識別する空白またはカンマ区切りのイベント・プロパティ名のリストです。プライマリ・キーを定義する方法の詳細は、『Oracle Stream Analyticsスキーマ・リファレンス』のwlevs:metadata
に関する項を参照してください。
アプリケーションによってタイムスタンプを付加するチャネルを構成するには、次のように、application-timestamped
およびexpression
要素をアセンブリ・ファイルに追加します。is-total-order
要素をtrue
に設定すると、パブリッシュされたアプリケーション時間が常に前回使用された値よりも大きくなります。
<wlevs:application-timestamped is-total-order="true"> <wlevs:expression>mytime+10</wlevs:expression> </wlevs:application-timestamped>
複数の問合せを使用してOracle CQLプロセッサを構成する場合、デフォルトではすべての問合せの結果がダウンストリーム・チャネルに送信されます。どの問合せの結果をダウンストリーム・チャネルに送信するかは、selector
要素を使用して制御できます。
図5-1は、アップストリームOracle CQLプロセッサfilteredFanoutProcessor
に接続されたチャネルfilteredStream
を備えたEPNを示します。
次の例は、Oracle CQLプロセッサに構成された問合せを示します。
<processor> <name>filterFanoutProcessor</name> <rules> <query id="Yr3Sector"><![CDATA[ select cusip, bid, srcId, bidQty, ask, askQty, seq from priceStream where sector="3_YEAR" ></query> <query id="Yr2Sector"><![CDATA[ select cusip, bid, srcId, bidQty, ask, askQty, seq from priceStream where sector="2_YEAR" ></query> <query id="Yr1Sector"><![CDATA[ select cusip, bid, srcId, bidQty, ask, askQty, seq from priceStream where sector="1_YEAR" ></query> </rules> </processor>
Oracle CQLプロセッサに複数の問合せを指定する場合、デフォルトでは、すべての問合せ結果がOracle CQLプロセッサのアウトバウンド・チャネルに出力されます(図5-1のfilteredStream
)。オプションで、コンポーネント構成ソースでchannel
要素のselector
子要素を使用し、チャネルに結果を出力する1つ以上のOracle CQL問合せ名のスペース区切りのリストを指定できます。次の例では、問合せYr3Sector
およびYr2Sector
の問合せ結果はfilteredStream
に出力されますが、問合せYr1Sector
の問合せ結果は出力されません。
<channel> <name>filteredStream</name> <selector>Yr3Sector Yr2Sector</selector> </channel>
アップストリームOracle CQLプロセッサに問合せを作成する前に、selector
でchannel
要素を構成できます。この場合、selector
内の名前と一致する問合せ名を指定する必要があります。
注:
selector
子要素は、アップストリーム・ステージがOracle CQLプロセッサである場合にのみ適用できます。詳細は、Oracle CQLプロセッサを参照してください。
デフォルトでは、チャネルはイベントが到達した時点でイベントを処理します。wlevs:channel
属性batching
をtrue
に設定することで、同じタイムスタンプを持ち、同じ問合せからの出力のイベントをまとめてバッチ処理するように構成できます。
イベントのバッチ処理により、アプリケーションのパフォーマンスを向上させることができます。
<wlevs:channel id="priceStream" event-type="PriceEvent" batching="true">
<wlevs:listener ref="filterFanoutProcessor" />
<wlevs:source ref="PriceAdapter" />
</wlevs:channel>
関連項目:
『Oracle Stream Analyticsスキーマ・リファレンス』のbatch-sizeに関する項
『Oracle Stream Analyticsスキーマ・リファレンス』のbatch-time-outに関する項
チャネルのダウンストリーム・ステージで発生し、チャネルにスローされる例外を処理するコードを書き込むことができます。
デフォルトでは、チャネルのフォルト処理動作は次のとおりです。
チャネルのmax-threads
設定が0である場合(パススルー・チャネル)、例外がEPNの次のアップストリーム・ステージに再スローされます。
チャネルのmax-threads
設定が0より大きい場合、例外がログに記録および削除されます。また、フォルトに関連付けられているイベントもログに記録および削除されます。
フォルト処理クラスを書き込み、max-threads
値が0より大きいチャネルとハンドラを関連付けることができます。フォルト・ハンドラをチャネルに関連付けると、チャネルにスローされる例外は、フォルトを処理したり再スローするコードが含まれるハンドラで受信されます。フォルト処理コードが例外を再スローする場合、例外はログに記録されますが例外に関連するイベントは失われます。これらの例外に含まれるイベントを追跡する場合、イベント・データをEPNに接続されるデータ・ソースに書き込むことなどによって、コードを使用して永続化する必要があります。
注:
マルチスレッドのチャネルによってスローされた例外を処理するには、プロセッサなどのチャネルのアップストリームにあるコンポーネントにフォルト・ハンドラが登録されている必要があります。アップストリームのコンポーネントにフォルト・ハンドラを登録しない場合、例外はアップストリームに渡されますが、フォルト・ハンドラは呼び出されません。
フォルト・ハンドラの書込みの詳細は、フォルト処理を参照してください。
デフォルトでは、チャネルは各リスナーに各イベントをブロードキャストします。
EventPartitioner
を使用するようにチャネルを構成した場合、受信イベントが到着するたびにチャネルはリスナーを選択し、各リスナーに各イベントをブロードキャストするかわりに、そのリスナーにイベントをディスパッチします。チャネルでEventPartitioner
を使用してスケーラビリティを向上できます。
EPNの静的な関係は、有向非巡回グラフとして表すことができます。つまり、任意のペア(N,C)において、Nはノード(頂点)のセットであり、Cはソース・ノードから宛先ノードまでの結合を表してNをつなぐ2つの場所の関係(辺)を表します。
これは、分散フローと呼ばれます。
分散フローとは、アプリケーション内の一連のイベントのことを指します。分散フローは、コンピュータ・プログラミング言語の変数またはパラメータと同じ目的を果たします。分散フローは、ソフトウェア・レイヤーからの通信状態を表します。分散フローは本来動的なものです。分散フローは、分散プロトコルの部分間の高度な論理関係を表します。
分散フローを使用可能ないくつかのシナリオを次に示します。
待機時間がそれほど長いわけではないイベント
論理的にパーティション化可能なイベント
パラレルに実行できる個別のコンポーネント/タスクに論理的に分割可能なイベント
分散フローのプロパティ
通常、分散フローには次のプロパティが含まれます。
非同期で一方向 - 各イベントは単一のインスタンスを表します。
同種で均一 - フロー内のすべてのイベントは均一であり、本質的に似ています。
同時で分散されている - フロー内のイベントは様々な時間(同時)に、様々なノードで(分散されて)発生します。
分散フローを実際に使用可能ないくつかのシナリオを次に示します。
ワード・カウント
単語数をカウントする必要があるシナリオでは、アプリケーションにより受信する文を意味のある用語にマップした後、これらの用語を(用語別に)件数で表します。ストリーム処理により、Twitterから受信するストリームなど、単語の実際のフローをカウントできます。ただし、ストリーム処理によってこれを行う場合、大量の単語の処理が問題になります。分散処理では、異なるパラレルなTwitterストリームをサブスクライブして、その結果(単語数)をまとめます。
スマート・メーターのエネルギ使用量
現在、家庭では、敷地内に設置されたスマート・メーターを使用して、エネルギ使用量を収集するのが一般的です。通常、これらのスマート・メーターでは、エネルギ使用量のセンサー・データをイベントの形式で1日中定期的(毎分など)に出力します。このセンサー・データは、地域の処理センターでダウンストリーム管理システムによって取得され、家庭や近隣の平均エネルギ使用量などの有用な記述統計およびその地域の履歴データへの関連性を導き出すために使用されます。これらの集計の実行をパーティション化して、エネルギ使用量を効率的に算出できます。分散フローを使用して異常値(一般的な使用量を上回るか、または下回る家庭など)を識別し、将来の使用量を予測できます。このデータを使用して、パートナとのエネルギの購入や販売のプロセスを制御できます。
リスク分析
分散フローを使用して、金融ポートフォリオのエクスポージャをリアルタイムで計算し、これにより、リスクを分析します。
ローカル・パーティション化は、分散フロー内のイベントをパーティション化する際に役立つ技術です。ローカル・パーティション・チャネルでは、順序付けされたイベント・プロパティのセットをパーティション基準として使用する必要があります。チャネル・パーティション化は、チャネルおよびチャネル構成の一部として管理コンソール内にあります。図5-2を参照してください。
ローカル・パーティション化は入力スループットの向上を目的としているため、速度率を測定する場合、アダプタによりプロセッサに送信できる毎秒のイベントを測定する必要があります。
注意:
使用中のマシンのハードウェア容量に適切に基づいて、max-threads
値を選択する必要があります。セキュリティ
Secure Sockets Layer (SSL)は、分散されたEPNノード間のネットワーク・トラフィックを保護するために使用します。
構成
ローカル・パーティション化をサポートするには、Oracle CQLプロセッサを構成する必要があります。ローカル・パーティション化チャネルの定義およびパラレル問合せ実行用のOracle CQLプロセッサの構成を参照してください。
例
単純なEPNのいくつかの例を次に示します。
例5-1 EPN例1
イベントは、プロパティ名とプロパティ値を表す任意のペア(PN, PV)の関係Pとして定義されます。このマニュアルの目的のために、プロパティ名とプロパティ値のドメインを定義する必要はありません。
EPN1 = ({adapter1, channel1, processor, channel2, adapter2}, {(adapter1,channel1), (channel1, processor), (processor, channel2), (channel2, adapter2)})
例5-2 EPN例2
EPNノードには、複数のイベントが含まれる場合があるため、順序付けされた連続するイベントとしてセットEを定義します。
注意:
Eは、その他のケースとは異なり順序付けされます。これにより、ランタイム状態S = EPNの(N,E)は、NからEへの2つの場所の関係として表すことができます。関係Sは単射ではなく、同じイベントが複数のノードに存在する場合があることに注意してください。ただし、EPN内にあるイベント・セット全体のすべてのイベントが少なくとも1つのノードに存在する必要があるため、これは全射となります。
e1 = {(price, 10), (volume, 200), (symbol, 'ORCL')} e2 = {(p1, v1), (p2, v2), (p3, v3)}