プライマリ・コンテンツに移動
Oracle® Fusion Middleware Oracle Event Processingアプリケーションの開発
12c リリース1 (12.1.3)
E54312-04
目次へ移動
目次

前
前へ
次
次へ

9 EclipseLink、JPAおよびOracle Coherence

Oracle Event Processingインストールには、アプリケーションでのJava Persistence API (JPA)の使用をサポートするためにEclipseLink 2.4.2オープン・ソース・マッピングおよび永続性フレームワークが含まれます。JPAは、オブジェクト・リレーショナル・マッピング(ORM)およびエンタープライズJavaの永続性の標準です。

この章では、EclipseLinkおよびJPAを使用してデータベースからの読取りおよびデータベースへの書込みを行うOracle Event Processingアプリケーションの2つのサンプルとしてHelloWorldおよびJPA-Coherence-Sample-Codeを示します。JPA-Coherence-Sample-Codeでは、アプリケーションおよびサーバーがクラスタ化された環境内でデータ更新を調整するためにCoherenceキャッシュも使用します。

この章の内容は次のとおりです。

9.1 高度な手順

次の高度なステップを使用して、EclipseLinkが含まれるOracle Event Processingアプリケーションを作成します。

  1. 必要に応じて、JPAおよびOracle Coherenceが含まれるOracle Event Processingアプリケーションを作成します。
  2. 正しいJPA構成を使用してpersistence.xmlファイルを作成します。このファイルには、ランタイム操作を制御するプロパティが含まれます。
  3. persistence.xmlファイルをアプリケーションのMETA-INFディレクトリに配置します。
  4. アプリケーションをバンドルおよびデプロイします。

EclipseLinkの詳細は、http://eclipse.org/eclipselink/を参照してください。

注:

CQLサンプルをAIX 6.1の日本語プラットフォームで実行すると、コヒーレンス・ソケット例外が発生します。この問題の解決方法:

-Djava.net.preferIPv4Stack=trueパラメータをstartwlevs.shスクリプトの最後の行に追加します。

注:

AIXプラットフォームでは、空間サンプルはサポートされていません。

9.2 HelloWorldの例

HelloWorldの例では、EclipseLinkを使用して、HelloWorldイベントをアクセスおよび格納するために読取りおよび書込みモードでデータ・ソースへのJDBC接続を確立します。

この例では、HelloWorldイベントには日時情報が含まれます。

この例は次のファイルで構成されており、これらについてこの項で説明します。

9.2.1 persistence.xml構成ファイル

次のpersistence.xmlファイルには、helloworldと呼ばれる1つの永続性ユニット(persistence-unit)があります。Oracle Event ProcessingはJava SE環境であるため、helloworld永続性ユニットのtransaction-typeRESOURCE_LOCALです。EclipseLinkのプロパティは、データベースの読取りおよび書込み操作とロギング用の設定を指定します。この例では、データベース内のオブジェクトを表す管理対象の永続可能クラスはcom.bea.wlevs.event.example.helloworld.HelloWorldEventです。

このpersistence.xmlファイルには、コメントアウトされ、falseに設定されたJPAロギングのエントリがあります。これらの設定のコメントを解除してtrueに設定することにより、アプリケーションの動作をデバッグまたは監視できます。プロパティの設定の詳細は、http://eclipse.org/eclipselink/documentation/2.4/jpa/extensions/toc.htmを参照してください。

<?xml version="1.0" encoding="UTF-8" ?>
<persistence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://java.sun.com/xml/ns/persistence         http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd"
     version="2.0" xmlns="http://java.sun.com/xml/ns/persistence">
  <persistence-unit name="helloworld" transaction-type="RESOURCE_LOCAL">
    <class>com.bea.wlevs.event.example.helloworld.HelloWorldEvent</class>
      <properties>
        <property name="eclipselink.jdbc.read-connections.min" value="1"/>
        <property name="eclipselink.jdbc.write-connections.min" value="1"/>
      <!--
        <property name="eclipselink.logging.timestamp" value="false"/>
        <property name="eclipselink.logging.thread" value="false"/>
        <property name="eclipselink.logging.session" value="false"/>
        <property name="eclipselink.logging.exceptions" value="false"/>
        <property name="eclipselink.logging.connection" value="false"/>
        <property name="eclipselink.logging.level" value="FINER"/>
      -->
      </properties>
  </persistence-unit>
</persistence>

9.2.2 HelloWorldAdapter.java

