13 User-Defined Functions

Use user-defined functions to perform more advanced or application-specific operations on stream data than is possible using built-in functions.

For more information, see Section 1.1.11, "Functions".

13.1 Introduction to Oracle CQL User-Defined Functions

You can write user-defined functions in Java to provide functionality that is not available in Oracle CQL or Oracle CQL built-in functions. You can create a user-defined function that returns an aggregate value or a single (non-aggregate) value.

For example, you can use user-defined functions in the following:

  • The select list of a SELECT statement

  • The condition of a WHERE clause

To make your user-defined function available for use in Oracle CQL queries, the JAR file that contains the user-defined function implementation class must be in the Oracle CEP server classpath or the Oracle CEP server classpath must be modified to include the JAR file.

For more information, see:

13.1.1 Types of User-Defined Functions

Using the classes in the oracle.cep.extensibility.functions package you can create the following types of user-defined functions:

You can create overloaded functions and you can override built-in functions.

13.1.1.1 User-Defined Single-Row Functions

A user-defined single-row function is a function that returns a single result row for every row of a queried stream or view (for example, like the concat built-in function does).

For more information, see "How to Implement a User-Defined Single-Row Function".

13.1.1.2 User-Defined Aggregate Functions

A user-defined aggregate is a function that implements com.bea.wlevs.processor.AggregationFunctionFactory and returns a single aggregate result based on group of tuples, rather than on a single tuple (for example, like the sum built-in function does).

Consider implementing your aggregate function so that it performs incremental processing, if possible. This will improve scalability and performance because the cost of (re)computation on arrival of new events will be proportional to the number of new events as opposed to the total number of events seen thus far.

For more information, see "How to Implement a User-Defined Aggregate Function".

13.1.2 User-Defined Function Datatypes

Table 13-1 lists the datatypes you can specify when you implement and register a user-defined function.

Table 13-1 User-Defined Function Datatypes

Oracle CQL Datatype Equivalent Java Datatype

bigint

java.lang.Long

char

java.lang.String

double

java.lang.Double

float

java.lang.Float

int

java.lang.Integer

Object

java.lang.Object


The Oracle CQL Datatype column lists the datatypes you can specify in the Oracle CQL statement you use to register your user-defined function and the Equivalent Java Datatype column lists the Java datatype equivalents you can use in your user-defined function implementation.

At run time, Oracle CEP maps between the Oracle CQL datatype and the Java datatype. If your user-defined function returns a datatype that is not in this list, Oracle CEP will throw a ClassCastException.

For more information about data conversion, see Section 2.3.4, "Datatype Conversion".

13.1.3 User-Defined Functions and the Oracle CEP Server Cache

You can access an Oracle CEP cache from an Oracle CQL statement or user-defined function.

For more information, see:

13.2 Implementing a User-Defined Function

This section describes:

For more information, see Section 13.1, "Introduction to Oracle CQL User-Defined Functions".

13.2.1 How to Implement a User-Defined Single-Row Function

You implement a user-defined single-row function by implementing a Java class that provides a public constructor and a public method that is invoked to execute the function.

To implement a user-defined single-row function:

  1. Implement a Java class as Example 13-1 shows.

    Ensure that the data type of the return value corresponds to a supported data type as Section 13.1.2, "User-Defined Function Datatypes" describes.

    For more information on accessing the Oracle CEP cache from a user-defined function, see Section 13.1.3, "User-Defined Functions and the Oracle CEP Server Cache".

    Example 13-1 MyMod.java User-Defined Single-Row Function

    package com.bea.wlevs.example.function;
    
    public class MyMod {
        public Object execute(int arg0, int arg1) {
            return new Integer(arg0 % arg1);
        }
    }
    
  2. Compile the user-defined function Java implementation class and register the class in your Oracle CEP application assembly file as Example 13-2 shows.

    Example 13-2 Single-Row User Defined Function for an Oracle CQL Processor

    <wlevs:processor id="testProcessor">
        <wlevs:listener ref="providerCache"/>
        <wlevs:listener ref="outputCache"/>
        <wlevs:cache-source ref="testCache"/>
        <wlevs:function function-name="mymod" exec-method=”execute” />
            <bean class="com.bea.wlevs.example.function.MyMod"/>
        </wlevs:function>
    </wlevs:processor>
    

    Specify the method that is invoked to execute the function using the wlevs:function element exec-method attribute. This method must be public and must be uniquely identifiable by its name (that is, the method cannot have been overridden).

    For more information, see "wlevs:function" in the Oracle Complex Event Processing Developer's Guide for Eclipse.

  3. Invoke your user-defined function in the select list of a SELECT statement or the condition of a WHERE clause as Example 13-3 shows.

    Example 13-3 Accessing a User-Defined Single-Row Function in Oracle CQL

    ...
    <view id="v1" schema="c1 c2 c3 c4"><![CDATA[ 
        select
            mymod(c1, 100), c2, c3, c4 
        from 
            S1
    ]]></view>
    ...
    <query id="q1"><![CDATA[ 
        select * from v1 [partition by c1 rows 1] where c4 - c3 = 2.3 
    ]]></query>
    ...
    

