ヘッダーをスキップ
Oracle® Fusion Middleware Oracle Complex Event Processing開発者ガイド
11gリリース1 (11.1.1.6.2) for Eclipse
B61654-04
  目次へ移動
目次
索引へ移動
索引

前
 
次
 

9 チャネルの構成

この章では、イベント処理ネットワークを介してOracle Complex Event Processing (Oracle CEP)のチャネルおよびイベントのパイプを構成する方法について説明します。

9.1 チャネル構成の概要

Oracle CEPアプリケーションには、1つ以上のチャネル・コンポーネントが含まれます。チャネルは、アダプタとプロセッサ間やプロセッサとイベントBean(ビジネス・ロジックPOJO)間のような、他のコンポーネント同士をイベントが往来する物理的なパイプを表します。

チャネルはストリームとリレーションの両方に使用できます。詳細は、9.1.2項「ストリームおよびリレーションを表すチャネル」を参照してください。

イベント処理ネットワーク(EPN)でチャネルを作成する場合、デフォルト構成があります。完全な詳細は、C.10項「wlevs:channel」を参照してください。

デフォルトのチャネル構成は、通常、大半のアプリケーションに適しています。

ただし、この構成を変更する場合は、コンポーネント構成ファイル内にchannel要素を作成する必要があります。このchannel要素では、デフォルト構成をオーバーライドするチャネル構成を指定できます。

コンポーネント構成ファイルのchannel要素のname要素は、EPNアセンブリ・ファイルのchannel要素のid属性に一致する必要があります。たとえば、例9-1に示すようにEPNアセンブリ・ファイルのchannel要素の場合、対応するコンポーネント構成ファイルのchannel要素は例9-2で示されます。

例9-1 EPNアセンブリ・ファイルのチャネルID: priceStream

<wlevs:channel id="priceStream" event-type="PriceEvent">
    <wlevs:listener ref="filterFanoutProcessor" />
    <wlevs:source ref="PriceAdapter" />
</wlevs:channel>

例9-2 コンポーネント構成ファイルのチャネル名: priceStream

<channel>
    <name>priceStream</name>
    <max-size>10000</max-size>
    <max-threads>4</max-threads>
</channel>

次のコンポーネント構成ファイルのいずれかでchannel要素を作成できます。

アプリケーションが1つ以上のチャネルを持つ場合、デフォルトのconfig.xmlファイル内でそれぞれのチャネルのchannel要素を作成したり、それぞれのMETA-INF/wlevsで個別のXMLファイルを作成したり、すべてのチャネルの構成またはアプリケーションのすべてのコンポーネント(アダプタ、プロセッサ、およびチャネル)さえも含むMETA-INF/wlevsで単一のXMLファイルを作成したりできます。開発環境に最も適した方法を選択します。

デフォルトでは、Oracle CEP IDE for Eclipseは1つのコンポーネント構成ファイルと1つのEPNアセンブリ・ファイルを作成します。

コンポーネント構成ファイルは、Oracle CEPアプリケーション・バンドルの一部としてデプロイされます。Oracle CEP Visualizer、wlevs.Adminユーティリティを使用するか、または適切なJMX Mbeanを直接操作して、後で実行時にこの構成を更新できます。

この項では次について説明します:

詳細は、次を参照してください:

9.1.1 チャネルを使用するタイミング

EPNを構成するとき、次のルールを考慮してください。

  • Oracle CQLプロセッサをダウンストリーム・ステージに接続するとき、チャネルは必須です。

  • プッシュ・ソース・ストリームまたはリレーションをプロセッサに接続するとき、チャネルは必須です。

    チャネルは、Oracle CQLプロセッサがその形状を認識される必要があり(つまり、DDLが必須)、チャネルも仲介者として機能する必要があるため、プッシュ・ソースに必須です。

  • 外部リレーション、またはキャッシュや表のソースのようなプル・ソースをプロセッサに接続するとき、チャネルはオプションです。

    チャネルは、キャッシュや表などのプル・ソースとプロセッサの間では、プル・ソースが外部リレーションを示すため必要ありません。外部リレーションでは、有効な操作はストリームとNOWウィンドウ演算子間の結合のみとなるため、プル・ソースとみなされます。つまり、結合は実際には、Oracle CQLプロセッサの外部で発生します。これはプルであるため、Oracle CQLプロセッサはその形状を認識される必要はありません(つまり、DDLは必要ありません)。そのため、チャネルを仲介者として機能させる必要もありません。

通常、コンポーネント間のチャネルは次の場合に使用します。

  • バッファリングは、発行するコンポーネントと受信側の間で必要です。

  • 受信側のコンポーネントにはキューイングまたは同時実行性が必要です。

  • カスタム・アダプタが使用される場合は、スレッド制御が必要です。

EPNにチャネルを含めることは、今後の設計ライフサイクルでパフォーマンス・チューニング(バッファリング、キューイング、および同時実行性のオプションを使用)の柔軟性を提供するため、設計上よく行われます。チャネルの属性max-threadsを0に設定すると、パススルー・モードでチャネルを設置し、パフォーマンス・ペナルティを起こしません。

詳細は、次を参照してください:

9.1.2 ストリームおよびリレーションを表すチャネル

チャネルは、ストリームまたはリレーションのいずれかを示します。

詳細は、次を参照してください:

9.1.2.1 ストリームとしてのチャネル

ストリームは追加のみサポートします。EPNアセンブリ要素のwlevs:channel属性のis-relationfalse(デフォルト)に設定することによって、チャネルをストリームとして指定できます。

9.1.2.2 リレーションとしてのチャネル

リレーションは、挿入、削除、および更新をサポートします。EPNアセンブリ要素のwlevs:channel属性のis-relationtrueに設定することによって、チャネルをリレーションとして指定できます。

チャネルがリレーションである場合は、例9-3に示すように、wlevs:channel属性primary-keyを使用してイベント・アイデンティティを定義する、1つ以上のイベント・プロパティを指定する必要があります。

例9-3 リレーションとしてのチャネル: primary-key属性

...
<wlevs:channel id="priceStream" event-type="PriceEvent" primary-key="stock,broker">
    <wlevs:listener ref="filterFanoutProcessor" />
    <wlevs:source ref="PriceAdapter" />
</wlevs:channel>
...

例9-4 PriceEvent

<wlevs:event-type-repository>
    <wlevs:event-type type-name="PriceEvent">
          <wlevs:property>
             <entry key="stock" value="java.lang.String"/>
             <entry key="broker" value="java.lang.String"/>
             <entry key="high" value="float"/>
             <entry key="low" value="float"/>
          </wlevs:property>
    </wlevs:event-type>
</wlevs:event-type-repository>

詳細は、表C-9「wlevs:channelアプリケーション・アセンブリ要素の属性」primary-keyを参照してください。

9.1.3 システム・タイムスタンプ・チャネル

デフォルトでは、チャネルはシステムにタイムスタンプを付けられます。この場合、Oracle CEPは2つの条件下でCPUクロックから新しい時間を割り当てます。新規イベントの到着時、および構成可能なハートビートのタイムアウトが期限切れになったとき。

詳細は、次を参照してください:

9.1.4 アプリケーション・タイムスタンプ・チャネル

オプションで、チャネルをアプリケーションがタイムスタンプを付けるように構成できます。この場合、イベントのタイムスタンプは、構成可能なwlevs:expression要素によって判定されます。式のよくある例は、イベントでのプロパティへの参照です。式が指定されていない場合、タイムスタンプは前のイベントから伝播されます。たとえば、別のダウンストリームOracle CQLプロセッサのアプリケーション・タイムスタンプ・チャネルにイベントを送信する1つのOracle CQLプロセッサからのシステム・タイムスタンプ・チャネルを持つ場合などです。

また、アプリケーションはStreamSender.sendHeartbeatメソッドを使用して、イベント型heart-beatダウンストリームをEPNのStreamSinkリスナーに送信することが可能です。

詳細は、次を参照してください:

9.1.5 ダウンストリーム・チャネルに対する問合せ出力の制御: selector

1つ以上の問合せを使用してOracle CQLプロセッサを構成する場合、デフォルトではすべての問合せの結果がダウンストリーム・チャネルに出力されます。

どの問合せの結果をダウンストリーム・チャネルに出力するかは、チャネルselector子要素を使用して制御できます。

図9-1はアップストリームOracle CQLプロセッサfilteredFanoutProcessorに接続されたチャネルfilteredStreamを備えたEPNを示します。

図9-1 Oracle CQLプロセッサおよびダウンストリーム・チャネルを備えたEPN

図9-1の説明が続きます
「図9-1 Oracle CQLプロセッサおよびダウンストリーム・チャネルを備えたEPN」の説明

例9-5は、Oracle CQLプロセッサに構成された問合せを示します。

例9-5 filterFanoutProcessor Oracle CQL問合せ

<processor>
    <name>filterFanoutProcessor</name>
    <rules>
        <query id="Yr3Sector"><![CDATA[ 
            select cusip, bid, srcId, bidQty, ask, askQty, seq 
            from priceStream where sector="3_YEAR"
        ]]></query>
        <query id="Yr2Sector"><![CDATA[ 
            select cusip, bid, srcId, bidQty, ask, askQty, seq 
            from priceStream where sector="2_YEAR"
        ]]></query>
        <query id="Yr1Sector"><![CDATA[ 
            select cusip, bid, srcId, bidQty, ask, askQty, seq 
            from priceStream where sector="1_YEAR"
        ]]></query>
    </rules>
</processor>

例9-5に示すように、Oracle CQLプロセッサに1つ以上の問合せを指定する場合、デフォルトでは、すべての問合せ結果がプロセッサのアウトバウンド・チャネルに出力されます(図9-1filteredStream)。オプションで、コンポーネント構成ソースでchannel要素のselector子要素を使用し、例9-6が示すようにチャネルに結果を出力する1つ以上のOracle CQL問合せ名のスペース区切りのリストを指定できます。この例では、問合せYr3SectorおよびYr2Sectorの問合せ結果はfilteredStreamに出力されますが、問合せYr1Sectorの問合せ結果は出力されません。

例9-6 どの問合せ結果を出力するか制御するためのselectorの使用

<channel>
    <name>filteredStream</name>
    <selector>Yr3Sector Yr2Sector</selector>
</channel>

アップストリーム・プロセッサに問合せを作成する前に、selectorchannel要素を構成することもできます。この場合、selector内の名前と一致する問合せ名を指定する必要があります。


注意:

selector子要素は、アップストリーム・ノードがOracle CQLプロセッサである場合のだけ適用できます。詳細は、第10章「Oracle CQLプロセッサの構成」を参照してください。


詳細は、付録D「selector」を参照してください。

9.1.6 バッチ処理チャネル

デフォルトでは、チャネルはイベントが到達した時点でイベントを処理します。あるいは、例9-7に示すように、wlevs:channel属性batchingtrueに設定することで、同じタイムスタンプを持ち、同じ問合せからの出力のイベントをまとめてバッチ処理するように構成できます。

例9-7 バッチ処理チャネル

...
<wlevs:channel id="priceStream" event-type="PriceEvent" batching="true">
    <wlevs:listener ref="filterFanoutProcessor" />
    <wlevs:source ref="PriceAdapter" />
</wlevs:channel>
...

詳細は、次を参照してください:

9.1.7 EventPartitionerチャネル

デフォルトでは、チャネルは各リスナーに各イベントをブロードキャストします。

EventPartitionerを使用するようにチャネルを構成した場合、受信イベントが到着するたびにチャネルはリスナーを選択し、各リスナーに各イベントをブロードキャストするかわりに、そのリスナーにイベントをディスパッチします。

チャネルでEventPartitionerを使用してスケーラビリティを向上できます。

詳細は、22.2.1項「EventPartitioner」を参照してください。

9.2 チャネルの構成

チャネルを手動で構成するか、またはOracle CEP IDE for Eclipseを使用して構成できます。

チャネル・コンポート構成ファイルを記述する完全なXSDスキーマについては、B.2項「コンポーネント構成スキーマwlevs_application_config.xsd」を参照してください。

チャネル構成ファイルの完全な例については、9.3項「チャネル構成ファイルの例」を参照してください。

この節では、次のトピックについて説明します。

9.2.1 Oracle CEP IDE for Eclipseを使用してシステム・タイムスタンプ・チャネルを構成する方法

この項では、システム・タイムスタンプ・チャネルの作成方法を説明します。

デフォルトのコンポーネント構成ファイルでチャネル構成を作成・編集するための最も効率的でエラーの発生が少ない方法は、Oracle CEP IDE for Eclipseを使用することです。オプションで、チャネル構成ファイルを手動で作成できます(9.2.3項「手動によるチャネル・コンポーネント構成ファイルの作成方法」を参照)。

詳細は、次を参照してください:

