データ・ドメインへのレコードの送信

BulkIngesterクラスは、Endecaデータ・ドメインにデータをロードするための、クライアント側バルク・インタフェースのプライマリ・エントリ・ポイントです。

BulkIngesterでは、Endecaデータ・ドメインのDgraphに対するソケット接続を確立して、返信を処理するスレッドを作成します。そのsendRecord()メソッドによって、指定したレコードがワイヤ経由でDgraphに送信されます。

このインタフェースのクライアントは、次の処理を行う必要があります。
  1. 4つのコールバック・インタフェース(ErrorCallbackFinishedCallbackAbortCallbackおよびStatusCallback)を実装し、それらのハンドラ・メソッドが呼び出されたときに(レスポンス・スレッドで発生)適切な処理を実行するクラスを定義します。
  2. コンストラクタが要求した適切なパラメータを指定して、BulkIngesterオブジェクトをインスタンス化します。
  3. begin()メソッドを呼び出して、レスポンス・スレッドを開始します。これが呼び出されないと、IOExceptionがスローされます。
  4. sendRecord()を繰り返し呼び出して、RecordオブジェクトをDgraphに送信します。
  5. レコードの送信が終了したら、endIngest()を呼び出してレスポンス・スレッドを終了し、ソケットを閉じます。

定期的にrequestStatusUpdate()メソッドを呼び出し、現在のステータスについてDgraphをポーリングすることをお薦めします。

コールバック・インタフェースの定義

BulkIngesterコンストラクタには、パラメータとして4つのコールバック・インタフェースが必要です。
  • ErrorCallbackではエラー状態を処理します。収集操作中にエラーが発生すると、handleError()メソッドが呼び出されます。
  • Dgraphによって収集終了が報告されると、FinishedCallbackが呼び出されます。その後もレコードは受け入れるには、begin()を再度呼び出す必要があります。
  • AbortCallbackでは中断状態を処理します。中断状態は、BulkIngesterまたはDgraphのいずれかで発生する可能性があります。
  • StatusCallbackではステータス更新(正常に収集されたレコードの数と拒否されたレコードの数を含む)を処理します。
ErrorCallbackはレコードが拒否された理由を報告するので、特に役立ちます。このコールバックを定義する例は次のとおりです。
ErrorCallback errorCallback = new ErrorCallback() {
    void handleError(String reason, Record reject) {
        System.out.println("Record "
                + reject.getSpec().getName()
                + " rejected: " + reason);
    }
};

BulkIngesterオブジェクトのインスタンス化

BulkIngesterコンストラクタには、次の順に11個のパラメータが必要です。
パラメータ 意味
host Dgraphが実行されているマシンの名前(String)。マシン名は、管理WebサービスのallocateBulkLoadPort操作を使用して取得できます。
port Dgraphのバルク・ロード・ポート(int)。ポートは、管理WebサービスのallocateBulkLoadPort操作を使用して取得できます。
socketFactory バルク収集操作用のソケットを作成するSocketFactory。サンプル・プログラムでは、非SSL接続にSocketFactory.getDefault()を、SSL接続にSSLSocketFactory.getDefault()を使用します。どちらのメソッドも、インポートされたjavax.net.SocketFactoryクラスおよびjavax.net.ssl.SSLSocketFactoryクラスに属します。
transactionId 外部トランザクション(トランザクションWebサービスにより開始された場合)のID (String)。外部トランザクションを使用していない場合は、nullを使用します。
doFinalMerge 収集後ただちにマージを実施するかどうかを指定するブール値(boolean)。trueに設定すると、収集のパフォーマンスが低下します。したがって、複数のバルク・ロード収集を実行する場合には、最後の収集ジョブを除くすべてのジョブでこれをfalseに設定する必要があります。
doUpdateDictionary 収集後ただちにaspell辞書を更新するかどうかを指定するブール値(boolean)。trueに設定すると、収集のパフォーマンスが低下します。したがって、複数のバルク・ロード収集を実行する場合には、最後の収集ジョブを除くすべてのジョブでこれをfalseに設定する必要があります。
socketTimeoutMillis ミリ秒で表したDgraphへの接続のタイムアウト(int)。
errorCallback ErrorCallbackオブジェクト。
finishedCallback FinishedCallbackオブジェクト。
abortCallback AbortCallbackオブジェクト。
statusCallback StatusCallbackオブジェクト。
サンプル・プログラムでは、BulkIngesterパラメータを次のように構成します。
_host = host;
_port = port;
BulkLoadCallback blcb = new BulkLoadCallback();

SocketFactory sf = ssl ? SocketFactory.getDefault() : SSLSocketFactory.getDefault();
boolean doFinalMerge = false;
boolean doUpdateSpellingDictionary = false;
long socketTimeout = TimeUnit.SECONDS.toMillis(90);
String transactionId = null;
Dgraphへの接続を設定する呼出しは次のとおりです。
_bi = new BulkIngester(_host, 
                       _port, 
                        sf, 
                        transactionId, 
                        doFinalMerge, 
                        doUpdateSpellingDictionary, 
                        socketTimeout, 
                        blcb, 
                        blcb, 
                        blcb, 
                        blcb);

収集の開始と終了

クライアント・プログラムでEndecaデータ・ドメインへの接続を確立したら、収集プロセスでは次のBulkIngesterメソッドをこの順序で使用する必要があります。
  1. begin()メソッドで収集プロセスを開始します。
  2. sendRecord()を繰り返し呼び出して、Recordオブジェクトを実際にデータ・ドメインに送信します。
  3. endIngest()メソッドで収集プロセスを終了します。
サンプル・プログラムのコードを次に示します(ここでは、1件のレコードのみを収集)。
public BulkLoad(String host, int port, boolean ssl) throws IOException {
  ...
  _bi = new BulkIngester(_host, _port, sf, transactionId, doFinalMerge, doUpdateSpellingDictionary, socketTimeout, blcb, blcb, blcb, blcb);
  _bi.begin();
}

public void addRecord(int val, String words, boolean square, boolean prime) throws IOException {
		Data.Record.Builder recordBuilder = Data.Record.newBuilder();
		Data.Assignment.Builder assignmentBuilder = Data.Assignment.newBuilder();
  ...
  Data.Record r = recordBuilder.build();
  _bi.sendRecord(r);
}

public void done() throws IOException, InterruptedException {
  _bi.endIngest();
}

サンプル・プログラムでは使用しませんが、requestStatusUpdate()メソッドを使用すると、収集操作のステータスを取得できます。