5 チャネル

チャネルは、他のタイプのコンポーネント(ステージ)間をイベントが往来する論理的なパイプを表します。たとえば、アダプタとOracle CQLプロセッサ間やOracle CQLプロセッサとイベントBean間などがあります。

この章の内容は次のとおりです。

5.1 チャネルを使用するタイミング

チャネルには、バッファリング機能、キューイング機能および同時実行性機能が用意されており、これにより、設計ライフサイクルの後半でアプリケーションのパフォーマンスをチューニングできるようになります。

デフォルトでは、チャネルのmax-threads属性は0に設定されており、これは、チャネルがパススルー・モードであり、パフォーマンス・ペナルティが起きないことを意味します。

EPNを構成するとき、次のルールを考慮してください。

  • Oracle CQLプロセッサをダウンストリーム・ステージに接続する場合、チャネルは必須です。

  • ストリームまたはリレーションをOracle CQLプロセッサに接続する場合、チャネルは必須です。

    前述の2点に基づいて、アダプタとプロセッサ間でチャネルを設定することは必須であることに注意してください。Oracle JDeveloperを使用してアダプタをプロセッサに接続する場合、チャネルを作成するためのウィザードが表示されます。

  • 外部リレーション、キャッシュまたは表ソースのいずれかのコンポーネントをOracle CQLプロセッサに接続する場合、チャネルはオプションです。

チャネルは、キャッシュや表などのプル・ソースとプロセッサの間では、プル・ソースが外部リレーションを示すため必要ありません。外部リレーションでは、有効な操作はストリームとNOWウィンドウ演算子間の結合のみとなるため、プル・ソースとみなされます。結合は実際には、Oracle CQLプロセッサの外部で発生します。これはプルであるため、Oracle CQLプロセッサはその状態を認識される必要はなく(つまり、DDLは必要ありません)、チャネルで仲介する必要もありません。

通常、コンポーネント間のチャネルは次の場合に使用します。

  • バッファリングは、発行するコンポーネントと受信側の間で必要です。

  • 受信側のコンポーネントにはキューイングまたは同時実行性が必要です。

  • カスタム・アダプタが使用される場合は、スレッド制御が必要です。

5.2 チャネル構成

イベント処理ネットワーク(EPN)にチャネルを追加する場合は、デフォルト構成を使用できます。デフォルトのチャネルは、名前およびIDを持ち、システム・タイムスタンプが付加されたストリーム・チャネルです。デフォルトのハートビートのタイムアウトは100ミリ秒または100,000,000ナノ秒です。

ほとんどのアプリケーションにはデフォルトの構成が適しています。構成を変更するには、アプリケーションのアセンブリ・ファイルを編集するか、コンポーネントの構成ファイルを編集します。

システムによってチャネルにタイムスタンプが付加されている場合、新規イベントが到着したとき、および構成可能なハートビートのタイムアウトが期限切れになったときに、Oracle Stream AnalyticsによりCPUクロックから新しい時間が割り当てられます。

アプリケーションによってチャネルにタイムスタンプが付加されている場合、イベントのタイムスタンプはwlevs:expression要素によって決定されます。式のよくある例は、イベントでのプロパティへの参照です。式が指定されていない場合、タイムスタンプは前のイベントから伝播されます。たとえば、1つのOracle CQLプロセッサのシステムによりタイムスタンプが付加されたチャネルが、別のダウンストリームOracle CQLプロセッサのアプリケーションによりタイムスタンプが付加されたチャネルにイベントを送信する場合などです。また、アプリケーションはStreamSender.sendHeartbeatメソッドを使用して、イベント型heart-beatダウンストリームをEPNのStreamSinkリスナーに送信することが可能です。

注意:

チャネルがアプリケーションによってタイムスタンプが付加されていると同時にマップベースである(ハッシュ・マップ・イベント・タイプを使用する)場合、Oracle Stream Analyticsによりタイムスタンプが追加されます。キーのない削除または更新操作はこの構成のチャネルに対しては機能しません。これは、アプリケーションによりタイムスタンプが付加されたイベントによって常にtimestampプロパティの変更が回避されるためです。

