Skip Headers
Oracle® Fusion Middleware Developer's Guide for Oracle Event Processing
11g Release 1 (11.1.1.7) for Eclipse

Part Number E14301-09
Go to Documentation Home
Home
Go to Book List
Book List
Go to Table of Contents
Contents
Go to Master Index
Master Index
Go to Feedback page
Contact Us

Go to previous page
Previous
Go to next page
Next
PDF · Mobi · ePub

16 Handling Events with Java

This chapter describes how to implement the Oracle Event Processing interfaces needed for a Java class to act as an event sink and event source, receiving and sending events in an event processing network (EPN). It also describes how to configure a Java class as an Oracle Event Processing event bean or Spring bean.

Whether you are writing new logic in Java or wanting to incorporate existing Java code, Oracle Event Processing provides several ways to add Java code to your application. Your options include where and how the code executes, as well as how you configure it.

This chapter includes the following sections:

16.1 Roles for Java Code in an Event Processing Network

Whether you are writing new functionality or have existing logic in Java that you want to incorporate into an Oracle Event Processing application, places where you code might go depending on its role.

Note that many Oracle Event Processing applications have no need for Java code at all. For example, an application's logic might be captured in Oracle CQL alone.

Use the following descriptions of roles for Java code to find the best place for your code.

16.2 Handling Events with Sources and Sinks

When you write Java code designed to handle events, you create a class that is an event sink or event source. An event sink is able to receive events as they flow through an event processing network. An event source can create events and send them along to a downstream stage in the EPN. For example, you might create a class that receive events, does something with their data, then send them along to the next stage.

Event sinks and sources implement particular interfaces provided by the Oracle Event Processing API. An event sink implements interfaces that include methods through which the Oracle Event Processing server can pass into it event type instances. An event source implements an interface that includes a method through which it receives an object with which to send events. A single class can implement either or both kinds of functionality, depending on its role in the event processing network (EPN).

Note:

This section assumes that you are familiar with streams and relations and the differing ways they represent events moving through an event processing network. If not, be sure to read Section 1.3, "Overview of Events, Streams and Relations".

Common places for event-handling Java code are adapters and beans. An adapter is a part of the EPN that communicates with external components. It is designed to receive external event data and create events from the data, or to receive internal events and send their data along to another component outside the EPN. Writing event handling code, as described in this chapter, is an important part of creating an adapter. For more on creating adapters, see Chapter 15, "Integrating an External Component Using a Custom Adapter".

Description of java_logic.gif follows
Description of the illustration java_logic.gif

Like an adapter, a bean can handle events. But you typically add a bean in the midst of an EPN so that events will pass into and out of it. For example, you might want to add a bean whose Java code executes application logic in response to data in events passing through it.

The following procedure describes the typical steps for creating a Java class that receives and sends events.

  1. Implement the interfaces needed to receive or send events. Your options for developing Java logic as an EPN component are as follows:

    • To create a class that can receive events as they pass through the EPN, implement interfaces that make the class an event sink. Those interfaces include StreamSink or RelationSink for receiving single events, and BatchStreamSink or BatchRelationSink for receiving batches of events. For more information, see Section 16.2.1, "Implementing an Event Sink".

    • To create a class that can send events to other parts of the EPN, implement interfaces that make the class an event source. Those interfaces include StreamSource or RelationSource for sending single events, and BatchStreamSource or BatchRelationSource for sending batches of events. For more information, see Section 16.2.2, "Implementing an Event Source".

  2. Configure the class so that you can add it to the EPN where it belongs.

    Note:

    This procedure assumes that the custom event bean is bundled in the same application JAR file that contains the other components of the EPN, such as the processor, streams, and business logic POJO. If you want to bundle the custom event bean in its own JAR file so that it can be shared among multiple applications, see Section 23.2.4.2, "How to Assemble an Event Bean in its Own Bundle."

16.2.1 Implementing an Event Sink

You can create a Java class that is able to receive events as they pass through an event processing network. A component that can receive events is an event sink. You might create an event sink, for example, to receive events in the midst of an event processing network, with logic for responding to each event's content.

A Java class that is an event sink implements one of the interfaces described in this section. Each of these interfaces provides methods that the Oracle Event Processing server uses to pass events to the class as the events exit the EPN stage connected upstream of the class, typically a channel.

Note:

For a step-by-step development example that includes creating a simple event sink, be sure to see Chapter 8, "Walkthrough: Assembling a Simple Application".

The interfaces described here are intended to provide support for events arriving either as streams or relations. However, interfaces for relation support also support receiving events arriving as streams. As described in the following table, the interfaces are hierarchically related.

Interface Description

com.bea.wlevs.ede.api.StreamSink

Implement this to receive events arriving sequentially as a stream.

com.bea.wlevs.ede.api.RelationSink

Implement this to receive events arriving sequentially as a relation. Extends StreamSink., so it also provides support for receiving events as a stream.

com.bea.wlevs.ede.api.BatchStreamSink

Implement this to support receiving batched events arriving as a stream. Events might arrive batched by timestamp if the channel they are coming from is configured to allow batching. Extends StreamSink, so it also provides support for receiving events unbatched.

com.bea.wlevs.ede.api.BatchRelationSink

Implement this to support receiving batched events arriving as a relation. Events might arrive batched by timestamp if the channel they are coming from is configured to allow batching. Extends RelationSink, so it also provides support for receiving events unbatched as either streams or relations.


16.2.1.1 Implementing StreamSink or BatchStreamSink

A class that receives events as a stream will receive only events that are, from the Oracle Event Processing standpoint, "inserted." That's because in a stream, events are always appended to the end of a sequence. Events in a stream are also always received in ascending time order, so that their timestamps have non-decreasing values from one event to the one that follows it. (The idea of non-decreasing timestamps allows for the possibility that the timestamp of one event can be the same as the timestamp of the event that precedes it, but not earlier than that preceding timestamp. It's either the same or later.)

As a result, the interfaces for support to receive events as a stream have one method each for receiving events. Contrast this with the interfaces for receiving events as a relation, which support receiving multiple kinds of events.

You implement the StreamSink interface if you expect your class to receive unbatched events as a stream. It has a single method, onInsertEvent, which the Oracle Event Processing server calls to pass in each event from the stream as it leaves the upstream stage that is connected to your class.

In Example 16-1, "Implementing the StreamSink Interface", a simple StreamSink implementation that receive stock trade events receives each event as an Object instance, then tests to see if the event is an instance of a particular event type. If it is, then the code retrieves values of properties known to be members of that type.

Example 16-1 Implementing the StreamSink Interface

public class TradeListener implements StreamSink {

    public void onInsertEvent(Object event) throws EventRejectedException {
        if (event instanceof TradeEvent){
            String symbolProp = ((TradeEvent) event).getSymbol();
            Integer volumeProp = ((TradeEvent) event).getVolume();
            // Code to do something with the property values.
        }
    }
}

You implement the BatchStreamSink interface if you expect your class to receive batched events as a stream. The interface has a single method, onInsertEvents, which the Oracle Event Processing server calls to pass in a collection of events received from the upstream stage. (The BatchStreamSink interface extends StreamSink, so can receive unbatched events also.)

For more information about event batching, see Section 10.1.6, "Batch Processing Channels".

16.2.1.2 Implementing RelationSink or BatchRelationSink

A class that receives events as a relation can receive any of the kinds of events possible in a relation: insert events, delete events, and update events. Unlike a stream, events in a relation are unordered and include events that have been updated or deleted by code that created or operated on the relation.

As a result, the interfaces for support to receive events as a relation have methods through which your class can receive insert, delete, or update events.

You implement the RelationSink interface if you expect your class to receive unbatched events as a relation. It has three methods (one inherited from the StreamSink interface, which it extends), onInsertEvent, onDeleteEvent, and onUpdateEvent. At runtime, the Oracle Event Processing server will call the appropriate method depending on which type of event is being received from the upstream channel connected to your class.

Example 16-2 Implementing the RelationSink Interface

public class TradeListener implements RelationSink {

    public void onInsertEvent(Object event) throws EventRejectedException {
        if (event instanceof TradeEvent){
            String symbolProp = ((TradeEvent) event).getSymbol();
            Integer volumeProp = ((TradeEvent) event).getVolume();
            // Do something with the inserted event.
        }
    }

    @Override
    public void onDeleteEvent(Object event) throws EventRejectedException {
        if (event instanceof TradeEvent){
            // Do something with the deleted event.
        }
    }

    @Override
    public void onUpdateEvent(Object event) throws EventRejectedException {
        if (event instanceof TradeEvent){
            // Do something with the updated event.
        }
    }
}

You implement the BatchRelationSink interface if you expect your class to receive batched events as a relation. It has an onEvents method designed to receive all three types of events from the batch in java.util.Collection instances:

onEvents(insertEvents, deleteEvents, updateEvents)

In addition, the interface extends the RelationSink interface to provide support for receiving unbatched events.

At runtime, the Oracle Event Processing server calls the appropriate method to pass in events received from the upstream stage connected to your class.

For more information about event batching, see Section 10.1.6, "Batch Processing Channels".

For complete API reference information about the Oracle Event Processing APIs described in this section, see the Oracle Fusion Middleware Java API Reference for Oracle Event Processing.

16.2.2 Implementing an Event Source

You can create a Java class that is able to send events to a downstream stage in an event processing network. A component that can send events is an event source. You might create an event source, for example, to send events your Java code has created or altered from event data flowing through the EPN.

A Java class that is an event source implements one of the interfaces described in this section. Each of these interfaces provides a method used by the Oracle Event Processing server to pass into your class an instance of a sender class.

The sender instance your event source receives, in turn, implements one of the sender interfaces described in this section. The sender interfaces provide methods your code can call to send events as streams or relations, batched or unbatched, along to the downstream EPN stage that follows it, such as a channel.

The interfaces described here are intended to provide support for sending events either as streams or relations. However, interfaces for relation support also support sending events as streams.

Table 16-1 Interfaces for Implementing an Event Source

Interface Description

com.bea.wlevs.ede.api.StreamSource

Implement this for the ability to send events as a stream. At runtime, the Oracle Event Processing server will inject an instance of a stream sender class.

com.bea.wlevs.ede.api.RelationSource

Implement this for the ability to send events as a relation or stream. At runtime, the Oracle Event Processing server will inject an instance of a relation sender class. Extends StreamSource., so it also provides support as a source of stream events.


The interfaces listed in Table 16-2, "Interfaces Implemented by Sender Classes" are implemented by sender classes your event source class receives from the Oracle Event Processing server.

Table 16-2 Interfaces Implemented by Sender Classes

Interface Description

com.bea.wlevs.ede.api.StreamSender

Provides a method with which your code can send events as a stream.

com.bea.wlevs.ede.api.RelationSender

Provides methods with which your code can send events as a relation. Extends StreamSender, so it also provides support for sending events as a stream.

com.bea.wlevs.ede.api.BatchStreamSender

Provides a method with which your code can send batched events as a stream. You might send ev

ents batched by timestamp if the downstream stage to which you're sending them is a channel configured for batched events. Extends StreamSender, so it also provides support for sending events unbatched.

com.bea.wlevs.ede.api.BatchRelationSender

Provides a method with which your code can send batched events as a relation. You might send events batched by timestamp if the downstream stage to which you're sending them is a channel configured for batched events. Extends RelationSender, so it also provides support for sending events unbatched.


16.2.2.1 Implementing StreamSource

A class that is a source of events as a stream should send only events that are, from the Oracle Event Processing standpoint, "inserted." Sending only inserted events models a stream, rather than a relation. Events sent from a stream source should also have non-decreasing timestamps from one event to the event that follows it. In other words, the timestamp of an event that follows another should either be the same as or later than the event that preceded it.

When you implement StreamSource, your code can send events batched or unbatched. Your implementation of the StreamSource setEventSender method will receive a sender instance that you can cast to one of the types described in Table 16-2, "Interfaces Implemented by Sender Classes". Your code should use the sender instance to send events as expected by the downstream stage to which the events will be going.

