Oracle® Fusion Middleware Oracle Stream Analyticsによるイベント処理用アプリケーションの開発 12c (12.2.1.2.0) E82664-01 |
|
前へ |
次へ |
アプリケーションがイベント・データにアクセスできるようにキャッシング・システムを構成できます。Oracle Coherence分散キャッシング、Oracle Stream Analyticsローカル・キャッシングおよびサードパーティ提供のキャッシング・ソリューションを組み合せてシステムのキャッシュを構成できます。Oracle CQLおよびJavaクラスを使用してキャッシュのイベントにアクセスできます。
この章の内容は次のとおりです。
キャッシュはイベント・データの一時記憶域です。イベント・データの可用性およびアプリケーションのパフォーマンス向上のため、キャッシュを作成できます。アプリケーションでイベントを作成したキャッシュにパブリッシュ、または作成したキャッシュからイベントを使用できるようになります。
また、他のアプリケーションがキャッシュに書き込んだ処理済のイベント・データにもアクセスできます。
イベントを生成するOracle Stream Analyticsアプリケーションで任意のステージを構成し、イベントをキャッシュにパブリッシュできます。キャッシュはEPNのステージである必要はありません。キャッシングAPIを使用してプログラミングすることにより、別のコンポーネントまたはSpring Beanからキャッシュのイベントにアクセスできます。
キャッシング・システムとは、キャッシュ実装の構成されたインスタンスです。キャッシング・システムは、構成されたキャッシュの名前付きセットを定義するとともに、任意のキャッシュが複数のマシンに渡って配信される場合、リモート通信用の構成を定義します。
Oracle Stream Analyticsキャッシングにより、アプリケーションで次のタスクを実行できます。これらのタスクはすべてインクリメンタルに行われ、アプリケーションの停止や待機時間スパイクは発生しません。
アプリケーションのデプロイ前にキャッシュにイベント・データをあらかじめロードします。
キャッシュのイベント・データを定期的にリフレッシュ、無効化およびフラッシュします。
動的にキャッシュ構成を更新します。
Oracle Stream Analyticsでは、次のキャッシュ実装がサポートされます。
Oracle Stream Analyticsローカル・キャッシュ: ローカルのインメモリー・シングルJVMキャッシュ。この実装はローカルの使用に最適です(クラスタで使用できません)。設定が比較的簡単なため、早期の開発にも役立つ場合があります。
Oracle Coherence: クラスタリングされたアプリケーションおよびアプリケーション・サーバーのJCacheに準拠したインメモリー分散型データ・グリッド・ソリューション。これは、クラスタ全体の同時実行性制御を使用してデータへの更新を連携動作させ、使用可能な最高性能のクラスタ化プロトコルを使用してデータ変更をクラスタ全体にレプリケートする他、データ変更通知をそれをリクエストするすべてのサーバーに配信します。ユーザーは、標準JavaコレクションAPIを使用してデータへのアクセスや変更を行うことでOracle Coherenceの機能を活用する他、JavaBeanの標準イベント・モデルを使用してデータ変更通知を受け取ります。
注:
Oracle Stream AnalyticsをOracle Coherenceと組み合せて使用するには、Coherence Enterprise Edition、Coherence Grid Edition、Oracle WebLogic Application Gridのライセンスなどの有効なOracle Coherenceライセンスを取得する必要があります。
Oracle Coherenceの詳細は、http://docs.oracle.com/middleware/1213/coherence/index.html
を参照してください。
サードパーティのキャッシュ: Oracle Stream Analyticsをサードパーティの他のキャッシュ実装で機能することを可能にするプラグインを作成できます。
高いスループットが特に重要になる可能性がある場合、キャッシュ技術はストリーミング・データのユース・ケースに非常に役立ちます。通常、キャッシュからのデータの取得は、リレーショナル・データベースからの同じデータの取得よりはるかに高速になります。
Oracle Stream Analyticsアプリケーションでキャッシュする共通のユース・ケースのシナリオは次のとおりです。
キャッシュへのイベントのパブリッシュ
金融アプリケーションは、金融市場が開いている間はイベントをキャッシュにパブリッシュし、市場終了後にキャッシュのデータを処理します。イベントをキャッシュにパブリッシュすると、イベントがアプリケーションまたはサーバー上で稼働する他のOracle Stream Analyticsアプリケーションで使用可能になります。イベントをキャッシュにパブリッシュすると、キャッシュの実装によってセカンダリ・ストレージへの非同期的書込みも可能になります。
キャッシュ内のデータの使用
Oracle Stream Analyticsアプリケーションでは、ストリーミング以外のデータへのアクセスが必要になる場合があります。このデータをキャッシュすることで、アプリケーションのパフォーマンスが向上します。プログラムによりキャッシュに直接アクセスできるOracle Stream Analyticsアプリケーションの標準のコンポーネントは、入力および出力アダプタ、およびビジネスPOJOです。
また、アプリケーションは、ユーザー定義関数またはOracle CQL文から直接のいずれの方法でも、Oracle CQLからキャッシュにアクセスできます。ユーザー定義関数の場合、プログラマはSpringを使用してキャッシュ・リソースを関数の実装に組み込みます。詳細は、アプリケーションおよびリソース構成を参照してください。
アプリケーションは、プロセッサで実行するOracle CQL文から直接キャッシュに問合せを実行することもできます。この場合、データがキャッシュから抽出される場合を除いて、キャッシュへの問合せがチャネルへの問合せに類似するように、キャッシュはプロセッサへの別の種類のデータ・ソースとして機能します。
キャッシュに問い合せるためにOracle CQLを使用する例は、注文や注文をキャッシュに実行するために使用される取引をパブリッシュする金融アプリケーションにみられます。市場が終了する1日の終わりに、アプリケーションはキャッシュへの問合せを実行し、特定の注文に関連するすべての取引を検索します。
キャッシュのデータの更新と削除
Oracle Stream Analyticsアプリケーションでは、必要に応じてキャッシュ内のデータを更新および削除できます。たとえば、金融アプリケーションでは注文を実現する個々の取引が実行されるたびにキャッシュ内の注文を更新し、注文が取り消された場合はこれを削除する必要があります。キャッシュ内のデータの使用が許可されたアプリケーションのコンポーネントには、データの更新も許可されます。
マルチサーバー・ドメインにおけるキャッシュの使用
キャッシュを使用するOracle Stream Analyticsアプリケーションを構築する場合、そのアプリケーションをマルチサーバー・ドメインでデプロイし、分散型キャッシュをサポートするキャッシング・システムを使用する必要があります。この場合、Oracle Coherenceまたは分散型キャッシュをサポートするサードパーティのキャッシング・システムのいずれかを使用する必要があります。
詳細は、次を参照してください。
Oracle Coherenceキャッシング・システムおよびキャッシュを使用するようにアプリケーションを構成できます。アプリケーションをマルチサーバー・ドメインにデプロイしようとする場合は、このキャッシング・システムを使用します。
Oracle Coherenceを使用して構成すると、第1キャッシュ・システムのみがサーバー内に構成されます。Oracle Stream Analyticsサーバーは、構成済の他のキャッシング・システムを無視します。
注:
合法的にOracle CoherenceをOracle Stream Analyticsとともに使用するには、Coherence Enterprise Edition、Coherence Grid Edition、またはOracle WebLogic Application Gridのライセンスなどの有効なCoherenceライセンスを取得する必要があります。
Oracle Coherenceの詳細は、http://docs.oracle.com/middleware/1213/coherence/index.html
を参照してください。
次のアセンブリおよび構成ファイル設定で、Oracle Coherenceキャッシング・システムおよびOracle CQLプロセッサのキャッシュが構成されます。このキャッシュは、イベント・タイプを使用してリレーショナル・データベースの表の行を特定するキー・プロパティを指定します。このキャッシング・システムは公開されており、他のアプリケーションからキャッシュのデータにアクセスできます。
アセンブリ・ファイル設定により、キャッシング・システムおよびcache1を構成します。value-type
設定では、データベースの値をロードするイベント・タイプを指定します。このキャッシュは通知されます。
<wlevs:cache id="cache1" value-type="TradeReport" advertise="true"> <wlevs:caching-system ref="coherence-caching-system"/> </wlevs:cache> <wlevs:caching-system id="coherence-caching-system" provider="coherence"/>
注:
EPN図のcoherenceキャッシュのid
設定を変更すると、アセンブリ・ファイルとcoherence-cache-ファイルのid
が変更されます。ただし、アセンブリ・ファイルのソース・エディタでid
設定を変更すると、アセンブリ・ファイルのid
のみが変更されます。この場合は、coherence-cache-
のcache-name
設定を、アセンブリ・ファイルのid設定と一致するように手動で変更する必要があります。また、そのキャッシュへのすべての参照も変更する必要があります。
キャッシュが通知される場合、別のバンドル内にあるアプリケーションのEPNにあるコンポーネントがそのキャッシュを参照できます。次の例では、1つのバンドル内のプロセッサがcache-source
要素を使用して、cacheprovider
のcache-id
を持つ別のバンドルのキャッシュ・ソースを参照する方法を示しています。
<wlevs:processor id="myProcessor2"> <wlevs:cache-source ref="cacheprovider:cache-id"> </wlevs:processor>
注:
同一のOracle Stream Analyticsサーバーにデプロイされる1つ以上のアプリケーションのEPNアセンブリ・ファイルのOracle Coherenceキャッシュが存在する場合、ローダーまたはストアを使用して同一のキャッシュの複数のインスタンスを構成しないでください。
各EPNアセンブリ・ファイル内でローダーまたはストアを使用して同一のOracle Coherenceキャッシュをそれぞれ構成する、複数のアプリケーションを使用することによって、これを不注意で行う可能性があります。ローダーまたはストアのある同じキャッシュの複数のインスタンスを構成すると、Oracle Stream Analyticsは例外をスローします。
coherence-cache-config.xml
ファイルは基本的なOracle Coherence構成ファイルで、任意のOracle Coherenceアプリケーションでtrueとなるように、Oracle Coherence DTDに準拠する必要があります。
coherence-cache-config.xml
:の詳細は、Oracle Coherenceドキュメント(http://docs.oracle.com/middleware/1213/coherence/index.html
)を参照してください。
Springを使用してキャッシュのローダーまたはストアを構成する場合は、Oracle Stream Analytics Oracle Coherenceファクトリを宣言する必要があります。ファクトリを指定するには、cachestore-scheme
要素を使用し、Oracle CoherenceがOracle Stream Analyticsに呼び出して、キャッシュのために構成されるローダーまたはストアへの参照を取得できるようにするためのファクトリ・クラスを含めます。ローダーまたはストアの構成における唯一の差異は、method-name
要素が、ローダーが使用されるときはgetLoader
の値を持ち、ストアが使用されているときはgetStore
の値を持つことです。キャッシュ名は、入力パラメータとしてファクトリに渡します。
<cache-config> <caching-scheme-mapping> <cache-mapping> <cache-name>myCoherenceCache</cache-name> <scheme-name>new-replicated</scheme-name> </cache-mapping> <cache-mapping> <cache-name>myLoaderCache</cache-name> <scheme-name>test-loader-scheme</scheme-name> </cache-mapping> <cache-mapping> <cache-name>myStoreCache</cache-name> <scheme-name>test-store-scheme</scheme-name> </cache-mapping> <cache-mapping> <cache-name> cache1 </cache-name> <scheme-name> new-replicated </scheme-name> </cache-mapping> </caching-scheme-mapping> <caching-schemes> <replicated-scheme> <scheme-name>new-replicated</scheme-name> <service-name>ReplicatedCache</service-name> <backing-map-scheme> <class-scheme> <scheme-ref>my-local-scheme</scheme-ref> </class-scheme> </backing-map-scheme> </replicated-scheme> <class-scheme> <scheme-name>my-local-scheme</scheme-name> <class-name>com.tangosol.net.cache.LocalCache</class-name> <eviction-policy>LRU</eviction-policy> <high-units>100</high-units> <low-units>50</low-units> </class-scheme> <local-scheme> <scheme-name>test-loader-scheme</scheme-name> <eviction-policy>LRU</eviction-policy> <high-units>100</high-units> <low-units>50</low-units> <!-- A cachestore-scheme element that gets a loader starts here --> <cachestore-scheme> <class-scheme> <class-factory-name>com.bea.wlevs.cache.coherence.configuration.SpringFactory </class-factory-name> <method-name>getLoader</method-name> <init-params> <init-param> <param-type>java.lang.String</param-type> <param-value>myCoherenceCache</param-value> </init-param> <init-param> <param-type> java.lang.String </param-type> <param-value> cache1 </param-value> </init-param> </init-params> </class-scheme> </cachestore-scheme> <!-- The cachestore-scheme element ends here --> </local-scheme> <local-scheme> <scheme-name>test-store-scheme</scheme-name> <eviction-policy>LRU</eviction-policy> <high-units>100</high-units> <low-units>50</low-units> <!-- A cachestore-scheme element that gets a store starts here --> <cachestore-scheme> <class-scheme> <class-factory-name>com.bea.wlevs.cache.coherence.configuration.SpringFactory </class-factory-name> <method-name>getStore</method-name> <init-params> <init-param> <param-type>java.lang.String</param-type> <param-value>myCoherenceCache</param-value> </init-param> <init-param> <param-type> java.lang.String </param-type> <param-value> cache1 </param-value> </init-param> </init-params> </class-scheme> </cachestore-scheme> <!-- The cachestore-scheme element ends here --> </local-scheme> </caching-schemes> </cache-config>
tangosol-coherence-override.xmlファイル(オプション)
tangosol-coherence-override.xml
ファイルはサーバー単位でグローバルなファイルです。これには、Oracle Coherenceドキュメントで操作構成と呼ばれているものが含まれています。このファイルには、Oracle Coherenceキャッシュのサーバー単位のグローバルな構成設定が含まれています。このファイルはXMLエディタで作成し、構成するサーバーのOracle Stream Analyticsのconfig
ディレクトリに置きます。
注:
Oracle Coherenceをクラスタリングに使用する場合は、tangosol-coherence-override.xml
ファイルを含めないでください。
次のXMLをOracle Coherence構成ファイルに追加し、tangosol-coherence-override.xml
ファイルを参照します。Oracle Stream Analyticsの起動時にOracle Coherenceが既存のOracle Coherenceクラスタへの参加を試行しないように、cluster-name
要素を含めます。これが発生すると、問題が発生してOracle Stream Analyticsが起動できなくなる場合があります。
...
<coherence xml-override="/tangosol-coherence-override.xml">
<cluster-config>
<member-identity>
<cluster-name>com.bea.wlevs.example.provider</cluster-name>
</member-identity>
...
</coherence>
Oracle Stream Analyticsクラスタの詳細は、Oracle Stream Analyticsの管理のOracle Stream Analyticsのネイティブ・クラスタに関する項を参照してください。
com.oracle.cep.cacheloader
パッケージは、CSVイベントをCoherenceキャッシュにロードするためのCsvCacheLoader
クラスを提供します。sourceUrl
プロパティを置き換え、インバウンド・アダプタを持つキャッシュ・ローダーを使用します。
最初のアセンブリ・ファイルCSVアダプタ構成は、sourceUrlプロパティを使用してファイルをロードするCSVインバウンド・アダプタを示します。2番目のアセンブリ・ファイルCSVアダプタ・エントリは、キャッシュ・ローダーBeanをロードするCSVインバウンド・アダプタを示します。
CSVファイルのイベントのロード
<wlevs:adapter id="StockTradeCSVInboundAdapter" provider="csv-inbound"> <wlevs:listener ref="AdapterOutputChannel"/> <wlevs:instance-property name="eventType" value="TradeEvent"/> <wlevs:instance-property name="sourceUrl" value="file:/scratch/mpawlan/oep9-19/oep/utils/load-generator/StockData.csv"/> </wlevs:adapter>
キャッシュ・ローダーを使用するイベントのロード
<wlevs:cache id="csvcache" key-properties="sequenceNo" value-type="TradeEvent" advertise="true"> <wlevs:caching-system ref="cachesys" /> </wlevs:cache> <bean id="csvloader" class="com.oracle.cep.cacheloader.CsvCacheLoader"> <property name="cacheName" value="csvcache"/> <property name="sourceUrl" value="file:///scratch/juhe/view_storage/trade.csv"/> </bean>
アプリケーションを構成して、Oracle Stream Analyticsローカル・キャッシュ・システムおよびキャッシュを使用できます。Oracle Stream Analyticsローカル・キャッシング・システムは、アプリケーションをマルチサーバー・ドメインにデプロイする計画がない場合に適切です。
アプリケーションをマルチサーバー・ドメインにデプロイする計画がある場合は、Oracle Coherenceキャッシュを使用します。
この章では、いくつかの構成設定について説明します。詳細は、『Oracle Stream Analyticsスキーマ・リファレンス』のコンポーネント構成スキーマに関する項およびCoherenceキャッシュ・システムに関する項を参照してください。
次のアセンブリ・ファイル設定により、ローカル・キャッシング・システムおよびキャッシュを構成します。value-type
設定では、データベースの値をロードするイベント・タイプを指定します。
<wlevs:cache id="localcache" value-type="HelloWorldEvent"> <wlevs:caching-system ref="caching-system"/> </wlevs:cache> <wlevs:caching-system id="caching-system" provider="wlevs" advertise="false"/>
次の構成ファイル設定では、ローカル・キャッシング・システムの最大サイズおよび削除ポリシーを指定します。最大サイズは、削除ポリシー発生後のメモリー内のキャッシュ要素数を指定します。また、この例ではエントリがキャッシュされる時間の最大数もミリ秒で指定します。デフォルトのtime-to-live
値は無限大です。この例では3600ミリ秒を指定しています。
<caching-system> <name>caching-system</name> <cache> <name>localcache</name> <max-size>64</max-size> <eviction-policy>LFU</eviction-policy> <time-to-live>3600</time-to-live> </cache> </caching-system>
次の構成ファイル設定は、write-behind
要素をキャッシュの子要素として追加します。write-behind
要素を指定すると、Oracle Stream Analyticsでキャッシュ・エントリの作成または更新後に別のスレッドからキャッシュ・ストアを起動します。write-behindの子要素は次を示します。
バッキング・ストアに書込みを戻すためにストア・バッファからピック・アップされる更新の数(batch-size)。デフォルト値は100です。
ユーザー・スレッドがストア・バッファに書き込むために行う試行回数。ユーザー・スレッドとは、キャッシュ・エントリの作成または更新を行うスレッドです。ユーザー・スレッドによるストア・バッファへの書込み試行がすべて失敗した場合、同期的にストアを起動します(batch-write-attempts
)。デフォルト値は1です。
ユーザー・スレッドがストア・バッファへの書込み試行を中断する前に待機する、ミリ秒単位の時間(buffer-write-timeout
)。ストア・バッファへの書込み試行が失敗するのは、バッファが一杯の場合のみです。タイムアウト後は、以降のバッファへの書込み試行は、buffer-write-attempts
の値に基づいて行われます。デフォルト値は100です。
<caching-system> <name>caching-system-id</name> <cache> <name>cache-id</name> <max-size>100000</max-size> <eviction-policy>LRU</eviction-policy <time-to-live>3600</time-to-live> <write-behind> <buffer-size>200</buffer-size> <buffer-write-attempts>2</buffer-write-attempts> <buffer-write-timeout>200</buffer-write-timeout> </write-behind> </cache> </caching-system>
次の構成ファイル設定は、listeners
子要素を追加し、キャッシュをリスニングするコンポーネントの動作を構成します。listener
要素にはasynchronous
属性があり、true
(リスナーは非同期的に起動)またはfalse
(リスナーは同期的に起動)のいずれかに設定できます。
work-manager-name
子要素は、非同期的なリスナーの起動に使用されるワーク・マネージャを指定します。この値は、同期的起動が有効化されている場合は無視されます。キャッシュにワーク・マネージャが指定されている場合は、リスナーの呼出しについてのみこの値でオーバーライドされます。work-manager-name
要素の値は、Oracle Stream Analytics config.xml
サーバー構成ファイルのwork-manager
設定のname
要素と一致します。
<caching-system> <name>caching-system-id</name> <cache> <name>cache-id</name> <max-size>100000</max-size> <eviction-policy>LRU</eviction-policy <time-to-live>3600</time-to-live> <write-behind> <buffer-size>200</buffer-size> <buffer-write-attempts>2</buffer-write-attempts> <buffer-write-timeout>200</buffer-write-timeout> </write-behind> <listeners asynchronous="true"> <work-manager-name>cachingWM</work-manager-name> </listeners> </cache> </caching-system>
キャッシュを構成してネットワークを通過するイベントを受信できます。たとえば、キャッシュがチャネルをリスニングすることを指定するには、キャッシュへの参照を持つwlevs:listener
要素とともにチャネルを構成します。
次の例では、チャネルが新しいイベントをキャッシュに送信するときに、イベントがキャッシュに挿入されます。チャネルでremove event(出力ウィンドウに存在する古いイベント)を送信する場合、そのイベントはキャッシュから削除されます。
<wlevs:caching-system id="caching-system-id"/>
<wlevs:cache id="cache-id" name="alternative-cache-name">
<wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>
<wlevs:channel id="tradeStream">
<wlevs:listener ref="cache-id"/>
</wlevs:channel>
次の項では、キャッシュのインデックスに使用するキーの指定に使用可能なオプションについて説明します。
明示的にキーを指定しない場合、イベントがキャッシュに挿入されるときにイベント・オブジェクトはキーおよび値の両方として提供されます。この場合、イベント・クラスには、キー・プロパティ値を考慮するequals
およびhashcode
メソッドの有効な実装が含まれる必要があります。
次の例で示されるように、key-properties
属性を使用してアセンブリ・ファイル内でキー・プロパティのプロパティ名を指定します。
<wlevs:cache id="myCache" key-properties="key-property-name">
<wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>
この場合、キャッシュに挿入されるすべてのイベントで実行時にこの名前のプロパティを保持することが要求され、それ以外の場合はOracle Stream Analyticsで例外がスローされます。たとえば、キャッシュに挿入されているイベント型は次のように見えると仮定します。ここで、key
プロパティに注意します(関連するJavaソースのみが示されます)。
public class MyEvent { private String key; public MyEvent() {} public MyEvent(String key) { this.key = key; } public String getKey() { return key;} public void setKey(String key) { this.key = key;} }
アセンブリ・ファイルの対応する宣言は、次のようになります。
<wlevs:cache id="myCache" key-properties="key">
<wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>
メタデータ注釈com.bea.wlevs.ede.api.Key
を使用して、イベント型を実装するJavaクラスにおけるイベント・プロパティに注釈を付加できます。この注釈には、属性はありません。
メタデータ注釈を使用してキーを指定するには、次のステップを実行します。
wlevs:cache
要素のkey-class
属性を使用して、複数のプロパティがキーを形成する複合キーを指定できます。key-class
属性値は、パブリック・フィールドがイベント・クラスのフィールドに一致するJavaBean
である必要があります。JavaBean
クラスは、java.lang.Object class
からのequals
およびhashCode
メソッドをオーバーライドする必要があります。照合はフィールド名に従って実行されます。次に例を示します。
<wlevs:cache id="myCache" key-class="key-class-name"> <wlevs:caching-system ref="caching-system-id"/> </wlevs:cache>
key-field1およびkey-field2で構成される複合キーを持つキャッシュに対しては、次の問合せを両方とも実行できます。
SELECT stream.field2, cache.key-field1 from stream[NOW], cache WHERE stream.field2=cache.key-field1 AND stream.field2=cache.key-field2 SELECT stream.field1, cache.key-field1 from stream[NOW], cache WHERE stream.field1=cache.key-field1
キャッシュをイベント・ソースとして構成できます。キャッシュをイベント・ソースとして使用するには、com.bea.wlevs.ede.api.StreamSink
インタフェースを実装する必要があります。
次に構成を示します。
<wlevs:cache id="cache-id" name="alternative-cache-name" caching-system="caching-system-id"> <wlevs:listener ref="cache-listener-id" /> </wlevs:cache>
キャッシュは、イベント処理ネットワーク内の別のコンポーネントがリスニングする対象のイベント・ソースとして構成可能です。リスニングするコンポーネントは、アダプタまたはBeanです。
キャッシュをリスニングするクラスは、次のようなイベントを受信する方法を提供するインタフェースを実装する必要があります。
Coherenceキャッシュをリスニングするクラスは、com.tangosol.util.MapListener
インタフェースを実装する必要があります。
Oracle Stream Analyticsローカル・キャッシュをリスニングするクラスは、com.bea.cache.jcache.CacheListener
インタフェースを実装する必要があります。
<wlevs:caching-system id="caching-system-id"/> ... <wlevs:cache id="cache-id" name="alternative-cache-name"> <wlevs:caching-system ref="caching-system-id"/> <wlevs:cache-listener ref="cache-listener-id" /> </wlevs:cache> ... <bean id="cacheListenerId" class="com.bea.wlevs.example.provider.coherence"/>
例で、cacheListenerId
Spring Beanは、キャッシュからのイベントをリスニングします。この場合、このコンポーネントを実装するユーザー定義クラスcom.bea.wlevs.example.MyCacheListener
は、Oracle Coherenceキャッシュをリスニングしています。com.tangosol.util.MapListener
を含む適切なOracle Coherence固有のJavaインタフェースを実装する必要があります。次の例は、この実装を示します。
package com.bea.wlevs.example.provider.coherence; import com.tangosol.util.MapEvent; import com.tangosol.util.MapListener; public class LocalListener implements MapListener { public static int deleted = 0; public static int inserted = 0; public static int updated = 0; public void entryDeleted(MapEvent event) { deleted++; } public void entryInserted(MapEvent event) { inserted++; } public void entryUpdated(MapEvent event) { updated++; } }
サードパーティのキャッシング・システムおよびキャッシュを使用するようにアプリケーションを構成できます。
サードパーティのキャッシング・システムおよびキャッシュの構成
EPNのキャッシュとデータベースを含む他のデータ・ソースのデータを交換できます。たとえば、アプリケーションの起動時にデータを使用してキャッシュをロードしたり、キャッシュとデータベース間の読取り/書込み関係を作成できます。
バッキング・ストアが読取り専用である場合を含めてキャッシュがデータを読み取るだけの場合、キャッシュ・ローダーを使用する必要があります。キャッシュでデータを読み取るまたは書き込む場合、キャッシュ・ストアを使用します。どちらの場合も、関係の作成には、特定の構成およびデータ・ソースとの通信方法を認識するJavaクラスが含まれます。
キャッシュ・ローダーを使用して、EPNのキャッシュに読取り専用データ・ソースからデータをロードできます。キャッシュ・ローダーはキャッシュにキャッシュ・オブジェクトをロードするJavaクラスです。適切なインタフェースを実装してキャッシュと通信するローダー・クラスを有効にするJavaクラスを書き込むことによって、キャッシュ・ローダーを作成します。wlevs:cache
要素のwlevs:cache-loader
子要素を使用してロード処理を行うBeanを指定することによって、キャッシュ・ローダーを構成します。
バッキング・ストアが読取り/書込みである場合は、かわりにキャッシュ・ストアを使用します(読取り/書込みデータ・ソースでのデータの交換を参照)。
キャッシュ・ローダーを作成する場合、次のようにインタフェースを実装します。
キャッシュ・データをOracle Coherenceキャッシュにロードするには、com.tangosol.net.cache.CacheLoader
を含む適切なOracle Coherence固有のJavaインタフェースを実装するクラスを作成します。例8-2を参照してください。
キャッシュ・データをOracle Stream Analyticsローカル・キャッシュにロードするには、com.bea.cache.jcache.CacheLoader
インタフェースを実装するクラスを作成します。このインタフェースには、単一のオブジェクトのキャッシュへのロードをカスタマイズするためのload
メソッドが含まれます。Oracle Stream Analyticsは、リクエストされたオブジェクトがキャッシュ内にないときにこのメソッドを呼び出します。このインタフェースには、キャッシュ全体のロードをカスタマイズするために実装するloadAll
メソッドも含まれます。
例8-1では、localLoader
のBeanは、バッキング・ストアが読取り専用のときにイベントをOracle Coherenceキャッシュにロードします。
Coherenceキャッシュを使用していて構成ファイルでキャッシュ・ローダーを指定した場合、Coherenceキャッシュ構成ファイルで対応するクラス・ファクトリ・メソッド名も指定する必要があることに注意してください。キャッシュ・ローダーには、com.bea.wlevs.cache.coherence.configuration.SpringFactory
のgetLoader
メソッドを指定します。コードの例は、Oracle Coherenceキャッシュ・システムおよびキャッシュの構成を参照してください。
例8-1 キャッシュ・ローダー用Oracle CoherenceキャッシュのEPNアセンブリ・ファイル
<wlevs:caching-system id="caching-system-id"/> <wlevs:cache id="myCache" advertise="false"> <wlevs:caching-system ref="caching-system-id"/> <wlevs:cache-loader ref="localLoader"/> </wlevs:cache> <bean id="localLoader" class="com.bea.wlevs.example.provider.coherence.LocalLoader"/>
例8-2 Oracle CoherenceキャッシュのLocalLoaderの実装
package com.bea.wlevs.example.provider.coherence; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import com.bea.wlevs.example.provider.event.ProviderData; import com.tangosol.net.cache.CacheLoader; public class LocalLoader implements CacheLoader { public static int loadCount = 0; public static Set keys = new HashSet(); public LocalLoader() { } public Object load(Object key) { loadCount++; keys.add(key); return new ProviderData((String) key); } public Map loadAll(Collection keys) { Map result = new HashMap(); for (Object key : keys) { result.put(key, load(key)); } return result; } }
キャッシュ・ストアを使用すると、EPNのキャッシュで読取り/書込みデータ・ソースとデータを交換できます。キャッシュ・ストアはキャッシュとキャッシュ・オブジェクトを交換するJavaクラスです。適切なインタフェースを実装してデータソースと通信するために有効にするJavaクラスを書き込むことによって、キャッシュ・ストアを作成します。次に、wlevs:cache
要素のwlevs:cache-store
子要素を使用してデータソースと通信するBeanを指定することによって、キャッシュ・ストアをEPNに追加します。
バッキング・ストアが読取り専用である場合は、かわりにキャッシュ・ローダーを使用します(読取り専用データ・ソースからのキャッシュ・データのロードを参照)。
キャッシュ・ストアを作成する場合、次のようにインタフェースを実装します。
キャッシュ・データとOracle Coherenceキャッシュを交換するには、com.tangosol.net.cache.CacheStore
を含む適切なOracle Coherence固有のJavaインタフェースを実装するクラスを作成します。この例は、例8-4を参照してください。
キャッシュ・データとOracle Stream Analyticsローカル・キャッシュを交換するには、com.bea.cache.jcache.CacheStore
インタフェースを実装するクラスを作成します。このインタフェースには、受け渡されたキーを使用してバッキング・ストアにデータを保存するstore
メソッドが含まれます。Oracle Stream Analyticsは、データをキャッシュに挿入するときにこのメソッドを呼び出します。このインタフェースは、write-behind
構成要素を使用してキャッシュに非同期書込みを構成した場合に、バッキング・ストアへのデータの一括保存に使用するstoreAll
メソッドも含みます。
例8-3では、localStore
のBeanは、バッキング・ストアが読取り/書込みのときにイベントをキャッシュにロードします。
Spring構成ファイルでキャッシュ・ストアを指定した場合、Coherenceキャッシュ構成ファイルで対応するクラス・ファクトリ・メソッド名も指定する必要があることに注意してください。キャッシュ・ストアには、com.bea.wlevs.cache.coherence.configuration.SpringFactory
のgetStore
メソッドを指定します。コードの例は、Oracle Coherenceキャッシュ・システムおよびキャッシュの構成を参照してください。
例8-3 キャッシュ・ストア用Oracle CoherenceキャッシュのEPNアセンブリ・ファイル
<wlevs:caching-system id="caching-system-id"/> <wlevs:cache id="myCache" advertise="false"> <wlevs:caching-system ref="caching-system-id"/> <wlevs:cache-store ref="localStore"/> </wlevs:cache> <bean id="localStore" class="com.bea.wlevs.example.provider.coherence.LocalStore"/>
例8-4 Oracle CoherenceキャッシュのLocalStoreの実装
package com.bea.wlevs.example.provider.coherence; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Set; import com.bea.wlevs.example.provider.event.ProviderData; import com.tangosol.net.cache.CacheStore; public class LocalStore implements CacheStore { public static int eraseCount = 0; public static int storeCount = 0; public static int loadCount = 0; public void erase(Object key) { eraseCount++; } public void eraseAll(Collection keys) { for (Object key : keys) { erase(key); } } public void store(Object key, Object value) { // // Do the store operation here. // } public void storeAll(Map entries) { for (Map.Entry entry : (Set <Map.Entry>)entries.entrySet()) { store(entry.getKey(), entry.getValue()); } } public Object load(Object key) { loadCount++; return new ProviderData((String) key); } public Map loadAll(Collection keys) { Map result = new HashMap(); for (Object key : keys) { result.put(key, load(key)); } return result; } }
キャッシュを構成すると、Oracle Stream Analyticsアプリケーションのいくつかのコンポーネントからキャッシュにアクセスできます。
この項では、その実行方法について説明します。
詳細は、次の項を参照してください。
アプリケーションをアセンブルおよびデプロイする前に、META-INF/MANIFEST.MF
を編集して実装で必要になるパッケージをインポートします。たとえば、アプリケーションでキャッシュ・リスナー、キャッシュ・ローダーまたはキャッシュ・ストアを実装する場合は、Coherence APIを含むcom.tangosol.net.cache
パッケージを実装します。
Oracle Stream Analyticsには、アプリケーションで特定のタスクを実行するために使用できるキャッシングAPIが用意されています。APIは、キャッシュへのアクセス、およびキャッシュ・ローダー、リスナー、ストアの作成に使用されるAPIを含むcom.bea.cache.jcache
パッケージ内にあります。ローダー機能、リスナー機能およびストア機能を使用するには、com.tangosol.net
パッケージおよびcom.tangosol.net.cache
パッケージをインポートします。
EPNアセンブリ・ファイルおよびコンポーネント構成ファイルを使用して、キャッシング・システムおよびキャッシュを作成、構成および接続します。この場合、通常はアプリケーションでCache
およびCachingSystem
インタフェースを明示的には使用しません。これらは標準構成を上回る追加要件がある場合にのみ使用します。たとえば、サード・パーティ製キャッシュ・プロバイダとの統合を提供する場合は、CachingSystem
インタフェースを使用する必要があります。キャッシュでjava.util.Map
インタフェース以外の操作を実行する必要がある場合は、Cache
インタフェースを使用できます。
Oracle Stream Analyticsのローカル・キャッシュ用のキャッシュ・リスナー、キャッシュ・ローダー、またはキャッシュ・ストアを作成する場合、書き込むBeanはCacheListener
、CacheLoader
、またはCacheStore
インタフェースを実装する必要があります。
Oracle Coherenceキャッシュ用のキャッシュ・リスナー、キャッシュ・ローダー、またはキャッシュ・ストアを作成する場合、書き込むBeanは適切なOracle Coherenceインタフェースを実装する必要があります。
サードパーティのキャッシュ用のキャッシュ・リスナー、キャッシュ・ローダー、またはキャッシュ・ストアを作成する場合、書き込むBeanは適切なサードパーティのキャッシュ・インタフェースを実装する必要があります。
チャネルなどのイベント・ソースを参照するのとまったく同じ方法でOracle CQL文からキャッシュを参照できます。この機能によって、標準ストリーミング・データを個別ソースからのデータを使用して質を高めることができます。次の例のコードは、S1
という標準チャネルからの取引イベントとstockCache
というキャッシュからの銘柄記号データを結合する、有効なOracle CQL問合せを示します。
Oracle CQL問合せにおいてキャッシュを使用するとき、次の制約を遵守する必要があります。
キャッシュに問い合せる場合は、常に[Now]
ウィンドウに対して結合する必要があります。
これによって、問合せはキャッシュのスナップショットに対して実行されることが保証されます。他のウィンドウ・タイプに対して結合する場合、ウィンドウの有効期限が切れる前にキャッシュが変更されると、問合せは正しくなりません。
次の例は、キャッシュに対してRange
ウィンドウを結合する無効なOracle CQL問合せを示します。このウィンドウの有効期限が切れる前にキャッシュが変更される場合、問合せは正しくなりません。その結果、この問合せはOracle Stream Analyticsサーバー・エラー「外部リレーションはs[now]と結合している必要があります」
を表示します。
SELECT trade.symbol, trade.price, trade.numberOfShares, company.name FROM TradeStream [Range 8 hours] as trade, CompanyCache as company WHERE trade.symbol = company.id
Oracle CQL問合せでキャッシュからのデータを使用する場合、チャネルの場合と同様に、データがプッシュされるのではなく、Oracle Stream Analyticsがデータを抽出します。つまり、問合せを継続すると、問合せはチャネルがtrade
イベントを問合せにプッシュするときにのみ実行されます。キャッシュ内のストック・シンボル・データは問合せを実行せず、必要に応じて問合せによって抽出されるのみです。
キャッシュ・キーに基づいて参照を行うには、必要なキー・プロパティを指定する必要があります。
スキーマ(id
、group
、value
)を持つ(キャッシュ・キーはid
)、2つのストリームS
およびC
を検討してください。有効な問合せは次のとおりです。
select count(*) as n from S [now], C where S.id = C.id
結合はキャッシュのキーを参照することでのみ実行される必要があります。
ビューでキャッシュは使用できません。かわりに、結合を使用します。
キャッシュ・データ・ソースを結合するOracle CQL文のFROM
句では、1つのチャネル・ソースのみが発生できます。
キャッシュがプロセッサ・ソースである場合は、キャッシュをEPNのチャネルに直接接続します。
キャッシュがプロセッサ・シンクである場合は、プロセッサに直接接続できます。
Oracle CQL文からキャッシュへのアクセス
この手順では、キャッシング・システムおよびキャッシュを構成済と仮定します。詳細は、次を参照してください。
まだ構成済でない場合、キャッシュ・データに対応するイベント型を作成し、イベント・リポジトリに登録します。
イベントおよびイベント・タイプを参照してください。
キャッシュ内のデータからキー・プロパティを指定します。
EPNアセンブリ・ファイルで、キャッシュの構成を更新してその値のイベント型を宣言します。wlevs:cache
要素のvalue-type
属性を使用します。次に例を示します。
<wlevs:caching-system id="caching-system-id"/>
...
<wlevs:cache id="cache-id"
name="alternative-cache-name"
value-type="CompanyEvent">
<wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>
value-type属性は、キャッシュに含まれる値のタイプを指定します。これは、イベント型リポジトリで有効なタイプ名にする必要があります。
この属性は、Oracle CQL問合せでキャッシュが参照される場合に必要です。これは、問合せプロセッサはキャッシュ内のイベントのタイプを意識している必要があるからです。
EPNアセンブル・ファイルで、キャッシュを参照するOracle CQL問合せを実行するプロセッサの構成を更新します。
キャッシュがプロセッサ・ソースである場合は、図8-1に示すように、キャッシュをEPNのプロセッサに直接接続します。
キャッシュを参照するwlevs:processor
要素のwlevs:cache-source
子要素を更新します。次に例を示します。
<wlevs:channel id="S1"/>
<wlevs:processor id="cacheProcessor">
<wlevs:source ref="S1">
<wlevs:cache-source ref="cache-id">
</wlevs:processor>
この例では、プロセッサは従来どおりにS1
チャネルから押し出されるデータを持ちます。ただし、プロセッサで実行するOracle CQL問合せは、cache-id
キャッシュからデータを抽出することもできます。問合せプロセッサが、FROM
句のイベント型をCompanyEvent
などのキャッシュによって提供されるイベント型に一致するとき、プロセッサはキャッシュからそのイベント型のインスタンスを抽出します。
キャッシュがプロセッサ・シンクである場合は、図8-2に示すように、EPNのチャネルを使用してプロセッサをキャッシュに接続します(つまり、プロセッサとキャッシュ・シンクの間にチャネルが必要です)。
この場合、アプリケーション・アセンブリ・ファイルは次のようになります。
<wlevs:channel id="channel1" event-type="StockTick">
<wlevs:listener ref="processor" />
</wlevs:channel>
<wlevs:processor id="processor">
<wlevs:listener ref="channel2" />
</wlevs:processor>
<wlevs:channel id="channel2" event-type="StockTick">
<wlevs:listener ref="cache-id" />
</wlevs:channel>
SELECT S1.symbol, S1.lastPrice, stockCache.description FROM S1 [Now], stockCache WHERE S1.symbol = stockCache.symbol
アダプタも、他のBeanを参照するための標準Springメカニズムを使用してキャッシュに組み込むことができます。キャッシュBeanは、java.util.Map
インタフェースを実装します。このインタフェースは、組み込まれたキャッシュにアクセスするためにアダプタが使用します。
まず、EPNアセンブリ・ファイル内のアダプタの構成は、次の例で示すように、wlevs:instance-property
子要素を使用して更新する必要があります。
<wlevs:caching-system id="caching-system-id"/>
...
<wlevs:cache id="cache-id" name="alternative-cache-name">
<wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>
...
<wlevs:adapter id="myAdapter" provider="myProvider">
<wlevs:instance-property name="map" ref="cache-id"/>
</wlevs:adapter>
この例では、wlevs:instance-property
のref
属性はwlevs:cache
要素のid
値を参照します。Oracle Stream Analyticsは、自動的にアダプタにキャッシュ(java.util.Map
として実装)を組み込みます。
アダプタのJavaソースでは、アダプタによるキャッシュ処理の機能を実装するコードを持つsetMap
(Map)
メソッドを追加します。
package com.bea.wlevs.example;
…
import java.util.Map;
public class MyAdapter implements Runnable, Adapter, EventSource, SuspendableBean {
...
public void setMap (Map map) {...}
}
EPNアセンブリ・ファイルで標準のSpring Beanとして構成されたビジネスPOJOでは、他のBeanを参照するための標準のSpringメカニズムを使用して、キャッシュを挿入することができます。この方法によって、POJOではキャッシュを表示および操作できます。キャッシュBeanでは、ビジネスPOJOが挿入されたキャッシュにアクセスするために使用するjava.util.Map
インタフェースが実装されます。キャッシュBeanではjava.util.Map
のベンダー固有のサブインタフェースを実装することもできますが、移植性のためにMap
を実装することをお薦めします。
まず、EPNアセンブリ・ファイル内のビジネスPOJOの構成は、FX例(Oracle Stream Analyticsイベント処理スタート・ガイドの外国為替(FX)の例に関する項を参照)のOutput Beanに基づいた次の例で示されるように、property
子要素を使用して更新する必要があります。
<wlevs:caching-system id="caching-system-id"/>
...
<wlevs:cache id="cache-id" name="alternative-cache-name">
<wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>
...
<bean class="com.bea.wlevs.example.helloworld.HelloWorldBean">
<property name="map" ref="cache-id"/>
</bean>
この例では、property
要素のref
属性はwlevs:cache
要素のid
値を参照します。Oracle Stream Analyticsは、自動的にビジネスPOJO Beanにキャッシュ(java.util.Map
として実装)を組み込みます。
ビジネスPOJO BeanのJavaソースでは、POJOによるキャッシュ処理の機能を実装するコードを持つsetMap
(Map)
メソッドを追加します。
package com.bea.wlevs.example.helloworld;
…
import java.util.Map;
public class HelloWorldBean implements EventSink {
...
public void setMap (Map map) {...}
}
標準イベント・ストリームに加えて、Oracle CQLルールはユーザー定義関数のメンバー・メソッドも起動できます。
これらのユーザー定義関数は標準Javaクラスとして実装され、次の例で示すように、Oracle CQLプロセッサのコンポーネント構成ファイル内で宣言されます。
<bean id="orderFunction" class="orderFunction-impl-class"/>
関連するOracle CQLルールが実行されるプロセッサは、ref
属性を持つSpring Beanを参照し、wlevs:function
子要素を使用してユーザー定義関数に組み込まれる必要があります。
<wlevs:processor id= "tradeProcessor">
<wlevs:function ref="orderFunction"/>
</wlevs:processor>
また、wlevs:function
要素でBeanクラスを指定できます。
<wlevs:processor id="testProcessor"> <wlevs:listener ref="providerCache"/> <wlevs:listener ref="outputCache"/> <wlevs:cache-source ref="testCache"/> <wlevs:function function-name="mymod" exec-method="execute" /> <bean class="com.bea.wlevs.example.function.MyMod"/> </wlevs:function> </wlevs:processor>
次のOracle CQLルールは、tradeProcessor
プロセッサに構成されると仮定され、orderFunction
ユーザー定義関数のexistsOrder
メソッドを起動する方法を示します。
INSERT INTO InstitutionalOrder
SELECT er.orderKey AS key, er.symbol AS symbol, er.shares as cumulativeShares
FROM ExecutionRequest er [Range 8 hours]
WHERE NOT orderFunction.existsOrder(er.orderKey)
別のBeanを参照する標準Springメカニズムを使用してキャッシュを持つ関数を組み込むことによって、キャッシュにアクセスするためのユーザー定義関数を構成することもできます。キャッシュBeanは、java.util.Map
インタフェースを実装します。このインタフェースは、組み込まれたキャッシュにアクセスするためにユーザー定義関数が使用します。
まず、EPNアセンブリ・ファイル内のユーザー定義関数の構成は、次の例で示すように、wlevs:property
子要素を使用して更新する必要があります。
<wlevs:caching-system id="caching-system-id"/>
...
<wlevs:cache id="cache-id" name="alternative-cache-name">
<wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>
...
<bean id="orderFunction" class="orderFunction-impl-class">
<wlevs:property name="cache" ref="cache-id"/>
</bean>
この例では、wlevs:property
要素のref
属性はwlevs:cache
要素のid
値を参照します。Oracle Event Processingは、自動的にユーザー定義関数にキャッシュ(java.util.Map
として実装)を組み込みます。
ユーザー定義関数のJavaソースでは、関数によるキャッシュ処理の機能を実装するコードを持つsetMap
(Map)
メソッドを追加します。
package com.bea.wlevs.example;
…
import java.util.Map;
public class OrderFunction {
...
public void setMap (Map map) {...}
}
ユーザー定義関数の詳細は、『Oracle CQL言語リファレンス』のユーザー定義に関する項を参照してください。
実行時、Oracle Stream Analyticsがキャッシング・システムと定義するキャッシュにデプロイするMBeansおよびJMXを使用して、キャッシュにプログラムからアクセスできます。詳細は、Oracle Stream Analyticsの管理のJMXに関する項を参照してください。
JMXを使用してキャッシング・システムまたはキャッシュにアクセスする最も単純かつエラーの少ない方法は、Oracle Stream Analytics Visualizerを使用することです。詳細は、『Oracle Stream Analyticsビジュアライザの使用』のJMX管理に関する項を参照してください。
JMXを使用してキャッシング・システムまたはキャッシュにアクセスする最も単純かつエラーの少ない方法は、Oracle Stream Analytics Visualizerを使用することです(Oracle Stream Analytics Visualizerを使用したJMXによるキャッシュへのアクセス方法を参照)。あるいは、書き込まれるJavaコードを使用したJMXによるキャッシング・システムまたはキャッシュにアクセスできます。
Oracle Stream Analyticsにより、アプリケーションでステージとして使用するキャッシュごとにStageMBean
が作成されます。このMBeanのType
はStage
です。
Javaを使用したJMXでキャッシュにアクセスするには: