5 Transform
Correlating Streams and References
A correlation is used to enrich the incoming event in the data stream with static data in a database table or with data from other streams.
For example, if the event in the data stream includes only SensorId and Temperature, the event could be enriched with data from a table to obtain SensorMake, SensorLocation, SensorThreshold, and other attributes.
Correlating an event with other sources requires the join condition to be based on a common key. In the above example, the SensorId from the stream can be used to correlate with SensorKey in the database table. The following query illustrates this data enrichment scenario, producing sensor details for all sensors whose temperature exceeds their predefined threshold.
Select T.SensorId, T.Temperature, D.SensorName, D.SensorLocation
From TemperatureStream[Now] T, SensorDetailsTable D
Where T.SensorId = D.SensorKey And T.Temperature > D.SensorThreshold
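The join above can be simulated outside the engine to check its semantics. The following Python sketch is only an illustration, not how Oracle Stream Analytics executes the query: the record types and the function `enrich_over_threshold` are hypothetical, the `[Now]` window is modeled as a plain list of the current events, and `SensorKey` is assumed to be unique in the reference table.

```python
from dataclasses import dataclass


# Hypothetical record types mirroring the stream and table columns in the query.
@dataclass(frozen=True)
class TemperatureEvent:
    sensor_id: int
    temperature: float


@dataclass(frozen=True)
class SensorDetails:
    sensor_key: int
    sensor_name: str
    sensor_location: str
    sensor_threshold: float


def enrich_over_threshold(now_events, details_table):
    """Join current events with the reference table on SensorId = SensorKey and
    keep only readings above the sensor's threshold (inner-join semantics)."""
    # Index the table by its join key; assumes SensorKey is unique.
    by_key = {d.sensor_key: d for d in details_table}
    results = []
    for t in now_events:
        d = by_key.get(t.sensor_id)
        # Events without a matching row, or at/below the threshold, are dropped.
        if d is not None and t.temperature > d.sensor_threshold:
            results.append((t.sensor_id, t.temperature, d.sensor_name, d.sensor_location))
    return results


table = [
    SensorDetails(1, "Boiler", "Plant A", 80.0),
    SensorDetails(2, "Chiller", "Plant B", 10.0),
]
events = [TemperatureEvent(1, 85.5), TemperatureEvent(2, 5.0), TemperatureEvent(3, 99.0)]
print(enrich_over_threshold(events, table))  # [(1, 85.5, 'Boiler', 'Plant A')]
```

Sensor 2 is below its threshold and sensor 3 has no row in the table, so only sensor 1 is enriched and emitted.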
Queries like the one above, and more complex ones, can be generated automatically by configuring the sources and filter sections of the query stage.
Joining Multiple Streams
You can correlate a stream with another stream.
Stream-to-stream Correlation
- A Stream is an unbounded sequence of events. To correlate a Stream with another Stream, first convert both streams to a Relation (a bounded sequence of events) by applying window functions.
- After applying window functions on both streams, define a correlation condition that evaluates to true or false.
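The following Python sketch illustrates these two steps under simplifying assumptions: events are hypothetical `(timestamp, payload)` tuples with timestamps in seconds, each window keeps events whose timestamp lies in the half-open interval `(now - range, now]`, and the correlation condition is an arbitrary predicate over a pair of events. The helper names `window` and `correlate` are illustrative only.

```python
def window(events, now, range_s):
    """Bound a stream: keep (ts, payload) events with now - range_s < ts <= now."""
    return [e for e in events if now - range_s < e[0] <= now]


def correlate(stream_a, stream_b, now, range_a, range_b, condition):
    """Window both streams, then return every pair for which condition is True."""
    rel_a = window(stream_a, now, range_a)  # step 1: bound stream A
    rel_b = window(stream_b, now, range_b)  # step 1: bound stream B
    # Step 2: evaluate the correlation condition over all pairs (nested-loop join).
    return [(a, b) for a in rel_a for b in rel_b if condition(a, b)]


orders = [(1, {"id": "o1"}), (55, {"id": "o2"})]
shipments = [(50, {"order": "o1"}), (58, {"order": "o2"})]
matches = correlate(orders, shipments, now=60, range_a=60, range_b=10,
                    condition=lambda a, b: a[1]["id"] == b[1]["order"])
print(matches)  # [((55, {'id': 'o2'}), (58, {'order': 'o2'}))]
```

The shipment for o1 at t=50 falls outside the 10-second window on the second stream, so only the o2 pair is correlated.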
Joining a Stream with a Reference or an External Source
You can join a stream with external data in a Database or a Coherence Cache.
Stream-to-Database Table Correlation
- Convert the Stream to a bounded sequence of events, by applying a window function.
- After applying the window function on the stream, define a correlation condition that evaluates to true or false.
Stream-to-Cache Correlation
- Convert the Stream to a bounded sequence of events, by applying a window function.
- After applying the window function on the stream, define a correlation condition that evaluates to true or false.
Applying Window Functions to a Stream
Apply window functions to define time-based or event-based windows over your stream.
- Open a pipeline in the Pipeline Editor.
- Select the query stage to apply the window function.
- Click the Sources tab.
- Click the clock icon, and select the required window type from the Window Type drop-down list.
Applying a Time Window with Slide
- Range value: Integer
- Range unit: nanoseconds, microseconds, milliseconds, seconds, minutes, hours
- Slide value: Integer
- Slide unit: nanoseconds, microseconds, milliseconds, seconds, minutes, hours
- Applicable on: Query Stage
[range 5 MINUTES slide 30 SECONDS]
In the above example, the data is retained for 5 minutes but the query is evaluated every 30 seconds.
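To make the range/slide distinction concrete, here is a hedged Python simulation. It assumes timestamps in seconds, a window covering `(t - range, t]` at each evaluation time `t`, and evaluations at multiples of the slide. It also models the duplicate suppression described in the following note by emitting a result only when it differs from the previous one. The function `sliding_window_outputs` is hypothetical, not part of the product.

```python
def sliding_window_outputs(events, range_s, slide_s, end_s, aggregate):
    """Evaluate `aggregate` over a time window of range_s seconds every slide_s
    seconds up to end_s, emitting (time, result) only when the result changes."""
    outputs = []
    previous = None
    t = slide_s
    while t <= end_s:
        # Events retained at evaluation time t: timestamps in (t - range_s, t].
        current = aggregate([v for ts, v in events if t - range_s < ts <= t])
        if current != previous:  # suppress duplicate results
            outputs.append((t, current))
            previous = current
        t += slide_s
    return outputs


events = [(10, "a"), (100, "b"), (400, "c")]  # (timestamp in seconds, value)
# range 5 MINUTES slide 30 SECONDS: the count is recomputed every 30 seconds.
print(sliding_window_outputs(events, 300, 30, 600, len))
# [(30, 1), (120, 2), (330, 1)]
# range 5 MINUTES slide 5 MINUTES: a tumbling window, one evaluation per 5 minutes.
print(sliding_window_outputs(events, 300, 300, 600, len))
# [(300, 2), (600, 1)]
```

With a 30-second slide, the query runs 20 times over 10 minutes but only three results differ from their predecessor; with range equal to slide, each event is counted in exactly one window.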
Note:
There is an output only if the current results of the query are different from the previous results. This avoids sending duplicates to downstream applications.
[range 5 MINUTES slide 5 MINUTES]
This window retains only 5 minutes of data, and the query executes only every 5 minutes. Because the range equals the slide, consecutive windows do not overlap; this is a tumbling window.
Note:
Use the tumbling window to batch output results before sending them to downstream systems. For example, you may want to create a file on object store only after you have accumulated at least 10000 events. This avoids the small-file problem in big data systems. Similarly, you may want to avoid multiple writes to a database system and instead perform a single write after sufficient events have been accumulated.
Applying a Time Window without Slide
- Range value: Integer
- Range unit: nanoseconds, microseconds, milliseconds, seconds, minutes, hours
- Applicable on: Query Stage, Query Group Stream Stage and Query Group Table Stage
The CQL example is as follows:
[range 1 minutes ]
In the example above, the default slide value, which is the same as the Spark Streaming batch interval, is used.
Note:
If you do not specify a slide value, the default slide is used, which is the same as the Spark batch interval.
Applying a Row Window with Slide
- Rows value: Integer
- Applicable on: Query Stage, Query Group Stream Stage and Query Group Table Stage
The CQL example is as follows:
[rows 10 slide 1]
The window holds at most 10 events, and a slide of 1 means the query is executed on the arrival of every new event.
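A minimal Python sketch of a row window with slide. It assumes that a slide of N means the query is evaluated after every N arrivals; the text above confirms only the slide-of-1 case. The function `row_window_outputs` is hypothetical and uses a `collections.deque` with `maxlen` so that the oldest event is evicted automatically.

```python
from collections import deque


def row_window_outputs(values, rows, slide, aggregate):
    """Keep the last `rows` events and evaluate `aggregate` after every `slide`
    arrivals (assumed semantics for slide > 1)."""
    window = deque(maxlen=rows)  # appending to a full deque drops the oldest event
    outputs = []
    for count, value in enumerate(values, start=1):
        window.append(value)
        if count % slide == 0:
            outputs.append(aggregate(window))
    return outputs


# rows 10 slide 1: one evaluation per arriving event.
print(row_window_outputs(range(1, 13), rows=10, slide=1, aggregate=sum))
# [1, 3, 6, 10, 15, 21, 28, 36, 45, 55, 65, 75]
```

After the 10th event the window is full, so the 11th and 12th evaluations sum 2 to 11 and 3 to 12. With `slide=4`, the same input yields `[10, 36, 75]`.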
Applying a Row Window without Slide
- Rows value: Integer
- Applicable on: Query Stage, Query Group Stream Stage and Query Group Table Stage
The CQL example is as follows:
[rows 10]
The last 10 events are used to evaluate the query. The default slide value is used.
Applying a window with current year, month, day, or hour
CurrentYear
- Applicable on: Query Stage, Detect Duplicates Pattern, Eliminate Duplicate Pattern
The CQL example is as follows:
[ CurrentYear ]
Data is retained until the end of the current year. Default slide value is used.
CurrentMonth
- Applicable on: Query Stage, Detect Duplicates Pattern, Eliminate Duplicate Pattern
The CQL example is as follows:
[ CurrentMonth ]
Data is retained until the end of the current month. Default slide value is used.
CurrentDay
- Applicable on: Query Stage, Detect Duplicates Pattern, Eliminate Duplicate Pattern
The CQL example is as follows:
[ CurrentDay ]
Data is retained until the end of the current day. Default slide value is used.
CurrentHour
- Supported types of shape fields: timestamp, int, bigint
- Applicable on: Query Stage, Detect Duplicates Pattern, Eliminate Duplicate Pattern
The CQL example is as follows:
[ CurrentHour ]
Data is retained until the end of the current hour. Default slide value is used.
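The calendar windows above can be reasoned about with a small Python sketch. It assumes that the window holds events belonging to the same calendar year, month, day, or hour as the current time, and it uses naive (time-zone-free) datetimes; how the product determines the current time and time zone is not covered here. The helpers `period_end` and `in_current_period` are hypothetical.

```python
from datetime import datetime, timedelta


def period_end(ts, unit):
    """Return the instant at which the calendar period containing ts ends
    (unit is 'year', 'month', 'day' or 'hour'); naive datetimes assumed."""
    if unit == "year":
        return datetime(ts.year + 1, 1, 1)
    if unit == "month":
        # December rolls over into January of the following year.
        year, month = (ts.year + 1, 1) if ts.month == 12 else (ts.year, ts.month + 1)
        return datetime(year, month, 1)
    if unit == "day":
        return datetime(ts.year, ts.month, ts.day) + timedelta(days=1)
    if unit == "hour":
        return datetime(ts.year, ts.month, ts.day, ts.hour) + timedelta(hours=1)
    raise ValueError(f"unsupported unit: {unit}")


def in_current_period(event_ts, now, unit):
    """True if event_ts falls in the same calendar period as now."""
    return period_end(event_ts, unit) == period_end(now, unit)


now = datetime(2024, 12, 31, 13, 45)
print(period_end(now, "hour"))   # 2024-12-31 14:00:00
print(period_end(now, "month"))  # 2025-01-01 00:00:00
print(in_current_period(datetime(2024, 12, 31, 12, 59), now, "hour"))  # False
print(in_current_period(datetime(2024, 12, 1, 0, 0), now, "month"))    # True
```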
Applying your own Window using Field from Payload
- Interval value: interval
- Supported types of shape fields: timestamp
- Applicable on: Query Stage, Query Group Stream Stage and Query Group Table Stage
The CQL example is as follows:
[range "DS_INTERVAL" on c1]
Here the range is based on a field value (c1) in the payload rather than on event arrival. For example, the following window retains only events from the last 2 days, based on the timestamp value in the EventCaptureTime field:
[range INTERVAL "2 0:0:0.0" DAY TO SECOND on EventCaptureTime]
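A hedged Python illustration of a value-based range window. It assumes the window is anchored on the largest timestamp seen so far in the chosen field (rather than on wall-clock time) and keeps events whose field value lies strictly within the interval of that maximum. The helper `value_range_window` and the sample data are hypothetical.

```python
from datetime import datetime, timedelta


def value_range_window(events, field, interval):
    """Keep events whose `field` timestamp is within `interval` of the largest
    value of that field seen so far (an event-time, value-based window)."""
    if not events:
        return []
    latest = max(e[field] for e in events)
    return [e for e in events if e[field] > latest - interval]


events = [
    {"id": 1, "EventCaptureTime": datetime(2024, 1, 1, 0, 0)},
    {"id": 2, "EventCaptureTime": datetime(2024, 1, 2, 12, 0)},
    {"id": 3, "EventCaptureTime": datetime(2024, 1, 3, 10, 0)},
]
# Two days, the span written in CQL as INTERVAL "2 0:0:0.0" DAY TO SECOND.
kept = value_range_window(events, "EventCaptureTime", timedelta(days=2))
print([e["id"] for e in kept])  # [2, 3]
```

Event 1 is more than two days older than the latest EventCaptureTime (January 3, 10:00), so it falls out of the window regardless of when it arrived.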
Applying a Row Window with Partition without Range
- Shape fields of Partition by: MultiSelect
- Rows value: Integer
- Applicable on: Query Stage
The CQL example is as follows:
[partition by F1, F2 rows 10]
The last 10 events are retained for each partition value. For example, [partition by stockSymbol rows 10] uses the last 10 quotes for ORCL, the last 10 quotes for AMZN, and so on.
The query is evaluated on the arrival of new events, not on time ticks.
The default slide value is used.
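The following Python sketch models per-partition row windows with one bounded `deque` per partition key, so that a burst of ORCL quotes never evicts AMZN quotes. The function `partitioned_row_window` and the dictionary-shaped events are assumptions for illustration, not the product's data model.

```python
from collections import defaultdict, deque


def partitioned_row_window(events, key_fields, rows):
    """Keep the last `rows` events separately for each partition key."""
    partitions = defaultdict(lambda: deque(maxlen=rows))
    for event in events:
        key = tuple(event[f] for f in key_fields)  # e.g. ("ORCL",)
        partitions[key].append(event)  # eviction happens only within this partition
    return {key: list(window) for key, window in partitions.items()}


quotes = [{"stockSymbol": "ORCL", "price": p} for p in range(1, 13)]
quotes += [{"stockSymbol": "AMZN", "price": p} for p in range(100, 103)]
windows = partitioned_row_window(quotes, ["stockSymbol"], rows=10)
print(len(windows[("ORCL",)]), len(windows[("AMZN",)]))  # 10 3
print(windows[("ORCL",)][0]["price"])  # 3
```

Passing several field names (for example `["F1", "F2"]`) mirrors `partition by F1, F2`: each distinct combination of values gets its own window.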
Applying a Row Window with Partition with Range without Slide
- Shape fields of Partition by: MultiSelect
- Rows value: Integer
- Range value: Integer
- Range unit: nanoseconds, microseconds, milliseconds, seconds, minutes, hours
- Applicable on: Query Stage
The CQL example is as follows:
[partition by F1, F2 rows 10 range 15 seconds]
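An event is evicted from its partition's window when 10 newer events have arrived for that partition or when 15 seconds have elapsed since it arrived, whichever happens first. Events can therefore be evicted even when the window does not yet hold all 10 rows.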
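A minimal Python sketch of this combined eviction rule. It assumes events arrive in timestamp order as `(timestamp_seconds, payload)` tuples and that an event is still live while its age is strictly less than the range. The function `partitioned_row_range_window` and the data layout are hypothetical.

```python
from collections import defaultdict, deque


def partitioned_row_range_window(events, key_field, rows, range_s, now):
    """Per-partition window holding at most `rows` events, each younger than
    range_s seconds at time `now`. `events` are (timestamp, payload) in order."""
    partitions = defaultdict(lambda: deque(maxlen=rows))
    for ts, payload in events:
        if ts <= now:  # only events that have arrived by `now`
            partitions[payload[key_field]].append((ts, payload))  # row-based eviction
    result = {}
    for key, window in partitions.items():
        # Time-based eviction applies even when the row limit was never reached.
        live = [(ts, p) for ts, p in window if now - ts < range_s]
        if live:
            result[key] = live
    return result


events = [
    (0, {"stockSymbol": "ORCL"}),
    (6, {"stockSymbol": "ORCL"}),
    (18, {"stockSymbol": "AMZN"}),
    (20, {"stockSymbol": "ORCL"}),
]
live = partitioned_row_range_window(events, "stockSymbol", rows=10, range_s=15, now=20)
print({k: [ts for ts, _ in v] for k, v in live.items()})  # {'ORCL': [6, 20], 'AMZN': [18]}
```

The ORCL event at t=0 is evicted at t=20 because it is 20 seconds old, even though the ORCL window holds only 3 of its 10 rows.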
Applying a Row Window with Partition with Slide and Range
- Shape fields of Partition by: MultiSelect
- Rows value: Integer
- Range value: Integer
- Range unit: nanoseconds, microseconds, milliseconds, seconds, minutes, hours
- Slide value: Integer
- Slide unit: nanoseconds, microseconds, milliseconds, seconds, minutes, hours
- Applicable on: Query Stage
The CQL example is as follows:
[partition by F1, F2 rows 10 range 15 seconds slide 1 second]
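As a hedged illustration, the Python sketch below evaluates such a window once per slide interval and reports how many live events each partition holds at each evaluation time. It assumes timestamps in seconds, evaluation times at multiples of the slide, and the row and range eviction rules described for the previous window type. The function `evaluate_with_slide` is hypothetical, and it models evaluations only, not the product's output suppression.

```python
def evaluate_with_slide(events, key_field, rows, range_s, slide_s, end_s):
    """Every slide_s seconds up to end_s, report how many live events each
    partition holds: at most `rows` per partition, each younger than range_s."""
    keys = {payload[key_field] for _, payload in events}
    outputs = []
    t = slide_s
    while t <= end_s:
        counts = {}
        for key in keys:
            # Row limit: the last `rows` events of this partition that arrived by t.
            arrived = [ts for ts, p in events if p[key_field] == key and ts <= t][-rows:]
            # Range limit: drop events that are range_s seconds old or older.
            live = [ts for ts in arrived if t - ts < range_s]
            if live:
                counts[key] = len(live)
        outputs.append((t, counts))
        t += slide_s
    return outputs


events = [(0, {"stockSymbol": "ORCL"}), (1, {"stockSymbol": "ORCL"}), (1, {"stockSymbol": "AMZN"})]
# partition by stockSymbol rows 10 range 15 seconds slide 1 second
for t, counts in evaluate_with_slide(events, "stockSymbol", 10, 15, 1, 16)[-3:]:
    print(t, sorted(counts.items()))
# 14 [('AMZN', 1), ('ORCL', 2)]
# 15 [('AMZN', 1), ('ORCL', 1)]
# 16 []
```

Because the window is re-evaluated every second, expiry by range becomes visible without any new event arriving: both partitions empty out at t=16.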