13 User-Defined Functions

This chapter describes how you can write user-defined functions for use in Oracle Continuous Query Language (Oracle CQL) to perform more advanced or application-specific operations on stream data than is possible using built-in functions.

For more information, see Functions.

This chapter includes the following sections:

13.1 Introduction to Oracle CQL User-Defined Functions

You can write user-defined functions in Java to provide functionality that is not available in Oracle CQL or Oracle CQL built-in functions. You can create a user-defined function that returns an aggregate value or a single (non-aggregate) value.

For example, you can use user-defined functions in the following:

  • The select list of a SELECT statement

  • The condition of a WHERE clause

Note:

You can also create user-defined windows (see User-Defined Stream-to-Relation Window Operators).

To make your user-defined function available for use in Oracle CQL queries, the JAR file that contains the user-defined function implementation class must be in the Oracle Stream Explorer server class path or the Oracle Stream Explorer server class path must be modified to include the JAR file.

For more information, see:

13.1.1 Types of User-Defined Functions

You can create the following types of user-defined functions:

You can create overloaded functions and you can override built-in functions.

13.1.1.1 User-Defined Single-Row Functions

A user-defined single-row function is a function that returns a single result row for every row of a queried stream or view (for example, like the concat built-in function does).

For more information, see How to Implement a User-Defined Single-Row Function.

13.1.1.2 User-Defined Aggregate Functions

A user-defined aggregate is a function that implements com.bea.wlevs.processor.AggregationFunctionFactory and returns a single aggregate result based on group of tuples, rather than on a single tuple (for example, like the sum built-in function does).

Consider implementing your aggregate function so that it performs incremental processing, if possible. This will improve scalability and performance because the cost of (re)computation on arrival of new events will be proportional to the number of new events as opposed to the total number of events seen thus far.

For more information, see How to Implement a User-Defined Aggregate Function.

13.1.2 User-Defined Function Data Types

User-defined functions support any of the built-in Oracle CQL data types listed in Oracle CQL Built-in Data Types. See the table in that section for a list of Oracle CQL data types and their Java equivalents.

The Oracle CQL data types shown there list the data types you can specify in the Oracle CQL statement you use to register your user-defined function. The Java equivalents are the Java data types you can use in your user-defined function implementation.

At run time, Oracle Stream Explorer maps between the Oracle CQL data type and the Java data type. If your user-defined function returns a data type that is not in this list, Oracle Stream Explorer will throw a ClassCastException.

For more information about data conversion, see Data Type Conversion.

13.1.3 User-Defined Functions and the Oracle Stream Explorer Server Cache

You can access an Oracle Stream Explorer cache from an Oracle CQL statement or user-defined function. For more information, see Oracle Fusion Middleware Developer's Guide for Oracle Stream Explorer.

13.2 Implementing a User-Defined Function

This section describes:

For more information, see Introduction to Oracle CQL User-Defined Functions.

13.2.1 How to Implement a User-Defined Single-Row Function

You implement a user-defined single-row function by implementing a Java class that provides a public constructor and a public method that is invoked to execute the function.

To implement a user-defined single-row function:

  1. Implement a Java class.

    Ensure that the data type of the return value corresponds to a supported data type as User-Defined Function Data Types describes.

    For more information on accessing the Oracle Stream Explorer cache from a user-defined function, see User-Defined Functions and the Oracle Stream Explorer Server Cache.

    package com.bea.wlevs.example.function;
    
    public class MyMod {
        public Object execute(int arg0, int arg1) {
            return new Integer(arg0 % arg1);
        }
    }
    
  2. Compile the user-defined function Java implementation class and register the class in your Oracle Stream Explorer application assembly file.
    <wlevs:processor id="testProcessor">
        <wlevs:listener ref="providerCache"/>
        <wlevs:listener ref="outputCache"/>
        <wlevs:cache-source ref="testCache"/>
        <wlevs:function function-name="mymod" exec-method="execute" />
            <bean class="com.bea.wlevs.example.function.MyMod"/>
        </wlevs:function>
    </wlevs:processor>
    

    Specify the method that is invoked to execute the function using the wlevs:function element exec-method attribute. This method must be public and must be uniquely identifiable by its name (that is, the method cannot have been overridden).

    For more information, see Developing Applications for Event Processing with Oracle Stream Explorer.

  3. Invoke your user-defined function in the select list of a SELECT statement or the condition of a WHERE clause.
    ...
    <view id="v1" schema="c1 c2 c3 c4"><![CDATA[ 
        select
            mymod(c1, 100), c2, c3, c4 
        from 
            S1
    ]]></view>
    ...
    <query id="q1"><![CDATA[ 
        select * from v1 [partition by c1 rows 1] where c4 - c3 = 2.3 
    ]]></query>
    ...
    

