Oracle Event Processingインストールには、アプリケーションでのJava Persistence API (JPA)の使用をサポートするためにEclipseLink 2.4.2オープン・ソース・マッピングおよび永続性フレームワークが含まれます。JPAは、オブジェクト・リレーショナル・マッピング(ORM)およびエンタープライズJavaの永続性の標準です。
この章では、EclipseLinkおよびJPAを使用してデータベースからの読取りおよびデータベースへの書込みを行うOracle Event Processingアプリケーションの2つのサンプルとしてHelloWorld
およびJPA-Coherence-Sample-Code
を示します。JPA-Coherence-Sample-Codeでは、アプリケーションおよびサーバーがクラスタ化された環境内でデータ更新を調整するためにCoherenceキャッシュも使用します。
この章の内容は次のとおりです。
次の高度なステップを使用して、EclipseLinkが含まれるOracle Event Processingアプリケーションを作成します。
persistence.xml
ファイルを作成します。このファイルには、ランタイム操作を制御するプロパティが含まれます。persistence.xml
ファイルをアプリケーションのMETA-INF
ディレクトリに配置します。EclipseLinkの詳細は、http://eclipse.org/eclipselink/
を参照してください。
注:
CQLサンプルをAIX 6.1の日本語プラットフォームで実行すると、コヒーレンス・ソケット例外が発生します。この問題の解決方法:-Djava.net.preferIPv4Stack=true
パラメータをstartwlevs.sh
スクリプトの最後の行に追加します。
注:
AIXプラットフォームでは、空間サンプルはサポートされていません。HelloWorldの例では、EclipseLinkを使用して、HelloWorldイベントをアクセスおよび格納するために読取りおよび書込みモードでデータ・ソースへのJDBC接続を確立します。
この例では、HelloWorldイベントには日時情報が含まれます。
この例は次のファイルで構成されており、これらについてこの項で説明します。
次のpersistence.xml
ファイルには、helloworld
と呼ばれる1つの永続性ユニット(persistence-unit
)があります。Oracle Event ProcessingはJava SE環境であるため、helloworld永続性ユニットのtransaction-type
はRESOURCE_LOCAL
です。EclipseLinkのプロパティは、データベースの読取りおよび書込み操作とロギング用の設定を指定します。この例では、データベース内のオブジェクトを表す管理対象の永続可能クラスはcom.bea.wlevs.event.example.helloworld.HelloWorldEvent
です。
このpersistence.xml
ファイルには、コメントアウトされ、false
に設定されたJPAロギングのエントリがあります。これらの設定のコメントを解除してtrue
に設定することにより、アプリケーションの動作をデバッグまたは監視できます。プロパティの設定の詳細は、http://eclipse.org/eclipselink/documentation/2.4/jpa/extensions/toc.htm
を参照してください。
<?xml version="1.0" encoding="UTF-8" ?> <persistence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd" version="2.0" xmlns="http://java.sun.com/xml/ns/persistence"> <persistence-unit name="helloworld" transaction-type="RESOURCE_LOCAL"> <class>com.bea.wlevs.event.example.helloworld.HelloWorldEvent</class> <properties> <property name="eclipselink.jdbc.read-connections.min" value="1"/> <property name="eclipselink.jdbc.write-connections.min" value="1"/> <!-- <property name="eclipselink.logging.timestamp" value="false"/> <property name="eclipselink.logging.thread" value="false"/> <property name="eclipselink.logging.session" value="false"/> <property name="eclipselink.logging.exceptions" value="false"/> <property name="eclipselink.logging.connection" value="false"/> <property name="eclipselink.logging.level" value="FINER"/> --> </properties> </persistence-unit> </persistence>
HelloWorldAdapter.java
クラスは、HelloWorldEvent
タイプのイベントを継続的に作成するカスタム・スレッドのアダプタです。アプリケーションにより、DateFormat
タイプのメッセージ・テキストが作成され、これは、HelloWorldEvent
タイプのイベントを作成するためにgenerateHelloMessage
メソッドによって使用されます。
Oracle Event Processingフレームワークは、setEventSender
メソッドをコールすることにより、StreamSender
インスタンスを使用してeventSender
プライベート変数を初期化します。StreamSender
インスタンスは、StreamSource
インスタンスによって発行されたイベントをStreamSink
リスナーに送信します。この例では、StreamSink
リスナーはHellowWorldBean
インスタンスです。
package com.bea.wlevs.adapter.example.helloworld; import java.text.DateFormat; import java.util.Date; import com.bea.wlevs.ede.api.RunnableBean; import com.bea.wlevs.ede.api.StreamSender; import com.bea.wlevs.ede.api.StreamSource; import com.bea.wlevs.event.example.helloworld.HelloWorldEvent; public class HelloWorldAdapter implements RunnableBean, StreamSource { private static final int SLEEP_MILLIS = 300; private DateFormat dateFormat; private String message; private boolean suspended; private StreamSender eventSender; public HelloWorldAdapter() { super(); dateFormat = DateFormat.getTimeInstance(); } public void run() { suspended = false; while (!isSuspended()) { // Generate messages forever... generateHelloMessage(); try { synchronized (this) { wait(SLEEP_MILLIS); } } catch (InterruptedException e) { e.printStackTrace(); } } } public void setMessage(String message) { this.message = message; } private void generateHelloMessage() { String message = this.message + dateFormat.format(new Date()); HelloWorldEvent event = new HelloWorldEvent(); event.setMessage(message); eventSender.sendInsertEvent(event); } public void setEventSender(StreamSender sender) { eventSender = sender; } public synchronized void suspend() { suspended = true; } private synchronized boolean isSuspended() { return suspended; } }
HelloWorldEvent.java
クラスは、メッセージからイベントを作成します。HelloWorldAdapter.generateHelloMessageメソッドは、HelloWorldEvent.setMessage
メソッドをコールしてメッセージからイベントを作成します。HelloWorldBean
クラスは、message
および生成されたid
をデータ・ストアに格納し、これらをデータ・ストアから取得します。
package com.bea.wlevs.event.example.helloworld; import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.GenerationType; import javax.persistence.Id; @Entity public class HelloWorldEvent { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private long id; private String message; public String getMessage() { return message; } public void setMessage (String message) { this.message = message; } }
HelloWorldBean.java
クラスは、HelloWorldEvent
からイベントを取得し、JPAを使用してデータベースで読取りおよび書込み操作を実行するイベント・シンクおよびソースです。
Oracle Event Processingフレームワークは、setEventSender
メソッドをコールすることにより、StreamSender
インスタンスを使用してm_eventSender
プライベート変数を初期化します。onInserEvent
メソッドは、StreamSource
インスタンスによって発行されたイベントをStreamSink
リスナーにダウンストリーム送信します。
package com.bea.wlevs.example.helloworld; import java.util.HashMap; import java.util.List; import javax.annotation.Resource; import javax.sql.DataSource; import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; import javax.persistence.Persistence; import javax.persistence.Query; import org.springframework.beans.factory.DisposableBean; import org.eclipse.persistence.config.PersistenceUnitProperties; import com.bea.wlevs.ede.api.StreamSink; import com.bea.wlevs.ede.api.StreamSource; import com.bea.wlevs.ede.api.StreamSender; import com.bea.wlevs.event.example.helloworld.HelloWorldEvent; public class HelloWorldBean implements StreamSink, StreamSource, DisposableBean { private static final String PERSISTENCE_UNIT_NAME = "helloworld"; private EntityManagerFactory m_entityMgrFactory; private EntityManager m_entityMgr; private DataSource m_ds; private boolean m_shuttingDown; private StreamSender m_eventSender; public void setEventSender(StreamSender sender){ m_eventSender = sender; } private void setupEntityManager(){ if (m_entityMgr!=null) return; HashMap props = new HashMap(); props.put(PersistenceUnitProperties.NON_JTA_DATASOURCE, m_ds); props.put("eclipselink.ddl-generation", "create-tables"); props.put("eclipselink.ddl-generation.output-mode", "database"); m_entityMgrFactory = Persistence.createEntityManagerFactory (PERSISTENCE_UNIT_NAME, props); m_entityMgr = m_entityMgrFactory.createEntityManager(); } public void onInsertEvent(Object event){ if (m_shuttingDown) return; setupEntityManager(); if (event instanceof HelloWorldEvent) { HelloWorldEvent helloWorldEvent = (HelloWorldEvent) event; System.out.println("Message: " + helloWorldEvent.getMessage()); m_entityMgr.getTransaction().begin(); try { m_entityMgr.persist(helloWorldEvent); m_entityMgr.getTransaction().commit(); } finally { if (m_entityMgr.getTransaction().isActive()) m_entityMgr.getTransaction().rollback(); } } Query q = m_entityMgr.createQuery("select t from HelloWorldEvent t"); List<HelloWorldEvent> hwlist = q.getResultList(); System.out.println("Stored " + hwlist.size() + " helloworld events"); m_eventSender.sendInsertEvent(event); } @Resource(name="derbyDS") public void setDataSource(DataSource ds){ m_ds = ds; } public void destroy(){ m_shuttingDown = true; if (m_entityMgr!=null){ m_entityMgr.close(); m_entityMgr=null; } if (m_entityMgrFactory!=null){ m_entityMgrFactory.close(); m_entityMgrFactory=null; } } }
JPA Coherenceの例では、CoherenceのCacheLoader
またはCacheStore
インタフェース用のEclipseLink JPA実装の使用方法を示します。
EclipseLinkのプロパティは、データベースの読取りおよび書込み操作とロギング用の設定を指定します。データベース内のオブジェクトを表す管理対象の永続可能クラスは、com.oracle.cep.sample.PriceTarget
およびcom.oracle.cep.sample.SaleEvent
です。
このpersistence.xml
ファイルには、コメントアウトされ、false
に設定されたJPAロギングのエントリがあります。これらの設定のコメントを解除してtrue
に設定することにより、アプリケーションの動作をデバッグまたは監視できます。プロパティの設定の詳細は、http://eclipse.org/eclipselink/documentation/2.4/jpa/extensions/toc.htm
を参照してください。
<?xml version="1.0" encoding="UTF-8" ?> <persistence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd" version="2.0" xmlns="http://java.sun.com/xml/ns/persistence"> <persistence-unit name="derby" transaction-type="RESOURCE_LOCAL"> <class>com.oracle.cep.sample.PriceTarget</class> <class>com.oracle.cep.sample.SaleEvent</class> <properties> <property name="eclipselink.jdbc.read-connections.min" value="1"/> <property name="eclipselink.jdbc.write-connections.min" value="1"/> <property name="javax.persistence.jdbc.driver" value="org.apache.derby.jdbc.EmbeddedDriver"/> <property name="javax.persistence.jdbc.url" value="jdbc:derby:test1;create=true"/> <property name="eclipselink.ddl-generation" value="create-tables"/> <property name="eclipselink.ddl-generation.output-mode" value="database"/> <!-- <property name="eclipselink.logging.timestamp" value="false"/> <property name="eclipselink.logging.thread" value="false"/> <property name="eclipselink.logging.session" value="false"/> <property name="eclipselink.logging.exceptions" value="false"/> <property name="eclipselink.logging.connection" value="false"/> <property name="eclipselink.logging.level" value="FINER"/> --> </properties> </persistence-unit> </persistence>
この例は、次のクラスで構成されています。
この例では、一連の初期アイテムが発売され、要求されたターゲット価格がデータ・ストアで設定されます。データ・ストアは、CacheLoader
で使用するよう設定されているため、PriceTarget.java
Coherenceキャッシュで使用可能です。SaleEvents
のストリームがSaleEventsGenerator
アダプタから生成されます。販売価格がターゲット価格と一致する場合は、SaleEvent
Coherenceキャッシュに格納されます。Coherence MapListener
実装により、キャッシュに格納されているSaleEvents
が実際にデータ・ストアでも使用可能かどうかが検証されます。
CoherenceMapListener.java
クラスは、Coherenceキャッシュにパブリッシュされたイベントをリスニングします。
package com.oracle.cep.sample; import java.util.List; import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; import javax.persistence.Persistence; import javax.persistence.Query; import org.springframework.beans.factory.DisposableBean; import com.tangosol.util.MapListener; import com.tangosol.util.MapEvent; import com.tangosol.util.ObservableMap; import com.bea.wlevs.ede.api.InitializingBean; import com.bea.wlevs.ede.api.StreamSource; import com.bea.wlevs.ede.api.StreamSender; public class CoherenceMapListener implements MapListener, InitializingBean, StreamSource { private static final String PERSISTENCE_UNIT_NAME = "derby"; private EntityManagerFactory m_entityMgrFactory; private EntityManager m_entityMgr; private ObservableMap m_saleEventCache; private StreamSender m_sender; public void afterPropertiesSet() { m_saleEventCache.addMapListener(this); } public void setEventSender(StreamSender sender) { m_sender = sender; } public void setSaleEventCache(ObservableMap cache) { m_saleEventCache = cache; } public void entryInserted(MapEvent event) { verifyEventInStore(event); } private void verifyEventInStore(MapEvent event){ if (!(event.getNewValue() instanceof SaleEvent)){ System.out.println("Unexpected type in SaleEvent cache"); return; } if (m_entityMgr==null){ setupEntityMgr(); } SaleEvent sale = (SaleEvent) event.getNewValue(); Query q = m_entityMgr.createQuery("SELECT s FROM SaleEvent s WHERE s.itemID = :itemID"); q.setParameter("itemID", sale.getItemID()); List<SaleEvent> saleEvents = q.getResultList(); if (saleEvents.size()==0) System.out.println("ERROR! Matched SaleEvent not found in store"); else { System.out.println("Found sale event for " + saleEvents.get(0).getItemID() + " for $" + saleEvents.get(0).getSalePrice()); m_sender.sendInsertEvent(sale); } } private void setupEntityMgr() { m_entityMgrFactory = Persistence.createEntityManagerFactory( PERSISTENCE_UNIT_NAME); m_entityMgr = m_entityMgrFactory.createEntityManager(); } public void entryUpdated(MapEvent event){ verifyEventInStore(event); } public void entryDeleted(MapEvent event){ System.out.println("SaleEvent cache entry deleted. Should not see this event for this sample"); } }
package com.oracle.cep.sample; import javax.persistence.Entity; import javax.persistence.Id; @Entity public class PriceTarget implements java.io.Serializable { @Id private String itemID; private double targetPrice; public String getItemID() { return itemID; } public void setItemID(String itemID) { this.itemID = itemID; } public double getTargetPrice(){ return targetPrice; } public void setTargetPrice(double targetPrice){ this.targetPrice = targetPrice; } }
package com.oracle.cep.sample; import java.util.ArrayList; import java.util.List; import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; import javax.persistence.Persistence; import javax.persistence.Query; import org.springframework.beans.factory.DisposableBean; import com.bea.wlevs.ede.api.InitializingBean; public class PriceTargetLoader implements DisposableBean, InitializingBean { private static final String PERSISTENCE_UNIT_NAME = "derby"; static ArrayList<PriceTarget> s_entriesToLoad = new ArrayList<PriceTarget>(); static { setUpEntriesToLoad(); } private EntityManagerFactory m_entityMgrFactory; private EntityManager m_entityMgr; public void afterPropertiesSet() { m_entityMgrFactory = Persistence.createEntityManagerFactory( PERSISTENCE_UNIT_NAME); m_entityMgr = m_entityMgrFactory.createEntityManager(); m_entityMgr.getTransaction().begin(); try{ Query q = m_entityMgr.createQuery("SELECT t FROM PriceTarget t WHERE t.itemID = :itemID"); for (PriceTarget target : s_entriesToLoad){ q.setParameter("itemID", target.getItemID()); List<PriceTarget> targetList = q.getResultList(); if (targetList.size()==0){ System.out.println("Persisting target " + target.getItemID()); m_entityMgr.persist(target); } else { System.out.println("Found target " + target.getItemID()); } } m_entityMgr.getTransaction().commit(); } finally { if(m_entityMgr.getTransaction().isActive()) m_entityMgr.getTransaction().rollback(); } } public void destroy() { if(m_entityMgr!=null) { m_entityMgr.close(); m_entityMgr=null; } if(m_entityMgrFactory!=null){ m_entityMgrFactory.close(); m_entityMgrFactory=null; } } private static void setUpEntriesToLoad(){ // 'smith', ipad2, $400 PriceTarget target = new PriceTarget(); target.setItemID("ipad2"); target.setTargetPrice(400); s_entriesToLoad.add(target); // 'doe', kindle, $100 target = new PriceTarget(); target.setItemID("kindle"); target.setTargetPrice(100); s_entriesToLoad.add(target); // walker, rebel, $400 target = new PriceTarget(); target.setItemID("rebel"); target.setTargetPrice(400); s_entriesToLoad.add(target); // williams, lasko1320, $25 target = new PriceTarget(); target.setItemID("lasko1320"); target.setTargetPrice(25); s_entriesToLoad.add(target); } }
package com.oracle.cep.sample; import javax.persistence.Entity; import javax.persistence.Id; @Entity public class SaleEvent implements java.io.Serializable { @Id private String itemID; private double salePrice; public SaleEvent() { } public SaleEvent(String itemID, double salePrice){ this.itemID = itemID; this.salePrice = salePrice; } public String getItemID(){ return itemID; } public void setItemID(String itemID){ this.itemID = itemID; } public double getSalePrice(){ return salePrice; } public void setSalePrice(double salePrice) { this.salePrice = salePrice; } public String toString() { return "SaleEvent(" + itemID + ":" + salePrice + ")"; } }
package com.oracle.cep.sample; import java.util.Map; import java.util.Random; import com.bea.wlevs.ede.api.RunnableBean; import com.bea.wlevs.ede.api.StreamSender; import com.bea.wlevs.ede.api.StreamSource; import com.bea.wlevs.ede.api.InitializingBean; public class SaleEventsGeneraton implements RunnableBean, StreamSource, InitializingBean{ private static final int SLEEP_MILLIS = 1000; private static final String[] s_itemIDs = { "kodaksport", "ipodtouch-8GB", "ipad2", "kindle", "garmin1690", "rebel", "logitech1080", "tomtom", "ipad2", "cuisinart10s", "keurig-b70", "lasko1320" }; private static final double[] s_prices = { 60.0, 200.0, 450.0, 99, 120, 400, 70, 100, 399, 100, 150, 20 }; private boolean m_suspended; private Thread m_thread; private StreamSender m_sender; private Map m_priceTargetCache; public void setPriceTargetCache(Map cache){ m_priceTargetCache = cache; } public void afterPropertiesSet() { // pre-load PriceTarget cache for (PriceTarget target : PriceTargetLoader.s_entriesToLoad) { System.out.println("Getting : " + target.getItemID()); m_priceTargetCache.get(target.getItemID()); } } public void run() { m_thread = Thread.currentThread(); m_suspended = false; // send random sale events Random random = new Random(System.currentTimeMillis()); while (!isSuspended()) { int index = random.nextInt(s_itemIDs.length); SaleEvent event = new SaleEvent(s_itemIDs[index], s_prices[index]); m_sender.sendInsertEvent(event); try { synchronized (this) { wait(SLEEP_MILLIS); } } catch (InterruptedException e) { if (isSuspended()) return; } } } public void setEventSender(StreamSender sender) { m_sender = sender; } public synchronized void suspend() { m_suspended = true; if (m_thread!=null) m_thread.interrupt(); } private synchronized boolean isSuspended() { return m_suspended; } }