You can configure a caching system so that applications have ready access to event data. The caches in the system can be a combination of Oracle Coherence distributed caching, Oracle Stream Analytics local caching, and caching solutions provided by third parties. You can access the events in the caches with Oracle CQL and Java classes.
A cache is a temporary storage area for event data. To increase the availability of event data and to increase application performance, you can create a cache so that applications can publish to or consume events from the cache.
An application can also access the processed event data written to the cache by other applications.
You can configure any stage in an Oracle Stream Analytics application that generates events to publish its events to the cache. A cache does not have to be a stage in the EPN. Another component or Spring bean can access events in the cache programmatically with the caching APIs.
A caching system is a configured instance of a caching implementation. A caching system defines a named set of configured caches and the configuration for remote communication when any of the caches are distributed across multiple machines.
Oracle Stream Analytics caching enables an application to perform the following tasks. All of these tasks happen incrementally without halting the application or causing latency spikes.
Pre-load a cache with event data before an application is deployed.
Periodically refresh, invalidate, and flush the event data in a cache.
Dynamically update a cache configuration.
Oracle Stream Analytics supports the following caching implementations:
Oracle Stream Analytics local cache: a local, in-memory single-JVM cache. This implementation is best for local use (it cannot be used in a cluster). It might also be useful for development in the early stages because it is relatively simple to set up.
Oracle Coherence: a JCache-compliant in-memory distributed data grid solution for clustered applications and application servers. It coordinates updates to the data using cluster-wide concurrency control, replicates data modifications across the cluster using the highest performing clustered protocol available, and delivers notifications of data modifications to any servers that request them. You take advantage of Oracle Coherence features using the standard Java collections API to access and modify data, and use the standard JavaBean event model to receive data change notifications.
Note:
Before you can use Oracle Stream Analytics with Oracle Coherence, you must obtain a valid Oracle Coherence license such as a license for Coherence Enterprise Edition, Coherence Grid Edition, or Oracle WebLogic Application Grid.
For more information on Oracle Coherence, see http://docs.oracle.com/middleware/1213/coherence/index.html.
Third-party caches: you can create a plug-in to allow Oracle Stream Analytics to work with other, third-party cache implementations.
Caching technology is a great fit for streaming data use cases, where high throughput can be particularly important. Getting data from a cache is usually much faster than getting the same data from a relational database.
The following scenarios describe common use cases for caching in Oracle Stream Analytics applications.
Publish events to a cache
A financial application publishes events to a cache while the financial market is open, and then processes data in the cache after the market closes. Publishing events to a cache makes them available to the application or available to other Oracle Stream Analytics applications running in the server. Publishing events to a cache also allows for asynchronous writes to a secondary storage by the cache implementation.
Consume data from a cache
Oracle Stream Analytics applications sometimes need to access non-streaming data. By caching this data, you can increase the performance of the application. The standard components of an Oracle Stream Analytics application that are allowed direct programming access to a cache are input- and output-adapters and business POJOs.
Additionally, applications can access a cache from Oracle CQL either by a user-defined function or directly from an Oracle CQL statement. In the case of a user-defined function, programmers use Spring to inject the cache resource into the implementation of the function. For more information, see Application and Resource Configuration.
Applications can also query a cache directly from Oracle CQL statements that run in a processor. In this case, the cache functions as another type of data source to a processor so that querying a cache is similar to querying a channel except that data is pulled from a cache.
An example of using Oracle CQL to query a cache is from a financial application that publishes orders and the trades used to execute the orders to a cache. At the end of the day when the markets close, the application queries the cache to find all the trades related to a particular order.
Update and delete data in a cache
An Oracle Stream Analytics application can update and delete data in a cache when required. For example, a financial application might need to update an order in the cache each time individual trades that fulfill the order are executed, or an order might need to be deleted if it has been cancelled. The components of an application that are allowed to consume data from a cache are also allowed to update it.
Use a cache in a multiserver domain
If you build an Oracle Stream Analytics application that uses a cache, and you plan to deploy that application in a multiserver domain, then you must use a caching system that supports a distributed cache. In this case, you must use either Oracle Coherence or a third-party caching system that supports a distributed cache.
You can configure your application to use the Oracle Coherence caching system and cache. Use this caching system if you plan to deploy your application to a multiserver domain.
When you use Oracle Coherence, a server honors only the first caching system that you configure. The Oracle Stream Analytics server ignores any other caching systems that you have configured.
Note:
Before you can legally use Oracle Stream Analytics with Oracle Coherence, you must obtain a valid Coherence license such as a license for Coherence Enterprise Edition, Coherence Grid Edition, or Oracle WebLogic Application Grid.
For more information on Oracle Coherence, see http://docs.oracle.com/middleware/1213/coherence/index.html.
The following assembly and configuration file settings configure an Oracle Coherence caching system and cache for an Oracle CQL processor. The cache uses an event type to specify the key properties for locating table rows in the relational database. This caching system is advertised, which means other applications can access the data in its caches.
The assembly file settings configure the caching system and cache1. The value-type setting is the event type into which you want to load the database values. This cache is advertised.
<wlevs:cache id="cache1" value-type="TradeReport" advertise="true">
    <wlevs:caching-system ref="coherence-caching-system"/>
</wlevs:cache>
<wlevs:caching-system id="coherence-caching-system" provider="coherence"/>
Note:
When you change the id setting for a Coherence cache in the EPN diagram, the id changes in the assembly file and in the coherence-cache- file. However, if you change the id setting in the assembly file source editor, the id changes in the assembly file only. In this case, you must manually change the cache-name setting in the coherence-cache- to match the id setting in the assembly file. You also have to change all references to that cache.
When the cache is advertised, a component in the EPN of an application in a separate bundle can reference the advertised cache. The following example shows how a processor in one bundle can use the cache-source element to reference a cache source in another bundle with a cache-id of cacheprovider:
<wlevs:processor id="myProcessor2">
    <wlevs:cache-source ref="cacheprovider:cache-id"/>
</wlevs:processor>
Note:
When you have Oracle Coherence caches in the EPN assembly files of one or more applications deployed to the same Oracle Stream Analytics server, never configure multiple instances of the same cache with a loader or a store.
You can inadvertently do this by employing multiple applications that each configure the same Oracle Coherence cache with a loader or store in their respective EPN assembly file. If you configure multiple instances of the same cache with a loader or a store, Oracle Stream Analytics throws an exception.
The coherence-cache-config.xml file is the basic Oracle Coherence configuration file and must conform to the Oracle Coherence DTDs, as is true for any Oracle Coherence application.
See the Oracle Coherence documentation for information about coherence-cache-config.xml: http://docs.oracle.com/middleware/1213/coherence/index.html.
An Oracle Stream Analytics Oracle Coherence factory must be declared when you use Spring to configure a loader or store for a cache. You specify the factory with the cachestore-scheme element and include a factory class that enables Oracle Coherence to call into Oracle Stream Analytics and retrieve a reference to the loader or store that is configured for the cache. The only difference between configuring a loader and configuring a store is that the method-name element has a value of getLoader when a loader is used and getStore when a store is used. You pass the cache name to the factory as an input parameter.
<cache-config> <caching-scheme-mapping> <cache-mapping> <cache-name>myCoherenceCache</cache-name> <scheme-name>new-replicated</scheme-name> </cache-mapping> <cache-mapping> <cache-name>myLoaderCache</cache-name> <scheme-name>test-loader-scheme</scheme-name> </cache-mapping> <cache-mapping> <cache-name>myStoreCache</cache-name> <scheme-name>test-store-scheme</scheme-name> </cache-mapping> <cache-mapping> <cache-name> cache1 </cache-name> <scheme-name> new-replicated </scheme-name> </cache-mapping> </caching-scheme-mapping> <caching-schemes> <replicated-scheme> <scheme-name>new-replicated</scheme-name> <service-name>ReplicatedCache</service-name> <backing-map-scheme> <class-scheme> <scheme-ref>my-local-scheme</scheme-ref> </class-scheme> </backing-map-scheme> </replicated-scheme> <class-scheme> <scheme-name>my-local-scheme</scheme-name> <class-name>com.tangosol.net.cache.LocalCache</class-name> <eviction-policy>LRU</eviction-policy> <high-units>100</high-units> <low-units>50</low-units> </class-scheme> <local-scheme> <scheme-name>test-loader-scheme</scheme-name> <eviction-policy>LRU</eviction-policy> <high-units>100</high-units> <low-units>50</low-units> <!-- A cachestore-scheme element that gets a loader starts here --> <cachestore-scheme> <class-scheme> <class-factory-name>com.bea.wlevs.cache.coherence.configuration.SpringFactory </class-factory-name> <method-name>getLoader</method-name> <init-params> <init-param> <param-type>java.lang.String</param-type> <param-value>myCoherenceCache</param-value> </init-param> <init-param> <param-type> java.lang.String </param-type> <param-value> cache1 </param-value> </init-param> </init-params> </class-scheme> </cachestore-scheme> <!-- The cachestore-scheme element ends here --> </local-scheme> <local-scheme> <scheme-name>test-store-scheme</scheme-name> <eviction-policy>LRU</eviction-policy> <high-units>100</high-units> <low-units>50</low-units> <!-- A cachestore-scheme element that gets a store starts here --> 
<cachestore-scheme> <class-scheme> <class-factory-name>com.bea.wlevs.cache.coherence.configuration.SpringFactory </class-factory-name> <method-name>getStore</method-name> <init-params> <init-param> <param-type>java.lang.String</param-type> <param-value>myCoherenceCache</param-value> </init-param> <init-param> <param-type> java.lang.String </param-type> <param-value> cache1 </param-value> </init-param> </init-params> </class-scheme> </cachestore-scheme> <!-- The cachestore-scheme element ends here --> </local-scheme> </caching-schemes> </cache-config>
tangosol-coherence-override.xml File (optional)
The tangosol-coherence-override.xml file is a global per-server file. It contains what is referred to as the operational configuration in the Oracle Coherence documentation. This file contains global, server-wide configuration settings for Oracle Coherence caching. You create this file in an XML editor and put it in the Oracle Stream Analytics server config directory for the server you want to configure.
Note:
Do not include the tangosol-coherence-override.xml file when you use Oracle Coherence for clustering.
Add the following XML to the Oracle Coherence configuration file to reference the tangosol-coherence-override.xml file. Include the cluster-name element to prevent Oracle Coherence from attempting to join existing Oracle Coherence clusters when Oracle Stream Analytics starts up. Joining an existing cluster can cause problems and can sometimes prevent Oracle Stream Analytics from starting.
...
<coherence xml-override="/tangosol-coherence-override.xml">
<cluster-config>
<member-identity>
<cluster-name>com.bea.wlevs.example.provider</cluster-name>
</member-identity>
...
</coherence>
For more information about Oracle Stream Analytics clusters, see Native Clusters in Oracle Stream Analytics in Administering Oracle Stream Analytics.
The com.oracle.cep.cacheloader package provides the CsvCacheLoader class for loading CSV events into a Coherence cache. You use a cache loader with an inbound adapter by replacing the sourceUrl property.
The first assembly file CSV adapter configuration shows a CSV inbound adapter that loads a file with the sourceUrl property. The second assembly file CSV adapter entry shows a CSV inbound adapter that loads a cache loader bean.
Load Events in a CSV file
<wlevs:adapter id="StockTradeCSVInboundAdapter" provider="csv-inbound">
    <wlevs:listener ref="AdapterOutputChannel"/>
    <wlevs:instance-property name="eventType" value="TradeEvent"/>
    <wlevs:instance-property name="sourceUrl" value="file:/scratch/mpawlan/oep9-19/oep/utils/load-generator/StockData.csv"/>
</wlevs:adapter>
Load Events with a Cache Loader
<wlevs:cache id="csvcache" key-properties="sequenceNo" value-type="TradeEvent" advertise="true">
    <wlevs:caching-system ref="cachesys"/>
</wlevs:cache>
<bean id="csvloader" class="com.oracle.cep.cacheloader.CsvCacheLoader">
    <property name="cacheName" value="csvcache"/>
    <property name="sourceUrl" value="file:///scratch/juhe/view_storage/trade.csv"/>
</bean>
You can configure your application to use the Oracle Stream Analytics local caching system and cache. The Oracle Stream Analytics local caching system is appropriate when you do not plan to deploy your application to a multiserver domain.
If you plan to deploy your application to a multiserver domain, use an Oracle Coherence cache.
This chapter describes some of the configuration settings. For complete information, see Component Configuration Schema and Coherence Caching System in Schema Reference for Oracle Stream Analytics.
The following assembly file settings configure the local caching system and cache. The value-type setting is the event type into which you want to load the database values.
<wlevs:cache id="localcache" value-type="HelloWorldEvent">
    <wlevs:caching-system ref="caching-system"/>
</wlevs:cache>
<wlevs:caching-system id="caching-system" provider="wlevs" advertise="false"/>
The following configuration file settings specify a maximum size and eviction policy for the local caching system. The maximum size specifies the number of cache elements in memory after which the eviction policy is applied. The example also specifies the maximum amount of time in milliseconds that an entry is cached. The default time-to-live value is infinite. This example specifies 3600 milliseconds.
<caching-system>
    <name>caching-system</name>
    <cache>
        <name>localcache</name>
        <max-size>64</max-size>
        <eviction-policy>LFU</eviction-policy>
        <time-to-live>3600</time-to-live>
    </cache>
</caching-system>
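To make the effect of a maximum size more concrete, the following Java sketch mimics a size-bounded cache with plain JDK classes. It is only an illustration, not the Oracle Stream Analytics implementation: the class name BoundedLruCache is hypothetical, and it applies LRU eviction (which is simpler to show than the LFU policy in the preceding configuration).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not part of the Oracle Stream Analytics API.
class BoundedLruCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxSize; // plays the role of the max-size setting

    BoundedLruCache(int maxSize) {
        // accessOrder=true keeps entries ordered from least to most recently used.
        super(16, 0.75f, true);
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Evict the least recently used entry once the size limit is exceeded.
        return size() > maxSize;
    }
}

class BoundedLruCacheDemo {
    public static void main(String[] args) {
        BoundedLruCache<String, Integer> cache = new BoundedLruCache<>(2);
        cache.put("a", 1);
        cache.put("b", 2);
        cache.get("a");    // "a" becomes the most recently used entry
        cache.put("c", 3); // the limit is exceeded, so "b" is evicted
        System.out.println(cache.keySet());
    }
}
```

Eviction happens only when an insert pushes the cache past its limit, which matches the description of the maximum size as the point at which the eviction policy is applied.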
The following configuration file settings add a write-behind element as a child element of cache. The write-behind element means Oracle Stream Analytics invokes the cache store from a separate thread after a create or update of a cache entry. The child elements of write-behind indicate the following:
The number of updates that are picked up from the store buffer to write back to the backing store (batch-size). The default value is 100.
The number of attempts that the user thread makes to write to the store buffer (buffer-write-attempts). The user thread is the thread that creates or updates a cache entry. If all attempts by the user thread to write to the store buffer fail, it invokes the store synchronously. The default value is 1.
The time in milliseconds the user thread waits before aborting an attempt to write to the store buffer (buffer-write-timeout). The attempt to write to the store buffer fails only when the buffer is full. After the timeout, further attempts can be made to write to the buffer based on the value of buffer-write-attempts. The default value is 100.
<caching-system>
    <name>caching-system-id</name>
    <cache>
        <name>cache-id</name>
        <max-size>100000</max-size>
        <eviction-policy>LRU</eviction-policy>
        <time-to-live>3600</time-to-live>
        <write-behind>
            <buffer-size>200</buffer-size>
            <buffer-write-attempts>2</buffer-write-attempts>
            <buffer-write-timeout>200</buffer-write-timeout>
        </write-behind>
    </cache>
</caching-system>
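The interplay between the store buffer, the write attempts, and the write timeout can be pictured with the following Java sketch. It is a simplified model built on java.util.concurrent.BlockingQueue under stated assumptions, not the Oracle Stream Analytics implementation; the class WriteBehindSketch and all of its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the write-behind settings; not the product implementation.
class WriteBehindSketch {
    private final BlockingQueue<String> storeBuffer; // capacity models buffer-size
    private final int bufferWriteAttempts;           // models buffer-write-attempts
    private final long bufferWriteTimeoutMillis;     // models buffer-write-timeout
    final List<String> synchronousWrites = new ArrayList<>();

    WriteBehindSketch(int bufferSize, int attempts, long timeoutMillis) {
        this.storeBuffer = new ArrayBlockingQueue<>(bufferSize);
        this.bufferWriteAttempts = attempts;
        this.bufferWriteTimeoutMillis = timeoutMillis;
    }

    // Runs on the user thread after a cache entry is created or updated.
    // Returns true if the update was buffered, false if it was stored synchronously.
    boolean write(String update) {
        for (int attempt = 0; attempt < bufferWriteAttempts; attempt++) {
            try {
                // offer() fails only if the buffer stays full for the whole timeout.
                if (storeBuffer.offer(update, bufferWriteTimeoutMillis, TimeUnit.MILLISECONDS)) {
                    return true;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        // All attempts failed: fall back to a synchronous store on the user thread.
        synchronousWrites.add(update);
        return false;
    }

    // Runs on the separate writer thread: takes at most batchSize buffered updates.
    List<String> drainBatch(int batchSize) {
        List<String> batch = new ArrayList<>();
        storeBuffer.drainTo(batch, batchSize);
        return batch;
    }
}
```

In this model a full buffer costs the user thread at most attempts multiplied by timeout milliseconds before the synchronous fallback, which is the trade-off to keep in mind when you tune these values.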
The following configuration file settings add a listeners child element to configure the behavior of components that listen to the cache. The listeners element has an asynchronous attribute that you can set to either true (listeners are invoked asynchronously) or false (listeners are invoked synchronously).
The work-manager-name child element specifies the work manager to use to asynchronously invoke listeners. This value is ignored if synchronous invocations are enabled. If a work manager is specified for the cache, this value overrides that setting for invoking listeners only. The value of the work-manager-name element corresponds to the name element of the work-manager setting in the Oracle Stream Analytics config.xml server configuration file.
<caching-system>
    <name>caching-system-id</name>
    <cache>
        <name>cache-id</name>
        <max-size>100000</max-size>
        <eviction-policy>LRU</eviction-policy>
        <time-to-live>3600</time-to-live>
        <write-behind>
            <buffer-size>200</buffer-size>
            <buffer-write-attempts>2</buffer-write-attempts>
            <buffer-write-timeout>200</buffer-write-timeout>
        </write-behind>
        <listeners asynchronous="true">
            <work-manager-name>cachingWM</work-manager-name>
        </listeners>
    </cache>
</caching-system>
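The difference between synchronous and asynchronous listener invocation can be sketched in plain Java as follows. This is an illustrative model only: the class ListenerDispatchSketch is hypothetical, and a single-thread ExecutorService stands in for the work manager named in the configuration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model of listener dispatch; not the product implementation.
class ListenerDispatchSketch {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    private final boolean asynchronous; // models the asynchronous attribute
    // Stand-in for the work manager that runs asynchronous invocations.
    private final ExecutorService workManager = Executors.newSingleThreadExecutor();

    ListenerDispatchSketch(boolean asynchronous) {
        this.asynchronous = asynchronous;
    }

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    void notifyListeners(String event) {
        for (Consumer<String> listener : listeners) {
            if (asynchronous) {
                // The caller continues immediately; the listener runs on a worker thread.
                workManager.execute(() -> listener.accept(event));
            } else {
                // The caller blocks until the listener returns.
                listener.accept(event);
            }
        }
    }

    // Stops the worker and waits for queued invocations to finish.
    boolean shutdownAndWait() {
        workManager.shutdown();
        try {
            return workManager.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With synchronous dispatch a slow listener delays the thread that updates the cache; with asynchronous dispatch that cost moves to the worker thread, at the price of listeners observing changes slightly later.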
You can configure a cache to receive events as they pass through the network. For example, to specify that a cache listens to a channel, configure the channel with a wlevs:listener element that has a reference to the cache.
In the following example, as the channel sends new events to the cache, the events are inserted into the cache. If the channel sends a remove event (an old event that exits the output window), then the event is removed from the cache.
<wlevs:caching-system id="caching-system-id"/>
<wlevs:cache id="cache-id" name="alternative-cache-name">
<wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>
<wlevs:channel id="tradeStream">
<wlevs:listener ref="cache-id"/>
</wlevs:channel>
The following sections describe the options available to you to specify the key that is used to index the cache.
When you do not explicitly specify a key, the event object serves as both the key and value when the event is inserted into the cache. In this case, the event class must include a valid implementation of the equals and hashCode methods that take into account the values of the key properties.
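As a minimal sketch of such an event class, the following Java example bases equals and hashCode on two key properties and ignores the remaining payload. The class TradeKeyedEvent and its properties are hypothetical, and a java.util.HashMap stands in for the cache so that the example runs without the Oracle libraries.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical event type whose identity is defined by its key properties.
class TradeKeyedEvent {
    private final String symbol;   // key property
    private final long sequenceNo; // key property
    private final double price;    // payload; not part of the identity

    TradeKeyedEvent(String symbol, long sequenceNo, double price) {
        this.symbol = symbol;
        this.sequenceNo = sequenceNo;
        this.price = price;
    }

    double getPrice() {
        return price;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof TradeKeyedEvent)) return false;
        TradeKeyedEvent that = (TradeKeyedEvent) other;
        // Compare only the key properties.
        return sequenceNo == that.sequenceNo && Objects.equals(symbol, that.symbol);
    }

    @Override
    public int hashCode() {
        // Must use the same properties as equals.
        return Objects.hash(symbol, sequenceNo);
    }
}

class TradeKeyedEventDemo {
    public static void main(String[] args) {
        // The map stands in for a cache in which the event is both key and value.
        Map<TradeKeyedEvent, TradeKeyedEvent> cache = new HashMap<>();
        TradeKeyedEvent stored = new TradeKeyedEvent("ORCL", 1L, 41.5);
        cache.put(stored, stored);

        // A probe with the same key properties finds the stored event.
        TradeKeyedEvent probe = new TradeKeyedEvent("ORCL", 1L, 0.0);
        System.out.println(cache.get(probe).getPrice());
    }
}
```

If equals and hashCode disagree, or depend on properties that change after insertion, lookups against the cache can silently miss entries.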
Specify a property name for the key property in the assembly file with the key-properties attribute, as shown in the following example:
<wlevs:cache id="myCache" key-properties="key-property-name">
<wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>
In this case, all events that are inserted into the cache are required to have a property of this name at runtime; otherwise, Oracle Stream Analytics throws an exception. For example, assume the event type being inserted into the cache looks something like the following; note the key property (only relevant Java source shown):
public class MyEvent {
    private String key;

    public MyEvent() {}

    public MyEvent(String key) {
        this.key = key;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }
}
The corresponding declaration in the assembly file looks like the following:
<wlevs:cache id="myCache" key-properties="key">
<wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>
You can use the metadata annotation com.bea.wlevs.ede.api.Key to annotate the event property in the Java class that implements the event type. This annotation does not have any attributes.
To use a metadata annotation to specify a key:
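The following Java sketch shows what an annotated event class might look like. So that it compiles without the Oracle libraries, it declares a local stand-in annotation named Key; in a real application you would import com.bea.wlevs.ede.api.Key instead. Placing the annotation on the field is an assumption, and the helper keyPropertyOf is a hypothetical method added only to show that the marker is visible at runtime.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

// Stand-in for com.bea.wlevs.ede.api.Key so this sketch compiles on its own.
// Like the real annotation, it has no attributes.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Key {}

class AnnotatedEvent {
    @Key
    private String key; // assumed placement: the annotation marks the key property

    public AnnotatedEvent() {}

    public AnnotatedEvent(String key) {
        this.key = key;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }
}

class KeyAnnotationDemo {
    // Hypothetical helper: returns the name of the first field marked with @Key,
    // or null if no field is marked.
    static String keyPropertyOf(Class<?> eventClass) {
        for (Field field : eventClass.getDeclaredFields()) {
            if (field.isAnnotationPresent(Key.class)) {
                return field.getName();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(keyPropertyOf(AnnotatedEvent.class));
    }
}
```

With an annotated event class, the key is declared in the Java source rather than in the assembly file.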
You can use the key-class attribute of the wlevs:cache element to specify a composite key in which multiple properties form the key. The value of the key-class attribute must be a JavaBean class with public fields that match the fields of the event class. The JavaBean class must override the equals and hashCode methods from the java.lang.Object class. The matching is done according to the field name. For example:
<wlevs:cache id="myCache" key-class="key-class-name">
    <wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>
For a cache with a composite key composed of key-field1 and key-field2, you can execute both of the following queries:
SELECT stream.field2, cache.key-field1
FROM stream[NOW], cache
WHERE stream.field2=cache.key-field1 AND stream.field2=cache.key-field2

SELECT stream.field1, cache.key-field1
FROM stream[NOW], cache
WHERE stream.field1=cache.key-field1
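A composite key class of the kind described for the key-class attribute might look like the following Java sketch. The names OrderKey, OrderEvent, and CompositeKeyDemo are hypothetical, and the reflective keyOf helper only illustrates the idea of matching by field name; it is not a description of how Oracle Stream Analytics builds keys internally.

```java
import java.lang.reflect.Field;
import java.util.Objects;

// Hypothetical composite key class with public fields and equals/hashCode overrides.
class OrderKey {
    public String orderId;
    public String venue;

    public OrderKey() {}

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof OrderKey)) return false;
        OrderKey that = (OrderKey) other;
        return Objects.equals(orderId, that.orderId) && Objects.equals(venue, that.venue);
    }

    @Override
    public int hashCode() {
        return Objects.hash(orderId, venue);
    }
}

// Hypothetical event class; its orderId and venue fields match the key class by name.
class OrderEvent {
    public String orderId;
    public String venue;
    public double quantity;
}

class CompositeKeyDemo {
    // Illustration only: copies each public key field from the same-named event field.
    static OrderKey keyOf(OrderEvent event) {
        OrderKey key = new OrderKey();
        try {
            for (Field keyField : OrderKey.class.getFields()) {
                Field eventField = OrderEvent.class.getField(keyField.getName());
                keyField.set(key, eventField.get(event));
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Key fields must match event fields by name", e);
        }
        return key;
    }

    public static void main(String[] args) {
        OrderEvent event = new OrderEvent();
        event.orderId = "o-1";
        event.venue = "NYSE";
        event.quantity = 100;
        OrderKey key = keyOf(event);
        System.out.println(key.orderId + "/" + key.venue);
    }
}
```

Because the key class defines equality over all of its fields, two events with the same orderId but different venue values map to different cache entries.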
You can configure a cache as an event source. To use a cache as an event source, you need to implement the com.bea.wlevs.ede.api.StreamSink interface.
The configuration follows:
<wlevs:cache id="cache-id" name="alternative-cache-name" caching-system="caching-system-id">
    <wlevs:listener ref="cache-listener-id"/>
</wlevs:cache>
You can configure a cache as a source of events to which another component in the event processing network listens. The listening component can be an adapter or a bean.
A class that listens to a cache must implement an interface that provides methods for receiving events, as follows:
A class that listens to a Coherence cache must implement the com.tangosol.util.MapListener interface.
A class that listens to an Oracle Stream Analytics local cache must implement the com.bea.cache.jcache.CacheListener interface.
<wlevs:caching-system id="caching-system-id"/>
...
<wlevs:cache id="cache-id" name="alternative-cache-name">
    <wlevs:caching-system ref="caching-system-id"/>
    <wlevs:cache-listener ref="cache-listener-id"/>
</wlevs:cache>
...
<bean id="cache-listener-id" class="com.bea.wlevs.example.provider.coherence.LocalListener"/>
In the example, the cache-listener-id Spring bean listens to events coming from the cache. In this case, the user-defined class that implements this component, com.bea.wlevs.example.provider.coherence.LocalListener, is listening to an Oracle Coherence cache. It must implement the appropriate Oracle Coherence-specific Java interfaces, including com.tangosol.util.MapListener. The following example illustrates this implementation.
package com.bea.wlevs.example.provider.coherence;

import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;

public class LocalListener implements MapListener {
    public static int deleted = 0;
    public static int inserted = 0;
    public static int updated = 0;

    public void entryDeleted(MapEvent event) {
        deleted++;
    }

    public void entryInserted(MapEvent event) {
        inserted++;
    }

    public void entryUpdated(MapEvent event) {
        updated++;
    }
}
You can configure your application to use a third-party caching system and cache.
Configure a Third-Party Caching System and Cache
You can have a cache in an EPN exchange data with another data source, including a database. For example, you can load a cache with data when the application starts or create a read/write relationship between the cache and a database.
If the cache will only be reading data, including when the backing store is read-only, you should use a cache loader. If the cache will read and write data, use a cache store. In both cases, creating the relationship involves specific configuration and a Java class that knows how to communicate with the data source.
Using a cache loader, you can have a cache in your EPN load data from a read-only data source. A cache loader is a Java class that loads cache objects into a cache. You create a cache loader by writing a Java class that implements the appropriate interfaces to enable the loader class to communicate with the cache. Then you configure a cache loader by using the wlevs:cache-loader child element of the wlevs:cache element to specify the bean that does the loading work.
If the backing store is read-write, use a cache store instead (see Exchange Data with a Read-Write Data Source).
When creating a cache loader, you implement interfaces as follows:
To load cache data into an Oracle Coherence cache, create a class that implements the appropriate Oracle Coherence-specific Java interfaces, including com.tangosol.net.cache.CacheLoader. See Example 8-2 for an example.
To load cache data into an Oracle Stream Analytics local cache, create a class that implements the com.bea.cache.jcache.CacheLoader interface. This interface includes the load method to customize loading a single object into the cache; Oracle Stream Analytics calls this method when the requested object is not in the cache. The interface also includes loadAll methods that you implement to customize the loading of the entire cache.
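The load-on-miss behavior described for the load method can be pictured with the following self-contained Java sketch. It deliberately does not use com.bea.cache.jcache.CacheLoader or the Coherence interfaces; the class ReadThroughSketch and its members are hypothetical, and a java.util.function.Function stands in for the loader bean.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical read-through model; not the Oracle Stream Analytics API.
class ReadThroughSketch<K, V> {
    private final Map<K, V> entries = new HashMap<>();
    private final Function<K, V> loader; // stands in for the cache loader bean
    int loadCount = 0;

    ReadThroughSketch(Function<K, V> loader) {
        this.loader = loader;
    }

    // Returns the cached value, calling the loader only when the key is missing.
    V get(K key) {
        V value = entries.get(key);
        if (value == null) {
            value = load(key);
            entries.put(key, value);
        }
        return value;
    }

    // Loads a single object from the backing data source.
    private V load(K key) {
        loadCount++;
        return loader.apply(key);
    }

    // Loads a batch of keys, for example to warm the cache at startup.
    void loadAll(Collection<K> keys) {
        for (K key : keys) {
            get(key);
        }
    }
}
```

The loadCount field mirrors the counter in the LocalLoader example: it increases once per cache miss, not once per read.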
In Example 8-1, the localLoader bean loads events into an Oracle Coherence cache when the backing store is read-only.
When working with a Coherence cache, note that if you specify a cache loader in your configuration file, you must also specify the corresponding class factory method name in your Coherence cache configuration file. For a cache loader, you specify the getLoader method of com.bea.wlevs.cache.coherence.configuration.SpringFactory. For example code, see Configure an Oracle Coherence Caching System and Cache.
Example 8-1 Oracle Coherence Cache EPN Assembly File for a Cache Loader
<wlevs:caching-system id="caching-system-id"/>
<wlevs:cache id="myCache" advertise="false">
    <wlevs:caching-system ref="caching-system-id"/>
    <wlevs:cache-loader ref="localLoader"/>
</wlevs:cache>
<bean id="localLoader" class="com.bea.wlevs.example.provider.coherence.LocalLoader"/>
Example 8-2 Oracle Coherence Cache LocalLoader Implementation
package com.bea.wlevs.example.provider.coherence;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import com.bea.wlevs.example.provider.event.ProviderData;
import com.tangosol.net.cache.CacheLoader;

public class LocalLoader implements CacheLoader {
    public static int loadCount = 0;
    public static Set keys = new HashSet();

    public LocalLoader() {
    }

    public Object load(Object key) {
        loadCount++;
        keys.add(key);
        return new ProviderData((String) key);
    }

    public Map loadAll(Collection keys) {
        Map result = new HashMap();
        for (Object key : keys) {
            result.put(key, load(key));
        }
        return result;
    }
}
Using a cache store, you can have a cache in your EPN exchange data with a read-write data source. A cache store is a Java class that exchanges cache objects with a cache. You create a cache store by writing a Java class that implements the appropriate interfaces to enable it to communicate with the data source. Then you add the cache store to the EPN by using the wlevs:cache-store child element of the wlevs:cache element to specify the bean that communicates with the data source.
If the backing store is read-only, use a cache loader instead (see Load Cache Data from a Read-Only Data Source).
When creating a cache store, you implement interfaces as follows:
To exchange cache data with an Oracle Coherence cache, create a class that implements the appropriate Oracle Coherence-specific Java interfaces, including com.tangosol.net.cache.CacheStore. See Example 8-4 for an example.
To exchange cache data with an Oracle Stream Analytics local cache, create a class that implements the com.bea.cache.jcache.CacheStore interface. This interface includes the store method that stores the data in the backing store using the passed key; Oracle Stream Analytics calls this method when it inserts data into the cache. The interface also includes the storeAll method for storing a batch of data to a backing store in the case that you have configured asynchronous writes for a cache with the write-behind configuration element.
In Example 8-3, the localStore bean loads events into the cache when the backing store is read-write.
Note that if you specify a cache store in your Spring configuration file, you must also specify the corresponding class factory method name in your Coherence cache configuration file. For a cache store, you specify the getStore method of com.bea.wlevs.cache.coherence.configuration.SpringFactory. For example code, see Configure an Oracle Coherence Caching System and Cache.
Example 8-3 Oracle Coherence Cache EPN Assembly File for a Cache Store
<wlevs:caching-system id="caching-system-id"/>
<wlevs:cache id="myCache" advertise="false">
    <wlevs:caching-system ref="caching-system-id"/>
    <wlevs:cache-store ref="localStore"/>
</wlevs:cache>
<bean id="localStore" class="com.bea.wlevs.example.provider.coherence.LocalStore"/>
Example 8-4 Oracle Coherence Cache LocalStore Implementation
package com.bea.wlevs.example.provider.coherence;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import com.bea.wlevs.example.provider.event.ProviderData;
import com.tangosol.net.cache.CacheStore;

public class LocalStore implements CacheStore {
    public static int eraseCount = 0;
    public static int storeCount = 0;
    public static int loadCount = 0;

    public void erase(Object key) {
        eraseCount++;
    }

    public void eraseAll(Collection keys) {
        for (Object key : keys) {
            erase(key);
        }
    }

    public void store(Object key, Object value) {
        //
        // Do the store operation here.
        //
    }

    public void storeAll(Map entries) {
        for (Map.Entry entry : (Set<Map.Entry>) entries.entrySet()) {
            store(entry.getKey(), entry.getValue());
        }
    }

    public Object load(Object key) {
        loadCount++;
        return new ProviderData((String) key);
    }

    public Map loadAll(Collection keys) {
        Map result = new HashMap();
        for (Object key : keys) {
            result.put(key, load(key));
        }
        return result;
    }
}
Once you have configured a cache, you can access the cache from several components in an Oracle Stream Analytics application.
This section describes how to do that.
Before you assemble and deploy the application, edit your META-INF/MANIFEST.MF file to import packages that are required in your implementation. For example, if your application implements cache listeners, loaders or stores, your manifest should import com.tangosol.net.cache packages, which contain the Coherence APIs.
Oracle Stream Analytics provides caching APIs that you can use in your application to perform certain tasks. The APIs are in the com.bea.cache.jcache package, which includes the APIs used to access a cache and create cache loaders, listeners, and stores. If you want to use the loader, listener, and store functionality, then import the com.tangosol.net and com.tangosol.net.cache packages.
You create, configure, and wire caching systems and caches with the EPN assembly file and component configuration files. This means that you typically never explicitly use the Cache and CachingSystem interfaces in your application. The only reason to use them is when you have additional requirements over the standard configuration. For example, if you want to provide integration with a third-party cache provider, then you must use the CachingSystem interface. If you want to perform operations on a cache that are not part of the java.util.Map interface, then you can use the Cache interface.
If you create cache listeners, loaders, or stores for an Oracle Stream Analytics local cache, then the beans you write must implement the CacheListener, CacheLoader, or CacheStore interfaces.
If you create cache listeners, loaders, or stores for an Oracle Coherence cache, then the beans you write must implement the appropriate Oracle Coherence interfaces.
If you create cache listeners, loaders, or stores for a third-party cache, then the beans you write must implement the appropriate third-party cache interfaces.
You can reference a cache from an Oracle CQL statement in much the same way you reference an event source such as a channel; this feature enables you to enrich standard streaming data with data from a separate source. For example, a valid Oracle CQL query can join trade events from a standard channel named S1 with stock symbol data from a cache named stockCache.
You must abide by these restrictions when using a cache in an Oracle CQL query:
Whenever you query a cache, you must join against the [Now] window.
This guarantees that the query executes against a snapshot of the cache. If you join against any other window type and the cache changes before the window expires, the query results will be incorrect.
The following example shows an invalid Oracle CQL query that joins a Range window against a cache. If the cache changes before this window expires, the query will be incorrect. Consequently, this query raises the Oracle Stream Analytics server error "external relation must be joined with s[now]".
SELECT trade.symbol, trade.price, trade.numberOfShares, company.name
FROM TradeStream [Range 8 hours] as trade, CompanyCache as company
WHERE trade.symbol = company.id
When you use data from a cache in an Oracle CQL query, Oracle Stream Analytics pulls the data rather than having it pushed, as is the case with a channel. This means that, in the valid query that joins the S1 channel with the stockCache cache, the query executes only when the channel pushes a trade event to the query. The stock symbol data in the cache never causes a query to execute; the query pulls it only when needed.
You must specify the key property needed to do a lookup based on the cache key.
Consider two streams S and C with schemas (id, group, value) where the cache key is id. A valid query is:
select count(*) as n from S [now], C where S.id = C.id
Joins must be executed only by referencing the cache key.
You cannot use a cache in a view. Instead, use a join.
Only a single channel source may occur in the FROM clause of an Oracle CQL statement that joins cache data source(s).
If the cache is a processor source, you connect the cache directly to the processor on the EPN.
If the cache is a processor sink, you must connect the processor to the cache using a channel on the EPN.
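The pull semantics and key-based lookup described in these restrictions can be pictured in plain Java. The following is a minimal sketch and not Oracle Stream Analytics code: the Trade, Company, and NowJoinSketch classes are hypothetical, a java.util.Map stands in for the cache, and the onTrade method plays the role of a query that runs only when the channel pushes an event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a trade event pushed by a channel.
class Trade {
    final String symbol;
    final double price;

    Trade(String symbol, double price) {
        this.symbol = symbol;
        this.price = price;
    }
}

// Hypothetical stand-in for a cached record whose cache key is id.
class Company {
    final String id;
    final String name;

    Company(String id, String name) {
        this.id = id;
        this.name = name;
    }
}

// Sketch of a [Now] join: each pushed trade triggers one lookup by cache key.
class NowJoinSketch {
    private final Map<String, Company> companyCache; // stands in for the cache
    private final List<String> output = new ArrayList<String>();

    NowJoinSketch(Map<String, Company> companyCache) {
        this.companyCache = companyCache;
    }

    // Called when the channel pushes an event. Cache updates never call this,
    // which mirrors the rule that cache data is pulled, not pushed.
    void onTrade(Trade trade) {
        Company company = companyCache.get(trade.symbol); // equality on the cache key
        if (company != null) {
            output.add(trade.symbol + "," + trade.price + "," + company.name);
        }
    }

    List<String> output() {
        return output;
    }
}
```

In this sketch, putting a new Company into the map produces no output by itself; only a later call to onTrade sees the new entry, which is the behavior the [Now] window restriction is meant to guarantee.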
Access a Cache from an Oracle CQL Statement
This procedure assumes that you have already configured the caching system and caches.
If you have not already done so, create the event type that corresponds to the cache data and register it in the event repository.
Specify the key properties for the data in the cache.
In the EPN assembly file, update the configuration of the cache to declare the event type of its values; use the value-type attribute of the wlevs:cache element. For example:
<wlevs:caching-system id="caching-system-id"/>
...
<wlevs:cache id="cache-id"
name="alternative-cache-name"
value-type="CompanyEvent">
<wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>
The value-type attribute specifies the type for the values contained in the cache. This must be a valid type name in the event type repository.
This attribute is required only if the cache is referenced in an Oracle CQL query. This is because the query processor needs to know the type of events in the cache.
In the EPN assembly file, update the configuration of the processor that executes the Oracle CQL query that references a cache:
If the cache is a processor source: you connect the cache directly to the processor on the EPN as Figure 8-1 shows.
Update the wlevs:processor element with a wlevs:cache-source child element that references the cache. For example:
<wlevs:channel id="S1"/>
<wlevs:processor id="cacheProcessor">
    <wlevs:source ref="S1"/>
    <wlevs:cache-source ref="cache-id"/>
</wlevs:processor>
In the example, the processor will have data pushed to it from the S1 channel as usual; however, the Oracle CQL queries that execute in the processor can also pull data from the cache-id cache. When the query processor matches an event type in the FROM clause to an event type supplied by a cache, such as CompanyEvent, the processor pulls instances of that event type from the cache.
If the cache is a processor sink: you must connect the processor to the cache using a channel on the EPN (that is, there must be a channel between the processor and the cache sink) as Figure 8-2 shows.
In this case, the application assembly file looks like this:
<wlevs:channel id="channel1" event-type="StockTick">
<wlevs:listener ref="processor" />
</wlevs:channel>
<wlevs:processor id="processor">
<wlevs:listener ref="channel2" />
</wlevs:processor>
<wlevs:channel id="channel2" event-type="StockTick">
<wlevs:listener ref="cache-id" />
</wlevs:channel>
The valid Oracle CQL query that joins the S1 channel with the stockCache cache is as follows:
SELECT S1.symbol, S1.lastPrice, stockCache.description
FROM S1 [Now], stockCache
WHERE S1.symbol = stockCache.symbol
An adapter can also be injected with a cache using the standard Spring mechanism for referencing another bean. A cache bean implements the java.util.Map interface, which is what the adapter uses to access the injected cache.
First, the configuration of the adapter in the EPN assembly file must be updated with a wlevs:instance-property child element, as shown in the following example:
<wlevs:caching-system id="caching-system-id"/>
...
<wlevs:cache id="cache-id" name="alternative-cache-name">
<wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>
...
<wlevs:adapter id="myAdapter" provider="myProvider">
<wlevs:instance-property name="map" ref="cache-id"/>
</wlevs:adapter>
In the example, the ref attribute of wlevs:instance-property references the id value of the wlevs:cache element. Oracle Stream Analytics automatically injects the cache, implemented as a java.util.Map, into the adapter.
In the adapter Java source, add a setMap(Map) method with the code that implements whatever you want the adapter to do with the cache:
package com.bea.wlevs.example;
…
import java.util.Map;
public class MyAdapter implements Runnable, Adapter, EventSource, SuspendableBean {
...
public void setMap (Map map) {...}
}
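As a rough illustration of what the setMap method might do, the following sketch shows an adapter-like class that writes entries to the injected cache. It rests on assumptions: the class name CacheWritingAdapterSketch and the sample keys and values are hypothetical, and the Oracle Stream Analytics adapter interfaces are omitted so that the code compiles with the JDK alone.

```java
import java.util.Map;

// Hypothetical adapter-like class. A real adapter would also implement the
// Oracle Stream Analytics adapter interfaces, which are left out here.
class CacheWritingAdapterSketch implements Runnable {
    private Map<Object, Object> map;

    // Spring calls this setter for the instance property named "map".
    public void setMap(Map<Object, Object> map) {
        this.map = map;
    }

    // Publishes a few illustrative entries to the injected cache.
    @Override
    public void run() {
        if (map == null) {
            throw new IllegalStateException("cache was not injected");
        }
        map.put("ORCL", "Oracle Corporation");
        map.put("IBM", "International Business Machines");
    }
}
```

Because the class depends only on java.util.Map, it can be exercised outside the server by passing in a HashMap.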
A business POJO, configured as a standard Spring bean in the EPN assembly file, can be injected with a cache using the standard Spring mechanism for referencing another bean. In this way the POJO can view and manipulate the cache. A cache bean implements the java.util.Map interface, which is what the business POJO uses to access the injected cache. A cache bean can also implement a vendor-specific sub-interface of java.util.Map, but for portability it is recommended that you implement Map.
First, the configuration of the business POJO in the EPN assembly file must be updated with a property child element, as shown in the following example based on the Output bean of the FX example (see Foreign Exchange (FX) Example in Getting Started with Event Processing for Oracle Stream Analytics):
<wlevs:caching-system id="caching-system-id"/>
...
<wlevs:cache id="cache-id" name="alternative-cache-name">
<wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>
...
<bean class="com.bea.wlevs.example.helloworld.HelloWorldBean">
<property name="map" ref="cache-id"/>
</bean>
In the example, the ref attribute of the property element references the id value of the wlevs:cache element. Oracle Stream Analytics automatically injects the cache, implemented as a java.util.Map, into the business POJO bean.
In the business POJO bean Java source, add a setMap(Map) method with the code that implements whatever you want the POJO to do with the cache:
package com.bea.wlevs.example.helloworld;
…
import java.util.Map;
public class HelloWorldBean implements EventSink {
...
public void setMap (Map map) {...}
}
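To make the idea of viewing and manipulating the cache from a POJO more concrete, here is a minimal sketch of a bean that keeps a per-key event count in the injected map. The class name CacheCountingBeanSketch and the recordEvent method are hypothetical, and the EventSink interface that the real HelloWorldBean implements is left out so that the code depends only on the JDK.

```java
import java.util.Map;

// Hypothetical business bean that reads and updates the injected cache.
class CacheCountingBeanSketch {
    private Map<String, Integer> map;

    // Spring calls this setter for the property named "map".
    public void setMap(Map<String, Integer> map) {
        this.map = map;
    }

    // Illustrative handler: records how many events were seen for a key
    // and returns the updated count.
    public int recordEvent(String key) {
        Integer seen = map.get(key);
        int updated = (seen == null) ? 1 : seen + 1;
        map.put(key, updated);
        return updated;
    }
}
```

The read followed by a write is not atomic, so this sketch assumes a single writer; concurrent updates to a shared cache would need whatever locking or atomic operations the chosen caching implementation offers.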
In addition to standard event streams, Oracle CQL rules can also invoke the member methods of a user-defined function.
These user-defined functions are implemented as standard Java classes and are declared in the component configuration file of the Oracle CQL processor, as shown in the following example:
<bean id="orderFunction" class="orderFunction-impl-class"/>
The processor in which the relevant Oracle CQL rule runs must then be injected with the user-defined function using the wlevs:function child element, referencing the Spring bean with the ref attribute:
<wlevs:processor id="tradeProcessor">
<wlevs:function ref="orderFunction"/>
</wlevs:processor>
Alternatively, you can specify the bean class in the wlevs:function element:
<wlevs:processor id="testProcessor">
    <wlevs:listener ref="providerCache"/>
    <wlevs:listener ref="outputCache"/>
    <wlevs:cache-source ref="testCache"/>
    <wlevs:function function-name="mymod" exec-method="execute">
        <bean class="com.bea.wlevs.example.function.MyMod"/>
    </wlevs:function>
</wlevs:processor>
The following Oracle CQL rule, assumed to be configured for the tradeProcessor processor, shows how to invoke the existsOrder method of the orderFunction user-defined function:
INSERT INTO InstitutionalOrder
SELECT er.orderKey AS key, er.symbol AS symbol, er.shares as cumulativeShares
FROM ExecutionRequest er [Range 8 hours]
WHERE NOT orderFunction.existsOrder(er.orderKey)
You can also configure the user-defined function to access a cache by injecting the function with a cache using the standard Spring mechanism for referencing another bean. A cache bean implements the java.util.Map interface, which is what the user-defined function uses to access the injected cache.
First, the configuration of the user-defined function in the EPN assembly file must be updated with a wlevs:property child element, as shown in the following example:
<wlevs:caching-system id="caching-system-id"/>
...
<wlevs:cache id="cache-id" name="alternative-cache-name">
<wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>
...
<bean id="orderFunction" class="orderFunction-impl-class">
<wlevs:property name="map" ref="cache-id"/>
</bean>
In the example, the ref attribute of the wlevs:property element references the id value of the wlevs:cache element. Oracle Stream Analytics automatically injects the cache, implemented as a java.util.Map, into the user-defined function.
In the user-defined function's Java source, add a setMap(Map) method with the code that implements whatever you want the function to do with the cache:
package com.bea.wlevs.example;
…
import java.util.Map;
public class OrderFunction {
...
public void setMap (Map map) {...}
}
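One way the existsOrder method used in the earlier rule could be backed by the injected cache is sketched below. This is an assumption about how such a function might be written, not the implementation shipped with any example: the class name OrderFunctionSketch is hypothetical, and the sketch treats the cache keys as order keys.

```java
import java.util.Map;

// Hypothetical implementation of the orderFunction bean.
class OrderFunctionSketch {
    private Map<Object, Object> map;

    // Spring calls this setter when the cache is injected into the function bean.
    public void setMap(Map<Object, Object> map) {
        this.map = map;
    }

    // Invoked from the rule as orderFunction.existsOrder(er.orderKey).
    // Returns true only if the cache currently holds an entry for the key.
    public boolean existsOrder(Object orderKey) {
        return orderKey != null && map != null && map.containsKey(orderKey);
    }
}
```

Returning false when the key or the cache is missing keeps the rule's NOT condition from failing with a NullPointerException; whether that is the right default depends on the application.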
For more information on user-defined functions, see User Defined Functions in Oracle CQL Language Reference.
At runtime, you can access a cache programmatically using JMX and the MBeans that Oracle Stream Analytics deploys for the caching systems and caches you define. For more information, see JMX in Administering Oracle Stream Analytics.
The simplest and least error-prone way to access a caching system or cache with JMX is to use the Oracle Stream Analytics Visualizer. For more information, see JMX Management in Using Visualizer for Oracle Stream Analytics.
As an alternative to the Oracle Stream Analytics Visualizer (see How to Access a Cache With JMX Using Oracle Stream Analytics Visualizer), you can access a caching system or cache with JMX using Java code that you write.
Oracle Stream Analytics creates a StageMBean for each cache that your application uses as a stage. The Type of this MBean is Stage.
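Locating these MBeans from Java typically starts with an ObjectName pattern that selects on the Type key. The sketch below uses only the standard javax.management API. The domain com.bea.wlevs, the Application key, and the class and method names are assumptions made for illustration, so check the ObjectName values that your server actually registers before relying on them.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper for finding stage MBeans by ObjectName pattern.
class StageMBeanLookupSketch {

    // ASSUMPTION: the domain "com.bea.wlevs" and the "Application" key are
    // illustrative. Only Type=Stage comes from the text above.
    static ObjectName stagePattern(String application) {
        try {
            return new ObjectName(
                "com.bea.wlevs:Type=Stage,Application=" + application + ",*");
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(
                "invalid application name: " + application, e);
        }
    }

    // Returns the names of all registered MBeans that match the pattern.
    static Set<ObjectName> findStages(MBeanServerConnection connection,
                                      String application) {
        try {
            return connection.queryNames(stagePattern(application), null);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A remote client would obtain the MBeanServerConnection from a JMXConnector, while code running inside the same JVM as the MBeans could pass the platform MBean server instead.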
To access a cache with JMX using Java: