7 Data Cartridge Framework
The Data Cartridge Framework is a service provider interface (SPI) that enables users and vendors to create cartridges to extend Oracle CQL functionality. The Hadoop and NoSQL cartridges described in Oracle Big Data Cartridges are examples of cartridges created with the Data Cartridge Framework.
For example, with the Data Cartridge Framework, you can extend Oracle CQL functionality to support the development of telematic applications. Telematic applications encompass telecommunications (electrical signals and electromagnetic waves), automotive technologies, transportation, electrical engineering (sensors, instrumentation, and wireless communications), and computer science (Internet of Things).
7.1 About the SPI
An Oracle Stream Analytics cartridge is a single manageable unit that defines external functions, types, indexes, Java classes, and data sources.
The user of the cartridge references the available functions, types, indexes, Java classes, and data sources from Oracle CQL code with links of the following form:
myFunction@myCartridge(arg1)
A cartridge created with the Oracle Stream Analytics Data Cartridges Framework is an Oracle Stream Analytics library. This means that you deploy the cartridge the same way that you deploy a library, which is from the command line or in Oracle JDeveloper. Once you deploy a cartridge, all of the external functions, types, indexes, Java classes, and data sources are available to use in Oracle CQL queries. You must deploy a cartridge before you deploy the application. You can update the cartridge without updating your application.
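The `function@cartridge` link form and the deploy-before-use rule can be pictured with a small lookup model. The following is a conceptual sketch only, not the engine's implementation: the class `CartridgeLinkSketch`, its registry, and the doubling behavior of `myFunction` are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntUnaryOperator;

/** Conceptual model of function@cartridge lookup; not the engine's actual code. */
final class CartridgeLinkSketch {

    /** Hypothetical registry: cartridge link ID -> (function name -> implementation). */
    private static final Map<String, Map<String, IntUnaryOperator>> DEPLOYED = new HashMap<>();

    static {
        // Pretend that "myCartridge" is deployed and exposes one function.
        Map<String, IntUnaryOperator> functions = new HashMap<>();
        functions.put("myFunction", x -> x * 2);
        DEPLOYED.put("myCartridge", functions);
    }

    /** Splits "function@cartridge" into { function, cartridge }. */
    static String[] parse(String link) {
        int at = link.indexOf('@');
        if (at <= 0 || at == link.length() - 1) {
            throw new IllegalArgumentException("Expected function@cartridge, got: " + link);
        }
        return new String[] { link.substring(0, at), link.substring(at + 1) };
    }

    /** Resolves the link and applies the function to a single int argument. */
    static int invoke(String link, int arg) {
        String[] parts = parse(link);
        Map<String, IntUnaryOperator> functions = DEPLOYED.get(parts[1]);
        if (functions == null) {
            throw new IllegalStateException("Cartridge is not deployed: " + parts[1]);
        }
        IntUnaryOperator function = functions.get(parts[0]);
        if (function == null) {
            throw new IllegalArgumentException("Unknown function: " + link);
        }
        return function.applyAsInt(arg);
    }

    public static void main(String[] args) {
        System.out.println(invoke("myFunction@myCartridge", 21));
    }
}
```

In this model, a link that names a cartridge missing from the registry fails at lookup time, which is loosely analogous to deploying an application before the cartridge it references.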
7.2 Interfaces
The com.oracle.cep.cartridge package contains the Cartridge Framework Java interfaces.
This section describes what you can do and what you must do when you use the interfaces. Brief descriptions of the interfaces and exceptions follow.
You Can:
- Use any type system for the table and stream attribute types in Oracle CQL.
- Provide your own index data structure for invoking functions.
- Provide new Java classes that are visible within an Oracle CQL query. When you deploy the cartridge, include the application or library that has the new Java classes. Applications that access the new Java classes must import the correct Java packages in their MANIFEST.MF file with the Import-Package header entry.
You Must:
- Provide an MBean for all deployed cartridges that contains a list of all functions that the cartridge supports. When the cartridge is undeployed, the MBean instance is unregistered.
- Implement the ExternalFunctionProvider.listFunctions method.
- Provide a bean-stage MBean for table sources that you tie to a cartridge external data source. This MBean provides a list of the table source custom properties including its id and provider name.
Optionally, a table source Spring bean factory can implement the com.bea.wlevs.management.configuration.spring.StageFactoryAccess interface to customize how to access the table source properties.
7.2.1 Interface Descriptions
The com.oracle.cep.cartridge package provides the following Java interfaces.
CapabilityProvider: An ExternalConnection can implement this interface to specify the supported capabilities such as less than (<), AND, OR, and so on.
ExternalConnection: Connect to an ExternalDataSource.
ExternalConstants: Define general constants used by the data cartridge. This interface provides two constants: EQUALS for external connection capabilities, and SERVER_CONTEXT_LINK_ID to denote an ExternalFunctionProvider link ID.
ExternalDataSource: Use the getConnection method to connect to an external source of contextual data to join with Oracle CQL processor events. The external data source must support the configuration of its properties. For example, a NoSQLDB data source supports the configuration of a host, a port, and a store name.
The external data source specifies the functions it supports. By default, all external data sources support the equality function, for example:
SELECT * FROM S[NOW], MyExternalDataSource WHERE S.id = MyExternalDataSource.id
To make the data source available to Oracle CQL processors, register a Spring bean that implements the com.oracle.cep.cartridge.ExternalDataSource interface and make that Spring bean the target of a table source (wlevs:table-source tag).
ExternalFunction: A function provided by an ExternalFunctionProvider or other external entity.
ExternalFunctionDefinition: Specify the metadata for functions used in Oracle CQL queries and views that are provided by an ExternalFunctionProvider or other external entity.
ExternalFunctionProvider: Defines a set of functions that can be directly accessed from Oracle CQL queries and views. Use the getId method to register an external function provider as an OSGi service. Also, the provider must specify the ExternalConstants.SERVER_CONTEXT_LINK_ID service property to indicate the link ID to use in Oracle CQL queries and views to identify the provider.
ExternalPredicate: Represent prepared statement predicates with attributes and a predicate clause.
ExternalPreparedStatement: Represent a prepared statement from an external function provider to execute the same or similar functions repeatedly and efficiently.
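The prepare-once, execute-per-event pattern behind an equality join such as S.id = MyExternalDataSource.id can be sketched without the cartridge API. The interface `PreparedStatementLike` below is a local stand-in that copies only the setLong and executeQuery signatures seen in the sample source later in this chapter. The in-memory rows and the 1-based parameter index are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class PreparedLookupSketch {

    /** Stand-in for the two ExternalPreparedStatement methods used here. */
    interface PreparedStatementLike {
        void setLong(int paramIndex, long value) throws Exception;
        Iterator<Object> executeQuery() throws Exception;
    }

    /** Hypothetical in-memory source that answers "source.id = ?" lookups. */
    static final class IdLookupStatement implements PreparedStatementLike {
        private final Map<Long, List<Object>> rowsById;
        private Long boundId; // value bound to parameter 1; null until bound

        IdLookupStatement(Map<Long, List<Object>> rowsById) {
            this.rowsById = rowsById;
        }

        @Override
        public void setLong(int paramIndex, long value) {
            if (paramIndex != 1) {
                throw new IllegalArgumentException("This statement has one parameter");
            }
            boundId = value;
        }

        @Override
        public Iterator<Object> executeQuery() {
            if (boundId == null) {
                return Collections.<Object>emptyIterator();
            }
            List<Object> rows = rowsById.get(boundId);
            return rows == null ? Collections.<Object>emptyIterator() : rows.iterator();
        }
    }

    /** Binds the event's id, runs the statement, and collects the matching rows. */
    static List<Object> join(IdLookupStatement statement, long eventId) {
        statement.setLong(1, eventId);
        List<Object> matches = new ArrayList<>();
        statement.executeQuery().forEachRemaining(matches::add);
        return matches;
    }

    /** Sample rows; the values are made up for illustration. */
    static IdLookupStatement sampleStatement() {
        Map<Long, List<Object>> rows = new HashMap<>();
        rows.put(7L, Arrays.<Object>asList("row-7a", "row-7b"));
        return new IdLookupStatement(rows);
    }

    public static void main(String[] args) {
        IdLookupStatement statement = sampleStatement(); // prepared once
        for (long eventId : new long[] { 7L, 8L }) {     // executed per event
            System.out.println(eventId + " -> " + join(statement, eventId));
        }
    }
}
```

The statement object is created once and reused for every event. Only the bound key changes between executions, which is where the efficiency of a prepared statement comes from.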
7.2.2 Exceptions
The com.oracle.cep.cartridge package provides the following exceptions:
AmbiguousFunctionException: Thrown when the referenced function cannot be determined by the ExternalFunctionProvider due to ambiguity.
CartridgeException: Root cartridge exception.
FunctionNotFoundException: Thrown when the referenced function in an Oracle CQL statement is not supported by the ExternalFunctionProvider.
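One way a provider could decide between these two exceptions is to count the functions whose name and parameter types match the call. The sketch below is an assumption about a reasonable resolution policy, not the framework's documented algorithm. The exception classes are local stand-ins for the real ones in com.oracle.cep.cartridge, and the sample functions are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FunctionResolutionSketch {

    /** Stand-ins for the cartridge exceptions, which live in com.oracle.cep.cartridge. */
    static class CartridgeExceptionLike extends Exception {
        CartridgeExceptionLike(String message) { super(message); }
    }
    static class NotFoundLike extends CartridgeExceptionLike {
        NotFoundLike(String message) { super(message); }
    }
    static class AmbiguousLike extends CartridgeExceptionLike {
        AmbiguousLike(String message) { super(message); }
    }

    /** Minimal function descriptor: a name plus its declared parameter types. */
    static final class Signature {
        final String name;
        final Class<?>[] parameterTypes;

        Signature(String name, Class<?>... parameterTypes) {
            this.name = name;
            this.parameterTypes = parameterTypes;
        }

        /** True when the name matches and each argument type is assignable to its parameter. */
        boolean accepts(String functionName, Class<?>[] argumentTypes) {
            if (!name.equalsIgnoreCase(functionName)
                    || parameterTypes.length != argumentTypes.length) {
                return false;
            }
            for (int i = 0; i < parameterTypes.length; i++) {
                if (!parameterTypes[i].isAssignableFrom(argumentTypes[i])) {
                    return false;
                }
            }
            return true;
        }
    }

    /** Returns the single matching function, or throws when zero or several match. */
    static Signature resolve(List<Signature> functions, String name, Class<?>... argumentTypes)
            throws CartridgeExceptionLike {
        List<Signature> matches = new ArrayList<>();
        for (Signature candidate : functions) {
            if (candidate.accepts(name, argumentTypes)) {
                matches.add(candidate);
            }
        }
        if (matches.isEmpty()) {
            throw new NotFoundLike(name + " is not supported");
        }
        if (matches.size() > 1) {
            throw new AmbiguousLike(name + " matches " + matches.size() + " functions");
        }
        return matches.get(0);
    }

    /** Maps the result of resolve to a short label so that outcomes are easy to print. */
    static String outcome(List<Signature> functions, String name, Class<?>... argumentTypes) {
        try {
            resolve(functions, name, argumentTypes);
            return "ok";
        } catch (NotFoundLike e) {
            return "not-found";
        } catch (AmbiguousLike e) {
            return "ambiguous";
        } catch (CartridgeExceptionLike e) {
            return "error";
        }
    }

    /** Hypothetical function set: two "plus" overloads and two overlapping "max" overloads. */
    static List<Signature> sampleFunctions() {
        return Arrays.asList(
                new Signature("plus", Integer.class, Integer.class),
                new Signature("plus", Long.class, Long.class),
                new Signature("max", Number.class, Number.class),
                new Signature("max", Comparable.class, Comparable.class));
    }

    public static void main(String[] args) {
        List<Signature> functions = sampleFunctions();
        System.out.println(outcome(functions, "plus", Integer.class, Integer.class));
        System.out.println(outcome(functions, "max", Integer.class, Integer.class));
        System.out.println(outcome(functions, "minus", Integer.class, Integer.class));
    }
}
```

Because Integer is both a Number and a Comparable, the call to max matches two declarations and is reported as ambiguous, while minus matches none.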
7.3 Cartridge Examples
To make the cartridges available for Oracle CQL queries within Oracle Stream Explorer applications, deploy each cartridge as a separate application library. After you deploy the cartridges, deploy the Oracle Stream Explorer application or applications that use the cartridges.
This section describes two cartridge examples: an arithmetic cartridge and a data source cartridge. The arithmetic cartridge makes arithmetic functions available to Oracle CQL queries. Like the spatial cartridge described in Oracle Spatial Data Cartridge, it contains only functions. The data source cartridge defines a data source similar to the Hadoop data source described in Oracle Big Data Cartridges.
7.3.1 Arithmetic Cartridge
The arithmetic cartridge has the following classes:
- A set of Java classes that provide the functionality for addition, array, and exception operations.
- The ExceptionFunction.java and ArrayFunction.java classes to provide array and exception functionality so that you can use arrays and throw exceptions from an Oracle CQL query.
- An ArithmeticActivator.java class that starts and stops the cartridge bundle.
All of the function classes implement the com.oracle.cep.cartridge.ExternalFunction interface and have a getName method that returns the name of the function to use in an Oracle CQL query. The UserDefineFunction class implements the com.oracle.cep.cartridge.ExternalFunctionProvider interface and exposes the function classes to the cartridge.
For example, the AddFunction.java and AddLongFunction.java getName methods return plus for the function name. You use the function name in the Oracle CQL query to call the function. The following query uses the plus function in the arithmetic cartridge to add two integers from inputChannel.
SELECT plus@arithmetic(typeInt, typeInt2) AS typeInt FROM inputChannel
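The text mentions AddLongFunction.java, but its source is not listed in this chapter. The following is a hypothetical sketch of what a Long overload that shares the name plus might look like. The interface `ExternalFunctionLike` is a local stand-in that copies the four methods AddFunction.java overrides, and the helper `add` exists only to make the sketch easy to call.

```java
import java.util.Collections;
import java.util.Map;

final class AddLongSketch {

    /** Local stand-in mirroring the methods that AddFunction.java overrides. */
    interface ExternalFunctionLike {
        String getName();
        Class<?>[] getParameterTypes();
        Class<?> getReturnType();
        Object execute(Object[] args, String caller, Map<String, Object> context) throws Exception;
    }

    /** Hypothetical Long overload; it reports the same CQL name as the Integer version. */
    static final class AddLongFunction implements ExternalFunctionLike {
        @Override
        public String getName() {
            return "plus";
        }

        @Override
        public Class<?>[] getParameterTypes() {
            return new Class<?>[] { Long.class, Long.class };
        }

        @Override
        public Class<?> getReturnType() {
            return Long.class;
        }

        @Override
        public Object execute(Object[] args, String caller, Map<String, Object> context) {
            if (args.length != 2) {
                throw new IllegalArgumentException("plus needs 2 parameters");
            }
            if (!(args[0] instanceof Long && args[1] instanceof Long)) {
                throw new IllegalArgumentException("this overload only supports java.lang.Long");
            }
            long left = (Long) args[0];
            long right = (Long) args[1];
            return Long.valueOf(left + right);
        }
    }

    /** Convenience wrapper that calls execute the way an engine might, then unboxes. */
    static long add(long a, long b) {
        Object sum = new AddLongFunction().execute(
                new Object[] { a, b }, "sketch", Collections.<String, Object>emptyMap());
        return (Long) sum;
    }

    public static void main(String[] args) {
        System.out.println(new AddLongFunction().getName() + " -> " + add(40L, 2L));
    }
}
```

With two functions that both return plus from getName, the provider would need to use the parameter types passed to getFunction to pick the right overload.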
7.3.2 Data Source Cartridge
Data Source Cartridge Files
The cartridge example uses a set of Java classes that define the data source.
The MyCartridgeSource.java class implements the com.oracle.cep.cartridge.ExternalDataSource interface. It defines the data source connection functionality, and reads event data from and writes event data to the database.
The MyActivator.java class implements org.osgi.framework.BundleActivator and provides code to start and stop the cartridge bundle.
The MyHandler.java class implements org.springframework.beans.factory.xml.NamespaceHandler, and provides code to manage the cartridge namespace and register the Udds factory bean.
The UddsDefinitionParser.java class extends org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser and provides code to parse and register UddsFactoryBean objects.
The UddsFactoryBean.java class extends org.springframework.beans.factory.config.AbstractFactoryBean and provides code to manage events and the event type repository.
7.4 Source Code
This section provides the source code for the arithmetic cartridge and the data source cartridge.
7.4.1 Arithmetic Cartridge
AddFunction.java
package tests.functional.cartridge.userdefine.common.libs.arithmetic;

import java.util.Map;

import com.oracle.cep.cartridge.ExternalFunction;

public class AddFunction implements ExternalFunction {

    // Name used to call the function from Oracle CQL, as in plus@arithmetic(...).
    @Override
    public String getName() {
        return "plus";
    }

    @Override
    public Class<?>[] getParameterTypes() {
        Class<?>[] parameters = new Class<?>[2];
        parameters[0] = java.lang.Integer.class;
        parameters[1] = java.lang.Integer.class;
        return parameters;
    }

    @Override
    public Class<?> getReturnType() {
        return java.lang.Integer.class;
    }

    @Override
    public Object execute(Object[] args, String caller, Map<String, Object> context)
            throws Exception {
        if (args.length != 2) {
            throw new IllegalArgumentException("add function needs 2 parameters");
        }
        if (!(args[0] instanceof java.lang.Integer && args[1] instanceof java.lang.Integer)) {
            throw new IllegalArgumentException("add function only supports java.lang.Integer");
        }
        java.lang.Integer arg1 = (Integer) args[0];
        java.lang.Integer arg2 = (Integer) args[1];
        // Integer.valueOf replaces the deprecated Integer constructor.
        return Integer.valueOf(arg1 + arg2);
    }
}
ArithmeticActivator.java
package tests.functional.cartridge.userdefine.common.libs.arithmetic;

import java.util.Hashtable;

import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;

import com.oracle.cep.cartridge.ExternalFunctionProvider;

public class ArithmeticActivator implements BundleActivator {

    private ServiceRegistration reg;

    @Override
    public void start(BundleContext context) throws Exception {
        Hashtable props = new Hashtable();
        props.put("server.context.link.id", "arithmetic");
        this.reg = context.registerService(
                ExternalFunctionProvider.class.getName(), new UserDefineFunction(), props);
    }

    @Override
    public void stop(BundleContext arg0) throws Exception {
        this.reg.unregister();
    }
}
ArrayFunction.java
package tests.functional.cartridge.userdefine.common.libs.arithmetic;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.oracle.cep.cartridge.ExternalFunction;

public class ArrayFunction implements ExternalFunction {

    @Override
    public String getName() {
        return "array";
    }

    @Override
    public Class<?>[] getParameterTypes() {
        Class<?>[] parameters = new Class<?>[2];
        parameters[0] = Integer.class;
        parameters[1] = Integer.class;
        return parameters;
    }

    @Override
    public Class<?> getReturnType() {
        return List.class;
    }

    @Override
    public Object execute(Object[] args, String caller, Map<String, Object> context)
            throws Exception {
        if (args.length == 0) {
            return null;
        }
        if (!(args[0] instanceof java.lang.Integer)) {
            throw new IllegalArgumentException("array function only supports java.lang.Integer");
        }
        // Copy the arguments into a list that the query can use as an array.
        List<Object> ret = new ArrayList<Object>();
        for (Object obj : args) {
            ret.add(obj);
        }
        return ret;
    }
}
ExceptionFunction.java
package tests.functional.cartridge.userdefine.common.libs.arithmetic;

import java.util.Map;

import com.oracle.cep.cartridge.ExternalFunction;

public class ExceptionFunction implements ExternalFunction {

    @Override
    public String getName() {
        return "exception";
    }

    @Override
    public Class<?>[] getParameterTypes() {
        return new Class<?>[]{Integer.class};
    }

    @Override
    public Class<?> getReturnType() {
        return Integer.class;
    }

    // Always throws, so that a query can exercise exception handling.
    @Override
    public Object execute(Object[] args, String caller, Map<String, Object> context)
            throws Exception {
        throw new NullPointerException("I am an exception");
    }
}
UserDefineFunction.java
package tests.functional.cartridge.userdefine.common.libs.arithmetic;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.oracle.cep.cartridge.AmbiguousFunctionException;
import com.oracle.cep.cartridge.ExternalFunction;
import com.oracle.cep.cartridge.ExternalFunctionProvider;
import com.oracle.cep.cartridge.FunctionNotFoundException;

public class UserDefineFunction implements ExternalFunctionProvider {

    private ArrayList<ExternalFunction> functions = new ArrayList<ExternalFunction>();

    public UserDefineFunction() {
        functions.add(new AddFunction());
        functions.add(new ArrayFunction());
        functions.add(new ExceptionFunction());
    }

    // Looks up a function by the name used in the Oracle CQL query.
    @Override
    public ExternalFunction getFunction(String functionName, Class<?>[] parameterTypes,
            String caller, Map<String, Object> context)
            throws AmbiguousFunctionException, FunctionNotFoundException {
        if ("plus".equalsIgnoreCase(functionName)) {
            return new AddFunction();
        } else if ("array".equalsIgnoreCase(functionName)) {
            return new ArrayFunction();
        } else if ("exception".equalsIgnoreCase(functionName)) {
            return new ExceptionFunction();
        }
        throw new FunctionNotFoundException(functionName + " is not supported in arithmetic");
    }

    @Override
    public String getId() {
        return "arithmetic";
    }

    @Override
    public List<ExternalFunction> listFunctions(String caller, Map<String, Object> context) {
        ArrayList<ExternalFunction> functionList = new ArrayList<ExternalFunction>();
        functionList.addAll(functions);
        return functionList;
    }
}
7.4.2 Data Source Cartridge
The data source cartridge consists of the following Java class files:
MyCartridgeSource.java
package tests.functional.cartridge.userdefine.common.libs.datasource;

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import oracle.cep.dataStructures.external.TupleValue;

import org.springframework.osgi.extensions.annotation.ServiceReference;

import com.bea.wlevs.ede.api.EventProperty;
import com.bea.wlevs.ede.api.EventType;
import com.bea.wlevs.ede.api.EventTypeRepository;
import com.bea.wlevs.ede.api.Type;
import com.bea.wlevs.management.configuration.spring.StageFactoryAccess;
import com.oracle.cep.cartridge.ExternalConnection;
import com.oracle.cep.cartridge.ExternalDataSource;
import com.oracle.cep.cartridge.ExternalPredicate;
import com.oracle.cep.cartridge.ExternalPreparedStatement;

public class MyCartridgeSource implements StageFactoryAccess, ExternalDataSource {

    // @Override
    public Map<?, ?> getCacheDataSource() {
        return null;
    }

    private EventTypeRepository etr;

    @ServiceReference
    public void setEventTypeRepository(EventTypeRepository etr) {
        this.etr = etr;
    }

    private String eventType;

    @Override
    public String getEventType() {
        System.out.println("event type:" + this.eventType);
        return eventType;
    }

    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

    private long maxThreshhold = 0;

    @Override
    public long getExternalRowsThreshold() {
        return maxThreshhold;
    }

    public void setExternalRowsThreshold(long maxThreshhold) {
        this.maxThreshhold = maxThreshhold;
    }

    private String pattern;

    public String getPattern() {
        return pattern;
    }

    public void setPattern(String pattern) {
        this.pattern = pattern;
    }

    private String singularity;

    public String getSingularity() {
        return singularity;
    }

    public void setSingularity(String singularity) {
        this.singularity = singularity;
    }

    private String id;

    @Override
    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    @Override
    public String getJDBCDataSource() {
        return null;
    }

    private Class keyClass;

    @Override
    public Class getKeyClass() {
        return Long.class;
    }

    public void setKeyClass(String className) throws ClassNotFoundException {
        this.keyClass = Class.forName(className);
    }

    private String[] keyPropertyNames;

    @Override
    public String[] getKeyPropertyNames() {
        return keyPropertyNames;
    }

    public void setKeyProperty(String names) {
        keyPropertyNames = names.split(",");
    }

    @Override
    public String getTableName() {
        return null;
    }

    public Class getObjectType() {
        return ExternalDataSource.class;
    }

    // Creates a connection bound to the configured event type and patterns.
    @Override
    public ExternalConnection getConnection() throws Exception {
        MyExternalConnection connection =
                new MyExternalConnection(this.etr.getEventType(this.eventType));
        connection.setPattern(pattern);
        connection.setSingularity(singularity);
        return connection;
    }

    public static class MyExternalConnection implements ExternalConnection {

        private final EventType targetEventType;

        public MyExternalConnection(EventType eventtype) {
            this.targetEventType = eventtype;
        }

        private String pattern;

        public void setPattern(String pattern) {
            this.pattern = pattern;
        }

        private String singularity;

        public void setSingularity(String singularity) {
            this.singularity = singularity;
        }

        @Override
        public void close() throws Exception {
        }

        @Override
        public ExternalPreparedStatement prepareStatement(String relationName,
                List<String> relationAttrs, ExternalPredicate predicate) throws Exception {
            return new MyExternalPreparedStatement(
                    this.targetEventType, predicate, this.pattern, this.singularity);
        }

        @Override
        public boolean supportsPredicate(ExternalPredicate predicate) throws Exception {
            return true;
        }
    }

    public static class MyExternalPreparedStatement implements ExternalPreparedStatement {

        private ExternalPredicate predicate;
        private Object[] keys = new Object[10];
        private final EventType targetEventType;
        private Pattern pattern;
        private Pattern singularity;

        public MyExternalPreparedStatement(EventType targetEventType,
                ExternalPredicate predicate, String pattern, String singularity) {
            this.targetEventType = targetEventType;
            this.predicate = predicate;
            // Default pattern accepts every key value.
            if (pattern == null) {
                this.pattern = Pattern.compile(".*");
            } else {
                this.pattern = Pattern.compile(pattern);
            }
            // Default singularity pattern never matches.
            if (singularity == null) {
                this.singularity = Pattern.compile("$.^");
            } else {
                this.singularity = Pattern.compile(singularity);
            }
        }

        @Override
        public void close() throws Exception {
        }

        @Override
        public Iterator<Object> executeQuery() throws Exception {
            List<Object> result = new ArrayList<Object>();
            List attrs = predicate.getAttributes();
            String value = "";
            // Return no rows if any bound key is missing or fails the pattern.
            for (int i = 0; i < attrs.size(); i++) {
                if (keys[i + 1] == null) {
                    System.out.println("empty=" + keys[i + 1]);
                    return result.iterator();
                }
                value = keys[i + 1].toString();
                Matcher m = this.pattern.matcher(value);
                if (!m.matches()) {
                    System.out.println("empty=" + value);
                    return result.iterator();
                }
            }
            // Build one event whose properties are all derived from the last key value.
            TupleValue event = (TupleValue) this.targetEventType.createEvent();
            EventProperty[] properties = this.targetEventType.getProperties();
            for (int i = 0; i < properties.length; i++) {
                properties[i].setValue(event, createValue(properties[i], value));
            }
            System.out.println("one=" + value);
            result.add(event);
            // Return the event twice when the value matches the singularity pattern.
            Matcher s = this.singularity.matcher(value);
            if (s.matches()) {
                System.out.println("double=" + value);
                result.add(event);
            }
            return result.iterator();
        }

        private Object createValue(EventProperty property, String value) {
            Type propertyType = property.getType();
            Object ret;
            if (Type.INT == propertyType) {
                ret = Integer.valueOf(value);
            } else if (Type.BIGINT == propertyType) {
                ret = Long.valueOf(value);
            } else if (Type.FLOAT == propertyType) {
                ret = Float.valueOf(value);
            } else if (Type.DOUBLE == propertyType) {
                ret = Double.valueOf(value);
            } else if (Type.BYTE == propertyType) {
                ret = value.getBytes();
            } else if (Type.BOOLEAN == propertyType) {
                ret = false;
            } else if (Type.TIMESTAMP == propertyType) {
                ret = new Date();
            } else if (Type.INTERVAL == propertyType) {
                ret = Long.valueOf(value);
            } else {
                ret = value;
            }
            return ret;
        }

        @Override
        public void setBigDecimal(int paramIndex, BigDecimal x) throws Exception {
            this.keys[paramIndex] = x;
        }

        @Override
        public void setBoolean(int paramIndex, boolean x) throws Exception {
            this.keys[paramIndex] = x;
        }

        @Override
        public void setBytes(int paramIndex, byte[] x) throws Exception {
            this.keys[paramIndex] = x;
        }

        @Override
        public void setDouble(int paramIndex, double x) throws Exception {
            this.keys[paramIndex] = x;
        }

        @Override
        public void setFloat(int paramIndex, float x) throws Exception {
            this.keys[paramIndex] = x;
        }

        @Override
        public void setInt(int paramIndex, int x) throws Exception {
            this.keys[paramIndex] = x;
        }

        @Override
        public void setLong(int paramIndex, long x) throws Exception {
            this.keys[paramIndex] = x;
        }

        @Override
        public void setNull(int paramIndex, int x) throws Exception {
            // Store null rather than the second argument so that executeQuery sees a missing key.
            this.keys[paramIndex] = null;
        }

        @Override
        public void setString(int paramIndex, String x) throws Exception {
            this.keys[paramIndex] = x;
        }

        @Override
        public void setTimestamp(int paramIndex, Timestamp x) throws Exception {
            this.keys[paramIndex] = x;
        }
    }

    // @Override
    public Class getBeanClass() {
        return this.getClass();
    }

    // @Override
    public Map getInstancePropertiesAsMap() {
        return null;
    }

    // @Override
    public String getProvider() {
        return null;
    }
}
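The pattern and singularity settings in MyExternalPreparedStatement decide how many events a lookup returns. The sketch below isolates that decision for a single key, using only java.util.regex. The class and method names are hypothetical. The defaults (".*" and "$.^") are copied from the listing above.

```java
import java.util.regex.Pattern;

final class PatternDefaultsSketch {

    /**
     * Returns how many events the sample statement would emit for one key:
     * 0 when the key is missing or fails the pattern, 2 when it also matches
     * the singularity pattern, and 1 otherwise.
     */
    static int resultCount(String patternRegex, String singularityRegex, String key) {
        // Same defaults as the constructor in the listing.
        Pattern pattern = Pattern.compile(patternRegex == null ? ".*" : patternRegex);
        Pattern singularity = Pattern.compile(singularityRegex == null ? "$.^" : singularityRegex);

        if (key == null || !pattern.matcher(key).matches()) {
            return 0;
        }
        return singularity.matcher(key).matches() ? 2 : 1;
    }

    public static void main(String[] args) {
        System.out.println(resultCount(null, null, "42"));    // defaults
        System.out.println(resultCount("\\d+", null, "abc")); // key fails the pattern
        System.out.println(resultCount("\\d+", "7", "7"));    // key matches singularity
    }
}
```

The default singularity expression requires a character between the end and the start of the input, so it cannot match any key. With the defaults, every non-null key therefore yields exactly one event.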
MyActivator.java
package tests.functional.cartridge.userdefine.common.libs.datasource;

import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;

public class MyActivator implements BundleActivator {

    private ServiceRegistration reg;

    @Override
    public void start(BundleContext context) throws Exception {
    }

    @Override
    public void stop(BundleContext arg0) throws Exception {
    }
}
MyHandler.java
package tests.functional.cartridge.userdefine.common.libs.datasource;

import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.xml.NamespaceHandler;
import org.springframework.beans.factory.xml.NamespaceHandlerSupport;
import org.springframework.beans.factory.xml.ParserContext;
import org.w3c.dom.Element;
import org.w3c.dom.Node;

import tests.functional.cartridge.externaldatasource.common.apps.cart2.spring.FileDefinitionParser;

public class MyHandler implements NamespaceHandler {

    private NamespaceHandlerSupport support = new NamespaceHandlerSupport() {
        public void init() {
            registerBeanDefinitionParser("udds", new UddsDefinitionParser());
        }
    };

    @Override
    public BeanDefinitionHolder decorate(Node node, BeanDefinitionHolder definition,
            ParserContext parserContext) {
        return this.support.decorate(node, definition, parserContext);
    }

    @Override
    public void init() {
        this.support.init();
    }

    @Override
    public BeanDefinition parse(Element element, ParserContext parserContext) {
        return this.support.parse(element, parserContext);
    }
}
UddsDefinitionParser.java
package tests.functional.cartridge.userdefine.common.libs.datasource;

import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser;
import org.springframework.core.Conventions;
import org.w3c.dom.Attr;
import org.w3c.dom.Element;
import org.w3c.dom.NamedNodeMap;

public class UddsDefinitionParser extends AbstractSingleBeanDefinitionParser {

    protected Class<?> getBeanClass(Element element) {
        return UddsFactoryBean.class;
    }

    protected void doParse(Element element, BeanDefinitionBuilder builder) {
        NamedNodeMap attributes = element.getAttributes();
        for (int x = 0; x < attributes.getLength(); x++) {
            Attr attribute = (Attr) attributes.item(x);
            String name = attribute.getLocalName();
            if ("id".equals(name)) {
                continue;
            }
            builder.addPropertyValue(
                    Conventions.attributeNameToPropertyName(name), attribute.getValue());
        }
    }
}
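UddsDefinitionParser copies every XML attribute except id onto a bean property, using Spring's Conventions.attributeNameToPropertyName to turn hyphenated attribute names into camel-case property names. The sketch below is a simplified stand-in for that conversion, written without Spring so it runs on its own. The attribute names event-type and key-property are assumptions chosen to line up with the setEventType and setKeyProperty setters on UddsFactoryBean.

```java
final class AttributeNameSketch {

    /** Simplified stand-in: drops each hyphen and capitalizes the character after it. */
    static String toPropertyName(String attributeName) {
        StringBuilder out = new StringBuilder();
        boolean upperNext = false;
        for (char c : attributeName.toCharArray()) {
            if (c == '-') {
                upperNext = true;
            } else if (upperNext) {
                out.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Hypothetical attribute names for a udds element.
        for (String attribute : new String[] { "event-type", "key-property", "pattern" }) {
            System.out.println(attribute + " -> " + toPropertyName(attribute));
        }
    }
}
```

Under this convention, an attribute is only useful if the factory bean has a setter for the resulting property name, so attribute names and setters need to be kept in step.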
UddsFactoryBean.java
package tests.functional.cartridge.userdefine.common.libs.datasource;

import org.osgi.framework.BundleContext;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.AbstractFactoryBean;
import org.springframework.osgi.context.BundleContextAware;
import org.springframework.osgi.extensions.annotation.ServiceReference;

import com.bea.wlevs.ede.api.EventTypeRepository;

public class UddsFactoryBean extends AbstractFactoryBean<MyCartridgeSource>
        implements InitializingBean, BeanNameAware, BundleContextAware {

    private EventTypeRepository etr;
    private BundleContext bundleContext;
    private String beanName;
    private String eventType;
    private String pattern;
    private String singularity;

    @ServiceReference
    public void setEventTypeRepository(EventTypeRepository etr) {
        this.etr = etr;
    }

    @Override
    public void setBundleContext(BundleContext context) {
        this.bundleContext = context;
    }

    @Override
    public void setBeanName(String name) {
        this.beanName = name;
    }

    public String getEventType() {
        return this.eventType;
    }

    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

    private String keyProperty;

    public String getKeyProperty() {
        return keyProperty;
    }

    public void setKeyProperty(String names) {
        keyProperty = names;
    }

    @Override
    protected MyCartridgeSource createInstance() throws Exception {
        MyCartridgeSource ret = new MyCartridgeSource();
        System.out.println("id=" + this.beanName + ",eventType=" + this.eventType);
        ret.setId(this.beanName);
        ret.setEventType(this.eventType);
        ret.setPattern(this.pattern);
        ret.setSingularity(singularity);
        ret.setKeyProperty(keyProperty);
        return ret;
    }

    @Override
    public Class<?> getObjectType() {
        return MyCartridgeSource.class;
    }

    public void setPattern(String pattern) {
        this.pattern = pattern;
    }

    public String getPattern() {
        return pattern;
    }

    public void setSingularity(String singularity) {
        this.singularity = singularity;
    }

    public String getSingularity() {
        return singularity;
    }
}