You write the logic for event bean and Spring bean components in Java, and then add those components to the event processing network (EPN). Use an event bean in your EPN to define application logic that works on event data. Use a Spring bean in your EPN when your deployment context and the features you want to use are based on Spring.
Event bean application logic functions as an event sink, an event source, or both. An event sink receives and works on large quantities of event data. An event source sends large quantities of event data. In an EPN, you can configure event beans and adapters with logic to make them behave as event sources, event sinks, or both. In the case of an event bean, the event sink and event source logic comes from its associated JavaBean. In the case of an adapter, the event sink and event source logic comes from its JavaBean event type. See Events and Event Types for information about creating a JavaBean event type.
You can use JAXB in event bean logic. See JAXB Support for information.
Event beans and Spring beans are based on Java classes. The Java class you use for an event bean or Spring bean may or may not conform to the JavaBean specification, depending on your application requirements.
An event bean is an Oracle extension to the regular Spring-based bean.
An event bean can be an event sink, event source, or both an event sink and an event source. You can add event sinks and sources to adapters and event beans.
An event sink is a JavaBean or Java class that listens for and works on events. An event sink can receive events, retrieve data from the events, and create a new event from the data to send to a downstream component.
An event source is a JavaBean or Java class that sends events.
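As a rough illustration of a class that is both an event sink and an event source, the following sketch receives trade events, reads their data, and sends a new event downstream. The three interfaces are simplified local stand-ins so that the code compiles on its own (the interfaces of the same names in com.bea.wlevs.ede.api also declare EventRejectedException, which is omitted here). The TradeEvent and LargeTradeEvent types and the threshold value are hypothetical.

```java
// Simplified local stand-ins; a deployed class would import the interfaces of
// the same names from com.bea.wlevs.ede.api instead.
interface StreamSender { void sendInsertEvent(Object event); }
interface StreamSink { void onInsertEvent(Object event); }
interface StreamSource { void setEventSender(StreamSender sender); }

// Hypothetical event types used only for this illustration.
class TradeEvent {
    final String symbol;
    final int volume;
    TradeEvent(String symbol, int volume) { this.symbol = symbol; this.volume = volume; }
}

class LargeTradeEvent {
    final String symbol;
    LargeTradeEvent(String symbol) { this.symbol = symbol; }
}

// Acts as a sink (onInsertEvent) and as a source (setEventSender).
class LargeTradeFilter implements StreamSink, StreamSource {
    private static final int THRESHOLD = 1000; // assumed cutoff for a "large" trade
    private StreamSender sender;

    @Override
    public void setEventSender(StreamSender sender) {
        this.sender = sender;
    }

    @Override
    public void onInsertEvent(Object event) {
        if (event instanceof TradeEvent) {
            TradeEvent trade = (TradeEvent) event;
            // Create a new event from the received data and send it downstream.
            if (trade.volume >= THRESHOLD && sender != null) {
                sender.sendInsertEvent(new LargeTradeEvent(trade.symbol));
            }
        }
    }
}
```

Events of other types, and trades below the threshold, are ignored rather than forwarded.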
If your deployment context and the features you want to use are based on Spring, use a Spring bean. Otherwise, use an event bean. Table 7-1 lists the features provided by event beans and Spring beans.
Table 7-1 Comparison of Event Beans and Spring Beans
Bean Type | Description |
---|---|
Event bean | Useful as an EPN stage to actively use the capabilities of the Oracle Stream Analytics server container. An event bean: |
Spring bean | Useful for legacy integration to Spring. A Spring bean: |
Event beans are executed in parallel when they implement either the Runnable or the RunnableBean interface. The infrastructure uses a work manager associated with the application for spawning the thread.
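The following minimal sketch shows the shape of an event bean class that implements java.lang.Runnable. In a deployed application the server runs the bean on a thread obtained from the work manager; here a plain Thread stands in for the container, and the PollingBean class, its bounded loop, and the runOnWorkerThread helper are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// An event bean class whose run() method holds the work to execute in parallel.
class PollingBean implements Runnable {
    private static final int MAX_POLLS = 3; // bounded so that the sketch terminates
    private final AtomicInteger polls = new AtomicInteger();

    @Override
    public void run() {
        while (polls.get() < MAX_POLLS) {
            polls.incrementAndGet(); // placeholder for real work, such as polling a feed
        }
    }

    int pollCount() {
        return polls.get();
    }

    // Stand-in for the container: runs the bean on its own thread and waits for it.
    static int runOnWorkerThread(PollingBean bean) {
        Thread worker = new Thread(bean, "polling-bean-worker");
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return bean.pollCount();
    }
}
```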
You can associate a work manager with an application by giving the work manager the same name as the application. If you do not explicitly specify a work manager for your application, then Oracle Stream Analytics creates a work manager with default values for the minimum (MIN) and maximum (MAX) number of threads.
If you need finer control over the threading, a custom event bean can implement the interface com.bea.wlevs.ede.spi.WorkManagerAware. In this case, the event bean is injected with the work manager of the application during initialization. The work manager can be used to explicitly manage the threading for the event bean instance.
Implement the com.bea.wlevs.ede.api.HeartbeatAware interface if you want your event bean to receive heartbeat events. A heartbeat event is an event of type heartbeat that you can use to model the advance of time. The interface declares the onHeartbeat(long timestamp) callback method for you to implement.
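One way to use heartbeats is to expire state when time advances without new events arriving. The sketch below is an assumption-laden illustration: HeartbeatAware is a local stand-in that declares only the callback named above, and the RecentEventWindow class, its onEvent helper, and the window length are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Local stand-in; a deployed bean would implement
// com.bea.wlevs.ede.api.HeartbeatAware instead.
interface HeartbeatAware {
    void onHeartbeat(long timestamp);
}

// Remembers event time stamps for a fixed window and relies on heartbeats to
// discard old entries even when no new events arrive.
class RecentEventWindow implements HeartbeatAware {
    private final long windowLength; // same unit as the time stamps
    private final Deque<Long> timestamps = new ArrayDeque<>();

    RecentEventWindow(long windowLength) {
        this.windowLength = windowLength;
    }

    // Hypothetical helper: record the time stamp of an event that arrived.
    void onEvent(long timestamp) {
        timestamps.addLast(timestamp);
    }

    @Override
    public void onHeartbeat(long timestamp) {
        // Time has advanced to 'timestamp': drop entries that left the window.
        long cutoff = timestamp - windowLength;
        while (!timestamps.isEmpty() && timestamps.peekFirst() <= cutoff) {
            timestamps.removeFirst();
        }
    }

    int size() {
        return timestamps.size();
    }
}
```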
An event bean is an EPN component that applies logic to events as they pass through. The event bean logic is defined by its JavaBean event type.
This chapter describes some of the assembly and configuration file event bean settings. For a complete reference, see event-bean in Schema Reference for Oracle Stream Analytics.
Assembly File
The following event bean assembly file entry shows the event bean id, associated class, and that the event bean listens for events from the upstream BeanOutputChannel component.
    <wlevs:event-bean id="eventBean" class="tradereport.TradeEvent">
        <wlevs:listener ref="BeanOutputChannel"/>
    </wlevs:event-bean>
Configuration File
The following event bean configuration file entry shows an event bean configured with the record-parameters child element:
    <event-bean>
        <name>eventBean</name>
        <record-parameters>
            <dataset-name>tradereport_sample</dataset-name>
            <event-type-list>
                <event-type>TradeEvent</event-type>
            </event-type-list>
            <batch-size>1</batch-size>
            <batch-time-out>10</batch-time-out>
        </record-parameters>
    </event-bean>
You can configure a Java class as a Spring bean to include the class in an event processing network. This is a good option if you have an existing Spring bean that you want to incorporate into the EPN or if you want to incorporate Spring features into your Java code.
In a Spring bean you plan to add to an EPN, you can implement the various life cycle interfaces. These include InitializingBean, DisposableBean, and the active interfaces, such as RunnableBean. The Spring bean event source can also use configuration metadata annotations such as @Prepare, @Rollback, and @Activate.
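As a sketch of the life cycle interfaces mentioned above, the following class validates its configuration in afterPropertiesSet and releases its resources in destroy. The two interfaces are local stand-ins that mirror the method signatures of Spring's org.springframework.beans.factory.InitializingBean and DisposableBean; the TradeAuditBean class and its datasetName property are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-ins with the same method signatures as the Spring interfaces.
interface InitializingBean {
    void afterPropertiesSet() throws Exception;
}

interface DisposableBean {
    void destroy() throws Exception;
}

class TradeAuditBean implements InitializingBean, DisposableBean {
    private String datasetName; // hypothetical property set from the bean definition
    private List<String> buffer;

    public void setDatasetName(String datasetName) {
        this.datasetName = datasetName;
    }

    @Override
    public void afterPropertiesSet() {
        // Called once after all properties are set: validate, then allocate.
        if (datasetName == null || datasetName.isEmpty()) {
            throw new IllegalStateException("datasetName must be set");
        }
        buffer = new ArrayList<>();
    }

    void record(String entry) {
        buffer.add(datasetName + ":" + entry);
    }

    @Override
    public void destroy() {
        // Called at shutdown: release what afterPropertiesSet allocated.
        buffer.clear();
        buffer = null;
    }

    boolean isActive() {
        return buffer != null;
    }
}
```

Failing fast in afterPropertiesSet surfaces a missing property when the bean is initialized rather than when the first event arrives.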
A Spring bean is a Java class managed by the Spring framework. You add a class as a Spring bean by configuring it in the EPN assembly file using the standard bean element.
A Spring bean is not an Oracle Stream Analytics stage. You cannot monitor a Spring bean with the Oracle Stream Analytics monitoring framework, you cannot use the configuration metadata annotations in a Spring bean, and you cannot set a Spring bean to record and play back events that pass through it.
Assembly File
In the assembly file, you use the bean element to declare a custom Spring bean as a component in the event processing network. For example:
    <bean id="TradeListenerBean" class="com.oracle.cep.example.tradereport.TradeListener">
    </bean>
You create an event sink to receive events in an EPN and apply logic that responds to the event data. A Java class that is an event sink implements one of the interfaces described in this section.
Each of these interfaces provides methods that the Oracle Stream Analytics server uses to pass events to the class as the events exit the EPN stage connected upstream from the Java class, which is typically a channel.
The interfaces described here provide support for events arriving either as streams or relations. However, interfaces for relations also support receiving events arriving as streams. As described in the following table, the interfaces are hierarchically related.
Interface | Description |
---|---|
StreamSink | Implement to receive events sequentially in a stream. |
RelationSink | Implement to receive events sequentially in a relation. Extends StreamSink. |
BatchStreamSink | Implement to receive batched events in a stream. Events might arrive batched by time stamp when the upstream channel allows batching. Extends StreamSink. |
BatchRelationSink | Implement to receive batched events as a relation. Events might arrive batched by time stamp when the upstream channel allows batching. Extends RelationSink. |
EventRejectedException Behavior in onInsertEvent Implementations
Throw EventRejectedException explicitly in onInsertEvent implementations for any exception that you do not want dropped. You can also raise an EventProcessingException, which is propagated through a CQL processor all the way to the source of the error. When more than one downstream listener fails, an EventRejectedException can chain their exceptions. The CQL processor converts the EventRejectedException to a soft exception. See Fault Handling for more information.
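The following sketch shows one possible way to reject an event explicitly from onInsertEvent. The exception and interface are local stand-ins: the real EventRejectedException lives in com.bea.wlevs.ede.api, and both its constructors and whether it is a checked exception are not confirmed here, so the single-message constructor is an assumption. The TradeEvent type and the validation rule are hypothetical.

```java
// Local stand-in; the constructor signature is an assumption of this sketch.
class EventRejectedException extends Exception {
    EventRejectedException(String message) {
        super(message);
    }
}

// Local stand-in for the sink interface.
interface StreamSink {
    void onInsertEvent(Object event) throws EventRejectedException;
}

// Hypothetical event type.
class TradeEvent {
    final String symbol;
    final int volume;
    TradeEvent(String symbol, int volume) { this.symbol = symbol; this.volume = volume; }
}

class ValidatingTradeListener implements StreamSink {
    private int accepted;

    @Override
    public void onInsertEvent(Object event) throws EventRejectedException {
        if (!(event instanceof TradeEvent)) {
            return; // not the event type this listener handles
        }
        TradeEvent trade = (TradeEvent) event;
        if (trade.volume <= 0) {
            // Reject explicitly so that the failure is reported instead of dropped.
            throw new EventRejectedException("Non-positive volume for " + trade.symbol);
        }
        accepted++;
    }

    int acceptedCount() {
        return accepted;
    }
}
```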
A class that receives events as a stream receives only events that are, from the Oracle Stream Analytics standpoint, inserted, because events in a stream are always appended to the end of a sequence. Events in a stream are also always received in ascending time order, so their time stamps have non-decreasing values from one event to the next. Non-decreasing means that the time stamp of an event can be the same as, or later than, the time stamp of the event that precedes it, but never earlier.
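The non-decreasing rule can be stated as a small check. This helper is purely illustrative and not part of any Oracle Stream Analytics API; the TimestampOrder class name is hypothetical.

```java
// Illustrative helper: checks the ordering rule that stream time stamps follow.
class TimestampOrder {
    // Returns true when no time stamp is earlier than the one before it.
    // Equal neighbours are allowed; an empty or single-element sequence passes.
    static boolean isNonDecreasing(long[] timestamps) {
        for (int i = 1; i < timestamps.length; i++) {
            if (timestamps[i] < timestamps[i - 1]) {
                return false;
            }
        }
        return true;
    }
}
```

For example, the sequence 1, 1, 2, 5 satisfies the rule, while 1, 3, 2 does not.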
As a result, the interfaces that support receiving events as a stream have one method each for receiving events. The interfaces for receiving events as a relation support receiving multiple kinds of events.
Implement the StreamSink interface if your class receives unbatched events as a stream. The StreamSink interface has a single method, onInsertEvent, which the Oracle Stream Analytics server calls to pass in each event from the stream as events leave the upstream stage that is connected to your class.
Example 7-1 shows a simple StreamSink implementation that receives stock trade events, where each event arrives as an Object instance, and tests whether the event is an instance of a particular event type. If so, the code retrieves values of properties known to be members of that type.
Implement the BatchStreamSink interface if you expect your class to receive batched events as a stream. The interface has a single method, onInsertEvents, which the Oracle Stream Analytics server calls to pass in a collection of events received from the upstream stage. The BatchStreamSink interface extends StreamSink, so your class can also receive unbatched events.
For more information about event batching, see Batch Processing Channels.
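A BatchStreamSink implementation might look like the following sketch, which handles a batch by delegating each element to the unbatched method. The interfaces are simplified local stand-ins (EventRejectedException is omitted), the Collection<Object> parameter type is an assumption about the real signature, and the TradeEvent type and VolumeTotalizer class are hypothetical.

```java
import java.util.Collection;

// Simplified local stand-ins for the interfaces in com.bea.wlevs.ede.api.
interface StreamSink {
    void onInsertEvent(Object event);
}

interface BatchStreamSink extends StreamSink {
    void onInsertEvents(Collection<Object> events);
}

// Hypothetical event type.
class TradeEvent {
    final String symbol;
    final int volume;
    TradeEvent(String symbol, int volume) { this.symbol = symbol; this.volume = volume; }
}

// Sums trade volume whether events arrive one at a time or in batches.
class VolumeTotalizer implements BatchStreamSink {
    private long totalVolume;

    @Override
    public void onInsertEvent(Object event) {
        if (event instanceof TradeEvent) {
            totalVolume += ((TradeEvent) event).volume;
        }
    }

    @Override
    public void onInsertEvents(Collection<Object> events) {
        // Reuse the unbatched logic so that both paths behave identically.
        for (Object event : events) {
            onInsertEvent(event);
        }
    }

    long totalVolume() {
        return totalVolume;
    }
}
```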
Example 7-1 Implement the StreamSink Interface
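    import com.bea.wlevs.ede.api.EventRejectedException;
    import com.bea.wlevs.ede.api.StreamSink;

    public class TradeListener implements StreamSink {

        // Called by the server for each event that leaves the upstream stage.
        @Override
        public void onInsertEvent(Object event) throws EventRejectedException {
            if (event instanceof TradeEvent) {
                String symbolProp = ((TradeEvent) event).getSymbol();
                Integer volumeProp = ((TradeEvent) event).getVolume();
                // Code to do something with the property values.
            }
        }
    }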
A class that receives events as a relation can receive any of the kinds of events possible in a relation, which are insert events, delete events, and update events. Unlike a stream, events in a relation are unordered and include events that have been updated or deleted by code that created or operated on the relation.
As a result, the interfaces that support receiving events as a relation have methods through which your class can receive insert, delete, or update events.
Implement the RelationSink interface if your class receives unbatched events as a relation. The RelationSink interface has three methods, one of which it inherits from the StreamSink interface: onInsertEvent, onDeleteEvent, and onUpdateEvent. At runtime, the Oracle Stream Analytics server calls the appropriate method depending on which type of event is received from the upstream channel connected to your class.
    import com.bea.wlevs.ede.api.EventRejectedException;
    import com.bea.wlevs.ede.api.RelationSink;

    public class TradeListener implements RelationSink {

        // Inherited from StreamSink: called for each inserted event.
        @Override
        public void onInsertEvent(Object event) throws EventRejectedException {
            if (event instanceof TradeEvent) {
                String symbolProp = ((TradeEvent) event).getSymbol();
                Integer volumeProp = ((TradeEvent) event).getVolume();
                // Do something with the inserted event.
            }
        }

        @Override
        public void onDeleteEvent(Object event) throws EventRejectedException {
            if (event instanceof TradeEvent) {
                // Do something with the deleted event.
            }
        }

        @Override
        public void onUpdateEvent(Object event) throws EventRejectedException {
            if (event instanceof TradeEvent) {
                // Do something with the updated event.
            }
        }
    }
Implement the BatchRelationSink interface if your class receives batched events as a relation. It has an onEvents method designed to receive all three types of events from the batch in java.util.Collection instances:
    onEvents(insertEvents, deleteEvents, updateEvents)
In addition, the BatchRelationSink interface extends the RelationSink interface to support receiving unbatched events.
At runtime, the Oracle Stream Analytics server calls the appropriate method to pass in events received from the upstream stage connected to your class.
For more information about event batching, see Batch Processing Channels.
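As an illustration of onEvents, the sketch below mirrors a relation in a map keyed by symbol. The interfaces are simplified local stand-ins (the link to StreamSink and the EventRejectedException declarations are omitted), and the Collection<Object> parameter types are an assumption. The TradeEvent type, the choice of symbol as the key, the order in which the three collections are applied, and the expectation that the collections are non-null are all assumptions of this sketch.

```java
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;

// Simplified local stand-ins for the interfaces in com.bea.wlevs.ede.api.
interface RelationSink {
    void onInsertEvent(Object event);
    void onDeleteEvent(Object event);
    void onUpdateEvent(Object event);
}

interface BatchRelationSink extends RelationSink {
    void onEvents(Collection<Object> insertEvents,
                  Collection<Object> deleteEvents,
                  Collection<Object> updateEvents);
}

// Hypothetical event type; symbol is treated as the key.
class TradeEvent {
    final String symbol;
    final int volume;
    TradeEvent(String symbol, int volume) { this.symbol = symbol; this.volume = volume; }
}

// Keeps a local view of the relation: latest volume per symbol.
class TradeRelationView implements BatchRelationSink {
    private final Map<String, Integer> volumeBySymbol = new TreeMap<>();

    @Override
    public void onInsertEvent(Object event) { put(event); }

    @Override
    public void onUpdateEvent(Object event) { put(event); }

    @Override
    public void onDeleteEvent(Object event) {
        if (event instanceof TradeEvent) {
            volumeBySymbol.remove(((TradeEvent) event).symbol);
        }
    }

    @Override
    public void onEvents(Collection<Object> insertEvents,
                         Collection<Object> deleteEvents,
                         Collection<Object> updateEvents) {
        // Application order within a batch is a choice made by this sketch.
        for (Object event : deleteEvents) { onDeleteEvent(event); }
        for (Object event : updateEvents) { onUpdateEvent(event); }
        for (Object event : insertEvents) { onInsertEvent(event); }
    }

    private void put(Object event) {
        if (event instanceof TradeEvent) {
            TradeEvent trade = (TradeEvent) event;
            volumeBySymbol.put(trade.symbol, trade.volume);
        }
    }

    Map<String, Integer> snapshot() {
        return new TreeMap<>(volumeBySymbol);
    }
}
```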
For complete API reference information about the Oracle Stream Analytics APIs described in this section, see the Java API Reference for Oracle Stream Analytics.
You can create a Java class that sends events to a downstream stage in an event processing network. You can create an event source, for example, to send events your Java code has created or altered from event data flowing through the EPN.
A Java class that is an event source implements one of the interfaces described in this section. Each of these interfaces provides a method used by the Oracle Stream Analytics server to pass into your class an instance of a sender class.
The sender instance your event source receives implements one of the sender interfaces described in this section. The sender interfaces provide methods your code can call to send events, as streams or relations and batched or unbatched, to the downstream EPN stage, such as a channel.
The interfaces described here support sending events either as streams or relations. Interfaces for relations also support sending events as streams.
Table 7-2 Interfaces for Implementing an Event Source
Interface | Description |
---|---|
StreamSource | Implement this interface to send events as a stream. At runtime, the Oracle Stream Analytics server injects an instance of a stream sender class. |
RelationSource | Implement this interface to send events as a relation or stream. At runtime, the Oracle Stream Analytics server injects an instance of a relation sender class. Extends StreamSource. |
The interfaces listed in Table 7-3 are implemented by sender classes your event source class receives from the Oracle Stream Analytics server.
Table 7-3 Interfaces Implemented by Sender Classes
Interface | Description |
---|---|
StreamSender | Provides a method to send events as a stream. |
RelationSender | Provides methods to send events as a relation. Extends StreamSender. |
BatchStreamSender | Provides a method with which your code can send batched events as a stream. You might send events batched by time stamp if the downstream stage to which you are sending them is a channel configured for batched events. Extends StreamSender. |
BatchRelationSender | Provides a method to send batched events as a relation. You can send events batched by time stamp when the downstream stage is a channel configured for batched events. Extends RelationSender. |
A class that is a source of stream events should send only events that are, from the Oracle Stream Analytics standpoint, inserted. Sending only inserted events models a stream, rather than a relation. Events sent from a stream source should also have non-decreasing time stamps from one event to the event that follows. The time stamp of an event that follows another should either be the same as, or later than, the event that preceded it.
When you implement StreamSource, your code can send events batched or unbatched. Your implementation of the StreamSource setEventSender method receives a sender instance that you can cast to one of the types described in Table 7-3. Use the sender instance in your code to send events as expected by the downstream stage.
If your code sends events to a channel that enables batching, use one of the batched event senders to batch events by time stamp before sending them. For more information, see Batch Processing Channels.
The sender instance provides a sendHeartbeat method to send a heartbeat when the receiving channel is configured to be application time stamped.
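The following sketch outlines a stream source that sends events with non-decreasing time stamps and sends a heartbeat when time advances without an event. The interfaces are simplified local stand-ins for those in com.bea.wlevs.ede.api; the sendHeartbeat(long) parameter is an assumption about the real signature, and EventRejectedException is omitted. The TickSource class and its emit and idle helpers are hypothetical, and how the time stamp is carried on the event itself is not modeled.

```java
// Simplified local stand-ins for the interfaces in com.bea.wlevs.ede.api.
interface StreamSender {
    void sendInsertEvent(Object event);
    void sendHeartbeat(long timestamp); // parameter type is an assumption
}

interface StreamSource {
    void setEventSender(StreamSender sender);
}

class TickSource implements StreamSource {
    private StreamSender sender;
    private long lastTimestamp = Long.MIN_VALUE;

    // Called by the server to inject the sender.
    @Override
    public void setEventSender(StreamSender sender) {
        this.sender = sender;
    }

    // Sends an event, refusing to move backwards in time.
    void emit(Object event, long timestamp) {
        advanceTo(timestamp);
        sender.sendInsertEvent(event);
    }

    // Advances time downstream when there is no event to send.
    void idle(long timestamp) {
        advanceTo(timestamp);
        sender.sendHeartbeat(timestamp);
    }

    private void advanceTo(long timestamp) {
        if (sender == null) {
            throw new IllegalStateException("sender has not been injected yet");
        }
        if (timestamp < lastTimestamp) {
            throw new IllegalArgumentException(
                "time stamp " + timestamp + " is earlier than " + lastTimestamp);
        }
        lastTimestamp = timestamp;
    }
}
```

Enforcing the ordering in the source keeps a late event from reaching the channel after a heartbeat has already advanced time past it.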
A class that is a source of events as a relation can send insert, delete, and update events to the downstream stage. When you implement the RelationSource interface, your code can send events batched or unbatched. Your implementation of the RelationSource setEventSender method receives a sender instance that you can cast to one of the types described in Table 7-3. Use the sender instance to send events to the downstream stage.
Keep in mind the following constraints for handling the sender instance your class receives:
For sendDeleteEvent, you must send an instance of the same event type as that configured for the channel.
For sendInsertEvent, a unique constraint violation exception is raised and the input event is discarded if an event with the same primary key is already in the relation.
For sendUpdateEvent, an invalid update tuple exception is raised and the input event is discarded if an event with the given primary key is not in the relation.
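One way to stay within these constraints is to track, in the source, which primary keys have already been sent. The sketch below is a hypothetical helper, not an Oracle Stream Analytics API: RelationSender is a simplified local stand-in, the TradeEvent type uses symbol as an assumed primary key, and the approach assumes that this class is the only writer to the relation.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified local stand-in for com.bea.wlevs.ede.api.RelationSender.
interface RelationSender {
    void sendInsertEvent(Object event);
    void sendUpdateEvent(Object event);
    void sendDeleteEvent(Object event);
}

// Hypothetical event type; symbol is assumed to be the primary key.
class TradeEvent {
    final String symbol;
    final int volume;
    TradeEvent(String symbol, int volume) { this.symbol = symbol; this.volume = volume; }
}

// Chooses insert or update based on the keys already sent, and skips deletes
// for keys that are not in the relation.
class KeyTrackingPublisher {
    private final RelationSender sender;
    private final Set<String> keysInRelation = new HashSet<>();

    KeyTrackingPublisher(RelationSender sender) {
        this.sender = sender;
    }

    void upsert(TradeEvent event) {
        if (keysInRelation.add(event.symbol)) {
            sender.sendInsertEvent(event); // key was absent: insert is safe
        } else {
            sender.sendUpdateEvent(event); // key is present: update is safe
        }
    }

    // Returns true when a delete event was actually sent.
    boolean delete(TradeEvent event) {
        if (keysInRelation.remove(event.symbol)) {
            sender.sendDeleteEvent(event);
            return true;
        }
        return false;
    }
}
```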
In the following example, a simple RelationSource implementation receives a StreamSender, then casts the sender to a RelationSender to send events as a relation. This class creates a new TradeEvent instance from the event type configured in the repository, but the sendEvents method could as easily have received an instance as a parameter from another part of the code.
    package com.oracle.cep.example.tradereport;

    import com.bea.wlevs.ede.api.EventType;
    import com.bea.wlevs.ede.api.EventTypeRepository;
    import com.bea.wlevs.ede.api.RelationSender;
    import com.bea.wlevs.ede.api.RelationSource;
    import com.bea.wlevs.ede.api.StreamSender;
    import com.bea.wlevs.util.Service;

    public class TradeEventSource implements RelationSource {

        // Variables for event type repository and event sender. Both
        // will be set by the server.
        EventTypeRepository m_repos = null;
        RelationSender m_sender = null;

        // Called by the server to set the repository instance.
        @Service
        public void setEventTypeRepository(EventTypeRepository repos) {
            m_repos = repos;
        }

        // Called by the server to set the sender instance.
        @Override
        public void setEventSender(StreamSender sender) {
            // Cast the received StreamSender to a RelationSender
            m_sender = (RelationSender) sender;
        }

        /**
         * Sends events to the next EPN stage using the sender
         * received from the server. This code assumes that an event
         * instance isn't received from another part of the class,
         * instead creating a new instance from the repository.
         */
        private void sendEvents() {
            EventType eventType = m_repos.getEventType("TradeEvent");
            TradeEvent tradeEvent = (TradeEvent) eventType.createEvent();
            m_sender.sendDeleteEvent(tradeEvent);
        }
    }