7 Event Beans

Java is the language you use to write logic for event bean and Spring bean components to add to the EPN. Use an event bean in your EPN to define application logic that works on event data. Use a Spring bean in your EPN when your deployment context and the features you want to use are based on Spring.

Event bean application logic functions as an event sink, an event source, or both. An event sink receives and works on large quantities of event data. An event source sends large quantities of event data. In an EPN, you can configure event beans and adapters with logic to make them behave as event sources, event sinks, or both. In the case of an event bean, the event sink and event source logic comes from its associated JavaBean. In the case of an adapter, the event sink and event source logic comes from its JavaBean event type. See Events and Event Types for information about creating a JavaBean event type.

You can use JAXB in event bean logic. See JAXB Support for information.

This chapter includes the following sections:

7.1 Event Beans and Spring Beans

Event beans and Spring beans are based on Java classes. The Java class you use for an event or Spring bean can conform to the JavaBean specification or not conform, depending on your application requirements.

An event bean is an Oracle extension to the regular Spring-based bean.

An event bean can be an event sink, event source, or both an event sink and an event source. You can add event sinks and sources to adapters and event beans.

  • An event sink is a JavaBean or Java class that listens for and works on events. An event sink can receive events, retrieve data from the events, and create a new event from the data to send to a downstream component.

  • An event source is a JavaBean or Java class that sends events.

If your deployment context and the features you want to use are based on Spring, use a Spring bean. Otherwise, use an event bean. Table 7-1 lists the features provided by event beans and Spring beans.

Table 7-1 Comparison of Event Beans and Spring Beans

Bean Type Description

Event bean

Useful as an EPN stage to actively use the capabilities of the Oracle Stream Analytics server container. An event bean:

  • Is a type of Oracle Stream Analytics EPN stage.

  • Can be monitored by the Oracle Stream Analytics monitoring framework.

  • Can make use of the configuration metadata annotations.

  • Can be set to record and play back events that pass through it.

  • Can participate in the Oracle Stream Analytics server bean life cycle by specifying methods in its XML declaration, rather than by implementing Oracle Stream Analytics server API interfaces.

Spring bean

Useful for legacy integration to Spring. A Spring bean:

  • Is useful if you have a Spring bean you want to add to an EPN.

  • Is not a type of Oracle Stream Analytics EPN stage.

  • Cannot be monitored by the Oracle Stream Analytics monitoring framework.

  • Cannot use the configuration metadata annotations.

  • Cannot be set to record and play back events that pass through it.

7.1.1 Threading Behavior

Event beans are executed in parallel when they implement either the Runnable or the RunnableBean interface. The infrastructure uses a work manager associated with the application for spawning the thread.

You can associate a work manager to an application by naming the work manager with the same name as the application. If you do not explicitly specify a work manager for your application, then Oracle Stream Analytics creates a work manager with default values for the minimum (MIN) and maximum (MAX) number of threads.

If you need finer control over the threading, a custom event bean can implement the interface com.bea.wlevs.ede.spi.WorkManagerAware. In this case, the event bean is injected with the work manager of the application during initialization. The work manager can be used to explicitly manage the threading for the event bean instance.

7.1.2 Receive Heartbeat Events

Implement the com.bea.wlevs.ede.api.HeartbeatAware interface if you want your event bean to receive heartbeat events. A heartbeat event is an event of type heartbeat that you can use to model the advance of time. This interface has the onHeartbeat(long timestamp) callback method to implement.

7.1.3 Create an Event Bean

An event bean is an EPN component that applies logic to events as they pass through. The event bean logic is defined by its JavaBean event type.

This chapter describes some of the assembly and configuration file event bean settings.

Assembly File

The following event bean assembly file entry shows the event bean id, associated class, and that the event bean listens for events from the upstream BeanOutputChannel component.

<wlevs:event-bean id="eventBean" class="tradereport.TradeEvent" >
   <wlevs:listener ref="BeanOutputChannel"/>
<wlevs:event-bean>

Configuration File

