カスタム・アダプタは、Oracle Stream Analyticsで提供されているアダプタがサポートしていない外部コンポーネントとの間でイベント・データのやり取りを行います。
Oracle Stream Analyticsで提供されるアダプタの詳細は、Oracle Stream Analyticsによるイベント処理用アプリケーションの開発のQuickFixアダプタの操作を参照してください。
この章の内容は次のとおりです。
作成するカスタム・アダプタのタイプは、受信データの形式と、アダプタ・コードで受信データをOracle Stream Analyticsイベントに変換するために使用するテクノロジに依存します。通常、カスタム・アダプタは次のことを行います。
Reuters、Wombat、Bloombergなどのデータ・ベンダーからのAPIを使用します。
TIBCO Rendezvousなどのメッセージング・システムを使用します。
カスタマ自身のデータ・プロトコルとのソケット接続を確立します。
入力または出力外部コンポーネントで必要に応じて自身を認証します。
外部コンポーネントからRAWイベント・データを受信し、データをイベントに変換して、次のアプリケーション・ノードにイベントを送信します。
イベント処理ネットワーク内でイベントを受信し、ターゲット外部コンポーネントで認識される形式にイベント・データを変換します。
カスタム・アダプタを実装するには、外部コンポーネントと通信するJavaコードを記述します。入力カスタム・アダプタ実装は、外部コンポーネントからRAWイベント・データを受信し、データをイベントに変換して、次のアプリケーション・ノードにイベントを送信します。出力カスタム・アダプタ実装は、イベント処理ネットワーク内でイベントを受信し、ターゲット外部コンポーネントで認識される形式にイベント・データを変換します。
カスタム・アダプタの作成のためのOracle Stream Analytics APIについては、Oracle Stream Analytics Java APIリファレンスで説明されています。
次の手順の概要では、カスタム・アダプタを作成する方法を説明します。これらのトピックについては、この章の後半で詳しく説明します。
外部コンポーネントと通信するJavaクラスを実装します。
イベント・タイプ・インスタンスの送信または受信をサポートするインタフェースを実装します。
イベントを送信するには、カスタム・アダプタをイベント・ソースとして実装します。
イベントを受信するには、カスタム・アダプタをイベント・シンクとして実装します。
イベント・ソースおよびイベント・シンクの詳細は、Oracle Stream Analytics Java APIリファレンスを参照してください。
アダプタが中断と再開をサポートしている場合(アンデプロイしてデプロイする場合など)、これらのイベントを処理するインタフェースを実装します。
アプリケーションの拡張性を向上するためにマルチスレッドを使用します。
必要な認証のために、イベント・データを提供または受信するコンポーネントにログイン資格証明を渡すJavaロジックを記述します。
複数のアプリケーションがカスタム・アダプタにアクセスする場合、ファクトリ・クラスを作成します。
アセンブリ・ファイルでアダプタを構成してEPNに追加します。
この項の例では、ファイルからRAWイベント・データを取得し、データをイベントに変換して、EPNのダウンストリーム・ステージにイベントを送信する基本的な入力アダプタのコードを示します。コードはOracle Spatialのサンプル・アプリケーションからの抜粋です。
注意:
ここに示すコード例は本番用ではありませんが、入力アダプタの基本的なワークフローの概要を示しています。
サーバーは次のBusStopAdapterクラス・メソッドをコールして、BusStopAdapterオブジェクトが実行時に必要とする情報を提供(注入)します。
BusStopAdapter.setRepositoryInstanceメソッドは、このアプリケーション用のイベント・タイプ構成が含まれるEventTypeRepositoryインスタンスを注入します。EventTypeRepositoryインスタンスは、アダプタがそのアダプタ用に構成されているイベント・タイプに関する情報を取得できるようにします。
BusStopAdapter.setEventSenderメソッドは、BusStopAdapterオブジェクトがイベントを次のEPNノードに送信できるように、StreamSenderオブジェクトを注入します。
BusStopAdapter.setPathおよびBusStopAdapter.setEventTypeメソッドは、イベントのプロパティ値を提供します。
BusStopAdapter.runメソッド実装は、RAWイベント・データを取得し、データをイベント・タイプ・インスタンスに解析して、新しいイベントをダウンストリームのEPNノードに送信します。
package com.oracle.cep.sample.spatial;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.bea.wlevs.ede.api.EventProperty;
import com.bea.wlevs.ede.api.EventRejectedException;
import com.bea.wlevs.ede.api.EventType;
import com.bea.wlevs.ede.api.EventTypeRepository;
import com.bea.wlevs.ede.api.RunnableBean;
import com.bea.wlevs.ede.api.StreamSender;
import com.bea.wlevs.ede.api.StreamSink;
import com.bea.wlevs.ede.api.StreamSource;
import com.bea.wlevs.util.Service;
import java.lang.RuntimeException;
public class BusStopAdapter implements RunnableBean, StreamSource, StreamSink {
static final Log s_logger = LogFactory.getLog("BusStopAdapter");
private String m_filePath;
private String m_eventTypeName;
private EventType m_eventType;
private StreamSender m_eventSender;
private boolean m_stopped;
private int m_repeat = 1;
private EventTypeRepository m_etr = null;
public BusStopAdapter()
{ super(); }
/**
* Called by the server to pass in the path
* to the file with bus stop data.
*
* @param path The value specified for the path
* property in the adapter's configuration in the EPN assembly file.
*/
public void setPath(String path) throws RuntimeException {
// Code to create a File instance from the path. This
// File object retrieves event data from the file.
}
/**
* Called by the server to pass in the name of the event
* type to which event data should be bound.
*
* @param path The value specified for the path
* property in the adapter's configuration in the EPN assembly file.
*/
public void setEventType(String typ)
{ m_eventTypeName = typ; }
/**
* Called by the server to set an event type
* repository instance that knows about event
* types configured for this application.
*
* This repository instance will be used to retrieve an
* event type instance that is populated with
* event data retrieved from the event data file.
*
* @param etr The event repository.
*/
@Service(filter = EventTypeRepository.SERVICE_FILTER)
public void setEventTypeRepository(EventTypeRepository etr)
{ m_etr = etr; }
/**
* Executes to retrieve raw event data and
* create event type instances from it, then
* sends the events to the next stage in the
* EPN.
*
* This method, implemented from the RunnableBean
* interface, executes when this adapter instance is active.
*/
public void run() {
if (m_etr == null){
throw new RuntimeException("EventTypeRepository is not set");
}
// Get the event type from the repository wit the event type name
// specified as a property of this adapter in the assembly file.
m_eventType = m_etr.getEventType(m_eventTypeName);
if (m_eventType == null){
throw new RuntimeException("EventType(" + m_eventType + ") is not found.");
}
BufferedReader reader = null;
System.out.println("Sending " + m_eventType + " from " + m_filePath);
while ((m_repeat != 0) && (!m_stopped)) {
try {
reader = new BufferedReader(new FileReader(m_filePath));
} catch (Exception e) {
m_stopped = true;
break;
}
while (!isStopped()) {
try {
// Create an event and assign to it an event type generated
// from the event data retrieved by the reader.
Object ev = null;
ev = readLine(reader);
if (ev == null){
reader.close();
break;
}
// Send the newly created event to a downstream node
//listening to this adpater.
m_eventSender.sendInsertEvent(ev);
} catch (Exception e){
m_stopped = true;
break;
}
}
}
}
/**
* Called by the server to pass in a sender instance to be used to
* send generated events to a downstream node.
*
* @param sender A sender instance.
*/
public void setEventSender(StreamSender sender)
{ m_eventSender = sender; }
/**
* Returns true if this adapter instance has been
* suspended, such as because an exception occurred.
*/
private synchronized boolean isStopped()
{ return m_stopped; }
/**
* Reads data from reader, creating event type
* instances from that data. This method is
* called from the run() method.
*
* @param reader Raw event data from a file.
* @return An instance of the event type specified
* as a property of this adapter.
*/
protected Object readLine(BufferedReader reader) throws Exception
{
// Code to read raw event data and return an event type
// instance from it.
}
/**
* Called by the server to pass in an
* insert event received from an
* upstream stage in the EPN.
*/
@Override
public void onInsertEvent(Object event) throws EventRejectedException {
// Code to begin executing the logic needed to
// convert incoming event data to event type instances.
}
}
カスタム・アダプタを作成する場合、それをアセンブリおよびコンポーネント構成ファイルで構成することでEPNに追加します。
アセンブリ・ファイルでwlevs:adapter要素を使用して、アダプタをイベント・プロセッサ・ネットワークのコンポーネントとして宣言します。wlevs:instance-property子要素を使用して、アダプタの静的プロパティを設定します。静的プロパティは、アダプタのデプロイ後に動的に変更しないプロパティです。
次の例では、BusStopAdapterクラスはクラス・メソッドに対応するプロパティで構成されています。BusStopAdapterのアクセッサ・メソッドsetPath、setEventTypeおよびsetBufferは、アセンブリ・ファイルのpath、eventTypeおよびbufferプロパティに対応します。アセンブリ・ファイルでのプロパティ名のスペルは、アクセッサ・メソッドのスペルからgetまたはset接頭辞を除いたものと一致します。
<wlevs:adapter id="BusStopAdapter" class="com.oracle.cep.sample.spatial.BusStopAdapter" > <wlevs:instance-property name="path" value="bus_stops.csv"/> <wlevs:instance-property name="eventType" value="BusStop"/> <wlevs:instance-property name="buffer" value="30.0"/> </wlevs:adapter>
wlevs:listener要素を使用して、イベント・シンクであるアダプタを参照します。次の例では、BusPositionGen CSVアダプタは、イベントをBusStopAdapterに送信します。
<wlevs:adapter id="BusPositionGen" provider="csvgen"> <!-- Code omitted --> <wlevs:listener ref="BusStopAdapter"/> </wlevs:adapter>
構成ファイルでadapter要素を使用して、実行時に更新できる子要素およびプロパティを追加します。各アダプタ構成には、config要素の個別のadapter子要素が必要です。次の例では、イベント記録用のBusStopAdapterアダプタを構成します。
<?xml version="1.0" encoding="UTF-8"?> <wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application"> <adapter> <name>BusStopAdapter</name> <record-parameters> <dataset-name>spatial_sample</dataset-name> <event-type-list> <event-type>BusPos</event-type> <event-type>BusStop</event-type> <event-type>BusPosEvent</event-type> <event-type>BusStopArrivalEvent</event-type> <event-type>BusStopPubEvent</event-type> <event-type>BusStopPubEvent</event-type> </event-type-list> <batch-size>1</batch-size> <batch-time-out>10</batch-time-out> </record-parameters> </adapter> </wlevs:config>
アダプタが外部データ・フィードにアクセスする場合、アダプタはユーザー認証のためにデータ・フィードにログイン資格証明を渡す必要があることがあります。アダプタ・コードには暗号化していないログイン資格証明をハードコードできます。この安全でない方法では、パスワードを暗号化したり、ログイン資格証明を簡単に変更できません。
まず、ログイン資格証明をアセンブリ・ファイルで静的に構成するか、アダプタ構成を拡張して動的に構成するかを決定します。資格証明をアセンブリ・ファイルで静的に構成する方が簡単ですが、資格証明が変更された場合、アセンブリ・ファイルの更新を反映するためにアプリケーションを再起動する必要があります。アダプタ構成を拡張すると、アプリケーションを再起動せずに資格証明を動的に変更できますが、XSDファイルを作成してJAXBオブジェクトにコンパイルするといった追加手順が必要になります。
複数のスレッドを使用してデータ・ソースから読み取るように、アダプタを実装または構成できます。イベント処理作業に高いオーバーヘッドがある場合、マルチスレッド・アダプタでパフォーマンスを向上できます。マルチスレッドを実装するには、ワーク・マネージャを使用するか、WorkManagerインタフェースを実装してアダプタをマルチスレッド化できます。RunnableまたはRunnableBeanインタフェースを実装して、カスタム・アダプタを並列実行できます。
注意:
シングル・スレッド・アダプタでは、イベントの順序は保証されます。マルチスレッド・アダプタでは、イベントの順序は保証されません。イベントの順序を管理するために、カスタム・アダプタ・クラスにコードを追加することが必要となる場合があります。
ワーク・マネージャ
スレッド化を実装する最も簡単な方法は、ワーク・マネージャを使用してアダプタを構成することです。ワーク・マネージャは、複数のスレッドでのスケジュール用にアプリケーションがコード(Workインタフェースを実装するクラス)を送信できるサーバー機能です。ワーク・マネージャ機能のインタフェースは、commonj.work.WorkManagerです。
Oracle Stream Analyticsでは、WorkManagerインスタンスはサーバーのconfig.xmlファイルで構成できます。また、使用するアプリケーションごとにデフォルトのワーク・マネージャも割り当てられます。
2つのうちいずれかの方法で、アダプタで使用するワーク・マネージャへの参照を取得できます。
アダプタに注入する(config.xmlファイルで構成された)サーバーをスコープにした特定のワーク・マネージャの名前をユーザーが指定できるアダプタの構成プロパティを作成できます。ワーク・マネージャの明示的な構成をサポートするアダプタの例は、組込みJMSアダプタ用の構成の詳細を参照してください。
アダプタにcom.bea.wlevs.ede.spi.WorkManagerAwareインタフェースを実装できます。このインタフェースは、アダプタを初期化してアプリケーションにデフォルトのワーク・マネージャを注入するときにシステムによってコールされる単一のメソッドsetWorkManager(WorkManager wm)で構成されます。
WorkManagerインスタンスを取得したら、ワーク・マネージャのスレッド・プールで実行される作業を送信できます。
ワーク・マネージャにアプリケーションと同じ名前を指定することで、ワーク・マネージャとアプリケーションを関連付けできます。アプリケーションのワーク・マネージャを明示的に指定しない場合は、デフォルト値の最小(MIN)および最大(MAX)スレッド数で、デフォルトのワーク・マネージャが作成されます。
WorkManagerAwareインタフェース
スレッドをより詳細に管理する必要がある場合は、カスタム・アダプタにWorkManagerAwareインタフェースを実装できます。この場合、Oracle Stream Analyticsは初期化中にアプリケーションのワーク・マネージャを使用してアダプタを注入します。このワーク・マネージャを使用して、アダプタまたはイベントBeanインタフェースのスレッドを明示的に管理します。
RunnableBeanインタフェース
コードの一部を別のスレッドで実行する必要はあっても、WorkManager APIによって提供されるスレッドを完全に制御する必要はないアダプタは、com.bea.wlevs.ede.api.RunnableBeanインタフェースを実装できます。このインタフェースには、アプリケーションのデフォルト・ワーク・マネージャに属するいずれかのスレッドでシステムによってコールされるrunメソッドが含まれます。RunnableBeanインタフェースは、単一のワーク・マネージャ・スレッドでコードを実行する非常に簡潔なモデルをアダプタに提供します。並列度を高めるために複数のワーク・マネージャ・スレッドを明示的に使用する必要がある場合は、WorkManagerインタフェースを直接使用します。「ワーク・マネージャ」を参照してください。
EPN
このインタフェースの参照情報は、Oracle Stream Analytics Java APIリファレンスを参照してください。
次のインタフェースを実装して、イベント処理を中断および再開するためのサーバー・サポートを追加できます。たとえば、EPNが中断したときにカスタム・アダプタでイベントの処理を停止し、EPNが再開したときにイベントの処理を再び開始できます。いずれかまたは両方の場合に、必要に応じてリソースを管理できます。
表1-1 アダプタの中断と再開をサポートするインタフェース
| インタフェース | 説明 |
|---|---|
|
EPNが中断したときに実行するロジックを提供する。 |
|
これを実装して、EPNが処理を再開する場合に実行するロジックを提供します。 |
Oracle Stream Analytics Java APIリファレンスを参照してください。
カスタム・アダプタのライフサイクル・ステージ(構成の準備ができたとき、構成がアクティブ化したとき、および例外によってアダプタが終了したとき)を処理するカスタム・アダプタ実装のメソッドを指定するには、次の注釈を使用します。
次のシグネチャでカスタム・アダプタ・メソッド上に該当する@Prepare、@Activateまたは@Rollback注釈を配置し、それらのメソッドが構成情報をアダプタに送信することを示します。
public void methodName(AdapterConfigObject adapterConfig)
AdapterConfigObjectは、アプリケーションにデプロイされたアダプタの構成XMLファイルのJava表現を参照します。このクラスのタイプは、デフォルトでcom.bea.wlevs.configuration.application.DefaultAdapterConfigです。アダプタの構成を拡張した場合、このクラスのタイプは、拡張されたXMLファイルを記述するXSDに指定されているものです。たとえば、HelloWorldサンプルでは、タイプはcom.bea.wlevs.adapter.example.helloworld.HelloWorldAdapterConfigです
実行時に、Oracle Stream AnalyticsはAdapterConfigObjectクラスを作成し、そのクラスにXMLファイルのデータを移入して、作成したインスタンスをアダプタに渡します。@Activateおよび@Rollbackアダプタ・ライフサイクル注釈で注釈されたアダプタ・メソッドは、そのクラスを使用してアダプタ構成の情報を取得します。
これらの@Prepare、@Activateおよび@Rollback注釈には属性はありません。次の例では、HelloWorldAdapterConfigクラスはアダプタの構成XMLファイルを表します。アプリケーションのデプロイ時に、Oracle Stream Analyticsはこのクラスのインスタンスを作成します。HelloWorldの例では、アダプタ構成は拡張されています。「com.bea.wlevs.configuration.Activate (@Activate)」の例を参照してください。
Oracle Stream Analyticsサーバーは、構成の変更によってアダプタの状態が更新された場合は常に、@Prepareで注釈されているメソッドをコールします。次の例では、HelloWorldサンプル・アプリケーションのアダプタ・コンポーネントのもので、@Prepare注釈を使用する方法を示します。
package com.bea.wlevs.adapter.example.helloworld; ... import com.bea.wlevs.configuration.Prepare; import com.bea.wlevs.ede.api.RunnableBean; import com.bea.wlevs.ede.api.StreamSender; import com.bea.wlevs.ede.api.StreamSource; import com.bea.wlevs.event.example.helloworld.HelloWorldEvent; public class HelloWorldAdapter implements RunnableBean, StreamSource { ... @Prepare public void checkConfiguration(HelloWorldAdapterConfig adapterConfig) { if (adapterConfig.getMessage() == null || adapterConfig.getMessage().length() == 0) { throw new RuntimeException("invalid message: " + message); } } ... }
注釈付きのcheckConfigurationメソッドは、(拡張されたアダプタの構成ファイルに設定されている)アダプタ構成のmessageプロパティがnullまたは空でないことを確認します。nullまたは空の場合、メソッドは例外をスローします。
Oracle Stream Analyticsサーバーは、@Prepare注釈が付いたすべてのメソッドをコールして正常に実行した後、@Activateで注釈されているメソッドをコールします。@Activateメソッドを使用して、残りのアダプタ実装で使用するアダプタ構成データを取得します。次の例では、HelloWorldの例のアダプタ・コンポーネントで@Activate注釈を使用する方法を示します。
package com.bea.wlevs.adapter.example.helloworld; ... import com.bea.wlevs.configuration.Activate; import com.bea.wlevs.ede.api.RunnableBean; import com.bea.wlevs.ede.api.StreamSender; import com.bea.wlevs.ede.api.StreamSource; import com.bea.wlevs.event.example.helloworld.HelloWorldEvent; public class HelloWorldAdapter implements RunnableBean, StreamSource { ... @Activate public void activateAdapter(HelloWorldAdapterConfig adapterConfig) { this.message = adapterConfig.getMessage(); } ... }
次の例は、このXSDファイルで結果のJava構成オブジェクトの完全修飾名(太字の部分)も指定されていることを示します。
<?xml version="1.0" encoding="UTF-8"?> <xs:schema xmlns="http://www.bea.com/ns/wlevs/example/helloworld" xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:jxb="http://java.sun.com/xml/ns/jaxb" xmlns:xjc="http://java.sun.com/xml/ns/jaxb/xjc" xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application" targetNamespace="http://www.bea.com/ns/wlevs/example/helloworld" elementFormDefault="unqualified" attributeFormDefault="unqualified" jxb:extensionBindingPrefixes="xjc" jxb:version="1.0"> <xs:annotation> <xs:appinfo> <jxb:schemaBindings> <jxb:package name="com.bea.wlevs.adapter.example.helloworld"/> </jxb:schemaBindings> </xs:appinfo> </xs:annotation> <xs:import namespace="http://www.bea.com/ns/wlevs/config/application" schemaLocation="wlevs_application_config.xsd"/> <xs:element name="config"> <xs:complexType> <xs:choice maxOccurs="unbounded"> <xs:element name="adapter" type="HelloWorldAdapterConfig"/> <xs:element name="processor" type="wlevs:DefaultProcessorConfig"/> <xs:element name="channel" type="wlevs:DefaultStreamConfig" /> </xs:choice> </xs:complexType> </xs:element> <xs:complexType name="HelloWorldAdapterConfig"> <xs:complexContent> <xs:extension base="wlevs:AdapterConfig"> <xs:sequence> <xs:element name="message" type="xs:string"/> </xs:sequence> </xs:extension> </xs:complexContent> </xs:complexType> </xs:schema>
Oracle Stream Analyticsは、アプリケーションのデプロイ時にこのクラスのインスタンスを作成します。たとえば、helloworldAdapterの構成ファイルのアダプタ・セクションは次のようになります。
<?xml version="1.0" encoding="UTF-8"?> <helloworld:config ... <adapter> <name>helloworldAdapter</name> <message>HelloWorld - the current time is:</message> </adapter> </helloworld:config>
注釈付きのactivateAdapterメソッドは、構成オブジェクトのgetMessageメソッドを使用して、アダプタの構成XMLファイルに設定されているmessageプロパティの値を取得します。この例の場合、値はHelloWorld - the current time is:です。この値は、アダプタ実装ファイルの主要部分で使用できます。
Oracle Stream Analyticsサーバーは、@Prepareで注釈されているコンポーネントがコールされたが例外がスローされた場合は常に、@Rollbackで注釈されているメソッドをコールします。次の例は、HelloWorldの例のアダプタ・コンポーネントのもので、@Rollback注釈を使用する方法を示します。
package com.bea.wlevs.adapter.example.helloworld; ... import com.bea.wlevs.configuration.Rollback; import com.bea.wlevs.ede.api.RunnableBean; import com.bea.wlevs.ede.api.StreamSender; import com.bea.wlevs.ede.api.StreamSource; import com.bea.wlevs.event.example.helloworld.HelloWorldEvent; public class HelloWorldAdapter implements RunnableBean, StreamSource { @Rollback ... public void rejectConfigurationChange(HelloWorldAdapterConfig adapterConfig) { }
このサンプルでは、rejectConfigurationChangeメソッドに@Rollback注釈が付いています。つまり、これは@Prepareメソッドが例外を送出した場合に呼び出されるメソッドです。上記の例では、実際には何も起こりません。