1 Introduction to Application Development

An Oracle Stream Analytics application monitors and processes streaming data in real time. Streaming data flows into, through, and out of an application. Raw data flows into the application and is converted into events. Events flow through application stages for processing and filtering according to your application requirements. At the end, the application converts the processed and filtered events back to data in a format that is suitable for the destination, which could be, for example, storage, display on a web page, or further processing by another application.

If you are new to Oracle Stream Analytics application development, start with Event Processing Overview in Getting Started with Event Processing for Oracle Stream Analytics. The getting started guide presents an overview of Oracle Stream Analytics, provides hands-on walkthroughs, and describes the sample applications. This guide explains how to create, configure, and deploy an Oracle Stream Analytics application with the components provided in the platform. If you want to build an application with customized adapters or event beans, see Custom Adapters in Customizing Oracle Stream Analytics.


1.1 EPN Diagram

Oracle Stream Analytics application development centers on the Event Processing Network (EPN) application model. The EPN diagram represents how event data flows into, through, and out of an Oracle Stream Analytics application. You assemble the EPN diagram in Oracle JDeveloper by selecting and configuring EPN components and providing logic as needed. In an EPN diagram, event data flows from left to right.

The figure shows the EPN diagram for the TradeReport application. Data enters the EPN through the StockTradeCSV adapter on the left, which handles data in the form of comma-separated values (CSV). The StockTradeCSV adapter logic translates the incoming CSV data into Oracle Stream Analytics events. The AdapterOutputChannel carries the newly generated events to the Oracle CQL processor.

The GetHighVolumeProcessor component queries the events as they stream through and selects stock trades that have a volume greater than 4000. The ProcessorOutputChannel component sends the selected events to the ListenerBean component, which prints their stock symbol and volume information to the command line.
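The flow described above can be approximated in plain Java, without any Oracle Stream Analytics libraries, to make the three stages concrete. This is only a sketch: the names TradeReportSketch, Trade, parse, selectHighVolume, and describe, and the two-column CSV layout, are assumptions made for illustration. In the real application the adapter, the channels, and the Oracle CQL processor do this work.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the TradeReport EPN; not the Oracle Stream Analytics API.
final class TradeReportSketch {

    // Minimal event: stock symbol and trade volume.
    static final class Trade {
        final String symbol;
        final long volume;

        Trade(String symbol, long volume) {
            this.symbol = symbol;
            this.volume = volume;
        }
    }

    // Adapter stage: translate one CSV line ("symbol,volume") into an event.
    static Trade parse(String csvLine) {
        String[] fields = csvLine.split(",");
        return new Trade(fields[0].trim(), Long.parseLong(fields[1].trim()));
    }

    // Processor stage: keep only trades whose volume is greater than 4000.
    static List<Trade> selectHighVolume(List<Trade> trades) {
        List<Trade> selected = new ArrayList<>();
        for (Trade trade : trades) {
            if (trade.volume > 4000) {
                selected.add(trade);
            }
        }
        return selected;
    }

    // Listener stage: format the symbol and volume for the command line.
    static String describe(Trade trade) {
        return trade.symbol + " " + trade.volume;
    }

    public static void main(String[] args) {
        List<Trade> events = new ArrayList<>();
        for (String line : new String[] {"ORCL,5000", "IBM,1200", "MSFT,4000"}) {
            events.add(parse(line));
        }
        for (Trade trade : selectHighVolume(events)) {
            System.out.println(describe(trade));
        }
    }
}
```

Because the comparison is strictly greater than 4000, a trade with a volume of exactly 4000 is not selected.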

Create Oracle Stream Analytics Project in Getting Started with Event Processing for Oracle Stream Analytics describes how to use Oracle JDeveloper to create Oracle Stream Analytics applications. Walkthroughs of the TradeReport application and a fraud detection application are included.

1.2 Component Configuration

When you develop an Oracle Stream Analytics application, you assemble and configure a network of components into an EPN.

Each component has a role in processing the data. The following sections describe EPN components and their roles.

Events and Event Types

An event type is a data structure that defines the data contained in an event. Event types are the foundation of the EPN because they determine how event data funnels through the EPN and the operations that can be performed on it. When you start developing your application, first create the event type or types for your EPN, because you need to configure components such as adapters, channels, relational database tables, and big data storage with the appropriate event type.
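As an illustration of what an event type can look like in code, the following sketch models a stock trade as a plain JavaBean. It assumes that the event type is backed by a Java class; the class name StockTradeEvent and its three properties are hypothetical and are not taken from a shipped sample.

```java
// Hypothetical event type written as a plain JavaBean (assumption for illustration).
final class StockTradeEvent {
    private String symbol;
    private double price;
    private long volume;

    // JavaBeans conventionally expose a public no-argument constructor.
    public StockTradeEvent() {
    }

    public String getSymbol() {
        return symbol;
    }

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    public long getVolume() {
        return volume;
    }

    public void setVolume(long volume) {
        this.volume = volume;
    }

    @Override
    public String toString() {
        return "StockTradeEvent[symbol=" + symbol
                + ", price=" + price
                + ", volume=" + volume + "]";
    }
}
```

Each property (symbol, price, volume) corresponds to one value that an event of this type carries, which is what the components configured with the event type rely on.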

Adapters

Oracle Stream Analytics provides a selection of input and output adapters to accommodate every type of data that might flow into and out of the EPN. For example, you can access Java Message Service (JMS) objects, an HTTP Publish-Subscribe server, and financial market feeds. You can also develop your own adapters to integrate systems that are not supported by default. See Configure the Event Type Repository in Using Visualizer for Oracle Stream Analytics.

You configure adapters with an event type and other relevant configuration information. The specific configuration depends on whether the adapter handles event input or output and the source of the data. For example, in the TradeReport application, the input CSV adapter configuration specifies the location of the CSV file, and values that tell the adapter when to start reading the CSV file and how long to wait between consecutive readings.

Channels

You configure a channel with an event type so that it can transfer events of that type to the next stage in the EPN that is appropriate for the given event type. A channel can represent either a stream or a relation.

A stream or relation channel inserts events into a collection and sends the resulting stream to the next EPN stage. Events in a stream can never be deleted from the stream. Events in a relation can be inserted into, deleted from, and updated in the relation. For insert, delete, and update operations, events in a relation must always be referenced to a particular point in time. See Streams and Relations for more information.

Oracle CQL Processors

You configure Oracle CQL processors with Oracle CQL query code to examine events as they pass through. The Oracle JDeveloper Components window provides CQL Patterns to facilitate the formation of Oracle CQL queries. The wizard for each CQL Pattern prompts you for the correct configuration data to ensure that you form a valid Oracle CQL query.

Beans

A bean defines application event logic written in the Java programming language that conforms to standard Spring-based beans.

An event bean is a Java class that implements logic to listen for and work on events. This type of Java class is called a listener Java class. A listener that receives events (event sink) might create new events when it finds a certain type of data and send the new events to the next stage for further processing. A listener event sink can also initiate other processes in the same or in another application based on the event data.
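The listener behavior described above can be sketched in plain Java. The two small interfaces below are stand-ins declared locally so that the sketch compiles without the server libraries; they are modeled loosely on the sink and sender interfaces in the com.bea.wlevs.ede.api package, whose real signatures may differ. The class LargeTradeListener and the helper run are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in sketch of a listener (event sink) that creates new events.
final class ListenerSketch {

    // Local stand-ins; the real interfaces live in com.bea.wlevs.ede.api.
    interface EventSink {
        void onInsertEvent(Object event);
    }

    interface EventSender {
        void sendInsertEvent(Object event);
    }

    // Receives trade volumes and sends an alert event for large trades.
    static final class LargeTradeListener implements EventSink {
        private final EventSender downstream;
        private final long threshold;

        LargeTradeListener(EventSender downstream, long threshold) {
            this.downstream = downstream;
            this.threshold = threshold;
        }

        @Override
        public void onInsertEvent(Object event) {
            // Create a new event only when the incoming data matches.
            if (event instanceof Long && (Long) event > threshold) {
                downstream.sendInsertEvent("ALERT volume=" + event);
            }
        }
    }

    // Feeds the volumes to a listener and returns what it sent downstream.
    static List<Object> run(long threshold, long... volumes) {
        List<Object> sent = new ArrayList<>();
        LargeTradeListener listener = new LargeTradeListener(sent::add, threshold);
        for (long volume : volumes) {
            listener.onInsertEvent(volume);
        }
        return sent;
    }
}
```

For example, `ListenerSketch.run(4000, 100, 5000, 4000)` sends a single alert, for the 5000 trade, to the next stage.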

Spring beans are managed by the Spring framework, and are a good choice if you want to integrate your bean into an existing Spring deployment. Event beans use Oracle Stream Analytics conventions for configuring beans so that they are managed by the Oracle Stream Analytics server. With an event bean, for example, you get the support of Oracle Stream Analytics server features such as monitoring and event record and playback. You can use event record and playback to debug an application.

Caching

You can integrate a cache system with your Oracle Stream Analytics application to make a cache available as source or destination for data and event data that your application uses. Integrating a cache can provide access to relatively static data at a speed that is suited to an application that handles streaming data.

A cache is a temporary storage area for events that you can create to improve the overall performance of your Oracle Stream Analytics application. A cache is not necessary for the application to function correctly. To increase the availability of events and improve performance, an Oracle Stream Analytics application can publish events to or consume events from a cache.

A caching system defines a named set of configured caches. Oracle Stream Analytics distributes the configuration for remote cache communications across multiple servers. The Spring context file supports caching configuration. Listeners that are configured with a Spring context file receive events from the cache.
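To picture how a cache supplies relatively static data to a streaming application, consider the following sketch. It uses an in-memory map as a stand-in for a configured cache; the class CacheSketch, its methods, and the idea of looking up a company name by stock symbol are assumptions for illustration. A real caching system is declared in configuration rather than coded this way.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a cache of relatively static data.
final class CacheSketch {
    private final Map<String, String> companyBySymbol = new ConcurrentHashMap<>();

    // Publish an entry to the cache (the cache acts as a destination).
    void publish(String symbol, String company) {
        companyBySymbol.put(symbol, company);
    }

    // Consume from the cache (the cache acts as a source) to enrich an event.
    String enrich(String symbol, long volume) {
        String company = companyBySymbol.getOrDefault(symbol, "unknown");
        return symbol + " (" + company + ") " + volume;
    }
}
```

The lookup stays in memory, so enriching each event does not require a round trip to slower storage while events are streaming through.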

Data-Related Components

Table: The Table component provides access to a relational database. You configure the Table component with an ID, event type, and a data source to feed specific events into a relational database table. Oracle Stream Analytics provides the Hadoop and NoSQLDB data cartridges for accessing big data storage.

Hadoop: A data cartridge extension for an Oracle CQL processor to access large quantities of data in a Hadoop distributed file system (HDFS). HDFS is a non-relational data store. The Oracle CQL processor provides the Oracle CQL query code for the big data access. You configure Hadoop with an ID, event type, the path to the database, and the file separator character.

NoSQLDB: A data cartridge extension for an Oracle CQL processor to access large quantities of data in an Oracle NoSQL Database. Oracle NoSQL Database stores data in key-value pairs. The Oracle CQL processor provides the Oracle CQL query code for the big data access. You configure NoSQLDB with an ID, event type, store name, and store locations.

1.3 Streams and Relations

An Oracle Stream Analytics application handles events that arrive in a stream as raw event data. The raw event data enters the EPN through an adapter that converts the raw event data into an event. An event is an ordered set of values (tuple).

Events are similar to a table row in a relational database in that an event has a schema. The event schema defines the properties and types for each event value. Events are unlike a table row in a database in that a table row contains static data. In a stream of events, the time at which an event arrives, including whether it arrives before or after another event, can make a difference. Your application needs to be able to account for time and sequence.

For example, in an application that processes stock trades, events made up of stock symbol, price, last price, percentage change, and volume information would arrive one after the other in the order in which each trade was executed. Your application logic might look for trades of one stock that occurred immediately after trades of another.

In an event processing application, the sequence in which events occur in a stream is as important as the data types and values of each event property. Oracle Stream Analytics programming conventions reflect the importance of time and sequence.

Your code needs to discover which events are related to one another based on certain criteria, such as a shared stock symbol. Your code also needs to discover sequence patterns, such as trades within fifteen seconds of one another. To account for both the sequential and relational aspects of event data, Oracle Stream Analytics implements the concepts of streams and relations through low latency channels.

  • A stream is a potentially infinite sequence of events where each event has its own time stamp. In a stream, the events must be ordered by time, one after the other, so that time stamps do not decrease from one event to the next. There can be events in a stream that have the same time stamp.

  • In a relation, sequence might be unimportant. Instead, events in a relation are related because they meet certain criteria. For example, events in a relation might be the result of a query executed against a stream of stock trades, where the query looks for trade volumes above a particular level.

In a stream of stock trade events, the events arrive in sequence and each event has its own time stamp. To isolate the share price for trades that occurred within 5 seconds of one another, configure an Oracle CQL processor to query the stream when it arrives from the channel with the following Oracle CQL code:

select price from StockTradeChannel [range 5 seconds] 

Because the query uses the [range 5 seconds] window to isolate the events, the output of this query is a relation. Although the events returned from the query have time stamps, they are unordered in the relation. Because the incoming events are in a stream, the query executes continuously against every 5 seconds' worth of events as they pass into the Oracle CQL processor. As new events come along, those meeting the query terms are inserted into the relation, while those that do not meet the query terms are deleted from the relation.
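The effect of the [range 5 seconds] window can be approximated in plain Java. The sketch below is not how the Oracle CQL processor is implemented; the class RangeWindowSketch and its method names are hypothetical, expiry is evaluated only when a new event arrives, and treating an event that is exactly 5 seconds old as expired is an assumption about the window boundary.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of a time-based range window over a stream of trades.
final class RangeWindowSketch {

    private static final class Trade {
        final long timestampMillis;
        final double price;

        Trade(long timestampMillis, double price) {
            this.timestampMillis = timestampMillis;
            this.price = price;
        }
    }

    private final long rangeMillis;
    private final Deque<Trade> window = new ArrayDeque<>();

    RangeWindowSketch(long rangeMillis) {
        this.rangeMillis = rangeMillis;
    }

    // Inserts the new event, deletes events that have aged out of the range,
    // and returns the prices currently in the relation.
    List<Double> onEvent(long timestampMillis, double price) {
        window.addLast(new Trade(timestampMillis, price));
        while (!window.isEmpty()
                && window.peekFirst().timestampMillis <= timestampMillis - rangeMillis) {
            window.removeFirst();
        }
        List<Double> prices = new ArrayList<>();
        for (Trade trade : window) {
            prices.add(trade.price);
        }
        return prices;
    }
}
```

With a 5000 millisecond range, events at 0, 3000, and 6000 milliseconds leave only the last two prices in the relation after the third event arrives.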

This distinction matters because order is an essential property of a stream. Technically, a stream is a continuously moving and ordered set of events, and every event is inserted into the stream one after the other. When you get a subset of the stream from an Oracle CQL query, you no longer have that order. Before you pass a relation to the next stage in the EPN, you can convert the relation back into a stream with the IStream operator.
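One way to picture the IStream conversion is as a difference between two consecutive states of a relation: the elements that are present now but were not present a moment ago are emitted as insert events. The sketch below shows that idea in plain Java. The class and method names are hypothetical, and using sets instead of bags is a simplifying assumption.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of the idea behind IStream: emit what was newly inserted.
final class IStreamSketch {

    // Returns the elements in the current relation state that were not
    // in the previous state, in the iteration order of the current state.
    static <T> Set<T> insertions(Set<T> previous, Set<T> current) {
        Set<T> inserted = new LinkedHashSet<>(current);
        inserted.removeAll(previous);
        return inserted;
    }
}
```

If the relation held the prices 10.0 and 11.0 and now holds 11.0 and 12.0, only 12.0 is emitted, because 11.0 was already in the relation and 10.0 was deleted rather than inserted.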

1.4 Application Scalability and High Availability

A scalable Oracle Stream Analytics application incorporates Oracle Stream Analytics design patterns with implementation and configuration conventions to ensure that the application operation scales as the event load increases.

You can achieve scalability and high availability by integrating application design patterns, server resources, and configuration conventions so that your deployed application continues to operate even in the event of software or hardware failures.

For more information, see the following:

High Availability Applications.

Scalable Applications.

1.5 Application Life Cycle

Figure 1-1 shows a state diagram for the Oracle Stream Analytics application life cycle. In this diagram, the state names (STARTING, INITIALIZING, RUNNING, SUSPENDING, SUSPENDED, and FAILED) correspond to the ApplicationRuntimeMBean method getState return values. These states are specific to Oracle Stream Analytics. They are not OSGi bundle states.

Figure 1-1 Oracle Stream Analytics Application Life Cycle State Diagram


Note:

For information on Oracle Stream Analytics server life cycle, see Server Life Cycle in Administering Oracle Stream Analytics.

This section describes the life cycle of an application deployed to the Oracle Stream Analytics server and the sequence of com.bea.wlevs.ede.api API callbacks. The information explains how Oracle Stream Analytics manages an application's life cycle so that you can better use the life cycle APIs in your application. For a description of these life cycle APIs (such as RunnableBean and SuspendableBean), see:

The life cycle description is broken down into actions that a user performs, including those described in the following sections.

Install an Application or Start the Server with the Application Deployed

Oracle Stream Analytics performs the following actions:

  1. Oracle Stream Analytics installs the application as an OSGi bundle. OSGi resolves the imports and exports, and publishes the service.

  2. Oracle Stream Analytics creates beans (for both standard Spring beans and those that correspond to the Oracle Stream Analytics tags in the EPN assembly file). For each bean, Oracle Stream Analytics:

    • Sets the properties on the Spring beans. The <wlevs:instance-property> values are set on adapters and event-beans.

    • Injects appropriate dependencies into services specified by @Service or @ServiceReference annotations.

    • Injects appropriate dependencies into static configuration properties.

    • Calls the InitializingBean.afterPropertiesSet method.

    • Calls configuration callbacks (@Prepare, @Activate) on Spring beans as well as factory-created stages.

      For more information, see Resource Access Configuration.

  3. Application state is now INITIALIZING.

  4. Oracle Stream Analytics registers the MBeans.

  5. Oracle Stream Analytics calls the ActivatableBean.afterConfigurationActive method on all ActivatableBeans.

  6. Oracle Stream Analytics calls the ResumableBean.beforeResume method on all ResumableBeans.

  7. For each bean that implements RunnableBean, Oracle Stream Analytics starts it running in a thread.

  8. Application state is now RUNNING.
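The callback order in the steps above can be sketched with a bean that records each call. The interfaces below reuse the names from this section but are declared locally as simplified stand-ins, so their signatures are assumptions and may differ from the real com.bea.wlevs.ede.api and Spring interfaces. The classes InstallOrderSketch and RecordingBean are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the callback order during installation.
final class InstallOrderSketch {

    // Simplified local stand-ins for the life cycle interfaces.
    interface InitializingBean { void afterPropertiesSet(); }
    interface ActivatableBean { void afterConfigurationActive(); }
    interface ResumableBean { void beforeResume(); }
    interface RunnableBean { void run(); }

    static final class RecordingBean
            implements InitializingBean, ActivatableBean, ResumableBean, RunnableBean {
        final List<String> calls = new ArrayList<>();

        @Override public void afterPropertiesSet() { calls.add("afterPropertiesSet"); }
        @Override public void afterConfigurationActive() { calls.add("afterConfigurationActive"); }
        @Override public void beforeResume() { calls.add("beforeResume"); }
        @Override public void run() { calls.add("run"); }
    }

    // Plays the role of the server for one bean and returns the recorded order.
    static List<String> install(RecordingBean bean) {
        bean.afterPropertiesSet();
        bean.calls.add("state=INITIALIZING");
        bean.afterConfigurationActive();
        bean.beforeResume();
        Thread worker = new Thread(bean::run);   // run executes in its own thread
        worker.start();
        try {
            // Wait only so that the recorded order is deterministic in this sketch.
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        bean.calls.add("state=RUNNING");
        return bean.calls;
    }
}
```

The sketch omits MBean registration, property injection, and the @Prepare and @Activate configuration callbacks, and it waits for run to finish only to keep the recorded order stable.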

Suspend the Application

Oracle Stream Analytics performs the following actions:

  1. Oracle Stream Analytics calls the SuspendableBean.suspend method on all SuspendableBeans.

  2. Application state is now SUSPENDED.

Resume the Application

Oracle Stream Analytics performs the following actions:

  1. Oracle Stream Analytics calls the ResumableBean.beforeResume method on all ResumableBeans.

  2. For each bean that implements RunnableBean, Oracle Stream Analytics starts it running in a thread.

  3. Application state is now RUNNING.
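The suspend and resume actions can be summarized as a small state tracker. In this sketch the class SuspendResumeSketch is hypothetical, the application is assumed to be already installed and RUNNING, and the rule that suspend is accepted only while RUNNING and resume only while SUSPENDED is an assumption, not documented server behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical tracker for the suspend and resume user actions.
final class SuspendResumeSketch {

    enum State { RUNNING, SUSPENDED }

    private State state = State.RUNNING;   // assumes installation has completed
    private final List<String> callbacks = new ArrayList<>();

    State state() {
        return state;
    }

    List<String> callbacks() {
        return callbacks;
    }

    // Suspend: call suspend on the suspendable beans, then change state.
    void suspend() {
        requireState(State.RUNNING);
        callbacks.add("SuspendableBean.suspend");
        state = State.SUSPENDED;
    }

    // Resume: call beforeResume, restart runnable beans, then change state.
    void resume() {
        requireState(State.SUSPENDED);
        callbacks.add("ResumableBean.beforeResume");
        callbacks.add("RunnableBean.run");
        state = State.RUNNING;
    }

    private void requireState(State expected) {
        if (state != expected) {
            throw new IllegalStateException("expected " + expected + " but was " + state);
        }
    }
}
```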

Uninstall the Application

Oracle Stream Analytics performs the following actions:

  1. Oracle Stream Analytics calls the SuspendableBean.suspend method on all SuspendableBeans.

  2. Oracle Stream Analytics unregisters MBeans.

  3. Oracle Stream Analytics calls the DisposableBean.dispose method on all DisposableBeans.

  4. Oracle Stream Analytics uninstalls the application bundle from OSGi.

Update the Application

This is equivalent to first uninstalling the application and then installing it again. See the descriptions of those user actions earlier in this section.

Call Methods of Stream and Relation Sources and Sinks

You cannot call a method on a stream or relation source or sink from a life cycle callback because components might not be ready to receive events until after these phases of the application life cycle complete.

For example, you cannot call the StreamSender method sendInsertEvent from a life cycle callback such as afterConfigurationActive or beforeResume.

You can call a method on a stream or relation source or sink from the run method of beans that implement RunnableBean.
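The following sketch illustrates the rule by keeping the life cycle callback free of sends and doing the send in run. Everything here is a local stand-in: GuardedSender and HeartbeatBean are hypothetical, and having the sender throw an exception when it is used too early is an assumption made to make the problem visible, not a description of how the server reacts.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: send events from run, not from life cycle callbacks.
final class SendFromRunSketch {

    // Stand-in sender that rejects events until downstream stages are ready.
    static final class GuardedSender {
        private boolean ready;
        final List<Object> delivered = new ArrayList<>();

        void markReady() {
            ready = true;
        }

        void sendInsertEvent(Object event) {
            if (!ready) {
                throw new IllegalStateException("downstream is not ready for events");
            }
            delivered.add(event);
        }
    }

    static final class HeartbeatBean {
        private final GuardedSender sender;

        HeartbeatBean(GuardedSender sender) {
            this.sender = sender;
        }

        // Life cycle callback: prepare resources only, do not send events here.
        void beforeResume() {
        }

        // Runs after the life cycle phases complete, so sending is safe.
        void run() {
            sender.sendInsertEvent("heartbeat");
        }
    }

    // Plays the role of the server: callback first, then readiness, then run.
    static List<Object> startUp() {
        GuardedSender sender = new GuardedSender();
        HeartbeatBean bean = new HeartbeatBean(sender);
        bean.beforeResume();
        sender.markReady();
        bean.run();
        return sender.delivered;
    }
}
```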

See Event Beans.

1.6 API Overview

The APIs enable you to programmatically implement functionality for all aspects of Oracle Stream Analytics applications as described in this documentation set.

This section presents an overview of the API packages in terms of their intended usages and includes cross-references to where you can learn more.

For the full reference documentation (Javadocs) for all classes and interfaces, see Java API Reference for Oracle Stream Analytics. See also Samples in Oracle Stream Analytics APIs in Getting Started with Event Processing for Oracle Stream Analytics.

Configuration

The com.bea.wlevs.configuration package provides interfaces to activate, prepare, and roll back configuration objects. When you implement the Prepare interface, provide a method that accepts, checks, and stores a configuration object. The Java type of the configuration object is determined by JAXB. By default, the Java class name is the same as the name of the XML Schema complex type that describes the configuration data for the applicable stage. See the /Oracle/Middleware/my_oep/oep/xsd/wlevs_application_config.xsd schema for schema details. See also Application and Resource Configuration.
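The accept, check, and store pattern for configuration objects can be sketched as three plain methods. This sketch does not use the com.bea.wlevs.configuration package: the class ConfigCallbackSketch, the AdapterConfig type, and its pollIntervalMillis property are hypothetical, and in a real application the configuration type comes from JAXB and the server decides when each callback runs.

```java
// Hypothetical sketch of prepare, activate, and rollback for a configuration object.
final class ConfigCallbackSketch {

    // Stand-in for a JAXB-generated configuration class.
    static final class AdapterConfig {
        final int pollIntervalMillis;

        AdapterConfig(int pollIntervalMillis) {
            this.pollIntervalMillis = pollIntervalMillis;
        }
    }

    private AdapterConfig pending;
    private AdapterConfig active;

    // Prepare: accept the object, check it, and store it without applying it.
    void prepare(AdapterConfig config) {
        if (config.pollIntervalMillis <= 0) {
            throw new IllegalArgumentException("pollIntervalMillis must be positive");
        }
        pending = config;
    }

    // Activate: make the prepared configuration the one in use.
    void activate() {
        if (pending != null) {
            active = pending;
            pending = null;
        }
    }

    // Rollback: discard the prepared configuration and keep the active one.
    void rollback() {
        pending = null;
    }

    // Returns the active poll interval, or -1 if nothing has been activated.
    int activePollInterval() {
        return active == null ? -1 : active.pollIntervalMillis;
    }
}
```

Separating the check from the switch means that an invalid or abandoned configuration never replaces the one that is currently in use.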

Adapters

Oracle Stream Analytics provides several packages that provide interfaces and classes for managing adapter behavior. See Adapters.

Packages:

  • com.bea.wlevs.adapters.httppubsub.api package provides interfaces for converting inbound JavaScript Object Notation (JSON) messages to event types and back again. To customize the way inbound and outbound JSON messages are converted to an event type and back to JSON format, create a custom converter bean and use this API.

  • com.bea.wlevs.adapters.httppubsub.support package provides classes for establishing a connection to an HTTP publish-subscribe server.

  • com.bea.wlevs.adapters.jms.api package provides interfaces for converting inbound JMS messages to event types and back again. To customize the way inbound and outbound JMS messages are converted to an event type and back, create a custom converter bean and use this API.

  • com.bea.wlevs.ede.api package provides interfaces for creating custom adapters. See Oracle Fusion Middleware Customizing Oracle Stream Analytics Components.

Channels

The com.bea.wlevs.channel package provides an interface for implementing event partitioning and a class for managing the number of events in channels. See Channels and Scalable Applications.
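Event partitioning generally means choosing, for each event, which of several downstream listeners receives it. A common approach, shown in the sketch below, is to hash a key such as the stock symbol so that events with the same key always go to the same partition. The class PartitionSketch and its method are hypothetical and do not reproduce the interface in com.bea.wlevs.channel.

```java
// Hypothetical sketch of key-based event partitioning.
final class PartitionSketch {

    // Maps a key to a partition index in the range 0..numberOfPartitions-1.
    static int partition(String key, int numberOfPartitions) {
        if (numberOfPartitions <= 0) {
            throw new IllegalArgumentException("numberOfPartitions must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numberOfPartitions);
    }
}
```

Because the result depends only on the key, all trades for one symbol land in the same partition, which preserves their relative order within that partition.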

Event Repositories

To manage events and event types, Oracle Stream Analytics uses an event store repository and an event type repository. The event store repository persists the event and the event type repository persists the event type.

Packages:

  • com.bea.wlevs.eventstore package provides interfaces and classes to manage the event store repository. See Persistent Event Store in Using Visualizer for Oracle Stream Analytics.

  • com.bea.wlevs.ede.api package provides the EventTypeRepository interface to manage the event type repository. See Events and Event Types.

Event-Driven Environment

The com.bea.wlevs.ede.api package provides interfaces for creating and customizing Oracle Stream Analytics application code that responds to events. The package provides interfaces for creating event beans and adapters and making them event sinks and event sources. Other interfaces in this package enable you to manage all aspects of how events flow through the EPN, such as event creation, event flow through channels, event metadata and properties, the event type repository, external data sources, EPN stages, fault handling, event bean life cycle, and so on.

For sample Java code that uses some of these APIs, see Events and Event Types and Event Beans. See also Resource Access Configuration for information about using Oracle Stream Analytics annotations and deployment XML to configure resource injection.

Event Bean Life Cycle

The com.bea.wlevs.ede.api package also enables control over event bean life cycle. You can manage event bean initialization, configure dynamic activation, use threading, suspend and resume processing, and release resources when the application is undeployed. See Application Life Cycle for information about the event bean and application life cycles.

Note that the Spring framework implements similar bean life cycle interfaces. However, the equivalent Spring interfaces do not allow you to manipulate beans that were created by factories, while the Oracle Stream Analytics interfaces do.

JAXB

Oracle Stream Analytics provides a simplified interface for using Java Architecture for XML Binding (JAXB) mapping capabilities in adapters and event beans to marshall and unmarshall event data between XML and Java objects. See JAXB Support.

Packages:

  • com.oracle.cep.mappers.api package provides interfaces for marshalling and unmarshalling event data for most application requirements.

  • com.oracle.cep.mappers.jaxb package provides interfaces that provide specialized method signatures for marshalling and unmarshalling.

Caching

You can configure a caching system so that applications have ready access to event data. The caches in the system can be a combination of Oracle Coherence distributed caching, Oracle Stream Analytics local caching, and caching solutions provided by third parties. You can access the events in the caches with Oracle CQL and Java classes. See Cached Event Data.

Packages:

  • com.bea.wlevs.cache.spi package provides interfaces that enable you to create a caching system that can be used by Oracle Stream Analytics applications.

  • com.bea.wlevs.cache.spi.coherence package provides interfaces that enable you to extend the caching system to include Oracle Coherence caching.

Cache Loader

The com.oracle.cep.cacheloader package provides the CsvCacheLoader class for loading CSV events into a Coherence cache. See Cached Event Data.

Cluster Group Management

The com.bea.wlevs.ede.api.cluster package provides interfaces for managing server groups within multiserver domains (clusters). You can get information about the configuration, implement event beans and adapters to listen for cluster membership changes, set the group name for the containing EPN, and get information about a group server. See Server Groups in Administering Oracle Stream Analytics.

Management Beans

Management beans (MBeans) enable you to programmatically access configuration and runtime information to perform tasks. There are two types of MBeans (tasks): configuration and run time. Configuration MBeans contain information about EPN component configuration. Run time MBeans contain information about component throughput and latency. See MBean Management Commands in Administering Oracle Stream Analytics.

Packages:

  • com.bea.wlevs.management package contains interfaces for managing constants used by client applications and to provide a super-interface for all Oracle WebLogic Event Server MBeans.

  • com.bea.wlevs.management.configuration package provides interfaces for managing applications, adapters, caches, configuration, Oracle CQL processors, event beans, stages, streams, and table sinks and sources.

  • com.bea.wlevs.management.diagnostic package provides interfaces for managing diagnostic profiles. A diagnostic profile is an XML file that contains application stage information for testing throughput and latency. See Monitor the Throughput and Latency in Using Visualizer for Oracle Stream Analytics.

  • com.bea.wlevs.management.diagnostic.notification package provides a class for wrapping diagnostic change notifications sent by background probes.

  • com.bea.wlevs.diagnostic package provides interfaces and classes for listening for newly deployed applications and removed applications. When applications are deployed and undeployed, a profile manager (group of diagnostic profiles) is also created and removed, and corresponding profile manager events are issued.

  • com.bea.wlevs.management.runtime package provides interfaces for getting runtime information about the application, the application Oracle CQL processors, the domain, the server, and EPN stages.

  • com.bea.wlevs.monitor package provides interfaces for monitoring the throughput and latency of application endpoints in the event server.

  • com.bea.wlevs.monitor.management package provides interfaces for receiving monitoring metrics for an application stage and for monitoring latency between endpoints in the EPN.

  • com.bea.wlevs.deployment.mbean package provides interfaces to manage application deployment.

  • com.bea.wlevs.eventinspector.management package provides interfaces and classes for controlling the behavior of event tracing and event injection. See Testing 1-2-3.

  • com.oracle.cep.cluster.ha.adapter.management package provides interfaces and classes for managing JMX communications in a high availability environment.

High Availability

Oracle Stream Analytics provides application design patterns and high availability adapters to enable you to increase the backup and failover processing capabilities of your applications. See High Availability Applications.

Packages:

  • com.oracle.cep.cluster.ha.adapter package provides interfaces and classes for queue trimming.

  • com.oracle.cep.cluster.ha.adapter.inbound package provides classes for creating a high availability broadcast inbound adapter. This adapter is for applications that use system time and need to be highly available.

  • com.oracle.cep.cluster.ha.adapter.management package provides interfaces and classes for managing JMX communications in a high availability environment.

  • com.oracle.cep.cluster.ha.adapter.runtime package provides interfaces and class implementations for managing JMX interfaces to other high availability interfaces.

  • com.oracle.cep.cluster.ha.api package provides interfaces and classes for simple fail over functionality.

  • com.oracle.cep.cluster.hagroups package provides interfaces and classes for creating event beans and adapters that listen for property group membership changes, make the changes available, and enable subscriptions to broadcast group members.

  • com.oracle.cep.cluster.hagroups.runtime package provides interfaces and classes to get notification group information.

Testing and Utility Tools

Oracle Stream Analytics provides different ways to test your application depending on what and how you want to test. See Testing 1-2-3.

Packages:

  • com.bea.wlevs.eventinspector.management package provides interfaces and classes for managing event tracing and injection.

  • com.oracle.cep.shell package provides interfaces and classes for programmatically invoking commands for testing Oracle Stream Analytics applications.

  • com.bea.wlevs.util package provides interfaces and classes for marking methods as requiring an OSGi service reference, getting and setting error messages, parsing parameters, returning OSGi importer services cardinality, and loading a service class.

Cartridge Framework

The com.oracle.cep.cartridge package provides interfaces and classes that form the Data Cartridge Framework. The Data Cartridge Framework is a service provider interface (SPI) that enables users and vendors to create cartridges to extend Oracle CQL functionality. See Oracle CQL Data Cartridge Framework in Developing Applications with Oracle CQL Data Cartridges.

Spring Support

The com.bea.wlevs.spring.support package provides interfaces and classes for using Spring functionality in Oracle Stream Analytics applications.

1.7 Spring Framework

The Spring Framework provides Java-based APIs and a configuration model that you can use to create portable and flexible enterprise applications.

For more information about Spring:

1.8 OSGi Service Platform

The OSGi Service Platform provides a dynamic application execution environment where you can install, update, or remove OSGi bundles (modules) dynamically.

For more information about OSGi:

Service Annotations

Use the com.bea.wlevs.util.Service (@Service) annotation to specify a component method that is injected with an OSGi service reference. You typically add this annotation to JavaBean setter methods where needed. The @Service annotation has the following attributes.

Table 1-1 Attributes of the com.bea.wlevs.util.Service JWS Annotation Tag

| Name | Description | Data Type | Required? |
|---|---|---|---|
| serviceBeanName | The name of the bean that backs the injected service. Can be null. | String | No |
| cardinality | Valid values for this attribute are ServiceCardinality.C0__1, ServiceCardinality.C0__N, ServiceCardinality.C1__1, and ServiceCardinality.C1__N. Default value is ServiceCardinality.C1__1. | enum | No |
| contextClassloader | Valid values for this attribute are ServiceClassloader.CLIENT, ServiceClassloader.SERVICE_PROVIDER, and ServiceClassloader.UNMANAGED. Default value is ServiceClassloader.CLIENT. | enum | No |
| timeout | Timeout for service resolution in milliseconds. Default value is 30000. | int | No |
| serviceType | Interface (or class) of the service to be injected. Default value is Service.class. | Class | No |
| filter | Specifies the filter used to narrow service matches. Value may be null. | String | No |

The following example shows how to use the @Service annotation. For another example, see Access the Event Type Repository.

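// The filter narrows the match to the OSGi service whose Name property is StockDs.
@Service(filter = "(Name=StockDs)")
public void setDataSourceService(DataSourceService dss) {
    // Use the injected service reference to set up the stock table.
    initStockTable(dss.getDataSource());
}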