5.6 Writing CQL Queries
You can add a Custom CQL stage to a pipeline and write a CQL query in it that performs any SELECT operation on the data from the previous stage. Some business rules cannot currently be created with the existing pipeline stages or pattern stages. For such requirements, use a CQL query in a Custom CQL stage.
To learn more about CQL, see the CQL Reference Guide.
5.6.1 Sample Queries
This section lists sample queries that go beyond the standard SELECT and WHERE clause queries.
The sample queries use the following data sets. Copy each data set and save it as a CSV file.
Sample: Data_A_Followed_B
Trans_id,order_id,order_status,order_revenue
1,1,BOOKED,2345.98
2,2,BOOKED,4345.98
3,3,BOOKED,3468.87
4,1,PAID,2345.98
5,2,PAID,4345.98
6,1,SHIPPED,4345
7,3,PAID,3468.87
8,3,SHIPPED,3468.87
9,4,BOOKED,3456
10,5,BOOKED,6546
11,6,BOOKED,76547
12,2,SHIPPED,4345.98
Sample: Change_Detector
Msg_ID,Stock_ID,Stock_Price
1,1,87
2,1,87
3,1,87
4,1,87
5,2,41
6,3,65
7,3,65
8,3,65
9,3,65
10,3,65
11,2,41
12,2,41
13,2,41
14,2,41
15,2,41
16,2,41
17,1,91
18,1,91
19,1,91
20,1,105
21,1,105
22,1,105
23,1,105
24,2,112
25,2,112
26,2,112
27,2,112
28,3,176
29,3,176
30,3,176
5.6.1.1 A Followed by B
Note:
In the sample query below, update q1 to the name of the previous stage: replace FROM q1 with FROM <previous-stage-name>.
Use the sample data file: Data_A_Followed_B.
SELECT
order_id AS order_id,
abInterval AS abInterval,
Trans_id AS Trans_id,
aState_Trans_id AS aState_Trans_id,
order_status AS order_status,
aState_order_status AS aState_order_status,
order_revenue AS order_revenue,
aState_order_revenue AS aState_order_revenue
FROM q1
MATCH_RECOGNIZE (
PARTITION BY
order_id
MEASURES
B.Trans_id AS Trans_id,
B.order_id AS order_id,
B.order_status AS order_status,
B.order_revenue AS order_revenue,
A.Trans_id AS aState_Trans_id,
A.order_status AS aState_order_status,
A.order_revenue AS aState_order_revenue,
(to_timestamp(B.ELEMENT_TIME) - to_timestamp(A.ELEMENT_TIME)) AS abInterval
PATTERN( A C*? B )
WITHIN 1 minutes
DEFINE
A as A.order_status like ".*BOOKED.*" ,
B as B.order_status like ".*SHIPPED.*"
) as M
Output
{"order_id":1,"abInterval":"+000000000 00:00:05.000000000","Trans_id":6,"aState_Trans_id":1,"order_status":"SHIPPED","aState_order_status":"BOOKED","order_revenue":4345.0,"aState_order_revenue":2345.98}
{"order_id":3,"abInterval":"+000000000 00:00:05.000000000","Trans_id":8,"aState_Trans_id":3,"order_status":"SHIPPED","aState_order_status":"BOOKED","order_revenue":3468.87,"aState_order_revenue":3468.87}
{"order_id":2,"abInterval":"+000000000 00:00:10.000000000","Trans_id":12,"aState_Trans_id":2,"order_status":"SHIPPED","aState_order_status":"BOOKED","order_revenue":4345.98,"aState_order_revenue":4345.98}
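To make the matching logic easier to follow, here is a minimal Python sketch that emulates the query offline on the Data_A_Followed_B sample. It is not part of the pipeline runtime and uses no pipeline API; the function name a_followed_by_b is hypothetical. It assumes that each event's time in seconds equals its Trans_id, which is consistent with the abInterval values in the output above, and it treats any event between A and B as the reluctant C*? part of the pattern.

```python
# Hypothetical offline emulation of the "A Followed by B" query; not a pipeline API.
# Rows from the Data_A_Followed_B sample: (Trans_id, order_id, order_status, order_revenue).
ROWS = [
    (1, 1, "BOOKED", 2345.98), (2, 2, "BOOKED", 4345.98),
    (3, 3, "BOOKED", 3468.87), (4, 1, "PAID", 2345.98),
    (5, 2, "PAID", 4345.98), (6, 1, "SHIPPED", 4345.0),
    (7, 3, "PAID", 3468.87), (8, 3, "SHIPPED", 3468.87),
    (9, 4, "BOOKED", 3456.0), (10, 5, "BOOKED", 6546.0),
    (11, 6, "BOOKED", 76547.0), (12, 2, "SHIPPED", 4345.98),
]


def a_followed_by_b(rows, within_s=60):
    """Pair each order's BOOKED event (A) with a later SHIPPED event (B) within `within_s` seconds."""
    pending = {}  # order_id -> (time, Trans_id, status, revenue) of the open A event
    matches = []
    for trans_id, order_id, status, revenue in rows:  # rows arrive in time order
        ts = trans_id  # ASSUMPTION: event time in seconds equals Trans_id
        a = pending.get(order_id)
        if a is not None and ts - a[0] > within_s:
            del pending[order_id]  # WITHIN window expired, so discard the open A
            a = None
        if a is not None and "SHIPPED" in status:
            matches.append({
                "order_id": order_id,
                "abInterval_s": ts - a[0],
                "Trans_id": trans_id,
                "aState_Trans_id": a[1],
                "order_status": status,
                "aState_order_status": a[2],
                "order_revenue": revenue,
                "aState_order_revenue": a[3],
            })
            del pending[order_id]
        elif a is None and "BOOKED" in status:
            pending[order_id] = (ts, trans_id, status, revenue)
        # Any other event for an open order plays the role of C*? and is skipped.
    return matches


if __name__ == "__main__":
    for m in a_followed_by_b(ROWS):
        print(m)
```

On this sample the sketch reports orders 1, 3, and 2 with intervals of 5, 5, and 10 seconds, the same matches as the output above (the interval is a plain number of seconds instead of a CQL interval string).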
5.6.1.2 A Not Followed by B
Note:
In the sample query below, update q1 to the name of the previous stage: replace FROM q1 with FROM <previous-stage-name>.
Use the sample data file: Data_A_Followed_B.
SELECT
Trans_id AS Trans_id,
order_id AS order_id,
order_status AS order_status,
order_revenue AS order_revenue
FROM q1
MATCH_RECOGNIZE (
PARTITION BY
order_id
MEASURES
A.Trans_id AS Trans_id,
A.order_id AS order_id,
A.order_status AS order_status,
A.order_revenue AS order_revenue
INCLUDE TIMER EVENTS
PATTERN( A B* )
DURATION 1 minutes
DEFINE
A as A.order_status like ".*BOOKED.*" ,
B as NOT (B.order_status like ".*SHIPPED.*" )
) as M
Output
{"Trans_id":9,"order_id":4,"order_status":"BOOKED","order_revenue":3456.0}
{"Trans_id":10,"order_id":5,"order_status":"BOOKED","order_revenue":6546.0}
{"Trans_id":11,"order_id":6,"order_status":"BOOKED","order_revenue":76547.0}
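The following Python sketch emulates the same rule offline: a BOOKED event is reported when no SHIPPED event for the same order follows it within the duration. It is a batch approximation, not the pipeline's timer-based evaluation; the function name a_not_followed_by_b is hypothetical, and it assumes that event time in seconds equals Trans_id.

```python
# Hypothetical offline emulation of the "A Not Followed by B" query; not a pipeline API.
# Rows from the Data_A_Followed_B sample: (Trans_id, order_id, order_status, order_revenue).
ROWS = [
    (1, 1, "BOOKED", 2345.98), (2, 2, "BOOKED", 4345.98),
    (3, 3, "BOOKED", 3468.87), (4, 1, "PAID", 2345.98),
    (5, 2, "PAID", 4345.98), (6, 1, "SHIPPED", 4345.0),
    (7, 3, "PAID", 3468.87), (8, 3, "SHIPPED", 3468.87),
    (9, 4, "BOOKED", 3456.0), (10, 5, "BOOKED", 6546.0),
    (11, 6, "BOOKED", 76547.0), (12, 2, "SHIPPED", 4345.98),
]


def a_not_followed_by_b(rows, duration_s=60):
    """Report BOOKED events (A) that no SHIPPED event (B) for the same order follows within `duration_s`."""
    alerts = []
    for trans_id, order_id, status, revenue in rows:
        if "BOOKED" not in status:
            continue
        ts = trans_id  # ASSUMPTION: event time in seconds equals Trans_id
        shipped_in_time = any(
            o == order_id and "SHIPPED" in s and 0 < t - ts <= duration_s
            for t, o, s, _ in rows
        )
        if not shipped_in_time:
            # The streaming query emits this row only when the DURATION timer
            # fires; this batch version reports it immediately.
            alerts.append({"Trans_id": trans_id, "order_id": order_id,
                           "order_status": status, "order_revenue": revenue})
    return alerts


if __name__ == "__main__":
    for a in a_not_followed_by_b(ROWS):
        print(a)
```

Orders 1, 2, and 3 are shipped within the minute, so only the BOOKED events for orders 4, 5, and 6 are reported, as in the output above.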
5.6.1.3 Detect Duplicates
Note:
In the sample query below, update q1 to the name of the previous stage: replace both occurrences of q1 (in FROM q1 [now] and in q1 [range 1 minutes]) with <previous-stage-name>.
Use the sample data file: Data_A_Followed_B.
RSTREAM( SELECT
count(*) AS Number_of_Duplicates,
eventSource.order_id AS order_id,
current(eventSource.Trans_id) AS Trans_id,
current(eventSource.order_status) AS order_status,
current(eventSource.order_revenue) AS order_revenue
FROM q1 [now] as eventSource,
q1 [range 1 minutes] as dup
WHERE
eventSource.order_id = dup.order_id
GROUP BY
eventSource.order_id HAVING count(*) > 1)
Output
{"Number_of_Duplicates":2,"order_id":1,"Trans_id":4,"order_status":"PAID","order_revenue":2345.98}
{"Number_of_Duplicates":2,"order_id":2,"Trans_id":5,"order_status":"PAID","order_revenue":4345.98}
{"Number_of_Duplicates":3,"order_id":1,"Trans_id":6,"order_status":"SHIPPED","order_revenue":4345.0}
{"Number_of_Duplicates":2,"order_id":3,"Trans_id":7,"order_status":"PAID","order_revenue":3468.87}
{"Number_of_Duplicates":3,"order_id":3,"Trans_id":8,"order_status":"SHIPPED","order_revenue":3468.87}
{"Number_of_Duplicates":3,"order_id":2,"Trans_id":12,"order_status":"SHIPPED","order_revenue":4345.98}
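The join above compares each incoming event ([now]) with the events of the last minute ([range 1 minutes]) for the same order_id. A minimal Python sketch of that counting logic, run offline on the Data_A_Followed_B sample, is shown below. The function name detect_duplicates is hypothetical, and the sketch assumes that event time in seconds equals Trans_id.

```python
# Hypothetical offline emulation of the "Detect Duplicates" query; not a pipeline API.
# Rows from the Data_A_Followed_B sample: (Trans_id, order_id, order_status, order_revenue).
ROWS = [
    (1, 1, "BOOKED", 2345.98), (2, 2, "BOOKED", 4345.98),
    (3, 3, "BOOKED", 3468.87), (4, 1, "PAID", 2345.98),
    (5, 2, "PAID", 4345.98), (6, 1, "SHIPPED", 4345.0),
    (7, 3, "PAID", 3468.87), (8, 3, "SHIPPED", 3468.87),
    (9, 4, "BOOKED", 3456.0), (10, 5, "BOOKED", 6546.0),
    (11, 6, "BOOKED", 76547.0), (12, 2, "SHIPPED", 4345.98),
]


def detect_duplicates(rows, range_s=60):
    """For each event, count same-order events in the trailing window; report counts above 1."""
    out = []
    for i, (trans_id, order_id, status, revenue) in enumerate(rows):
        ts = trans_id  # ASSUMPTION: event time in seconds equals Trans_id
        # Events in the window (ts - range_s, ts], including the current event itself.
        count = sum(1 for t, o, _, _ in rows[: i + 1]
                    if o == order_id and ts - t < range_s)
        if count > 1:  # HAVING count(*) > 1
            out.append({"Number_of_Duplicates": count, "order_id": order_id,
                        "Trans_id": trans_id, "order_status": status,
                        "order_revenue": revenue})
    return out


if __name__ == "__main__":
    for row in detect_duplicates(ROWS):
        print(row)
```

Because the current event is itself in the window, the first event of an order always counts 1 and is filtered out; later events report the running count, which gives the six rows listed in the output above.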
5.6.1.4 Change Event
Note:
In the sample query below, update q1 to the name of the previous stage: replace FROM q1 with FROM <previous-stage-name>.
Use the sample data file: Change_Detector.
SELECT
Stock_ID AS Stock_ID,
Stock_Price AS Stock_Price,
orig_Stock_Price AS orig_Stock_Price,
Msg_ID AS Msg_ID,
orig_Msg_ID AS orig_Msg_ID
FROM q1
MATCH_RECOGNIZE (
PARTITION BY
Stock_ID
MEASURES
Z.Stock_ID AS Stock_ID,
last(X.Stock_Price) AS Stock_Price,
Z.Stock_Price AS orig_Stock_Price,
last(X.Msg_ID) AS Msg_ID,
Z.Msg_ID AS orig_Msg_ID
PATTERN( Z X+ )
WITHIN 1 minutes
DEFINE X as X.Stock_Price != Z.Stock_Price
) as M
Output
{"Stock_ID":1,"Stock_Price":105,"orig_Stock_Price":87,"Msg_ID":23,"orig_Msg_ID":4}
{"Stock_ID":2,"Stock_Price":112,"orig_Stock_Price":41,"Msg_ID":27,"orig_Msg_ID":16}
{"Stock_ID":3,"Stock_Price":176,"orig_Stock_Price":65,"Msg_ID":30,"orig_Msg_ID":10}
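In PATTERN( Z X+ ), Z is any row (it has no DEFINE condition) and each X row must have a price different from Z's, so a match starts at the last row before a price change and extends over the following changed rows. The Python sketch below emulates this offline on the Change_Detector sample. It is an approximation under stated assumptions, not the pipeline's matcher: the function name change_events is hypothetical, Msg_ID is assumed to equal event time in seconds, X+ is treated as greedy, matching resumes after the last matched row, and the sketch does not model when the streaming engine actually emits a match.

```python
from collections import defaultdict

# Hypothetical offline emulation of the "Change Event" query; not a pipeline API.
# Change_Detector sample, written as runs of (Stock_ID, Stock_Price, Msg_IDs).
RUNS = [
    (1, 87, range(1, 5)), (2, 41, range(5, 6)), (3, 65, range(6, 11)),
    (2, 41, range(11, 17)), (1, 91, range(17, 20)), (1, 105, range(20, 24)),
    (2, 112, range(24, 28)), (3, 176, range(28, 31)),
]
ROWS = [(msg_id, stock_id, price) for stock_id, price, msgs in RUNS for msg_id in msgs]


def change_events(rows, within_s=60):
    """Emulate PATTERN(Z X+) with X.Stock_Price != Z.Stock_Price, partitioned by Stock_ID."""
    by_stock = defaultdict(list)  # Stock_ID -> [(Msg_ID, Stock_Price)], first-seen order
    for msg_id, stock_id, price in rows:
        by_stock[stock_id].append((msg_id, price))
    matches = []
    for stock_id, events in by_stock.items():
        i = 0
        while i + 1 < len(events):
            z_msg, z_price = events[i]
            j = i + 1
            # Greedy X+: extend while the price differs from Z's price and the
            # row is inside the WITHIN window (ASSUMPTION: Msg_ID = seconds).
            while (j < len(events) and events[j][1] != z_price
                   and events[j][0] - z_msg <= within_s):
                j += 1
            if j == i + 1:
                i += 1  # the next row has the same price, so try it as a new Z
                continue
            last_msg, last_price = events[j - 1]  # last(X.Msg_ID), last(X.Stock_Price)
            matches.append({"Stock_ID": stock_id, "Stock_Price": last_price,
                            "orig_Stock_Price": z_price, "Msg_ID": last_msg,
                            "orig_Msg_ID": z_msg})
            i = j  # resume after the last matched row
    return matches


if __name__ == "__main__":
    for m in change_events(ROWS):
        print(m)
```

On this sample the sketch yields the same three rows as the output above. For stock 1, the change from 91 to 105 does not start a separate match, because 91 and 105 both differ from the original price 87 and the greedy X+ keeps extending.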
5.6.1.5 Eliminate Duplicates
Note:
In the sample query below, update q1 to the name of the previous stage: replace FROM q1 with FROM <previous-stage-name>.
Use the sample data file: Change_Detector.
ISTREAM( SELECT
Msg_ID AS Msg_ID,
Stock_ID AS Stock_ID,
Stock_Price AS Stock_Price
FROM q1 [range 1 minutes]
) DIFFERENCE USING (Stock_Price)
Output
{"Msg_ID":1,"Stock_ID":1,"Stock_Price":87}
{"Msg_ID":5,"Stock_ID":2,"Stock_Price":41}
{"Msg_ID":6,"Stock_ID":3,"Stock_Price":65}
{"Msg_ID":17,"Stock_ID":1,"Stock_Price":91}
{"Msg_ID":20,"Stock_ID":1,"Stock_Price":105}
{"Msg_ID":24,"Stock_ID":2,"Stock_Price":112}
{"Msg_ID":28,"Stock_ID":3,"Stock_Price":176}
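One way to read ISTREAM ... DIFFERENCE USING (Stock_Price) over the 1-minute range is that a row is emitted only when no row with the same Stock_Price is already in the window. The Python sketch below applies that reading offline to the Change_Detector sample. It is an assumption-based emulation, not the pipeline's operator: the function name eliminate_duplicates is hypothetical, and Msg_ID is assumed to equal event time in seconds.

```python
# Hypothetical offline emulation of the "Eliminate Duplicates" query; not a pipeline API.
# Change_Detector sample, written as runs of (Stock_ID, Stock_Price, Msg_IDs).
RUNS = [
    (1, 87, range(1, 5)), (2, 41, range(5, 6)), (3, 65, range(6, 11)),
    (2, 41, range(11, 17)), (1, 91, range(17, 20)), (1, 105, range(20, 24)),
    (2, 112, range(24, 28)), (3, 176, range(28, 31)),
]
ROWS = [(msg_id, stock_id, price) for stock_id, price, msgs in RUNS for msg_id in msgs]


def eliminate_duplicates(rows, range_s=60):
    """Emit a row only if its Stock_Price is not already present in the trailing window."""
    out = []
    for i, (msg_id, stock_id, price) in enumerate(rows):
        ts = msg_id  # ASSUMPTION: event time in seconds equals Msg_ID
        # Only Stock_Price is compared, mirroring DIFFERENCE USING (Stock_Price).
        seen_recently = any(p == price and ts - m < range_s for m, _, p in rows[:i])
        if not seen_recently:
            out.append({"Msg_ID": msg_id, "Stock_ID": stock_id, "Stock_Price": price})
    return out


if __name__ == "__main__":
    for row in eliminate_duplicates(ROWS):
        print(row)
```

Because all 30 sample rows fall within one minute under this assumption, only the first row for each distinct price is emitted, which matches the seven rows in the output above. For example, the repeated price 41 at Msg_ID 11 is suppressed because Msg_ID 5 is still in the window.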