9 Creating a Pipeline to Transform and Analyze Data Streams

9.1 Creating a Pipeline

To create a pipeline:

  1. On the Catalog page, click Create New Item, and select Pipeline from the drop-down list.
  2. On the Type Properties screen, enter the following details:
    • Name
    • Description
    • Tags
    • Stream: Select a stream from the drop-down list.
  3. Click Save.

9.2 Adding a Query Stage

You can apply simple or complex queries to the data stream, without any coding, to obtain refined results in the output.

  1. Open a pipeline in the Pipeline Editor.
  2. Right-click the stage after which you want to add a query stage, click Add a Stage, and then select Query.
  3. Enter a Name and Description for the Query Stage.
  4. Click Save.

9.3 Adding a Filter to a Query Stage

You can add filters in a pipeline to obtain more accurate streaming data.

To add a filter:
  1. Open a pipeline in the Pipeline Editor.
  2. Select the required query stage.
  3. Navigate to the Filters tab.
  4. Click Add a Filter.
  5. Select the required column and a suitable operator and value.

    You can also calculate fields within filters.

    Note:

    The IN operator is available in the operator drop-down list. It is not supported for the Interval, Interval YM, Timestamp, and SDO Geometry data types.

    You can use the IN filter to refer to a column in a database table. When you change the values in that database column at runtime, the pipeline picks up the latest values from the column without requiring you to republish the pipeline.

  6. Click Add a Condition to add and apply a condition to the filter.
  7. Click Add a Group to add nested conditions.
  8. Repeat these steps for as many filters, conditions, or groups as you want to add.

    You can create blocks without adding a condition expression, and add the expression at a later stage.

    Link the blocks using the AND/OR operators to define complex conditions.

    (Illustration: Query Filter Enhancement example)
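
    For example, a filter with two condition groups linked by OR, using the IN operator, corresponds roughly to the following CQL. The stream and column names (OrderStream, OrderStatus, OrderValue, Region, Priority) are hypothetical and used only for illustration:

    Select *
    From OrderStream[Now] O
    Where (O.OrderStatus = 'CANCELED' And O.OrderValue > 1000)
    Or (O.Region In ('EMEA', 'APAC') And O.Priority = 'HIGH')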

9.4 Adding a Summary to a Query Stage

To add a summary:
  1. Open a pipeline in the Pipeline Editor.
  2. Select the required query stage and click the Summaries tab.
  3. Click Add a Summary.
  4. Select the suitable function and the required column.
  5. Repeat the above steps to add as many summaries as you want.
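
A summary corresponds to an aggregation function applied over the stage's window. For illustration, selecting the COUNT and MAX functions on a hypothetical OrderStream is roughly equivalent to the following CQL:

Select Count(*) As OrderCount, Max(OrderValue) As MaxOrderValue
From OrderStream[Range 1 Hour]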

9.5 Adding a Summary with Group By

To add a group by:
  1. Open a pipeline in the Pipeline Editor.
  2. Select the required query stage and click the Summaries tab.
  3. Click Add a Group By.
  4. Click Add a Field and select the column on which you want to group by.

When you create a group by, the live output table shows the group by column alone by default. Turn ON Retain All Columns to display all columns in the output table.

You can add multiple group bys.
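
For illustration, a SUM summary with a group by on a hypothetical ProductCategory column is roughly equivalent to the following CQL:

Select ProductCategory, Sum(OrderValue) As TotalRevenue
From OrderStream[Range 1 Hour]
Group By ProductCategory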

9.6 Applying a Window Function on Stream

Apply window functions to specify time-based and event-based windows for processing your stream.

To apply a Window function:
  1. Open a pipeline in the Pipeline Editor.
  2. Select the query stage to which you want to apply the window function.
  3. Click the Sources tab.
  4. Click the clock icon, and select the required window type from the Window Type drop-down list.
For more information on the available window types, see Applying Window Functions to Streams.
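
For illustration, in the underlying CQL a time-based window and an event-based window appear as window clauses on the stream. The stream name TemperatureStream below is hypothetical:

Select T.SensorId, T.Temperature
From TemperatureStream[Range 5 Minutes] T

Select T.SensorId, T.Temperature
From TemperatureStream[Rows 100] T

The first query processes events received in the last five minutes; the second processes the 100 most recent events.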

9.7 Applying Functions to Create a New Column

You can perform calculations on the data streaming in the pipeline, and add new fields to the stream, using the built-in functions of the Expression Builder.

To launch the Expression Builder, click fx in the Live Output table.

Note:

Currently, you can use expressions only within a query stage.

Adding a Constant Value Column

A constant value is a simple string or number. No calculation is performed on a constant value. Enter a constant value directly in the expression builder to add it to the live output table.

Using Functions

You can select a CQL function from the list of available functions and select its input parameters. Make sure to begin the expression with "=". Click Apply to apply the function to the streaming data.

Example expression using functions:
=float((CanceledOrdersFloat/NewOrdersFloat) * 100.0) 

Custom functions appear in the list of available functions when you add or import a custom JAR in your pipeline.

For a list of supported functions, see Using Functions in Expression Builder.

9.8 Correlating Streams and References

A correlation is used to enrich the incoming event in the data stream with static data in a database table or with data from other streams.

For example, if the event in the data stream only includes SensorId and Sensor Temperature, the event could be enriched with data from a table to obtain SensorMake, SensorLocation, SensorThreshold, and many more.

Correlating an event with other sources requires the join condition to be based on a common key. In the above example, the SensorId from the stream can be used to correlate with SensorKey in the database table. The following query illustrates the above data enrichment scenario, producing sensor details for all sensors whose temperature exceeds their pre-defined threshold.

Select T.SensorId, T.Temperature, D.SensorName, D.SensorLocation
From TemperatureStream[Now] T, SensorDetailsTable D
Where T.SensorId = D.SensorKey And T.Temperature > D.SensorThreshold

Queries like the one above, and more complex queries, can be generated automatically by configuring the sources and filters sections of the query stage.

9.8.1 Joining Multiple Streams

You can correlate a stream with another stream.

Stream-to-stream Correlation

  • A Stream is an unbounded sequence of events. To correlate a Stream with another Stream, first convert both streams to Relations, that is, bounded sequences of events, by applying window functions.
  • After applying window functions on both streams, define a correlation condition that evaluates to true or false.
The output from stream-to-stream correlation is a subset of the Cartesian product of tuples from both windows, where the correlation condition is true.
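
For example, the following sketch applies a range window to each stream and correlates them on a common key. The stream and column names (OrderStream, ShipmentStream, OrderId, OrderStatus, ShipmentStatus) are hypothetical:

Select O.OrderId, O.OrderStatus, S.ShipmentStatus
From OrderStream[Range 10 Minutes] O, ShipmentStream[Range 10 Minutes] S
Where O.OrderId = S.OrderId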

9.8.2 Joining a Stream with a Reference or an External Source

You can join a stream with external data in a Database or a Coherence Cache.

Stream-to-Database Table Correlation

  • Convert the Stream to a bounded sequence of events, by applying a window function.
  • After applying the window function on the stream, define a correlation condition that evaluates to true or false.
The output from Stream-to-database correlation is the Cartesian product of tuples from window and the database table, where the correlation condition is true.

Stream-to-Cache Correlation

  • Convert the Stream to a bounded sequence of events, by applying a window function.
  • After applying the window function on the stream, define a correlation condition that evaluates to true or false.
The output from Stream-to-Cache is the Cartesian product of tuples from window and the cache, where the correlation condition is true. Currently, OSA supports only Coherence cache.

9.9 Adding a Rule Stage

Using a rule stage, you can add IF-THEN logic to your pipeline. A rule is a set of conditions and actions applied to a stream. There is no specific sequence in which to add rules.

To add a rule stage:
  1. Open a pipeline in the Pipeline Editor.
  2. Right-click the stage after which you want to add a rule stage, click Add a Stage, and then select Rule.
  3. Enter a Name and Description for the rule stage.
  4. Click Add a Rule.
  5. Enter Rule Name and Description for the rule and click Done to save the rule.
  6. Select a suitable condition in the IF statement and the THEN statement, and click Add Action to add actions within the business rules.
    Actions can also be expressions. For example, SET Revenue TO =-Revenue converts the current value of Revenue to a negative number.
    Expressions must always start with an '=' sign. For a constant text value, just type in the text. For example, SET CustomerType TO GOLD.
The rules are applied to the incoming events one by one and actions are triggered if the conditions are met.

9.10 Adding a Pattern Stage

A pattern is a template of an Oracle GoldenGate Stream Analytics application, with business logic built into it. You can create pattern stages within the pipeline. Patterns are not stand-alone artifacts; they must be embedded within a pipeline.

For detailed information about the various types of patterns, see Transforming and Analyzing Data using Patterns.

To add a pattern stage:
  1. Open a pipeline in the Pipeline Editor.
  2. Right-click the stage after which you want to add a pattern stage, click Add a Stage, and then select Pattern.
  3. Choose the required pattern from the list of available patterns.
  4. Enter a Name and Description for the pattern stage.
    The selected pattern stage is added to the pipeline.
  5. Click Parameters and provide the required values for the parameters.
  6. Click Visualizations and add the required visualizations to the pattern stage.

9.11 Adding a Query Group Stage

A query group is a combination of summaries (aggregation functions), group-bys, filters and a range window. Different query groups process your input in parallel and the results are combined in the query group stage output. You can also define input filters that process the incoming stream before the query group logic is applied, and result filters that are applied on the combined output of all query groups together.

A query group stage of the stream type applies processing logic to a stream. It is in essence similar to several parallel query stages grouped together for the sake of simplicity.

A query group stage of the table type can be added to a stream containing transactional semantics, for example, a change data capture stream produced by the Oracle GoldenGate Big Data plugin. A stage of this type recreates the original database table in memory, using the transactional semantics contained in the stream. You can then apply query groups to this in-memory table, to run real-time analytics on your transactional data without affecting the performance of your database.
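
As an illustration, a change data capture event in such a stream typically carries an operation-type column plus before and after images of each table column. The column and value names below are hypothetical and follow the default Oracle GoldenGate before_/after_ conventions described in Adding Query Group: Table:

op_type = 'U'
before_OrderId = 1001, after_OrderId = 1001
before_OrderStatus = 'OPEN', after_OrderStatus = 'SHIPPED'

In this sketch, op_type would be selected as the Transaction Field, the before_/after_ columns would be mapped on the Field Mappings screen, and OrderId would be specified as the primary key.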

9.11.1 Adding Query Group: Stream

You can apply aggregate functions with different group bys and window ranges to your streaming data.

To add a query group stage of type stream:
  1. Open a pipeline in the Pipeline Editor.
  2. Right-click the stage after which you want to add a query group stage, click Add a Stage, select Query Group, and then Stream.

    You can add a query group stage only at the end of the pipeline.

  3. Enter a name and a description for the query group stage of the type stream and click Save.

    The query group stage of the type stream appears in the pipeline.

  4. On the Input Filters tab, click Add a Filter. See Adding a Filter to a Query Stage.

    These filters process data before it enters the query group stage. Hence, you can only see fields of the original incoming shape.

  5. On the Groups tab, click Add a Group. A group can consist of one or more summaries, filters, and group bys.
  6. Repeat the previous step to add as many groups as you want.
  7. On the Result Filters tab, click Add a Filter to filter the results.

    These filters process data before it exits the query group stage. Hence, you can see a combined set of fields in the outgoing shape.

  8. On the Visualizations tab, click Add a Visualization and add the required type of visualization. See Adding Chart Visualizations.

9.11.2 Adding Query Group: Table

You can apply aggregate functions with different group bys and window ranges to database table data recreated in memory.

To add a query group stage of the type table:
  1. Open a pipeline in the Pipeline Editor.
  2. Right-click the stage after which you want to add a query group stage, click Add a Stage, select Query Group, and then Table.
  3. Enter a name and a description for the Query Group Table and click Next.
  4. On the Transactions Settings screen, select a column in the Transaction Field drop-down list.

    The transaction column is a column from the output of the previous stage that carries the transaction semantics (insert/update/delete). Make sure that you use the values that correspond to your change data capture dataset. The default values work for the Oracle GoldenGate change data capture dataset.

  5. On the Field Mappings screen, select the columns that carry the before and after transaction values from the original database table. For example, in the case of Oracle GoldenGate, the before and after values have before_ and after_ prefixes, respectively. Specify a column as the primary key in the table.
  6. Click Save to create a query group stage of the type table.
    You can see the table configuration that you have specified while creating the table stage in the Table Configuration tab.
  7. On the Input Filters tab, click Add a Filter. See Adding a Filter to a Query Stage.
  8. On the Groups tab, click Add a Group. A group can consist of one or more summaries, filters, and group bys.
  9. Repeat the previous step to add as many groups as you want.
  10. On the Result Filters tab, click Add a Filter to filter the results.
  11. On the Visualizations tab, click Add a Visualization and add the required type of visualization. See Adding Chart Visualizations.

9.12 Adding a Scoring Stage

To add a scoring stage:
  1. Open the required pipeline in Pipeline Editor.
  2. Right-click the stage after which you want to add a scoring stage, click Add a Stage, and then select Scoring.
  3. Enter a meaningful name and suitable description for the scoring stage and click Save.
  4. In the stage editor, enter the following details:
    1. Model name: Select the predictive model that you want to use in the scoring stage.
    2. Model Version: Select the version of the predictive model.
    3. Mapping: Select the corresponding model fields that appropriately map to the stage fields.
You can add multiple scoring stages based on your use case.

9.13 Adding a Chart Visualization

Visualizations are graphical representations of the streaming data in a pipeline. You can add visualizations on all the stages in a pipeline, except on a target stage.

For more information on Chart Visualizations and Dashboards, see Adding Realtime Charts and Creating Dashboards.

9.14 Working with Live Output Table

The streaming data in the pipeline appears in a live output table. Select any stage in the pipeline to see its output.

Hide/Unhide Columns

In the live output table, right-click a column and click Hide to hide that column from the output. This option only hides the columns from the UI and does not remove them from the output. To unhide the hidden columns, click Columns and then click the eye icon to make the columns visible in the output.

Select/Unselect the Columns

Click the Columns link at the top of the output table to view all the columns available. Use the arrow icons to either select or unselect individual columns or all columns. Only the columns that you select appear in the output table and in the actual output when the pipeline is published.

Pause/Restart the Table

Click Pause/Resume to pause or resume the streaming data in the output table.

Perform Operations on Column Headers

Right-click on any column header to perform the following operations:

  • Hide: Hides the column from the output table. Click the Columns link and unhide the hidden columns.

  • Remove from output: Removes the column from the output table. Click the Columns link and select the columns to be included in the output table.

  • Rename: Renames the column to the specified name.

  • Function: Captures the column in the Expression Builder, where you can perform various operations using the built-in functions.

Add a Timestamp

Include a timestamp in the live output table by clicking the clock icon in the output table.

Reorder the Columns

Click and drag the column headers to right or left in the output table to reorder the columns.

9.15 Adding a Target Stage

To add a target stage:
  1. Open the required pipeline in Pipeline Editor.
  2. Right-click the stage after which you want to add a target stage, click Add a Stage, and then select Target.
  3. Enter a name and suitable description for the target.
  4. Click Save.

For more information on creating different target types, see Managing Targets.

9.16 Exporting and Importing a Pipeline and Its Dependent Artifacts

The export and import features let you migrate your pipeline and its contents between Oracle Stream Analytics systems (such as development and production). You also have the option to migrate only select artifacts. You can import a pipeline developed with the latest version of Oracle Stream Analytics. On re-import, the existing metadata is overwritten with the newly imported metadata if the pipeline is not published. You can delete the imported artifacts by right-clicking them and selecting Delete.

To export a pipeline:

  1. On the Catalog page, hover the mouse over, or select the pipeline that you want to export to another GGSA instance.
  2. Click the Export option that appears on the right side of the screen.
  3. The selected pipeline and its dependent artifacts are exported as a JSON zip file to your computer's default Downloads folder.

To import a pipeline:

  1. Go to the GGSA instance to which you want to import the exported metadata.
  2. On the Catalog page, click Import.
  3. In the Import dialog box, click Select to locate and select the exported zip file on your computer.
  4. Click Import.

The imported pipeline and its dependent artifacts are available on the Catalog page.

Note:

  • Each pipeline should have a unique name. If you are importing an updated version of a pipeline, you can retain the same name. If you are importing a new pipeline and if a pipeline with the same name already exists in the catalog, change the name of the pipeline that you are importing.
  • If you have already exported a pipeline with the same name, update the pipeline name as below:
    1. Create a directory exportUpdate.
    2. Copy the exported zip, say exportNameUpdateExample.zip, to the folder exportUpdate.
    3. Unzip the file exportNameUpdateExample.zip.
    4. Open the JSON file in edit mode.
    5. Search for the pipeline or artifact name in the JSON file. For example, if Nano pipeline was the name given to the pipeline, update it to Nano pipeline updated.
    6. Replace the JSON file in exportNameUpdateExample.zip with the updated file.
    7. Import this zip.
    8. The pipeline is automatically assigned a name based on the display name.
    9. The draft and published pipeline topics are created as follows:
      1. sx_Nanopipelineupdated_Nano_Stream_draft
      2. sx_Nanopipelineupdated_Nano_Stream_public

9.17 Publishing a Pipeline

You must publish a pipeline to make it available to all users of Oracle Stream Analytics, and to send data to targets.

A published pipeline continues to run on your Spark cluster after you exit the Pipeline Editor, unlike draft pipelines, which are undeployed to release resources.

To publish a pipeline:

  1. Open a draft pipeline in the Pipeline Editor.
  2. Click Publish.
    The Pipeline Settings dialog box opens.
  3. Update any required settings.

    Note:

    Make sure to allocate more memory to executors in scenarios where you have large windows.
  4. Click Publish to publish the pipeline.
    A confirmation message appears when the pipeline is published.
You can also publish a pipeline from the Catalog using the Publish option in the Actions menu.

9.18 Unpublishing a Pipeline

Unpublishing a pipeline from the Catalog page

  1. Go to the Catalog page and hover the mouse over the pipeline that you want to unpublish.
  2. Click the Unpublish icon that appears on the right side of the screen.
  3. On the Warning screen, click OK.

Unpublishing a pipeline from the Pipeline Editor

  1. Click the Unpublish button at the top right corner of the pipeline editor.
  2. On the Warning screen, click OK.

9.19 Using the Topology Viewer

A topology is a graphical representation of the connected entities and the dependencies between the artifacts.

The topology viewer helps you identify the dependencies that a selected entity has on other entities. Understanding these dependencies helps you exercise caution when deleting or undeploying an entity. Oracle Stream Analytics supports two contexts for the topology: Immediate Family and Extended Family.

To launch the Topology Viewer, click the Show Topology icon at the top-right corner of the editor.

By default, the topology of the entity from which you launch the Topology Viewer is displayed. The context of this topology is Immediate Family, which indicates that only the immediate dependencies and connections between the entity and other entities are shown. You can switch the context to display the full topology of the entity from which you launched the Topology Viewer. The topology in an Extended Family context displays all the dependencies and connections in a hierarchical manner.

Note:

The entity for which the topology is shown has a grey box surrounding it in the Topology Viewer.

Immediate Family

Immediate Family context displays the dependencies between the selected entity and its child or parent.

The following figure illustrates how a topology looks in the Immediate Family.

(Illustration: topology_viewer_immediate.png)

Extended Family

The Extended Family context displays the dependencies between the entities in full: if an entity has a child entity and a parent entity, and the parent entity has other dependencies, all of those dependencies are shown.

The following figure illustrates how a topology looks in the Extended Family.

(Illustration: topology_viewer_full.png)

9.20 Deleting a Stage in a Pipeline

You can delete any intermediate stage in a pipeline, without breaking the pipeline.

To delete a stage in a pipeline:
  1. Open the pipeline in the Pipeline Editor.
  2. Right-click the stage you want to delete, and click Delete Stages.
  3. On the Confirmation screen, click Delete.

Note:

  • You can delete stages only in unpublished pipelines.
  • You cannot delete a source stage.
  • If you delete any intermediate stage in a pipeline, all its child stages are also deleted.