5.6 CQL問合せの書込み
パイプラインのカスタムCQLステージを追加して、CQL問合せを記述すると、前のステージのデータに対して任意のSelect操作を実行できます。現在、既存のパイプライン・ステージまたはパターン・ステージでは、特定のビジネス・ルールの作成がサポートされていません。それが必要な場合には、カスタムCQLステージの一部としてCQL問合せを使用できます。
CQLの詳細は、CQLのリファレンス・ガイドを参照してください。
5.6.1 サンプル問合せ
この項では、標準のSelectおよびWhere句問合せとは別に、サンプル問合せをリストします。
サンプル問合せでは、次のサンプル・データ・セットを使用します。データ・セットをコピーして、csvファイルとして保存してください。
サンプル: Data_A_Followed_B
Trans_id | order_id | order_status | order_revenue |
1 | 1 | BOOKED | 2345.98 |
2 | 2 | BOOKED | 4345.98 |
3 | 3 | BOOKED | 3468.87 |
4 | 1 | PAID | 2345.98 |
5 | 2 | PAID | 4345.98 |
6 | 1 | SHIPPED | 4345 |
7 | 3 | PAID | 3468.87 |
8 | 3 | SHIPPED | 3468.87 |
9 | 4 | BOOKED | 3456 |
10 | 5 | BOOKED | 6546 |
11 | 6 | BOOKED | 76547 |
12 | 2 | SHIPPED | 4345.98 |
サンプル: Change_Detector
Msg_ID | Stock_ID | Stock_Price |
1 | 1 | 87 |
2 | 1 | 87 |
3 | 1 | 87 |
4 | 1 | 87 |
5 | 2 | 41 |
6 | 3 | 65 |
7 | 3 | 65 |
8 | 3 | 65 |
9 | 3 | 65 |
10 | 3 | 65 |
11 | 2 | 41 |
12 | 2 | 41 |
13 | 2 | 41 |
14 | 2 | 41 |
15 | 2 | 41 |
16 | 2 | 41 |
17 | 1 | 91 |
18 | 1 | 91 |
19 | 1 | 91 |
20 | 1 | 105 |
21 | 1 | 105 |
22 | 1 | 105 |
23 | 1 | 105 |
24 | 2 | 112 |
25 | 2 | 112 |
26 | 2 | 112 |
27 | 2 | 112 |
28 | 3 | 176 |
29 | 3 | 176 |
30 | 3 | 176 |
5.6.1.1 Bが後に続くA
ノート:
この後のサンプル問合せで、次のようにして、q1
を前のステージの正確な名前に更新します:
Replace FROM q1
to FROM <previous-stage-name>
。
サンプル・データ・ファイルData_A_Followed_Bを使用します。
SELECT
order_id AS order_id,
abInterval AS abInterval,
Trans_id AS Trans_id,
aState_Trans_id AS aState_Trans_id,
order_status AS order_status,
aState_order_status AS aState_order_status,
order_revenue AS order_revenue,
aState_order_revenue AS aState_order_revenue
FROM q1
MATCH_RECOGNIZE (
PARTITION BY
order_id
MEASURES
B.Trans_id AS Trans_id,
B.order_id AS order_id,
B.order_status AS order_status,
B.order_revenue AS order_revenue,
A.Trans_id AS aState_Trans_id,
A.order_status AS aState_order_status,
A.order_revenue AS aState_order_revenue,
(to_timestamp(B.ELEMENT_TIME) - to_timestamp(A.ELEMENT_TIME)) AS abInterval
PATTERN( A C*? B )
WITHIN 1 minutes
DEFINE
A as A.order_status like ".*BOOKED.*" ,
B as B.order_status like ".*SHIPPED.*"
) as M
出力
{"order_id":1,"abInterval":"+000000000 00:00:05.000000000","Trans_id":6,"aState_Trans_id":1,"order_status":"SHIPPED","aState_order_status":"BOOKED","order_revenue":4345.0,"aState_order_revenue":2345.98}
{"order_id":3,"abInterval":"+000000000 00:00:05.000000000","Trans_id":8,"aState_Trans_id":3,"order_status":"SHIPPED","aState_order_status":"BOOKED","order_revenue":3468.87,"aState_order_revenue":3468.87}
{"order_id":2,"abInterval":"+000000000 00:00:10.000000000","Trans_id":12,"aState_Trans_id":2,"order_status":"SHIPPED","aState_order_status":"BOOKED","order_revenue":4345.98,"aState_order_revenue":4345.98}
5.6.1.2 Bが後に続かないA
ノート:
この後のサンプル問合せで、次のようにして、q1
を前のステージの正確な名前に更新します:
Replace FROM q1
to FROM <previous-stage-name>
。
サンプル・データData_A_Followed_Bを使用します。
SELECT
Trans_id AS Trans_id,
order_id AS order_id,
order_status AS order_status,
order_revenue AS order_revenue
FROM q1
MATCH_RECOGNIZE (
PARTITION BY
order_id
MEASURES
A.Trans_id AS Trans_id,
A.order_id AS order_id,
A.order_status AS order_status,
A.order_revenue AS order_revenue
INCLUDE TIMER EVENTS
PATTERN( A B* )
DURATION 1 minutes
DEFINE
A as A.order_status like ".*BOOKED.*" ,
B as NOT (B.order_status like ".*SHIPPED.*" )
) as M
出力
{"Trans_id":9,"order_id":4,"order_status":"BOOKED","order_revenue":3456.0}
{"Trans_id":10,"order_id":5,"order_status":"BOOKED","order_revenue":6546.0}
{"Trans_id":11,"order_id":6,"order_status":"BOOKED","order_revenue":76547.0}
5.6.1.3 重複の検出
ノート:
この後のサンプル問合せで、次のようにして、q1
を前のステージの正確な名前に更新します:
Replace FROM q1
to FROM <previous-stage-name>
。
サンプル・データ・ファイルData_A_Followed_Bを使用します。
RSTREAM( SELECT
count(*) AS Number_of_Duplicates,
eventSource.order_id AS order_id,
current(eventSource.Trans_id) AS Trans_id,
current(eventSource.order_status) AS order_status,
current(eventSource.order_revenue) AS order_revenue
FROM q1 [now] as eventSource,
q1 [range 1 minutes] as dup
WHERE
eventSource.order_id = dup.order_id
GROUP BY
eventSource.order_id HAVING count(*) > 1)
出力
{"Number_of_Duplicates":2,"order_id":1,"Trans_id":4,"order_status":"PAID","order_revenue":2345.98}
{"Number_of_Duplicates":2,"order_id":2,"Trans_id":5,"order_status":"PAID","order_revenue":4345.98}
{"Number_of_Duplicates":3,"order_id":1,"Trans_id":6,"order_status":"SHIPPED","order_revenue":4345.0}
{"Number_of_Duplicates":2,"order_id":3,"Trans_id":7,"order_status":"PAID","order_revenue":3468.87}
{"Number_of_Duplicates":3,"order_id":3,"Trans_id":8,"order_status":"SHIPPED","order_revenue":3468.87}
{"Number_of_Duplicates":3,"order_id":2,"Trans_id":12,"order_status":"SHIPPED","order_revenue":4345.98}
5.6.1.4 変更イベント
ノート:
この後のサンプル問合せで、次のようにして、q1
を前のステージの正確な名前に更新します:
Replace FROM q1
to FROM <previous-stage-name>
。
サンプル・データ・ファイルchange_detectorを使用します。
SELECT
Stock_ID AS Stock_ID,
Stock_Price AS Stock_Price,
orig_Stock_Price AS orig_Stock_Price,
Msg_ID AS Msg_ID,
orig_Msg_ID AS orig_Msg_ID
FROM q1
MATCH_RECOGNIZE (
PARTITION BY
Stock_ID
MEASURES
Z.Stock_ID AS Stock_ID,
last(X.Stock_Price) AS Stock_Price,
Z.Stock_Price AS orig_Stock_Price,
last(X.Msg_ID) AS Msg_ID,
Z.Msg_ID AS orig_Msg_ID
PATTERN( Z X+ )
WITHIN 1 minutes
DEFINE X as X.Stock_Price != Z.Stock_Price
) as M
出力
{"Stock_ID":1,"Stock_Price":105,"orig_Stock_Price":87,"Msg_ID":23,"orig_Msg_ID":4}
{"Stock_ID":2,"Stock_Price":112,"orig_Stock_Price":41,"Msg_ID":27,"orig_Msg_ID":16}
{"Stock_ID":3,"Stock_Price":176,"orig_Stock_Price":65,"Msg_ID":30,"orig_Msg_ID":10}
5.6.1.5 重複の除去
ノート:
この後のサンプル問合せで、次のようにして、q1
を前のステージの正確な名前に更新します:
Replace FROM q1
to FROM <previous-stage-name>
。
サンプル・データ・ファイルchange_detectorを使用します。
ISTREAM( SELECT
Msg_ID AS Msg_ID,
Stock_ID AS Stock_ID,
Stock_Price AS Stock_Price
FROM q1 [range 1 minutes]
) DIFFERENCE USING (Stock_Price)
出力
{"Msg_ID":1,"Stock_ID":1,"Stock_Price":87}
{"Msg_ID":5,"Stock_ID":2,"Stock_Price":41}
{"Msg_ID":6,"Stock_ID":3,"Stock_Price":65}
{"Msg_ID":17,"Stock_ID":1,"Stock_Price":91}
{"Msg_ID":20,"Stock_ID":1,"Stock_Price":105}
{"Msg_ID":24,"Stock_ID":2,"Stock_Price":112}
{"Msg_ID":28,"Stock_ID":3,"Stock_Price":176}