The Oracle Stream Analytics installation includes the EclipseLink 2.4.2 open source mapping and persistence framework to support the use of the Java Persistence API (JPA) in your applications. JPA is the standard for object-relational mapping (ORM) and enterprise Java persistence.
This chapter presents two sample Oracle Stream Analytics applications, HelloWorld
and JPA-Coherence-Sample-Code
, that use EclipseLink and JPA to read from and write to a database. The JPA-Coherence-Sample-Code also uses a coherence cache for coordinated data updates in an environment with clustered applications and servers.
This chapter includes the following sections:
Use the following high-level steps to create an Oracle Stream Analytics application that includes EclipseLink:
persistence.xml
file with the correct JPA configuration. This file contains the properties that control runtime operation.persistence.xml
file in the META-INF
directory of your application.Learn more about EclipseLink at http://eclipse.org/eclipselink/
.
Note:
Coherence socket exception occurs when you run CQL sample on AIX 6.1 Japanese platform. To resolve this issue:Add the -Djava.net.preferIPv4Stack=true
parameter to the last line of the startwlevs.sh
script.
Note:
Spatial sample is not supported on AIX Platform.The HelloWorld example uses EclipseLink to establish a read and write JDBC connection to the data source to access and store HelloWorld events.
In this example, HelloWorld events contain date and time information.
The example is comprised of the following files, which are discussed in this section:
The following persistence.xml
file has one persistence unit (persistence-unit
) called helloworld
. The helloworld persistence unit has a transaction-type
of RESOURCE_LOCAL
because Oracle Stream Analytics is a Java SE environment. The EclipseLink properties specify the settings for database read and write operations and logging. For this example, the managed persistable class that represents objects in the database is com.bea.wlevs.event.example.helloworld.HelloWorldEvent
.
This persistence.xml
file has entries for JPA logging that are commented out and set to false
. You can uncomment these settings and set them to true
to debug or otherwise monitor the application behavior. For information on property settings, see http://eclipse.org/eclipselink/documentation/2.4/jpa/extensions/toc.htm
.
<?xml version="1.0" encoding="UTF-8" ?> <persistence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd" version="2.0" xmlns="http://java.sun.com/xml/ns/persistence"> <persistence-unit name="helloworld" transaction-type="RESOURCE_LOCAL"> <class>com.bea.wlevs.event.example.helloworld.HelloWorldEvent</class> <properties> <property name="eclipselink.jdbc.read-connections.min" value="1"/> <property name="eclipselink.jdbc.write-connections.min" value="1"/> <!-- <property name="eclipselink.logging.timestamp" value="false"/> <property name="eclipselink.logging.thread" value="false"/> <property name="eclipselink.logging.session" value="false"/> <property name="eclipselink.logging.exceptions" value="false"/> <property name="eclipselink.logging.connection" value="false"/> <property name="eclipselink.logging.level" value="FINER"/> --> </properties> </persistence-unit> </persistence>
The HelloWorldAdapter.java
class is a custom threaded adapter that continuously creates events of type HelloWorldEvent
. The application constructs message text of type DateFormat
, which is used by the generateHelloMessage
method to create events of type HelloWorldEvent
.
The Oracle Stream Analytics framework calls the setEventSender
method to initialize the eventSender
private variable with a StreamSender
instance. The StreamSender
instance sends events emitted by a StreamSource
instance to a StreamSink
listener. In this example the StreamSink
listener is the HellowWorldBean
instance.
package com.bea.wlevs.adapter.example.helloworld; import java.text.DateFormat; import java.util.Date; import com.bea.wlevs.ede.api.RunnableBean; import com.bea.wlevs.ede.api.StreamSender; import com.bea.wlevs.ede.api.StreamSource; import com.bea.wlevs.event.example.helloworld.HelloWorldEvent; public class HelloWorldAdapter implements RunnableBean, StreamSource { private static final int SLEEP_MILLIS = 300; private DateFormat dateFormat; private String message; private boolean suspended; private StreamSender eventSender; public HelloWorldAdapter() { super(); dateFormat = DateFormat.getTimeInstance(); } public void run() { suspended = false; while (!isSuspended()) { // Generate messages forever... generateHelloMessage(); try { synchronized (this) { wait(SLEEP_MILLIS); } } catch (InterruptedException e) { e.printStackTrace(); } } } public void setMessage(String message) { this.message = message; } private void generateHelloMessage() { String message = this.message + dateFormat.format(new Date()); HelloWorldEvent event = new HelloWorldEvent(); event.setMessage(message); eventSender.sendInsertEvent(event); } public void setEventSender(StreamSender sender) { eventSender = sender; } public synchronized void suspend() { suspended = true; } private synchronized boolean isSuspended() { return suspended; } }
The HelloWorldEvent.java
class creates an event from a message. The HelloWorldAdapter.generateHelloMessage method calls the HelloWorldEvent.setMessage method to create an event from a message.
The HelloWorldBean
class stores the message
and its generated id
to and retrieves them from the data store.
package com.bea.wlevs.event.example.helloworld; import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.GenerationType; import javax.persistence.Id; @Entity public class HelloWorldEvent { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private long id; private String message; public String getMessage() { return message; } public void setMessage (String message) { this.message = message; } }
The HelloWorldBean.java
class is an event sink and source that retrieves events from HelloWorldEvent
and performs read and write operations on the database with JPA.
The Oracle Stream Analytics framework calls the setEventSender
method to initialize the m_eventSender
private variable with a StreamSender
instance. The onInserEvent
method sends the events emitted by the StreamSource
instance downstream to StreamSink
listeners.
package com.bea.wlevs.example.helloworld; import java.util.HashMap; import java.util.List; import javax.annotation.Resource; import javax.sql.DataSource; import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; import javax.persistence.Persistence; import javax.persistence.Query; import org.springframework.beans.factory.DisposableBean; import org.eclipse.persistence.config.PersistenceUnitProperties; import com.bea.wlevs.ede.api.StreamSink; import com.bea.wlevs.ede.api.StreamSource; import com.bea.wlevs.ede.api.StreamSender; import com.bea.wlevs.event.example.helloworld.HelloWorldEvent; public class HelloWorldBean implements StreamSink, StreamSource, DisposableBean { private static final String PERSISTENCE_UNIT_NAME = "helloworld"; private EntityManagerFactory m_entityMgrFactory; private EntityManager m_entityMgr; private DataSource m_ds; private boolean m_shuttingDown; private StreamSender m_eventSender; public void setEventSender(StreamSender sender){ m_eventSender = sender; } private void setupEntityManager(){ if (m_entityMgr!=null) return; HashMap props = new HashMap(); props.put(PersistenceUnitProperties.NON_JTA_DATASOURCE, m_ds); props.put("eclipselink.ddl-generation", "create-tables"); props.put("eclipselink.ddl-generation.output-mode", "database"); m_entityMgrFactory = Persistence.createEntityManagerFactory (PERSISTENCE_UNIT_NAME, props); m_entityMgr = m_entityMgrFactory.createEntityManager(); } public void onInsertEvent(Object event){ if (m_shuttingDown) return; setupEntityManager(); if (event instanceof HelloWorldEvent) { HelloWorldEvent helloWorldEvent = (HelloWorldEvent) event; System.out.println("Message: " + helloWorldEvent.getMessage()); m_entityMgr.getTransaction().begin(); try { m_entityMgr.persist(helloWorldEvent); m_entityMgr.getTransaction().commit(); } finally { if (m_entityMgr.getTransaction().isActive()) m_entityMgr.getTransaction().rollback(); } } Query q = m_entityMgr.createQuery("select t from HelloWorldEvent t"); List<HelloWorldEvent> hwlist = q.getResultList(); System.out.println("Stored " + hwlist.size() + " helloworld events"); m_eventSender.sendInsertEvent(event); } @Resource(name="derbyDS") public void setDataSource(DataSource ds){ m_ds = ds; } public void destroy(){ m_shuttingDown = true; if (m_entityMgr!=null){ m_entityMgr.close(); m_entityMgr=null; } if (m_entityMgrFactory!=null){ m_entityMgrFactory.close(); m_entityMgrFactory=null; } } }
The JPA Coherence example demonstrates the usage of the EclipseLink JPA implementation for the Coherence CacheLoader
or CacheStore
interfaces.
The EclipseLink properties specify the settings for database read and write operations and logging. The managed persistable classes that represents objects in the database are com.oracle.cep.sample.PriceTarget
and com.oracle.cep.sample.SaleEvent
.
This persistence.xml
file has entries for JPA logging that are commented out and set to false
. You can uncomment these settings and set them to true
to debug or otherwise monitor the application behavior. For information on property settings, see http://eclipse.org/eclipselink/documentation/2.4/jpa/extensions/toc.htm
.
<?xml version="1.0" encoding="UTF-8" ?> <persistence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd" version="2.0" xmlns="http://java.sun.com/xml/ns/persistence"> <persistence-unit name="derby" transaction-type="RESOURCE_LOCAL"> <class>com.oracle.cep.sample.PriceTarget</class> <class>com.oracle.cep.sample.SaleEvent</class> <properties> <property name="eclipselink.jdbc.read-connections.min" value="1"/> <property name="eclipselink.jdbc.write-connections.min" value="1"/> <property name="javax.persistence.jdbc.driver" value="org.apache.derby.jdbc.EmbeddedDriver"/> <property name="javax.persistence.jdbc.url" value="jdbc:derby:test1;create=true"/> <property name="eclipselink.ddl-generation" value="create-tables"/> <property name="eclipselink.ddl-generation.output-mode" value="database"/> <!-- <property name="eclipselink.logging.timestamp" value="false"/> <property name="eclipselink.logging.thread" value="false"/> <property name="eclipselink.logging.session" value="false"/> <property name="eclipselink.logging.exceptions" value="false"/> <property name="eclipselink.logging.connection" value="false"/> <property name="eclipselink.logging.level" value="FINER"/> --> </properties> </persistence-unit> </persistence>
The example is comprised of the following classes:
In this example, an initial set of items go on sale and the requested target prices are set up in a data store. The data store is available in the PriceTarget.java
Coherence cache because it is set up to be used with CacheLoader
. A stream of SaleEvents
is generated from the SaleEventsGenerator
adapter. If the sale prices match the target prices, they are stored in SaleEvent
Coherence cache. A Coherence MapListener
implementation verifies that the SaleEvents
stored in the cache are actually available in the data store as well.
The CoherenceMapListener.java
class listens for events published to the coherence cache.
package com.oracle.cep.sample; import java.util.List; import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; import javax.persistence.Persistence; import javax.persistence.Query; import org.springframework.beans.factory.DisposableBean; import com.tangosol.util.MapListener; import com.tangosol.util.MapEvent; import com.tangosol.util.ObservableMap; import com.bea.wlevs.ede.api.InitializingBean; import com.bea.wlevs.ede.api.StreamSource; import com.bea.wlevs.ede.api.StreamSender; public class CoherenceMapListener implements MapListener, InitializingBean, StreamSource { private static final String PERSISTENCE_UNIT_NAME = "derby"; private EntityManagerFactory m_entityMgrFactory; private EntityManager m_entityMgr; private ObservableMap m_saleEventCache; private StreamSender m_sender; public void afterPropertiesSet() { m_saleEventCache.addMapListener(this); } public void setEventSender(StreamSender sender) { m_sender = sender; } public void setSaleEventCache(ObservableMap cache) { m_saleEventCache = cache; } public void entryInserted(MapEvent event) { verifyEventInStore(event); } private void verifyEventInStore(MapEvent event){ if (!(event.getNewValue() instanceof SaleEvent)){ System.out.println("Unexpected type in SaleEvent cache"); return; } if (m_entityMgr==null){ setupEntityMgr(); } SaleEvent sale = (SaleEvent) event.getNewValue(); Query q = m_entityMgr.createQuery("SELECT s FROM SaleEvent s WHERE s.itemID = :itemID"); q.setParameter("itemID", sale.getItemID()); List<SaleEvent> saleEvents = q.getResultList(); if (saleEvents.size()==0) System.out.println("ERROR! Matched SaleEvent not found in store"); else { System.out.println("Found sale event for " + saleEvents.get(0).getItemID() + " for $" + saleEvents.get(0).getSalePrice()); m_sender.sendInsertEvent(sale); } } private void setupEntityMgr() { m_entityMgrFactory = Persistence.createEntityManagerFactory( PERSISTENCE_UNIT_NAME); m_entityMgr = m_entityMgrFactory.createEntityManager(); } public void entryUpdated(MapEvent event){ verifyEventInStore(event); } public void entryDeleted(MapEvent event){ System.out.println("SaleEvent cache entry deleted. Should not see this event for this sample"); } }
package com.oracle.cep.sample; import javax.persistence.Entity; import javax.persistence.Id; @Entity public class PriceTarget implements java.io.Serializable { @Id private String itemID; private double targetPrice; public String getItemID() { return itemID; } public void setItemID(String itemID) { this.itemID = itemID; } public double getTargetPrice(){ return targetPrice; } public void setTargetPrice(double targetPrice){ this.targetPrice = targetPrice; } }
package com.oracle.cep.sample; import java.util.ArrayList; import java.util.List; import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; import javax.persistence.Persistence; import javax.persistence.Query; import org.springframework.beans.factory.DisposableBean; import com.bea.wlevs.ede.api.InitializingBean; public class PriceTargetLoader implements DisposableBean, InitializingBean { private static final String PERSISTENCE_UNIT_NAME = "derby"; static ArrayList<PriceTarget> s_entriesToLoad = new ArrayList<PriceTarget>(); static { setUpEntriesToLoad(); } private EntityManagerFactory m_entityMgrFactory; private EntityManager m_entityMgr; public void afterPropertiesSet() { m_entityMgrFactory = Persistence.createEntityManagerFactory( PERSISTENCE_UNIT_NAME); m_entityMgr = m_entityMgrFactory.createEntityManager(); m_entityMgr.getTransaction().begin(); try{ Query q = m_entityMgr.createQuery("SELECT t FROM PriceTarget t WHERE t.itemID = :itemID"); for (PriceTarget target : s_entriesToLoad){ q.setParameter("itemID", target.getItemID()); List<PriceTarget> targetList = q.getResultList(); if (targetList.size()==0){ System.out.println("Persisting target " + target.getItemID()); m_entityMgr.persist(target); } else { System.out.println("Found target " + target.getItemID()); } } m_entityMgr.getTransaction().commit(); } finally { if(m_entityMgr.getTransaction().isActive()) m_entityMgr.getTransaction().rollback(); } } public void destroy() { if(m_entityMgr!=null) { m_entityMgr.close(); m_entityMgr=null; } if(m_entityMgrFactory!=null){ m_entityMgrFactory.close(); m_entityMgrFactory=null; } } private static void setUpEntriesToLoad(){ // 'smith', ipad2, $400 PriceTarget target = new PriceTarget(); target.setItemID("ipad2"); target.setTargetPrice(400); s_entriesToLoad.add(target); // 'doe', kindle, $100 target = new PriceTarget(); target.setItemID("kindle"); target.setTargetPrice(100); s_entriesToLoad.add(target); // walker, rebel, $400 target = new PriceTarget(); target.setItemID("rebel"); target.setTargetPrice(400); s_entriesToLoad.add(target); // williams, lasko1320, $25 target = new PriceTarget(); target.setItemID("lasko1320"); target.setTargetPrice(25); s_entriesToLoad.add(target); } }
package com.oracle.cep.sample; import javax.persistence.Entity; import javax.persistence.Id; @Entity public class SaleEvent implements java.io.Serializable { @Id private String itemID; private double salePrice; public SaleEvent() { } public SaleEvent(String itemID, double salePrice){ this.itemID = itemID; this.salePrice = salePrice; } public String getItemID(){ return itemID; } public void setItemID(String itemID){ this.itemID = itemID; } public double getSalePrice(){ return salePrice; } public void setSalePrice(double salePrice) { this.salePrice = salePrice; } public String toString() { return "SaleEvent(" + itemID + ":" + salePrice + ")"; } }
package com.oracle.cep.sample; import java.util.Map; import java.util.Random; import com.bea.wlevs.ede.api.RunnableBean; import com.bea.wlevs.ede.api.StreamSender; import com.bea.wlevs.ede.api.StreamSource; import com.bea.wlevs.ede.api.InitializingBean; public class SaleEventsGeneraton implements RunnableBean, StreamSource, InitializingBean{ private static final int SLEEP_MILLIS = 1000; private static final String[] s_itemIDs = { "kodaksport", "ipodtouch-8GB", "ipad2", "kindle", "garmin1690", "rebel", "logitech1080", "tomtom", "ipad2", "cuisinart10s", "keurig-b70", "lasko1320" }; private static final double[] s_prices = { 60.0, 200.0, 450.0, 99, 120, 400, 70, 100, 399, 100, 150, 20 }; private boolean m_suspended; private Thread m_thread; private StreamSender m_sender; private Map m_priceTargetCache; public void setPriceTargetCache(Map cache){ m_priceTargetCache = cache; } public void afterPropertiesSet() { // pre-load PriceTarget cache for (PriceTarget target : PriceTargetLoader.s_entriesToLoad) { System.out.println("Getting : " + target.getItemID()); m_priceTargetCache.get(target.getItemID()); } } public void run() { m_thread = Thread.currentThread(); m_suspended = false; // send random sale events Random random = new Random(System.currentTimeMillis()); while (!isSuspended()) { int index = random.nextInt(s_itemIDs.length); SaleEvent event = new SaleEvent(s_itemIDs[index], s_prices[index]); m_sender.sendInsertEvent(event); try { synchronized (this) { wait(SLEEP_MILLIS); } } catch (InterruptedException e) { if (isSuspended()) return; } } } public void setEventSender(StreamSender sender) { m_sender = sender; } public synchronized void suspend() { m_suspended = true; if (m_thread!=null) m_thread.interrupt(); } private synchronized boolean isSuspended() { return m_suspended; } }