プライマリ・コンテンツに移動
Oracle® Fusion Middleware Oracle Stream Analyticsによるイベント処理用アプリケーションの開発
12c (12.2.1.2.0)
E82664-01
目次へ移動
目次

前
前へ

18 スケーラブル・アプリケーション

パーティション化と並列処理を使用して、また高可用性オプションを考慮して、アプリケーション設計にスケーラビリティを組み込むことができます。Oracle Stream Analyticsでは、チャネルとアップストリーム・アダプタでデフォルトまたはカスタムのパーティション化および並列処理設定を使用できます。着信JMSイベント・ストリームをパーティション化し、JSMSイベント・ストリームのグループ・パターン・マッチングを構成することもできます。

この章の内容は次のとおりです。

18.1 デフォルト・チャネルのスケーラビリティ設定

デフォルト・イベントのプロパティ・ベースのイベント・パーティショナを使用するようチャネルを構成できます。このデフォルト構成では、着信イベントが到着するたびに、チャネルはリスナーを選択し、各リスナーに各イベントをブロードキャストするかわりに、そのリスナーにイベントをディスパッチします。

注:

イベント・パーティショナでチャネルを構成した場合、バッチ処理はサポートされていません。

図18-1に、イベント・パーティショナのプロパティを使用してチャネルをパーティション化するEPNを示します。この例では、株式銘柄と株価の2つのプロパティを持つPriceEventタイプのイベントをインバウンド・アダプタが送信します。この例は、symbolプロパティでチャネルをパーティション化し、チャネルまたはアップストリーム・アダプタにマルチスレッド処理を追加する方法を示しています。

18.1.1 チャネルでのパーティション化の構成

イベント・ストアのカスタマイズの詳細は、『Oracle Stream Analyticsのカスタマイズ』のイベント・ストアのカスタマイズに関する項を参照してください。

  1. EPNにチャネルを追加します。

    図18-1では、チャネルはEventPartitionerChannelです。

  2. チャネルをアップストリーム・アダプタに接続します。

    図18-1では、アップストリーム・アダプタはinboundです。

  3. チャネルを複数のリスナーに接続します。

    図18-1では、チャネルはOracle CQLプロセッサprocessor1processor2およびprocessor3に接続されます。

  4. partitionByEventPropertyのインスタンス・プロパティをチャネル要素に追加するようにアセンブリ・ファイルを編集します。

    このinstance-propertyvalueはチャネルがイベントをパーティション化するイベント・プロパティの名前です。

    この例では、チャネルはイベント・プロパティsymbol別にイベントをパーティション化します。

    <wlevs:event-type-repository>
      <wlevs:event-type type-name="PriceEvent">
        <wlevs:properties>
           <wlevs:property name="symbol" type="char" />
           <wlevs:property name="price" type="long" />
        </wlevs:properties>
      </wlevs:event-type>
    </wlevs:event-type-repository>
    
    <wlevs:channel id="EventPartitionerChannel" event-type="PriceEvent">
      <wlevs:instance-property name="partitionByEventProperty" value="symbol" />
       <wlevs:listener ref="processor1" />
       <wlevs:listener ref="processor2" />
       <wlevs:listener ref="processor3" />
       <wlevs:source ref="inbound" />
    </wlevs:channel>
    

18.1.2 チャネルでの並列処理の構成

チャネルでスレッドを割り当てる場合、max-threadsプロパティをEPNのリスナー数に設定します。

チャネルから並列性ダウンストリームを増やす場合には、チャネルでmax-threadsプロパティを設定して、スレッド・プールにチャネルを関連付けます。スレッドの最大数として最適な値は、ダウンストリーム・プロセッサにおけるOracle CQL問合せの詳細(問合せでパラレル実行を許可するかどうか)や、アプリケーションの実行中に観察される動作(CPUコアをすべて使用するかどうか)など、多くの要素に依存します。スレッドの最大数をチューニングするには、手始めとしてチャネルでのリスナー数と等しく設定することをお薦めします。

この例では、リスナー数は3つです。

<wlevs:channel id="EventPartitionerChannel" event-type="PriceEvent" max-threads="3" >
  <wlevs:instance-property name="eventPartitioner" value="true" />
  <wlevs:listener ref="processor1" />
  <wlevs:listener ref="processor2" />
  <wlevs:listener ref="processor3" />
  <wlevs:source ref="inbound" />
</wlevs:channel>

18.1.3 アップストリーム・アダプタでの並列処理の構成

  1. EPNアセンブリ・ファイルを編集し、max-threads属性を0に設定するようチャネルを構成します。
    <wlevs:channel id="EventPartitionerChannel" event-type="PriceEvent"
      max-threads="0" >
      <wlevs:instance-property name="eventPartitioner" value="true" />
      <wlevs:listener ref="processor1" />
      <wlevs:listener ref="processor2" />
      <wlevs:listener ref="processor3" />
      <wlevs:source ref="inbound" />
    </wlevs:channel>
    
  2. Oracle Stream Analyticsサーバー・ファイルを編集し、work-manager要素を追加します。

    ワーク・マネージャに対するmin-threads-constraintmax-threads-constraintの適切な選択は、チャネルでのスレッド数の設定に関してチャネルでの並列処理の構成で説明している要素や、ワーク・マネージャを特定のアダプタ専用にするか、それとも他のコンポーネント(他のアダプタまたはJettyサービス)と共有するかなど、多くの要素に依存します。チューニングの手始めとして、ワーク・マネージャが単一のアダプタ・インスタンス専用の場合には、min-threads-constraintおよびmax-threads-constraintプロパティを、アダプタからのダウンストリームのリスナー数と等しく設定することをお薦めします。

    このワーク・マネージャが複数のコンポーネントで共有されていない場合(つまり、この構成のアップストリーム・アダプタ専用の場合)、min-threads-constraintおよび max-threads-constraint要素をリスナー数と同じ値に設定します。

    <work-manager>
        <name>adapterWorkManager</name>
        <min-threads-constraint>3</min-threads-constraint>
        <max-threads-constraint>3</max-threads-constraint>
    </work-manager>
    

    max-threadsの詳細は、『Oracle Stream Analyticsスキーマ・リファレンス』のmax-threadsに関する項を参照してください。

  3. コンポーネント構成ファイルを編集し、このwork-managerでアップストリーム・アダプタを構成します。
    <adapter>
        <name>inbound</name>
        <work-manager-name>adapterWorkManager</work-manager-name>
        ...
    </adapter>

18.1.4 ローカル・パーティション・チャネルの定義

ローカル・パーティション化をサポートするには、Oracle CQLプロセッサを構成する必要があります。

次に示すサンプル・コードを使用して、ローカル・パーティション化を定義します。

例18-1 アセンブリ・ファイル

<wlevs:channel id="LocalPartitionChannel" event-type="StockEvent" is-local-partitioner="true" max-threads="3">
<wlevs:instance-property name="partitionByEventProperty" value="symbol" />
</wlevs:channel>

重要なチャネル・プロパティ

ローカル・パーティション・チャネルには、次の重要なプロパティがあります。

  • IS-LOCAL PARTITIONER: ローカル・パーティション化チャネルにするチャネルを定義します。

  • MAX-THREADS: 並列度を指定します

  • MAX-SIZE: バッファリングするイベントの最大数をパーティションごとに決定します。

  • PARTITIONING ATTRIBUTE: ストリームのパーティション化に使用するストリームの属性を指定します。

例18-2 構成

<processor>
	<name>StockAggregateProcessor</name>
	<rules>
		<query id="helloworldRule">
			<![CDATA[
			select count(*) as symbolCount, symbol from LocalPartitionChannel group by symbol]]>
		</query>
	</rules>
</processor>

18.2 着信JMSイベント・ストリームのパーティション化

マルチサーバー・ドメインのセレクタによって、ActiveActiveGroupBeanクラスをアセンブリ・ファイルに追加し、着信JMSイベント・ストリームをパーティション化できます。

18.2.1 高可用性のないパーティション化の構成

  1. マルチサーバー・ドメインを作成します。

    この例では、デプロイメント・グループ名は、MyDeploymentGroupです。

    Oracle Stream Analyticsの管理のマルチサーバー・ドメインに関する項を参照してください。

  2. 適切なActiveActiveGroupBean通知グループをcluster要素のgroups子要素に追加するように各Oracle Stream AnalyticsサーバーのOracle Stream Analyticsサーバー構成ファイルを構成します。

    Oracle Stream Analyticsサーバー構成ファイルは、/Oracle/Middleware/my_oep/user_projects/domains/<domain_name>/<server_name>/config.にあります。

    表18-2は、ocep-server-1ocep-server-2ocep-server-3およびocep-server-4Oracle Stream Analyticsサーバーのcluster要素を示します。デプロイメント・グループはMyDeploymentGroupであり、通知グループはデフォルトのActiveActiveGroupBean通知グループ・ネーミングを使用して定義します。

    オプションとして、通知グループの命名規則の説明に従って、独自のグループ命名規則を指定できます。

    表18-1 サーバー構成ファイル・グループ要素構成

    パーティション cluster要素

    ocep-server-1

    <cluster>
        <server-name>ocep-server-1</server-name>
        ...
        <enabled>coherence</enabled>
        ...
        <groups>MyDeploymentGroup, ActiveActiveGroupBean_group1</groups>
    </cluster>

    ocep-server-2

    <cluster>
        <server-name>ocep-server-2</server-name>
        ...
        <enabled>coherence</enabled>
        ...
        <groups>MyDeploymentGroup, ActiveActiveGroupBean_group2</groups>
    </cluster>

    ocep-server-3

    <cluster>
        <server-name>ocep-server-3</server-name>
        ...
        <enabled>coherence</enabled>
        ...
        <groups>MyDeploymentGroup, ActiveActiveGroupBean_group3</groups>
    </cluster>

    ocep-server-4

    <cluster>
        <server-name>ocep-server-4</server-name>
        ...
        <enabled>coherence</enabled>
        ...
        <groups>MyDeploymentGroup, ActiveActiveGroupBean_group4</groups>
    </cluster>
  3. Oracle Stream Analyticsアプリケーションを作成します。

  4. 次のように、ActiveActiveGroupBean要素をアセンブリ・ファイルに追加します。

    <bean id="clusterAdapter" class="com.oracle.cep.cluster.hagroups.ActiveActiveGroupBean">
    </bean>
    
  5. JMSインバウンド・アダプタのjms-adapter要素にパラメータ化されたmessage-selectorを定義します。

    1. JMSインバウンド・アダプタのjms-adapter要素にgroup-binding子要素を追加するために、コンポーネント構成ファイルを編集します。

    2. 各可能JMSメッセージ・セレクタ値に対して、次のように1つのgroup-binding要素を追加します。

    <jms-adapter>
      <name>JMSInboundAdapter</name>
      <event-type>StockTick</event-type>
      <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
      <destination-jndi-name>./Topic1</destination-jndi-name>
      <user>weblogic</user>
      <password>weblogic1</password>
      <work-manager>JettyWorkManager</work-manager>
      <concurrent-consumers>1</concurrent-consumers>
      <session-transacted>true</session-transacted>
      <message-selector>${CONDITION}</message-selector>
      <bindings>
        <group-binding group-id="ActiveActiveGroupBean_group1">
          <param id="CONDITION">acctid > 400</param>
        </group-binding>
        <group-binding group-id="ActiveActiveGroupBean_group2">
          <param id="CONDITION">acctid BETWEEN 301 AND 400</param>
        </group-binding>
        <group-binding group-id="ActiveActiveGroupBean_group3">
          <param id="CONDITION">acctid BETWEEN 201 AND 300</param>
        </group-binding>
        <group-binding group-id="ActiveActiveGroupBean_group4">
          <param id="CONDITION">acctid <= 200</param>
        </group-binding>
      </bindings>
    </jms-adapter>
    

    この構成では、ActiveActiveGroupBean_group1を含むcluster要素、groups子要素でアプリケーションをOracle Stream Analyticsサーバーにデプロイすると、CONDITIONパラメータがacctid > 400と定義され、acctidプロパティが400以上であるイベントがアプリケーションにより処理されます。

    注:

    各インバウンドJMSアダプタは異なるトピックをリスニングする必要があります。詳細は、アダプタを参照してください。

  6. マルチサーバー・ドメインのデプロイメント・グループにアプリケーションをデプロイします。

    実行時に、各Oracle Stream AnalyticsサーバーがActiveActiveGroupBean通知グループに対するmessage-selectorにアプリケーションのインスタンスを構成します。これにより、アプリケーションの各インスタンスがメッセージ総数のサブセットを並行して処理するようにJMSトピックがパーティションされます。

18.2.2 高可用性のあるパーティション化の構成

この手順では、JMSによる正確なリカバリの構成からのサンプル・アプリケーションを使用します。図18-2はEPN図を示し、例18-3および例18-4は、対応するアセンブリと構成ファイルを示します。

図18-2 JMSのEPNによる正確なリカバリ

図18-2の説明が続きます
「図18-2 JMSのEPNによる正確なリカバリ」の説明

この手順により、図18-3に示すOracle Stream Analyticsの高可用性構成が作成されます。

図18-3 高可用性のあるActiveActiveGroupBean

図18-3の説明が続きます
「図18-3 高可用性のあるActiveActiveGroupBean」の説明

高可用性のあるJMSアプリケーションでのスケーラビリティの構成

  1. マルチサーバー・ドメインを作成します。

    この例では、デプロイメント・グループの名前は、MyDeploymentGroupです。

    Oracle Stream Analyticsの管理のマルチサーバー・ドメインに関する項を参照してください。

  2. 適切なActiveActiveGroupBean通知グループをcluster要素のgroups子要素に追加するように各Oracle Stream AnalyticsサーバーのOracle Stream Analyticsサーバー構成ファイルを構成します。

    Oracle Stream Analyticsサーバー構成ファイルは、/Oracle/Middleware/my_oep/user_projects/domains/<domain_name>/<server_name>/config.にあります。

    表18-2は、ocep-server-1ocep-server-2ocep-server-3およびocep-server-4Oracle Stream Analyticsサーバーのcluster要素を示します。デプロイメント・グループはMyDeploymentGroupであり、通知グループはデフォルトのActiveActiveGroupBean通知グループ・ネーミングを使用して定義します。

    ocep-server-1ocep-server-2は同一の通知グループ名前(ActiveActiveGroupBean_group1)を使用し、ocep-server-3ocep-server-4は同一の通知グループ名前(ActiveActiveGroupBean_group2)を使用します。

    表18-2 サーバー構成ファイル・グループ要素構成

    パーティション cluster要素

    ocep-server-1

    <cluster>
        <server-name>ocep-server-1</server-name>
        ...
        <enabled>coherence</enabled>
        ...
        <groups>MyDeploymentGroup, ActiveActiveGroupBean_group1</groups>
    </cluster>

    ocep-server-2

    <cluster>
        <server-name>ocep-server-2</server-name>
        ...
        <enabled>coherence</enabled>
        ...
        <groups>MyDeploymentGroup, ActiveActiveGroupBean_group1</groups>
    </cluster>

    ocep-server-3

    <cluster>
        <server-name>ocep-server-3</server-name>
        ...
        <enabled>coherence</enabled>
        ...
        <groups>MyDeploymentGroup, ActiveActiveGroupBean_group2</groups>
    </cluster>

    ocep-server-4

    <cluster>
        <server-name>ocep-server-4</server-name>
        ...
        <enabled>coherence</enabled>
        ...
        <groups>MyDeploymentGroup, ActiveActiveGroupBean_group2</groups>
    </cluster>
  3. Oracle Stream Analytics高可用性アプリケーションを作成します。

    詳細は、高可用性アプリケーションを参照してください。

  4. 次のように、ActiveActiveGroupBean要素をアセンブリ・ファイルに追加します。

    <bean id="clusterAdapter" class="com.oracle.cep.cluster.hagroups.ActiveActiveGroupBean">
    </bean>
    
  5. コンポーネント構成ファイルを編集し、次に示すように、インバウンドJMSアダプタ用のjms-adapter要素を構成します。

    異なるトピックをリスニングするには、インバウンドJMSアダプタをそれぞれ設定し、session-transactedtrueに設定する必要があります。

    <?xml version="1.0" encoding="UTF-8"?>
    <wlevs:config 
            xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application"
            xmlns:ha="http://www.oracle.com/ns/cep/config/cluster">
        ...
        <jms-adapter>
            <name>JMSInboundAdapter</name>
            <event-type>StockTick</event-type>
            <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
            <destination-jndi-name>./Topic1</destination-jndi-name>
            <session-transacted>true</session-transacted>
        ...    </jms-adapter>
        <jms-adapter>
            <name>JMSInboundAdapter2</name>
            <event-type>StockTick</event-type>
            <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
            <destination-jndi-name>./Topic2</destination-jndi-name>
            <session-transacted>true</session-transacted>
        ...    </jms-adapter>
    </wlevs:config>
    
  6. 各JMSインバウンド・アダプタのjms-adapter要素にパラメータ化されたmessage-selectorを定義します。

    1. JMSインバウンド・アダプタのjms-adapter要素にgroup-binding子要素を追加するために、コンポーネント構成ファイルを編集します。

    2. 各可能JMSmessage-selector値に対して、次のように1つのgroup-binding要素を追加します。

    <jms-adapter>
        <name>JMSInboundAdapter</name>
        <event-type>StockTick</event-type>
        <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
        <destination-jndi-name>./Topic1</destination-jndi-name>
        <session-transacted>true</session-transacted>
        <message-selector>${CONDITION}</message-selector>
        <bindings>
            <group-binding group-id="ActiveActiveGroupBean_group1">
                <param id="CONDITION">acctid <= 1000</param>
            </group-binding>
            <group-binding group-id="ActiveActiveGroupBean_group2">
                <param id="CONDITION">acctid > 1000</param>
            </group-binding>
         </bindings>
    </jms-adapter>
    

    この構成では、ActiveActiveGroupBean_group1を含むcluster要素、groups子要素を持つアプリケーションをOracle Stream Analyticsサーバーにデプロイすると、CONDITIONパラメータがacctid <= 1000と定義され、acctidプロパティが1000以下であるイベントがアプリケーションにより処理されます。同様に、ActiveActiveGroupBean_group2を含むcluster要素、groups子要素を持つアプリケーションをOracle Stream Analyticsサーバーにデプロイすると、CONDITIONパラメータがacctid > 1000と定義され、acctidプロパティが1000以上であるイベントがアプリケーションにより処理されます。

  7. コンポーネント構成ファイルを編集し、次に示すように、アウトバウンドJMSアダプタ用のjms-adapter要素を構成します。

    対応するインバウンド・アダプタ(この例では、JMSInboundAdapter2: ./Topic2)と同一トピックでアウトバウンドJMSアダプタを構成し、session-transactedtrueに設定します。

    <?xml version="1.0" encoding="UTF-8"?>
    <wlevs:config 
            xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application"
            xmlns:ha="http://www.oracle.com/ns/cep/config/cluster">
        ...
        <jms-adapter>
            <name>JMSInboundAdapter</name>
            <event-type>StockTick</event-type>
            <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
            <destination-jndi-name>./Topic1</destination-jndi-name>
            <session-transacted>true</session-transacted>
        ...    </jms-adapter>
        <jms-adapter>
            <name>JMSInboundAdapter2</name>
            <event-type>StockTick</event-type>
            <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
            <destination-jndi-name>./Topic2</destination-jndi-name>
            <session-transacted>true</session-transacted>
        ...    </jms-adapter>
        <jms-adapter>
            <name>JMSOutboundAdapter</name>
            <event-type>StockTick</event-type>
            <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
            <destination-jndi-name>./Topic2</destination-jndi-name>
            <session-transacted>true</session-transacted>
        ...    </jms-adapter>
    </wlevs:config>
    
  8. マルチサーバー・ドメインのデプロイメント・グループにアプリケーションをデプロイします。

    実行時に、各Oracle Stream AnalyticsサーバーがActiveActiveGroupBean通知グループに対するmessage-selectorにアプリケーションのインスタンスを構成します。これにより、アプリケーションの各インスタンスがメッセージ総数のサブセットを並行して処理するようにJMSトピックがパーティションされます。

    ActiveActiveGroupBeanグループの有効なOracle Stream Analyticsサーバーが停止すると、そのActiveActiveGroupBeanグループのスタンバイOracle Stream Analyticsサーバーに対してOracle Stream AnalyticsサーバーがOracle Stream Analytics高可用性フェイルオーバーを実行します。

