9 EclipseLink, JPA, and Oracle Coherence

The Oracle Event Processing installation includes the EclipseLink 2.4.2 open source mapping and persistence framework to support the use of the Java Persistence API (JPA) in your applications. JPA is the standard for object-relational mapping (ORM) and enterprise Java persistence.

This chapter presents two sample Oracle Event Processing applications, HelloWorld and JPA-Coherence-Sample-Code, that use EclipseLink and JPA to read from and write to a database. The JPA-Coherence-Sample-Code application also uses an Oracle Coherence cache to coordinate data updates in an environment with clustered applications and servers.

This chapter includes the following sections: High-Level Procedure, HelloWorld Example, and JPA Coherence Example.

9.1 High-Level Procedure

Use the following high-level steps to create an Oracle Event Processing application that includes EclipseLink:

  1. Create your Oracle Event Processing application including JPA and Oracle Coherence as needed.
  2. Create a persistence.xml file with the correct JPA configuration. This file contains the properties that control runtime operation.
  3. Put the persistence.xml file in the META-INF directory of your application.
  4. Bundle and deploy the application.

Learn more about EclipseLink at http://eclipse.org/eclipselink/.

Note:

A Coherence socket exception occurs when you run the CQL sample on the AIX 6.1 Japanese platform. To resolve this issue, add the -Djava.net.preferIPv4Stack=true parameter to the last line of the startwlevs.sh script.

Note:

The Spatial sample is not supported on the AIX platform.

9.2 HelloWorld Example

The HelloWorld example uses EclipseLink to establish a read and write JDBC connection to the data source to access and store HelloWorld events.

In this example, HelloWorld events contain date and time information.

The example consists of the following files, which are discussed in this section: persistence.xml, HelloWorldAdapter.java, HelloWorldEvent.java, and HelloWorldBean.java.

9.2.1 persistence.xml Configuration File

The following persistence.xml file has one persistence unit (persistence-unit) called helloworld. The helloworld persistence unit has a transaction-type of RESOURCE_LOCAL because Oracle Event Processing is a Java SE environment. The EclipseLink properties specify the settings for database read and write operations and logging. For this example, the managed persistable class that represents objects in the database is com.bea.wlevs.event.example.helloworld.HelloWorldEvent.

This persistence.xml file has entries for EclipseLink logging that are commented out. To debug or otherwise monitor the application behavior, uncomment these entries and set the boolean logging properties to true. For information on property settings, see http://eclipse.org/eclipselink/documentation/2.4/jpa/extensions/toc.htm.

<?xml version="1.0" encoding="UTF-8" ?>
<persistence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://java.sun.com/xml/ns/persistence         http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd"
     version="2.0" xmlns="http://java.sun.com/xml/ns/persistence">
  <persistence-unit name="helloworld" transaction-type="RESOURCE_LOCAL">
    <class>com.bea.wlevs.event.example.helloworld.HelloWorldEvent</class>
      <properties>
        <property name="eclipselink.jdbc.read-connections.min" value="1"/>
        <property name="eclipselink.jdbc.write-connections.min" value="1"/>
      <!--
        <property name="eclipselink.logging.timestamp" value="false"/>
        <property name="eclipselink.logging.thread" value="false"/>
        <property name="eclipselink.logging.session" value="false"/>
        <property name="eclipselink.logging.exceptions" value="false"/>
        <property name="eclipselink.logging.connection" value="false"/>
        <property name="eclipselink.logging.level" value="FINER"/>
      -->
      </properties>
  </persistence-unit>
</persistence>
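
As an alternative to editing persistence.xml, the same logging settings could be supplied at run time as property overrides. The following is a minimal sketch, assuming the resulting map is passed as the second argument to Persistence.createEntityManagerFactory in the same way that HelloWorldBean passes its data source. The LoggingOverrides class and the debugLogging method are hypothetical names that are not part of the sample, and the chosen values are only examples.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the sample: collects EclipseLink logging
// properties so they can be passed as overrides when the factory is created.
class LoggingOverrides {

    static Map<String, Object> debugLogging() {
        Map<String, Object> props = new HashMap<>();
        // Same property names as the commented-out entries in persistence.xml.
        props.put("eclipselink.logging.level", "FINER");
        props.put("eclipselink.logging.timestamp", "true");
        props.put("eclipselink.logging.thread", "true");
        props.put("eclipselink.logging.session", "true");
        props.put("eclipselink.logging.exceptions", "true");
        props.put("eclipselink.logging.connection", "true");
        return props;
    }

    public static void main(String[] args) {
        // In the application, this map would be handed to
        // Persistence.createEntityManagerFactory("helloworld", props).
        debugLogging().forEach((name, value) -> System.out.println(name + "=" + value));
    }
}
```

Keeping the overrides in code leaves the packaged persistence.xml unchanged, which may be convenient when logging is needed only for a single debugging session.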

9.2.2 HelloWorldAdapter.java

The HelloWorldAdapter.java class is a custom threaded adapter that continuously creates events of type HelloWorldEvent. The generateHelloMessage method builds the message text by appending the current time, formatted with a DateFormat instance, to the configured message, and then uses that text to create an event of type HelloWorldEvent.

The Oracle Event Processing framework calls the setEventSender method to initialize the eventSender private variable with a StreamSender instance. The StreamSender instance sends events emitted by a StreamSource instance to a StreamSink listener. In this example, the StreamSink listener is the HelloWorldBean instance.

package com.bea.wlevs.adapter.example.helloworld;

import java.text.DateFormat;
import java.util.Date;
import com.bea.wlevs.ede.api.RunnableBean;
import com.bea.wlevs.ede.api.StreamSender;
import com.bea.wlevs.ede.api.StreamSource;
import com.bea.wlevs.event.example.helloworld.HelloWorldEvent;

public class HelloWorldAdapter implements RunnableBean, StreamSource {
    private static final int SLEEP_MILLIS = 300;
    private DateFormat dateFormat;
    private String message;
    private boolean suspended;
    private StreamSender eventSender;
 
    public HelloWorldAdapter() {
        super();
        dateFormat = DateFormat.getTimeInstance();
    }
    public void run() {
        suspended = false;
        while (!isSuspended()) { // Generate messages until suspend() is called
            generateHelloMessage();
            try {
                synchronized (this) {
                    wait(SLEEP_MILLIS);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public void setMessage(String message) {
        this.message = message;
    }
    private void generateHelloMessage() {
        String message = this.message + dateFormat.format(new Date());
        HelloWorldEvent event = new HelloWorldEvent();
        event.setMessage(message);
        eventSender.sendInsertEvent(event);
    }
    public void setEventSender(StreamSender sender) {
        eventSender = sender;
    }
    public synchronized void suspend() {
        suspended = true;
    }
    private synchronized boolean isSuspended() {
        return suspended;
    }
}
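
The run loop in the listing above waits out the full SLEEP_MILLIS timeout before it notices a suspend request. The following minimal sketch shows one possible variation in which suspend also notifies the waiting thread so that the loop exits promptly. It uses only standard Java classes. The SuspendableLoop class, the sent list (which stands in for the StreamSender instance), and the runBriefly helper are hypothetical and are not part of the sample.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Stand-in for the adapter's run loop; no Oracle Event Processing types are used.
class SuspendableLoop implements Runnable {
    private static final int SLEEP_MILLIS = 50;
    // Stands in for the StreamSender: records each "event" that would be sent.
    private final List<String> sent = Collections.synchronizedList(new ArrayList<String>());
    private boolean suspended;

    public void run() {
        int count = 0;
        while (!isSuspended()) {
            sent.add("event-" + count++);
            try {
                synchronized (this) {
                    if (!suspended) {
                        wait(SLEEP_MILLIS);   // releases the lock while waiting
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // keep the interrupt status and stop
                return;
            }
        }
    }

    public synchronized void suspend() {
        suspended = true;
        notifyAll();   // wake the loop instead of waiting out the timeout
    }

    private synchronized boolean isSuspended() {
        return suspended;
    }

    // Runs the loop on a worker thread for roughly the given time, then suspends it.
    static int runBriefly(long millis) {
        SuspendableLoop loop = new SuspendableLoop();
        Thread worker = new Thread(loop);
        worker.start();
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            loop.suspend();
        }
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return loop.sent.size();
    }

    public static void main(String[] args) {
        System.out.println("Sent " + runBriefly(200) + " events before suspend");
    }
}
```

The suspended flag is checked again inside the synchronized block so that a suspend call that arrives between the loop test and the wait call is not missed.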

9.2.3 HelloWorldEvent.java

The HelloWorldEvent.java class is the JPA entity that represents a HelloWorld event. The HelloWorldAdapter.generateHelloMessage method calls the HelloWorldEvent.setMessage method to set the message text on a new event. The HelloWorldBean class stores each event, including its message and generated id, in the data store and retrieves the stored events from it.

package com.bea.wlevs.event.example.helloworld;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

@Entity
public class HelloWorldEvent {
  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  private long id;
  private String message;

  public String getMessage() {
    return message;
  }

  public void setMessage (String message) {
    this.message = message;
  }
}

9.2.4 HelloWorldBean.java

The HelloWorldBean.java class is an event sink and source that receives HelloWorldEvent events from the adapter and uses JPA to perform read and write operations on the database.

The Oracle Event Processing framework calls the setEventSender method to initialize the m_eventSender private variable with a StreamSender instance. The onInsertEvent method persists each incoming event and then sends it downstream to the StreamSink listeners.

package com.bea.wlevs.example.helloworld;

import java.util.HashMap;
import java.util.List;
import javax.annotation.Resource;
import javax.sql.DataSource;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import javax.persistence.Query;
import org.springframework.beans.factory.DisposableBean;
import org.eclipse.persistence.config.PersistenceUnitProperties;
import com.bea.wlevs.ede.api.StreamSink;
import com.bea.wlevs.ede.api.StreamSource;
import com.bea.wlevs.ede.api.StreamSender;
import com.bea.wlevs.event.example.helloworld.HelloWorldEvent;

public class HelloWorldBean implements StreamSink, StreamSource, DisposableBean {
    private static final String PERSISTENCE_UNIT_NAME = "helloworld";
    private EntityManagerFactory m_entityMgrFactory;
    private EntityManager m_entityMgr;
    private DataSource m_ds;
    private boolean m_shuttingDown;
    private StreamSender m_eventSender;
 
    public void setEventSender(StreamSender sender){
        m_eventSender = sender;
    }
    private void setupEntityManager(){
        if (m_entityMgr!=null)
            return;
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.put(PersistenceUnitProperties.NON_JTA_DATASOURCE, m_ds);
        props.put("eclipselink.ddl-generation", "create-tables");
        props.put("eclipselink.ddl-generation.output-mode", "database");
        m_entityMgrFactory = Persistence.createEntityManagerFactory
                             (PERSISTENCE_UNIT_NAME, props);
        m_entityMgr = m_entityMgrFactory.createEntityManager();
    }
    public void onInsertEvent(Object event){
        if (m_shuttingDown)
            return;
        setupEntityManager();
        if (event instanceof HelloWorldEvent) {
            HelloWorldEvent helloWorldEvent = (HelloWorldEvent) event;
            System.out.println("Message: " + helloWorldEvent.getMessage());
            m_entityMgr.getTransaction().begin();
            try {
                m_entityMgr.persist(helloWorldEvent);
                m_entityMgr.getTransaction().commit();
            } finally {
             if (m_entityMgr.getTransaction().isActive())
              m_entityMgr.getTransaction().rollback();
            }
        }
        Query q = m_entityMgr.createQuery("select t from HelloWorldEvent t");
        List<HelloWorldEvent> hwlist = q.getResultList();
        System.out.println("Stored " + hwlist.size() + " helloworld events");
        m_eventSender.sendInsertEvent(event);
    }
    @Resource(name="derbyDS")
    public void setDataSource(DataSource ds){
        m_ds = ds;
    }
    public void destroy(){
        m_shuttingDown = true;
 
        if (m_entityMgr!=null){
            m_entityMgr.close();
            m_entityMgr=null;
        }
        if (m_entityMgrFactory!=null){
            m_entityMgrFactory.close();
            m_entityMgrFactory=null;
        }
    }
}
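
The onInsertEvent method commits inside a try block and rolls back in the finally block only if the transaction is still active. The following minimal sketch isolates that idiom with standard Java classes only. The SketchTransaction class is a hypothetical stand-in that mirrors just the begin, commit, rollback, and isActive calls used in the listing; it is not the JPA EntityTransaction interface. Unlike the listing, the sketch catches the exception so that it can report the outcome as a boolean.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a resource-local transaction over an in-memory store.
class SketchTransaction {
    private final List<String> store;                       // committed rows
    private final List<String> pending = new ArrayList<>(); // uncommitted rows
    private boolean active;

    SketchTransaction(List<String> store) {
        this.store = store;
    }

    void begin() {
        active = true;
    }

    void persist(String row) {
        if (row == null) {
            throw new IllegalArgumentException("row must not be null");
        }
        pending.add(row);
    }

    void commit() {
        store.addAll(pending);
        pending.clear();
        active = false;
    }

    void rollback() {
        pending.clear();
        active = false;
    }

    boolean isActive() {
        return active;
    }
}

class CommitOrRollback {
    // Returns true if the row was committed, false if the work was rolled back.
    static boolean save(SketchTransaction tx, String row) {
        tx.begin();
        try {
            tx.persist(row);
            tx.commit();
            return true;
        } catch (RuntimeException e) {
            return false;              // the finally block still runs
        } finally {
            if (tx.isActive()) {
                tx.rollback();         // reached only when commit did not complete
            }
        }
    }

    public static void main(String[] args) {
        List<String> store = new ArrayList<>();
        SketchTransaction tx = new SketchTransaction(store);
        System.out.println("valid row committed: " + save(tx, "hello"));
        System.out.println("null row committed: " + save(tx, null));
        System.out.println("rows in store: " + store.size());
    }
}
```

Because the rollback is guarded by isActive, the finally block does nothing after a successful commit and cleans up only when persist or commit fails.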

9.3 JPA Coherence Example

The JPA Coherence example demonstrates the usage of the EclipseLink JPA implementation for the Coherence CacheLoader or CacheStore interfaces.

9.3.1 persistence.xml Configuration File

The following persistence.xml file has one persistence unit called derby. The properties specify the settings for database read and write operations, the embedded Derby JDBC driver and URL, table generation, and logging. The managed persistable classes that represent objects in the database are com.oracle.cep.sample.PriceTarget and com.oracle.cep.sample.SaleEvent.

This persistence.xml file has entries for EclipseLink logging that are commented out. To debug or otherwise monitor the application behavior, uncomment these entries and set the boolean logging properties to true. For information on property settings, see http://eclipse.org/eclipselink/documentation/2.4/jpa/extensions/toc.htm.

<?xml version="1.0" encoding="UTF-8" ?>
<persistence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://java.sun.com/xml/ns/persistence      http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd"
     version="2.0" xmlns="http://java.sun.com/xml/ns/persistence">
  <persistence-unit name="derby" transaction-type="RESOURCE_LOCAL">
    <class>com.oracle.cep.sample.PriceTarget</class>
    <class>com.oracle.cep.sample.SaleEvent</class>
    <properties>
      <property name="eclipselink.jdbc.read-connections.min" value="1"/>
      <property name="eclipselink.jdbc.write-connections.min" value="1"/>
      <property name="javax.persistence.jdbc.driver"
                value="org.apache.derby.jdbc.EmbeddedDriver"/>
      <property name="javax.persistence.jdbc.url"
                value="jdbc:derby:test1;create=true"/>
      <property name="eclipselink.ddl-generation" value="create-tables"/>
      <property name="eclipselink.ddl-generation.output-mode" value="database"/>
    <!--
      <property name="eclipselink.logging.timestamp" value="false"/>
      <property name="eclipselink.logging.thread" value="false"/>
      <property name="eclipselink.logging.session" value="false"/>
      <property name="eclipselink.logging.exceptions" value="false"/>
      <property name="eclipselink.logging.connection" value="false"/>
      <property name="eclipselink.logging.level" value="FINER"/>
    -->
    </properties>
  </persistence-unit>
</persistence>

9.3.2 Classes

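The example consists of the following classes: CoherenceMapListener.java, PriceTarget.java, PriceTargetLoader.java, SaleEvent.java, and SaleEventsGenerator.java.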

In this example, an initial set of items goes on sale, and the requested target prices are set up in a data store. The target prices are available in the PriceTarget Coherence cache because the cache is set up to be used with a CacheLoader. The SaleEventsGenerator adapter generates a stream of SaleEvent events. If the sale prices match the target prices, the events are stored in the SaleEvent Coherence cache. A Coherence MapListener implementation verifies that the SaleEvent events stored in the cache are also available in the data store.
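
A cache that is backed by a loader resolves a miss by reading from the data store, which is why the sample can pre-load the PriceTarget cache simply by calling get for each item. The following minimal sketch illustrates that read-through behavior with standard Java classes only. The ReadThroughCache class and its loader function are hypothetical; they do not use the Coherence API or reproduce how Coherence implements the CacheLoader interface.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical read-through cache: a miss is resolved by asking the loader,
// which stands in for a loader backed by the data store.
class ReadThroughCache<K, V> {
    private final Map<K, V> entries = new ConcurrentHashMap<>();
    private final Function<K, V> loader;
    private final AtomicInteger loads = new AtomicInteger();

    ReadThroughCache(Function<K, V> loader) {
        this.loader = loader;
    }

    V get(K key) {
        // The loader runs only when the key is not cached yet. A null result
        // is not cached, so an unknown key is looked up again next time.
        return entries.computeIfAbsent(key, k -> {
            loads.incrementAndGet();
            return loader.apply(k);
        });
    }

    int loads() {
        return loads.get();
    }

    public static void main(String[] args) {
        // In-memory map standing in for the PriceTarget table.
        Map<String, Double> dataStore = new HashMap<>();
        dataStore.put("ipad2", 400.0);
        dataStore.put("kindle", 100.0);

        ReadThroughCache<String, Double> cache = new ReadThroughCache<>(dataStore::get);
        System.out.println("ipad2 target: " + cache.get("ipad2"));   // loaded from the store
        System.out.println("ipad2 target: " + cache.get("ipad2"));   // served from the cache
        System.out.println("loader calls: " + cache.loads());
    }
}
```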

9.3.2.1 CoherenceMapListener.java

The CoherenceMapListener.java class listens for events published to the Coherence cache.

package com.oracle.cep.sample;
 
import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import javax.persistence.Query;
import org.springframework.beans.factory.DisposableBean;
import com.tangosol.util.MapListener;
import com.tangosol.util.MapEvent;
import com.tangosol.util.ObservableMap;
import com.bea.wlevs.ede.api.InitializingBean;
import com.bea.wlevs.ede.api.StreamSource;
import com.bea.wlevs.ede.api.StreamSender;

public class CoherenceMapListener implements MapListener, InitializingBean,
                                             StreamSource {
    private static final String PERSISTENCE_UNIT_NAME = "derby";
    private EntityManagerFactory m_entityMgrFactory;
    private EntityManager m_entityMgr;
    private ObservableMap m_saleEventCache;
    private StreamSender m_sender;
 
    public void afterPropertiesSet()
    {
        m_saleEventCache.addMapListener(this);
    }
    public void setEventSender(StreamSender sender)
    {
        m_sender = sender;
    }
    public void setSaleEventCache(ObservableMap cache)
    {
        m_saleEventCache = cache;
    }
    public void entryInserted(MapEvent event)
    {
        verifyEventInStore(event);
    }
    private void verifyEventInStore(MapEvent event){
        if (!(event.getNewValue() instanceof SaleEvent)){
            System.out.println("Unexpected type in SaleEvent cache");
            return;
        }
        if (m_entityMgr==null){
            setupEntityMgr();
        }
        SaleEvent sale = (SaleEvent) event.getNewValue();
        Query q = m_entityMgr.createQuery("SELECT s FROM SaleEvent s " +
                 "WHERE s.itemID = :itemID");
        q.setParameter("itemID", sale.getItemID());
        List<SaleEvent> saleEvents = q.getResultList();
        if (saleEvents.size()==0)
            System.out.println("ERROR! Matched SaleEvent not found in store");
        else {
          System.out.println("Found sale event for " +
                              saleEvents.get(0).getItemID() + " for $" +
                              saleEvents.get(0).getSalePrice());
          m_sender.sendInsertEvent(sale);
        }
    }
    private void setupEntityMgr() {
      m_entityMgrFactory = Persistence.createEntityManagerFactory(
                           PERSISTENCE_UNIT_NAME);
      m_entityMgr = m_entityMgrFactory.createEntityManager();
    }
    public void entryUpdated(MapEvent event){
        verifyEventInStore(event);
    }
    public void entryDeleted(MapEvent event){
      System.out.println("SaleEvent cache entry deleted. " +
                         "Should not see this event for this sample");
    }
}

9.3.2.2 PriceTarget.java

package com.oracle.cep.sample;
 
import javax.persistence.Entity;
import javax.persistence.Id;
 
@Entity
public class PriceTarget implements java.io.Serializable {
    @Id
    private String itemID;
    private double targetPrice;
 
    public String getItemID() {
        return itemID;
    }
    public void setItemID(String itemID) {
        this.itemID = itemID;
    }
    public double getTargetPrice(){
        return targetPrice;
    }
    public void setTargetPrice(double targetPrice){
        this.targetPrice = targetPrice;
    }
}

9.3.2.3 PriceTargetLoader.java

package com.oracle.cep.sample;
 
import java.util.ArrayList;
import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import javax.persistence.Query;
import org.springframework.beans.factory.DisposableBean;
import com.bea.wlevs.ede.api.InitializingBean;
 
public class PriceTargetLoader implements DisposableBean, InitializingBean {
    private static final String PERSISTENCE_UNIT_NAME = "derby";
    static ArrayList<PriceTarget> s_entriesToLoad = new ArrayList<PriceTarget>();
    static {
      setUpEntriesToLoad();
    }
 
    private EntityManagerFactory m_entityMgrFactory;
    private EntityManager m_entityMgr;
 
    public void afterPropertiesSet() {
      m_entityMgrFactory = Persistence.createEntityManagerFactory(
                                                      PERSISTENCE_UNIT_NAME);
      m_entityMgr = m_entityMgrFactory.createEntityManager();
      m_entityMgr.getTransaction().begin();
      try{
        Query q = m_entityMgr.createQuery("SELECT t FROM PriceTarget t " +
                                          "WHERE t.itemID = :itemID");
        for (PriceTarget target : s_entriesToLoad){
          q.setParameter("itemID", target.getItemID());
          List<PriceTarget> targetList = q.getResultList();
          if (targetList.size()==0){
            System.out.println("Persisting target " + target.getItemID());
            m_entityMgr.persist(target);
          } else {
            System.out.println("Found target " + target.getItemID());
          }
        }
        m_entityMgr.getTransaction().commit();
     }
     finally {
       if(m_entityMgr.getTransaction().isActive())
         m_entityMgr.getTransaction().rollback();
     }
   }
 
   public void destroy() {
     if(m_entityMgr!=null) {
       m_entityMgr.close();
       m_entityMgr=null;
      }
      if(m_entityMgrFactory!=null){
       m_entityMgrFactory.close();
       m_entityMgrFactory=null;
     }
   }
   private static void setUpEntriesToLoad(){
    // 'smith', ipad2, $400
    PriceTarget target = new PriceTarget();
    target.setItemID("ipad2");
    target.setTargetPrice(400);
    s_entriesToLoad.add(target);
    // 'doe', kindle, $100
    target = new PriceTarget();
    target.setItemID("kindle");
    target.setTargetPrice(100);
    s_entriesToLoad.add(target);
    // walker, rebel, $400
    target = new PriceTarget();
    target.setItemID("rebel");
    target.setTargetPrice(400);
    s_entriesToLoad.add(target);
    // williams, lasko1320, $25
    target = new PriceTarget();
    target.setItemID("lasko1320");
    target.setTargetPrice(25);
    s_entriesToLoad.add(target);
  }
}

9.3.2.4 SaleEvent.java

package com.oracle.cep.sample;
import javax.persistence.Entity;
import javax.persistence.Id;
 
@Entity
public class SaleEvent implements java.io.Serializable {
    @Id
    private String itemID;
    private double salePrice;
 
    public SaleEvent() { }
    public SaleEvent(String itemID, double salePrice){
        this.itemID = itemID;
        this.salePrice = salePrice;
    }
    public String getItemID(){
        return itemID;
    }
    public void setItemID(String itemID){
        this.itemID = itemID;
    }
    public double getSalePrice(){
        return salePrice;
    }
    public void setSalePrice(double salePrice) {
        this.salePrice = salePrice;
    }
    public String toString() {
         return "SaleEvent(" + itemID + ":" + salePrice + ")";
    }
}

9.3.2.5 SaleEventsGenerator.java

package com.oracle.cep.sample;
 
import java.util.Map;
import java.util.Random;
import com.bea.wlevs.ede.api.RunnableBean;
import com.bea.wlevs.ede.api.StreamSender;
import com.bea.wlevs.ede.api.StreamSource;
import com.bea.wlevs.ede.api.InitializingBean;
 
public class SaleEventsGenerator implements RunnableBean, StreamSource,
                                 InitializingBean{
    private static final int SLEEP_MILLIS = 1000;
    private static final String[] s_itemIDs = { 
          "kodaksport",
          "ipodtouch-8GB",
          "ipad2",
          "kindle",
          "garmin1690",
          "rebel",
          "logitech1080",
          "tomtom",
          "ipad2",
          "cuisinart10s",
          "keurig-b70",
          "lasko1320" };
    private static final double[] s_prices = {
          60.0,
          200.0,
          450.0,
          99,
          120,
          400,
          70,
          100,
          399,
          100,
          150,
          20 };
 
    private boolean m_suspended;
    private Thread m_thread;
    private StreamSender m_sender;
    private Map m_priceTargetCache;
 
    public void setPriceTargetCache(Map cache){
        m_priceTargetCache = cache;
    }
    public void afterPropertiesSet() {
        // pre-load PriceTarget cache
        for (PriceTarget target : PriceTargetLoader.s_entriesToLoad)
        {
            System.out.println("Getting : " + target.getItemID());
            m_priceTargetCache.get(target.getItemID());
        }
    }
    public void run() {
        m_thread = Thread.currentThread();
        m_suspended = false;
        // send random sale events
        Random random = new Random(System.currentTimeMillis());
        while (!isSuspended())
        { 
            int index = random.nextInt(s_itemIDs.length);
            SaleEvent event = new SaleEvent(s_itemIDs[index], s_prices[index]);
            m_sender.sendInsertEvent(event);
            try {
                synchronized (this) { wait(SLEEP_MILLIS); }
            } catch (InterruptedException e) {
                if (isSuspended())
                    return;
            }
        }
    }
    public void setEventSender(StreamSender sender) {
        m_sender = sender;
    }
    public synchronized void suspend() {
        m_suspended = true;
        if (m_thread!=null)
            m_thread.interrupt();
    }
    private synchronized boolean isSuspended() {
        return m_suspended;
    }
}
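
The listings do not show the logic that decides whether a sale price matches a target price. The following minimal sketch reuses the item IDs, sale prices, and target prices from the listings and assumes that a match means the sale price is at or below the target price for the same item. That rule, the PriceMatchSketch class, and its method names are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; the matching rule is an assumption, not taken from the sample.
class PriceMatchSketch {
    // Same data as s_itemIDs and s_prices in SaleEventsGenerator.
    static final String[] ITEM_IDS = {
        "kodaksport", "ipodtouch-8GB", "ipad2", "kindle", "garmin1690", "rebel",
        "logitech1080", "tomtom", "ipad2", "cuisinart10s", "keurig-b70", "lasko1320" };
    static final double[] PRICES = {
        60.0, 200.0, 450.0, 99, 120, 400, 70, 100, 399, 100, 150, 20 };

    // Same targets as PriceTargetLoader.setUpEntriesToLoad.
    static Map<String, Double> targets() {
        Map<String, Double> targets = new HashMap<>();
        targets.put("ipad2", 400.0);
        targets.put("kindle", 100.0);
        targets.put("rebel", 400.0);
        targets.put("lasko1320", 25.0);
        return targets;
    }

    // ASSUMPTION: a sale matches when its price is at or below the item's target.
    static boolean matches(Map<String, Double> targets, String itemID, double salePrice) {
        Double target = targets.get(itemID);
        return target != null && salePrice <= target;
    }

    // Lists every generated sale that would match under the assumed rule.
    static List<String> matchingSales() {
        Map<String, Double> targets = targets();
        List<String> result = new ArrayList<>();
        for (int i = 0; i < ITEM_IDS.length; i++) {
            if (matches(targets, ITEM_IDS[i], PRICES[i])) {
                result.add(ITEM_IDS[i] + ":" + PRICES[i]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (String sale : matchingSales()) {
            System.out.println("Matched " + sale);
        }
    }
}
```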