プライマリ・コンテンツに移動
Oracle® Fusion Middleware Oracle Stream Analyticsイベント処理スタート・ガイド
12cリリース(12.2.1.2.0)
E82781-01
目次へ移動
目次

前
前へ
次
次へ

6 QuickFixアダプタを使用した注文追跡アプリケーションの作成

QuickFixアダプタは、Financial Information eXchange (FIX)標準に準拠し、証券取引における電子交換をリアルタイム制御するメッセージング・エンジンです。QuickFixアダプタは、FIXメッセージのリスニング、FIXメッセージのOracle Stream Analyticsイベントへの変換、および次のステージで処理を行うためのイベント送信を行います。

この章の内容は次のとおりです。

6.1 注文追跡サンプル・アプリケーション

図6-1に、quickfix.ordertrackerという名前の単純な注文追跡アプリケーションのEPNを示します。FIXデータは左からEPNに入り、イベントに変換されます。イベントはOracle CQLプロセッサに運ばれて処理された後、イベント・シンクに運ばれます。

図6-1 quickfix.ordertrackerアプリケーションEPN

図6-1の説明が続きます
「図6-1 quickfix.ordertrackerアプリケーションEPN」の説明

サンプルは、次の構成ファイルおよびJavaクラス・ファイルで構成されています。

アセンブリ・ファイル

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:osgi="http://www.springframework.org/schema/osgi"
       xmlns:wlevs="http://www.bea.com/ns/wlevs/spring"
       xsi:schemaLocation="
  http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans.xsd
  http://www.springframework.org/schema/osgi
  http://www.springframework.org/schema/osgi/spring-osgi.xsd
  http://www.bea.com/ns/wlevs/spring
  http://www.bea.com/ns/wlevs/spring/spring-wlevs-v12_1_3_0.xsd
  <wlevs:event-type-repository>
    <wlevs:event-type type-name="QuickFIXEvent">
      <wlevs:class>event.QuickFIXEvent</wlevs:class>
    </wlevs:event-type>
  </wlevs:event-type-repository>

  <wlevs:adapter id="QuickFIXInbound" provider="quickfix-inbound"/>
  <wlevs:processor id="QuickFIXProcessor"/>

  <wlevs:channel id="QuickFIXInChannel" event-type="QuickFIXEvent">
    <wlevs:listener ref="QuickFIXProcessor" />
    <wlevs:source ref="QuickFIXInbound"/>
  </wlevs:channel>

  <wlevs:event-bean id="eventListener" class="listeners.EventListener">
    <wlevs:instance-property name="nodeName" value="QuickFIXInbound" />
  </wlevs:event-bean>

  <wlevs:channel id="QuickFIXOutChannel" event-type="QuickFIXEvent">
    <wlevs:listener ref="eventListener" />
    <wlevs:source ref="QuickFIXProcessor"/>
  </wlevs:channel>
</beans>

構成ファイル

次のエントリで、QuickFixアダプタ構成を示します。構成には1つ以上の<default-session>プロパティ値が必要で、0個以上の<session>タグを持つことができます。

BeginStringプロパティがFIXメッセージのバージョンを示しています。QuickFIXエンジンは、このデータを検証しません。

注意:

12cリリースでは、Oracle Stream AnalyticsはQuickFIX動的アクセプタ・セッションをサポートしません。

<?xml version="1.0" encoding="UTF-8"?><wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application" >
<quickfix-adapter>
  <name>QuickFIXInbound</name>
    <event-type>QuickFIXEvent</event-type>
    <default-session description="default configuration">
      <configuration>
        <config-name>FileStorePath</config-name>
        <config-value>/scratch/tprabish/garage/quickfix/acceptor/filestore
         </config-value>
      </configuration> 
      <configuration>
         <config-name>FileLogPath</config-name>
         <config-value>/scratch/tprabish/garage/quickfix/acceptor/filelog
         </config-value>
      </configuration>
      <configuration>
        <config-name>UseDataDictionary</config-name>
        <config-value>N</config-value>
      </configuration>
      <configuration description="identifier-message format">
        <config-name>BeginString</config-name>
        <config-value>FIX.4.2</config-value>
      </configuration>
      <configuration>
        <config-name>ConnectionType</config-name>
        <config-value>acceptor</config-value>
      </configuration>
      <configuration>
        <config-name>StartTime</config-name>
        <config-value>00:00:00</config-value>
      </configuration>
      <configuration>
        <config-name>EndTime</config-name>
        <config-value>00:00:00</config-value>
      </configuration>
      <configuration>
        <config-name>HeartBtInt</config-name>
        <config-value>30</config-value>
      </configuration>
      <configuration>
        <config-name>SocketAcceptPort</config-name>
        <config-value>9876</config-value>
      </configuration>
      <configuration>
        <config-name>RefreshMessageStoreAtLogon</config-name>
        <config-value>Y</config-value>
      </configuration>
    </default-session>
 
    <session description="ordertracker configuration">
      <configuration description="identifier-acceptor">
        <config-name>SenderCompID</config-name>
        <config-value>SAMPLEACCEPTOR</config-value>
      </configuration>
      <configuration description="identifier-initiator">
        <config-name>TargetCompID</config-name>
        <config-value>SAMPLESENDER</config-value>
      </configuration>
    </session>
