6 Oracle CQL Processors

An Oracle CQL Processor processes incoming events from various input channels and other data sources. You use Oracle CQL to write the business logic in the form of continuous queries that process incoming events. Oracle CQL filters, aggregates, correlates, and processes events in real time.

Note:

You can create a Java class with methods that enhance the functionality available in Oracle CQL. Within Oracle CQL you reference the compiled class by name and call its methods from a SELECT statement. See SELECT Clause in Oracle CQL Language Reference for Oracle Event Processing.
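As a minimal sketch of the kind of helper class this note describes, the following hypothetical TradeMath class exposes one static method. The class name, the method name, and the daily-spread calculation are assumptions made for illustration only. The SELECT Clause reference remains the authority on how a query references the compiled class.

```java
/**
 * Hypothetical helper class; the name and method are illustrative only.
 * In a real application the class would likely be declared public in its
 * own source file so that other components can load it.
 */
final class TradeMath {
    private TradeMath() {
        // Utility class: no instances.
    }

    /** Returns the difference between the daily high and daily low prices. */
    static double dailySpread(double dailyHigh, double dailyLow) {
        return dailyHigh - dailyLow;
    }

    public static void main(String[] args) {
        System.out.println(dailySpread(35.5, 33.0));
    }
}
```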


This chapter presents an overview of Oracle CQL with examples to help you understand the basic concepts. See Cached Event Data for information about performing CQL queries on caches. See also Oracle CQL Queries, Views, and Joins in Oracle CQL Language Reference for Oracle Event Processing.

Oracle JDeveloper provides Oracle CQL Pattern components that provide templates for creating common Oracle CQL queries. See Using Oracle CQL Patterns in Getting Started with Oracle Event Processing.

This chapter describes some of the assembly and configuration file settings for the Oracle CQL processor. For a complete reference, see Processor in Schema Reference for Oracle Event Processing.

6.1 Processor Data Sources

Oracle CQL queries can define one or more statements to process incoming event data from one or more input sources and send the outgoing event data to one or more output channels.

Each channel (input or output) and data source has an associated event type.

For example, one input can be a channel and another input can be a Coherence cache. The channel and Coherence cache have different event types because the Coherence cache provides additional information to the Oracle CQL processor query that is related to, but not the same as, the event data coming from the input channel.

If you configure an Oracle CQL processor with more than one query, by default, all queries output their results to all of the output channels. You can control which queries output their results to which output channels by putting a selector element on the downstream channel or channels. Use the selector element to specify a space delimited list of one or more query names that can output their results to that channel. The Oracle CQL query assigned to the output channel has the correct attributes to match the event type defined on the output channel. For more information, see Control Which Queries Output to a Downstream Channel.
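The routing rule described above can be modeled in a few lines of Java. This is only a sketch of the rule as stated in the text, not the product's implementation, and the class and method names are hypothetical. It assumes that a channel without a selector accepts output from every query.

```java
import java.util.Arrays;

/** Hypothetical model of how a selector restricts which queries feed a channel. */
final class SelectorSketch {
    private SelectorSketch() {
    }

    /** Returns true if the named query may output to a channel with this selector. */
    static boolean accepts(String selector, String queryName) {
        if (selector == null || selector.trim().isEmpty()) {
            // No selector: by default every query outputs to the channel.
            return true;
        }
        // The selector is a space-delimited list of query names.
        return Arrays.asList(selector.trim().split("\\s+")).contains(queryName);
    }

    public static void main(String[] args) {
        System.out.println(accepts("q1 q2", "q2"));
        System.out.println(accepts("q1 q2", "q3"));
    }
}
```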

6.2 Assembly and Configuration Files

When you add an Oracle CQL processor to your EPN, the assembly file shows the following entry.

<wlevs:processor id="processor"/>

When you add an Oracle CQL Pattern such as the Averaging Rule to the EPN in Oracle JDeveloper, the assembly file shows the following entries.

<wlevs:processor id="processor"/>
<wlevs:processor id="averaging-rule"/>

Configuration File

When you add the Oracle CQL processor to your EPN, the configuration file shows the following entry. By default, you get a template for rules that contains a template for one query.

 <processor>
   <name>processor</name>
   <rules>
     <query id="ExampleQuery"><![CDATA[ 
       select * from MyChannel [now] ]]>
     </query>
   </rules>
  </processor>
  • The rules element groups the Oracle CQL statements that this processor executes.

  • The query element contains Oracle CQL select statements. The query element id attribute defines the query name.

  • The XML CDATA type indicates where to put the Oracle CQL rule.

  • The select statement is the actual query. The template provides the [now] operator so that you can perform a now operation as described in NOW and Last Event Windows.

6.3 Queries

The following sections show how to perform basic Oracle CQL processor queries on stock trade events.

Objective

The objective for this section is to understand how to use windows, slides, and views in Oracle CQL queries.

  • Windows convert event streams to time-based event relations to make it possible to perform Oracle CQL operations on the events. See Time-Based Relations (Windows).

  • Slides enable you to batch events to control the rate at which the CQL processor outputs events. See Processor Output Control (Slides).

  • Views enable you to create an Oracle CQL statement that can be reused by other Oracle CQL queries. See Views.

Event Type Definition

The stock trade events used in the examples for this section are of type StockTradeEventType with the following field and type definitions:

  • tickerSymbol: String

  • price: Double

  • dailyHigh: Double

  • dailyLow: Double

  • closingValue: Double

6.3.1 Stream Channels

A stream channel inserts events into a collection and sends the stream to the next EPN stage. Events in a stream flow continuously, can never be deleted from the stream, and have no end. You can perform queries on the continuous stream of events flowing into your application.

A query on the input stream channel, StockTradeIStreamChannel, to retrieve all stock trade events with the Oracle ticker symbol follows.

SELECT tickerSymbol
FROM StockTradeIStreamChannel
WHERE tickerSymbol = 'ORCL'

The following configuration file entry shows the query. ISTREAM is a relation to stream operator described in Relation to Stream Operators.

<processor>
  <rules>
    <query id="rule1"><![CDATA[ISTREAM (SELECT tickerSymbol
       FROM StockTradeIStreamChannel WHERE tickerSymbol = 'ORCL')]]>
    </query>
  </rules>
</processor>

6.3.2 Time-Based Relations (Windows)

A relation channel inserts events into a collection and sends the relation to the next EPN stage. A relation is a window of time on the stream that has a beginning and an end. Events in a relation can be inserted into, deleted from, and updated in the relation. For insert, delete, and update operations, events in a relation must be referenced to a particular point in time to ensure the operation takes place on the correct event. All operations on a relation are time based.

Most applications do not use relation channels. You can put a window of time on events coming from a stream channel to create a relation for time-based processing operations. To find the average price for a particular stock, you must determine a time frame (window) in which to calculate the average. When you define a window on a stream, you have a collection of data that is not flowing, and unlike a stream, has a beginning and an end. The window is an in-memory relation on which you can apply a function such as AVG and also perform insert, update, and delete operations.

Operators that put a window of time on a stream are called stream to relation operators. The output of a stream to relation operation is a relation. You use relation to stream operators to convert a relation back to a stream so that the output stream contains every event, only inserted events, or only deleted events.

Oracle CQL processor output typically goes to a stream channel and on to the next stage in the EPN.

6.3.2.1 Stream to Relation Operators

The stream to relation operators are RANGE and ROWS.

RANGE Operator

You can specify a window of time with the time-based window operator, RANGE, as follows:

SELECT AVG(price) 
FROM StockTradeIStreamChannel [RANGE 1 MINUTE]

To keep this example easy to understand, the range is 1 minute, time ticks in seconds, and one input event is received every second. The query starts averaging the prices contained in the events at zero seconds and outputs a value of 0 because there is no event in the relation at zero seconds. When the first event arrives at 1 second, the average price is the price in that event. When the next event arrives at 2 seconds, the average price is the average of the two events in the relation. This continues until the first minute has elapsed.

An important concept with time-based window operators is that the window shifts over the event stream based on time. When 60 seconds have elapsed, the window covers the events from 1 to 60 seconds. When one more second elapses, the window shifts by one second to average the prices in the events from 2 to 61 seconds. The window continues to shift over the event stream in this way for as long as the application runs.

The following configuration file entry shows the query. ISTREAM is a relation to stream operator described in Relation to Stream Operators.

<processor>
  <rules>
    <query id="rule2"><![CDATA[ISTREAM (SELECT AVG(price)
       FROM StockTradeIStreamChannel [RANGE 1 MINUTE])]]>
    </query>
  </rules>
</processor>
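The shifting behavior of the RANGE window can be sketched as a small Java simulation. This is an illustration of the semantics described above, not how the CQL engine is implemented. It assumes that at time t the window holds the events whose timestamps fall after t minus the range, up to and including t, and that one event arrives per second. The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical simulation of AVG(price) over a time-based (RANGE) window. */
final class RangeWindowSketch {
    private static final class Event {
        final long timestamp;
        final double price;

        Event(long timestamp, double price) {
            this.timestamp = timestamp;
            this.price = price;
        }
    }

    private final long rangeSeconds;
    private final Deque<Event> window = new ArrayDeque<>();
    private double sum;

    RangeWindowSketch(long rangeSeconds) {
        if (rangeSeconds <= 0) {
            throw new IllegalArgumentException("range must be positive");
        }
        this.rangeSeconds = rangeSeconds;
    }

    /** Adds an event, evicts events that fell out of the range, returns the average. */
    double onEvent(long timestamp, double price) {
        window.addLast(new Event(timestamp, price));
        sum += price;
        // Assumption: the window covers (timestamp - range, timestamp].
        while (window.peekFirst().timestamp <= timestamp - rangeSeconds) {
            sum -= window.removeFirst().price;
        }
        return sum / window.size();
    }

    int size() {
        return window.size();
    }

    public static void main(String[] args) {
        RangeWindowSketch window = new RangeWindowSketch(60);
        for (long t = 1; t <= 61; t++) {
            // The price equals the timestamp so the window contents are easy to follow.
            double average = window.onEvent(t, t);
            if (t >= 59) {
                System.out.println("t=" + t + " avg=" + average + " events=" + window.size());
            }
        }
    }
}
```

With the price set equal to the timestamp, the average at 60 seconds covers events 1 through 60, and the average at 61 seconds covers events 2 through 61.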

Note:

Very large numbers must be suffixed. Without the suffix, Java treats very large numbers like an integer and the value might be out of range for an integer, which throws an error.

Add a suffix as follows:

  • l or L for long

  • f or F for float

  • d or D for double

  • n or N for big decimal

For example: SELECT * FROM channel0[RANGE 1368430107027000000l nanoseconds]

ROWS Operator

You can specify a tuple-based window with the ROWS operator as follows:

SELECT AVG(price)
FROM StockTradeIStreamChannel [ROWS 3]

A tuple is an event, so the ROWS 3 operation means to average the price on three events in the relation starting when the first event arrives. The way it works is that the average operation is performed on the first event that enters the relation. When the second event enters the relation, the average operation is performed on the two events. When the third event enters the relation, the average operation is performed on the three events. No averaging occurs again until the fourth event enters the relation. When the fourth event enters the relation, the second, third, and fourth events are averaged. Likewise, when the fifth event enters the relation, the third, fourth, and fifth events are averaged.
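The sequence of averages described above can be reproduced with a short Java sketch. It models only the behavior stated in this section (a window that keeps the most recent events and recomputes the average on each arrival) and is not the engine's implementation. The class and method names are hypothetical, and the sketch assumes a row count of at least 1.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical simulation of AVG(price) over a tuple-based (ROWS) window. */
final class RowsWindowSketch {
    private RowsWindowSketch() {
    }

    /** Returns the average produced after each arriving price for a window of rows events. */
    static double[] averages(double[] prices, int rows) {
        Deque<Double> window = new ArrayDeque<>();
        double sum = 0;
        double[] output = new double[prices.length];
        for (int i = 0; i < prices.length; i++) {
            window.addLast(prices[i]);
            sum += prices[i];
            if (window.size() > rows) {
                // The oldest event leaves the window when a new one arrives.
                sum -= window.removeFirst();
            }
            output[i] = sum / window.size();
        }
        return output;
    }

    public static void main(String[] args) {
        double[] result = averages(new double[] {10, 20, 30, 40, 50}, 3);
        System.out.println(java.util.Arrays.toString(result));
    }
}
```

For the prices 10, 20, 30, 40, and 50, the fourth average covers the second, third, and fourth events, and the fifth average covers the third, fourth, and fifth events.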

The prior examples have averaged the price for all stocks. To compute the average for specific stocks in the stream, the following query uses a partitioned window.

SELECT AVG(price), tickerSymbol 
FROM StockTradeIStreamChannel [PARTITION by tickerSymbol ROWS 3]
GROUP BY tickerSymbol

A partitioned window creates a separate relation window for each partition. In this example, the PARTITION by tickerSymbol clause keeps the last three events for each ticker symbol and averages them per symbol. Without the partition and using only the GROUP BY clause, the window keeps the last three events as expected, but the ticker symbols in the window do not always match, which introduces averaging errors.

The following is the configuration file entry for this query. ISTREAM is a relation to stream operator described in Relation to Stream Operators.

<processor>
  <rules>
    <query id="Example"><![CDATA[ISTREAM (select tickerSymbol, AVG(price) 
         from StockTradeIStreamChannel 
         [PARTITION by tickerSymbol ROWS 3] 
         GROUP BY tickerSymbol)]]>
    </query>
  </rules>
</processor>
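To illustrate why the partition matters, the following Java sketch keeps one three-event window per ticker symbol, so that each average only ever mixes prices for the same stock. It is a simplified model of the behavior described above, with hypothetical class and method names, and the ticker symbol XYZ in the usage is made up.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical simulation of AVG(price) over [PARTITION by tickerSymbol ROWS n]. */
final class PartitionedRowsSketch {
    private final int rows;
    private final Map<String, Deque<Double>> windows = new HashMap<>();

    PartitionedRowsSketch(int rows) {
        if (rows < 1) {
            throw new IllegalArgumentException("rows must be at least 1");
        }
        this.rows = rows;
    }

    /** Adds a trade to its symbol's own window and returns that symbol's average. */
    double onEvent(String tickerSymbol, double price) {
        Deque<Double> window = windows.computeIfAbsent(tickerSymbol, k -> new ArrayDeque<>());
        window.addLast(price);
        if (window.size() > rows) {
            window.removeFirst();
        }
        double sum = 0;
        for (double p : window) {
            sum += p;
        }
        return sum / window.size();
    }

    public static void main(String[] args) {
        PartitionedRowsSketch sketch = new PartitionedRowsSketch(3);
        sketch.onEvent("ORCL", 10);
        sketch.onEvent("XYZ", 100); // XYZ is a made-up symbol
        sketch.onEvent("ORCL", 20);
        System.out.println(sketch.onEvent("ORCL", 30));
    }
}
```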

6.3.2.2 Relation to Stream Operators

The relation to stream operators are ISTREAM, DSTREAM, and RSTREAM.

ISTREAM Operator

The ISTREAM operator puts an insert event from the relation into the output stream. Events that were deleted or updated in the relation are ignored. When the average changes, the query sends a delete event to the relation to remove the previous average and then sends an insert event to the relation to add the new average into the relation. The following example uses the ISTREAM operator to update the output stream when a new average is calculated.

ISTREAM (SELECT AVG(price) 
FROM StockTradeIStreamChannel [RANGE 1 MINUTE])

The following configuration file entry shows the ISTREAM operator.

<processor>
  <rules>
    <query id="rule2"><![CDATA[ISTREAM (SELECT AVG(price)
       FROM StockTradeIStreamChannel [RANGE 1 MINUTE])]]>
    </query>
  </rules>
</processor>

DSTREAM Operator

Use the DSTREAM operator to find out when a situation is no longer useful such as when a stock has been delisted from the exchange. The following example uses the DSTREAM operator to update the output stream with the old average after the new average is calculated in the relation.

DSTREAM (SELECT AVG(price) 
FROM StockTradeIStreamChannel [RANGE 1 MINUTE])

The following configuration file entry shows the DSTREAM operator.

<processor>
  <rules>
    <query id="rule2"><![CDATA[DSTREAM (SELECT AVG(price)
       FROM StockTradeIStreamChannel [RANGE 1 MINUTE])]]>
    </query>
  </rules>
</processor>

RSTREAM Operator

The RSTREAM operator inserts all events into the output stream regardless of whether events were deleted or updated. Use this operator when you need to take downstream action on every output. The following example uses the RSTREAM operator to select all events in the input stream, wait for two events to arrive in the relation, and put the two events from the relation into the output stream.

RSTREAM (SELECT * 
FROM StockTradeIStreamChannel [ROWS 2])

The following configuration file entry shows the RSTREAM operator.

<processor>
  <rules>
    <query id="rule2"><![CDATA[RSTREAM (SELECT *
       FROM StockTradeIStreamChannel [ROWS 2])]]>
    </query>
  </rules>
</processor>
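The difference between the three relation to stream operators can be sketched by comparing two snapshots of a relation. The following Java example is a simplified model with hypothetical names. It assumes that each operator looks at the relation's contents at the previous tick and at the current tick: ISTREAM emits what was inserted, DSTREAM emits what was deleted, and RSTREAM emits the full current contents.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of ISTREAM, DSTREAM, and RSTREAM over two relation snapshots. */
final class RelationToStreamSketch {
    private RelationToStreamSketch() {
    }

    /** Multiset difference: the elements of a not matched by an element of b. */
    private static <T> List<T> minus(List<T> a, List<T> b) {
        List<T> result = new ArrayList<>(a);
        for (T item : b) {
            result.remove(item); // removes at most one matching occurrence
        }
        return result;
    }

    /** Events present now that were not present at the previous tick. */
    static <T> List<T> istream(List<T> previous, List<T> current) {
        return minus(current, previous);
    }

    /** Events present at the previous tick that are no longer present. */
    static <T> List<T> dstream(List<T> previous, List<T> current) {
        return minus(previous, current);
    }

    /** Every event currently in the relation. */
    static <T> List<T> rstream(List<T> previous, List<T> current) {
        return new ArrayList<>(current);
    }

    public static void main(String[] args) {
        // The relation holds a single average that changes from 10.0 to 15.0.
        List<Double> previous = Arrays.asList(10.0);
        List<Double> current = Arrays.asList(15.0);
        System.out.println("ISTREAM " + istream(previous, current));
        System.out.println("DSTREAM " + dstream(previous, current));
        System.out.println("RSTREAM " + rstream(previous, current));
    }
}
```

In the averaging example, ISTREAM reports the new average, DSTREAM reports the old average that was removed, and RSTREAM reports whatever the relation currently contains.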

6.3.2.3 NOW and Last Event Windows

A NOW window contains the events that happened at the last tick of the system. With the NOW operator, the last input event can be deleted at the next time tick (the new NOW), so you might not capture what you want. If you truly need the last input event, use a last event window. The following example shows how to construct a NOW window.

SELECT * FROM StockTradeIStreamChannel[NOW]

The following configuration file entry shows the NOW operator.

<processor>
  <rules>
    <query id="rule2"><![CDATA[ISTREAM (SELECT *
       FROM StockTradeIStreamChannel [NOW])]]>
    </query>
  </rules>
</processor>

A last event window captures the last event received. The following example shows how to construct a last event window.

SELECT * FROM StockTradeIStreamChannel[ROWS 1]

The following configuration file entry shows a last event window.

<processor>
  <rules>
    <query id="rule2"><![CDATA[ISTREAM (SELECT *
       FROM StockTradeIStreamChannel [ROWS 1])]]>
    </query>
  </rules>
</processor>

6.3.3 Processor Output Control (Slides)

Instead of outputting query results as they happen, you can use the SLIDE operator in a subclause to batch the output events. You can batch the events based on the number of events when you use the ROWS operator or on an amount of time (time window) when you use the RANGE operator.

Note:

When a slide value is not specified, the query assumes the default value of 1 row for tuple-based windows, and 1 time tick for time-based windows.

Batch by Number of Events

The following example outputs every 2 events (2, 4, 6, 8, ...).

SELECT * FROM StockTradeIStreamChannel[ROWS 3 SLIDE 2]

The output from the SLIDE operator includes deleted events. When the first two events arrive in the relation, the query outputs both events to the stream. When the next event arrives, there are three events in the relation, but output happens next at the fourth event. When the fourth event arrives, the first event is deleted and output with the third and fourth events.

The following example shows how to use a slide with the RSTREAM operator. In this case, when the fourth event arrives, events 2, 3, and 4 are sent to the output stream. The RSTREAM operator sends all events to the output stream regardless of whether events were deleted or updated.

RSTREAM(SELECT * FROM StockTradeIStreamChannel[ROWS 3 SLIDE 2])

The following configuration file entry uses an RSTREAM to batch by numbers.

<processor>
  <rules>
    <query id="rule2"><![CDATA[RSTREAM (SELECT *
       FROM StockTradeIStreamChannel [ROWS 3 SLIDE 2])]]>
    </query>
  </rules>
</processor>
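The batching behavior of the RSTREAM query with ROWS 3 SLIDE 2 can be sketched as follows. This Java example is a simplified model with hypothetical names. It numbers events from 1 and assumes that output occurs only on every slide-th event, at which point the whole current window is emitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of RSTREAM over a [ROWS n SLIDE m] window. */
final class RowsSlideSketch {
    private RowsSlideSketch() {
    }

    /** Returns the window contents emitted at each slide boundary. */
    static List<List<Integer>> batches(int eventCount, int rows, int slide) {
        if (rows < 1 || slide < 1) {
            throw new IllegalArgumentException("rows and slide must be at least 1");
        }
        List<List<Integer>> output = new ArrayList<>();
        for (int n = slide; n <= eventCount; n += slide) {
            // At event n the window holds the most recent rows events.
            List<Integer> window = new ArrayList<>();
            for (int e = Math.max(1, n - rows + 1); e <= n; e++) {
                window.add(e);
            }
            output.add(window);
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(batches(6, 3, 2));
    }
}
```

For six events, the model emits events 1 and 2 at the second event, events 2, 3, and 4 at the fourth event, and events 4, 5, and 6 at the sixth event.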

Batch by Time Window

With a time window, Oracle Event Processing batches events by a time interval (RANGE operator). When you specify the time interval, Oracle CQL sends the events to the output stream at a time that is a multiple of the number you specified. For example, if you specify 5 seconds, the events are sent at 5, 10, 15, 20, and so on seconds. In the case where the first event arrives at 1, 2, or 3 seconds into the interval, the first output will be smaller than the others.

The following example specifies a range of 5 minutes with a slide every 30 seconds.

SELECT * FROM StockTradeIStreamChannel[RANGE 5 MIN SLIDE 30 SECONDS]

The following configuration file entry shows a time-based slide.

<processor>
  <rules>
    <query id="rule2"><![CDATA[RSTREAM (SELECT *
       FROM StockTradeIStreamChannel [RANGE 5 MIN SLIDE 30 SECONDS])]]>
    </query>
  </rules>
</processor>

6.3.4 Views

Views enable you to create an Oracle CQL statement that can be reused by other Oracle CQL queries. A view element contains Oracle CQL subquery statements. The view element id attribute defines the view name. A top-level SELECT statement that you create in a view element is called a view.

Note:

Subqueries are used with binary set operators such as union, union all, and minus. You must use parentheses in the subqueries so that the right precedence is applied to the query.

The following example shows view v1 and query q1 on the view. The view selects from stream S1 of xmltype stream elements. The view v1 uses the XMLTABLE clause to parse data from the xmltype stream elements with XPath expressions. The query q1 selects from view v1 as it would from any other data source. The XMLTABLE clause also supports XML name spaces.

An xmltype stream contains XML data. With the Oracle CQL XMLTABLE clause, you can parse data from an xmltype stream into columns using XPath expressions and access the data by column name. XPath expressions enable you to navigate through elements and attributes in an XML document.

Note:

The data types in the view's schema match the data types of the parsed data in the COLUMNS clause.

<view id="v1" schema="orderId LastShares LastPrice"><![CDATA[
  SELECT
    X.OrderId,
    X.LastShares,
    X.LastPrice
  FROM S1, XMLTABLE (
   "FILL"
   PASSING BY VALUE
   S1.c1 as "."
   COLUMNS
      OrderId char(16) PATH "fn:data(../@ID)",
      LastShares integer PATH "fn:data(@LastShares)",
      LastPrice float PATH "fn:data(@LastPx)"
   ) as X
]]></view>

<query id="q1"><![CDATA[
  IStream(
    select
      orderId,
      sum(LastShares * LastPrice),
      sum(LastShares * LastPrice) / sum(LastShares)
    from
      v1[now]
    group by orderId
  )
]]></query>

6.4 CQL Aggregations

Oracle CQL supports aggregate functions such as AVG, COUNT, and SUM, which are calculated incrementally, and MAX and MIN, which are not incremental.

The aggregate functions aggregate events into a Java collection so that you can use the Collection APIs to manipulate the events.

You can check for conditions on the aggregated results with the HAVING clause. In the following example only averages higher than 50 are output.

SELECT AVG(price) FROM StockTradeIStreamChannel [RANGE 1 HOUR]
HAVING AVG(price) > 50

Oracle CQL provides a variety of built-in single-row functions and aggregate functions based on the Colt open source libraries for high performance scientific and technical computing. The functions that are available as part of the Colt library do not support the Big Decimal data type or NULL input values. Also, the value computation of these functions is not incremental. See the COLT website for details.

6.5 Configure a Table Source

You can access a relational database table from an Oracle CQL query by creating a table component with an associated data source. Oracle Event Processing relational database table event sources are pull data sources, which means that Oracle Event Processing periodically polls the event source.

  • You can join a stream only with a NOW window and only to a single database table.

    Because changes in the table source are not coordinated in time with stream data, you can only join the table source to an event stream with a Now window, and you can only join to a single database table.

  • With Oracle JDBC data cartridge, you can integrate arbitrarily complex SQL queries and multiple tables and data sources with your Oracle CQL queries. See Oracle JDBC Data Cartridge in Developing Applications with Oracle CQL Data Cartridges.

    Note:

    Oracle recommends the Oracle JDBC data cartridge for accessing relational database tables from an Oracle CQL statement.

Whether you use the NOW window or the data cartridge, you must define data sources in the Oracle Event Processing server file as described in Define Data Sources in Administering Oracle Event Processing.

6.5.1 Assembly File

The following assembly file entry shows the setting for a table source with an id attribute of Stock.

<wlevs:table id="Stock" event-type="TradeEvent" data-source="StockDs"/>

Oracle Event Processing uses the event type and the data source to map a relational table row to the event type. The TradeEvent event type is created from a Java class that has the following five private fields that map to columns in the relational database: symbol, price, lastPrice, percChange, and volume.

Note:

The XMLTYPE property is not supported for table sources.

6.5.2 Configuration File

  <data-source>
    <name>StockDs</name>
    <connection-pool-params>
      <initial-capacity>1</initial-capacity>
      <max-capacity>10</max-capacity>     
    </connection-pool-params>
    <driver-params>
      <url>jdbc:derby:</url>
      <driver-name>org.apache.derby.jdbc.EmbeddedDriver</driver-name>
      <properties>
        <element>
          <name>databaseName</name>
          <value>db</value>
        </element>
        <element>
          <name>create</name>
          <value>true</value>
        </element>
      </properties>
    </driver-params>
    <data-source-params>
      <jndi-names>
        <element>StockDs</element>
      </jndi-names>
      <global-transactions-protocol>None</global-transactions-protocol>
    </data-source-params>
  </data-source>

After configuration, you can define Oracle CQL queries that access the Stock table as if it were another event stream.

In the following example, the query joins the StockTradeIStreamChannel event stream to the Stock table:

SELECT StockTradeIStreamChannel.symbol, StockTradeIStreamChannel.price,
         Stock.lastPrice, Stock.percChange,
         Stock.volume
FROM   StockTradeIStreamChannel [Now], Stock
WHERE  StockTradeIStreamChannel.symbol = Stock.symbol


6.6 Configure an Oracle CQL Processor for Parallel Query Execution

For improved performance, you can enable a CQL query to execute in parallel rather than serially, as it does by default.

When the CQL code supports it, you can configure a query so that it can process incoming events in parallel when multiple threads are available to the CQL processor.

You should enable parallel query execution only in cases where the relative order of the query output events is unimportant to the query's downstream client. For example, event ordering probably is not important if your query is intended primarily to filter events, such as to deliver to clients a set of stock transactions involving a particular company, where the transaction sequence is irrelevant.

By default (without enabling parallel execution), queries process events from a channel serially. For events routed through a channel that uses a system time stamp, event order is the order in which events are received; through a channel that is time stamped by an application, event order is the order determined by a time stamp value included in the event. Relaxing the total order constraint allows the configured query to not consider event order for that query, processing events in parallel where possible.

6.6.1 Set Up Parallel Query Execution Support

While specifying support for parallel query execution is at its core a simple configuration task, be sure to follow the other steps below so that you get the most out of the feature.

  • Use the ordering-constraint attribute to support parallel execution.

  • Make sure you have enough threads calling into the processor to meet your performance goals. The maximum amount of parallel query execution is constrained by the number of threads available to the CQL processor. For example, if an adapter upstream of the processor supports the number of threads you need and there is a channel between the adapter and the processor, try configuring the channel with a max-threads count of 0 so that it acts as a pass-through.

    If you don't want a pass-through, be sure to configure the query's upstream channel with a max-threads value greater than 1. (To make a max-threads value setting useful, you'll need to also set the max-size attribute to a value greater than 0.) For more information, see Channels.

  • Follow other guidelines related to setting the max-threads attribute value. For example, to make a max-threads value setting useful, you'll need to also set the max-size attribute to a value greater than 0.

  • Ensure, if necessary, that a bean receiving the query results is thread-aware, such as by using synchronized blocks. For example, you might need to do so if the bean's code builds a list from results received from queries executed on multiple threads.
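The last guideline can be illustrated with a small Java sketch of a thread-aware result collector. The class and method names are hypothetical and the sketch does not use any Oracle Event Processing interfaces. In a real application the bean would implement whichever sink interface the EPN requires, and the same synchronized block would guard its shared list.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical bean-like collector that is safe to call from several threads. */
final class ThreadAwareResultCollector {
    private final List<Object> results = new ArrayList<>();

    /** Called by whichever thread delivers a query result. */
    void onResult(Object event) {
        synchronized (results) {
            results.add(event);
        }
    }

    int size() {
        synchronized (results) {
            return results.size();
        }
    }

    /** Delivers events from several threads and returns the number collected. */
    static int runDemo(int threads, int eventsPerThread) {
        ThreadAwareResultCollector collector = new ThreadAwareResultCollector();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < eventsPerThread; i++) {
                    collector.onResult(i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return collector.size();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 1000));
    }
}
```

Without the synchronized blocks, concurrent calls to add on a plain ArrayList could lose events or corrupt the list.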

6.6.2 The ordering-constraint Attribute

You enable parallel query execution by relaxing the default ordering constraint that ensures that events are processed serially. You do this by setting the ordering-constraint attribute on a query or view element.

In the following example, the ordering-constraint attribute is set to UNORDERED so that the query will execute in parallel whenever possible:

<query id="myquery" ordering-constraint="UNORDERED">
    SELECT symbol FROM S WHERE price > 10
</query>

The ordering-constraint attribute supports the following three values:

  • ORDERED means that the order of output events (as implied by the order of input events) is important. The CQL engine will process events serially. This is the default behavior.

  • UNORDERED means that the order of the output events is not important to the consumer of the output events. This gives the CQL processor the freedom to process events in parallel on multiple threads when possible.

  • PARTITION_ORDERED means that the order of output events within a partition is preserved (as implied by the order of input events), while the order of output events across different partitions is not important to the consumer of the output events. This relaxation gives the CQL engine some freedom to process events across partitions in parallel (when possible) on multiple threads.

Use the PARTITION_ORDERED value when you want to specify that events conforming to a given partition are processed serially, but that order can be disregarded across partitions and events belonging to different partitions may be processed in parallel. When using the PARTITION_ORDERED value, you must also add the partition-expression attribute to specify which expression for partitioning should be the basis for relaxing the cross-partition ordering constraint.

In the following example, the GROUP BY clause partitions the output based on symbol values. The partition-expression attribute specifies that events in a given subset of events corresponding to a particular symbol value should be handled serially. Across partitions, on the other hand, order can be disregarded.

<query id="myquery" ordering-constraint="PARTITION_ORDERED"
    partitioning-expression="symbol">
    SELECT
        COUNT(*) as c, symbol
    FROM
        S[RANGE 1 minute]
    GROUP BY
        symbol
</query>

6.6.3 Using partition-order-capacity with Partitioning Queries

In general, you will probably see improved performance for queries by making more threads available and setting the ordering-constraint attribute so that they're able to execute in parallel when possible. As with most performance tuning techniques, a little trial and error with these settings should yield a combination that gets better results.

However, in some cases where your queries use partitioning -- and you've set the ordering-constraint attribute to PARTITION_ORDERED -- you might not see the amount of scaling you'd expect. For example, consider a case in which running with four threads doesn't improve performance very much over running with two threads. In such a case, you can try using the partition-order-capacity value to get the most out of CQL engine characteristics at work with queries that include partitions.

The partition-order-capacity value specifies the maximum amount of parallelism that will be permitted within a given processor instance when processing a PARTITION_ORDERED query. When available threads are handling events belonging to different partitions, the value sets a maximum number of threads that will be allowed to simultaneously run in the query.

As with other aspects of performance tuning, getting the most out of partition-order-capacity may take a bit of experimentation. When tuning with partition-order-capacity, a good starting point is to set it equal to the maximum number of threads you expect to have active in any CQL processor instance. In some cases (for example, at high data rates or with expensive processing downstream from the CQL processor), it may be helpful to set the partition-order-capacity value even higher than the available number of threads. However, you should only do this if performance testing confirms that it's helpful for a given application and load.

The partition-order-capacity value is set from one of four places, two of which you can fall back on when you do not explicitly set it yourself.

These are, in order of precedence:

  1. The partition-order-capacity element set on a channel configuration. If you specify this on the input channel for a processor, it takes effect for any PARTITION_ORDERED queries in that processor.

  2. The partition-order-capacity property in server configuration. This value will be used for all PARTITION_ORDERED queries running on the server unless the value is set on a channel.

  3. The max-threads value set on a channel configuration. If you specify this on the input channel for a processor, it takes effect for any PARTITION_ORDERED queries in that processor.

  4. A system default value (currently set to 4) is used if you don't specify either a partition-order-capacity value or max-threads value, or if the max-threads value is set to 0 (meaning it's a pass-through channel).
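The precedence list above can be expressed as a short Java sketch. It models only the fallback order stated in the list, with hypothetical class, method, and parameter names, where a null argument stands for a setting that was not specified. The default of 4 is the value the text describes as current.

```java
/** Hypothetical model of how the effective partition-order-capacity is chosen. */
final class PartitionOrderCapacitySketch {
    /** System default stated in the list above. */
    static final int SYSTEM_DEFAULT = 4;

    private PartitionOrderCapacitySketch() {
    }

    /**
     * Resolves the effective value. A null argument means "not set".
     *
     * @param channelCapacity   partition-order-capacity on the input channel
     * @param serverCapacity    partition-order-capacity in the server configuration
     * @param channelMaxThreads max-threads on the input channel
     */
    static int resolve(Integer channelCapacity, Integer serverCapacity, Integer channelMaxThreads) {
        if (channelCapacity != null) {
            return channelCapacity;
        }
        if (serverCapacity != null) {
            return serverCapacity;
        }
        // A max-threads value of 0 marks a pass-through channel, so it is skipped.
        if (channelMaxThreads != null && channelMaxThreads > 0) {
            return channelMaxThreads;
        }
        return SYSTEM_DEFAULT;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, null, 0));
    }
}
```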

When using partition-order-capacity, keep in mind the following:

  • The partition-order-capacity value is only useful when you're setting the ordering-constraint attribute to PARTITION_ORDERED.

  • Increasing partition-order-capacity generally increases parallelism and scaling. For example, if your profiling reveals lock contention bottlenecks, you might find it helpful to increase partition-order-capacity to see if contention is reduced.

  • Setting partition-order-capacity even higher than the number of available threads can be helpful in some cases because of the particular way partitioning is done in the CQL processor.

  • There is some resource cost in memory used by specifying very high values.

  • Tuning this parameter is very dependent on details of the application and the input rate. Tuning by experimentation may be necessary to determine an optimal value.

6.6.4 Limitations

Think of parallel query execution as a performance enhancement feature that you specify support for so that the CQL processor can use it whenever possible. Not all queries can be executed in parallel, including queries that use certain CQL language features.

For example, if your query uses some form of aggregation, such as finding the maximum value in a range of values, the CQL processor may not be able to execute the query fully in parallel, because it must guarantee the correct result given the ordering constraint. Some query semantics in themselves also constrain the query to ordered processing. Such queries are executed serially regardless of whether you specify support for parallel execution.

Also, the IStream, RStream and DStream operators maintain the state of their operand for processing, making it necessary for the CQL processor to synchronize threads in order to execute the query.

Note that the CQL processor always respects the semantic intention of your query. In cases where the ordering-constraint attribute would change this intention, the attribute is coerced to a value that keeps the intention intact.

If you're using the partitioning-expression attribute, keep in mind that the attribute supports a single expression only. Entering multiple property names for the value is not supported.

6.7 Fault Handling

You can write code to handle faults that occur in code that does not have an inherent fault handling mechanism. This includes Oracle CQL code and multithreaded EPN channels.

Unlike Java with its try/catch structure, the CQL language has no built-in mechanism for handling errors. To handle faults that occur in CQL, you can write a fault handler, then connect the handler to the EPN stage for which it handles faults, such as an Oracle CQL processor.

You can also associate a fault handler with a multithreaded channel, which is a channel whose max-threads setting is greater than 0. This provides fault handling in the case of exceptions that are thrown to the channel from a stage that is downstream of the channel. Note that channels whose max-threads setting is 0 are pass-through channels that already rethrow exceptions to their upstream stages. For additional information specific to fault handlers for channels, see Fault Handling.

A fault handler is a Java class that implements the com.bea.wlevs.ede.api.FaultHandler interface. You connect the class to an EPN stage by registering your fault handler as an OSGi service and associating it with the stage. For more information about OSGi, see Spring Framework.

Without a custom fault handler, you get the following default fault handling behavior:

  • When an exception occurs in Oracle CQL, the CQL engine catches the exception and stops the query processor.

  • If an exception occurs in a stage that is downstream of the processor, then that stage is dropped as a listener.

  • Exceptions are logged (under the CQLServer category) and the events that are part of the exception clause are discarded.

  • Upstream stages are not notified of the failure.

With a custom fault handler, you can:

  • Associate a fault handler with an Oracle CQL processor or multithreaded channel so that faults in those stages are thrown as exceptions to the handler. There, you can handle or rethrow the exception.

  • Allow query processing to continue as your code either handles the exception or rethrows it to the stage that is next upstream.

  • Save event data from being lost while handling a fault. For example, if you have configured a connection to a data source, you could save event data there.

  • Log fault and event information when faults occur.

  • Use multiple fault handlers where needed in an EPN so that exceptions thrown upstream are handled when they reach other Oracle CQL processors and channels.

Consider associating a fault handler with a stage that does not have its own mechanism for responding to faults, including Oracle CQL processors and multithreaded channels. Other stages, such as custom adapters that have their own exception-handling model, do not benefit from a fault handler.

Queries can continue as your fault handling code evaluates the fault to determine what action should be taken, including rethrowing the fault to a stage that is upstream from the Oracle CQL processor.

For example, the upstream stage of the Oracle CQL processor could be the JMS subscriber adapter, which can roll back the JMS transaction (if the session is transacted) to allow the event to be redelivered. It can also commit the transaction if the event has already been redelivered and the problem is found to be unsolvable.

Note that when you use a custom fault handler, the query state is reset after a fault as if the query had been stopped and restarted. In contrast, the default behavior stops the query and drops all subsequent events.

6.7.1 Implement a Fault Handler Class

You create a fault handler class by implementing the com.bea.wlevs.ede.api.FaultHandler interface. After you have written the class, you associate it with the stage for which it handles faults by registering it as an OSGi service. For more information, see Register a Fault Handler.

Your implementation of the handleFault method receives exceptions for the EPN stage with which the handler is associated. The exception itself is either an instance of com.bea.wlevs.ede.api.EventProcessingException or, if there has been a JVM error, an instance of java.lang.Error.

The method also receives a string array that contains the names of upstream stages, or catchers, to which the exception goes when your code rethrows it. If there is more than one catcher in the array, your rethrown exception goes to all of them. There are two cases when the catchers array is empty: when the exception occurs while executing a temporal query and when the exception is thrown to a channel's fault handler. In these cases, the fault handler executes in the context of a background thread, and there is no linkage to upstream stages.

An exception that is rethrown from a fault handler travels through upstream EPN stages until it is either caught or reaches a stage that cannot catch it (such as a processor or multithreaded channel that does not have an associated fault handler). Note that if you rethrow an exception, any channels named in the catchers array must have an associated fault handler to catch the exception.

The EventProcessingException instance could also be one of the exception types that extend that class, including CQLExecutionException, ArithmeticExecutionException, and others. See the Java API Reference for Oracle Event Processing. The EventProcessingException instance provides methods with which your code can retrieve insert, delete, and update events that were involved in generating the fault.

Your implementation of the method should do one of the following:

  • Consume the fault in the way that a Java try and catch statement might. If your implementation does not rethrow the fault, then event processing continues with subsequent events. However, query processing continues with its state reset as if the query had been restarted. The processing state is lost and processing begins fresh with events that follow those that provoked the fault.

  • Rethrow the fault so that it is received by upstream stages (or their fault handlers). As when the fault is consumed, queries continue processing events, although the query state is reset with subsequent events. The upstream stage receiving the fault always has the option of explicitly stopping the offending query by using the CQL processor's MBean interface.

    Note:

    When you update an Oracle CQL query with an MBean, do not send events during the update procedure. If you do, the order of the events in the output stream is not guaranteed for some queries, for example, when you change an Oracle CQL query from unordered to ordered under Oracle CQL parallel execution.

The following example provides a high-level illustration of a fault handler.

package com.example.faulthandler;

import com.bea.wlevs.ede.api.FaultHandler;

public class SimpleFaultHandler implements FaultHandler
{
    private String suppress;

    // Called by the server to pass in fault information.
    @Override
    public void handleFault(Throwable fault, String[] catchers) throws Throwable
    {
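        // Log the fault. Returning without rethrowing consumes the fault,
        // so event processing continues with subsequent events.
        System.err.println("Fault handled by SimpleFaultHandler: " + fault);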
    }
}
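Building on the example above, the two options described earlier (consume the fault or rethrow it) might be combined as in the following sketch. It assumes one possible policy, which is an illustration rather than a recommendation from the product: consume the fault when the catchers array is empty, because there is then no linkage to upstream stages, and rethrow it otherwise. So that the sketch compiles without the server libraries, it declares a local stand-in interface with the same handleFault signature as com.bea.wlevs.ede.api.FaultHandler; in a real application you would implement the Oracle interface instead. The class name ConsumeOrRethrowFaultHandler and its consumedCount method are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Local stand-in with the same method signature as
// com.bea.wlevs.ede.api.FaultHandler, declared here only so that the
// sketch compiles on its own.
interface FaultHandler {
    void handleFault(Throwable fault, String[] catchers) throws Throwable;
}

// Hypothetical handler: consumes faults that have no upstream catchers
// and rethrows all others.
final class ConsumeOrRethrowFaultHandler implements FaultHandler {

    // Counts consumed faults; atomic because faults can arrive on several threads.
    private final AtomicInteger consumed = new AtomicInteger();

    @Override
    public void handleFault(Throwable fault, String[] catchers) throws Throwable {
        System.err.println("Fault received: " + fault);

        if (catchers == null || catchers.length == 0) {
            // No upstream stage is linked to this fault, so consume it.
            // Query processing continues with its state reset.
            consumed.incrementAndGet();
            return;
        }

        // Rethrow so that the upstream stages named in catchers
        // (or their fault handlers) receive the fault.
        throw fault;
    }

    int consumedCount() {
        return consumed.get();
    }
}
```

Rethrowing the original Throwable rather than wrapping it keeps the exception type intact for whichever upstream stage catches it.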

6.7.2 Register a Fault Handler

After you have written a fault handling class, you can associate it with an EPN stage by registering it as an OSGi service. The simplest way to do this is to register the handler declaratively in the EPN assembly file.

Note:

Due to inherent OSGi behavior, runtime fault handler registration from your configuration happens asynchronously, meaning that a small amount of warm-up time might be required before the handler can receive faults. To be sure your handler is ready for the first events that enter the network, add a wait period before the application begins to receive events.
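One way to add such a wait period is to have the component that feeds events into the network block until a fixed warm-up time has elapsed. The following is a minimal sketch under that assumption: the WarmUpGate class is hypothetical, the length of the warm-up period is something you would have to determine for your own application, and the sketch does not detect whether the handler has actually been registered.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: blocks callers until a warm-up period, measured
// from the moment the gate is created, has elapsed.
final class WarmUpGate {

    private final long readyAtNanos;

    WarmUpGate(long warmUp, TimeUnit unit) {
        this.readyAtNanos = System.nanoTime() + unit.toNanos(warmUp);
    }

    // Returns immediately once the warm-up period is over.
    void awaitReady() throws InterruptedException {
        long remaining = readyAtNanos - System.nanoTime();
        // Loop in case the sleep returns slightly early.
        while (remaining > 0) {
            TimeUnit.NANOSECONDS.sleep(remaining);
            remaining = readyAtNanos - System.nanoTime();
        }
    }
}
```

An event source would create the gate at startup and call awaitReady() before sending its first event; later calls return without waiting.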

In the following example, the EPN assembly file excerpt shows a service element stanza that registers the SimpleFaultHandler class as the fault handler for the Oracle CQL processor with an id of exampleProcessor.

<osgi:service interface="com.bea.wlevs.ede.api.FaultHandler">
    <osgi:service-properties>
        <entry key="application.identity" value="myapp"/>
        <entry key="stage.identity" value="exampleProcessor"/>
    </osgi:service-properties>
    <bean class="com.example.faulthandler.SimpleFaultHandler"/>
</osgi:service>

<!-- The processor that the fault handler is registered for. -->
<wlevs:processor id="exampleProcessor" >
    ...
</wlevs:processor>

For more on the schema for registering OSGi services, see http://static.springsource.org/osgi/docs/1.1.x/reference/html/appendix-schema.html. For more on OSGi, see http://en.wikipedia.org/wiki/OSGi.