Oracle CEP IDE for Eclipseを使用してチャネルを構成するには:

  1. Oracle CEP IDE for Eclipseを使用してチャネルを作成します。

    6.4.1.1項「基本ノードの作成方法」を参照してください。

  2. オプションで、wlevs:channel属性や子要素を追加することによって、デフォルトのチャネル・アセンブリ・ファイルの構成をオーバーライドします。

    1. チャネル・ノードを右クリックし、「アセンブリのソースに移動」を選択します。

    2. 適切なwlevs:channel属性を追加します。

      必須の属性には次のものがあります。

      • id

      • event-type

      特に、is-relation属性を構成することによって、このチャネルがストリームまたはリレーションかどうかを指定します。

      • このチャネルをストリームに指定するには、is-relationfalse(デフォルト)に設定します。

      • このチャネルをリレーションに指定するには、is-relationtrueに設定します。

        このチャネルをリレーションとして指定する場合は、チャネル属性primary-keyも指定する必要があります。

      表C-9「wlevs:channelアプリケーション・アセンブリ要素の属性」を参照してください。

    3. 適切なwlevs:channel子要素を追加します。

  3. プロジェクト・エクスプローラで、META-INF/wlevsディレクトリを展開します。

  4. 使用するコンポーネント構成ファイルを選択します。

    1. デフォルトのコンポーネント構成ファイルを使用するには、META-INF/wlevs/config.xmlファイルを右クリックし、次で開く>XMLエディタを選択します。

      ファイルがXMLエディタで開きます。

    2. 新しいコンポーネント構成ファイルを作成するには:

      • wlevsディレクトリを右クリックし、「新規」>「ファイル」を選択します。

        新規ファイルダイアログが表示されます。

      • ファイル名を入力します。

        名前は任意で付けることができますが、ファイル名の最後は.xmlにする必要がります。

      • 「終了」をクリックします。

        Oracle CEP IDE for Eclipseは、コンポーネント構成ファイルをwlevsディレクトリに追加します。

  5. 使用するように選択したコンポーネント構成ファイルを右クリックし、次で開く>XMLエディタを選択します。

    ファイルがXMLエディタで開きます。

  6. 新しいコンポーネント構成ファイルを作成した場合、例9-8で示されているヘッダーおよびconfig要素を追加します。それ以外の場合は、ステップ7に進みます。

    例9-8 コンポーネント構成ファイルのヘッダーおよびconfig要素

    <?xml version="1.0" encoding="UTF-8"?>
    <n1:config xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd" 
    xmlns:n1="http://www.bea.com/ns/wlevs/config/application" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    
    </config>
    
  7. チャネルのchannel要素を例9-9で示されるように追加します。

    例9-9 コンポーネント構成ファイルのチャネル要素

    <?xml version="1.0" encoding="UTF-8"?>
    <n1:config xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd" 
    xmlns:n1="http://www.bea.com/ns/wlevs/config/application" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
        <processor>
            ...
        </processor>
        ...
        <channel>
        </channel>
    </config>
    
  8. name子要素をchannel要素に追加します。

    name要素の値は、対応するEPNアセンブリ・ファイルのchannel要素のid属性に一致する必要があります。

    たとえば、例9-10に示すようにEPNアセンブリ・ファイルのchannel要素の場合、対応するコンポーネント構成ファイルのchannel要素は例9-11で示されます。

    例9-10 EPNアセンブリ・ファイルのチャネルID: priceStream

    <wlevs:channel id="priceStream" event-type="PriceEvent">
        <wlevs:listener ref="filterFanoutProcessor" />
        <wlevs:source ref="PriceAdapter" />
    </wlevs:channel>
    

    例9-11 コンポーネント構成ファイルのチャネル名: priceStream

    <channel>
        <name>priceStream</name>
    </channel>
    

    注意:

    XMLファイルの識別子および名前は大/小文字が区別されます。EPNアセンブリ・ファイル内のコンポーネントの識別子を参照するときは、必ず大/小文字を同じにして指定します。


  9. オプションで、channel子要素を追加することによって、デフォルトのチャネル構成をオーバーライドします。

    • max-threads子要素を追加すると、Oracle CEPサーバーがこのチャネルのイベントを処理するために使用するスレッドの最大数を指定できます。

      max-sizeが0の場合は、この値の設定に影響はありません。デフォルト値は0です。

      <channel>
          <name>priceStream</name>
          <max-threads>2</size>
      </channel>
      

      0に指定する場合、チャネルは受渡しのように機能します。max-threads > 0の場合、チャネルは従来のブロッキング・キューのように機能し、アップストリーム・コンポーネントがイベントのプロデューサ、ダウンストリーム・コンポーネントがイベントのコンシューマになります。キューのサイズはmax-sizeの構成によって定義されます。キューからイベントを消費するスレッドは、最大でmax-threadsの数までとなります。

      複数のスレッドが有効な場合、EPNのダウンストリーム・コンポーネントからスローされるcom.bea.wlevs.ede.api.EventRejectedExceptionインスタンスは、チャネルがイベントを受信しているアダプタに伝播されないことに注意してください。

    • max-size子要素を追加すると、チャネルの最大サイズを指定できます。

      サイズがゼロのチャネルは、同期的にイベントを受け渡します。

      サイズがゼロでないチャネルはイベントを非同期的に処理し、リクエストされたサイズに従ってイベントをバッファリングします。デフォルト値は0です。

      <channel>
          <name>priceStream</name>
          <max-size>10000</size>
      </channel>
      
    • heartbeat子要素を追加すると、Oracle CEPが時間を進めるためにハートビート・イベントを生成する前にチャネルが待機できる時間をナノ秒数で指定できます。

      heartbeatの子要素は、プロセッサに送信を行うイベント・チャネルにイベントが到着しない場合のみ、システムがタイムスタンプを付けた関係またはストリームに適用されます。プロセッサは、時間ベースのウィンドウや期間のパターン照合などの、一時的な演算子をいくつか含む文を用いて構成されています。

      <channel>
          <name>MatchOutputChannel</name>
          <heartbeat>10000</heartbeat>
      </channel>
      
    • selector子要素を追加すると、どのアップストリームOracle CQLプロセッサの問合せがチャネルに結果を出力することを許可されるかを指定できます。

      アップストリーム・プロセッサに問合せを作成する前に、selectorchannel要素を構成することもできます。この場合、selector内の名前と一致する問合せ名を指定する必要があります。

      詳細は、9.1.5項「ダウンストリーム・チャネルに対する問合せ出力の制御: selector」を参照してください。


      注意:

      selector属性はアップストリーム・ノードがOracle CQLプロセッサの場合のみ該当します。詳細は、第10章「Oracle CQLプロセッサの構成」を参照してください。


  10. 「ファイル」>「保存」を選択します。

    EPNエディタは、図9-2で示すように構成バッジをチャネルに追加します。詳細は、6.2.7項「バッジの構成」を参照してください。

    図9-2 構成バッジを備えたチャネル

    図9-2の説明が続きます
    「図9-2 構成バッジを備えたチャネル」の説明

9.2.2 Oracle CEP IDE for Eclipseを使用してアプリケーション・タイムスタンプ・チャネルを構成する方法

この項では、アプリケーション・タイムスタンプ・チャネルの作成方法を説明します。

デフォルトのコンポーネント構成ファイルでチャネル構成を作成・編集するための最も効率的でエラーの発生が少ない方法は、Oracle CEP IDE for Eclipseを使用することです。オプションで、チャネル構成ファイルを手動で作成できます(9.2.3項「手動によるチャネル・コンポーネント構成ファイルの作成方法」を参照)。

詳細は、次を参照してください:

