9 Configuring Channels


9.1 Overview of Channel Configuration

An Oracle CEP application contains one or more channel components. A channel represents the physical conduit through which events flow between other types of components, such as between adapters and processors, and between processors and event beans (business logic POJOs).

You may use a channel with both streams and relations. For more information, see Section 9.1.2, "Channels Representing Streams and Relations".

When you create a channel in your Event Processing Network (EPN), it has a default configuration. For complete details, see Section C.10, "wlevs:channel".

The default channel configuration is typically adequate for most applications.

However, if you want to change this configuration, you must create a channel element in a component configuration file. In this channel element, you can specify channel configuration that overrides the defaults.

The name element of the channel element in the component configuration file must match the id attribute of the corresponding channel element in the EPN assembly file. For example, given the EPN assembly file channel element shown in Example 9-1, the corresponding component configuration file channel element is shown in Example 9-2.

Example 9-1 EPN Assembly File Channel Id: priceStream

<wlevs:channel id="priceStream" event-type="PriceEvent">
    <wlevs:listener ref="filterFanoutProcessor" />
    <wlevs:source ref="PriceAdapter" />
</wlevs:channel>

Example 9-2 Component Configuration File Channel Name: priceStream

<channel>
    <name>priceStream</name>
    <max-size>10000</max-size>
    <max-threads>4</max-threads>
</channel>

You can create a channel element in any of the following component configuration files:

  • The default Oracle CEP application configuration file (by default, META-INF/wlevs/config.xml).

  • A separate configuration file.

If your application has more than one channel, you can create a channel element for each of them in the default config.xml file, create a separate XML file in META-INF/wlevs for each channel, or create a single XML file in META-INF/wlevs that contains the configuration for all channels, or even for all components of your application (adapters, processors, and channels). Choose the method that best suits your development environment.
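
As a minimal sketch of the separate-file approach, the following component configuration file holds only channel configuration. The file name META-INF/wlevs/channels.xml is hypothetical, the channel names are borrowed from other examples in this chapter, and the sizes and thread counts are illustrative values only.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Hypothetical file: META-INF/wlevs/channels.xml -->
<n1:config xmlns:n1="http://www.bea.com/ns/wlevs/config/application"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <!-- Each name must match a wlevs:channel id in the EPN assembly file -->
    <channel>
        <name>priceStream</name>
        <max-size>10000</max-size>
        <max-threads>4</max-threads>
    </channel>
    <channel>
        <name>filteredStream</name>
        <max-size>5000</max-size>
        <max-threads>2</max-threads>
    </channel>
</n1:config>
```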

By default, Oracle CEP IDE for Eclipse creates one component configuration file and one EPN assembly file.

Component configuration files are deployed as part of the Oracle CEP application bundle. You can later update this configuration at runtime using Oracle CEP Visualizer, the wlevs.Admin utility, or by manipulating the appropriate JMX MBeans directly.


9.1.1 When to Use a Channel

When constructing your EPN, consider the following rules:

  • A channel is mandatory when connecting an Oracle CQL processor to a down-stream stage.

  • A channel is mandatory when connecting a push source stream or relation to a processor.

    A channel is mandatory for a push source because in that case, the Oracle CQL processor does need to be aware of its shape (that is, DDL is required) and so does need the channel to act as intermediary.

  • A channel is optional when connecting an external relation, or pull source, such as a cache or table source, to a processor.

    A channel is not needed between a pull source, such as a cache or table, and a processor because the pull source represents an external relation. For an external relation, the only valid operation is a join between a stream and a NOW window operator, and hence it is considered a pull source. In other words, the join actually happens outside of the Oracle CQL processor. Because it is a pull, the Oracle CQL processor does not need to be aware of its shape (that is, no DDL is required) and so does not need the channel to act as intermediary.
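
The first two rules can be sketched as the following EPN assembly fragment, in which a push source feeds an Oracle CQL processor through one channel and the processor feeds a down-stream stage through another. This is an illustrative sketch only: the class names com.example.PriceAdapter and com.example.PriceListenerBean and the priceListener identifier are hypothetical, while the channel, processor, and event type names are borrowed from other examples in this chapter.

```xml
<!-- Push source: hypothetical custom adapter class -->
<wlevs:adapter id="PriceAdapter" class="com.example.PriceAdapter" />

<!-- Mandatory channel between the push source and the Oracle CQL processor -->
<wlevs:channel id="priceStream" event-type="PriceEvent">
    <wlevs:listener ref="filterFanoutProcessor" />
    <wlevs:source ref="PriceAdapter" />
</wlevs:channel>

<wlevs:processor id="filterFanoutProcessor" />

<!-- Mandatory channel between the Oracle CQL processor and the down-stream stage -->
<wlevs:channel id="filteredStream" event-type="FilteredPriceEvent">
    <wlevs:listener ref="priceListener" />
    <wlevs:source ref="filterFanoutProcessor" />
</wlevs:channel>

<!-- Down-stream stage: hypothetical event bean class -->
<wlevs:event-bean id="priceListener" class="com.example.PriceListenerBean" />
```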

In general, use a channel between components when:

  • Buffering is needed between the emitting component and the receiver.

  • Queuing or concurrency is needed for the receiving component.

  • A custom adapter is used and thread control is necessary.

It is a good design practice to include channels in your EPN to provide the flexibility of performance tuning (using buffering, queuing, and concurrency options) later in the design lifecycle. Setting the channel attribute max-threads to 0 puts a channel in pass-through mode and incurs no performance penalty.
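
For instance, a component configuration entry along the following lines would leave a channel in pass-through mode while keeping it in place for later tuning. The channel name tuningPointChannel is hypothetical.

```xml
<channel>
    <!-- Hypothetical name; must match the wlevs:channel id in the EPN assembly file -->
    <name>tuningPointChannel</name>
    <!-- 0 puts the channel in pass-through mode -->
    <max-threads>0</max-threads>
</channel>
```

To introduce buffering or concurrency later, you would raise max-threads and max-size to non-zero values in this element.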


9.1.2 Channels Representing Streams and Relations

A channel can represent either a stream or a relation.


9.1.2.1 Channels as Streams

A stream supports appends only. You specify a channel as a stream by setting EPN assembly element wlevs:channel attribute is-relation to false (the default).

9.1.2.2 Channels as Relations

A relation supports inserts, deletes, and updates. You specify a channel as a relation by setting EPN assembly element wlevs:channel attribute is-relation to true.

When a channel is a relation, you must specify one or more event properties that define event identity using the wlevs:channel attribute primary-key as Example 9-3 shows.

Example 9-3 Channel as Relation: primary-key Attribute

...
<wlevs:channel id="priceStream" event-type="PriceEvent" primary-key="stock,broker">
    <wlevs:listener ref="filterFanoutProcessor" />
    <wlevs:source ref="PriceAdapter" />
</wlevs:channel>
...

Example 9-4 PriceEvent

<wlevs:event-type-repository>
    <wlevs:event-type type-name="PriceEvent">
          <wlevs:property>
             <entry key="stock" value="java.lang.String"/>
             <entry key="broker" value="java.lang.String"/>
             <entry key="high" value="float"/>
             <entry key="low" value="float"/>
          </wlevs:property>
    </wlevs:event-type>
</wlevs:event-type-repository>

For more information, see primary-key in Table C-9, "Attributes of the wlevs:channel Application Assembly Element".

9.1.3 System-Timestamped Channels

By default, channels are system-timestamped. In this case, Oracle CEP will assign a new time from the CPU clock under two conditions: when a new event arrives, and when the configurable heartbeat timeout expires.
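
As an illustrative sketch, the heartbeat timeout is set with the heartbeat child element of the channel element in the component configuration file. The value is expressed in nanoseconds, so the figure below corresponds to one second; the value itself is an assumption chosen for illustration, not a recommendation.

```xml
<channel>
    <name>priceStream</name>
    <!-- Idle time, in nanoseconds, before a heartbeat is generated: 1,000,000,000 ns = 1 s -->
    <heartbeat>1000000000</heartbeat>
</channel>
```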


9.1.4 Application-Timestamped Channels

Optionally, you can configure a channel to be application-timestamped. In this case, the timestamp of an event is determined by the configurable wlevs:expression element. A common example of an expression is a reference to a property on the event. If no expression is specified, then the timestamp may be propagated from a prior event. For example, this is the case when you have a system-timestamped channel from one Oracle CQL processor feeding events into an application-timestamped channel of another downstream Oracle CQL processor.

In addition, an application can use the StreamSender.sendHeartbeat method to send an event of type heart-beat downstream to StreamSink listeners in the EPN.
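
A minimal sketch of an application-timestamped channel follows. It assumes, purely for illustration, that PriceEvent carries a numeric property named eventTime that holds the application time; this property is hypothetical and is not part of the PriceEvent definitions in this chapter. The listener and source child elements are omitted for brevity.

```xml
<wlevs:channel id="priceStream" event-type="PriceEvent">
    <wlevs:application-timestamped>
        <!-- eventTime is a hypothetical event property used as the timestamp -->
        <wlevs:expression>eventTime</wlevs:expression>
    </wlevs:application-timestamped>
</wlevs:channel>
```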


9.1.5 Controlling Which Queries Output to a Downstream Channel: selector

If you configure an Oracle CQL processor with more than one query, by default, all queries output their results to the downstream channel.

You can control which queries may output their results to a downstream channel using the channel selector child element.

Figure 9-1 shows an EPN with channel filteredStream connected to up-stream Oracle CQL processor filterFanoutProcessor.

Figure 9-1 EPN With Oracle CQL Processor and Down-Stream Channel


Example 9-5 shows the queries configured for the Oracle CQL processor.

Example 9-5 filterFanoutProcessor Oracle CQL Queries

<processor>
    <name>filterFanoutProcessor</name>
    <rules>
        <query id="Yr3Sector"><![CDATA[ 
            select cusip, bid, srcId, bidQty, ask, askQty, seq 
            from priceStream where sector="3_YEAR"
        ]]></query>
        <query id="Yr2Sector"><![CDATA[ 
            select cusip, bid, srcId, bidQty, ask, askQty, seq 
            from priceStream where sector="2_YEAR"
        ]]></query>
        <query id="Yr1Sector"><![CDATA[ 
            select cusip, bid, srcId, bidQty, ask, askQty, seq 
            from priceStream where sector="1_YEAR"
        ]]></query>
    </rules>
</processor>

If you specify more than one query for an Oracle CQL processor as Example 9-5 shows, then, by default, all query results are output to the processor's out-bound channel (filteredStream in Figure 9-1). Optionally, in the component configuration source, you can use the channel element selector child element to specify a space-delimited list of one or more Oracle CQL query names that may output their results to the channel as Example 9-6 shows. In this example, results for queries Yr3Sector and Yr2Sector are output to filteredStream, but results for query Yr1Sector are not.

Example 9-6 Using selector to Control Which Query Results are Output

<channel>
    <name>filteredStream</name>
    <selector>Yr3Sector Yr2Sector</selector>
</channel>

You may configure a channel element with a selector before creating the queries in the upstream processor. In this case, when you later create the queries, you must give them names that match the names in the selector.

Note:

The selector child element is only applicable if the up-stream node is an Oracle CQL processor. For more information, see Chapter 10, "Configuring Oracle CQL Processors".

For more information, see Appendix D, "selector".

9.1.6 Batch Processing Channels

By default, a channel processes events as they arrive. Alternatively, you can configure a channel to batch events together that have the same timestamp and were output from the same query by setting the wlevs:channel attribute batching to true as Example 9-7 shows.

Example 9-7 Batch Processing Channel

...
<wlevs:channel id="priceStream" event-type="PriceEvent" batching="true">
    <wlevs:listener ref="filterFanoutProcessor" />
    <wlevs:source ref="PriceAdapter" />
</wlevs:channel>
...


9.1.7 EventPartitioner Channels

By default, a channel broadcasts each event to every listener.

When you configure a channel to use an EventPartitioner, each time an incoming event arrives, the channel selects a listener and dispatches the event to that listener instead of broadcasting each event to every listener.

You can use an EventPartitioner on a channel to improve scalability.

For more information, see Section 22.2.1, "EventPartitioner".
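
As a sketch only, a channel that partitions events across three listeners might be declared as follows. The partitionByEventProperty instance property name and the choice of cusip as the partitioning property are assumptions that are not confirmed by this chapter, and the three listener identifiers are hypothetical; verify the exact configuration against Section 22.2.1 before relying on it.

```xml
<wlevs:channel id="priceStream" event-type="PriceEvent">
    <!-- Assumed property name; verify against Section 22.2.1 -->
    <wlevs:instance-property name="partitionByEventProperty" value="cusip" />
    <!-- Hypothetical listeners; each event is dispatched to one of them -->
    <wlevs:listener ref="filterFanoutProcessor1" />
    <wlevs:listener ref="filterFanoutProcessor2" />
    <wlevs:listener ref="filterFanoutProcessor3" />
    <wlevs:source ref="PriceAdapter" />
</wlevs:channel>
```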

9.2 Configuring a Channel

You can configure a channel manually or by using the Oracle CEP IDE for Eclipse.

See Section B.2, "Component Configuration Schema wlevs_application_config.xsd" for the complete XSD Schema that describes the channel component configuration file.

See Section 9.3, "Example Channel Configuration Files" for a complete example of a channel configuration file.


9.2.1 How to Configure a System-Timestamped Channel Using Oracle CEP IDE for Eclipse

This section describes how to create a system-timestamped channel.

The most efficient and least error-prone way to create and edit a channel configuration in the default component configuration file is to use the Oracle CEP IDE for Eclipse. Optionally, you can create a channel configuration file manually (see Section 9.2.3, "How to Create a Channel Component Configuration File Manually").


To configure a channel using Oracle CEP IDE for Eclipse:

  1. Use Oracle CEP IDE for Eclipse to create a channel.

    See Section 6.4.1.1, "How to Create a Basic Node".

  2. Optionally, override the default channel assembly file configuration by adding additional wlevs:channel attributes and child elements:

    1. Right-click the channel node and select Go To Assembly Source.

    2. Add the appropriate wlevs:channel attributes.

      Required attributes include:

      • id

      • event-type

      In particular, specify whether this channel is a stream or relation by configuring attribute is-relation:

      • To specify this channel as a stream, set is-relation to false (default).

      • To specify this channel as a relation, set is-relation to true.

        If you specify this channel as a relation, you must also configure the channel attribute primary-key.

      See Table C-9, "Attributes of the wlevs:channel Application Assembly Element".

    3. Add the appropriate wlevs:channel child elements.

  3. In the Project Explorer, expand your META-INF/wlevs directory.

  4. Choose the component configuration file you want to use:

    1. To use the default component configuration file, right-click the META-INF/wlevs/config.xml file and select Open With > XML Editor.

      The file opens in an XML Editor.

    2. To create a new component configuration file:

      • Right-click the wlevs directory and select New > File.

        The New File dialog appears.

      • Enter a file name.

        You can name the file anything you want but the name of the file must end in .xml.

      • Click Finish.

        Oracle CEP IDE for Eclipse adds the component configuration file to the wlevs directory.

  5. Right-click the component configuration file you chose to use and select Open With > XML Editor.

    The file opens in an XML Editor.

  6. If you created a new component configuration file, add the header and config element shown in Example 9-8. Otherwise, proceed to step 7.

    Example 9-8 Component Configuration File Header and config Element

    <?xml version="1.0" encoding="UTF-8"?>
    <n1:config xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd" 
    xmlns:n1="http://www.bea.com/ns/wlevs/config/application" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    
    </n1:config>
    
  7. Add a channel element for the channel as Example 9-9 shows.

    Example 9-9 Component Configuration File Channel Element

    <?xml version="1.0" encoding="UTF-8"?>
    <n1:config xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd" 
    xmlns:n1="http://www.bea.com/ns/wlevs/config/application" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
        <processor>
            ...
        </processor>
        ...
        <channel>
        </channel>
    </n1:config>
    
  8. Add a name child element to the channel element.

    The name element value must match the corresponding EPN assembly file channel element's id attribute.

    For example, given the EPN assembly file channel element shown in Example 9-10, the corresponding configuration file channel element is shown in Example 9-11.

    Example 9-10 EPN Assembly File Channel Id: priceStream

    <wlevs:channel id="priceStream" event-type="PriceEvent">
        <wlevs:listener ref="filterFanoutProcessor" />
        <wlevs:source ref="PriceAdapter" />
    </wlevs:channel>
    

    Example 9-11 Component Configuration File Channel Name: priceStream

    <channel>
        <name>priceStream</name>
    </channel>
    

    Caution:

    Identifiers and names in XML files are case sensitive. Be sure to specify the same case when referencing the component's identifier in the EPN assembly file.
  9. Optionally, override the default channel configuration by adding additional channel child elements:

    • Add a max-threads child element to specify the maximum number of threads that Oracle CEP server uses to process events for this channel.

      Setting this value has no effect when max-size is 0. The default value is 0.

      <channel>
          <name>priceStream</name>
          <max-threads>2</max-threads>
      </channel>


      When max-threads is 0, the channel acts as a pass-through. When max-threads > 0, the channel acts as a classic blocking queue, where upstream components are producers of events and the downstream components are the consumers of events. The queue size is defined by the configuration max-size. There will be up to max-threads number of threads consuming events from the queue.

    • Add a max-size child element to specify the maximum size of the channel.

      Zero-size channels pass events through synchronously.

      Non-zero size channels process events asynchronously, buffering events by the requested size. The default value is 0.

      <channel>
          <name>priceStream</name>
          <max-size>10000</max-size>
      </channel>
      
    • Add a heartbeat child element to specify the number of nanoseconds a channel can be idle before Oracle CEP generates a heartbeat event to advance time.

      The heartbeat child element applies to system-timestamped relations or streams only when no events arrive in the event channels that are feeding the processors and the processor has been configured with a statement that includes some temporal operator, such as a time-based window or a pattern matching with duration.

      <channel>
          <name>MatchOutputChannel</name>
          <heartbeat>10000</heartbeat>
      </channel>
      
    • Add a selector child element to specify which up-stream Oracle CQL processor queries are permitted to output their results to the channel.

      You may configure a channel element with a selector before creating the queries in the upstream processor. In this case, when you later create the queries, you must give them names that match the names in the selector.

      For more information, see Section 9.1.5, "Controlling Which Queries Output to a Downstream Channel: selector".

      Note:

      The selector child element is only applicable if the up-stream node is an Oracle CQL processor. For more information, see Chapter 10, "Configuring Oracle CQL Processors".
  10. Select File > Save.

    The EPN Editor adds a configuration badge to the channel as Figure 9-2 shows. For more information, see Section 6.2.7, "Configuration Badging".

    Figure 9-2 Channel With Configuration Badge


9.2.2 How to Configure an Application-Timestamped Channel Using Oracle CEP IDE for Eclipse

This section describes how to create an application-timestamped channel.

The most efficient and least error-prone way to create and edit a channel configuration in the default component configuration file is to use the Oracle CEP IDE for Eclipse. Optionally, you can create a channel configuration file manually (see Section 9.2.3, "How to Create a Channel Component Configuration File Manually").


To configure a channel using Oracle CEP IDE for Eclipse:

  1. Use Oracle CEP IDE for Eclipse to create a channel.

    See Section 6.4.1.1, "How to Create a Basic Node".

  2. Optionally, override the default channel assembly file configuration by adding additional wlevs:channel attributes and child elements:

    1. Right-click the channel node and select Go To Assembly Source.

    2. Add the appropriate wlevs:channel attributes.

      In particular, specify whether this channel is a stream or relation by configuring attribute is-relation:

      • To specify this channel as a stream, set is-relation to false (default).

      • To specify this channel as a relation, set is-relation to true.

      See Table C-9, "Attributes of the wlevs:channel Application Assembly Element".

    3. Add a wlevs:application-timestamped child element.

      Use this element to specify a wlevs:expression child element that Oracle CEP uses to generate timestamp values.

      Optionally, configure the wlevs:application-timestamped attributes:

      • is-total-order: specifies if the application time published is always strictly greater than the last value used.

        Valid values are true or false. Default: false.

      For more information, see Appendix C, "wlevs:application-timestamped".

    4. Add other appropriate wlevs:channel child elements.

  3. In the Project Explorer, expand your META-INF/wlevs directory.

  4. Choose the component configuration file you want to use:

    1. To use the default component configuration file, right-click the META-INF/wlevs/config.xml file and select Open With > XML Editor.

      The file opens in an XML Editor.

    2. To create a new component configuration file:

      • Right-click the wlevs directory and select New > File.

        The New File dialog appears.

      • Enter a file name.

        You can name the file anything you want but the name of the file must end in .xml.

      • Click Finish.

        Oracle CEP IDE for Eclipse adds the component configuration file to the wlevs directory.

  5. Right-click the component configuration file you chose to use and select Open With > XML Editor.

    The file opens in an XML Editor.

  6. If you created a new component configuration file, add the header and config element shown in Example 9-12. Otherwise, proceed to step 7.

    Example 9-12 Component Configuration File Header and config Element

    <?xml version="1.0" encoding="UTF-8"?>
    <n1:config xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd" 
    xmlns:n1="http://www.bea.com/ns/wlevs/config/application" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    
    </n1:config>
    
  7. Add a channel element for the channel as Example 9-13 shows.

    Example 9-13 Component Configuration File Channel Element

    <?xml version="1.0" encoding="UTF-8"?>
    <n1:config xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd" 
    xmlns:n1="http://www.bea.com/ns/wlevs/config/application" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
        <processor>
            ...
        </processor>
        ...
        <channel>
        </channel>
    </n1:config>
    
  8. Add a name child element to the channel element.

    The name element value must match the corresponding EPN assembly file channel element's id attribute.

    For example, given the EPN assembly file channel element shown in Example 9-14, the corresponding configuration file channel element is shown in Example 9-15.

    Example 9-14 EPN Assembly File Channel Id: priceStream

    <wlevs:channel id="priceStream" event-type="PriceEvent">
        <wlevs:listener ref="filterFanoutProcessor" />
        <wlevs:source ref="PriceAdapter" />
    </wlevs:channel>
    

    Example 9-15 Component Configuration File Channel Name: priceStream

    <channel>
        <name>priceStream</name>
    </channel>
    

    Caution:

    Identifiers and names in XML files are case sensitive. Be sure to specify the same case when referencing the component's identifier in the EPN assembly file.
  9. Optionally, override the default channel configuration by adding additional channel child elements:

    • Add a max-threads child element to specify the maximum number of threads that Oracle CEP server uses to process events for this channel.

      Setting this value has no effect when max-size is 0. The default value is 0.

      <channel>
          <name>priceStream</name>
          <max-threads>2</max-threads>
      </channel>


      When max-threads is 0, the channel acts as a pass-through. When max-threads > 0, the channel acts as a classic blocking queue, where upstream components are producers of events and the downstream components are the consumers of events. The queue size is defined by the configuration max-size. There will be up to max-threads number of threads consuming events from the queue.

    • Add a max-size child element to specify the maximum size of the channel.

      Zero-size channels pass events through synchronously.

      Non-zero size channels process events asynchronously, buffering events by the requested size. The default value is 0.

      <channel>
          <name>priceStream</name>
          <max-size>10000</max-size>
      </channel>
      
    • Add a selector child element to specify which up-stream Oracle CQL processor queries are permitted to output their results to the channel.

      You may configure a channel element with a selector before creating the queries in the upstream processor. In this case, when you later create the queries, you must give them names that match the names in the selector.

      For more information, see Section 9.1.5, "Controlling Which Queries Output to a Downstream Channel: selector".

      Note:

      The selector child element is only applicable if the up-stream node is an Oracle CQL processor. For more information, see Chapter 10, "Configuring Oracle CQL Processors".

    For more information, see Section D.78, "selector".

  10. Select File > Save.

    The EPN Editor adds a configuration badge to the channel as Figure 9-3 shows. For more information, see Section 6.2.7, "Configuration Badging".

    Figure 9-3 Channel With Configuration Badge


9.2.3 How to Create a Channel Component Configuration File Manually

Although the Oracle CEP IDE for Eclipse is the most efficient and least error-prone way to create and edit a channel configuration file (see Section 9.2.1, "How to Configure a System-Timestamped Channel Using Oracle CEP IDE for Eclipse"), you can also create and maintain a channel configuration file manually.

For simplicity, the following procedure assumes that you are going to configure all components of an application in a single XML file.

To create a channel component configuration file manually:

  1. Create an EPN assembly file and add a wlevs:channel element for each channel in your application.

    Uniquely identify each wlevs:channel with the id attribute.

    See Section 4.3, "Creating EPN Assembly Files" for details.

  2. Optionally, override the default channel assembly file configuration by adding additional wlevs:channel attributes and child elements:

    1. Add the appropriate wlevs:channel attributes.

      See Table C-9, "Attributes of the wlevs:channel Application Assembly Element".

    2. Add the appropriate wlevs:channel child elements.

  3. Create an XML file using your favorite XML editor.

    You can name this XML file anything you want, provided it ends with the .xml extension.

    The root element of the configuration file is config, with namespace definitions shown in the next step.

  4. For each channel in your application, add a channel child element of config.

    Uniquely identify each channel with the name child element. This name must be the same as the value of the id attribute in the channel element of the EPN assembly file that defines the event processing network of your application. This is how Oracle CEP knows to which particular channel component in the EPN assembly file this channel configuration applies. See Section 4.3, "Creating EPN Assembly Files" for details.

    For example, if your application has two channels, the configuration file might initially look like:

    <?xml version="1.0" encoding="UTF-8"?>
    <n1:config xmlns:n1="http://www.bea.com/ns/wlevs/config/application"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
      <processor>
       ...
      </processor>
      <channel>
        <name>firstStream</name>
        ...
      </channel>
      <channel>
        <name>secondStream</name>
        ...
      </channel>
    </n1:config>
    

    In the example, the configuration file includes two channels called firstStream and secondStream. This means that the EPN assembly file must include at least two channel registrations with the same identifiers:

    <wlevs:channel id="firstStream" ...>
      ...
    </wlevs:channel>
    <wlevs:channel id="secondStream" ...>
      ...
    </wlevs:channel>
    

    Caution:

    Identifiers and names in XML files are case sensitive, so be sure you specify the same case when referencing the component's identifier in the EPN assembly file.
  5. Optionally, override the default channel configuration by adding additional channel child elements:

    • Add a max-threads child element to specify the maximum number of threads that Oracle CEP server uses to process events for this channel.

      Setting this value has no effect when max-size is 0. The default value is 0.

      <channel>
          <name>priceStream</name>
          <max-threads>2</max-threads>
      </channel>


      When max-threads is 0, the channel acts as a pass-through. When max-threads > 0, the channel acts as a classic blocking queue, where upstream components are producers of events and the downstream components are the consumers of events. The queue size is defined by the configuration max-size. There will be up to max-threads number of threads consuming events from the queue.

    • Add a max-size child element to specify the maximum size of the channel.

      Zero-size channels pass events through synchronously.

      Non-zero size channels process events asynchronously, buffering events by the requested size. The default value is 0.

      <channel>
          <name>priceStream</name>
          <max-size>10000</max-size>
      </channel>
      
    • Add a heartbeat child element to specify the number of nanoseconds a channel can be idle before Oracle CEP generates a heartbeat event to advance time.

      The heartbeat child element applies to system-timestamped relations or streams only when no events arrive in the event channels that are feeding the processors and the processor has been configured with a statement that includes some temporal operator, such as a time-based window or a pattern matching with duration.

      <channel>
          <name>MatchOutputChannel</name>
          <heartbeat>10000</heartbeat>
      </channel>
      
    • Add a selector child element to specify which up-stream Oracle CQL processor queries are permitted to output their results to the channel.

      You may configure a channel element with a selector before creating the queries in the upstream processor. In this case, when you later create the queries, you must give them names that match the names in the selector.

      For more information, see Section 9.1.5, "Controlling Which Queries Output to a Downstream Channel: selector".

      Note:

      The selector child element is only applicable if the up-stream node is an Oracle CQL processor. For more information, see Chapter 10, "Configuring Oracle CQL Processors".

    For more information, see Section D.78, "selector".

  6. Save and close the configuration file.

9.3 Example Channel Configuration Files

Figure 9-4 shows part of an EPN that contains two channels: priceStream and filteredStream. The priceStream channel is an in-bound channel that connects the PriceAdapter event source and its PriceEvent events to an Oracle CQL processor filterFanoutProcessor. The filteredStream channel is an out-bound channel that connects the Oracle CQL processor's query results (FilteredPriceEvent events) to down-stream components (not shown in Figure 9-4).

Figure 9-4 EPN with Two Channels



9.3.1 Channel Component Configuration File

Example 9-16 shows a sample component configuration file that configures the two channels shown in Figure 9-4.

Example 9-16 Sample Channel Component Configuration File

<?xml version="1.0" encoding="UTF-8"?>
<n1:config xmlns:n1="http://www.bea.com/ns/wlevs/config/application"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <processor>
        <name>filterFanoutProcessor</name>
        <rules>
            <query id="Yr3Sector"><![CDATA[ 
                select cusip, bid, srcId, bidQty, ask, askQty, seq 
                from priceStream where sector="3_YEAR"
            ]]></query>
        </rules>
    </processor>
    <channel>
        <name>priceStream</name>
        <max-size>10000</max-size>
        <max-threads>4</max-threads>
    </channel>
    <channel>
        <name>filteredStream</name>
        <max-size>5000</max-size>
        <max-threads>2</max-threads>
    </channel>
</n1:config>

9.3.2 Channel EPN Assembly File

Example 9-17 shows an EPN assembly file that configures the two channels shown in Figure 9-4.

Example 9-17 Channel EPN Assembly File

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:osgi="http://www.springframework.org/schema/osgi"
    xmlns:wlevs="http://www.bea.com/ns/wlevs/spring"
    xmlns:cqlx="http://www.oracle.com/schema/cqlx/"
    xsi:schemaLocation="
  http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans.xsd
  http://www.springframework.org/schema/osgi
  http://www.springframework.org/schema/osgi/spring-osgi.xsd
  http://www.bea.com/ns/wlevs/spring
  http://www.bea.com/ns/wlevs/spring/spring-wlevs-v11_1_1_3.xsd">
 
    <wlevs:event-type-repository>
        <wlevs:event-type type-name="PriceEvent">
            <wlevs:properties>
                <wlevs:property name="cusip" type="java.lang.String" />
                <wlevs:property name="bid" type="java.lang.Double" />
                <wlevs:property name="srcId" type="java.lang.String" />
                <wlevs:property name="bidQty" type="java.lang.Integer" />
                <wlevs:property name="ask" type="java.lang.Double" />
                <wlevs:property name="askQty" type="java.lang.Integer" />
                <wlevs:property name="seq" type="java.lang.Long" />
                <wlevs:property name="sector" type="java.lang.String" />
            </wlevs:properties>
        </wlevs:event-type>
        <wlevs:event-type type-name="FilteredPriceEvent">
            <wlevs:properties>
                <wlevs:property name="cusip" type="java.lang.String" />
                <wlevs:property name="bid" type="java.lang.Double" />
                <wlevs:property name="srcId" type="java.lang.String" />
                <wlevs:property name="bidQty" type="java.lang.Integer" />
                <wlevs:property name="ask" type="java.lang.Double" />
                <wlevs:property name="askQty" type="java.lang.Integer" />
                <wlevs:property name="seq" type="java.lang.Long" />
            </wlevs:properties>
        </wlevs:event-type>
        <wlevs:event-type type-name="BidAskEvent">
            <wlevs:properties>
                <wlevs:property name="cusip" type="java.lang.String" />
                <wlevs:property name="bidseq" type="java.lang.Long" />
                <wlevs:property name="bidSrcId" type="java.lang.String" />
                <wlevs:property name="bid" type="java.lang.Double" />
                <wlevs:property name="askseq" type="java.lang.Long" />
                <wlevs:property name="askSrcId" type="java.lang.String" />
                <wlevs:property name="ask" type="java.lang.Double" />
                <wlevs:property name="bidQty" type="java.lang.Integer" />
                <wlevs:property name="askQty" type="java.lang.Integer" />
                <wlevs:property name="intermediateStrategy" type="java.lang.String" />
                <wlevs:property name="correlationId" type="java.lang.Long" />
                <wlevs:property name="priority" type="java.lang.Integer" />
            </wlevs:properties>
        </wlevs:event-type>
        <wlevs:event-type type-name="FinalOrderEvent">
            <wlevs:properties>
                <wlevs:property name="cusip" type="java.lang.String" />
                <wlevs:property name="bidseq" type="java.lang.Long" />
                <wlevs:property name="bidSrcId" type="java.lang.String" />
                <wlevs:property name="bid" type="java.lang.Double" />
                <wlevs:property name="bidQty" type="java.lang.Integer" />
                <wlevs:property name="bidSourceStrategy" type="java.lang.String" />
                <wlevs:property name="askseq" type="java.lang.Long" />
                <wlevs:property name="askSrcId" type="java.lang.String" />
                <wlevs:property name="ask" type="java.lang.Double" />
                <wlevs:property name="askQty" type="java.lang.Integer" />
                <wlevs:property name="askSourceStrategy" type="java.lang.String" />
                <wlevs:property name="correlationId" type="java.lang.Long" />
            </wlevs:properties>
        </wlevs:event-type>
    </wlevs:event-type-repository>
 
    <!-- Assemble EPN (event processing network) -->
    <wlevs:adapter advertise="true" id="PriceAdapter"
        provider="csvgen">
        <wlevs:instance-property name="port" value="9008" />
        <wlevs:instance-property name="eventTypeName"
            value="PriceEvent" />
        <wlevs:instance-property name="eventPropertyNames"
            value="srcId,sector,cusip,bid,ask,bidQty,askQty,seq" />
    </wlevs:adapter>
 
    <wlevs:channel id="priceStream" event-type="PriceEvent">
        <wlevs:listener ref="filterFanoutProcessor" />
        <wlevs:source ref="PriceAdapter" />
    </wlevs:channel>
 
    <!-- By default, CQL is used for OCEP 11.0 -->
    <wlevs:processor id="filterFanoutProcessor" >
    </wlevs:processor>
 
    <wlevs:channel id="filteredStream"
        event-type="FilteredPriceEvent">
        <wlevs:listener ref="bbaProcessor" />
        <wlevs:listener ref="analyticsProcessor" />
        <wlevs:source ref="filterFanoutProcessor" />
    </wlevs:channel>
 
    <!-- Explicitly specify provider CQL -->
    <wlevs:processor id="bbaProcessor" provider="cql">
        <wlevs:listener ref="bidAskBBAStream" />
    </wlevs:processor>
 
    <wlevs:processor id="analyticsProcessor">
        <wlevs:listener ref="bidAskAnalyticsStream" />
    </wlevs:processor>
 
    <wlevs:channel id="bidAskBBAStream" event-type="BidAskEvent">
        <wlevs:listener ref="selectorProcessor" />
    </wlevs:channel>
    
    <wlevs:channel id="bidAskAnalyticsStream" event-type="BidAskEvent">
        <wlevs:listener ref="selectorProcessor" />
    </wlevs:channel>
 
    <wlevs:processor id="selectorProcessor">
        <wlevs:listener ref="citipocOut" />
    </wlevs:processor>
 
    <wlevs:channel id="citipocOut" event-type="FinalOrderEvent" advertise="true">
        <wlevs:listener>
            <!-- Create business object -->
            <bean id="outputBean"
                class="com.bea.wlevs.POC.citi.OutputBean"
                autowire="byName" />
        </wlevs:listener>
    </wlevs:channel>
</beans>