データ・ドメインへのレコードの送信

BulkIngesterクラスは、Endecaデータ・ドメインにデータをロードするための、クライアント側バルク・インタフェースのプライマリ・エントリ・ポイントです。

BulkIngesterでは、Endecaデータ・ドメインにソケット接続を確立して、返信を処理するスレッドを作成します。そのsendRecord()メソッドによって、指定したレコードがワイヤ経由でデータ・ドメインに送信されます。

このインタフェースのクライアントは、次の処理を行う必要があります。
  1. 4つのコールバック・インタフェース(ErrorCallbackFinishedCallbackAbortCallbackおよびStatusCallback)を実装し、それらのハンドラ・メソッドが呼び出されたときに(レスポンス・スレッドで発生)適切な処理を実行するクラスを定義します。
  2. コンストラクタが要求した適切なパラメータを指定して、BulkIngesterオブジェクトをインスタンス化します。
  3. begin()メソッドを呼び出して、レスポンス・スレッドを開始します。これが呼び出されないと、IOExceptionがスローされます。
  4. sendRecord()を繰り返し呼び出して、RecordオブジェクトをEndecaデータ・ドメインに送信します。
  5. レコードの送信が終了したら、endIngest()を呼び出してレスポンス・スレッドを終了し、ソケットを閉じます。

コールバック・インタフェースの定義

BulkIngesterコンストラクタには、パラメータとして4つのコールバック・インタフェースが必要です。
  • ErrorCallbackではエラー状態を処理します。収集操作中にエラーが発生すると、handleError()メソッドが呼び出されます。
  • Dgraphによって収集終了が報告されると、FinishedCallbackが呼び出されます。その後もレコードは受け入れるには、begin()を再度呼び出す必要があります。
  • AbortCallbackでは中断状態を処理します。中断状態は、BulkIngesterまたはDgraphのいずれかで発生する可能性があります。
  • StatusCallbackではステータス更新(正常に収集されたレコードの数と拒否されたレコードの数を含む)を処理します。
ErrorCallbackはレコードが拒否された理由を報告するので、特に役立ちます。サンプル・プログラムでは、このコールバックを次のように定義しています。
ErrorCallback errorCallback = new ErrorCallback() {
    void handleError(String reason, Record reject) {
        System.out.println("Record "
                + reject.getSpec().getName()
                + " rejected: " + reason);
    }
};

BulkIngesterオブジェクトのインスタンス化

BulkIngesterコンストラクタには、次の順に10個のパラメータが必要です。
  • host - Endecaデータ・ドメインが実行されているマシンの名前(String)。これは、管理WebサービスのallocateBulkLoadPort操作を使用して取得できます。
  • port - Endecaデータ・ドメインのバルク・ロード・ポート(int)。これは、管理WebサービスのallocateBulkLoadPort操作を使用して取得できます。
  • useSSL - 接続にSSLを使用するかどうかを指定するブール値(boolean)。
  • doFinalMerge - 収集後ただちにマージを実施するかどうかを指定するブール値(boolean)。
  • doUpdateDictionary - 収集後ただちにaspell辞書を更新するかどうかを指定するブール値( boolean)。
  • timeout - ミリ秒で表したEndecaデータ・ドメインへの接続タイムアウト(int)。
  • errorCallback - ErrorCallbackオブジェクト。
  • finishedCallback - FinishedCallbackオブジェクト。
  • abortCallback - AbortCallbackオブジェクト。
  • statusCallback - StatusCallbackオブジェクト。
サンプル・プログラムでは、BulkIngesterを次のように確立します。
BulkIngester ingester("endecaserver.example.com",
        1234,        // port
        false,       // useSSL
        true,        // doFinalMerge
        true,        // doUpdateDictionary
        90000        // timeout in ms
        errorCallback,
        finishedCallback,
        abortCallback,
        statusCallback);

収集の開始と終了

クライアント・プログラムでEndecaデータ・ドメインへの接続を確立したら、収集プロセスでは次のBulkIngesterメソッドをこの順序で使用する必要があります。
  1. begin()メソッドで収集プロセスを開始します。
  2. sendRecord()を繰り返し呼び出して、Recordオブジェクトを実際にデータ・ドメインに送信します。
  3. endIngest()メソッドで収集プロセスを終了します。
サンプル・プログラムのコードを次に示します(ここでは、2件のレコードのみを収集)。
Record widget = makeProductRecord("Widget", 12, 99.95);
Record thing = makeProductRecord("Thing", 110, 3.14); 
        
ingester.begin();
ingester.sendRecord(widget);
ingester.requestStatusUpdate();
ingester.sendRecord(thing);    
ingester.endIngest();

requestStatusUpdate()メソッドを使用して、収集操作のステータスを取得していることに注意してください。