6 Create an Order Tracking Application with QuickFix Adapter

The QuickFix Adapter is a messaging engine for handling the real-time electronic exchange of securities transactions according to the Financial Information eXchange (FIX) standard. The QuickFix adapter listens for FIX messages, converts the FIX messages to Oracle Stream Analytics events, and sends the events to the next stage for processing.

This chapter includes the following sections:

Order Tracking Example Application
Test the Example Application

6.1 Order Tracking Example Application

Figure 6-1 shows the event processing network (EPN) for a simple order tracking application called quickfix.ordertracker. FIX data enters the EPN on the left, where the QuickFix adapter converts it to an event. The event is channeled to an Oracle CQL processor and, after processing, is channeled to an event sink.

Figure 6-1 quickfix.ordertracker application EPN


The example consists of the following configuration and Java class files:

Assembly File

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:osgi="http://www.springframework.org/schema/osgi"
       xmlns:wlevs="http://www.bea.com/ns/wlevs/spring"
       xsi:schemaLocation="
  http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans.xsd
  http://www.springframework.org/schema/osgi
  http://www.springframework.org/schema/osgi/spring-osgi.xsd
  http://www.bea.com/ns/wlevs/spring
  http://www.bea.com/ns/wlevs/spring/spring-wlevs-v12_1_3_0.xsd">
  <wlevs:event-type-repository>
    <wlevs:event-type type-name="QuickFIXEvent">
      <wlevs:class>event.QuickFIXEvent</wlevs:class>
    </wlevs:event-type>
  </wlevs:event-type-repository>

  <wlevs:adapter id="QuickFIXInbound" provider="quickfix-inbound"/>
  <wlevs:processor id="QuickFIXProcessor"/>

  <wlevs:channel id="QuickFIXInChannel" event-type="QuickFIXEvent">
    <wlevs:listener ref="QuickFIXProcessor" />
    <wlevs:source ref="QuickFIXInbound"/>
  </wlevs:channel>

  <wlevs:event-bean id="eventListener" class="listeners.EventListener">
    <wlevs:instance-property name="nodeName" value="QuickFIXInbound" />
  </wlevs:event-bean>

  <wlevs:channel id="QuickFIXOutChannel" event-type="QuickFIXEvent">
    <wlevs:listener ref="eventListener" />
    <wlevs:source ref="QuickFIXProcessor"/>
  </wlevs:channel>
</beans>

Configuration File

The following entries show the QuickFix adapter configuration. The configuration requires at least one <default-session> property value and can have zero or more <session> tags.

The BeginString property indicates the FIX message version. This data is not validated by the QuickFIX engine.

Note:

Oracle Stream Analytics does not support QuickFIX dynamic acceptor sessions in the 12c release.

<?xml version="1.0" encoding="UTF-8"?>
<wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application">
<quickfix-adapter>
  <name>QuickFIXInbound</name>
    <event-type>QuickFIXEvent</event-type>
    <default-session description="default configuration">
      <configuration>
        <config-name>FileStorePath</config-name>
        <config-value>/scratch/tprabish/garage/quickfix/acceptor/filestore</config-value>
      </configuration>
      <configuration>
        <config-name>FileLogPath</config-name>
        <config-value>/scratch/tprabish/garage/quickfix/acceptor/filelog</config-value>
      </configuration>
      <configuration>
        <config-name>UseDataDictionary</config-name>
        <config-value>N</config-value>
      </configuration>
      <configuration description="identifier-message format">
        <config-name>BeginString</config-name>
        <config-value>FIX.4.2</config-value>
      </configuration>
      <configuration>
        <config-name>ConnectionType</config-name>
        <config-value>acceptor</config-value>
      </configuration>
      <configuration>
        <config-name>StartTime</config-name>
        <config-value>00:00:00</config-value>
      </configuration>
      <configuration>
        <config-name>EndTime</config-name>
        <config-value>00:00:00</config-value>
      </configuration>
      <configuration>
        <config-name>HeartBtInt</config-name>
        <config-value>30</config-value>
      </configuration>
      <configuration>
        <config-name>SocketAcceptPort</config-name>
        <config-value>9876</config-value>
      </configuration>
      <configuration>
        <config-name>RefreshMessageStoreAtLogon</config-name>
        <config-value>Y</config-value>
      </configuration>
    </default-session>
 
    <session description="ordertracker configuration">
      <configuration description="identifier-acceptor">
        <config-name>SenderCompID</config-name>
        <config-value>SAMPLEACCEPTOR</config-value>
      </configuration>
      <configuration description="identifier-initiator">
        <config-name>TargetCompID</config-name>
        <config-value>SAMPLESENDER</config-value>
      </configuration>
    </session>
</quickfix-adapter>
<processor>
  <name>QuickFIXProcessor</name>
  <rules>
    <query id="ExampleQuery"><![CDATA[ select * from QuickFIXInChannel [now] ]]></query>
  </rules>
</processor>
</wlevs:config>
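
Settings in <default-session> apply to every session, and each <session> entry adds its own values on top. The following is a minimal sketch of that layering using plain Java maps. It assumes that a session-level entry takes precedence over a default entry of the same name, which is how QuickFIX session settings generally behave. The class and method names (SessionSettingsSketch, effectiveSettings) are hypothetical and are not part of the adapter API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SessionSettingsSketch {

  // Returns the defaults overlaid with the session-specific entries.
  // Assumption: a session entry wins over a default entry of the same name.
  static Map<String, String> effectiveSettings(Map<String, String> defaults,
                                               Map<String, String> session) {
    Map<String, String> merged = new LinkedHashMap<>(defaults);
    merged.putAll(session);
    return merged;
  }

  public static void main(String[] args) {
    // A few of the values from the <default-session> element above.
    Map<String, String> defaults = new LinkedHashMap<>();
    defaults.put("BeginString", "FIX.4.2");
    defaults.put("ConnectionType", "acceptor");
    defaults.put("SocketAcceptPort", "9876");

    // The values from the <session> element above.
    Map<String, String> session = new LinkedHashMap<>();
    session.put("SenderCompID", "SAMPLEACCEPTOR");
    session.put("TargetCompID", "SAMPLESENDER");

    for (Map.Entry<String, String> e : effectiveSettings(defaults, session).entrySet()) {
      System.out.println(e.getKey() + "=" + e.getValue());
    }
  }
}
```

Under this assumption, the ordertracker session in the example runs with all of the default values plus its own SenderCompID and TargetCompID.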

QuickFIXEvent.java

package event;
 
public class QuickFIXEvent {
  private String text;
  private String origClOrdID;
  private String clOrdID;
  private String symbol;
  private String side;

  public String getText() { return text; }
  public void setText(String text) { this.text = text; } 
  public String getOrigClOrdID() { return origClOrdID; }
  public void setOrigClOrdID(String origClOrdID) {this.origClOrdID = origClOrdID;}
  public String getClOrdID() { return clOrdID; } 
  public void setClOrdID(String clOrdID) { this.clOrdID = clOrdID; }
  public String getSymbol() { return symbol; }
  public void setSymbol(String symbol) { this.symbol = symbol; } 
  public String getSide() { return side; }
  public void setSide(String side) { this.side = side; }

  @Override
  public String toString() {
    StringBuilder sb=new StringBuilder();
    sb.append("\t\t\t[origClOrdID=").append(origClOrdID).append("]\n")
      .append("\t\t\t[clOrdID=").append(clOrdID).append("]\n")
      .append("\t\t\t[symbol=").append(symbol).append("]\n")
      .append("\t\t\t[side=").append(side).append("]\n")
      .append("\t\t\t[text=").append(text).append("]\n");
    return sb.toString();
  }
}
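
The property names in QuickFIXEvent match the names of standard FIX fields. As an illustration of that correspondence, the following sketch splits the body fields of a raw FIX message into tag and value pairs and prints them under the event property names. The tag numbers (41, 11, 55, 54, 58) are the standard FIX tags for these fields. The FixFieldSketch class and its parse method are hypothetical, and the sketch does not claim to reproduce how the adapter performs the conversion internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FixFieldSketch {

  // SOH (0x01) separates tag=value pairs in a raw FIX message.
  static final char SOH = '\u0001';

  // Splits a raw FIX string into a tag -> value map, keeping field order.
  static Map<Integer, String> parse(String raw) {
    Map<Integer, String> fields = new LinkedHashMap<>();
    for (String pair : raw.split(String.valueOf(SOH))) {
      int eq = pair.indexOf('=');
      if (eq <= 0) {
        continue; // skip empty or malformed segments
      }
      fields.put(Integer.parseInt(pair.substring(0, eq)), pair.substring(eq + 1));
    }
    return fields;
  }

  public static void main(String[] args) {
    // Body fields only; a real message also carries a header and a trailer.
    String raw = "41=123" + SOH + "11=321" + SOH + "55=LNUX" + SOH
        + "54=1" + SOH + "58=Cancel My Order!" + SOH;
    Map<Integer, String> f = parse(raw);
    System.out.println("origClOrdID=" + f.get(41));
    System.out.println("clOrdID=" + f.get(11));
    System.out.println("symbol=" + f.get(55));
    System.out.println("side=" + f.get(54)); // 1 is the FIX code for Buy
    System.out.println("text=" + f.get(58));
  }
}
```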

EventListener.java

The EventListener.java class used in the example follows.

package listeners;

import com.bea.wlevs.ede.api.EventRejectedException;
import com.bea.wlevs.ede.api.StreamSink;

//The following EventListener class is an event sink. It is provided to
//demonstrate that the messages have been received and processed in the EPN.
public class EventListener implements StreamSink {
  private String nodeName_ = "N/A";
  public void setNodeName(String nodeName) {
    nodeName_ = nodeName;
  }

  public void onInsertEvent(Object event) throws EventRejectedException {
    System.out.println("Received the following event in the listener");
    System.out.println("[node:" + nodeName_ + "] onInsertEvent():\n" + event);
  }
}

6.2 Test the Example Application

After you deploy the example QuickFix adapter application, it starts the QuickFIX engine, which listens on the port specified by SocketAcceptPort in the configuration file (9876 in this example). To test the application, code a QuickFIX initiator application that sends messages to this adapter, using the following files.

Note:

You need to replace some values shown in the following examples based on your environment.

QuickFixSampleInitiatorApp.java

package proto.quickfix;

import java.io.FileInputStream;
import quickfix.DefaultMessageFactory;
import quickfix.FileStoreFactory;
import quickfix.Initiator;
import quickfix.LogFactory;
import quickfix.MessageFactory;
import quickfix.MessageStoreFactory;
import quickfix.ScreenLogFactory;
import quickfix.SessionSettings;
import quickfix.SocketInitiator;

public class QuickFixSampleInitiatorApp {

  private static SampleInitiator initiatorApp;

  public static void main(String args[]) throws Exception {
    Initiator initiator = startInitiator();
    // Poll once a second until the session with the adapter is logged on.
    while (!initiator.isLoggedOn()) {
      System.out.println("Waiting for initiator logon");
      synchronized (initiator) {
        initiator.wait(1000);
      }
    }
    System.out.println("Initiator logged on");
    initiatorApp.sendMessage();
    initiator.stop();
  }

  public static Initiator startInitiator() throws Exception {
    // Replace this path with the location of initiator_sample.cfg on your system.
    String fileName =
      "D:/installs/helios/eclipse/workspace/quickfix.ordertracker/src/proto/initiator_sample.cfg";

    // initiatorApp is your class that implements the Application interface.
    initiatorApp = new SampleInitiator();
    SessionSettings settings = new SessionSettings(new FileInputStream(fileName));
    MessageStoreFactory messageStoreFactory = new FileStoreFactory(settings);
    LogFactory logFactory = new ScreenLogFactory(true, true, true);
    MessageFactory messageFactory = new DefaultMessageFactory();
    Initiator initiator = new SocketInitiator(initiatorApp,
      messageStoreFactory, settings, logFactory, messageFactory);
    initiator.start();
    return initiator;
  }
}
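
The main method above polls isLoggedOn() without a deadline, so it waits indefinitely if the adapter is not reachable. The following is a minimal sketch of a bounded wait that gives up after a timeout. The LogonWaitSketch class and its waitUntil method are hypothetical helpers, not part of QuickFIX/J, and the lambda in main only stands in for a real logon check.

```java
import java.util.function.BooleanSupplier;

public class LogonWaitSketch {

  // Polls the condition until it is true or the timeout elapses.
  // Returns true if the condition became true in time, false otherwise.
  static boolean waitUntil(BooleanSupplier condition, long timeoutMillis,
                           long pollMillis) {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (!condition.getAsBoolean()) {
      if (System.currentTimeMillis() >= deadline) {
        return false;
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    // Stand-in for a logon check; becomes true after about 200 ms.
    BooleanSupplier loggedOn = () -> System.currentTimeMillis() - start >= 200;
    boolean ok = waitUntil(loggedOn, 5000, 50);
    System.out.println(ok ? "Initiator logged on" : "Timed out waiting for logon");
  }
}
```

In the initiator application, the condition passed to waitUntil would be initiator::isLoggedOn, and a false result would be a reason to stop the initiator and report the failure instead of sending the message.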

SampleInitiator.java

package proto.quickfix;

import quickfix.Application;
import quickfix.DoNotSend;
import quickfix.FieldNotFound;
import quickfix.IncorrectDataFormat;
import quickfix.IncorrectTagValue;
import quickfix.Message;
import quickfix.RejectLogon;
import quickfix.Session;
import quickfix.SessionID;
import quickfix.SessionNotFound;
import quickfix.UnsupportedMessageType;
import quickfix.field.ClOrdID;
import quickfix.field.OrigClOrdID;
import quickfix.field.Side;
import quickfix.field.Symbol;
import quickfix.field.Text;

public class SampleInitiator implements Application{
  private SessionID sessionIDTmp;
  public volatile boolean isLoggedOn;
 
  @Override
  public void fromAdmin(Message arg0, SessionID arg1) throws FieldNotFound,
  IncorrectDataFormat, IncorrectTagValue, RejectLogon { }
 
  @Override
  public void fromApp(Message arg0, SessionID arg1) throws FieldNotFound,
       IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
    System.out.println("Initiator received message");
  }

  public void sendMessage() throws SessionNotFound {
    // The session uses BeginString=FIX.4.2, so build a FIX 4.2 message.
    // FIX 4.2 also requires TransactTime on an order cancel request.
    quickfix.fix42.OrderCancelRequest message =
      new quickfix.fix42.OrderCancelRequest(
        new OrigClOrdID("123"),
        new ClOrdID("321"),
        new Symbol("LNUX"),
        new Side(Side.BUY),
        new quickfix.field.TransactTime());
    message.set(new Text("Cancel My Order!"));
    System.out.println("SESSION ID IS-" + sessionIDTmp);
    Session.sendToTarget(message, sessionIDTmp);
  }

  @Override
  public void onCreate(SessionID arg0) {
    System.out.println("Initiator session created"+arg0);
    this.sessionIDTmp=arg0;
  }
 
  @Override
  public void onLogon(SessionID arg0) {
    System.out.println("Initiator logon done"+arg0);
    isLoggedOn=true;
  }
 
  // Optionally, you can implement the onLogout and toAdmin callback methods below.
  // To implement them, see the QuickFIX/J documentation: http://www.quickfixj.org.
  @Override
  public void onLogout(SessionID arg0) {
    // Notifies you when a FIX session goes offline.
    // The cause can be a logout, forced termination, or loss of network connection.
  }

  @Override
  public void toAdmin(Message arg0, SessionID arg1) {
    // Provides a look at the administrative messages sent from your FIX engine
    // to the other party. This method enables you to log messages. You can add
    // fields to an administrative message before it is sent.
  }
 
  @Override
  public void toApp(Message arg0, SessionID arg1) throws DoNotSend {
    System.out.println("Initiator toApp");
  } 
}

initiator_sample.cfg

[default]
FileStorePath=/scratch/myusername/garage/quickfix/initiator/filestore
BeginString=FIX.4.2
UseDataDictionary=N
ConnectionType=initiator
 
[session]
SenderCompID=SAMPLESENDER
TargetCompID=SAMPLEACCEPTOR
SocketConnectHost=10.240.30.74
SocketConnectPort=9876
StartTime=00:00:00
EndTime=00:00:00
HeartBtInt=30
ReconnectInterval=5
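
For the logon to succeed, the initiator settings must mirror the adapter settings: both sides use the same BeginString, the initiator's SenderCompID is the adapter's TargetCompID, and the initiator's TargetCompID is the adapter's SenderCompID. The following sketch checks that relationship for two sets of settings held in plain maps. The CompIdCheckSketch class and its sessionsMatch method are hypothetical and only illustrate the rule.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class CompIdCheckSketch {

  // Returns true when the two sides agree on the FIX version and
  // their SenderCompID and TargetCompID values mirror each other.
  static boolean sessionsMatch(Map<String, String> acceptor,
                               Map<String, String> initiator) {
    return acceptor.get("BeginString") != null
        && Objects.equals(acceptor.get("BeginString"), initiator.get("BeginString"))
        && acceptor.get("SenderCompID") != null
        && Objects.equals(acceptor.get("SenderCompID"), initiator.get("TargetCompID"))
        && acceptor.get("TargetCompID") != null
        && Objects.equals(acceptor.get("TargetCompID"), initiator.get("SenderCompID"));
  }

  public static void main(String[] args) {
    // Values from the adapter configuration file.
    Map<String, String> acceptor = new HashMap<>();
    acceptor.put("BeginString", "FIX.4.2");
    acceptor.put("SenderCompID", "SAMPLEACCEPTOR");
    acceptor.put("TargetCompID", "SAMPLESENDER");

    // Values from initiator_sample.cfg.
    Map<String, String> initiator = new HashMap<>();
    initiator.put("BeginString", "FIX.4.2");
    initiator.put("SenderCompID", "SAMPLESENDER");
    initiator.put("TargetCompID", "SAMPLEACCEPTOR");

    System.out.println("Sessions match: " + sessionsMatch(acceptor, initiator));
  }
}
```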

runInitiator.bat

set QFJ_HOME=D:\installs\helios\eclipse\workspace\quickfix.ordertracker
set CP=%QFJ_HOME%/lib/mina-core-1.1.7.jar;%QFJ_HOME%/lib/slf4j-api-1.6.3.jar;%QFJ_HOME%/lib/slf4j-jdk14-1.6.3.jar;%QFJ_HOME%/lib/slf4j-log4j12-1.6.3.jar;%QFJ_HOME%/lib/quickfixj-all-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-core-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix40-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix41-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix42-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix43-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix44-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix50-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fixt11-1.5.2.jar;%QFJ_HOME%/dist/quickfix.ordertracker.jar
 
java -cp %CP% proto.quickfix.QuickFixSampleInitiatorApp