17 Querying an Event Stream with Oracle CQL

This chapter describes how to configure Oracle Continuous Query Language (Oracle CQL) processors for Oracle Event Processing event processing networks. It includes information on configuring the processor's data source and optimizing performance.


17.1 Overview of Oracle CQL Processor Configuration

An Oracle Event Processing application contains one or more event processors, or processors for short. Each processor takes as input events from one or more adapters; these adapters in turn listen to data feeds that send a continuous stream of data from a source. The source could be anything, such as a financial data feed or the Oracle Event Processing load generator.

The main feature of an Oracle CQL processor is its associated Oracle Continuous Query Language (Oracle CQL) rules that select a subset of the incoming events to then pass on to the component that is listening to the processor. The listening component could be another processor, or the business object POJO that typically defines the end of the event processing network, and thus does something with the events, such as publish them to a client application. For more information on Oracle CQL, see the Oracle Fusion Middleware CQL Language Reference for Oracle Event Processing.

For each Oracle CQL processor in your application, you must create a processor element in a component configuration file. In this processor element you specify the initial set of Oracle CQL rules of the processor and any optional processor configuration.

You can configure additional optional Oracle CQL processor features in the Oracle CQL processor EPN assembly file.

The component configuration file processor element's name element must match the EPN assembly file processor element's id attribute. For example, given the EPN assembly file processor element shown in Example 17-1, the corresponding component configuration file processor element is shown in Example 17-2.

Example 17-1 EPN Assembly File Oracle CQL Processor Id: proc

<wlevs:processor id="proc">
    <wlevs:table-source ref="Stock" />
</wlevs:processor>

Example 17-2 Component Configuration File Oracle CQL Processor Name: proc

<processor>
    <name>proc</name>
    <rules>
        <query id="q1"><![CDATA[
            SELECT ExchangeStream.symbol, ExchangeStream.price, Stock.exchange
            FROM   ExchangeStream [Now], Stock
            WHERE  ExchangeStream.symbol = Stock.symbol 
        ]]></query>
    </rules>
</processor>

You can create a processor element in any of the following component configuration files:

  • The default Oracle Event Processing application configuration file (by default, META-INF/wlevs/config.xml).

  • A separate configuration file.

If your application has more than one processor, you can create a processor element for each of them in the default config.xml file, create a separate XML file in META-INF/wlevs for each, or create a single XML file in META-INF/wlevs that contains the configuration for all processors, or even for all components of your application (adapters, processors, and channels). Choose the method that best suits your development environment.

By default, Oracle Event Processing IDE for Eclipse creates one component configuration file and one EPN assembly file. When you create an Oracle CQL processor using Oracle Event Processing IDE for Eclipse, the processor element is added by default to the default component configuration file, META-INF/wlevs/config.xml. At the time you create the Oracle CQL processor, you can instead choose to create a new configuration file or use an existing configuration file.

Component configuration files are deployed as part of the Oracle Event Processing application bundle. You can later update this configuration at runtime using Oracle Event Processing Visualizer, the wlevs.Admin utility, or by manipulating the appropriate JMX MBeans directly.


17.1.1 Controlling Which Queries Output to a Downstream Channel

If you configure an Oracle CQL processor with more than one query, by default, all queries output their results to the downstream channel.

You can control which queries may output their results to a downstream channel by using the channel selector element to specify a space-delimited list of the names of the queries that may output their results on that channel.

You may configure a channel element with a selector before creating the queries in the upstream processor. In this case, when you later create the queries, you must give them names that match the names in the selector.

For more information, see Section 10.1.5, "Controlling Which Queries Output to a Downstream Channel: selector".
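
As an illustrative sketch of the selector behavior described above, the following component configuration fragment defines a processor with two queries and two downstream channels, each of which accepts results from only one of the queries. The processor, channel, and query names, the TradeStream source, and the price threshold are assumptions made for this example. As with processors, each channel name is assumed to match the id attribute of the corresponding channel element in your EPN assembly file.

```xml
<!-- Component configuration fragment (children of the config root element).
     All names below are hypothetical. -->
<processor>
    <name>filterProcessor</name>
    <rules>
        <query id="highPrice"><![CDATA[
            SELECT * FROM TradeStream [Now] WHERE price > 10000
        ]]></query>
        <query id="lowPrice"><![CDATA[
            SELECT * FROM TradeStream [Now] WHERE price <= 10000
        ]]></query>
    </rules>
</processor>

<!-- Only results of the highPrice query are output on this channel. -->
<channel>
    <name>highPriceChannel</name>
    <selector>highPrice</selector>
</channel>

<!-- Only results of the lowPrice query are output on this channel. -->
<channel>
    <name>lowPriceChannel</name>
    <selector>lowPrice</selector>
</channel>
```

To let both queries output to the same channel, the selector would list both names separated by a space, for example highPrice lowPrice.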

17.2 Configuring an Oracle CQL Processor

You can configure a processor manually or by using the Oracle Event Processing IDE for Eclipse.

See Section B.2, "Component Configuration Schema wlevs_application_config.xsd" for the complete XSD Schema that describes the processor component configuration file.

See Section 17.7, "Example Oracle CQL Processor Configuration Files" for a complete example of an Oracle CQL processor component configuration file and assembly file.


17.2.1 How to Configure an Oracle CQL Processor Using Oracle Event Processing IDE for Eclipse

The most efficient and least error-prone way to create and edit a processor is to use the Oracle Event Processing IDE for Eclipse. Optionally, you can create and edit a processor manually (see Section 17.2.2, "How to Create an Oracle CQL Processor Component Configuration File Manually").

To configure an Oracle CQL processor using Oracle Event Processing IDE for Eclipse:

  1. Use Oracle Event Processing IDE for Eclipse to create a processor.

    See Section 7.4.1.3, "How to Create a Processor Node".

    When you use the EPN editor to create an Oracle CQL processor, Oracle Event Processing IDE for Eclipse prompts you to choose either the default component configuration file or a new component configuration file. For more information, see Chapter 7, "Oracle Event Processing IDE for Eclipse and the Event Processing Network".

  2. Right-click the processor node and select Go to Configuration Source.

    Oracle Event Processing IDE for Eclipse opens the appropriate component configuration file. The default processor component configuration is shown in Example 17-3.

    The default processor component configuration includes a name element and rules element.

    Use the rules element to group the child elements you create to contain the Oracle CQL statements this processor executes, including:

    • rule: contains Oracle CQL statements that register or create user-defined windows. The rule element id attribute must match the name of the window.

    • view: contains Oracle CQL view statements (the Oracle CQL equivalent of subqueries). The view element id attribute defines the name of the view.

    • query: contains Oracle CQL select statements. The query element id attribute defines the name of the query.

    The default processor component configuration includes a dummy query element with id Query.

    Example 17-3 Default Processor Component Configuration

    <processor>
        <name>proc</name>
        <rules>
            <query id="Query"><!-- <![CDATA[ select * from MyChannel [now] ]]> -->
            </query>
        </rules>
    </processor>
    
  3. Replace the dummy query element with the rule, view, and query elements you create to contain the Oracle CQL statements this processor executes.

    For more information, see "Introduction to Oracle CQL Queries, Views, and Joins" in the Oracle Fusion Middleware CQL Language Reference for Oracle Event Processing.

  4. Select File > Save.

  5. Optionally, configure additional Oracle CQL processor features in the assembly file.

17.2.2 How to Create an Oracle CQL Processor Component Configuration File Manually

Although the most efficient and least error-prone way to create and edit a processor configuration is to use the Oracle Event Processing IDE for Eclipse (see Section 17.2.1, "How to Configure an Oracle CQL Processor Using Oracle Event Processing IDE for Eclipse"), you can also create and maintain a processor configuration file manually.

This section describes the main steps to create the processor configuration file manually. For simplicity, it is assumed in the procedure that you are going to configure all processors in a single XML file, although you can also create separate files for each processor.

To create an Oracle CQL processor component configuration file manually:

  1. Design the set of Oracle CQL rules that the processor executes. These rules can range from simply selecting all incoming events to restricting the set based on time, property values, and so on, as shown in the following:

    SELECT *
    FROM   TradeStream [Now]
    WHERE  price > 10000
    

    Oracle CQL is similar in many ways to Structured Query Language (SQL), the language used to query relational database tables, although the syntax of the two differs in many ways. The other big difference is that Oracle CQL queries take another dimension, time, into account: the processor executes Oracle CQL queries continuously as events arrive, whereas a SQL query runs once against static data.

    For more information, see "Introduction to Oracle CQL Queries, Views, and Joins" in the Oracle Fusion Middleware CQL Language Reference for Oracle Event Processing.

  2. Create the processor configuration XML file that will contain the Oracle CQL rules you designed in the preceding step, as well as other optional features, for each processor in your application.

    You can name this XML file anything you want, provided it ends with the .xml extension.

    The root element of the processor configuration file is config, with namespace definitions shown in the next step.

  3. For each processor in your application, add a processor child element of config.

    Uniquely identify each processor with the name child element. This name must be the same as the value of the id attribute in the wlevs:processor element of the EPN assembly file that defines the event processing network of your application. This is how Oracle Event Processing knows to which particular processor component in the EPN assembly file this processor configuration applies. See Section 5.3, "Creating EPN Assembly Files" for details.

    For example, if your application has two processors, the configuration file might initially look like:

    <?xml version="1.0" encoding="UTF-8"?>
    <n1:config xmlns:n1="http://www.bea.com/ns/wlevs/config/application">
      <processor>
        <name>firstProcessor</name>
         ...
      </processor>
      <processor>
        <name>secondProcessor</name>
         ...
       </processor>
    </n1:config>
    

    In the example, the configuration file includes two processors called firstProcessor and secondProcessor. This means that the EPN assembly file must include at least two processor registrations with the same identifiers:

    <wlevs:processor id="firstProcessor" ...>
      ...
    </wlevs:processor>
    <wlevs:processor id="secondProcessor" ...>
      ...
    </wlevs:processor>
    

    Caution:

    Identifiers and names in XML files are case sensitive, so be sure you specify the same case when referencing the component's identifier in the EPN assembly file.

  4. Add a rules child element to each processor element.

    Use the rules element to group the child elements you create to contain the Oracle CQL statements this processor executes, including:

    • rule: contains Oracle CQL statements that register or create user-defined windows. The rule element id attribute must match the name of the window.

    • view: contains Oracle CQL view statements (the Oracle CQL equivalent of subqueries). The view element id attribute defines the name of the view.

    • query: contains Oracle CQL select statements. The query element id attribute defines the name of the query.

    Use the required id attribute of the view and query elements to uniquely identify each rule. Use the XML CDATA type to input the actual Oracle CQL rule. For example:

    <?xml version="1.0" encoding="UTF-8"?>
    <n1:config
        xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd"
        xmlns:n1="http://www.bea.com/ns/wlevs/config/application"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
        <processor>
            <name>proc</name>
            <rules>
                <view id="lastEvents" schema="cusip bid srcId bidQty"><![CDATA[ 
                    select mod(price) 
                    from filteredStream[partition by srcId, cusip rows 1]
                ]]></view>
                <query id="q1"><![CDATA[
                    SELECT *
                    FROM   lastEvents
                    WHERE  price > 10000
                ]]></query>
            </rules>
        </processor>
    </n1:config>
    
  5. Save and close the file.

  6. Optionally, configure additional Oracle CQL processor features in the assembly file.

17.3 Configuring an Oracle CQL Processor Table Source

You can access a relational database table from an Oracle CQL query using:

  • table source: using a table source, you may join the table to an event stream only through a Now window, and you may join only to a single database table.

    Note:

    Because changes in the table source are not coordinated in time with stream data, you may only join the table source to an event stream using a Now window and you may only join to a single database table. For more information, see "S[now]" in the Oracle Fusion Middleware CQL Language Reference for Oracle Event Processing.

    To integrate arbitrarily complex SQL queries and multiple tables with your Oracle CQL queries, consider using the Oracle JDBC data cartridge instead.

    For more information, see Section 17.3.1, "How to Configure an Oracle CQL Processor Table Source Using Oracle Event Processing IDE for Eclipse".

  • Oracle JDBC data cartridge: using the Oracle JDBC data cartridge, you may integrate arbitrarily complex SQL queries and multiple tables and datasources with your Oracle CQL queries.

    Note:

    Oracle recommends that you use the Oracle JDBC data cartridge to access relational database tables from an Oracle CQL statement.

    For more information, see "Understanding the Oracle JDBC Data Cartridge" in the Oracle Fusion Middleware CQL Language Reference for Oracle Event Processing.

In all cases, you must define datasources in the Oracle Event Processing server config.xml file. For more information, see "Configuring Access to a Relational Database" in the Oracle Fusion Middleware Administrator's Guide for Oracle Event Processing.

Oracle Event Processing relational database table event sources are pull data sources: that is, Oracle Event Processing will periodically poll the event source.

In this section, assume that you create the table you want to access using the SQL statement that Example 17-4 shows.

Example 17-4 Table Create SQL Statement

create table Stock (symbol varchar(16), exchange varchar(16));

After configuration, you can define Oracle CQL queries that access the Stock table as if it were just another event stream. In the following example, the query joins one event stream, ExchangeStream, with the Stock table:

Example 17-5 Oracle CQL Query on Relational Database Table Stock

SELECT ExchangeStream.symbol, ExchangeStream.price, Stock.exchange
FROM   ExchangeStream [Now], Stock
WHERE  ExchangeStream.symbol = Stock.symbol

Note:

Because changes in the table source are not coordinated in time with stream data, you may only join the table source to an event stream using a Now window and you may only join to a single database table.

To integrate arbitrarily complex SQL queries and multiple tables with your Oracle CQL queries, consider using the Oracle JDBC data cartridge instead.

For more information, see "Understanding the Oracle JDBC Data Cartridge" in the Oracle Fusion Middleware CQL Language Reference for Oracle Event Processing.

17.3.1 How to Configure an Oracle CQL Processor Table Source Using Oracle Event Processing IDE for Eclipse

The most efficient and least error-prone way to configure an Oracle CQL processor to access a relational database table is to use the Oracle Event Processing IDE for Eclipse.

To configure an Oracle CQL processor table source using Oracle Event Processing IDE for Eclipse:

  1. Create a data source for the database that contains the table you want to use.

    Example 17-6 shows an example Oracle Event Processing server config.xml file with data source StockDs.

    Example 17-6 Oracle Event Processing Server config.xml File With Data Source StockDs

    <?xml version="1.0" encoding="UTF-8"?>
    <n1:config xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/server wlevs_server_config.xsd" 
        xmlns:n1="http://www.bea.com/ns/wlevs/config/server" 
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
      <domain>
         <name>ocep_domain</name>
      </domain>
    
    ...
    
      <data-source>
        <name>StockDs</name>
        <connection-pool-params>
          <initial-capacity>1</initial-capacity>
          <max-capacity>10</max-capacity>     
        </connection-pool-params>
        <driver-params>
          <url>jdbc:derby:</url>
          <driver-name>org.apache.derby.jdbc.EmbeddedDriver</driver-name>
          <properties>
            <element>
              <name>databaseName</name>
              <value>db</value>
            </element>
            <element>
              <name>create</name>
              <value>true</value>
            </element>
          </properties>
        </driver-params>
        <data-source-params>
          <jndi-names>
            <element>StockDs</element>
          </jndi-names>
          <global-transactions-protocol>None</global-transactions-protocol>
        </data-source-params>
      </data-source>
    
    ...
    
    </n1:config>
    

    For more information, see "Configuring Access to a Relational Database" in the Oracle Fusion Middleware Administrator's Guide for Oracle Event Processing.

  2. Use Oracle Event Processing IDE for Eclipse to create a table node.

    See Section 7.4.1.1, "How to Create a Basic Node".

  3. Use Oracle Event Processing IDE for Eclipse to create an Oracle CQL processor.

    See Section 7.4.1.3, "How to Create a Processor Node".

  4. Connect the table node to the Oracle CQL processor node.

    See Section 7.4.2.1, "How to Connect Nodes".

    The EPN Editor adds a wlevs:table-source element to the target processor node that references the source table.

  5. Right-click the table node in your EPN and select Go to Assembly Source.

    Oracle Event Processing IDE for Eclipse opens the EPN assembly file for this table node.

  6. Edit the table element as Example 17-7 shows and configure the table element attributes as shown in Table 17-1.

    Example 17-7 EPN Assembly File table Element

    <wlevs:table id="Stock" event-type="StockEvent" data-source="StockDs" />
    

    Table 17-1 EPN Assembly File table Element Attributes

    Attribute Description

    id

    The name of the table source. Subsequent references to this table source use this name.

    event-type

    The type-name you specify for the table event-type you create in step 9.

    data-source

    The data-source name you specified in the Oracle Event Processing server config.xml file in step 1.


  7. Right-click the Oracle CQL processor node connected to the table in your EPN and select Go to Assembly Source.

    Oracle Event Processing IDE for Eclipse opens the EPN assembly file for this Oracle CQL processor.

  8. Edit the Oracle CQL processor element's table-source child element as Example 17-8 shows.

    Set the ref attribute to the id of the table element you specified in step 6.

    Example 17-8 EPN Assembly File table-source Element

    <wlevs:processor id="proc">
        <wlevs:table-source ref="Stock" />
    </wlevs:processor>
    
  9. Edit the EPN assembly file to update the event-type-repository element with a new event-type child element for the table as Example 17-9 shows.

    Create a property child element for each column of the table you want to access and configure the property attributes as described in Section 9.2.3.2, "Constraints on Event Types for Use With a Database Table Source".

    Example 17-9 EPN Assembly File event-type element for a Table

    <wlevs:event-type-repository>
        ...
        <wlevs:event-type type-name="StockEvent">
            <wlevs:properties>
                <wlevs:property name="symbol" type="char[]" length="16" />
                <wlevs:property name="exchange" type="char[]" length="16" />
            </wlevs:properties>
        </wlevs:event-type>
        ...
    </wlevs:event-type-repository>
    


  10. Right-click the Oracle CQL processor node connected to the table in your EPN and select Go to Configuration Source.

    Oracle Event Processing IDE for Eclipse opens the component configuration file for this Oracle CQL processor.

  11. Edit the component configuration file to add Oracle CQL queries that use the table's event-type as shown in Example 17-10.

    Example 17-10 Oracle CQL Query Using Table Event Type StockEvent

    <processor>
        <name>proc</name>
        <rules>
            <query id="q1"><![CDATA[
                SELECT  ExchangeStream.symbol, ExchangeStream.price, Stock.exchange
                FROM    ExchangeStream [Now], Stock
                WHERE   ExchangeStream.symbol = Stock.symbol 
            ]]></query>
        </rules>
    </processor>
    

    Note:

    Because changes in the table source are not coordinated in time with stream data, you may only use a Now window. For more information, see "S[Now]" in the Oracle Fusion Middleware CQL Language Reference for Oracle Event Processing.

17.4 Configuring an Oracle CQL Processor Cache Source

You can configure an Oracle CQL processor to access the Oracle Event Processing cache.


17.5 Configuring an Oracle CQL Processor for Parallel Query Execution

For improved performance, you can enable a CQL query to execute in parallel rather than serially, as it does by default. When the CQL code supports it, you can configure a query so that it can process incoming events in parallel when multiple threads are available to the CQL processor.

You should enable parallel query execution only in cases where the relative order of the query output events is unimportant to the query's downstream client. For example, event ordering probably isn't important if your query is intended primarily to filter events, such as to deliver to clients a set of stock transactions involving a particular company, where the transaction sequence is irrelevant.

By default (without enabling parallel execution), queries process events from a channel serially. For events routed through a channel that uses a system timestamp, event order is the order in which events are received; through a channel that is application timestamped, event order is the order determined by a timestamp value included in the event. Relaxing the total order constraint allows the configured query to not consider event order for that query, processing events in parallel where possible.

17.5.1 Setting Up Parallel Query Execution Support

While specifying support for parallel query execution is at its core a simple configuration task, be sure to follow the other steps below so that you get the most out of the feature.

  • Use the ordering-constraint attribute to support parallel execution.

  • Make sure you have enough threads calling into the processor to meet your performance goals. The maximum amount of parallel query execution is constrained by the number of threads available to the CQL processor. For example, if an adapter upstream of the processor supports the number of threads you need and there is a channel between the adapter and the processor, try configuring the channel with a max-threads count of 0 so that it acts as a pass-through.

    If you don't want a pass-through, be sure to configure the query's upstream channel with a max-threads value greater than 1. For more information, see Chapter 10, "Connecting EPN Stages Using Channels".

  • Follow other guidelines related to setting the max-threads attribute value. For example, to make a max-threads value setting useful, you'll need to also set the max-size attribute to a value greater than 0.

  • Ensure, if necessary, that a bean receiving the query results is thread-aware, such as by using synchronized blocks. For example, you might need to do so if the bean's code builds a list from results received from queries executed on multiple threads.
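
The two channel options described above might be expressed in a component configuration file roughly as follows. This is a sketch only: the channel names and numeric values are hypothetical, and it assumes that max-size and max-threads are written as child elements of the channel element in the component configuration file.

```xml
<!-- Option 1: pass-through channel. With max-threads set to 0, the threads
     of the upstream stage call into the processor directly.
     The channel name is hypothetical. -->
<channel>
    <name>passThroughChannel</name>
    <max-threads>0</max-threads>
</channel>

<!-- Option 2: channel that supplies its own threads. max-size must be
     greater than 0 for the max-threads setting to be useful.
     The name and values are illustrative starting points only. -->
<channel>
    <name>pooledChannel</name>
    <max-size>1000</max-size>
    <max-threads>4</max-threads>
</channel>
```

Which option performs better depends on how many threads the upstream adapter already provides, so treat both as starting points for performance testing.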

17.5.2 Using the ordering-constraint Attribute

You enable parallel query execution by relaxing the default ordering constraint that ensures that events are processed serially. You do this by setting the ordering-constraint attribute on a query or view element.

In Example 17-11, the ordering-constraint attribute is set to UNORDERED so that the query will execute in parallel whenever possible:

Example 17-11 Query Configured to Allow Parallel Execution

<query id="myquery" ordering-constraint="UNORDERED">
    SELECT symbol FROM S WHERE price > 10
</query>

The ordering-constraint attribute supports the following three values:

  • ORDERED means that the order of output events (as implied by the order of input events) is important. The CQL engine will process events serially. This is the default behavior.

  • UNORDERED means that the order of the output events is not important to the consumer of the output events. This frees the CQL processor to process events in parallel on multiple threads whenever possible.

  • PARTITION_ORDERED means that the order of output events within a partition is preserved (as implied by the order of input events), while the order of output events across different partitions is not important to the consumer of the output events. This relaxation gives the CQL engine some freedom to process events from different partitions in parallel (when possible) on multiple threads.

Use the PARTITION_ORDERED value when you want to specify that events conforming to a given partition are processed serially, but that order can be disregarded across partitions and events belonging to different partitions may be processed in parallel. When using the PARTITION_ORDERED value, you must also add the partitioning-expression attribute to specify which expression for partitioning should be the basis for relaxing the cross-partition ordering constraint.

In Example 17-12, the GROUP BY clause partitions the output based on symbol values. The partitioning-expression attribute specifies that events in a given subset of events corresponding to a particular symbol value should be handled serially. Across partitions, on the other hand, order can be disregarded.

Example 17-12 Query Configured to Allow Parallel Execution Across Partitions

<query id="myquery" ordering-constraint="PARTITION_ORDERED"
    partitioning-expression="symbol">
    SELECT
        COUNT(*) as c, symbol
    FROM
        S[RANGE 1 minute]
    GROUP BY
        symbol
</query>

17.5.3 Using partition-order-capacity with Partitioning Queries

In general, you'll probably see improved performance for queries by making more threads available and setting the ordering-constraint attribute so that they're able to execute in parallel when possible. As with most performance tuning techniques, a little trial and error with these settings should yield a combination that gets better results.

However, in some cases where your queries use partitioning -- and you've set the ordering-constraint attribute to PARTITION_ORDERED -- you might not see the amount of scaling you'd expect. For example, consider a case in which running with four threads doesn't improve performance very much over running with two threads. In such a case, you can try using the partition-order-capacity value to get the most out of CQL engine characteristics at work with queries that include partitions.

The partition-order-capacity value specifies the maximum amount of parallelism that will be permitted within a given processor instance when processing a PARTITION_ORDERED query. When available threads are handling events belonging to different partitions, the value sets a maximum number of threads that will be allowed to simultaneously run in the query.

As with other aspects of performance tuning, getting the most out of partition-order-capacity may take a bit of experimentation. When tuning with partition-order-capacity, a good starting point is to set it equal to the maximum number of threads you expect to have active in any CQL processor instance. In some cases (for example, at high data rates or with expensive processing downstream from the CQL processor), it may be helpful to set the partition-order-capacity value even higher than the available number of threads. However, you should only do this if performance testing confirms that it's helpful for a given application and load.

The partition-order-capacity value is set from one of four places, two of which are fallbacks when you don't explicitly set it yourself. These are, in order of precedence:

  1. The partition-order-capacity element set on a channel configuration. If you specify this on the input channel for a processor, it takes effect for any PARTITION_ORDERED queries in that processor. For more information, see Section D.72, "partition-order-capacity" in Appendix D, "Schema Reference: Component Configuration wlevs_application_config.xsd".

  2. The partition-order-capacity property in server configuration. This value will be used for all PARTITION_ORDERED queries running on the server unless the value is set on a channel. For more information, see Section F.29, "partition-order-capacity" in Appendix F, "Schema Reference: Server Configuration wlevs_server_config.xsd".

  3. The max-threads value set on a channel configuration. If you specify this on the input channel for a processor, it takes effect for any PARTITION_ORDERED queries in that processor.

  4. A system default value (currently set to 4) is used if you don't specify either a partition-order-capacity value or max-threads value, or if the max-threads value is set to 0 (meaning it's a pass-through channel).

When using partition-order-capacity, keep in mind the following:

  • The partition-order-capacity value is only useful when you're setting the ordering-constraint attribute to PARTITION_ORDERED.

  • Increasing partition-order-capacity generally increases parallelism and scaling. For example, if your profiling reveals lock contention bottlenecks, you might find it helpful to increase partition-order-capacity to see if contention is reduced.

  • Setting partition-order-capacity even higher than the number of available threads can be helpful in some cases because of the particular way partitioning is done in the CQL processor.

  • There is some resource cost in memory used by specifying very high values.

  • Tuning this parameter is very dependent on details of the application and the input rate. Tuning by experimentation may be necessary to determine an optimal value.
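
As a sketch of the first option in the precedence list above, the following component configuration fragment sets partition-order-capacity on the input channel of a processor that runs PARTITION_ORDERED queries. The channel name and all values are hypothetical, and the position of the element relative to its siblings is an assumption; check the component configuration schema for the required element order.

```xml
<!-- Input channel of a processor that runs PARTITION_ORDERED queries.
     The channel name and all values are hypothetical starting points. -->
<channel>
    <name>inputChannel</name>
    <max-size>1000</max-size>
    <max-threads>4</max-threads>
    <!-- Start with the number of threads expected to be active in the
         processor, then adjust based on performance testing. -->
    <partition-order-capacity>4</partition-order-capacity>
</channel>
```

Because the value set on the channel takes precedence over the server-wide setting, this form lets you tune one processor without affecting other PARTITION_ORDERED queries on the same server.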

17.5.4 Limitations

Think of parallel query execution as a performance enhancement feature that you specify support for so that the CQL processor can use it whenever possible. Not all queries can be executed in parallel. This includes queries using certain CQL language features.

For example, if your query uses some form of aggregation -- such as to find the maximum value from a range of values -- the CQL processor may not be able to fully execute the query in parallel (this is needed to guarantee the correct result considering the ordering constraint). Some query semantics in themselves also constrain the query to ordered processing. Such queries will be executed serially regardless of whether you specify support for parallel execution.

Also, the IStream, RStream and DStream operators maintain the state of their operand for processing, making it necessary for the CQL processor to synchronize threads in order to execute the query.

Note that the CQL processor always respects the semantic intention of your query. In cases where the ordering-constraint attribute would change this intention, the attribute is coerced to a value that keeps the intention intact.

If you're using the partitioning-expression attribute, keep in mind that the attribute supports a single expression only. Entering multiple property names for the value is not supported.

17.6 Handling Faults

You can write code to handle faults that occur in code that does not have an inherent fault handling mechanism. This includes Oracle CQL code and multi-threaded EPN channels. Unlike Java with its try/catch structure, the CQL language has no built-in mechanism for handling errors. To handle faults that occur in CQL, you can write a fault handler, then connect the handler to the EPN stage for which it handles faults, such as an Oracle CQL processor.

You can also associate a fault handler with a multi-threaded channel -- that is, a channel whose max-threads setting is greater than 0. This provides fault handling in the case of exceptions that are thrown to the channel from a stage that is downstream of the channel. Note that channels whose max-threads setting is 0 are pass-through channels that already re-throw exceptions to their upstream stages. For additional information specific to fault handlers for channels, see Section 10.1.8, "Handling Faults in Channels".

A fault handler is a Java class that implements the com.bea.wlevs.ede.api.FaultHandler interface. You connect the class to an EPN stage by registering your fault handler as an OSGi service and associating it with the stage. For more information about OSGi, see Appendix A, "Additional Information about Spring and OSGi".

Without a custom fault handler, you get the following default fault handling behavior:

  • When an exception occurs in Oracle CQL, the CQL engine catches the exception and stops the query processor.

  • If an exception occurs in a stage that is downstream of the processor, then that stage will be dropped as a listener.

  • Exceptions are logged (under the CQLServer category) and the events that are part of the exception's cause are discarded.

  • Upstream stages are not notified of the failure.

With a custom fault handler, you can:

  • Associate a fault handler with an Oracle CQL processor or multi-threaded channel so that faults in those stages are thrown as exceptions to the handler. There, you can handle or re-throw the exception.

  • Allow query processing to continue as your code either handles the exception or re-throws it to the stage that is next upstream.

  • Save event data from being lost while handling a fault. For example, if you have configured a connection to a data source, you could save event data there.

  • Log fault and event information when faults occur.

  • Use multiple fault handlers where needed in an EPN so that exceptions thrown upstream will be handled when they reach other Oracle CQL processors and channels.

In other words, consider associating a fault handler with a stage that does not have its own mechanism for responding to faults, including Oracle CQL processors and multi-threaded channels. Other stages, such as custom adapters you write in Java, which have their own exception-handling model, would not benefit from a fault handler.

Queries can continue as your fault handling code evaluates the fault to determine what action should be taken, including re-throwing the fault to a stage that is upstream of the CQL processor.

For example, the upstream stage of the CQL processor could be the JMS subscriber adapter, which has the option of rolling back the JMS transaction (if the session is transacted), allowing the event to be re-delivered. It can also commit the transaction if the event has already been re-delivered and the problem has been found to be unsolvable.

Note that even when you are using a custom fault handler, query state is reset after a fault as if the query had been stopped and restarted. Contrast this with the default behavior, where the query is stopped and all subsequent events are dropped.

17.6.1 Implementing a Fault Handler Class

You create a fault handler class by implementing the com.bea.wlevs.ede.api.FaultHandler interface. After you have written the class, you associate it with the stage for which it will handle faults by registering it as an OSGi service. For more information, see Section 17.6.2, "Registering a Fault Handler".

Your implementation of the interface's one method, handleFault, receives exceptions for the EPN stage with which the handler is associated. The exception itself is either an instance of com.bea.wlevs.ede.api.EventProcessingException or, if there has been a JVM error, an instance of java.lang.Error.

The method also receives a string array containing the names of upstream stages, or catchers, to which the exception will go if your code re-throws it. If there is more than one catcher in the array, your re-thrown exception will go to all of them. There are two cases in which the catchers array will be empty: when the exception occurs while executing a temporal query, and when the exception is thrown to a channel's fault handler. In these cases, the fault handler is executed in the context of a background thread, so there is no linkage to upstream stages.

An exception that is re-thrown from a fault handler will travel back up through upstream EPN stages until it is either caught or reaches a stage that cannot catch it (such as a processor or multi-threaded channel that does not have an associated fault handler). Note that if you re-throw an exception, any channels in the catchers list must have an associated fault handler in order to catch the exception.
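The upstream travel described above can be pictured with a toy model. This is only a sketch under stated assumptions: the class, enum, and stage names are hypothetical, the path is simplified to a single chain of stages (nearest upstream stage first), and nothing here calls the Oracle Event Processing API.

```java
import java.util.List;

/** Toy model of how a re-thrown fault travels upstream; all names are hypothetical. */
final class FaultPropagationSketch {

    /** What a stage does when a fault arrives from downstream. */
    enum Reaction {
        CONSUMES,      // catches the fault, for example a handler that does not re-throw
        RETHROWS,      // passes the fault on, for example a pass-through channel
        CANNOT_CATCH   // processor or multi-threaded channel with no fault handler
    }

    static final class Stage {
        final String name;
        final Reaction reaction;

        Stage(String name, Reaction reaction) {
            this.name = name;
            this.reaction = reaction;
        }
    }

    /**
     * Walks the upstream path, nearest stage first, and returns the name of the
     * stage where the fault stops, or null if every stage re-threw it.
     */
    static String stopsAt(List<Stage> upstreamPath) {
        for (Stage stage : upstreamPath) {
            if (stage.reaction != Reaction.RETHROWS) {
                return stage.name;
            }
        }
        return null;
    }
}
```

In this model, a multi-threaded channel without a handler ends the journey just as a stage that catches the fault does, which is why such channels need their own fault handler if they appear in the catchers list.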

The EventProcessingException instance could also be one of the exception types that extend that class, including CQLExecutionException, ArithmeticExecutionException, and others (see the Oracle Fusion Middleware Java API Reference for Oracle Event Processing). The EventProcessingException instance provides methods with which your code can retrieve insert, delete, and update events that were involved in generating the fault.

Your implementation of the method should do one of the following:

  • Consume the fault in the way that a Java try/catch statement might. If your implementation does not re-throw the fault, then event processing will continue with subsequent events. However, query processing continues with its state reset as if the query had been restarted. Processing state is lost and processing begins fresh with events that follow those that provoked the fault.

  • Re-throw the fault so that it will be received by upstream stages (or their fault handlers). As when the fault is consumed, queries continue processing events, although query state is reset for subsequent events. The upstream stage receiving the fault always has the option of explicitly stopping the offending query by using the CQL processor's MBean interface.

In Example 17-13, "Fault Handler Class", the code provides a high level illustration of handling a fault.

Example 17-13 Fault Handler Class

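package com.example.faulthandler;

import java.util.Arrays;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.bea.wlevs.ede.api.FaultHandler;

public class SimpleFaultHandler implements FaultHandler
{
    private static final Logger LOG =
        Logger.getLogger(SimpleFaultHandler.class.getName());

    private String suppress;

    // Called by the server to pass in fault information.
    @Override
    public void handleFault(Throwable fault, String[] catchers) throws Throwable
    {
        // Log the fault and the upstream stages that would receive it if re-thrown.
        LOG.log(Level.WARNING,
            "Fault received; catchers=" + Arrays.toString(catchers), fault);
        // Returning without re-throwing consumes the fault.
    }
}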
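Example 17-13 consumes every fault. As a complement, the following minimal sketch chooses between the two options described above. It rests on several assumptions: the FaultHandler interface declared here is a local stand-in that mirrors the handleFault signature so the code compiles without the Oracle Event Processing libraries, the class name SelectiveFaultHandler is hypothetical, and the policy (re-throw JVM errors when a catcher exists, consume everything else) is arbitrary and chosen only for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Local stand-in for com.bea.wlevs.ede.api.FaultHandler (assumption for this sketch). */
interface FaultHandler {
    void handleFault(Throwable fault, String[] catchers) throws Throwable;
}

/** Re-throws JVM errors when an upstream catcher exists; consumes all other faults. */
final class SelectiveFaultHandler implements FaultHandler {

    // Faults handled locally; atomic because the handler may run on several threads.
    private final AtomicInteger consumed = new AtomicInteger();

    @Override
    public void handleFault(Throwable fault, String[] catchers) throws Throwable {
        // The catchers array is empty for temporal queries and channel fault handlers.
        boolean hasCatchers = catchers != null && catchers.length > 0;

        if (fault instanceof Error && hasCatchers) {
            // Re-throw: the fault goes to every upstream stage named in catchers.
            throw fault;
        }

        // Consume: processing continues with later events, with query state reset.
        consumed.incrementAndGet();
        System.err.println("Consumed fault: " + fault);
    }

    int consumedCount() {
        return consumed.get();
    }
}
```

In a real application the class would implement the Oracle interface directly and be registered as described in Section 17.6.2, "Registering a Fault Handler".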

17.6.2 Registering a Fault Handler

After you have written a fault handling class, you can associate it with an EPN stage by registering it as an OSGi service. The simplest way to do this is to register the handler declaratively in the EPN assembly file.

Note:

Due to inherent OSGi behavior, runtime fault handler registration from your configuration happens asynchronously, meaning that a small amount of warm-up time might be required before the handler is able to receive faults. To be sure your handler is ready for the first events entering the network, consider adding a wait period before the application begins receiving events.

In Example 17-14, "Code to Register a Fault Handler with an EPN Stage", the EPN assembly file excerpt shows a service element stanza that registers the SimpleFaultHandler class as the fault handler for the Oracle CQL processor whose id is exampleProcessor.

Example 17-14 Code to Register a Fault Handler with an EPN Stage

<osgi:service interface="com.bea.wlevs.ede.api.FaultHandler">
    <osgi:service-properties>
        <entry key="application.identity" value="myapp"/>
        <entry key="stage.identity" value="exampleProcessor"/>
    </osgi:service-properties>
    <bean class="com.example.faulthandler.SimpleFaultHandler"/>
</osgi:service>

<!-- A processor with a user-defined function. -->
<wlevs:processor id="exampleProcessor" >
    ...
</wlevs:processor>

For more on the schema for registering OSGi services, see http://static.springsource.org/osgi/docs/1.1.x/reference/html/appendix-schema.html. For more on OSGi, see http://en.wikipedia.org/wiki/OSGi.

17.7 Example Oracle CQL Processor Configuration Files

This section provides example Oracle CQL processor configuration files: a component configuration file (Section 17.7.1) and an EPN assembly file (Section 17.7.2).

17.7.1 Oracle CQL Processor Component Configuration File

The following example shows a component configuration file for an Oracle CQL processor.

<?xml version="1.0" encoding="UTF-8"?>
<n1:config
    xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd"
    xmlns:n1="http://www.bea.com/ns/wlevs/config/application"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <processor>
        <name>proc</name>
        <rules>
            <view id="lastEvents"><![CDATA[ 
                select *
                from filteredStream[partition by srcId, cusip rows 1]
            ]]></view>
            <query id="q1"><![CDATA[
                SELECT *
                FROM   lastEvents
                WHERE  price > 10000
            ]]></query>
        </rules>
    </processor>
</n1:config>

In the example, the name element specifies that the processor for which the Oracle CQL rules are being configured is called proc. This in turn implies that the EPN assembly file that defines your application must include a corresponding wlevs:processor element with an id attribute value of proc to link these Oracle CQL rules with an actual proc processor instance (see Section 17.7.2, "Oracle CQL Processor EPN Assembly File").

This Oracle CQL processor component configuration file also defines a view element to specify an Oracle CQL view statement (the Oracle CQL equivalent of a subquery). The results of the view's select are not output to a downstream channel.

Finally, this Oracle CQL processor component configuration file defines a query element to specify an Oracle CQL query statement. The query statement selects from the view. By default, the results of a query are output to a downstream channel. You can control this behavior in the channel configuration using a selector element.

17.7.2 Oracle CQL Processor EPN Assembly File

The following example shows an EPN assembly file for an Oracle CQL processor.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:osgi="http://www.springframework.org/schema/osgi"
       xmlns:wlevs="http://www.bea.com/ns/wlevs/spring"
       xsi:schemaLocation="
  http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans.xsd
  http://www.springframework.org/schema/osgi
  http://www.springframework.org/schema/osgi/spring-osgi.xsd
  http://www.bea.com/ns/wlevs/spring
  http://www.bea.com/ns/wlevs/spring/spring-wlevs-v11_1_1_6.xsd">

    <wlevs:event-type-repository>
        <wlevs:event-type type-name="ExchangeEvent">
            <wlevs:properties>
                <wlevs:property name="symbol" type="char[]" length="16" />
                <wlevs:property name="price" type="java.lang.Double" />
            </wlevs:properties>
        </wlevs:event-type>
        <wlevs:event-type type-name="StockExchangeEvent">
            <wlevs:properties>
                <wlevs:property name="symbol" type="char[]" length="16" />
                <wlevs:property name="price" type="java.lang.Double" />
                <wlevs:property name="exchange" type="char[]" length="16" />
            </wlevs:properties>
        </wlevs:event-type>
        <wlevs:event-type type-name="StockEvent">
            <wlevs:properties>
                <wlevs:property name="symbol" type="char[]" length="16" />
                <wlevs:property name="exchange" type="char[]" length="16" />
            </wlevs:properties>
        </wlevs:event-type>
    </wlevs:event-type-repository>

    <!-- Assemble EPN (event processing network) -->
    <wlevs:adapter id="adapter" class="com.bea.wlevs.example.db.ExchangeAdapter" >
        <wlevs:listener ref="ExchangeStream"/>
    </wlevs:adapter>

    <wlevs:channel id="ExchangeStream" event-type="ExchangeEvent" >
        <wlevs:listener ref="proc"/>
    </wlevs:channel>

    <wlevs:table id="Stock" event-type="StockEvent" data-source="StockDs" />

    <wlevs:processor id="proc" advertise="true" >
        <wlevs:table-source ref="Stock" />
    </wlevs:processor>

    <wlevs:channel id="OutputStream" advertise="true" event-type="StockExchangeEvent" >
        <wlevs:listener ref="bean"/>
        <wlevs:source ref="proc"/>
    </wlevs:channel>

    <osgi:reference id="ds" interface="com.bea.core.datasource.DataSourceService" cardinality="0..1" />

    <!-- Create business object -->
    <bean id="bean" class="com.bea.wlevs.example.db.OutputBean">
        <property name="dataSourceService" ref="ds"/>
    </bean>

</beans>