13 Analyzing Data Streams
This topic only applies to Data Integration Platform Cloud Classic.
Stream Analytics is available only for user-managed Data Integration Platform Cloud instances.
Planning Your Stream Analytics Installation
This topic applies only to Oracle user-managed services.
Download the following required software to your Data Integration Platform Cloud VM:
-
JDK 8 Update 131+
-
Spark version 2.2.1 with Hadoop 2.7 or higher
-
OSA-18.1.0.0.1.zip
Ensure that you have Google Chrome version 60+ downloaded on your local machine.
It’s assumed that you have an Oracle Cloud account with Database Cloud Service, Data Integration Platform Cloud, and Storage instances provisioned. If not, see Create Instances for Data Integration Platform Cloud.
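Before installing, you could confirm that the VM meets the JDK requirement by checking the `java.version` system property. The following Java sketch is a hypothetical helper, not part of Stream Analytics. It assumes the JDK 8 version-string format `1.8.0_<update>` and deliberately accepts only JDK 8 updates 131 and later, because the requirement above names JDK 8.

```java
// Hypothetical pre-installation helper; not part of Stream Analytics.
final class JdkRequirement {
    private JdkRequirement() {}

    private static final String JDK8_PREFIX = "1.8.0_";

    /**
     * Returns true for JDK 8 version strings such as "1.8.0_131" or
     * "1.8.0_202". Assumption: only JDK 8 updates are accepted.
     */
    static boolean isJdk8Update131OrLater(String javaVersion) {
        if (javaVersion == null || !javaVersion.startsWith(JDK8_PREFIX)) {
            return false;
        }
        // Read the update number: the digits right after "1.8.0_".
        String rest = javaVersion.substring(JDK8_PREFIX.length());
        int end = 0;
        while (end < rest.length() && Character.isDigit(rest.charAt(end))) {
            end++;
        }
        if (end == 0) {
            return false; // no update number present
        }
        return Integer.parseInt(rest.substring(0, end)) >= 131;
    }
}
```

For example, `JdkRequirement.isJdk8Update131OrLater(System.getProperty("java.version"))` checks the JVM that runs the check.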
Provisioning Oracle Event Hub and Big Data Cloud Instances
This topic applies only to Oracle user-managed services.
Complete the steps in the following sections to provision the service instances needed to use Oracle Stream Analytics with Data Integration Platform Cloud.
Provisioning an Oracle Event Hub — Dedicated Instance
This topic applies only to Oracle user-managed services.
- Log in to Oracle Cloud My Services Dashboard.
- Click Create Instance, then click All Services and select Event Hub — Dedicated.
- On the Instance page of the Create New Instance wizard, enter an Instance Name, Description, and Notification Email, and then click Next.
- On the Details page, complete the following fields, and then click Next:
- Select Basic or Recommended for Deployment Type.
- Click Edit for SSH Public Key and download new keys to your local machine.
- Select Enable REST Access, and then enter the Username and Password credentials for REST Proxy Access.
- Confirm your instance details and click Finish.
- After your Event Hub — Dedicated instance has been provisioned successfully, click Manage this Service and select Access Rules.
- Go to Actions and select Enable all rules.
Provisioning an Oracle Event Hub Cloud Instance
This topic applies only to Oracle user-managed services.
- Log in to Oracle Cloud My Services Dashboard.
- Click Create Instance, then click All Services and select Event Hub.
- On the Instance page of the Create New Instance wizard, enter an Instance Name, Description, and Notification Email.
- Select your Oracle Event Hub — Dedicated instance from the Hosted On menu.
- Enter the Number of Partitions and Retention Period in hours, and then click Next.
- Confirm your instance details and click Create.
Installing Oracle Stream Analytics
This topic applies only to Oracle user-managed services.
Configuring Stream Analytics
This topic applies only to Oracle user-managed services.
Initializing Metadata Store
This topic applies only to Oracle user-managed services.
Note:
If you do not have the database admin credentials, ask your database administrator to create an Oracle Stream Analytics database user and initialize the content under that user by using the SQL scripts available in the OSA-18.1.0.0.1/osa-base/sql folder. The Oracle Stream Analytics database username must match the one configured in jetty-osa-datasource.xml.
Changing the Admin Password
This topic applies only to Oracle user-managed services.
osaadmin. You must create your own password to log in by using that user ID.
Note:
This on-premise version of Oracle Stream Analytics doesn’t support role-based access. All users have admin privileges. To create additional users with obfuscated passwords, see Adding Users.
Setting up Runtime for Oracle Stream Analytics Server
This topic applies only to Oracle user-managed services.
Upgrading Oracle Stream Analytics
This topic applies only to Oracle user-managed services.
Metadata Upgrade
Use these steps if there is no additional configuration apart from the data source configuration and environment variables:
Administering Stream Analytics
This topic applies only to Oracle user-managed services.
Managing Users in Stream Analytics
This topic applies only to Oracle user-managed services.
In this release of Oracle Stream Analytics, user details are stored in a database. When you create the Oracle Stream Analytics schema at the time of installation, the following database tables are populated with one record each:
-
osa_users — table containing the users
-
osa_user_roles — table containing the user names and their associated roles
When you execute a query to pull in all the data from the osa_users table, you see the following:
select * from osa_users;
+----+----------+--------------------------------------+
| id | username | pwd |
+----+----------+--------------------------------------+
| 1 | osaadmin | MD5:201f00b5ca5d65a1c118e5e32431514c |
+----+----------+--------------------------------------+
where osaadmin is the preconfigured user, stored with its MD5-hashed password.
When you execute a query to pull in all the data from the osa_user_roles table, you see the following:
select * from osa_user_roles;
+---------+---------+
| user_id | role_id |
+---------+---------+
| 1 | 1 |
+---------+---------+
where a role_id value of 1 indicates that the user is an administrator.
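The pwd value shown above has the form MD5: followed by 32 hexadecimal digits. Assuming it is the lowercase hexadecimal MD5 digest of the password bytes prefixed with MD5: (the Jetty credential format), the following Java sketch produces a string of the same shape. It is an illustration only, not the Stream Analytics user-management tooling; to add users, follow the Adding Users procedure. MD5 is a fast, unsalted hash, so treat these values as sensitive.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch: builds an "MD5:<hex digest>" string like the pwd column above.
// Assumptions: lowercase hex, password bytes encoded as ISO-8859-1.
final class Md5Credential {
    private Md5Credential() {}

    static String credential(String password) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(password.getBytes(StandardCharsets.ISO_8859_1));
            StringBuilder out = new StringBuilder("MD5:");
            for (byte b : digest) {
                // Two lowercase hex digits per byte.
                out.append(String.format("%02x", b & 0xff));
            }
            return out.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide MD5.
            throw new IllegalStateException(e);
        }
    }
}
```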
Configuring Stream Analytics System Settings
This topic applies only to Oracle user-managed services.
Configuring User Preferences
This topic applies only to Oracle user-managed services.
General
To set/update user preferences:
-
Click the user name in the top right corner of the screen.
-
Click Preferences. The Preferences page opens.
Provides a set of general preferences that you can view and set according to your requirements.
Start Page
Select if you want the Home page, the Catalog page, or the Patterns page to appear as the Start Page.
Notifications
Provides a set of notifications preferences that you can view and set according to your requirements.
Show Information Notifications
Select this option if you want the information notifications to appear in the pipeline. This option is selected by default.
Information Notification duration (in seconds)
Choose the number of seconds for which the notifications appear. The default value is 5.
Catalog
Provides a set of catalog preferences that you can view and set according to your requirements.
Default Sorting Column
Select the column by which you want the Catalog items to be sorted. This value is used as the default until you change it.
Default Sorting Order
Select the order in which you want the Catalog items to be sorted. This value is used as the default until you change it.
Default Page Size
Select the value to be used as the default page size. The number of records that appear on a page varies based on the value selected. This value is used as the default for all pages until you change it.
Pipeline
Provides a set of pipeline preferences that you can view and set according to your requirements.
Select Yes if you want to display the User Assistance text for the pipelines in the Pipeline Editor.
Live Output Stream
Provides a set of pipeline live output stream preferences that you can view and set according to your requirements.
Select a value that you want to be applied as the default table size for the data in Live Output Stream of a pipeline.
Timestamp
Provides a set of pipeline timestamp preferences that you can view and set according to your requirements.
Map
Provides a set of map preferences that you can view and set according to your requirements.
Select a value that you want to be used as the default tile layer preference in the geo fences.
Working with Stream Analytics
This topic applies only to Oracle user-managed services.
Home Page
The Home page is the first page that you see when you log in to Stream Analytics. This page lists the industry verticals that Stream Analytics supports.
Each industry vertical has a tag associated with it and the tags are case-sensitive.
-
Distributed Intelligence for IOT - Acquire, analyze, and act on high-volume, high-velocity data from sensors and devices both at the edge and in the data center in real-time. Tag for this vertical is IOT.
-
Risk and Fraud Management - Leverage the industry's best stream processing platform to assess risk and prevent financial fraud in real time. Tag for this vertical is risk.
-
Transportation and Logistics - Manage fleet, track assets, and improve supply chain efficiencies by combining streaming data with Oracle's advanced spatial functions. Tag for this vertical is transportation.
-
Customer Experience and Consumer Analytics - Know the sentiment of your customers to reduce churn, improve loyalty, make offers, and attract customers in real-time. Tag for this vertical is customer.
-
Telecommunications - Proactively monitor your networks, predict network failures, and prevent distributed denial of service type attacks. Tag for this vertical is telecom.
-
Retail - Understand and apply instant retail shopping trends, instigate beneficial shelf life patterns and placements, be responsive to customers' cart utilization, and interoperate with advanced vending machines. Tag for this vertical is retail.
The Home page is as shown below:
[Illustration: Home page (home_page.png)]
You can navigate to the Catalog or the Patterns page from the home page to get started with Stream Analytics.
About the Catalog
The Catalog page is the location where resources including pipelines, streams, references, maps, connections, targets, dashboards, predictive models, custom jars, visualizations, and cubes are listed. This is the go-to place for you to perform any tasks in Stream Analytics.
You can mark a resource as a favorite in the Catalog by clicking on the Star icon. Click the icon again to remove it from your favorites. You can also delete a resource or view its topology using the menu icon to the right of the favorite icon.
The tags applied to items in the Catalog are also listed on the screen below the left navigation pane. Click any of these tags to display only the items with that tag in the Catalog. The selected tag appears at the top of the screen. Click Clear All at the top of the screen to clear the tag filter and display all the items.
You can include or exclude pipelines, streams, references, predictive models, geo fences, connections, targets, custom jars, visualizations, dashboards, and cubes using the View All link in the left panel under Show Me. When you click View All, a check mark appears beside it and all the components are displayed in the Catalog.
To display only selected items in the Catalog, deselect View All and select the individual components. Only the selected components appear in the Catalog.
Typical Workflow for Administering Stream Analytics
The typical workflow lists the artifacts required to create a pipeline in Stream Analytics.
The prerequisites for a pipeline are:
-
A connection is required to create a stream, except for a file stream.
-
A stream is required to create a pipeline.
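The two prerequisite rules above can be restated as a small dependency check. The class and method names below are hypothetical and do not correspond to a Stream Analytics API; the sketch only encodes that every stream type except File needs a connection and that every pipeline needs a stream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the pipeline prerequisites; not a product API.
final class PipelinePrerequisites {
    private PipelinePrerequisites() {}

    /** Returns the artifacts still missing before a pipeline can be created. */
    static List<String> missing(String streamType, boolean hasConnection, boolean hasStream) {
        List<String> missing = new ArrayList<>();
        // Every stream type except File needs a connection.
        boolean needsConnection = !"File".equalsIgnoreCase(streamType);
        if (needsConnection && !hasConnection) {
            missing.add("connection");
        }
        // A pipeline always needs a stream.
        if (!hasStream) {
            missing.add("stream");
        }
        return missing;
    }
}
```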
Cache Configuration for Coherence
Stream Analytics requires a special Coherence cache configuration and a proxy scheme so that it can connect to the Coherence cluster.
To enrich stream data with reference data from an external Coherence cluster, you must access the external cluster by using the Coherence*Extend client APIs. To access the external cluster as a client, configure cache-config with ExtendTcpCacheService and ExtendTcpInvocationService.
Configure the Coherence Cluster
Make sure that Coherence for Java is installed.
To configure the external cluster as client:
-
Create an XML file named cache-config.xml.
-
Copy the following XML to the file:
<?xml version="1.0"?>
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
    xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd">
  <caching-scheme-mapping>
    <cache-mapping>
      <!-- Caches whose names start with "externalcache" use the remote scheme -->
      <cache-name>externalcache*</cache-name>
      <scheme-name>remote</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>
  <caching-schemes>
    <remote-cache-scheme>
      <scheme-name>remote</scheme-name>
      <service-name>ExtendTcpCacheService</service-name>
      <initiator-config>
        <tcp-initiator>
          <remote-addresses>
            <!-- Host and port of the external cluster's proxy -->
            <socket-address>
              <address>localhost</address>
              <port>9099</port>
            </socket-address>
          </remote-addresses>
        </tcp-initiator>
        <outgoing-message-handler>
          <request-timeout>5s</request-timeout>
        </outgoing-message-handler>
      </initiator-config>
    </remote-cache-scheme>
    <remote-invocation-scheme>
      <scheme-name>extend-invocation</scheme-name>
      <service-name>ExtendTcpInvocationService</service-name>
      <initiator-config>
        <tcp-initiator>
          <remote-addresses>
            <socket-address>
              <address>localhost</address>
              <port>9099</port>
            </socket-address>
          </remote-addresses>
        </tcp-initiator>
        <outgoing-message-handler>
          <request-timeout>5s</request-timeout>
        </outgoing-message-handler>
      </initiator-config>
    </remote-invocation-scheme>
  </caching-schemes>
</cache-config>
-
Save and close the file.
-
Test the connection to the cluster:
import com.tangosol.net.CacheFactory;
import com.tangosol.net.InvocationService;
InvocationService service = (InvocationService) CacheFactory.getConfigurableCacheFactory().ensureService("ExtendTcpInvocationService");
ensureService() throws an exception if no Coherence cluster is available at the configured host and port.
-
Create a coherence reference using a coherence connection.
-
Register the Coherence cache as a reference.
The following sample code registers the Coherence cache as a reference:
override def initialize(): Unit = {
  // asJava converts the Scala Map into the java.util.Map that HashMap's constructor expects
  import scala.collection.JavaConverters._
  repartition = true
  // Event type describing the entries stored in the external Coherence cache
  val externalEvent = EventType("externalorders", IntAttr("orderId"), VarCharAttr("orderDesc", 20))
  // EXT_URL is not used for Coherence as a reference (it is currently used for web service and database references), so it is set to an empty string.
  // EXT_ENTITY_NAME is the cache name in the external Coherence cluster.
  val sExtSrcProps = Map(EXT_URL -> "", EXT_ENTITY_NAME -> "externalcache")
  val jExtSrcProps = new java.util.HashMap[String, String](sExtSrcProps.asJava)
  val converter = ConverterFactory(ConverterType.COHERENCE, externalEvent)
  cc.registerEventType(externalEvent)
  cc.registerRelation(externalEvent).onExternal(jExtSrcProps, ExtSourceType.COHERENCE, converter)
}
def main(args: Array[String]): Unit = {
  cql = "istream(select R.orderId as orderId, R.orderStatus as orderStatus, Ext.orderDesc as orderDesc from orders[now] as R, externalorders as Ext where R.orderId = Ext.orderId)"
  name = "CoherenceCorrelation"
  processOrders(args)
}
For the above example, the Coherence cache must use orderId (an Integer) as the key and a Map of the orderId and orderDesc values as the value. Code similar to the following populates such a cache:
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import java.util.HashMap;
import java.util.Map;
// Each entry: key = orderId (Integer), value = map of attribute names to values
NamedCache cache = CacheFactory.getCache("externalcache");
Map<String,Object> order1 = new HashMap<String, Object>();
order1.put("orderId", Integer.valueOf(1));
order1.put("orderDesc", "HP Deskjet v2");
Map<String,Object> order2 = new HashMap<String, Object>();
order2.put("orderId", Integer.valueOf(2));
order2.put("orderDesc", "Oracle Database 12");
Map<String,Object> order3 = new HashMap<String, Object>();
order3.put("orderId", Integer.valueOf(3));
order3.put("orderDesc", "Apple iPhone6s");
Map<String,Object> order4 = new HashMap<String, Object>();
order4.put("orderId", Integer.valueOf(4));
order4.put("orderDesc", "Logitech Mouse");
cache.put(1, order1);
cache.put(2, order2);
cache.put(3, order3);
cache.put(4, order4);
Creating a Stream
A stream is a source of events with a given content (shape).
To create a stream:
-
Navigate to Catalog.
-
Select Stream in the Create New Item menu.
-
Provide details for the following fields on the Type Properties page and click Next:
-
Name — name of the stream
-
Description — description of the stream
-
Tags — tags you want to use for the stream
-
Stream Type — select a suitable stream type. Supported types are File, GoldenGate, JMS, and Kafka.
-
-
Provide details for the following fields on the Source Details page and click Next:
When the stream type is File:
-
File Path or URL — the location of the file that you want to upload
-
Read whole content — select this option if you want to read the whole content of the file
-
Number of events per batch — the number of events that you want to process per batch
-
Loop — select this option if you want to process the file in a loop
-
Data Format — the format of the data. The supported types are: CSV and JSON.
When the stream type is GoldenGate:
-
Connection — the connection for the stream
-
Topic name — the topic name that receives events you want to analyze
-
Data Format — the format of the data. The supported types are: CSV, JSON, AVRO. AVRO is a data serialization system.
When the stream type is JMS:
-
Connection — the connection for the stream
-
Jndi name — the JNDI name of the topic, distributed topic, queue, or distributed queue to read messages from
-
Client ID — the client ID to be used for the durable subscriber
-
Message Selector — the message selector to filter messages. If your messaging application needs to filter the messages it receives, you can use a JMS API message selector, which allows a message consumer to specify the messages it is interested in. Message selectors assign the work of filtering messages to the JMS provider rather than to the application.
A message selector is a String that contains an expression. The syntax of the expression is based on a subset of the SQL92 conditional expression syntax. The message selector in the following example selects any message that has a NewsType property that is set to the value 'Sports' or 'Opinion':
NewsType = 'Sports' OR NewsType = 'Opinion'
The createConsumer and createDurableSubscriber methods allow you to specify a message selector as an argument when you create a message consumer.
-
Subscription ID — the subscription ID for the durable subscriber
-
Data Format — the format of the data. The supported types are: CSV, JSON, AVRO, MapMessage. MapMessage is supported only for JNDI based streams.
If the data format is AVRO, you must also specify the message schema by setting the org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG and KEY_SERIALIZER_CLASS_CONFIG parameters as ByteArraySerializer.
A MapMessage object is used to send a set of name-value pairs. The names are String objects, and the values are primitive data types in the Java programming language. The names must have a value that is not null, and not an empty string. The entries can be accessed sequentially or randomly by name. The order of the entries is undefined.
When the stream type is Kafka:
-
Connection — the connection for the stream
-
Topic name — the topic name that receives events you want to analyze
-
Data Format — the format of the data within the stream. The supported types are: CSV, JSON, AVRO.
If the data format is AVRO, you must also specify the message schema by setting the org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG and KEY_SERIALIZER_CLASS_CONFIG parameters as ByteArraySerializer.
-
-
Select one of the mechanisms to define the shape on the Shape page:
-
Infer Shape — detects the shape automatically from the input data stream.
You can infer the shape from Kafka, JSON schema file, or CSV message/data file. You can also save the auto detected shape and use it later.
-
Select Existing Shape — lets you choose one of the existing shapes from the drop-down list.
-
Manual Shape — populates the existing fields and also allows you to add or remove columns from the shape. You can also update the datatype of the fields.
-
A stream is created with specified details.
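The ProducerConfig settings named for AVRO data in a Kafka stream are producer-side settings. The following Java sketch builds them with java.util.Properties for the producer that publishes AVRO messages to the stream's topic. The keys key.serializer and value.serializer are the string values of ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG and ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; the class name and the bootstrap address are illustrative assumptions.

```java
import java.util.Properties;

// Sketch of producer settings for AVRO-encoded messages. Hypothetical helper;
// the bootstrap address passed in is a placeholder.
final class AvroProducerSettings {
    private AvroProducerSettings() {}

    static final String BYTE_ARRAY_SERIALIZER =
            "org.apache.kafka.common.serialization.ByteArraySerializer";

    static Properties forBootstrapServers(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // AVRO payloads are already serialized to bytes, so both serializers
        // pass byte arrays through unchanged.
        props.setProperty("key.serializer", BYTE_ARRAY_SERIALIZER);
        props.setProperty("value.serializer", BYTE_ARRAY_SERIALIZER);
        return props;
    }
}
```

With the kafka-clients library on the classpath, these properties could be passed to a KafkaProducer<byte[], byte[]>; the sketch itself needs only the JDK.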
CSV Data for Pre-defined Formats
When your data format is CSV, select the predefined format that matches the variant of CSV produced by the source of your data. The following table describes each of these predefined formats:
| CSV Predefined Format | Description |
|---|---|
| | Standard comma separated format, as for |
| | Excel file format (using a comma as the value delimiter). |
| | Default |
| | Default |
| | Default |
| | Default |
| | Comma separated format as defined by |
| | Tab-delimited format |
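The predefined formats differ in details such as the delimiter, quoting, and escaping rules. The following Java sketch is a hypothetical helper, not the parser Stream Analytics uses: it splits a single record with a configurable delimiter (a comma, or a tab for the tab-delimited format) and handles RFC 4180 style double quotes, but ignores escape characters and records that span several lines.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of splitting one CSV record; not a full CSV parser.
final class CsvRecordSplitter {
    private CsvRecordSplitter() {}

    static List<String> split(String line, char delimiter) {
        List<String> fields = new ArrayList<>();
        StringBuilder field = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    if (i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        field.append('"'); // "" inside quotes is a literal quote
                        i++;
                    } else {
                        inQuotes = false; // closing quote
                    }
                } else {
                    field.append(c); // delimiters inside quotes are data
                }
            } else if (c == '"') {
                inQuotes = true; // opening quote
            } else if (c == delimiter) {
                fields.add(field.toString());
                field.setLength(0);
            } else {
                field.append(c);
            }
        }
        fields.add(field.toString()); // last field, possibly empty
        return fields;
    }
}
```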
Capabilities of JMS Source
The capabilities of JMS Source are listed in the following table:
| Capability | Description | Comments |
|---|---|---|
| Ability to connect to JMS Cluster | JMS consumer should be able to connect to JMS cluster and handle JMS server fail-over | |
| Message Format support | Map and TextMessage (JSON, CSV and AVRO) | Does not support XML and object |
| Message selector | JMS message selector to use to filter messages. Only messages that match the selector produce events. | |
| Re-connection | Reconnect to JMS server or JMS cluster | |
| Read messages from queue/distributed queue | | |
| Read messages from topic | Read messages from JMS topic. By default the subscriber is non-durable | |
| Support for Durable subscriber | A durable subscriber registers a durable subscription by specifying a unique identity that is retained by the JMS provider. If the consumer reconnects to the JMS topic, it reads messages from where it last left off. | |
| T3 Support | WebLogic JMS Protocol | |
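A message selector filters messages inside the JMS provider. To show which messages the example selector NewsType = 'Sports' OR NewsType = 'Opinion' lets through, the following Java sketch restates it as a predicate over message properties. It is an illustration only; with JMS you pass the selector string itself to createConsumer or createDurableSubscriber, and the provider evaluates it. The class name is hypothetical.

```java
import java.util.Map;
import java.util.function.Predicate;

// Illustrative equivalent of the selector NewsType = 'Sports' OR NewsType = 'Opinion'.
final class NewsTypeSelector {
    private NewsTypeSelector() {}

    static final Predicate<Map<String, ?>> SPORTS_OR_OPINION = props -> {
        Object newsType = props.get("NewsType");
        // A missing property is NULL in selector semantics, so neither
        // comparison is true and the message is not selected.
        return "Sports".equals(newsType) || "Opinion".equals(newsType);
    };
}
```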
JMS Server Clean Up
When you create a JMS stream and select the durable subscription option (by providing client ID and subscription ID values), Stream Analytics creates the durable subscription (if it is not already present) when the pipeline that uses this stream is running. When you exit the pipeline or unpublish it (or kill the running pipeline), the durable subscription remains on the JMS server. If you do not intend to publish this pipeline again, delete the durable subscription from the JMS server to clean up its resources.
Creating a Reference
The reference defines a read-only source of reference data to enrich a stream. A stream containing a customer name could use a reference containing customer data to add the customer’s address to the stream by doing a lookup using the customer name.
A database reference is a reference to a specified table in the database. With caching enabled for a database reference, values pulled from the database are kept in a Coherence cache so that they can be served from there on the next request. A database reference requires a database connection.
A Coherence reference can be any external cache defined in a Coherence cluster; the cache can hold data from an external system.
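The customer example above amounts to an equality lookup on a key. The following Java sketch models a reference as an in-memory map from customer name to address and adds the address to each event. The class and field names (customerName, address) are hypothetical, and passing unmatched events through unchanged is an assumption, not documented behavior.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of enriching stream events from a read-only reference.
final class ReferenceEnricher {
    private final Map<String, String> addressByCustomer;

    ReferenceEnricher(Map<String, String> addressByCustomer) {
        // Defensive copy: the enricher never modifies the reference data.
        this.addressByCustomer = new HashMap<>(addressByCustomer);
    }

    /** Returns a copy of the event with an "address" field added when the lookup matches. */
    Map<String, Object> enrich(Map<String, Object> event) {
        Map<String, Object> out = new HashMap<>(event);
        Object name = event.get("customerName");
        String address = addressByCustomer.get(name); // equality lookup on the key
        if (address != null) {
            out.put("address", address);
        }
        return out;
    }
}
```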
To create a reference:
-
Navigate to Catalog.
-
Select Reference in the Create New Item menu.
-
Provide details for the following fields on the Type Properties page and click Next:
-
Name — name of the reference
-
Description — description of the reference
-
Tags — tags you want to use for the reference
-
Reference Type — the reference type of the reference. The supported reference types are: Coherence and Database.
-
-
Provide details for the following fields on the Source Details page and click Next:
When the reference type is Coherence, enter or select appropriate values for:
-
Connection — the connection for the coherence reference
-
Cache name — the name of the cache to enable caching for better performance at the cost of higher memory usage of the Spark applications. Caching is supported only for single equality join condition. When you update the cache, the application will receive the updated data very quickly.
A Coherence reference holds data as key-value pairs. The key is of type Object and the value is a Map<String,Object> of attribute names and values; the attribute list must match the external event type. In this release, only an external schema for the key and value is supported.
When the reference type is Database Table, enter or select appropriate values for:
-
Connection — the connection for the database reference
-
Enable Caching — select this option if you want to enable caching
-
Expiry Delay — the amount of time from last update that entries will be kept by the cache before being marked as expired. Any attempt to read an expired entry will result in a reloading of the entry from the configured cache store. This field is enabled only when caching is enabled.
-
-
Provide details for the following fields on the Shape page and click Save:
When the reference type is Coherence:
-
Select Existing Shape — select a shape that you want to use for the reference
Remember:
Ensure that you do not use any of the CQL reserved words as the column names. If you use the reserved keywords, you cannot deploy the pipeline. -
Manual Shape — select this option if you want to define your own shape
Note:
When you load Coherence data, include the precision and scale for number types; the join works only when these values are specified. For example:
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.Map;
NamedCache cache = CacheFactory.getCache("externalcachetimestamp");
BigDecimal big10 = new BigDecimal("10", new MathContext(58)).setScale(56, RoundingMode.HALF_UP);
Map<String,Object> order1 = new HashMap<String, Object>();
order1.put("strValue", "Test");
order1.put("intervalValue", "+000000002 03:04:11.330000000");
order1.put("orderTag", big10);
cache.put(big10, order1);
When the reference type is Database Table:
-
Shape Name — select a shape that you want to use for the reference
-
If the datatype of a table column is not supported, no datatype is generated automatically for that column. Only the following datatypes are supported:
-
numeric
-
interval day to second
-
text
-
timestamp (without timezone)
-
date time (without timezone)
Note:
The date column cannot be mapped to timestamp. This is a limitation in the current release.
A reference is created with the specified details.
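The Expiry Delay setting for a cached database reference describes a read-through cache with time-based expiry. The following Java sketch models that behavior under stated assumptions: a loader function stands in for the database, an injectable clock makes the timing testable, and an entry counts as expired once the full delay has elapsed since it was last loaded or updated. It is not the Coherence implementation, and the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Sketch of a read-through cache whose entries expire expiryDelayMillis
// after their last load or update; expired reads reload from the loader.
final class ExpiringReadThroughCache<K, V> {
    private static final class Timestamped<T> {
        final T value;
        final long updatedAt;
        Timestamped(T value, long updatedAt) {
            this.value = value;
            this.updatedAt = updatedAt;
        }
    }

    private final Map<K, Timestamped<V>> entries = new HashMap<>();
    private final Function<K, V> loader;   // stands in for the database
    private final long expiryDelayMillis;
    private final LongSupplier clock;      // current time in milliseconds

    ExpiringReadThroughCache(Function<K, V> loader, long expiryDelayMillis, LongSupplier clock) {
        this.loader = loader;
        this.expiryDelayMillis = expiryDelayMillis;
        this.clock = clock;
    }

    V get(K key) {
        long now = clock.getAsLong();
        Timestamped<V> entry = entries.get(key);
        if (entry == null || now - entry.updatedAt >= expiryDelayMillis) {
            // Missing or expired: reload from the store and restart the timer.
            V value = loader.apply(key);
            entries.put(key, new Timestamped<>(value, now));
            return value;
        }
        return entry.value;
    }

    void put(K key, V value) {
        // An update also restarts the expiry timer.
        entries.put(key, new Timestamped<>(value, clock.getAsLong()));
    }
}
```

The sketch is not thread-safe; a cache shared by concurrent tasks would need synchronization.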
Limitations of Coherence as Reference
With coherence as reference, there are a few limitations:
-
You cannot test the connection
-
You need to specify the cache name manually
-
Only equal operator is allowed while establishing a correlation with coherence reference
-
You must use manual shape
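Because correlation with a Coherence reference allows only the equality operator, how keys compare matters. java.math.BigDecimal.equals treats numerically equal values with different scales as unequal (10 and 10.00 are not equal, although compareTo returns 0). The following Java sketch demonstrates this with a HashMap lookup. Linking it to the earlier Note on precision and scale for number types is an assumption, and the sketch does not show how Stream Analytics performs the join; the class name is hypothetical.

```java
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.Map;

// Illustration: BigDecimal keys match in a hash lookup only if the scale matches too.
final class BigDecimalKeys {
    private BigDecimalKeys() {}

    /** Builds a value with precision 58 and scale 56, as in the Note's example. */
    static BigDecimal withScale56(String digits) {
        return new BigDecimal(digits, new MathContext(58)).setScale(56, RoundingMode.HALF_UP);
    }

    /** Returns true if probeKey finds the entry stored under storedKey. */
    static boolean lookupFinds(BigDecimal storedKey, BigDecimal probeKey) {
        Map<BigDecimal, String> cache = new HashMap<>();
        cache.put(storedKey, "row");
        return cache.containsKey(probeKey); // uses equals and hashCode, both scale-sensitive
    }
}
```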
Creating a Dashboard
A dashboard is a visualization tool that helps you look at and analyze the data related to a pipeline based on various metrics, such as visualizations. A dashboard can also contain visualizations created from cubes.
A dashboard is an analytics feature. You can create dashboards in Stream Analytics to get a quick view of the metrics.
To create a dashboard:
After you have created the dashboard, it is empty. You need to start adding visualizations to it.
Editing a Dashboard
To edit a dashboard:
-
Click the required dashboard in the catalog.
The dashboard opens in the dashboard editor.
-
Click the Add a new visualization icon to see a list of existing visualizations. Visualizations from pipelines as well as from cube explorations appear here. Go through the list, select one or more visualizations, and add them to the dashboard.
-
Click the Specify refresh interval icon to select the refresh frequency for the dashboard. This applies only to cube-based visualizations, not to streaming charts created from a pipeline.
This is only a client-side setting and is not persisted with Superset version 0.17.0.
-
Click the Apply CSS to the dashboard icon to select a CSS. You can also edit the CSS in the live editor.
You can also see the active filter applied to the dashboard by clicking the Active dashboard filters icon. You can save the link to the dashboard or email the link to someone using the Copy the link to the clipboard and Email the link icons respectively.
-
Click the Save icon to save the changes you have made to the dashboard.
-
Hover over the added visualization, click the Explore chart icon to open the chart editor of the visualization.
You can see the metadata of the visualization. You can also move the chart around the canvas, refresh it, or remove it from the dashboard.
A cube exploration looks like the following:
[Illustration: cube exploration (cube_exploration.png)]
The various options, such as time granularity, group by, table timestamp format, row limit, filters, and result filters, add more granularity and detail to the dashboard.
-
Click Save as to make the following changes to the dashboard:
-
Overwrite the visualization
-
Overwrite the current visualization with a different name
-
Add the visualization to an existing dashboard
-
Add the visualization to a new dashboard
-
Creating a Cube
Cube is a data structure that helps in quickly analyzing the data related to a business problem on multiple dimensions.
To create a cube:
Creating a Target
The target defines a destination for output data coming from a pipeline.
To create a target:
-
Navigate to Catalog.
-
Select Target in the Create New Item menu.
-
Provide details for the following fields on the Type Properties page and click Save and Next:
-
Name — name of the target
-
Description — description of the target
-
Tags — tags you want to use for the target
-
Target Type — the transport type of the target. Supported types are JMS, Kafka, and REST. The target is a sink for the output event. Each type of target is a different sink system, so each type requires different configuration parameters.
-
-
Provide details for the following fields on the Target Details page and click Next:
When the target type is JMS:
-
Connection — the connection for the target
-
Jndi name — the topic or queue name defined in Jndi to be used in the target
-
Data Format — select a suitable data format. This is a mandatory field. The supported data format types are: CSV and JSON.
When the target type is Kafka:
-
Connection — the connection for the target
-
Topic Name — the Kafka topic to be used in the target
-
Data Format — select a suitable data format. This is a mandatory field. The supported data format types are: CSV and JSON.
When the target type is REST:
-
URL — enter the REST service URL. This is a mandatory field.
-
Custom HTTP headers — set the custom headers for HTTP. This is an optional field.
-
Batch processing — select this option to send events in batches and not one by one. Enable this option for high throughput pipelines. This is an optional field.
-
Data Format — select a suitable data format. This is a mandatory field.
Click Test connection to check if the connection has been established successfully.
Testing REST targets is a heuristic process that uses the proxy settings. The test sends a GET request to the given URL and reports success if the server returns OK (status code 200). The returned content is of type application/json.
-
-
Provide details for the following fields on the Data Format page and click Next:
When the data format type is CSV:
-
CSV Predefined Format — select a predefined CSV format. The supported formats are: Excel, InformixUnload, InformixUnloadCsv, MySQL, PostgreSQLCsv, PostgreSQLText.
-
Create the header row — select this option if you want to create a header row in the target.
When the data format type is JSON:
-
Create nested json object — select this option if you want a nested JSON object to be created for the target
-
-
Select one of the mechanisms to define the shape on the Shape page and click Save:
-
Select Existing Shape lets you choose one of the existing shapes from the drop-down list.
-
Manual Shape populates the existing fields and also allows you to add or remove columns from the shape. You can also update the datatype of the fields.
-
A target is created with specified details.
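The Test connection check for REST targets can be approximated in plain Java. The following sketch is an assumption about the mechanics, not the product's implementation: it sends a GET request with HttpURLConnection, which honors the JVM's proxy settings (for example the http.proxyHost system property), and treats status 200 as success. The 5-second timeouts and the Accept header are illustrative choices, the class name is hypothetical, and the sketch does not verify the response content type.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URLConnection;

// Sketch of a REST target reachability check: GET the URL, expect HTTP 200.
final class RestTargetCheck {
    private RestTargetCheck() {}

    static boolean ping(String targetUrl) {
        HttpURLConnection conn = null;
        try {
            URLConnection raw = URI.create(targetUrl).toURL().openConnection();
            if (!(raw instanceof HttpURLConnection)) {
                return false; // not an HTTP(S) URL
            }
            conn = (HttpURLConnection) raw;
            conn.setRequestMethod("GET");
            conn.setRequestProperty("Accept", "application/json");
            conn.setConnectTimeout(5_000);
            conn.setReadTimeout(5_000);
            return conn.getResponseCode() == HttpURLConnection.HTTP_OK;
        } catch (IOException | IllegalArgumentException e) {
            // Malformed URL, refused connection, timeout, and so on.
            return false;
        } finally {
            if (conn != null) {
                conn.disconnect();
            }
        }
    }
}
```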
Creating Target from Pipeline Editor
Alternatively, you can create a target from the pipeline editor. When you click Create in the target stage, the Create Target dialog box opens. Provide all the required details and complete the target creation process. When you create a target from the pipeline editor, the shape is pre-populated with the shape from the last stage.
Creating a Geo Fence
Geo fences fall into two categories: manual geo fences and database-based geo fences.
Create a Manual Geo Fence
To create a manual geo fence:
-
Navigate to the Catalog page.
-
Click Create New Item and select Geo Fence from the drop-down list.
The Create Geo Fence dialog opens.
-
Enter a suitable name for the Geo Fence.
-
Select Manually Created Geo Fence as the Type.
-
Click Save.
The Geo Fence Editor opens. In this editor you can create the geo fence according to your requirement.
-
In the Geo Fence Editor, use the zoom icons in the toolbar at the top-left of the screen to zoom in or out to the required area.
You can also use the Marquee Zoom tool to zoom in on a specific area of the map: mark the area with the marquee, and the map zooms to it.
-
Click the Polygon Tool and mark the area around a region to create a geo fence.
-
Enter a name and description, and click Save to save your changes.
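A manual geo fence drawn with the Polygon Tool is, conceptually, a closed polygon of map coordinates, and spatial patterns such as Geo Fence check whether an event's position falls inside it. The sketch below shows one common way to perform that check, the even-odd (ray-casting) rule. It assumes planar (longitude, latitude) coordinates and a simple, non-self-intersecting polygon; the function name is hypothetical, and Stream Analytics may use a different, geodetically aware algorithm.

```python
def point_in_polygon(lon, lat, polygon):
    """Even-odd (ray-casting) test: is the point (lon, lat) inside `polygon`?

    `polygon` is a list of (lon, lat) vertices; the closing edge from the last
    vertex back to the first is implied. Coordinates are treated as planar.
    """
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Only edges that straddle the horizontal line through the point matter.
        if (y1 > lat) != (y2 > lat):
            # Longitude at which this edge crosses that horizontal line.
            x_cross = x1 + (lat - y1) * (x2 - x1) / (y2 - y1)
            # Count crossings of a ray cast from the point toward +longitude.
            if lon < x_cross:
                inside = not inside
    return inside
```

Each crossing of the ray flips the result, so an odd number of crossings means the point is inside; this also handles concave fences such as L-shaped regions.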
Update a Manual Geo Fence
To update a manual geo fence:
-
Navigate to the Catalog page.
-
Click the name of the geo fence you want to update.
The Geo Fence Editor opens, where you can edit or update the geo fence.
Search Within a Manual Geo Fence
You can search within a geo fence by country and by region or address. The search field lets you search within the available list of countries. When you click the search results tile at the left center of the geo fence and select a result, the map automatically zooms in to that area.
Delete a Manual Geo Fence
To delete a manual geo fence:
-
Navigate to the Catalog page.
-
Click Actions, then select Delete Item to delete the selected geo fence.
Create a Database-based Geo Fence
To create a database-based geo fence:
-
Navigate to the Catalog page.
-
Click Create New Item and then select Geo Fence from the drop-down list.
The Create Geo Fence dialog opens.
-
Enter a suitable name for the geo fence.
-
Select Geo Fence from Database as the Type.
-
Click Next and select Connection.
-
Click Next.
All tables that have a field of type SDO_GEOMETRY appear in the drop-down list.
-
Select the required table to define the shape.
-
Click Save.
Note:
You cannot edit/update database-based geo fences.
Delete a Database-based Geo Fence
To delete a database-based geo fence:
-
Navigate to the Catalog page.
-
Click Actions and then select Delete Item to delete the selected geo fence.
Display the Map Using Tile Layers
A tile layer is the base map that provides immediate geographic context. Tiles are stored in the map tile server. Stream Analytics supports two types of tile layers: the Open Street Maps tile layer, which is a free map, and the Elocation tile layer, which is an Oracle tile layer. These tile layers contain a large amount of data about:
-
Roads, railways, waterways, etc.
-
Restaurants, shops, stations, ATMs, and more
-
Walking and cycling paths
-
Buildings, campuses, etc.
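Tile layers such as Open Street Maps are commonly served using the Web Mercator "slippy map" scheme, in which zoom level z divides the world into 2^z × 2^z square tiles. The sketch below computes which tile contains a given point under that scheme. It illustrates the general tiling convention and is an assumption rather than something taken from Stream Analytics; the Elocation tile layer's addressing is not described here and may differ. The function name is hypothetical.

```python
import math


def lonlat_to_tile(lon, lat, zoom):
    """Return the (x, y) Web Mercator tile indices containing (lon, lat) at `zoom`.

    x grows eastward from longitude -180; y grows southward from the top edge
    of the map (about latitude 85.0511). Out-of-range inputs are clamped to the
    nearest valid tile.
    """
    n = 2 ** zoom  # tiles per axis at this zoom level
    x = math.floor((lon + 180.0) / 360.0 * n)
    lat_rad = math.radians(lat)
    # Mercator projection of latitude, normalised to [0, 1] from the top edge.
    y_norm = (1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0
    y = math.floor(y_norm * n)
    return min(max(x, 0), n - 1), min(max(y, 0), n - 1)
```

At zoom 0 the whole world is a single tile (0, 0); each additional zoom level doubles the tile count along both axes, which is why tile servers store a very large number of tiles at high zoom levels.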
You can choose whether to display the map in the Elocation tile layer or the Open Street Maps tile layer. To set your preference:
-
Click the user name in the top right corner of the screen.
-
Click Preferences. The Preferences page opens.
-
Click Map.
-
Under Tile Layer, choose Open Street Maps Tile Layer option from the drop-down list.
-
Click Save. The map looks like this:
-
To display the map in the Elocation tile layer, follow steps 1 to 3.
-
From the Tile Layer drop-down list, choose Elocation Tile Layer.
-
Click Save. The map looks like this:
Creating a Predictive Model
Limited Support for Predictive Models
The menu commands for creating Predictive Models and Scoring Stages are marked Beta, for example, Predictive Model (Beta). The Beta label indicates that the functionality has been tested but is not fully supported. The import and scoring of Predictive Models might have undocumented limitations, so use them as is.
Creating a Custom Jar
A custom jar is a user-supplied JAR archive containing Java classes for custom stage types or custom functions to be used within a pipeline.
Creating a Pipeline
A pipeline is a Spark application where you implement your business logic. It can have multiple stages such as a query stage, a pattern stage, a business rule stage, a query group stage, a custom stage and many more.
To create a pipeline:
-
Navigate to Catalog.
-
Select Pipeline in the Create New Item menu.
-
Provide details for the following fields and click Save:
-
Name — name of the pipeline
-
Description — description of the pipeline
-
Tags — tags you want to use for the pipeline
-
Stream — the stream you want to use for the pipeline
-
A pipeline is created with specified details.
Configuring a Pipeline
You can configure the pipeline to use various stages like query, pattern, rules, query group, scoring, and custom stage from custom jars.
Exporting and Importing a Pipeline and Its Dependent Artifacts
The export and import feature lets you migrate your pipeline and its contents between Stream Analytics systems (such as development and production) in a few clicks. You can also migrate only selected artifacts. You can import a pipeline developed with the latest version of Stream Analytics. On re-import, if the pipeline is not published, the existing metadata is overwritten with the newly imported metadata. You can delete imported artifacts by right-clicking them and selecting Delete.
-
Cubes
-
Dashboards
-
Custom Stages
-
Visualizations
-
File Streams
-
Predictive Models
Publishing a Pipeline
You must publish a pipeline to make it available to all Stream Analytics users and to send data to targets.
A published pipeline continues to run on your Spark cluster after you exit the Pipeline Editor, unlike draft pipelines, which are undeployed to release resources.
To publish a pipeline:
Using the Topology Viewer
Topology is a graphical representation and illustration of the connected entities and the dependencies between the artifacts.
The topology viewer helps you identify the dependencies that a selected entity has on other entities. Understanding these dependencies helps you take care when deleting or undeploying an entity. Stream Analytics supports two contexts for the topology: Immediate Family and Extended Family.
You can launch the Topology viewer in any of the following ways:
-
Select Show topology from the Catalog Actions menu to launch the Topology Viewer for the selected entity.
-
Click the Show Topology icon in the Pipeline Editor.
The Show Topology icon is located at the top-right corner of the editor.
By default, the Topology Viewer displays the topology of the entity from which you launch it. The context of this topology is Immediate Family, which indicates that only the immediate dependencies and connections between the entity and other entities are shown. You can switch the context of the topology to display the full topology of the entity from which you launched the Topology Viewer. The topology in an Extended Family context displays all the dependencies and connections in the topology in a hierarchical manner.
Note:
The entity for which the topology is shown has a grey box surrounding it in the Topology Viewer.
Immediate Family
Immediate Family context displays the dependencies between the selected entity and its child or parent.
The following figure illustrates how a topology looks in the Immediate Family.
Extended Family
Extended Family context displays the dependencies between the entities in full: if an entity has a child entity and a parent entity, and the parent entity has other dependencies, all of those dependencies are shown.
The following figure illustrates how a topology looks in the Extended Family.
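The difference between the two contexts can be expressed as a graph query. In the Python sketch below, which is an interpretation of the descriptions above rather than the Topology Viewer's implementation, artifacts are nodes and each (parent, child) pair is a dependency. Immediate Family is the set of direct parents and children, and Extended Family is every artifact reachable through any chain of dependencies, found with a breadth-first search. All function names are hypothetical.

```python
from collections import defaultdict, deque


def build_links(edges):
    """Index (parent, child) dependency pairs in both directions."""
    parents, children = defaultdict(set), defaultdict(set)
    for parent, child in edges:
        children[parent].add(child)
        parents[child].add(parent)
    return parents, children


def immediate_family(entity, parents, children):
    """Entities directly connected to `entity`: its parents and its children."""
    return parents.get(entity, set()) | children.get(entity, set())


def extended_family(entity, parents, children):
    """Every entity reachable from `entity` through any chain of dependencies."""
    seen = {entity}
    queue = deque([entity])
    while queue:
        current = queue.popleft()
        for neighbour in immediate_family(current, parents, children):
            if neighbour not in seen:
                seen.add(neighbour)
                queue.append(neighbour)
    seen.discard(entity)  # report relatives only, not the entity itself
    return seen
```

For example, if a stream feeds two pipelines and the first pipeline writes to a target, the Immediate Family of the first pipeline contains the stream and the target, while its Extended Family also includes the second pipeline, reached through the shared stream.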
Working with Patterns
This topic applies only to Oracle user-managed services.
About Stream Analytics Patterns
This topic applies only to Oracle user-managed services.
Click Patterns on the Home page to see all the available patterns. Use the filters on the left to view different categories of patterns. You can see full descriptions and learn more about each pattern by clicking the user assistant icon. Click it again to hide the extra information.
A pattern provides you with the results displayed in a live output stream based on common business scenarios.
Note:
While entering data in the fields for a specific pattern, ensure that the data you enter corresponds to the datatype of the field. If the entered data does not match the datatype, the pattern does not deploy and throws an error.
You can include or exclude patterns based on their categories using the View All link in the left panel under Show Me. When you click View All, a check mark appears next to it and all the patterns are displayed on the page.
To display only selected patterns, deselect View All and select the individual patterns. Only the selected patterns are shown in the catalog.
The following table lists the categories of patterns:
Category | Pattern |
---|---|
Enrichment | Reverse Geo Code: Near By, Left Outer Join |
Outlier | Fluctuation |
Inclusion | Union, Left Outer Join |
Missing Event | 'A' Not Followed by 'B', Detect Missing Event |
Spatial | Proximity: Stream with Geo Fence, Geo Fence, Spatial: Speed, Interaction: Single Stream, Reverse Geo Code: Near By, Geo Code, Spatial: Point to Polygon, Interaction: Two Stream, Proximity: Two Stream, Direction, Reverse Geo Code: Near By Place, Proximity: Single Stream, Geo Filter |
Filter | Eliminate Duplicates, Fluctuation |
State | 'A' Not Followed by 'B', Inverse W, Detect Missing Event, W, 'A' Followed by 'B', 'B' Not Preceded by 'A' |
Finance | Inverse W, W |
Trend | 'A' Not Followed by 'B', Top N, Change Detector, Up Trend, Detect Missing Event, Down Trend, 'A' Followed by 'B', Detect Duplicates, Bottom N |
Shape Detector | Inverse W, W |
Statistical | Correlation, Quantile |
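To make the event-pattern idea concrete, the following sketch shows one plausible reading of the 'A' Not Followed by 'B' pattern listed under Missing Event, State, and Trend: report an A event when no B event arrives within a time window. The function name and its exact semantics are assumptions for illustration (a single B clears every pending A, and a B arriving exactly at the window boundary still counts); the actual pattern is configured in the Stream Analytics UI and its rules may differ.

```python
from collections import deque


def a_not_followed_by_b(events, a, b, window):
    """Timestamps of `a` events that saw no `b` event within `window` time units.

    `events` is an iterable of (timestamp, event_type) pairs in timestamp order.
    An `a` is reported once a later event shows that its window has expired;
    `a` events whose window is still open at the end of the input are not reported.
    """
    missed = []
    pending = deque()  # timestamps of `a` events still waiting for a `b`
    for ts, kind in events:
        # Windows that closed before this event are misses.
        while pending and ts - pending[0] > window:
            missed.append(pending.popleft())
        if kind == b:
            pending.clear()  # in this sketch, one `b` satisfies every open `a`
        elif kind == a:
            pending.append(ts)
    return missed
```

In a live stream the same check would typically be driven by timers rather than by the arrival of the next event, so that a miss is reported as soon as the window closes.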
Understanding Expression Builder Functions
This topic applies only to Oracle user-managed services.
What are Bessel Functions?
Bessel functions, also known as cylinder functions, are solutions of Bessel's differential equation. The following Bessel functions of integer order are supported:
Function Name | Description |
---|---|
|
Returns the modified Bessel function of order 0 of the double argument as a double |
|
Returns the exponentially scaled modified Bessel function of order 0 of the double argument as a double |
|
Returns the modified Bessel function of order 1 of the double argument as a double |
|
Returns the exponentially scaled modified Bessel function of order 1 of the double argument as a double |
|
Returns the Bessel function of the first kind of order n of the argument as a double |
|
Returns the modified Bessel function of the third kind of order n of the argument as a double |
|
Returns the exponentially scaled modified Bessel function of the third kind of order 0 of the double argument as a double |
|
Returns the exponentially scaled modified Bessel function of the third kind of order 1 of the double argument as a double |
|
Returns the Bessel function of the second kind of order n of the double argument as a double |
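The first row of the table describes the modified Bessel function of order 0, I0(x). As a reference point for checking results, the sketch below evaluates I0 from its standard power series, I0(x) = Σ (x/2)^(2k) / (k!)^2 for k = 0, 1, 2, ..., and derives the exponentially scaled variant exp(-|x|)·I0(x) described in the second row. The helper names are hypothetical (the function names are not shown in this table), and a truncated series is only suitable for moderate |x|; numerical libraries typically use polynomial or Chebyshev approximations instead.

```python
import math


def bessel_i0(x, terms=60):
    """Modified Bessel function of the first kind, order 0, via its power series.

    I0(x) = sum over k >= 0 of (x/2)**(2k) / (k!)**2, truncated after `terms` terms.
    """
    half_sq = (x / 2.0) ** 2
    total = 0.0
    term = 1.0  # the k = 0 term
    for k in range(terms):
        total += term
        # term_(k+1) = term_k * (x/2)**2 / (k+1)**2
        term *= half_sq / ((k + 1) ** 2)
    return total


def bessel_i0e(x):
    """Exponentially scaled variant: exp(-|x|) * I0(x)."""
    return math.exp(-abs(x)) * bessel_i0(x)
```

The scaled form exists because I0 grows roughly like e^|x|, so multiplying by exp(-|x|) keeps results in a comfortable floating-point range for large arguments.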
What are Conversion Functions?
The conversion functions help in converting values from one data type to another.
The following conversion functions are supported in this release:
Function Name | Description |
---|---|
|
Converts the given value to bigdecimal |
|
Converts the given value to logical |
|
Converts the given value to datetime |
|
Converts the given value to double |
|
Converts the given value to float |
|
Converts the given value to integer |
|
Converts the given value to long |
|
Converts the given value to string |
What are Date Functions?
The following date functions are supported in this release:
Function Name | Description |
---|---|
|
Returns day of the date |
|
Returns event timestamp from stream |
|
Returns hour of the date |
|
Returns minute of the date |
|
Returns month of the date |
|
Returns nanosecond of the date |
|
Returns second of the date |
|
Returns the system’s timestamp on which the application is running |
|
Returns the provided timestamp in required time format |
|
Returns the current output time |
|
Returns year of the date |
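As an illustration of what the date functions return, the following Python sketch extracts the same components from a timestamp and formats it. The helper names are hypothetical, the format codes are Python's `strftime` codes rather than the pattern syntax Stream Analytics accepts, and Python datetimes only resolve microseconds, so the nanosecond value here is derived and always a multiple of 1,000.

```python
from datetime import datetime


def date_parts(ts):
    """Split a datetime into the components the date functions above return."""
    return {
        "year": ts.year,
        "month": ts.month,
        "day": ts.day,
        "hour": ts.hour,
        "minute": ts.minute,
        "second": ts.second,
        # Python datetimes stop at microseconds, so nanoseconds are derived.
        "nanosecond": ts.microsecond * 1000,
    }


def format_timestamp(ts, pattern="%Y-%m-%d %H:%M:%S"):
    """Render a timestamp in the requested format (Python strftime codes)."""
    return ts.strftime(pattern)
```

For example, `format_timestamp(datetime(2018, 3, 14, 9, 26, 53))` yields `"2018-03-14 09:26:53"`.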
What are Geometry Functions?
Geometry functions allow you to convert the given values into a geometrical shape.
The following geometry functions are supported in this release:
Function Name | Description |
---|---|
|
Returns a 2-dimensional point type geometry from the given latitude and longitude. The default SRID is 8307. The return value is of the datatype |
|
Returns distance between the first set of latitude, longitude and the second set of latitude, longitude values. The default SRID is 8307. The return value is of the datatype |
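SRID 8307 identifies longitude/latitude coordinates on the WGS 84 datum in Oracle Spatial, so distances between such points are geodetic. The sketch below approximates the distance row with the haversine great-circle formula on a spherical Earth, which is typically within about 0.5% of an ellipsoidal calculation. The helper name, the (lat, lon) parameter order, and the metre unit are assumptions; the table does not state the unit the built-in function returns.

```python
import math

EARTH_RADIUS_M = 6_371_008.8  # mean Earth radius in metres (spherical model)


def haversine_distance(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres between two latitude/longitude points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = phi2 - phi1
    d_lambda = math.radians(lon2 - lon1)
    # Haversine of the central angle between the two points.
    a = math.sin(d_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))
```

A spherical model is adequate for proximity checks over short to moderate distances; where survey-grade accuracy matters, rely on the built-in geodetic function instead.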
What are Interval Functions?
Interval functions help you calculate time intervals from given values.
The following interval functions are supported in this release:
Function Name | Description |
---|---|
|
Converts the given value to an The return value is of the datatype |
|
Converts a string in format The return value is of the datatype |
What are Math Functions?
The math functions allow you to perform various mathematical operations and calculations ranging from simple to complex.
The following math functions are supported in this release:
Function Name | Description |
---|---|
|
Computes the remainder operation on two arguments as prescribed by the IEEE 754 standard |
|
Returns the absolute value of a number |
|
Returns arc cosine of a value |
|
Returns arc sine of a value |
|
Returns arc tangent of a value |
|
Returns polar angle of a point ( |
|
Returns binomial coefficient of the base raised to the specified power |
|
BitMask with BitsSet (From, To) |
|
Returns cubic root of the specified value |
|
Rounds to ceiling |
|
Returns the first floating-point argument with the sign of the second floating-point argument |
|
Returns cosine of a value |
|
Returns cosine hyperbolic of a value |
|
Returns exponent of a value |
|
More precise equivalent of |
|
Returns factorial of a natural number |
|
Rounds to floor |
|
Returns the unbiased exponent used in the representation of a double |
|
Returns a deterministic seed as an integer from a (seemingly gigantic) matrix of predefined seeds |
|
Returns an integer hashcode for the specified double value |
|
Returns square root of sum of squares of the two arguments |
|
Returns the least significant 64 bits of this UUID's 128 bit value |
|
Calculates the log value of the given argument to the given base, where |
|
Returns the natural logarithm of a number |
|
Calculates the log value of the given argument to base 10 |
|
Calculates the log value of the given argument to base 2 |
|
Returns the natural logarithm (base e) of the factorial of its integer argument as a double |
|
Returns the factorial of its integer argument (in the range k >= 0 && k < 21) as a long |
|
Returns the maximum of 2 arguments |
|
Returns the minimum of 2 arguments |
|
Returns modulo of a number |
|
Returns the most significant 64 bits of this UUID's 128 bit value |
|
Returns the floating-point number adjacent to the first argument in the direction of the second argument |
|
Returns the floating-point value adjacent to the input argument in the direction of negative infinity |
|
Returns the floating-point value adjacent to the input argument in the direction of positive infinity |
|
Returns m raised to the nth power |
|
Returns the double value that is closest in value to the argument and is equal to a mathematical integer |
|
Rounds to the nearest integral value |
|
Returns d × 2^scaleFactor rounded as if performed by a single correctly rounded floating-point multiply to a member of the double value set |
|
Returns signum of an argument as a double value |
|
Returns sine of a value |
|
Returns sine hyperbolic of a value |
|
Returns square root of a value |
|
Returns the correction term of the Stirling approximation of the natural logarithm (base e) of the factorial of the integer argument as a double |
|
Returns tangent of a value |
|
Returns tangent hyperbolic of a value |
|
Converts the argument value to degrees |
|
Returns the measurement of the angle in radians |
|
Returns the size of an ulp of the argument |
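Several math rows describe IEEE 754 behavior that differs from everyday arithmetic. The Python sketch below uses the standard `math` module to illustrate three of them: the IEEE remainder (which rounds the quotient to the nearest integer, so it can be negative where ordinary modulo is positive), the unbiased exponent of a double, and scaling by a power of two. The helper names are hypothetical; Python's `math.copysign`, `math.hypot`, `math.expm1`, `math.log1p`, and, from Python 3.9, `math.nextafter` and `math.ulp` correspond to several other rows in the same way.

```python
import math


def compare_remainders(x, y):
    """Return (IEEE 754 remainder, truncated modulo) for the same arguments.

    math.remainder rounds x / y to the nearest integer n (ties to even) and
    returns x - n*y; math.fmod truncates x / y toward zero instead.
    """
    return math.remainder(x, y), math.fmod(x, y)


def unbiased_exponent(d):
    """Unbiased binary exponent of a normal, non-zero double.

    math.frexp returns (m, e) with d == m * 2**e and 0.5 <= |m| < 1, so the
    exponent in the form 1.xxx * 2**k is k = e - 1.
    """
    _, e = math.frexp(d)
    return e - 1


def scale(d, scale_factor):
    """d * 2**scale_factor, computed by adjusting the exponent (math.ldexp)."""
    return math.ldexp(d, scale_factor)
```

For example, `compare_remainders(5.0, 3.0)` gives `(-1.0, 2.0)`: 5/3 rounds to 2 for the IEEE remainder (5 - 6 = -1) but truncates to 1 for the modulo (5 - 3 = 2).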
What are Null-related Functions?
The following null-related functions are supported in this release:
Function Name | Description |
---|---|
|
Replaces null with a value of the same type |
What are Statistical Functions?
Statistical functions help you calculate statistics for different values.
The following statistical functions are supported in this release:
Function Name | Description |
---|---|
|
Returns the area from zero to |
|
Returns the area under the right hand tail (from |
|
Returns the sum of the terms 0 through |
|
Returns the sum of the terms |
|
Returns the area under the left hand tail (from 0 to |
|
Returns the area under the right hand tail (from |
|
Returns the error function of the normal distribution |
|
Returns the complementary error function of the normal distribution |
|
Returns the gamma function of the arguments |
|
Returns the integral from |
|
Returns the incomplete beta function evaluated from zero to |
|
Returns the incomplete gamma function |
|
Returns the complemented incomplete gamma function |
|
Returns the natural logarithm of the gamma function |
|
Returns the sum of the terms 0 through |
|
Returns the sum of the terms |
|
Returns the area under the normal (Gaussian) probability density function, integrated from minus infinity to |
|
Returns the value for which the area under the normal (Gaussian) probability density function is equal to the argument |
|
Returns the sum of the first |
|
Returns the sum of the terms |
|
Returns the integral from minus infinity to |
|
Returns the value, for which the area under the Student-t probability density function is equal to |
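Two rows above describe the standard normal cumulative distribution function (the area from minus infinity to a value) and its inverse, and both relate to the error-function rows through Φ(x) = ½·erfc(−x/√2). The sketch below expresses that relationship with Python's `math.erfc` and inverts it by bisection. The helper names are hypothetical, and bisection is chosen for clarity rather than speed; it is not the algorithm Stream Analytics uses.

```python
import math

_SQRT2 = math.sqrt(2.0)


def normal_cdf(x):
    """Area under the standard normal density from minus infinity to x.

    Uses erfc(-x / sqrt(2)) / 2, which stays accurate deep in the lower tail
    where 1 + erf(x / sqrt(2)) would lose precision.
    """
    return 0.5 * math.erfc(-x / _SQRT2)


def normal_upper_tail(x):
    """Area under the standard normal density from x to plus infinity."""
    return 0.5 * math.erfc(x / _SQRT2)


def normal_inverse(p, lo=-40.0, hi=40.0, iterations=200):
    """x such that normal_cdf(x) == p, found by bisection; requires 0 < p < 1."""
    if not 0.0 < p < 1.0:
        raise ValueError("p must lie strictly between 0 and 1")
    for _ in range(iterations):
        mid = (lo + hi) / 2.0
        if normal_cdf(mid) < p:
            lo = mid
        else:
            hi = mid
    return (lo + hi) / 2.0
```

For example, `normal_inverse(0.975)` returns approximately 1.96, the familiar two-sided 95% critical value.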
What are String Functions?
The following String functions are supported in this release:
Function Name | Description |
---|---|
|
Returns the first non-null expression in the list. If all expressions evaluate to null, then the COALESCE function will return null |
|
Returns concatenation of values converted to strings |
|
Returns first index of |
|
Returns a specified text expression, with the first letter of each word in uppercase and all other letters in lowercase |
|
Returns the length of the specified string |
|
Returns a matching pattern |
|
Converts the given string to lower case |
|
Pads the left side of a string with a specific set of characters (when |
|
Removes all specified characters from the left hand side of a string |
|
Replaces all |
|
Pads the right side of a string with a specific set of characters (when |
|
Removes all specified characters from the right hand side of a string |
|
Returns substring of a 'string' when indices are between 'from' (inclusive) and up to the end of the string |
|
Returns substring of a 'string' when indices are between 'from' (inclusive) and 'to' (exclusive) |
|
Replaces a sequence of characters in a string with another set of characters, one character at a time. |
|
Converts given string to uppercase |
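Several rows above describe behavior that is easy to misread: COALESCE returns the first non-null value, the two substring variants differ only in whether an exclusive end index is given, and the character-replacement function substitutes characters one for one rather than replacing a whole substring. The Python sketch below mirrors those descriptions under stated assumptions: the function names are hypothetical, None stands in for null, indices are 0-based, and the translate helper requires both character lists to be the same length.

```python
def coalesce(*values):
    """Return the first argument that is not None, or None if all are None."""
    return next((value for value in values if value is not None), None)


def substring(text, start, end=None):
    """Characters from `start` (inclusive) up to `end` (exclusive), or to the end.

    Indices are 0-based here, as in Python; this sketch does not settle whether
    Stream Analytics counts from 0 or 1.
    """
    return text[start:] if end is None else text[start:end]


def translate(text, from_chars, to_chars):
    """Replace each character of `from_chars` with the character at the same
    position in `to_chars`, one character at a time (equal lengths required)."""
    return text.translate(str.maketrans(from_chars, to_chars))
```

For example, `translate("2018-03-14", "-", "/")` returns `"2018/03/14"`, and `substring("streaming", 0, 6)` returns `"stream"`.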