例18-3 JMS EPNアセンブリ・ファイルを使用して精度の高いリカバリ

<?xml version="1.0" encoding="UTF-8"?>
<beans ... >

    <wlevs:event-type-repository>
        <wlevs:event-type type-name="StockTick">
            <wlevs:properties>
                <wlevs:property name="lastPrice" type="double" />
                <wlevs:property name="symbol" type="char" />
            </wlevs:properties>
        </wlevs:event-type>
    </wlevs:event-type-repository>

    <wlevs:adapter id="JMSInboundAdapter" provider="jms-inbound">
        <wlevs:listener ref="myHaInputAdapter"/>
    </wlevs:adapter>

    <wlevs:adapter id="myHaInputAdapter" provider="ha-inbound" >
        <wlevs:instance-property name="keyProperties" value="sequenceNo"/>
        <wlevs:instance-property name="timeProperty" value="inboundTime"/>
    </wlevs:adapter>

    <wlevs:channel id="channel1" event-type="StockTick">
        <wlevs:listener ref="processor1" />
        <wlevs:source ref="myHaInputAdapter"/>
        <wlevs:application-timestamped>
            <wlevs:expression>inboundTime</wlevs:expression>
        </wlevs:application-timestamped>
    </wlevs:channel>

    <wlevs:processor id="processor1">
        <wlevs:listener ref="channel2" />
    </wlevs:processor>

    <wlevs:channel id="channel2" event-type="StockTick">
        <wlevs:listener ref="myHaCorrelatingAdapter" />
    </wlevs:channel>

    <wlevs:adapter id="myHaCorrelatingAdapter" provider="ha-correlating" >
        <wlevs:instance-property name="correlatedSource" ref="clusterCorrelatingOutstream"/> 
        <wlevs:instance-property name="failOverDelay" value="2000"/> 
        <wlevs:listener ref="JMSOutboundAdapter"/>
    </wlevs:adapter>

    <wlevs:adapter id="JMSOutboundAdapter" provider="jms-outbound">
    </wlevs:adapter>

    <wlevs:adapter id="JMSInboundAdapter2" provider="jms-inbound">
    </wlevs:adapter>

    <wlevs:channel id="clusterCorrelatingOutstream" event-type="StockTick" advertise="true">
        <wlevs:source ref="JMSInboundAdapter2"/>
    </wlevs:channel> 
