5.6 Writing CQL Queries

You can add a Custom CQL stage to a pipeline and write a CQL query that performs any Select operation on the data from the previous stage. Some business rules cannot currently be expressed with the existing pipeline stages or pattern stages. For such requirements, use a CQL query in a Custom CQL stage.

To learn more about CQL, see the CQL Reference Guide.

5.6.1 Sample Queries

This section lists sample queries that go beyond the standard Select and Where clause queries.

The sample queries use the following sample data sets. Copy each data set and save it as a CSV file.

Sample: Data_A_Followed_B

Trans_id order_id order_status order_revenue
1 1 BOOKED 2345.98
2 2 BOOKED 4345.98
3 3 BOOKED 3468.87
4 1 PAID 2345.98
5 2 PAID 4345.98
6 1 SHIPPED 4345
7 3 PAID 3468.87
8 3 SHIPPED 3468.87
9 4 BOOKED 3456
10 5 BOOKED 6546
11 6 BOOKED 76547
12 2 SHIPPED 4345.98

Sample: Change_Detector

Msg_ID Stock_ID Stock_Price
1 1 87
2 1 87
3 1 87
4 1 87
5 2 41
6 3 65
7 3 65
8 3 65
9 3 65
10 3 65
11 2 41
12 2 41
13 2 41
14 2 41
15 2 41
16 2 41
17 1 91
18 1 91
19 1 91
20 1 105
21 1 105
22 1 105
23 1 105
24 2 112
25 2 112
26 2 112
27 2 112
28 3 176
29 3 176
30 3 176
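
The sample data sets above are shown with whitespace between the columns. As a minimal sketch, the following Python snippet converts such text into comma-separated text. It assumes that a comma-delimited file with a header row is what you want to save; the function name to_csv and the shortened sample are illustrative only.

```python
import csv
import io


def to_csv(text):
    """Convert whitespace-separated rows into comma-separated text."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for line in text.strip().splitlines():
        # Each sample row has no embedded spaces, so a plain split is enough.
        writer.writerow(line.split())
    return buffer.getvalue()


# Shortened copy of the Data_A_Followed_B sample (first three rows only).
sample = """
Trans_id order_id order_status order_revenue
1 1 BOOKED 2345.98
2 2 BOOKED 4345.98
3 3 BOOKED 3468.87
"""

csv_text = to_csv(sample)
print(csv_text)
```

To save the result, write csv_text to a file of your choice, for example with open("Data_A_Followed_B.csv", "w", newline="").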

5.6.1.1 A Followed by B

Note:

In the sample query below, replace q1 with the name of the previous stage:

Change FROM q1 to FROM <previous-stage-name>.

Use the sample data file: Data_A_Followed_B.

SELECT
    order_id AS order_id,
    abInterval AS abInterval,
    Trans_id AS Trans_id,
    aState_Trans_id AS aState_Trans_id,
    order_status AS order_status,
    aState_order_status AS aState_order_status,
    order_revenue AS order_revenue,
    aState_order_revenue AS aState_order_revenue
FROM q1
MATCH_RECOGNIZE (
    PARTITION BY
        order_id
    MEASURES
        B.Trans_id AS Trans_id,
        B.order_id AS order_id,
        B.order_status AS order_status,
        B.order_revenue AS order_revenue,
        A.Trans_id AS aState_Trans_id,
        A.order_status AS aState_order_status,
        A.order_revenue AS aState_order_revenue,
        (to_timestamp(B.ELEMENT_TIME) - to_timestamp(A.ELEMENT_TIME)) AS abInterval
    PATTERN( A C*? B )
    WITHIN 1 minutes
    DEFINE
        A as A.order_status like ".*BOOKED.*" ,
        B as B.order_status like ".*SHIPPED.*"
) as M

Output

{"order_id":1,"abInterval":"+000000000 00:00:05.000000000","Trans_id":6,"aState_Trans_id":1,"order_status":"SHIPPED","aState_order_status":"BOOKED","order_revenue":4345.0,"aState_order_revenue":2345.98}
{"order_id":3,"abInterval":"+000000000 00:00:05.000000000","Trans_id":8,"aState_Trans_id":3,"order_status":"SHIPPED","aState_order_status":"BOOKED","order_revenue":3468.87,"aState_order_revenue":3468.87}
{"order_id":2,"abInterval":"+000000000 00:00:10.000000000","Trans_id":12,"aState_Trans_id":2,"order_status":"SHIPPED","aState_order_status":"BOOKED","order_revenue":4345.98,"aState_order_revenue":4345.98}
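
To make the matching logic easier to follow, here is a rough Python sketch that emulates the query on the sample data. It is not CQL and not how the engine evaluates MATCH_RECOGNIZE. It assumes all events arrive inside the 1 minute window, so it ignores the WITHIN clause and omits abInterval. The function name a_followed_by_b is hypothetical.

```python
# Rows from Data_A_Followed_B: (Trans_id, order_id, order_status, order_revenue)
ROWS = [
    (1, 1, "BOOKED", 2345.98), (2, 2, "BOOKED", 4345.98),
    (3, 3, "BOOKED", 3468.87), (4, 1, "PAID", 2345.98),
    (5, 2, "PAID", 4345.98), (6, 1, "SHIPPED", 4345.0),
    (7, 3, "PAID", 3468.87), (8, 3, "SHIPPED", 3468.87),
    (9, 4, "BOOKED", 3456.0), (10, 5, "BOOKED", 6546.0),
    (11, 6, "BOOKED", 76547.0), (12, 2, "SHIPPED", 4345.98),
]


def a_followed_by_b(rows, a_status="BOOKED", b_status="SHIPPED"):
    """Pair each order's A event with the next B event of the same order."""
    pending = {}  # order_id -> A row still waiting for its B (the partition state)
    matches = []
    for trans_id, order_id, status, revenue in rows:
        if a_status in status:
            # Keep the earliest unmatched A per order (a simplification).
            pending.setdefault(order_id, (trans_id, status, revenue))
        elif b_status in status and order_id in pending:
            a_trans, a_stat, a_rev = pending.pop(order_id)
            matches.append({
                "order_id": order_id,
                "Trans_id": trans_id,
                "aState_Trans_id": a_trans,
                "order_status": status,
                "aState_order_status": a_stat,
                "order_revenue": revenue,
                "aState_order_revenue": a_rev,
            })
        # Any other status (for example PAID) plays the role of C in the pattern.
    return matches


matches = a_followed_by_b(ROWS)
for match in matches:
    print(match)
```

On the sample data, the sketch pairs the same Trans_id values as the output above: 1 with 6, 3 with 8, and 2 with 12.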

5.6.1.2 A Not Followed by B

Note:

In the sample query below, replace q1 with the name of the previous stage:

Change FROM q1 to FROM <previous-stage-name>.

Use the sample data file: Data_A_Followed_B.

SELECT
    Trans_id AS Trans_id,
    order_id AS order_id,
    order_status AS order_status,
    order_revenue AS order_revenue
FROM q1
MATCH_RECOGNIZE (
    PARTITION BY
        order_id
    MEASURES
        A.Trans_id AS Trans_id,
        A.order_id AS order_id,
        A.order_status AS order_status,
        A.order_revenue AS order_revenue
    INCLUDE TIMER EVENTS
    PATTERN( A B* )
    DURATION 1 minutes
    DEFINE
        A as A.order_status like ".*BOOKED.*" ,
        B as NOT (B.order_status like ".*SHIPPED.*" )
) as M

Output

{"Trans_id":9,"order_id":4,"order_status":"BOOKED","order_revenue":3456.0}
{"Trans_id":10,"order_id":5,"order_status":"BOOKED","order_revenue":6546.0}
{"Trans_id":11,"order_id":6,"order_status":"BOOKED","order_revenue":76547.0}
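
The following Python sketch emulates the idea behind this query on the sample data. It is an approximation only: it treats the end of the input as the point where the 1 minute DURATION timer expires, which is an assumption, and the function name a_not_followed_by_b is hypothetical.

```python
# Rows from Data_A_Followed_B: (Trans_id, order_id, order_status, order_revenue)
ROWS = [
    (1, 1, "BOOKED", 2345.98), (2, 2, "BOOKED", 4345.98),
    (3, 3, "BOOKED", 3468.87), (4, 1, "PAID", 2345.98),
    (5, 2, "PAID", 4345.98), (6, 1, "SHIPPED", 4345.0),
    (7, 3, "PAID", 3468.87), (8, 3, "SHIPPED", 3468.87),
    (9, 4, "BOOKED", 3456.0), (10, 5, "BOOKED", 6546.0),
    (11, 6, "BOOKED", 76547.0), (12, 2, "SHIPPED", 4345.98),
]


def a_not_followed_by_b(rows, a_status="BOOKED", b_status="SHIPPED"):
    """Return the A events that never saw a B event for the same order."""
    waiting = {}  # order_id -> A event that has not been followed by B yet
    for trans_id, order_id, status, revenue in rows:
        if a_status in status:
            waiting.setdefault(order_id, {
                "Trans_id": trans_id,
                "order_id": order_id,
                "order_status": status,
                "order_revenue": revenue,
            })
        elif b_status in status:
            # B arrived, so this order no longer qualifies.
            waiting.pop(order_id, None)
    # Whatever is still waiting stands in for "the timer expired without a B".
    return list(waiting.values())


unshipped = a_not_followed_by_b(ROWS)
for event in unshipped:
    print(event)
```

On the sample data, only orders 4, 5, and 6 remain, which agrees with the output above.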

5.6.1.3 Detect Duplicates

Note:

In the sample query below, replace q1 with the name of the previous stage:

Change FROM q1 to FROM <previous-stage-name>.

Use the sample data file: Data_A_Followed_B.

RSTREAM( SELECT
    count(*) AS Number_of_Duplicates,
    eventSource.order_id AS order_id,
    current(eventSource.Trans_id) AS Trans_id,
    current(eventSource.order_status) AS order_status,
    current(eventSource.order_revenue) AS order_revenue
FROM q1 [now] as eventSource,
    q1 [range 1 minutes] as dup
WHERE
    eventSource.order_id = dup.order_id
GROUP BY
    eventSource.order_id
HAVING count(*) > 1)

Output

{"Number_of_Duplicates":2,"order_id":1,"Trans_id":4,"order_status":"PAID","order_revenue":2345.98}
{"Number_of_Duplicates":2,"order_id":2,"Trans_id":5,"order_status":"PAID","order_revenue":4345.98}
{"Number_of_Duplicates":3,"order_id":1,"Trans_id":6,"order_status":"SHIPPED","order_revenue":4345.0}
{"Number_of_Duplicates":2,"order_id":3,"Trans_id":7,"order_status":"PAID","order_revenue":3468.87}
{"Number_of_Duplicates":3,"order_id":3,"Trans_id":8,"order_status":"SHIPPED","order_revenue":3468.87}
{"Number_of_Duplicates":3,"order_id":2,"Trans_id":12,"order_status":"SHIPPED","order_revenue":4345.98}
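
As a rough illustration of the join between the current event and the 1 minute window, the Python sketch below counts, for each incoming event, how many events with the same order_id are still inside the window. It assumes one event per second, which is consistent with the intervals in the A Followed by B output but is not stated elsewhere. The function name detect_duplicates is hypothetical.

```python
from collections import defaultdict, deque

# Rows from Data_A_Followed_B: (Trans_id, order_id, order_status, order_revenue)
ROWS = [
    (1, 1, "BOOKED", 2345.98), (2, 2, "BOOKED", 4345.98),
    (3, 3, "BOOKED", 3468.87), (4, 1, "PAID", 2345.98),
    (5, 2, "PAID", 4345.98), (6, 1, "SHIPPED", 4345.0),
    (7, 3, "PAID", 3468.87), (8, 3, "SHIPPED", 3468.87),
    (9, 4, "BOOKED", 3456.0), (10, 5, "BOOKED", 6546.0),
    (11, 6, "BOOKED", 76547.0), (12, 2, "SHIPPED", 4345.98),
]


def detect_duplicates(rows, window_seconds=60):
    """Emit every event whose order_id occurs more than once in the window."""
    recent = defaultdict(deque)  # order_id -> timestamps still inside the window
    duplicates = []
    # Assumption: the row index doubles as an arrival time in seconds.
    for ts, (trans_id, order_id, status, revenue) in enumerate(rows):
        seen = recent[order_id]
        seen.append(ts)
        # Drop timestamps that have fallen out of the range window.
        while ts - seen[0] >= window_seconds:
            seen.popleft()
        if len(seen) > 1:  # mirrors HAVING count(*) > 1
            duplicates.append({
                "Number_of_Duplicates": len(seen),
                "order_id": order_id,
                "Trans_id": trans_id,
                "order_status": status,
                "order_revenue": revenue,
            })
    return duplicates


duplicates = detect_duplicates(ROWS)
for event in duplicates:
    print(event)
```

On the sample data, the counts and Trans_id values agree with the output above.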

5.6.1.4 Change Event

Note:

In the sample query below, replace q1 with the name of the previous stage:

Change FROM q1 to FROM <previous-stage-name>.

Use the sample data file: Change_Detector.

SELECT
    Stock_ID AS Stock_ID,
    Stock_Price AS Stock_Price,
    orig_Stock_Price AS orig_Stock_Price,
    Msg_ID AS Msg_ID,
    orig_Msg_ID AS orig_Msg_ID
FROM q1
MATCH_RECOGNIZE (
    PARTITION BY
        Stock_ID
    MEASURES
        Z.Stock_ID AS Stock_ID,
        last(X.Stock_Price) AS Stock_Price,
        Z.Stock_Price AS orig_Stock_Price,
        last(X.Msg_ID) AS Msg_ID,
        Z.Msg_ID AS orig_Msg_ID
    PATTERN( Z X+ )
    WITHIN 1 minutes
    DEFINE X as X.Stock_Price != Z.Stock_Price
) as M

Output

{"Stock_ID":1,"Stock_Price":105,"orig_Stock_Price":87,"Msg_ID":23,"orig_Msg_ID":4}
{"Stock_ID":2,"Stock_Price":112,"orig_Stock_Price":41,"Msg_ID":27,"orig_Msg_ID":16}
{"Stock_ID":3,"Stock_Price":176,"orig_Stock_Price":65,"Msg_ID":30,"orig_Msg_ID":10}
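
The sample output suggests that, per Stock_ID, Z is the last event before the price changes and X+ covers every following event whose price differs from Z. The Python sketch below reproduces that behaviour on the sample data under this assumption. It ignores the WITHIN clause and is not a reimplementation of MATCH_RECOGNIZE. The function name detect_changes and the RUNS shorthand are illustrative only.

```python
# Change_Detector sample written as runs of (Stock_ID, Stock_Price, repeat count).
RUNS = [(1, 87, 4), (2, 41, 1), (3, 65, 5), (2, 41, 6),
        (1, 91, 3), (1, 105, 4), (2, 112, 4), (3, 176, 3)]

# Expand the runs into rows of (Msg_ID, Stock_ID, Stock_Price), Msg_ID 1..30.
ROWS = []
for stock_id, price, count in RUNS:
    for _ in range(count):
        ROWS.append((len(ROWS) + 1, stock_id, price))


def detect_changes(rows):
    """Per stock, report the last differing price after the original price."""
    state = {}  # stock_id -> [z, last_x]; each is (Msg_ID, price), last_x may be None
    results = []

    def record(stock_id, z, last_x):
        results.append({
            "Stock_ID": stock_id,
            "Stock_Price": last_x[1],
            "orig_Stock_Price": z[1],
            "Msg_ID": last_x[0],
            "orig_Msg_ID": z[0],
        })

    for msg_id, stock_id, price in rows:
        row = (msg_id, price)
        if stock_id not in state:
            state[stock_id] = [row, None]
            continue
        z, last_x = state[stock_id]
        if price != z[1]:
            state[stock_id][1] = row  # the event extends X+
        else:
            if last_x is not None:
                record(stock_id, z, last_x)  # the match ended before this event
            state[stock_id] = [row, None]  # restart with this event as Z

    # End of input: report matches that are still open.
    for stock_id, (z, last_x) in state.items():
        if last_x is not None:
            record(stock_id, z, last_x)
    return sorted(results, key=lambda r: r["Msg_ID"])


changes = detect_changes(ROWS)
for change in changes:
    print(change)
```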

5.6.1.5 Eliminate Duplicates

Note:

In the sample query below, replace q1 with the name of the previous stage:

Change FROM q1 to FROM <previous-stage-name>.

Use the sample data file: Change_Detector.

ISTREAM( SELECT
    Msg_ID AS Msg_ID,
    Stock_ID AS Stock_ID,
    Stock_Price AS Stock_Price
FROM q1 [range 1 minutes]
) DIFFERENCE USING (Stock_Price)

Output

{"Msg_ID":1,"Stock_ID":1,"Stock_Price":87}
{"Msg_ID":5,"Stock_ID":2,"Stock_Price":41}
{"Msg_ID":6,"Stock_ID":3,"Stock_Price":65}
{"Msg_ID":17,"Stock_ID":1,"Stock_Price":91}
{"Msg_ID":20,"Stock_ID":1,"Stock_Price":105}
{"Msg_ID":24,"Stock_ID":2,"Stock_Price":112}
{"Msg_ID":28,"Stock_ID":3,"Stock_Price":176}
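
Judging by the sample output, an event is emitted only when its Stock_Price is not already present in the 1 minute window, regardless of Stock_ID. The Python sketch below approximates that behaviour. It assumes one event per second, and the function name eliminate_duplicates and the RUNS shorthand are illustrative only.

```python
# Change_Detector sample written as runs of (Stock_ID, Stock_Price, repeat count).
RUNS = [(1, 87, 4), (2, 41, 1), (3, 65, 5), (2, 41, 6),
        (1, 91, 3), (1, 105, 4), (2, 112, 4), (3, 176, 3)]

# Expand the runs into rows of (Msg_ID, Stock_ID, Stock_Price), Msg_ID 1..30.
ROWS = []
for stock_id, price, count in RUNS:
    for _ in range(count):
        ROWS.append((len(ROWS) + 1, stock_id, price))


def eliminate_duplicates(rows, window_seconds=60):
    """Keep an event only if its price was not seen inside the window."""
    last_seen = {}  # Stock_Price -> arrival time of the latest event with that price
    unique = []
    # Assumption: the row index doubles as an arrival time in seconds.
    for ts, (msg_id, stock_id, price) in enumerate(rows):
        previous = last_seen.get(price)
        if previous is None or ts - previous >= window_seconds:
            unique.append({
                "Msg_ID": msg_id,
                "Stock_ID": stock_id,
                "Stock_Price": price,
            })
        last_seen[price] = ts
    return unique


unique = eliminate_duplicates(ROWS)
for event in unique:
    print(event)
```

On the sample data, the sketch keeps the same seven Msg_ID values as the output above.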