13.2.2 How to Implement a User-Defined Aggregate Function

You implement a user-defined aggregate function by implementing a Java class that implements the com.bea.wlevs.processor.AggregationFunctionFactory interface.

To implement a user-defined aggregate function:

  1. Implement a Java class as shown in the below example.

    Consider implementing your aggregate function so that it performs incremental processing, if possible. This will improve scalability and performance because the cost of (re)computation on arrival of new events will be proportional to the number of new events as opposed to the total number of events seen thus far. The user-defined aggregate function supports incremental processing.

    Ensure that the data type of the return value corresponds to a supported data type as User-Defined Function Data Types describes.

    For more information on accessing the Oracle Stream Explorer cache from a user-defined function, see User-Defined Functions and the Oracle Stream Explorer Server Cache.

    package com.bea.wlevs.test.functions;
     
    import com.bea.wlevs.processor.AggregationFunction;
    import com.bea.wlevs.processor.AggregationFunctionFactory;
     
    public class Variance implements AggregationFunctionFactory, AggregationFunction {
     
        private int count;
        private float sum;
        private float sumSquare;
     
        public Class<?>[] getArgumentTypes() {
            return new Class<?>[] {Integer.class};
        }
     
        public Class<?> getReturnType() {
            return Float.class;
        }
     
        public AggregationFunction newAggregationFunction() {
            return new Variance();
        }
     
        public void releaseAggregationFunction(AggregationFunction function) {
        }
     
        public Object handleMinus(Object[] params) {
            if (params != null && params.length == 1) {
                Integer param = (Integer) params[0];
                count--;
                sum -= param;
                sumSquare -= (param * param);
            }
            
            if (count == 0) {
                return null;
            } else {
                return getVariance();
            }
        }
     
        public Object handlePlus(Object[] params) {
            if (params != null && params.length == 1) {
                Integer param = (Integer) params[0];
                count++;
                sum += param;
                sumSquare += (param * param);
            }
            
            if (count == 0) {
                return null;
            } else {
                return getVariance();
            }
        }
     
        public Float getVariance() {
            float avg = sum / (float) count;
            float avgSqr = avg * avg;
            float var = sumSquare / (float)count - avgSqr;
            return var;
        }
     
        public void initialize() {
            count = 0;
            sum = 0.0F;
            sumSquare = 0.0F;
        }
     
    }
    
  2. Compile the user-defined function Java implementation class and register the class in your Oracle Stream Explorer application assembly file.

      <wlevs:processor id="testProcessor">
         <wlevs:listener ref="providerCache"/>
         <wlevs:listener ref="outputCache"/>
         <wlevs:cache-source ref="testCache"/>
         <wlevs:function function-name="var" is-incremental="true">
           <bean class="com.bea.wlevs.test.functions.Variance"/>
         </wlevs:function>
       </wlevs:processor>
    

    You must set the is-incremental attribute of the function element to true to indicate that the user-defined function, var, has an incremental implementation. Setting the is-incremental function to true guarantees that Oracle Stream Explorer calls the handleMinus method when it purges events from the current processing window. If this attribute is set to false (default), then the handleMinus function is never called and instead Oracle Stream Explorer provides the full set of events of the current window with every call to the handlePlus method.

    For more information, see Developing Applications for Event Processing with Oracle Stream Explorer.

  3. Invoke your user-defined function in the select list of a SELECT statement or the condition of a WHERE clause.

    ...
    <query id="uda6"><![CDATA[ 
        select var(c2) from S4[range 3] 
    ]]></query>
    ...
    

    At run-time, when the user-defined aggregate is executed, and a new event becomes active in the window of interest, the aggregations will have to be recomputed (since the set over which the aggregations are defined has a new member). To do so, Oracle Stream Explorer passes only the new event (rather than the entire active set) to the appropriate handler context by invoking the appropriate handlePlus* method. This state can now be updated to include the new event. Thus, the aggregations have been recomputed in an incremental fashion.

    Similarly, when an event expires from the window of interest, the aggregations will have to be recomputed (since the set over which the aggregations are defined has lost a member). To do so, Oracle Stream Explorer passes only the expired event (rather than the entire active set) to the appropriate handler context by invoking the appropriate handleMinus method. As before, the state in the handler context can be incrementally updated to accommodate expiry of the event in an incremental fashion.