Oracle® Fusion Middleware Oracle CQLデータ・カートリッジによるアプリケーションの開発 12c リリース(12.2.1.2.0) E82779-01 |
|
![]() 前へ |
![]() 次へ |
データ・カートリッジ・フレームワークは、ユーザーおよびベンダーがOracle CQL機能を拡張するためにカートリッジを作成できるようにするService Provider Interface (SPI)です。「Oracleビッグ・データ・カートリッジ」に説明されているHadoopカートリッジおよびNoSQLカートリッジは、データ・カートリッジ・フレームワークで作成されたカートリッジの例です。
データ・カートリッジ・フレームワークを使用すると、たとえば、Oracle CQL機能を拡張してテレマティック・アプリケーションの開発をサポートできます。テレマティック・アプリケーションには、電気通信(電気信号および電波)、自動車技術、輸送、電気工学(センサー、計装、ワイヤレスコミュニケーション)、コンピュータ・サイエンス(モノのインターネット)などが含まれます。
この章の内容は次のとおりです。
Oracle Stream Analyticsカートリッジは、外部関数、型、索引、Javaクラスおよびデータ・ソースを定義する、単一の管理可能ユニットです。カートリッジのユーザーは、次の形式のリンクのあるOracle CQLコードから、使用可能な関数、型、索引、Javaクラスおよびデータ・ソースを参照します。
myFunction@myCartridge(arg1)
Oracle Stream Analyticsデータ・カートリッジ・フレームワークで作成されたカートリッジは、Oracle Stream Analyticsライブラリです。つまり、ライブラリのデプロイと同じ方法で、コマンドラインから、またはOracle JDeveloperでカートリッジをデプロイできます。カートリッジをデプロイすると、すべての外部関数、型、索引、Javaクラスおよびデータ・ソースをOracle CQL問合せで使用できるようになります。アプリケーションをデプロイする前にカートリッジをデプロイする必要があります。アプリケーションを更新せずにカートリッジを更新できます。
com.oracle.cep.cartridge
パッケージには、カートリッジ・フレームワークJavaインタフェースが含まれます。この項では、インタフェース使用時に可能な操作、および必要な操作を説明しています。インタフェースおよび例外も簡単に説明しています。
次のことが可能です。
Oracle CQLで、表およびストリーム属性のタイプに対して任意のタイプ・システムを使用できます。
関数呼出しのための独自の索引データ構造を使用できます。
Oracle CQL内で可視の新しいJavaクラスを使用できます。カートリッジをデプロイする際に、新しいJavaクラスを持つアプリケーションまたはライブラリを含めます。新しいJavaクラスにアクセスするアプリケーションは、MANIFEST.MF
ファイルに正しいJavaパッケージをImport-Package
ヘッダー・エントリでインポートする必要があります。
次の操作が必要です。
カートリッジがサポートするすべての関数のリストが含まれる、すべてのデプロイ済カートリッジのMBeanを用意します。カートリッジがアンデプロイされると、MBeanインスタンスは登録解除されます。
ExternalFunctionProvider.listFunctions
メソッドを実装します。
カートリッジ外部データ・ソースに結び付けた表ソースのビーン・ステージMBeanを用意します。このMBeanは、id
やprovider
名などの表ソース・カスタム・プロパティのリストを提供します。
オプションで、表ソースSpring Beanファクトリはcom.bea.wlevs.management.configuration.spring.StageFactoryAccess
インタフェースを実装可能で、表ソースプロパティへのアクセス方法をカスタマイズできます。
com.oracle.cep.cartridge
パッケージは次のJavaインタフェースを提供します。
CapabilityProvider
: ExternalConnection
はこのインタフェースを実装可能で、<
(未満)、AND
、OR
など、サポートされる機能を指定できます。
ExternalConnection
: ExternalDataSource
に接続します。
ExternalConstants
: データ・カートリッジが使用する一般定数を定義します。このインタフェースは次の2つの定数を提供します。外部接続機能を提供するEQUALS
とExternalFunctionProvider
リンクid
を提供するSERVER_CONTEXT_LINK_ID
です。
ExternalDataSource
: getConnnection
メソッドを使用してコンテキスト・データの外部ソースに接続し、Oracle CQLプロセッサ・イベントに参加します。外部データ・ソースは、そのプロパティの構成をサポートしている必要があります。たとえば、NoSQLDBデータ・ソースは、ホスト、ポートおよびストア名の構成をサポートしています。
外部データ・ソースはサポートする関数を指定します。デフォルトでは、すべての外部データ・ソースが等値比較関数をサポートしています。
SELECT * FROM S[NOW], MyExternalDataSource WHERE S.id = MyExternalDataSource.id
データ・ソースをOracle CQLプロセッサで使用可能にするには、com.oracle.cep.cartridge.ExternalDataSource
インタフェースを実装したSpring Beanを登録し、そのSpring Beanを表ソース(wlevs:table-source
タグ)のターゲットにします。
ExternalFunction
: ExternalFunctionProvider
または別の外部エンティティにより提供される関数。
ExternalFunctionDefinition
: ExternalFunctionProvider
または他の外部エンティティにより提供されるOracle CQL問合せおよびビューで使用する関数のメタデータを指定します。
ExternalFunctionProvider
: Oracle CQL問合せおよびビューから直接アクセスできる関数のセットを定義します。getID
メソッドを使用して、外部関数プロバイダをOSGiサービスとして登録します。また、プロバイダは、ExternalContants.SERVER_CONTEXT_LINK_ID
サービス・プロパティを指定し、Oracle CQL問合せおよびビューで使用するリンクIDを示してプロバイダを識別する必要があります。
ExternalPredicate
: プリコンパイルされた文の述部を属性と述語句で表します。
ExternalPreparedStatement
: 外部関数プロバイダのプリコンパイルされた文を表し、同一または類似した関数を効率的に繰り返し実行します。
この項では、カートリッジの2つの例を説明します。算術カートリッジとデータ・ソース・カートリッジです。算術カートリッジは、Oracle Spatialデータ・カートリッジに記載されている、関数のみを含むSpatialカートリッジに類似したOracle CQL問合せで、算術関数を使用可能にします。データ・カートリッジは、「Oracleビッグ・データ・カートリッジ」に説明されているHadoopに類似したデータ・ソースを定義します。
Oracle Stream Analyticsアプリケーション内でOracle CQL問合せを使用可能にするには、各カートリッジを個別のアプリケーション・ライブラリとしてデプロイします。カートリッジをデプロイした後、Oracle Stream Analyticsアプリケーションまたはカートリッジを使用するアプリケーションをデプロイします。
算術カートリッジは次の関数クラスを持ちます。
追加、配列および例外操作の機能を提供するJavaクラスのセット。
Oracle CQL問合せでの配列の使用および例外のスローを可能にする配列機能および例外機能を提供するExceptionFunction.java
クラスおよびArrayFunciton.java
クラス。
ArithmeticActivator.java
クラスは、カートリッジ・バンドルを開始または停止します。
すべての関数クラスは、com.oracle.cep.cartridge.ExternalFunctionProvider
インタフェースを実装し、Oracle CQL問合せで使用する関数名を返すgetName
メソッドを持ちます。
たとえば、AddFunction.java
およびAddLongFunction.java
のgetName
メソッドは関数名plus
を返します。Oracle CQL問合せでこの関数名を使用して関数を呼び出せます。次の問合せはarithmetic
カートリッジでplus
関数を使用しinputChannel
から2つの整数を追加します。
SELECT plus@arithmetic(typeInt, typeInt2) AS typeInt FROM inputChannel
データ・ソース・カートリッジ・ファイル
カートリッジの例ではデータ・ソースを定義するJavaクラスのセットを使用します。
MyCartridgeSource.java
クラスはcom.oracle.cep.cartridge.ExternalDataSource interface
を実装します。このクラスはデータ・ソース接続機能を定義し、データベースでイベント・データの読込みおよび書込みを行います。
MyActivator.java
クラスはorg.osgi.framework.BundleActivator
を実装し、カートリッジ・バンドルを開始および停止するコードを提供します。
MyHandler.java
クラスはorg.springframework.beans.factory.xml.NamespaceHandler
を実装し、カートリッジ・ネームスペースを管理してUddsファクトリ・ビーンを登録するためのコードを提供します。
UddsDefinitionParser.java
クラスはorg.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser
を拡張し、UddsFactoryBean
オブジェクトを解析および登録するためのコードを提供します。
UddsFactoryBean.java
クラスはorg.springframework.beans.factory.config.AbstractFactoryBean
を拡張し、イベントおよびイベント・タイプ・リポジトリを管理するためのコードを提供します。
この項では、データ・ソース・アプリケーションおよびデータ・ソース・カートリッジと、算術カートリッジのソース・コードを説明します。
AddFunction.java
package tests.functional.cartridge.userdefine.common.libs.arithmetic; import java.util.Map; import com.oracle.cep.cartridge.ExternalFunction; public class AddFunction implements ExternalFunction{ @Override public String getName() { return "plus"; } @Override public Class<?>[] getParameterTypes() { Class<?>[] parameters = new Class<?>[2]; parameters[0] = java.lang.Integer.class; parameters[1] = java.lang.Integer.class; return parameters; } @Override public Class<?> getReturnType() { return java.lang.Integer.class; } @Override public Object execute(Object[] args, String caller, Map<String, Object> context) throws Exception { if(args.length != 2) throw new IllegalArgumentException("add function need an 2 parameters"); if(!(args[0] instanceof java.lang.Integer && args[1] instanceof java.lang.Integer)) { throw new IllegalArgumentException("add function only support java.lang.Integer"); } java.lang.Integer arg1 = (Integer) args[0]; java.lang.Integer arg2 = (Integer) args[1]; return new java.lang.Integer(arg1 + arg2); } }
ArithmeticActivator.java
package tests.functional.cartridge.userdefine.common.libs.arithmetic; import java.util.Hashtable; import org.osgi.framework.BundleActivator; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceRegistration; import com.oracle.cep.cartridge.ExternalFunctionProvider; public class ArithmeticActivator implements BundleActivator { private ServiceRegistration reg; @Override public void start(BundleContext context) throws Exception { Hashtable props = new Hashtable(); props.put("server.context.link.id", "arithmetic"); this.reg = context.registerService(ExternalFunctionProvider.class.getName(), new UserDefineFunction(), props); } @Override public void stop(BundleContext arg0) throws Exception { this.reg.unregister(); } }
ArrayFunction.java
package tests.functional.cartridge.userdefine.common.libs.arithmetic; import java.util.ArrayList; import java.util.List; import java.util.Map; import com.oracle.cep.cartridge.ExternalFunction; public class ArrayFunction implements ExternalFunction { @Override public String getName() { return "array"; } @Override public Class<?>[] getParameterTypes() { Class<?>[] parameters = new Class<?>[2]; parameters[0] = Integer.class; parameters[1] = Integer.class; return parameters; } @Override public Class<?> getReturnType() { return List.class; } @Override public Object execute(Object[] args, String caller, Map<String, Object> context) throws Exception { if(args.length == 0) { return null; } if(!(args[0] instanceof java.lang.Integer)) { throw new IllegalArgumentException("median function only supports java.lang.Integer"); } List ret = new ArrayList(); for(Object obj:args) { ret.add(obj); } return ret; } }
ExceptionFunction.java
package tests.functional.cartridge.userdefine.common.libs.arithmetic; import java.util.Map; import com.oracle.cep.cartridge.ExternalFunction; public class ExceptionFunction implements ExternalFunction{ @Override public String getName() { return "exception" } @Override public Class<?>[] getParameterTypes() { return new Class<?>[]{Integer.class}; } @Override public Class<?> getReturnType() { return Integer.class; } @Override public Object execute(Object[] args, String caller, Map<String, Object> context) throws Exception { throw new NullPointerException("I am an excpetion"); } }
UserDefineFunctionClass.java
package tests.functional.cartridge.userdefine.common.libs.arithmetic; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import com.oracle.cep.cartridge.AmbiguousFunctionException; import com.oracle.cep.cartridge.ExternalFunction; import com.oracle.cep.cartridge.ExternalFunctionProvider; import com.oracle.cep.cartridge.FunctionNotFoundException; public class UserDefineFunction implements ExternalFunctionProvider { private ArrayList<ExternalFunction> functions = new ArrayList<ExternalFunction>(); public UserDefineFunction() { functions.add(new AddFunction()); functions.add(new ArrayFunction()); functions.add(new ExceptionFunction()); } @Override public ExternalFunction getFunction(String functionName, Class<?> [] parameterTypes, String caller, Map<String, Object> context) throws AmbiguousFunctionException, FunctionNotFoundException { if("plus".equalsIgnoreCase(functionName)) { return new AddFunction(); } else if("array".equalsIgnoreCase(functionName)) { return new ArrayFunction(); } else if("exception".equalsIgnoreCase(functionName)) { return new ExceptionFunction(); } throw new FunctionNotFoundException(functionName+" is not supported in arithmetic"); } @Override public String getId() { return "arithmetic"; } @Override public List<ExternalFunction> listFunctions(String caller, Map<String, Object> context) { ArrayList<ExternalFunction> functionList = new ArrayList<ExternalFunction>(); functionList.addAll(functions); return functionList; } }
データ・ソース・カートリッジは次のJavaクラス・ファイルで構成されています。
MyCartridgeSource.java
package tests.functional.cartridge.userdefine.common.libs.datasource; import java.math.BigDecimal; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; import oracle.cep.dataStructures.external.TupleValue; import org.springframework.osgi.extensions.annotation.ServiceReference; import com.bea.wlevs.ede.api.EventProperty; import com.bea.wlevs.ede.api.EventType; import com.bea.wlevs.ede.api.EventTypeRepository; import com.bea.wlevs.ede.api.Type; import com.bea.wlevs.management.configuration.spring.StageFactoryAccess; import com.oracle.cep.cartridge.ExternalConnection; import com.oracle.cep.cartridge.ExternalDataSource; import com.oracle.cep.cartridge.ExternalPredicate; import com.oracle.cep.cartridge.ExternalPreparedStatement; public class MyCartridgeSource implements StageFactoryAccess, ExternalDataSource { // @Override public Map<?, ?> getCacheDataSource() { return null; } private EventTypeRepository etr; @ServiceReference public void setEventTypeRepository(EventTypeRepository etr) { this.etr = etr; } private String eventType; @Override public String getEventType() { System.out.println("event type:" + this.eventType); return eventType; } public void setEventType(String eventType) { this.eventType = eventType; } private long maxThreshhold = 0; @Override public long getExternalRowsThreshold() { return maxThreshhold; } public void setExternalRowsThreshold(long maxThreshhold) { this.maxThreshhold = maxThreshhold; } private String pattern; public String getPattern() { return pattern; } public void setPattern(String pattern) { this.pattern = pattern; } private String singularity; public String getSingularity() { return singularity; } public void setSingularity(String singularity) { this.singularity = singularity; } private String id; @Override public String getId() { return id; } public void setId(String id) { this.id = id; } @Override public String getJDBCDataSource() { return null; } private Class keyClass; @Override public Class getKeyClass() { return Long.class; } public void setKeyClass(String className) throws ClassNotFoundException { this.keyClass = Class.forName(className); } private String[] keyPropertyNames; @Override public String[] getKeyPropertyNames() { return keyPropertyNames; } public void setKeyProperty(String names) { keyPropertyNames = names.split(","); } @Override public String getTableName() { return null; } public Class getObjectType() { return ExternalDataSource.class; } @Override public ExternalConnection getConnection() throws Exception { MyExternalConnection connection = new MyExternalConnection(this.etr.getEventType(this.eventType)); connection.setPattern(pattern); connection.setSingularity(singularity); return connection; } public static class MyExternalConnection implements ExternalConnection { private final EventType targetEventType; public MyExternalConnection(EventType eventtype) { this.targetEventType = eventtype; } private String pattern; public void setPattern(String pattern) { this.pattern = pattern; } private String singularity; public void setSingularity(String singularity) { this.singularity = singularity; } @Override public void close() throws Exception { } @Override public ExternalPreparedStatement prepareStatement(String relationName, List<String> relationAttrs, ExternalPredicate predicate) throws Exception { return new MyExternalPreparedStatement(this.targetEventType, predicate,this.pattern,this.singularity); } @Override public boolean supportsPredicate(ExternalPredicate predicate) throws Exception { return true; } } public static class MyExternalPreparedStatement implements ExternalPreparedStatement { private ExternalPredicate predicate; private Object[] keys = new Object[10]; private final EventType targetEventType; private Pattern pattern; private Pattern singularity; public MyExternalPreparedStatement(EventType targetEventType, ExternalPredicate predicate,String pattern,String singularity) { this.targetEventType = targetEventType; this.predicate = predicate; if (pattern == null) { this.pattern = Pattern.compile(".*"); } else { this.pattern = Pattern.compile(pattern); } if (singularity == null) { this.singularity = Pattern.compile("$.^"); } else { this.singularity = Pattern.compile(singularity); } } @Override public void close() throws Exception {} @Override public Iterator<Object> executeQuery() throws Exception { List<Object> result = new ArrayList<Object>(); List attrs = predicate.getAttributes(); String value=""; for(int i = 0;i<attrs.size();i++) { if(keys[i+1] ==null) { System.out.println("empty="+keys[i+1]); return result.iterator(); } value = keys[i+1].toString(); Matcher m = this.pattern.matcher(value); if(!m.matches()) { System.out.println("empty="+value); return result.iterator(); } } TupleValue event = (TupleValue) this.targetEventType.createEvent(); EventProperty[] properties = this.targetEventType.getProperties(); for (int i = 0; i < properties.length; i++) { properties[i].setValue(event, createValue(properties[i], value)); } System.out.println("one="+value); result.add(event); Matcher s = this.singularity.matcher(value); if(s.matches()) { System.out.println("double="+value); result.add(event); } return result.iterator(); } private Object createValue(EventProperty property, String value) { Type propertyType = property.getType(); Object ret; if (Type.INT == propertyType) { ret = Integer.valueOf(value); } else if (Type.BIGINT == propertyType) { ret = Long.valueOf(value); } else if (Type.FLOAT == propertyType) { ret = Float.valueOf(value); } else if (Type.DOUBLE == propertyType) { ret = Double.valueOf(value); } else if (Type.BYTE == propertyType) { ret = value.getBytes(); } else if (Type.BOOLEAN == propertyType) { ret = false; } else if (Type.TIMESTAMP == propertyType) { ret = new Date(); } else if (Type.INTERVAL == propertyType) { ret = Long.valueOf(value); } else { ret = value; } return ret; } @Override public void setBigDecimal(int paramIndex, BigDecimal x) throws Exception { this.keys[paramIndex] = x; } @Override public void setBoolean(int paramIndex, boolean x) throws Exception { this.keys[paramIndex] = x; } @Override public void setBytes(int paramIndex, byte[] x) throws Exception { this.keys[paramIndex] = x; } @Override public void setDouble(int paramIndex, double x) throws Exception { this.keys[paramIndex] = x; } @Override public void setFloat(int paramIndex, float x) throws Exception { this.keys[paramIndex] = x; } @Override public void setInt(int paramIndex, int x) throws Exception { this.keys[paramIndex] = x; } @Override public void setLong(int paramIndex, long x) throws Exception { this.keys[paramIndex] = x; } @Override public void setNull(int paramIndex, int x) throws Exception { this.keys[paramIndex] = x; } @Override public void setString(int paramIndex, String x) throws Exception { this.keys[paramIndex] = x; } @Override public void setTimestamp(int paramIndex, Timestamp x) throws Exception { this.keys[paramIndex] = x; } } // @Override public Class getBeanClass() { return this.getClass(); } // @Override public Map getInstancePropertiesAsMap() { return null; } // @Override public String getProvider() { return null; } }
MyActivator.java
package tests.functional.cartridge.userdefine.common.libs.datasource; import org.osgi.framework.BundleActivator; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceRegistration; public class MyActivator implements BundleActivator { private ServiceRegistration reg; @Override public void start(BundleContext context) throws Exception { } @Override public void stop(BundleContext arg0) throws Exception { } }
MyHandler.java
package tests.functional.cartridge.userdefine.common.libs.datasource; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.BeanDefinitionHolder; import org.springframework.beans.factory.xml.NamespaceHandler; import org.springframework.beans.factory.xml.NamespaceHandlerSupport; import org.springframework.beans.factory.xml.ParserContext; import org.w3c.dom.Element; import org.w3c.dom.Node; import tests.functional.cartridge.externaldatasource.common.apps.cart2.spring.FileDefinitionParser; public class MyHandler implements NamespaceHandler { private NamespaceHandlerSupport support = new NamespaceHandlerSupport() { public void init() { registerBeanDefinitionParser("udds", new UddsDefinitionParser()); } }; @Override public BeanDefinitionHolder decorate(Node node, BeanDefinitionHolder definition, ParserContext parserContext) { return this.support.decorate(node, definition, parserContext); } @Override public void init() { this.support.init(); } @Override public BeanDefinition parse(Element element, ParserContext parserContext) { return this.support.parse(element, parserContext); } }
UddsDefinitionParser.java
package tests.functional.cartridge.userdefine.common.libs.datasource; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser; import org.springframework.core.Conventions; import org.w3c.dom.Attr; import org.w3c.dom.Element; import org.w3c.dom.NamedNodeMap; public class UddsDefinitionParser extends AbstractSingleBeanDefinitionParser { protected Class<?> getBeanClass(Element element) { return UddsFactoryBean.class; } protected void doParse(Element element, BeanDefinitionBuilder builder) { NamedNodeMap attributes = element.getAttributes(); for (int x = 0; x < attributes.getLength(); x++) { Attr attribute = (Attr) attributes.item(x); String name = attribute.getLocalName(); if ("id".equals(name)) continue; builder.addPropertyValue( Conventions.attributeNameToPropertyName(name), attribute.getValue()); } } }
UddsFactoryBean.java
package tests.functional.cartridge.userdefine.common.libs.datasource; import org.osgi.framework.BundleContext; import org.springframework.beans.factory.BeanNameAware; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.config.AbstractFactoryBean; import org.springframework.osgi.context.BundleContextAware; import org.springframework.osgi.extensions.annotation.ServiceReference; import com.bea.wlevs.ede.api.EventTypeRepository; public class UddsFactoryBean extends AbstractFactoryBean<MyCartridgeSource> implements InitializingBean, BeanNameAware, BundleContextAware { private EventTypeRepository etr; private BundleContext bundleContext; private String beanName; private String eventType; private String pattern; private String singularity; @ServiceReference public void setEventTypeRepository(EventTypeRepository etr) { this.etr = etr; } @Override public void setBundleContext(BundleContext context) { this.bundleContext = context; } @Override public void setBeanName(String name) { this.beanName = name; } public String getEventType() { return this.eventType; } public void setEventType(String eventType) { this.eventType = eventType; } private String keyProperty; public String getKeyProperty() { return keyProperty; } public void setKeyProperty(String names) { keyProperty = names; } @Override protected MyCartridgeSource createInstance() throws Exception { MyCartridgeSource ret = new MyCartridgeSource(); System.out.println("id="+this.beanName+",eventType="+this.eventType); ret.setId(this.beanName); ret.setEventType(this.eventType); ret.setPattern(this.pattern); ret.setSingularity(singularity); ret.setKeyProperty(keyProperty); return ret; } @Override public Class<?> getObjectType() { return MyCartridgeSource.class; } public void setPattern(String pattern) { this.pattern = pattern; } public String getPattern() { return pattern; } public void setSingularity(String singularity) { this.singularity = singularity; } public String getSingularity() { return singularity; } }