BulkIngesterクラスは、Endecaデータ・ドメインにデータをロードするための、クライアント側バルク・インタフェースのプライマリ・エントリ・ポイントです。
BulkIngesterでは、Endecaデータ・ドメインにソケット接続を確立して、返信を処理するスレッドを作成します。そのsendRecord()メソッドによって、指定したレコードがワイヤ経由でデータ・ドメインに送信されます。
このインタフェースのクライアントは、次の処理を行う必要があります。
- 4つのコールバック・インタフェース(ErrorCallback、FinishedCallback、AbortCallbackおよびStatusCallback)を実装し、それらのハンドラ・メソッドが呼び出されたときに(レスポンス・スレッドで発生)適切な処理を実行するクラスを定義します。
- コンストラクタが要求した適切なパラメータを指定して、BulkIngesterオブジェクトをインスタンス化します。
- begin()メソッドを呼び出して、レスポンス・スレッドを開始します。これが呼び出されないと、IOExceptionがスローされます。
- sendRecord()を繰り返し呼び出して、RecordオブジェクトをEndecaデータ・ドメインに送信します。
- レコードの送信が終了したら、endIngest()を呼び出してレスポンス・スレッドを終了し、ソケットを閉じます。
コールバック・インタフェースの定義
BulkIngesterコンストラクタには、パラメータとして4つのコールバック・インタフェースが必要です。
- ErrorCallbackではエラー状態を処理します。収集操作中にエラーが発生すると、handleError()メソッドが呼び出されます。
- Dgraphによって収集終了が報告されると、FinishedCallbackが呼び出されます。その後もレコードは受け入れるには、begin()を再度呼び出す必要があります。
- AbortCallbackでは中断状態を処理します。中断状態は、BulkIngesterまたはDgraphのいずれかで発生する可能性があります。
- StatusCallbackではステータス更新(正常に収集されたレコードの数と拒否されたレコードの数を含む)を処理します。
ErrorCallbackはレコードが拒否された理由を報告するので、特に役立ちます。サンプル・プログラムでは、このコールバックを次のように定義しています。
ErrorCallback errorCallback = new ErrorCallback() {
void handleError(String reason, Record reject) {
System.out.println("Record "
+ reject.getSpec().getName()
+ " rejected: " + reason);
}
};
BulkIngesterオブジェクトのインスタンス化
BulkIngesterコンストラクタには、次の順に10個のパラメータが必要です。
- host - Endecaデータ・ドメインが実行されているマシンの名前(String)。これは、管理WebサービスのallocateBulkLoadPort操作を使用して取得できます。
- port - Endecaデータ・ドメインのバルク・ロード・ポート(int)。これは、管理WebサービスのallocateBulkLoadPort操作を使用して取得できます。
- useSSL - 接続にSSLを使用するかどうかを指定するブール値(boolean)。
- doFinalMerge - 収集後ただちにマージを実施するかどうかを指定するブール値(boolean)。
- doUpdateDictionary - 収集後ただちにaspell辞書を更新するかどうかを指定するブール値( boolean)。
- timeout - ミリ秒で表したEndecaデータ・ドメインへの接続タイムアウト(int)。
- errorCallback - ErrorCallbackオブジェクト。
- finishedCallback - FinishedCallbackオブジェクト。
- abortCallback - AbortCallbackオブジェクト。
- statusCallback - StatusCallbackオブジェクト。
サンプル・プログラムでは、
BulkIngesterを次のように確立します。
BulkIngester ingester("endecaserver.example.com",
1234, // port
false, // useSSL
true, // doFinalMerge
true, // doUpdateDictionary
90000 // timeout in ms
errorCallback,
finishedCallback,
abortCallback,
statusCallback);
収集の開始と終了
クライアント・プログラムでEndecaデータ・ドメインへの接続を確立したら、収集プロセスでは次の
BulkIngesterメソッドをこの順序で使用する必要があります。
- begin()メソッドで収集プロセスを開始します。
- sendRecord()を繰り返し呼び出して、Recordオブジェクトを実際にデータ・ドメインに送信します。
- endIngest()メソッドで収集プロセスを終了します。
サンプル・プログラムのコードを次に示します(ここでは、2件のレコードのみを収集)。
Record widget = makeProductRecord("Widget", 12, 99.95);
Record thing = makeProductRecord("Thing", 110, 3.14);
ingester.begin();
ingester.sendRecord(widget);
ingester.requestStatusUpdate();
ingester.sendRecord(thing);
ingester.endIngest();
requestStatusUpdate()メソッドを使用して、収集操作のステータスを取得していることに注意してください。