13.2.2 How to Implement a User-Defined Aggregate Function

You implement a user-defined aggregate function by implementing a Java class that implements the com.bea.wlevs.processor.AggregationFunctionFactory interface.

To implement a user-defined aggregate function:

  1. Implement a Java class as Example 13-4 shows.

    Consider implementing your aggregate function so that it performs incremental processing, if possible. This will improve scalability and performance because the cost of (re)computation on arrival of new events will be proportional to the number of new events as opposed to the total number of events seen thus far. The user-defined aggregate function in Example 13-4 supports incremental processing.

    Ensure that the data type of the return value corresponds to a supported data type as Section 13.1.2, "User-Defined Function Datatypes" describes.

    For more information on accessing the Oracle CEP cache from a user-defined function, see Section 13.1.3, "User-Defined Functions and the Oracle CEP Server Cache".

    Example 13-4 Variance.java User-Defined Aggregate Function

    package com.bea.wlevs.test.functions;
     
    import com.bea.wlevs.processor.AggregationFunction;
    import com.bea.wlevs.processor.AggregationFunctionFactory;
     
    public class Variance implements AggregationFunctionFactory, AggregationFunction {
     
        private int count;
        private float sum;
        private float sumSquare;
     
        public Class<?>[] getArgumentTypes() {
            return new Class<?>[] {Integer.class};
        }
     
        public Class<?> getReturnType() {
            return Float.class;
        }
     
        public AggregationFunction newAggregationFunction() {
            return new Variance();
        }
     
        public void releaseAggregationFunction(AggregationFunction function) {
        }
     
        public Object handleMinus(Object[] params) {
            if (params != null && params.length == 1) {
                Integer param = (Integer) params[0];
                count--;
                sum -= param;
                sumSquare -= (param * param);
            }
            
            if (count == 0) {
                return null;
            } else {
                return getVariance();
            }
        }
     
        public Object handlePlus(Object[] params) {
            if (params != null && params.length == 1) {
                Integer param = (Integer) params[0];
                count++;
                sum += param;
                sumSquare += (param * param);
            }
            
            if (count == 0) {
                return null;
            } else {
                return getVariance();
            }
        }
     
        public Float getVariance() {
            float avg = sum / (float) count;
            float avgSqr = avg * avg;
            float var = sumSquare / (float)count - avgSqr;
            return var;
        }
     
        public void initialize() {
            count = 0;
            sum = 0.0F;
            sumSquare = 0.0F;
        }
     
    }
    
  2. Compile the user-defined function Java implementation class and register the class in your Oracle CEP application assembly file as Example 13-5 shows.

    Example 13-5 Aggregate User Defined Function for an Oracle CQL Processor

      <wlevs:processor id="testProcessor">
         <wlevs:listener ref="providerCache"/>
         <wlevs:listener ref="outputCache"/>
         <wlevs:cache-source ref="testCache"/>
         <wlevs:function function-name="var">
           <bean class="com.bea.wlevs.test.functions.Variance"/>
         </wlevs:function>
       </wlevs:processor>
    

    For more information, see "wlevs:function" in the Oracle Complex Event Processing Developer's Guide for Eclipse.

  3. Invoke your user-defined function in the select list of a SELECT statement or the condition of a WHERE clause as Example 13-6 shows.

    Example 13-6 Accessing a User-Defined Aggregate Function in Oracle CQL

    ...
    <query id="uda6"><![CDATA[ 
        select var(c2) from S4[range 3] 
    ]]></query>
    ...
    

    At run-time, when the user-defined aggregate is executed, and a new event becomes active in the window of interest, the aggregations will have to be recomputed (since the set over which the aggregations are defined has a new member). To do so, Oracle CEP passes only the new event (rather than the entire active set) to the appropriate handler context by invoking the appropriate handlePlus* method in Example 13-4. This state can now be updated to include the new event. Thus, the aggregations have been recomputed in an incremental fashion.

    Similarly, when an event expires from the window of interest, the aggregations will have to be recomputed (since the set over which the aggregations are defined has lost a member). To do so, Oracle CEP passes only the expired event (rather than the entire active set) to the appropriate handler context by invoking the appropriate handleMinus method in Example 13-4. As before, the state in the handler context can be incrementally updated to accommodate expiry of the event in an incremental fashion.