Oracle® Fusion Middleware Oracle Stream Analyticsイベント処理スタート・ガイド 12cリリース(12.2.1.2.0) E82781-01 |
|
前へ |
次へ |
QuickFixアダプタは、Financial Information eXchange (FIX)標準に準拠し、証券取引における電子交換をリアルタイム制御するメッセージング・エンジンです。QuickFixアダプタは、FIXメッセージのリスニング、FIXメッセージのOracle Stream Analyticsイベントへの変換、および次のステージで処理を行うためのイベント送信を行います。
この章の内容は次のとおりです。
図6-1に、quickfix.ordertracker
という名前の単純な注文追跡アプリケーションのEPNを示します。FIXデータは左からEPNに入り、イベントに変換されます。イベントはOracle CQLプロセッサに運ばれて処理された後、イベント・シンクに運ばれます。
サンプルは、次の構成ファイルおよびJavaクラス・ファイルで構成されています。
アセンブリ・ファイル
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:osgi="http://www.springframework.org/schema/osgi" xmlns:wlevs="http://www.bea.com/ns/wlevs/spring" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd http://www.bea.com/ns/wlevs/spring http://www.bea.com/ns/wlevs/spring/spring-wlevs-v12_1_3_0.xsd <wlevs:event-type-repository> <wlevs:event-type type-name="QuickFIXEvent"> <wlevs:class>event.QuickFIXEvent</wlevs:class> </wlevs:event-type> </wlevs:event-type-repository> <wlevs:adapter id="QuickFIXInbound" provider="quickfix-inbound"/> <wlevs:processor id="QuickFIXProcessor"/> <wlevs:channel id="QuickFIXInChannel" event-type="QuickFIXEvent"> <wlevs:listener ref="QuickFIXProcessor" /> <wlevs:source ref="QuickFIXInbound"/> </wlevs:channel> <wlevs:event-bean id="eventListener" class="listeners.EventListener"> <wlevs:instance-property name="nodeName" value="QuickFIXInbound" /> </wlevs:event-bean> <wlevs:channel id="QuickFIXOutChannel" event-type="QuickFIXEvent"> <wlevs:listener ref="eventListener" /> <wlevs:source ref="QuickFIXProcessor"/> </wlevs:channel> </beans>
構成ファイル
次のエントリで、QuickFixアダプタ構成を示します。構成には1つ以上の<default-session>
プロパティ値が必要で、0個以上の<session>タグを持つことができます。
BeginString
プロパティがFIXメッセージのバージョンを示しています。QuickFIXエンジンは、このデータを検証しません。
注意:
12cリリースでは、Oracle Stream AnalyticsはQuickFIX動的アクセプタ・セッションをサポートしません。
<?xml version="1.0" encoding="UTF-8"?><wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application" > <quickfix-adapter> <name>QuickFIXInbound</name> <event-type>QuickFIXEvent</event-type> <default-session description="default configuration"> <configuration> <config-name>FileStorePath</config-name> <config-value>/scratch/tprabish/garage/quickfix/acceptor/filestore </config-value> </configuration> <configuration> <config-name>FileLogPath</config-name> <config-value>/scratch/tprabish/garage/quickfix/acceptor/filelog </config-value> </configuration> <configuration> <config-name>UseDataDictionary</config-name> <config-value>N</config-value> </configuration> <configuration description="identifier-message format"> <config-name>BeginString</config-name> <config-value>FIX.4.2</config-value> </configuration> <configuration> <config-name>ConnectionType</config-name> <config-value>acceptor</config-value> </configuration> <configuration> <config-name>StartTime</config-name> <config-value>00:00:00</config-value> </configuration> <configuration> <config-name>EndTime</config-name> <config-value>00:00:00</config-value> </configuration> <configuration> <config-name>HeartBtInt</config-name> <config-value>30</config-value> </configuration> <configuration> <config-name>SocketAcceptPort</config-name> <config-value>9876</config-value> </configuration> <configuration> <config-name>RefreshMessageStoreAtLogon</config-name> <config-value>Y</config-value> </configuration> </default-session> <session description="ordertracker configuration"> <configuration description="identifier-acceptor"> <config-name>SenderCompID</config-name> <config-value>SAMPLEACCEPTOR</config-value> </configuration> <configuration description="identifier-initiator"> <config-name>TargetCompID</config-name> <config-value>SAMPLESENDER</config-value> </configuration> </session> </quickfix-adapter> <processor> <name>QuickFIXProcessor</name> <rules> <query id="ExampleQuery"> <![CDATA[ select * from QuickFIXInChannel [now] ]]> </query> </rules> </processor> </wlevs:config>
QuickFIXEvent.java
package event; public class QuickFIXEvent { private String text; private String origClOrdID; private String clOrdID; private String symbol; private String side; public String getText() { return text; } public void setText(String text) { this.text = text; } public String getOrigClOrdID() { return origClOrdID; } public void setOrigClOrdID(String origClOrdID) {this.origClOrdID = origClOrdID;} public String getClOrdID() { return clOrdID; } public void setClOrdID(String clOrdID) { this.clOrdID = clOrdID; } public String getSymbol() { return symbol; } public void setSymbol(String symbol) { this.symbol = symbol; } public String getSide() { return side; } public void setSide(String side) { this.side = side; } @Override public String toString() { StringBuilder sb=new StringBuilder(); sb.append("\t\t\t[origClOrdID=").append(origClOrdID).append("]\n") .append("\t\t\t[clOrdID=").append(clOrdID).append("]\n") .append("\t\t\t[symbol=").append(symbol).append("]\n") .append("\t\t\t[side=").append(side).append("]\n") .append("\t\t\t[text=").append(text).append("]\n"); return sb.toString(); } }
EventListerner.java
EventListener.java
クラスは、次のサンプル・コードで使用されます。
package listeners; import com.bea.wlevs.ede.api.EventRejectedException; import com.bea.wlevs.ede.api.StreamSink; //The following EventListener class is an event sink. It is provided to //demonstrate that the messages have been received and processed in the EPN. public class EventListener implements StreamSink { private String nodeName_ = "N/A"; public void setNodeName(String nodeName) { nodeName_ = nodeName; } public void onInsertEvent(Object event) throws EventRejectedException { System.out.println("Received the following event in the listener"); System.out.println("[node:" + nodeName_ + "] onInsertEvent():\n" + event); } }
サンプルQuickFixアダプタ・アプリケーションをデプロイすると、QuickFIXエンジンが開始され、構成ファイルで指定されたポートをリスニングします。アプリケーションをテストするには、次のファイルでQuickFIXイニシエータ・アプリケーションを記述してこのアダプタにメッセージを送信します。
注意:
次のサンプルで示されている値の一部は、環境に基づいて置換する必要があります。
QuickFixSampleInitiatorApp.java
package proto.quickfix; import java.io.FileInputStream; import quickfix.Acceptor; import quickfix.Application; import quickfix.DefaultMessageFactory; import quickfix.FileLogFactory; import quickfix.FileStoreFactory; import quickfix.Initiator; import quickfix.LogFactory; import quickfix.MessageFactory; import quickfix.MessageStoreFactory; import quickfix.ScreenLogFactory; import quickfix.Session; import quickfix.SessionNotFound; import quickfix.SessionSettings; import quickfix.SocketAcceptor; import quickfix.SocketInitiator; import quickfix.field.ClOrdID; import quickfix.field.OrigClOrdID; import quickfix.field.Side; import quickfix.field.Symbol; import quickfix.field.Text; public class QuickFixSampleInitiatorApp { private static SampleInitiator initiatorApp; public static void main(String args[]) throws Exception { Initiator initiator = startInitiator(); while(!initiator.isLoggedOn()){ System.out.println("Waiting for initiator logon"); synchronized (initiator) { initiator.wait(1000); } }; System.out.println("initiator loggedon"); initiatorApp.sendMessage(); initiator.stop(); } public static Initiator startInitiator() throws Exception { String fileName = "D:/installs/helios/eclipse/workspace/quickfix.ordertracker/src/proto/ initiator_sample.cfg"; //initiatorApp is your class that implements the Application interface. initiatorApp = new SampleInitiator(); SessionSettings settings = new SessionSettings(new FileInputStream(fileName)); MessageStoreFactory messageStoreFactory = new FileStoreFactory(settings); LogFactory logFactory = new ScreenLogFactory(true, true, true); MessageFactory messageFactory = new DefaultMessageFactory(); Initiator initiator = new SocketInitiator(initiatorApp, messageStoreFactory, settings, logFactory, messageFactory); initiator.start(); return initiator; } }
SampleInitiator.java
package proto.quickfix; import quickfix.Application; import quickfix.DoNotSend; import quickfix.FieldNotFound; import quickfix.IncorrectDataFormat; import quickfix.IncorrectTagValue; import quickfix.Message; import quickfix.RejectLogon; import quickfix.Session; import quickfix.SessionID; import quickfix.SessionNotFound; import quickfix.UnsupportedMessageType; import quickfix.field.ClOrdID; import quickfix.field.OrigClOrdID; import quickfix.field.Side; import quickfix.field.Symbol; import quickfix.field.Text; public class SampleInitiator implements Application{ private SessionID sessionIDTmp; public volatile boolean isLoggedOn; @Override public void fromAdmin(Message arg0, SessionID arg1) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon { } @Override public void fromApp(Message arg0, SessionID arg1) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType { System.out.println("Initiator received message"); } public void sendMessage() throws SessionNotFound { quickfix.fix42.OrderCancelRequest message = new quickfix.fix41.OrderCancelRequest( new OrigClOrdID("123"), new ClOrdID("321"), new Symbol("LNUX"), new Side(Side.BUY)); message.set(new Text("Cancel My Order!")); System.out.println("SESSION ID IS-"+sessionIDTmp); Session.sendToTarget(message, sessionIDTmp); } @Override public void onCreate(SessionID arg0) { System.out.println("Initiator session created"+arg0); this.sessionIDTmp=arg0; } @Override public void onLogon(SessionID arg0) { System.out.println("Initiator logon done"+arg0); isLoggedOn=true; } //Optionally, you can implement the onLogout and toAdmin callback methods below. //To implement them, see the QuickFix documentation: http://www.quickfixj.org. @Override public void onLogout(SessionID arg0){ //Notifies you when an FIX session is offline. //The cause can be a logout, forced termination, or loss of network connection. } @Override public void toAdmin(Message arg0, SessionID arg1) { //Provides a look at the administrative messages sent from your FIX machine //to another party. This method enables you to log messages. You can add // fields to an adminstrative message before it is sent. } @Override public void toApp(Message arg0, SessionID arg1) throws DoNotSend { System.out.println("Initiator toApp"); } }
initiator_sample.cfg
[default] FileStorePath=/scratch/myusername/garage/quickfix/initiator/filestore BeginString=FIX.4.2 UseDataDictionary=N ConnectionType=initiator [session] SenderCompID=SAMPLESENDER TargetCompID=SAMPLEACCEPTOR SocketConnectHost=10.240.30.74 SocketConnectPort=9876 StartTime=00:00:00 EndTime=00:00:00 HeartBtInt=30 ReconnectInterval=5
runInitiator.bat
set QFJ_HOME=D:\installs\helios\eclipse\workspace\quickfix.ordertracker set CP=%QFJ_HOME%/lib/mina-core-1.1.7.jar;%QFJ_HOME%/lib/slf4j-api-1.6.3.jar;%QFJ_HOME%/lib/slf4j-jdk14-1.6.3.jar;%QFJ_HOME%/lib/slf4j-log4j12-1.6.3.jar;%QFJ_HOME%/lib/quickfixj-all-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-core-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix40-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix41-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix42-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix43-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix44-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix50-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fixt11-1.5.2.jar;%QFJ_HOME%/dist/quickfix.ordertracker.jar java -cp %CP% proto.quickfix.QuickFixSampleInitiatorApp