BulkIngesterクラスは、Endecaデータ・ドメインにデータをロードするための、クライアント側バルク・インタフェースのプライマリ・エントリ・ポイントです。
BulkIngesterでは、Endecaデータ・ドメインのDgraphに対するソケット接続を確立して、返信を処理するスレッドを作成します。そのsendRecord()メソッドによって、指定したレコードがワイヤ経由でDgraphに送信されます。
定期的にrequestStatusUpdate()メソッドを呼び出し、現在のステータスについてDgraphをポーリングすることをお薦めします。
ErrorCallback errorCallback = new ErrorCallback() { void handleError(String reason, Record reject) { System.out.println("Record " + reject.getSpec().getName() + " rejected: " + reason); } };
パラメータ | 意味 |
---|---|
host | Dgraphが実行されているマシンの名前(String)。マシン名は、管理WebサービスのallocateBulkLoadPort操作を使用して取得できます。 |
port | Dgraphのバルク・ロード・ポート(int)。ポートは、管理WebサービスのallocateBulkLoadPort操作を使用して取得できます。 |
socketFactory | バルク収集操作用のソケットを作成するSocketFactory。サンプル・プログラムでは、非SSL接続にSocketFactory.getDefault()を、SSL接続にSSLSocketFactory.getDefault()を使用します。どちらのメソッドも、インポートされたjavax.net.SocketFactoryクラスおよびjavax.net.ssl.SSLSocketFactoryクラスに属します。 |
transactionId | 外部トランザクション(トランザクションWebサービスにより開始された場合)のID (String)。外部トランザクションを使用していない場合は、nullを使用します。 |
doFinalMerge | 収集後ただちにマージを実施するかどうかを指定するブール値(boolean)。trueに設定すると、収集のパフォーマンスが低下します。したがって、複数のバルク・ロード収集を実行する場合には、最後の収集ジョブを除くすべてのジョブでこれをfalseに設定する必要があります。 |
doUpdateDictionary | 収集後ただちにaspell辞書を更新するかどうかを指定するブール値(boolean)。trueに設定すると、収集のパフォーマンスが低下します。したがって、複数のバルク・ロード収集を実行する場合には、最後の収集ジョブを除くすべてのジョブでこれをfalseに設定する必要があります。 |
socketTimeoutMillis | ミリ秒で表したDgraphへの接続のタイムアウト(int)。 |
errorCallback | ErrorCallbackオブジェクト。 |
finishedCallback | FinishedCallbackオブジェクト。 |
abortCallback | AbortCallbackオブジェクト。 |
statusCallback | StatusCallbackオブジェクト。 |
_host = host; _port = port; BulkLoadCallback blcb = new BulkLoadCallback(); SocketFactory sf = ssl ? SocketFactory.getDefault() : SSLSocketFactory.getDefault(); boolean doFinalMerge = false; boolean doUpdateSpellingDictionary = false; long socketTimeout = TimeUnit.SECONDS.toMillis(90); String transactionId = null;
_bi = new BulkIngester(_host, _port, sf, transactionId, doFinalMerge, doUpdateSpellingDictionary, socketTimeout, blcb, blcb, blcb, blcb);
public BulkLoad(String host, int port, boolean ssl) throws IOException { ... _bi = new BulkIngester(_host, _port, sf, transactionId, doFinalMerge, doUpdateSpellingDictionary, socketTimeout, blcb, blcb, blcb, blcb); _bi.begin(); } public void addRecord(int val, String words, boolean square, boolean prime) throws IOException { Data.Record.Builder recordBuilder = Data.Record.newBuilder(); Data.Assignment.Builder assignmentBuilder = Data.Assignment.newBuilder(); ... Data.Record r = recordBuilder.build(); _bi.sendRecord(r); } public void done() throws IOException, InterruptedException { _bi.endIngest(); }
サンプル・プログラムでは使用しませんが、requestStatusUpdate()メソッドを使用すると、収集操作のステータスを取得できます。