The following event bean configuration file entry shows an event bean configured with the record-parameters child element:

 <event-bean>
    <name>eventBean</name>
      <record-parameters>
        <dataset-name>tradereport_sample</dataset-name>
          <event-type-list>
            <event-type>TradeEvent</event-type>
          </event-type-list>
          <batch-size>1</batch-size>
          <batch-time-out>10</batch-time-out>
        </record-parameters>
  </event-bean>

7.1.4 Create a Spring Bean

You can configure a Java class as a Spring bean to include the class in an event processing network. This is a good option if you have an existing Spring bean that you want to incorporate into the EPN or if you want to incorporate Spring features into your Java code.

In a Spring bean you plan to add to an EPN, you can implement the various life cycle interfaces. These include InitializingBean, DisposableBean, and the active interfaces, such as RunnableBean. The Spring bean event source can also use configuration metadata annotations such as @Prepare, @Rollback, and @Activate.

A Spring bean is a Java class managed by the Spring framework. You add a class as a Spring bean by configuring it in the EPN assembly file using the standard bean element.

A Spring bean is not an Oracle Stream Analytics stage. You cannot monitor a Spring bean with the Oracle Stream Analytics monitoring framework, you cannot use the configuration metadata annotations in a Spring bean, and you cannot set a Spring bean to record and play back events that pass through it.

Assembly File

In the assembly file, you use the bean element to declare a custom Spring bean as a component in the event processor network. For example:

<bean id="TradeListenerBean"
  class="com.oracle.cep.example.tradereport.TradeListener">
</bean>

7.2 Event Sink Interfaces

You create an event sink to receive events in an EPN and apply logic that responds to the event data. A Java class that is an event sink implements one of the interfaces described in this section.

Each of these interfaces provides methods that the Oracle Event Processing server uses to pass events to the class as the events exit the EPN stage connected upstream from the Java class, which is typically a channel.

The interfaces described here provide support for events arriving either as streams or relations. However, interfaces for relations also support receiving events arriving as streams. As described in the following table, the interfaces are hierarchically related.

Interface Description

com.bea.wlevs.ede.api.StreamSink

Implement to receive events sequentially in a stream.

com.bea.wlevs.ede.api.RelationSink

Implement to receive events sequentially in a relation. Extends StreamSink to receive events in a stream.

com.bea.wlevs.ede.api.BatchStreamSink

Implement to receive batched events in a stream. Events might arrive batched by time stamp when the upstream channel allows batching. Extends StreamSink to support receiving events unbatched.

com.bea.wlevs.ede.api.BatchRelationSink

Implement to receive batched events as a relation. Events might arrive batched by time stamp when the upstream channel allows batching. Extends RelationSink to support receiving events unbatched as streams or relations.

EventRejectedException Behavior in onInsertEvent Implementations

You need to explicitly throw EventRejectedException in onInsertEvent implementations for exceptions you do not want to get dropped. You can raise an EventProcessingException and it is propagated all the way to the source of the error through a CQL processor. An EventRejectedException can chain exceptions from its downstream listeners, in case there is more than one exception. The CQL processor converts the EventRejectedException to a soft exception. See Fault Handling for more information.

7.2.1 Implement StreamSink

A class that receives events as a stream only receive events that are, from the Oracle Stream Analytics standpoint, inserted. That is because in a stream, events are always appended to the end of a sequence. Events in a stream are also always received in ascending time order so that their time stamps have non-decreasing values from one event to the one that follows. Non-decreasing time stamps enables the time stamp of one event to be the same as the time stamp of the event that precedes it, but not earlier than that preceding time stamp. The time stamp is either the same or later.

As a result, the interfaces that support receiving events as a stream have one method each for receiving events. The interfaces for receiving events as a relation support receiving multiple kinds of events.

Implement the StreamSink interface if your class receives unbatched events as a stream. The StreamSink interface has a single method, onInsertEvent, which the Oracle Stream Analytics server calls to pass in each event from the stream as events leave the upstream stage that is connected to your class.

