5.6 CQL問合せの書込み

パイプラインのカスタムCQLステージを追加して、CQL問合せを記述すると、前のステージのデータに対して任意のSelect操作を実行できます。現在、既存のパイプライン・ステージまたはパターン・ステージでは、特定のビジネス・ルールの作成がサポートされていません。それが必要な場合には、カスタムCQLステージの一部としてCQL問合せを使用できます。

CQLの詳細は、CQLのリファレンス・ガイドを参照してください。

5.6.1 サンプル問合せ

この項では、標準のSelectおよびWhere句問合せとは別に、サンプル問合せをリストします。

サンプル問合せでは、次のサンプル・データ・セットを使用します。データ・セットをコピーして、csvファイルとして保存してください。

サンプル: Data_A_Followed_B

Trans_id order_id order_status order_revenue
1 1 BOOKED 2345.98
2 2 BOOKED 4345.98
3 3 BOOKED 3468.87
4 1 PAID 2345.98
5 2 PAID 4345.98
6 1 SHIPPED 4345
7 3 PAID 3468.87
8 3 SHIPPED 3468.87
9 4 BOOKED 3456
10 5 BOOKED 6546
11 6 BOOKED 76547
12 2 SHIPPED 4345.98

サンプル: Change_Detector

Msg_ID Stock_ID Stock_Price
1 1 87
2 1 87
3 1 87
4 1 87
5 2 41
6 3 65
7 3 65
8 3 65
9 3 65
10 3 65
11 2 41
12 2 41
13 2 41
14 2 41
15 2 41
16 2 41
17 1 91
18 1 91
19 1 91
20 1 105
21 1 105
22 1 105
23 1 105
24 2 112
25 2 112
26 2 112
27 2 112
28 3 176
29 3 176
30 3 176

5.6.1.1 Bが後に続くA

ノート:

この後のサンプル問合せで、次のようにして、q1を前のステージの正確な名前に更新します:

Replace FROM q1 to FROM <previous-stage-name>

サンプル・データ・ファイルData_A_Followed_Bを使用します。

SELECT

order_id AS order_id,
abInterval AS abInterval,
Trans_id AS Trans_id,
aState_Trans_id AS aState_Trans_id,
order_status AS order_status,
aState_order_status AS aState_order_status,
order_revenue AS order_revenue,
aState_order_revenue AS aState_order_revenue
FROM q1

MATCH_RECOGNIZE (
PARTITION BY
order_id
MEASURES

B.Trans_id AS Trans_id,
B.order_id AS order_id,
B.order_status AS order_status,
B.order_revenue AS order_revenue,
A.Trans_id AS aState_Trans_id,
A.order_status AS aState_order_status,
A.order_revenue AS aState_order_revenue,
(to_timestamp(B.ELEMENT_TIME) - to_timestamp(A.ELEMENT_TIME)) AS abInterval
PATTERN( A C*? B )
WITHIN 1 minutes
DEFINE
A as A.order_status like ".*BOOKED.*" ,
B as B.order_status like ".*SHIPPED.*"

) as M

出力

{"order_id":1,"abInterval":"+000000000 00:00:05.000000000","Trans_id":6,"aState_Trans_id":1,"order_status":"SHIPPED","aState_order_status":"BOOKED","order_revenue":4345.0,"aState_order_revenue":2345.98}
{"order_id":3,"abInterval":"+000000000 00:00:05.000000000","Trans_id":8,"aState_Trans_id":3,"order_status":"SHIPPED","aState_order_status":"BOOKED","order_revenue":3468.87,"aState_order_revenue":3468.87}
{"order_id":2,"abInterval":"+000000000 00:00:10.000000000","Trans_id":12,"aState_Trans_id":2,"order_status":"SHIPPED","aState_order_status":"BOOKED","order_revenue":4345.98,"aState_order_revenue":4345.98}

5.6.1.2 Bが後に続かないA

ノート:

この後のサンプル問合せで、次のようにして、q1を前のステージの正確な名前に更新します:

Replace FROM q1 to FROM <previous-stage-name>

サンプル・データData_A_Followed_Bを使用します。

SELECT

Trans_id AS Trans_id,
order_id AS order_id,
order_status AS order_status,
order_revenue AS order_revenue
FROM q1

MATCH_RECOGNIZE (
PARTITION BY
order_id
MEASURES

A.Trans_id AS Trans_id,
A.order_id AS order_id,
A.order_status AS order_status,
A.order_revenue AS order_revenue
INCLUDE TIMER EVENTS
PATTERN( A B* )
DURATION 1 minutes
DEFINE
A as A.order_status like ".*BOOKED.*" ,
B as NOT (B.order_status like ".*SHIPPED.*" )

) as M

