1 Introduction to Oracle CQL

This chapter introduces Oracle Continuous Query Language (Oracle CQL), a query language based on SQL with added constructs that support streaming data. Using Oracle CQL, you can express queries on data streams with Oracle Stream Analytics.

This chapter includes the following sections:

1.1 Fundamentals of Oracle CQL

Databases are best equipped to run queries over finite stored data sets. However, many modern applications require long-running queries over continuous unbounded sets of data. By design, a stored data set is appropriate when significant portions of the data are queried repeatedly and updates are relatively infrequent. In contrast, data streams represent data that is changing constantly, often exclusively through insertions of new elements. It is either unnecessary or impractical to operate on large portions of the data multiple times.

Many types of applications generate data streams as opposed to data sets, including sensor data applications, financial tickers, network performance measuring tools, network monitoring and traffic management applications, and clickstream analysis tools. Managing and processing data for these types of applications involves building data management and querying capabilities with a strong temporal focus.

To address this requirement, Oracle introduces Oracle Stream Analytics, a data management infrastructure that supports the notion of streams of structured data records together with stored relations.

To provide a uniform declarative framework, Oracle offers Oracle Continuous Query Language (Oracle CQL), a query language based on SQL with added constructs that support streaming data.

Oracle CQL is designed to be:

  • Scalable with support for a large number of queries over continuous streams of data and traditional stored data sets.

  • Comprehensive to deal with complex scenarios. For example, through composability, you can create various intermediate views for querying.

Figure 1-1 shows a simplified view of the Oracle Stream Analytics architecture. Oracle Stream Analytics server provides the light-weight Spring container for Oracle Stream Analytics applications. The Oracle Stream Analytics application shown is composed of an event adapter that provides event data to an input channel. The input channel is connected to an Oracle CQL processor associated with one or more Oracle CQL queries that operate on the events offered by the input channel. The Oracle CQL processor is connected to an output channel to which query results are written. The output channel is connected to an event Bean: a user-written Plain Old Java Object (POJO) that takes action based on the events it receives from the output channel.

Figure 1-1 Oracle Stream Analytics Architecture

Using Oracle Stream Analytics, you can define event adapters for a variety of data sources including JMS, relational database tables, and files in the local filesystem. You can connect multiple input channels to an Oracle CQL processor and you can connect an Oracle CQL processor to multiple output channels. You can connect an output channel to another Oracle CQL processor, to an adapter, to a cache, or an event Bean.

Using Oracle JDeveloper and Oracle Stream Analytics Visualizer, you:

  • Create an Event Processing Network (EPN) as Figure 1-1 shows.

  • Associate one or more Oracle CQL queries with the Oracle CQL processors in your EPN.

  • Package your Oracle Stream Analytics application and deploy it to Oracle Stream Analytics server for execution.

Consider the typical Oracle CQL statements in the following example.

<?xml version="1.0" encoding="UTF-8"?>
<n1:config xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd" 
xmlns:n1="http://www.bea.com/ns/wlevs/config/application" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<processor>
    <name>cqlProcessor</name>
    <rules>
        <view id="lastEvents" schema="cusip bid srcId bidQty ask askQty seq"><![CDATA[ 
            select cusip, bid, srcId, bidQty, ask, askQty, seq 
            from inputChannel[partition by srcId, cusip rows 1]
        ]]></view>
        <view id="bidask" schema="cusip bid ask"><![CDATA[ 
            select cusip, max(bid), min(ask) 
            from lastEvents
            group by cusip
        ]]></view>
            <view ...><![CDATA[
                ...
        ]]></view>
        ...
        <view id="MAXBIDMINASK" schema="cusip bidseq bidSrcId bid askseq askSrcId ask bidQty askQty"><![CDATA[ 
            select bid.cusip, bid.seq, bid.srcId as bidSrcId, bid.bid, ask.seq, ask.srcId as askSrcId, ask.ask, bid.bidQty, ask.askQty 
            from BIDMAX as bid, ASKMIN as ask 
            where bid.cusip = ask.cusip
        ]]></view>
        <query id="BBAQuery"><![CDATA[ 
            ISTREAM(select bba.cusip, bba.bidseq, bba.bidSrcId, bba.bid, bba.askseq, bba.askSrcId, bba.ask, 
                bba.bidQty, bba.askQty, "BBAStrategy" as intermediateStrategy, p.seq as correlationId, 1 as priority 
            from MAXBIDMINASK as bba, inputChannel[rows 1] as p where bba.cusip = p.cusip)
        ]]></query>
    </rules>
</processor>
</n1:config>

This example defines multiple views (the Oracle CQL equivalent of subqueries) to create multiple relations, each building on previous views. Views always act on an inbound channel such as inputChannel. The first view, named lastEvents, selects directly from inputChannel. Subsequent views may select from inputChannel directly or from previously defined views. The results returned by a view's select statement remain in the view's relation: they are not forwarded to any outbound channel. Forwarding results is the responsibility of a query. This example defines the query BBAQuery, which selects both from inputChannel directly and from previously defined views. The results returned by a query's select clause are forwarded to the outbound channel associated with it: in this example, to outputChannel. BBAQuery uses a tuple-based stream-to-relation operator (a sliding window).

For more information on Oracle Stream Analytics server and tools, see Oracle Stream Analytics Server.

1.1.1 Streams and Relations

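This section introduces the two fundamental Oracle Stream Analytics objects that you manipulate using Oracle CQL: streams and relations.

Using Oracle CQL, you can perform the following operations with streams and relations:

  • Relation-to-relation operations

  • Stream-to-relation operations (windows)

  • Relation-to-stream operations

  • Stream-to-stream operations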

1.1.1.1 Streams

A stream is the principal source of data that Oracle CQL queries act on.

Stream S is a bag (or multi-set) of elements (s,T) where s is in the schema of S and T is in the time domain.

Stream elements are tuple-timestamp pairs, which can be represented as a sequence of timestamped tuple insertions. In other words, a stream is a sequence of timestamped tuples. More than one tuple can have the same timestamp. The tuples of an input stream are required to arrive at the system in order of non-decreasing timestamps. For more information, see Time.

A stream has an associated schema consisting of a set of named attributes, and all tuples of the stream conform to the schema.

The term "tuple of a stream" denotes the data portion of a stream element, that is, the ordered list of attribute values without the timestamp (the s of (s,T)). The following example shows how a stock ticker data stream might appear, where each stream element is made up of <timestamp value>, <stock symbol>, and <stock price>:

...
<timestampN>    NVDA,4
<timestampN+1>  ORCL,62
<timestampN+2>  PCAR,38
<timestampN+3>  SPOT,53
<timestampN+4>  PDCO,44
<timestampN+5>  PTEN,50
...

In the stream element <timestampN+1>  ORCL,62, the tuple is ORCL,62.

By definition, a stream is unbounded.

This section describes:

  • Streams and Channels

  • Channel Schema

  • Querying a Channel

  • Controlling Which Queries Output to a Downstream Channel

1.1.1.1.1 Streams and Channels

Oracle Stream Analytics represents a stream as a channel, as Figure 1-2 shows. Using Oracle JDeveloper, you connect the stream event source (PriceAdapter) to a channel (priceStream) and the channel to an Oracle CQL processor (filterFanoutProcessor) to supply the processor with events. You connect the Oracle CQL processor to a channel (filteredStream) to output Oracle CQL query results to downstream components (not shown in Figure 1-2).

Figure 1-2 Stream in the Event Processing Network

Note:

In Oracle Stream Analytics, you must use a channel to connect a push event source to an Oracle CQL processor and to connect an Oracle CQL processor to an event sink. A channel is optional with other Oracle Stream Analytics components.

1.1.1.1.2 Channel Schema

The event source you connect to a stream determines the stream's schema. The PriceAdapter adapter determines the priceStream stream's schema. The following example shows the PriceAdapter Event Processing Network (EPN) assembly file: the wlevs:event-type element defines event type PriceEvent. The wlevs:property element defines the property names and types for each property in this event type.

...
<wlevs:event-type-repository>
    <wlevs:event-type type-name="PriceEvent">
        <wlevs:properties>
            <wlevs:property name="cusip" type="char" />
            <wlevs:property name="bid" type="double" />
            <wlevs:property name="srcId" type="char" />
            <wlevs:property name="bidQty" type="int" />
            <wlevs:property name="ask" type="double" />
            <wlevs:property name="askQty" type="int" />
            <wlevs:property name="seq" type="bigint" />
            <wlevs:property name="sector" type="char" />
        </wlevs:properties>
    </wlevs:event-type>
</wlevs:event-type-repository>

<wlevs:adapter id="PriceAdapter" provider="loadgen">
    <wlevs:instance-property name="port" value="9011"/>
    <wlevs:listener ref="priceStream"/>
</wlevs:adapter>

<wlevs:channel id="priceStream" event-type="PriceEvent">
    <wlevs:listener ref="filterFanoutProcessor"/>
</wlevs:channel>

<wlevs:processor id="filterFanoutProcessor" provider="cql">
    <wlevs:listener ref="filteredStream"/>
</wlevs:processor>

...

1.1.1.1.3 Querying a Channel

Once the event source, channel, and processor are connected, you can write Oracle CQL statements that make use of the stream. The following example shows the component configuration file that defines the Oracle CQL statements for the filterFanoutProcessor.

<processor>
    <name>filterFanoutProcessor</name>
    <rules>
        <query id="Yr3Sector"><![CDATA[ 
            select cusip, bid, srcId, bidQty, ask, askQty, seq 
            from priceStream where sector="3_YEAR"
        ]]></query>
        <query id="Yr2Sector"><![CDATA[ 
            select cusip, bid, srcId, bidQty, ask, askQty, seq 
            from priceStream where sector="2_YEAR"
        ]]></query>
        <query id="Yr1Sector"><![CDATA[ 
            select cusip, bid, srcId, bidQty, ask, askQty, seq 
            from priceStream where sector="1_YEAR"
        ]]></query>
    </rules>
</processor>

1.1.1.1.4 Controlling Which Queries Output to a Downstream Channel

If you specify more than one query for a processor, then by default all query results are output to the processor's outbound channel (filteredStream).

Optionally, in the component configuration file, you can use the channel element selector attribute to control which query's results are output. In this example, query results for query Yr3Sector and Yr2Sector are output to filteredStream but not query results for query Yr1Sector.

<channel>
    <name>filteredStream</name>
    <selector>Yr3Sector Yr2Sector</selector>
</channel>

You may configure a channel element with a selector before creating the queries in the upstream processor. In this case, you must specify query names that match the names in the selector.

1.1.1.2 Relations

A time-varying relation R is a mapping from the time domain to an unbounded bag of tuples that belong to the schema of R.

A relation is an unordered, time-varying bag of tuples: in other words, an instantaneous relation. At every instant of time, a relation is a bounded set. It can also be represented as a sequence of timestamped tuples that includes insertions, deletions, and updates to capture the changing state of the relation.

Like streams, relations have a fixed schema to which all tuples conform.

Oracle Stream Analytics supports both base and derived streams and relations. The external sources supply data to the base streams and relations.

A base (explicit) stream is a source data stream that arrives at an Oracle Stream Analytics adapter so that time is non-decreasing. That is, there could be events that carry the same time value.

A derived (implicit) stream/relation is an intermediate stream/relation that query operators produce. Note that these intermediate operators can be named (through views) and can therefore be specified in further queries.

A base relation is an input relation.

A derived relation is an intermediate relation that query operators produce. Note that these intermediate operators can be named (through views) and can therefore be specified in further queries.

In Oracle Stream Analytics, you do not create base relations yourself. The Oracle Stream Analytics server creates base relations for you as required.

When we say that a relation is a time-varying bag of tuples, time refers to an instant in the time domain. Input relations are presented to the system as a sequence of timestamped updates which capture how the relation changes over time. An update is either a tuple insertion or deletion. The updates are required to arrive at the system in the order of increasing timestamps.

For more information, see Time.

1.1.1.3 Relations and Oracle Stream Analytics Tuple Kind Indicator

By default, Oracle Stream Analytics includes a timestamp and an Oracle Stream Analytics tuple kind indicator in the relations it generates.

Timestamp   Tuple Kind  Tuple
 1000:      +           ,abc,abc
 2000:      +           hihi,abchi,hiabc
 6000:      -           ,abc,abc
 7000:      -           hihi,abchi,hiabc
 8000:      +           hi1hi1,abchi1,hi1abc
 9000:      +           ,abc,abc
13000:      -           hi1hi1,abchi1,hi1abc
14000:      -           ,abc,abc
15000:      +           xyzxyz,abcxyz,xyzabc
20000:      -           xyzxyz,abcxyz,xyzabc

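The Oracle Stream Analytics tuple kind indicators are:

  • + for an inserted tuple

  • - for a deleted tuple

  • U for an updated tuple, when update semantics are in use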

1.1.2 Relation-to-Relation Operators

The relation-to-relation operators in Oracle CQL are derived from traditional relational queries expressed in SQL.

Anywhere a traditional relation is referenced in a SQL query, a relation can be referenced in Oracle CQL.

Consider the following examples for a stream CarSegStr with schema: car_id integer, speed integer, exp_way integer, lane integer, dir integer, and seg integer.

In the following example, at any time instant, the output relation of this query contains the set of vehicles having transmitted a position-speed measurement within the last 30 seconds.

<processor>
    <name>cqlProcessor</name>
    <rules>
        <view id="CurCarSeg" schema="car_id exp_way lane dir seg"><![CDATA[ 
            select distinct
                car_id, exp_way, lane, dir, seg 
            from 
                CarSegStr [range 30 seconds]
        ]]></view>
    </rules>
</processor>

In this example, distinct is the relation-to-relation operator. Using distinct, Oracle Stream Analytics returns only one copy of each set of duplicate tuples selected. Duplicate tuples are those with matching values for each expression in the select list. You can use distinct in a select_clause and with aggregate functions.

1.1.3 Stream-to-Relation Operators (Windows)

Oracle CQL supports stream-to-relation operations based on a sliding window. In general, S[W] is a relation. At time T the relation contains all tuples in window W applied to stream S up to T.

Queries that have the same source (stream) and window specifications are optimized by the system to share common memory space. When a new query is added with these parameters, it automatically receives the content (events) of this shared window. This optimization can cause the query to output initial events even though it might not have received newly added events.

window_type::=

Figure 1-3 window_type

Oracle CQL supports the following built-in window types:

  • Range: time-based

    S[Range T], or, optionally,

    S[Range T1 Slide T2]

  • Range: time-based unbounded

    S[Range Unbounded]

  • Range: time-based now

    S[Now]

  • Range: constant value

    S[Range C on ID]

  • Tuple-based:

    S[Rows N], or, optionally,

    S[Rows N1 Slide N2]

  • Partitioned:

    S[Partition By A1 ... Ak Rows N] or, optionally,

    S[Partition By A1 ... Ak Rows N Range T], or

    S[Partition By A1 ... Ak Rows N Range T1 Slide T2]
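
As a rough illustration of the time-based and tuple-based window types listed above, the following component configuration sketch applies several of them to one stream. The processor name, the query ids, and the stream S are assumptions made for this example and do not come from an actual application.

```xml
<processor>
    <name>windowExamplesProcessor</name>
    <rules>
        <!-- Time-based: tuples whose timestamps fall within the last 5 seconds -->
        <query id="rangeQuery"><![CDATA[ 
            select * from S[range 5 seconds]
        ]]></query>
        <!-- Time-based with slide: 10 seconds of data, with output every 5 seconds -->
        <query id="rangeSlideQuery"><![CDATA[ 
            select * from S[range 10 seconds slide 5 seconds]
        ]]></query>
        <!-- Now: only the tuples that carry the current timestamp -->
        <query id="nowQuery"><![CDATA[ 
            select * from S[now]
        ]]></query>
        <!-- Tuple-based: the 3 most recent tuples -->
        <query id="rowsQuery"><![CDATA[ 
            select * from S[rows 3]
        ]]></query>
    </rules>
</processor>
```

Each of these queries evaluates to a relation, because none of them wraps the select in a relation-to-stream operator.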

This section describes the following stream-to-relation operator properties:

  • Range, Rows, and Slide

  • Partition

  • Default Stream-to-Relation Operator

1.1.3.1 Range, Rows, and Slide

The keywords Range and Rows specify how much data you want to query:

  • Range specifies as many tuples as arrive in a given time period

  • Rows specifies a number of tuples

The Slide keyword specifies how frequently you want to see output from the query, while the Range keyword specifies the time range from which to query events. Using Range and Slide together results in a set of events from which to query, with that set changing based on where the query window slides to.

The set time is the time from which events are drawn for the query. To compute it, first find the time interval: the actual time (as measured by event timestamps) divided by the amount of time specified for the slide, discarding any remainder. If the remainder of this division is 0, then the set time is the time interval multiplied by the amount of time specified for the slide. If the remainder is greater than 0, then the set time is the time interval plus 1, multiplied by the amount of time specified for the slide.

Another way to express this is the following formula:

    timeInterval = actualTime / slideSpecification
    if ((actualTime % slideSpecification) == 0)    // No remainder
        setTime = timeInterval * slideSpecification
    else
        setTime = (timeInterval + 1) * slideSpecification
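
To make the formula concrete, here is a worked calculation. It assumes a slide of 4 seconds and integer division for the time interval; the sample event times of 8 and 10 seconds are chosen only for illustration.

```latex
\begin{align*}
\text{actualTime} = 8,\ \text{slide} = 4:\quad
  & \text{timeInterval} = \lfloor 8 / 4 \rfloor = 2,\quad 8 \bmod 4 = 0
  \;\Rightarrow\; \text{setTime} = 2 \times 4 = 8 \\
\text{actualTime} = 10,\ \text{slide} = 4:\quad
  & \text{timeInterval} = \lfloor 10 / 4 \rfloor = 2,\quad 10 \bmod 4 = 2 > 0
  \;\Rightarrow\; \text{setTime} = (2 + 1) \times 4 = 12
\end{align*}
```

In both cases the set time is the event time rounded up to the next multiple of the slide.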

In Figure 1-4, the Range specification indicates "I want to look at 4 seconds worth of data" and the Slide specification indicates "I want a result every 4 seconds". In this case, the query returns a result at the end of each Slide specification (except for certain conditions, as Range, Rows, and Slide at Query Start-Up and for Empty Relations describes).

Figure 1-4 Range and Slide: Equal (Steady-State Condition)

In Figure 1-5, the Range specification indicates "I want to look at 8 seconds worth of data" and the Slide specification indicates "I want a result every 4 seconds". In this case, the query returns a result twice during each Range specification (except for certain conditions, as Range, Rows, and Slide at Query Start-Up and for Empty Relations describes).

Figure 1-5 Range and Slide: Different (Steady-State Condition)

Table 1-1 lists the default Range, Range unit, and Slide (where applicable) for range-based and tuple-based stream-to-relation window operators:

Table 1-1 Default Range and Tuple-Based Stream-to-Relation Operators

Window Operator                                   Default Range   Default Range Unit   Default Slide
Range-Based Stream-to-Relation Window Operators   Unbounded       seconds              1 nanosecond
Tuple-Based Stream-to-Relation Window Operators   N/A             N/A                  1 tuple

1.1.3.1.1 Range, Rows, and Slide at Query Start-Up and for Empty Relations

Table 1-2 lists the behavior of Range, Rows, and Slide for special cases such as query start-up time and for an empty relation.

Table 1-2 Range, Rows, and Slide at Query Start-Up and Empty Relations

Operator or Function                           Result
COUNT(*) or COUNT(expression)                  Immediately returns 0 for an empty relation (when there is no GROUP BY), before Range or Rows worth of data has accumulated and before the first Slide.
SUM(attribute) and other aggregate functions   Immediately returns null for an empty relation, before Range or Rows worth of data has accumulated and before the first Slide.
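
As a small sketch of the start-up behavior in Table 1-2, consider two aggregate queries over the same window. The processor name, query ids, stream S, and attribute c1 are hypothetical. Following the table, the first query would be expected to report 0 and the second null while the window is still empty.

```xml
<processor>
    <name>emptyRelationProcessor</name>
    <rules>
        <!-- COUNT with no GROUP BY: 0 for an empty relation -->
        <query id="countLastFiveSeconds"><![CDATA[ 
            select count(*) as eventCount from S[range 5 seconds]
        ]]></query>
        <!-- SUM: null for an empty relation -->
        <query id="sumLastFiveSeconds"><![CDATA[ 
            select sum(c1) as total from S[range 5 seconds]
        ]]></query>
    </rules>
</processor>
```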

1.1.3.2 Partition

The keyword Partition By logically separates an event stream S into different substreams based on the equality of the attributes given in the Partition By specification. For example, the S[Partition By A,C Rows 2] partition specification creates a sub-stream for every unique combination of A and C value pairs and the Rows specification is applied on these sub-streams. The Rows specification indicates "I want to look at 2 tuples worth of data".
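
The partition specification above might be written in a component configuration file as follows. This is a sketch only: the processor name and query ids are invented, and S is assumed to be a channel whose event type has attributes A and C. The second query adds a Range to the same partition, using the optional form listed among the built-in window types.

```xml
<processor>
    <name>partitionExampleProcessor</name>
    <rules>
        <!-- Keeps the 2 most recent tuples for each distinct (A, C) pair -->
        <query id="lastTwoPerPair"><![CDATA[ 
            select * from S[partition by A, C rows 2]
        ]]></query>
        <!-- Assumed semantics: as above, and only tuples from the last 10 seconds are kept -->
        <query id="lastTwoPerPairRecent"><![CDATA[ 
            select * from S[partition by A, C rows 2 range 10 seconds]
        ]]></query>
    </rules>
</processor>
```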

For more information, see Range, Rows, and Slide.

1.1.3.3 Default Stream-to-Relation Operator

When you reference a stream in an Oracle CQL query where a relation is expected (most commonly in the from clause), a Range Unbounded window is applied to the stream by default. For example, the queries in the following examples are equivalent:

<query id="q1"><![CDATA[ 
    select * from InputChannel
]]></query>

<query id="q1"><![CDATA[ 
    IStream(select * from InputChannel[RANGE UNBOUNDED])
]]></query>

For more information, see Relation-to-Stream Operators.

1.1.4 Relation-to-Stream Operators

You can convert the result of a stream-to-relation operation back into a stream for further processing.

In the following example, the select outputs a stream of tuples satisfying the filter condition (viewq3.ACCT_INTRL_ID = ValidLoopCashForeignTxn.ACCT_INTRL_ID). The now window converts viewq3 into a relation, which is kept as a relation by the filter condition. The IStream relation-to-stream operator converts the output of the filter back into a stream.

<processor>
    <name>cqlProcessor</name>
    <rules>
        <query id="q3Txns"><![CDATA[ 
            IStream(
                select 
                    TxnId, 
                    ValidLoopCashForeignTxn.ACCT_INTRL_ID, 
                    TRXN_BASE_AM, 
                    ADDR_CNTRY_CD, 
                    TRXN_LOC_ADDR_SEQ_ID 
                from 
                    viewq3[NOW], ValidLoopCashForeignTxn 
                where 
                    viewq3.ACCT_INTRL_ID = ValidLoopCashForeignTxn.ACCT_INTRL_ID
            )
        ]]></query>
    </rules>
</processor>

Oracle CQL supports the following relation-to-stream operators:

  • IStream

  • DStream

  • RStream

By default, Oracle Stream Analytics includes an operation indicator in the relations it generates so you can identify insertions, deletions, and, when using UPDATE SEMANTICS, updates. For more information, see Relations and Oracle Stream Analytics Tuple Kind Indicator.
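
The three relation-to-stream operators, IStream, DStream, and RStream, all wrap a select statement in the same way. The sketch below applies each of them to the same windowed relation so that the difference lies only in the operator; the processor name, query ids, and stream S are assumptions for illustration.

```xml
<processor>
    <name>relationToStreamProcessor</name>
    <rules>
        <!-- IStream: emits a stream element each time a tuple enters the relation -->
        <query id="inserted"><![CDATA[ 
            IStream(select * from S[range 10 seconds])
        ]]></query>
        <!-- DStream: emits a stream element each time a tuple leaves the relation -->
        <query id="deleted"><![CDATA[ 
            DStream(select * from S[range 10 seconds])
        ]]></query>
        <!-- RStream: emits the full contents of the relation at each time instant -->
        <query id="current"><![CDATA[ 
            RStream(select * from S[range 10 seconds])
        ]]></query>
    </rules>
</processor>
```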

1.1.4.1 Default Relation-to-Stream Operator

Whenever an Oracle CQL query produces a relation that is monotonic, Oracle CQL adds an IStream operator by default.

A relation R is monotonic if and only if R(t1) is a subset of R(t2) whenever t1 <= t2.

Oracle CQL uses a conservative static monotonicity test. For example, a base relation is monotonic if it is known to be append-only: S[Range Unbounded] is monotonic for any stream S.

If a relation is not monotonic (for example, it has a window like S[range 10 seconds]), it is impossible to determine what the query author intends (IStream, DStream, or RStream), so Oracle CQL does not add a relation-to-stream operator by default in this case.

1.1.5 Stream-to-Stream Operators

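Typically, you perform stream-to-stream operations using the following:

  • A stream-to-relation operator to turn the stream into a relation

  • A relation-to-relation operator to perform a relational filter or other operation

  • A relation-to-stream operator to turn the relation back into a stream

However, some relation-to-relation operators (like filter and project) can also act as stream-to-stream operators. Consider the following query: assuming that the input S is a stream, the query produces a stream as output, made up of the stream elements whose c1 is greater than 50.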

<processor>
    <name>cqlProcessor</name>
    <rules>
        <query id="q0"><![CDATA[ 
            select * from S where c1 > 50
        ]]></query>
    </rules>
</processor>

This is a consequence of the application of the default stream-to-relation and relation-to-stream operators. The stream S gets a default [Range Unbounded] window added to it. Since this query then evaluates to a relation that is monotonic, an IStream gets added to it.
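
Written out in full, with both defaults made explicit, the query above would look something like the following sketch. The query id q0Expanded is invented for this illustration; the window and operator follow the defaults described in this chapter.

```xml
<processor>
    <name>cqlProcessor</name>
    <rules>
        <!-- Default [Range Unbounded] window and default IStream, both spelled out -->
        <query id="q0Expanded"><![CDATA[ 
            IStream(select * from S[range unbounded] where c1 > 50)
        ]]></query>
    </rules>
</processor>
```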

In addition, Oracle CQL supports the following direct stream-to-stream operators:

  • MATCH_RECOGNIZE: use this clause to write various types of pattern recognition queries on the input stream. For more information, see Pattern Recognition.

  • XMLTABLE: use this clause to parse data from the xmltype stream elements using XPath expressions. For more information, see XMLTABLE Query.

1.1.6 Queries, Views, and Joins

An Oracle CQL query is an operation that you express in Oracle CQL syntax and execute on an Oracle Stream Analytics CQL processor to retrieve data from one or more streams, relations, or views. A top-level SELECT statement that you create in a <query> element is called a query. For more information, see Queries.

An Oracle CQL view represents an alternative selection on a stream or relation. In Oracle CQL, you use a view instead of a subquery. A top-level SELECT statement that you create in a <view> element is called a view. For more information, see Views.

Each query and view must have an identifier unique to the processor that contains it. The following example shows a query with an id of q0. The id value must conform with the specification given.

<processor>
    <name>cqlProcessor</name>
    <rules>
        <query id="q0"><![CDATA[ 
            select * from S where c1 > 50
        ]]></query>
    </rules>
</processor>

A join is a query that combines rows from two or more streams, views, or relations. For more information, see Joins.
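
A minimal join sketch follows, assuming two hypothetical channels: orders (with attributes orderId and symbol) and prices (with attributes symbol and price). It pairs each arriving order with the most recent price for the same symbol; none of these names come from an actual application.

```xml
<processor>
    <name>joinExampleProcessor</name>
    <rules>
        <query id="ordersWithPrices"><![CDATA[ 
            IStream(
                select o.orderId, o.symbol, p.price 
                from orders[now] as o, prices[partition by symbol rows 1] as p 
                where o.symbol = p.symbol
            )
        ]]></query>
    </rules>
</processor>
```

The windows turn both streams into relations, the where clause joins them as in SQL, and IStream turns the joined relation back into a stream.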

For more information, see Oracle CQL Queries, Views, and Joins.

1.1.7 Pattern Recognition

The Oracle CQL MATCH_RECOGNIZE construct is the principal means of performing pattern recognition.

A sequence of consecutive events or tuples in the input stream, each satisfying certain conditions, constitutes a pattern. The pattern recognition functionality in Oracle CQL allows you to define conditions on the attributes of incoming events or tuples and to identify these conditions by using String names called correlation variables. The pattern to be matched is specified as a regular expression over these correlation variables, and it determines the sequence or order in which conditions should be satisfied by different incoming tuples to be recognized as a valid match.
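
The following sketch shows how correlation variables, conditions, and a pattern fit together. It assumes a hypothetical channel named ticker with a numeric price attribute, and it looks for a run of rising prices followed by a drop. The clause layout is a best-effort illustration and should be checked against the MATCH_RECOGNIZE reference before use.

```xml
<processor>
    <name>patternExampleProcessor</name>
    <rules>
        <query id="risingThenFalling"><![CDATA[ 
            select T.startPrice, T.peakPrice, T.endPrice 
            from ticker MATCH_RECOGNIZE ( 
                MEASURES 
                    A.price as startPrice, 
                    last(B.price) as peakPrice, 
                    C.price as endPrice 
                PATTERN (A B+ C) 
                DEFINE 
                    B as B.price > prev(B.price), 
                    C as C.price < prev(C.price) 
            ) as T
        ]]></query>
    </rules>
</processor>
```

Here A, B, and C are the correlation variables, DEFINE gives the conditions for B and C (A has no condition, so it is assumed to match any tuple), and PATTERN is the regular expression over those variables.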

For more information, see Pattern Recognition With MATCH_RECOGNIZE.

1.1.8 Event Sources and Event Sinks

An Oracle Stream Analytics event source identifies a producer of data that your Oracle CQL queries operate on. An Oracle CQL event sink identifies a consumer of query results.

This section explains the types of event sources and sinks you can access in your Oracle CQL queries and how you connect event sources and event sinks.

1.1.8.1 Event Sources

An Oracle Stream Analytics event source identifies a producer of data that your Oracle CQL queries operate on.

In Oracle Stream Analytics, the following elements may be event sources:

  • adapter (JMS, HTTP, and file)

  • channel

  • processor

  • cache

Note:

In Oracle Stream Analytics, you must use a channel to connect a push event source to an Oracle CQL processor and to connect an Oracle CQL processor to an event sink. A channel is optional with other Oracle Stream Analytics component types. For more information, see Streams and Relations.

Oracle Stream Analytics event sources are typically push data sources: that is, Oracle Stream Analytics expects the event source to notify it when the event source has data ready.

Oracle Stream Analytics relational database table and cache event sources are pull data sources: that is, Oracle Stream Analytics pulls the event source on arrival of an event on the data stream.

1.1.8.2 Event Sinks

An Oracle CQL event sink connected to a CQL processor is a consumer of query results.

In Oracle Stream Analytics, the following elements may be event sinks:

  • adapter (JMS, HTTP, and file)

  • channel

  • processor

  • cache

  • table

You can associate the same query with more than one event sink and with different types of event sink.

1.1.8.3 Connecting Event Sources and Event Sinks

In Oracle Stream Analytics, you define event sources and event sinks using Oracle JDeveloper to create the Event Processing Network (EPN) as Figure 1-6 shows. In this EPN, adapter PriceAdapter is the event source for channel priceStream; channel priceStream is the event source for Oracle CQL processor filterFanoutProcessor. Similarly, Oracle CQL processor filterFanoutProcessor is the event sink for channel priceStream.

Figure 1-6 Event Sources and Event Sinks in the Event Processing Network

1.1.9 Table Event Sources

Using Oracle CQL, you can access tabular data, including:

  • Relational database tables

  • XML tables

  • Function tables

For more information, see Event Sources and Event Sinks.

1.1.9.1 Relational Database Table Event Sources

Using an Oracle CQL processor, you can specify a relational database table as an event source. You can query this event source, join it with other event sources, and so on.
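
As a hedged sketch of such a join, the query below combines a stream with a table. It assumes a channel named tradeStream and a relational table event source named Stock that has already been declared in the EPN and made available to the processor; that declaration is not shown, and all names are hypothetical. The Now window is used so that the table is consulted for the event that has just arrived, in line with the pull behavior described for table event sources.

```xml
<processor>
    <name>tableJoinProcessor</name>
    <rules>
        <query id="tradesWithExchange"><![CDATA[ 
            select t.symbol, t.price, s.exchange 
            from tradeStream[now] as t, Stock as s 
            where t.symbol = s.symbol
        ]]></query>
    </rules>
</processor>
```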

For more information, see Oracle CQL Queries and Relational Database Tables.

1.1.9.2 XML Table Event Sources

Using the Oracle CQL XMLTABLE clause, you can parse data from an xmltype stream into columns using XPath expressions and conveniently access the data by column name.
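
The following sketch suggests what such a query might look like. It assumes a stream S1 whose attribute c2 is of type xmltype and holds documents containing item elements; the stream, attribute, column names, and XPath expressions are all illustrative, and the exact clause syntax should be confirmed in the XMLTABLE Query reference.

```xml
<processor>
    <name>xmlTableProcessor</name>
    <rules>
        <query id="itemsFromOrders"><![CDATA[ 
            select X.Name, X.Quantity 
            from S1, 
            XMLTABLE ( 
                '//item' PASSING BY VALUE S1.c2 as "." 
                COLUMNS 
                    Name CHAR(16) PATH "/item/productName", 
                    Quantity INTEGER PATH "/item/quantity" 
            ) as X
        ]]></query>
    </rules>
</processor>
```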

For more information, see XMLTABLE Query.

1.1.9.3 Function Table Event Sources

Use the TABLE clause to access, as a relation, the multiple rows returned by a built-in or user-defined function, as an array or Collection type, in the FROM clause of an Oracle CQL query.

1.1.10 Table Event Sink

The table event sink accepts insert, delete, and update events from the upstream stages of the EPN and sends those events to the downstream stages connected to the table.

Spring Assembly File

By default, the adapter sends events of type SampleEvent, and all other stages receive events of this type. If you change to a different event type, it must follow the configuration given below:

Example 1-1 Event Type

<wlevs:event-type type-name="SampleEvent">
        <wlevs:properties>
                <wlevs:property name="eventId" type="int"/>
                <wlevs:property name="msg" type="char[]" length="64"/>
        </wlevs:properties>
</wlevs:event-type>

Example 1-2 Table Tag

<wlevs:table id="tableSink" event-type="SampleEvent" data-source="test-ds" key-properties="eventId" table-name="TTest">
        <wlevs:listener ref="tableRelationSinkBean"/>
</wlevs:table>

For other channels, the attributes are the same.

Example 1-3 Channel

The outputChannel is defined as follows:

<wlevs:channel id="outputChannel" event-type="SampleEvent" is-relation="true" primary-key="eventId">
        <wlevs:listener ref="outputChannelRelationSinkBean"/>
        <wlevs:listener ref="tableSink"/>
        <wlevs:source ref="processor"/>
</wlevs:channel>

Example 1-4 Application Configuration File

To support the table sink, you must configure the processor.

<processor>
        <name>processor</name>
        <rules>
                <query id="getAllEventsRule">
                        <![CDATA[ select * from inputChannel ]]>
                </query>
        </rules>
</processor>

During its initialization phase, the adapter connects to the data source defined in the server configuration file and creates the table with the following SQL statement:

CREATE TABLE TTest (eventId INTEGER,msg VARCHAR(64))

Because the adapter implements RunnableBean and RelationSource, it sends insert, delete, or update events, depending on the case, from inside its run() method.

The three event sink beans implement the same Java interface, BatchRelationSink, which receives both non-batch and batch events.

Example 1-5 Server Configuration File

The data source test-ds needs to be defined in the configuration file:

<data-source>
        <name>test-ds</name>
        <connection-pool-params>
                <initial-capacity>15</initial-capacity>
                <max-capacity>50</max-capacity>
        </connection-pool-params>
        <driver-params>
                <url>jdbc:derby:testTableSinkDB;create=true</url>
                <driver-name>
                        org.apache.derby.jdbc.EmbeddedDriver
                </driver-name>
        </driver-params>
</data-source>

1.1.11 Cache Event Sources

Using an Oracle CQL processor, you can specify an Oracle Stream Analytics cache as an event source. You can query this event source and join it with other event sources using a now window only.
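As a minimal sketch of such a join, the following processor configuration joins a stream with a cache through a Now window. The names orderChannel, customerCache, and their properties are hypothetical and do not come from this guide. The sketch also assumes that the cache is already declared and connected to the processor in the Spring assembly file, which is not shown.

```xml
<processor>
    <name>processor</name>
    <rules>
        <!-- Hypothetical names: orderChannel is a stream, customerCache is a cache. -->
        <!-- The stream side of the join uses a Now window, as required for cache joins. -->
        <query id="joinWithCache"><![CDATA[
            select o.orderId, c.customerName
            from orderChannel[now] as o, customerCache as c
            where o.customerId = c.customerId
        ]]></query>
    </rules>
</processor>
```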

1.1.12 Functions

Functions are similar to operators in that they manipulate data items and return a result. Functions differ from operators in the format of their arguments. This format enables them to operate on zero, one, two, or more arguments:

function(argument, argument, ...) 

A function without any arguments is similar to a pseudocolumn (refer to Pseudocolumns). However, a pseudocolumn typically returns a different value for each tuple in a relation, whereas a function without any arguments typically returns the same value for each tuple.

Oracle CQL provides a wide variety of built-in functions to perform operations on stream data, including:

  • Single-row functions that return a single result row for every row of a queried stream or view.

  • Aggregate functions that return a single aggregate result based on a group of tuples, rather than on a single tuple.

  • Single-row statistical and advanced arithmetic operations based on the Colt open source libraries for high-performance scientific and technical computing.

  • Aggregate statistical and advanced arithmetic operations based on the Colt open source libraries for high-performance scientific and technical computing.

  • Statistical and advanced arithmetic operations based on the java.lang.Math class.
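To illustrate the difference between the first two categories, the following sketch places a single-row function and two aggregate functions side by side. It assumes an inputChannel stream with a char property cusip and a numeric property bid; the query ids and the 10-second window are illustrative choices, not values taken from this guide.

```xml
<processor>
    <name>cqlProcessor</name>
    <rules>
        <!-- Single-row function: length() yields one result for each input row. -->
        <query id="singleRowExample"><![CDATA[
            select cusip, length(cusip) as cusipLength
            from inputChannel[now]
        ]]></query>
        <!-- Aggregate functions: avg() and count() yield one result for each group of tuples. -->
        <query id="aggregateExample"><![CDATA[
            select cusip, avg(bid) as avgBid, count(*) as bidCount
            from inputChannel[range 10 seconds]
            group by cusip
        ]]></query>
    </rules>
</processor>
```

Note that the built-in function names are written in lower case.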

If Oracle CQL built-in functions do not provide the capabilities your application requires, you can easily create user-defined functions in Java by using the classes in the oracle.cep.extensibility.functions package. You can create aggregate and single-row user-defined functions. You can create overloaded functions and you can override built-in functions.
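As a rough sketch, a single-row user-defined function might be registered on the processor in the Spring assembly file as shown below. The function name mymod and the class com.example.functions.MyMod are hypothetical. The exec-method attribute and the nested bean element are written here as assumptions about the assembly schema, so verify them against the schema shipped with your installation.

```xml
<wlevs:processor id="processor">
    <!-- Hypothetical function name and implementation class. -->
    <!-- Assumption: exec-method names the Java method that Oracle CQL invokes for a single-row function. -->
    <wlevs:function function-name="mymod" exec-method="execute">
        <bean class="com.example.functions.MyMod"/>
    </wlevs:function>
</wlevs:processor>
```

A query in that processor could then call the function by the registered name, for example mymod(eventId, 10), using exactly the case given in function-name.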

If you call an Oracle CQL function with an argument of a data type other than the data type expected by the Oracle CQL function, then Oracle Stream Analytics attempts to convert the argument to the expected data type before performing the Oracle CQL function.

Oracle CQL provides a variety of built-in single-row functions and aggregate functions based on the Colt open source libraries for high-performance scientific and technical computing. The functions that are part of the Colt library do not support the BigDecimal data type or NULL input values. In addition, these functions do not compute their values incrementally. See the Colt website for details.

Note:

Function names are case sensitive:

  • Built-in functions: lower case.

  • User-defined functions: the function-name attribute of the wlevs:function element determines the case you use.

1.1.13 Time

Timestamps are an integral part of an Oracle Stream Analytics stream. However, timestamps do not necessarily equate to clock time. For example, time may be defined in the application domain where it is represented by a sequence number. Timestamps need only guarantee that updates arrive at the system in the order of increasing timestamp values.

Note that the timestamp ordering requirement applies to a single stream or relation. For example, tuples of different streams can be arbitrarily interleaved. When multiple streams are being processed, the order in which tuples with the same timestamp are processed is not guaranteed. In addition, there is no defined behavior for negative timestamps. For t = 0, the event is output immediately, assuming total order.

Oracle Stream Analytics can observe application time or system time.

For system timestamped relations or streams, time is dependent upon the arrival of data on the relation or stream data source. Oracle Stream Analytics generates a heartbeat on a system timestamped relation or stream if there is no activity (no data arriving on the stream or relation's source) for more than a specified time: for example, 1 minute. Either the relation or stream is populated by its specified source or Oracle Stream Analytics generates a heartbeat every minute. This way, the relation or stream can never be more than 1 minute behind.

For system timestamped streams and relations, the system assigns time in such a way that no two events have the same time value. However, for application timestamped streams and relations, events can have the same time value.
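The following sketch shows how a channel might be declared as application timestamped in the Spring assembly file. The event property eventTime is hypothetical (the SampleEvent type shown earlier does not define it), and the wlevs:application-timestamped and wlevs:expression element names are given as assumptions about the assembly schema rather than as a definitive reference. A channel declared without such an element is assumed to be system timestamped.

```xml
<wlevs:channel id="inputChannel" event-type="SampleEvent">
    <!-- Assumption: the expression names the event property that carries the application time. -->
    <!-- eventTime is a hypothetical property; its values must not decrease from one event to the next. -->
    <wlevs:application-timestamped>
        <wlevs:expression>eventTime</wlevs:expression>
    </wlevs:application-timestamped>
</wlevs:channel>
```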

If you know that the application timestamp is strictly increasing (as opposed to non-decreasing), you can set the is-total-order attribute of wlevs:channel to true. This setting enables the Oracle Stream Analytics engine to perform certain optimizations and typically reduces processing latency.

The Oracle Stream Analytics scheduler is responsible for continuously executing each Oracle CQL query according to its scheduling algorithm and frequency.

1.2 Oracle CQL Statements

Oracle CQL provides statements for creating queries and views.

1.2.1 Lexical Conventions

Using Oracle JDeveloper or Oracle Stream Analytics Visualizer, you write Oracle CQL statements in the XML configuration file associated with an Oracle Stream Analytics CQL processor. This XML file is called the configuration source.

The configuration source must conform to the wlevs_application_config.xsd schema and may contain only rule, view, or query elements, as the following example shows:

<?xml version="1.0" encoding="UTF-8"?>
<n1:config xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd" 
    xmlns:n1="http://www.bea.com/ns/wlevs/config/application" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<processor>
    <name>cqlProcessor</name>
    <rules>
        <view id="lastEvents" schema="cusip bid srcId bidQty ask askQty seq"><![CDATA[ 
            select cusip, bid, srcId, bidQty, ask, askQty, seq 
            from inputChannel[partition by srcId, cusip rows 1]
        ]]></view>
        <view id="bidask" schema="cusip bid ask"><![CDATA[ 
            select cusip, max(bid), min(ask) 
            from lastEvents
            group by cusip
        ]]></view>
        <view ...><![CDATA[
            ...
        ]]></view>
        ...
        <view id="MAXBIDMINASK" schema="cusip bidseq bidSrcId bid askseq askSrcId ask bidQty askQty"><![CDATA[ 
            select bid.cusip, bid.seq, bid.srcId as bidSrcId, bid.bid, ask.seq, ask.srcId as askSrcId, ask.ask, bid.bidQty, ask.askQty 
            from BIDMAX as bid, ASKMIN as ask 
            where bid.cusip = ask.cusip
        ]]></view>
        <query id="BBAQuery"><![CDATA[ 
            ISTREAM(select bba.cusip, bba.bidseq, bba.bidSrcId, bba.bid, bba.askseq, 
                bba.askSrcId, bba.ask, bba.bidQty, bba.askQty, "BBAStrategy" as intermediateStrategy, 
                p.seq as correlationId, 1 as priority 
            from MAXBIDMINASK as bba, inputChannel[rows 1] as p where bba.cusip = p.cusip)
        ]]></query>
    </rules>
</processor>
</n1:config>

When writing Oracle CQL queries in an Oracle CQL processor component configuration file, observe the following rules:

  • You may specify one Oracle CQL statement per view or query element.

  • You must not terminate Oracle CQL statements with a semicolon (;).

  • You must enclose each Oracle CQL statement in <![CDATA[ and ]]>.

  • When you issue an Oracle CQL statement, you can include one or more tabs, carriage returns, or spaces anywhere a space occurs within the definition of the statement. Thus, Oracle Stream Analytics evaluates the following two statements in the same manner:

    <processor>
        <name>cqlProcessor</name>
        <rules>
            <query id="QTollStr"><![CDATA[ 
                RSTREAM(select cars.car_id, SegToll.toll from CarSegEntryStr[now] as cars, SegToll 
                    where (cars.exp_way = SegToll.exp_way and cars.lane = SegToll.lane 
                        and cars.dir = SegToll.dir and cars.seg = SegToll.seg))
            ]]></query>
        </rules>
    </processor>
    
    <processor>
        <name>cqlProcessor</name>
        <rules>
            <query id="QTollStr"><![CDATA[ 
                RSTREAM(
                    select
                        cars.car_id, 
                        SegToll.toll 
                    from 
                        CarSegEntryStr[now]
                    as
                        cars, SegToll 
                    where (
                        cars.exp_way = SegToll.exp_way and 
                        cars.lane = SegToll.lane and 
                        cars.dir = SegToll.dir and 
                        cars.seg = SegToll.seg
                    )
                )
            ]]></query>
        </rules>
    </processor>
    
  • Case is insignificant in reserved words, keywords, identifiers and parameters. However, case is significant in function names, text literals, and quoted names.

  • Comments are not permitted in Oracle CQL statements. For more information, see Comments.
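To make the case rule above concrete, the following sketch shows two queries that differ only in the case of their keywords and identifiers and are therefore treated as the same statement. It assumes that the lastEvents view from the earlier example is defined in the same processor; the query ids are illustrative.

```xml
<processor>
    <name>cqlProcessor</name>
    <rules>
        <!-- Keywords and identifiers in lower and mixed case. -->
        <query id="lowerCaseQuery"><![CDATA[
            select cusip, max(bid) as maxBid from lastEvents group by cusip
        ]]></query>
        <!-- Same statement in upper case. The built-in function name max stays in lower case, -->
        <!-- because case is significant in function names. -->
        <query id="upperCaseQuery"><![CDATA[
            SELECT CUSIP, max(BID) AS MAXBID FROM LASTEVENTS GROUP BY CUSIP
        ]]></query>
    </rules>
</processor>
```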

1.2.2 Syntactic Shortcuts and Defaults

When writing Oracle CQL queries, views, and joins, consider the syntactic shortcuts and defaults that Oracle CQL provides to simplify your queries.

1.2.3 Documentation Conventions

All Oracle CQL statements in this reference (see Oracle CQL Statements) are organized into the following sections:

Syntax

The syntax diagrams show the keywords and parameters that make up the statement.

Caution:

Not all keywords and parameters are valid in all circumstances. Be sure to refer to the "Semantics" section of each statement and clause to learn about any restrictions on the syntax.

Purpose

The "Purpose" section describes the basic uses of the statement.

Prerequisites

The "Prerequisites" section lists privileges you must have and steps that you must take before using the statement.

Semantics

The "Semantics" section describes the purpose of the keywords, parameters, and clauses that make up the syntax, and restrictions and other usage notes that may apply to them. (The conventions for keywords and parameters used in this chapter are explained in the Preface of this reference.)

Examples

The "Examples" section shows how to use the various clauses and parameters of the statement.

1.3 Oracle CQL and SQL Standards

Oracle CQL is a new technology, but it is based on a subset of SQL99.

Oracle strives to comply with industry-accepted standards and participates actively in SQL standards committees. Oracle is actively pursuing Oracle CQL standardization.

1.4 Oracle Stream Analytics Server

Oracle Stream Analytics server provides the light-weight Spring container for Oracle Stream Analytics applications. It manages the server and application lifecycle and provides a wide variety of essential services, such as security, Jetty, JMX, JDBC, HTTP publish-subscribe, logging, and debugging.