 
The Complex Event Processor module can be broken down into the following functional components: event representation, processing model, programmatic interface, and language specification.
Events are represented as POJOs following the JavaBeans conventions. Event properties are exposed through getter methods on the POJO. When possible, the results from EPL statement execution are also returned as POJOs. However, there are times when un-typed events are returned such as when event streams are joined. In this case, an instance of the Map collection interface is returned.
The EPL processing model is continuous: results are output as soon as incoming events are received that meet the constraints of the statement. Two types of events are generated during output: insert events for new events entering the output window and remove events for old events exiting the output window. Listeners may be attached and notified when either or both types of events occur.
Incoming events may be processed through either sliding or batched windows. Sliding windows process events by gradually moving the window over the data in single increments, while batched windows process events by moving the window over data in discrete chunks. The window size may be defined by the maximum number of events contained or by the maximum amount of time to keep an event.
The EPL programmatic interfaces allow statements to be individually compiled or loaded in bulk through a URL. Statements may be iterated over, retrieved, started and stopped. Listeners may be attached to statements and notified when insert events, remove events, or both occur.
 
The Event Processor Language is a SQL-like language with SELECT, FROM, WHERE, GROUP BY, HAVING and ORDER BY clauses. Streams replace tables as the source of data with events replacing rows as the basic unit of data.  Since events are composed of data, the SQL concepts of correlation through joins, filtering through sub-queries, and aggregation through grouping may be effectively leveraged.  The INSERT INTO clause is recast as a means of forwarding events to other streams for further downstream processing.  External data accessible through JDBC may be queried and joined with the stream data.  Additional clauses such as the RETAIN, MATCHING, and OUTPUT clauses are also available to provide the missing SQL language constructs specific to event processing.  
 
The RETAIN clause constrains the amount of data over which the query is run, essentially defining a virtual window over the stream data. Unlike relational database systems in which tables bound the extents of the data, event processing systems must provide an alternative, more dynamic means of limiting the queried data.
 
The MATCHING clause detects sequences of events matching a specific pattern.  Temporal and logical operators such as AND, OR, and FOLLOWED BY enable both occurrence of and absence of events to be detected through arbitrarily complex expressions.
 
The OUTPUT clause throttles results of statement execution to prevent overloading downstream processors.  Either all or a subset of the first or last resulting events may be passed on in either time or row-based batches.
A series of use cases is presented in the last section to illustrate the language features under realistic scenarios.
 
An event is an immutable record of a past occurrence of an action or state change. Event properties capture the state information for an event object. An event is represented by either a POJO or a com.bea.wlevs.ede.api.MapEventObject that extends the java.util.Map interface.
Plain old Java object (POJO) events are object instances that expose event properties through JavaBeans-style getter methods. Event classes or interfaces do not have to be fully compliant with the JavaBeans specification; however, for the EPL engine to obtain event properties, the required JavaBeans getter methods must be present.
EPL supports JavaBeans-style event classes that extend a super class or implement one or more interfaces. Also, EPL statements can refer to Java interface classes and abstract classes.
Classes that represent events should be made immutable. As events are recordings of a state change or action that occurred in the past, the relevant event properties should not be changeable. However, this is not a hard requirement and the EPL engine accepts events that are mutable as well.
 
Events can also be represented by objects that implement the com.bea.wlevs.ede.api.MapEventObject interface that extends the java.util.Map interface. Event properties of Map events are the values of each entry accessible through the get method exposed by the java.util.Map interface.
 
Entries in the Map represent event properties. Keys must be of type java.lang.String for the engine to be able to look up event property names specified by EPL statements. Values can be of any type. POJOs may also appear as values in a Map.
EPL expressions can include simple as well as indexed, mapped and nested event properties. The table below outlines the different types of properties and their syntax in an event expression. This syntax allows statements to query deep JavaBeans object graphs, XML structures and Map events.
 
Assume there is an EmployeeEvent event class as shown below. The mapped and indexed properties in this example return Java objects but could also return Java primitive types or simple value types (such as int or String). The Address and Employee objects can themselves have properties that are nested within them, such as a streetName in the Address object or a name of the employee in the Employee object.
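  // Getter signatures only. Address and Employee are other JavaBeans-style classes.
  public abstract class EmployeeEvent {
      public abstract String getFirstName();               // simple property: firstName
      public abstract Address getAddress(String type);     // mapped property: address('key')
      public abstract Employee getSubordinate(int index);  // indexed property: subordinate[index]
      public abstract Employee[] getAllSubordinates();     // indexed property: allSubordinates[index]
  }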
Simple event properties require a getter-method that returns the property value. In the preceding example, the getFirstName getter method returns the firstName event property of type String. 
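Indexed event properties require either a getter method that takes an integer index and returns the property value, such as the getSubordinate method, or a getter method that returns an array, such as the getAllSubordinates method.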
 
In an EPL statement, indexed properties are accessed via the property[index] syntax. 
 
Mapped event properties require a getter-method that takes a String type key value and returns a property value, such as the getAddress method. In an EPL or event pattern statement, mapped properties are accessed via the property('key') syntax.
 
Nested event properties require a getter-method that returns the nesting object. The getAddress and getSubordinate methods are mapped and indexed properties that return a nesting object. In an EPL statement, nested properties are accessed via the property.nestedProperty syntax. 
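As a rough illustration of how these property expressions line up with getter calls, the sketch below pairs each kind of expression with the plain Java call chain it corresponds to. The classes SampleEmployeeEvent, Employee and Address and their constructors are hypothetical stand-ins written for this example; only the getter shapes follow the EmployeeEvent class above. It does not show how the engine itself resolves properties.

```java
import java.util.HashMap;
import java.util.Map;

final class PropertyPathSketch {
    // address('home').streetName  (mapped, then nested)
    static String homeStreet(SampleEmployeeEvent e) {
        return e.getAddress("home").getStreetName();
    }

    // subordinate[0].name  (indexed via getter with an int argument, then nested)
    static String firstSubordinateName(SampleEmployeeEvent e) {
        return e.getSubordinate(0).getName();
    }

    // allSubordinates[1].name  (indexed via array-returning getter, then nested)
    static String secondSubordinateName(SampleEmployeeEvent e) {
        return e.getAllSubordinates()[1].getName();
    }

    // subordinate[0].address('home').streetName  (indexed, mapped, nested)
    static String firstSubordinateHomeStreet(SampleEmployeeEvent e) {
        return e.getSubordinate(0).getAddress("home").getStreetName();
    }

    public static void main(String[] args) {
        Employee ann = new Employee("Ann").withAddress("home", new Address("Elm St"));
        Employee bob = new Employee("Bob");
        SampleEmployeeEvent event = new SampleEmployeeEvent("Carol", ann, bob)
                .withAddress("home", new Address("Park Ave"));
        System.out.println(homeStreet(event));
        System.out.println(firstSubordinateHomeStreet(event));
    }
}

// Hypothetical value class with one nested property, streetName.
final class Address {
    private final String streetName;
    Address(String streetName) { this.streetName = streetName; }
    public String getStreetName() { return streetName; }
}

// Hypothetical value class with a simple property and a mapped property.
final class Employee {
    private final String name;
    private final Map<String, Address> addresses = new HashMap<>();
    Employee(String name) { this.name = name; }
    Employee withAddress(String type, Address address) {
        addresses.put(type, address);
        return this;
    }
    public String getName() { return name; }
    public Address getAddress(String type) { return addresses.get(type); }
}

// Concrete stand-in with the same getter shapes as EmployeeEvent.
final class SampleEmployeeEvent {
    private final String firstName;
    private final Employee[] subordinates;
    private final Map<String, Address> addresses = new HashMap<>();

    SampleEmployeeEvent(String firstName, Employee... subordinates) {
        this.firstName = firstName;
        this.subordinates = subordinates.clone();
    }
    SampleEmployeeEvent withAddress(String type, Address address) {
        addresses.put(type, address);
        return this;
    }
    public String getFirstName() { return firstName; }
    public Address getAddress(String type) { return addresses.get(type); }
    public Employee getSubordinate(int index) { return subordinates[index]; }
    public Employee[] getAllSubordinates() { return subordinates.clone(); }
}
```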
All EPL statements allow the use of indexed, mapped and nested properties (or a combination of these) at any place where one or more event property names are expected. The example below shows different combinations of indexed, mapped and nested properties.
  address('home').streetName
  subordinate[0].name='anotherName'
  allSubordinates[1].name
  subordinate[0].address('home').streetName
Similarly, the syntax can be used in EPL statements in all places where an event property name is expected, such as in select lists, where clauses or join criteria.
  SELECT firstName, address('work'), subordinate[0].name, subordinate[1].name
  FROM EmployeeEvent RETAIN ALL
  WHERE address('work').streetName = 'Park Ave'
Event sinks provide a means of receiving programmatic notifications when events occur that meet the criteria specified in an EPL statement. Sinks may be notified of either of the following:
ISTREAM events.
RSTREAM events.
Detailed examples illustrating when each of these notifications occurs are provided in Processing Model.
 
To receive ISTREAM events, use the com.bea.wlevs.ede.api.EventSink interface. Your implementation must provide a single onEvent method that the engine invokes when results become available. With this interface, only the new events are sent to the listener.
public interface EventSink extends EventListener {
   void onEvent(List<Object> newEvents) 
   throws RejectEventException;
} 
The engine provides statement results to event sinks as a list of POJO or MapEventObject instances. For wildcard selects, the result will match the original event object type that was sent into the engine. For joins and select clauses with expressions, the resulting object will implement the com.bea.wlevs.ede.api.MapEventObject interface.
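A sink usually has to cope with both result shapes. The following is a minimal sketch of such a sink, not product code: EventSink and RejectEventException are declared locally as stand-ins that mirror the declaration shown above so that the example compiles on its own, and modelling RejectEventException as a checked exception is an assumption. DescribingSink and its seen field are hypothetical names. The Map check relies only on the statement above that MapEventObject extends java.util.Map.

```java
import java.util.ArrayList;
import java.util.EventListener;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sink that records a one-line description of each result.
final class DescribingSink implements EventSink {
    final List<String> seen = new ArrayList<>();

    @Override
    public void onEvent(List<Object> newEvents) throws RejectEventException {
        for (Object event : newEvents) {
            if (event == null) {
                throw new RejectEventException("null event");
            }
            if (event instanceof Map) {
                // Joins and selects with expressions arrive as Map-based events.
                Map<?, ?> properties = (Map<?, ?>) event;
                seen.add("map" + properties.keySet());
            } else {
                // Wildcard selects arrive as the original event object type.
                seen.add("pojo:" + event.getClass().getSimpleName());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        DescribingSink sink = new DescribingSink();
        Map<String, Object> joined = new LinkedHashMap<>();
        joined.put("account", "A1");
        List<Object> results = new ArrayList<>();
        results.add(joined);
        results.add("a plain object");
        sink.onEvent(results);
        System.out.println(sink.seen);
    }
}

// Local stand-in for the product exception type (assumed checked here).
class RejectEventException extends Exception {
    RejectEventException(String message) { super(message); }
}

// Local stand-in mirroring the EventSink declaration shown in the text.
interface EventSink extends EventListener {
    void onEvent(List<Object> newEvents) throws RejectEventException;
}
```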
The EPL processing model is continuous: Listeners to statements receive updated data as soon as the engine processes events for that statement, according to the statement's choice of event streams, retain clause restrictions, filters and output rates.
In this section we look at the output of a very simple EPL statement. The statement selects an event stream without using a data window and without applying any filtering, as follows:
SELECT * FROM Withdrawal RETAIN ALL
 
This statement selects all Withdrawal events. Every time the engine processes an event of type Withdrawal or any sub-type of Withdrawal, it invokes all update listeners, handing the new event to each of the statement's listeners. 
 
The term insert stream denotes the new events arriving, and entering a data window or aggregation. The insert stream in this example is the stream of arriving Withdrawal events, and is posted to update listeners as new events. 
 
The diagram below shows a series of Withdrawal events 1 to 6 arriving over time. For this diagram as well as the others in this section, the number in parentheses is the value of the amount property in the Withdrawal event.

 
The example statement above results in only new events and no old events posted by the engine to the statement's listeners, because RETAIN ALL keeps every event and so no event ever leaves the window.
There are two types of sliding windows: row-based and time-based. Each of these is discussed in the following sections.
 
A row-based sliding window instructs the engine to only keep the last N events for a stream. The next statement applies a length window onto the Withdrawal event stream. The statement serves to illustrate the concept of data window and events entering and leaving a data window: 
SELECT * FROM Withdrawal RETAIN 5 EVENTS
 
The size of this statement's window is five events. The engine enters all arriving Withdrawal events into the window. When the window is full, the oldest Withdrawal event is pushed out of the window. The engine indicates to update listeners all events entering the window as new events, and all events leaving the window as old events.
 
While the term insert stream denotes new events arriving, the term remove stream denotes events leaving a data window, or changing aggregation values. In this example, the remove stream is the stream of Withdrawal events that leave the length window, and such events are posted to update listeners as old events. 
The next diagram illustrates how the length window contents change as events arrive and shows the events posted to an update listener.

As before, all arriving events are posted as new events to update listeners. In addition, when event W1 leaves the length window on arrival of event W6, it is posted as an old event to update listeners.
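The behaviour of the length window can be imitated in a few lines of plain Java. This is only an illustrative simulation of RETAIN 5 EVENTS under the description above, not the engine's implementation; the class LengthWindowSketch and the "new:" and "old:" log labels are invented for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simulates a row-based sliding window and records what a listener would see.
final class LengthWindowSketch {
    private final int size;
    private final Deque<String> window = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    LengthWindowSketch(int size) {
        this.size = size;
    }

    void onArrival(String event) {
        window.addLast(event);
        log.add("new:" + event);               // insert stream
        if (window.size() > size) {
            String oldest = window.removeFirst();
            log.add("old:" + oldest);          // remove stream
        }
    }

    List<String> contents() {
        return new ArrayList<>(window);
    }

    public static void main(String[] args) {
        LengthWindowSketch window = new LengthWindowSketch(5);
        for (int i = 1; i <= 6; i++) {
            window.onArrival("W" + i);
        }
        System.out.println(window.log);
        System.out.println(window.contents());
    }
}
```

Feeding W1 through W6 into a window of size five posts six new events and, on arrival of W6, one old event for W1.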
Similar to a length window, a time window also keeps the most recent events up to a given time period. A time window of 5 seconds, for example, keeps the last 5 seconds of events. As seconds pass, the time window actively pushes the oldest events out of the window resulting in one or more old events posted to update listeners.
 
EPL supports optional ISTREAM and RSTREAM keywords on SELECT clauses and on INSERT INTO clauses. These instruct the engine to only forward events that enter or leave data windows, or select only current or prior aggregation values, i.e. the insert stream or the remove stream.
A time-based sliding window is a moving window that extends the specified time interval into the past, based on the system time. Time-based sliding windows enable us to limit the number of events considered by a query, as do row-based sliding windows.
The next diagram serves to illustrate the functioning of a time window. For the diagram, we assume a query that simply selects the event itself and does not group or filter events.
SELECT * FROM Withdrawal RETAIN 4 SECONDS
 
The diagram starts at a given time t and displays the contents of the time window at t+4 and t+5 seconds and so on.

The activity as illustrated by the diagram:
At t + 4 seconds, an event W1 arrives and enters the time window. The engine reports the new event to update listeners.
At t + 5 seconds, an event W2 arrives and enters the time window. The engine reports the new event to update listeners.
At t + 6.5 seconds, an event W3 arrives and enters the time window. The engine reports the new event to update listeners.
At t + 8 seconds, event W1 leaves the time window. The engine reports the event as an old event to update listeners.
As a practical example, consider the need to determine all accounts where the average withdrawal amount per account for the last 4 seconds of withdrawals is greater than 1000. The statement to solve this problem is shown below.
SELECT account, AVG(amount)
FROM Withdrawal RETAIN 4 SECONDS
GROUP BY account
HAVING AVG(amount) > 1000
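The W1 to W3 timeline described earlier for the 4-second time window can be replayed with a small simulation. The sketch below is an assumption-laden illustration rather than the engine's algorithm: time is passed in explicitly as milliseconds after t, an event is treated as expired once its age reaches the window length, and the class TimeWindowSketch is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simulates a time-based sliding window driven by an explicit clock.
final class TimeWindowSketch {
    private static final class Timed {
        final String name;
        final long arrivedAt;
        Timed(String name, long arrivedAt) {
            this.name = name;
            this.arrivedAt = arrivedAt;
        }
    }

    private final long spanMillis;
    private final Deque<Timed> window = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    TimeWindowSketch(long spanMillis) {
        this.spanMillis = spanMillis;
    }

    // Moves the clock forward and expires events that are spanMillis old.
    void advanceTo(long now) {
        while (!window.isEmpty() && window.peekFirst().arrivedAt + spanMillis <= now) {
            Timed expired = window.removeFirst();
            log.add("old:" + expired.name + "@" + now);
        }
    }

    void onArrival(String name, long now) {
        advanceTo(now);
        window.addLast(new Timed(name, now));
        log.add("new:" + name + "@" + now);
    }

    public static void main(String[] args) {
        TimeWindowSketch window = new TimeWindowSketch(4000);
        window.onArrival("W1", 4000);
        window.onArrival("W2", 5000);
        window.onArrival("W3", 6500);
        window.advanceTo(8000);   // W1 is now 4 seconds old and leaves
        System.out.println(window.log);
    }
}
```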
Both row-based and time-based windows may be batched. The next sections explain each of these concepts in turn.
The time-based batch window buffers events and releases them every specified time interval in one update. Time-based batch windows control the evaluation of events, as does the length batch window.
The next diagram serves to illustrate the functioning of a time batch view. For the diagram, we assume a simple query as below:
SELECT * FROM Withdrawal RETAIN BATCH OF 4 SECONDS
 
The diagram starts at a given time t and displays the contents of the time window at t + 4 and t + 5 seconds and so on.

The activity as illustrated by the diagram:
At t + 1 seconds, an event W1 arrives and enters the batch. No call to inform update listeners occurs.
At t + 3 seconds, an event W2 arrives and enters the batch. No call to inform update listeners occurs.
At t + 4 seconds, the engine processes the batched events and starts a new batch. The engine reports events W1 and W2 to update listeners.
At t + 6.5 seconds, an event W3 arrives and enters the batch. No call to inform update listeners occurs.
At t + 8 seconds, the engine processes the batched events and starts a new batch. The engine reports the event W3 as new data to update listeners. The engine reports the events W1 and W2 as old data (prior batch) to update listeners.
A row-based window may be batched as well. For example, the following query would wait to receive five events prior to doing any processing:
SELECT * FROM Withdrawal RETAIN BATCH OF 5 EVENTS
Once five events were received, the query would run and again wait for a new set of five events prior to processing.
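Both kinds of batching share the same buffering idea, which the following sketch tries to capture. It is a simplified model under stated assumptions: a batch size of zero or less means batches are closed only by an external timer call (tick), the prior batch is reported as old data when the next batch is released, and the class BatchWindowSketch with its log format is invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Buffers events and releases them in batches, either by count or by timer.
final class BatchWindowSketch {
    private final int batchSize;             // <= 0: released by tick() only
    private List<String> current = new ArrayList<>();
    private List<String> previous = new ArrayList<>();
    final List<String> log = new ArrayList<>();

    BatchWindowSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Arrivals are buffered; listeners are not called here unless a row batch fills up.
    void onArrival(String event) {
        current.add(event);
        if (batchSize > 0 && current.size() == batchSize) {
            release();
        }
    }

    // Called at the end of each time interval for a time-based batch.
    void tick() {
        release();
    }

    private void release() {
        log.add("new=" + current + " old=" + previous);
        previous = current;
        current = new ArrayList<>();
    }

    public static void main(String[] args) {
        BatchWindowSketch timeBatch = new BatchWindowSketch(0);
        timeBatch.onArrival("W1");   // t + 1
        timeBatch.onArrival("W2");   // t + 3
        timeBatch.tick();            // t + 4
        timeBatch.onArrival("W3");   // t + 6.5
        timeBatch.tick();            // t + 8
        System.out.println(timeBatch.log);
    }
}
```

Constructing the sketch with a batch size of 5 instead models RETAIN BATCH OF 5 EVENTS: nothing is reported until the fifth event arrives.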
Filters to event streams appear in a subquery expression and allow filtering events out of a given stream before events enter a data window. This filtering occurs prior to the WHERE clause executing. When possible, filtering should be done in a subquery as opposed to the WHERE clause, since this will improve performance by reducing the amount of data seen by the rest of the EPL statement.
The statement below shows a subquery that selects Withdrawal events with an amount value of 200 or more.
SELECT * FROM (SELECT * FROM Withdrawal WHERE amount >= 200) RETAIN 5 EVENTS
With the subquery, any Withdrawal events that have an amount of less than 200 do not enter the window of the outer query and are therefore not passed to update listeners.

 
The WHERE clause and HAVING clause in statements eliminate potential result rows at a later stage in processing, after events have been processed into a statement's data window or other views. 
 
The next statement applies a WHERE clause to Withdrawal events instead of a subquery.
SELECT * FROM Withdrawal RETAIN 5 EVENTS WHERE amount >= 200
 
The WHERE clause applies to both new events and old events. As the diagram below shows, arriving events enter the window regardless of the value of the "amount" property.  However, only events that pass the WHERE clause are handed to update listeners. Also, as events leave the data window, only those events that pass the conditions in the WHERE clause are posted to update listeners as old events.
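The difference between filtering before the window (the subquery form) and filtering after it (the WHERE form) is easiest to see side by side. The sketch below simulates both placements over a five-event window; it is a hedged illustration of the behaviour described here, with a hypothetical class name, made-up amounts, and "new:" and "old:" labels standing in for listener notifications.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Compares a filter applied before the window with one applied after it.
final class FilterPlacementSketch {
    static List<String> run(int[] amounts, int windowSize, int minAmount,
                            boolean filterBeforeWindow) {
        Deque<Integer> window = new ArrayDeque<>();
        List<String> posted = new ArrayList<>();
        for (int amount : amounts) {
            if (filterBeforeWindow && amount < minAmount) {
                continue;                          // never enters the window
            }
            window.addLast(amount);
            if (amount >= minAmount) {
                posted.add("new:" + amount);       // passes the condition
            }
            if (window.size() > windowSize) {
                int oldest = window.removeFirst();
                if (oldest >= minAmount) {
                    posted.add("old:" + oldest);   // old events are checked too
                }
            }
        }
        return posted;
    }

    public static void main(String[] args) {
        int[] amounts = {500, 100, 300, 50, 250, 150, 400};
        System.out.println("subquery: " + run(amounts, 5, 200, true));
        System.out.println("where:    " + run(amounts, 5, 200, false));
    }
}
```

With the WHERE placement, small withdrawals still occupy window slots, so the 500 withdrawal is pushed out sooner and reported as an old event; with the subquery placement the window never fills in this run.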

 
The WHERE clause can contain complex conditions, while event stream filters are more restrictive in the type of filters that can be specified. The next statement's WHERE clause applies the ceil function of the java.lang.Math Java library class. The INSERT INTO clause makes the results of the first statement available to the second statement:
INSERT INTO BigWithdrawal
SELECT * FROM Withdrawal RETAIN ALL WHERE Math.ceil(amount) >= 200
SELECT * FROM BigWithdrawal RETAIN ALL
 
Statements that aggregate events via aggregation functions also post remove stream events as aggregated values change. Consider the following statement that alerts when two Withdrawal events have been received:
SELECT COUNT(*) AS mycount
FROM Withdrawal RETAIN ALL
HAVING COUNT(*) = 2
 
When the engine encounters the second withdrawal event, the engine posts a new event to update listeners. The value of the mycount property on that new event is 2. Additionally, when the engine encounters the third Withdrawal event, it posts an old event to update listeners containing the prior value of the count. The value of the mycount property on that old event is also 2. 
 
The ISTREAM or RSTREAM keyword can be used to eliminate either new events or old events posted to update listeners. The next statement uses the ISTREAM keyword causing the engine to call the update listener only once when the second Withdrawal event is received: 
SELECT ISTREAM COUNT(*) AS mycount
FROM Withdrawal RETAIN ALL
HAVING COUNT(*) = 2
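The interplay between the changing count, the HAVING condition and the ISTREAM keyword can be sketched as a small simulation. This is one plausible reading of the behaviour described above, not the engine's logic: on each arrival the new count is posted as a new event and the prior count as an old event, each only if it satisfies the HAVING condition. The class name and log labels are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simulates COUNT(*) with HAVING COUNT(*) = 2 over an unbounded stream.
final class CountHavingSketch {
    static List<String> run(int arrivals, boolean istreamOnly) {
        List<String> posted = new ArrayList<>();
        long count = 0;
        for (int i = 0; i < arrivals; i++) {
            long prior = count;
            count++;
            if (count == 2) {
                posted.add("new:mycount=" + count);    // insert stream
            }
            if (!istreamOnly && prior == 2) {
                posted.add("old:mycount=" + prior);    // remove stream
            }
        }
        return posted;
    }

    public static void main(String[] args) {
        System.out.println(run(3, false));
        System.out.println(run(3, true));
    }
}
```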
The use cases below illustrate the usage of various language features through examples.
For the throughput statistics and to detect rapid fall-off we calculate a ticks per second rate for each market data feed.
We can use an EPL statement that batches together 1 second of events from the market data event stream source. We specify the feed and a count of events per feed as output values. To make this data available for further processing, we insert output events into the TicksPerSecond event stream:
INSERT INTO TicksPerSecond
SELECT feed, COUNT(*) AS cnt
FROM MarketDataEvent
RETAIN BATCH OF 1 SECOND
GROUP BY feed
 
For computing the highest priced stocks, we define a sliding window that retains 100 events for each unique stock symbol where the block size of the trade is greater than 10. For example, if there are 5,000 stock symbols, then up to 5,000 x 100 or 500,000 events would be kept. Only MarketTrade events with a block size of greater than 10 will enter the window and only the 100 highest priced events will be retained.
The results will be grouped by stock symbol and ordered alphabetically with stock symbols having an average price of less than 100 being filtered from the output.
SELECT symbol, AVG(price)
FROM (SELECT * FROM MarketTrade WHERE blockSize > 10)
RETAIN 100 EVENTS WITH LARGEST price PARTITION BY symbol
GROUP BY symbol
HAVING AVG(price) >= 100
ORDER BY symbol
We detect the route a car is taking based on the car location event data that contains information about the location and direction of a car on a highway. We first segment the data by carId to isolate information about a particular car and subsequently segment by expressway, direction and segment to plot its direction. We are then able to calculate the speed of the car based on this information.
 
The first PARTITION BY carId groups car location events by car while the following PARTITION BY expressway PARTITION BY direction further segment the data by more detailed location and direction property values. The number of events retained, 4 in this query, applies to the maximum number kept for the last PARTITION BY clause.   Thus at most 4 events will be kept for each distinct segment property value.
SELECT carId, expressway, direction,
SUM(segment)/(MAX(timestamp)-MIN(timestamp)) AS speed
FROM CarLocationEvent
RETAIN 4 EVENTS
PARTITION BY carId PARTITION BY expressway PARTITION BY direction
We define a rapid fall-off by alerting when the number of ticks per second for any second falls below 75% of the average number of ticks per second over the last 10 seconds.
 
We can compute the average number of ticks per second over the last 10 seconds simply by using the TicksPerSecond events computed by the prior statement and averaging the last 10 seconds. Next, we compare the current rate with the moving average and keep only the rates that fall below 75% of the average:
SELECT feed, AVG(cnt) AS avgCnt, cnt AS feedCnt
FROM TicksPerSecond
RETAIN 10 SECONDS
GROUP BY feed
HAVING cnt < AVG(cnt) * 0.75
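The fall-off rule amounts to comparing each per-second count with a moving average. The sketch below works through that arithmetic for a single feed; it assumes one count per second, includes the current second in the 10-second average, and uses a hypothetical class name and sample data.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Flags seconds whose tick count is below 75% of the moving average.
final class FallOffSketch {
    static List<Integer> alerts(int[] perSecondCounts, int windowSeconds) {
        Deque<Integer> window = new ArrayDeque<>();
        long sum = 0;
        List<Integer> alertSeconds = new ArrayList<>();
        for (int second = 0; second < perSecondCounts.length; second++) {
            int cnt = perSecondCounts[second];
            window.addLast(cnt);
            sum += cnt;
            if (window.size() > windowSeconds) {
                sum -= window.removeFirst();   // slide the window forward
            }
            double avg = (double) sum / window.size();
            if (cnt < avg * 0.75) {
                alertSeconds.add(second);
            }
        }
        return alertSeconds;
    }

    public static void main(String[] args) {
        int[] counts = {100, 100, 100, 100, 40, 100};
        System.out.println(alerts(counts, 10));
    }
}
```

In the sample data the fifth second has 40 ticks against an average of 88, and 40 is below 0.75 x 88 = 66, so that second is flagged.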
 
A customer may be in the middle of a check-in when the terminal detects a hardware problem or when the network goes down. In that situation we want to alert a team member to help the customer. When the terminal detects a problem, it issues an OutOfOrder event. A pattern can find situations where the terminal indicates out-of-order and the customer is in the middle of the check-in process:  
SELECT ci.term
MATCHING ci:=Checkin FOLLOWED BY
( OutOfOrder (term.id=ci.term.id) AND NOT
(Cancelled (term.id=ci.term.id) OR
Completed (term.id=ci.term.id) ) WITHIN 3 MINUTES )
Each self-service terminal can publish any of the four events below.
 
All events provide information about the terminal that published the event, and a timestamp. The terminal information is held in a property named term and provides a terminal id. Because all events carry similar information, we model each event as a subtype to a base class TerminalEvent, which will provide the terminal information that all events share. This enables us to treat all terminal events polymorphically, which simplifies our queries by allowing us to treat derived event types just like their parent event types.
 
Because Status events arrive in regular intervals of 60 seconds, we can make use of temporal pattern matching using the MATCHING clause to find events that did not arrive in time. We can use the WITHIN operator to keep a 65 second window to account for a possible delay in transmission or processing and the NOT operator to detect the absence of a Status event with a term.id equal to T1:
SELECT 'terminal 1 is offline'
MATCHING NOT Status(term.id = 'T1') WITHIN 65 SECONDS
OUTPUT FIRST EVERY 5 MINUTES
 
By presenting statistical information about terminal activity to our staff in real-time we enable them to monitor the system and spot problems. The next example query simply gives us a count per event type every 1 minute. We could further use this data, available through the CountPerType event stream, to join and compare against a recorded usage pattern, or to just summarize activity in real-time. 
INSERT INTO CountPerType
SELECT type, COUNT(*) AS countPerType
FROM TerminalEvent
RETAIN 10 MINUTES
GROUP BY type
OUTPUT ALL EVERY 1 MINUTE
In this example an array of RFID readers senses RFID tags as pallets come within the range of one of the readers. A reader generates XML documents with observation information such as reader sensor ID, observation time and tags observed. A statement computes the total number of tags per reader sensor ID within the last 60 seconds.
SELECT ID AS sensorId, SUM(countTags) AS numTagsPerSensor
FROM AutoIdRFIDExample
RETAIN 60 SECONDS
WHERE Observation[0].Command = 'READ_PALLET_TAGS_ONLY'
GROUP BY ID
 
In this example we compose an EPL statement to detect combined events in which each component of the transaction is present. We restrict the event matching to the events that arrived within the last 30 minutes. This statement uses the INSERT INTO syntax to generate a CombinedEvent event stream.
INSERT INTO CombinedEvent(transactionId, customerId, supplierId,
latencyAC, latencyBC, latencyAB)
SELECT C.transactionId, customerId, supplierId,
C.timestamp - A.timestamp,
C.timestamp - B.timestamp,
B.timestamp - A.timestamp
FROM TxnEventA A, TxnEventB B, TxnEventC C
RETAIN 30 MINUTES
WHERE A.transactionId = B.transactionId AND
B.transactionId = C.transactionId
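The latency columns produced by this join are simple timestamp differences. The sketch below reproduces that calculation in plain Java to make the arithmetic concrete. It is a simplification under stated assumptions: each stream carries at most one event per transactionId, the 30-minute retention is ignored, and the classes Txn and Combined are hypothetical shapes holding only the properties the join needs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Joins three event lists on transactionId and derives the latencies.
final class LatencyJoinSketch {
    static final class Txn {
        final String transactionId;
        final long timestamp;
        Txn(String transactionId, long timestamp) {
            this.transactionId = transactionId;
            this.timestamp = timestamp;
        }
    }

    static final class Combined {
        final String transactionId;
        final long latencyAC;
        final long latencyBC;
        final long latencyAB;
        Combined(String transactionId, long latencyAC, long latencyBC, long latencyAB) {
            this.transactionId = transactionId;
            this.latencyAC = latencyAC;
            this.latencyBC = latencyBC;
            this.latencyAB = latencyAB;
        }
    }

    static List<Combined> join(List<Txn> as, List<Txn> bs, List<Txn> cs) {
        Map<String, Txn> bById = index(bs);
        Map<String, Txn> cById = index(cs);
        List<Combined> combined = new ArrayList<>();
        for (Txn a : as) {
            Txn b = bById.get(a.transactionId);
            Txn c = cById.get(a.transactionId);
            if (b != null && c != null) {          // all three components present
                combined.add(new Combined(a.transactionId,
                        c.timestamp - a.timestamp,
                        c.timestamp - b.timestamp,
                        b.timestamp - a.timestamp));
            }
        }
        return combined;
    }

    private static Map<String, Txn> index(List<Txn> events) {
        Map<String, Txn> byId = new HashMap<>();
        for (Txn event : events) {
            byId.put(event.transactionId, event);
        }
        return byId;
    }

    public static void main(String[] args) {
        List<Txn> as = new ArrayList<>();
        as.add(new Txn("t1", 1000));
        List<Txn> bs = new ArrayList<>();
        bs.add(new Txn("t1", 1400));
        List<Txn> cs = new ArrayList<>();
        cs.add(new Txn("t1", 1900));
        Combined row = join(as, bs, cs).get(0);
        System.out.println(row.latencyAC + " " + row.latencyBC + " " + row.latencyAB);
    }
}
```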
To derive the minimum, maximum and average total latency from the events (difference in time between A and C) over the past 30 minutes we can use the EPL below. In addition, in order to monitor the event server, a dashboard UI will subscribe to a subset of the events to measure system performance such as server and end-to-end latency. It is not feasible to expect a UI to monitor every event flowing through the system, so there must be a way of rate limiting the output to a subset of the events that can be handled by the monitoring application. Only the single last event or all events can be output.
SELECT MIN(latencyAC) AS minLatencyAC,
MAX(latencyAC) AS maxLatencyAC,
AVG(latencyAC) AS avgLatencyAC
FROM CombinedEvent
RETAIN 30 MINUTES
GROUP BY customerId
OUTPUT LAST 50 EVERY 1 SECOND
 
An OUTER JOIN allows us to detect a transaction that did not make it through all three events. When TxnEventA or TxnEventB events leave their respective time windows consisting of the last 30 minutes of events, EPL selects the rows in which no TxnEventC row was found.
SELECT *
FROM TxnEventA A
FULL OUTER JOIN TxnEventC C ON A.transactionId = C.transactionId
FULL OUTER JOIN TxnEventB B ON B.transactionId = C.transactionId
RETAIN 30 MINUTES
WHERE C.transactionId IS NULL