You can build scalability into your application design with partitioning and parallel processing, and by taking high availability options into consideration. Oracle Stream Explorer enables you to use default or custom partitioning and parallel processing settings on channels and the upstream adapter. You can also partition an incoming JMS event stream and configure JMS event stream group pattern matching.
You can configure a channel to use the default event property-based event partitioner. With this default configuration, every time an incoming event arrives, the channel selects a listener and dispatches the event to that listener instead of broadcasting every event to every listener.
Note:
Batching is not supported when you configure a channel with an event partitioner.
Figure 18-1 shows an EPN that uses an event partitioner property to partition a channel. In this example, the inbound adapter sends events of type PriceEvent, which has two properties: stock symbol and stock price. The example partitions the channel on the symbol property and shows you how to add multithreading to either the channel or the upstream adapter.
See Customizing Oracle Stream Explorer for information about customizing an event partitioner on a channel.
If you want the channel to allocate threads, set the max-threads property to the number of listeners in the EPN.
If you want to provide increased concurrency downstream from the channel, you can associate a thread pool with the channel by setting the max-threads property on the channel. The best value for the maximum number of threads depends on many factors, including the details of the Oracle CQL queries in downstream processors (whether the queries allow parallel execution) and the behavior observed while running the application (whether all CPU cores are utilized). As a starting point for tuning, it is reasonable to set max-threads equal to the number of listeners on the channel.
In this example, there are three listeners.
<wlevs:channel id="EventPartitionerChannel" event-type="PriceEvent" max-threads="3" >
<wlevs:instance-property name="eventPartitioner" value="true" />
<wlevs:listener ref="processor1" />
<wlevs:listener ref="processor2" />
<wlevs:listener ref="processor3" />
<wlevs:source ref="inbound" />
</wlevs:channel>
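If you prefer the upstream adapter to allocate the threads instead, one possible arrangement is sketched below. It rests on several assumptions that you should verify for your release: that the inbound adapter is a JMS adapter, that max-threads="0" makes the channel pass events through on the calling thread, and that the work-manager and concurrent-consumers elements govern the adapter's concurrency. The work manager name MyWorkManager is a placeholder.

```xml
<!-- Assembly file: max-threads="0" is assumed to make the channel pass events
     through on the calling (adapter) thread instead of using its own pool -->
<wlevs:channel id="EventPartitionerChannel" event-type="PriceEvent" max-threads="0">
  <wlevs:instance-property name="eventPartitioner" value="true" />
  <wlevs:listener ref="processor1" />
  <wlevs:listener ref="processor2" />
  <wlevs:listener ref="processor3" />
  <wlevs:source ref="inbound" />
</wlevs:channel>

<!-- Component configuration file: assumes "inbound" is a JMS adapter.
     MyWorkManager is a hypothetical work manager name. -->
<jms-adapter>
  <name>inbound</name>
  <event-type>PriceEvent</event-type>
  ...
  <work-manager>MyWorkManager</work-manager>
  <concurrent-consumers>3</concurrent-consumers>
</jms-adapter>
```

The two fragments belong to different files. The consumer count of 3 simply mirrors the number of listeners and is a starting point, not a tuned value.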
You must configure the Oracle CQL Processor to support local partitioning.
Use the sample code given below to define local partitioning:
Example 18-1 Assembly File
<wlevs:channel id="LocalPartitionChannel" event-type="StockEvent" is-local-partitioner="true" max-threads="3">
  <wlevs:instance-property name="partitionByEventProperty" value="symbol" />
</wlevs:channel>
Important Channel Properties
The local partition channel has the following important properties:
is-local-partitioner: Defines the channel as a local partitioning channel.
max-threads: Specifies the degree of parallelism.
max-size: Determines the maximum number of buffered events per partition.
Partitioning attribute (partitionByEventProperty in Example 18-1): Specifies the attribute of the stream that is used to partition the stream.
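As a rough illustration of how these properties might appear together, the following sketch extends the channel from Example 18-1 with a max-size setting. The value 1000 is an assumption chosen only for illustration, not a recommended setting.

```xml
<!-- Sketch only: the max-size value is illustrative, not a recommendation -->
<wlevs:channel id="LocalPartitionChannel" event-type="StockEvent"
               is-local-partitioner="true"
               max-threads="3"
               max-size="1000">
  <!-- Partition the stream on the symbol property of StockEvent -->
  <wlevs:instance-property name="partitionByEventProperty" value="symbol" />
</wlevs:channel>
```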
Example 18-2 Configuration
<processor>
  <name>StockAggregateProcessor</name>
  <rules>
    <query id="helloworldRule">
      <![CDATA[ select count(*) as symbolCount, symbol from LocalPartitionChannel group by symbol ]]>
    </query>
  </rules>
</processor>
You can add the ActiveActiveGroupBean class to the assembly file to partition an incoming JMS event stream by a selector in a multiserver domain.
Create a multiserver domain.
In this example, the deployment group name is MyDeploymentGroup.
Configure the Oracle Stream Explorer server configuration file on each Oracle Stream Explorer server to add the appropriate ActiveActiveGroupBean notification group to the groups child element of the cluster element.
The Oracle Stream Explorer server configuration file is located in /Oracle/Middleware/my_oep/user_projects/domains/<domain_name>/<server_name>/config.
Table 18-1 shows cluster elements for Oracle Stream Explorer servers ocep-server-1, ocep-server-2, ocep-server-3, and ocep-server-4. The deployment group is MyDeploymentGroup and the notification groups are defined using default ActiveActiveGroupBean notification group naming.
Optionally, you can specify your own group naming convention as Notification Group Naming Conventions describes.
Table 18-1 Server Configuration File Groups Element Configuration
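As a hedged sketch of what one such cluster element might look like, assume that ocep-server-1 belongs to the deployment group and to the notification group ActiveActiveGroupBean_group1. The multicast-address and enabled values are placeholder assumptions; use the values from your own domain.

```xml
<!-- Sketch for ocep-server-1; address and clustering mode are assumed placeholders -->
<cluster>
  <server-name>ocep-server-1</server-name>
  <multicast-address>239.255.0.1</multicast-address>
  <enabled>coherence</enabled>
  <!-- The deployment group plus this server's notification group -->
  <groups>MyDeploymentGroup,ActiveActiveGroupBean_group1</groups>
</cluster>
```

Under the same assumptions, ocep-server-2 through ocep-server-4 would list ActiveActiveGroupBean_group2 through ActiveActiveGroupBean_group4, matching the group-binding elements used later in this procedure.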
Create an Oracle Stream Explorer application.
Add an ActiveActiveGroupBean element to the assembly file as follows.
<bean id="clusterAdapter" class="com.oracle.cep.cluster.hagroups.ActiveActiveGroupBean"> </bean>
Define a parameterized message-selector in the jms-adapter element for the JMS inbound adapters.
Edit the component configuration file to add group-binding child elements to the jms-adapter element for the JMS inbound adapters.
Add one group-binding element for each possible JMS message-selector value as shown.
<jms-adapter>
  <name>JMSInboundAdapter</name>
  <event-type>StockTick</event-type>
  <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
  <destination-jndi-name>./Topic1</destination-jndi-name>
  <user>weblogic</user>
  <password>weblogic1</password>
  <work-manager>JettyWorkManager</work-manager>
  <concurrent-consumers>1</concurrent-consumers>
  <session-transacted>true</session-transacted>
  <message-selector>${CONDITION}</message-selector>
  <bindings>
    <group-binding group-id="ActiveActiveGroupBean_group1">
      <param id="CONDITION">acctid &gt; 400</param>
    </group-binding>
    <group-binding group-id="ActiveActiveGroupBean_group2">
      <param id="CONDITION">acctid BETWEEN 301 AND 400</param>
    </group-binding>
    <group-binding group-id="ActiveActiveGroupBean_group3">
      <param id="CONDITION">acctid BETWEEN 201 AND 300</param>
    </group-binding>
    <group-binding group-id="ActiveActiveGroupBean_group4">
      <param id="CONDITION">acctid &lt;= 200</param>
    </group-binding>
  </bindings>
</jms-adapter>
In this configuration, when the application is deployed to an Oracle Stream Explorer server with a cluster element groups child element that contains ActiveActiveGroupBean_group1, then the CONDITION parameter is defined as acctid > 400 and the application processes events whose acctid property is greater than 400.
Note:
Each inbound JMS adapter must listen to a different topic. For more information, see Adapters.
Deploy your application to the deployment group of your multiserver domain.
At runtime, each Oracle Stream Explorer server configures its instance of the application with the message-selector that corresponds to its ActiveActiveGroupBean notification group. This partitions the JMS topic so that each instance of the application processes a subset of the total number of messages in parallel.
This procedure uses the example application from Configure Precise Recovery With JMS. Figure 18-2 shows the EPN diagram, and Example 18-3 and Example 18-4 show the corresponding assembly and configuration files.
The procedure creates the Oracle Stream Explorer high availability configuration shown in Figure 18-3.
Figure 18-3 ActiveActiveGroupBean With High Availability
Configure Scalability in a JMS Application with High Availability
Create a multiserver domain.
In this example, the deployment group is named MyDeploymentGroup.
Configure the Oracle Stream Explorer server configuration file on each Oracle Stream Explorer server to add the appropriate ActiveActiveGroupBean notification group to the groups child element of the cluster element.
The Oracle Stream Explorer server configuration file is located in /Oracle/Middleware/my_oep/user_projects/domains/<domain_name>/<server_name>/config.
Table 18-2 shows cluster elements for Oracle Stream Explorer servers ocep-server-1, ocep-server-2, ocep-server-3, and ocep-server-4. The deployment group is MyDeploymentGroup and notification groups are defined using default ActiveActiveGroupBean notification group names.
Note that ocep-server-1 and ocep-server-2 use the same notification group name (ActiveActiveGroupBean_group1) and ocep-server-3 and ocep-server-4 use the same notification group name (ActiveActiveGroupBean_group2).
Table 18-2 Server Configuration File Groups Element Configuration
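To illustrate the pairing described above, the groups settings might be sketched as follows. Each cluster element lives in its own server's configuration file, and the elided child elements (server name, multicast address, and so on) are left out because their values depend on your domain.

```xml
<!-- ocep-server-1 and ocep-server-2: same notification group, so one is
     active and the other is standby for that partition -->
<cluster>
  ...
  <groups>MyDeploymentGroup,ActiveActiveGroupBean_group1</groups>
</cluster>

<!-- ocep-server-3 and ocep-server-4: the second partition -->
<cluster>
  ...
  <groups>MyDeploymentGroup,ActiveActiveGroupBean_group2</groups>
</cluster>
```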
Create an Oracle Stream Explorer high availability application.
For more information, see High Availability Applications.
Add an ActiveActiveGroupBean element to the assembly file as shown.
<bean id="clusterAdapter" class="com.oracle.cep.cluster.hagroups.ActiveActiveGroupBean"> </bean>
Edit the component configuration file to configure a jms-adapter element for the inbound JMS adapters as shown.
You must set each inbound JMS adapter to listen to a different topic and set session-transacted to true.
<?xml version="1.0" encoding="UTF-8"?>
<wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application" xmlns:ha="http://www.oracle.com/ns/cep/config/cluster/">
  ...
  <jms-adapter>
    <name>JMSInboundAdapter</name>
    <event-type>StockTick</event-type>
    <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
    <destination-jndi-name>./Topic1</destination-jndi-name>
    <session-transacted>true</session-transacted>
    ...
  </jms-adapter>
  <jms-adapter>
    <name>JMSInboundAdapter2</name>
    <event-type>StockTick</event-type>
    <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
    <destination-jndi-name>./Topic2</destination-jndi-name>
    <session-transacted>true</session-transacted>
    ...
  </jms-adapter>
</wlevs:config>
Define a parameterized message-selector in the jms-adapter element for each JMS inbound adapter.
Edit the component configuration file to add group-binding child elements to the jms-adapter element for the JMS inbound adapters.
Add one group-binding element for each possible JMS message-selector value as shown.
<jms-adapter>
  <name>JMSInboundAdapter</name>
  <event-type>StockTick</event-type>
  <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
  <destination-jndi-name>./Topic1</destination-jndi-name>
  <session-transacted>true</session-transacted>
  <message-selector>${CONDITION}</message-selector>
  <bindings>
    <group-binding group-id="ActiveActiveGroupBean_group1">
      <param id="CONDITION">acctid &lt;= 1000</param>
    </group-binding>
    <group-binding group-id="ActiveActiveGroupBean_group2">
      <param id="CONDITION">acctid &gt; 1000</param>
    </group-binding>
  </bindings>
</jms-adapter>
In this configuration, when the application is deployed to an Oracle Stream Explorer server with a cluster element groups child element that contains ActiveActiveGroupBean_group1, then the CONDITION parameter is defined as acctid <= 1000 and the application processes events whose acctid property is less than or equal to 1000. Similarly, when the application is deployed to an Oracle Stream Explorer server with a cluster element groups child element that contains ActiveActiveGroupBean_group2, then the CONDITION parameter is defined as acctid > 1000 and the application processes events whose acctid property is greater than 1000.
Edit the component configuration file to configure a jms-adapter element for the outbound JMS adapter as shown.
Configure the outbound JMS adapter with the same topic as the correlating inbound adapter (in this example, JMSInboundAdapter2: ./Topic2), and set session-transacted to true.
<?xml version="1.0" encoding="UTF-8"?>
<wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application" xmlns:ha="http://www.oracle.com/ns/cep/config/cluster/">
  ...
  <jms-adapter>
    <name>JMSInboundAdapter</name>
    <event-type>StockTick</event-type>
    <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
    <destination-jndi-name>./Topic1</destination-jndi-name>
    <session-transacted>true</session-transacted>
    ...
  </jms-adapter>
  <jms-adapter>
    <name>JMSInboundAdapter2</name>
    <event-type>StockTick</event-type>
    <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
    <destination-jndi-name>./Topic2</destination-jndi-name>
    <session-transacted>true</session-transacted>
    ...
  </jms-adapter>
  <jms-adapter>
    <name>JMSOutboundAdapter</name>
    <event-type>StockTick</event-type>
    <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
    <destination-jndi-name>./Topic2</destination-jndi-name>
    <session-transacted>true</session-transacted>
    ...
  </jms-adapter>
</wlevs:config>
Deploy your application to the deployment group of your multiserver domain.
At runtime, each Oracle Stream Explorer server configures its instance of the application with the message-selector that corresponds to its ActiveActiveGroupBean notification group. This partitions the JMS topic so that each instance of the application processes a subset of the total number of messages in parallel.
If the active Oracle Stream Explorer server in an ActiveActiveGroupBean group goes down, Oracle Stream Explorer performs a high availability failover to the standby Oracle Stream Explorer server in that ActiveActiveGroupBean group.
Example 18-3 Precise Recovery With JMS EPN Assembly File
<?xml version="1.0" encoding="UTF-8"?>
<beans ... >
  <wlevs:event-type-repository>
    <wlevs:event-type type-name="StockTick">
      <wlevs:properties>
        <wlevs:property name="lastPrice" type="double" />
        <wlevs:property name="symbol" type="char" />
      </wlevs:properties>
    </wlevs:event-type>
  </wlevs:event-type-repository>
  <wlevs:adapter id="JMSInboundAdapter" provider="jms-inbound">
    <wlevs:listener ref="myHaInputAdapter"/>
  </wlevs:adapter>
  <wlevs:adapter id="myHaInputAdapter" provider="ha-inbound" >
    <wlevs:instance-property name="keyProperties" value="sequenceNo"/>
    <wlevs:instance-property name="timeProperty" value="inboundTime"/>
  </wlevs:adapter>
  <wlevs:channel id="channel1" event-type="StockTick">
    <wlevs:listener ref="processor1" />
    <wlevs:source ref="myHaInputAdapter"/>
    <wlevs:application-timestamped>
      <wlevs:expression>inboundTime</wlevs:expression>
    </wlevs:application-timestamped>
  </wlevs:channel>
  <wlevs:processor id="processor1">
    <wlevs:listener ref="channel2" />
  </wlevs:processor>
  <wlevs:channel id="channel2" event-type="StockTick">
    <wlevs:listener ref="myHaCorrelatingAdapter" />
  </wlevs:channel>
  <wlevs:adapter id="myHaCorrelatingAdapter" provider="ha-correlating" >
    <wlevs:instance-property name="correlatedSource" ref="clusterCorrelatingOutstream"/>
    <wlevs:instance-property name="failOverDelay" value="2000"/>
    <wlevs:listener ref="JMSOutboundAdapter"/>
  </wlevs:adapter>
  <wlevs:adapter id="JMSOutboundAdapter" provider="jms-outbound">
  </wlevs:adapter>
  <wlevs:adapter id="JMSInboundAdapter2" provider="jms-inbound">
  </wlevs:adapter>
  <wlevs:channel id="clusterCorrelatingOutstream" event-type="StockTick" advertise="true">
    <wlevs:source ref="JMSInboundAdapter2"/>
  </wlevs:channel>
</beans>
Example 18-4 Precise Recovery With JMS Component Configuration Assembly File
<?xml version="1.0" encoding="UTF-8"?>
<wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application" xmlns:ha="http://www.oracle.com/ns/cep/config/cluster/">
  <processor>
    <name>processor1</name>
    <rules>
      <query id="helloworldRule">
        <![CDATA[ select * from channel1 [Now] ]]>
      </query>
    </rules>
  </processor>
  <jms-adapter>
    <name>JMSInboundAdapter</name>
    <event-type>StockTick</event-type>
    <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
    <destination-jndi-name>./Topic1</destination-jndi-name>
    <session-transacted>true</session-transacted>
    ...
  </jms-adapter>
  <jms-adapter>
    <name>JMSInboundAdapter2</name>
    <event-type>StockTick</event-type>
    <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
    <destination-jndi-name>./Topic2</destination-jndi-name>
    <session-transacted>true</session-transacted>
    ...
  </jms-adapter>
  <jms-adapter>
    <name>JMSOutboundAdapter</name>
    <event-type>StockTick</event-type>
    <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
    <destination-jndi-name>./Topic2</destination-jndi-name>
    <session-transacted>true</session-transacted>
    ...
  </jms-adapter>
</wlevs:config>
By default, the ActiveActiveGroupBean class creates notification groups with the following name, where X is a string.
ActiveActiveGroupBean_X
At runtime, ActiveActiveGroupBean scans the existing groups defined on the Oracle Stream Explorer server and applies the following default pattern match. When ActiveActiveGroupBean finds a match, it creates a notification group with that name.
ActiveActiveGroupBean_\\w+
Optionally, you can define your own group pattern to specify a different notification group naming pattern.
Configure the assembly file to add a groupPattern property to your ActiveActiveGroupBean element as shown.
<bean id="clusterAdapter" class="com.oracle.cep.cluster.hagroups.ActiveActiveGroupBean"> <property name="groupPattern" value="MyNotificationGroupPattern*"/> </bean>
Specify a value for the groupPattern property that matches the cluster group naming convention you want to use for notification groups.
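Assuming that groupPattern is interpreted as a regular expression in the same way as the default pattern (an assumption, because this section does not state it), a custom naming convention might be sketched as follows. The prefix MyNotificationGroup_ and the group name MyNotificationGroup_east are hypothetical.

```xml
<!-- Assembly file: match notification groups such as MyNotificationGroup_east.
     Assumes the value is a regular expression; the prefix is hypothetical. -->
<bean id="clusterAdapter" class="com.oracle.cep.cluster.hagroups.ActiveActiveGroupBean">
  <property name="groupPattern" value="MyNotificationGroup_\w+"/>
</bean>

<!-- Component configuration file: bind a selector to the custom group name -->
<group-binding group-id="MyNotificationGroup_east">
  <param id="CONDITION">acctid &lt;= 1000</param>
</group-binding>
```

For the binding to take effect under these assumptions, the groups child element of the cluster element on the target server would also need to list MyNotificationGroup_east alongside the deployment group.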
Most channels use the default event partitioning described in Developing Applications for Event Processing with Oracle Stream Explorer, where, if no partitioner is specified and the partitionByEventProperty element is not present, the channel sends events to all listeners. The partitionByEventProperty element provides a level of customization by partitioning on the specified event property with a default partitioning algorithm. This section explains how you can further customize how events are dispatched to the channel listeners by programmatically configuring a custom partitioner that provides finer control than the default partitioning algorithm. For example, you can create an event partitioner that is based on a property range.
Implement the com.bea.wlevs.channel.EventPartitioner interface to customize how events are partitioned across a channel and dispatched to the channel listeners.
Note:
When you implement custom partitioning and parallel processing, make sure to add code to preserve event order and to carefully manage multithreading.
Figure 18-4 shows an EPN that uses an event partitioner to partition a channel. In this example, the inbound adapter sends events of type PriceEvent, which has two properties: stock symbol and stock price. The example partitions the channel on the symbol property and shows you how to add multithreading to the channel or to the upstream adapter.
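A custom partitioner is typically registered as a bean and referenced from the channel. The following assembly file sketch assumes a hypothetical implementation class, com.example.MyEventPartitioner, and assumes that the channel accepts the partitioner by reference through an eventPartitioner instance property; verify both against your release before relying on them.

```xml
<!-- Hypothetical class that implements com.bea.wlevs.channel.EventPartitioner -->
<bean id="myEventPartitioner" class="com.example.MyEventPartitioner" />

<wlevs:channel id="EventPartitionerChannel" event-type="PriceEvent" max-threads="3">
  <!-- Assumed wiring: reference the custom partitioner bean instead of
       enabling the default property-based partitioner -->
  <wlevs:instance-property name="eventPartitioner" ref="myEventPartitioner" />
  <wlevs:listener ref="processor1" />
  <wlevs:listener ref="processor2" />
  <wlevs:listener ref="processor3" />
  <wlevs:source ref="inbound" />
</wlevs:channel>
```

The listener and source names are reused from the earlier default partitioner example so that the two configurations can be compared directly.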