Oracle CEP IDE for Eclipseを使用してチャネルを構成するには:

  1. Oracle CEP IDE for Eclipseを使用してチャネルを作成します。

    6.4.1.1項「基本ノードの作成方法」を参照してください。

  2. オプションで、wlevs:channel属性や子要素を追加することによって、デフォルトのチャネル・アセンブリ・ファイルの構成をオーバーライドします。

    1. チャネル・ノードを右クリックし、「アセンブリのソースに移動」を選択します。

    2. 適切なwlevs:channel属性を追加します。

      特に、is-relation属性を構成することによって、このチャネルがストリームまたはリレーションかどうかを指定します。

      • このチャネルをストリームに指定するには、is-relationfalse(デフォルト)に設定します。

      • このチャネルをリレーションに指定するには、is-relationtrueに設定します。

      表C-9「wlevs:channelアプリケーション・アセンブリ要素の属性」を参照してください。

    3. wlevs:application-timestamped子要素を追加します。

      この要素を使用すると、Oracle CEPがタイムスタンプ値を出力するために使用するwlevs:expression子要素を指定できます。

      オプションで、wlevs:application-timestamped属性を構成します。

      • is-total-order:パブリッシュされたアプリケーション時間が常に前回使用された値よりも大きくなる場合に指定します。

        有効な値は、trueまたはfalseです。デフォルト: false

      詳細は、付録C「wlevs:application-timestamped」を参照してください。

    4. 他の適切なwlevs:channel子要素を追加します。

  3. プロジェクト・エクスプローラで、META-INF/wlevsディレクトリを展開します。

  4. 使用するコンポーネント構成ファイルを選択します。

    1. デフォルトのコンポーネント構成ファイルを使用するには、META-INF/wlevs/config.xmlファイルを右クリックし、次で開く>XMLエディタを選択します。

      ファイルがXMLエディタで開きます。

    2. 新しいコンポーネント構成ファイルを作成するには:

      • wlevsディレクトリを右クリックし、「新規」>「ファイル」を選択します。

        新規ファイルダイアログが表示されます。

      • ファイル名を入力します。

        名前は任意で付けることができますが、ファイル名の最後は.xmlにする必要がります。

      • 「終了」をクリックします。

        Oracle CEP IDE for Eclipseは、コンポーネント構成ファイルをwlevsディレクトリに追加します。

  5. 使用するように選択したコンポーネント構成ファイルを右クリックし、次で開く>XMLエディタを選択します。

    ファイルがXMLエディタで開きます。

  6. 新しいコンポーネント構成ファイルを作成した場合、例9-8で示されているヘッダーおよびconfig要素を追加します。それ以外の場合は、ステップ7に進みます。

    例9-12 コンポーネント構成ファイルのヘッダーおよびconfig要素

    <?xml version="1.0" encoding="UTF-8"?>
    <n1:config xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd" 
    xmlns:n1="http://www.bea.com/ns/wlevs/config/application" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    
    </config>
    
  7. チャネルのchannel要素を例9-9で示されるように追加します。

    例9-13 コンポーネント構成ファイルのチャネル要素

    <?xml version="1.0" encoding="UTF-8"?>
    <n1:config xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd" 
    xmlns:n1="http://www.bea.com/ns/wlevs/config/application" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
        <processor>
            ...
        </processor>
        ...
        <channel>
        </channel>
    </config>
    
  8. name子要素をchannel要素に追加します。

    name要素の値は、対応するEPNアセンブリ・ファイルのchannel要素のid属性に一致する必要があります。

    たとえば、例9-10に示すようにEPNアセンブリ・ファイルのchannel要素の場合、対応するコンポーネント構成ファイルのchannel要素は例9-11で示されます。

    例9-14 EPNアセンブリ・ファイルのチャネルID: priceStream

    <wlevs:channel id="priceStream" event-type="PriceEvent">
        <wlevs:listener ref="filterFanoutProcessor" />
        <wlevs:source ref="PriceAdapter" />
    </wlevs:channel>
    

    例9-15 コンポーネント構成ファイルのチャネル名: priceStream

    <channel>
        <name>priceStream</name>
    </channel>
    

    注意:

    XMLファイルの識別子および名前は大/小文字が区別されます。EPNアセンブリ・ファイル内のコンポーネントの識別子を参照するときは、必ず大/小文字を同じにして指定します。


  9. オプションで、channel子要素を追加することによって、デフォルトのチャネル構成をオーバーライドします。

    • max-threads子要素を追加すると、Oracle CEPサーバーがこのチャネルのイベントを処理するために使用するスレッドの最大数を指定できます。

      max-sizeが0の場合は、この値の設定に影響はありません。デフォルト値は0です。

      <channel>
          <name>priceStream</name>
          <max-threads>2</size>
      </channel>
      

      0に指定する場合、チャネルは受渡しのように機能します。max-threads > 0の場合、チャネルは従来のブロッキング・キューのように機能し、アップストリーム・コンポーネントがイベントのプロデューサ、ダウンストリーム・コンポーネントがイベントのコンシューマになります。キューのサイズはmax-sizeの構成によって定義されます。キューからイベントを消費するスレッドは、最大でmax-threadsの数までとなります。

      複数のスレッドが有効な場合、EPNのダウンストリーム・コンポーネントからスローされるcom.bea.wlevs.ede.api.EventRejectedExceptionインスタンスは、チャネルがイベントを受信しているアダプタに伝播されないことに注意してください。

    • max-size子要素を追加すると、チャネルの最大サイズを指定できます。

      サイズがゼロのチャネルは、同期的にイベントを受け渡します。

      サイズがゼロでないチャネルはイベントを非同期的に処理し、リクエストされたサイズに従ってイベントをバッファリングします。デフォルト値は0です。

      <channel>
          <name>priceStream</name>
          <max-size>10000</size>
      </channel>
      
    • selector子要素を追加すると、どのアップストリームOracle CQLプロセッサの問合せがチャネルに結果を出力することを許可されるかを指定できます。

      アップストリーム・プロセッサに問合せを作成する前に、selectorchannel要素を構成することもできます。この場合、selector内の名前と一致する問合せ名を指定する必要があります。

      詳細は、9.1.5項「ダウンストリーム・チャネルに対する問合せ出力の制御: selector」を参照してください。


      注意:

      selector属性はアップストリーム・ノードがOracle CQLプロセッサの場合のみ該当します。詳細は、第10章「Oracle CQLプロセッサの構成」を参照してください。


    詳細は、D.82項「selector」を参照してください。

  10. 「ファイル」>「保存」を選択します。

    EPNエディタは、図9-2で示すように構成バッジをチャネルに追加します。詳細は、6.2.7項「バッジの構成」を参照してください。

    図9-3 構成バッジを備えたチャネル

    図9-3の説明が続きます
    「図9-3 構成バッジを備えたチャネル」の説明