HelloWorldAdapter.javaクラスは、HelloWorldEventタイプのイベントを継続的に作成するカスタム・スレッドのアダプタです。アプリケーションにより、DateFormatタイプのメッセージ・テキストが作成され、これは、HelloWorldEventタイプのイベントを作成するためにgenerateHelloMessageメソッドによって使用されます。

Oracle Event Processingフレームワークは、setEventSenderメソッドをコールすることにより、StreamSenderインスタンスを使用してeventSenderプライベート変数を初期化します。StreamSenderインスタンスは、StreamSourceインスタンスによって発行されたイベントをStreamSinkリスナーに送信します。この例では、StreamSinkリスナーはHellowWorldBeanインスタンスです。

package com.bea.wlevs.adapter.example.helloworld;

import java.text.DateFormat;
import java.util.Date;
import com.bea.wlevs.ede.api.RunnableBean;
import com.bea.wlevs.ede.api.StreamSender;
import com.bea.wlevs.ede.api.StreamSource;
import com.bea.wlevs.event.example.helloworld.HelloWorldEvent;

public class HelloWorldAdapter implements RunnableBean, StreamSource {
    private static final int SLEEP_MILLIS = 300;
    private DateFormat dateFormat;
    private String message;
    private boolean suspended;
    private StreamSender eventSender;
 
