8 Cached Event Data

You can configure a caching system so that applications have ready access to event data. The caches in the system can be a combination of Oracle Coherence distributed caching, Oracle Stream Analytics local caching, and caching solutions provided by third parties. You can access the events in the caches with Oracle CQL and Java classes.

8.1 Caching Defined

A cache is a temporary storage area for event data. To increase the availability of event data and to increase application performance, you can create a cache so that applications can publish to or consume events from the cache.

An application can also access the processed event data written to the cache by other applications.

You can configure any stage in an Oracle Stream Analytics application that generates events to publish its events to the cache. A cache does not have to be a stage in the EPN. Another component or Spring bean can access events in the cache programmatically with the caching APIs.
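As a rough illustration of programmatic access: later in this chapter the cache bean is described as implementing the java.util.Map interface, so a Spring bean that receives the cache by injection can read and write it with ordinary Map calls. The sketch below is not Oracle Stream Analytics API. The class name OrderLookupBean, the setter name, and the ConcurrentHashMap used as a stand-in for the injected cache are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical bean; in a real application Spring would inject the cache.
class OrderLookupBean {
    private Map<String, Object> cache;

    // Setter injection point (the name is an assumption).
    public void setCache(Map<String, Object> cache) {
        this.cache = cache;
    }

    // Publish an event to the cache under its order ID.
    public void publish(String orderId, Object event) {
        cache.put(orderId, event);
    }

    // Consume an event from the cache; returns null when absent.
    public Object lookup(String orderId) {
        return cache.get(orderId);
    }

    // Delete an event, for example when an order is cancelled.
    public boolean cancel(String orderId) {
        return cache.remove(orderId) != null;
    }
}

class OrderLookupDemo {
    public static void main(String[] args) {
        OrderLookupBean bean = new OrderLookupBean();
        bean.setCache(new ConcurrentHashMap<>()); // stand-in for the injected cache
        bean.publish("order-1", "BUY 100 ORCL");
        System.out.println(bean.lookup("order-1"));
        System.out.println(bean.cancel("order-1"));
    }
}
```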

A caching system is a configured instance of a caching implementation. A caching system defines a named set of configured caches and the configuration for remote communication when any of the caches are distributed across multiple machines.

Oracle Stream Analytics caching enables an application to perform the following tasks. All of these tasks happen incrementally without halting the application or causing latency spikes.

  • Pre-load a cache with event data before an application is deployed.

  • Periodically refresh, invalidate, and flush the event data in a cache.

  • Dynamically update a cache configuration.

8.1.1 Supported Caching Implementations

Oracle Stream Analytics supports the following caching implementations:

  • Oracle Stream Analytics local cache: a local, in-memory single-JVM cache. This implementation is best for local use (it cannot be used in a cluster). It might also be useful for development in the early stages because it is relatively simple to set up.

  • Oracle Coherence: a JCache-compliant in-memory distributed data grid solution for clustered applications and application servers. It coordinates updates to the data using cluster-wide concurrency control, replicates data modifications across the cluster using the highest performing clustered protocol available, and delivers notifications of data modifications to any servers that request them. You take advantage of Oracle Coherence features using the standard Java collections API to access and modify data, and use the standard JavaBean event model to receive data change notifications.

    Note:

    Before you can use Oracle Stream Analytics with Oracle Coherence, you must obtain a valid Oracle Coherence license such as a license for Coherence Enterprise Edition, Coherence Grid Edition, or Oracle WebLogic Application Grid.

    For more information on Oracle Coherence, see http://docs.oracle.com/middleware/1213/coherence/index.html.

  • Third-party caches: you can create a plug-in to allow Oracle Stream Analytics to work with other, third-party cache implementations.

8.1.2 Use Cases

Caching technology is a great fit for streaming data use cases, where high throughput can be particularly important. Getting data from a cache is usually much faster than getting the same data from a relational database.

The following scenarios describe common use cases for caching in Oracle Stream Analytics applications.

  • Publish events to a cache

    A financial application publishes events to a cache while the financial market is open, and then processes data in the cache after the market closes. Publishing events to a cache makes them available to the application or available to other Oracle Stream Analytics applications running in the server. Publishing events to a cache also allows for asynchronous writes to a secondary storage by the cache implementation.

  • Consume data from a cache

    Oracle Stream Analytics applications sometimes need to access non-streaming data. By caching this data, you can increase the performance of the application. The standard components of an Oracle Stream Analytics application that are allowed direct programming access to a cache are input and output adapters and business POJOs.

    Additionally, applications can access a cache from Oracle CQL either by a user-defined function or directly from an Oracle CQL statement. In the case of a user-defined function, programmers use Spring to inject the cache resource into the implementation of the function. For more information, see Application and Resource Configuration.

    Applications can also query a cache directly from Oracle CQL statements that run in a processor. In this case, the cache functions as another type of data source to a processor so that querying a cache is similar to querying a channel except that data is pulled from a cache.

    An example of using Oracle CQL to query a cache is from a financial application that publishes orders and the trades used to execute the orders to a cache. At the end of the day when the markets close, the application queries the cache to find all the trades related to a particular order.

  • Update and delete data in a cache

    An Oracle Stream Analytics application can update and delete data in a cache when required. For example, a financial application might need to update an order in the cache each time individual trades that fulfill the order are executed, or an order might need to be deleted if it has been cancelled. The components of an application that are allowed to consume data from a cache are also allowed to update it.

  • Use a cache in a multiserver domain

    If you build an Oracle Stream Analytics application that uses a cache, and you plan to deploy that application in a multiserver domain, then you must use a caching system that supports a distributed cache. In this case, you must use either Oracle Coherence or a third-party caching system that supports a distributed cache.

8.2 Configure an Oracle Coherence Caching System and Cache

You can configure your application to use the Oracle Coherence caching system and cache. Use this caching system if you plan to deploy your application to a multiserver domain.

When you configure with Oracle Coherence, only the first caching-system can be configured in a server. The Oracle Stream Analytics server ignores other caching systems that you have configured.

Note:

Before you can legally use Oracle Stream Analytics with Oracle Coherence, you must obtain a valid Coherence license such as a license for Coherence Enterprise Edition, Coherence Grid Edition, or Oracle WebLogic Application Grid.

For more information on Oracle Coherence, see http://docs.oracle.com/middleware/1213/coherence/index.html.

The following assembly and configuration file settings configure an Oracle Coherence caching system and cache for an Oracle CQL processor. The cache uses an event type to specify the key properties for locating table rows in the relational database. This caching system is advertised, which means other applications can access the data in its caches.

8.2.1 Assembly File

The assembly file settings configure the caching system and cache1. The value-type setting is the event type into which you want to load the database values. This cache is advertised.

   <wlevs:cache id="cache1" value-type="TradeReport" advertise="true">
     <wlevs:caching-system ref="coherence-caching-system"/>
   </wlevs:cache>
   <wlevs:caching-system id="coherence-caching-system" provider="coherence"/>

Note:

When you change the id setting for a coherence cache in the EPN diagram, the id changes in the assembly file and in the coherence-cache- file. However, if you change the id setting in the assembly file source editor, the id changes in the assembly file only. In this case, you must manually change the cache-name setting in the coherence-cache- to match the id setting in the assembly file. You also have to change all references to that cache.

When the cache is advertised, a component in the EPN of an application in a separate bundle can reference the advertised cache. The following example shows how a processor in one bundle can use the cache-source element to reference a cache with an ID of cache-id that is located in another bundle called cacheprovider:

<wlevs:processor id="myProcessor2">
  <wlevs:cache-source ref="cacheprovider:cache-id"/>
</wlevs:processor>

Note:

When you have Oracle Coherence caches in the EPN assembly files of one or more applications deployed to the same Oracle Stream Analytics server, never configure multiple instances of the same cache with a loader or a store.

You can inadvertently do this by employing multiple applications that each configure the same Oracle Coherence cache with a loader or store in their respective EPN assembly file. If you configure multiple instances of the same cache with a loader or a store, Oracle Stream Analytics throws an exception.

8.2.2 Configuration File

The coherence-cache-config.xml file is the basic Oracle Coherence configuration file and must conform to the Oracle Coherence DTDs, as is true for any Oracle Coherence application.

See the Oracle Coherence documentation for information about coherence-cache-config.xml: http://docs.oracle.com/middleware/1213/coherence/index.html.

An Oracle Stream Analytics Oracle Coherence factory must be declared when you use Spring to configure a loader or store for a cache. You specify the factory with the cachestore-scheme element and include a factory class that enables Oracle Coherence to call into Oracle Stream Analytics and retrieve a reference to the loader or store that is configured for the cache. The only difference between configuring a loader or store is that the method-name element has a value of getLoader when a loader is used and getStore when a store is being used. You pass the cache name to the factory as an input parameter.

<cache-config>  
    <caching-scheme-mapping>    
        <cache-mapping>       
            <cache-name>myCoherenceCache</cache-name>       
            <scheme-name>new-replicated</scheme-name>     
        </cache-mapping>             
        <cache-mapping>       
            <cache-name>myLoaderCache</cache-name>       
            <scheme-name>test-loader-scheme</scheme-name>     
        </cache-mapping>         
        <cache-mapping>       
            <cache-name>myStoreCache</cache-name>       
            <scheme-name>test-store-scheme</scheme-name>     
        </cache-mapping>
        <cache-mapping>
            <cache-name>
                cache1
            </cache-name>
            <scheme-name>
                new-replicated
            </scheme-name>
        </cache-mapping>
    </caching-scheme-mapping>      
    <caching-schemes>    
        <replicated-scheme>      
            <scheme-name>new-replicated</scheme-name>      
            <service-name>ReplicatedCache</service-name>      
            <backing-map-scheme>        
                <class-scheme>          
                    <scheme-ref>my-local-scheme</scheme-ref>                                                
                </class-scheme>      
            </backing-map-scheme>    
        </replicated-scheme>        
        <class-scheme>       
            <scheme-name>my-local-scheme</scheme-name>       
            <class-name>com.tangosol.net.cache.LocalCache</class-name>       
            <eviction-policy>LRU</eviction-policy>       
            <high-units>100</high-units>       
            <low-units>50</low-units>     
        </class-scheme>        
        <local-scheme>       
            <scheme-name>test-loader-scheme</scheme-name>       
            <eviction-policy>LRU</eviction-policy>       
            <high-units>100</high-units>       
            <low-units>50</low-units>


            <!-- A cachestore-scheme element that gets a loader starts here -->
            <cachestore-scheme>
                <class-scheme>
                    <class-factory-name>com.bea.wlevs.cache.coherence.configuration.SpringFactory</class-factory-name>
                    <method-name>getLoader</method-name>            
                    <init-params>              
                        <init-param>                
                            <param-type>java.lang.String</param-type>                
                            <param-value>myCoherenceCache</param-value>              
                        </init-param>
                        <init-param>
                            <param-type>
                                java.lang.String
                            </param-type>
                            <param-value>
                                cache1
                            </param-value>
                        </init-param>
                    </init-params>          
                </class-scheme>       
            </cachestore-scheme> 
            <!-- The cachestore-scheme element ends here -->
        </local-scheme> 
   
        <local-scheme>      
            <scheme-name>test-store-scheme</scheme-name>      
            <eviction-policy>LRU</eviction-policy>      
            <high-units>100</high-units>      
            <low-units>50</low-units>
 

            <!-- A cachestore-scheme element that gets a store starts here -->
            <cachestore-scheme>
                <class-scheme>
                    <class-factory-name>com.bea.wlevs.cache.coherence.configuration.SpringFactory</class-factory-name>
                    <method-name>getStore</method-name>          
                    <init-params>            
                        <init-param>               
                            <param-type>java.lang.String</param-type>               
                            <param-value>myCoherenceCache</param-value>            
                        </init-param>
                        <init-param>
                            <param-type>
                                java.lang.String
                            </param-type>
                            <param-value>
                                cache1
                            </param-value>
                        </init-param>
                    </init-params>        
                </class-scheme>      
            </cachestore-scheme>
            <!-- The cachestore-scheme element ends here -->
        </local-scheme>  
    </caching-schemes>
</cache-config>

tangosol-coherence-override.xml File (optional)

The tangosol-coherence-override.xml file is a global per-server file. It contains what is referred to as the operational configuration in the Oracle Coherence documentation. This file contains global, server-wide configuration settings for Oracle Coherence caching. You create this file in an XML editor and put it in the Oracle Stream Analytics server config directory for the server you want to configure.

Note:

Do not include the tangosol-coherence-override.xml file when you use Oracle Coherence for clustering.

Add the following XML to the Oracle Coherence configuration file to reference the tangosol-coherence-override.xml file. Include the cluster-name element to prevent Oracle Coherence from attempting to join existing Oracle Coherence clusters when Oracle Stream Analytics starts up. Joining an existing cluster can cause problems and can sometimes prevent Oracle Stream Analytics from starting.

...
<coherence xml-override="/tangosol-coherence-override.xml">
  <cluster-config>
    <member-identity>
      <cluster-name>com.bea.wlevs.example.provider</cluster-name>
    </member-identity>
...
</coherence>

For more information about Oracle Stream Analytics clusters, see Native Clusters in Oracle Stream Analytics in Administering Oracle Stream Analytics.

8.2.3 Cache Loader Bean

The com.oracle.cep.cacheloader package provides the CsvCacheLoader class for loading CSV events into a Coherence cache. You use a cache loader with an inbound adapter by replacing the sourceUrl property.

The first assembly file example shows a CSV inbound adapter that loads a file with the sourceUrl property. The second example shows a cache and a CsvCacheLoader bean that loads a CSV file into that cache.

Load Events in a CSV file

<wlevs:adapter id="StockTradeCSVInboundAdapter" provider="csv-inbound">
  <wlevs:listener ref="AdapterOutputChannel"/>
  <wlevs:instance-property name="eventType" value="TradeEvent"/>
  <wlevs:instance-property name="sourceUrl"
    value="file:/scratch/mpawlan/oep9-19/oep/utils/load-generator/StockData.csv"/>
</wlevs:adapter>

Load Events with a Cache Loader

<wlevs:cache id="csvcache" key-properties="sequenceNo"
  value-type="TradeEvent" advertise="true">
  <wlevs:caching-system ref="cachesys"/>
</wlevs:cache>
<bean id="csvloader" class="com.oracle.cep.cacheloader.CsvCacheLoader">
  <property name="cacheName" value="csvcache"/>
  <property name="sourceUrl"
    value="file:///scratch/juhe/view_storage/trade.csv"/>
</bean>

8.3 Configure a Local Caching System and Cache

You can configure your application to use the Oracle Stream Analytics local caching system and cache. The Oracle Stream Analytics local caching system is appropriate when you do not plan to deploy your application to a multiserver domain.

If you plan to deploy your application to a multiserver domain, use an Oracle Coherence cache.

This chapter describes some of the configuration settings. For complete information, see Component Configuration Schema and Coherence Caching System in Schema Reference for Oracle Stream Analytics.

8.3.1 Assembly File

The following assembly file settings configure the local caching system and cache. The value-type setting is the event type into which you want to load the database values.

<wlevs:cache id="localcache" value-type="HelloWorldEvent">
  <wlevs:caching-system ref="caching-system"/>
</wlevs:cache>
<wlevs:caching-system id="caching-system" provider="wlevs" advertise="false"/>

8.3.2 Configuration File

The following configuration file settings specify a maximum size and eviction policy for the local caching system. The maximum size specifies the number of cache elements held in memory, after which the eviction policy is applied. The example also specifies the maximum amount of time, in milliseconds, that an entry is cached. The default time-to-live value is infinite; this example specifies 3600 milliseconds.

  <caching-system>
    <name>caching-system</name>
    <cache>
      <name>localcache</name>
      <max-size>64</max-size>
      <eviction-policy>LFU</eviction-policy>
      <time-to-live>3600</time-to-live>
    </cache>
  </caching-system>
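To make the interaction between max-size and an eviction policy more concrete, the following sketch shows a size-bounded map that evicts an entry once the limit is exceeded. It is only an illustration built on java.util.LinkedHashMap, not the Oracle Stream Analytics implementation. It uses LRU ordering rather than the LFU policy in the configuration above, because LRU is what LinkedHashMap provides directly. It does not model time-to-live, and the class name BoundedLruCache is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: evicts the least recently accessed entry
// once the number of entries exceeds maxSize.
class BoundedLruCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxSize;

    BoundedLruCache(int maxSize) {
        super(16, 0.75f, true); // accessOrder = true gives LRU ordering
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after each insertion; returning true removes the eldest entry.
        return size() > maxSize;
    }
}

class BoundedLruCacheDemo {
    public static void main(String[] args) {
        BoundedLruCache<String, Integer> cache = new BoundedLruCache<>(2);
        cache.put("a", 1);
        cache.put("b", 2);
        cache.get("a");     // "a" becomes the most recently used entry
        cache.put("c", 3);  // exceeds the maximum size, so "b" is evicted
        System.out.println(cache.keySet());
    }
}
```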

The following configuration file settings add a write-behind element as a child element of cache. The write-behind element means Oracle Stream Analytics invokes the cache store from a separate thread after a create or update of a cache entry. The child elements of write-behind indicate the following:

  • The number of updates that are picked up from the store buffer to write back to the backing store (batch-size). The default value is 100.

  • The number of attempts that the user thread makes to write to the store buffer (buffer-write-attempts). The user thread is the thread that creates or updates a cache entry. If all attempts by the user thread to write to the store buffer fail, the thread invokes the store synchronously. The default value is 1.

  • The time in milliseconds the user thread waits before aborting an attempt to write to the store buffer (buffer-write-timeout). The attempt to write to the store buffer fails only when the buffer is full. After the time out, further attempts can be made to write to the buffer based on the value of buffer-write-attempts. The default value is 100.

<caching-system>
    <name>caching-system-id</name>
    <cache>
        <name>cache-id</name>
        <max-size>100000</max-size>
        <eviction-policy>LRU</eviction-policy>
        <time-to-live>3600</time-to-live>
        <write-behind>
            <buffer-size>200</buffer-size>
            <buffer-write-attempts>2</buffer-write-attempts>
            <buffer-write-timeout>200</buffer-write-timeout>
        </write-behind>
    </cache>
</caching-system>
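The write-behind behavior described above can be sketched with a bounded queue: the user thread tries a limited number of timed writes to the buffer, falls back to a synchronous store call when every attempt fails, and a separate writer drains the buffer in batches. This is a minimal sketch of that idea using only java.util.concurrent, not the Oracle Stream Analytics implementation. The class WriteBehindBuffer, its method names, and the Consumer that stands in for the cache store are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class WriteBehindBuffer<T> {
    private final BlockingQueue<T> buffer;
    private final int writeAttempts;       // plays the role of buffer-write-attempts
    private final long writeTimeoutMillis; // plays the role of buffer-write-timeout
    private final int batchSize;           // plays the role of batch-size
    private final Consumer<List<T>> store; // stand-in for the cache store

    WriteBehindBuffer(int bufferSize, int writeAttempts, long writeTimeoutMillis,
                      int batchSize, Consumer<List<T>> store) {
        this.buffer = new ArrayBlockingQueue<>(bufferSize);
        this.writeAttempts = writeAttempts;
        this.writeTimeoutMillis = writeTimeoutMillis;
        this.batchSize = batchSize;
        this.store = store;
    }

    // Called on the user thread. Returns true if the update was buffered,
    // false if it had to be written to the store synchronously.
    boolean write(T update) {
        try {
            for (int i = 0; i < writeAttempts; i++) {
                // offer fails only when the buffer stays full for the whole timeout
                if (buffer.offer(update, writeTimeoutMillis, TimeUnit.MILLISECONDS)) {
                    return true;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep interrupt status, write synchronously
        }
        store.accept(Collections.singletonList(update));
        return false;
    }

    // Called by the background writer: store up to batchSize buffered updates.
    int flushOnce() {
        List<T> batch = new ArrayList<>();
        buffer.drainTo(batch, batchSize);
        if (!batch.isEmpty()) {
            store.accept(batch);
        }
        return batch.size();
    }
}

class WriteBehindDemo {
    public static void main(String[] args) {
        List<String> stored = new ArrayList<>();
        WriteBehindBuffer<String> wb =
            new WriteBehindBuffer<String>(1, 2, 10L, 100, batch -> stored.addAll(batch));
        System.out.println(wb.write("u1")); // buffered
        System.out.println(wb.write("u2")); // buffer full, written synchronously
        wb.flushOnce();                     // background flush stores "u1"
        System.out.println(stored);
    }
}
```

The demo calls flushOnce directly to keep the example deterministic; a real writer would run it from its own thread.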

The following configuration file settings add a listeners child element to configure the behavior of components that listen to the cache. The listeners element has an asynchronous attribute that you can set to either true (listeners are invoked asynchronously) or false (listeners are invoked synchronously).

The work-manager-name child element specifies the work manager to use to asynchronously invoke listeners. This value is ignored if synchronous invocations are enabled. If a work manager is specified for the cache, this value overrides that setting for invoking listeners only. The value of the work-manager-name element corresponds to the name element of the work-manager setting in the Oracle Stream Analytics config.xml server configuration file.

<caching-system>
     <name>caching-system-id</name>
     <cache>
        <name>cache-id</name>
        <max-size>100000</max-size>
        <eviction-policy>LRU</eviction-policy>
        <time-to-live>3600</time-to-live>
        <write-behind>
            <buffer-size>200</buffer-size>
            <buffer-write-attempts>2</buffer-write-attempts>
            <buffer-write-timeout>200</buffer-write-timeout>
        </write-behind>
        <listeners asynchronous="true">
            <work-manager-name>cachingWM</work-manager-name>
        </listeners>
     </cache>
</caching-system>

8.4 Configure a Cache as an Event Listener

You can configure a cache to receive events as they pass through the network. For example, to specify that a cache listens to a channel, configure the channel with a wlevs:listener element that has a reference to the cache.

In the following example, as the channel sends new events to the cache, the events are inserted into the cache. If the channel sends a remove event (an old event that exits the output window), then the event is removed from the cache.

<wlevs:caching-system id="caching-system-id"/>

<wlevs:cache id="cache-id" name="alternative-cache-name">
    <wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>

<wlevs:channel id="tradeStream">
    <wlevs:listener ref="cache-id"/>
</wlevs:channel>
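The insert and remove behavior described above can be pictured with a small sketch. It is a conceptual model only: the class CacheSinkSketch and its onInsert and onDelete methods are hypothetical names, not Oracle Stream Analytics interfaces, and the key-extraction function is an assumption about how an event maps to a cache key.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Conceptual model of a cache that listens to a channel.
class CacheSinkSketch<K, E> {
    private final Map<K, E> cache = new ConcurrentHashMap<>();
    private final Function<E, K> keyOf; // derives the cache key from an event

    CacheSinkSketch(Function<E, K> keyOf) {
        this.keyOf = keyOf;
    }

    // A new event arriving from the channel is inserted into the cache.
    void onInsert(E event) {
        cache.put(keyOf.apply(event), event);
    }

    // A remove event (an old event leaving the window) is deleted from the cache.
    void onDelete(E event) {
        cache.remove(keyOf.apply(event));
    }

    boolean contains(K key) {
        return cache.containsKey(key);
    }

    int size() {
        return cache.size();
    }
}

class CacheSinkDemo {
    public static void main(String[] args) {
        // Events are modeled as "tradeId:symbol" strings; the key is the trade ID.
        CacheSinkSketch<String, String> sink =
            new CacheSinkSketch<String, String>(e -> e.substring(0, e.indexOf(':')));
        sink.onInsert("T1:ORCL");
        sink.onInsert("T2:IBM");
        sink.onDelete("T1:ORCL");
        System.out.println(sink.contains("T1"));
        System.out.println(sink.size());
    }
}
```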

8.5 Index a Cache with a Key

The following sections describe the options available to you to specify the key that is used to index the cache.

When you do not explicitly specify a key, the event object serves as both the key and value when the event is inserted into the cache. In this case, the event class must include a valid implementation of the equals and hashCode methods that take into account the values of the key properties.
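A minimal sketch of such an event class follows, assuming a hypothetical QuoteEventSketch type whose symbol property acts as the key property. A plain HashMap stands in for the cache to show why equals and hashCode matter: two events with the same symbol resolve to the same entry.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical event type; "symbol" plays the role of the key property.
class QuoteEventSketch {
    private final String symbol;
    private final double price;

    QuoteEventSketch(String symbol, double price) {
        this.symbol = symbol;
        this.price = price;
    }

    String getSymbol() { return symbol; }
    double getPrice() { return price; }

    // Equality is based only on the key property, not on the price.
    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof QuoteEventSketch)) return false;
        return Objects.equals(symbol, ((QuoteEventSketch) other).symbol);
    }

    // Must agree with equals: equal events produce equal hash codes.
    @Override
    public int hashCode() {
        return Objects.hashCode(symbol);
    }
}

class QuoteEventDemo {
    public static void main(String[] args) {
        // The event object is used as both key and value.
        Map<QuoteEventSketch, QuoteEventSketch> cache = new HashMap<>();
        QuoteEventSketch first = new QuoteEventSketch("ORCL", 100.0);
        QuoteEventSketch second = new QuoteEventSketch("ORCL", 101.5);
        cache.put(first, first);
        cache.put(second, second); // same symbol, so this replaces the earlier value
        System.out.println(cache.size());
        System.out.println(cache.get(first).getPrice());
    }
}
```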

8.5.1 Assembly File

Specify a property name for the key property in the assembly file with the key-properties attribute, as shown in the following example:

<wlevs:cache id="myCache" key-properties="key-property-name">
    <wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>

In this case, all events that are inserted into the cache are required to have a property of this name at runtime, otherwise Oracle Stream Analytics throws an exception. For example, assume the event type being inserted into the cache looks something like the following; note the key property (only relevant Java source shown):

public class MyEvent {
  private String key;
  public MyEvent() {}
  public MyEvent(String key) { this.key = key; }
  public String getKey() { return key;}
  public void setKey(String key) { this.key = key;}
}

The corresponding declaration in the assembly file looks like the following:

<wlevs:cache id="myCache" key-properties="key">
    <wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>

8.5.2 Metadata Annotation

You can use the metadata annotation com.bea.wlevs.ede.api.Key to annotate the event property in the Java class that implements the event type. This annotation does not have any attributes.

To use a metadata annotation to specify a key:

  1. Import the com.bea.wlevs.ede.api.Key annotation.
  2. Apply the @Key annotation to a method.

    The following example shows how to specify that the key property of the MyEvent event type is the key; only relevant code is shown:

    import com.bea.wlevs.ede.api.Key;
    public class MyEvent {
      private String key;
      public MyEvent() {}
      public MyEvent(String key) { this.key = key; }
      public String getKey() { return key; }
      @Key
      public void setKey(String key) { this.key = key; }
    }
    

8.5.3 Composite Key

You can use the key-class attribute of the wlevs:cache element to specify a composite key in which multiple properties form the key. The value of the key-class attribute must be a JavaBean class with public fields that match the fields of the event class. The JavaBean class must override the equals and hashCode methods from the java.lang.Object class. The matching is done according to the field name. For example:

<wlevs:cache id="myCache" key-class="key-class-name">
    <wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>

For a cache with a composite key composed of key-field1 and key-field2, you can execute both of the following queries:

SELECT stream.field2, cache.key-field1 from stream[NOW], cache WHERE stream.field2=cache.key-field1 AND stream.field2=cache.key-field2

SELECT stream.field1, cache.key-field1 from stream[NOW], cache WHERE stream.field1=cache.key-field1
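To make the key-class requirements above concrete, the following sketch shows what such a class might look like. The class name OrderKeySketch and the field names accountId and orderNo are hypothetical; in a real application the public fields would match, by name, the fields of your event class. A HashMap stands in for the cache in the demo.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical composite key: two public fields form the key together.
class OrderKeySketch {
    public String accountId;
    public long orderNo;

    public OrderKeySketch() {}

    public OrderKeySketch(String accountId, long orderNo) {
        this.accountId = accountId;
        this.orderNo = orderNo;
    }

    // Two keys are equal only when every key field matches.
    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof OrderKeySketch)) return false;
        OrderKeySketch that = (OrderKeySketch) other;
        return orderNo == that.orderNo && Objects.equals(accountId, that.accountId);
    }

    // Combines the same fields that equals compares.
    @Override
    public int hashCode() {
        return Objects.hash(accountId, orderNo);
    }
}

class OrderKeyDemo {
    public static void main(String[] args) {
        Map<OrderKeySketch, String> cache = new HashMap<>();
        cache.put(new OrderKeySketch("acct-7", 42L), "BUY 100 ORCL");
        // A separately constructed key with the same field values finds the entry.
        System.out.println(cache.get(new OrderKeySketch("acct-7", 42L)));
        // A key that differs in one field does not.
        System.out.println(cache.get(new OrderKeySketch("acct-7", 43L)));
    }
}
```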

8.6 Configure a Cache as an Event Source

You can configure a cache as an event source. To use a cache as an event source, you need to implement the com.bea.wlevs.ede.api.StreamSink interface.

The configuration follows:

<wlevs:cache id="cache-id" name="alternative-cache-name"
   caching-system="caching-system-id">
   <wlevs:listener ref="cache-listener-id" />
</wlevs:cache>

8.7 Configure a Cache with a Cache Listener

You can configure a cache as a source of events to which another component in the event processing network listens. The listening component can be an adapter or a bean.

A class that listens to a cache must implement an interface that provides methods for receiving events, as follows:

  • A class that listens to a Coherence cache must implement the com.tangosol.util.MapListener interface.

  • A class that listens to an Oracle Stream Analytics local cache must implement the com.bea.cache.jcache.CacheListener interface.

<wlevs:caching-system id="caching-system-id"/>
...
<wlevs:cache id="cache-id" name="alternative-cache-name">
    <wlevs:caching-system ref="caching-system-id"/>
    <wlevs:cache-listener ref="cacheListenerId"/>
</wlevs:cache>
...
<bean id="cacheListenerId" class="com.bea.wlevs.example.provider.coherence.LocalListener"/>

In the example, the cacheListenerId Spring bean listens to events coming from the cache. In this case, the user-defined class that implements this component, com.bea.wlevs.example.provider.coherence.LocalListener, is listening to an Oracle Coherence cache. It must implement the appropriate Oracle Coherence-specific Java interfaces, including com.tangosol.util.MapListener. The following example illustrates this implementation.

package com.bea.wlevs.example.provider.coherence;

import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;

public class LocalListener implements MapListener {
    public static int deleted = 0;
    public static int inserted = 0;
    public static int updated = 0;

    public void entryDeleted(MapEvent event) { deleted++; }
    public void entryInserted(MapEvent event) { inserted++; }
    public void entryUpdated(MapEvent event) { updated++; }
}

8.8 Configure a Third-Party Caching System and Cache

You can configure your application to use a third-party caching system and cache.

  1. Create a plug-in to define the third-party caching system as an Oracle Stream Analytics caching system provider:
    • Implement the com.bea.wlevs.cache.spi.CachingSystem interface.

    • Create a factory that creates caching systems of this type.

    • Register the factory with an attribute that identifies its provider type.

  2. Declare the caching system in the EPN assembly file.

    Use the wlevs:caching-system element to declare a third-party implementation; use the class or provider attribute to specify additional information.

    For simplicity, you can include the third-party implementation code inside the Oracle Stream Analytics application bundle itself to avoid having to import or export packages and manage the life cycle of a separate bundle that contains the third-party implementation. In this case the wlevs:caching-system element appears in the EPN assembly file as shown in the following example:

    <wlevs:caching-system id="caching-system-id" 
                          class="third-party-implementation-class"/>
    

    The class attribute specifies a Java class that must implement the com.bea.wlevs.cache.spi.CachingSystem interface. For details about this interface, see the Java API Reference for Oracle Stream Analytics.

    Sometimes you might not be able to or want to include the third-party caching implementation in the same bundle as the Oracle Stream Analytics application that is using it. In this case, you must create a separate bundle with a Spring application context that includes the wlevs:caching-system element, with the mandatory advertise attribute:

    <wlevs:caching-system id="caching-system-id"
           class="third-party-implementation-class" advertise="true"/>
    

    Alternatively, if you want to decouple the implementation bundle from the bundle that references it, or you are plugging in a caching implementation that supports multiple caching systems per Java process, you can specify a factory as a provider:

    <wlevs:caching-system id="caching-system-id" provider="caching-provider"/>
    <factory id="factory-id" provider-name="caching-provider">
        <class>the.factory.class.name</class>
    </factory>
    

    The factory class (the.factory.class.name) must implement the com.bea.wlevs.cache.spi.CachingSystemFactory interface. This interface has the create method that returns a com.bea.wlevs.cache.spi.CachingSystem instance.

    You must deploy this bundle with the application bundle so that the application bundle can start using it.

  3. Add one or more caches for this caching system in the EPN assembly file.
    <wlevs:caching-system id="caching-system-id" provider="caching-provider"/>
    ...
    <wlevs:cache id="cache-id" name="alternative-cache-name">
        <wlevs:caching-system ref="caching-system-id"/>
    </wlevs:cache>
    

    Specify the optional name attribute only when the name of the cache in the caching system is different from its ID. The wlevs:caching-system child element references the already-declared caching system that contains the cache. You must specify this child element only when there is more than one caching system declared (either implicitly or explicitly) or when the caching system is in a different application or bundle.

    You can export both the caching system and the cache as an OSGi service with the advertise attribute.

    <wlevs:caching-system id="caching-system-id" advertise="true"/>
    ...
    <wlevs:cache id="cache-id" name="alternative-cache-name" advertise="true" >
        <wlevs:caching-system ref="caching-system-id"/>
    </wlevs:cache>
    

    If the cache is advertised, then a component in the EPN of an application in a separate bundle can reference it. The following example shows how a processor in one bundle can use as a cache source the cache with ID cache-id located in a separate bundle (called cacheprovider):

    <wlevs:processor id="myProcessor2">
        <wlevs:cache-source ref="cacheprovider:cache-id"/>
    </wlevs:processor>
    

    The caching system creates the cache associated with a particular name and returns a reference to the cache. The resulting cache bean implements the java.util.Map interface.

  4. Configure the third-party caching system and its caches by updating the third-party caching configuration file or files for the application.

    Refer to your third-party cache documentation.

  5. Optionally, override the default third-party cache configuration by updating the appropriate configuration file with one or more additional cache element child elements. Refer to your third-party cache documentation.
    • Specify that a cache is an event sink by configuring it as a listener to another component in the event processing network.

    • Specify that a cache is an event source to which another component in the event processing network listens.

    • Configure a cache loader or store.

  6. When you assemble your application, verify that the META-INF/MANIFEST.MF file includes the following import:
    com.bea.wlevs.cache.spi; version="<version>"
    

    If the MANIFEST.MF file does not include this import, add it before you deploy your application.
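The factory and named-cache relationship described in this procedure can be sketched as follows. This is a minimal illustration only: SketchCachingSystemFactory and SketchCachingSystem are hypothetical stand-ins for the com.bea.wlevs.cache.spi interfaces, whose exact method signatures are not reproduced here, and the in-memory provider exists only so the sketch runs with the JDK alone.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for the SPI types. The real interfaces are in
// com.bea.wlevs.cache.spi; consult their Javadoc for the actual signatures.
interface SketchCachingSystem {
    Map<Object, Object> getCache(String name);
}

interface SketchCachingSystemFactory {
    SketchCachingSystem create();
}

// A trivial in-memory "provider": one Map per cache name.
class InMemoryCachingSystem implements SketchCachingSystem {
    private final Map<String, Map<Object, Object>> caches = new ConcurrentHashMap<>();

    @Override
    public Map<Object, Object> getCache(String name) {
        // Create the cache on first request, then return the same instance.
        return caches.computeIfAbsent(name, n -> new ConcurrentHashMap<>());
    }
}

class InMemoryCachingSystemFactory implements SketchCachingSystemFactory {
    @Override
    public SketchCachingSystem create() {
        return new InMemoryCachingSystem();
    }

    public static void main(String[] args) {
        SketchCachingSystem system = new InMemoryCachingSystemFactory().create();
        Map<Object, Object> cache = system.getCache("alternative-cache-name");
        cache.put("ORCL", "Oracle Corporation");
        System.out.println(system.getCache("alternative-cache-name").get("ORCL"));
    }
}
```

In a real integration, the factory would delegate to the third-party product rather than to a ConcurrentHashMap, and the returned cache would still be exposed to the EPN as a java.util.Map.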

8.9 Exchange Data Between a Cache and Another Data Source

You can have a cache in an EPN exchange data with another data source, including a database. For example, you can load a cache with data when the application starts or create a read/write relationship between the cache and a database.

If the cache will only be reading data, including when the backing store is read-only, you should use a cache loader. If the cache will read and write data, use a cache store. In both cases, creating the relationship involves specific configuration and a Java class that knows how to communicate with the data source.

8.9.1 Load Cache Data from a Read-Only Data Source

Using a cache loader, you can have a cache in your EPN load data from a read-only data source. A cache loader is a Java class that loads cache objects into a cache. You create a cache loader by writing a Java class that implements the appropriate interfaces to enable the loader class to communicate with the cache. Then you configure a cache loader by using the wlevs:cache-loader child element of the wlevs:cache element to specify the bean that does the loading work.

If the backing store is read-write, use a cache store instead (see Exchange Data with a Read-Write Data Source).

When creating a cache loader, you implement interfaces as follows:

  • To load cache data into an Oracle Coherence cache, create a class that implements the appropriate Oracle Coherence-specific Java interfaces, including com.tangosol.net.cache.CacheLoader. See Example 8-2 for an example.

  • To load cache data into an Oracle Stream Analytics local cache, create a class that implements the com.bea.cache.jcache.CacheLoader interface. This interface includes the load method to customize loading a single object into the cache; Oracle Stream Analytics calls this method when the requested object is not in the cache. The interface also includes loadAll methods that you implement to customize the loading of the entire cache.

In Example 8-1, the localLoader bean loads events into an Oracle Coherence cache when the backing store is read-only.

When working with a Coherence cache, note that if you specify a cache loader in your configuration file, you must also specify the corresponding class factory method name in your Coherence cache configuration file. For a cache loader, you specify the getLoader method of com.bea.wlevs.cache.coherence.configuration.SpringFactory. For example code, see Configure an Oracle Coherence Caching System and Cache.

Example 8-1 Oracle Coherence Cache EPN Assembly File for a Cache Loader

<wlevs:caching-system id="caching-system-id"/>
<wlevs:cache id="myCache" advertise="false"> 
    <wlevs:caching-system ref="caching-system-id"/>
    <wlevs:cache-loader ref="localLoader"/> 
</wlevs:cache>
<bean id="localLoader" 
      class="com.bea.wlevs.example.provider.coherence.LocalLoader"/>

Example 8-2 Oracle Coherence Cache LocalLoader Implementation

package com.bea.wlevs.example.provider.coherence;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import com.bea.wlevs.example.provider.event.ProviderData;
import com.tangosol.net.cache.CacheLoader;

public class LocalLoader implements CacheLoader {
    public static int loadCount = 0;
    public static Set keys = new HashSet();

    public LocalLoader() {
    }
    public Object load(Object key) {
        loadCount++;
        keys.add(key);
        return new ProviderData((String) key);
    }
    public Map loadAll(Collection keys) {
        Map result = new HashMap();

        for (Object key : keys) {
            result.put(key, load(key));
        }
        return result;
    }
}
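The point at which a loader is called (only when the requested object is not already in the cache) can be sketched with plain JDK types. ReadThroughCache is a hypothetical class written for this illustration; it is not the server's cache implementation, and the Function argument merely plays the role of a loader's load method.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical read-through wrapper, for illustration only.
class ReadThroughCache<K, V> {
    private final Map<K, V> entries = new HashMap<>();
    private final Function<K, V> loader; // plays the role of load(key)
    private int loadCount = 0;

    ReadThroughCache(Function<K, V> loader) {
        this.loader = loader;
    }

    V get(K key) {
        V value = entries.get(key);
        if (value == null) {
            // Cache miss: ask the loader, then keep the result for later reads.
            value = loader.apply(key);
            loadCount++;
            if (value != null) {
                entries.put(key, value);
            }
        }
        return value;
    }

    int getLoadCount() {
        return loadCount;
    }

    public static void main(String[] args) {
        ReadThroughCache<String, String> cache =
                new ReadThroughCache<>(key -> "data-for-" + key);
        cache.get("ORCL"); // miss: the loader runs
        cache.get("ORCL"); // hit: the loader does not run
        System.out.println("loads performed: " + cache.getLoadCount());
    }
}
```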

8.9.2 Exchange Data with a Read-Write Data Source

Using a cache store, you can have a cache in your EPN exchange data with a read-write data source. A cache store is a Java class that exchanges cache objects with a cache. You create a cache store by writing a Java class that implements the appropriate interfaces to enable it to communicate with the data source. Then you add the cache store to the EPN by using the wlevs:cache-store child element of the wlevs:cache element to specify the bean that communicates with the data source.

If the backing store is read-only, use a cache loader instead (see Load Cache Data from a Read-Only Data Source).

When creating a cache store, you implement interfaces as follows:

  • To exchange cache data with an Oracle Coherence cache, create a class that implements the appropriate Oracle Coherence-specific Java interfaces, including com.tangosol.net.cache.CacheStore. See Example 8-4 for an example.

  • To exchange cache data with an Oracle Stream Analytics local cache, create a class that implements the com.bea.cache.jcache.CacheStore interface. This interface includes the store method that stores the data in the backing store using the passed key; Oracle Stream Analytics calls this method when it inserts data into the cache. The interface also includes the storeAll method for storing a batch of data to a backing store in the case that you have configured asynchronous writes for a cache with the write-behind configuration element.

In Example 8-3, the localStore bean exchanges events between the cache and a read-write backing store.

Note that if you specify a cache store in your Spring configuration file, you must also specify the corresponding class factory method name in your Coherence cache configuration file. For a cache store, you specify the getStore method of com.bea.wlevs.cache.coherence.configuration.SpringFactory. For example code, see Configure an Oracle Coherence Caching System and Cache.

Example 8-3 Oracle Coherence Cache EPN Assembly File for a Cache Store

<wlevs:caching-system id="caching-system-id"/>
<wlevs:cache id="myCache" advertise="false"> 
    <wlevs:caching-system ref="caching-system-id"/>
    <wlevs:cache-store ref="localStore"/> 
</wlevs:cache>
<bean id="localStore" 
      class="com.bea.wlevs.example.provider.coherence.LocalStore"/>

Example 8-4 Oracle Coherence Cache LocalStore Implementation

package com.bea.wlevs.example.provider.coherence;
 
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
 
import com.bea.wlevs.example.provider.event.ProviderData;
import com.tangosol.net.cache.CacheStore;
 
public class LocalStore implements CacheStore {
    public static int eraseCount = 0;
    public static int storeCount = 0;
    public static int loadCount = 0;

    public void erase(Object key) {
        eraseCount++;
    }
    public void eraseAll(Collection keys) {
        for (Object key : keys) {
            erase(key);
        }
    }
    public void store(Object key, Object value) {
        storeCount++;
        //
        // Do the store operation here.
        //
    }
    public void storeAll(Map entries) {
        for (Map.Entry entry : (Set<Map.Entry>) entries.entrySet()) {
            store(entry.getKey(), entry.getValue());
        }
    }
    public Object load(Object key) {
        loadCount++;
        return new ProviderData((String) key);
    }
    public Map loadAll(Collection keys) {
        Map result = new HashMap();
        for (Object key : keys) {
            result.put(key, load(key));
        }
        return result;
    }
}
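The difference between store and storeAll is easiest to see with batched writes. The following sketch buffers cache writes and passes them to storeAll in groups. It is a simplified, synchronous approximation of write-behind: SketchStore, RecordingStore, and WriteBehindBuffer are hypothetical names, the batch-size trigger is an assumption, and a real write-behind configuration would flush asynchronously.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the store side of a cache store.
interface SketchStore<K, V> {
    void store(K key, V value);
    void storeAll(Map<K, V> entries);
}

// Records what reaches the "backing store" so that batching is observable.
class RecordingStore implements SketchStore<String, Integer> {
    final Map<String, Integer> backing = new LinkedHashMap<>();
    int storeAllCalls = 0;

    @Override
    public void store(String key, Integer value) {
        backing.put(key, value);
    }

    @Override
    public void storeAll(Map<String, Integer> entries) {
        storeAllCalls++;
        entries.forEach(this::store);
    }
}

// Buffers writes and hands them to storeAll once batchSize keys are pending.
class WriteBehindBuffer<K, V> {
    private final Map<K, V> pending = new LinkedHashMap<>();
    private final SketchStore<K, V> store;
    private final int batchSize;

    WriteBehindBuffer(SketchStore<K, V> store, int batchSize) {
        this.store = store;
        this.batchSize = batchSize;
    }

    void put(K key, V value) {
        pending.put(key, value); // a later write to the same key replaces the earlier one
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        store.storeAll(new LinkedHashMap<>(pending));
        pending.clear();
    }

    public static void main(String[] args) {
        RecordingStore backingStore = new RecordingStore();
        WriteBehindBuffer<String, Integer> buffer = new WriteBehindBuffer<>(backingStore, 2);
        buffer.put("a", 1);
        buffer.put("a", 2); // coalesced with the previous write
        buffer.put("b", 3); // second distinct key triggers one storeAll call
        System.out.println(backingStore.storeAllCalls + " batch: " + backingStore.backing);
    }
}
```

Coalescing repeated writes to the same key before they reach the data source is one reason a batched store can reduce load on the backing store.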

8.10 Access a Cache from Application Code

Once you have configured a cache, you can access it from several components in an Oracle Stream Analytics application: an Oracle CQL statement, an adapter, a business POJO, an Oracle CQL user-defined function, or JMX. The following sections describe each approach.

Before you assemble and deploy the application, edit your META-INF/MANIFEST.MF file to import the packages that your implementation requires. For example, if your application implements cache listeners, loaders, or stores, your manifest should import the com.tangosol.net.cache package, which contains the Coherence APIs.

Oracle Stream Analytics provides caching APIs that you can use in your application to perform certain tasks. The APIs are in the com.bea.cache.jcache package, which includes the APIs used to access a cache and to create cache loaders, listeners, and stores. If you want to use the loader, listener, and store functionality, then import the com.tangosol.net and com.tangosol.net.cache packages.

You create, configure, and wire caching systems and caches with the EPN assembly file and component configuration files. This means that you typically never explicitly use the Cache and CachingSystem interfaces in your application. The only reason to use them is when you have additional requirements over the standard configuration. For example, if you want to provide integration with a third-party cache provider, then you must use the CachingSystem interface. If you want to perform operations on a cache that are not part of the java.util.Map interface, then you can use the Cache interface.

If you create cache listeners, loaders, or stores for an Oracle Stream Analytics local cache, then the beans you write must implement the CacheListener, CacheLoader, or CacheStore interfaces.

If you create cache listeners, loaders, or stores for an Oracle Coherence cache, then the beans you write must implement the appropriate Oracle Coherence interfaces.

If you create cache listeners, loaders, or stores for a third-party cache, then the beans you write must implement the appropriate third-party cache interfaces.

8.10.1 Access a Cache from an Oracle CQL Statement

You can reference a cache from an Oracle CQL statement in much the same way that you reference an event source such as a channel. This feature enables you to enrich standard streaming data with data from a separate source. For example, a valid Oracle CQL query can join trade events from a standard channel named S1 with stock symbol data from a cache named stockCache.

You must abide by these restrictions when using a cache in an Oracle CQL query:

  • Whenever you query a cache, you must join against the [Now] window.

    This guarantees that the query will execute against a snapshot of the cache. If you join against any other window type, then if the cache changes before the window expires, the query will be incorrect.

    The following example shows an invalid Oracle CQL query that joins a Range window against a cache. If the cache changes before this window expires, the query will be incorrect. Consequently, this query raises the Oracle Stream Analytics server error "external relation must be joined with s[now]".

    SELECT trade.symbol, trade.price, trade.numberOfShares, company.name
    FROM TradeStream [Range 8 hours] as trade, CompanyCache as company
    WHERE trade.symbol = company.id
    

    When you use data from a cache in an Oracle CQL query, Oracle Stream Analytics pulls the data from the cache; the data is not pushed to the query, as it is with a channel. Continuing with the example, this means that the query executes only when a channel pushes a trade event to the query. The stock symbol data in the cache never causes the query to execute; the query pulls it only when needed.

  • You must specify the key property needed to do a lookup based on the cache key.

    Consider a stream S and a cache C that both have the schema (id, group, value), where the cache key is id. A valid query is:

    select count(*) as n from S [now], C
    where S.id = C.id
    
  • Joins must be executed only by referencing the cache key.

  • You cannot use a cache in a view. Instead, use a join.

  • Only a single channel source may occur in the FROM clause of an Oracle CQL statement that joins cache data source(s).

  • If the cache is a processor source, you connect the cache directly to the processor on the EPN.

  • If the cache is a processor sink, you must connect the processor to the cache through a channel on the EPN.
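The pull semantics of a [Now] join against a cache can be approximated in plain Java. The sketch below is not how the Oracle CQL engine is implemented; it only illustrates that each event pushed by the channel triggers one key lookup against the current cache contents, and that changes to the cache alone produce no output. NowJoinSketch, onTrade, and the comma-separated output format are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class NowJoinSketch {
    // Called once per trade event pushed by the channel. The cache is only
    // read (pulled) here; it never drives execution.
    static List<String> onTrade(String symbol, double lastPrice,
                                Map<String, String> stockCache) {
        List<String> output = new ArrayList<>();
        String description = stockCache.get(symbol); // lookup by cache key
        if (description != null) {
            output.add(symbol + "," + lastPrice + "," + description);
        }
        return output; // empty when the join finds no matching cache entry
    }

    public static void main(String[] args) {
        Map<String, String> stockCache = new HashMap<>();
        stockCache.put("ORCL", "Oracle Corporation");

        System.out.println(onTrade("ORCL", 10.5, stockCache));
        System.out.println(onTrade("XYZ", 3.0, stockCache));
    }
}
```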

Access a Cache from an Oracle CQL Statement

This procedure assumes that you have already configured the caching system and caches.

  1. If you have not already done so, create the event type that corresponds to the cache data and register it in the event repository.

    See Events and Event Types.

  2. Specify the key properties for the data in the cache.

  3. In the EPN assembly file, update the configuration of the cache to declare the event type of its values; use the value-type attribute of the wlevs:cache element. For example:

    <wlevs:caching-system id="caching-system-id"/>
    ...
    <wlevs:cache id="cache-id" 
                 name="alternative-cache-name"
                 value-type="CompanyEvent">
        <wlevs:caching-system ref="caching-system-id"/>
    </wlevs:cache>
    

    The value-type attribute specifies the type for the values contained in the cache. This must be a valid type name in the event type repository.

    This attribute is required only if the cache is referenced in an Oracle CQL query. This is because the query processor needs to know the type of events in the cache.

  4. In the EPN assembly file, update the configuration of the processor that executes the Oracle CQL query that references a cache:

    1. If the cache is a processor source: you connect the cache directly to the processor on the EPN as Figure 8-1 shows.

      Figure 8-1 Cache as Processor Source


      Update the wlevs:processor element with a wlevs:cache-source child element that references the cache. For example:

      <wlevs:channel id="S1"/>
      
      <wlevs:processor id="cacheProcessor">
          <wlevs:source ref="S1"/>
          <wlevs:cache-source ref="cache-id"/>
      </wlevs:processor>
      

      In the example, the processor will have data pushed to it from the S1 channel as usual; however, the Oracle CQL queries that execute in the processor can also pull data from the cache-id cache. When the query processor matches an event type in the FROM clause to an event type supplied by a cache, such as CompanyEvent, the processor pulls instances of that event type from the cache.

    2. If the cache is a processor sink: you must connect the processor to the cache using a channel on the EPN (that is, there must be a channel between the processor and the cache sink) as Figure 8-2 shows.

      Figure 8-2 Cache as Processor Sink


      In this case, the application assembly file looks like this:

      <wlevs:channel id="channel1" event-type="StockTick">
          <wlevs:listener ref="processor" />
      </wlevs:channel>
      <wlevs:processor id="processor">
          <wlevs:listener ref="channel2" />
      </wlevs:processor>
      <wlevs:channel id="channel2" event-type="StockTick">
          <wlevs:listener ref="cache-id" />
      </wlevs:channel>
      
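The following Oracle CQL query is a valid example of a cache join. It joins trade events from the S1 channel, through a [Now] window, with stock symbol data from the stockCache cache:

SELECT  S1.symbol, S1.lastPrice, stockCache.description
FROM    S1 [Now], stockCache
WHERE   S1.symbol = stockCache.symbol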

8.10.2 Access a Cache from an Adapter

An adapter can also be injected with a cache using the standard Spring mechanism for referencing another bean. A cache bean implements the java.util.Map interface which is what the adapter uses to access the injected cache.

First, the configuration of the adapter in the EPN assembly file must be updated with a wlevs:instance-property child element, as shown in the following example:

<wlevs:caching-system id="caching-system-id"/>
    ...
<wlevs:cache id="cache-id" name="alternative-cache-name">
    <wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>
...
<wlevs:adapter id="myAdapter" provider="myProvider">
    <wlevs:instance-property name="map" ref="cache-id"/>
</wlevs:adapter>

In the example, the ref attribute of wlevs:instance-property references the id value of the wlevs:cache element. Oracle Stream Analytics automatically injects the cache, implemented as a java.util.Map, into the adapter.

In the adapter Java source, add a setMap (Map) method with the code that implements whatever you want the adapter to do with the cache:

package com.bea.wlevs.example;
…
import java.util.Map;
public class MyAdapter implements Runnable, Adapter, EventSource, SuspendableBean  {
...
    public void setMap (Map map) {...}
}
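As a minimal sketch of what an adapter might do with the injected map, the following class parses raw key=value records and publishes them to the cache. It deliberately omits the Oracle Stream Analytics adapter interfaces so that it runs with the JDK alone; CachePopulatingAdapterSketch, the record format, and the constructor argument are all assumptions made for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical adapter-like class; a real adapter would also implement
// the Oracle Stream Analytics adapter interfaces.
class CachePopulatingAdapterSketch implements Runnable {
    private final List<String> rawRecords;
    private Map<String, String> map;

    CachePopulatingAdapterSketch(List<String> rawRecords) {
        this.rawRecords = rawRecords;
    }

    // Setter that corresponds to <wlevs:instance-property name="map" .../>.
    public void setMap(Map<String, String> map) {
        this.map = map;
    }

    @Override
    public void run() {
        for (String record : rawRecords) {
            String[] parts = record.split("=", 2);
            if (parts.length == 2) {
                // The cache is used purely through the java.util.Map interface.
                map.put(parts[0].trim(), parts[1].trim());
            }
            // Records without '=' are skipped in this sketch.
        }
    }

    public static void main(String[] args) {
        Map<String, String> cache = new ConcurrentHashMap<>();
        CachePopulatingAdapterSketch adapter = new CachePopulatingAdapterSketch(
                Arrays.asList("ORCL=Oracle", "malformed", "IBM = International Business Machines"));
        adapter.setMap(cache); // the server performs this injection in a real deployment
        adapter.run();
        System.out.println(cache);
    }
}
```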

8.10.3 Access a Cache From a Business POJO

A business POJO, configured as a standard Spring bean in the EPN assembly file, can be injected with a cache using the standard Spring mechanism for referencing another bean. In this way the POJO can view and manipulate the cache. A cache bean implements the java.util.Map interface which is what the business POJO uses to access the injected cache. A cache bean can also implement a vendor-specific sub-interface of java.util.Map, but for portability it is recommended that you implement Map.

First, the configuration of the business POJO in the EPN assembly file must be updated with a property child element, as shown in the following example based on the Output bean of the FX example (see Foreign Exchange (FX) Example in Getting Started with Event Processing for Oracle Stream Analytics):

<wlevs:caching-system id="caching-system-id"/>
...
<wlevs:cache id="cache-id" name="alternative-cache-name">
    <wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>
...
<bean class="com.bea.wlevs.example.helloworld.HelloWorldBean">
    <property name="map" ref="cache-id"/>
</bean>

In the example, the ref attribute of the property element references the id value of the wlevs:cache element. Oracle Stream Analytics automatically injects the cache, implemented as a java.util.Map, into the business POJO bean.

In the business POJO bean Java source, add a setMap (Map) method with the code that implements whatever you want the POJO to do with the cache:

package com.bea.wlevs.example.helloworld;
…
import java.util.Map;
public class HelloWorldBean implements EventSink {
...
    public void setMap (Map map) {...}
}
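A business POJO can also update the cache as events arrive. The sketch below keeps a per-event count in the injected map. It is an assumption-laden illustration: CountingBeanSketch is a hypothetical class, and its onInsertEvent method only mirrors the shape of an event sink callback without implementing the Oracle Stream Analytics interfaces.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical POJO; a real event sink would implement the server's sink interface.
class CountingBeanSketch {
    private Map<String, Integer> map;

    // Setter that corresponds to <property name="map" ref="cache-id"/>.
    public void setMap(Map<String, Integer> map) {
        this.map = map;
    }

    // Hypothetical callback: counts how often each event value has been seen.
    public void onInsertEvent(Object event) {
        map.merge(String.valueOf(event), 1, Integer::sum);
    }

    public static void main(String[] args) {
        Map<String, Integer> cache = new HashMap<>();
        CountingBeanSketch bean = new CountingBeanSketch();
        bean.setMap(cache); // Spring performs this injection in a real deployment
        bean.onInsertEvent("HelloWorld");
        bean.onInsertEvent("HelloWorld");
        bean.onInsertEvent("Goodbye");
        System.out.println(cache);
    }
}
```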

8.10.4 Access a Cache From an Oracle CQL User-Defined Function

In addition to standard event streams, Oracle CQL rules can also invoke the member methods of a user-defined function.

These user-defined functions are implemented as standard Java classes and are declared in the component configuration file of the Oracle CQL processor, as shown in the following example:

<bean id="orderFunction" class="orderFunction-impl-class"/>

The processor in which the relevant Oracle CQL rule runs must then be injected with the user-defined function using the wlevs:function child element, referencing the Spring bean with the ref attribute:

<wlevs:processor id="tradeProcessor">
    <wlevs:function ref="orderFunction"/>
</wlevs:processor>

Alternatively, you can specify the bean class in the wlevs:function element:

<wlevs:processor id="testProcessor">
    <wlevs:listener ref="providerCache"/>
    <wlevs:listener ref="outputCache"/>
    <wlevs:cache-source ref="testCache"/>
    <wlevs:function function-name="mymod" exec-method="execute">
        <bean class="com.bea.wlevs.example.function.MyMod"/>
    </wlevs:function>
</wlevs:processor>

The following Oracle CQL rule, assumed to be configured for the tradeProcessor processor, shows how to invoke the existsOrder method of the orderFunction user-defined function:

INSERT INTO InstitutionalOrder
SELECT er.orderKey AS key, er.symbol AS symbol, er.shares as cumulativeShares
FROM ExecutionRequest er [Range 8 hours]
WHERE NOT orderFunction.existsOrder(er.orderKey)

You can also configure the user-defined function to access a cache by injecting the function with a cache using the standard Spring mechanism for referencing another bean. A cache bean implements the java.util.Map interface which is what the user-defined function uses to access the injected cache.

First, the configuration of the user-defined function in the EPN assembly file must be updated with a wlevs:property child element, as shown in the following example:

<wlevs:caching-system id="caching-system-id"/>
    ...
<wlevs:cache id="cache-id" name="alternative-cache-name">
    <wlevs:caching-system ref="caching-system-id"/>
</wlevs:cache>
    ...
<bean id="orderFunction" class="orderFunction-impl-class">
    <wlevs:property name="map" ref="cache-id"/>
</bean>

In the example, the ref attribute of the wlevs:property element references the id value of the wlevs:cache element. Oracle Stream Analytics automatically injects the cache, implemented as a java.util.Map, into the user-defined function.

In the user-defined function's Java source, add a setMap (Map) method with the code that implements whatever you want the function to do with the cache:

package com.bea.wlevs.example;
…
import java.util.Map;
public class OrderFunction {
...
    public void setMap (Map map) {...}
}
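One plausible body for such a function is a key lookup against the injected cache. The following sketch shows an existsOrder method of that kind. The class name OrderFunctionSketch and the assumption that order keys are stored as cache keys are illustrative only and are not taken from the product samples.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical user-defined function class backed by an injected cache.
class OrderFunctionSketch {
    private Map<Object, Object> map;

    // Setter through which the cache is injected.
    public void setMap(Map<Object, Object> map) {
        this.map = map;
    }

    // Returns true when an order with this key is already in the cache.
    public boolean existsOrder(Object orderKey) {
        // Guard against a missing injection and against null keys, which
        // some Map implementations reject.
        return map != null && orderKey != null && map.containsKey(orderKey);
    }

    public static void main(String[] args) {
        Map<Object, Object> cache = new HashMap<>();
        cache.put("order-1", "existing order");

        OrderFunctionSketch function = new OrderFunctionSketch();
        function.setMap(cache);
        System.out.println(function.existsOrder("order-1"));
        System.out.println(function.existsOrder("order-2"));
    }
}
```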

For more information on user-defined functions, see User Defined Functions in Oracle CQL Language Reference.

8.10.5 Access a Cache with JMX

At runtime, you can access a cache programmatically using JMX and the MBeans that Oracle Stream Analytics deploys for the caching systems and caches you define. For more information, see JMX in Administering Oracle Stream Analytics.

8.10.5.1 How to Access a Cache With JMX Using Oracle Stream Analytics Visualizer

The simplest and least error-prone way to access a caching system or cache with JMX is to use the Oracle Stream Analytics Visualizer. For more information, see JMX Management in Using Visualizer for Oracle Stream Analytics.

8.10.5.2 How to Access a Cache With JMX Using Java

As an alternative to using the Oracle Stream Analytics Visualizer (see How to Access a Cache With JMX Using Oracle Stream Analytics Visualizer), you can access a caching system or cache with JMX using Java code that you write.

Oracle Stream Analytics creates a StageMBean for each cache that your application uses as a stage. The Type of this MBean is Stage.

To access a cache with JMX using Java:

  1. Connect to the JMX service that Oracle Stream Analytics server provides.

    For more information, see Connect to JMX Server in Administering Oracle Stream Analytics.

  2. Get a list of cache StageMBean instances using either of the following methods:
    • CachingSystemMBean.getCacheMBeans()

    • ApplicationMBean.getStageMBeans()

  3. Get the ObjectName for a given StageMBean that represents a cache in your caching system:
    ObjectName cacheName = ObjectName.getInstance(
        "com.bea.wlevs:Name=newCache,Type=Stage,CachingSystem=newCachingSystem,Application=provider"
    );
    
  4. Get a proxy instance for the StageMBean with this ObjectName:
    StageMBean cache = (StageMBean) MBeanServerInvocationHandler.newProxyInstance(
        server, cacheName, StageMBean.class, false
    );
    
  5. Use the methods of the StageMBean to access the cache.
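The ObjectName and proxy steps above can be tried without a running server by using the JDK's in-process platform MBean server. The following sketch does that under stated assumptions: DemoStageMBean and DemoStage are hypothetical stand-ins for StageMBean and the server's cache stage, the com.example.sketch domain is invented, and the platform MBean server replaces the remote connection that step 1 would establish.

```java
import java.lang.management.ManagementFactory;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;

class JmxCacheSketch {
    // Hypothetical stand-in for StageMBean. Standard MBean interfaces must be public.
    public interface DemoStageMBean {
        String getName();
        int getSize();
    }

    // Hypothetical stand-in for the MBean that the server registers for a cache.
    public static class DemoStage implements DemoStageMBean {
        private final String name;
        private final int size;

        public DemoStage(String name, int size) {
            this.name = name;
            this.size = size;
        }

        @Override
        public String getName() { return name; }

        @Override
        public int getSize() { return size; }
    }

    // Registers a demo MBean, reads one attribute through a proxy, then cleans up.
    static int readSizeViaJmx(int size) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            ObjectName cacheName = ObjectName.getInstance(
                "com.example.sketch:Name=newCache,Type=Stage,"
                + "CachingSystem=newCachingSystem,Application=provider");
            server.registerMBean(new DemoStage("newCache", size), cacheName);
            try {
                // Same proxy call as in the procedure, with the stand-in interface.
                DemoStageMBean cache = MBeanServerInvocationHandler.newProxyInstance(
                    server, cacheName, DemoStageMBean.class, false);
                return cache.getSize();
            } finally {
                server.unregisterMBean(cacheName);
            }
        } catch (JMException e) {
            throw new IllegalStateException("JMX access failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("size read through JMX proxy: " + readSizeViaJmx(2));
    }
}
```

Against a real server, the MBeanServer argument would be the connection obtained in step 1 and the interface would be StageMBean. The generic form of newProxyInstance also makes the explicit cast unnecessary.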