18 Scalable Applications

You can build scalability into your application design with partitioning and parallel processing, and by taking high availability options into consideration. Oracle Stream Analytics lets you use default or custom partitioning and parallel processing settings on channels and the upstream adapter. You can also partition an incoming JMS event stream and configure group pattern matching for the JMS event stream.

18.1 Default Channel Scalability Settings

You can configure a channel to use the default event property-based event partitioner. With this default configuration, every time an incoming event arrives, the channel selects a listener and dispatches the event to that listener instead of broadcasting every event to every listener.

Note:

Batching is not supported when you configure a channel with an event partitioner.

Figure 18-1 shows an EPN that uses an event partitioner property to partition a channel. In this example, the inbound adapter sends events of type PriceEvent, which has two properties: stock symbol and stock price. The example partitions the channel on the symbol property and shows you how to add multithreading to either the channel or the upstream adapter.

Figure 18-1 EventPartitioner EPN

18.1.1 Configure Partitioning on the Channel

See Customizing Event Store in Customizing Oracle Stream Analytics for information about customizing an event store.

  1. Add a channel to your EPN.

    In Figure 18-1, the channel is EventPartitionerChannel.

  2. Connect the channel to an upstream adapter.

    In Figure 18-1, the upstream adapter is inbound.

  3. Connect the channel to two or more listeners.

    In Figure 18-1, the channel is connected to Oracle CQL processors processor1, processor2, and processor3.

  4. Edit the assembly file to add a partitionByEventProperty instance property to the channel element.

    The value of this instance-property is the name of the event property by which the channel partitions events.

    In this example, the channel partitions events by the event property symbol.

    <wlevs:event-type-repository>
      <wlevs:event-type type-name="PriceEvent">
        <wlevs:properties>
           <wlevs:property name="symbol" type="char" />
           <wlevs:property name="price" type="long" />
        </wlevs:properties>
      </wlevs:event-type>
    </wlevs:event-type-repository>
    
    <wlevs:channel id="EventPartitionerChannel" event-type="PriceEvent">
      <wlevs:instance-property name="partitionByEventProperty" value="symbol" />
       <wlevs:listener ref="processor1" />
       <wlevs:listener ref="processor2" />
       <wlevs:listener ref="processor3" />
       <wlevs:source ref="inbound" />
    </wlevs:channel>
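
To make the dispatch behavior concrete, here is a minimal Java sketch of property-based partitioning. The source does not document the algorithm that the default partitioner uses, so the hash-and-modulo scheme below is an assumption for illustration only. The class and method names (PropertyPartitionSketch, selectListener, dispatch) are hypothetical and are not part of the Oracle Stream Analytics API.

```java
import java.util.List;

// Hypothetical illustration only: not the Oracle Stream Analytics implementation.
final class PropertyPartitionSketch {

    // Maps a partition key (for example, the value of the symbol property)
    // to a listener index in the range 0..listenerCount-1.
    static int selectListener(String key, int listenerCount) {
        if (listenerCount <= 0) {
            throw new IllegalArgumentException("listenerCount must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), listenerCount);
    }

    // Returns the name of the single listener chosen for a key,
    // instead of broadcasting to every listener.
    static String dispatch(String key, List<String> listeners) {
        return listeners.get(selectListener(key, listeners.size()));
    }
}
```

Under this assumed scheme, two events with the same symbol always reach the same listener, so any per-symbol state stays on one processor.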
    

18.1.2 Configure Parallel Processing on the Channel

If you want the channel to allocate threads, set the max-threads property to the number of listeners in the EPN.

If you want to provide increased concurrency downstream from the channel, you can associate a thread pool with the channel by setting the max-threads property on the channel. The best value for the maximum number of threads depends on many factors, including the details of the Oracle CQL queries in downstream processors (whether the queries allow parallel execution) and the behavior observed while running the application (whether all the CPU cores are utilized). As a starting point in tuning the maximum number of threads, it is reasonable to set it equal to the number of listeners on the channel.

In this example, there are 3 listeners.

<wlevs:channel id="EventPartitionerChannel" event-type="PriceEvent" max-threads="3" >
  <wlevs:instance-property name="eventPartitioner" value="true" />
  <wlevs:listener ref="processor1" />
  <wlevs:listener ref="processor2" />
  <wlevs:listener ref="processor3" />
  <wlevs:source ref="inbound" />
</wlevs:channel>

18.1.3 Configure Parallel Processing on the Upstream Adapter

  1. Edit the EPN assembly file to configure the channel to set the max-threads attribute to 0.
    <wlevs:channel id="EventPartitionerChannel" event-type="PriceEvent"
      max-threads="0" >
      <wlevs:instance-property name="eventPartitioner" value="true" />
      <wlevs:listener ref="processor1" />
      <wlevs:listener ref="processor2" />
      <wlevs:listener ref="processor3" />
      <wlevs:source ref="inbound" />
    </wlevs:channel>
    
  2. Edit the Oracle Stream Analytics server file to add a work-manager element.

    Selecting the appropriate min-threads-constraint and max-threads-constraint for the work manager can depend on a number of factors, including the factors discussed in Configure Parallel Processing on the Channel for setting thread counts on a channel and whether the work manager is dedicated to a specific adapter or shared with other components (other adapters or the Jetty service). As a starting point in tuning, it is reasonable to set the min-threads-constraint and max-threads-constraint properties equal to the number of listeners downstream from the adapter if the work manager is dedicated to a single adapter instance.

    If this work manager is not shared by more than one component (that is, it is dedicated to the upstream adapter in this configuration), then set the min-threads-constraint and max-threads-constraint elements equal to the number of listeners.

    <work-manager>
        <name>adapterWorkManager</name>
        <min-threads-constraint>3</min-threads-constraint>
        <max-threads-constraint>3</max-threads-constraint>
    </work-manager>
    

    For more information about max-threads, see max-threads in Schema Reference for Oracle Stream Analytics.

  3. Edit the component configuration file to configure the upstream adapter with this work-manager.
    <adapter>
        <name>inbound</name>
        <work-manager-name>adapterWorkManager</work-manager-name>
        ...
    </adapter>
    

18.1.4 Define a Local Partition Channel

You must configure the Oracle CQL Processor to support local partitioning.

Use the sample code given below to define local partitioning:

Example 18-1 Assembly File

<wlevs:channel id="LocalPartitionChannel" event-type="StockEvent" is-local-partitioner="true" max-threads="3">
  <wlevs:instance-property name="partitionByEventProperty" value="symbol" />
</wlevs:channel>

Important Channel Properties

The local partition channel has the following important properties:

  • is-local-partitioner: Defines the channel as a local partitioning channel.

  • max-threads: Specifies the degree of parallelism.

  • max-size: Determines the maximum number of buffered events per partition.

  • Partitioning attribute (partitionByEventProperty): Specifies the attribute of the stream that is used to partition the stream.

Example 18-2 Configuration

<processor>
        <name>StockAggregateProcessor</name>
        <rules>
                <query id="helloworldRule">
                        <![CDATA[
                        select count(*) as symbolCount, symbol from LocalPartitionChannel group by symbol]]>
                </query>
        </rules>
</processor>
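
The query in Example 18-2 groups by the same property that the channel partitions on. The following Java sketch suggests why that pairing is safe: when every event for a symbol lands in one partition, counting each partition independently and combining the results gives the same answer as one unpartitioned count. The hash-based split and all names here (LocalPartitionCountSketch, countBySymbol, countPartitioned) are assumptions for illustration; the source does not describe how the local partitioner is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: plain Java, no Oracle Stream Analytics classes.
final class LocalPartitionCountSketch {

    // Counts occurrences of each symbol, like "select count(*) ... group by symbol".
    static Map<String, Integer> countBySymbol(List<String> symbols) {
        Map<String, Integer> counts = new HashMap<>();
        for (String symbol : symbols) {
            counts.merge(symbol, 1, Integer::sum);
        }
        return counts;
    }

    // Splits the stream into partitions by symbol (assumed hash scheme),
    // counts each partition independently, then combines the results.
    static Map<String, Integer> countPartitioned(List<String> symbols, int partitions) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            buckets.add(new ArrayList<>());
        }
        for (String symbol : symbols) {
            buckets.get(Math.floorMod(symbol.hashCode(), partitions)).add(symbol);
        }
        Map<String, Integer> combined = new HashMap<>();
        for (List<String> bucket : buckets) {
            // A symbol appears in exactly one bucket, so putAll never overwrites.
            combined.putAll(countBySymbol(bucket));
        }
        return combined;
    }
}
```

If the query grouped by a property other than the partitioning property, the per-partition results could no longer be combined this simply.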

18.2 Partition an Incoming JMS Event Stream

You can add the ActiveActiveGroupBean class to the assembly file to partition an incoming JMS event stream by a selector in a multiserver domain.

18.2.1 Configure Partitioning without High Availability

  1. Create a multiserver domain.

    In this example, the deployment group name is MyDeploymentGroup.

    See About Multiserver Domains in Administering Oracle Stream Analytics.

  2. Configure the Oracle Stream Analytics server configuration file on each Oracle Stream Analytics server to add the appropriate ActiveActiveGroupBean notification group to the groups child element of the cluster element.

    The Oracle Stream Analytics server configuration file is located in /Oracle/Middleware/my_oep/user_projects/domains/<domain_name>/<server_name>/config.

    Table 18-1 shows cluster elements for Oracle Stream Analytics servers ocep-server-1, ocep-server-2, ocep-server-3, and ocep-server-4. The deployment group is MyDeploymentGroup and the notification groups are defined using default ActiveActiveGroupBean notification group naming.

    Optionally, you can specify your own group naming convention as Notification Group Naming Conventions describes.

    Table 18-1 Server Configuration File Groups Element Configuration

    Partition cluster Element

    ocep-server-1

    <cluster>
        <server-name>ocep-server-1</server-name>
        ...
        <enabled>coherence</enabled>
        ...
        <groups>MyDeploymentGroup, ActiveActiveGroupBean_group1</groups>
    </cluster>
    

    ocep-server-2

    <cluster>
        <server-name>ocep-server-2</server-name>
        ...
        <enabled>coherence</enabled>
        ...
        <groups>MyDeploymentGroup, ActiveActiveGroupBean_group2</groups>
    </cluster>
    

    ocep-server-3

    <cluster>
        <server-name>ocep-server-3</server-name>
        ...
        <enabled>coherence</enabled>
        ...
        <groups>MyDeploymentGroup, ActiveActiveGroupBean_group3</groups>
    </cluster>
    

    ocep-server-4

    <cluster>
        <server-name>ocep-server-4</server-name>
        ...
        <enabled>coherence</enabled>
        ...
        <groups>MyDeploymentGroup, ActiveActiveGroupBean_group4</groups>
    </cluster>
    
  3. Create an Oracle Stream Analytics application.

  4. Add an ActiveActiveGroupBean element to the assembly file as follows.

    <bean id="clusterAdapter" class="com.oracle.cep.cluster.hagroups.ActiveActiveGroupBean">
    </bean>
    
  5. Define a parameterized message-selector in the jms-adapter element for the JMS inbound adapters.

    1. Edit the component configuration file to add group-binding child elements to the jms-adapter element for the JMS inbound adapters.

    2. Add one group-binding element for each possible JMS message-selector value as shown.

    <jms-adapter>
      <name>JMSInboundAdapter</name>
      <event-type>StockTick</event-type>
      <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
      <destination-jndi-name>./Topic1</destination-jndi-name>
      <user>weblogic</user>
      <password>weblogic1</password>
      <work-manager>JettyWorkManager</work-manager>
      <concurrent-consumers>1</concurrent-consumers>
      <session-transacted>true</session-transacted>
      <message-selector>${CONDITION}</message-selector>
      <bindings>
        <group-binding group-id="ActiveActiveGroupBean_group1">
          <param id="CONDITION">acctid > 400</param>
        </group-binding>
        <group-binding group-id="ActiveActiveGroupBean_group2">
          <param id="CONDITION">acctid BETWEEN 301 AND 400</param>
        </group-binding>
        <group-binding group-id="ActiveActiveGroupBean_group3">
          <param id="CONDITION">acctid BETWEEN 201 AND 300</param>
        </group-binding>
        <group-binding group-id="ActiveActiveGroupBean_group4">
          <param id="CONDITION">acctid &lt;= 200</param>
        </group-binding>
      </bindings>
    </jms-adapter>
    

    In this configuration, when the application is deployed to an Oracle Stream Analytics server with a cluster element groups child element that contains ActiveActiveGroupBean_group1, then the CONDITION parameter is defined as acctid > 400 and the application processes events whose acctid property is greater than 400.

    Note:

    Each inbound JMS adapter must listen to a different topic. For more information, see Adapters.

  6. Deploy your application to the deployment group of your multiserver domain.

    At runtime, each Oracle Stream Analytics server configures its instance of the application with the message-selector that corresponds to its ActiveActiveGroupBean notification group. This partitions the JMS topic so that each instance of the application processes a subset of the total number of messages in parallel.
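
For the partitioning to be complete, the CONDITION values should accept every message exactly once. The following Java sketch checks that property for the four conditions used in this procedure. It assumes that acctid is an integer-valued message property (the source does not state its type) and it reimplements the comparisons in plain Java rather than evaluating real JMS selectors. The class and method names are hypothetical.

```java
// Hypothetical illustration only: mirrors the four CONDITION values from the
// group-binding example; it does not evaluate real JMS message selectors.
final class SelectorCoverageSketch {

    // One boolean per group-binding, in group1..group4 order.
    static boolean[] matches(long acctid) {
        return new boolean[] {
            acctid > 400,                    // ActiveActiveGroupBean_group1
            acctid >= 301 && acctid <= 400,  // group2: BETWEEN 301 AND 400
            acctid >= 201 && acctid <= 300,  // group3: BETWEEN 201 AND 300
            acctid <= 200                    // ActiveActiveGroupBean_group4
        };
    }

    // Returns the 1-based group number that accepts acctid, or throws
    // if the conditions overlap or leave a gap for that value.
    static int groupFor(long acctid) {
        boolean[] m = matches(acctid);
        int found = 0;
        for (int i = 0; i < m.length; i++) {
            if (m[i]) {
                if (found != 0) {
                    throw new IllegalStateException("overlapping conditions at " + acctid);
                }
                found = i + 1;
            }
        }
        if (found == 0) {
            throw new IllegalStateException("no condition accepts " + acctid);
        }
        return found;
    }
}
```

With integer values, the boundaries 200, 201, 300, 301, 400, and 401 each fall into exactly one group. If acctid could hold a non-integer value such as 300.5, the BETWEEN ranges as written would leave a gap, so the conditions would need adjusting.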

18.2.2 Configure Partitioning with High Availability

This procedure uses the example application from Configure Precise Recovery With JMS. Figure 18-2 shows the EPN diagram, and Example 18-3 and Example 18-4 show the corresponding assembly and configuration files.

Figure 18-2 Precise Recovery With JMS EPN

The procedure creates the Oracle Stream Analytics high availability configuration shown in Figure 18-3.

Figure 18-3 ActiveActiveGroupBean With High Availability

Configure Scalability in a JMS Application with High Availability

  1. Create a multiserver domain.

    In this example, the deployment group is named MyDeploymentGroup.

    See About Multiserver Domains in Administering Oracle Stream Analytics.

  2. Configure the Oracle Stream Analytics server configuration file on each Oracle Stream Analytics server to add the appropriate ActiveActiveGroupBean notification group to the groups child element of the cluster element.

    The Oracle Stream Analytics server configuration file is located in /Oracle/Middleware/my_oep/user_projects/domains/<domain_name>/<server_name>/config.

    Table 18-2 shows cluster elements for Oracle Stream Analytics servers ocep-server-1, ocep-server-2, ocep-server-3, and ocep-server-4. The deployment group is MyDeploymentGroup and notification groups are defined using default ActiveActiveGroupBean notification group names.

    Note that ocep-server-1 and ocep-server-2 use the same notification group name (ActiveActiveGroupBean_group1) and ocep-server-3 and ocep-server-4 use the same notification group name (ActiveActiveGroupBean_group2).

    Table 18-2 Server Configuration File Groups Element Configuration

    Partition cluster Element

    ocep-server-1

    <cluster>
        <server-name>ocep-server-1</server-name>
        ...
        <enabled>coherence</enabled>
        ...
        <groups>MyDeploymentGroup, ActiveActiveGroupBean_group1</groups>
    </cluster>
    

    ocep-server-2

    <cluster>
        <server-name>ocep-server-2</server-name>
        ...
        <enabled>coherence</enabled>
        ...
        <groups>MyDeploymentGroup, ActiveActiveGroupBean_group1</groups>
    </cluster>
    

    ocep-server-3

    <cluster>
        <server-name>ocep-server-3</server-name>
        ...
        <enabled>coherence</enabled>
        ...
        <groups>MyDeploymentGroup, ActiveActiveGroupBean_group2</groups>
    </cluster>
    

    ocep-server-4

    <cluster>
        <server-name>ocep-server-4</server-name>
        ...
        <enabled>coherence</enabled>
        ...
        <groups>MyDeploymentGroup, ActiveActiveGroupBean_group2</groups>
    </cluster>
    
  3. Create an Oracle Stream Analytics high availability application.

    For more information, see High Availability Applications.

  4. Add an ActiveActiveGroupBean element to the assembly file as shown.

    <bean id="clusterAdapter" class="com.oracle.cep.cluster.hagroups.ActiveActiveGroupBean">
    </bean>
    
  5. Edit the component configuration file to configure a jms-adapter element for the inbound JMS adapters as shown.

    You must set each inbound JMS adapter to listen to a different topic and set session-transacted to true.

    <?xml version="1.0" encoding="UTF-8"?>
    <wlevs:config 
            xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application"
            xmlns:ha="http://www.oracle.com/ns/cep/config/cluster/">
        ...
        <jms-adapter>
            <name>JMSInboundAdapter</name>
            <event-type>StockTick</event-type>
            <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
            <destination-jndi-name>./Topic1</destination-jndi-name>
            <session-transacted>true</session-transacted>
            ...
        </jms-adapter>
        <jms-adapter>
            <name>JMSInboundAdapter2</name>
            <event-type>StockTick</event-type>
            <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
            <destination-jndi-name>./Topic2</destination-jndi-name>
            <session-transacted>true</session-transacted>
            ...
        </jms-adapter>
    </wlevs:config>
    
  6. Define a parameterized message-selector in the jms-adapter element for each JMS inbound adapter.

    1. Edit the component configuration file to add group-binding child elements to the jms-adapter element for the JMS inbound adapters.

    2. Add one group-binding element for each possible JMS message-selector value as shown.

    <jms-adapter>
        <name>JMSInboundAdapter</name>
        <event-type>StockTick</event-type>
        <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
        <destination-jndi-name>./Topic1</destination-jndi-name>
        <session-transacted>true</session-transacted>
        <message-selector>${CONDITION}</message-selector>
        <bindings>
            <group-binding group-id="ActiveActiveGroupBean_group1">
                <param id="CONDITION">acctid &lt;= 1000</param>
            </group-binding>
            <group-binding group-id="ActiveActiveGroupBean_group2">
                <param id="CONDITION">acctid > 1000</param>
            </group-binding>
         </bindings>
    </jms-adapter>
    

    In this configuration, when the application is deployed to an Oracle Stream Analytics server with a cluster element groups child element that contains ActiveActiveGroupBean_group1, then the CONDITION parameter is defined as acctid <= 1000 and the application processes events whose acctid property is less than or equal to 1000. Similarly, when the application is deployed to an Oracle Stream Analytics server with a cluster element groups child element that contains ActiveActiveGroupBean_group2, then the CONDITION parameter is defined as acctid > 1000 and the application processes events whose acctid property is greater than 1000.

  7. Edit the component configuration file to configure a jms-adapter element for the outbound JMS adapter as shown:

    Configure the outbound JMS adapter with the same topic as the correlating inbound adapter (in this example, JMSInboundAdapter2: ./Topic2), and set session-transacted to true.

    <?xml version="1.0" encoding="UTF-8"?>
    <wlevs:config 
            xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application"
            xmlns:ha="http://www.oracle.com/ns/cep/config/cluster/">
        ...
        <jms-adapter>
            <name>JMSInboundAdapter</name>
            <event-type>StockTick</event-type>
            <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
            <destination-jndi-name>./Topic1</destination-jndi-name>
            <session-transacted>true</session-transacted>
            ...
        </jms-adapter>
        <jms-adapter>
            <name>JMSInboundAdapter2</name>
            <event-type>StockTick</event-type>
            <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
            <destination-jndi-name>./Topic2</destination-jndi-name>
            <session-transacted>true</session-transacted>
            ...
        </jms-adapter>
        <jms-adapter>
            <name>JMSOutboundAdapter</name>
            <event-type>StockTick</event-type>
            <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
            <destination-jndi-name>./Topic2</destination-jndi-name>
            <session-transacted>true</session-transacted>
            ...
        </jms-adapter>
    </wlevs:config>
    
  8. Deploy your application to the deployment group of your multiserver domain.

    At runtime, each Oracle Stream Analytics server configures its instance of the application with the message-selector that corresponds to its ActiveActiveGroupBean notification group. This partitions the JMS topic so that each instance of the application processes a subset of the total number of messages in parallel.

    If the active Oracle Stream Analytics server in an ActiveActiveGroupBean group goes down, Oracle Stream Analytics high availability fails over to the standby Oracle Stream Analytics server in that ActiveActiveGroupBean group.

Example 18-3 Precise Recovery With JMS EPN Assembly File

<?xml version="1.0" encoding="UTF-8"?>
<beans ... >

    <wlevs:event-type-repository>
        <wlevs:event-type type-name="StockTick">
            <wlevs:properties>
                <wlevs:property name="lastPrice" type="double" />
                <wlevs:property name="symbol" type="char" />
            </wlevs:properties>
        </wlevs:event-type>
    </wlevs:event-type-repository>

    <wlevs:adapter id="JMSInboundAdapter" provider="jms-inbound">
        <wlevs:listener ref="myHaInputAdapter"/>
    </wlevs:adapter>

    <wlevs:adapter id="myHaInputAdapter" provider="ha-inbound" >
        <wlevs:instance-property name="keyProperties" value="sequenceNo"/>
        <wlevs:instance-property name="timeProperty" value="inboundTime"/>
    </wlevs:adapter>

    <wlevs:channel id="channel1" event-type="StockTick">
        <wlevs:listener ref="processor1" />
        <wlevs:source ref="myHaInputAdapter"/>
        <wlevs:application-timestamped>
            <wlevs:expression>inboundTime</wlevs:expression>
        </wlevs:application-timestamped>
    </wlevs:channel>

    <wlevs:processor id="processor1">
        <wlevs:listener ref="channel2" />
    </wlevs:processor>

    <wlevs:channel id="channel2" event-type="StockTick">
        <wlevs:listener ref="myHaCorrelatingAdapter" />
    </wlevs:channel>

    <wlevs:adapter id="myHaCorrelatingAdapter" provider="ha-correlating" >
        <wlevs:instance-property name="correlatedSource" ref="clusterCorrelatingOutstream"/> 
        <wlevs:instance-property name="failOverDelay" value="2000"/> 
        <wlevs:listener ref="JMSOutboundAdapter"/>
    </wlevs:adapter>

    <wlevs:adapter id="JMSOutboundAdapter" provider="jms-outbound">
    </wlevs:adapter>

    <wlevs:adapter id="JMSInboundAdapter2" provider="jms-inbound">
    </wlevs:adapter>

    <wlevs:channel id="clusterCorrelatingOutstream" event-type="StockTick" advertise="true">
        <wlevs:source ref="JMSInboundAdapter2"/>
    </wlevs:channel> 
</beans>

Example 18-4 Precise Recovery With JMS Component Configuration File

<?xml version="1.0" encoding="UTF-8"?>
<wlevs:config 
        xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application"
        xmlns:ha="http://www.oracle.com/ns/cep/config/cluster/">
    <processor>
        <name>processor1</name>
        <rules>
            <query id="helloworldRule">
                <![CDATA[ select * from channel1 [Now] ]]>
            </query>
        </rules>
    </processor>
    <jms-adapter>
        <name>JMSInboundAdapter</name>
        <event-type>StockTick</event-type>
        <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
        <destination-jndi-name>./Topic1</destination-jndi-name>
        <session-transacted>true</session-transacted>
    ...
    </jms-adapter>
    <jms-adapter>
        <name>JMSInboundAdapter2</name>
        <event-type>StockTick</event-type>
        <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
        <destination-jndi-name>./Topic2</destination-jndi-name>
        <session-transacted>true</session-transacted>
    ...
    </jms-adapter>
    <jms-adapter>
        <name>JMSOutboundAdapter</name>
        <event-type>StockTick</event-type>
        <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
        <destination-jndi-name>./Topic2</destination-jndi-name>
        <session-transacted>true</session-transacted>
    ...
    </jms-adapter>
</wlevs:config>

18.3 Notification Group Naming Conventions

By default, the ActiveActiveGroupBean class creates notification groups with the following name where X is a string.

ActiveActiveGroupBean_X

At runtime, ActiveActiveGroupBean scans the existing groups defined on the Oracle Stream Analytics server and applies the following default pattern match. When ActiveActiveGroupBean finds a match, it creates a notification group with that name.

ActiveActiveGroupBean_\\w+

Optionally, you can define your own group pattern to specify a different notification group naming pattern.

  1. Configure the assembly file to add a groupPattern property to your ActiveActiveGroupBean element as shown.

    <bean id="clusterAdapter" class="com.oracle.cep.cluster.hagroups.ActiveActiveGroupBean">
        <property name="groupPattern" value="MyNotificationGroupPattern*"/>
    </bean>
    
  2. Specify a value for the groupPattern property that matches the cluster group naming convention you want to use for notification groups.
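
The default pattern can be tried out with the standard java.util.regex classes. The following sketch filters a comma-separated groups value, such as the one in the cluster element, and keeps the names that match. It assumes that a name must match the pattern in full; the source does not say whether ActiveActiveGroupBean uses full or partial matching. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical illustration only: not the ActiveActiveGroupBean implementation.
final class GroupPatternSketch {

    // The default pattern from the text. The doubled backslash is the Java
    // string-literal form of the regular expression ActiveActiveGroupBean_\w+.
    static final Pattern DEFAULT_PATTERN = Pattern.compile("ActiveActiveGroupBean_\\w+");

    // Splits a comma-separated groups value and keeps the names that
    // match the pattern in full (full-match semantics are an assumption).
    static List<String> notificationGroups(String groupsValue, Pattern pattern) {
        List<String> result = new ArrayList<>();
        for (String name : groupsValue.split(",")) {
            String trimmed = name.trim();
            if (pattern.matcher(trimmed).matches()) {
                result.add(trimmed);
            }
        }
        return result;
    }
}
```

For the groups value used earlier in this chapter, MyDeploymentGroup is skipped and only the ActiveActiveGroupBean_group1 name is kept. Because \w+ requires at least one word character, a bare ActiveActiveGroupBean_ would not match.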

18.4 Custom Channel Event Partitioner

By default, if no partitioner is specified and the partitionByEventProperty instance property is not present, a channel sends every event to all listeners. The partitionByEventProperty instance property provides a level of customization by partitioning on the specified event property with a default partitioning algorithm. This section explains how to further customize how events are dispatched to the channel listeners by programmatically configuring a custom partitioner, which gives you finer control than the default partitioning algorithm. For example, you can create an event partitioner that is based on a property range.

18.4.1 EventPartitioner Interface

Implement the com.bea.wlevs.channel.EventPartitioner interface to customize how a channel partitions events and dispatches them to its listeners.

Note:

When you implement custom partitioning and parallel processing, make sure to add code to preserve event order and to carefully manage multithreading.
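
One common way to keep event order within a partition while still processing partitions in parallel is to give each partition its own single-threaded worker. The following Java sketch shows that idea with the standard java.util.concurrent executors. It is an illustration under stated assumptions, not the mechanism that Oracle Stream Analytics channels use internally, and every name in it is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: one single-threaded executor per partition
// keeps a partition's events in arrival order while partitions run in parallel.
final class OrderedPartitionDispatchSketch {

    private final List<ExecutorService> workers = new ArrayList<>();
    private final List<List<String>> delivered = new ArrayList<>();

    OrderedPartitionDispatchSketch(int partitions) {
        for (int i = 0; i < partitions; i++) {
            workers.add(Executors.newSingleThreadExecutor());
            delivered.add(Collections.synchronizedList(new ArrayList<String>()));
        }
    }

    // Queues the event on the worker that owns its partition.
    void dispatch(int partition, String event) {
        List<String> sink = delivered.get(partition);
        workers.get(partition).execute(() -> sink.add(event));
    }

    // Stops accepting events, waits for queued work to finish, and
    // returns what each partition received, in delivery order.
    List<List<String>> shutdownAndCollect() {
        for (ExecutorService worker : workers) {
            worker.shutdown();
        }
        try {
            for (ExecutorService worker : workers) {
                if (!worker.awaitTermination(10, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("worker did not finish in time");
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return delivered;
    }
}
```

Order is preserved only within a partition. Events in different partitions can complete in any relative order, which is usually acceptable when the partition key matches the key that the downstream logic depends on.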

Figure 18-4 shows an EPN that uses an event partitioner to partition a channel. In this example, the inbound adapter sends events of type PriceEvent, which has two properties: stock symbol and stock price. The example partitions the channel on the symbol property and shows you how to add multithreading to the channel or to the upstream adapter.

Figure 18-4 Event Partitioner EPN

18.4.2 Implement the EventPartitioner Interface

  1. In Oracle JDeveloper, open your Oracle Stream Analytics application.
  2. Edit your MANIFEST.MF file to import package com.bea.wlevs.channel.
  3. Select the project and select File > New > From Gallery.

    The New Gallery dialog displays.

  4. In the New Gallery dialog, select General in the left panel and Java Class in the right panel, and click OK.

    The Create Java Class dialog displays.

  5. In the Create Java Class dialog, provide a class name, package name, and extends information.
  6. Under Optional Attributes and Implements, use the Add (+) button to locate the com.bea.wlevs.channel.EventPartitioner interface.
  7. Click OK.

    A new EventPartitioner class is created.

  8. Complete the implementation of your EventPartitioner as shown.
    package com.acme;
    
    import com.bea.wlevs.channel.EventPartitioner;
    import com.bea.wlevs.ede.api.EventProcessingException;
    import com.bea.wlevs.ede.api.EventType;
    
    public class MyEventPartitioner implements EventPartitioner {
    
      // Not final: the value is assigned in activateConfiguration, not in a constructor.
      private EventType eventType;
      private int numberOfPartitions;
    
      @Override
      public void activateConfiguration(int numberOfPartitions, EventType eventType) {
         this.numberOfPartitions = numberOfPartitions;
         this.eventType = eventType;
      }
    
      @Override
      public int partition(Object event) throws EventProcessingException {
        int dispatchToListener = 0;
        ... // Your implementation.
        return dispatchToListener;
      }
    }
    

    The activateConfiguration method is a callback that the Oracle Stream Analytics server invokes before ActivatableBean.afterConfigurationActive and before your EventPartitioner class's partition method is invoked.

    When you associate this EventPartitioner with a channel, the channel will invoke your EventPartitioner class's partition method each time the channel receives an event.

    Your partition method must return the index of the listener to which the channel should dispatch the event. The index must be an int between 0 and numberOfPartitions - 1.

  9. Add a channel to your EPN.

    In Figure 18-4, the channel is EventPartitionerChannel.

  10. Connect the channel to an upstream adapter.

    In Figure 18-4, the upstream adapter is inbound.

  11. Connect the channel to two or more listeners.

    In Figure 18-4, the channel is connected to Oracle CQL processors processor1, processor2, and processor3.

    If you want the channel to perform load balancing, each listener must be identical.

  12. Edit the EPN assembly file to add an eventPartitioner instance property to the channel element.

    The value of this instance-property is the fully qualified class name of the EventPartitioner instance the channel will use to partition events. This class must be on your Oracle Stream Analytics application class path.

    In this example, the channel uses EventPartitioner instance com.acme.MyEventPartitioner to partition events.

    <wlevs:channel id="EventPartitionerChannel" event-type="PriceEvent" max-threads="0" >
      <wlevs:instance-property name="eventPartitioner" value="com.acme.MyEventPartitioner" />
      <wlevs:listener ref="filterFanoutProcessor1" />
      <wlevs:listener ref="filterFanoutProcessor2" />
      <wlevs:listener ref="filterFanoutProcessor3" />
      <wlevs:source ref="PriceAdapter" />
    </wlevs:channel>
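
As an illustration of the property-range idea, the following Java sketch maps a price to a listener index by range. So that it compiles on its own, it is a stand-in that mirrors the two callbacks of MyEventPartitioner but does not implement the real com.bea.wlevs.channel.EventPartitioner interface. It also takes the price directly, because the source does not show how to read a property from the event object. The range boundaries and all names are assumed values chosen for the example.

```java
// Hypothetical stand-in for illustration: mirrors the callbacks shown in
// MyEventPartitioner without depending on com.bea.wlevs classes.
final class PriceRangePartitionerSketch {

    // Exclusive upper bounds of every price range except the last (assumed values):
    // range 0 is below 100, range 1 is 100 to 499, range 2 is 500 and above.
    private static final long[] UPPER_BOUNDS = {100L, 500L};

    private int numberOfPartitions = 1;

    // Mirrors activateConfiguration: remembers how many listeners exist.
    void activateConfiguration(int numberOfPartitions) {
        if (numberOfPartitions <= 0) {
            throw new IllegalArgumentException("numberOfPartitions must be positive");
        }
        this.numberOfPartitions = numberOfPartitions;
    }

    // Mirrors partition: returns a listener index between 0 and numberOfPartitions - 1.
    int partition(long price) {
        int index = 0;
        while (index < UPPER_BOUNDS.length && price >= UPPER_BOUNDS[index]) {
            index++;
        }
        // Clamp so the index stays valid when there are fewer listeners than ranges.
        return Math.min(index, numberOfPartitions - 1);
    }
}
```

With three listeners, a price of 50 goes to index 0, a price of 100 or 499 to index 1, and a price of 500 or more to index 2. Unlike a hash-based scheme, a range-based partitioner can distribute load unevenly if most events fall into one range, so the boundaries should reflect the expected data.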