    public HelloWorldAdapter() {
        super();
        dateFormat = DateFormat.getTimeInstance();
    }
    public void run() {
        suspended = false;
        while (!isSuspended()) { // Generate messages forever... 
            generateHelloMessage();
            try {
                synchronized (this) {
                    wait(SLEEP_MILLIS);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public void setMessage(String message) {
        this.message = message;
    }
    private void generateHelloMessage() {
        String message = this.message + dateFormat.format(new Date());
        HelloWorldEvent event = new HelloWorldEvent();
        event.setMessage(message);
        eventSender.sendInsertEvent(event);
    }
    public void setEventSender(StreamSender sender) {
        eventSender = sender;
    }
    public synchronized void suspend() {
        suspended = true;
    }
    private synchronized boolean isSuspended() {
        return suspended;
    }
}

9.2.3 HelloWorldEvent.java

HelloWorldEvent.javaクラスは、メッセージからイベントを作成します。HelloWorldAdapter.generateHelloMessageメソッドは、HelloWorldEvent.setMessageメソッドをコールしてメッセージからイベントを作成します。HelloWorldBeanクラスは、messageおよび生成されたidをデータ・ストアに格納し、これらをデータ・ストアから取得します。

package com.bea.wlevs.event.example.helloworld;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

@Entity
public class HelloWorldEvent {
  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  private long id;
  private String message;

  public String getMessage() {
    return message;
  }

  public void setMessage (String message) {
    this.message = message;
  }
}

9.2.4 HelloWorldBean.java

HelloWorldBean.javaクラスは、HelloWorldEventからイベントを取得し、JPAを使用してデータベースで読取りおよび書込み操作を実行するイベント・シンクおよびソースです。

Oracle Event Processingフレームワークは、setEventSenderメソッドをコールすることにより、StreamSenderインスタンスを使用してm_eventSenderプライベート変数を初期化します。onInserEventメソッドは、StreamSourceインスタンスによって発行されたイベントをStreamSinkリスナーにダウンストリーム送信します。

package com.bea.wlevs.example.helloworld;

import java.util.HashMap;
import java.util.List;
import javax.annotation.Resource;
import javax.sql.DataSource;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import javax.persistence.Query;
import org.springframework.beans.factory.DisposableBean;
import org.eclipse.persistence.config.PersistenceUnitProperties;
import com.bea.wlevs.ede.api.StreamSink;
import com.bea.wlevs.ede.api.StreamSource;
import com.bea.wlevs.ede.api.StreamSender;
import com.bea.wlevs.event.example.helloworld.HelloWorldEvent;

public class HelloWorldBean implements StreamSink, StreamSource, DisposableBean {
    private static final String PERSISTENCE_UNIT_NAME = "helloworld";
    private EntityManagerFactory m_entityMgrFactory;
    private EntityManager m_entityMgr;
    private DataSource m_ds;
    private boolean m_shuttingDown;
    private StreamSender m_eventSender;
 
    public void setEventSender(StreamSender sender){
        m_eventSender = sender;
    }
    private void setupEntityManager(){
        if (m_entityMgr!=null)
            return;
        HashMap props = new HashMap();
        props.put(PersistenceUnitProperties.NON_JTA_DATASOURCE, m_ds);
        props.put("eclipselink.ddl-generation", "create-tables");
        props.put("eclipselink.ddl-generation.output-mode", "database");
        m_entityMgrFactory = Persistence.createEntityManagerFactory
                             (PERSISTENCE_UNIT_NAME, props);
        m_entityMgr = m_entityMgrFactory.createEntityManager();
    }
    public void onInsertEvent(Object event){
        if (m_shuttingDown)
            return;
        setupEntityManager();
        if (event instanceof HelloWorldEvent) {
            HelloWorldEvent helloWorldEvent = (HelloWorldEvent) event;
            System.out.println("Message: " + helloWorldEvent.getMessage());
            m_entityMgr.getTransaction().begin();
            try {
                m_entityMgr.persist(helloWorldEvent);
                m_entityMgr.getTransaction().commit();
            } finally {
             if (m_entityMgr.getTransaction().isActive())
              m_entityMgr.getTransaction().rollback();
            }
        }
        Query q = m_entityMgr.createQuery("select t from HelloWorldEvent t");
        List<HelloWorldEvent> hwlist = q.getResultList();
        System.out.println("Stored " + hwlist.size() + " helloworld events");
        m_eventSender.sendInsertEvent(event);
    }
    @Resource(name="derbyDS")
    public void setDataSource(DataSource ds){
        m_ds = ds;
    }
    public void destroy(){
        m_shuttingDown = true;
 
        if (m_entityMgr!=null){
            m_entityMgr.close();
            m_entityMgr=null;
        }
        if (m_entityMgrFactory!=null){
            m_entityMgrFactory.close();
            m_entityMgrFactory=null;
        }
    }
}

9.3 JPA Coherenceの例

JPA Coherenceの例では、CoherenceのCacheLoaderまたはCacheStoreインタフェース用のEclipseLink JPA実装の使用方法を示します。

9.3.1 persistence.xml構成ファイル

EclipseLinkのプロパティは、データベースの読取りおよび書込み操作とロギング用の設定を指定します。データベース内のオブジェクトを表す管理対象の永続可能クラスは、com.oracle.cep.sample.PriceTargetおよびcom.oracle.cep.sample.SaleEventです。

このpersistence.xmlファイルには、コメントアウトされ、falseに設定されたJPAロギングのエントリがあります。これらの設定のコメントを解除してtrueに設定することにより、アプリケーションの動作をデバッグまたは監視できます。プロパティの設定の詳細は、http://eclipse.org/eclipselink/documentation/2.4/jpa/extensions/toc.htmを参照してください。

<?xml version="1.0" encoding="UTF-8" ?>
<persistence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://java.sun.com/xml/ns/persistence      http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd"
     version="2.0" xmlns="http://java.sun.com/xml/ns/persistence">
  <persistence-unit name="derby" transaction-type="RESOURCE_LOCAL">
    <class>com.oracle.cep.sample.PriceTarget</class>
    <class>com.oracle.cep.sample.SaleEvent</class>
    <properties>
      <property name="eclipselink.jdbc.read-connections.min" value="1"/>
      <property name="eclipselink.jdbc.write-connections.min" value="1"/>
      <property name="javax.persistence.jdbc.driver"
                value="org.apache.derby.jdbc.EmbeddedDriver"/>
      <property name="javax.persistence.jdbc.url"
                value="jdbc:derby:test1;create=true"/>
      <property name="eclipselink.ddl-generation" value="create-tables"/>
      <property name="eclipselink.ddl-generation.output-mode" value="database"/>
    <!--
      <property name="eclipselink.logging.timestamp" value="false"/>
      <property name="eclipselink.logging.thread" value="false"/>
      <property name="eclipselink.logging.session" value="false"/>
      <property name="eclipselink.logging.exceptions" value="false"/>
      <property name="eclipselink.logging.connection" value="false"/>
      <property name="eclipselink.logging.level" value="FINER"/>
    -->
    </properties>
  </persistence-unit>
</persistence>

9.3.2 クラス

この例は、次のクラスで構成されています。

この例では、一連の初期アイテムが発売され、要求されたターゲット価格がデータ・ストアで設定されます。データ・ストアは、CacheLoaderで使用するよう設定されているため、PriceTarget.java Coherenceキャッシュで使用可能です。SaleEventsのストリームがSaleEventsGeneratorアダプタから生成されます。販売価格がターゲット価格と一致する場合は、SaleEvent Coherenceキャッシュに格納されます。Coherence MapListener実装により、キャッシュに格納されているSaleEventsが実際にデータ・ストアでも使用可能かどうかが検証されます。

9.3.2.1 CoherenceMapListener.java

CoherenceMapListener.javaクラスは、Coherenceキャッシュにパブリッシュされたイベントをリスニングします。

package com.oracle.cep.sample;
 
import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import javax.persistence.Query;
import org.springframework.beans.factory.DisposableBean;
import com.tangosol.util.MapListener;
import com.tangosol.util.MapEvent;
import com.tangosol.util.ObservableMap;
import com.bea.wlevs.ede.api.InitializingBean;
import com.bea.wlevs.ede.api.StreamSource;
import com.bea.wlevs.ede.api.StreamSender;

public class CoherenceMapListener implements MapListener, InitializingBean,                                              StreamSource {
    private static final String PERSISTENCE_UNIT_NAME = "derby";
    private EntityManagerFactory m_entityMgrFactory;
    private EntityManager m_entityMgr;
    private ObservableMap m_saleEventCache;
    private StreamSender m_sender;
 
    public void afterPropertiesSet()
    {
        m_saleEventCache.addMapListener(this);
    }
    public void setEventSender(StreamSender sender)
    {
        m_sender = sender;
    }
    public void setSaleEventCache(ObservableMap cache)
    {
        m_saleEventCache = cache;
    }
    public void entryInserted(MapEvent event)
    {
        verifyEventInStore(event);
    }
    private void verifyEventInStore(MapEvent event){
        if (!(event.getNewValue() instanceof SaleEvent)){
            System.out.println("Unexpected type in SaleEvent cache");
            return;
        }
        if (m_entityMgr==null){
            setupEntityMgr();
        }
        SaleEvent sale = (SaleEvent) event.getNewValue();
        Query q = m_entityMgr.createQuery("SELECT s FROM SaleEvent s 
                 WHERE s.itemID = :itemID");
        q.setParameter("itemID", sale.getItemID());
        List<SaleEvent> saleEvents = q.getResultList();
        if (saleEvents.size()==0)
            System.out.println("ERROR! Matched SaleEvent not found in store");
        else {
          System.out.println("Found sale event for " +
                              saleEvents.get(0).getItemID() + " for $" +
                              saleEvents.get(0).getSalePrice());
          m_sender.sendInsertEvent(sale);
        }
    }
    private void setupEntityMgr() {
      m_entityMgrFactory = Persistence.createEntityManagerFactory(
                           PERSISTENCE_UNIT_NAME);
      m_entityMgr = m_entityMgrFactory.createEntityManager();
    }
    public void entryUpdated(MapEvent event){
        verifyEventInStore(event);
    }
    public void entryDeleted(MapEvent event){
      System.out.println("SaleEvent cache entry deleted. 
                          Should not see this event for this sample");
    }
}

9.3.2.2 PriceTarget.java

package com.oracle.cep.sample;
 
import javax.persistence.Entity;
import javax.persistence.Id;
 
@Entity
public class PriceTarget implements java.io.Serializable {
    @Id
    private String itemID;
    private double targetPrice;
 
    public String getItemID() {
        return itemID;
    }
    public void setItemID(String itemID) {
        this.itemID = itemID;
    }
    public double getTargetPrice(){
        return targetPrice;
    }
    public void setTargetPrice(double targetPrice){
        this.targetPrice = targetPrice;
    }
}

9.3.2.3 PriceTargetLoader.java

package com.oracle.cep.sample;
 
import java.util.ArrayList;
import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import javax.persistence.Query;
import org.springframework.beans.factory.DisposableBean;
import com.bea.wlevs.ede.api.InitializingBean;
 
public class PriceTargetLoader implements DisposableBean, InitializingBean {
    private static final String PERSISTENCE_UNIT_NAME = "derby";
    static ArrayList<PriceTarget> s_entriesToLoad = new ArrayList<PriceTarget>();
    static {
      setUpEntriesToLoad();
    }
 
    private EntityManagerFactory m_entityMgrFactory;
    private EntityManager m_entityMgr;
 
    public void afterPropertiesSet() {
      m_entityMgrFactory = Persistence.createEntityManagerFactory(
                                                      PERSISTENCE_UNIT_NAME);
      m_entityMgr = m_entityMgrFactory.createEntityManager();
      m_entityMgr.getTransaction().begin();
      try{
        Query q = m_entityMgr.createQuery("SELECT t FROM PriceTarget t 
                                          WHERE t.itemID = :itemID");
        for (PriceTarget target : s_entriesToLoad){
          q.setParameter("itemID", target.getItemID());
          List<PriceTarget> targetList = q.getResultList();
          if (targetList.size()==0){
            System.out.println("Persisting target " + target.getItemID());
            m_entityMgr.persist(target);
          } else {
            System.out.println("Found target " + target.getItemID());
          }
        }
        m_entityMgr.getTransaction().commit();
     }
     finally {
       if(m_entityMgr.getTransaction().isActive())
         m_entityMgr.getTransaction().rollback();
     }
   }
 
   public void destroy() {
     if(m_entityMgr!=null) {
       m_entityMgr.close();
       m_entityMgr=null;
      }
      if(m_entityMgrFactory!=null){
       m_entityMgrFactory.close();
       m_entityMgrFactory=null;
     }
   }
   private static void setUpEntriesToLoad(){
    // 'smith', ipad2, $400
    PriceTarget target = new PriceTarget();
    target.setItemID("ipad2");
    target.setTargetPrice(400);
    s_entriesToLoad.add(target);
    // 'doe', kindle, $100
    target = new PriceTarget();
    target.setItemID("kindle");
    target.setTargetPrice(100);
    s_entriesToLoad.add(target);
    // walker, rebel, $400
    target = new PriceTarget();
    target.setItemID("rebel");
    target.setTargetPrice(400);
    s_entriesToLoad.add(target);
    // williams, lasko1320, $25
    target = new PriceTarget();
    target.setItemID("lasko1320");
    target.setTargetPrice(25);
    s_entriesToLoad.add(target);
  }
}

9.3.2.4 SaleEvent.java

package com.oracle.cep.sample;
import javax.persistence.Entity;
import javax.persistence.Id;
 
@Entity
public class SaleEvent implements java.io.Serializable {
    @Id
    private String itemID;
    private double salePrice;
 
    public SaleEvent() { }
    public SaleEvent(String itemID, double salePrice){
        this.itemID = itemID;
        this.salePrice = salePrice;
    }
    public String getItemID(){
        return itemID;
    }
    public void setItemID(String itemID){
        this.itemID = itemID;
    }
    public double getSalePrice(){
        return salePrice;
    }
    public void setSalePrice(double salePrice) {
        this.salePrice = salePrice;
    }
    public String toString() {
         return "SaleEvent(" + itemID + ":" + salePrice + ")";
    }
}

9.3.2.5 SaleEventsGenerator.java

package com.oracle.cep.sample;
 
import java.util.Map;
import java.util.Random;
import com.bea.wlevs.ede.api.RunnableBean;
import com.bea.wlevs.ede.api.StreamSender;
import com.bea.wlevs.ede.api.StreamSource;
import com.bea.wlevs.ede.api.InitializingBean;
 
public class SaleEventsGeneraton implements RunnableBean, StreamSource,
                                 InitializingBean{
    private static final int SLEEP_MILLIS = 1000;
    private static final String[] s_itemIDs = { 
          "kodaksport",
          "ipodtouch-8GB",
          "ipad2",
          "kindle",
          "garmin1690",
          "rebel",
          "logitech1080",
          "tomtom",
          "ipad2",
          "cuisinart10s",
          "keurig-b70",
          "lasko1320" };
    private static final double[] s_prices = {
          60.0,
          200.0,
          450.0,
          99,
          120,
          400,
          70,
          100,
          399,
          100,
          150,
          20 };
 
    private boolean m_suspended;
    private Thread m_thread;
    private StreamSender m_sender;
    private Map m_priceTargetCache;
 
    public void setPriceTargetCache(Map cache){
        m_priceTargetCache = cache;
    }
    public void afterPropertiesSet() {
        // pre-load PriceTarget cache
        for (PriceTarget target : PriceTargetLoader.s_entriesToLoad)
        {
            System.out.println("Getting : " + target.getItemID());
            m_priceTargetCache.get(target.getItemID());
        }
    }
    public void run() {
        m_thread = Thread.currentThread();
        m_suspended = false;
        // send random sale events
        Random random = new Random(System.currentTimeMillis());
        while (!isSuspended())
        { 
            int index = random.nextInt(s_itemIDs.length);
            SaleEvent event = new SaleEvent(s_itemIDs[index], s_prices[index]);
            m_sender.sendInsertEvent(event);
            try {
                synchronized (this) { wait(SLEEP_MILLIS); }
            } catch (InterruptedException e) {
                if (isSuspended())
                    return;
            }
        }
    }
    public void setEventSender(StreamSender sender) {
        m_sender = sender;
    }
    public synchronized void suspend() {
        m_suspended = true;
        if (m_thread!=null)
            m_thread.interrupt();
    }
    private synchronized boolean isSuspended() {
        return m_suspended;
    }
}