If your code is sending events to a channel that enables batching, you should use one of the batched event senders to batch events by timestamp before sending them. For more information, see Section 10.1.6, "Batch Processing Channels".

The sender instance also provides a sendHeartbeat method with which you can send a heartbeat if the receiving channel is configured to be application timestamped.

16.2.2.2 Implementing RelationSource

A class that is a source of events as a relation can send insert, delete, and update events as expected by the downstream stage that is receiving the events.

When you implement RelationSource, your code can send events batched or unbatched. Your implementation of the RelationSource setEventSender method will receive a sender instance that you can cast to one of the types described in Table 16-2, "Interfaces Implemented by Sender Classes". Your code should use the sender instance to send events as expected by the downstream stage.

As you implement RelationSource, keep in mind the following constraints when using the sender instance your class receives:

  • For sendDeleteEvent, you must send an instance of the same event type as that configured for the channel.

  • For sendInsertEvent, a unique constraint violation exception will be raised and the input event discarded if an event with the same primary key is already in the relation.

  • For sendUpdateEvent, an invalid update tuple exception will be raised and the input event will be discarded if an event with the given primary key is not in the relation.

In Example 16-3, "Implementing the RelationSource Interface", a simple RelationSource implementation receives a StreamSender, then casts the sender to a RelationSender in order to send events as a relation. This class creates a new TradeEvent instance from the event type configured in the repository, but the sendEvents method could as easily have received an instance as a parameter from another part of the code.

Example 16-3 Implementing the RelationSource Interface

package com.oracle.cep.example.tradereport;

import com.bea.wlevs.ede.api.EventType;
import com.bea.wlevs.ede.api.EventTypeRepository;
import com.bea.wlevs.ede.api.RelationSender;
import com.bea.wlevs.ede.api.RelationSource; 
import com.bea.wlevs.ede.api.StreamSender;
import com.bea.wlevs.util.Service;

public class TradeEventSource implements RelationSource {

    // Variables for event type respository and event sender. Both
    // will be set by the server.
    EventTypeRepository m_repos = null;       
    RelationSender m_sender = null;

    // Called by the server to set the repository instance.
    @Service
    public void setEventTypeRepository(EventTypeRepository repos) {
        m_repos = repos;
    }

    // Called by the server to set the sender instance.
    @Override
    public void setEventSender(StreamSender sender) {
        // Cast the received StreamSender to a RelationSender    
        m_sender = (RelationSender)sender;
    }

    /**
     * Sends events to the next EPN stage using the sender 
     * received from the server. This code assumes that an event
     * instance isn't received from another part of the class, 
     * instead creating a new instance from the repository.
     */
    private void sendEvents(){
        EventType eventType = m_repos.getEventType("TradeEvent");
        TradeEvent tradeEvent = (TradeEvent)eventType.createEvent();
        m_sender.sendDeleteEvent(tradeEvent);
    }
}

16.3 Configuring Java Classes as Beans

When you write Java classes to handle events, you can add them to an EPN by configuring them in the EPN assembly file. You can configure a class a either a Spring bean or an Oracle Event Processing event bean.

Note:

You can also add a class as part of an adapter, whose code receives or sends events when interacting with an external component. For more about configuring a class as an adapter, see Chapter 15, "Integrating an External Component Using a Custom Adapter".

Whether you configure your class as an event bean or Spring bean depends on your deployment context and the features you want to support. Essentially, however, you might want to configure the class as Spring bean if you're interested in taking advantage of an Spring features, such as when you're already using Spring elsewhere.

Table 16-3, "Comparison of Event Beans and Spring Beans" lists the features provided by event beans and Spring beans.

Table 16-3 Comparison of Event Beans and Spring Beans

Bean Type Description

Event bean

Useful as an EPN stage to actively use the capabilities of the Oracle Event Processing server container. An event bean:

  • Is a type of Oracle Event Processing EPN stage.

  • Can be monitored by the Oracle Event Processing monitoring framework.

  • Can make use of the configuration metadata annotations.

  • Can be set to record and play-back events that pass through it.

  • Can participate in the Oracle Event Processing server bean lifecycle by specifying methods in its XML declaration, rather than by implementing Oracle Event Processing server API interfaces.

Spring bean

Useful for legacy integration to Spring. A Spring bean:

  • Is useful if you have a Spring bean you want to add to an EPN.

  • Is not a type of Oracle Event Processing EPN stage.

  • Cannot be monitored by the Oracle Event Processing monitoring framework.

  • Cannot use the configuration metadata annotations.

  • Cannot be set to record and play back events that pass through it.


For more details on each of the different bean types, see the following:

16.3.1 Configuring a Java Class as an Event Bean

You can configure a Java class as an Oracle Event Processing event bean in the EPN assembly file and the componenent configuration file. Configuring it as a event bean in the EPN assembly file will add the bean to the EPN with default settings. You can then configure it in the component configuration file to support runtime configuration changes to certain features.

For more information, see the following sections:

For a complete description of the configuration file, including registration of other components of your application, see Section 5.3, "Creating EPN Assembly Files."

16.3.1.1 Configuring an Event Bean in an EPN Assembly File

In an EPN assembly file, you use the wlevs:event-bean element to declare a custom event bean as a component in the event processor network. Note that the configuration code will differ for event beans created from a factory. For more information, see Section 16.3.1.3, "Creating an Event Bean Factory".

In the following example, the TradeListener class is configured as an event bean whose downstream connection is a channel with ID BeanOutputChannel:

<wlevs:event-bean id="TradeListenerBean"
    class="com.oracle.cep.example.tradereport.TradeListener">
    <wlevs:listener ref="BeanOutputChannel"/>
</wlevs:event-bean>

You can also use a wlevs:instance-property child element to set any static properties in the bean. Static properties are those that you will not change after the bean is deployed.

For example, if your bean class has a setThreshold method, you can pass it the port number as shown:

<wlevs:event-bean id="TradeListenerBean"
    class="com.oracle.cep.example.tradereport.TradeListener">
    <wlevs:instance-property name="threshold" value="6000" />
    <wlevs:listener ref="BeanOutputChannel"/>
</wlevs:event-bean>

You reference an event bean that is an event sink by using the wlevs:listener element. In the following example, a TradeListenerBean event bean receives events from a channel ProcessorOutputChannel:

<wlevs:channel id="ProcessorOutputChannel" >
    <wlevs:listener ref="TradeListenerBean" />
</wlevs:channel>

16.3.1.2 Configuring an Event Bean in a Component Configuration File

You can add configuration for an event bean in a component configuration file. Configuration you add here is available to be updated at runtime. Adding configuration in a component configuration file assumes that you have added the event bean to the EPN by configuring it in the EPN assembly file (see Section 16.3.1.1, "Configuring an Event Bean in an EPN Assembly File" for more information).

You can also create a separate component configuration XML file as needed according to your development environment. For example, if your application has more than one custom event bean, you can create separate XML files for each, or create a single XML file that contains the configuration for all custom event beans, or even all components of your application (beans, adapters, processors, and streams).

In the following example, a TradeListenerBean event bean is configured for event recording. Each event bean configuration should have a separate event-bean child element of the config element.

<?xml version="1.0" encoding="UTF-8"?>
<wlevs:config 
    xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application">
    <event-bean>
        <name>TradeListenerBean</name>
        <record-parameters>
            <dataset-name>tradereport_sample</dataset-name>
            <event-type-list>
                <event-type>TradeEvent</event-type>
            </event-type-list>
            <batch-size>1</batch-size>
            <batch-time-out>10</batch-time-out>
        </record-parameters>
    </event-bean>
</wlevs:config>

Uniquely identify each event bean with the name child element. This name must be the same as the value of the id attribute in the wlevs:event-bean element of the EPN assembly file that defines the event processing network of your application. This is how Oracle Event Processing knows to which particular event bean component in the EPN assembly file this configuration applies.

You can also extend component configuration with your own elements. For more information, see Section 26, "Extending Component Configuration".

For a reference on the component configuration XML schema, see Section B.2, "Component Configuration Schema wlevs_application_config.xsd".

