5 変換
5.1 ストリームおよび参照の相関付け
相関は、データ・ストリーム内の着信イベントを、データベース表の静的データや他のストリームからのデータでエンリッチする場合に使用されます。
たとえば、データ・ストリーム内のイベントにSensorId
およびSensor Temperature
のみが含まれる場合、表からのデータでイベントをエンリッチすることにより、SensorMake
、SensorLocation
、SensorThreshold
などを取得できます。
イベントを他のソースと相関付ける場合、共通キーに基づいて結合条件を設定する必要があります。前述の例では、ストリーム内のSensorId
を使用して、データベース表のSensorKey
と相関付けることができます。次の問合せは、温度が事前定義済のしきい値を超えているすべてのセンサーのセンサー詳細を生成する、前述のデータ・エンリッチメント・シナリオを示しています。
Select T.SensorId, T.Temperature, D.SensorName, D.SensorLocation
From TemperatureStream[Now] T, SensorDetailsTable D
Where T.SensorId = D.SensorKey And T.Temperature > D.SensorThreshold
前述のような問合せや、もっと複雑な問合せは、問合せステージのソース・セクションおよびフィルタ・セクションを構成することによって自動的に生成できます。
5.1.1 複数ストリームの結合
ストリームを別のストリームと相関付けることができます。
ストリーム間の相関
- ストリームは、バインドされていないイベント・シーケンスです。ストリームを別のストリームと関連付けるには、まず、ウィンドウ関数を適用して、両方のストリームをリレーションまたはバインドされたイベントのシーケンスに変換します。
- 両方のストリームでウィンドウ関数を適用した後で、trueまたはfalseに評価される相関条件を定義します。
5.1.2 参照または外部ソースとのストリームの結合
ストリームをデータベースまたはCoherenceキャッシュ内の外部データと結合できます。
ストリームからデータベース表への相関
- ウィンドウ関数を適用して、ストリームをイベントのバインド・シーケンスに変換します。
- ストリームにウィンドウ関数を適用した後で、trueまたはfalseに評価される相関条件を定義します。
ストリームからキャッシュへの相関
- ウィンドウ関数を適用して、ストリームをイベントのバインド・シーケンスに変換します。
- ストリームにウィンドウ関数を適用した後で、trueまたはfalseに評価される相関条件を定義します。
5.2 ストリームへのウィンドウ関数の適用
時間ベースおよびイベント・ベースのウィンドウを指定してウィンドウ関数を適用し、ストリームを処理します。
- パイプライン・エディタでパイプラインを開きます。
- ウィンドウ関数を適用する問合せステージを選択します。
- 「ソース」タブをクリックします。
- 時計アイコンをクリックし、「ウィンドウ・タイプ」ドロップダウン・リストから必要なウィンドウ・タイプを選択します。
5.2.1 スライド付きの時間ウィンドウの適用
-
範囲値: integer
-
範囲単位: ナノ秒、マイクロ秒、ミリ秒、秒、分、時間
-
スライド値: integer
-
スライド単位: ナノ秒、マイクロ秒、ミリ秒、秒、分、時間
-
適用可能対象: :問合せステージ
[range 5 MINUTES slide 30 SECONDS]
前述の例では、データは5分間保持されますが、問合せは30秒ごとに評価されます。
ノート:
最新の問合せの結果が前回の結果と異なる場合のみ、出力が行われます。これにより、ダウンストリーム・アプリケーションに重複が送信されなくなります。[Range 5 MINUTES slide 5 MINUTES]
データは5分間のみ保持され、問合せは5分ごとにしか実行されません。 ノート:
ダウンストリーム・システムに送信する前に、タンブリング・ウィンドウを使用して出力結果をバッチ処理します。たとえば、最低10000個のイベントを蓄積した後にのみ、オブジェクト・ストアにファイルを作成する場合があります。これにより、ビッグデータ・システムでの小さいファイルの問題を回避できます。同様に、十分なイベントが蓄積された後、データベース・システムへの複数の書込みを回避し、単一の書込みを実行する場合もあります。5.2.2 スライドなしの時間ウィンドウの適用
-
範囲値: integer
-
範囲単位: ナノ秒、マイクロ秒、ミリ秒、秒、分、時間
-
適用可能対象: 問合せステージ、問合せグループ・ストリーム・ステージおよび問合せグループ表ステージ
CQLの例は次のとおりです:
[range 1 minutes ]
前述の例では、デフォルトのスライド値(Sparkストリーミングのバッチ間隔と同じ)が使用されます。
ノート:
スライド値を指定しない場合は、デフォルトのスライド(Sparkのバッチ間隔と同じ)が使用されます。5.2.3 スライド付きの行ウィンドウの適用
-
行の値: integer
-
適用可能対象: 問合せステージ、問合せグループ・ストリーム・ステージおよび問合せグループ表ステージ
CQLの例は次のとおりです:
[rows 10 slide 1]
最大ウィンドウ・サイズは10イベントですが、1のスライドは、新規イベントが到着するたびに問合せが実行されることを意味します。
5.2.4 スライドなしの行ウィンドウの適用
-
行の値: integer
-
適用可能対象: 問合せステージ、問合せグループ・ストリーム・ステージおよび問合せグループ表ステージ
CQLの例は次のとおりです:
[rows 10]
直近の10イベントを使用して、問合せを評価します。デフォルトのスライド値が使用されます。
5.2.5 現在の年、月、日または時間を含むウィンドウの適用
CurrentYear
-
適用可能対象 :問合せステージ、重複の検出パターン、重複の除去パターン
CQLの例は次のとおりです:
[ CurrentYear ]
データは、現在の年末まで保持されます。デフォルトのスライド値が使用されます。
CurrentMonth
-
適用可能対象 :問合せステージ、重複の検出パターン、重複の除去パターン
CQLの例は次のとおりです:
[ CurrentMonth ]
データは、現在の月末まで保持されます。デフォルトのスライド値が使用されます。
CurrentDay
-
適用可能対象 :問合せステージ、重複の検出パターン、重複の除去パターン
CQLの例は次のとおりです:
[ CurrentDay ]
CurrentHour
-
サポートされる形状フィールドの型: timestamp、int、bigint
-
適用可能対象 :問合せステージ、重複の検出パターン、重複の除去パターン
CQLの例は次のとおりです:
[ CurrentHour ]
データは、現在の時間末まで保持されます。デフォルトのスライド値が使用されます。
5.2.6 ペイロードのフィールドを使用した独自のウィンドウの適用
-
間隔値: interval
-
サポートされる形状フィールドの型: timestamp
-
適用可能対象 :問合せステージ、問合せグループ・ストリーム・ステージおよび問合せグループ表ステージ
CQLの例は次のとおりです:
[range "DS_INTERVAL" on c1]
この場合、範囲はペイロードのフィールド値に基づきます。
[range INTERVAL "2 0:0:0.0" DAY TO SECOND on EventCaptureTime]
EventCaptureTimeフィールドのタイムスタンプ値に基づいて、過去2日間のイベントのみが保持されます。
5.2.7 範囲のないパーティション付き行ウィンドウの適用
-
パーティション基準の形状フィールド: 複数選択
-
行の値: integer
-
適用可能対象: :問合せステージ
CQLの例は次のとおりです:
[partition by F1, F2 rows 10]
各パーティション値の直近の10イベント。たとえば、[partition by stockSymbol rows 10]では、直近10件のORCL株価、直近10件のAMZN株価、などが使用されます。
問合せは、新規イベントの到着時に評価され、タイム・ティックでは評価されません。
デフォルトのスライド値が使用されます。