この章では、いくつかのアセンブリおよび構成ファイルのチャネル設定について説明します。

5.2.1 アセンブリ・ファイル

アセンブリ・ファイルは、helloworldInputChannelのチャネル設定を示します。この設定は、helloworldProcessorがイベントのチャネルをリスニングし、このイベントがhelloworldAdapterからチャネルにフローすることを示します。

<wlevs:channel id="helloworldInputChannel" event-type="HelloWorldEvent" >
   <wlevs:listener ref="helloworldProcessor"/>
   <wlevs:source ref="helloworldAdapter"/>
</wlevs:channel>

チャネルをリレーションとして構成するには、次のように、is-relation設定をアセンブリ・ファイルに追加します。

<wlevs:channel id="helloworldInputChannel" event-type="HelloWorldEvent"
is-relation="true" primary-key="myprimarykey" />

チャネルをリレーションにする場合、primary-key属性も構成する必要があります。主キーは、各イベントを一意に識別する空白またはカンマ区切りのイベント・プロパティ名のリストです。

アプリケーションによってタイムスタンプを付加するチャネルを構成するには、次のように、application-timestampedおよびexpression要素をアセンブリ・ファイルに追加します。is-total-order要素をtrueに設定すると、パブリッシュされたアプリケーション時間が常に前回使用された値よりも大きくなります。

<wlevs:application-timestamped is-total-order="true">
   <wlevs:expression>mytime+10</wlevs:expression>
</wlevs:application-timestamped>

5.2.2 構成ファイル

構成ファイルは、チャネル構成の設定を示します。この設定では、プロセス・イベントの非同期バッファリング(max-size)、最大4つのスレッドの使用(max-threads)、および10000ナノ秒のハートビート・タイムアウトの使用(heartbeat)を行うためにチャネルをカスタマイズします。

  <channel>
    <name>helloworldInputChannel</name>
    <max-size>10000</max-size>
    <max-threads>4</max-threads>
    <heartbeat>10000</name>
</channel>

5.3 ダウンストリーム・チャネルに出力する問合せの制御

複数の問合せを使用してOracle CQLプロセッサを構成する場合、デフォルトではすべての問合せの結果がダウンストリーム・チャネルに送信されます。どの問合せの結果をダウンストリーム・チャネルに送信するかは、selector要素を使用して制御できます。

図5-1は、アップストリームOracle CQLプロセッサfilteredFanoutProcessorに接続されたチャネルfilteredStreamを備えたEPNを示します。

図5-1 Oracle CQLプロセッサおよびダウンストリーム・チャネルを備えたEPN

図5-1の説明が続きます
「図5-1 Oracle CQLプロセッサおよびダウンストリーム・チャネルを備えたEPN」の説明

次の例は、Oracle CQLプロセッサに構成された問合せを示します。