16.3.1.3 Creating an Event Bean Factory

You can use a single event bean implementation in multiple event processing networks by implementing and configuring an event bean factory. The factory class provides event bean instances for the applications that request one.

For detail on the APIs described here, see the Oracle Fusion Middleware Java API Reference for Oracle Event Processing

Creating an event bean factory involves the following steps:

  1. In your event bean class, implement the com.bea.wlevs.ede.api.EventBean interface so that the bean can be returned by the factory. This is a marker interface, so there are no methods to implement.

    public class TradeListener implements EventBean, StreamSink {
        // Bean implementation code.
    }
    
  2. Implement an event bean factory class to create and return instances of the event bean.

    Your event bean factory class must implement the com.bea.wlevs.ede.api.EventBeanFactory interface. In your implementation of its create method, create an instance of your event bean.

    import com.oracle.cep.example.tradereport.TradeListener;
    import com.bea.wlevs.ede.api.EventBeanFactory;
    
    public class TradeListenerFactory implements EventBeanFactory {
        public TradeListenerFactory() {
        }
        public synchronized TradeListener create() 
            throws IllegalArgumentException {
    
            // Your code might have a particular way to create the instance.
            return new TradeListener();
    
        }
    }
    
  3. In an EPN assembly file, configure the factory class.

    You register factories in the EPN assembly file using the wlevs:factory element, as shown in the following example:

    <wlevs:factory provider-name="tradeListenerProvider" 
        class="com.oracle.cep.example.tradereport.TradeListenerFactory"/>
    

    If you need to specify service properties, then you must also use the osgi:service element to register the factory as an OSGI service in the EPN assembly file. The scope of the OSGI service registry is all of Oracle Event Processing. This means that if more than one application deployed to a given server is going to use the same even bean factory, be sure to register the factory only once as an OSGI service.

    Add an entry to register the service as an implementation of the com.bea.wlevs.ede.api.EventBeanFactory interface. Provide a property, with the key attribute equal to type, and the name by which this event bean provider will be referenced. Finally, add a nested standard Spring bean element to register your specific event bean class in the Spring application context

    <osgi:service interface="com.bea.wlevs.ede.api.EventBeanFactory">
        <osgi:service-properties>
            <entry key="type" value="tradeListenerProvider"</entry>
        </osgi:service-properties>
        <bean class="com.oracle.cep.example.tradereport.TradeListenerFactory" />
    </osgi:service>
    
  4. In applications that will use instances of the event bean, configure the event bean by specifying the configured event bean factory as a provider (rather than specifying the bean by its class name), as shown in the following example:

    <wlevs:event-bean id="TradeListenerBean"
        provider="tradeListenerProvider">
        ...
    </wlevs:event-bean>
    

For more on bundling an event bean in its own bundle for reuse, see Section 23.2.4.2, "How to Assemble an Event Bean in its Own Bundle".

16.3.2 Configuring a Java Class as a Spring Bean

You can configure a Java class as a Spring bean in order to include the class in an event processing network. This is a good option if you have an existing Spring bean that you want to incorporate into the EPN. Or you might simply want to have your Java code make use of Spring features.

A Spring bean is a Java class managed by the Spring framework. You add a class as a Spring bean by configuring it in the EPN assembly file using the standard bean element.

Keep in mind that a Spring bean is not a type of Oracle Event Processing stage. In other words, it cannot be monitored by the Oracle Event Processing monitoring framework, cannot use the configuration metadata annotations, and cannot be set to record and play-back events that pass through it.

In the EPN assembly file, you use the bean element to declare a custom Spring bean as a component in the event processor network. For example:

<bean id="TradeListenerBean"
    class="com.oracle.cep.example.tradereport.TradeListener">
</bean>

16.3.2.1 Supporting Spring Bean Characteristics

In a Spring bean you are planning to add to an EPN, you can implement the various lifecycle interfaces. These include InitializingBean, DisposableBean, and the active interfaces, such as RunnableBean. The Spring bean event source can also use configuration metadata annotations such as @Prepare, @Rollback, and @Activate.