5.5 カスタム関数およびカスタム・ステージの追加

カスタム関数は、アプリケーションの組込み関数のカスタム実装であるユーザー定義関数です。

5.5.1 カスタムjarの作成

カスタムjarは、パイプライン内で使用されるカスタム・ステージ・タイプまたはカスタム関数のJavaクラスが含まれる、ユーザー提供のJarアーカイブです。

カスタムJarを作成するには:

  1. 「カタログ」ページで、「新規アイテムの作成」をクリックし、ドロップダウン・リストから「カスタムjar」を選択します。
  1. 「タイプ・プロパティ」画面で、次の詳細を入力します:
    • 名前
    • 説明
    • タグ
    • カスタムjarタイプ: ドロップダウン・リストから「カスタムjar」を選択します。
  2. 「次」をクリックします。
  3. 「カスタムJarの詳細」ページで、「ファイルのアップロード」をクリックし、アプリケーションにインポートするjarファイルを選択します。
  4. 「保存」をクリックします。

カスタムJava/Scalaクラスは、Javadocに定義されているBatchEventProcessorインタフェースを実装する必要があります。

5.5.2 カスタム関数の追加

カスタムjarファイルを追加すると、カスタム関数がインストールされます。

カスタム関数は、インストールすると式ビルダーで使用可能になります。カスタム関数は、「カスタム」カテゴリの下にリストされます。

5.5.3 カスタム関数の実装

カスタム関数の場合、任意のクラス(カスタム・ステージ・タイプを実装するクラスを含む)内のメソッドに@OsaFunction注釈を適用します。詳細は、Javadocおよびサンプルを参照してください。

ノート:

同じまたは異なるjarに含まれる同じパッケージ/クラス/メソッド内で、複数の関数に同じ名前を付けることはサポートされていません。

5.5.3.1 サンプル: 列の暗号化

このサンプル・クラスは、1つのテキスト・フィールドを取得してそれに対するMD5ハッシュを生成するカスタム関数を定義しています。

package com.oracle.osacs;
 
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import com.oracle.cep.api.annotations.OsaFunction;
 
 
public class CustomMD5Function {
               
        @OsaFunction(name = "md5", description = "Create an md5 hex from a string")
        public static String md5(String message) {
               String result = null;
               
               try {
                       MessageDigest md = MessageDigest.getInstance("MD5");
                   md.update(message.getBytes());
                   byte[] digest = md.digest();           
                   result = javax.xml.bind.DatatypeConverter.printHexBinary(digest);     
               } catch (NoSuchAlgorithmException e) {
                       e.printStackTrace();
               }
                                           
             return result;        
        }       
 
}

5.5.4 カスタム・ステージの追加

カスタム・ステージを追加するには:
  1. パイプライン・エディタで、必要なパイプラインを開きます。
  2. カスタム・ステージの追加先の前にあるステージを右クリックします。ステージの追加「カスタム」,の順にクリックし、カスタムjarからのカスタム・ステージを選択します。
  3. カスタム・ステージの名前および適切な説明を入力し、「保存」をクリックします。
  4. ステージ・エディタで、次の詳細を入力します:
    1. カスタム・ステージ・タイプ: カスタムjarを介して以前にインストールされたカスタム・ステージを選択します
    2. 入力マッピング: 各入力パラメータの前のステージの対応する列を選択します

ユースケースに基づいて、複数のカスタム・ステージを追加できます。

5.5.4.1 サンプル: 列の暗号化

このサンプル・クラスは、1つのテキスト・フィールドを取得してそれに対するMD5ハッシュを生成するカスタム・ステージを定義しています。

package com.oracle.osacs;

import com.oracle.cep.api.event.*;
import com.oracle.cep.api.annotations.OsaStage;
import com.oracle.cep.api.stage.EventProcessor;
import com.oracle.cep.api.stage.ProcessorContext;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

@SuppressWarnings("serial")
@OsaStage(name = "md5", description = "Create an md5 hex from a string", inputSpec = "input, message:string", outputSpec = "output, message:string, md5:string")
public class CustomMD5Stage implements EventProcessor {

	EventFactory eventFactory;
	EventSpec outputSpec;

	@Override
	public void init(ProcessorContext ctx, Map<String, String> config) {
		eventFactory = ctx.getEventFactory();
		OsaStage meta = CustomMD5Stage.class.getAnnotation(OsaStage.class);
		String spec = meta.outputSpec();
		outputSpec = TupleEventSpec.fromAnnotation(spec);
	}

	@Override
	public void close() {
	}

	@Override
	public Event processEvent(Event event) {
		Attr attr = event.getAttr("message");
		Map<String, Object> values = new HashMap<String, Object>();
		if (!attr.isNull()) {
			String val = (String) attr.getObjectValue();
			String md5 = null;
			try {
				MessageDigest md = MessageDigest.getInstance("MD5");
				md.update(val.getBytes());
				byte[] digest = md.digest();
				md5 = javax.xml.bind.DatatypeConverter.printHexBinary(digest);
			} catch (NoSuchAlgorithmException e) {
				e.printStackTrace();
			}
			values.put("message", val);
			values.put("md5", md5);
		} else {
			values.put("message", "empty");
			values.put("md5", "empty");
		}
		Event outputEvent = eventFactory.createEvent(outputSpec, values, event.getTime());
		return outputEvent;
	}
}

5.5.4.2 サンプル: RESTサービスの呼出し

package com.oracle.osacs;

import com.oracle.cep.api.event.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.oracle.cep.api.annotations.OsaStage;
import com.oracle.cep.api.stage.EventProcessor;
import com.oracle.cep.api.stage.ProcessorContext;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;

class BookResult {
	String isbn;
	String title;
	String publishedDate;
	String publisher;
}

@SuppressWarnings("serial")
@OsaStage(name = "RestBooks", description = "Provide info for a given book", inputSpec = "input, isbn:string", outputSpec = "output, isbn:string, title:string, publishedDate:string, publisher:string")
public class CustomStageRest implements EventProcessor {

	EventFactory eventFactory;
	EventSpec outputSpec;

	static Properties props = new Properties();

	static {
		try {
			props.load(CustomStageRest.class.getResourceAsStream("/CustomStageRest.properties"));
		} catch (IOException ioex) {
			ioex.printStackTrace();
		}
	}

	@Override
	public void init(ProcessorContext ctx, Map<String, String> config) {
		eventFactory = ctx.getEventFactory();
		OsaStage meta = CustomStageRest.class.getAnnotation(OsaStage.class);
		String spec = meta.outputSpec();
		outputSpec = TupleEventSpec.fromAnnotation(spec);
	}

	@Override
	public void close() {
	}

	@Override
	public Event processEvent(Event event) {
		Attr isbnAttr = event.getAttr("isbn");

		Map<String, Object> values = new HashMap<String, Object>();
		if (!isbnAttr.isNull()) {
			String isbn = (String) isbnAttr.getObjectValue();

			BookResult result = getBook(isbn);

			values.put("isbn", isbn);
			values.put("title", result.title);
			values.put("publishedDate", result.publishedDate);
			values.put("publisher", result.publisher);

		} else {
			values.put("isbn", "");
			values.put("title", "");
			values.put("publishedDate", "");
			values.put("publisher", "");
		}
		Event outputEvent = eventFactory.createEvent(outputSpec, values, event.getTime());
		return outputEvent;
	}

	
	/**
	 * Calls the Google Books REST API to get book information based on the ISBN ID
	 * @param isbn
	 * @return BookResult book information
	 */
	public BookResult getBook(String isbn) {
		HttpRequestBase request;
		BookResult result = null;

		String uri = "https://www.googleapis.com/books/v1/volumes?q=isbn:" + isbn;

		request = new HttpGet(uri);

		CloseableHttpClient client = HttpClientBuilder.create().build();

		String proxyHost = props.getProperty("proxyHost");
		String proxyPort = props.getProperty("proxyPort");
		if (proxyHost != null && proxyPort != null) {
			int proxyPortInt = Integer.parseInt(proxyPort);
			HttpHost proxy = new HttpHost(proxyHost, proxyPortInt);
			RequestConfig config = RequestConfig.custom().setProxy(proxy).build();
			request.setConfig(config);
		}

		try {
			HttpResponse response = client.execute(request);
			String resultJson = EntityUtils.toString(response.getEntity());
			StatusLine sl = response.getStatusLine();
			int code = sl.getStatusCode();
			if (code < 200 || code >= 300) {
				System.err.println("" + code + " : " + sl.getReasonPhrase());
			}

			ObjectMapper mapper = new ObjectMapper();
			JsonNode root = mapper.readValue(resultJson, JsonNode.class);
			JsonNode bookArray = root.path("items");

			if (bookArray.size() > 0) {
				result = new BookResult();
				JsonNode book = bookArray.path(0).path("volumeInfo"); // We only consider the first book for this ISBN
				result.isbn = isbn;
				result.title = book.path("title").asText();
				result.publishedDate = book.path("publishedDate").asText();
				result.publisher = book.path("publisher").asText();
				return result;
			} else {
				return null; // No book found
			}
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}
	}
}

ノート:

RESTサンプルのコンパイルには次のサードパーティjarが必要です。
  • httpclient-4.5.6.jar
  • httpcore-4.4.10.jar
  • jackson-databind-2.9.10.jar
前述のjarはコンパイル時にのみ必要です。カスタムjarとともにパッケージ化する必要はありません。これらのライブラリおよびその依存関係は、すでにOSAディストリビューションにパッケージ化されています。

5.5.4.3 サンプル: SOAPサービスの呼出し


	import java.net.MalformedURLException;
	import java.net.URL;
	import java.util.ArrayList;
	import java.util.HashMap;
	import java.util.Iterator;
	import java.util.List;
	import java.util.Map;
	import javax.xml.namespace.QName;
	import javax.xml.ws.Service;
	import com.oracle.cep.api.annotations.OsaStage;
	import com.oracle.cep.api.event.Attr;
	import com.oracle.cep.api.event.Event;
	import com.oracle.cep.api.event.EventFactory;
	import com.oracle.cep.api.event.EventSpec;
	import com.oracle.cep.api.event.TupleEventSpec;
	import com.oracle.cep.api.stage.BatchEventProcessor;
	import com.oracle.cep.api.stage.ProcessorContext;

	@SuppressWarnings("serial")
	@OsaStage(name = "CustomSoapBatchCall", description = "Call a Hello World Soap WS", inputSpec = "input, message:string", outputSpec = "output, message:string, result:string")
	public class CustomSoapBatchCall implements BatchEventProcessor {

		EventFactory eventFactory;
		EventSpec outputSpec;
		URL url;
		QName qname;
		Service service;
		HelloWorldServer server;

		@Override
		public void init(ProcessorContext ctx, Map<String, String> config) {
			eventFactory = ctx.getEventFactory();
			OsaStage meta = CustomSoapBatchCall.class.getAnnotation(OsaStage.class);
			String spec = meta.outputSpec();
			outputSpec = TupleEventSpec.fromAnnotation(spec);
		try {
			url = new URL("http://hostname:9879/hw?wsdl");
		} catch (MalformedURLException e) {
	e.printStackTrace();
	}
	qname = new QName("http://ws.osa.oracle.com/",
	"HelloWorldServerImplService");
	service = Service.create(url, qname);
	server = (HelloWorldServer) service.getPort(HelloWorldServer.class);
	}

	@Override
	public void close() {
	}

	@Override
    public Iterator<Event> processEvents(Iterator<Event> iterator) {
        List<String> reqs = new ArrayList<String>();
        while(iterator.hasNext()){
            
			reqs.add((String)iterator.next().getAttr("message").getObjectValue());
 
        }
 
        String[] ress = server.sayHelloBatch(reqs.toArray(new String[reqs.size()]));
 
        return new Iterator<Event>() {
            int i = 0;
            @Override
            public boolean hasNext() {
                return i++ < ress.length;
 
            }
 
            @Override
            public Event next() {
                Map<String, Object> values = new HashMap<String, Object>();
                values.put("message",reqs.get(i-1));
                values.put("result",ress[i-1]);
                return eventFactory.createEvent(outputSpec,values,System.currentTimeMillis());
 
            }
 
        };
 
    }

	@Override
	public Event processEvent(Event event) {
		Attr attr = event.getAttr("message");
		Map<String, Object> values = new HashMap<String, Object>();
		if (!attr.isNull()) {
		String val = (String) attr.getObjectValue();
		String result = callSoap(val);
		values.put("message", val);
		values.put("result", result);
	} else {
	values.put("message", "empty");
	values.put("result", "empty");
	}
	Event outputEvent = eventFactory.createEvent(outputSpec, values,event.getTime());
	return outputEvent;
	}

	public String callSoap(String myName) {
	return server.sayHello(myName);
	}

	}

5.5.5 制限事項

この項では、カスタム・ステージおよびカスタム関数の制限と制約を示します。

カスタム・ステージ・タイプおよびカスタム関数の条件は、次のとおりです。

  • ステートレス変換にのみ使用されること。ステージ・タイプまたは関数に対する以前の呼び出しからの状態へのアクセスは保証できず、最適化に基づいて変わる可能性があります。

  • ブロッキング呼出しを使用しないこと。

  • 新しいスレッドを開始しないこと。

  • スレッド同期プリミティブ(wait()メソッドを含む)を使用しないこと。このメソッドを使用すると、デッドロックが発生する可能性があります。

  • 完全修飾クラス名を持つか、その一部であること。

カスタム・ステージまたはカスタム関数を使用する際には、ヒープ領域の使用量に注意してください。

ノート:

結果として生成されるjarには必要なすべての依存関係およびサード・パーティ・クラスが含まれる必要があり、また、jarファイルのサイズは160 MB未満である必要があります。

5.5.6 データ型のマッピング

次の表に、カスタム・ステージ・タイプおよびカスタム関数で使用可能なデータ型を示します。

Oracle Stream Analyticsのデータ型 Javaデータ型 コメント

BOOLEAN

boolean

INT

int

BIGINT

long

FLOAT

float

DOUBLE

double

STRING

String

BIGDECIMAL

BigDecimal

TIMESTAMP

long(ナノ秒)

カスタム・ステージ・タイプでのみ使用できます

INTERVAL

long

カスタム・ステージ・タイプでのみ使用できます