</quickfix-adapter>
<processor>
  <name>QuickFIXProcessor</name>
    <rules>
      <query id="ExampleQuery"> <![CDATA[ select * from QuickFIXInChannel 
           [now] ]]>  </query>
     </rules>
  </processor>
</wlevs:config>

QuickFIXEvent.java

package event;
 
public class QuickFIXEvent {
  private String text;
  private String origClOrdID;
  private String clOrdID;
  private String symbol;
  private String side;

  public String getText() { return text; }
  public void setText(String text) { this.text = text; } 
  public String getOrigClOrdID() { return origClOrdID; }
  public void setOrigClOrdID(String origClOrdID) {this.origClOrdID = origClOrdID;}
  public String getClOrdID() { return clOrdID; } 
  public void setClOrdID(String clOrdID) { this.clOrdID = clOrdID; }
  public String getSymbol() { return symbol; }
  public void setSymbol(String symbol) { this.symbol = symbol; } 
  public String getSide() { return side; }
  public void setSide(String side) { this.side = side; }

  @Override
  public String toString() {
    StringBuilder sb=new StringBuilder();
    sb.append("\t\t\t[origClOrdID=").append(origClOrdID).append("]\n")
      .append("\t\t\t[clOrdID=").append(clOrdID).append("]\n")
      .append("\t\t\t[symbol=").append(symbol).append("]\n")
      .append("\t\t\t[side=").append(side).append("]\n")
      .append("\t\t\t[text=").append(text).append("]\n");
    return sb.toString();
  }
}

EventListerner.java

EventListener.javaクラスは、次のサンプル・コードで使用されます。

package listeners;

import com.bea.wlevs.ede.api.EventRejectedException;
import com.bea.wlevs.ede.api.StreamSink;

//The following EventListener class is an event sink. It is provided to
//demonstrate that the messages have been received and processed in the EPN.
public class EventListener implements StreamSink {
  private String nodeName_ = "N/A";
  public void setNodeName(String nodeName) {
    nodeName_ = nodeName;
  }

  public void onInsertEvent(Object event) throws EventRejectedException {
    System.out.println("Received the following event in the listener");
    System.out.println("[node:" + nodeName_ + "] onInsertEvent():\n" 	+ event);
 	}
}

6.2 サンプル・アプリケーションのテスト

サンプルQuickFixアダプタ・アプリケーションをデプロイすると、QuickFIXエンジンが開始され、構成ファイルで指定されたポートをリスニングします。アプリケーションをテストするには、次のファイルでQuickFIXイニシエータ・アプリケーションを記述してこのアダプタにメッセージを送信します。

注意:

次のサンプルで示されている値の一部は、環境に基づいて置換する必要があります。

QuickFixSampleInitiatorApp.java

package proto.quickfix;

import java.io.FileInputStream;
import quickfix.Acceptor;
import quickfix.Application;
import quickfix.DefaultMessageFactory;
import quickfix.FileLogFactory;
import quickfix.FileStoreFactory;
import quickfix.Initiator;
import quickfix.LogFactory;
import quickfix.MessageFactory;
import quickfix.MessageStoreFactory;
import quickfix.ScreenLogFactory;
import quickfix.Session;
import quickfix.SessionNotFound;
import quickfix.SessionSettings;
import quickfix.SocketAcceptor;
import quickfix.SocketInitiator;
import quickfix.field.ClOrdID;
import quickfix.field.OrigClOrdID;
import quickfix.field.Side;
import quickfix.field.Symbol;
import quickfix.field.Text;

public class QuickFixSampleInitiatorApp {

  private static SampleInitiator initiatorApp;

  public static void main(String args[]) throws Exception {
    Initiator initiator = startInitiator();
    while(!initiator.isLoggedOn()){
      System.out.println("Waiting for initiator logon");
      synchronized (initiator) {
     initiator.wait(1000);
    }
    };
    System.out.println("initiator loggedon");
    initiatorApp.sendMessage();
    initiator.stop();
  }

