5 変換

5.1 ストリームおよび参照の相関付け

相関は、データ・ストリーム内の着信イベントを、データベース表の静的データや他のストリームからのデータでエンリッチする場合に使用されます。

たとえば、データ・ストリーム内のイベントにSensorIdおよびSensor Temperatureのみが含まれる場合、表からのデータでイベントをエンリッチすることにより、SensorMakeSensorLocationSensorThresholdなどを取得できます。

イベントを他のソースと相関付ける場合、共通キーに基づいて結合条件を設定する必要があります。前述の例では、ストリーム内のSensorIdを使用して、データベース表のSensorKeyと相関付けることができます。次の問合せは、温度が事前定義済のしきい値を超えているすべてのセンサーのセンサー詳細を生成する、前述のデータ・エンリッチメント・シナリオを示しています。

Select T.SensorId, T.Temperature, D.SensorName, D.SensorLocation
From TemperatureStream[Now] T, SensorDetailsTable D
Where T.SensorId = D.SensorKey And T.Temperature > D.SensorThreshold

前述のような問合せや、もっと複雑な問合せは、問合せステージのソース・セクションおよびフィルタ・セクションを構成することによって自動的に生成できます。

5.1.1 複数ストリームの結合

ストリームを別のストリームと相関付けることができます。

ストリーム間の相関

  • ストリームは、バインドされていないイベント・シーケンスです。ストリームを別のストリームと関連付けるには、まず、ウィンドウ関数を適用して、両方のストリームをリレーションまたはバインドされたイベントのシーケンスに変換します。
  • 両方のストリームでウィンドウ関数を適用した後で、trueまたはfalseに評価される相関条件を定義します。
ストリーム間の相関からの出力は、両方のウィンドウからのタプルのデカルト積のサブセットであり、相関条件はtrueです。

5.1.2 参照または外部ソースとのストリームの結合

ストリームをデータベースまたはCoherenceキャッシュ内の外部データと結合できます。

ストリームからデータベース表への相関

  • ウィンドウ関数を適用して、ストリームをイベントのバインド・シーケンスに変換します。
  • ストリームにウィンドウ関数を適用した後で、trueまたはfalseに評価される相関条件を定義します。
ストリームからデータベースへの相関からの出力は、ウィンドウとデータベース表からのタプルのデカルト積であり、相関条件はtrueです。

ストリームからキャッシュへの相関

  • ウィンドウ関数を適用して、ストリームをイベントのバインド・シーケンスに変換します。
  • ストリームにウィンドウ関数を適用した後で、trueまたはfalseに評価される相関条件を定義します。
ストリームからキャッシュへの出力は、ウィンドウとキャッシュからのタプルのデカルト積であり、相関条件はtrueです。現在、OSAはCoherenceキャッシュのみをサポートします。

5.2 ストリームへのウィンドウ関数の適用

時間ベースおよびイベント・ベースのウィンドウを指定してウィンドウ関数を適用し、ストリームを処理します。

ウィンドウ関数を適用するには:
  1. パイプライン・エディタでパイプラインを開きます。
  2. ウィンドウ関数を適用する問合せステージを選択します。
  3. 「ソース」タブをクリックします。
  4. 時計アイコンをクリックし、「ウィンドウ・タイプ」ドロップダウン・リストから必要なウィンドウ・タイプを選択します。

5.2.1 スライド付きの時間ウィンドウの適用

  • 範囲値: integer

  • 範囲単位: ナノ秒、マイクロ秒、ミリ秒、秒、分、時間

  • スライド値: integer

  • スライド単位: ナノ秒、マイクロ秒、ミリ秒、秒、分、時間

  • 適用可能対象: :問合せステージ

CQLの例は次のとおりです:
[range 5 MINUTES slide 30 SECONDS]

前述の例では、データは5分間保持されますが、問合せは30秒ごとに評価されます。

ノート:

最新の問合せの結果が前回の結果と異なる場合のみ、出力が行われます。これにより、ダウンストリーム・アプリケーションに重複が送信されなくなります。
スライドを範囲と同じに設定すると、スライディング・ウィンドウのかわりにタンブリング・ウィンドウが作成されます。例:
[Range 5 MINUTES slide 5 MINUTES]
データは5分間のみ保持され、問合せは5分ごとにしか実行されません。

ノート:

ダウンストリーム・システムに送信する前に、タンブリング・ウィンドウを使用して出力結果をバッチ処理します。たとえば、最低10000個のイベントを蓄積した後にのみ、オブジェクト・ストアにファイルを作成する場合があります。これにより、ビッグデータ・システムでの小さいファイルの問題を回避できます。同様に、十分なイベントが蓄積された後、データベース・システムへの複数の書込みを回避し、単一の書込みを実行する場合もあります。

5.2.2 スライドなしの時間ウィンドウの適用

  • 範囲値: integer

  • 範囲単位: ナノ秒、マイクロ秒、ミリ秒、秒、分、時間

  • 適用可能対象: 問合せステージ、問合せグループ・ストリーム・ステージおよび問合せグループ表ステージ

CQLの例は次のとおりです:

[range 1 minutes ]

前述の例では、デフォルトのスライド値(Sparkストリーミングのバッチ間隔と同じ)が使用されます。

ノート:

スライド値を指定しない場合は、デフォルトのスライド(Sparkのバッチ間隔と同じ)が使用されます。

5.2.3 スライド付きの行ウィンドウの適用

  • 行の値: integer

  • 適用可能対象: 問合せステージ、問合せグループ・ストリーム・ステージおよび問合せグループ表ステージ

CQLの例は次のとおりです:

[rows 10 slide 1]

最大ウィンドウ・サイズは10イベントですが、1のスライドは、新規イベントが到着するたびに問合せが実行されることを意味します。

5.2.4 スライドなしの行ウィンドウの適用

  • 行の値: integer

  • 適用可能対象: 問合せステージ、問合せグループ・ストリーム・ステージおよび問合せグループ表ステージ

CQLの例は次のとおりです:

[rows 10]

直近の10イベントを使用して、問合せを評価します。デフォルトのスライド値が使用されます。

5.2.5 現在の年、月、日または時間を含むウィンドウの適用

CurrentYear

  • 適用可能対象 :問合せステージ、重複の検出パターン、重複の除去パターン

CQLの例は次のとおりです:

[ CurrentYear ]

データは、現在の年末まで保持されます。デフォルトのスライド値が使用されます。

CurrentMonth

  • 適用可能対象 :問合せステージ、重複の検出パターン、重複の除去パターン

CQLの例は次のとおりです:

[ CurrentMonth ]

データは、現在の月末まで保持されます。デフォルトのスライド値が使用されます。

CurrentDay

  • 適用可能対象 :問合せステージ、重複の検出パターン、重複の除去パターン

CQLの例は次のとおりです:

[ CurrentDay ]

CurrentHour

  • サポートされる形状フィールドの型: timestamp、int、bigint

  • 適用可能対象 :問合せステージ、重複の検出パターン、重複の除去パターン

CQLの例は次のとおりです:

[ CurrentHour ]

データは、現在の時間末まで保持されます。デフォルトのスライド値が使用されます。

5.2.6 ペイロードのフィールドを使用した独自のウィンドウの適用

  • 間隔値: interval

  • サポートされる形状フィールドの型: timestamp

  • 適用可能対象 :問合せステージ、問合せグループ・ストリーム・ステージおよび問合せグループ表ステージ

CQLの例は次のとおりです:

[range "DS_INTERVAL" on c1]

この場合、範囲はペイロードのフィールド値に基づきます。

このウィンドウ・タイプを使用し、ペイロードのタイムスタンプ列を使用してデータを集計します。例:
[range INTERVAL "2 0:0:0.0" DAY TO SECOND on EventCaptureTime]
EventCaptureTimeフィールドのタイムスタンプ値に基づいて、過去2日間のイベントのみが保持されます。

5.2.7 範囲のないパーティション付き行ウィンドウの適用

  • パーティション基準の形状フィールド: 複数選択

  • 行の値: integer

  • 適用可能対象: :問合せステージ

CQLの例は次のとおりです:

[partition by F1, F2 rows 10]

各パーティション値の直近の10イベント。たとえば、[partition by stockSymbol rows 10]では、直近10件のORCL株価、直近10件のAMZN株価、などが使用されます。

問合せは、新規イベントの到着時に評価され、タイム・ティックでは評価されません。

デフォルトのスライド値が使用されます。

5.2.8 スライドのない範囲を含むパーティション付き行ウィンドウの適用

  • パーティション基準の形状フィールド: 複数選択

  • 行の値: integer

  • 範囲値: integer

  • 範囲単位: ナノ秒、マイクロ秒、ミリ秒、秒、分、時間

  • 適用可能対象: :問合せステージ

CQLの例は次のとおりです:

[partition by F1, F2 rows 10 range 15 seconds]

10行すべてが一杯になっていなくても、イベントが到着してから15秒経過した場合には、イベントがウィンドウから削除されることがあります。

5.2.9 スライドおよび範囲を含むパーティション付き行ウィンドウの適用

  • パーティション基準の形状フィールド: 複数選択

  • 行の値: integer

  • 範囲値: integer

  • 範囲単位: ナノ秒、マイクロ秒、ミリ秒、秒、分、時間

  • スライド値: integer

  • スライド単位: ナノ秒、マイクロ秒、ミリ秒、秒、分、時間

  • 適用可能対象: :問合せステージ

CQLの例は次のとおりです:

[partition by F1, F2 rows 10 range 15 seconds slide 1 second]