4 Adapters

Adapters manage data entering and leaving the EPN. Oracle Stream Analytics provides a number of different kinds of inbound and outbound adapters to handle different types of data such as CSV, RMI, and HTTP. All adapters have a provider property that is a reference to the OSGi-registered adapter factory service and defines the type of data that the adapter handles.

Inbound adapters receive event data from a data stream entering the EPN, assign the data to an event according to the event type, and send the data to the next stage in the EPN. Outbound adapters receive events processed by the EPN, convert the events to their output form, and send the converted data to an output data source such as another EPN, a non-EPN application, a CSV file, or a web page.

For information about the high availability adapters, see High Availability Applications.

See Testing 1-2-3 for information about how to use the csvgen adapter with the load generator to simulate a data feed to test your application.

4.1 Create Adapters

The best way to create most adapters is with Oracle JDeveloper. The Oracle JDeveloper components window provides the following inbound and outbound adapters: CSV, EDN, RMI, HTTP, and JMS.

For the other adapters, edit the configuration files directly. Each adapter section in this chapter provides example assembly and configuration file configurations so that you can see the settings.

Before you create an adapter, use Oracle JDeveloper to create an event type to assign to the adapter. See Create and Register an Event Type in Getting Started with Event Processing for Oracle Stream Analytics for information about creating event types.

This chapter describes some of the assembly and configuration file settings for the different types of adapters. For complete information about adapter settings, see adapter in Schema Reference for Oracle Stream Analytics. See also the Oracle/Middleware/my_oep/oep/wlevs_application_config.xsd file in your Oracle Stream Analytics installation for adapter schema information.

4.2 Cluster Distribution Service

The cluster distribution feature provides a mechanism for various Oracle Stream Analytics adapter types to distribute incoming events to all of the servers in a cluster.

An individual adapter or event bean instance can be configured to distribute events, and in this case, all input events processed by that adapter are sent (distributed) to all servers in the cluster.

To convert an input adapter to a distribution adapter, set the distribution instance property to true, as in the following example, which names the property distributeToClusterGroup. The distributionThreadsCount property is optional and defaults to 1.

<wlevs:adapter id="myLoadgenAdapter" provider="loadgen">
  <wlevs:instance-property name="distributeToClusterGroup" value="true"/>
  <wlevs:instance-property name="distributionThreadsCount" value="1"/>
</wlevs:adapter>

Oracle Stream Analytics supports the Cluster Distribution service for the loadgen, CSV inbound, and JMS inbound (queue) adapters. Oracle Stream Analytics does not support the Cluster Distribution service for the CSV outbound, JMS inbound (topic), JMS outbound, and HTTP publish-subscribe adapters. It is an error to configure topic destinations for input distribution. A topic configuration generates a warning log message and is ignored.

In addition to the adapter types listed above, you can also configure an event bean to distribute all of the events it receives to all cluster members by specifying the provider for the bean to be clusterGroupDistributor as shown in the following example:

<wlevs:event-bean id="distributor-bean" provider="clusterGroupDistributor">
   <wlevs:instance-property name="distributionThreadsCount" value="1"/>
   ... other event bean properties ...
</wlevs:event-bean>

Oracle JDeveloper does not currently provide a component for the Cluster Distribution service. However, you can create a Cluster Distribution service by adding entries to the assembly and configuration files for your Oracle Stream Analytics application.

4.3 Password Encryption

Some of the adapters have user name and password child elements. Oracle Stream Analytics provides the encryptMSAConfig command so that you can encrypt the file that contains the password. See encryptMSAConfig Command-Line Utility in Administering Oracle Stream Analytics for more information.

4.4 JAXB Support

Oracle Stream Analytics provides a simplified interface for using Java Architecture for XML Binding (JAXB) mapping capabilities in adapters and event beans to marshal and unmarshal event data between XML and Java objects.

The JAXB interface supports the JAXB 2.2 specification and EclipseLink Moxy provider extensions.

You can configure the mapping operations in the following ways:

  • Map from an XML schema to Java objects to output a set of annotated Java classes.

  • Map from one set of Java objects to another set of Java objects or to XML using JAXB annotations.

  • Map from an existing XML schema to an existing, predefined Java object representation. This approach uses the EclipseLink Moxy extensions and requires an external metadata file that contains the mapping details. The metadata file is referenced by the application configuration.

4.4.1 EclipseLink Moxy

EclipseLink Moxy provides extensions that enable you to map between an existing XML schema and a predefined set of Java classes without modifying the XML schema or the Java classes and without providing annotations. You provide the mapping information in an external metadata file using an XPath-based syntax.

The flexible EclipseLink Moxy extensions enable you to perform complex operations. For example, you can map a subset of complex XML data to a much simpler event representation. You can also flatten a deeply nested XML document into a flat Java bean event format for processing by Oracle CQL.
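To make the idea of flattening concrete, the following is a minimal sketch that uses the plain JDK XPath API rather than EclipseLink Moxy itself. The XML layout, the CallEvent class, and the field names are all hypothetical; the point is only that a few XPath expressions can select a subset of a nested document and copy it into a flat bean.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

// Hypothetical flat event bean; the field names are illustrative only.
class CallEvent {
    final String customerId;
    final String city;

    CallEvent(String customerId, String city) {
        this.customerId = customerId;
        this.city = city;
    }
}

class FlattenSketch {
    // Selects two nested values with XPath and ignores the rest of the document.
    static CallEvent flatten(String xml) throws XPathExpressionException {
        XPath xpath = XPathFactory.newInstance().newXPath();
        String id = xpath.evaluate("/call/customer/@id",
                new InputSource(new StringReader(xml)));
        String city = xpath.evaluate("/call/customer/address/city/text()",
                new InputSource(new StringReader(xml)));
        return new CallEvent(id, city);
    }
}
```

With Moxy, equivalent XPath expressions would live in the external metadata file instead of in Java code, so neither the schema nor the event class needs to change.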

You specify EclipseLink Moxy external metadata in XML. Access the schema at: http://www.eclipse.org/eclipselink/xsds/eclipselink_oxm_2_2.xsd.

4.4.2 APIs

The adapter or event bean that requires JAXB functionality obtains the functionality by injection of a bean that implements the com.oracle.cep.mappers.api.Mapper interface. The Mapper interface follows:

public interface Mapper {
  Marshaller createMarshaller() throws MapperException;
  Unmarshaller createUnmarshaller() throws MapperException;
}

The adapter or other EPN component code uses the injected bean to create marshalling and unmarshalling objects. The com.oracle.cep.mappers.api.Marshaller and com.oracle.cep.mappers.api.Unmarshaller interfaces shown below work for most applications.

public interface Marshaller { 
  void marshal(Object object, javax.xml.transform.Result result) 
               throws MapperException;
 }

public interface Unmarshaller {
  Object unmarshal(javax.xml.transform.Source source) 
                   throws MapperException;
}
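As a rough sketch of how a component might use an injected mapper, the following class receives a Mapper through a setter and unmarshals one XML payload. The stand-in interfaces only mirror the signatures shown above so that the sketch compiles without the product JARs; the XmlToEventBean class, its setter name, and the shape of MapperException are assumptions made for illustration.

```java
import java.io.StringReader;
import javax.xml.transform.Result;
import javax.xml.transform.Source;
import javax.xml.transform.stream.StreamSource;

// Stand-ins that mirror the interfaces shown above. In an application,
// import the real types from com.oracle.cep.mappers.api instead.
class MapperException extends Exception {
    MapperException(String message, Throwable cause) { super(message, cause); }
}

interface Marshaller {
    void marshal(Object object, Result result) throws MapperException;
}

interface Unmarshaller {
    Object unmarshal(Source source) throws MapperException;
}

interface Mapper {
    Marshaller createMarshaller() throws MapperException;
    Unmarshaller createUnmarshaller() throws MapperException;
}

// Hypothetical EPN component that receives the mapper bean by setter injection.
class XmlToEventBean {
    private Mapper mapper;

    public void setMapper(Mapper mapper) {
        this.mapper = mapper;
    }

    // Wraps the XML text in a Source and asks the mapper to build the event object.
    Object toEvent(String xml) throws MapperException {
        Unmarshaller unmarshaller = mapper.createUnmarshaller();
        return unmarshaller.unmarshal(new StreamSource(new StringReader(xml)));
    }
}
```

The component never refers to JAXB directly; it depends only on the Mapper contract, which is what lets the mapping details stay in configuration.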

Some applications might need specialized method signatures for marshalling and unmarshalling, such as an unmarshal method that takes the target class as an argument. In these cases, use the com.oracle.cep.mappers.jaxb.JAXBMarshallerImpl and com.oracle.cep.mappers.jaxb.JAXBUnmarshallerImpl interfaces instead. These interfaces provide methods that correspond to the full set of marshal and unmarshal methods that are supported by the javax.xml.bind.Marshaller and javax.xml.bind.Unmarshaller interfaces.

Assembly File

The following assembly file entries call a mapper bean with properties to specify the event type and the metadata file.

<bean id="mapperBean" class="com.oracle.cep.mappers.jaxb.JAXBMapperImpl" >
  <property name="eventTypeName" value="CallCenterActivity" />
  <property name="metadata" value="external_metadata_case1.xml" />
</bean>

If you want to call a factory to make the mapper bean, specify the following for the bean element:

<bean id="mapperBean" class="com.oracle.cep.mappers.jaxb.JAXBMapperFactory"
      factory-method="create"/>

Configuration File

The following configuration file entries specify properties for the mapper bean.

<jaxb-mapper>
  <name>mapperBean</name>
  <event-type-name>CallCenterActivity</event-type-name>
  <metadata>external_metadata_case1.xml</metadata>
</jaxb-mapper>

Properties

A mapper bean supports properties. All of the properties except metadataMap can be configured as assembly file properties or as elements in the configuration file.

Table 4-1 Mapper Bean Properties and Elements

| Assembly File Property Name | Configuration File Element Name | Description |
|---|---|---|
| eventTypeName | event-type-name | The name of an event type registered by the application in the event type repository. The event type corresponds to a Java class. The package name of this class is used as the context path when initializing the JAXBContext represented by the mapper bean. The configuration must specify either an eventTypeName or a contextPath to be used in constructing the context path for the JAXBContext represented by the mapper. The packages must exist on the classpath of the application and contain either schema-generated classes, JAXB-annotated classes, or classes referenced by Moxy external metadata. The classes are used as the Java object graph for marshalling and unmarshalling operations. See context-path in Schema Reference for Oracle Stream Analytics for more information. |
| contextPath | context-path | A colon-separated list of Java package names. The specified context path is used to initialize the JAXBContext represented by the mapper bean. The configuration must specify either an eventTypeName or a contextPath to be used in constructing the context path for the JAXBContext represented by the mapper. The packages must exist on the classpath of the application and contain either schema-generated classes, JAXB-annotated classes, or classes referenced by Moxy external metadata. The classes are used as the Java object graph for marshalling and unmarshalling operations. |
| validate | validate | Boolean value that defaults to false. When true, you must provide the schema. Schema validation occurs during marshalling and unmarshalling. |
| schema | schema | The file name of the XML schema file used for validation. Package the schema file with the application in the META-INF/wlevs/mappers/jaxb directory. |
| metadata | metadata | The name of the file that contains the EclipseLink Moxy external metadata for mapping customization. Package the file with the application in the META-INF/wlevs/mappers/jaxb directory. |
| metadataMap | N/A | The Spring <map> element that contains one entry that corresponds to each component of the contextPath. For each entry, the key is the package name from the contextPath and the corresponding value is the name of a file that contains the EclipseLink Moxy external metadata for that package. |

If the application uses EclipseLink Moxy-specific external metadata, the location of the metadata is specified by either the metadata property or the metadataMap property. The metadataMap property is required when there is more than one package on the contextPath. There is no support for specifying the metadataMap property in a component configuration file.

4.5 CSV Adapters

CSV adapters handle inbound and outbound data that is separated by commas. Use a CSVInbound adapter to accept data in the form of comma-separated values entering the EPN, and use a CSVOutbound adapter to send data in comma-separated values out of the EPN.

You can test an Oracle Stream Analytics application that uses CSV inbound adapters with the load generator provided in your Oracle Stream Analytics installation. The load generator reads an ASCII file that contains sample data. You must use the CSV inbound adapter because it is coded to decipher the data packets generated by the load generator. See Load Generator and the csvgen Adapter.

Note:

With the java.sql.Timestamp type, the CSV adapter reads and writes data in the format yyyy-mm-dd'T'hh:mm:ss[.fffffffff]. For example, "2012-12-12T12:12:12.120".
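If you need to prepare or check test data in this format, the following minimal sketch converts between the text form and java.sql.Timestamp using only JDK classes. It illustrates the format and is not the adapter's own parsing code; the class and method names are hypothetical.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

class CsvTimestampSketch {
    // Parses text such as 2012-12-12T12:12:12.120 into a java.sql.Timestamp.
    // The fractional seconds are optional, as in the documented format.
    static Timestamp parse(String text) {
        return Timestamp.valueOf(LocalDateTime.parse(text));
    }

    // Renders a Timestamp with the 'T' separator between date and time.
    // Seconds are always written; a fraction appears only when it is nonzero.
    static String format(Timestamp timestamp) {
        return DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(timestamp.toLocalDateTime());
    }
}
```

Note that Timestamp.toString() separates the date and time with a space rather than 'T', which is why the sketch formats through LocalDateTime instead.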

The best way to create CSV adapters is through the Oracle JDeveloper components window. The following assembly and configuration files generated by Oracle JDeveloper show the CSV inbound and outbound adapter configurations.

Assembly File

The inbound CSV adapter translates data read from the StockData.csv file into an event with the TradeEvent event type.

The wlevs:listener element specifies the component that listens to the inbound CSV adapter for TradeEvent activity. In this example, the listener is AdapterOutputChannel. The AdapterOutputChannel component listens for and receives trade events from StockTradeCSVInboundAdapter to send to the next stage.

<wlevs:adapter id="StockTradeCSVInboundAdapter" provider="csv-inbound">
  <wlevs:listener ref="AdapterOutputChannel"/>
  <wlevs:instance-property name="eventType" value="TradeEvent"/>
  <wlevs:instance-property name="sourceUrl"
      value="file:/scratch/mpawlan/oep9-19/oep/utils/load-generator/StockData.csv"/>
</wlevs:adapter>

The outbound adapter assembly file configuration is similar to the inbound adapter, but includes an append property. When append is set to true, Oracle Stream Analytics appends data to an existing output file. When append is set to false, Oracle Stream Analytics creates a new file or overwrites an existing file of the same name.

<wlevs:adapter id="StockTradeCSVOutboundAdapter" provider="csv-outbound">
  <wlevs:instance-property name="eventType" value="TradeEvent"/>
  <wlevs:instance-property name="outputFile"
      value="/scratch/mpawlan/oep9-19/oep/utils/load-generator/StockData.csv"/>
  <wlevs:instance-property name="append" value="false"/>
</wlevs:adapter>

You can provide an absolute or relative path for the outputFile value. For a relative path, you can specify ../filename.csv, ./result.csv, or upload/result.csv. When you specify a relative path, make sure that the abstract path includes the parent directory. For example, in UNIX, specify a file in the current directory as ./result.csv instead of simply result.csv.
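One way to see what "includes the parent directory" means is to look at how java.io.File treats these path strings. The sketch below assumes that the adapter relies on java.io.File semantics, which this chapter does not confirm; the class and method names are hypothetical.

```java
import java.io.File;

class OutputPathSketch {
    // Returns the parent component of a path string,
    // or null when the abstract path names no parent directory.
    static String parentOf(String path) {
        return new File(path).getParent();
    }
}
```

Under that assumption, parentOf("result.csv") returns null because the bare file name carries no parent, whereas "./result.csv", "../filename.csv", and "upload/result.csv" each yield a parent directory.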

Configuration File

The adapter elements in the configuration file show the adapter name attribute and its value. The adapter name must match the adapter id attribute in the assembly file.

<csv-adapter>
  <name>StockTradeCSVInboundAdapter</name>
  <event-interval units="nanoseconds">5</event-interval>
</csv-adapter>
<csv-adapter>
  <name>StockTradeCSVOutboundAdapter</name>
</csv-adapter>

4.6 EDN Adapters

Event Delivery Network (EDN) inbound and outbound adapters use JAXB to enable an EPN to interface with an Oracle SOA Suite event network.

The EDN adapters have a raw-xml-content configuration element that specifies whether to represent the EDN XML data transmission as raw XML (if true) or as a Java object using JAXB. In the JAXB case, the adapter expects the Oracle Stream Analytics application bundle to include the appropriate set of schema (xjc) generated classes on its class path.

You configure an EDN adapter with an event type and a reference to an EDL file. During initialization, the adapter searches the EDL file for an event definition QName that matches the configured event type. If the configured event type is found in the EDL file, the adapter registers a subscription with EDN for the corresponding QName.

To learn how to use EDN adapters and an EDL file, see Create an Application with EDN Adapter in Getting Started with Event Processing for Oracle Stream Analytics.

4.6.1 Usage

Use an EDNInbound adapter to receive incoming data from the Oracle SOA Suite event network. The EDN input adapter subscribes to a specified EDN event type and converts the incoming EDN events to an Oracle Stream Analytics event type for processing by an Oracle Stream Analytics application.

Use an EDNOutbound adapter to send outbound data to the Oracle SOA Suite event network. The EDN output adapter converts the Oracle Stream Analytics events into corresponding EDN events and publishes them to the EDN. The published events can be new events that originate in the Oracle Stream Analytics application or EDN events that were received by the EDN input adapter, processed by the Oracle Stream Analytics application, and sent to the EDN output adapter.

4.6.2 Create EDN Adapters

The best way to create EDN adapters is through the Oracle JDeveloper components window. The following assembly and configuration files generated by Oracle JDeveloper show the EDN inbound and outbound adapter configurations.

See edn-adapter in Schema Reference for Oracle Stream Analytics for information about the EDN adapter.

Assembly File

The following assembly file entries show the elements and attribute settings for the inbound and outbound EDN adapters created in the Fraud Detection walkthrough from Fraud Detection Application in Getting Started with Event Processing for Oracle Stream Analytics.

  • The EDN inbound adapter receives events of type FraudCheckRequest and passes them to its listener, ednInputChannel.

  • The EDN outbound adapter publishes events of type FraudCheckResponse to the EDN.

<wlevs:adapter id="edn-inbound-adapter" provider="edn-inbound">
  <wlevs:listener ref="ednInputChannel"/>
  <wlevs:instance-property name="eventType" value="FraudCheckRequest"/>
</wlevs:adapter>

<wlevs:adapter id="edn-outbound-adapter" provider="edn-outbound">
  <wlevs:instance-property name="eventType" value="FraudCheckResponse"/>
</wlevs:adapter>

Configuration File

The following configuration file entries show the configuration settings for the inbound and outbound EDN adapters created in the Fraud Detection walkthrough from Fraud Detection Application in Getting Started with Event Processing for Oracle Stream Analytics.

Note:

You must put the EDL and schema (xsd) files in the fixed path of the bundled JAR file.

<edn-adapter>
  <name>edn-outbound-adapter</name>
  <edl-file>FraudCheckEvent.edl</edl-file>
  <validate>false</validate>
  <raw-xml-content>false</raw-xml-content>
  <jndi-provider-url>t3://localhost:7101</jndi-provider-url>
  <jndi-factory>weblogic.jndi.WLInitialContextFactory</jndi-factory>
  <user>weblogic</user>
  <password>welcome1</password>
</edn-adapter>

<edn-adapter>
  <name>edn-inbound-adapter</name>
  <edl-file>FraudCheckEvent.edl</edl-file>
  <schema-file>FraudCheckType.xsd</schema-file>
  <validate>false</validate>
  <raw-xml-content>false</raw-xml-content>
  <jndi-provider-url>t3://localhost:7101</jndi-provider-url>
  <jndi-factory>weblogic.jndi.WLInitialContextFactory</jndi-factory>
  <user>weblogic</user>
  <password>welcome1</password>
</edn-adapter>

4.7 File Adapter

The File adapter reads data from a file into the EPN and converts the data to an event.

Oracle JDeveloper does not currently provide a component for the File adapter. However, you can create a File adapter by adding entries to the assembly and configuration files for your Oracle Stream Analytics application.

Assembly File

The path property provides the location of the input file. As the adapter reads the data from the input file, it converts the incoming data to an event of type OrderArrivalEvent. There is an initialDelay of 5000 nanoseconds before the File adapter starts to read the file. The downstream OrderArrival channel listens for events of type OrderArrivalEvent.

<wlevs:adapter id="inputAdapter" provider="file" >
        <wlevs:instance-property name="path"
            value="@wlevs.domain.home@/inpOrderArrival.txt"/>
        <wlevs:instance-property name="eventType" value="OrderArrivalEvent"/>
        <wlevs:instance-property name="initialDelay" value="5000"/>
        <wlevs:listener ref="OrderArrival"/>
</wlevs:adapter>

Configuration File

 <adapter>
    <name>inputAdapter</name>
 </adapter>

4.8 HTTP Publish-Subscribe Adapter

Use the HTTP Publisher adapter to send JavaScript Object Notation (JSON) event data out of the EPN to a web-based user interface. Use the HTTP Subscriber adapter to accept JSON event data entering the EPN. The inbound JSON event data comes from an HTTP server where user actions generate events.

The HTTP Publish-Subscribe server in Oracle Stream Analytics is based on the Bayeux protocol that is proposed by the cometd project. The Bayeux protocol defines a contract between the client and the server for communicating with asynchronous messages over HTTP.

You can create a remote or a local HTTP Publisher adapter, and a remote HTTP Subscriber adapter. Whether an HTTP adapter is local or remote is determined by the local or remote URL you supply to the required <server-url> child element.

The best way to create HTTP publish and subscribe adapters is to use Oracle JDeveloper.

Note:

Byte arrays are not supported as property types in event types used with the HTTP Publish and Subscribe adapters.

Assembly File

<wlevs:adapter id="http-pub-adapter" provider="httppub"/>
<wlevs:adapter id="http-sub-adapter" provider="httpsub" />

Configuration File

For every local and remote adapter, provide a URL to the server in the server-url property. The server can be an Oracle Stream Analytics server, a WebLogic Server instance, or any third-party HTTP Publish-Subscribe server.

For every local adapter for publishing, add a server-context-path element to specify the path to the local HTTP Publish-Subscribe server associated with the Oracle Stream Analytics instance hosting the current Oracle Stream Analytics application.

By default, each Oracle Stream Analytics server is configured with an HTTP Publish-Subscribe server with path /pubsub. If you have created a new local HTTP Publish-Subscribe server or changed the default configuration, then specify the location of the server in the server file. In the file specify the http-pubsub element path value with the location of the server. You can locate the server file in your Oracle Stream Analytics installation at: /Oracle/Middleware/my_oep/examples/domains/<my_domain>/defaultserver.

The channel child element specifies the channel that the adapter publishes or subscribes to.

<http-pub-sub-adapter>
  <name>http-pub-adapter</name>
  <server-url>http://myhost.com:9102/pubsub</server-url>
  <channel>/channel2</channel>
  <event-type>com.mycompany.httppubsub.PubsubEvent</event-type>
  <user>wlevs</user>
  <password>wlevs</password>
</http-pub-sub-adapter>

<http-pub-sub-adapter>
  <name>http-sub-adapter</name>
  <server-url>http://myhost.com:9102/pubsub</server-url>
  <channel>/channel2</channel>
  <event-type>com.mycompany.httppubsub.PubsubEvent</event-type>
</http-pub-sub-adapter>

4.9 HTTP Publish-Subscribe Adapter Custom Converter Bean

The HTTP Publish-Subscribe adapter converts incoming JavaScript Object Notation (JSON) messages to event types and back again. To customize the way inbound and outbound JSON messages are converted to an event type and back, create a custom converter bean.

4.9.1 Bayeux Protocol

The HTTP Publish-Subscribe (pub-sub) server is based on the Bayeux protocol that is proposed by the cometd project. The Bayeux protocol defines a contract between the client and the server for communicating with asynchronous messages over HTTP. The pub-sub server can communicate with any client that understands the Bayeux protocol.

You can develop your web client with the following frameworks:

  • Dojo JavaScript library that supports the Bayeux protocol. Oracle Stream Analytics does not provide this library. You can find information about it at: http://dojotoolkit.org/.

  • WebLogic Workshop Flex plug-in that enables development of a Flex client that uses the Bayeux protocol to communicate with a pub-sub server.

For information about securing an HTTP pub-sub server channel, see HTTP Publish-Subscribe Server in Administering Oracle Stream Analytics.

4.9.2 Create a Custom Converter Bean

A custom converter bean is a Java class that implements the following interfaces:

  • InboundMessageConverter interface to convert inbound JSON messages to events.

  • OutboundMessageConverter interface to convert events to JSON messages.

See the Java API Reference for Oracle Stream Analytics for a full description of these APIs.

Inbound HTTP Pub-Sub JSON Message

The custom converter bean for an inbound HTTP pub-sub JSON message implements the com.bea.wlevs.adapters.httppubsub.api.InboundMessageConverter interface. This interface has only the convert method:

public List convert(JSONObject message) throws Exception;

The message parameter is the inbound HTTP pub-sub message in JSON format. The return value is a List of events to pass to the next node in the EPN.

Outbound HTTP Pub-Sub JSON Message

The custom converter bean for an outbound HTTP pub-sub message implements the com.bea.wlevs.adapters.httppubsub.api.OutboundMessageConverter interface. This interface has only the convert method:

public List<JSONObject> convert(Object event) throws Exception;

The event parameter is an event received by the outbound HTTP pub-sub adapter from the source node in the EPN. The return value is a List of JSON messages.

Example

The following example shows a custom converter bean that implements both the InboundMessageConverter and OutboundMessageConverter interfaces. You can use this bean for both inbound and outbound HTTP pub-sub adapters.

Note:

You can use the GSON Java library to convert Java objects to JSON format. For more information, see http://www.json.org and http://code.google.com/p/google-gson.

package com.sample.httppubsub;

import com.bea.wlevs.adapters.httppubsub.api.InboundMessageConverter;
import com.bea.wlevs.adapters.httppubsub.api.OutboundMessageConverter;
import com.bea.httppubsub.json.JSONObject;
import java.util.List;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

// PubsubTestEvent is the application's event class; it is not imported,
// so it must reside in this package.
public class TestConverter implements InboundMessageConverter, OutboundMessageConverter {

  // Inbound: wrap the JSON message text in a single PubsubTestEvent.
  public List convert(JSONObject message) throws Exception {
    List eventCollection = new ArrayList();
    PubsubTestEvent event = new PubsubTestEvent();
    event.setMessage("From TestConverter: " + message);
    eventCollection.add(event);
    return eventCollection;
  }

  // Outbound: copy the event's message into a JSON object with one "message" entry.
  public List<JSONObject> convert(Object event) throws Exception {
    List<JSONObject> list = new ArrayList<JSONObject>(1);
    Map map = new HashMap();
    map.put("message", ((PubsubTestEvent) event).getMessage());
    list.add(new JSONObject(map));
    return list;
  }
}

4.10 JMS Adapters

Use JMS adapters to connect the Java Message Service (JMS) with an Oracle Stream Analytics EPN to receive and send JMS messages.

The Oracle Stream Analytics JMS adapters support any JMS service provider that provides a Java client that is compliant with Java EE.

The JMS Inbound adapter converts the incoming JMS messages to Oracle Stream Analytics events and the JMS outbound adapter converts Oracle Stream Analytics events to JMS messages. You can customize the inbound conversion by writing your own Java class. See Custom Adapters in Customizing Oracle Stream Analytics.

The best way to create JMS adapters is through the Oracle JDeveloper components window. The following assembly and configuration files generated by Oracle JDeveloper show the JMS inbound and outbound adapter configurations.

Note:

An exception that occurs in the MessageConverter object associated with an outbound JMS adapter does not cause the underlying JMS transaction to roll back. If the exception occurs outside of the MessageConverter object within the outbound JMS adapter, then an existing JMS transaction is rolled back.

4.10.1 Service Providers

Oracle Stream Analytics is tested against the following service providers:

  • WebLogic T3 Client, which is a Java RMI client that uses Oracle T3 protocol to communicate with Oracle WebLogic Server.

  • Versions 10.0, 10.3, and 10.3.1 of Oracle WebLogic Server JMS

  • The current version of Tibco EMS JMS

If the service provider you want to use is not in the list, you can configure Oracle Stream Analytics JMS adapters for use with your service provider by contacting your service provider and getting the jndi-provider-url and jndi-factory information needed for the jms-adapter configuration.

4.10.2 Inbound Adapter Configuration

Assembly File

<wlevs:adapter id="jms-inbound-adapter" provider="jms-inbound" />

Configuration File

The inbound adapter converts incoming JMS messages to events of type TradeEvent. The JNDI factory and provider URL are weblogic.jndi.WLInitialContextFactory and t3://localhost:7101. The adapter looks up the JMS connection factory under the JNDI name JNDIName and receives messages from the JMS destination with the JNDI name Queue1. After the adapter converts a JMS message to an event, it sends the event to the next stage in the EPN.

The optional connection-jndi-name element provides the JNDI name of the JMS connection factory. The required destination-jndi-name element provides the JNDI name of the JMS destination. The session-transacted element when false indicates that the session is not transactional.

  <jms-adapter>
    <name>jms-inbound-adapter</name>
    <event-type>TradeEvent</event-type>
    <jndi-provider-url>t3://localhost:7101</jndi-provider-url>
    <jndi-factory>weblogic.jndi.WLInitialContextFactory</jndi-factory>
    <connection-jndi-name>JNDIName</connection-jndi-name>
    <destination-jndi-name>Queue1</destination-jndi-name>
    <session-transacted>false</session-transacted>
  </jms-adapter>

4.10.2.1 Single and Multithreaded Inbound JMS Adapters

By default, an inbound JMS adapter is single-threaded. When the inbound JMS adapter is single-threaded, event order is guaranteed.

To improve scalability, you can configure an inbound JMS adapter to use multiple threads to read messages from the JMS destination. When the inbound JMS adapter is multi-threaded, event order is not guaranteed. To use multiple threads, configure the adapter with a work manager with the work-manager child element. You can specify a dedicated work manager to be used only by the adapter, or you can share a work manager among several components such as other adapters and Jetty.
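The ordering trade-off can be illustrated with an ordinary JDK thread pool. This is only an analogy, not the adapter's implementation or its work manager API: the class and method names are hypothetical, and the "messages" are plain integers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class OrderingSketch {
    // Hands messages 0..count-1 to a pool of the given size and
    // records the order in which the worker threads process them.
    static List<Integer> process(int threads, int count) throws InterruptedException {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < count; i++) {
            final int message = i;
            pool.execute(() -> seen.add(message));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return seen;
    }
}
```

With one thread, the recorded order always matches the submission order. With several threads, every message is still processed, but the recorded order can differ from run to run, so downstream logic that depends on event order should stay with the single-threaded default.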

4.10.2.2 Configure a JMS Adapter for Durable Subscriptions

You can configure an inbound JMS adapter to be a client in a durable subscription to a JMS topic. A durable subscription ensures that the adapter receives published messages even when the adapter becomes inactive. When the inbound adapter connects to the JMS server, it registers the durable subscription and subsequent messages sent to the topic are retained during periods when the subscriber is disconnected (unless they expire) and delivered when the subscriber reconnects.

A durable subscription assumes that the publisher that is publishing JMS messages to the topic is using the persistent delivery mode. Note that the publisher might be the Oracle Stream Analytics outbound JMS adapter, in which case its delivery-mode value must be persistent (the default value).

Create a Durable Subscription in the Adapter

  1. Ensure that the JMS message publisher is delivering messages in persistent mode.

  2. Specify a client ID for the connection factory. On Oracle WebLogic Server, the client ID can be set on the connection factory administratively with the console. You should have a dedicated connection factory configured for each adapter instance that is using durable subscribers.

  3. Set the following three jms-adapter properties:

    • destination-type to TOPIC.

    • durable-subscription to true.

    • durable-subscription-name to a unique subscription identifier.

4.10.3 Outbound Adapter Configuration

The outbound JMS adapter converts events into a JMS map message and sends the JMS message to a JMS destination. You can also customize this conversion by writing your own Java class to specify exactly how you want the event types to be converted into outgoing JMS messages. See Custom Adapters in Customizing Oracle Stream Analytics.

Assembly File

<wlevs:adapter id="jms-outbound-adapter" provider="jms-outbound"/>

Configuration File

The JMS outbound adapter configuration is almost the same as the JMS inbound adapter configuration. The outbound adapter specifies a JMS destination and provides a user name and password to access the JMS destination. In this example, the session is transacted and the delivery mode is nonpersistent.

  <jms-adapter>
    <name>jms-outbound-adapter</name>
    <event-type>TradeEvent</event-type>
    <jndi-provider-url>t3://localhost:7101</jndi-provider-url>
    <jndi-factory>weblogic.jndi.WLInitialContextFactory</jndi-factory>
    <connection-jndi-name>Topic</connection-jndi-name>
    <destination-jndi-name>Queue2</destination-jndi-name>
    <user>weblogic</user>
    <password>welcome1</password>
    <session-transacted>true</session-transacted>
    <delivery-mode>nonpersistent</delivery-mode>
  </jms-adapter>

4.11 JMS Custom Message Converter Bean

To customize the conversion between JMS messages and event types, create inbound and outbound converter beans and package them with your Oracle Stream Analytics Application.

4.11.1 Implement Interfaces

The inbound and outbound converter beans implement the following two interfaces. See the Java API Reference for Oracle Stream Analytics for a full description of these APIs.

  • Inbound: com.bea.wlevs.adapters.jms.api.InboundMessageConverter. You have to implement its convert method. The return value is a List of events to be passed downstream.

    public List convert(Message message) 
         throws MessageConverterException, JMSException;
    

    message parameter: Corresponds to the incoming JMS message.

  • Outbound: com.bea.wlevs.adapters.jms.api.OutboundMessageConverter interface. You have to implement its convert method. The return value is a List of JMS messages.

    public List<Message> convert(Session session, Object event) 
         throws MessageConverterException, JMSException;
    

    session parameter: The javax.jms.Session to use to create the messages.

    event parameter: An event received by the outbound JMS adapter from the source stage in the EPN.

4.11.2 Implement the Inbound JMS Adapter

This example shows you how to implement the convert method for the inbound JMS adapter.

  1. In Oracle JDeveloper, add a Java class to your application project.
  2. Implement the com.bea.wlevs.adapters.jms.api.InboundMessageConverter interface.

    The following example shows a possible implementation.

    package com.customer;

    import com.bea.wlevs.adapters.jms.api.InboundMessageConverter;
    import com.bea.wlevs.adapters.jms.api.MessageConverterException;
    import com.bea.wlevs.adapters.jms.api.OutboundMessageConverter;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import java.util.ArrayList;
    import java.util.List;

    // Converts between JMS text messages and TestEvent events in both directions.
    // TestEvent is the application's event class in this package (not shown).
    public class MessageConverter implements InboundMessageConverter,
        OutboundMessageConverter {

        // Inbound: copy the text of the incoming JMS message into a single event.
        public List convert(Message message) throws MessageConverterException, JMSException {
            TestEvent event = new TestEvent();
            TextMessage textMessage = (TextMessage) message;
            event.setString_1(textMessage.getText());
            List<TestEvent> events = new ArrayList<TestEvent>(1);
            events.add(event);
            return events;
        }

        // Outbound: build one JMS text message from the event received from the EPN.
        public List<Message> convert(Session session, Object inputEvent)
            throws MessageConverterException, JMSException {
            TestEvent event = (TestEvent) inputEvent;
            TextMessage message = session.createTextMessage(
                "Text message: " + event.getString_1()
            );
            List<Message> messages = new ArrayList<Message>();
            messages.add(message);
            return messages;
        }
    }
    
  3. Specify the converter in your application EPN assembly file.
    • Register the converter class using a bean element.

    • Associate the converter class with the JMS adapter by adding a wlevs:instance-property with name set to converterBean and ref set to the id of the bean.

    The following example shows how to register and associate the converter class.

    ...
        <bean id="myConverter" class="com.customer.MessageConverter"/>
        <wlevs:adapter id="jmsInbound" provider="jms-inbound">
            <wlevs:instance-property name="converterBean" ref="myConverter"/>
            <wlevs:listener ref="mySink"/>
        </wlevs:adapter>
    

4.11.3 Implement the Outbound JMS Adapter

This example shows you how to implement the convert method for the outbound JMS adapter.

  1. Using Oracle JDeveloper (or your preferred IDE), add a Java class to your application project.
  2. Implement the com.bea.wlevs.adapters.jms.api.OutboundMessageConverter interface.

    The following example shows a possible implementation.

    package com.customer;

    import com.bea.wlevs.adapters.jms.api.InboundMessageConverter;
    import com.bea.wlevs.adapters.jms.api.MessageConverterException;
    import com.bea.wlevs.adapters.jms.api.OutboundMessageConverter;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import java.util.ArrayList;
    import java.util.List;

    // Converts between JMS text messages and TestEvent events in both directions.
    // TestEvent is the application's event class in this package (not shown).
    public class MessageConverter implements InboundMessageConverter,
        OutboundMessageConverter {

        // Inbound: copy the text of the incoming JMS message into a single event.
        public List convert(Message message) throws MessageConverterException, JMSException {
            TestEvent event = new TestEvent();
            TextMessage textMessage = (TextMessage) message;
            event.setString_1(textMessage.getText());
            List<TestEvent> events = new ArrayList<TestEvent>(1);
            events.add(event);
            return events;
        }

        // Outbound: build one JMS text message from the event received from the EPN.
        public List<Message> convert(Session session, Object inputEvent)
            throws MessageConverterException, JMSException {
            TestEvent event = (TestEvent) inputEvent;
            TextMessage message = session.createTextMessage(
                "Text message: " + event.getString_1()
            );
            List<Message> messages = new ArrayList<Message>();
            messages.add(message);
            return messages;
        }
    }
    
  3. Specify the converter in your application EPN assembly file.
    • Register the converter class using a bean element.

    • Associate the converter class with the JMS adapter by adding a wlevs:instance-property with name set to converterBean and ref set to the id of the bean.

    The following example shows how to register and associate the converter class.

    <bean id="myConverter" class="com.customer.MessageConverter"/>
    <wlevs:adapter id="jmsOutbound" provider="jms-outbound">
      <wlevs:instance-property name="converterBean" ref="myConverter"/>
    </wlevs:adapter>
    

4.12 Oracle Business Rules Adapter

The Oracle Business Rules (OBR) adapter is an event bean that wraps the business rules engine from the Oracle Business Rules product. The OBR adapter lets you assert and retract events as facts to trigger business rules.

You can configure OBR rules to generate events and add business logic to an Oracle CQL processor downstream to process the events. For example, you can invoke StreamSender.sendInsertEvent within the rules file to send data out of an OBR adapter as an event.

Oracle JDeveloper does not provide a drag-and-drop component for the assembly file or the EPN diagram, but it does provide a drag-and-drop component for the configuration file.

You can create an OBR adapter by adding entries to the assembly file and by dragging and dropping the OBR adapter into the configuration file. For more information about creating OBR adapters by adding entries, see the OBR documentation at: http://www.oracle.com/technetwork/middleware/business-rules/documentation/index.html.

Assembly File

The event-type-repository element specifies the event type repository for the application. In the following example, the repository has a single event type named HelloWorldEvent and is implemented by the HelloWorldEvent.java class.

The adapter element that follows has the id helloworldAdapter and a class attribute set to the HelloWorldAdapter Java class, from which the adapter is created. Its configuration includes a message property with the specified message text, which the HelloWorldAdapter class prints during application execution. In this example, the HelloWorldAdapter class is the event source.

The OBR adapter declaration comes after the channel and Oracle CQL processor configurations: <wlevs:adapter id="OBRAdapter" provider="obr"> followed by the decisionFunction and dictionaryUrl properties. The dictionaryUrl property is the path to the OBR dictionary file that contains the rules, and the decisionFunction property is the OBR function you want to use. In this example, decisionFunction is set to handler1, which is the handle that other components use to access this information.

Note:

The OBR adapter does not handle automatic Fact retraction. If the upstream processor outputs a stream, retract the Fact in the rule file when appropriate or when the last rule is triggered according to rule priority.

At the bottom is the HelloWorldBean configuration. The HelloWorldBean is a Java class that instantiates the HelloWorldEvent and HelloWorldAdapter classes.

<wlevs:event-type-repository>
  <wlevs:event-type type-name="HelloWorldEvent">
    <wlevs:class>com.bea.wlevs.event.example.helloworld.HelloWorldEvent
    </wlevs:class>
  </wlevs:event-type>
</wlevs:event-type-repository>
<wlevs:adapter id="helloworldAdapter"
  class="com.bea.wlevs.adapter.example.helloworld.HelloWorldAdapter" >
  <wlevs:instance-property name="message" value="HelloWorld - The time is:"/>
</wlevs:adapter>
 
<wlevs:channel id="helloworldInputChannel" event-type="HelloWorldEvent" >
  <wlevs:listener ref="helloworldProcessor"/>
  <wlevs:source ref="helloworldAdapter"/>
</wlevs:channel>
 
<wlevs:processor id="helloworldProcessor" />
 
<wlevs:channel id="helloworldOutputChannel" event-type="HelloWorldEvent"
  advertise="true" max-threads="0" max-size="0" >
  <wlevs:listener ref="OBRAdapter"/>
  <wlevs:source ref="helloworldProcessor"/>
</wlevs:channel>

<wlevs:adapter id="OBRAdapter" provider="obr">
  <wlevs:instance-property name="decisionFunction" value="handler1" />
  <wlevs:instance-property name="dictionaryUrl" value="file:helloworld.rules"/>
  <wlevs:listener ref="OutputBean"/>
</wlevs:adapter>
    
<wlevs:event-bean id="OutputBean"
  class="com.bea.wlevs.example.helloworld.HelloWorldBean">
</wlevs:event-bean>

Configuration File

The configuration file declares the Oracle CQL processor and the query rules used to process the HelloWorldEvent received from the OBR adapter. It also provides the OBR adapter handler (handler1) for accessing the OBR rules. The dictionary-url element specifies the path to the OBR dictionary file that contains the rules and decision function you want to use. The decision-function element specifies the name of the OBR decision function you want to use.

<processor>
  <name>helloworldProcessor</name>
  <rules>
    <query id="helloworldRule">
      <![CDATA[ select * from helloworldInputChannel[range 10 slide 5] ]]>
       select * from helloworldInputChannel[now]
    </query>
  </rules>
</processor>
<obr-adapter>
  <name>OBRAdapter</name>
  <dictionary-url>file:helloworld.rules</dictionary-url>
  <decision-function>handler1</decision-function>
</obr-adapter>

4.13 QuickFix Adapter

The QuickFix Adapter is a full-featured messaging engine for handling the real-time electronic exchange of securities transactions according to the Financial Information eXchange (FIX) standard.

The QuickFix adapter listens for FIX messages, converts the FIX messages to Oracle Stream Analytics events, and sends the events to the next stage for processing. See Sample Application with QuickFix Adapter in Getting Started with Event Processing for Oracle Stream Analytics for an example application.

The QuickFix adapter supports all of the configuration data that the underlying QuickFIX engine provides. For information about FIX configuration data, see the "QuickFIX/J User Manual" at http://www.quickfixj.org/documentation/.

Note:

Oracle Stream Analytics does not support QuickFIX dynamic acceptor sessions in the 12c release.

The QuickFix adapter does not support SSL.

4.13.1 Supported QuickFIX Versions and Unsupported Message Types

The QuickFIX engine supports a wide range of message types and validates the data in those message types. If your Oracle Stream Analytics application requires a message that is not supported by the QuickFIX engine, you must create a custom handler by extending the default QuickFIX message handler and overriding the appropriate handler methods.

The QuickFIX adapter supports the following QuickFIX versions. The messages listed under each QuickFIX version are not supported.

QuickFIX Version 4.0

The following message types are not supported in this version:

Heartbeat, Logon, TestRequest, ResendRequest, Reject, SequenceReset, Logout.

QuickFIX Version 4.1

The following message types are not supported in this version:

Heartbeat, Logon, TestRequest, ResendRequest, Reject, SequenceReset, Logout.

QuickFIX Version 4.2

The following message types are not supported in this version:

Heartbeat, Logon, TestRequest, ResendRequest, Reject, SequenceReset, Logout.

QuickFIX Version 4.3

The following message types are not supported in this version:

Heartbeat, Logon, TestRequest, ResendRequest, Reject, SequenceReset, Logout.

QuickFIX Version 4.4

The following message types are not supported in this version:

Heartbeat, Logon, TestRequest, ResendRequest, Reject, SequenceReset, Logout.

4.13.2 Configure QuickFix Adapters

Oracle JDeveloper does not currently provide a component for the QuickFix adapter. However, you can create a QuickFix adapter by adding entries to the assembly and configuration files for your Oracle Stream Analytics application.

Assembly File

<wlevs:adapter id="QuickFIXInbound" provider="quickfix-inbound"/>

Configuration File

The configuration requires at least one default-session element and can have zero or more session elements. A session element represents a group of configuration settings used for a particular QuickFix session. If more than one session is required, group the common configuration settings in the default-session element. By default, every session element inherits all of the elements declared in the default-session element.

The BeginString value in the config-name element indicates the FIX message version used. BeginString data is not validated by the QuickFIX engine.

According to the documentation at http://quickfixj.org/quickfixj/usermanual/1.5.3/usage/configuration.html, the required and supported identifier fields are BeginString, SenderCompID and TargetCompID.

Other fields such as SenderSubID, SenderLocationID, TargetSubID and TargetLocationID are not mandatory and are not supported in release 12.1.3.

<quickfix-adapter>
    <name>quickfixAdapter</name>
    <event-type>MyConfigEvent</event-type>
    <default-session description="default configuration">
      ...
      <configuration description="identifier-message format">
        <config-name>BeginString</config-name>
        <config-value>FIXT.0.1</config-value>
      </configuration>
    </default-session>
    <session description="ordertracker configuration">
      <configuration description="identifier-acceptor">
        <config-name>SenderCompID</config-name>
        <config-value>QA</config-value>
      </configuration>
      <configuration description="identifier-initiator">
        <config-name>TargetCompID</config-name>
        <config-value>ORACLE</config-value>
      </configuration>
    </session>
</quickfix-adapter> 

4.13.3 Configure a Socket-Based Acceptor Failover

You can configure a simple failover scheme for socket-based acceptors by running two acceptor processes with a shared MessageStore. One process is the active acceptor and the other process is the standby for any session. When one acceptor process dies, the client, which has been configured with failover addresses, attempts to log on to the other acceptor. Upon successful logon, the message store for that session refreshes and the failover session continues normally.

The following configuration settings are required for a successful failover scheme.

RefreshMessageStoreAtLogon=Y
SocketConnectPort1=8392
SocketConnectHost1=8.8.8.8
SocketConnectPort2=2932
SocketConnectHost2=12.12.12.12
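In the adapter configuration file, these settings could be supplied as configuration entries. The following is a sketch only, assuming that failover settings use the same config-name and config-value pairs shown in Configure QuickFix Adapters; the description attribute values are illustrative.

```xml
<default-session description="failover settings">
  <!-- Refresh the shared message store when the failover session logs on -->
  <configuration description="refresh message store at logon">
    <config-name>RefreshMessageStoreAtLogon</config-name>
    <config-value>Y</config-value>
  </configuration>
  <!-- First failover address -->
  <configuration description="failover port 1">
    <config-name>SocketConnectPort1</config-name>
    <config-value>8392</config-value>
  </configuration>
  <configuration description="failover host 1">
    <config-name>SocketConnectHost1</config-name>
    <config-value>8.8.8.8</config-value>
  </configuration>
  <!-- Second failover address -->
  <configuration description="failover port 2">
    <config-name>SocketConnectPort2</config-name>
    <config-value>2932</config-value>
  </configuration>
  <configuration description="failover host 2">
    <config-name>SocketConnectHost2</config-name>
    <config-value>12.12.12.12</config-value>
  </configuration>
</default-session>
```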

4.14 REST Adapter

The Representational State Transfer (REST) inbound adapter receives HTTP POST data from an external client through the HTTP protocol. A REST adapter can accept data in XML, CSV, and JavaScript Object Notation (JSON) and convert that data into the Oracle Stream Analytics event configured on the inbound REST adapter.

To convert data to events, the REST adapter requires a Java Architecture for XML Binding (JAXB) mapper and a CSV mapper. A mapper is a JavaBean class that implements the marshalling and unmarshalling of the incoming data.

Oracle JDeveloper does not currently provide a component for the REST adapter. However, you can create a REST adapter by adding entries to the assembly and configuration files for your Oracle Stream Analytics application.

Assembly File

The following assembly file shows the settings for an inbound REST adapter that handles input data of type XML, CSV, and JSON.

<bean id="xmlMapperBean" class="com.oracle.cep.mappers.jaxb.JAXBMapperFactory"
   factory-method="create" />
<bean id="csvMapperBean" class="com.oracle.cep.mappers.csv.CSVMapper" />

<bean id="jsonMapperBean"
  class="com.oracle.cep.mappers.jaxb.JAXBMapperFactory"
  factory-method="create" />
<wlevs:adapter id="restInbound" provider="rest-inbound">
  <wlevs:instance-property name="mapper" ref="xmlMapperBean" />
  <wlevs:instance-property name="csvMapper" ref="csvMapperBean" />
  <wlevs:instance-property name="eventTypeName" value="CallCenterActivity" />
  <wlevs:instance-property name="contextPath" value="/testhttpadapter" />
</wlevs:adapter>

The following assembly file shows the settings for an outbound REST adapter that marshals an event to the XML or JSON content types.

<wlevs:adapter id="restXmlOutbound" provider="rest-outbound">
  <wlevs:instance-property name="mapper" ref="xmlMapperBean" />
  <wlevs:instance-property name="url"
    value="http://localhost:9002/testadapter" />
</wlevs:adapter>

<wlevs:adapter id="restJsonOutbound" provider="rest-outbound">
  <wlevs:instance-property name="mapper" ref="jsonMapperBean" />
  <wlevs:instance-property name="url"
    value="http://localhost:9002/testadapter" />
</wlevs:adapter>

Configuration File

The following configuration file shows the rest-adapter configuration for receiving POST data, and the jaxb-mapper configuration for handling incoming XML and JSON data.

<rest-adapter>
  <name>restInbound</name>
  <event-type-name>CallCenterActivity</event-type-name>
  <context-path>/testhttpadapter</context-path>
</rest-adapter>
<jaxb-mapper>
  <name>xmlMapperBean</name>
  <event-type-name>CallCenterActivity</event-type-name>
  <metadata>external_metadata_case1.xml</metadata>
</jaxb-mapper>
<json-mapper>
  <name>jsonMapperBean</name>
  <event-type-name>CallCenterActivity</event-type-name>
  <media-type>application/json</media-type>
</json-mapper>

The following configuration file shows the settings for an outbound REST adapter that marshals an event to the XML or JSON content types.

<rest-adapter>
  <name>restXmlOutbound</name>
  <url>http://localhost:9002/testrestadapter</url>
</rest-adapter>

<rest-adapter>
  <name>restJsonOutbound</name>
  <url>http://localhost:9002/testrestadapter</url>
</rest-adapter>
 
<jaxb-mapper>
  <name>xmlMapperBean</name>
  <event-type-name>CallCenterActivity</event-type-name>
  <metadata>external_metadata_case1.xml</metadata>
</jaxb-mapper>
 
<json-mapper>
  <name>jsonMapperBean</name>
  <event-type-name>CallCenterActivity</event-type-name>
  <media-type>application/json</media-type>
</json-mapper> 

Note:

To support the XML content type in the REST inbound and outbound adapters, use the XML mapper. Adding XML annotations or automatically generating a JAXB bindings file is not supported in this release.

4.15 RMI Adapters

Use the RMI Inbound and Outbound adapters to read event information from and write event information to an RMI connection. The best way to create RMI adapters is through the Oracle JDeveloper components window.

The following assembly and configuration files generated by Oracle JDeveloper show the RMI inbound and outbound adapter configurations.

Note:

The RMI client connection cannot be closed. See question F1 at http://docs.oracle.com/javase/8/docs/technotes/guides/rmi/faq.html.

Assembly File

The inbound RMI adapter has a JNDI name to enable inbound clients to locate the EPN.

<wlevs:adapter id="rmi-inbound-adapter" provider="rmi-inbound">
  <wlevs:instance-property name="jndiName"
    value="TradeReportApplication.TradeReport/rmi-inbound-adapter"/>
</wlevs:adapter>
<wlevs:adapter id="rmi-outbound-adapter" provider="rmi-outbound"/>

Configuration File

The JNDI name enables the RMI outbound adapter to locate the output resource for the event data. The JNDI provider enables directory service implementations to be plugged into the JNDI framework.

In this example, the JNDI provider is the default Oracle WebLogic T3 client. Oracle WebLogic T3 clients are Java RMI clients that use the Oracle T3 protocol to communicate with Oracle WebLogic Server. T3 clients typically outperform other client types.

<rmi-adapter>
  <name>rmi-outbound-adapter</name>
  <jndi-name>RMIOutboundJNDIName</jndi-name>
  <jndi-provider-url>t3://localhost:7001</jndi-provider-url>
  <jndi-factory>weblogic.jndi.WLInitialContextFactory</jndi-factory>
</rmi-adapter>

4.16 Twitter Adapter

You can use the inbound Twitter adapter to read tweets.

The inbound adapter reads tweets from Twitter using the Twitter API and sends these tweets to the processor through the inbound channel. The adapter also filters tweets based on filter options such as hashtag, users, and language. You supply the values for these filters in the inbound adapter configuration.

The Twitter adapters enable you to create an EPN diagram with a Twitter Inbound Adapter and connect it to any Outbound Adapter.

4.16.1 Configuration of the Twitter Adapter

You need to do minimal configuration for both the inbound and outbound adapters.

<wlevs:instance-property name="oauthConsumerKey" value="XXXXXXXXXXXX" />
<wlevs:instance-property name="oauthConsumerSecret" value="XXXXXXXXXXX" />
<wlevs:instance-property name="oauthAccessToken" value="XXXX-XXXXXXX" />
<wlevs:instance-property name="oauthAccessTokenSecret" value="XXXXXXXXX" />
<wlevs:instance-property name="httpProxyHost" value="www-proxy.us.oracle.com" />
<wlevs:instance-property name="httpProxyPort" value="80" />

4.16.2 Dependencies for the Twitter Adapter

Twitter adapters depend on the following twitter4j libraries.

  • twitter4j-core

  • twitter4j-async

  • twitter4j-stream

4.16.3 Twitter Inbound Adapter

The Twitter inbound adapter provides filter streaming mode to receive tweets from Twitter.

Use comma-separated values to specify multiple values for a filter option:

  • Filter: Filters tweets based on the filter options provided when the adapter is configured. The following filter options are available.

    • hashtagsToTrack: Specify the hashtag to track as a string, without the # symbol.

    • usersToTrack: Specify a long Twitter userID or the user handle to track user tweets.

    • languagesToTrack: Specify language codes to filter specific language tweets.

      Note:

      At least one of the hashtagsToTrack or usersToTrack properties must be specified at the time the adapter is configured.

      Remember

      The logical operator OR is used when you specify both hashtagsToTrack and usersToTrack. For more information, see https://dev.twitter.com/streaming/reference/post/statuses/filter.
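As a sketch of the comma-separated form, the following instance properties track two hashtags, two users, and two languages. The second hashtag and the second user ID are made-up placeholders, and the values are written without spaces around the commas as a cautious assumption. The properties belong inside the wlevs:adapter element of the inbound Twitter adapter.

```xml
<wlevs:instance-property name="streamingMode" value="filter" />
<!-- Hashtags are given without the # symbol; "Cricket" is a placeholder -->
<wlevs:instance-property name="hashtagsToTrack" value="IndvsSA,Cricket" />
<!-- The second user ID is a placeholder -->
<wlevs:instance-property name="usersToTrack" value="338674755,123456789" />
<!-- English and Hindi, using the language codes from Table 4-2 -->
<wlevs:instance-property name="languagesToTrack" value="en,hi" />
```

Because hashtagsToTrack and usersToTrack are combined with OR, this configuration matches tweets that contain either hashtag or that come from either user.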

Note:

The Twitter API has API Rate Limits. For more information, see https://dev.twitter.com/rest/public/rate-limiting.

Supported Languages

Languages supported by Twitter for Websites widgets and buttons are listed in the following table:

Table 4-2 Supported Languages and Language Codes

Name                      Language Code

English (default)         en
Arabic                    ar
Bengali                   bn
Czech                     cs
Danish                    da
German                    de
Greek                     el
Spanish                   es
Persian                   fa
Finnish                   fi
Filipino                  fil
French                    fr
Hebrew                    he
Hindi                     hi
Hungarian                 hu
Indonesian                id
Italian                   it
Japanese                  ja
Korean                    ko
Malay                     msa
Dutch                     nl
Norwegian                 no
Polish                    pl
Portuguese                pt
Romanian                  ro
Russian                   ru
Swedish                   sv
Thai                      th
Turkish                   tr
Ukrainian                 uk
Urdu                      ur
Vietnamese                vi
Chinese (Simplified)      zh-cn
Chinese (Traditional)     zh-tw

Note:

According to the Twitter website, the zh-cn language code is for Simplified Chinese and zh-tw is for Traditional Chinese. However, the Twitter metadata defines only zh for Chinese, so the Twitter inbound adapter uses the zh language code for both Simplified Chinese and Traditional Chinese.

For more information, see https://dev.twitter.com/web/overview/languages.

4.16.4 Example with the Filter Option

This example is a context file that uses the filter option.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:osgi="http://www.springframework.org/schema/osgi"
xmlns:wlevs="http://www.bea.com/ns/wlevs/spring"
xmlns:jdbc="http://www.oracle.com/ns/ocep/jdbc/"
xmlns:hadoop="http://www.oracle.com/ns/oep/hadoop/"
xmlns:nosqldb="http://www.oracle.com/ns/oep/nosqldb/"
xsi:schemaLocation=" http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/osgi
http://www.springframework.org/schema/osgi/spring-osgi.xsd
http://www.bea.com/ns/wlevs/spring
http://www.bea.com/ns/wlevs/spring/ocep-epn.xsd
http://www.oracle.com/ns/ocep/jdbc
http://www.oracle.com/ns/ocep/jdbc/ocep-jdbc.xsd
http://www.oracle.com/ns/oep/hadoop
http://www.oracle.com/ns/oep/hadoop/oep-hadoop.xsd
http://www.oracle.com/ns/oep/nosqldb
http://www.oracle.com/ns/oep/nosqldb/oep-nosqldb.xsd"> 
  <wlevs:event-type-repository>
    <wlevs:event-type type-name="tweetEvent">
      <wlevs:class>com.bea.wlevs.sample.Tweet</wlevs:class>
    </wlevs:event-type>
  </wlevs:event-type-repository>
  <wlevs:adapter id="twitteradapter" provider="twitter-inbound">
    <wlevs:listener ref="inboundchannel"/>
    <wlevs:instance-property name="streamingMode" value="filter" />
    <wlevs:instance-property name="hashtagsToTrack" value="IndvsSA" />
    <wlevs:instance-property name="usersToTrack" value="338674755" />
    <wlevs:instance-property name="languagesToTrack" value="en"/>
    <wlevs:instance-property name="oauthConsumerKey" value="XXXXXX" />
    <wlevs:instance-property name="oauthConsumerSecret" value="XXXXXX" />
    <wlevs:instance-property name="oauthAccessToken" value="XXXX-XXXX" />
    <wlevs:instance-property name="oauthAccessTokenSecret" value="XXXXXXXX" />
    <wlevs:instance-property name="httpProxyHost" value="www-proxy.us.oracle.com" />
    <wlevs:instance-property name="httpProxyPort" value="80" />
  </wlevs:adapter>
  <wlevs:channel id="inboundchannel" event-type="tweetEvent">
    <wlevs:listener ref="tweetprocessor"/>
  </wlevs:channel>
  <wlevs:processor id="tweetprocessor"/>
  <wlevs:channel id="outboundchannel" event-type="tweetEvent">
    <wlevs:listener ref="twitteroutboundadapter"/>
    <wlevs:source ref="tweetprocessor"/>
  </wlevs:channel>
  <wlevs:adapter id="twitteroutboundadapter" class="com.bea.wlevs.sample.OutputBean" advertise="true"/>
</beans>

4.17 MQTT Adapter

The MQTT Inbound/Outbound adapter receives any MQTT messages from the topics set up on the MQTT broker on the configured host, converts them, and sends them downstream.

The MQTT broker provides the infrastructure for the topics used in the message flow. The inbound adapter receives the MQTT messages and converts them to the Oracle Stream Analytics event type. The converted events are sent downstream, where they are processed into a complex event or filtered by an Oracle CQL query, and are then output to the Oracle Stream Analytics sink. You can provide multiple broker URLs in the serverURIs list. In that case, the MQTT adapter uses the first broker in the list to which it is able to connect. The adapter passes these URIs to MqttConnectOptions.setServerURIs(), which is used to build the connection with the broker.

When an attempt to connect is initiated the client starts with the first serverURI in the list and works through the list until a connection is established with a server. If a connection cannot be made to any of the servers then the connect attempt fails. Specifying a list of servers that a client may connect to has several uses:

  • High availability and reliable message delivery:

    • Some MQTT servers support a high availability feature where two or more "equal" MQTT servers share state.

    • An MQTT client can connect to any of the "equal" servers and be assured that messages are reliably delivered and durable subscriptions are maintained no matter which server the client connects to.

    • The cleansession flag must be set to false if durable subscriptions and/or reliable message delivery is required.

  • Hunt list:

    • A set of servers may be specified that are not "equal" (as in the high availability option).

    • As no state is shared across the servers, reliable message delivery and durable subscriptions are not valid.

    • The cleansession flag must be set to true if the hunt list mode is used.
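For the high availability use described above, an inbound adapter entry in the assembly file might look like the following. This is a sketch under stated assumptions: this part of the chapter does not show an MQTT assembly entry, so the provider value mqtt-inbound and the use of wlevs:instance-property elements are assumed by analogy with the other adapters, and the adapter id, broker hosts, topic, client ID, and listener are placeholders. The property names come from the MQTT Inbound Adapter Configuration Parameters table.

```xml
<!-- Assumption: the provider name and property wiring are illustrative, not confirmed here -->
<wlevs:adapter id="mqttInbound" provider="mqtt-inbound">
  <!-- Brokers are tried in list order until one accepts the connection -->
  <wlevs:instance-property name="serverURIs"
    value="tcp://broker1.example.com:1883,tcp://broker2.example.com:1883" />
  <wlevs:instance-property name="topicName" value="sensors/#" />
  <!-- A fixed client ID lets the brokers recognize the same subscriber after a reconnect -->
  <wlevs:instance-property name="clientId" value="osa-mqtt-client-1" />
  <!-- false keeps session state, as required for durable subscriptions -->
  <wlevs:instance-property name="cleanSession" value="false" />
  <wlevs:listener ref="mqttChannel" />
</wlevs:adapter>
```

For a hunt list of unrelated brokers, the same sketch would set cleanSession to true, because no state is shared between the servers.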

Dependencies

The main external dependency is on the MQTT client jar file:

org.eclipse.paho.mqtt-client-0.4.0.jar

This supports MQTT 3.1.1.

Installation

There are no special installation requirements for the MQTT adapter. This is similar to any other adapter jar. The jar name is com.oracle.cep.adapters.mqtt.jar.

The installer places this jar in the <INSTALL_HOME>/oep/modules folder. To make it part of the server, you must add it to the bundleloader.xml file.

<bundle>
        <startlevel>3</startlevel>
        <location>oep/modules</location>
        <name>com.oracle.cep.adapters.mqtt.jar</name>
</bundle>

The OSGi bundle starts at level 3.

Inbound Adapter

The Inbound Adapter converts messages using a built-in mapper or a custom converter. If the message data is CSV, JSON, or XML, a default mapper can be used.

Outbound Adapter

The Outbound Adapter can publish Oracle Stream Analytics events to MQTT topics. The published messages can then be picked up by another inbound adapter or any other system.

You can use the MQTT Inbound/Outbound adapter as part of either EPN or Oracle Stream Analytics applications.

Note:

The MQTT broker must be installed to use the MQTT Adapter.

4.17.1 MQTT Configuration Parameters

Both the Inbound and Outbound adapters have parameters that must be configured. These tables indicate which are required and which are optional.

MQTT Inbound Configuration

The configuration parameters for the Inbound MQTT Adapter are as follows:

Table 4-3 MQTT Inbound Adapter Configuration Parameters

Property Name Data Type Required? Description

serverURIs

java.lang.String

Yes

Comma separated list of server URIs that the client may connect to. Example:

tcp://iot.eclipse.org:1883

topicName

java.lang.String

Yes

Comma separated list of topics that the client will subscribe to. Each value can include wildcards.

qualityOfService

java.lang.String

No

Comma-separated list of QoS values, one for each of the specified topics. Valid values are 0 (At Most Once), 1 (At Least Once), and 2 (Exactly Once). If nothing is specified, it defaults to 1.

loadBalanceEnabled

java.lang.Boolean

No

Set this property to true to force every member in the cluster to work collectively to handle the load. By default, this property is false.

clientId

java.lang.String

No

The unique client identifier. If not specified, it will be generated automatically.

persistenceType

java.lang.String

No

The persistent data store for the in-flight messages, enabling the delivery to the QoS specified. Valid values are memory and file. If nothing is specified, it defaults to memory.

directory

java.lang.String

Yes

If the persistenceType property is set to "file", then this property is required. It specifies which folder must be used to store messages.

mapper

com.oracle.cep.mappers.api.Mapper

No

Mapper object used to convert the messages from the topic to event types. Normally, SX/OEP developers will set up instances of the built-in mappers such as CSV, JSON and XML, but the adapter can handle custom mapper implementations.

eventTypeName

java.lang.String

"Yes"

If the mapper property is set to use a CSV mapper, then this property is required. It specifies which event type is being handled.

converter

oracle.ateam.sx.adapters.mqtt.api.InboundConverter

No

Converter object used to convert the messages from the topic to event types. Converters give developers a way to implement their own conversion logic. They also allow reading message metadata such as QoS, duplicate, and retained.

cleanSession

java.lang.Boolean

No

Specifies whether the client and server should remember their state across restarts and reconnects. It defaults to true which means that the subscriptions are non-durable.

userName

java.lang.String

No

The user name to use for the connection.

password

java.lang.String

No

The password to use for the connection.

sslContextProvider

java.lang.String

No

The SSL context provider to use for the connection.

sslKeyStore

java.lang.String

No

The SSL key store to use for the connection.

sslKeyStorePassword

java.lang.String

No

The SSL key store password to use for the connection.

sslKeyStoreType

java.lang.String

No

The SSL key store type to use for the connection.

sslKeyStoreProvider

java.lang.String

No

The SSL key store provider to use for the connection.

sslTrustStore

java.lang.String

No

The SSL trust store to use for the connection.

sslTrustStorePassword

java.lang.String

No

The SSL trust store password to use for the connection.

sslTrustStoreType

java.lang.String

No

The SSL trust store type to use for the connection.

sslTrustStoreProvider

java.lang.String

No

The SSL trust store provider to use for the connection.

autoReconnect

java.lang.Boolean

No

Specifies whether the adapter should reconnect automatically if the connection is lost. It defaults to true.

connectionTimeout

java.lang.Integer

No

The connection timeout value. This value, measured in seconds, defines the maximum time interval the client will wait for the network connection to the MQTT server to be established. The default value is 30 seconds.

keepAliveInterval

java.lang.Integer

No

The "keep alive" interval. This value, measured in seconds, defines the maximum time interval between messages sent or received. It enables the client to detect if the server is no longer available, without having to wait for the TCP/IP timeout. The default value is 60 seconds.

connectionWaitTime

java.lang.Integer

No

The amount of time, measured in seconds, to wait until a connection can be established. It allows time for the adapter to connect properly. The default value is 1 second.

connectionAttempts

java.lang.Integer

No

The number of times the adapter tries to connect. The default value is 3 attempts. If set to zero ("0"), it tries forever.

connectionDelay

java.lang.Integer

No

The amount of time, measured in seconds, to wait after a connection attempt fails. This delay applies to every connection attempt. The default value is 5 seconds.
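The connectionAttempts and connectionDelay parameters together describe a retry loop. The following is a minimal Java sketch of that loop, not the adapter's actual implementation. It assumes that the delay is applied after each failed attempt and that the loop gives up by returning -1. The class and method names are hypothetical and are not part of the adapter API.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper that mimics the documented retry parameters. */
final class ConnectRetrySketch {

    /**
     * Tries to connect up to maxAttempts times (0 means retry forever),
     * sleeping delaySeconds after every failed attempt.
     * Returns the number of attempts used, or -1 if no attempt succeeded.
     */
    static int connectWithRetry(Callable<Boolean> connect, int maxAttempts, int delaySeconds) {
        int attempt = 0;
        while (maxAttempts == 0 || attempt < maxAttempts) {
            attempt++;
            boolean connected;
            try {
                connected = connect.call();
            } catch (Exception e) {
                connected = false; // any exception counts as a failed attempt
            }
            if (connected) {
                return attempt;
            }
            try {
                Thread.sleep(delaySeconds * 1000L); // plays the role of connectionDelay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated broker that accepts the third connection attempt.
        int used = connectWithRetry(() -> ++calls[0] >= 3, 3, 0);
        System.out.println("Connected after " + used + " attempt(s)");
    }
}
```

With the documented defaults (3 attempts, 5-second delay), a loop of this shape would give up roughly 15 seconds after the first failed attempt, not counting the time spent in each attempt.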

MQTT Outbound Configuration

The configuration parameters for the Outbound MQTT Adapter are as follows:

Table 4-4 MQTT Outbound Adapter Configuration Parameters

Property Name Data Type Required? Description

serverURIs

java.lang.String

Yes

Comma-separated list of server URIs that the client may connect to. Example:

tcp://iot.eclipse.org:1883

topicName

java.lang.String

Yes

Comma-separated list of topics that the client will publish to.

qualityOfService

java.lang.String

No

Comma-separated list of QoS values, one for each of the specified topics. Valid values are 0 (At Most Once), 1 (At Least Once), and 2 (Exactly Once). If nothing is specified, it defaults to 1.

clientId

java.lang.String

No

The unique client identifier. If not specified, it will be generated automatically.

persistenceType

java.lang.String

No

The persistent data store for the in-flight messages, enabling the delivery to the QoS specified. Valid values are: memory and file. If nothing is specified, it defaults to memory.

directory

java.lang.String

Yes

If the persistenceType property is set to "file", then this property is required. It specifies which folder must be used to store messages.

mapper

com.oracle.cep.mappers.api.Mapper

No

Mapper object used to convert the event types into messages. Normally, SX/OEP developers will set up instances of the built-in mappers such as JSON and XML, but the adapter can handle any type of JAXB-based mapper provided.

converter

oracle.ateam.sx.adapters.mqtt.api.OutboundConverter

No

Converter object used to convert the event types into messages. Converters give developers a way to implement their own conversion logic. They also allow writing message metadata such as QoS, duplicate, and retained.

cleanSession

java.lang.Boolean

No

Specifies whether the client and server should remember their state across restarts and reconnects. It defaults to true which means that the subscriptions are non-durable.

userName

java.lang.String

No

The user name to use for the connection.

password

java.lang.String

No

The password to use for the connection.

sslContextProvider

java.lang.String

No

The SSL context provider to use for the connection.

sslKeyStore

java.lang.String

No

The SSL key store to use for the connection.

sslKeyStorePassword

java.lang.String

No

The SSL key store password to use for the connection.

sslKeyStoreType

java.lang.String

No

The SSL key store type to use for the connection.

sslKeyStoreProvider

java.lang.String

No

The SSL key store provider to use for the connection.

sslTrustStore

java.lang.String

No

The SSL trust store to use for the connection.

sslTrustStorePassword

java.lang.String

No

The SSL trust store password to use for the connection.

sslTrustStoreType

java.lang.String

No

The SSL trust store type to use for the connection.

sslTrustStoreProvider

java.lang.String

No

The SSL trust store provider to use for the connection.

autoReconnect

java.lang.Boolean

No

Specifies whether the adapter should reconnect automatically if the connection is lost. It defaults to true.

connectionTimeout

java.lang.Integer

No

The connection timeout value. This value, measured in seconds, defines the maximum time interval the client will wait for the network connection to the MQTT server to be established. The default value is 30 seconds.

keepAliveInterval

java.lang.Integer

No

The "keep alive" interval. This value, measured in seconds, defines the maximum time interval between messages sent or received. It enables the client to detect if the server is no longer available, without having to wait for the TCP/IP timeout. The default value is 60 seconds.

connectionWaitTime

java.lang.Integer

No

The amount of time, measured in seconds, to wait until a connection can be established. It allows time for the adapter to connect properly. The default value is 1 second.

4.17.2 MQTT Receiver EPN

This sample EPN demonstrates how to use an MQTT inbound adapter and subscribe to MQTT topics.

This EPN subscribes to a topic named temperatures on the MQTT Broker. The MQTT Broker runs on the specified server URIs. Specifying more than one server URI demonstrates the high availability feature: if one of the servers fails, the adapter safely switches over to the other server.

For an Inbound Adapter, add provider="mqtt-inbound" to the adapter definition in the context file.

<wlevs:adapter id="mqttInbound" provider="mqtt-inbound">
        <wlevs:listener ref="inboundTemperatures"/>
        <wlevs:instance-property name="serverURIs" value="tcp://10.191.195.134:1883,tcp://10.191.211.254:1884" />
        <wlevs:instance-property name="topicName" value="temperatures" />
        <wlevs:instance-property name="qualityOfService" value="0" />
        <wlevs:instance-property name="mapper" ref="jsonMapper" />
</wlevs:adapter>

4.17.3 MQTT Sender EPN

This sample EPN demonstrates how to use an MQTT outbound adapter and publish events to MQTT topics.

This EPN publishes to a topic named temperatures on the MQTT Broker. The MQTT Broker runs on the specified server URIs. Specifying more than one server URI demonstrates the high availability feature: if one of the servers fails, the adapter safely switches over to the other server.

For an Outbound Adapter, add provider="mqtt-outbound" to the adapter definition in the context file.

<wlevs:adapter id="mqttOutbound" provider="mqtt-outbound">
        <wlevs:instance-property name="serverURIs" value="tcp://10.191.195.134:1883,tcp://10.191.211.254:1884" />
        <wlevs:instance-property name="topicName" value="temperatures" />
        <wlevs:instance-property name="qualityOfService" value="0" />
        <wlevs:instance-property name="mapper" ref="jsonMapper" />
</wlevs:adapter>

4.18 Kafka Adapter

You can create EPN diagrams with any available inbound adapter, including the Kafka inbound adapter or a custom inbound adapter, and connect it to a Kafka outbound adapter.

The Kafka inbound adapter can read messages from one or multiple Kafka topics. It converts these messages to events using mappers such as CSV, JSON, or XML. The Kafka outbound adapter can publish events to Kafka topics by converting them to messages using mappers such as JSON and XML.

The eventTypeName property in the Spring file is required only when the CSV mapper is chosen as the mapper strategy. It points to the kafka-inbound and kafka-outbound adapter config.

Whenever JSON and XML mappers are configured, they are associated with event types using Java classes. Tuple events (declaratively defined) can be used if these mappers are not used. The following is sample context content:

<wlevs:event-type-repository>
        <wlevs:event-type type-name="sx-34-2-Explore_kafka_with_bigdecimal-2">
                <wlevs:class>
                        oracle.wlevs.strex.generated.sx_34_2_Explore_kafka_with_bigdecimal_2
                </wlevs:class>
        </wlevs:event-type>
        <wlevs:event-type type-name="sx-34-2-Explore_kafka_with_bigdecimal-3">
                <wlevs:properties>
                        <wlevs:property name="sensorId" type="java.lang.String"/>
                        <wlevs:property name="sensorValue" type="bigdecimal"/>
                </wlevs:properties>
        </wlevs:event-type>
        <wlevs:event-type type-name="sx-34-2-Explore_kafka_with_bigdecimal-1">
                <wlevs:properties>
                        <wlevs:property name="sensorId" type="java.lang.String"/>
                        <wlevs:property name="sensorValue" type="bigdecimal"/>
                </wlevs:properties>
        </wlevs:event-type>
</wlevs:event-type-repository>
<bean class="com.oracle.cep.mappers.csv.CSVMapper" id="csvMapperBean"/>
<bean class="com.oracle.cep.mappers.jaxb.JAXBMapperFactory" factory-method="create" id="jsonMapperBean">
        <property name="eventTypeName" value="sx-34-2-Explore_kafka_with_bigdecimal-2"/>
        <property name="mediaType" value="application/json"/>
</bean>

Dependencies

Kafka inbound and outbound adapters depend on the following libraries.

Table 4-5 Kafka Adapter Dependencies

Library Version Library name

Apache Kafka

2.10-0.8.2.1

kafka_2.10-0.8.2.1.jar

Apache kafka-clients

0.8.2.1

kafka-clients-0.8.2.1.jar

Metrics Core Library

2.2.0

metrics-core-2.2.0.jar

Scala Standard Library

2.10.4

scala-library-2.10.4.jar

ZooKeeper

3.4.6

zookeeper-3.4.6.jar

ZkClient

0.3

zkclient-0.3.jar

Installation Requirements

Zookeeper and Kafka must be installed and running to use the Oracle Stream Analytics Kafka adapters. The following steps can be used to set up Kafka in a local development Linux environment.

  1. Download Apache Kafka from http://kafka.apache.org/downloads.html.

  2. Extract the contents of the downloaded file into a folder.

  3. Open a terminal window.

  4. In the terminal window, navigate to the bin folder from where you extracted the contents.

  5. Execute ./zookeeper-server-start.sh ../config/zookeeper.properties.

  6. Open another terminal window. This will be the second terminal.

  7. In the terminal window, navigate to the bin folder from where you extracted the contents.

  8. Execute ./kafka-server-start.sh ../config/server.properties.

These steps create a deployment with the following characteristics:

  • One Zookeeper instance running on localhost and listening on port 2181

  • One Kafka broker instance running on localhost and listening on port 9092.

4.18.1 Inbound Adapter receiving messages from Kafka

This is a sample application that subscribes to a topic named stocks and continuously fetches messages.

When a message arrives, it is automatically transformed from the JSON format into the expected event type. A CQL processor sends the events downstream to an output channel. Finally, the output channel sends these events to a custom adapter that writes the contents to the standard output console.

<wlevs:adapter id="kafkaInbound" provider="kafka-inbound">
        <wlevs:listener ref="inboundStockQuotes"/>
        <wlevs:instance-property name="zookeeper" value="localhost:2181" />
        <wlevs:instance-property name="topicName" value="stocks" />
        <wlevs:instance-property name="mapper" ref="jsonMapper" />
</wlevs:adapter>

4.18.2 Outbound Adapter sending messages to Kafka

This is a sample application that continuously creates stock quotes with randomly generated prices. The interval between each generation is one second.

These stock quotes are queried by a CQL processor that sends the events to an output channel. Finally, the output channel sends these events to the Kafka outbound adapter, which writes them to a topic named stocks in JSON format.

<wlevs:adapter id="kafkaOutbound" provider="kafka-outbound">
        <wlevs:instance-property name="bootstrapServers" value="localhost:9091,localhost:9092" />
        <wlevs:instance-property name="topicName" value="stocks" />
        <wlevs:instance-property name="mapper" ref="jsonMapper" />
</wlevs:adapter>

4.18.3 Kafka Configuration

Both the inbound and outbound Kafka adapters must be configured.

The supported adapter properties for Kafka inbound and outbound adapters are listed in the sections below.

Kafka Inbound Adapter Configuration

The configuration parameters for the Inbound Kafka Adapter are as follows:

Table 4-6 Kafka Inbound Adapter Properties

Property Name Data Type Required Description

zookeeper

java.lang.String

Yes

Informs the adapter how to connect to the Zookeeper server. If more than one Zookeeper server exists for HA, then list all of them, separated by commas.

Typical value for this property is: localhost:2181

topicName

java.lang.String

Yes

The name of the Kafka topic.

groupId

java.lang.String

No

Each consumer process must belong to a group. This property specifies which group to bind to. If no value is set in this property, the adapter will automatically generate one, using the following notation: serverName + "-" + appName + "-" + adapterId.

mapper

com.oracle.cep.mappers.api.Mapper

No

Mapper object used to convert the messages from the topic to event types. Normally, Oracle Stream Analytics developers will set up instances of the built-in mappers such as CSV, JSON and XML, but the adapter can handle custom mapper implementations.

eventTypeName

java.lang.String

No

Required only when the CSV mapper is chosen as the mapper strategy. This property should state which event type is being handled.

converter

com.oracle.cep.adapters.kafka.api.InboundConverter

No

Converter object used to convert the messages from the topic to event types. Converters give developers a way to implement their own conversion logic. They also allow reading message metadata such as key, partition, and offset.

consumerThreads

java.lang.Integer

No

To increase message consumption throughput, this property can be adjusted to create more listener threads.

It defaults to one if nothing is specified.

The total number of threads (computed as the sum of all threads within the Oracle Stream Analytics cluster) must keep up with the number of partitions set in the topic on the cluster side.

workManagerName

java.lang.String

No

If no value is specified, the default work manager is used to schedule the threads used to listen for messages from Kafka topics. To change this behavior, this property can define a custom work manager. The value must be the work manager name, the same one set in the config.xml configuration file.

consumerConfigs

java.util.Properties

No

Allows Oracle Stream Analytics developers to set any native Kafka property related to the consumer API. A complete list of possible properties is available at: http://kafka.apache.org/082/documentation.html#consumerconfigs.

Kafka Outbound Adapter Configuration

The configuration parameters for the Outbound Kafka Adapter are as follows:

Table 4-7 Kafka Outbound Adapter Properties

Property Name Data Type Required Description

bootstrapServers

java.lang.String

Yes

Informs the adapter how to connect to the Kafka cluster by listing one or more servers (separated by commas) that perform the initial bootstrap discovery of the members. Typical value for this property is: localhost:9091,localhost:9092

topicName

java.lang.String

Yes

The name of the Kafka topic.

mapper

com.oracle.cep.mappers.api.Mapper

No

Mapper object used to convert the event types into messages. Normally, Oracle Stream Analytics developers will set up instances of the built-in mappers such as JSON and XML, but the adapter can handle any type of JAXB-based mapper provided.

converter

com.oracle.cep.adapters.kafka.api.OutboundConverter

No

Converter object used to convert the event types into messages. Converters give developers a way to implement their own conversion logic. They also allow writing message metadata such as key and partition.

partitionProperty

java.lang.String

No

Specifies which property of the event type holds the partition to which the message will be sent. The value of the partition must be of type java.lang.Integer.

keyProperty

java.lang.String

No

Specifies which property of the event type holds the key that will be used to determine the partition. Developers can choose between byte[] and String for the value of the key, but any other type will throw an error.

producerConfigs

java.util.Properties

No

Allows Oracle Stream Analytics developers to set any native Kafka property related to the producer API. A complete list of possible properties is available at: http://kafka.apache.org/082/documentation.html#producerconfigs.
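The type rules for partitionProperty and keyProperty can be illustrated with a small Java sketch. The event class, its property names (symbol and partition), and the checking helper are all hypothetical. They mirror the rules stated in the table and are not the adapter's own code.

```java
/** Hypothetical event type; the property names are illustrative only. */
final class StockQuote {
    private final String symbol;     // could be named in keyProperty (String or byte[])
    private final Integer partition; // could be named in partitionProperty (java.lang.Integer)

    StockQuote(String symbol, Integer partition) {
        this.symbol = symbol;
        this.partition = partition;
    }

    public String getSymbol() { return symbol; }

    public Integer getPartition() { return partition; }
}

/** Hypothetical checks that mirror the documented type rules. */
final class KafkaRoutingTypes {

    /** A key value is acceptable only if it is a String or a byte[]. */
    static boolean isValidKey(Object key) {
        return key instanceof String || key instanceof byte[];
    }

    /** A partition value is acceptable only if it is a java.lang.Integer. */
    static boolean isValidPartition(Object partition) {
        return partition instanceof Integer;
    }

    public static void main(String[] args) {
        StockQuote quote = new StockQuote("ORCL", 1);
        System.out.println(isValidKey(quote.getSymbol()));
        System.out.println(isValidPartition(quote.getPartition()));
        System.out.println(isValidKey(42L)); // a Long is neither String nor byte[]
    }
}
```

With an event type like this, the adapter configuration would presumably set keyProperty to symbol and partitionProperty to partition.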

4.19 Coherence Adapter

Coherence Cache is an in-memory distributed cache that provides fast access to frequently used data. Event Processing provides the capability to process an incoming stream of events in real time. Each entry in the cache is a key-value pair, and every cache data operation requires the user to specify both the key and the value.

A Coherence cache supports the following operations:

  • INSERT: adds a new {key,value} pair

  • DELETE: deletes an existing {key,value} pair

  • UPDATE: modifies an existing {key,value} pair.

Oracle Stream Analytics provides two out-of-the-box adapters: one listens to the change events of a cache as an incoming data stream, and the other writes an output stream into a cache.

4.19.1 Coherence Inbound Adapter

The Coherence inbound adapter provides the capability to listen to change events from a cache. The adapter registers itself as a listener on the Coherence cache. On every data operation in the cache, the cache notifies the inbound adapter with the change details. The adapter receives the change events, transforms them into Event Processing events, and pushes them downstream for further event processing.

Cache supports the following three types of change events depending on type of data operation:

  • INSERT: to insert a new {key,value} pair into cache

  • DELETE: to remove an existing {key,value} pair from cache

  • UPDATE: to update an existing {key,old-value} pair with {key,new-value} pair.

Note:

Currently, the Coherence inbound adapter listens only to INSERT change events; that is, the adapter listens to all new {key,value} pairs coming into the cache. Any change event generated by an UPDATE or DELETE operation on the cache is ignored by the inbound adapter.

Sequence of Events

The Oracle Stream Analytics Coherence inbound adapter can receive insert events from the cache. There are two methods to insert data into the cache, both inherited from the Map interface: put, which inserts a single entry, and putAll, which inserts a batch of entries.

If the sequence of the inserted events matters to you, use a LinkedHashMap to perform a batch insert operation on the cache. The LinkedHashMap class remembers the order in which entries were inserted, so EventProcessing can receive the events in the same sequence in which they were inserted into the LinkedHashMap.

Example for batch insert operation for the cache using LinkedHashMap:

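import java.util.LinkedHashMap;
import java.util.Map;

// A LinkedHashMap keeps entries in the order in which they were added.
Map<Object, Object> batch = new LinkedHashMap<>();
while (condition)
{
        batch.put(k1, v1);
        batch.put(k2, v2);
        batch.put(k3, v3);
        // ...
}

// Insert the whole batch into the cache in one call.
coherenceCache.putAll(batch);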

The EventProcessing Coherence inbound adapter then receives the events from the cache in a sequence that preserves the original order: k1, k2, k3.
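The ordering guarantee can be checked without a running cache. The following Java sketch uses a hypothetical RecordingCache class as a stand-in for the cache. It is not the Coherence API, and it only records the order in which the entries of a batch reach it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class OrderedBatchInsertSketch {

    /** Stand-in for a cache: remembers the order in which keys arrive. */
    static final class RecordingCache {
        final List<String> arrivalOrder = new ArrayList<>();

        void putAll(Map<String, String> batch) {
            // Iterating a LinkedHashMap visits entries in insertion order.
            for (Map.Entry<String, String> entry : batch.entrySet()) {
                arrivalOrder.add(entry.getKey());
            }
        }
    }

    /** Builds a three-entry batch and returns the order seen by the stand-in cache. */
    static List<String> demo() {
        Map<String, String> batch = new LinkedHashMap<>();
        batch.put("k1", "v1");
        batch.put("k2", "v2");
        batch.put("k3", "v3");

        RecordingCache cache = new RecordingCache();
        cache.putAll(batch);
        return cache.arrivalOrder;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [k1, k2, k3]
    }
}
```

A plain HashMap makes no such ordering promise, which is why the batch is built with a LinkedHashMap.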

Configuration

The following configuration element defines a Coherence inbound adapter in an Oracle Stream Analytics application:

<wlevs:adapter id="coherenceInboundAdapter" provider="coherence-inbound">
        <wlevs:instance-property name="cache" ref="OracleCoherenceCache"/>
        <wlevs:instance-property name="eventType" value="EmployeeEvent"/>
</wlevs:adapter>

where OracleCoherenceCache is a cache element defined already in application EPN configuration. See OracleCoherenceCache and EmployeeEvent.

Configuration Parameters

The configuration element attributes and properties are as follows:

  • cache: specifies the cache whose change events the user wants to listen to. The value must refer to a Coherence cache element that is defined in the EPN configuration. This is a mandatory property of the adapter configuration.

  • eventType: specifies the type of the events that the inbound adapter propagates to the downstream stage. The event type must already be defined in the event type repository of the application. This is also a mandatory property of the adapter configuration.

As soon as a user inserts an entry into the Coherence cache, the cache notifies the listening inbound adapter and passes it the change event. The inbound adapter places the change event in an unbounded buffer queue. The buffer queue lets the adapter return immediately from the callback thread that invokes the map listener interface implemented by the inbound adapter. The adapter dequeues these events in its own thread, converts each received object to an event object of the type given in eventType, and pushes it to the downstream stage.

Note:

The change event (object inserted into cache) should be of type java.util.Map where map entries are attribute name and value pair.
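The buffering described above, together with the java.util.Map shape of the inserted object, can be sketched in Java as follows. This is an illustration under assumptions, not the adapter's source: the class names are hypothetical, the Employee class carries only two of the EmployeeEvent attributes, and a plain method call stands in for the cache's listener callback.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class InboundBufferSketch {

    /** Minimal event with two of the EmployeeEvent attributes. */
    static final class Employee {
        final int id;
        final String name;

        Employee(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Unbounded queue between the cache callback and the adapter's own thread.
    private final BlockingQueue<Map<String, Object>> buffer = new LinkedBlockingQueue<>();

    /** Stands in for the listener callback: enqueue only, so the caller returns quickly. */
    void onInsert(Map<String, Object> change) {
        buffer.add(change);
    }

    int pending() {
        return buffer.size();
    }

    /** Converts a map of attribute names and values into an event object. */
    static Employee convert(Map<String, Object> change) {
        return new Employee((Integer) change.get("id"), (String) change.get("name"));
    }

    /** Blocks until a change is available, then converts it. */
    Employee takeNext() throws InterruptedException {
        return convert(buffer.take());
    }

    public static void main(String[] args) throws InterruptedException {
        InboundBufferSketch adapter = new InboundBufferSketch();
        // The adapter's own thread drains the buffer and "pushes downstream" by printing.
        Thread worker = new Thread(() -> {
            try {
                for (int i = 0; i < 2; i++) {
                    Employee e = adapter.takeNext();
                    System.out.println(e.id + " " + e.name);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        for (int id = 1; id <= 2; id++) {
            Map<String, Object> change = new HashMap<>();
            change.put("id", id);
            change.put("name", "employee-" + id);
            adapter.onInsert(change); // simulated cache notification
        }
        worker.join();
    }
}
```

Because the callback only enqueues, a slow downstream stage delays the worker thread rather than the thread that notified the adapter.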

4.19.2 Coherence Outbound Adapter

The Coherence outbound adapter provides the capability to write events into a Coherence cache. Because a cache is a map and every entry in the cache is a {key, value} pair, the outbound adapter determines the key for every event and puts that event into the cache using the calculated key.

The following configuration element defines a Coherence outbound adapter in an Oracle Stream Analytics application:

<wlevs:adapter id="coherenceOutboundAdapter" provider="coherence-outbound">
        <wlevs:instance-property name="cache" ref="OracleCoherenceCache"/>
        <wlevs:instance-property name="eventType" value="EmployeeEvent"/>
        <wlevs:instance-property name="key" value="id"/>
</wlevs:adapter>

where OracleCoherenceCache is a cache element defined already in application EPN configuration. See OracleCoherenceCache and EmployeeEvent.

Configuration Parameters

The configuration element attributes and properties are as follows:

  • cache: specifies the destination cache to which the user wants to write the output stream. The value must refer to a Coherence cache element that is defined in the EPN configuration. This is a mandatory property of the adapter configuration.

  • eventType: specifies the type of the events that the outbound adapter propagates to the downstream cache. The event type must already be defined in the event type repository of the application. This is also a mandatory property of the adapter configuration.

  • key: specifies the key attribute in the event type definition for the event going out to the destination cache.

After an outgoing event arrives at the outbound adapter, the adapter fetches the value of the key attribute from the event. The adapter then forms a {key,value} pair to write into the destination cache, where the key is the value of the key attribute in the event and the value is an instance of java.util.Map created from the outgoing event object. The outbound adapter also works asynchronously: it maintains a buffer queue in which it stores all outgoing events, and a separate output thread dequeues events from this buffer and pushes them into the Coherence cache. The buffer queue is needed because of the behavior of the cache's put operation: the Coherence cache invokes all map listeners on each put operation, which can increase latency if any map listener on the cache takes significant time on each put operation initiated by the outbound adapter.
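The keying and buffering steps can be sketched in Java as follows. This is an illustration under assumptions, not the adapter's source: the class name is hypothetical, a ConcurrentHashMap stands in for the Coherence cache, events are assumed to be already in java.util.Map form, and writeNext is called inline here where the adapter would use a separate output thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

final class OutboundWriterSketch {

    // Stand-in for the destination cache (does not accept null keys).
    final Map<Object, Map<String, Object>> cache = new ConcurrentHashMap<>();

    // Buffer that decouples the upstream stage from the cache write.
    private final BlockingQueue<Map<String, Object>> buffer = new LinkedBlockingQueue<>();

    private final String keyAttribute;

    OutboundWriterSketch(String keyAttribute) {
        this.keyAttribute = keyAttribute;
    }

    /** Called by the upstream stage: only buffers the event. */
    void onEvent(Map<String, Object> event) {
        buffer.add(event);
    }

    /**
     * Writes one buffered event into the cache.
     * The cache key is the value of the key attribute; the cache value is the event map.
     * Returns false if the buffer was empty.
     */
    boolean writeNext() {
        Map<String, Object> event = buffer.poll();
        if (event == null) {
            return false;
        }
        cache.put(event.get(keyAttribute), event);
        return true;
    }

    public static void main(String[] args) {
        OutboundWriterSketch writer = new OutboundWriterSketch("id");
        Map<String, Object> event = new HashMap<>();
        event.put("id", 42);
        event.put("name", "Ann");
        writer.onEvent(event);
        writer.writeNext();
        System.out.println(writer.cache.get(42).get("name")); // prints Ann
    }
}
```

The constructor argument plays the role of the key property in the adapter configuration, which is set to id in the example above.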

4.19.3 OracleCoherenceCache

OracleCoherenceCache is defined in the EPN configuration file using the wlevs:cache element as follows:

<wlevs:caching-system id="coherenceCachingSystem" provider="coherence">
</wlevs:caching-system>
<wlevs:cache id="OracleCoherenceCache">
        <wlevs:caching-system ref="coherenceCachingSystem"/>
</wlevs:cache>

4.19.4 EmployeeEvent

EmployeeEvent is defined as follows:

public class EmployeeEvent
{
  private int id;
  private String name;
  private String dept;
  private int phone;
  
  public int getId()
  {
    return id;
  }
  public void setId(int id)
  {
    this.id = id;
  }
  public String getName()
  {
    return name;
  }
  public void setName(String name)
  {
    this.name = name;
  }
  public String getDept()
  {
    return dept;
  }
  public void setDept(String dept)
  {
    this.dept = dept;
  }
  public int getPhone()
  {
    return phone;
  }