A Samples

This appendix provides sample custom stage types and a sample custom function that you can use as starting points for your own implementations.

A.1 Sample Custom Stage Type

A.1.1 Custom Stage for Encrypting a Column

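This sample class defines a custom stage that takes one textual field and produces an MD5 hash for it. Note that MD5 is a one-way hash: the original value is masked in the output but cannot be recovered from it, so this is not reversible encryption.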

package com.oracle.osacs;

import com.oracle.cep.api.event.*;
import com.oracle.cep.api.annotations.OsaStage;
import com.oracle.cep.api.stage.EventProcessor;
import com.oracle.cep.api.stage.ProcessorContext;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

/**
 * Custom stage that reads the {@code message} attribute of each incoming event and emits an
 * output event carrying the message together with its MD5 digest as an uppercase hex string.
 *
 * <p>When the {@code message} attribute is null, both output fields are set to the literal
 * string {@code "empty"}.
 */
@SuppressWarnings("serial")
@OsaStage(name = "md5", description = "Create an md5 hex from a string", inputSpec = "input, message:string", outputSpec = "output, message:string, md5:string")
public class CustomMD5Stage implements EventProcessor {

	/** Uppercase hex digits; yields the same format as {@code DatatypeConverter.printHexBinary}. */
	private static final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray();

	EventFactory eventFactory;
	EventSpec outputSpec;

	/**
	 * Captures the event factory from the context and builds the output event spec from the
	 * {@code outputSpec} string declared on this class's {@link OsaStage} annotation.
	 *
	 * @param ctx    processor context supplying the event factory
	 * @param config stage configuration (not used by this sample)
	 */
	@Override
	public void init(ProcessorContext ctx, Map<String, String> config) {
		eventFactory = ctx.getEventFactory();
		OsaStage meta = CustomMD5Stage.class.getAnnotation(OsaStage.class);
		String spec = meta.outputSpec();
		outputSpec = TupleEventSpec.fromAnnotation(spec);
	}

	/** Nothing to release: this stage holds no external resources. */
	@Override
	public void close() {
	}

	/**
	 * Builds the output event for one input event.
	 *
	 * @param event input event; its {@code message} attribute is read
	 * @return a new event with {@code message} and {@code md5} fields, stamped with the input
	 *         event's time
	 */
	@Override
	public Event processEvent(Event event) {
		Attr attr = event.getAttr("message");
		Map<String, Object> values = new HashMap<String, Object>();
		if (!attr.isNull()) {
			String val = (String) attr.getObjectValue();
			values.put("message", val);
			values.put("md5", md5Hex(val));
		} else {
			values.put("message", "empty");
			values.put("md5", "empty");
		}
		return eventFactory.createEvent(outputSpec, values, event.getTime());
	}

	/**
	 * Computes the MD5 digest of {@code text} and renders it as uppercase hex.
	 *
	 * <p>The text is encoded as UTF-8 explicitly so that the same string hashes to the same
	 * value regardless of the JVM's default charset.
	 *
	 * @param text non-null text to hash
	 * @return 32-character uppercase hex digest
	 * @throws IllegalStateException if the JVM provides no MD5 implementation
	 */
	private static String md5Hex(String text) {
		MessageDigest md;
		try {
			md = MessageDigest.getInstance("MD5");
		} catch (NoSuchAlgorithmException e) {
			// Fail loudly instead of emitting an event with a null md5 field.
			throw new IllegalStateException("MD5 digest algorithm is not available", e);
		}
		byte[] digest = md.digest(text.getBytes(java.nio.charset.StandardCharsets.UTF_8));
		char[] hex = new char[digest.length * 2];
		for (int i = 0; i < digest.length; i++) {
			int b = digest[i] & 0xFF; // mask to treat the signed byte as 0..255
			hex[2 * i] = HEX_DIGITS[b >>> 4];
			hex[2 * i + 1] = HEX_DIGITS[b & 0x0F];
		}
		return new String(hex);
	}
}

A.1.2 Custom Stage for SOAP Call


	import java.net.MalformedURLException;
	import java.net.URL;
	import java.util.ArrayList;
	import java.util.HashMap;
	import java.util.Iterator;
	import java.util.List;
	import java.util.Map;
	import javax.xml.namespace.QName;
	import javax.xml.ws.Service;
	import com.oracle.cep.api.annotations.OsaStage;
	import com.oracle.cep.api.event.Attr;
	import com.oracle.cep.api.event.Event;
	import com.oracle.cep.api.event.EventFactory;
	import com.oracle.cep.api.event.EventSpec;
	import com.oracle.cep.api.event.TupleEventSpec;
	import com.oracle.cep.api.stage.BatchEventProcessor;
	import com.oracle.cep.api.stage.ProcessorContext;

	/**
	 * Custom batch stage that sends the {@code message} attribute of incoming events to a
	 * "Hello World" SOAP web service and emits events carrying the message and the service's
	 * reply in the {@code result} field.
	 *
	 * <p>{@code HelloWorldServer} is the service endpoint interface; it is not defined in this
	 * sample and is presumably a JAX-WS client interface in the same package — verify against
	 * your generated client code.
	 */
	@SuppressWarnings("serial")
	@OsaStage(name = "CustomSoapBatchCall", description = "Call a Hello World Soap WS", inputSpec = "input, message:string", outputSpec = "output, message:string, result:string")
	public class CustomSoapBatchCall implements BatchEventProcessor {

		EventFactory eventFactory;
		EventSpec outputSpec;
		URL url;
		QName qname;
		Service service;
		HelloWorldServer server;

		/**
		 * Builds the output event spec from the {@link OsaStage} annotation and creates the
		 * SOAP client port for the WSDL location hard-coded below.
		 *
		 * @param ctx    processor context supplying the event factory
		 * @param config stage configuration (not used by this sample)
		 * @throws IllegalStateException if the WSDL location is not a well-formed URL
		 */
		@Override
		public void init(ProcessorContext ctx, Map<String, String> config) {
			eventFactory = ctx.getEventFactory();
			OsaStage meta = CustomSoapBatchCall.class.getAnnotation(OsaStage.class);
			String spec = meta.outputSpec();
			outputSpec = TupleEventSpec.fromAnnotation(spec);
			try {
				url = new URL("http://hostname:9879/hw?wsdl");
			} catch (MalformedURLException e) {
				// Stop here rather than handing a null URL to Service.create below.
				throw new IllegalStateException("Invalid WSDL URL for the SOAP service", e);
			}
			qname = new QName("http://ws.osa.oracle.com/",
					"HelloWorldServerImplService");
			service = Service.create(url, qname);
			server = (HelloWorldServer) service.getPort(HelloWorldServer.class);
		}

		/** Nothing is released here. */
		@Override
		public void close() {
		}

		/**
		 * Collects the {@code message} value of every event in the batch, sends them to the
		 * service in a single {@code sayHelloBatch} call, and returns an iterator that creates
		 * one output event per returned result.
		 *
		 * @param iterator the batch of input events
		 * @return iterator over the output events, each stamped with the current system time
		 */
		@Override
		public Iterator<Event> processEvents(Iterator<Event> iterator) {
			final List<String> reqs = new ArrayList<String>();
			while (iterator.hasNext()) {
				reqs.add((String) iterator.next().getAttr("message").getObjectValue());
			}

			final String[] ress = server.sayHelloBatch(reqs.toArray(new String[reqs.size()]));

			return new Iterator<Event>() {
				/** Index of the next result to hand out. */
				int i = 0;

				@Override
				public boolean hasNext() {
					// Must not move the cursor: callers may invoke hasNext() any number of times.
					return i < ress.length;
				}

				@Override
				public Event next() {
					if (!hasNext()) {
						throw new java.util.NoSuchElementException();
					}
					Map<String, Object> values = new HashMap<String, Object>();
					values.put("message", reqs.get(i));
					values.put("result", ress[i]);
					i++; // advance only after the current element has been consumed
					return eventFactory.createEvent(outputSpec, values, System.currentTimeMillis());
				}
			};
		}

		/**
		 * Processes a single event by calling the service once.
		 *
		 * @param event input event; its {@code message} attribute is read
		 * @return a new event with {@code message} and {@code result} fields; both are the
		 *         literal {@code "empty"} when the input attribute is null
		 */
		@Override
		public Event processEvent(Event event) {
			Attr attr = event.getAttr("message");
			Map<String, Object> values = new HashMap<String, Object>();
			if (!attr.isNull()) {
				String val = (String) attr.getObjectValue();
				String result = callSoap(val);
				values.put("message", val);
				values.put("result", result);
			} else {
				values.put("message", "empty");
				values.put("result", "empty");
			}
			return eventFactory.createEvent(outputSpec, values, event.getTime());
		}

		/**
		 * Invokes the service's {@code sayHello} operation.
		 *
		 * @param myName value passed to the service
		 * @return the service's reply
		 */
		public String callSoap(String myName) {
			return server.sayHello(myName);
		}

	}

A.1.3 Custom Stage for REST Call

package com.oracle.osacs;

import com.oracle.cep.api.event.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.oracle.cep.api.annotations.OsaStage;
import com.oracle.cep.api.stage.EventProcessor;
import com.oracle.cep.api.stage.ProcessorContext;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;

/**
 * Plain holder for the book details that {@code CustomStageRest.getBook} extracts from the
 * REST response. The fields are package-private and are assigned directly.
 */
class BookResult {
	/** ISBN that was used for the lookup. */
	String isbn;
	/** Text of the {@code title} field of the matched volume. */
	String title;
	/** Text of the {@code publishedDate} field of the matched volume. */
	String publishedDate;
	/** Text of the {@code publisher} field of the matched volume. */
	String publisher;
}

/**
 * Custom stage that looks up a book by the {@code isbn} attribute of each incoming event via
 * the Google Books REST API and emits an event with the ISBN, title, published date and
 * publisher.
 *
 * <p>An optional HTTP proxy is read from the {@code proxyHost} and {@code proxyPort} entries
 * of the classpath resource {@code /CustomStageRest.properties}. If that resource is absent,
 * no proxy is used.
 */
@SuppressWarnings("serial")
@OsaStage(name = "RestBooks", description = "Provide info for a given book", inputSpec = "input, isbn:string", outputSpec = "output, isbn:string, title:string, publishedDate:string, publisher:string")
public class CustomStageRest implements EventProcessor {

	EventFactory eventFactory;
	EventSpec outputSpec;

	static Properties props = new Properties();

	static {
		// getResourceAsStream returns null when the resource is missing; guard against it so a
		// missing properties file does not abort class initialization.
		try (java.io.InputStream in = CustomStageRest.class.getResourceAsStream("/CustomStageRest.properties")) {
			if (in != null) {
				props.load(in);
			}
		} catch (IOException ioex) {
			ioex.printStackTrace();
		}
	}

	/**
	 * Captures the event factory from the context and builds the output event spec from the
	 * {@code outputSpec} string declared on this class's {@link OsaStage} annotation.
	 *
	 * @param ctx    processor context supplying the event factory
	 * @param config stage configuration (not used by this sample)
	 */
	@Override
	public void init(ProcessorContext ctx, Map<String, String> config) {
		eventFactory = ctx.getEventFactory();
		OsaStage meta = CustomStageRest.class.getAnnotation(OsaStage.class);
		String spec = meta.outputSpec();
		outputSpec = TupleEventSpec.fromAnnotation(spec);
	}

	/** Nothing to release: the HTTP client is created and closed per lookup. */
	@Override
	public void close() {
	}

	/**
	 * Builds the output event for one input event.
	 *
	 * @param event input event; its {@code isbn} attribute is read
	 * @return a new event stamped with the input event's time. The book fields are empty
	 *         strings when the ISBN attribute is null, when no book is found, or when the
	 *         lookup fails.
	 */
	@Override
	public Event processEvent(Event event) {
		Attr isbnAttr = event.getAttr("isbn");

		String isbn = "";
		BookResult result = null;
		if (!isbnAttr.isNull()) {
			isbn = (String) isbnAttr.getObjectValue();
			result = getBook(isbn);
		}

		// getBook returns null for "not found" and for errors; fall back to empty fields
		// instead of dereferencing null.
		Map<String, Object> values = new HashMap<String, Object>();
		values.put("isbn", isbn);
		values.put("title", result != null ? result.title : "");
		values.put("publishedDate", result != null ? result.publishedDate : "");
		values.put("publisher", result != null ? result.publisher : "");

		return eventFactory.createEvent(outputSpec, values, event.getTime());
	}

	/**
	 * Calls the Google Books REST API to get book information based on the ISBN ID.
	 *
	 * <p>Only the first volume returned for the ISBN is considered. Every failure (bad proxy
	 * port, I/O error, non-2xx status, unparsable response) is reported on standard error and
	 * results in {@code null}.
	 *
	 * @param isbn ISBN to look up; it is URL-encoded before being placed in the query string
	 * @return book information, or {@code null} if no book was found or the lookup failed
	 */
	public BookResult getBook(String isbn) {
		// try-with-resources closes the client, and with it the underlying connection, on every path.
		try (CloseableHttpClient client = HttpClientBuilder.create().build()) {
			String uri = "https://www.googleapis.com/books/v1/volumes?q=isbn:"
					+ java.net.URLEncoder.encode(isbn, "UTF-8");
			HttpRequestBase request = new HttpGet(uri);

			String proxyHost = props.getProperty("proxyHost");
			String proxyPort = props.getProperty("proxyPort");
			if (proxyHost != null && proxyPort != null) {
				HttpHost proxy = new HttpHost(proxyHost, Integer.parseInt(proxyPort));
				request.setConfig(RequestConfig.custom().setProxy(proxy).build());
			}

			HttpResponse response = client.execute(request);
			StatusLine sl = response.getStatusLine();
			int code = sl.getStatusCode();
			if (code < 200 || code >= 300) {
				System.err.println("" + code + " : " + sl.getReasonPhrase());
				return null; // an error response carries no book data
			}

			String resultJson = EntityUtils.toString(response.getEntity());
			ObjectMapper mapper = new ObjectMapper();
			JsonNode root = mapper.readValue(resultJson, JsonNode.class);
			JsonNode bookArray = root.path("items");
			if (bookArray.size() == 0) {
				return null; // No book found
			}

			JsonNode book = bookArray.path(0).path("volumeInfo"); // We only consider the first book for this ISBN
			BookResult result = new BookResult();
			result.isbn = isbn;
			result.title = book.path("title").asText();
			result.publishedDate = book.path("publishedDate").asText();
			result.publisher = book.path("publisher").asText();
			return result;
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}
	}
}

Note:

The following third-party JAR files are required to compile the REST sample:
  • httpclient-4.5.6.jar
  • httpcore-4.4.10.jar
  • jackson-databind-2.9.10.jar
These JAR files are required only at compile time and do not need to be packaged with your custom JAR, because the libraries and their dependencies are already included in the OSA distribution.

A.2 Sample Custom Function

This sample class defines a custom function that takes one textual field and produces an MD5 hash for it.

package com.oracle.osacs;
 
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import com.oracle.cep.api.annotations.OsaFunction;
 
 
public class CustomMD5Function {
               
        @OsaFunction(name = "md5", description = "Create an md5 hex from a string")
        public static String md5(String message) {
               String result = null;
               
               try {
                       MessageDigest md = MessageDigest.getInstance("MD5");
                   md.update(message.getBytes());
                   byte[] digest = md.digest();           
                   result = javax.xml.bind.DatatypeConverter.printHexBinary(digest);     
               } catch (NoSuchAlgorithmException e) {
                       e.printStackTrace();
               }
                                           
             return result;        
        }       
 
}