9 EclipseLink、JPAおよびOracle Coherence
Oracle Stream Analyticsインストールには、アプリケーションでのJava Persistence API (JPA)の使用をサポートするためにEclipseLink 2.4.2オープン・ソース・マッピングおよび永続性フレームワークが含まれます。JPAは、オブジェクト・リレーショナル・マッピング(ORM)およびエンタープライズJavaの永続性の標準です。
この章では、EclipseLinkおよびJPAを使用してデータベースからの読取りおよびデータベースへの書込みを行う2つのOracle Stream Analyticsサンプル・アプリケーションとして、HelloWorldおよびJPA-Coherence-Sample-Codeを示します。JPA-Coherence-Sample-Codeでは、アプリケーションおよびサーバーがクラスタ化された環境内でデータ更新を調整するためにCoherenceキャッシュも使用します。
この章の内容は次のとおりです。
9.1 高度な手順
次の高度なステップを使用して、EclipseLinkが含まれるOracle Stream Analyticsアプリケーションを作成します。
- 必要に応じて、JPAおよびOracle Coherenceが含まれるOracle Stream Analyticsアプリケーションを作成します。
- 正しいJPA構成を使用して
persistence.xmlファイルを作成します。このファイルには、ランタイム操作を制御するプロパティが含まれます。 persistence.xmlファイルをアプリケーションのMETA-INFディレクトリに配置します。- アプリケーションをバンドルおよびデプロイします。
EclipseLinkの詳細は、http://eclipse.org/eclipselink/を参照してください。
注意:
CQLサンプルをAIX 6.1の日本語プラットフォームで実行すると、コヒーレンス・ソケット例外が発生します。この問題の解決方法:-Djava.net.preferIPv4Stack=trueパラメータをstartwlevs.shスクリプトの最後の行に追加します。
注意:
AIXプラットフォームでは、空間サンプルはサポートされていません。9.2 HelloWorldの例
HelloWorldの例では、EclipseLinkを使用して、HelloWorldイベントをアクセスおよび格納するために読取りおよび書込みモードでデータ・ソースへのJDBC接続を確立します。
この例では、HelloWorldイベントには日時情報が含まれます。
この例は次のファイルで構成されており、これらについてこの項で説明します。
9.2.1 persistence.xml構成ファイル
次のpersistence.xmlファイルには、helloworldと呼ばれる1つの永続性ユニット(persistence-unit)があります。Oracle Stream AnalyticsはJava SE環境であるため、helloworld永続性ユニットのtransaction-typeはRESOURCE_LOCALです。EclipseLinkのプロパティは、データベースの読取りおよび書込み操作とロギング用の設定を指定します。この例では、データベース内のオブジェクトを表す管理対象の永続可能クラスはcom.bea.wlevs.event.example.helloworld.HelloWorldEventです。
このpersistence.xmlファイルには、コメントアウトされ、falseに設定されたJPAロギングのエントリがあります。これらの設定のコメントを解除してtrueに設定することにより、アプリケーションの動作をデバッグまたは監視できます。プロパティの設定の詳細は、http://eclipse.org/eclipselink/documentation/2.4/jpa/extensions/toc.htmを参照してください。
<?xml version="1.0" encoding="UTF-8" ?>
<persistence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd"
version="2.0" xmlns="http://java.sun.com/xml/ns/persistence">
<persistence-unit name="helloworld" transaction-type="RESOURCE_LOCAL">
<class>com.bea.wlevs.event.example.helloworld.HelloWorldEvent</class>
<properties>
<property name="eclipselink.jdbc.read-connections.min" value="1"/>
<property name="eclipselink.jdbc.write-connections.min" value="1"/>
<!--
<property name="eclipselink.logging.timestamp" value="false"/>
<property name="eclipselink.logging.thread" value="false"/>
<property name="eclipselink.logging.session" value="false"/>
<property name="eclipselink.logging.exceptions" value="false"/>
<property name="eclipselink.logging.connection" value="false"/>
<property name="eclipselink.logging.level" value="FINER"/>
-->
</properties>
</persistence-unit>
</persistence>9.2.2 HelloWorldAdapter.java
HelloWorldAdapter.javaクラスは、HelloWorldEventタイプのイベントを継続的に作成するカスタム・スレッドのアダプタです。アプリケーションにより、DateFormatタイプのメッセージ・テキストが作成され、これは、HelloWorldEventタイプのイベントを作成するためにgenerateHelloMessageメソッドによって使用されます。
Oracle Stream Analyticsフレームワークは、setEventSenderメソッドを呼び出すことにより、StreamSenderインスタンスを使用してeventSenderプライベート変数を初期化します。StreamSenderインスタンスは、StreamSourceインスタンスによって発行されたイベントをStreamSinkリスナーに送信します。この例では、StreamSinkリスナーはHellowWorldBeanインスタンスです。
package com.bea.wlevs.adapter.example.helloworld;
import java.text.DateFormat;
import java.util.Date;
import com.bea.wlevs.ede.api.RunnableBean;
import com.bea.wlevs.ede.api.StreamSender;
import com.bea.wlevs.ede.api.StreamSource;
import com.bea.wlevs.event.example.helloworld.HelloWorldEvent;
public class HelloWorldAdapter implements RunnableBean, StreamSource {
private static final int SLEEP_MILLIS = 300;
private DateFormat dateFormat;
private String message;
private boolean suspended;
private StreamSender eventSender;
public HelloWorldAdapter() {
super();
dateFormat = DateFormat.getTimeInstance();
}
public void run() {
suspended = false;
while (!isSuspended()) { // Generate messages forever...
generateHelloMessage();
try {
synchronized (this) {
wait(SLEEP_MILLIS);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void setMessage(String message) {
this.message = message;
}
private void generateHelloMessage() {
String message = this.message + dateFormat.format(new Date());
HelloWorldEvent event = new HelloWorldEvent();
event.setMessage(message);
eventSender.sendInsertEvent(event);
}
public void setEventSender(StreamSender sender) {
eventSender = sender;
}
public synchronized void suspend() {
suspended = true;
}
private synchronized boolean isSuspended() {
return suspended;
}
}9.2.3 HelloWorldEvent.java
HelloWorldEvent.javaクラスは、メッセージからイベントを作成します。HelloWorldAdapter.generateHelloMessageメソッドは、HelloWorldEvent.setMessageメソッドをコールしてメッセージからイベントを作成します。HelloWorldBeanクラスは、messageおよび生成されたidをデータ・ストアに格納し、これらをデータ・ストアから取得します。
package com.bea.wlevs.event.example.helloworld;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
@Entity
public class HelloWorldEvent {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private long id;
private String message;
public String getMessage() {
return message;
}
public void setMessage (String message) {
this.message = message;
}
}9.2.4 HelloWorldBean.java
HelloWorldBean.javaクラスは、HelloWorldEventからイベントを取得し、JPAを使用してデータベースで読取りおよび書込み操作を実行するイベント・シンクおよびソースです。
Oracle Stream Analyticsフレームワークは、setEventSenderメソッドを呼び出すことにより、StreamSenderインスタンスを使用してm_eventSenderプライベート変数を初期化します。onInserEventメソッドは、StreamSourceインスタンスによって発行されたイベントをStreamSinkリスナーにダウンストリーム送信します。
package com.bea.wlevs.example.helloworld;
import java.util.HashMap;
import java.util.List;
import javax.annotation.Resource;
import javax.sql.DataSource;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import javax.persistence.Query;
import org.springframework.beans.factory.DisposableBean;
import org.eclipse.persistence.config.PersistenceUnitProperties;
import com.bea.wlevs.ede.api.StreamSink;
import com.bea.wlevs.ede.api.StreamSource;
import com.bea.wlevs.ede.api.StreamSender;
import com.bea.wlevs.event.example.helloworld.HelloWorldEvent;
public class HelloWorldBean implements StreamSink, StreamSource, DisposableBean {
private static final String PERSISTENCE_UNIT_NAME = "helloworld";
private EntityManagerFactory m_entityMgrFactory;
private EntityManager m_entityMgr;
private DataSource m_ds;
private boolean m_shuttingDown;
private StreamSender m_eventSender;
public void setEventSender(StreamSender sender){
m_eventSender = sender;
}
private void setupEntityManager(){
if (m_entityMgr!=null)
return;
HashMap props = new HashMap();
props.put(PersistenceUnitProperties.NON_JTA_DATASOURCE, m_ds);
props.put("eclipselink.ddl-generation", "create-tables");
props.put("eclipselink.ddl-generation.output-mode", "database");
m_entityMgrFactory = Persistence.createEntityManagerFactory
(PERSISTENCE_UNIT_NAME, props);
m_entityMgr = m_entityMgrFactory.createEntityManager();
}
public void onInsertEvent(Object event){
if (m_shuttingDown)
return;
setupEntityManager();
if (event instanceof HelloWorldEvent) {
HelloWorldEvent helloWorldEvent = (HelloWorldEvent) event;
System.out.println("Message: " + helloWorldEvent.getMessage());
m_entityMgr.getTransaction().begin();
try {
m_entityMgr.persist(helloWorldEvent);
m_entityMgr.getTransaction().commit();
} finally {
if (m_entityMgr.getTransaction().isActive())
m_entityMgr.getTransaction().rollback();
}
}
Query q = m_entityMgr.createQuery("select t from HelloWorldEvent t");
List<HelloWorldEvent> hwlist = q.getResultList();
System.out.println("Stored " + hwlist.size() + " helloworld events");
m_eventSender.sendInsertEvent(event);
}
@Resource(name="derbyDS")
public void setDataSource(DataSource ds){
m_ds = ds;
}
public void destroy(){
m_shuttingDown = true;
if (m_entityMgr!=null){
m_entityMgr.close();
m_entityMgr=null;
}
if (m_entityMgrFactory!=null){
m_entityMgrFactory.close();
m_entityMgrFactory=null;
}
}
}9.3 JPA Coherenceの例
JPA Coherenceの例では、CoherenceのCacheLoaderまたはCacheStoreインタフェース用のEclipseLink JPA実装の使用方法を示します。
9.3.1 persistence.xml構成ファイル
EclipseLinkのプロパティは、データベースの読取りおよび書込み操作とロギング用の設定を指定します。データベース内のオブジェクトを表す管理対象の永続可能クラスは、com.oracle.cep.sample.PriceTargetおよびcom.oracle.cep.sample.SaleEventです。
このpersistence.xmlファイルには、コメントアウトされ、falseに設定されたJPAロギングのエントリがあります。これらの設定のコメントを解除してtrueに設定することにより、アプリケーションの動作をデバッグまたは監視できます。プロパティの設定の詳細は、http://eclipse.org/eclipselink/documentation/2.4/jpa/extensions/toc.htmを参照してください。
<?xml version="1.0" encoding="UTF-8" ?>
<persistence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd"
version="2.0" xmlns="http://java.sun.com/xml/ns/persistence">
<persistence-unit name="derby" transaction-type="RESOURCE_LOCAL">
<class>com.oracle.cep.sample.PriceTarget</class>
<class>com.oracle.cep.sample.SaleEvent</class>
<properties>
<property name="eclipselink.jdbc.read-connections.min" value="1"/>
<property name="eclipselink.jdbc.write-connections.min" value="1"/>
<property name="javax.persistence.jdbc.driver"
value="org.apache.derby.jdbc.EmbeddedDriver"/>
<property name="javax.persistence.jdbc.url"
value="jdbc:derby:test1;create=true"/>
<property name="eclipselink.ddl-generation" value="create-tables"/>
<property name="eclipselink.ddl-generation.output-mode" value="database"/>
<!--
<property name="eclipselink.logging.timestamp" value="false"/>
<property name="eclipselink.logging.thread" value="false"/>
<property name="eclipselink.logging.session" value="false"/>
<property name="eclipselink.logging.exceptions" value="false"/>
<property name="eclipselink.logging.connection" value="false"/>
<property name="eclipselink.logging.level" value="FINER"/>
-->
</properties>
</persistence-unit>
</persistence>9.3.2 クラス
この例は、次のクラスで構成されています。
この例では、一連の初期アイテムが発売され、要求されたターゲット価格がデータ・ストアで設定されます。データ・ストアは、CacheLoaderで使用するよう設定されているため、PriceTarget.java Coherenceキャッシュで使用可能です。SaleEventsのストリームがSaleEventsGeneratorアダプタから生成されます。販売価格がターゲット価格と一致する場合は、SaleEvent Coherenceキャッシュに格納されます。Coherence MapListener実装により、キャッシュに格納されているSaleEventsが実際にデータ・ストアでも使用可能かどうかが検証されます。
9.3.2.1 CoherenceMapListener.java
CoherenceMapListener.javaクラスは、Coherenceキャッシュにパブリッシュされたイベントをリスニングします。
package com.oracle.cep.sample;
import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import javax.persistence.Query;
import org.springframework.beans.factory.DisposableBean;
import com.tangosol.util.MapListener;
import com.tangosol.util.MapEvent;
import com.tangosol.util.ObservableMap;
import com.bea.wlevs.ede.api.InitializingBean;
import com.bea.wlevs.ede.api.StreamSource;
import com.bea.wlevs.ede.api.StreamSender;
public class CoherenceMapListener implements MapListener, InitializingBean, StreamSource {
private static final String PERSISTENCE_UNIT_NAME = "derby";
private EntityManagerFactory m_entityMgrFactory;
private EntityManager m_entityMgr;
private ObservableMap m_saleEventCache;
private StreamSender m_sender;
public void afterPropertiesSet()
{
m_saleEventCache.addMapListener(this);
}
public void setEventSender(StreamSender sender)
{
m_sender = sender;
}
public void setSaleEventCache(ObservableMap cache)
{
m_saleEventCache = cache;
}
public void entryInserted(MapEvent event)
{
verifyEventInStore(event);
}
private void verifyEventInStore(MapEvent event){
if (!(event.getNewValue() instanceof SaleEvent)){
System.out.println("Unexpected type in SaleEvent cache");
return;
}
if (m_entityMgr==null){
setupEntityMgr();
}
SaleEvent sale = (SaleEvent) event.getNewValue();
Query q = m_entityMgr.createQuery("SELECT s FROM SaleEvent s
WHERE s.itemID = :itemID");
q.setParameter("itemID", sale.getItemID());
List<SaleEvent> saleEvents = q.getResultList();
if (saleEvents.size()==0)
System.out.println("ERROR! Matched SaleEvent not found in store");
else {
System.out.println("Found sale event for " +
saleEvents.get(0).getItemID() + " for $" +
saleEvents.get(0).getSalePrice());
m_sender.sendInsertEvent(sale);
}
}
private void setupEntityMgr() {
m_entityMgrFactory = Persistence.createEntityManagerFactory(
PERSISTENCE_UNIT_NAME);
m_entityMgr = m_entityMgrFactory.createEntityManager();
}
public void entryUpdated(MapEvent event){
verifyEventInStore(event);
}
public void entryDeleted(MapEvent event){
System.out.println("SaleEvent cache entry deleted.
Should not see this event for this sample");
}
}9.3.2.2 PriceTarget.java
package com.oracle.cep.sample;
import javax.persistence.Entity;
import javax.persistence.Id;
@Entity
public class PriceTarget implements java.io.Serializable {
@Id
private String itemID;
private double targetPrice;
public String getItemID() {
return itemID;
}
public void setItemID(String itemID) {
this.itemID = itemID;
}
public double getTargetPrice(){
return targetPrice;
}
public void setTargetPrice(double targetPrice){
this.targetPrice = targetPrice;
}
}9.3.2.3 PriceTargetLoader.java
package com.oracle.cep.sample;
import java.util.ArrayList;
import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import javax.persistence.Query;
import org.springframework.beans.factory.DisposableBean;
import com.bea.wlevs.ede.api.InitializingBean;
public class PriceTargetLoader implements DisposableBean, InitializingBean {
private static final String PERSISTENCE_UNIT_NAME = "derby";
static ArrayList<PriceTarget> s_entriesToLoad = new ArrayList<PriceTarget>();
static {
setUpEntriesToLoad();
}
private EntityManagerFactory m_entityMgrFactory;
private EntityManager m_entityMgr;
public void afterPropertiesSet() {
m_entityMgrFactory = Persistence.createEntityManagerFactory(
PERSISTENCE_UNIT_NAME);
m_entityMgr = m_entityMgrFactory.createEntityManager();
m_entityMgr.getTransaction().begin();
try{
Query q = m_entityMgr.createQuery("SELECT t FROM PriceTarget t
WHERE t.itemID = :itemID");
for (PriceTarget target : s_entriesToLoad){
q.setParameter("itemID", target.getItemID());
List<PriceTarget> targetList = q.getResultList();
if (targetList.size()==0){
System.out.println("Persisting target " + target.getItemID());
m_entityMgr.persist(target);
} else {
System.out.println("Found target " + target.getItemID());
}
}
m_entityMgr.getTransaction().commit();
}
finally {
if(m_entityMgr.getTransaction().isActive())
m_entityMgr.getTransaction().rollback();
}
}
public void destroy() {
if(m_entityMgr!=null) {
m_entityMgr.close();
m_entityMgr=null;
}
if(m_entityMgrFactory!=null){
m_entityMgrFactory.close();
m_entityMgrFactory=null;
}
}
private static void setUpEntriesToLoad(){
// 'smith', ipad2, $400
PriceTarget target = new PriceTarget();
target.setItemID("ipad2");
target.setTargetPrice(400);
s_entriesToLoad.add(target);
// 'doe', kindle, $100
target = new PriceTarget();
target.setItemID("kindle");
target.setTargetPrice(100);
s_entriesToLoad.add(target);
// walker, rebel, $400
target = new PriceTarget();
target.setItemID("rebel");
target.setTargetPrice(400);
s_entriesToLoad.add(target);
// williams, lasko1320, $25
target = new PriceTarget();
target.setItemID("lasko1320");
target.setTargetPrice(25);
s_entriesToLoad.add(target);
}
}9.3.2.4 SaleEvent.java
package com.oracle.cep.sample;
import javax.persistence.Entity;
import javax.persistence.Id;
@Entity
public class SaleEvent implements java.io.Serializable {
@Id
private String itemID;
private double salePrice;
public SaleEvent() { }
public SaleEvent(String itemID, double salePrice){
this.itemID = itemID;
this.salePrice = salePrice;
}
public String getItemID(){
return itemID;
}
public void setItemID(String itemID){
this.itemID = itemID;
}
public double getSalePrice(){
return salePrice;
}
public void setSalePrice(double salePrice) {
this.salePrice = salePrice;
}
public String toString() {
return "SaleEvent(" + itemID + ":" + salePrice + ")";
}
}9.3.2.5 SaleEventsGenerator.java
package com.oracle.cep.sample;
import java.util.Map;
import java.util.Random;
import com.bea.wlevs.ede.api.RunnableBean;
import com.bea.wlevs.ede.api.StreamSender;
import com.bea.wlevs.ede.api.StreamSource;
import com.bea.wlevs.ede.api.InitializingBean;
public class SaleEventsGeneraton implements RunnableBean, StreamSource,
InitializingBean{
private static final int SLEEP_MILLIS = 1000;
private static final String[] s_itemIDs = {
"kodaksport",
"ipodtouch-8GB",
"ipad2",
"kindle",
"garmin1690",
"rebel",
"logitech1080",
"tomtom",
"ipad2",
"cuisinart10s",
"keurig-b70",
"lasko1320" };
private static final double[] s_prices = {
60.0,
200.0,
450.0,
99,
120,
400,
70,
100,
399,
100,
150,
20 };
private boolean m_suspended;
private Thread m_thread;
private StreamSender m_sender;
private Map m_priceTargetCache;
public void setPriceTargetCache(Map cache){
m_priceTargetCache = cache;
}
public void afterPropertiesSet() {
// pre-load PriceTarget cache
for (PriceTarget target : PriceTargetLoader.s_entriesToLoad)
{
System.out.println("Getting : " + target.getItemID());
m_priceTargetCache.get(target.getItemID());
}
}
public void run() {
m_thread = Thread.currentThread();
m_suspended = false;
// send random sale events
Random random = new Random(System.currentTimeMillis());
while (!isSuspended())
{
int index = random.nextInt(s_itemIDs.length);
SaleEvent event = new SaleEvent(s_itemIDs[index], s_prices[index]);
m_sender.sendInsertEvent(event);
try {
synchronized (this) { wait(SLEEP_MILLIS); }
} catch (InterruptedException e) {
if (isSuspended())
return;
}
}
}
public void setEventSender(StreamSender sender) {
m_sender = sender;
}
public synchronized void suspend() {
m_suspended = true;
if (m_thread!=null)
m_thread.interrupt();
}
private synchronized boolean isSuspended() {
return m_suspended;
}
}