出力

{"Trans_id":9,"order_id":4,"order_status":"BOOKED","order_revenue":3456.0}
{"Trans_id":10,"order_id":5,"order_status":"BOOKED","order_revenue":6546.0}
{"Trans_id":11,"order_id":6,"order_status":"BOOKED","order_revenue":76547.0}

5.6.1.3 重複の検出

ノート:

この後のサンプル問合せで、次のようにして、q1を前のステージの正確な名前に更新します:

Replace FROM q1 to FROM <previous-stage-name>

サンプル・データ・ファイルData_A_Followed_Bを使用します。

RSTREAM( SELECT

count(*) AS Number_of_Duplicates,
eventSource.order_id AS order_id,
current(eventSource.Trans_id) AS Trans_id,
current(eventSource.order_status) AS order_status,
current(eventSource.order_revenue) AS order_revenue
FROM q1 [now] as eventSource,
q1 [range 1 minutes] as dup

WHERE
eventSource.order_id = dup.order_id
GROUP BY
eventSource.order_id HAVING count(*) > 1)

出力

{"Number_of_Duplicates":2,"order_id":1,"Trans_id":4,"order_status":"PAID","order_revenue":2345.98}
{"Number_of_Duplicates":2,"order_id":2,"Trans_id":5,"order_status":"PAID","order_revenue":4345.98}
{"Number_of_Duplicates":3,"order_id":1,"Trans_id":6,"order_status":"SHIPPED","order_revenue":4345.0}
{"Number_of_Duplicates":2,"order_id":3,"Trans_id":7,"order_status":"PAID","order_revenue":3468.87}
{"Number_of_Duplicates":3,"order_id":3,"Trans_id":8,"order_status":"SHIPPED","order_revenue":3468.87}
{"Number_of_Duplicates":3,"order_id":2,"Trans_id":12,"order_status":"SHIPPED","order_revenue":4345.98}

5.6.1.4 変更イベント

ノート:

この後のサンプル問合せで、次のようにして、q1を前のステージの正確な名前に更新します:

Replace FROM q1 to FROM <previous-stage-name>

サンプル・データ・ファイルchange_detectorを使用します。

SELECT

Stock_ID AS Stock_ID,
Stock_Price AS Stock_Price,
orig_Stock_Price AS orig_Stock_Price,
Msg_ID AS Msg_ID,
orig_Msg_ID AS orig_Msg_ID
FROM q1

MATCH_RECOGNIZE (
PARTITION BY
Stock_ID
MEASURES

Z.Stock_ID AS Stock_ID,
last(X.Stock_Price) AS Stock_Price,
Z.Stock_Price AS orig_Stock_Price,
last(X.Msg_ID) AS Msg_ID,
Z.Msg_ID AS orig_Msg_ID
PATTERN( Z X+ )
WITHIN 1 minutes
DEFINE X as X.Stock_Price != Z.Stock_Price
) as M

出力

{"Stock_ID":1,"Stock_Price":105,"orig_Stock_Price":87,"Msg_ID":23,"orig_Msg_ID":4}
{"Stock_ID":2,"Stock_Price":112,"orig_Stock_Price":41,"Msg_ID":27,"orig_Msg_ID":16}
{"Stock_ID":3,"Stock_Price":176,"orig_Stock_Price":65,"Msg_ID":30,"orig_Msg_ID":10}

5.6.1.5 重複の除去

ノート:

この後のサンプル問合せで、次のようにして、q1を前のステージの正確な名前に更新します:

Replace FROM q1 to FROM <previous-stage-name>

サンプル・データ・ファイルchange_detectorを使用します。

ISTREAM( SELECT

Msg_ID AS Msg_ID,
Stock_ID AS Stock_ID,
Stock_Price AS Stock_Price
FROM q1 [range 1 minutes]
) DIFFERENCE USING (Stock_Price)

出力

{"Msg_ID":1,"Stock_ID":1,"Stock_Price":87}
{"Msg_ID":5,"Stock_ID":2,"Stock_Price":41}
{"Msg_ID":6,"Stock_ID":3,"Stock_Price":65}
{"Msg_ID":17,"Stock_ID":1,"Stock_Price":91}
{"Msg_ID":20,"Stock_ID":1,"Stock_Price":105}
{"Msg_ID":24,"Stock_ID":2,"Stock_Price":112}
{"Msg_ID":28,"Stock_ID":3,"Stock_Price":176}