</beans>

例18-4 JMSコンポーネント構成アセンブリ・ファイルを使用して精度の高いリカバリ

<?xml version="1.0" encoding="UTF-8"?>
<wlevs:config 
        xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application"
        xmlns:ha="http://www.oracle.com/ns/cep/config/cluster">
    <processor>
        <name>processor1</name>
        <rules>
            <query id="helloworldRule">
                <![CDATA[ select * from channel1 [Now] >
            </query>
        </rules>
    </processor>
    <jms-adapter>
        <name>JMSInboundAdapter</name>
        <event-type>StockTick</event-type>
        <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
        <destination-jndi-name>./Topic1</destination-jndi-name>
        <session-transacted>true</session-transacted>
    ...
    </jms-adapter>
    <jms-adapter>
        <name>JMSInboundAdapter2</name>
        <event-type>StockTick</event-type>
        <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
        <destination-jndi-name>./Topic2</destination-jndi-name>
        <session-transacted>true</session-transacted>
    ...
    </jms-adapter>
    <jms-adapter>
        <name>JMSOutboundAdapter</name>
        <event-type>StockTick</event-type>
        <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
        <destination-jndi-name>./Topic2</destination-jndi-name>
        <session-transacted>true</session-transacted>
    ...
    </jms-adapter>
</wlevs:config>

18.3 通知グループの命名規則

デフォルトでは、ActiveActiveGroupBeanクラスは次の名前で通知グループを作成します。Xは文字列です。

ActiveActiveGroupBean_X

実行時に、ActiveActiveGroupBeanは、Oracle Event Processingサーバーに定義された既存のグループをスキャンし、次のデフォルトのパターン一致を適用します。ActiveActiveGroupBeanで一致が見つかったら、その名前の通知グループを作成します。

ActiveActiveGroupBean_\\w+

オプションで、別の通知グループ・ネーミング・パターンを指定するために独自のグループ・パターンを定義できます。

  1. アセンブリ・ファイルを構成し、次に示すように、ActiveActiveGroupBean要素にgroupPattern属性を追加します。

    <bean id="clusterAdapter" class="com.oracle.cep.cluster.hagroups.ActiveActiveGroupBean">
        <property name="groupPattern" value="MyNotificationGroupPattern*"/>
    </bean>
  2. 通知グループに対し使用するクラスタ・グループ・ネーミング・ルールに一致するgroupPattern属性の値を指定します。

18.4 カスタム・チャネル・イベント・パーティショナ

ほとんどのチャネルはデフォルト・イベントのパーティション化を使用します。パーティショナが指定されていなく、partitionByEventProperty要素が存在しない場合、チャネルはすべてのリスナーにイベントを送信します。partitionByEventProperty要素では、デフォルトのパーティション化アルゴリズムを使用して、指定したイベントをパーティション化することによって、一定レベルのカスタマイズを指定します。この項では、デフォルトのパーティション化アルゴリズムに対する制御を微調整するカスタム・パーティショナをプログラム的に構成することによって、チャネル・リスナーへのイベントのディスパッチをさらにカスタマイズする方法について説明します。たとえば、プロパティ範囲に基づいてイベント・パーティショナを作成できます。

18.4.1 EventPartitionerインタフェース

チャネルにまたがってイベントをパーティション化し、チャネル・リスナーへのイベントのディスパッチをカスタマイズするには、com.bea.wlevs.channel.EventPartitionerインタフェースを使用します。

注:

カスタム・パーティション化と並列処理を実装する際には、イベントの順序を維持し、マルチスレッドを慎重に管理するコードを必ず追加してください。

図18-4に、イベント・パーティショナを使用してチャネルをパーティション化するEPNを示します。この例では、株式銘柄と株価の2つのプロパティを持つPriceEventタイプのイベントをインバウンド・アダプタが送信します。この例は、symbolプロパティでチャネルをパーティション化し、チャネルまたはアップストリーム・アダプタにマルチスレッド処理を追加する方法を示しています。

図18-4 イベント・パーティショナEPN

図18-4の説明が続きます
「図18-4 イベント・パーティショナEPN」の説明

18.4.2 EventPartitionerインタフェースの実装

  1. Oracle JDeveloperで、Oracle Stream Analyticsアプリケーションを開きます。
  2. MANIFEST.MFファイルを編集し、パッケージcom.bea.wlevs.channelをインポートします。
  3. プロジェクトを選択し、「ファイル」→「新規」→「ギャラリから」を選択します。

    「新規ギャラリ」ダイアログが表示されます。

  4. 「新規ギャラリ」ダイアログで、左パネルから「一般」を、右パネルから「Javaクラス」を選択して、「OK」をクリックします。

    「Javaクラスの作成」ダイアログが表示されます。

  5. 「Javaクラスの作成」ダイアログで、クラス名、パッケージ名および拡張情報を指定します。
  6. 「オプション属性」「実装」の下で、「追加」(+)ボタンを使用してcom.bea.wlevs.channel.EventPartitionerインタフェースを指定します。
  7. 「OK」をクリックします。

    新しいEventPartitionerクラスが作成されます。

  8. 次のように、EventPartitionerの実装を完了します。
    package com.acme;
    
    import com.bea.wlevs.channel.EventPartitioner;
    import com.bea.wlevs.ede.api.EventProcessingException;
    import com.bea.wlevs.ede.api.EventType;
    
    public class MyEventPartitioner implements EventPartitioner {
    
      private final EventType eventType;
      private int numberOfPartitions;
    
      @Override
      public void activateConfiguration(int numberOfPartitions, EventType eventType) {
         this.numberOfPartitions = numberOfPartitions;
         this.eventType = eventType;
      }
    
      @Override
      public int partition(Object event) throws EventProcessingException {
        int dispatchToListener = 0;
        ... // Your implementation.
        return dispatchToListener;
      }
    }
    

    activateConfigurationメソッドは、ActivatableBean.afterConfigurationActiveの前、およびEventPartitionerクラスのpartitionメソッドの起動前にOracle Stream Analyticsサーバーが起動するコールバックです。

    このEventPartitionerをチャネルと関連付ける場合、チャネルがイベントを受信するたびに、EventPartitionerクラスのpartitionメソッドが起動されます。

    partitionメソッドは、チャネルによるイベントのディスパッチ先であるリスナーの索引を返す必要があります。索引は0からnumberOfPartitions - 1の間のintである必要があります。

  9. EPNにチャネルを追加します。

    図18-4では、チャネルはEventPartitionerChannelです。

  10. チャネルをアップストリーム・アダプタに接続します。

    図18-4では、アップストリーム・アダプタはinboundです。

  11. チャネルを複数のリスナーに接続します。

    図18-4では、チャネルはOracle CQLプロセッサprocessor1processor2およびprocessor3に接続されます。

    チャネルでロード・バランシングを実行する場合、各リスナーは同じである必要があります。

  12. eventPartitionerのインスタンス・プロパティをチャネル要素に追加するようにEPNアセンブリ・ファイルを編集します。

    このinstance-propertyvalueはチャネルがイベントをパーティション化するためのEventPartitionerインスタンスの完全修飾クラス名です。このクラスは、Oracle Stream Analyticsアプリケーション・クラス・パスに存在する必要があります。

    この例では、チャネルはEventPartitionerインスタンスcom.acme.MyEventPartitionerを使用してイベントをパーティション化します。

    <wlevs:channel id="EventPartitionerChannel" event-type="PriceEvent"   max-threads="0" >
      <wlevs:instance-property name="eventPartitioner"     value="com.acme.MyEventPartitioner" />
      <wlevs:listener ref="filterFanoutProcessor1" />
      <wlevs:listener ref="filterFanoutProcessor2" />
      <wlevs:listener ref="filterFanoutProcessor3" />
      <wlevs:source ref="PriceAdapter" />
    </wlevs:channel>