9.2.3 手動によるチャネル・コンポーネント構成ファイルの作成方法

Oracle CEP IDE for Eclipseは、チャネル構成ファイルを作成するのに最も効率的でエラーの発生が少ない方法ですが(9.2.1項「Oracle CEP IDE for Eclipseを使用してシステム・タイムスタンプ・チャネルを構成する方法」を参照)、別の方法として、チャネル構成ファイルを手動で作成・維持することもできます。

簡略化するために、次の手順では、単一のXMLファイルでアプリケーションのすべてのコンポーネントを構成すると仮定しています。

手動でチャネル・コンポーネント構成ファイルを作成するには:

  1. EPNアセンブリ・ファイルを作成し、アプリケーションの各チャネルのwlevs:channel要素を追加します。

    id属性を持つそれぞれのwlevs:channelを一意で識別します。

    詳細は、4.3項「EPNアセンブリ・ファイルの作成」を参照してください。

  2. オプションで、wlevs:channel属性や子要素を追加することによって、デフォルトのチャネル・アセンブリ・ファイルの構成をオーバーライドします。

    1. 適切なwlevs:channel属性を追加します。

      表C-9「wlevs:channelアプリケーション・アセンブリ要素の属性」を参照してください。

    2. 適切なwlevs:channel子要素を追加します。

  3. お気に入りのXMLエディタを使用してXMLファイルを作成します。

    このXMLファイルの名前は自分で決定しますが、その拡張は.xmlで終了することが必要です。

    構成ファイルのルート要素はconfigであり、次のステップに示すネームスペース定義を使用します。

  4. アプリケーション内の各チャネルでは、configchannel子要素を追加します。

    name子要素を持つ各チャネルを一意で識別します。このnameは、アプリケーションのイベント処理ネットワークを定義するEPNアセンブリ・ファイルのchannel要素のid属性の値と同一にする必要があります。これが、このチャネル構成が適用される対象の、EPNアセンブリ・ファイル内の特定のチャネル・コンポーネントをOracle CEPが認識する仕組みです。詳細は、4.3項「EPNアセンブリ・ファイルの作成」を参照してください。

    たとえば、アプリケーションに2つのストリームがある場合、構成ファイルは最初に以下のようになります。

    <?xml version="1.0" encoding="UTF-8"?>
    <n1:config xmlns:n1="http://www.bea.com/ns/wlevs/config/application"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <helloworld:config
      xmlns:helloworld="http://www.bea.com/xml/ns/wlevs/example/helloworld">
      <processor>
       ...
      </processor>
      <channel>
        <name>firstStream</name>
        ...
      </channel>
      <channel>
        <name>secondStream</name>
        ...
      </channel>
    </helloworld:config>
    

    この例では、構成ファイルにfirstStreamおよびsecondStreamと呼ばれる2つのチャネルが含まれます。つまり、EPNアセンブリ・ファイルには同一の識別子を持つ2つ以上のチャネル登録が含まれることになります。

    <wlevs:channel id="firstStream" ...>
      ...
    </wlevs:channel>
    <wlevs:channel id="secondStream" ...>
      ...
    </wlevs:channel>
    

    注意:

    XMLファイルの識別子と名前は大/小文字が区別されるため、EPNアセンブリ・ファイルでコンポーネントの識別子を参照するときは、必ず大/小文字を同一にします。


  5. オプションで、channel子要素を追加することによって、デフォルトのチャネル構成をオーバーライドします。

    • max-threads子要素を追加すると、Oracle CEPサーバーがこのチャネルのイベントを処理するために使用するスレッドの最大数を指定できます。

      max-sizeが0の場合は、この値の設定に影響はありません。デフォルト値は0です。

      <channel>
          <name>priceStream</name>
          <max-threads>2</size>
      </channel>
      

      0に指定する場合、チャネルは受渡しのように機能します。max-threads > 0の場合、チャネルは従来のブロッキング・キューのように機能し、アップストリーム・コンポーネントがイベントのプロデューサ、ダウンストリーム・コンポーネントがイベントのコンシューマになります。キューのサイズはmax-sizeの構成によって定義されます。キューからイベントを消費するスレッドは、最大でmax-threadsの数までとなります。

      複数のスレッドが有効な場合、EPNのダウンストリーム・コンポーネントからスローされるcom.bea.wlevs.ede.api.EventRejectedExceptionインスタンスは、チャネルがイベントを受信しているアダプタに伝播されないことに注意してください。

    • max-size子要素を追加すると、チャネルの最大サイズを指定できます。

      サイズがゼロのチャネルは、同期的にイベントを受け渡します。

      サイズがゼロでないチャネルはイベントを非同期的に処理し、リクエストされたサイズに従ってイベントをバッファリングします。デフォルト値は0です。

      <channel>
          <name>priceStream</name>
          <max-size>10000</size>
      </channel>
      
    • heartbeat子要素を追加すると、Oracle CEPが時間を進めるためにハートビート・イベントを生成する前にチャネルが待機できる時間をナノ秒数で指定できます。

      heartbeatの子要素は、プロセッサに送信を行うイベント・チャネルにイベントが到着しない場合のみ、システムがタイムスタンプを付けた関係またはストリームに適用されます。プロセッサは、時間ベースのウィンドウや期間のパターン照合などの、一時的な演算子をいくつか含む文を用いて構成されています。

      <channel>
          <name>MatchOutputChannel</name>
          <heartbeat>10000</heartbeat>
      </channel>
      
    • selector子要素を追加すると、どのアップストリームOracle CQLプロセッサの問合せがチャネルに結果を出力することを許可されるかを指定できます。

      アップストリーム・プロセッサに問合せを作成する前に、selectorchannel要素を構成することもできます。この場合、selector内の名前と一致する問合せ名を指定する必要があります。

      詳細は、9.1.5項「ダウンストリーム・チャネルに対する問合せ出力の制御: selector」を参照してください。


      注意:

      selector属性はアップストリーム・ノードがOracle CQLプロセッサの場合のみ該当します。詳細は、第10章「Oracle CQLプロセッサの構成」を参照してください。


    詳細は、D.82項「selector」を参照してください。

  6. 構成ファイルを保存して閉じます。