 public static Initiator startInitiator() throws Exception {
   String fileName = 
   "D:/installs/helios/eclipse/workspace/quickfix.ordertracker/src/proto/
   initiator_sample.cfg";

  //initiatorApp is your class that implements the Application interface.
    initiatorApp = new SampleInitiator();
    SessionSettings settings = new SessionSettings(new FileInputStream(fileName));
    MessageStoreFactory messageStoreFactory = new FileStoreFactory(settings);
    LogFactory logFactory = new ScreenLogFactory(true, true, true);
    MessageFactory messageFactory = new DefaultMessageFactory();
    Initiator initiator = new SocketInitiator(initiatorApp,
    messageStoreFactory, settings, logFactory, messageFactory);
    initiator.start();
    return initiator;
  }
}

SampleInitiator.java

package proto.quickfix;

import quickfix.Application;
import quickfix.DoNotSend;
import quickfix.FieldNotFound;
import quickfix.IncorrectDataFormat;
import quickfix.IncorrectTagValue;
import quickfix.Message;
import quickfix.RejectLogon;
import quickfix.Session;
import quickfix.SessionID;
import quickfix.SessionNotFound;
import quickfix.UnsupportedMessageType;
import quickfix.field.ClOrdID;
import quickfix.field.OrigClOrdID;
import quickfix.field.Side;
import quickfix.field.Symbol;
import quickfix.field.Text;

public class SampleInitiator implements Application{
  private SessionID sessionIDTmp;
  public volatile boolean isLoggedOn;
 
  @Override
  public void fromAdmin(Message arg0, SessionID arg1) throws FieldNotFound,
  IncorrectDataFormat, IncorrectTagValue, RejectLogon { }
 
  @Override
  public void fromApp(Message arg0, SessionID arg1) throws FieldNotFound,
       IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
    System.out.println("Initiator received message");
  }

  public void sendMessage() throws SessionNotFound {
    quickfix.fix42.OrderCancelRequest message = 
      new quickfix.fix41.OrderCancelRequest( new OrigClOrdID("123"), 
      new ClOrdID("321"), 
      new Symbol("LNUX"),
      new Side(Side.BUY)); 
     message.set(new Text("Cancel My Order!"));
     System.out.println("SESSION ID IS-"+sessionIDTmp);
     Session.sendToTarget(message, sessionIDTmp);
  }

  @Override
  public void onCreate(SessionID arg0) {
    System.out.println("Initiator session created"+arg0);
    this.sessionIDTmp=arg0;
  }
 
  @Override
  public void onLogon(SessionID arg0) {
    System.out.println("Initiator logon done"+arg0);
    isLoggedOn=true;
  }
 
//Optionally, you can implement the onLogout and toAdmin callback methods below.
//To implement them, see the QuickFix documentation: http://www.quickfixj.org.
 @Override
 public void onLogout(SessionID arg0){
   //Notifies you when an FIX session is offline. 
   //The cause can be a logout, forced termination, or loss of network connection. 
 }
 
  @Override
  public void toAdmin(Message arg0, SessionID arg1) { 
    //Provides a look at the administrative messages sent from your FIX machine
    //to another party. This method enables you to log messages. You can add
    // fields to an adminstrative message before it is sent. 
  }
 
  @Override
  public void toApp(Message arg0, SessionID arg1) throws DoNotSend {
    System.out.println("Initiator toApp");
  } 
}

initiator_sample.cfg

[default]
FileStorePath=/scratch/myusername/garage/quickfix/initiator/filestore
BeginString=FIX.4.2
UseDataDictionary=N
ConnectionType=initiator
 
[session]
SenderCompID=SAMPLESENDER
TargetCompID=SAMPLEACCEPTOR
SocketConnectHost=10.240.30.74
SocketConnectPort=9876
StartTime=00:00:00
EndTime=00:00:00
HeartBtInt=30
ReconnectInterval=5

runInitiator.bat

set QFJ_HOME=D:\installs\helios\eclipse\workspace\quickfix.ordertracker
set CP=%QFJ_HOME%/lib/mina-core-1.1.7.jar;%QFJ_HOME%/lib/slf4j-api-1.6.3.jar;%QFJ_HOME%/lib/slf4j-jdk14-1.6.3.jar;%QFJ_HOME%/lib/slf4j-log4j12-1.6.3.jar;%QFJ_HOME%/lib/quickfixj-all-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-core-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix40-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix41-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix42-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix43-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix44-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fix50-1.5.2.jar;%QFJ_HOME%/lib/quickfixj-msg-fixt11-1.5.2.jar;%QFJ_HOME%/dist/quickfix.ordertracker.jar
 
java -cp %CP% proto.quickfix.QuickFixSampleInitiatorApp