バルク・ロード・サンプル・プログラム

次のサンプル・プログラムを使用して、バルク・ロードのクラスとメソッドを説明します。

ソース・ファイルはBulkLoad.javaで、$ENDECA_HOME/aps/examplesディレクトリにあります。

package com.oracle.endeca.server.examples.ops;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;

import com.endeca.BulkLoad.AbortCallback;
import com.endeca.BulkLoad.BulkIngester;
import com.endeca.BulkLoad.ErrorCallback;
import com.endeca.BulkLoad.FinishedCallback;
import com.endeca.BulkLoad.StatusCallback;
import com.endeca.BulkLoad.Msg.Data;
import com.endeca.BulkLoad.Msg.Data.Record;

/**
 * This class provides an example of working with the bulk loading interface.
 */
public class BulkLoad {

	/**
	 * This callback is provided to the bulk loader for providing feedback to the client process.
	 */
	public static class BulkLoadCallback implements AbortCallback, ErrorCallback, FinishedCallback, StatusCallback
	{
		@Override
		public void handleAbort(String reason) {
			System.out.println("Abort: " + reason);
		}

		@Override
		public void handleError(String reason, Record reject) {
			System.out.println("Error: " + reason);
		}

		@Override
		public void handleStatus(long recordsAdded, long recordsQueued, long recordsRejected, String state) {
			System.out.println("Status: " + recordsAdded);
		}

		@Override
		public void handleFinish() {
			System.out.println("Finished!");
		}
	}

	/**
	 * Set up a basic bulk ingest connection.
	 */
	public BulkLoad(String host, int port, boolean ssl) throws IOException {
		_host = host;
		_port = port;
		BulkLoadCallback blcb = new BulkLoadCallback();

		SocketFactory sf = ssl ? SSLSocketFactory.getDefault() : SocketFactory.getDefault();
		boolean doFinalMerge = false;
		boolean doUpdateSpellingDictionary = false;
		long socketTimeout = TimeUnit.SECONDS.toMillis(90);
		String transactionId = null;

		// don't do a final merge after finishing ingest or update the spelling dictionary; timeout after 90s
		_bi = new BulkIngester(_host, _port, sf, transactionId, doFinalMerge, doUpdateSpellingDictionary, socketTimeout, blcb, blcb, blcb, blcb);
		_bi.begin();
	}

	/**
	 * Add a new record with a handful of given assignments.
	 */
	public void addRecord(int val, String words, boolean square, boolean prime) throws IOException {
		Data.Record.Builder recordBuilder = Data.Record.newBuilder();
		Data.Assignment.Builder assignmentBuilder = Data.Assignment.newBuilder();

		assignmentBuilder.clear();
		assignmentBuilder.setName("Value").setDataType(Data.Assignment.DataType.INT).setInt32Value(val);
		recordBuilder.setSpec(assignmentBuilder.build());

		assignmentBuilder.clear();
		assignmentBuilder.setName("Words").setDataType(Data.Assignment.DataType.STRING).setStringValue(words);
		recordBuilder.addAssignments(assignmentBuilder.build());

		assignmentBuilder.clear();
		assignmentBuilder.setName("Square").setDataType(Data.Assignment.DataType.BOOLEAN).setBoolValue(square);
		recordBuilder.addAssignments(assignmentBuilder.build());

		assignmentBuilder.clear();
		assignmentBuilder.setName("Prime").setDataType(Data.Assignment.DataType.BOOLEAN).setBoolValue(prime);
		recordBuilder.addAssignments(assignmentBuilder.build());

		Data.Record r = recordBuilder.build();
		_bi.sendRecord(r);
	}

	public void done() throws IOException, InterruptedException {
		_bi.endIngest();
	}

	private String _host;
	private int _port;
	private BulkIngester _bi;

}