9.3 チャネル構成ファイルの例

図9-4は、2つのチャネルを含むEPNの一部を示します: priceStreamおよびfilteredStreampriceStreamチャネルは、PriceAdapterイベント・ソースとそのPriceEventイベントをOracle CQLプロセッサのfilterFanoutProcessorに接続するインバウンド・チャネルです。filteredStreamチャネルは、Oracle CQLプロセッサの問合せ結果(FilteredPriceEventイベント)をダウンストリーム・コンポーネント(図9-4では示されていません)に接続するアウトバウンド・チャネルです。

図9-4 2つのチャネルを持つEPN

図9-4の説明が続きます
「図9-4 2つのチャネルを持つEPN」の説明

この項では、次を含むチャネル構成ファイルの例を提供します。

9.3.1 チャネル・コンポーネント構成ファイル

例9-16は、図9-4で示されるように2つのチャネルを構成するサンプルのコンポーネント構成ファイルを示します。

例9-16 サンプルのチャネル・コンポーネント構成ファイル

<?xml version="1.0" encoding="UTF-8"?>
<n1:config xmlns:n1="http://www.bea.com/ns/wlevs/config/application"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <processor>
        <name>filterFanoutProcessor</name>
        <rules>
            <query id="Yr3Sector"><![CDATA[ 
                select cusip, bid, srcId, bidQty, ask, askQty, seq 
                from priceStream where sector="3_YEAR"
            ]]></query>
        </rules>
    </processor>
    <channel>
        <name>priceStream</name>
        <max-size>10000</max-size>
        <max-threads>4</max-threads>
    </channel>
    <channel>
        <name>filteredStream</name>
        <max-size>5000</max-size>
        <max-threads>2</max-threads>
    </channel>
</n1:config>

9.3.2 チャネルEPNアセンブリ・ファイル

例9-17は、図9-4で示されるように2つのチャネルを構成するEPNアセンブリ・ファイルを示します。

例9-17 チャネルEPNアセンブリ・ファイル

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:osgi="http://www.springframework.org/schema/osgi"
    xmlns:wlevs="http://www.bea.com/ns/wlevs/spring"
    xmlns:cqlx="http://www.oracle.com/schema/cqlx"
    xsi:schemaLocation="
  http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans.xsd
  http://www.springframework.org/schema/osgi
  http://www.springframework.org/schema/osgi/spring-osgi.xsd
  http://www.bea.com/ns/wlevs/spring
  http://www.bea.com/ns/wlevs/spring/spring-wlevs-v11_1_1_6.xsd">
 
    <wlevs:event-type-repository>
        <wlevs:event-type type-name="PriceEvent">
            <wlevs:properties>
                <wlevs:property name="cusip" type="java.lang.String" />
                <wlevs:property name="bid" type="java.lang.Double" />
                <wlevs:property name="srcId" type="java.lang.String" />
                <wlevs:property name="bidQty" type="java.lang.Integer" />
                <wlevs:property name="ask" type="java.lang.Double" />
                <wlevs:property name="askQty" type="java.lang.Integer" />
                <wlevs:property name="seq" type="java.lang.Long" />
                <wlevs:property name="sector" type="java.lang.String" />
            </wlevs:properties>
        </wlevs:event-type>
        <wlevs:event-type type-name="FilteredPriceEvent">
            <wlevs:properties>
                <wlevs:property name="cusip" type="java.lang.String" />
                <wlevs:property name="bid" type="java.lang.Double" />
                <wlevs:property name="srcId" type="java.lang.String" />
                <wlevs:property name="bidQty" type="java.lang.Integer" />
                <wlevs:property name="ask" type="java.lang.Double" />
                <wlevs:property name="askQty" type="java.lang.Integer" />
                <wlevs:property name="seq" type="java.lang.Long" />
            </wlevs:properties>
        </wlevs:event-type>
        <wlevs:event-type type-name="BidAskEvent">
            <wlevs:properties>
                <wlevs:property name="cusip" type="java.lang.String" />
                <wlevs:property name="bidseq" type="java.lang.Long" />
                <wlevs:property name="bidSrcId" type="java.lang.String" />
                <wlevs:property name="bid" type="java.lang.Double" />
                <wlevs:property name="askseq" type="java.lang.Long" />
                <wlevs:property name="askSrcId" type="java.lang.String" />
                <wlevs:property name="ask" type="java.lang.Double" />
                <wlevs:property name="bidQty" type="java.lang.Integer" />
                <wlevs:property name="askQty" type="java.lang.Integer" />
                <wlevs:property name="intermediateStrategy" type="java.lang.String" />
                <wlevs:property name="correlationId" type="java.lang.Long" />
                <wlevs:property name="priority" type="java.lang.Integer" />
            </wlevs:properties>
        </wlevs:event-type>
        <wlevs:event-type type-name="FinalOrderEvent">
            <wlevs:properties>
                <wlevs:property name="cusip" type="java.lang.String" />
                <wlevs:property name="bidseq" type="java.lang.Long" />
                <wlevs:property name="bidSrcId" type="java.lang.String" />
                <wlevs:property name="bid" type="java.lang.Double" />
                <wlevs:property name="bidQty" type="java.lang.Integer" />
                <wlevs:property name="bidSourceStrategy" type="java.lang.String" />
                <wlevs:property name="askseq" type="java.lang.Long" />
                <wlevs:property name="askSrcId" type="java.lang.String" />
                <wlevs:property name="ask" type="java.lang.Double" />
                <wlevs:property name="askQty" type="java.lang.Integer" />
                <wlevs:property name="askSourceStrategy" type="java.lang.String" />
                <wlevs:property name="correlationId" type="java.lang.Long" />
            </wlevs:properties>
        </wlevs:event-type>
    </wlevs:event-type-repository>
 
    <!-- Assemble EPN (event processing network) -->
    <wlevs:adapter advertise="true" id="PriceAdapter"
        provider="csvgen">
        <wlevs:instance-property name="port" value="9008" />
        <wlevs:instance-property name="eventTypeName"
            value="PriceEvent" />
        <wlevs:instance-property name="eventPropertyNames"
            value="srcId,sector,cusip,bid,ask,bidQty,askQty,seq" />
    </wlevs:adapter>
 
    <wlevs:channel id="priceStream" event-type="PriceEvent">
        <wlevs:listener ref="filterFanoutProcessor" />
        <wlevs:source ref="PriceAdapter" />
    </wlevs:channel>
 
    <!-- By default, CQL is used for OCEP 11.0 -->
    <wlevs:processor id="filterFanoutProcessor" >
    </wlevs:processor>
 
    <wlevs:channel id="filteredStream"
        event-type="FilteredPriceEvent">
        <wlevs:listener ref="bbaProcessor" />
        <wlevs:listener ref="analyticsProcessor" />
        <wlevs:source ref="filterFanoutProcessor" />
    </wlevs:channel>
 
    <!-- Explicitly specify provider CQL -->
    <wlevs:processor id="bbaProcessor" provider="cql">
        <wlevs:listener ref="bidAskBBAStream" />
    </wlevs:processor>
 
    <wlevs:processor id="analyticsProcessor">
        <wlevs:listener ref="bidAskAnalyticsStream" />
    </wlevs:processor>
 
    <wlevs:channel id="bidAskBBAStream" event-type="BidAskEvent">
        <wlevs:listener ref="selectorProcessor" />
    </wlevs:channel>
    
    <wlevs:channel id="bidAskAnalyticsStream" event-type="BidAskEvent">
        <wlevs:listener ref="selectorProcessor" />
    </wlevs:channel>
 
    <wlevs:processor id="selectorProcessor">
        <wlevs:listener ref="citipocOut" />
    </wlevs:processor>
 
    <wlevs:channel id="citipocOut" event-type="FinalOrderEvent" advertise="true">
        <wlevs:listener>
            <!-- Create business object -->
            <bean id="outputBean"
                class="com.bea.wlevs.POC.citi.OutputBean"
                autowire="byName" />
        </wlevs:listener>
    </wlevs:channel>
</beans>