Adding Custom Functions and Custom Stages

Custom functions are user-defined functions that supplement the application's built-in functions with your own implementations.

Creating a Custom Jar

A custom jar is a user-supplied Jar archive containing Java classes for custom stage types or custom functions that will be used within a pipeline.

To create a Custom Jar:

  1. On the Catalog page, click Create New Item, and select Custom Jar from the drop-down list.
  2. On the Type Properties screen, enter the following details:
    • Name
    • Description
    • Tags
    • Custom Jar Type: Select Custom Jar from the drop-down list.
  3. Click Next.
  4. On the Custom Jar Details page, click Upload file, and select the jar file that you want to import into the application.
  5. Click Save.

Your custom Java/Scala class must implement the BatchEventProcessor interface as defined in the Javadoc.

Adding Custom Functions

Custom functions are installed when you add a custom jar file.

After installation, the custom functions are available in the Expression Builder, listed under the Custom category.

Implementing Custom Functions

For a custom function, apply the @OsaFunction annotation to a method in any class, including a class implementing a custom stage type. For more information, see the Javadoc and the Sample.

Note:

Functions with the same name within the same package, class, or method are not supported, whether they are in the same jar or in different jars.

Sample: Encrypt a Column

This sample class defines a custom function that takes one textual field and produces an MD5 hash for it.

package com.oracle.osacs;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import com.oracle.cep.api.annotations.OsaFunction;

public class CustomMD5Function {

    @OsaFunction(name = "md5", description = "Create an md5 hex from a string")
    public static String md5(String message) {
        String result = null;

        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            md.update(message.getBytes());
            byte[] digest = md.digest();
            result = javax.xml.bind.DatatypeConverter.printHexBinary(digest);
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        }

        return result;
    }

}
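
The sample relies on javax.xml.bind.DatatypeConverter, which is no longer bundled with the JDK from Java 11 onward. If that class is not available in your build environment, the same uppercase hex string can be produced with the JDK alone. The following is a minimal sketch, not part of the OSA samples: the class name Md5Hex is hypothetical, the @OsaFunction annotation is left out so that the snippet compiles without the OSA libraries, and the explicit UTF-8 charset and the null check are assumptions (the sample above uses the platform default charset and does not guard against null).

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper. When building against the OSA API, md5() would carry
// the @OsaFunction annotation shown in the sample.
final class Md5Hex {

    private Md5Hex() {
    }

    // Returns the MD5 digest of the message as uppercase hex, or null for null input.
    static String md5(String message) {
        if (message == null) {
            return null;
        }
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            // Assumption: UTF-8, so the result does not depend on the platform charset.
            byte[] digest = md.digest(message.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02X", b & 0xFF));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Not expected on a standard JDK, which ships an MD5 implementation.
            throw new IllegalStateException(e);
        }
    }
}
```

For example, Md5Hex.md5("hello") yields 5D41402ABC4B2A76B9719D911017C592.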

Adding a Custom Stage

To add a custom stage:
  1. Open the required pipeline in Pipeline Editor.
  2. Right-click the stage after which you want to add a custom stage. Click Add a Stage, then Custom, and then select Custom Stage from Custom Jars.
  3. Enter a name and suitable description for the custom stage and click Save.
  4. In the stage editor, enter the following details:
    1. Custom Stage Type: Select the custom stage that was previously installed through a custom jar.
    2. Input Mapping: Select the corresponding column from the previous stage for every input parameter.

You can add multiple custom stages based on your use case.

Sample: Encrypt a Column

This sample class defines a custom stage that takes one textual field and produces an MD5 hash for it.

package com.oracle.osacs;

import com.oracle.cep.api.event.*;
import com.oracle.cep.api.annotations.OsaStage;
import com.oracle.cep.api.stage.EventProcessor;
import com.oracle.cep.api.stage.ProcessorContext;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

@SuppressWarnings("serial")
@OsaStage(name = "md5", description = "Create an md5 hex from a string", inputSpec = "input, message:string", outputSpec = "output, message:string, md5:string")
public class CustomMD5Stage implements EventProcessor {

	EventFactory eventFactory;
	EventSpec outputSpec;

	@Override
	public void init(ProcessorContext ctx, Map<String, String> config) {
		eventFactory = ctx.getEventFactory();
		OsaStage meta = CustomMD5Stage.class.getAnnotation(OsaStage.class);
		String spec = meta.outputSpec();
		outputSpec = TupleEventSpec.fromAnnotation(spec);
	}

	@Override
	public void close() {
	}

	@Override
	public Event processEvent(Event event) {
		Attr attr = event.getAttr("message");
		Map<String, Object> values = new HashMap<String, Object>();
		if (!attr.isNull()) {
			String val = (String) attr.getObjectValue();
			String md5 = null;
			try {
				MessageDigest md = MessageDigest.getInstance("MD5");
				md.update(val.getBytes());
				byte[] digest = md.digest();
				md5 = javax.xml.bind.DatatypeConverter.printHexBinary(digest);
			} catch (NoSuchAlgorithmException e) {
				e.printStackTrace();
			}
			values.put("message", val);
			values.put("md5", md5);
		} else {
			values.put("message", "empty");
			values.put("md5", "empty");
		}
		Event outputEvent = eventFactory.createEvent(outputSpec, values, event.getTime());
		return outputEvent;
	}
}
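
In the sample, the keys placed in the values map ("message" and "md5") match the field names declared in outputSpec. Judging only from the samples on this page, a spec string appears to consist of a shape name followed by comma-separated name:type pairs. The following sketch illustrates that reading. It is a hypothetical helper, not the OSA parser (the sample uses TupleEventSpec.fromAnnotation for that), and the class name SpecSketch and the interpretation of the first token are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only. OSA parses these strings itself.
final class SpecSketch {

    private SpecSketch() {
    }

    // Splits "output, message:string, md5:string" into {message=string, md5=string}.
    // Assumption: the first token is the shape name, so it is skipped.
    static Map<String, String> fields(String spec) {
        Map<String, String> result = new LinkedHashMap<>();
        String[] parts = spec.split(",");
        for (int i = 1; i < parts.length; i++) {
            String part = parts[i].trim();
            String[] nameAndType = part.split(":", 2);
            if (nameAndType.length != 2) {
                throw new IllegalArgumentException("Expected name:type but got '" + part + "'");
            }
            result.put(nameAndType[0].trim(), nameAndType[1].trim());
        }
        return result;
    }
}
```

A check like this can be handy in a unit test to confirm that every key a stage writes into its values map is also declared in the annotation.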

Sample: Invoke a REST Service

This sample class defines a custom stage that takes an ISBN and looks up the title, published date, and publisher of the matching book through the Google Books REST API.

package com.oracle.osacs;

import com.oracle.cep.api.event.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.oracle.cep.api.annotations.OsaStage;
import com.oracle.cep.api.stage.EventProcessor;
import com.oracle.cep.api.stage.ProcessorContext;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;

class BookResult {
	String isbn;
	String title;
	String publishedDate;
	String publisher;
}

@SuppressWarnings("serial")
@OsaStage(name = "RestBooks", description = "Provide info for a given book", inputSpec = "input, isbn:string", outputSpec = "output, isbn:string, title:string, publishedDate:string, publisher:string")
public class CustomStageRest implements EventProcessor {

	EventFactory eventFactory;
	EventSpec outputSpec;

	static Properties props = new Properties();

	static {
		try {
			props.load(CustomStageRest.class.getResourceAsStream("/CustomStageRest.properties"));
		} catch (IOException ioex) {
			ioex.printStackTrace();
		}
	}

	@Override
	public void init(ProcessorContext ctx, Map<String, String> config) {
		eventFactory = ctx.getEventFactory();
		OsaStage meta = CustomStageRest.class.getAnnotation(OsaStage.class);
		String spec = meta.outputSpec();
		outputSpec = TupleEventSpec.fromAnnotation(spec);
	}

	@Override
	public void close() {
	}

	@Override
	public Event processEvent(Event event) {
		Attr isbnAttr = event.getAttr("isbn");

		Map<String, Object> values = new HashMap<String, Object>();
		if (!isbnAttr.isNull()) {
			String isbn = (String) isbnAttr.getObjectValue();

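			// getBook returns null when no book is found or the REST call fails
			BookResult result = getBook(isbn);

			values.put("isbn", isbn);
			values.put("title", result != null ? result.title : "");
			values.put("publishedDate", result != null ? result.publishedDate : "");
			values.put("publisher", result != null ? result.publisher : "");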

		} else {
			values.put("isbn", "");
			values.put("title", "");
			values.put("publishedDate", "");
			values.put("publisher", "");
		}
		Event outputEvent = eventFactory.createEvent(outputSpec, values, event.getTime());
		return outputEvent;
	}

	
	/**
	 * Calls the Google Books REST API to get book information based on the ISBN ID
	 * @param isbn
	 * @return BookResult book information
	 */
	public BookResult getBook(String isbn) {
		HttpRequestBase request;
		BookResult result = null;

		String uri = "https://www.googleapis.com/books/v1/volumes?q=isbn:" + isbn;

		request = new HttpGet(uri);

		CloseableHttpClient client = HttpClientBuilder.create().build();

		String proxyHost = props.getProperty("proxyHost");
		String proxyPort = props.getProperty("proxyPort");
		if (proxyHost != null && proxyPort != null) {
			int proxyPortInt = Integer.parseInt(proxyPort);
			HttpHost proxy = new HttpHost(proxyHost, proxyPortInt);
			RequestConfig config = RequestConfig.custom().setProxy(proxy).build();
			request.setConfig(config);
		}

		try {
			HttpResponse response = client.execute(request);
			String resultJson = EntityUtils.toString(response.getEntity());
			StatusLine sl = response.getStatusLine();
			int code = sl.getStatusCode();
			if (code < 200 || code >= 300) {
				System.err.println("" + code + " : " + sl.getReasonPhrase());
			}

			ObjectMapper mapper = new ObjectMapper();
			JsonNode root = mapper.readValue(resultJson, JsonNode.class);
			JsonNode bookArray = root.path("items");

			if (bookArray.size() > 0) {
				result = new BookResult();
				JsonNode book = bookArray.path(0).path("volumeInfo"); // We only consider the first book for this ISBN
				result.isbn = isbn;
				result.title = book.path("title").asText();
				result.publishedDate = book.path("publishedDate").asText();
				result.publisher = book.path("publisher").asText();
				return result;
			} else {
				return null; // No book found
			}
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}
	}
}

Note:

The following third-party jars are required to compile the REST sample:
  • httpclient-4.5.6.jar
  • httpcore-4.4.10.jar
  • jackson-databind-2.9.10.jar
These jars are required only at compile time and need not be packaged with the custom jar. These libraries and their dependencies are already packaged with the OSA distribution.
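
The REST sample builds the request URI by appending the incoming isbn value directly to the query string. Because that value comes from event data, it may be worth encoding it first. The following is a minimal sketch that uses only the JDK. The class name BookUri is hypothetical, and encoding the value is a suggested precaution rather than part of the original sample.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for building the Google Books lookup URI used in the sample.
final class BookUri {

    private static final String BASE = "https://www.googleapis.com/books/v1/volumes?q=isbn:";

    private BookUri() {
    }

    // Form-encodes the ISBN so that characters such as '&' or spaces
    // cannot alter the structure of the query string.
    static String forIsbn(String isbn) {
        try {
            return BASE + URLEncoder.encode(isbn, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the Java platform.
            throw new IllegalStateException(e);
        }
    }
}
```

In getBook, the uri variable could then be assigned from BookUri.forIsbn(isbn) instead of the string concatenation.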

Sample: Invoke a SOAP Service

This sample class defines a custom stage that calls a Hello World SOAP web service, either for a single event or for a batch of events. The HelloWorldServer service interface that it uses is not shown.


import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import javax.xml.namespace.QName;
import javax.xml.ws.Service;
import com.oracle.cep.api.annotations.OsaStage;
import com.oracle.cep.api.event.Attr;
import com.oracle.cep.api.event.Event;
import com.oracle.cep.api.event.EventFactory;
import com.oracle.cep.api.event.EventSpec;
import com.oracle.cep.api.event.TupleEventSpec;
import com.oracle.cep.api.stage.BatchEventProcessor;
import com.oracle.cep.api.stage.ProcessorContext;

@SuppressWarnings("serial")
@OsaStage(name = "CustomSoapBatchCall", description = "Call a Hello World Soap WS", inputSpec = "input, message:string", outputSpec = "output, message:string, result:string")
public class CustomSoapBatchCall implements BatchEventProcessor {

	EventFactory eventFactory;
	EventSpec outputSpec;
	URL url;
	QName qname;
	Service service;
	HelloWorldServer server;

	@Override
	public void init(ProcessorContext ctx, Map<String, String> config) {
		eventFactory = ctx.getEventFactory();
		OsaStage meta = CustomSoapBatchCall.class.getAnnotation(OsaStage.class);
		String spec = meta.outputSpec();
		outputSpec = TupleEventSpec.fromAnnotation(spec);
		try {
			url = new URL("http://hostname:9879/hw?wsdl");
		} catch (MalformedURLException e) {
			e.printStackTrace();
		}
		qname = new QName("http://ws.osa.oracle.com/", "HelloWorldServerImplService");
		service = Service.create(url, qname);
		server = (HelloWorldServer) service.getPort(HelloWorldServer.class);
	}

	@Override
	public void close() {
	}

	@Override
	public Iterator<Event> processEvents(Iterator<Event> iterator) {
		// Collect the messages of the whole batch so that one SOAP call serves all events
		List<String> reqs = new ArrayList<String>();
		while (iterator.hasNext()) {
			reqs.add((String) iterator.next().getAttr("message").getObjectValue());
		}

		String[] ress = server.sayHelloBatch(reqs.toArray(new String[reqs.size()]));

		return new Iterator<Event>() {
			int i = 0;

			@Override
			public boolean hasNext() {
				// No side effects here, so repeated calls do not skip results
				return i < ress.length;
			}

			@Override
			public Event next() {
				if (!hasNext()) {
					throw new NoSuchElementException();
				}
				Map<String, Object> values = new HashMap<String, Object>();
				values.put("message", reqs.get(i));
				values.put("result", ress[i]);
				i++;
				return eventFactory.createEvent(outputSpec, values, System.currentTimeMillis());
			}
		};
	}

	@Override
	public Event processEvent(Event event) {
		Attr attr = event.getAttr("message");
		Map<String, Object> values = new HashMap<String, Object>();
		if (!attr.isNull()) {
			String val = (String) attr.getObjectValue();
			String result = callSoap(val);
			values.put("message", val);
			values.put("result", result);
		} else {
			values.put("message", "empty");
			values.put("result", "empty");
		}
		Event outputEvent = eventFactory.createEvent(outputSpec, values, event.getTime());
		return outputEvent;
	}

	public String callSoap(String myName) {
		return server.sayHello(myName);
	}

}
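
An iterator returned from processEvents should follow the java.util.Iterator contract: hasNext() may be called any number of times without advancing, and next() both returns the current element and moves on. The following JDK-only sketch shows that pattern for pairing each request with its response. The class name PairedResults and the String output are hypothetical stand-ins. In a stage, next() would build an Event through eventFactory.createEvent instead.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the anonymous iterator in a batch stage.
final class PairedResults implements Iterator<String> {

    private final List<String> requests;
    private final String[] responses;
    private int index = 0;

    PairedResults(List<String> requests, String[] responses) {
        if (requests.size() != responses.length) {
            throw new IllegalArgumentException("Each request needs exactly one response");
        }
        this.requests = requests;
        this.responses = responses;
    }

    @Override
    public boolean hasNext() {
        // Pure check: the position only changes in next().
        return index < responses.length;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        String pair = requests.get(index) + " -> " + responses[index];
        index++;
        return pair;
    }
}
```

Keeping the index update inside next() means that a caller that checks hasNext() twice before each next() still sees every result.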

Limitations

The limitations and restrictions of the custom stages and custom functions are listed in this section.

Custom stage types and custom functions must:

  • only be used for stateless transformations. Access to state from previous calls to stage type or function methods cannot be guaranteed and might change based on optimizations.

  • not use any blocking invocations.

  • not start a new thread.

  • not use any thread synchronization primitives, including the wait() method, which could potentially introduce deadlocks.

  • have a fully qualified class name (custom stage types) or be defined in a class that has one (custom functions).
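
To make the stateless requirement concrete, the following sketch contrasts a function whose result depends only on its argument with one that relies on state kept between calls. Both classes and their method names are hypothetical, and the OSA annotations are omitted so that the snippet compiles with the JDK alone.

```java
import java.util.Locale;

// Suitable: the result depends only on the input value.
final class StatelessSketch {

    private StatelessSketch() {
    }

    static String normalize(String value) {
        return value == null ? null : value.trim().toUpperCase(Locale.ROOT);
    }
}

// Not suitable: the result depends on how many calls were made before,
// which cannot be relied on once the runtime distributes or reorders work.
final class StatefulSketch {

    private static long calls = 0;

    private StatefulSketch() {
    }

    static String numbered(String value) {
        calls++; // state carried across calls, and not thread-safe either
        return calls + ":" + value;
    }
}
```

Calling StatelessSketch.normalize with the same argument always gives the same result, whereas two calls to StatefulSketch.numbered with the same argument return different strings.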

When you use custom stages or custom functions, be careful about heap space usage.

Note:

The resulting jar must include all the required dependencies and third-party classes and the size of the jar file must be less than 160 MB.

Mapping of Data Types

The following table lists the data types that can be used by custom stage types and custom functions.

Oracle Stream Analytics Data Type    Java Data Type           Comment
BOOLEAN                              boolean
INT                                  int
BIGINT                               long
FLOAT                                float
DOUBLE                               double
STRING                               String
BIGDECIMAL                           BigDecimal
TIMESTAMP                            long (in nanoseconds)    Can only be used in Custom Stage Types
INTERVAL                             long                     Can only be used in Custom Stage Types
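
Because TIMESTAMP values arrive as a long in nanoseconds, a custom stage that needs calendar arithmetic may want to convert them to java.time.Instant. The following is a minimal sketch under one loudly stated assumption: that the value counts nanoseconds since the Unix epoch (1970-01-01T00:00:00Z). This page does not state the reference point, so verify it before relying on the conversion. The class name TimestampSketch is hypothetical.

```java
import java.time.Instant;

// Hypothetical conversion helpers.
// Assumption: the long counts nanoseconds since 1970-01-01T00:00:00Z.
final class TimestampSketch {

    private static final long NANOS_PER_SECOND = 1_000_000_000L;

    private TimestampSketch() {
    }

    // Instant.ofEpochSecond normalizes the nanosecond adjustment,
    // so values before the epoch (negative) are handled as well.
    static Instant toInstant(long nanos) {
        return Instant.ofEpochSecond(0L, nanos);
    }

    // Throws ArithmeticException if the instant does not fit in a long of nanoseconds.
    static long toNanos(Instant instant) {
        long wholeSeconds = Math.multiplyExact(instant.getEpochSecond(), NANOS_PER_SECOND);
        return Math.addExact(wholeSeconds, instant.getNano());
    }
}
```

For example, TimestampSketch.toInstant(1_500_000_000L) is the instant 1.5 seconds after the epoch, and TimestampSketch.toNanos converts it back to 1500000000.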