18 スケーラブル・アプリケーション
パーティション化と並列処理を使用して、また高可用性オプションを考慮して、アプリケーション設計にスケーラビリティを組み込むことができます。Oracle Stream Analyticsでは、チャネルとアップストリーム・アダプタでデフォルトまたはカスタムのパーティション化および並列処理設定を使用できます。着信JMSイベント・ストリームをパーティション化し、JSMSイベント・ストリームのグループ・パターン・マッチングを構成することもできます。
この章の内容は次のとおりです。
18.1 デフォルト・チャネルのスケーラビリティ設定
デフォルト・イベントのプロパティ・ベースのイベント・パーティショナを使用するようチャネルを構成できます。このデフォルト構成では、着信イベントが到着するたびに、チャネルはリスナーを選択し、各リスナーに各イベントをブロードキャストするかわりに、そのリスナーにイベントをディスパッチします。
注意:
イベント・パーティショナでチャネルを構成した場合、バッチ処理はサポートされていません。
図18-1に、イベント・パーティショナのプロパティを使用してチャネルをパーティション化するEPNを示します。この例では、株式銘柄と株価の2つのプロパティを持つPriceEventタイプのイベントをインバウンド・アダプタが送信します。この例は、symbolプロパティでチャネルをパーティション化し、チャネルまたはアップストリーム・アダプタにマルチスレッド処理を追加する方法を示しています。
18.1.2 チャネルでの並列処理の構成
チャネルでスレッドを割り当てる場合、max-threadsプロパティをEPNのリスナー数に設定します。
チャネルから並列性ダウンストリームを増やす場合には、チャネルでmax-threadsプロパティを設定して、スレッド・プールにチャネルを関連付けます。スレッドの最大数として最適な値は、ダウンストリーム・プロセッサにおけるOracle CQL問合せの詳細(問合せでパラレル実行を許可するかどうか)や、アプリケーションの実行中に観察される動作(CPUコアをすべて使用するかどうか)など、多くの要素に依存します。スレッドの最大数をチューニングするには、手始めとしてチャネルでのリスナー数と等しく設定することをお薦めします。
この例では、リスナー数は3つです。
<wlevs:channel id="EventPartitionerChannel" event-type="PriceEvent" max-threads="3" >
<wlevs:instance-property name="eventPartitioner" value="true" />
<wlevs:listener ref="processor1" />
<wlevs:listener ref="processor2" />
<wlevs:listener ref="processor3" />
<wlevs:source ref="inbound" />
</wlevs:channel>18.1.4 ローカル・パーティション・チャネルの定義
ローカル・パーティション化をサポートするには、Oracle CQLプロセッサを構成する必要があります。
次に示すサンプル・コードを使用して、ローカル・パーティション化を定義します。
例18-1 アセンブリ・ファイル
<wlevs:channel id="LocalPartitionChannel" event-type="StockEvent" is-local-partitioner="true" max-threads="3">
<wlevs:instance-property name="partitionByEventProperty" value="symbol" />
</wlevs:channel>
重要なチャネル・プロパティ
ローカル・パーティション・チャネルには、次の重要なプロパティがあります。
-
IS-LOCAL PARTITIONER: ローカル・パーティション化チャネルにするチャネルを定義します。 -
MAX-THREADS: 並列度を指定します -
MAX-SIZE: バッファリングするイベントの最大数をパーティションごとに決定します。 -
PARTITIONING ATTRIBUTE: ストリームのパーティション化に使用するストリームの属性を指定します。
例18-2 構成
<processor>
<name>StockAggregateProcessor</name>
<rules>
<query id="helloworldRule">
<![CDATA[
select count(*) as symbolCount, symbol from LocalPartitionChannel group by symbol]]>
</query>
</rules>
</processor>18.2 着信JMSイベント・ストリームのパーティション化
マルチサーバー・ドメインのセレクタによって、ActiveActiveGroupBeanクラスをアセンブリ・ファイルに追加し、着信JMSイベント・ストリームをパーティション化できます。
18.2.1 高可用性のないパーティション化の構成
-
マルチサーバー・ドメインを作成します。
この例では、デプロイメント・グループ名は、
MyDeploymentGroupです。 -
適切な
ActiveActiveGroupBean通知グループをcluster要素のgroups子要素に追加するように各Oracle Stream AnalyticsサーバーのOracle Stream Analyticsサーバー構成ファイルを構成します。Oracle Stream Analyticsサーバー構成ファイルは、
/Oracle/Middleware/my_oep/user_projects/domains/<domain_name>/<server_name>/config.にあります。表18-2は、
ocep-server-1、ocep-server-2、ocep-server-3およびocep-server-4のOracle Stream Analyticsサーバーのcluster要素を示します。デプロイメント・グループはMyDeploymentGroupであり、通知グループはデフォルトのActiveActiveGroupBean通知グループ・ネーミングを使用して定義します。オプションとして、通知グループの命名規則の説明に従って、独自のグループ命名規則を指定できます。
表18-1 サーバー構成ファイル・グループ要素構成
パーティション cluster要素 ocep-server-1<cluster> <server-name>ocep-server-1</server-name> ... <enabled>coherence</enabled> ... <groups>MyDeploymentGroup, ActiveActiveGroupBean_group1</groups> </cluster>
ocep-server-2<cluster> <server-name>ocep-server-2</server-name> ... <enabled>coherence</enabled> ... <groups>MyDeploymentGroup, ActiveActiveGroupBean_group2</groups> </cluster>
ocep-server-3<cluster> <server-name>ocep-server-3</server-name> ... <enabled>coherence</enabled> ... <groups>MyDeploymentGroup, ActiveActiveGroupBean_group3</groups> </cluster>
ocep-server-4<cluster> <server-name>ocep-server-4</server-name> ... <enabled>coherence</enabled> ... <groups>MyDeploymentGroup, ActiveActiveGroupBean_group4</groups> </cluster>
-
Oracle Stream Analyticsアプリケーションを作成します。
-
次のように、
ActiveActiveGroupBean要素をアセンブリ・ファイルに追加します。<bean id="clusterAdapter" class="com.oracle.cep.cluster.hagroups.ActiveActiveGroupBean"> </bean>
-
JMSインバウンド・アダプタの
jms-adapter要素にパラメータ化されたmessage-selectorを定義します。-
JMSインバウンド・アダプタの
jms-adapter要素にgroup-binding子要素を追加するために、コンポーネント構成ファイルを編集します。 -
各可能JMSメッセージ・セレクタ値に対して、次のように1つの
group-binding要素を追加します。
<jms-adapter> <name>JMSInboundAdapter</name> <event-type>StockTick</event-type> <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url> <destination-jndi-name>./Topic1</destination-jndi-name> <user>weblogic</user> <password>weblogic1</password> <work-manager>JettyWorkManager</work-manager> <concurrent-consumers>1</concurrent-consumers> <session-transacted>true</session-transacted> <message-selector>${CONDITION}</message-selector> <bindings> <group-binding group-id="ActiveActiveGroupBean_group1"> <param id="CONDITION">acctid > 400</param> </group-binding> <group-binding group-id="ActiveActiveGroupBean_group2"> <param id="CONDITION">acctid BETWEEN 301 AND 400</param> </group-binding> <group-binding group-id="ActiveActiveGroupBean_group3"> <param id="CONDITION">acctid BETWEEN 201 AND 300</param> </group-binding> <group-binding group-id="ActiveActiveGroupBean_group4"> <param id="CONDITION">acctid <= 200</param> </group-binding> </bindings> </jms-adapter>
この構成では、
ActiveActiveGroupBean_group1を含むcluster要素、groups子要素でアプリケーションをOracle Stream Analyticsサーバーにデプロイすると、CONDITIONパラメータがacctid > 400と定義され、acctidプロパティが400以上であるイベントがアプリケーションにより処理されます。注意:
各インバウンドJMSアダプタは異なるトピックをリスニングする必要があります。詳細は、アダプタを参照してください。
-
-
マルチサーバー・ドメインのデプロイメント・グループにアプリケーションをデプロイします。
実行時に、各Oracle Stream Analyticsサーバーが
ActiveActiveGroupBean通知グループに対するmessage-selectorにアプリケーションのインスタンスを構成します。これにより、アプリケーションの各インスタンスがメッセージ総数のサブセットを並行して処理するようにJMSトピックがパーティションされます。
18.2.2 高可用性のあるパーティション化の構成
この手順では、JMSによる正確なリカバリの構成からのサンプル・アプリケーションを使用します。図18-2はEPN図を示し、例18-3および例18-4は、対応するアセンブリと構成ファイルを示します。
この手順により、図18-3に示すOracle Stream Analyticsの高可用性構成が作成されます。
高可用性のあるJMSアプリケーションでのスケーラビリティの構成
-
マルチサーバー・ドメインを作成します。
この例では、デプロイメント・グループの名前は、
MyDeploymentGroupです。 -
適切な
ActiveActiveGroupBean通知グループをcluster要素のgroups子要素に追加するように各Oracle Stream AnalyticsサーバーのOracle Stream Analyticsサーバー構成ファイルを構成します。Oracle Stream Analyticsサーバー構成ファイルは、
/Oracle/Middleware/my_oep/user_projects/domains/<domain_name>/<server_name>/config.にあります。表18-2は、
ocep-server-1、ocep-server-2、ocep-server-3およびocep-server-4のOracle Stream Analyticsサーバーのcluster要素を示します。デプロイメント・グループはMyDeploymentGroupであり、通知グループはデフォルトのActiveActiveGroupBean通知グループ・ネーミングを使用して定義します。ocep-server-1とocep-server-2は同一の通知グループ名前(ActiveActiveGroupBean_group1)を使用し、ocep-server-3とocep-server-4は同一の通知グループ名前(ActiveActiveGroupBean_group2)を使用します。表18-2 サーバー構成ファイル・グループ要素構成
パーティション cluster要素 ocep-server-1<cluster> <server-name>ocep-server-1</server-name> ... <enabled>coherence</enabled> ... <groups>MyDeploymentGroup, ActiveActiveGroupBean_group1</groups> </cluster>
ocep-server-2<cluster> <server-name>ocep-server-2</server-name> ... <enabled>coherence</enabled> ... <groups>MyDeploymentGroup, ActiveActiveGroupBean_group1</groups> </cluster>
ocep-server-3<cluster> <server-name>ocep-server-3</server-name> ... <enabled>coherence</enabled> ... <groups>MyDeploymentGroup, ActiveActiveGroupBean_group2</groups> </cluster>
ocep-server-4<cluster> <server-name>ocep-server-4</server-name> ... <enabled>coherence</enabled> ... <groups>MyDeploymentGroup, ActiveActiveGroupBean_group2</groups> </cluster>
-
Oracle Stream Analytics高可用性アプリケーションを作成します。
詳細は、高可用性アプリケーションを参照してください。
-
次のように、
ActiveActiveGroupBean要素をアセンブリ・ファイルに追加します。<bean id="clusterAdapter" class="com.oracle.cep.cluster.hagroups.ActiveActiveGroupBean"> </bean>
-
コンポーネント構成ファイルを編集し、次に示すように、インバウンドJMSアダプタ用の
jms-adapter要素を構成します。異なるトピックをリスニングするには、インバウンドJMSアダプタをそれぞれ設定し、
session-transactedをtrueに設定する必要があります。<?xml version="1.0" encoding="UTF-8"?> <wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application" xmlns:ha="http://www.oracle.com/ns/cep/config/cluster"> ... <jms-adapter> <name>JMSInboundAdapter</name> <event-type>StockTick</event-type> <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url> <destination-jndi-name>./Topic1</destination-jndi-name> <session-transacted>true</session-transacted> ... </jms-adapter> <jms-adapter> <name>JMSInboundAdapter2</name> <event-type>StockTick</event-type> <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url> <destination-jndi-name>./Topic2</destination-jndi-name> <session-transacted>true</session-transacted> ... </jms-adapter> </wlevs:config>
-
各JMSインバウンド・アダプタの
jms-adapter要素にパラメータ化されたmessage-selectorを定義します。-
JMSインバウンド・アダプタの
jms-adapter要素にgroup-binding子要素を追加するために、コンポーネント構成ファイルを編集します。 -
各可能JMS
message-selector値に対して、次のように1つのgroup-binding要素を追加します。
<jms-adapter> <name>JMSInboundAdapter</name> <event-type>StockTick</event-type> <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url> <destination-jndi-name>./Topic1</destination-jndi-name> <session-transacted>true</session-transacted> <message-selector>${CONDITION}</message-selector> <bindings> <group-binding group-id="ActiveActiveGroupBean_group1"> <param id="CONDITION">acctid <= 1000</param> </group-binding> <group-binding group-id="ActiveActiveGroupBean_group2"> <param id="CONDITION">acctid > 1000</param> </group-binding> </bindings> </jms-adapter>
この構成では、
ActiveActiveGroupBean_group1を含むcluster要素、groups子要素を持つアプリケーションをOracle Stream Analyticsサーバーにデプロイすると、CONDITIONパラメータがacctid <= 1000と定義され、acctidプロパティが1000以下であるイベントがアプリケーションにより処理されます。同様に、ActiveActiveGroupBean_group2を含むcluster要素、groups子要素を持つアプリケーションをOracle Stream Analyticsサーバーにデプロイすると、CONDITIONパラメータがacctid > 1000と定義され、acctidプロパティが1000以上であるイベントがアプリケーションにより処理されます。 -
-
コンポーネント構成ファイルを編集し、次に示すように、アウトバウンドJMSアダプタ用の
jms-adapter要素を構成します。対応するインバウンド・アダプタ(この例では、
JMSInboundAdapter2:./Topic2)と同一トピックでアウトバウンドJMSアダプタを構成し、session-transactedをtrueに設定します。<?xml version="1.0" encoding="UTF-8"?> <wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application" xmlns:ha="http://www.oracle.com/ns/cep/config/cluster"> ... <jms-adapter> <name>JMSInboundAdapter</name> <event-type>StockTick</event-type> <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url> <destination-jndi-name>./Topic1</destination-jndi-name> <session-transacted>true</session-transacted> ... </jms-adapter> <jms-adapter> <name>JMSInboundAdapter2</name> <event-type>StockTick</event-type> <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url> <destination-jndi-name>./Topic2</destination-jndi-name> <session-transacted>true</session-transacted> ... </jms-adapter> <jms-adapter> <name>JMSOutboundAdapter</name> <event-type>StockTick</event-type> <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url> <destination-jndi-name>./Topic2</destination-jndi-name> <session-transacted>true</session-transacted> ... </jms-adapter> </wlevs:config>
-
マルチサーバー・ドメインのデプロイメント・グループにアプリケーションをデプロイします。
実行時に、各Oracle Stream Analyticsサーバーが
ActiveActiveGroupBean通知グループに対するmessage-selectorにアプリケーションのインスタンスを構成します。これにより、アプリケーションの各インスタンスがメッセージ総数のサブセットを並行して処理するようにJMSトピックがパーティションされます。ActiveActiveGroupBeanグループの有効なOracle Stream Analyticsサーバーが停止すると、そのActiveActiveGroupBeanグループのスタンバイOracle Stream Analyticsサーバーに対してOracle Stream AnalyticsサーバーがOracle Stream Analytics高可用性フェイルオーバーを実行します。
例18-3 JMS EPNアセンブリ・ファイルを使用して精度の高いリカバリ
<?xml version="1.0" encoding="UTF-8"?> <beans ... > <wlevs:event-type-repository> <wlevs:event-type type-name="StockTick"> <wlevs:properties> <wlevs:property name="lastPrice" type="double" /> <wlevs:property name="symbol" type="char" /> </wlevs:properties> </wlevs:event-type> </wlevs:event-type-repository> <wlevs:adapter id="JMSInboundAdapter" provider="jms-inbound"> <wlevs:listener ref="myHaInputAdapter"/> </wlevs:adapter> <wlevs:adapter id="myHaInputAdapter" provider="ha-inbound" > <wlevs:instance-property name="keyProperties" value="sequenceNo"/> <wlevs:instance-property name="timeProperty" value="inboundTime"/> </wlevs:adapter> <wlevs:channel id="channel1" event-type="StockTick"> <wlevs:listener ref="processor1" /> <wlevs:source ref="myHaInputAdapter"/> <wlevs:application-timestamped> <wlevs:expression>inboundTime</wlevs:expression> </wlevs:application-timestamped> </wlevs:channel> <wlevs:processor id="processor1"> <wlevs:listener ref="channel2" /> </wlevs:processor> <wlevs:channel id="channel2" event-type="StockTick"> <wlevs:listener ref="myHaCorrelatingAdapter" /> </wlevs:channel> <wlevs:adapter id="myHaCorrelatingAdapter" provider="ha-correlating" > <wlevs:instance-property name="correlatedSource" ref="clusterCorrelatingOutstream"/> <wlevs:instance-property name="failOverDelay" value="2000"/> <wlevs:listener ref="JMSOutboundAdapter"/> </wlevs:adapter> <wlevs:adapter id="JMSOutboundAdapter" provider="jms-outbound"> </wlevs:adapter> <wlevs:adapter id="JMSInboundAdapter2" provider="jms-inbound"> </wlevs:adapter> <wlevs:channel id="clusterCorrelatingOutstream" event-type="StockTick" advertise="true"> <wlevs:source ref="JMSInboundAdapter2"/> </wlevs:channel> </beans>
例18-4 JMSコンポーネント構成アセンブリ・ファイルを使用して精度の高いリカバリ
<?xml version="1.0" encoding="UTF-8"?> <wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application" xmlns:ha="http://www.oracle.com/ns/cep/config/cluster"> <processor> <name>processor1</name> <rules> <query id="helloworldRule"> <![CDATA[ select * from channel1 [Now] > </query> </rules> </processor> <jms-adapter> <name>JMSInboundAdapter</name> <event-type>StockTick</event-type> <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url> <destination-jndi-name>./Topic1</destination-jndi-name> <session-transacted>true</session-transacted> ... </jms-adapter> <jms-adapter> <name>JMSInboundAdapter2</name> <event-type>StockTick</event-type> <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url> <destination-jndi-name>./Topic2</destination-jndi-name> <session-transacted>true</session-transacted> ... </jms-adapter> <jms-adapter> <name>JMSOutboundAdapter</name> <event-type>StockTick</event-type> <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url> <destination-jndi-name>./Topic2</destination-jndi-name> <session-transacted>true</session-transacted> ... </jms-adapter> </wlevs:config>
18.3 通知グループの命名規則
デフォルトでは、ActiveActiveGroupBeanクラスは次の名前で通知グループを作成します。Xは文字列です。
ActiveActiveGroupBean_X
実行時に、ActiveActiveGroupBeanは、Oracle Event Processingサーバーに定義された既存のグループをスキャンし、次のデフォルトのパターン一致を適用します。ActiveActiveGroupBeanで一致が見つかったら、その名前の通知グループを作成します。
ActiveActiveGroupBean_\\w+
オプションで、別の通知グループ・ネーミング・パターンを指定するために独自のグループ・パターンを定義できます。
-
アセンブリ・ファイルを構成し、次に示すように、
ActiveActiveGroupBean要素にgroupPattern属性を追加します。<bean id="clusterAdapter" class="com.oracle.cep.cluster.hagroups.ActiveActiveGroupBean"> <property name="groupPattern" value="MyNotificationGroupPattern*"/> </bean>
-
通知グループに対し使用するクラスタ・グループ・ネーミング・ルールに一致する
groupPattern属性の値を指定します。
18.4 カスタム・チャネル・イベント・パーティショナ
ほとんどのチャネルはデフォルト・イベントのパーティション化を使用します。パーティショナが指定されていなく、partitionByEventProperty要素が存在しない場合、チャネルはすべてのリスナーにイベントを送信します。partitionByEventProperty要素では、デフォルトのパーティション化アルゴリズムを使用して、指定したイベントをパーティション化することによって、一定レベルのカスタマイズを指定します
この項では、デフォルトのパーティション化アルゴリズムに対する制御を微調整するカスタム・パーティショナをプログラム的に構成することによって、チャネル・リスナーへのイベントのディスパッチをさらにカスタマイズする方法について説明します。たとえば、プロパティ範囲に基づいてイベント・パーティショナを作成できます。
18.4.1 EventPartitionerインタフェース
チャネルにまたがってイベントをパーティション化し、チャネル・リスナーへのイベントのディスパッチをカスタマイズするには、com.bea.wlevs.channel.EventPartitionerインタフェースを使用します。
注意:
カスタム・パーティション化と並列処理を実装する際には、イベントの順序を維持し、マルチスレッドを慎重に管理するコードを必ず追加してください。
図18-4に、イベント・パーティショナを使用してチャネルをパーティション化するEPNを示します。この例では、株式銘柄と株価の2つのプロパティを持つPriceEventタイプのイベントをインバウンド・アダプタが送信します。この例は、symbolプロパティでチャネルをパーティション化し、チャネルまたはアップストリーム・アダプタにマルチスレッド処理を追加する方法を示しています。


