This chapter describes how you can write user-defined functions for use in Oracle Continuous Query Language (Oracle CQL) to perform more advanced or application-specific operations on stream data than is possible using built-in functions.
For more information, see Functions.
You can write user-defined functions in Java to provide functionality that is not available in Oracle CQL or Oracle CQL built-in functions. You can create a user-defined function that returns an aggregate value or a single (non-aggregate) value.
For example, you can use user-defined functions in the following:
The select list of a SELECT statement
The condition of a WHERE clause
Note:
You can also create user-defined windows (see User-Defined Stream-to-Relation Window Operators).
To make your user-defined function available for use in Oracle CQL queries, the JAR file that contains the user-defined function implementation class must be on the Oracle Stream Explorer server class path. If it is not, modify the server class path to include the JAR file.
You can create the following types of user-defined functions: single-row functions and aggregate functions.
You can create overloaded functions and you can override built-in functions.
A user-defined single-row function is a function that returns a single result row for every row of a queried stream or view (as the concat built-in function does).
For more information, see How to Implement a User-Defined Single-Row Function.
A user-defined aggregate is a function that implements com.bea.wlevs.processor.AggregationFunctionFactory and returns a single aggregate result based on a group of tuples, rather than on a single tuple (as the sum built-in function does).
Consider implementing your aggregate function so that it performs incremental processing, if possible. This will improve scalability and performance because the cost of (re)computation on arrival of new events will be proportional to the number of new events as opposed to the total number of events seen thus far.
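The cost difference can be illustrated with a minimal sketch. RunningSum below is a hypothetical class written for this illustration and is not part of any Oracle API. It keeps a small amount of state so that each arriving or departing event costs constant work, while the static recompute method shows the non-incremental baseline that walks every event seen so far.

```java
import java.util.List;

// Hypothetical illustration class; not part of the Oracle Stream Explorer API.
class RunningSum {
    private long sum;   // state carried between events
    private int count;  // number of events currently contributing to the sum

    // Incremental update: constant work per arriving event.
    long add(int value) {
        sum += value;
        count++;
        return sum;
    }

    // Incremental update for an event that leaves the set.
    long remove(int value) {
        sum -= value;
        count--;
        return sum;
    }

    int size() {
        return count;
    }

    // Non-incremental baseline: work grows with the total number of events.
    static long recompute(List<Integer> events) {
        long total = 0;
        for (int v : events) {
            total += v;
        }
        return total;
    }
}
```

Both approaches produce the same result; only the amount of work done on each event differs.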
For more information, see How to Implement a User-Defined Aggregate Function.
User-defined functions support any of the built-in Oracle CQL data types listed in Oracle CQL Built-in Data Types. See the table in that section for a list of Oracle CQL data types and their Java equivalents.
The Oracle CQL data types in that table are the types you can specify in the Oracle CQL statement you use to register your user-defined function. The Java equivalents are the Java data types you can use in your user-defined function implementation.
At run time, Oracle Stream Explorer maps between the Oracle CQL data type and the Java data type. If your user-defined function returns a data type that is not in this list, Oracle Stream Explorer throws a ClassCastException.
For more information about data conversion, see Data Type Conversion.
For more information, see Introduction to Oracle CQL User-Defined Functions.
You implement a user-defined single-row function by implementing a Java class that provides a public constructor and a public method that is invoked to execute the function.
To implement a user-defined single-row function:
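As a minimal sketch of such a class, assuming hypothetical names throughout: the class name Modulo, the method name execute, and the choice of int parameters with an Integer result are illustrative and are not taken from this chapter. How the class is registered and how the method is identified to the server are not shown here.

```java
// Hypothetical single-row function; all names are illustrative only.
// Declared package-private so the snippet compiles standalone;
// a deployed class would typically be public and live in its own package.
class Modulo {

    // Public constructor, as the description above requires.
    public Modulo() {
    }

    // Public method invoked to execute the function:
    // one call per input row, one result value per call.
    public Object execute(int dividend, int divisor) {
        return Integer.valueOf(dividend % divisor);
    }
}
```

The return value should correspond to one of the supported data types so that the server can map it back to an Oracle CQL type.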
You implement a user-defined aggregate function by implementing a Java class that implements the com.bea.wlevs.processor.AggregationFunctionFactory interface.
To implement a user-defined aggregate function:
Implement a Java class as shown in the following example.
Where possible, implement your aggregate function so that it performs incremental processing. This improves scalability and performance because the cost of recomputation when new events arrive is proportional to the number of new events rather than to the total number of events seen so far. The user-defined aggregate function in the example supports incremental processing.
Ensure that the data type of the return value corresponds to a supported data type as User-Defined Function Data Types describes.
For more information on accessing the Oracle Stream Explorer cache from a user-defined function, see User-Defined Functions and the Oracle Stream Explorer Server Cache.
package com.bea.wlevs.test.functions;

import com.bea.wlevs.processor.AggregationFunction;
import com.bea.wlevs.processor.AggregationFunctionFactory;

public class Variance implements AggregationFunctionFactory, AggregationFunction {

    // Running state, updated incrementally as events enter and leave the window.
    private int count;
    private float sum;
    private float sumSquare;

    public Class<?>[] getArgumentTypes() {
        return new Class<?>[] {Integer.class};
    }

    public Class<?> getReturnType() {
        return Float.class;
    }

    public AggregationFunction newAggregationFunction() {
        return new Variance();
    }

    public void releaseAggregationFunction(AggregationFunction function) {
    }

    // Called when an event leaves the window: remove its contribution.
    public Object handleMinus(Object[] params) {
        if (params != null && params.length == 1) {
            Integer param = (Integer) params[0];
            count--;
            sum -= param;
            sumSquare -= (param * param);
        }

        if (count == 0) {
            return null;
        } else {
            return getVariance();
        }
    }

    // Called when an event enters the window: add its contribution.
    public Object handlePlus(Object[] params) {
        if (params != null && params.length == 1) {
            Integer param = (Integer) params[0];
            count++;
            sum += param;
            sumSquare += (param * param);
        }

        if (count == 0) {
            return null;
        } else {
            return getVariance();
        }
    }

    // Variance computed from the running totals: mean of squares minus square of mean.
    public Float getVariance() {
        float avg = sum / (float) count;
        float avgSqr = avg * avg;
        float var = sumSquare / (float) count - avgSqr;
        return var;
    }

    public void initialize() {
        count = 0;
        sum = 0.0F;
        sumSquare = 0.0F;
    }
}
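The getVariance method in this example relies on the standard identity for population variance, which is what allows the aggregate to be maintained from three running totals alone. Writing n for count, and taking sum and sumSquare as the running totals of the values and of their squares (the symbols below are introduced here for illustration):

```latex
\[
\mathrm{Var}(x)
  = \frac{1}{n}\sum_{i=1}^{n} x_i^{2} - \left(\frac{1}{n}\sum_{i=1}^{n} x_i\right)^{2}
  = \frac{\mathit{sumSquare}}{n} - \left(\frac{\mathit{sum}}{n}\right)^{2}
\]
% Worked example for the values 1, 2, 3:
% n = 3, sum = 6, sumSquare = 1 + 4 + 9 = 14
\[
\frac{14}{3} - \left(\frac{6}{3}\right)^{2} = \frac{14}{3} - 4 = \frac{2}{3}
\]
```

Because adding or removing one value changes n, sum, and sumSquare by a fixed amount, handlePlus and handleMinus can each update the result in constant time.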
Compile the user-defined function Java implementation class and register the class in your Oracle Stream Explorer application assembly file.
<wlevs:processor id="testProcessor">
    <wlevs:listener ref="providerCache"/>
    <wlevs:listener ref="outputCache"/>
    <wlevs:cache-source ref="testCache"/>
    <wlevs:function function-name="var" is-incremental="true">
        <bean class="com.bea.wlevs.test.functions.Variance"/>
    </wlevs:function>
</wlevs:processor>
You must set the is-incremental attribute of the function element to true to indicate that the user-defined function, var, has an incremental implementation. Setting the is-incremental attribute to true guarantees that Oracle Stream Explorer calls the handleMinus method when it purges events from the current processing window. If this attribute is set to false (the default), the handleMinus method is never called; instead, Oracle Stream Explorer provides the full set of events of the current window with every call to the handlePlus method.
For more information, see Developing Applications for Event Processing with Oracle Stream Explorer.
Invoke your user-defined function in the select list of a SELECT statement or the condition of a WHERE clause.
...
<query id="uda6"><![CDATA[
    select var(c2) from S4[range 3]
]]></query>
...
At run time, when the user-defined aggregate is executed and a new event becomes active in the window of interest, the aggregate must be recomputed because the set over which it is defined has a new member. To do so, Oracle Stream Explorer passes only the new event (rather than the entire active set) to the appropriate handler context by invoking the handlePlus method. The state in the handler context is then updated to include the new event, so the aggregate is recomputed incrementally.
Similarly, when an event expires from the window of interest, the aggregate must be recomputed because the set over which it is defined has lost a member. To do so, Oracle Stream Explorer passes only the expired event (rather than the entire active set) to the appropriate handler context by invoking the handleMinus method. As before, the state in the handler context is updated incrementally to account for the expired event.
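The call sequence described above can be sketched with a small standalone simulation. Both classes below are hypothetical: VarianceState is a stand-in that mirrors the running totals of the Variance example without depending on the com.bea.wlevs interfaces, and WindowDriver plays the role of the server. The sketch assumes a window that holds the last N events by count and assumes that the expired event is removed before the new one is added; the actual window semantics and call ordering of Oracle Stream Explorer may differ.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for the aggregate's handler state.
class VarianceState {
    private int count;
    private double sum;
    private double sumSquare;

    // A new event became active: add its contribution.
    Double handlePlus(int value) {
        count++;
        sum += value;
        sumSquare += (double) value * value;
        return current();
    }

    // An event expired: remove its contribution.
    Double handleMinus(int value) {
        count--;
        sum -= value;
        sumSquare -= (double) value * value;
        return current();
    }

    // Population variance from the running totals, or null for an empty set.
    private Double current() {
        if (count == 0) {
            return null;
        }
        double mean = sum / count;
        return sumSquare / count - mean * mean;
    }
}

// Hypothetical driver that imitates the engine for a count-based window.
class WindowDriver {
    private final int capacity;
    private final Deque<Integer> window = new ArrayDeque<>();
    private final VarianceState state = new VarianceState();

    WindowDriver(int capacity) {
        this.capacity = capacity;
    }

    // Passes only the expired event and the new event to the state,
    // never the whole active set.
    Double onEvent(int value) {
        if (window.size() == capacity) {
            state.handleMinus(window.removeFirst());
        }
        window.addLast(value);
        return state.handlePlus(value);
    }
}
```

Each call to onEvent touches at most two events regardless of the window size, which is the property that the is-incremental setting is meant to exploit.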