4 アダプタ
アダプタは、EPNの入出力データを管理します。Oracle Stream Analyticsには、CSV、RMIおよびHTTPなどの各種タイプのデータを処理するために様々な種類のインバウンド・アダプタとアウトバウンド・アダプタが用意されています。すべてのアダプタには、OSGi登録アダプタ・ファクトリ・サービスに対する参照であり、アダプタが処理するデータのタイプを定義するproviderプロパティがあります。
インバウンド・アダプタは、EPNに入るデータ・ストリームからイベント・データを受信し、データをイベント・タイプに応じてイベントに割り当て、データをEPN内の次のステージに送信します。アウトバウンド・アダプタは、EPNによって処理されたイベントを受信し、イベントを出力フォームに変換し、変換したデータを別のEPN、非EPNアプリケーション、CSVファイルまたはWebページなどの出力データ・ソースに送信します。
この章の内容は次のとおりです。
高可用性アダプタの詳細は、高可用性アプリケーションを参照してください。
csvgenアダプタとともにロード・ジェネレータを使用してデータ・フィードをシミュレートしてアプリケーションをテストする方法の詳細は、テスト1-2-3を参照してください。
4.1 アダプタの作成
ほとんどのアダプタを作成する上で最適な方法は、Oracle JDeveloperを使用する方法です。Oracle JDeveloperコンポーネント・ウィンドウには、インバウンド・アダプタおよびアウトバウンド・アダプタとしてCSV、EDN、RMI、HTTPおよびJMSが用意されています。
他のアダプタの場合、構成ファイルを直接編集します。この章の各アダプタの項では、設定を確認できるようにアセンブリと構成ファイルの構成の例を示します。
この章では、様々なタイプのアダプタに関するアセンブリおよび構成ファイルの設定について説明します。アダプタ・スキーマは、Oracle Stream AnalyticsのインストールのOracle/Middleware/my_oep/oep/wlevs_application_config.xsdディレクトリも参照してください。
4.2 クラスタ配分サービス
クラスタ配分機能には、クラスタ内のすべてのサーバーに着信イベントを配分するために様々なOracle Stream Analyticsアダプタ・タイプ用のメカニズムが採用されています。
イベントを配分するために個別アダプタまたはイベントBeanインスタンスを構成でき、この場合、このアダプタによって処理されるすべての入力イベントはクラスタ内のすべてのサーバーに送信(配分)されます。
配分アダプタにより、すべての入力イベントがクラスタ内のすべてのサーバーに確実に送信(配分)されます。入力アダプタを配分アダプタに変換するには、distributeInput要素を追加し、次のように、これをtrueに設定します。distributionThreadsCountプロパティはオプションで、1にデフォルト設定されます。
<wlevs:adapter id="myLoadgenAdapter" provider="loadgen"> <wlevs:instance-property name="distributeToClusterGroup" value="true"/> <wlevs:instance-property name="distributionThreadsCount" value="1"/> </wlevs:adapter>
Oracle Stream Analyticsは、loadgen、CSVインバウンド・アダプタおよびJMSインバウンド(キュー)アダプタ用のクラスタ配分サービスをサポートしています。Oracle Stream Analyticsは、CSVアウトバウンド・アダプタ、JMSインバウンド(トピック)アダプタ、JMSアウトバウンド・アダプタおよびHTTPパブリッシュ/サブスクライブ・アダプタ用のクラスタ配分サービスはサポートしていません。入力配分に対してトピック宛先を構成するとエラーになります。トピック構成によって警告ログ・メッセージが生成され、トピック構成は無視されます。
前述のアダプタ・タイプ以外にも、次の例に示すように、BeanのプロバイダをclusterGroupDistributorとして指定することにより、受信したすべてのイベントをすべてのクラスタ・メンバーに配分するためのイベントBeanを構成することもできます。
<wlevs:event-bean id="distributor-bean" provider="clusterGroupDistributor"> <wlevs:instance-property name="distributionThreadsCount" value="1"/> ... other event bean properties ... </wlevs:event-bean>
現在、Oracle JDeveloperには、クラスタ配分サービス用のコンポーネントは用意されていません。ただし、Oracle Stream Analyticsアプリケーション用のアセンブリ・ファイルおよび構成ファイルにエントリを追加することにより、クラスタ配分サービスを作成できます。
4.3 パスワードの暗号化
一部のアダプタには、ユーザー名およびパスワードの子要素があります。Oracle Stream Analyticsには、パスワードが含まれるファイルを暗号化できるようencryptMSAConfigコマンドが用意されています。
4.4 JAXBサポート
Oracle Stream Analyticsには、アダプタおよびイベントBeanでJava Architecture for XML Binding (JAXB)マッピング機能を使用してXMLオブジェクトとJavaオブジェクト間でイベント・データをマーシャリングおよびアンマーシャリングするための簡易インタフェースが用意されています。
JAXBインタフェースは、JAXB 2.2仕様およびEclipseLink Moxyプロバイダ拡張機能をサポートしています。
マッピング操作は次の方法で構成できます。
-
一連の注釈付きJavaクラスを出力するために、XMLスキーマからJavaオブジェクトにマップします。
-
JAXB注釈を使用して、1つのJavaオブジェクト・セットから別のJavaオブジェクト・セットまたはXMLにマップします。
-
既存のXMLスキーマから既存の事前定義済のJavaオブジェクト表現にマップします。このアプローチでは、EclipseLink Moxy拡張機能を使用し、マッピング詳細が含まれる外部の
metadataファイルを必要とします。metadataファイルは、アプリケーション構成によって参照されます。
4.4.1 EclipseLink Moxy
EclipseLink Moxyには、注釈を指定せずにXMLスキーマまたはJavaクラスを変更しないで既存のXMLスキーマと事前定義済のJavaクラス間でマップできる拡張機能が用意されています。マッピング情報は、XPath構文を使用して外部メタデータ・ファイルで指定します。
柔軟性の高いEclipseLink Moxy拡張機能を使用すると、複雑な操作を実行できます。たとえば、複雑なXMLデータのサブセットをより簡単なイベント表現にマップできます。また、深くネストしたXMLドキュメントをフラットなJava Beanイベント形式にフラット化し、Oracle CQLによる処理も可能にします。
EclipseLink Moxyの外部のmetadataはXMLで指定します。http://www.eclipse.org/eclipselink/xsds/eclipselink_oxm_2_2.xsdにあるスキーマにアクセスしてください。
4.4.2 API
JAXB機能を必要とするアダプタまたはイベントBeanは、com.oracle.cep.mappers.api.Mapperインタフェースを実装するBeanのインジェクションによってこの機能を取得します。Mapperインタフェースは、次のようになります。
public interface Mapper {
Marshaller createMarshaller() throws MapperException;
Unmarshaller createUnmarshaller() throws MapperException;
}
アダプタまたは他のEPNコンポーネント・コードでは、インジェクトされたBeanを使用してマーシャリング・オブジェクトおよびアンマーシャリング・オブジェクトを作成します。次に示すcom.oracle.cep.mappers.api.Marshallerおよびcom.oracle.cep.mappers.api.Unmarshaller インタフェースは、ほとんどのアプリケーションに対して機能します。
public interface Marshaller {
void marshal(Object object, javax.xml.transform.Result result)
throws MapperException;
}
public interface Unmarshaller {
Object unmarshal(javax.xml.transform.Source source)
throws MapperException;
}
一部のアプリケーションでは、ターゲット・クラスを引数として使用するアンマーシャリング・メソッドなど、マーシャリングおよびアンマーシャリング用として特別なメソッド・シグネチャが必要な場合があります。このような場合は、かわりにcom.oracle.cep.mappers.jaxb.JAXBMarshallerImpl およびcom.oracle.cep.mappers.jaxb.JAXBUnmarshallerImplインタフェースを使用します。これらのインタフェースには、javax.xml.bind.Marshallerおよびjavax.xml.bind.Unmarshallerインタフェースによってサポートされる完全セットのマーシャリング・メソッドおよびアンマーシャリング・メソッドに対応するメソッドが用意されています。
アセンブリ・ファイル
次のアセンブリ・ファイル・エントリでは、プロパティを使用してマッパーBeanをコールし、イベント・タイプおよびメタデータ・ファイルを指定しています。
<bean id="mapperBean" class="com.oracle.cep.mappers.jaxb.JAXBMapperImpl" >
<property name="eventTypeName" value="CallCenterActivity" />
<property name="metadata" value="external_metadata_case1.xml" />
</bean>
ファクトリをコールしてマッパーBeanを作成するには、Bean要素に対して次を指定します。
<bean id="mapperBean" class="com.oracle.cep.mappers.jaxb.JAXBMapperFactory"
factorymethod="create"/>構成ファイル
次の構成ファイル・エントリは、マッパーBeanに対してプロパティを指定します。
<jaxb-mapper>
<name>mapperBean</name>
<event-type-name>CallCenterActivity</event-type-name>
<metadata>external_metadata_case1.xml</metadata>
</jaxb-mapper>プロパティ
マッパーBeanは、プロパティをサポートしています。metadataMapを除くすべてのプロパティは、アセンブリ・ファイルのプロパティまたは構成ファイルの要素として構成できます。
表4-1 マッパーBeanのプロパティおよび要素
| アセンブリ・ファイルのプロパティ名 | 構成ファイルの要素名 | 説明 |
|---|---|---|
|
eventTypeName |
event-type-name |
アプリケーションによってイベント・タイプ・リポジトリに登録されているイベント・タイプの名前。イベント・タイプはJavaクラスに対応しています。このクラスのパッケージ名は、マッパーBeanによって表されるJAXBContextを初期化する際にコンテキスト・パスとして使用されます。 この構成では、マッパーによって表される |
|
|
|
Javaパッケージ名のコロン区切りリスト。マッパーBeanによって表される この構成では、マッパーによって表される |
|
|
|
|
|
|
|
検証に使用されるXMLスキーマ・ファイルのファイル名。 |
|
|
|
マッピングのカスタマイズ用のEclipseLink Moxy外部メタデータが含まれるファイルの名前。 |
|
|
N/A |
|
アプリケーションでEclipseLink Moxy固有の外部metadataが使用される場合、metadataの場所はmetadataプロパティまたはmetadataMapプロパティによって指定されます。contextPath上に複数のパッケージがある場合、metadataMapプロパティが必要です。コンポーネント構成ファイルでのmetadataMapプロパティの指定はサポートされていません。
4.5 CSVアダプタ
CSVアダプタは、カンマで区切られたインバウンド・データおよびアウトバウンド・データを処理します。EPNに入力するためにカンマ区切りの値形式でデータを受信するにはCSVInboundアダプタを使用し、EPNから出力するカンマ区切りの値形式でデータを送信するにはCSVOutboundアダプタを使用します。
Oracle Stream Analyticsインストールで提供されるロード・ジェネレータで、CSVインバウンド・アダプタを使用するOracle Stream Analyticsアプリケーションをテストできます。ロード・ジェネレータにより、サンプル・データが含まれるASCIIファイルが読み取られます。CSVインバウンド・アダプタを使用する必要があるのは、CSVインバウンド・アダプタはロード・ジェネレータによって生成されたデータ・パケットを解読するようコーディングされているためです。ロード・ジェネレータとcsvgenアダプタを参照してください。
注意:
java.sql.TimeStampタイプの場合、CSVアダプタは、yyyy-mm-dd'T'hh:mm:ss[.fffffffff]の形式でデータの読取りおよび書込みを行います。たとえば、"2012-12-12T12:12:12.120"です。
CSVアダプタを作成する上で最適な方法は、Oracle JDeveloperコンポーネント・ウィンドウを使用する方法です。Oracle JDeveloperによって生成された次のアセンブリおよび構成ファイルは、CSVのインバウンド・アダプタおよびアウトバウンド・アダプタの構成を示しています。
アセンブリ・ファイル
CSVインバウンド・アダプタは、StockData.csvファイルから読み取られたデータをTradeEventイベント・タイプのイベントに変換します。
wlevs:listener要素は、TradeEventアクティビティについてCSVインバウンド・アダプタをリスニングするコンポーネントを指定します。この例では、リスナーはAdapterOutputChannelです。AdapterOutputChannelコンポーネントは、StockTradeCSVInboundAdapterをリスニングしてここから取引イベントを受信し、次のステージに送信します。
<wlevs:adapter id="StockTradeCSVInboundAdapter" provider="csv-inbound">
<wlevs:listener ref="AdapterOutputChannel"/>
<wlevs:instance-property name="eventType" value="TradeEvent"/>
<wlevs:instance-property name="sourceUrl"
value="file:/scratch/mpawlan/oep9-19/oep/utils/load-generator/StockData.csv"/>
</wlevs:adapter>
アウトバウンド・アダプタのアセンブリ・ファイル構成は、インバウンド・アダプタと類似していますが、append属性が含まれます。trueに設定すると、Oracle Stream Analyticsにより、既存の出力ファイルにデータが追加されます。falseに設定すると、Oracle Stream Analyticsにより、新規ファイルが作成されるか、または同じ名前の既存ファイルが上書きされます。
<wlevs:adapter id="StockTradeCSVOutboundAdapter" provider="csv-outbound">
<wlevs:instance-property name="eventType" value="TradeEvent"/>
<wlevs:instance-property name="outputFile" value="/scratch/mpawlan/oep9-19/oep/utils/load-generator/StockData.csv"/>
<wlevs:instance-property name="append" value="false"/>
</wlevs:adapter>
outputFile値には絶対パスまたは相対パスを指定できます。相対パスの場合、../filename.csv、../result.csvまたはupload/result.csvを指定できます。相対パスを指定する場合、抽象パスに親ディレクトリが含まれることを確認してください。たとえば、UNIXの場合、カレント・ディレクトリ内のファイルを単純にresult.csvとして指定するのではなく./result.csvとして指定します。
構成ファイル
構成ファイル内のアダプタ要素には、アダプタのname属性とその値が示されます。アダプタ名は、アセンブリ・ファイル内のアダプタのid属性と一致する必要があります。
<csv-adapter> <name>StockTradeCSVInboundAdapter</name> <event-interval units="nanoseconds">5</event-interval> </csv-adapter>
<csv-adapter> <name>StockTradeCSVOutboundAdapter</name> </csv-adapter>
4.6 EDNアダプタ
イベント配信ネットワーク(EDN)のインバウンド・アダプタおよびアウトバウンド・アダプタは、JAXBを使用してEPNからOracle SOA Suiteイベント・ネットワークへのインタフェースを実現します。
EDNアダプタには、EDN XMLデータ転送をRAW XML (trueの場合)として表すかJAXBを使用してJavaオブジェクトとして表すかを指定するraw-xml-content構成要素があります。JAXBの場合、アダプタは、一連の適切なスキーマ(xjc)によって生成されたクラスがOracle Stream Analyticsアプリケーション・バンドルのクラス・パスに含まれることを想定します。
EDNアダプタは、イベント・タイプおよびEDLファイルへの参照を使用して構成します。初期化時には、構成されたイベント・タイプと一致するイベント定義QNameが含まれるEDLファイルが検索されます。構成されたイベント・タイプがEDL内で見つかった場合は、対応するQNameについてEDNにサブスクリプションが登録されます。
4.6.1 使用方法
EDNInboundアダプタを使用して、Oracle SOA Suiteイベント・ネットワークからの着信データを受信します。EDN入力アダプタは、指定されたEDNイベント・タイプをサブスクライブし、着信EDNイベントをOracle Stream Analyticsイベント・タイプに変換し、Oracle Stream Analyticsアプリケーションが処理できるようにします。
EDNOutboundアダプタを使用して、Oracle SOA Suiteイベント・ネットワークにアウトバウンド・データを送信します。EDN出力アダプタは、Oracle Stream Analyticsイベントを対応するEDNイベントに変換し、これらをEDNにパブリッシュします。パブリッシュされたイベントは、Oracle Stream Analyticsアプリケーションで生成された新規イベントである場合や、EDN入力アダプタによって受信され、Oracle Stream Analyticsアプリケーションによって処理され、EDN出力アダプタに送信されたEDNイベントである場合があります。
4.6.2 EDNアダプタの作成
EDNアダプタを作成する上で最適な方法は、Oracle JDeveloperコンポーネント・ウィンドウを使用する方法です。Oracle JDeveloperによって生成された次のアセンブリおよび構成ファイルは、EDNのインバウンド・アダプタおよびアウトバウンド・アダプタの構成を示しています。
アセンブリ・ファイル
-
EDN入力アダプタは、EDN入力チャネルをリスニングして
FraudCheckRequestタイプのすべてのイベントを検索します。 -
EDN出力アダプタは、
FraudCheckRequestタイプのイベントをEPN内の次のステージに送信します。
<wlevs:adapter id="edn-inbound-adapter" provider="edn-inbound">
<wlevs:listener ref="ednInputChannel"/>
<wlevs:instance-property name="eventType" value="FraudCheckRequest"/>
</wlevs:adapter>
<wlevs:adapter id="edn-outbound-adapter" provider="edn-outbound">
<wlevs:instance-property name="eventType" value="FraudCheckResponse"/>
</wlevs:adapter>構成ファイル
注意:
バンドルされたJARファイルの固定パスにEDLファイルおよびスキーマ(xsd)ファイルを配置する必要があります。<edn-adapter>
<name>edn-outbound-adapter</name>
<edl-file>FraudCheckEvent.edl</edl-file>
<validate>false</validate>
<raw-xml-content>false</raw-xml-content>
<jndi-provider-url>t3://localhost:7101</jndi-provider-url>
<jndi-factory>weblogic.jndi.WLInitialContextFactory</jndi-factory>
<user>weblogic</user>
<password>welcome1</password>
</edn-adapter>
<edn-adapter>
<name>edn-inbound-adapter</name>
<edl-file>FraudCheckEvent.edl</edl-file>
<schema-file>FraudCheckType.xsd</schema-file>
<validate>false</validate>
<raw-xml-content>false</raw-xml-content>
<jndi-provider-url>t3://localhost:7101</jndi-provider-url>
<jndi-factory>weblogic.jndi.WLInitialContextFactory</jndi-factory>
<user>weblogic</user>
<password>welcome1</password>
</edn-adapter>4.7 ファイル・アダプタ
ファイル・アダプタは、ファイルからデータを読み取ってEPNに書き込み、このデータをイベントに変換します。
現在、Oracle JDeveloperには、ファイル・アダプタ用のコンポーネントは用意されていません。ただし、Oracle Stream Analyticsアプリケーション用のアセンブリ・ファイルおよび構成ファイルにエントリを追加することにより、ファイル・アダプタを作成できます。
アセンブリ・ファイル
pathプロパティは、入力ファイルの場所を示します。アダプタは、入力ファイルからデータを読み取る際に、受信データをOrderArrivalEventタイプのイベントに変換します。ファイル・アダプタがファイルの読取りを開始するまでに5000ナノ秒のinitialDelayがあります。ダウンストリームのOrderArrivalチャネルは、OrderArrivalEventタイプのイベントをリスニングします。
<wlevs:adapter id="inputAdapter" provider="file" >
<wlevs:instance-property name="path"
value="@wlevs.domain.home@/inpOrderArrival.txt"/>
<wlevs:instance-property name="eventType" value="OrderArrivalEvent"/>
<wlevs:instance-property name="initialDelay" value="5000"/>
<wlevs:listener ref="OrderArrival"/>
</wlevs:adapter>構成ファイル
<adapter>
<name>inputAdapter</name>
</adapter>4.8 HTTPパブリッシュ/サブスクライブ・アダプタ
HTTPパブリッシャ・アダプタを使用して、JavaScript Object Notation (JSON)イベント・データをEPNからWebベースのユーザー・インタフェースに送信します。HTTPサブスクライバ・アダプタを使用して、EPNに入るJavaScript Object Notation (JSON)イベント・データを受け入れます。JSONイベント・データは、ユーザー・アクションによってイベントを生成するHTTPサーバーから送信されます。
Oracle Stream Analytics内のHTTPパブリッシュ/サブスクライブ・サーバーは、cometdプロジェクトで提案されているBayeuxプロトコルに基づいています。Bayeuxプロトコルには、クライアントとサーバーがHTTPを介して非同期メッセージで通信するための規約が定義されています。
リモートまたはローカルHTTPパブリッシャ・アダプタおよびリモートHTTPサブスクライバ・アダプタを作成できます。HTTPアダプタがローカルとリモートのどちらであるかは、必要な<server-url>子要素に指定するローカルまたはリモートURLによって決定されます。
HTTPパブリッシュ/サブスクライブ・アダプタを作成する上で最適な方法は、Oracle JDeveloperを使用する方法です。
注意:
HTTPパブリッシュ/サブスクライブ・アダプタで使用されるイベント・タイプのプロパティ・タイプとして、バイト配列はサポートされていません。
アセンブリ・ファイル
<wlevs:adapter id="http-pub-adapter" provider="httppub"/>
<wlevs:adapter id="http-sub-adapter" provider="httpsub" />
構成ファイル
すべてのローカル・アダプタおよびリモート・アダプタについて、サーバーのURLをserver-urlプロパティに指定します。サーバーには、Oracle Stream Analyticsサーバー、WebLogic Serverインスタンス、または任意のサードパーティのHTTPパブリッシュ/サブスクライブ・サーバーを指定できます。
パブリッシュ対象のすべてのローカル・アダプタについて、server-context-path要素を追加し、現在のOracle Stream AnalyticsアプリケーションをホストするOracle Stream Analyticsインスタンスに関連付けられているローカルHTTPパブリッシュ/サブスクライブ・サーバーへのパスを指定します。
デフォルトでは、各Oracle Stream Analyticsサーバーは、パス/pubsubを持つHTTPパブリッシュ/サブスクライブ・サーバーを使用して構成されます。新規ローカルHTTPパブリッシュ/サブスクライブ・サーバーを作成したか、デフォルトの構成を変更した場合は、サーバー・ファイル内のサーバーの場所を指定します。ファイル内で、サーバーの場所とともにhttp-pubsub要素のpath値を指定します。Oracle Stream Analyticsインストール内のサーバー・ファイルは、/Oracle/Middleware/my_oep/examples/domains/<my_domain>/defaultserverにあります。
channel子要素は、アダプタがパブリッシュするチャネルまたはサブスクライブするチャネルを指定します。
<http-pub-sub-adapter> <name>http-pub-adapter</name> <server-url>http://myhost.com:9102/pubsub</server-url> <channel>/channel2</channel> <event-type>com.mycompany.httppubsub.PubsubEvent</event-type> <user>wlevs</user> <password>wlevs</password> </http-pub-sub-adapter> <http-pub-sub-adapter> <name>http-sub-adapter</name> <server-url>http://myhost.com:9102/pubsub</server-url> <channel>/channel2</channel> <event-type>com.mycompany.httppubsub.PubsubEvent</event-type> </http-pub-sub-adapter>
4.9 HTTPパブリッシュ/サブスクライブ・アダプタのカスタム・コンバータBean
HTTPパブリッシュ/サブスクライブ・アダプタは、受信したJavaScript Object Notation (JSON)メッセージをイベント・タイプに変換し、再度戻します。JSONのインバウンド・メッセージおよびアウトバウンド・メッセージをイベント・タイプに変換し、再度戻す方法をカスタマイズするには、カスタム・コンバータBeanを作成します。
4.9.1 Bayeuxプロトコル
HTTPパブリッシュ/サブスクライブ(pub-sub)サーバーはcometdプロジェクトで提案されているBayeuxプロトコルに基づいています。Bayeuxプロトコルには、クライアントとサーバーがHTTPを介して非同期メッセージで通信するための規約が定義されています。pub-subサーバーは、Bayeuxプロトコルを理解するすべてのクライアントと通信できます。
次のフレームワークを使用してWebクライアントを開発できます。
-
BayeuxプロトコルをサポートするDojo JavaScriptライブラリ。Oracle Stream Analyticsでは、このライブラリは提供されていません。詳細は、http://dojotoolkit.org/を参照してください。
-
Bayeuxプロトコルを使用してpub-subサーバーと通信するFlexクライアントの開発を可能にするWebLogic Workshop Flexプラグイン。
4.9.2 カスタム・コンバータBeanの作成
カスタム・コンバータBeanは、次のインタフェースを実装するJavaクラスです。
-
InboundMessageConverterインタフェース: インバウンドJSONメッセージをイベントに変換します。 -
OutboundMessageConverterインタフェース: イベントをJSONメッセージに変換します。
これらのAPIの詳細は、Oracle Stream Analytics Java APIリファレンスを参照してください。
インバウンドHTTP Pub-Sub JSONメッセージ
インバウンドHTTP pub-sub JSONメッセージ用のカスタム・コンバータBeanは、com.bea.wlevs.adapters.httppubsub.api.InboundMessageConverterインタフェースを実装します。このインタフェースは、convertメソッドのみが設定されています。
public List convert(JSONObject message) throws Exception;
messageパラメータは、JSON形式のインバウンドHTTP pub-subメッセージです。戻り値は、EPN内の次のノードに渡すイベントのListです。
アウトバウンドHTTP Pub-Sub JSONメッセージ
アウトバウンドHTTP pub-subメッセージ用のカスタム・コンバータBeanは、com.bea.wlevs.adapters.httppubsub.api.OutboundMessageConverterインタフェースを実装します。このインタフェースは、convertメソッドのみが設定されています。
public List<JSONObject> convert(Object event) throws Exception;
eventパラメータは、EPN内のソース・ノードから送信されてアウトバウンドHTTP pub-subアダプタによって受信されたイベントです。戻り値は、JSONメッセージのListです。
例
次の例は、InboundMessageConverterインタフェースとOutboundMessageConvertインタフェースの両方を実装するカスタム・コンバータBeanを示しています。このBeanは、インバウンドとアウトバウンドの両方のHTTP pub-subアダプタに対して使用できます。
注意:
JavaオブジェクトをJSON形式に変換するためにGSON Javaライブラリを使用できます。詳細は、http://www.json.orgおよびhttp://code.google.com/p/google-gsonを参照してください。
package com.sample.httppubsub;
import com.bea.wlevs.adapters.httppubsub.api.InboundMessageConverter;
import com.bea.wlevs.adapters.httppubsub.api.OutboundMessageConverter;
import com.bea.httppubsub.json.JSONObject;
import java.util.List;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
public class TestConverter implements InboundMessageConverter, OutboundMessageConverter {
public List convert(JSONObject message) throws Exception {
List eventCollection = new ArrayList();
PubsubTestEvent event = new PubsubTestEvent();
event.setMessage("From TestConverter: " + message);
eventCollection.add(event);
return eventCollection;
}
public List<JSONObject> convert(Object event) throws Exception {
List<JSONObject> list = new ArrayList<JSONObject>(1);
Map map = new HashMap();
map.put("message", ((PubsubTestEvent) event).getMessage());
list.add(new JSONObject(map));
return list;
}
}
4.10 JMSアダプタ
JMSアダプタを使用して、Oracle Stream AnalyticsのEPNにJava Message Service (JMS)を接続し、JMSメッセージを送受信します。
Oracle Stream Analytics JMSアダプタは、Java EEに準拠する、Javaクライアントを提供するJMSサービス・プロバイダをサポートします。
JMSインバウンド・アダプタが着信JMSメッセージをOracle Stream Analyticsイベントに変換し、JMSアウトバウンド・アダプタがOracle Stream AnalyticsイベントをJMSメッセージに変換します。インバウンド変換をカスタマイズするには、独自のJavaクラスを作成します。
JMSアダプタを作成する上で最適な方法は、Oracle JDeveloperコンポーネント・ウィンドウを使用する方法です。Oracle JDeveloperによって生成された次のアセンブリおよび構成ファイルは、JMSのインバウンド・アダプタおよびアウトバウンド・アダプタの構成を示しています。
注意:
アウトバウンドJMSアダプタに関連付けられたMessageConverterオブジェクトで発生する例外によって基礎となるJMSトランザクションがロールバックされることはありません。アウトバウンドJMSアダプタ内のMessageConverterの外部で例外が発生した場合、既存のJMSトランザクションがロールバックされます。
4.10.1 サービス・プロバイダ
Oracle Stream Analyticsは、次のサービス・プロバイダに対してテストされます。
-
Oracle T3プロトコルを使用してOracle WebLogic Serverと通信するJava RMIクライアントであるWebLogic T3クライアント
-
Oracle WebLogic Server JMSのバージョン10.0、10.3および10.3.1
-
Tibco EMS JMSの現在のバージョン
使用するサービス・プロバイダがリストにない場合は、使用しているサービス・プロバイダに連絡し、jms-adapter構成に必要なjndi-provider-urlおよびjndi-factoryの情報を取得することにより、使用しているサービス・プロバイダとともに使用するようOracle Stream Analytics JMSアダプタを構成できます。
4.10.2 インバウンド・アダプタ構成
アセンブリ・ファイル
<wlevs:adapter id="jms-inbound-adapter" provider="jms-inbound" />
構成ファイル
インバウンド・アダプタは、着信JMSメッセージをTradeEventに変換します。JNDIファクトリおよびサービス・プロバイダは、weblogic.jndi.WLInitialContextFactoryおよびt3://localhost:7101です。着信クライアントは、JNDI名がJNDINameであるアダプタを検索します。JMSアダプタは、JMSメッセージをイベントに変換した後、イベントをQueue1のJNDI宛先に送信します。
オプションのconnection-jndi-name要素は、JMS接続ファクトリのJNDI名を提供します。必須のdestination-jndi-name要素は、JMS宛先のJNDI名を提供します。session-transacted要素がfalseである場合、セッションがトランザクション・セッションではないことを示します。
<jms-adapter>
<name>jms-inbound-adapter</name>
<event-type>TradeEvent</event-type>
<jndi-provider-url>t3://localhost:7101</jndi-provider-url>
<jndi-factory>weblogic.jndi.WLInitialContextFactory</jndi-factory>
<connection-jndi-name>JNDIName</connection-jndi-name>
<destination-jndi-name>Queue1</destination-jndi-name>
<session-transacted>false</session-transacted>
</jms-adapter>4.10.2.1 シングルおよびマルチスレッドのインバウンドJMSアダプタ
デフォルトでは、インバウンドJMSアダプタはシングルスレッドです。インバウンドJMSアダプタがシングルスレッドであると、イベント順序が保証されます。
スケーラビリティを向上させるために、インバウンドJMSアダプタでマルチスレッドを使用してJMS宛先からメッセージを読み取るように構成できます。インバウンドJMSアダプタがマルチスレッドであると、イベント順序は保証されません。複数のスレッドを使用するには、work-manager子要素を使用してワーク・マネージャとともにアダプタを構成します。アダプタのみが使用する専用のワーク・マネージャを指定するか、他のアダプタやJettyなどのいくつかのコンポーネントでワーク・マネージャを共有できます。
4.10.2.2 恒久サブスクリプション用のJMSアダプタの構成
JMSトピックへの恒久サブスクリプション内のクライアントになるように、インバウンドJMSアダプタを構成できます。恒久サブスクリプションでは、アダプタが非アクティブになった場合も、アダプタはパブリッシュされたメッセージを確実に受信します。インバウンド・アダプタがJMSサーバーに接続すると、恒久サブスクリプションが登録されます。それ以降にトピックに送信されるメッセージは、サブスクライバが切断されている間も保持され(期限切れではない場合)、再接続すると配信されます。
恒久サブスクリプションは、JMSメッセージをトピックにパブリッシュしているパブリッシャが永続配信モードを使用していることを前提とします。パブリッシャはOracle Stream AnalyticsのアウトバウンドJMSアダプタである(つまりdelivery-modeの値は、デフォルト値のpersistentである必要がある)ことに注意してください。
アダプタ内の恒久サブスクリプションの作成
-
JMSメッセージ・パブリッシャが永続モードでメッセージを配信していることを確認します。
-
接続ファクトリのクライアントIDを指定します。Oracle WebLogic Serverで、管理上、コンソールを使用して接続ファクトリにクライアントIDを設定できます。恒久サブスクライバを使用している各アダプタ・インスタンスに専用の接続ファクトリが構成されている必要があります。
-
次の3つの
jms-adapterプロパティを設定します。-
destination-typeをTOPICに設定します。 -
durable-subscriptionをtrueに設定します。 -
durable-subscription-nameを一意のサブスクリプション識別子に設定します。
-
4.10.3 アウトバウンド・アダプタ構成
アウトバウンドJMSアダプタは、イベントをJMSマップ・メッセージに変換し、JMSメッセージをJMS宛先に送信します。イベント・タイプが発信JMSメッセージに変換される方法を厳密に指定する独自のJavaクラスを作成することで、この変換をカスタマイズすることもできます。
アセンブリ・ファイル
<wlevs:adapter id="jms-outbound-adapter" provider="jms-outbound"/>構成ファイル
JMSアウトバウンド・アダプタの構成は、JMSインバウンド・アダプタの構成とほぼ同じです。アウトバウンド・アダプタは、JMS宛先を指定し、JMS宛先にアクセスするためのユーザー名およびパスワードを提供します。このセッションはトランザクション・セッションであり、永続的ではありません。
<jms-adapter>
<name>jms-outbound-adapter</name>
<event-type>TradeEvent</event-type>
<jndi-provider-url>t3://localhost:7101</jndi-provider-url>
<jndi-factory>weblogic.jndi.WLInitialContextFactory</jndi-factory>
<connection-jndi-name>Topic</connection-jndi-name>
<destination-jndi-name>Queue2</destination-jndi-name>
<user>weblogic</user>
<password>welcome1</password>
<session-transacted>true</session-transacted>
<delivery-mode>nonpersistent</delivery-mode>
</jms-adapter>4.11 JMSカスタム・メッセージ・コンバータBean
JMSメッセージとイベント・タイプ間の変換をカスタマイズするには、インバウンド・コンバータBeanおよびアウトバウンド・コンバータBeanを作成し、これらをOracle Stream Analyticsアプリケーションとともにパッケージ化します。
4.11.1 インタフェースの実装
インバウンド・コンバータBeanおよびアウトバウンド・コンバータBeanは、次の2つのインバウンド・インタフェースおよびアウトバウンド・インタフェースでメソッドを実装します。これらのAPIの詳細は、Oracle Stream Analytics Java APIリファレンスを参照してください。
-
インバウンド:
com.bea.wlevs.adapters.jms.api.InboundMessageConverter。convertメソッドを実装する必要があります。return値は、ダウンストリームで渡されるイベントのListです。public List convert(Message message) throws MessageConverterException, JMSException;messageパラメータ: 着信JMSメッセージに対応します。 -
アウトバウンド:
com.bea.wlevs.adapters.jms.api.OutboundMessageConverterインタフェース。convertメソッドを実装する必要があります。return値は、JMSメッセージのListです。public List<Message> convert(Session session, Object event) throws MessageConverterException, JMSException;sessionパラメータ: メッセージの作成に使用されるjavax.jms.Session。eventパラメータ: EPN内のソース・ステージから送信されてアウトバウンドJMSアダプタによって受信されるイベント。
4.12 Oracle Business Rulesアダプタ
Oracle Business Rules (OBR)アダプタは、Oracle Business Rules製品のビジネス・ルール・エンジンをラップするイベントBeanです。OBRアダプタを使用すると、イベントをファクトとしてアサートおよび取消しを行い、ビジネス・ルールをトリガーできます。
イベントを生成し、ビジネス・ロジックをOracle CQLプロセッサのダウンストリームに追加し、イベントを処理するようOBRルールを構成できます。たとえば、ルール・ファイル内でStreamSender.sendInsertEventを起動し、OBRアダプタからデータをイベントとして送信できます。
Oracle JDeveloperでは、アセンブリ・ファイルまたはEPN図に対してドラッグ・アンド・ドロップ・コンポーネントは提供されていませんが、構成ファイルに対してはドラッグ・アンド・ドロップ・コンポーネントが提供されています。
アセンブリ・ファイルにエントリを追加すること、およびOBRアダプタを構成ファイルにドラッグ・アンド・ドロップすることで、OBRアダプタを作成できます。エントリの追加によるOBRアダプタ作成の詳細は、OBRドキュメント(http://www.oracle.com/technetwork/middleware/business-rules/documentation/index.html)を参照してください。
アセンブリ・ファイル
event-type-repository要素は、アプリケーションのイベント・タイプ・リポジトリを指定します。次の例では、リポジトリにはHelloWorldEventという名前の単一のイベント・タイプがあり、リポジトリはHelloWorldEvent.javaクラスによって実装されます。
次のアダプタは、HelloWorldAdapter Javaクラスの値を使用してhelloworldAdapter IDと等しいidを指定します。アダプタはHelloWorldAdapter Javaクラスから作成されます。OBRアダプタ構成には、指定したメッセージ・テキストとともにmessage要素が含まれます。HelloWorldAdapterクラスは、アプリケーションの実行時にmessageを出力します。この例では、HelloWorldAdapterクラスがイベント・ソースです。
チャネルおよびOracle CQLプロセッサの構成の後に、OBRアダプタの宣言である<wlevs:adapter id="OBRAdapter" provider = "obr">が続き、その後ろにdecisionFunctionおよびdictionaryURLプロパティが続きます。dictionaryURLプロパティはルールが含まれるOBRディクショナリ・ファイルへのパスであり、decisionFunctionプロパティは使用するOBR関数です。handler1プロパティは、この情報にアクセスするための他のコンポーネントのハンドルです。
注意:
OBRアダプタは、ファクトの自動取消しを処理しません。アップストリーム・プロセッサによってストリームが出力される場合、必要に応じて、またはルールの優先度に応じて最後のルールがトリガーされたら、ルール・ファイル内のファクトを取り消します。
最下部にあるのはHelloWorldBean構成です。HelloWorldBeanは、HelloworldEventおよびHelloWorldAdapterクラスをインスタンス化するJavaクラスです。
<wlevs:event-type-repository>
<wlevs:event-type type-name="HelloWorldEvent">
<wlevs:class>com.bea.wlevs.event.example.helloworld.HelloWorldEvent
</wlevs:class>
</wlevs:event-type>
</wlevs:event-type-repository>
<wlevs:adapter id="helloworldAdapter"
class="com.bea.wlevs.adapter.example.helloworld.HelloWorldAdapter" >
<wlevs:instance-property name="message" value="HelloWorld - The time is:"/>
</wlevs:adapter>
<wlevs:channel id="helloworldInputChannel" event-type="HelloWorldEvent" >
<wlevs:listener ref="helloworldProcessor"/>
<wlevs:source ref="helloworldAdapter"/>
</wlevs:channel>
<wlevs:processor id="helloworldProcessor" />
<wlevs:channel id="helloworldOutputChannel" event-type="HelloWorldEvent"
advertise="true" max-threads="0" max-size="0" >
<wlevs:listener ref="OBRAdapter"/>
<wlevs:source ref="helloworldProcessor"/>
</wlevs:channel>
<wlevs:adapter id="OBRAdapter" provider = "obr">
<wlevs:instance-property name="decisionFunction" value="handler1" />
<wlevs:instance-property name="dictionaryUrl" value="file:helloworld.rules"/>
<wlevs:listener ref="OutputBean"/>
</wlevs:adapter>
<wlevs:event-bean id="OutputBean"
class="com.bea.wlevs.example.helloworld.HelloWorldBean">
</wlevs:event-bean>構成ファイル
この構成ファイルは、OBRアダプタから受信したHelloworldEventを処理するために使用される問合せルールおよびOracle CQLプロセッサを宣言します。また、OBRルールにアクセスするためのOBRアダプタのハンドラ(handler1)も提供します。dictionary-url要素は、使用する決定関数およびルールが含まれるOBRディクショナリ・ファイルへのパスを指定します。decision-function要素は、使用するOBR決定関数の名前を指定します。
<processor>
<name>helloworldProcessor</name>
<rules>
<query id="helloworldRule">
<![CDATA[ select * from helloworldInputChannel[range 10 slide 5] ]] >
select * from helloworldInputChannel[now]
</query>
</rules>
</processor>
<obr-adapter>
<name>OBRAdapter</name>
<dictionary-url>file:helloworld.rules</dictionary-url>
<decision-function>handler1</decision-function>
</obr-adapter>4.13 QuickFixアダプタ
QuickFixアダプタは、Financial Information eXchange (FIX)標準に従ってセキュリティ・トランザクションのリアルタイムな電子交換を処理するためのフル機能のメッセージング・エンジンです。
QuickFixアダプタは、FIXメッセージのリスニング、FIXメッセージのOracle Stream Analyticsイベントへの変換、および次のステージで処理を行うためのイベント送信を行います。
QuickFixアダプタは、基礎となるQuickFIXエンジンが提供する構成データをすべてサポートしています。FIX構成データの詳細は、『QuickFIX/J User Manual』(http://www.quickfixj.org/documentation/)を参照してください。
注意:
12cリリースでは、Oracle Stream AnalyticsはQuickFIX動的アクセプタ・セッションをサポートしません。
QuickFixアダプタはSSLをサポートしていません。
4.13.1 サポートされているQuickFixバージョンおよびサポートされていないメッセージ・タイプ
QuickFIXエンジンは、広範囲なメッセージ・タイプをサポートし、これらのメッセージ・タイプ内のデータを検証します。QuickFIXエンジンでサポートされていないメッセージがOracle Stream Analyticsアプリケーションに必要である場合、デフォルトのQuickFIXメッセージ・ハンドラを拡張し、適切なハンドラ・メソッドをオーバーライドすることにより、カスタム・ハンドラを作成する必要があります。
QuickFIXアダプタは、次のQuickFIXバージョンをサポートしています。各QuickFIXバージョンの下にリストされているメッセージはサポートされていません。
QuickFIXバージョン4.0
次のメッセージ・タイプはこのバージョンでサポートされていません。
Heartbeat、Logon、TestRequest、ResendRequest、Reject、SequenceReset、Logout
QuickFIXバージョン4.1
次のメッセージ・タイプはこのバージョンでサポートされていません。
Heartbeat、Logon、TestRequest、ResendRequest、Reject、SequenceReset、Logout
QuickFIXバージョン4.2
次のメッセージ・タイプはこのバージョンでサポートされていません。
Heartbeat、Logon、TestRequest、ResendRequest、Reject、SequenceReset、Logout
QuickFIXバージョン4.3
次のメッセージ・タイプはこのバージョンでサポートされていません。
Heartbeat、Logon、TestRequest、ResendRequest、Reject、SequenceReset、Logout
QuickFIXバージョン4.4
次のメッセージ・タイプはこのバージョンでサポートされていません。
Heartbeat、Logon、TestRequest、ResendRequest、Reject、SequenceReset、Logout
4.13.2 QuickFixアダプタの構成
現在、Oracle JDeveloperには、QuickFixアダプタ用のコンポーネントは用意されていません。ただし、Oracle Stream Analyticsアプリケーション用のアセンブリ・ファイルおよび構成ファイルにエントリを追加することにより、QuickFixアダプタを作成できます。
アセンブリ・ファイル
<wlevs:adapter id="QuickFIXInbound" provider="quickfix-inbound"/>
構成ファイル
この構成は、少なくとも1つのdefault-session要素を必要とし、ゼロ以上のセッション要素を設定できます。session要素は、特定のQuickFixセッションに使用される構成設定のグループを表します。複数のセッションが必要である場合、共通構成がdefault-session要素にグループ化されます。デフォルトでは、すべてのsessionタグが、default-sessionタグで宣言されているすべての要素を継承します。
config-name要素内のBeginString値は、使用されているFIXメッセージのバージョンを示します。BeginStringデータはQuickFIXエンジンによって検証されません。
http://quickfixj.org/quickfixj/usermanual/1.5.3/usage/configuration.htmlにあるドキュメントによると、必須およびサポート対象の識別子フィールドは、BeginString、SenderCompIDおよびTargetCompIDです。
SenderSubID、SenderLocationID、TargetSubIDおよびTargetLocationIDなどの他のフィールドは必須ではなく、リリース12.1.3ではサポートされていません。
<quickfix-adapter>
<name>quickfixAdpater</name>
<event-type>MyConfigEvent</event-type>
<default-session description="default configuration">
...
<configuration description="identifier-message format">
<config-name>BeginString</config-name>
<config-value>FIXT.0.1</config-value>
</configuration>
</default-session>
<session description="ordertracker configuration">
<configuration description="identifier-acceptor">
<config-name>SenderCompID</config-name>
<config-value>QA</config-value>
</configuration>
<configuration description="identifier-initiator">
<config-name>TargetCompID</config-name>
<config-value>ORACLE</config-value>
</configuration>
</session>
</quickfix-adapter> 4.13.3 ソケットベースのアクセプタのフェイルオーバーの構成
共有MessageStoreを使用して2つのアクセプタ・プロセスを実行することにより、ソケット・ベースのアクセプタ用の単純なフェイルオーバー・スキームを構成できます。1つのプロセスはアクティブ・アクセプタであり、もう1つのプロセスは任意のセッションに対してスタンバイしています。1つのアクセプタ・プロセスが終了すると、フェイルオーバー・アドレスを使用して構成されているクライアントが別のアクセプタにログオンしようとします。ログオンに成功すると、このセッションのメッセージ・ストアがリフレッシュされ、フェイルオーバー・セッションは正常に機能し続けます。
フェイルオーバー・スキームが正常に機能するには、次の構成設定が必要です。
RefreshMessageStoreAtLogon=Y SocketConnectPort1=8392 SocketConnectHost1=8.8.8.8 SocketConnectPort2=2932 SocketConnectHost2=12.12.12.12
4.14 RESTアダプタ
Representational State Transfer (REST)インバウンド・アダプタは、HTTPプロトコルを介して外部クライアントからHTTPポスト・データを受信します。RESTアダプタでは、XML、CSVおよびJavaScript Object Notation (JSON)でデータを受信し、そのデータをインバウンドRESTアダプタで構成されたOracle Stream Analyticsイベントに変換できます。
データをイベントに変換するために、RESTアダプタはJava Architecture for XML Binding (JAXB)マッパーおよびCSVマッパーを必要とします。マッパーは、受信データのマーシャリングおよびアンマーシャリングを実装するJavaBeanクラスです。
現在、Oracle JDeveloperには、RESTアダプタ用のコンポーネントは用意されていません。ただし、Oracle Stream Analyticsアプリケーション用のアセンブリ・ファイルおよび構成ファイルにエントリを追加することにより、RESTアダプタを作成できます。
アセンブリ・ファイル
次のアセンブリ・ファイルは、XML、CSVおよびJSONタイプの入力データを処理するインバウンドRESTアダプタの設定を示します。
<bean id="xmlMapperBean" class="com.oracle.cep.mappers.jaxb.JAXBMapperFactory" factory-method="create" /> <bean id="csvMapperBean" class="com.oracle.cep.mappers.csv.CSVMapper" /> <bean id="jsonMapperBean" class="com.oracle.cep.mappers.jaxb.JAXBMapperFactory" factory-method="create" /> <wlevs:adapter id="restInbound" provider="rest-inbound"> <wlevs:instance-property name="mapper" ref="xmlMapperBean" /> <wlevs:instance-property name="csvMapper" ref="csvMapperBean" /> <wlevs:instance-property name="eventTypeName" value="CallCenterActivity" /> <wlevs:instance-property name="contextPath" value="/testhttpadapter" /> </wlevs:adapter>
次のアセンブリ・ファイルは、イベントをXMLまたはJSONコンテンツ・タイプにアンマーシャリングするアウトバウンドRESTアダプタの設定を示します。
<wlevs:adapter id="restXmlOutbound" provider="rest-outbound"> <wlevs:instance-property name="mapper" ref="xmlMapperBean" /> <wlevs:instance-property name="url" value="http://localhost:9002/testadapter" /> </wlevs:adapter> <wlevs:adapter id="restJsonOutbound" provider="rest-outbound"> <wlevs:instance-property name="mapper" ref="jsonMapperBean" /> <wlevs:instance-property name="url" value="http://localhost:9002/testadapter" /> </wlevs:adapter>
構成ファイル
次の構成ファイルは、POSTデータを受信するためのrest-adapter構成と受信XMLおよびJSONデータを処理するためのjaxb-mapper構成を示しています。
<rest-adapter>
<name>restInbound</name>
<event-type-name>CallCenterActivity</event-type-name>
<context-path>/testhttpadapter</context-path>
</rest-adapter>
<jaxb-mapper>
<name>xmlMapperBean</name>
<event-type-name>CallCenterActivity</event-type-name>
<metadata>external_metadata_case1.xml</metadata>
</jaxb-mapper>
<json-mapper>
<name>jsonMapperBean</name>
<event-type-name>CallCenterActivity</event-type-name>
<media-type>application/json</media-type>
</json-mapper>
次の構成ファイルは、イベントをXMLまたはJSONコンテンツ・タイプにアンマーシャリングするアウトバウンドRESTアダプタの設定を示しています。
<rest-adapter> <name>restXmlOutbound</name> <url>http://localhost:9002/testrestadapter</url> </rest-adapter> <rest-adapter> <name>restJsonOutbound</name> <url>http://localhost:9002/testrestadapter</url> </rest-adapter> <jaxb-mapper> <name>xmlMapperBean</name> <event-type-name>CallCenterActivity</event-type-name> <metadata>external_metadata_case1.xml</metadata> </jaxb-mapper> <json-mapper> <name>jsonMapperBean</name> <event-type-name>CallCenterActivity</event-type-name> <media-type>application/json</media-type> </json-mapper>
注意:
XMLコンテンツ・タイプをRESTインバウンドおよびアウトバウンドのアダプタでサポートするには、XMLマッパーを使用します。XML注釈の追加またはJAXBバインディング・ファイルの自動生成は、このリリースではサポートされていません。4.15 RMIアダプタ
RMIのインバウンド・アダプタおよびアウトバウンド・アダプタを使用して、RMI接続からのイベント情報の読取りおよびRMI接続へのイベント情報の書込みを行います。RMIアダプタを作成する上で最適な方法は、Oracle JDeveloperコンポーネント・ウィンドウを使用する方法です。
Oracle JDeveloperによって生成された次のアセンブリおよび構成ファイルは、RMIのインバウンド・アダプタおよびアウトバウンド・アダプタの構成を示します。
注意:
RMIクライアント接続を閉じることはできません。http://docs.oracle.com/javase/8/docs/technotes/guides/rmi/faq.htmlで、質問F1を参照してください。
アセンブリ・ファイル
インバウンドRMIアダプタにはJNDI名があり、これにより、インバウンド・クライアントはEPNを特定できます。
<wlevs:adapter id="rmi-inbound-adapter" provider="rmi-inbound">
<wlevs:instance-property name="jndiName"
value="TradeReportApplication.TradeReport/rmi-inbound-adapter"/>
</wlevs:adapter>
<wlevs:adapter id="rmi-outbound-adapter" provider="rmi-outbound"/>構成ファイル
RMIアウトバウンド・アダプタは、JNDI名を使用してイベント・データの出力リソースを特定します。JNDIプロバイダにより、ディレクトリ・サービス実装をJNDIフレームワークにプラグインできるようになります。
次の例では、JNDIプロバイダはデフォルトのOracle WebLogic T3クライアントです。Oracle WebLogic T3クライアントは、Oracle T3プロトコルを使用してOracle WebLogic Serverと通信するJava RMIクライアントです。通常、T3クライアントは他のクライアント・タイプより性能が優れています。
<rmi-adapter>
<name>rmi-outbound-adapter</name>
<jndi-name>RMIOutboundJNDIName</jndi-name>
<jndi-provider-url>t3://localhost:7001</jndi-provider-url>
<jndi-factory>weblogic.jndi.WLInitialContextFactory</jndi-factory>
</rmi-adapter>4.16 Twitterアダプタ
インバウンドTwitterアダプタを使用して、ツイートを読み取ることができます。
インバウンド・アダプタでは、Twitter APIを使用してTwitterからツイートを読み取り、これらのツイートをインバウンド・チャネルを介してプロセッサに送信します。また、このアダプタでは、ハッシュタグ、ユーザー、言語などの各種フィルタ・オプションに基づいてツイートをフィルタ処理します。これらの操作の入力は、インバウンド・アダプタから取得されます。
Twitterアダプタを使用すると、Twitterインバウンド・アダプタを使用してEPN図を作成し、任意のアウトバウンド・アダプタに接続できます。
この節の内容は以下のとおりです。
4.16.1 Twitterアダプタの構成
インバウンドとアウトバウンドの両方のアダプタについて、最小限の構成を行う必要があります。
<wlevs:instance-property name="oauthConsumerKey" value="XXXXXXXXXXXX" />
<wlevs:instance-property name="oauthConsumerSecret" value="XXXXXXXXXXX" />
<wlevs:instance-property name="oauthAccessToken" value="XXXX-XXXXXXX" />
<wlevs:instance-property name="oauthAccessTokenSecret" value="XXXXXXXXX" />
<wlevs:instance-property name="httpProxyHost" value="www-proxy.us.oracle.com" />
<wlevs:instance-property name="httpProxyPort" value="80" />4.16.2 Twitter アダプタの依存関係
Twitterアダプタは、次のtwitter4jライブラリに依存します。
-
twitter4j-core -
twitter4j-async -
twitter4j-stream
4.16.3 Twitterインバウンド・アダプタ
Twitterインバウンド・アダプタでは、Twitterからツイートを受信するフィルタ・ストリーミング・モードが提供されます。
使用できるフィルタ・オプションは、次のとおりです。複数値には、カンマ区切り値(csv)を使用できます。
-
Filter: アダプタの構成時に提供されるフィルタ・オプションに基づいてツイートをフィルタします。使用できるフィルタ・オプションは、次のとおりです。-
hashtagsToTrack:#記号を使用せずに追跡するために、ハッシュタグ用の文字列を指定します。 -
usersToTrack: ユーザーのツイートを追跡するために、長いTwitterユーザーIDまたはユーザー・ハンドルを指定します。 -
languagesToTrack: 特定言語のツイートをフィルタ処理するために言語コードを指定します。注意:
アダプタの構成時には、少なくともhashtagsToTrackとusersToTrackのいずれかのプロパティを指定する必要があります。注意:
hashtagsToTrackとusersToTrackの両方を指定する場合は、論理演算子ORを使用します。詳細は、https://dev.twitter.com/streaming/reference/post/statuses/filterを参照してください。
-
注意:
Twitter APIには、APIレート制限があります。詳細は、https://dev.twitter.com/rest/public/rate-limitingを参照してください。サポートされている言語
次の表は、Webサイト・ウィジェットおよびボタンについてTwitterでサポートされる言語を一覧表示しています。
表4-2 サポートされる言語および言語コード
| 名前 | 言語コード |
|---|---|
|
英語(デフォルト) |
en |
|
アラビア語 |
ar |
|
ベンガル語 |
bn |
|
チェコ語 |
cs |
|
デンマーク語 |
da |
|
ドイツ語 |
de |
|
ギリシャ語 |
el |
|
スペイン語 |
es |
|
ペルシア語 |
fa |
|
フィンランド語 |
fi |
|
フィリピン語 |
fil |
|
フランス語 |
fr |
|
ヘブライ語 |
he |
|
ヒンディー語 |
hi |
|
ハンガリー語 |
hu |
|
インドネシア語 |
id |
|
イタリア語 |
it |
|
日本語 |
ja |
|
韓国語 |
ko |
|
マレー語 |
msa |
|
オランダ語 |
nl |
|
ノルウェー語 |
no |
|
ポーランド語 |
pl |
|
ポルトガル語 |
pt |
|
ルーマニア語 |
ro |
|
ロシア語 |
ru |
|
スウェーデン語 |
sv |
|
タイ語 |
th |
|
トルコ語 |
tr |
|
ウクライナ語 |
uk |
|
ウルドゥー語 |
ur |
|
ベトナム語 |
vi |
|
中国語(簡体字) |
zh-cn |
|
中国語(繁体字) |
zh-tw |
注意:
Twitter Webサイトによると、zh-cn言語コードは簡体字中国語を表し、zh-twは繁体字中国語を表します。ただし、Twitterメタデータでは、中国語にはzhのみが定義されています。このため、Twitterインバウンド・アダプタでは、zh言語コードを簡体字中国語と繁体字中国語の両方に使用します。
詳細は、https://dev.twitter.com/web/overview/languagesを参照してください。
4.16.4 フィルタ・オプションを使用した例
この例は、フィルタ・オプションを使用するコンテンツ・ファイルです。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:osgi="http://www.springframework.org/schema/osgi"
xmlns:wlevs="http://www.bea.com/ns/wlevs/spring"
xmlns:jdbc="http://www.oracle.com/ns/ocep/jdbc"
xmlns:hadoop="http://www.oracle.com/ns/oep/hadoop"
xmlns:nosqldb="http://www.oracle.com/ns/oep/nosqldb"
xsi:schemaLocation=" http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/osgi
http://www.springframework.org/schema/osgi/spring-osgi.xsd
http://www.bea.com/ns/wlevs/spring
http://www.bea.com/ns/wlevs/spring/ocep-epn.xsd
http://www.oracle.com/ns/ocep/jdbc
http://www.oracle.com/ns/ocep/jdbc/ocep-jdbc.xsd
http://www.oracle.com/ns/oep/hadoop
http://www.oracle.com/ns/oep/hadoop/oep-hadoop.xsd
http://www.oracle.com/ns/oep/nosqldb
http://www.oracle.com/ns/oep/nosqldb/oep-nosqldb.xsd">
<wlevs:event-type-repository>
<wlevs:event-type type-name="tweetEvent">
<wlevs:class>com.bea.wlevs.sample.Tweet</wlevs:class>
</wlevs:event-type>
</wlevs:event-type-repository>
<wlevs:adapter id="twitteradapter" provider="twitter-inbound">
<wlevs:listener ref="inboundchannel"/>
<wlevs:instance-property name="streamingMode" value="filter" />
<wlevs:instance-property name="hashtagsToTrack" value="IndvsSA" />
<wlevs:instance-property name="usersToTrack" value="338674755" />
<wlevs:instance-property name="languagesToTrack" value="en"/>
<wlevs:instance-property name="oauthConsumerKey" value="XXXXXX" />
<wlevs:instance-property name="oauthConsumerSecret" value="XXXXXX" />
<wlevs:instance-property name="oauthAccessToken" value="XXXX-XXXX" />
<wlevs:instance-property name="oauthAccessTokenSecret" value="XXXXXXXX" />
<wlevs:instance-property name="httpProxyHost" value="www-proxy.us.oracle.com" />
<wlevs:instance-property name="httpProxyPort" value="80" />
</wlevs:adapter>
<wlevs:channel id="inboundchannel" event-type="tweetEvent">
<wlevs:listener ref="tweetprocessor"/>
</wlevs:channel>
<wlevs:processor id="tweetprocessor"/>
<wlevs:channel id="outboundchannel" event-type="tweetEvent">
<wlevs:listener ref="twiiteroutbountadapter"/>
<wlevs:source ref="tweetprocessor"/>
</wlevs:channel>
<wlevs:adapter id="twiiteroutbountadapter" class="com.bea.wlevs.sample.OutputBean" advertise="true"/>
</beans>4.17 MQTTアダプタ
MQTTインバウンド/アウトバウンド・アダプタにより、構成されたホスト上のMQTTブローカで設定されたトピックからすべてのMQTTメッセージが受信され、変換されて、ダウンストリームに送信されます。
MQTTブローカは、メッセージ・フローに使用されるトピックに必要なインフラストラクチャを管理します。MQTTメッセージは、インバウンド・アダプタによって受信され、Oracle Stream Analyticsイベント・タイプに変換されます。変換されたメッセージはダウンストリームに送信され、ここで複合イベントに処理されるか、またはCQL問合せに基づいてフィルタ処理されます。その後、これらはOracle Stream Analyticsシンクに出力されます。複数のブローカURLを指定した場合、MQTTアダプタにより、serverURIリストに存在するブローカURLに基づいて、MQTTから最初に接続可能なブローカにメッセージが送信されます。serverURIの一部として、複数のブローカURLを指定できます。これらのURIをMQTTアダプタのMqttConnectOptions.setServerURIs()に設定すると、設定したURIがブローカとの接続の構築に使用されます。
接続の試行を開始する際、クライアントでは、リスト内の最初のserverURIから開始し、サーバーとの接続が確立されるまでリストを処理します。どのサーバーにも接続できない場合、接続試行は失敗します。クライアントが接続可能なサーバーのリストを指定することには、次の用途があります。
-
高可用性および信頼性のあるメッセージ配信。
-
一部のMQTTサーバーでは、複数の同等なMQTTサーバーが状態を共有する高可用性機能がサポートされます。
-
MQTTクライアントでは、同等なサーバーのいずれかに接続し、どのサーバーにクライアントが接続したとしても、メッセージが確実に配信され、恒久サブスクリプションが保持されることが保証されます。
-
恒久サブスクリプションおよび信頼性のあるメッセージ配信が必要な場合、
cleansessionフラグをfalseに設定する必要があります。
ハント・リスト
同等ではないサーバーのセットが指定される可能性があります(高可用性オプションでの場合)。サーバー間で状態が共有されないため、信頼性のあるメッセージ配信および恒久サブスクリプションは無効となります。ハント・リスト・モードを使用する場合、cleansessionフラグをtrueに設定する必要があります。
依存
主要な外部の依存関係は、次のMQTTクライアントJARファイル上にあります。
org.eclipse.paho.mqtt-client-0.4.0.jar
これは、MQTT 3.1.1をサポートします。
インストール
MQTTアダプタに関する特別なインストール要件はありません。これは、他のアダプタJARと類似しています。JAR名はcom.oracle.cep.adapters.mqtt.jarです。
このJARは、<INSTALL_HOME>/oep/modulesフォルダにあるインストーラの一部として使用できます。サーバーの一部に含めるには、これをbundleloader.xmlファイルに追加する必要があります。
<bundle>
<startlevel>3</startlevel>
<location>oep/modules</location>
<name>com.oracle.cep.adapters.mqtt.jar</name>
</bundle>OSGiバンドルはレベル3で開始します。
インバウンド・アダプタ
インバウンド・アダプタにより、組込みマッパーまたはカスタム・コンバータを使用してメッセージが変換されます。メッセージ・データがCSV、JSONおよびXMLである可能性が高い場合は、デフォルトのマッパーを使用できます。
アウトバウンド・アダプタ
アウトバウンド・アダプタにより、EventProcessingイベントをMQTTトピックにパブリッシュできます。その後、変換されたトピックを別のインバウンド・アダプタまたは別のシステムで選択できます。
MQTTインバウンド/アウトバウンド・アダプタをEPNまたはOracle Stream Analyticsアプリケーションのいずれかの一部として使用できます。
注意:
MQTTアダプタを使用するには、MQTTブローカをインストールする必要があります。4.17.1 MQTT構成パラメータ
インバウンドとアウトバウンドのアダプタには、いずれも構成する必要のあるパラメータが含まれます。これらの表は、何が必須で何がオプションなのかを示します。
MQTTインバウンド構成
インバウンドMQTTアダプタの構成パラメータは、次のとおりです。
表4-3 MQTTインバウンド・アダプタの構成パラメータ
| プロパティ名 | データ型 | 必須? | 説明 |
|---|---|---|---|
|
|
|
はい |
クライアントが接続可能なサーバーURIのカンマ区切りリスト。例:
|
|
|
|
はい |
クライアントがサブスクライブするトピックのカンマ区切りリスト。各値にワイルドカードを使用できます。 |
|
|
|
いいえ |
指定した各トピックのQoSのカンマ区切りのリスト。有効な値は |
|
|
|
いいえ |
クラスタ内のすべてのメンバーが集合的に機能して負荷を処理するよう強制するには、このプロパティをtrueに設定します。デフォルトではこのプロパティはfalseです。 |
|
|
|
いいえ |
一意のクライアント識別子。指定されていない場合、自動的に生成されます。 |
|
|
|
いいえ |
指定されたQoSへの配信を可能にする、処理中のメッセージの永続データ・ストア。有効な値は |
|
|
|
はい |
persistenceTypeプロパティがfileに設定されている場合、このプロパティは必須です。メッセージの格納に使用する必要があるフォルダを指定します。 |
|
|
|
いいえ |
トピックからイベント・タイプにメッセージを変換するために使用されるマッパー・オブジェクト。通常はSX/OEP開発者が、CSV、JSON、XMLなどの組込みマッパーのインスタンスを設定しますが、アダプタにより、カスタム・マッパーの実装を処理できます。 |
|
|
|
はい |
mapperプロパティがCSVマッパーを使用するように設定されている場合、このプロパティは必須です。処理されるイベント・タイプを指定します。 |
|
|
|
いいえ |
トピックからイベント・タイプにメッセージを変換するために使用されるコンバータ・オブジェクト。コンバータにより、開発者が変換ロジックを実装するための方法が提供されます。また、QoSや複製および保持されたメッセージ・メタデータを読み取ることもできます。 |
|
|
|
いいえ |
再起動および再接続後の状態をクライアントとサーバーのどちらに保存する必要があるかを指定します。デフォルトで、サブスクリプションが非恒久であることを意味するtrueに設定されます。 |
|
|
|
いいえ |
接続に使用するユーザー名。 |
|
|
|
いいえ |
接続に使用するパスワード。 |
|
|
|
いいえ |
接続に使用するSSLコンテキスト・プロバイダ。 |
|
|
|
いいえ |
接続に使用するSSLキーストア。 |
|
|
|
いいえ |
接続に使用するSSLキーストア・パスワード。 |
|
|
|
いいえ |
接続に使用するSSLキーストア・タイプ。 |
|
|
|
いいえ |
接続に使用するSSLキーストア・プロバイダ。 |
|
|
|
いいえ |
接続に使用するSSLトラスト・ストア。 |
|
|
|
いいえ |
接続に使用するSSLトラスト・ストア・パスワード。 |
|
|
|
いいえ |
接続に使用するSSLトラスト・ストア・タイプ。 |
|
|
|
いいえ |
接続に使用するSSLトラスト・ストア・プロバイダ。 |
|
|
|
いいえ |
接続が失われた場合、自動的に再接続する必要があるかどうかを指定します。デフォルトでtrueに設定されます。 |
|
|
|
いいえ |
接続タイムアウト値。この値(秒単位)は、MQTTサーバーへのネットワーク接続が確立されるのをクライアントで待機する最大時間間隔を定義します。デフォルト値は30秒です。 |
|
|
|
いいえ |
キープ・アライブ間隔。この値(秒単位)は、送信メッセージまたは受信メッセージの最大時間間隔を定義します。これにより、クライアントでは、TCP/IPタイムアウトの期間待機せずに、サーバーが使用不可になったかどうかを検出できます。デフォルト値は60秒です。 |
|
|
|
いいえ |
接続を確立できるまで待機する時間(秒単位)。これにより、アダプタが適切に接続するための時間を許可します。デフォルト値は1秒です。 |
|
|
java.lang.Integer |
いいえ |
アダプタにより接続が試行される試行回数。デフォルト値は3回の試行です。ゼロ(0)に設定されている場合、永久的に試行されます。 |
|
|
|
いいえ |
暫定的な接続が失敗した後に待機する時間(秒単位)。この遅延は、暫定的な接続のたびに発生します。デフォルト値は5秒です。 |
MQTTアウトバウンド構成
アウトバウンドMQTTアダプタの構成パラメータは、次のとおりです。
表4-4 MQTTアウトバウンド・アダプタの構成パラメータ
| プロパティ名 | データ型 | 必須? | 説明 |
|---|---|---|---|
|
|
|
はい |
クライアントが接続可能なサーバーURIのカンマ区切りリスト。例:
|
|
|
|
はい |
クライアントがサブスクライブするトピックのカンマ区切りリスト。各値にワイルドカードを使用できます。 |
|
|
|
いいえ |
指定した各トピックのQoSのカンマ区切りのリスト。有効な値は |
|
|
|
いいえ |
一意のクライアント識別子。指定されていない場合、自動的に生成されます。 |
|
|
|
いいえ |
指定されたQoSへの配信を可能にする、処理中のメッセージの永続データ・ストア。有効な値は |
|
|
|
はい |
persistenceTypeプロパティがfileに設定されている場合、このプロパティは必須です。メッセージの格納に使用する必要があるフォルダを指定します。 |
|
|
|
いいえ |
イベント・タイプをメッセージを変換するために使用されるマッパー・オブジェクト。通常はSX/OEP開発者が、JSONやXMLなどの組込みマッパーのインスタンスを設定しますが、アダプタにより、提供されている任意のタイプのJAXBベース・マッパーを処理できます。 |
|
|
|
いいえ |
イベント・タイプをメッセージを変換するために使用されるコンバータ・オブジェクト。コンバータにより、開発者が変換ロジックを実装するための方法が提供されます。また、QoSや複製および保持されたメッセージ・メタデータを書き込むこともできます。 |
|
|
|
いいえ |
再起動および再接続後の状態をクライアントとサーバーのどちらに保存する必要があるかを指定します。デフォルトで、サブスクリプションが非恒久であることを意味するtrueに設定されます。 |
|
|
|
いいえ |
接続に使用するユーザー名。 |
|
|
|
いいえ |
接続に使用するパスワード。 |
|
|
|
いいえ |
接続に使用するSSLコンテキスト・プロバイダ。 |
|
|
|
いいえ |
接続に使用するSSLキーストア。 |
|
|
|
いいえ |
接続に使用するSSLキーストア・パスワード。 |
|
|
|
いいえ |
接続に使用するSSLキーストア・タイプ。 |
|
|
|
いいえ |
接続に使用するSSLキーストア・プロバイダ。 |
|
|
|
いいえ |
接続に使用するSSLトラスト・ストア。 |
|
|
|
いいえ |
接続に使用するSSLトラスト・ストア・パスワード。 |
|
|
|
いいえ |
接続に使用するSSLトラスト・ストア・タイプ。 |
|
|
|
いいえ |
接続に使用するSSLトラスト・ストア・プロバイダ。 |
|
|
|
いいえ |
接続が失われた場合、自動的に再接続する必要があるかどうかを指定します。デフォルトでtrueに設定されます。 |
|
|
|
いいえ |
接続タイムアウト値。この値(秒単位)は、MQTTサーバーへのネットワーク接続が確立されるのをクライアントで待機する最大時間間隔を定義します。デフォルト値は30秒です。 |
|
|
|
いいえ |
キープ・アライブ間隔。この値(秒単位)は、送信メッセージまたは受信メッセージの最大時間間隔を定義します。これにより、クライアントでは、TCP/IPタイムアウトの期間待機せずに、サーバーが使用不可になったかどうかを検出できます。デフォルト値は60秒です。 |
|
|
|
いいえ |
接続を確立できるまで待機する時間(秒単位)。これにより、アダプタが適切に接続するための時間を許可します。デフォルト値は1秒です。 |
4.17.2 MQTTレシーバEPN
このサンプルのEPNは、MQTTインバウンド・アダプタの使用方法およびMQTTトピックのサブスクライブ方法を示します。
このEPNは、MQTTブローカ上のtemperaturesという名前のトピックをサブスクライブします。MQTTブローカは、指定されたサーバーURI上で実行されます。これは、高可用性機能を示します。いずれかのサーバーが失敗した場合でも、他のサーバーに安全にスイッチオーバーします。
インバウンド・アダプタに対して、provider="mqtt-inbound"をコンテキスト・ファイルのアダプタ定義に追加します。
<wlevs:adapter id="mqttInbound" provider="mqtt-inbound">
<wlevs:listener ref="inboundTemperatures"/>
<wlevs:instance-property name="serverURIs" value="tcp://10.191.195.134:1883,tcp://10.191.211.254:1884" />
<wlevs:instance-property name="topicName" value="temperatures" />
<wlevs:instance-property name="qualityOfService" value="0" />
<wlevs:instance-property name="mapper" ref="jsonMapper" />
</wlevs:adapter>4.17.3 MQTTセンダーEPN
このサンプルのEPNは、MQTTアウトバウンド・アダプタの使用方法およびMQTTトピックへのイベントのパブリッシュ方法を示します。
このEPNは、MQTTブローカ上のtemperaturesという名前のトピックにパブリッシュします。MQTTブローカは、指定されたサーバーURI上で実行されます。これは、高可用性機能を示します。いずれかのサーバーが失敗した場合でも、他のサーバーに安全にスイッチオーバーします。
アウトバウンド・アダプタに対して、provider="mqtt-outbound"をコンテキスト・ファイルのアダプタ定義に追加します。
<wlevs:adapter id="mqttOutbound" provider="mqtt-outbound">
<wlevs:instance-property name="serverURIs" value="tcp://10.191.195.134:1883,tcp://10.191.211.254:1884" />
<wlevs:instance-property name="topicName" value="temperatures" />
<wlevs:instance-property name="qualityOfService" value="0" />
<wlevs:instance-property name="mapper" ref="jsonMapper" />
</wlevs:adapter>4.18 Kafkaアダプタ
Kafkaインバウンド・アダプタやカスタム・インバウンド・アダプタなどの使用可能な任意のインバウンド・アダプタを使用してEPN図を作成し、Kafkaアウトバウンド・アダプタに接続できます。
Kafkaインバウンド・アダプタでは、1つ以上のKafkaトピックからメッセージを読み取ることができます。これにより、CSV、JSON、XMLなどのマッパー・プロパティを使用して、これらのメッセージをイベントに変換します。Kafkaアウトバウンド・アダプタでは、JSONやXMLなどのマッパー・プロパティを使用してイベントをメッセージに変換することにより、それらのイベントをKafkaトピックにパブリッシュできます。
Springファイル内のeventTypeNameプロパティは、CSVマッパーがマッパー計画として選択されている場合にのみ必須であり、kafka-inboundおよびkafka-outboundアダプタ構成を参照します。
JSONおよびXMLマッパーが構成されている場合には常に、これらのマッパーは、Javaクラスを使用してイベント・タイプに関連付けられます。マッパーとともに使用しない場合、(宣言的に定義された)タプル・イベントを使用できます。サンプルのコンテキスト・コンテンツは次のとおりです。
<wlevs:event-type-repository>
<wlevs:event-type type-name="sx-34-2-Explore_kafka_with_bigdecimal-2">
<wlevs:class>
oracle.wlevs.strex.generated.sx_34_2_Explore_kafka_with_bigdecimal_2
</wlevs:class>
</wlevs:event-type>
</wlevs:event-type type-name="sx-34-2-Explore_kafka_with_bigdecimal-3">
<wlevs:properties>
<wlevs:property name="sensorId" type="java.lang.String"/>
<wlevs:property name="sensorValue" type="bigdecimal"/>
</wlevs:properties>
</wlevs:event-type>
<wlevs:event-type type-name="sx-34-2-Explore_kafka_with_bigdecimal-1">
<wlevs:properties>
<wlevs:property name="sensorId" type="java.lang.String"/>
<wlevs:property name="sensorValue" type="bigdecimal"/>
</wlevs:properties>
</wlevs:event-type>
</wlevs:event-type-repository>
<bean class="com.oracle.cep.mappers.csv.CSVMapper" id="csvMapperBean"/>
<bean class="com.oracle.cep.mappers.jaxb.JAXBMapperFactory" factory-method="create" id="jsonMapperBean">
<property name="eventTypeName" value="sx-34-2-Explore_kafka_with_bigdecimal-2"/>
<property name="mediaType" value="application/json"/>
</bean>依存
Kafkaインバウンドおよびアウトバウンドのアダプタは、次のライブラリに依存します。
表4-5 Kafkaアダプタの依存関係
| ライブラリ | バージョン | ライブラリ名 |
|---|---|---|
|
Apache Kafka |
2.10-0.8.2.1 |
|
|
Apache kafkaクライアント |
0.8.2.1 |
|
|
メトリック・コア・ライブラリ |
2.2.0 |
|
|
Scala標準ライブラリ |
2.10.4 |
|
|
ZooKeeper |
3.4.6 |
|
|
ZkClient |
0.3 |
|
インストール要件
Oracle Stream Analytics Kafkaアダプタを使用するには、ZookeeperおよびKafkaをインストールして実行する必要があります。次の手順を使用して、ローカル開発Linux環境でKafkaを設定できます。
-
Apache Kafkaをhttp://kafka.apache.org/downloads.htmlからダウンロードします。
-
ダウンロードされたファイルの内容をフォルダに抽出します。
-
端末のウィンドウを開きます。
-
端末のウィンドウで、内容を抽出した場所から
binフォルダにナビゲートします。 -
./zookeeper-server-start.sh ../config/zookeeper.propertiesを実行します。 -
別の端末ウィンドウを開きます。これは2つ目の端末になります。
-
端末のウィンドウで、内容を抽出した場所から
binフォルダにナビゲートします。 -
./kafka-server-start.sh ../config/server.propertiesを実行します。
次の特性を持つデプロイメントを作成できます。
-
localhostで実行し、ポート2181をリスニングする1つのZookeeperインスタンス
-
localhostで実行し、ポート9092をリスニングする1つのKafkaブローカ・インスタンス
4.18.1 Kafkaからのメッセージを受信するインバウンド・アダプタ
これは、stocksという名前のトピックをサブスクライブし、継続的にメッセージをフェッチするサンプル・アプリケーションです。
メッセージが受信されると、JSON形式から必要なイベント・タイプに自動的に変換されます。CQLプロセッサにより、イベントはダウンストリームの出力チャネルに送信されます。最後に、これらのイベントは、出力チャネルによってカスタム・アダプタに送信され、このアダプタにより、そのコンテンツが標準の出力コンソールに書き込まれます。
<wlevs:adapter id="kafkaInbound" provider="kafka-inbound">
<wlevs:listener ref="inboundStockQuotes"/>
<wlevs:instance-property name="zookeeper" value="localhost:2181" />
<wlevs:instance-property name="topicName" value="stocks" />
<wlevs:instance-property name="mapper" ref="jsonMapper" />
</wlevs:adapter>4.18.2 Kafkaにメッセージを送信するアウトバウンド・アダプタ
これは、株価がランダムに生成される株価情報を継続的に作成するサンプル・アプリケーションです。生成間隔は1秒です。
CQLプロセッサでは、これらの株価情報を問い合せて、そのイベントを出力チャネルに送信します。最後に、これらのイベントは、出力チャネルによってKafkaアウトバウンド・アダプタに送信され、このアダプタにより、stocksという名前のトピックにJSON形式で書き込まれます。
<wlevs:adapter id="kafkaOutbound" provider="kafka-outbound">
<wlevs:instance-property name="bootstrapServers" value="localhost:9091,localhost:9092" />
<wlevs:instance-property name="topicName" value="stocks" />
<wlevs:instance-property name="mapper" ref="jsonMapper" />
</wlevs:adapter>4.18.3 Kafka構成
Kafkaアダプタは、インバウンドとアウトバウンドの両方のアダプタに構成する必要があります。
次の項では、インバウンドおよびアウトバウンドのKakfaアダプタについてサポートされるアダプタ・プロパティを一覧表示しています。
Kafkaインバウンド・アダプタ構成
インバウンドKafkaアダプタの構成パラメータは、次のとおりです。
表4-6 Kafkaインバウンド・アダプタ・プロパティ
| プロパティ名 | データ型 | 必須 | 説明 |
|---|---|---|---|
|
|
|
はい |
アダプタに、Zookeeperサーバーへの接続方法を通知します。HAに複数のZookeeperサーバーが存在する場合、これらすべてをカンマで区切って示す必要があります。 このプロパティの一般的な値は、 |
|
|
|
はい |
Kafkaトピックの名前。 |
|
|
|
いいえ |
各コンシューマ・プロセスは、グループに属している必要があります。これにより、このプロパティはバインド先グループを通知します。このプロパティに値が設定されていない場合、アダプタにより、 |
|
|
|
いいえ |
トピックからイベント・タイプにメッセージを変換するために使用されるマッパー・オブジェクト。通常はOracle Stream Analytics開発者が、CSV、JSON、XMLなどの組込みマッパーのインスタンスを設定しますが、アダプタにより、カスタム・マッパーの実装を処理できます。 |
|
|
|
いいえ |
CSVマッパーがマッパー計画として選択されている場合にのみ、必須となります。このプロパティでは、処理されるイベント・タイプを示す必要があります。 |
|
|
|
いいえ |
トピックからイベント・タイプにメッセージを変換するために使用されるコンバータ・オブジェクト。コンバータにより、開発者に変換ロジックを実装するための方法が提供されます。また、キー、パーティション、オフセットなどのメッセージ・メタデータを読み取ることもできます。 |
|
|
|
いいえ |
メッセージ消費のスループットを向上させるために、リスナー・スレッドの作成量が増加するよう、このプロパティを調整できます。 何も指定されていない場合は、デフォルトで1に設定されます。 スレッドの合計数(Oracle Stream Analyticsクラスタ内のすべてのスレッドの合計として算出されます)は、クラスタ側のトピックに設定されたパーティション数に対応している必要があります。 |
|
|
|
いいえ |
値が指定されていない場合は、デフォルトのワーク・マネージャを使用して、Kafkaトピックからメッセージをリスニングするために使用するスレッドがスケジューリングされます。この動作を変更するには、このプロパティでカスタム・ワーク・マネージャを定義できます。値は、 |
|
|
|
いいえ |
Oracle Stream Analytics開発者は、これにより、コンシューマAPIに関連する任意のネイティブKafkaプロパティを設定できます。使用可能なプロパティの完全なリストは、http://kafka.apache.org/082/documentation.html#consumerconfigsを参照してください。 |
Kafkaアウトバウンド・アダプタ構成
アウトバウンドKafkaアダプタの構成パラメータは、次のとおりです。
表4-7 Kafkaアウトバウンド・アダプタ・プロパティ
| プロパティ名 | データ型 | 必須 | 説明 |
|---|---|---|---|
|
|
|
はい |
アダプタに、メンバーの初期ブートストラップ検出を実行する1つ以上のサーバーを(カンマで区切って)示すことにより、Kafkaクラスタへの接続方法を通知します。このプロパティの一般的な値は、localhost:9091、localhost:9092です。 |
|
|
|
はい |
Kafkaトピックの名前。 |
|
|
|
いいえ |
イベント・タイプをメッセージを変換するために使用されるマッパー・オブジェクト。通常はOracle Stream Analytics開発者が、JSONやXMLなどの組込みマッパーのインスタンスを設定しますが、アダプタにより、提供されている任意のタイプのJAXBベース・マッパーを処理できます。 |
|
|
|
いいえ |
イベント・タイプをメッセージを変換するために使用されるコンバータ・オブジェクト。コンバータにより、開発者に変換ロジックを実装するための方法が提供されます。また、キーやパーティションなどのメッセージ・メタデータを書き込むこともできます。 |
|
|
|
いいえ |
メッセージをどのパーティションに送信し、そのパーティションに関する情報をイベント・タイプのどのプロパティに保持するかを指定します。パーティションの値を設定する場合、開発者は、java.lang.Integerタイプのみを使用するよう考慮する必要があります。 |
|
|
|
いいえ |
パーティションの特定に使用されるキーに関する情報をイベント・タイプのどのプロパティに保持するかを指定します。開発者は、byte[]と文字列の間から選択してキーの値を設定できますが、他の任意のタイプを使用するとエラーがスローされます。 |
|
|
|
いいえ |
Oracle Stream Analytics開発者は、これにより、プロデューサAPIに関連する任意のネイティブKafkaプロパティを設定できます。使用可能なプロパティの完全なリストは、http://kafka.apache.org/082/documentation.html#producerconfigsを参照してください。 |
4.19 コヒーレンス・アダプタ
Coherenceキャッシュとは、使用頻度の高いデータへの高速アクセスを提供する、インメモリー分散型キャッシュのことです。イベント処理では、イベントの受信ストリームをリアルタイムで処理する機能が提供されます。Coherenceキャッシュでは、データは分散型インメモリー・キャッシュに格納されます。キャッシュ内の各エントリは、キーと値のペアです。すべてのキャッシュ・データ操作で、ユーザーがキーと値の両方を指定する必要があります。
Coherenceキャッシュでは、次の操作がサポートされます。
-
INSERT: 新しい{key,value}ペアを追加します。 -
DELETE: 既存の{key,value}ペアを削除します。 -
UPDATE: 既存の{key,value}ペアを変更します。
Oracle Stream Analyticsでは、キャッシュの変更イベントを受信データ・ストリームとしてリスニングしたり、出力ストリームをキャッシュに書き込むことができる、即時利用可能な2つのアダプタが提供されます。
4.19.1 コヒーレンス・インバウンド・アダプタ
コヒーレンス・インバウンド・アダプタでは、キャッシュからの変更イベントをリスニングする機能が提供されます。コヒーレンス・インバウンド・アダプタにより、Coherenceキャッシュへのリスナーとしてそのアダプタ自体が登録されます。キャッシュ内のすべてのデータ操作について、キャッシュにより、インバウンド・アダプタに変更の詳細が通知されます。コヒーレンス・インバウンド・アダプタでは、変更イベントを受信するとそれらをEvent Processingイベントに変換して、追加のイベント処理のためにダウンストリームにプッシュします。
キャッシュでは、データ操作のタイプに応じて、次の3つのタイプの変更イベントがサポートされます。
-
INSERT: 新しい{key,value}ペアをキャッシュに挿入します。 -
DELETE: 既存の{key,value}ペアをキャッシュから削除します。 -
UPDATE: 既存の{key,old-value}ペアを{key,new-value}ペアで更新します。
注意:
現在、コヒーレンス・インバウンド・アダプタでは、INSERT変更イベントのみ(キャッシュで受信されるすべての新しい{key,value}ペア)がリスニングされます。キャッシュのUPDATEまたはDELETE操作の結果として生成されるすべての変更イベントは、インバウンド・アダプタによって無視されます。
イベントの順序
Oracle Stream Analyticsコヒーレンス・インバウンド・アダプタでは、キャッシュからの挿入イベントを受信できます。マップ・クラスから継承される、データをキャッシュに挿入するための2つのメソッドがあります。
-
put(K key, V value)https://docs.oracle.com/javase/8/docs/api/java/util/Map.html#put-K-Vを参照してください。 -
putAll(Map<? extends K,? extends V>M)。https://docs.oracle.com/javase/8/docs/api/java/util/Map.html#putAll-java.util.Mapを参照してください。
挿入されるイベントの順序を特に考慮する場合は、LinkedHashMapを使用して、キャッシュのバッチ挿入操作を実行します。LinkedHashMapクラスでは、挿入されるイベントの順序が保持されます。EventProcessingでは、各イベントがLinkedHashMapに挿入されたとおりの同じ順序でイベントを受信できます。挿入順序を考慮するユーザーは、LinkedHashMapを使用して、キャッシュのバッチ挿入操作を実行する必要があります。
LinkedHashMapを使用したキャッシュのバッチ挿入操作の例は、次のとおりです。
Map batch = new LinkedHashMap();
while(condition)
{
batch.put(k1, v1)
batch.put(k2, v2)
batch.put(k3, v3)
......
}
cohrenceCache.putAll(batch)EventProcessingコヒーレンス・インバウンド・アダプタでは、元の順序k1、k2、k3を保持した順序で、キャッシュからイベントを受信できます。
構成
Oracle Stream Analyticsアプリケーションにコヒーレンス・インバウンド・アダプタを定義する構成要素は、次のとおりです。
<wlevs:adapter id="coherenceInboundAdapter" provider="coherence-inbound">
<wlevs:instance-property name="cache" ref="OracleCoherenceCache"/>
<wlevs:instance-property name="eventType" value="EmployeeEvent"/>
</wlevs:adapter>ここで、OracleCoherenceCacheは、アプリケーションEPN構成ですでに定義されているキャッシュ要素です。OracleCoherenceCacheおよびEmployeeEventを参照してください。
構成パラメータ
様々な構成要素の属性とプロパティに関する詳細は、次のとおりです。
-
cache- このプロパティは、ユーザーがリスニングする必要がある変更イベントを含むキャッシュを指定します。この値は、EPN構成で定義されているCoherenceキャッシュ要素を参照する必要があります。これは、アダプタ構成の必須プロパティです。 -
eventType- このプロパティは、インバウンド・アダプタによってダウンストリーム・ステージに伝播されるイベントのタイプを指定します。このイベント・タイプは、アプリケーションのイベント・タイプ・リポジトリですでに定義されている必要があります。これも、アダプタ構成の必須プロパティです。
ユーザーがイベントをCoherenceキャッシュに入力するとすぐに、キャッシュによってこのことがアダプタに通知され、リスニングするインバウンド・アダプタに変更イベントが提供されます。Coherenceキャッシュにより、リスニングするインバウンド・アダプタに、バインドなしバッファ・キューを保持する変更イベントがプッシュされます。バッファ・キューを保持する理由は、インバウンド・アダプタによって実装されたマップ・リスナー・インタフェースを起動するコールバック・スレッドを返すためです。コヒーレンス・アダプタでは、独自のスレッドでこれらのイベントをデキューし、ダウンストリーム・ステージにプッシュします。アダプタによって変更イベントがデキューされると、受信されたオブジェクトは、eventTypeで指定されたタイプのイベント・オブジェクトに変換されます。
注意:
変更イベント(キャッシュに挿入されるオブジェクト)のタイプは、マップ・エントリが属性名と値のペアである、java.util.Mapである必要があります。
4.19.2 コヒーレンス・アウトバウンド・アダプタ
コヒーレンス・アウトバウンド・アダプタでは、イベントをCoherenceキャッシュに書き込む機能が提供されます。キャッシュはマップであり、キャッシュ内のすべてのエントリは{key, value}ペアの形式であるため、アウトバウンド・アダプタにより、すべてのイベントについてキーが決定され、計算されたキーを使用して該当するイベントがキャッシュに配置されます。
Oracle Stream Analyticsアプリケーションにコヒーレンス・インバウンド・アダプタを定義する構成要素は、次のとおりです。
<wlevs:adapter id="coherenceOutboundAdapter" provider="coherence-outbound">
<wlevs:instance-property name="cache" ref="OracleCoherenceCache"/>
<wlevs:instance-property name="eventType" value="EmployeeEvent"/>
<wlevs:instance-property name="key" value="id"/>
</wlevs:adapter>ここで、OracleCoherenceCacheは、アプリケーションEPN構成ですでに定義されているキャッシュ要素です。OracleCoherenceCacheおよびEmployeeEventを参照してください。
構成パラメータ
様々な構成要素の属性とプロパティに関する詳細は、次のとおりです。
-
cache- このプロパティは、ユーザーが出力ストリームを書き込む必要がある宛先キャッシュを指定します。この値は、EPN構成で定義されているCoherenceキャッシュ要素を参照する必要があります。これは、アダプタ構成の必須プロパティです。 -
eventType- このプロパティは、アウトバウンド・アダプタによってダウンストリーム・キャッシュに伝播されるイベントのタイプを指定します。このイベント・タイプは、アプリケーションのイベント・タイプ・リポジトリですでに定義されている必要があります。これも、アダプタ構成の必須プロパティです。 -
key - このプロパティは、宛先キャッシュへの送信イベントについてイベント・タイプ定義にキー属性を指定します。
送信イベントがアウトバウンド・アダプタに着信すると、アダプタにより、イベントからキー属性の値がフェッチされます。次に、アダプタにより{key,value}ペアが形成されて宛先キャッシュに書き込まれ、このキャッシュで、キーがイベントのキー属性の値となります。値は、送信イベント・オブジェクトを使用して作成されるjava.util.Mapのインスタンスです。また、アウトバウンド・アダプタは非同期で機能し、すべての送信イベントが格納されるバッファ・キューを管理します。個別の出力スレッドにより、このバッファからイベントがデキューされ、Coherenceキャッシュにプッシュされます。バッファ・キューを保持する理由は、Coherenceキャッシュのput操作の動作のためです。Coherenceキャッシュにより、put操作のたびにすべてのマップ・リスナーが起動されるため、キャッシュ上のいずれかのダウンストリーム・マップ・リスナーでアウトバウンド・アダプタによって開始された各put操作に多くの時間がかかる場合、待機時間が増加する可能性があります。
4.19.3 OracleCoherenceCache
コヒーレンスOracleCoherenceCacheは、次のように、wlevs:cache要素を使用してEPN構成ファイルで定義されます。
<wlevs:caching-system id="coherenceCachingSystem" provider="coherence">
</wlevs:caching-system>
<wlevs:cache id="OracleCoherenceCache" >
<wlevs:caching-system ref="coherenceCachingSystem"/>
</wlevs:cache>4.19.4 EmployeeEvent
EmployeeEventは、次のように定義されます。
public class EmployeeEvent
{
private int id;
private String name;
private String dept;
private int phone;
public int getId()
{
return id;
}
public void setId(int id)
{
this.id = id;
}
public String getName()
{
return name;
}
public void setName(String name)
{
this.name = name;
}
public String getDept()
{
return dept;
}
public void setDept(String dept)
{
this.dept = dept;
}
public int getPhone()
{
return phone;
}