In Example 7-1, a simple StreamSink implementation that receives stock trade events where each event is an Object instance, and tests to see whether the event is an instance of a particular event type. If so, then the code retrieves values of properties known to be members of that type.

You implement the BatchStreamSink interface if you expect your class to receive batched events as a stream. The interface has a single method, onInsertEvents, which the Oracle Stream Analytics server calls to pass in a collection of events received from the upstream stage. The BatchStreamSink interface extends StreamSink, so can receive unbatched events also.

For more information about event batching, see Batch Processing Channels.

Example 7-1 Implement the StreamSink Interface

public class TradeListener implements StreamSink {

    public void onInsertEvent(Object event) throws EventRejectedException {
        if (event instanceof TradeEvent){
            String symbolProp = ((TradeEvent) event).getSymbol();
            Integer volumeProp = ((TradeEvent) event).getVolume();
            // Code to do something with the property values.
        }
    }
}

7.2.2 Implement RelationSink

A class that receives events as a relation can receive any of the kinds of events possible in a relation, which are insert events, delete events, and update events. Unlike a stream, events in a relation are unordered and include events that have been updated or deleted by code that created or operated on the relation.

As a result, the interfaces that support receiving events as a relation have methods through which your class can receive insert, delete, or update events.

You implement the RelationSink interface if your class receives unbatched events as a relation. The RelationSink interface has three methods, one of which it inherits from the StreamSink interface: onInsertEvent, onDeleteEvent, and onUpdateEvent. At runtime, the Oracle Stream Analytics server calls the appropriate method depending on which type of event is received from the upstream channel connected to your class.

public class TradeListener implements RelationSink {

    public void onInsertEvent(Object event) throws EventRejectedException {
        if (event instanceof TradeEvent){
            String symbolProp = ((TradeEvent) event).getSymbol();
            Integer volumeProp = ((TradeEvent) event).getVolume();
            // Do something with the inserted event.
        }
    }

    @Override
    public void onDeleteEvent(Object event) throws EventRejectedException {
        if (event instanceof TradeEvent){
            // Do something with the deleted event.
        }
    }

    @Override
    public void onUpdateEvent(Object event) throws EventRejectedException {
        if (event instanceof TradeEvent){
            // Do something with the updated event.
        }
    }
}

Implement the BatchRelationSink interface if your class receives batched events as a relation. It has an onEvents method designed to receive all three types of events from the batch in java.util.Collection instances:

onEvents(insertEvents, deleteEvents, updateEvents)

In addition, the BatchRelationSink interface extends the RelationSink interface to support receiving unbatched events.

At runtime, the Oracle Stream Analytics server calls the appropriate method to pass in events received from the upstream stage connected to your class.

For more information about event batching, see Batch Processing Channels.

For complete API reference information about the Oracle Stream Analytics APIs described in this section, see the Java API Reference for Oracle Stream Analytics.

7.3 Event Source Interfaces

You can create a Java class that sends events to a downstream stage in an event processing network. You can create an event source, for example, to send events your Java code has created or altered from event data flowing through the EPN.

A Java class that is an event source implements one of the interfaces described in this section. Each of these interfaces provides a method used by the Oracle Stream Analytics server to pass into your class an instance of a sender class.

The sender instance your event source receives implements one of the sender interfaces described in this section. The sender interfaces provide methods your code can call to send events as streams or relations, and batched or unbatched to the downstream EPN stage that follows, such as a channel.

The interfaces described here support sending events either as streams or relations. Interfaces for relation also support sending events as streams.

Table 7-2 Interfaces for Implementing an Event Source

Interface Description

com.bea.wlevs.ede.api.StreamSource

Implement this interface to send events as a stream. At runtime, the Oracle Stream Analytics server injects an instance of a stream sender class.

com.bea.wlevs.ede.api.RelationSource

Implement this interface to send events as a relation or stream. At runtime, the Oracle Stream Analytics server injects an instance of a relation sender class. Extends StreamSource, so it also supports stream events.

The interfaces listed in Table 7-3 are implemented by sender classes your event source class receives from the Oracle Stream Analytics server.

