5 Transform

Correlating Streams and References

A correlation is used to enrich the incoming event in the data stream with static data in a database table or with data from other streams.

For example, if the event in the data stream only includes SensorId and Sensor Temperature, the event could be enriched with data from a table to obtain SensorMake, SensorLocation, SensorThreshold, and many more.

Correlating an event with other sources requires the join condition to be based on a common key. In the above example, the SensorId from the stream can be used to correlate with SensorKey in the database table. The following query illustrates this data enrichment scenario, producing sensor details for all sensors whose temperature exceeds their predefined threshold.

Select T.SensorId, T.Temperature, D.SensorName, D.SensorLocation
From TemperatureStream[Now] T, SensorDetailsTable D
Where T.SensorId = D.SensorKey And T.Temperature > D.SensorThreshold

Queries like the one above, as well as more complex queries, can be generated automatically by configuring the sources and filter sections of the query stage.
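
To make the join and filter logic of the enrichment query in this section concrete, the following minimal Python sketch mimics it outside of OSA. It is only an illustration: the SENSOR_DETAILS dictionary, the enrich function, and all sample values are hypothetical stand-ins for SensorDetailsTable and TemperatureStream, and this is not how the product executes the query.

```python
# Hypothetical reference data standing in for SensorDetailsTable, keyed by SensorKey.
SENSOR_DETAILS = {
    "S1": {"SensorName": "Boiler", "SensorLocation": "Plant A", "SensorThreshold": 80.0},
    "S2": {"SensorName": "Chiller", "SensorLocation": "Plant B", "SensorThreshold": 10.0},
}


def enrich(event, details=SENSOR_DETAILS):
    """Return the enriched event, or None if there is no match or the threshold is not exceeded."""
    row = details.get(event["SensorId"])  # T.SensorId = D.SensorKey
    if row is None or event["Temperature"] <= row["SensorThreshold"]:
        return None  # T.Temperature > D.SensorThreshold does not hold
    return {
        "SensorId": event["SensorId"],
        "Temperature": event["Temperature"],
        "SensorName": row["SensorName"],
        "SensorLocation": row["SensorLocation"],
    }


# Hypothetical incoming events carrying only SensorId and Temperature.
events = [
    {"SensorId": "S1", "Temperature": 95.5},
    {"SensorId": "S2", "Temperature": 4.0},
    {"SensorId": "S9", "Temperature": 50.0},
]
alerts = [e for e in map(enrich, events) if e is not None]
```

Only the S1 event survives in this sample: S2 is below its threshold and S9 has no matching key in the reference data.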

Joining Multiple Streams

You can correlate a stream with another stream.

Stream-to-stream Correlation

  • A Stream is an unbounded sequence of events. To correlate a Stream with another Stream, first convert both streams to Relations (bounded sequences of events) by applying window functions.
  • After applying window functions on both streams, define a correlation condition that evaluates to true or false.
The output from stream-to-stream correlation is a subset of the Cartesian product of tuples from both windows, where the correlation condition is true.
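
The idea of filtering a Cartesian product by a correlation condition can be sketched in a few lines of Python. The sketch assumes both windows have already been materialized as lists; the correlate function and the orders and payments samples are hypothetical and serve only to illustrate the semantics.

```python
from itertools import product


def correlate(window_a, window_b, condition):
    """Return the pairs of the Cartesian product for which the condition is true."""
    return [(a, b) for a, b in product(window_a, window_b) if condition(a, b)]


# Hypothetical contents of two windows at the moment the query is evaluated.
orders = [{"orderId": 1, "custId": "C1"}, {"orderId": 2, "custId": "C2"}]
payments = [{"custId": "C1", "amount": 30}, {"custId": "C3", "amount": 12}]

# Correlation condition on the common key custId.
matches = correlate(orders, payments, lambda o, p: o["custId"] == p["custId"])
```

Of the four possible pairs, only the one that shares custId C1 is part of the output.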

Joining a Stream with a Reference or an External Source

You can join a stream with external data in a Database or a Coherence Cache.

Stream-to-Database Table Correlation

  • Convert the Stream to a bounded sequence of events, by applying a window function.
  • After applying the window function on the stream, define a correlation condition that evaluates to true or false.
The output from Stream-to-Database Table correlation is the subset of the Cartesian product of tuples from the window and the database table, where the correlation condition is true.

Stream-to-Cache Correlation

  • Convert the Stream to a bounded sequence of events, by applying a window function.
  • After applying the window function on the stream, define a correlation condition that evaluates to true or false.
The output from Stream-to-Cache correlation is the subset of the Cartesian product of tuples from the window and the cache, where the correlation condition is true. Currently, OSA supports only Coherence cache.

Applying Window Functions to a Stream

Apply window functions to define the time-based and event-based windows over which your stream is processed.

To apply a Window function:
  1. Open a pipeline in the Pipeline Editor.
  2. Select the query stage to which you want to apply the window function.
  3. Click the Sources tab.
  4. Click the clock icon, and select the required window type from the Window Type drop-down list.

Applying a Time Window with Slide

  • Range value: Integer

  • Range unit: nanoseconds, microseconds, milliseconds, seconds, minutes, hours

  • Slide value: Integer

  • Slide unit: nanoseconds, microseconds, milliseconds, seconds, minutes, hours

  • Applicable on: Query Stage

The CQL example is as follows:
[range 5 MINUTES slide 30 SECONDS]

In the above example, the data is retained for 5 minutes but the query is evaluated every 30 seconds.

Note:

There will be an output only if the current results of the query are different from the previous results. This avoids sending duplicates to downstream applications.
If you set the slide to the same value as the range, it creates a tumbling window instead of a sliding window. For example,
[range 5 MINUTES slide 5 MINUTES]
will retain only 5 minutes of data, and the query will execute only every 5 minutes.
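
The interplay of range, slide, and duplicate suppression can be approximated with a small Python simulation. It is a sketch under simplifying assumptions, not the engine's implementation: timestamps are plain seconds, a window at a given tick covers the interval (tick - range, tick], and the evaluate_sliding function and sample events are hypothetical.

```python
def evaluate_sliding(events, range_s, slide_s, end_s, query):
    """Evaluate query every slide_s seconds over the events of the last range_s seconds.

    events is a list of (timestamp_seconds, value) pairs. The function returns
    (tick, result) pairs and skips ticks whose result equals the previous result.
    """
    outputs = []
    previous = object()  # sentinel that differs from any real result
    for tick in range(slide_s, end_s + 1, slide_s):
        window = [v for t, v in events if tick - range_s < t <= tick]
        result = query(window)
        if result != previous:  # emit only when the result has changed
            outputs.append((tick, result))
            previous = result
    return outputs


events = [(10, 4), (40, 6), (200, 5)]

# [range 5 MINUTES slide 30 SECONDS]
sliding = evaluate_sliding(events, range_s=300, slide_s=30, end_s=600, query=sum)

# [range 5 MINUTES slide 5 MINUTES], a tumbling window
tumbling = evaluate_sliding(events, range_s=300, slide_s=300, end_s=600, query=sum)
```

With these sample events the sliding configuration is evaluated 20 times but emits only 6 results, while the tumbling configuration is evaluated twice and emits 2.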

Note:

Use the tumbling window to batch output results before sending them to downstream systems. For example, you may want to create a file on object store only after you have accumulated at least 10000 events. This avoids the small-file problem in big data systems. Similarly, you may want to avoid multiple writes to a database system and instead perform a single write after sufficient events have accumulated.
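
The batching effect described in this note can be illustrated with a short Python sketch that groups events into consecutive, non-overlapping windows so that each window results in a single downstream write. The batch_by_tumbling_window function, the window boundaries [k * range, (k + 1) * range), and the sample events are assumptions made for illustration only.

```python
from collections import defaultdict


def batch_by_tumbling_window(events, range_s):
    """Group (timestamp_seconds, payload) pairs into non-overlapping windows."""
    batches = defaultdict(list)
    for t, payload in events:
        batches[t // range_s].append(payload)  # index of the window the event falls into
    return [batches[k] for k in sorted(batches)]


events = [(5, "a"), (120, "b"), (299, "c"), (300, "d"), (610, "e")]
batches = batch_by_tumbling_window(events, range_s=300)

# One downstream write per window instead of one write per event.
writes = len(batches)
```

Five events produce three writes here; with realistic event rates the reduction is far larger.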

Applying a Time Window without Slide

  • Range value: Integer

  • Range unit: nanoseconds, microseconds, milliseconds, seconds, minutes, hours

  • Applicable on: Query Stage, Query Group Stream Stage and Query Group Table Stage

The CQL example is as follows:

[range 1 minutes ]

In the example above, data is retained for 1 minute and the default slide value is used.

Note:

If you do not specify a slide value, the default slide is used, which is the same as the Spark Streaming batch interval.

Applying a Row Window with Slide

  • Rows value: Integer

  • Applicable on: Query Stage, Query Group Stream Stage and Query Group Table Stage

The CQL example is as follows:

[rows 10 slide 1]

The maximum window size is 10 events, and a slide of 1 means that the query is executed on the arrival of every new event.
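
A row window of this kind behaves much like a fixed-size buffer that is re-evaluated on every arrival. The following Python sketch shows that behavior with a running average as the query. The row_window_averages function and the sample input are hypothetical, and the sketch does not model how OSA schedules the evaluation.

```python
from collections import deque


def row_window_averages(values, rows=10):
    """Yield the average of the last `rows` values after each arrival."""
    window = deque(maxlen=rows)  # the oldest value is dropped automatically once full
    for v in values:
        window.append(v)
        yield sum(window) / len(window)  # slide of 1: evaluate on every new event


# Twelve events with values 1..12 flowing through a window equivalent to [rows 10 slide 1].
averages = list(row_window_averages(range(1, 13), rows=10))
```

The first ten results average a growing window; from the eleventh event onward the window holds exactly the ten most recent values.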

Applying a Row Window without Slide

  • Rows value: Integer

  • Applicable on: Query Stage, Query Group Stream Stage and Query Group Table Stage

The CQL example is as follows:

[rows 10]

The last 10 events are used to evaluate the query. Default slide value is used.

Applying a Window with Current Year, Month, Day, or Hour

CurrentYear

  • Applicable on: Query Stage, Detect Duplicates Pattern, Eliminate Duplicate Pattern

The CQL example is as follows:

[ CurrentYear ]

Data is retained until the end of the current year. Default slide value is used.

CurrentMonth

  • Applicable on: Query Stage, Detect Duplicates Pattern, Eliminate Duplicate Pattern

The CQL example is as follows:

[ CurrentMonth ]

Data is retained until the end of the current month. Default slide value is used.

CurrentDay

  • Applicable on: Query Stage, Detect Duplicates Pattern, Eliminate Duplicate Pattern

The CQL example is as follows:

[ CurrentDay ]

Data is retained until the end of the current day. Default slide value is used.

CurrentHour

  • Supported types of shape fields: timestamp, int, bigint

  • Applicable on: Query Stage, Detect Duplicates Pattern, Eliminate Duplicate Pattern

The CQL example is as follows:

[ CurrentHour ]

Data is retained until the end of the current hour. Default slide value is used.
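
To show what "until the end of the current period" means for each of these window types, the following Python sketch computes the instant at which the data would be released. The window_end function is hypothetical, and the sketch assumes calendar boundaries in a single time zone with naive timestamps. How OSA determines the boundary internally is not described in this section.

```python
from datetime import datetime, timedelta


def window_end(now, period):
    """Return the first instant after the current year, month, day, or hour."""
    if period == "CurrentYear":
        return datetime(now.year + 1, 1, 1)
    if period == "CurrentMonth":
        # After December, roll over to January of the next year.
        return datetime(now.year + (now.month == 12), now.month % 12 + 1, 1)
    if period == "CurrentDay":
        return datetime(now.year, now.month, now.day) + timedelta(days=1)
    if period == "CurrentHour":
        return now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    raise ValueError(f"unknown period: {period}")


now = datetime(2024, 2, 10, 8, 30)
periods = ("CurrentYear", "CurrentMonth", "CurrentDay", "CurrentHour")
ends = {p: window_end(now, p) for p in periods}
```

For an event arriving at 08:30 on 10 February 2024, the four windows end at the start of 2025, of March 2024, of 11 February, and of the 09:00 hour, respectively.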

Applying Your Own Window Using a Field from the Payload

  • Interval value: interval

  • Supported types of shape fields: timestamp

  • Applicable on: Query Stage, Query Group Stream Stage and Query Group Table Stage

The CQL example is as follows:

[range "DS_INTERVAL" on c1]

Here the range is based on a field value in the payload.

Use this window type to aggregate data using a timestamp column from the payload. For example,
[range INTERVAL "2 0:0:0.0" DAY TO SECOND on EventCaptureTime]
will retain only events from the last 2 days, based on the timestamp value in the EventCaptureTime field.
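
The difference from an arrival-time window is that retention is decided by a timestamp carried in the event itself. The following Python sketch illustrates this under one explicit assumption: the reference point is the newest EventCaptureTime seen so far. The retain_by_event_time function and the sample events are hypothetical.

```python
from datetime import datetime, timedelta


def retain_by_event_time(events, interval, field="EventCaptureTime"):
    """Keep the events whose timestamp field lies within `interval` of the newest one."""
    if not events:
        return []
    newest = max(e[field] for e in events)  # assumed reference point for the window
    return [e for e in events if newest - e[field] < interval]


events = [
    {"id": 1, "EventCaptureTime": datetime(2024, 1, 1, 0, 0)},
    {"id": 2, "EventCaptureTime": datetime(2024, 1, 2, 12, 0)},
    {"id": 3, "EventCaptureTime": datetime(2024, 1, 4, 0, 0)},
]

# Equivalent in spirit to: [range INTERVAL "2 0:0:0.0" DAY TO SECOND on EventCaptureTime]
retained = retain_by_event_time(events, timedelta(days=2))
```

Event 1 was captured three days before the newest event and is dropped, regardless of when it actually arrived.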

Applying a Row Window with Partition without Range

  • Shape fields of Partition by: MultiSelect

  • Rows value: Integer

  • Applicable on: Query Stage

The CQL example is as follows:

[partition by F1, F2 rows 10]

The window retains the last 10 events for each partition value. For example, [partition by stockSymbol rows 10] will use the last 10 quotes for ORCL, the last 10 quotes for AMZN, and so on.

The query is evaluated on the arrival of new events and not on time ticks.

Default slide value is used.
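
A partitioned row window can be pictured as one fixed-size buffer per partition value. The following Python sketch illustrates that picture for the stockSymbol example. The partitioned_row_window function and the sample quotes are hypothetical and do not reflect the engine's internal data structures.

```python
from collections import defaultdict, deque


def partitioned_row_window(events, key, rows=10):
    """Keep the last `rows` events separately for each value of the partition key."""
    windows = defaultdict(lambda: deque(maxlen=rows))
    for e in events:
        windows[e[key]].append(e)  # each partition evicts only its own oldest event
    return windows


# Hypothetical quotes: 12 for ORCL followed by 3 for AMZN.
quotes = [{"stockSymbol": "ORCL", "price": 100 + i} for i in range(12)]
quotes += [{"stockSymbol": "AMZN", "price": 180 + i} for i in range(3)]

windows = partitioned_row_window(quotes, key="stockSymbol", rows=10)
```

The ORCL partition has dropped its two oldest quotes, while the AMZN partition still holds all three of its quotes because it never filled up.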

Applying a Row Window with Partition with Range without Slide

  • Shape fields of Partition by: MultiSelect

  • Rows value: Integer

  • Range value: Integer

  • Range unit: nanoseconds, microseconds, milliseconds, seconds, minutes, hours

  • Applicable on: Query Stage

The CQL example is as follows:

[partition by F1, F2 rows 10 range 15 seconds]

An event may be evicted from the window even when the window does not yet hold all 10 rows, if 15 seconds have elapsed since the event arrived.
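
Combining a row limit with a range means that each partition applies two eviction rules at once. The following Python sketch models both rules. It assumes timestamps in seconds that never decrease, and the PartitionedRowRangeWindow class with its add and evict methods is a hypothetical illustration rather than an OSA API.

```python
from collections import defaultdict, deque


class PartitionedRowRangeWindow:
    """Per-partition window limited by both a row count and an age in seconds."""

    def __init__(self, rows, range_s):
        self.rows = rows
        self.range_s = range_s
        self.windows = defaultdict(deque)

    def add(self, key, timestamp, value):
        self.windows[key].append((timestamp, value))
        self.evict(timestamp)

    def evict(self, now):
        for window in self.windows.values():
            # Time-based eviction: drop events that are at least range_s seconds old.
            while window and now - window[0][0] >= self.range_s:
                window.popleft()
            # Count-based eviction: keep at most `rows` events.
            while len(window) > self.rows:
                window.popleft()


# Equivalent in spirit to: [partition by stockSymbol rows 10 range 15 seconds]
w = PartitionedRowRangeWindow(rows=10, range_s=15)
w.add("ORCL", 0, 100)
w.add("ORCL", 5, 101)
w.add("AMZN", 12, 180)
w.add("ORCL", 16, 102)
```

After the last call, the first ORCL quote has aged out even though the ORCL partition holds only two rows, far fewer than the limit of 10.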

Applying a Row Window with Partition with Slide and Range

  • Shape fields of Partition by: MultiSelect

  • Rows value: Integer

  • Range value: Integer

  • Range unit: nanoseconds, microseconds, milliseconds, seconds, minutes, hours

  • Slide value: Integer

  • Slide unit: nanoseconds, microseconds, milliseconds, seconds, minutes, hours

  • Applicable on: Query Stage

The CQL example is as follows:

[partition by F1, F2 rows 10 range 15 seconds slide 1 second]