A Samples
This appendix provides sample custom stage types and a sample custom function that you can use as starting points for your own implementations.
A.1 Sample Custom Stage Type
A.1.1 Custom Stage for Encrypting a Column
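This sample class defines a custom stage that takes one textual field and produces an MD5 hash for it. Note that MD5 is a one-way digest, not reversible encryption, so the original value cannot be recovered from the output.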
package com.oracle.osacs;
import com.oracle.cep.api.event.*;
import com.oracle.cep.api.annotations.OsaStage;
import com.oracle.cep.api.stage.EventProcessor;
import com.oracle.cep.api.stage.ProcessorContext;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
/**
 * Custom stage that reads the {@code message} attribute of each incoming event and
 * emits an event carrying the original message plus its MD5 digest as an upper-case
 * hexadecimal string.
 *
 * <p>When the {@code message} attribute is null, both output attributes are set to the
 * literal string {@code "empty"}.
 */
@SuppressWarnings("serial")
@OsaStage(name = "md5", description = "Create an md5 hex from a string", inputSpec = "input, message:string", outputSpec = "output, message:string, md5:string")
public class CustomMD5Stage implements EventProcessor {
    EventFactory eventFactory;
    EventSpec outputSpec;

    /**
     * Captures the event factory from the context and builds the output event spec
     * from the {@code outputSpec} declared on this class's {@link OsaStage} annotation.
     *
     * @param ctx    processor context supplying the event factory
     * @param config stage configuration (not used by this sample)
     */
    @Override
    public void init(ProcessorContext ctx, Map<String, String> config) {
        eventFactory = ctx.getEventFactory();
        OsaStage meta = CustomMD5Stage.class.getAnnotation(OsaStage.class);
        outputSpec = TupleEventSpec.fromAnnotation(meta.outputSpec());
    }

    /** No resources are held by this stage, so there is nothing to release. */
    @Override
    public void close() {
    }

    /**
     * Builds the output event for a single input event.
     *
     * @param event input event; its {@code message} attribute is read
     * @return a new event with {@code message} and {@code md5} attributes, stamped with
     *         the input event's time
     */
    @Override
    public Event processEvent(Event event) {
        Attr attr = event.getAttr("message");
        Map<String, Object> values = new HashMap<String, Object>();
        if (attr.isNull()) {
            values.put("message", "empty");
            values.put("md5", "empty");
        } else {
            String val = (String) attr.getObjectValue();
            values.put("message", val);
            values.put("md5", md5Hex(val));
        }
        return eventFactory.createEvent(outputSpec, values, event.getTime());
    }

    /**
     * Computes the MD5 digest of {@code text} and renders it as 32 upper-case hex digits.
     *
     * @param text the string to hash; encoded as UTF-8 so the result does not depend on
     *             the JVM's default charset
     * @return the hexadecimal digest
     * @throws IllegalStateException if the MD5 algorithm is unavailable, rather than
     *         silently emitting a null hash
     */
    private static String md5Hex(String text) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(text.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            // Signum 1 keeps the value non-negative; %032X restores leading zeros.
            // This avoids javax.xml.bind, which is no longer part of the JDK since Java 11.
            return String.format("%032X", new java.math.BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 digest algorithm is not available", e);
        }
    }
}
A.1.2 Custom Stage for SOAP Call
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.xml.namespace.QName;
import javax.xml.ws.Service;
import com.oracle.cep.api.annotations.OsaStage;
import com.oracle.cep.api.event.Attr;
import com.oracle.cep.api.event.Event;
import com.oracle.cep.api.event.EventFactory;
import com.oracle.cep.api.event.EventSpec;
import com.oracle.cep.api.event.TupleEventSpec;
import com.oracle.cep.api.stage.BatchEventProcessor;
import com.oracle.cep.api.stage.ProcessorContext;
/**
 * Custom stage that sends the {@code message} attribute of incoming events to a
 * "Hello World" SOAP web service and emits the message together with the service's
 * reply in the {@code result} attribute.
 *
 * <p>Events can be processed one at a time ({@link #processEvent(Event)}) or as a
 * batch ({@link #processEvents(Iterator)}), which issues a single service call for all
 * events in the batch.
 */
@SuppressWarnings("serial")
@OsaStage(name = "CustomSoapBatchCall", description = "Call a Hello World Soap WS", inputSpec = "input, message:string", outputSpec = "output, message:string, result:string")
public class CustomSoapBatchCall implements BatchEventProcessor {
    EventFactory eventFactory;
    EventSpec outputSpec;
    URL url;
    QName qname;
    Service service;
    HelloWorldServer server;

    /**
     * Captures the event factory, builds the output spec from the {@link OsaStage}
     * annotation, and creates the SOAP service port used for all subsequent calls.
     *
     * @param ctx    processor context supplying the event factory
     * @param config stage configuration (not used by this sample)
     * @throws IllegalStateException if the WSDL URL is malformed
     */
    @Override
    public void init(ProcessorContext ctx, Map<String, String> config) {
        eventFactory = ctx.getEventFactory();
        OsaStage meta = CustomSoapBatchCall.class.getAnnotation(OsaStage.class);
        outputSpec = TupleEventSpec.fromAnnotation(meta.outputSpec());
        try {
            url = new URL("http://hostname:9879/hw?wsdl");
        } catch (MalformedURLException e) {
            // Fail here instead of passing a null URL on to Service.create below.
            throw new IllegalStateException("Invalid WSDL URL for the Hello World service", e);
        }
        qname = new QName("http://ws.osa.oracle.com/",
                "HelloWorldServerImplService");
        service = Service.create(url, qname);
        server = (HelloWorldServer) service.getPort(HelloWorldServer.class);
    }

    /** This stage does not release anything on close. */
    @Override
    public void close() {
    }

    /**
     * Collects the {@code message} attribute of every event in the batch, sends them to
     * the service in one {@code sayHelloBatch} call, and returns an iterator that lazily
     * builds one output event per returned result.
     *
     * <p>NOTE(review): assumes the service returns one result per request, in request
     * order — confirm against the service contract. Output events are stamped with the
     * current system time rather than the input events' time.
     *
     * @param iterator the batch of input events
     * @return an iterator over the output events
     */
    @Override
    public Iterator<Event> processEvents(Iterator<Event> iterator) {
        final List<String> reqs = new ArrayList<String>();
        while (iterator.hasNext()) {
            reqs.add((String) iterator.next().getAttr("message").getObjectValue());
        }
        final String[] ress = server.sayHelloBatch(reqs.toArray(new String[reqs.size()]));
        return new Iterator<Event>() {
            // Index of the next result to hand out; advanced only by next().
            private int cursor = 0;

            @Override
            public boolean hasNext() {
                // Must be side-effect free: callers may invoke it any number of times.
                return cursor < ress.length;
            }

            @Override
            public Event next() {
                if (!hasNext()) {
                    throw new java.util.NoSuchElementException();
                }
                int idx = cursor++;
                Map<String, Object> values = new HashMap<String, Object>();
                values.put("message", reqs.get(idx));
                values.put("result", ress[idx]);
                return eventFactory.createEvent(outputSpec, values, System.currentTimeMillis());
            }
        };
    }

    /**
     * Processes a single event. A null {@code message} attribute yields the literal
     * string {@code "empty"} in both output attributes without calling the service.
     *
     * @param event input event; its {@code message} attribute is read
     * @return a new event with {@code message} and {@code result} attributes, stamped
     *         with the input event's time
     */
    @Override
    public Event processEvent(Event event) {
        Attr attr = event.getAttr("message");
        Map<String, Object> values = new HashMap<String, Object>();
        if (attr.isNull()) {
            values.put("message", "empty");
            values.put("result", "empty");
        } else {
            String val = (String) attr.getObjectValue();
            values.put("message", val);
            values.put("result", callSoap(val));
        }
        return eventFactory.createEvent(outputSpec, values, event.getTime());
    }

    /**
     * Invokes the service's {@code sayHello} operation.
     *
     * @param myName the value to send
     * @return the service's reply
     */
    public String callSoap(String myName) {
        return server.sayHello(myName);
    }
}
A.1.3 Custom Stage for REST Call
package com.oracle.osacs;
import com.oracle.cep.api.event.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.oracle.cep.api.annotations.OsaStage;
import com.oracle.cep.api.stage.EventProcessor;
import com.oracle.cep.api.stage.ProcessorContext;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
/**
 * Plain holder for the book details extracted from the REST response.
 */
class BookResult {
// ISBN that was used for the lookup
String isbn;
// Value of the "title" field of the first matching volume
String title;
// Value of the "publishedDate" field, kept as text
String publishedDate;
// Value of the "publisher" field
String publisher;
}
/**
 * Custom stage that looks up book details for the {@code isbn} attribute of each event
 * through the Google Books REST API and emits the ISBN together with the book's title,
 * published date and publisher.
 *
 * <p>An optional HTTP proxy is read from the {@code proxyHost} and {@code proxyPort}
 * entries of the classpath resource {@code /CustomStageRest.properties}.
 */
@SuppressWarnings("serial")
@OsaStage(name = "RestBooks", description = "Provide info for a given book", inputSpec = "input, isbn:string", outputSpec = "output, isbn:string, title:string, publishedDate:string, publisher:string")
public class CustomStageRest implements EventProcessor {
    EventFactory eventFactory;
    EventSpec outputSpec;
    static Properties props = new Properties();
    // Shared JSON parser; building one per request is unnecessary work.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    static {
        // The properties file is optional: a missing resource leaves props empty, which
        // getBook treats as "no proxy configured".
        try (java.io.InputStream in = CustomStageRest.class.getResourceAsStream("/CustomStageRest.properties")) {
            if (in != null) {
                props.load(in);
            }
        } catch (IOException ioex) {
            ioex.printStackTrace();
        }
    }

    /**
     * Captures the event factory from the context and builds the output event spec
     * from the {@code outputSpec} declared on this class's {@link OsaStage} annotation.
     *
     * @param ctx    processor context supplying the event factory
     * @param config stage configuration (not used by this sample)
     */
    @Override
    public void init(ProcessorContext ctx, Map<String, String> config) {
        eventFactory = ctx.getEventFactory();
        OsaStage meta = CustomStageRest.class.getAnnotation(OsaStage.class);
        outputSpec = TupleEventSpec.fromAnnotation(meta.outputSpec());
    }

    /** This stage holds no long-lived resources, so there is nothing to release. */
    @Override
    public void close() {
    }

    /**
     * Builds the output event for a single input event.
     *
     * <p>If the {@code isbn} attribute is null, all output attributes are empty
     * strings. If the lookup finds no book or fails, the ISBN is passed through and the
     * remaining attributes are empty strings.
     *
     * @param event input event; its {@code isbn} attribute is read
     * @return a new event stamped with the input event's time
     */
    @Override
    public Event processEvent(Event event) {
        Attr isbnAttr = event.getAttr("isbn");
        Map<String, Object> values = new HashMap<String, Object>();
        if (isbnAttr.isNull()) {
            values.put("isbn", "");
            values.put("title", "");
            values.put("publishedDate", "");
            values.put("publisher", "");
        } else {
            String isbn = (String) isbnAttr.getObjectValue();
            // getBook returns null when nothing was found or the call failed.
            BookResult result = getBook(isbn);
            values.put("isbn", isbn);
            values.put("title", result != null ? result.title : "");
            values.put("publishedDate", result != null ? result.publishedDate : "");
            values.put("publisher", result != null ? result.publisher : "");
        }
        return eventFactory.createEvent(outputSpec, values, event.getTime());
    }

    /**
     * Calls the Google Books REST API to get book information based on the ISBN ID.
     * Only the first volume returned for the ISBN is considered.
     *
     * @param isbn the ISBN to look up; it is URL-encoded before being placed in the query
     * @return the book information, or {@code null} if no book was found, the service
     *         answered with a non-2xx status, or any error occurred during the call
     */
    public BookResult getBook(String isbn) {
        // Closing the client on exit releases its connections.
        try (CloseableHttpClient client = HttpClientBuilder.create().build()) {
            String uri = "https://www.googleapis.com/books/v1/volumes?q=isbn:"
                    + java.net.URLEncoder.encode(isbn, "UTF-8");
            HttpRequestBase request = new HttpGet(uri);

            String proxyHost = props.getProperty("proxyHost");
            String proxyPort = props.getProperty("proxyPort");
            if (proxyHost != null && proxyPort != null) {
                HttpHost proxy = new HttpHost(proxyHost, Integer.parseInt(proxyPort));
                request.setConfig(RequestConfig.custom().setProxy(proxy).build());
            }

            HttpResponse response = client.execute(request);
            String resultJson = EntityUtils.toString(response.getEntity());
            StatusLine sl = response.getStatusLine();
            int code = sl.getStatusCode();
            if (code < 200 || code >= 300) {
                System.err.println("" + code + " : " + sl.getReasonPhrase());
                return null;
            }

            JsonNode bookArray = MAPPER.readTree(resultJson).path("items");
            if (bookArray.size() == 0) {
                return null; // No book found
            }
            JsonNode book = bookArray.path(0).path("volumeInfo"); // We only consider the first book for this ISBN
            BookResult result = new BookResult();
            result.isbn = isbn;
            result.title = book.path("title").asText();
            result.publishedDate = book.path("publishedDate").asText();
            result.publisher = book.path("publisher").asText();
            return result;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}
Note:
The following third-party JAR files are required to compile the REST sample:
- httpclient-4.5.6.jar
- httpcore-4.4.10.jar
- jackson-databind-2.9.10.jar
A.2 Sample Custom Function
This sample class defines a custom function that takes one textual field and produces an MD5 hash for it.
package com.oracle.osacs;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import com.oracle.cep.api.annotations.OsaFunction;
public class CustomMD5Function {
@OsaFunction(name = "md5", description = "Create an md5 hex from a string")
public static String md5(String message) {
String result = null;
try {
MessageDigest md = MessageDigest.getInstance("MD5");
md.update(message.getBytes());
byte[] digest = md.digest();
result = javax.xml.bind.DatatypeConverter.printHexBinary(digest);
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
return result;
}
}