Table 7-3 Interfaces Implemented by Sender Classes

Interface Description

com.bea.wlevs.ede.api.StreamSender

Provides a method to send events as a stream.

com.bea.wlevs.ede.api.RelationSender

Provides methods to send events as a relation. Extends StreamSender, so it also support stream events.

com.bea.wlevs.ede.api.BatchStreamSender

Provides a method with which your code can send batched events as a stream. You might send events batched by time stamp if the downstream stage to which you're sending them is a channel configured for batched events. Extends StreamSender, so it also provides support for sending events unbatched.

com.bea.wlevs.ede.api.BatchRelationSender

Provides a method to send batched events as a relation. You can send events batched by time stamp when the downstream stage is a channel configured for batched events. Extends RelationSender to support unbatched events.

7.3.1 Implement StreamSender

A class that is a source of stream events should send only events that are, from the Oracle Stream Analytics standpoint, inserted. Sending only inserted events models a stream, rather than a relation. Events sent from a stream source should also have non-decreasing time stamps from one event to the event that follows. The time stamp of an event that follows another should either be the same as, or later than, the event that preceded it.

When you implement StreamSource, your code can send events batched or unbatched. Your implementation of the StreamSource setEventSender method receives a sender instance that you can cast to one of the types described in Table 7-3. Use the sender instance in your code to send events as expected by the downstream stage.

If your code sends events to a channel that enables batching, use one of the batched event senders to batch events by time stamp before sending them. For more information, see Batch Processing Channels.

The sender instance provides a sendHeartbeat method to send a heartbeat when the receiving channel is configured to be application time stamped.

7.3.2 Implement RelationSender

A class that is a source of events as a relation can send insert, delete, and update events to the downstream stage. When you implement the RelationSource interface, your code can send events batched or unbatched. Your implementation of the RelationSource setEventSender method receives a sender instance that you can cast to one of the types described in Table 7-3. Use the sender instance to send events to the downstream stage.

Keep in mind the following constraints for handling the sender instance your class receives:

  • For sendDeleteEvent, you must send an instance of the same event type as that configured for the channel.

  • For sendInsertEvent, a unique constraint violation exception is raised and the input event discarded if an event with the same primary key is already in the relation.

  • For sendUpdateEvent, an invalid update tuple exception will be raised and the input event will be discarded if an event with the given primary key is not in the relation.

In the following example, a simple RelationSource implementation receives a StreamSender, then casts the sender to a RelationSender to send events as a relation. This class creates a new TradeEvent instance from the event type configured in the repository, but the sendEvents method could as easily have received an instance as a parameter from another part of the code.

package com.oracle.cep.example.tradereport;

import com.bea.wlevs.ede.api.EventType;
import com.bea.wlevs.ede.api.EventTypeRepository;
import com.bea.wlevs.ede.api.RelationSender;
import com.bea.wlevs.ede.api.RelationSource; 
import com.bea.wlevs.ede.api.StreamSender;
import com.bea.wlevs.util.Service;

public class TradeEventSource implements RelationSource {

    // Variables for event type respository and event sender. Both
    // will be set by the server.
    EventTypeRepository m_repos = null;	
    RelationSender m_sender = null;

    // Called by the server to set the repository instance.
    @Service
    public void setEventTypeRepository(EventTypeRepository repos) {
        m_repos = repos;
    }

    // Called by the server to set the sender instance.
    @Override
    public void setEventSender(StreamSender sender) {
        // Cast the received StreamSender to a RelationSender    
        m_sender = (RelationSender)sender;
    }

    /**
     * Sends events to the next EPN stage using the sender 
     * received from the server. This code assumes that an event
     * instance isn't received from another part of the class, 
     * instead creating a new instance from the repository.
     */
    private void sendEvents(){
        EventType eventType = m_repos.getEventType("TradeEvent");
        TradeEvent tradeEvent = (TradeEvent)eventType.createEvent();
        m_sender.sendDeleteEvent(tradeEvent);
    }
}