The QuickFix Adapter is a messaging engine for handling the real-time electronic exchange of securities transactions according to the Financial Information eXchange (FIX) standard. The QuickFix adapter listens for FIX messages, converts the FIX messages to Oracle Stream Analytics events, and sends the events to the next stage for processing.
Figure 6-1 shows the EPN for a simple order tracking application called quickfix.ordertracker. FIX data enters the EPN on the left and is converted to an event. The event is channeled to an Oracle CQL processor, and after processing, is channeled to an event sink.
Figure 6-1 quickfix.ordertracker application EPN
The example consists of the following configuration and Java class files:
Assembly File
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:osgi="http://www.springframework.org/schema/osgi"
       xmlns:wlevs="http://www.bea.com/ns/wlevs/spring"
       xsi:schemaLocation="
  http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans.xsd
  http://www.springframework.org/schema/osgi
  http://www.springframework.org/schema/osgi/spring-osgi.xsd
  http://www.bea.com/ns/wlevs/spring
  http://www.bea.com/ns/wlevs/spring/spring-wlevs-v12_1_3_0.xsd">

  <wlevs:event-type-repository>
    <wlevs:event-type type-name="QuickFIXEvent">
      <wlevs:class>event.QuickFIXEvent</wlevs:class>
    </wlevs:event-type>
  </wlevs:event-type-repository>

  <wlevs:adapter id="QuickFIXInbound" provider="quickfix-inbound"/>

  <wlevs:processor id="QuickFIXProcessor"/>

  <wlevs:channel id="QuickFIXInChannel" event-type="QuickFIXEvent">
    <wlevs:listener ref="QuickFIXProcessor" />
    <wlevs:source ref="QuickFIXInbound"/>
  </wlevs:channel>

  <wlevs:event-bean id="eventListener" class="listeners.EventListener">
    <wlevs:instance-property name="nodeName" value="QuickFIXInbound" />
  </wlevs:event-bean>

  <wlevs:channel id="QuickFIXOutChannel" event-type="QuickFIXEvent">
    <wlevs:listener ref="eventListener" />
    <wlevs:source ref="QuickFIXProcessor"/>
  </wlevs:channel>
</beans>
Configuration File
The following entries show the QuickFix adapter configuration. The configuration requires at least one <default-session> property value and can have zero or more <session> tags.
The BeginString property indicates the FIX message version. This data is not validated by the QuickFIX engine.
Note:
Oracle Stream Analytics does not support QuickFIX dynamic acceptor sessions in the 12c release.
<?xml version="1.0" encoding="UTF-8"?>
<wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application">
  <quickfix-adapter>
    <name>QuickFIXInbound</name>
    <event-type>QuickFIXEvent</event-type>
    <default-session description="default configuration">
      <configuration>
        <config-name>FileStorePath</config-name>
        <config-value>/scratch/tprabish/garage/quickfix/acceptor/filestore</config-value>
      </configuration>
      <configuration>
        <config-name>FileLogPath</config-name>
        <config-value>/scratch/tprabish/garage/quickfix/acceptor/filelog</config-value>
      </configuration>
      <configuration>
        <config-name>UseDataDictionary</config-name>
        <config-value>N</config-value>
      </configuration>
      <configuration description="identifier-message format">
        <config-name>BeginString</config-name>
        <config-value>FIX.4.2</config-value>
      </configuration>
      <configuration>
        <config-name>ConnectionType</config-name>
        <config-value>acceptor</config-value>
      </configuration>
      <configuration>
        <config-name>StartTime</config-name>
        <config-value>00:00:00</config-value>
      </configuration>
      <configuration>
        <config-name>EndTime</config-name>
        <config-value>00:00:00</config-value>
      </configuration>
      <configuration>
        <config-name>HeartBtInt</config-name>
        <config-value>30</config-value>
      </configuration>
      <configuration>
        <config-name>SocketAcceptPort</config-name>
        <config-value>9876</config-value>
      </configuration>
      <configuration>
        <config-name>RefreshMessageStoreAtLogon</config-name>
        <config-value>Y</config-value>
      </configuration>
    </default-session>
    <session description="ordertracker configuration">
      <configuration description="identifier-acceptor">
        <config-name>SenderCompID</config-name>
        <config-value>SAMPLEACCEPTOR</config-value>
      </configuration>
      <configuration description="identifier-initiator">
        <config-name>TargetCompID</config-name>
        <config-value>SAMPLESENDER</config-value>
      </configuration>
    </session>
  </quickfix-adapter>
  <processor>
    <name>QuickFIXProcessor</name>
    <rules>
      <query id="ExampleQuery">
        <![CDATA[ select * from QuickFIXInChannel [now] ]]>
      </query>
    </rules>
  </processor>
</wlevs:config>
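The relationship between the <default-session> and <session> entries can be pictured as a simple overlay. The following is a minimal sketch, not adapter code: it assumes the adapter follows the usual QuickFIX convention in which each session inherits the default settings and its own entries take precedence. The class and method names (SessionSettingsSketch, effectiveSettings) are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SessionSettingsSketch {

    // Hypothetical helper: start from the defaults, then let the
    // session-specific entries add to or override them.
    static Map<String, String> effectiveSettings(Map<String, String> defaults,
                                                 Map<String, String> session) {
        Map<String, String> merged = new LinkedHashMap<>(defaults);
        merged.putAll(session);
        return merged;
    }

    public static void main(String[] args) {
        // A subset of the <default-session> values from the configuration file.
        Map<String, String> defaults = new LinkedHashMap<>();
        defaults.put("BeginString", "FIX.4.2");
        defaults.put("ConnectionType", "acceptor");
        defaults.put("SocketAcceptPort", "9876");
        defaults.put("HeartBtInt", "30");

        // The <session> values from the configuration file.
        Map<String, String> session = new LinkedHashMap<>();
        session.put("SenderCompID", "SAMPLEACCEPTOR");
        session.put("TargetCompID", "SAMPLESENDER");

        // Print the settings the ordertracker session would effectively use.
        effectiveSettings(defaults, session)
                .forEach((name, value) -> System.out.println(name + "=" + value));
    }
}
```

Under that assumption, the ordertracker session needs to declare only its two identifiers, because the port, heartbeat interval, and FIX version come from the defaults.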
QuickFIXEvent.java
package event;

public class QuickFIXEvent {

    private String text;
    private String origClOrdID;
    private String clOrdID;
    private String symbol;
    private String side;

    public String getText() { return text; }
    public void setText(String text) { this.text = text; }

    public String getOrigClOrdID() { return origClOrdID; }
    public void setOrigClOrdID(String origClOrdID) { this.origClOrdID = origClOrdID; }

    public String getClOrdID() { return clOrdID; }
    public void setClOrdID(String clOrdID) { this.clOrdID = clOrdID; }

    public String getSymbol() { return symbol; }
    public void setSymbol(String symbol) { this.symbol = symbol; }

    public String getSide() { return side; }
    public void setSide(String side) { this.side = side; }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("\t\t\t[origClOrdID=").append(origClOrdID).append("]\n")
          .append("\t\t\t[clOrdID=").append(clOrdID).append("]\n")
          .append("\t\t\t[symbol=").append(symbol).append("]\n")
          .append("\t\t\t[side=").append(side).append("]\n")
          .append("\t\t\t[text=").append(text).append("]\n");
        return sb.toString();
    }
}
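The five properties of QuickFIXEvent correspond to standard FIX fields: OrigClOrdID (tag 41), ClOrdID (tag 11), Symbol (tag 55), Side (tag 54), and Text (tag 58). The following sketch illustrates that correspondence by pulling those tags out of a raw tag=value message. It is an illustration only and does not show how the adapter itself performs the conversion; the class FixFieldSketch, the helper toProperties, and the use of "|" in place of the SOH field delimiter are assumptions made for readability.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class FixFieldSketch {

    // Standard FIX tag numbers for the five fields held by QuickFIXEvent.
    static final Map<String, String> TAG_TO_PROPERTY = new LinkedHashMap<>();
    static {
        TAG_TO_PROPERTY.put("41", "origClOrdID");
        TAG_TO_PROPERTY.put("11", "clOrdID");
        TAG_TO_PROPERTY.put("55", "symbol");
        TAG_TO_PROPERTY.put("54", "side");
        TAG_TO_PROPERTY.put("58", "text");
    }

    // Hypothetical helper: split the message on the delimiter and keep only
    // the fields whose tag maps to an event property.
    static Map<String, String> toProperties(String rawMessage, String delimiter) {
        Map<String, String> properties = new LinkedHashMap<>();
        for (String field : rawMessage.split(Pattern.quote(delimiter))) {
            int eq = field.indexOf('=');
            if (eq <= 0) {
                continue; // not a tag=value pair
            }
            String property = TAG_TO_PROPERTY.get(field.substring(0, eq));
            if (property != null) {
                properties.put(property, field.substring(eq + 1));
            }
        }
        return properties;
    }

    public static void main(String[] args) {
        // An order cancel request (MsgType 35=F) with '|' standing in for SOH.
        String raw = "8=FIX.4.2|35=F|41=123|11=321|55=LNUX|54=1|58=Cancel My Order!";
        System.out.println(toProperties(raw, "|"));
    }
}
```

Tags that have no matching property, such as BeginString (8) and MsgType (35), are ignored in this sketch.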
EventListener.java
The EventListener.java class used in the example code follows.
package listeners;

import com.bea.wlevs.ede.api.EventRejectedException;
import com.bea.wlevs.ede.api.StreamSink;

//The following EventListener class is an event sink. It is provided to
//demonstrate that the messages have been received and processed in the EPN.
public class EventListener implements StreamSink {

    private String nodeName_ = "N/A";

    public void setNodeName(String nodeName) {
        nodeName_ = nodeName;
    }

    public void onInsertEvent(Object event) throws EventRejectedException {
        System.out.println("Received the following event in the listener");
        System.out.println("[node:" + nodeName_ + "] onInsertEvent():\n" + event);
    }
}
After you deploy the example QuickFix adapter application, it starts the QuickFIX engine, which listens on the port specified in the configuration file. To test the application, write a QuickFIX initiator application that sends messages to this adapter, using the following files.
Note:
You need to replace some values shown in the following examples based on your environment.
QuickFixSampleInitiatorApp.java
package proto.quickfix;

import java.io.FileInputStream;

import quickfix.DefaultMessageFactory;
import quickfix.FileStoreFactory;
import quickfix.Initiator;
import quickfix.LogFactory;
import quickfix.MessageFactory;
import quickfix.MessageStoreFactory;
import quickfix.ScreenLogFactory;
import quickfix.SessionSettings;
import quickfix.SocketInitiator;

public class QuickFixSampleInitiatorApp {

    private static SampleInitiator initiatorApp;

    public static void main(String args[]) throws Exception {
        Initiator initiator = startInitiator();
        //Poll once per second until the session with the adapter is logged on.
        while (!initiator.isLoggedOn()) {
            System.out.println("Waiting for initiator logon");
            synchronized (initiator) {
                initiator.wait(1000);
            }
        }
        System.out.println("initiator loggedon");
        initiatorApp.sendMessage();
        initiator.stop();
    }

    public static Initiator startInitiator() throws Exception {
        //Replace this path with the location of initiator_sample.cfg in your environment.
        String fileName =
            "D:/installs/helios/eclipse/workspace/quickfix.ordertracker/src/proto/initiator_sample.cfg";
        //initiatorApp is your class that implements the Application interface.
        initiatorApp = new SampleInitiator();
        SessionSettings settings = new SessionSettings(new FileInputStream(fileName));
        MessageStoreFactory messageStoreFactory = new FileStoreFactory(settings);
        LogFactory logFactory = new ScreenLogFactory(true, true, true);
        MessageFactory messageFactory = new DefaultMessageFactory();
        Initiator initiator = new SocketInitiator(initiatorApp, messageStoreFactory,
            settings, logFactory, messageFactory);
        initiator.start();
        return initiator;
    }
}
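The logon loop in main polls without an upper bound, so the test program never exits if the adapter is not reachable. One possible variation is a bounded wait. The following is a minimal sketch of that idea using only the JDK; the class LogonWaitSketch and the helper waitUntil are hypothetical and are not part of QuickFIX/J or of the example application.

```java
import java.util.function.BooleanSupplier;

public class LogonWaitSketch {

    // Hypothetical helper: poll the condition until it holds or the timeout
    // elapses. Returns true if the condition was met, false on timeout.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollMillis);
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in condition that becomes true after roughly 200 ms.
        long start = System.nanoTime();
        boolean met = waitUntil(
                () -> System.nanoTime() - start >= 200_000_000L, 2_000, 50);
        System.out.println(met ? "condition met" : "timed out");
    }
}
```

In the sample initiator, a call such as waitUntil(initiator::isLoggedOn, 30000, 1000) could replace the while loop, with the caller stopping the initiator and reporting an error when the result is false. The 30-second limit is an arbitrary value chosen for illustration.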
SampleInitiator.java
package proto.quickfix;

import quickfix.Application;
import quickfix.DoNotSend;
import quickfix.FieldNotFound;
import quickfix.IncorrectDataFormat;
import quickfix.IncorrectTagValue;
import quickfix.Message;
import quickfix.RejectLogon;
import quickfix.Session;
import quickfix.SessionID;
import quickfix.SessionNotFound;
import quickfix.UnsupportedMessageType;
import quickfix.field.ClOrdID;
import quickfix.field.OrigClOrdID;
import quickfix.field.Side;
import quickfix.field.Symbol;
import quickfix.field.Text;

public class SampleInitiator implements Application {

    private SessionID sessionIDTmp;
    public volatile boolean isLoggedOn;

    @Override
    public void fromAdmin(Message arg0, SessionID arg1) throws FieldNotFound,
            IncorrectDataFormat, IncorrectTagValue, RejectLogon {
    }

    @Override
    public void fromApp(Message arg0, SessionID arg1) throws FieldNotFound,
            IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
        System.out.println("Initiator received message");
    }

    public void sendMessage() throws SessionNotFound {
        //The declared type and the constructed type must be the same generated
        //message class; the four-argument constructor is used here.
        quickfix.fix41.OrderCancelRequest message = new quickfix.fix41.OrderCancelRequest(
            new OrigClOrdID("123"),
            new ClOrdID("321"),
            new Symbol("LNUX"),
            new Side(Side.BUY));
        message.set(new Text("Cancel My Order!"));
        System.out.println("SESSION ID IS-" + sessionIDTmp);
        Session.sendToTarget(message, sessionIDTmp);
    }

    @Override
    public void onCreate(SessionID arg0) {
        System.out.println("Initiator session created" + arg0);
        this.sessionIDTmp = arg0;
    }

    @Override
    public void onLogon(SessionID arg0) {
        System.out.println("Initiator logon done" + arg0);
        isLoggedOn = true;
    }

    //Optionally, you can implement the onLogout and toAdmin callback methods below.
    //To implement them, see the QuickFix documentation: http://www.quickfixj.org.
    @Override
    public void onLogout(SessionID arg0) {
        //Notifies you when a FIX session is offline.
        //The cause can be a logout, forced termination, or loss of network connection.
    }

    @Override
    public void toAdmin(Message arg0, SessionID arg1) {
        //Provides a look at the administrative messages sent from your FIX machine
        //to another party. This method enables you to log messages. You can add
        //fields to an administrative message before it is sent.
    }

    @Override
    public void toApp(Message arg0, SessionID arg1) throws DoNotSend {
        System.out.println("Initiator toApp");
    }
}
initiator_sample.cfg
[default]
FileStorePath=/scratch/myusername/garage/quickfix/initiator/filestore
BeginString=FIX.4.2
UseDataDictionary=N
ConnectionType=initiator

[session]
SenderCompID=SAMPLESENDER
TargetCompID=SAMPLEACCEPTOR
SocketConnectHost=10.240.30.74
SocketConnectPort=9876
StartTime=00:00:00
EndTime=00:00:00
HeartBtInt=30
ReconnectInterval=5
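For the initiator to log on, its settings have to mirror the adapter's acceptor settings: the initiator's SenderCompID is the acceptor's TargetCompID and the reverse, both sides use the same BeginString, and SocketConnectPort matches SocketAcceptPort. The following sketch shows one way to check this before running the test. It is an assumption-laden illustration: SessionMirrorSketch, parseFlat, and mirrors are hypothetical names, and the flat parser ignores section headers, so it is suitable only for a file with a single session such as this one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class SessionMirrorSketch {

    // Hypothetical helper: collect key=value pairs from settings text,
    // skipping blank lines, comments, and [section] headers.
    static Map<String, String> parseFlat(String cfg) {
        Map<String, String> settings = new HashMap<>();
        for (String line : cfg.split("\\R")) {
            line = line.trim();
            if (line.isEmpty() || line.startsWith("[") || line.startsWith("#")) {
                continue;
            }
            int eq = line.indexOf('=');
            if (eq > 0) {
                settings.put(line.substring(0, eq).trim(), line.substring(eq + 1).trim());
            }
        }
        return settings;
    }

    // True when the initiator settings are the mirror image of the acceptor's.
    static boolean mirrors(Map<String, String> initiator, Map<String, String> acceptor) {
        return initiator.get("SenderCompID") != null
                && initiator.get("TargetCompID") != null
                && Objects.equals(initiator.get("SenderCompID"), acceptor.get("TargetCompID"))
                && Objects.equals(initiator.get("TargetCompID"), acceptor.get("SenderCompID"))
                && Objects.equals(initiator.get("BeginString"), acceptor.get("BeginString"))
                && Objects.equals(initiator.get("SocketConnectPort"),
                                  acceptor.get("SocketAcceptPort"));
    }

    public static void main(String[] args) {
        String initiatorCfg = String.join("\n",
                "[default]", "BeginString=FIX.4.2", "ConnectionType=initiator",
                "[session]", "SenderCompID=SAMPLESENDER", "TargetCompID=SAMPLEACCEPTOR",
                "SocketConnectPort=9876");

        // Acceptor values copied from the adapter configuration file.
        Map<String, String> acceptor = new HashMap<>();
        acceptor.put("BeginString", "FIX.4.2");
        acceptor.put("SenderCompID", "SAMPLEACCEPTOR");
        acceptor.put("TargetCompID", "SAMPLESENDER");
        acceptor.put("SocketAcceptPort", "9876");

        System.out.println("settings mirror each other: "
                + mirrors(parseFlat(initiatorCfg), acceptor));
    }
}
```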
runInitiator.bat
set QFJ_HOME=D:\installs\helios\eclipse\workspace\quickfix.ordertracker
set CP=%QFJ_HOME%/lib/mina-core-1.1.7.jar;%QFJ_HOME%/lib/slf4j-api-1.6.3.jar;%QFJ_HOME%/lib/slf4j-jdk14-1.6.3.jar;%QFJ_HOME%/lib/slf4j-log4j12-1.6.3.jar;%QFJ_HOME%/lib/quickfixj-all-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-core-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix40-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix41-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix42-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix43-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix44-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix50-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fixt11-1.5.2.jar;%QFJ_HOME%/dist/quickfix.ordertracker.jar
java -cp %CP% proto.quickfix.QuickFixSampleInitiatorApp