Custom adapters exchange event data with external components that are not supported by the adapters provided in Oracle Stream Analytics.
See Working with QuickFix Adapter in Developing Applications for Event Processing with Oracle Stream Analytics for information about the adapters provided in Oracle Stream Analytics.
This chapter includes the following sections:
The type of custom adapter you create depends on the format of the incoming data and the technology you use in the adapter code to convert the incoming data to Oracle Stream Analytics events. Custom adapters typically do the following:
Use APIs from a data vendor, such as Reuters, Wombat, or Bloomberg.
Use messaging systems, such as TIBCO Rendezvous.
Establish a socket connection to the customer's own data protocol.
Authenticate themselves as needed with the input or output external component.
Receive raw event data from an external component, convert the data into events, and send the events to the next application node.
Receive events from within the event processing network and convert the event data to a form that is recognized by the target external component.
To implement a custom adapter, write Java code that communicates with the external component. An input custom adapter implementation receives raw event data from an external component, converts the data into events, and sends the events to the next application node. An output custom adapter implementation receives events from within the event processing network and converts the event data to a form that is recognized by the target external component.
The Oracle Stream Analytics APIs for creating custom adapters are described in Java API Reference for Oracle Stream Analytics.
The following high-level steps describe how to create a custom adapter. These topics are discussed in more detail later in this chapter.
Implement a Java class that communicates with the external component.
Implement the interfaces that support sending or receiving event type instances.
To send events, implement the custom adapter as an event source.
To receive events implement the custom adapter as an event sink.
See Java API Reference for Oracle Stream Analytics for information about event sources and sinks.
If the adapter supports being suspended and resumed, such as when it is undeployed and deployed, implement interfaces to handle these events.
Use multithreading to improve application scalability.
For any required authentications, write Java logic to pass login credentials to the component that provides or receives the event data.
Create a factory class when multiple applications access the custom adapter.
Add the adapter to an EPN by configuring it in an assembly file.
The example in this section shows the code for a basic input adapter that retrieves raw event data from a file, converts the data into events, and then sends the events to a downstream stage in the EPN. The code is excerpted from the Oracle Spatial sample application.
Note:
The example code presented here is not production ready, but outlines the basic work flow for an input adapter.
The server calls the following BusStopAdapter
class methods to provide (inject) the information a BusStopAdapter
object needs at run time.
The BusStopAdapter.setRepositoryInstance
method injects an EventTypeRepository
instance that contains the event type configurations for this application. The EventTypeRepository
instance enables the adapter to get information about the event type that was configured for this adapter.
The BusStopAdapter.setEventSender
method injects a StreamSender
object so that BusStopAdapter
object can send events to the next EPN node.
The BusStopAdapter.setPath
and BusStopAdapter.setEventType
methods provide event property values.
The BusStopAdapter.run
method implementation retrieves raw event data, parses the data into event type instances, and sends the new events to the downstream EPN node.
package com.oracle.cep.sample.spatial; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.bea.wlevs.ede.api.EventProperty; import com.bea.wlevs.ede.api.EventRejectedException; import com.bea.wlevs.ede.api.EventType; import com.bea.wlevs.ede.api.EventTypeRepository; import com.bea.wlevs.ede.api.RunnableBean; import com.bea.wlevs.ede.api.StreamSender; import com.bea.wlevs.ede.api.StreamSink; import com.bea.wlevs.ede.api.StreamSource; import com.bea.wlevs.util.Service; import java.lang.RuntimeException; public class BusStopAdapter implements RunnableBean, StreamSource, StreamSink { static final Log s_logger = LogFactory.getLog("BusStopAdapter"); private String m_filePath; private String m_eventTypeName; private EventType m_eventType; private StreamSender m_eventSender; private boolean m_stopped; private int m_repeat = 1; private EventTypeRepository m_etr = null; public BusStopAdapter() { super(); } /** * Called by the server to pass in the path * to the file with bus stop data. * * @param path The value specified for the path * property in the adapter's configuration in the EPN assembly file. */ public void setPath(String path) throws RuntimeException { // Code to create a File instance from the path. This // File object retrieves event data from the file. } /** * Called by the server to pass in the name of the event * type to which event data should be bound. * * @param path The value specified for the path * property in the adapter's configuration in the EPN assembly file. */ public void setEventType(String typ) { m_eventTypeName = typ; } /** * Called by the server to set an event type * repository instance that knows about event * types configured for this application. * * This repository instance will be used to retrieve an * event type instance that is populated with * event data retrieved from the event data file. * * @param etr The event repository. */ @Service(filter = EventTypeRepository.SERVICE_FILTER) public void setEventTypeRepository(EventTypeRepository etr) { m_etr = etr; } /** * Executes to retrieve raw event data and * create event type instances from it, then * sends the events to the next stage in the * EPN. * * This method, implemented from the RunnableBean * interface, executes when this adapter instance is active. */ public void run() { if (m_etr == null){ throw new RuntimeException("EventTypeRepository is not set"); } // Get the event type from the repository wit the event type name // specified as a property of this adapter in the assembly file. m_eventType = m_etr.getEventType(m_eventTypeName); if (m_eventType == null){ throw new RuntimeException("EventType(" + m_eventType + ") is not found."); } BufferedReader reader = null; System.out.println("Sending " + m_eventType + " from " + m_filePath); while ((m_repeat != 0) && (!m_stopped)) { try { reader = new BufferedReader(new FileReader(m_filePath)); } catch (Exception e) { m_stopped = true; break; } while (!isStopped()) { try { // Create an event and assign to it an event type generated // from the event data retrieved by the reader. Object ev = null; ev = readLine(reader); if (ev == null){ reader.close(); break; } // Send the newly created event to a downstream node //listening to this adpater. m_eventSender.sendInsertEvent(ev); } catch (Exception e){ m_stopped = true; break; } } } } /** * Called by the server to pass in a sender instance to be used to * send generated events to a downstream node. * * @param sender A sender instance. */ public void setEventSender(StreamSender sender) { m_eventSender = sender; } /** * Returns true if this adapter instance has been * suspended, such as because an exception occurred. */ private synchronized boolean isStopped() { return m_stopped; } /** * Reads data from reader, creating event type * instances from that data. This method is * called from the run() method. * * @param reader Raw event data from a file. * @return An instance of the event type specified * as a property of this adapter. */ protected Object readLine(BufferedReader reader) throws Exception { // Code to read raw event data and return an event type // instance from it. } /** * Called by the server to pass in an * insert event received from an * upstream stage in the EPN. */ @Override public void onInsertEvent(Object event) throws EventRejectedException { // Code to begin executing the logic needed to // convert incoming event data to event type instances. } }
When you create a custom adapter, you add it to an EPN by configuring it in the assembly and component configuration files.
In the assembly file, use the wlevs:adapter
element to declare an adapter as a component in the event processor network. Use wlevs:instance-property
child elements to set static properties in the adapter. Static properties are properties that you do not change dynamically after you deploy the adapter.
In the following example, the BusStopAdapter
class is configured with properties that correspond to the class methods. The BusStopAdapter
accessor methods setPath
, setEventType
, and setBuffer
correspond to the path
, eventType
, and buffer
properties in the assembly file. In the assembly file, the property name spellings match the spelling of the accessor methods minus the get
or set
prefix.
<wlevs:adapter id="BusStopAdapter" class="com.oracle.cep.sample.spatial.BusStopAdapter" > <wlevs:instance-property name="path" value="bus_stops.csv"/> <wlevs:instance-property name="eventType" value="BusStop"/> <wlevs:instance-property name="buffer" value="30.0"/> </wlevs:adapter>
You reference an adapter that is an event sink by using the wlevs:listener
element. In the following example, a BusPositionGen
CSV adapter sends events to the BusStopAdapter
:
<wlevs:adapter id="BusPositionGen" provider="csvgen"> <!-- Code omitted --> <wlevs:listener ref="BusStopAdapter"/> </wlevs:adapter>
In the configuration file, use the adapter
element to add child elements and properties that can be updated at run time. Each adapter configuration requires a separate adapter
child element of the config
element. The following example configures the BusStopAdapter
adapter for event recording.
<?xml version="1.0" encoding="UTF-8"?> <wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application"> <adapter> <name>BusStopAdapter</name> <record-parameters> <dataset-name>spatial_sample</dataset-name> <event-type-list> <event-type>BusPos</event-type> <event-type>BusStop</event-type> <event-type>BusPosEvent</event-type> <event-type>BusStopArrivalEvent</event-type> <event-type>BusStopPubEvent</event-type> <event-type>BusStopPubEvent</event-type> </event-type-list> <batch-size>1</batch-size> <batch-time-out>10</batch-time-out> </record-parameters> </adapter> </wlevs:config>
If your adapter accesses an external data feed, the adapter might need to pass login credentials to the data feed for user authentication. You can hard code the unencrypted login credentials in your adapter code. With this insecure approach, you cannot encrypt the password or easily change the login credentials.
You first decide whether you want the login credentials to be configured statically in the assembly file or dynamically by extending the adapter configuration. Configuring the credentials statically in the assembly file is easier, but if the credentials change, you must restart the application for the update to the assembly file to take place. Extending the adapter configuration enables you to change the credentials dynamically without restarting the application, but involves additional steps, such as creating an XSD file and compiling it into a JAXB object.
This section describes how to pass login credentials that you configure statically in the assembly file.
Pass Static Credentials to the Data Feed Provider
You can extend the a custom adapter configuration by creating your own XSD file.
Create an XSD File
You can implement or configure an adapter to use multiple threads to read from the data source. A multithreaded adapter can improve performance when its event processing work has high overhead. To implement multithreading, you can use a work manager or implement the WorkManager
interface to make an adapter multithreaded. You can implement the Runnable
or the RunnableBean interface to execute custom adapters in parallel.
Note:
In a single-threaded adapter, event order is guaranteed. Event order is not guaranteed in a multithreaded adapter. You might need to add code to your custom adapter class to manage event order.
Work Manager
The simplest way to implement threading is to configure the adapter with a work manager. A work manager is a server feature through which your application can submit code (classes that implement the Work
interface) for scheduling on multiple threads. The interface to the work manager feature is commonj.work.WorkManager
.
In Oracle Stream Analytics, WorkManager
instances can be configured on the server in the config.xml
file. The system also allocates a default work manager for each application to use.
You can obtain a reference to a work manager to use in your adapter in one of two ways:
You can create a configuration property for the adapter that allows users to specify the name a specific work manager scoped to the server (configured in the config.xml
file) that will be injected into your adapter. See the configuration details for the built-in JMS adapter for an example of an adapter that supports explicit configuration of a work manager.
You can implement the com.bea.wlevs.ede.spi.WorkManagerAware
interface in your adapter. This interface consists of a single method setWorkManager(WorkManager wm)
that is called by the system when initializing your adapter to inject the default work manager for you application.
Once you have obtained a WorkManager
instance you can submit work to it that will be executed on the work managers thread pool.
You can associate a work manager with an application by naming the work manager with the same name as the application. If you do not explicitly specify a work manager for your application, then a default work manager is created with default values for the minimum (MIN
) and maximum (MAX
) number of threads.
WorkManagerAware Interface
If you need finer control over the threading, a custom adapter can implement the WorkManagerAware
interface. In this case, Oracle Stream Analytics injects the adapter with the work manager of the application during initialization. Use this work manager to explicitly manage the threading for an adapter or an event bean instance.
RunnableBean Interface
Adapters that need to run some of their code on a separate thread, but do not need to have full control over threading provided by the WorkManager
API can implement the com.bea.wlevs.ede.api.RunnableBean
interface. This interface includes a run
method that is called by the system on one of the threads belonging to the application's default work manager. The RunnableBean
interface provides adapters with a very simple model for running code on a single work manager thread. If you need to explicitly use multiple work manager threads for more parallelism, use the WorkManager
interface directly. See Work Manager.
EPN
For reference information on this interface, see the Java API Reference for Oracle Stream Analytics.
You can implement the following interfaces to add server support for suspending and resuming event processing. For example, you can make the custom adapter stop processing events when the EPN suspends, and make it start processing events again when the EPN resumes. In either or both cases, can manage resources as needed.
Table 1-1 Interfaces to Support Suspending and Resuming an Adapter
Interface | Description |
---|---|
|
Provides logic that executes when the EPN suspends. In the |
|
Implement this to provide logic that executes when the EPN resumes work. In your implementation of the |
Use the following annotations to specify the methods of a custom adapter implementation that handle a custom adapter life cycle stages: when its configuration is prepared, when its configuration is activated, and when the adapter terminates because of an exception:
Place the applicable @Prepare
, @Activate
, or @Rollback
annotation above custom adapter methods with the following signature to indicate that these methods send configuration information to the adapter:
public void methodName(AdapterConfigObject adapterConfig)
AdapterConfigObject
refers to the Java representation of the adapter's configuration XML file that is deployed with the application. The type of this class is com.bea.wlevs.configuration.application.DefaultAdapterConfig
by default. If you have extended the configuration of the adapter, then the type of this class is whatever is specified in the XSD that describes the extended XML file. For example, in the HelloWorld sample, the type is the following: com.bea.wlevs.adapter.example.helloworld.HelloWorldAdapterConfig
At runtime, Oracle Stream Analytics creates the AdapterConfigObject
class, populates the class with data from the XML file, and passes the created instance to the adapter. The adapter methods annotated with the @Activate
and @Rollback
adapter life cycle annotations use the class to get information about the adapter configuration.
These @Prepare
, @Activat
e, and @Rollback
annotations have no attributes. In the following examples, the HelloWorldAdapterConfig
class represents the adapter's configuration XML file. When you deploy the application, Oracle Stream Analytics creates an instance of this class. In the HelloWorld example, the adapter configuration is extended. See the example in com.bea.wlevs.configuration.Activate (@Activate).
The Oracle Stream Analytics server calls methods annotated with @Prepare
whenever an adapter's state is updated by a configuration change. The following example is from the adapter component of the HelloWorld sample application and shows how to use the @Prepare
annotation.
package com.bea.wlevs.adapter.example.helloworld; ... import com.bea.wlevs.configuration.Prepare; import com.bea.wlevs.ede.api.RunnableBean; import com.bea.wlevs.ede.api.StreamSender; import com.bea.wlevs.ede.api.StreamSource; import com.bea.wlevs.event.example.helloworld.HelloWorldEvent; public class HelloWorldAdapter implements RunnableBean, StreamSource { ... @Prepare public void checkConfiguration(HelloWorldAdapterConfig adapterConfig) { if (adapterConfig.getMessage() == null || adapterConfig.getMessage().length() == 0) { throw new RuntimeException("invalid message: " + message); } } ... }
The annotated checkConfiguration
method checks that the message
property of the adapter's configuration (set in the extended adapter configuration file) is not null
or empty. If it null
or empty, then the method throws an exception.
The Oracle Stream Analytics server calls methods annotated with @Activate
after the server has called and has successfully executed all the methods marked with the @Prepare
annotation. Use the @Activate
method to get the adapter configuration data to use in the rest of the adapter implementation. The following example shows how to use the @Activate
annotation in the adapter component of the HelloWorld example:
package com.bea.wlevs.adapter.example.helloworld; ... import com.bea.wlevs.configuration.Activate; import com.bea.wlevs.ede.api.RunnableBean; import com.bea.wlevs.ede.api.StreamSender; import com.bea.wlevs.ede.api.StreamSource; import com.bea.wlevs.event.example.helloworld.HelloWorldEvent; public class HelloWorldAdapter implements RunnableBean, StreamSource { ... @Activate public void activateAdapter(HelloWorldAdapterConfig adapterConfig) { this.message = adapterConfig.getMessage(); } ... }
The following example shows that this XSD file also specifies the fully qualified name of the resulting Java configuration object, as shown in bold:
<?xml version="1.0" encoding="UTF-8"?> <xs:schema xmlns="http://www.bea.com/ns/wlevs/example/helloworld" xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:jxb="http://java.sun.com/xml/ns/jaxb" xmlns:xjc="http://java.sun.com/xml/ns/jaxb/xjc" xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application" targetNamespace="http://www.bea.com/ns/wlevs/example/helloworld" elementFormDefault="unqualified" attributeFormDefault="unqualified" jxb:extensionBindingPrefixes="xjc" jxb:version="1.0"> <xs:annotation> <xs:appinfo> <jxb:schemaBindings> <jxb:package name="com.bea.wlevs.adapter.example.helloworld"/> </jxb:schemaBindings> </xs:appinfo> </xs:annotation> <xs:import namespace="http://www.bea.com/ns/wlevs/config/application" schemaLocation="wlevs_application_config.xsd"/> <xs:element name="config"> <xs:complexType> <xs:choice maxOccurs="unbounded"> <xs:element name="adapter" type="HelloWorldAdapterConfig"/> <xs:element name="processor" type="wlevs:DefaultProcessorConfig"/> <xs:element name="channel" type="wlevs:DefaultStreamConfig" /> </xs:choice> </xs:complexType> </xs:element> <xs:complexType name="HelloWorldAdapterConfig"> <xs:complexContent> <xs:extension base="wlevs:AdapterConfig"> <xs:sequence> <xs:element name="message" type="xs:string"/> </xs:sequence> </xs:extension> </xs:complexContent> </xs:complexType> </xs:schema>
Oracle Stream Analytics creates an instance of this class when the application is deployed. For example, the adapter section of the helloworldAdapter
configuration file is as follows:
<?xml version="1.0" encoding="UTF-8"?> <helloworld:config ... <adapter> <name>helloworldAdapter</name> <message>HelloWorld - the current time is:</message> </adapter> </helloworld:config>
The annotated activateAdapter
method uses the getMessage
method of the configuration object to get the value of the message
property set in the adapter's configuration XML file. In this case, the value is HelloWorld - the current time is:
. This value can be used in the main part of the adapter implementation file.
The Oracle Stream Analytics server calls methods annotated with @Rollback
whenever a component annotated with @Prepare
was called but threw an exception. The following example from the adapter component of the HelloWorld example, shows how to use the @Rollback
annotation:
package com.bea.wlevs.adapter.example.helloworld; ... import com.bea.wlevs.configuration.Rollback; import com.bea.wlevs.ede.api.RunnableBean; import com.bea.wlevs.ede.api.StreamSender; import com.bea.wlevs.ede.api.StreamSource; import com.bea.wlevs.event.example.helloworld.HelloWorldEvent; public class HelloWorldAdapter implements RunnableBean, StreamSource { @Rollback ... public void rejectConfigurationChange(HelloWorldAdapterConfig adapterConfig) { }
In the example, the rejectConfigurationChange
method is annotated with the @Rollback
annotation, which means this is the method that is called if the @Prepare
method threw an exception. In the example above, nothing actually happens.