<processor>
    <name>filterFanoutProcessor</name>
    <rules>
        <query id="Yr3Sector"><![CDATA[ 
            select cusip, bid, srcId, bidQty, ask, askQty, seq 
            from priceStream where sector="3_YEAR"
        ></query>
        <query id="Yr2Sector"><![CDATA[ 
            select cusip, bid, srcId, bidQty, ask, askQty, seq 
            from priceStream where sector="2_YEAR"
        ></query>
        <query id="Yr1Sector"><![CDATA[ 
            select cusip, bid, srcId, bidQty, ask, askQty, seq 
            from priceStream where sector="1_YEAR"
        ></query>
    </rules>
</processor>

Oracle CQLプロセッサに複数の問合せを指定する場合、デフォルトでは、すべての問合せ結果がOracle CQLプロセッサのアウトバウンド・チャネルに出力されます(図5-1filteredStream)。オプションで、コンポーネント構成ソースでchannel要素のselector子要素を使用し、チャネルに結果を出力する1つ以上のOracle CQL問合せ名のスペース区切りのリストを指定できます。次の例では、問合せYr3SectorおよびYr2Sectorの問合せ結果はfilteredStreamに出力されますが、問合せYr1Sectorの問合せ結果は出力されません。

<channel>
    <name>filteredStream</name>
    <selector>Yr3Sector Yr2Sector</selector>
</channel>

アップストリームOracle CQLプロセッサに問合せを作成する前に、selectorchannel要素を構成できます。この場合、selector内の名前と一致する問合せ名を指定する必要があります。

注意:

selector子要素は、アップストリーム・ステージがOracle CQLプロセッサである場合にのみ適用できます。詳細は、Oracle CQLプロセッサを参照してください。

5.4 バッチ処理チャネル

デフォルトでは、チャネルはイベントが到達した時点でイベントを処理します。wlevs:channel属性batchingtrueに設定することで、同じタイムスタンプを持ち、同じ問合せからの出力のイベントをまとめてバッチ処理するように構成できます。

イベントのバッチ処理により、アプリケーションのパフォーマンスを向上させることができます。

<wlevs:channel id="priceStream" event-type="PriceEvent" batching="true">
    <wlevs:listener ref="filterFanoutProcessor" />
    <wlevs:source ref="PriceAdapter" />
</wlevs:channel>

「RelationSenderの実装」も参照してください。

5.5 フォルト処理

チャネルのダウンストリーム・ステージで発生し、チャネルにスローされる例外を処理するコードを書き込むことができます。

デフォルトでは、チャネルのフォルト処理動作は次のとおりです。

  • チャネルのmax-threads設定が0である場合(パススルー・チャネル)、例外がEPNの次のアップストリーム・ステージに再スローされます。

  • チャネルのmax-threads設定が0より大きい場合、例外がログに記録および削除されます。また、フォルトに関連付けられているイベントもログに記録および削除されます。

フォルト処理クラスを書き込み、max-threads値が0より大きいチャネルとハンドラを関連付けることができます。フォルト・ハンドラをチャネルに関連付けると、チャネルにスローされる例外は、フォルトを処理したり再スローするコードが含まれるハンドラで受信されます。フォルト処理コードが例外を再スローする場合、例外はログに記録されますが例外に関連するイベントは失われます。これらの例外に含まれるイベントを追跡する場合、イベント・データをEPNに接続されるデータ・ソースに書き込むことなどによって、コードを使用して永続化する必要があります。

注意:

マルチスレッドのチャネルによってスローされた例外を処理するには、プロセッサなどのチャネルのアップストリームにあるコンポーネントにフォルト・ハンドラが登録されている必要があります。アップストリームのコンポーネントにフォルト・ハンドラを登録しない場合、例外はアップストリームに渡されますが、フォルト・ハンドラは呼び出されません。

フォルト・ハンドラの書込みの詳細は、フォルト処理を参照してください。

5.6 EventPartitionerチャネル

デフォルトでは、チャネルは各リスナーに各イベントをブロードキャストします。

EventPartitionerを使用するようにチャネルを構成した場合、受信イベントが到着するたびにチャネルはリスナーを選択し、各リスナーに各イベントをブロードキャストするかわりに、そのリスナーにイベントをディスパッチします。チャネルでEventPartitionerを使用してスケーラビリティを向上できます。

5.7 分散フロー

EPNの静的な関係は、有向非巡回グラフとして表すことができます。つまり、任意のペア(N,C)において、Nはノード(頂点)のセットであり、Cはソース・ノードから宛先ノードまでの結合を表してNをつなぐ2つの場所の関係(辺)を表します。

これは、分散フローと呼ばれます。

分散フローとは、アプリケーション内の一連のイベントのことを指します。分散フローは、コンピュータ・プログラミング言語の変数またはパラメータと同じ目的を果たします。分散フローは、ソフトウェア・レイヤーからの通信状態を表します。分散フローは本来動的なものです。分散フローは、分散プロトコルの部分間の高度な論理関係を表します。

分散フローを使用可能ないくつかのシナリオを次に示します。

  • 待機時間がそれほど長いわけではないイベント

  • 論理的にパーティション化可能なイベント

  • パラレルに実行できる個別のコンポーネント/タスクに論理的に分割可能なイベント

分散フローのプロパティ

通常、分散フローには次のプロパティが含まれます。

  1. 非同期で一方向 - 各イベントは単一のインスタンスを表します。

  2. 同種で均一 - フロー内のすべてのイベントは均一であり、本質的に似ています。

  3. 同時で分散されている - フロー内のイベントは様々な時間(同時)に、様々なノードで(分散されて)発生します。

5.7.1 分散フローの例

分散フローを実際に使用可能ないくつかのシナリオを次に示します。

ワード・カウント

単語数をカウントする必要があるシナリオでは、アプリケーションにより受信する文を意味のある用語にマップした後、これらの用語を(用語別に)件数で表します。ストリーム処理により、Twitterから受信するストリームなど、単語の実際のフローをカウントできます。ただし、ストリーム処理によってこれを行う場合、大量の単語の処理が問題になります。分散処理では、異なるパラレルなTwitterストリームをサブスクライブして、その結果(単語数)をまとめます。

スマート・メーターのエネルギ使用量

現在、家庭では、敷地内に設置されたスマート・メーターを使用して、エネルギ使用量を収集するのが一般的です。通常、これらのスマート・メーターでは、エネルギ使用量のセンサー・データをイベントの形式で1日中定期的(毎分など)に出力します。このセンサー・データは、地域の処理センターでダウンストリーム管理システムによって取得され、家庭や近隣の平均エネルギ使用量などの有用な記述統計およびその地域の履歴データへの関連性を導き出すために使用されます。これらの集計の実行をパーティション化して、エネルギ使用量を効率的に算出できます。分散フローを使用して異常値(一般的な使用量を上回るか、または下回る家庭など)を識別し、将来の使用量を予測できます。このデータを使用して、パートナとのエネルギの購入や販売のプロセスを制御できます。

リスク分析

分散フローを使用して、金融ポートフォリオのエクスポージャをリアルタイムで計算し、これにより、リスクを分析します。

5.7.2 ローカル・パーティション化チャネル

ローカル・パーティション化は、分散フロー内のイベントをパーティション化する際に役立つ技術です。ローカル・パーティション・チャネルでは、順序付けされたイベント・プロパティのセットをパーティション基準として使用する必要があります。チャネル・パーティション化は、チャネルおよびチャネル構成の一部として管理コンソール内にあります。図5-2を参照してください。

ローカル・パーティション化は入力スループットの向上を目的としているため、速度率を測定する場合、アダプタによりプロセッサに送信できる毎秒のイベントを測定する必要があります。

注意:

使用中のマシンのハードウェア容量に適切に基づいて、max-threads値を選択する必要があります。

図5-2 Oracle CQLプロセッサおよびダウンストリーム・チャネルを備えたEPN

図5-2の説明が続きます
「図5-2 Oracle CQLプロセッサおよびダウンストリーム・チャネルを備えたEPN」の説明

セキュリティ

Secure Sockets Layer (SSL)は、分散されたEPNノード間のネットワーク・トラフィックを保護するために使用します。

構成

ローカル・パーティション化をサポートするには、Oracle CQLプロセッサを構成する必要があります。ローカル・パーティション化チャネルの定義およびパラレル問合せ実行用のOracle CQLプロセッサの構成を参照してください。

単純なEPNのいくつかの例を次に示します。

例5-1 EPN例1

イベントは、プロパティ名とプロパティ値を表す任意のペア(PN, PV)の関係Pとして定義されます。このマニュアルの目的のために、プロパティ名とプロパティ値のドメインを定義する必要はありません。

EPN1 = ({adapter1, channel1, processor, channel2, adapter2},
{(adapter1,channel1), (channel1, processor), (processor, channel2), (channel2, adapter2)})

例5-2 EPN例2

EPNノードには、複数のイベントが含まれる場合があるため、順序付けされた連続するイベントとしてセットEを定義します。

注意:

Eは、その他のケースとは異なり順序付けされます。

これにより、ランタイム状態S = EPNの(N,E)は、NからEへの2つの場所の関係として表すことができます。関係Sは単射ではなく、同じイベントが複数のノードに存在する場合があることに注意してください。ただし、EPN内にあるイベント・セット全体のすべてのイベントが少なくとも1つのノードに存在する必要があるため、これは全射となります。

e1 = {(price, 10), (volume, 200), (symbol, 'ORCL')}
e2 = {(p1, v1), (p2, v2), (p3, v3)}