パーティション化と並列処理を使用して、また高可用性オプションを考慮して、アプリケーション設計にスケーラビリティを組み込むことができます。Oracle Event Processingでは、チャネルとアップストリーム・アダプタでデフォルトまたはカスタムのパーティション化および並列処理設定を使用できます。着信JMSイベント・ストリームをパーティション化し、JSMSイベント・ストリームのグループ・パターン・マッチングを構成することもできます。
この章の内容は次のとおりです。
デフォルト・イベントのプロパティ・ベースのイベント・パーティショナを使用するようチャネルを構成できます。このデフォルト構成では、着信イベントが到着するたびに、チャネルはリスナーを選択し、各リスナーに各イベントをブロードキャストするかわりに、そのリスナーにイベントをディスパッチします。
注:
イベント・パーティショナでチャネルを構成した場合、バッチ処理はサポートされていません。
図18-1に、イベント・パーティショナのプロパティを使用してチャネルをパーティション化するEPNを示します。この例では、株式銘柄と株価の2つのプロパティを持つPriceEvent
タイプのイベントをインバウンド・アダプタが送信します。この例は、symbol
プロパティでチャネルをパーティション化し、チャネルまたはアップストリーム・アダプタにマルチスレッド処理を追加する方法を示しています。
イベント・ストアのカスタマイズの詳細は、 『Oracle Event Processingのカスタマイズ』のイベント・ストアのカスタマイズに関する項を参照してください。
チャネルでスレッドを割り当てる場合、max-threadsプロパティをEPNのリスナー数に設定します。
チャネルから並列性ダウンストリームを増やす場合には、チャネルでmax-threads
プロパティを設定して、スレッド・プールにチャネルを関連付けます。スレッドの最大数として最適な値は、ダウンストリーム・プロセッサにおけるOracle CQL問合せの詳細(問合せでパラレル実行を許可するかどうか)や、アプリケーションの実行中に観察される動作(CPUコアをすべて使用するかどうか)など、多くの要素に依存します。スレッドの最大数をチューニングするには、手始めとしてチャネルでのリスナー数と等しく設定することをお薦めします。
この例では、リスナー数は3つです。
<wlevs:channel id="EventPartitionerChannel" event-type="PriceEvent" max-threads="3" >
<wlevs:instance-property name="eventPartitioner" value="true" />
<wlevs:listener ref="processor1" />
<wlevs:listener ref="processor2" />
<wlevs:listener ref="processor3" />
<wlevs:source ref="inbound" />
</wlevs:channel>
マルチサーバー・ドメインのセレクタによって、ActiveActiveGroupBean
クラスをアセンブリ・ファイルに追加し、着信JMSイベント・ストリームをパーティション化できます。
マルチサーバー・ドメインを作成します。
この例では、デプロイメント・グループ名は、MyDeploymentGroup
です。
『Oracle Event Processingの管理』のマルチサーバー・ドメインに関する項を参照してください。
適切なActiveActiveGroupBean
通知グループをcluster
要素のgroups
子要素に追加するように各Oracle Event ProcessingサーバーのOracle Event Processingサーバー構成ファイルを構成します。
Oracle Event Processingサーバー構成ファイルは、/Oracle/Middleware/my_oep/user_projects/domains/<domain_name>/<server_name>/config
にあります。
表18-2では、ocep-server-1
、ocep-server-2
、ocep-server-3
およびocep-server-4
のOracle Event Processingサーバーのcluster
要素を示します。デプロイメント・グループはMyDeploymentGroup
であり、通知グループはデフォルトのActiveActiveGroupBean
通知グループ・ネーミングを使用して定義します。
オプションとして、通知グループの命名規則の説明に従って、独自のグループ命名規則を指定できます。
表18-1 サーバー構成ファイル・グループ要素構成
|
Oracle Event Processingアプリケーションを作成します。
次のように、ActiveActiveGroupBean
要素をアセンブリ・ファイルに追加します。
<bean id="clusterAdapter" class="com.oracle.cep.cluster.hagroups.ActiveActiveGroupBean"> </bean>
JMSインバウンド・アダプタのjms-adapter
要素にパラメータ化されたmessage-selector
を定義します。
JMSインバウンド・アダプタのjms-adapter
要素にgroup-binding
子要素を追加するために、コンポーネント構成ファイルを編集します。
各可能JMSメッセージ・セレクタ値に対して、次のように1つのgroup-binding
要素を追加します。
<jms-adapter> <name>JMSInboundAdapter</name> <event-type>StockTick</event-type> <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url> <destination-jndi-name>./Topic1</destination-jndi-name> <user>weblogic</user> <password>weblogic1</password> <work-manager>JettyWorkManager</work-manager> <concurrent-consumers>1</concurrent-consumers> <session-transacted>true</session-transacted> <message-selector>${CONDITION}</message-selector> <bindings> <group-binding group-id="ActiveActiveGroupBean_group1"> <param id="CONDITION">acctid > 400</param> </group-binding> <group-binding group-id="ActiveActiveGroupBean_group2"> <param id="CONDITION">acctid BETWEEN 301 AND 400</param> </group-binding> <group-binding group-id="ActiveActiveGroupBean_group3"> <param id="CONDITION">acctid BETWEEN 201 AND 300</param> </group-binding> <group-binding group-id="ActiveActiveGroupBean_group4"> <param id="CONDITION">acctid <= 200</param> </group-binding> </bindings> </jms-adapter>
この構成では、ActiveActiveGroupBean_group1
を含むcluster
要素、groups
子要素でアプリケーションをOracle Event Processingサーバーにデプロイすると、CONDITION
パラメータがacctid > 400
と定義され、acctid
プロパティが400以上であるイベントがアプリケーションにより処理されます。
注:
各インバウンドJMSアダプタは異なるトピックをリスニングする必要があります。詳細は、アダプタを参照してください。
マルチサーバー・ドメインのデプロイメント・グループにアプリケーションをデプロイします。
実行時に、各Oracle Event ProcessingサーバーがActiveActiveGroupBean
通知グループに対するmessage-selector
にアプリケーションのインスタンスを構成します。これにより、アプリケーションの各インスタンスがメッセージ総数のサブセットを並行して処理するようにJMSトピックがパーティションされます。
この手順では、JMSによる正確なリカバリの構成からのサンプル・アプリケーションを使用します。図18-2ではEPN図を示し、例18-1と例18-2では対応するアセンブリおよび構成ファイルを示します。
この手順では、図18-3に示したOracle Event Processing高可用性構成を作成します。
高可用性のあるJMSアプリケーションでのスケーラビリティの構成
マルチサーバー・ドメインを作成します。
この例では、デプロイメント・グループの名前は、MyDeploymentGroup
です。
『Oracle Event Processingの管理』のマルチサーバー・ドメインに関する項を参照してください。
適切なActiveActiveGroupBean
通知グループをcluster
要素のgroups
子要素に追加するように各Oracle Event ProcessingサーバーのOracle Event Processingサーバー構成ファイルを構成します。
Oracle Event Processingサーバー構成ファイルは、/Oracle/Middleware/my_oep/user_projects/domains/<domain_name>/<server_name>/config
にあります。
表18-2では、ocep-server-1
、ocep-server-2
、ocep-server-3
およびocep-server-4
のOracle Event Processingサーバーのcluster
要素を示します。デプロイメント・グループはMyDeploymentGroup
であり、通知グループはデフォルトのActiveActiveGroupBean
通知グループ・ネーミングを使用して定義します。
ocep-server-1
とocep-server-2
は同一の通知グループ名前(ActiveActiveGroupBean_group1
)を使用し、ocep-server-3
とocep-server-4
は同一の通知グループ名前(ActiveActiveGroupBean_group2
)を使用します。
表18-2 サーバー構成ファイル・グループ要素構成
|
Oracle Event Processing高可用性アプリケーションを作成します。
詳細は、「高可用性アプリケーション」を参照してください。
次のように、ActiveActiveGroupBean
要素をアセンブリ・ファイルに追加します。
<bean id="clusterAdapter" class="com.oracle.cep.cluster.hagroups.ActiveActiveGroupBean"> </bean>
コンポーネント構成ファイルを編集し、次に示すように、インバウンドJMSアダプタ用のjms-adapter
要素を構成します。
異なるトピックをリスニングするには、インバウンドJMSアダプタをそれぞれ設定し、session-transacted
をtrue
に設定する必要があります。
<?xml version="1.0" encoding="UTF-8"?> <wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application" xmlns:ha="http://www.oracle.com/ns/cep/config/cluster"> ... <jms-adapter> <name>JMSInboundAdapter</name> <event-type>StockTick</event-type> <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url> <destination-jndi-name>./Topic1</destination-jndi-name> <session-transacted>true</session-transacted> ... </jms-adapter> <jms-adapter> <name>JMSInboundAdapter2</name> <event-type>StockTick</event-type> <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url> <destination-jndi-name>./Topic2</destination-jndi-name> <session-transacted>true</session-transacted> ... </jms-adapter> </wlevs:config>
各JMSインバウンド・アダプタのjms-adapter
要素にパラメータ化されたmessage-selector
を定義します。
JMSインバウンド・アダプタのjms-adapter
要素にgroup-binding
子要素を追加するために、コンポーネント構成ファイルを編集します。
各可能JMSmessage-selector
値に対して、次のように1つのgroup-binding
要素を追加します。
<jms-adapter> <name>JMSInboundAdapter</name> <event-type>StockTick</event-type> <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url> <destination-jndi-name>./Topic1</destination-jndi-name> <session-transacted>true</session-transacted> <message-selector>${CONDITION}</message-selector> <bindings> <group-binding group-id="ActiveActiveGroupBean_group1"> <param id="CONDITION">acctid <= 1000</param> </group-binding> <group-binding group-id="ActiveActiveGroupBean_group2"> <param id="CONDITION">acctid > 1000</param> </group-binding> </bindings> </jms-adapter>
この構成では、ActiveActiveGroupBean_group1
を含むcluster
要素、groups
子要素を持つアプリケーションをOracle Event Processingサーバーにデプロイすると、CONDITION
パラメータがacctid <= 1000
と定義され、acctid
プロパティが1000以下であるイベントがアプリケーションにより処理されます。同様に、ActiveActiveGroupBean_group2
を含むcluster
要素、groups
子要素を持つアプリケーションをOracle Event Processingサーバーにデプロイすると、CONDITION
パラメータがacctid > 1000
と定義され、acctid
プロパティが1000以上であるイベントがアプリケーションにより処理されます。
コンポーネント構成ファイルを編集し、次に示すように、アウトバウンドJMSアダプタ用のjms-adapter
要素を構成します。
対応するインバウンド・アダプタ(この例では、JMSInboundAdapter2
: ./Topic2
)と同一トピックでアウトバウンドJMSアダプタを構成し、session-transacted
をtrue
に設定します。
<?xml version="1.0" encoding="UTF-8"?> <wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application" xmlns:ha="http://www.oracle.com/ns/cep/config/cluster"> ... <jms-adapter> <name>JMSInboundAdapter</name> <event-type>StockTick</event-type> <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url> <destination-jndi-name>./Topic1</destination-jndi-name> <session-transacted>true</session-transacted> ... </jms-adapter> <jms-adapter> <name>JMSInboundAdapter2</name> <event-type>StockTick</event-type> <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url> <destination-jndi-name>./Topic2</destination-jndi-name> <session-transacted>true</session-transacted> ... </jms-adapter> <jms-adapter> <name>JMSOutboundAdapter</name> <event-type>StockTick</event-type> <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url> <destination-jndi-name>./Topic2</destination-jndi-name> <session-transacted>true</session-transacted> ... </jms-adapter> </wlevs:config>
マルチサーバー・ドメインのデプロイメント・グループにアプリケーションをデプロイします。
実行時に、各Oracle Event ProcessingサーバーがActiveActiveGroupBean
通知グループに対するmessage-selector
にアプリケーションのインスタンスを構成します。これにより、アプリケーションの各インスタンスがメッセージ総数のサブセットを並行して処理するようにJMSトピックがパーティションされます。
ActiveActiveGroupBean
グループの有効なOracle Event Processingサーバーが停止すると、そのActiveActiveGroupBean
グループのスタンバイOracle Event Processingサーバーに対してOracle Event ProcessingサーバーがOracle Event Processing高可用性フェイルオーバーを実行します。
例18-1 JMS EPNアセンブリ・ファイルを使用して精度の高いリカバリ
<?xml version="1.0" encoding="UTF-8"?> <beans ... > <wlevs:event-type-repository> <wlevs:event-type type-name="StockTick"> <wlevs:properties> <wlevs:property name="lastPrice" type="double" /> <wlevs:property name="symbol" type="char" /> </wlevs:properties> </wlevs:event-type> </wlevs:event-type-repository> <wlevs:adapter id="JMSInboundAdapter" provider="jms-inbound"> <wlevs:listener ref="myHaInputAdapter"/> </wlevs:adapter> <wlevs:adapter id="myHaInputAdapter" provider="ha-inbound" > <wlevs:instance-property name="keyProperties" value="sequenceNo"/> <wlevs:instance-property name="timeProperty" value="inboundTime"/> </wlevs:adapter> <wlevs:channel id="channel1" event-type="StockTick"> <wlevs:listener ref="processor1" /> <wlevs:source ref="myHaInputAdapter"/> <wlevs:application-timestamped> <wlevs:expression>inboundTime</wlevs:expression> </wlevs:application-timestamped> </wlevs:channel> <wlevs:processor id="processor1"> <wlevs:listener ref="channel2" /> </wlevs:processor> <wlevs:channel id="channel2" event-type="StockTick"> <wlevs:listener ref="myHaCorrelatingAdapter" /> </wlevs:channel> <wlevs:adapter id="myHaCorrelatingAdapter" provider="ha-correlating" > <wlevs:instance-property name="correlatedSource" ref="clusterCorrelatingOutstream"/> <wlevs:instance-property name="failOverDelay" value="2000"/> <wlevs:listener ref="JMSOutboundAdapter"/> </wlevs:adapter> <wlevs:adapter id="JMSOutboundAdapter" provider="jms-outbound"> </wlevs:adapter> <wlevs:adapter id="JMSInboundAdapter2" provider="jms-inbound"> </wlevs:adapter> <wlevs:channel id="clusterCorrelatingOutstream" event-type="StockTick" advertise="true"> <wlevs:source ref="JMSInboundAdapter2"/> </wlevs:channel> </beans>
例18-2 JMSコンポーネント構成アセンブリ・ファイルを使用して精度の高いリカバリ
<?xml version="1.0" encoding="UTF-8"?> <wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application" xmlns:ha="http://www.oracle.com/ns/cep/config/cluster"> <processor> <name>processor1</name> <rules> <query id="helloworldRule"> <![CDATA[ select * from channel1 [Now] > </query> </rules> </processor> <jms-adapter> <name>JMSInboundAdapter</name> <event-type>StockTick</event-type> <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url> <destination-jndi-name>./Topic1</destination-jndi-name> <session-transacted>true</session-transacted> ... </jms-adapter> <jms-adapter> <name>JMSInboundAdapter2</name> <event-type>StockTick</event-type> <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url> <destination-jndi-name>./Topic2</destination-jndi-name> <session-transacted>true</session-transacted> ... </jms-adapter> <jms-adapter> <name>JMSOutboundAdapter</name> <event-type>StockTick</event-type> <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url> <destination-jndi-name>./Topic2</destination-jndi-name> <session-transacted>true</session-transacted> ... </jms-adapter> </wlevs:config>
デフォルトでは、ActiveActiveGroupBean
クラスは次の名前で通知グループを作成します。Xは文字列です。
ActiveActiveGroupBean_X
実行時に、ActiveActiveGroupBean
は、Oracle Event Processingサーバーに定義された既存のグループをスキャンし、次のデフォルトのパターン一致を適用します。ActiveActiveGroupBeanで一致が見つかったら、その名前の通知グループを作成します。
ActiveActiveGroupBean_\\w+
オプションで、別の通知グループ・ネーミング・パターンを指定するために独自のグループ・パターンを定義できます。
アセンブリ・ファイルを構成し、次に示すように、ActiveActiveGroupBean
要素にgroupPattern
属性を追加します。
<bean id="clusterAdapter" class="com.oracle.cep.cluster.hagroups.ActiveActiveGroupBean"> <property name="groupPattern" value="MyNotificationGroupPattern*"/> </bean>
通知グループに対し使用するクラスタ・グループ・ネーミング・ルールに一致するgroupPattern
属性の値を指定します。
ほとんどのチャネルでは、デフォルトのイベントのパーティション化を使用します。パーティショナが指定されず、partitionByEventProperty
要素が存在しない場合、チャネルはすべてのリスナーにイベントを送ります。partitionByEventProperty
要素では、デフォルトのパーティション化アルゴリズムを使用して、指定したイベントをパーティション化することによって、一定レベルのカスタマイズを指定します。この項では、デフォルトのパーティション化アルゴリズムに対する制御を微調整するカスタム・パーティショナをプログラム的に構成することによって、チャネル・リスナーへのイベントのディスパッチをさらにカスタマイズする方法について説明します。たとえば、プロパティ範囲に基づいてイベント・パーティショナを作成できます。
チャネルにまたがってイベントをパーティション化し、チャネル・リスナーへのイベントのディスパッチをカスタマイズするには、com.bea.wlevs.channel.EventPartitionerインタフェースを使用します。
注:
カスタム・パーティション化と並列処理を実装する際には、イベントの順序を維持し、マルチスレッドを慎重に管理するコードを必ず追加してください。
図18-4に、イベント・パーティショナを使用してチャネルをパーティション化するEPNを示します。この例では、株式銘柄と株価の2つのプロパティを持つPriceEvent
タイプのイベントをインバウンド・アダプタが送信します。この例は、symbol
プロパティでチャネルをパーティション化し、チャネルまたはアップストリーム・アダプタにマルチスレッド処理を追加する方法を示しています。