Sending records to the data domain

The BulkIngester class is the primary entry point of the client-side Bulk Load Interface, which loads data into an Endeca data domain.

BulkIngester makes a socket connection to the Dgraph of an Endeca data domain and spawns a thread to handle replies. Its sendRecord() method sends the provided record over the wire to the Dgraph.

Clients of this interface must:
  1. Define classes that implement the four callback interfaces (ErrorCallback, FinishedCallback, AbortCallback, and StatusCallback), and perform the appropriate action when their handler methods are called (which happens in the response thread).
  2. Instantiate a BulkIngester object with the appropriate parameters required by the constructor.
  3. Call the begin() method to start the response thread. begin() must be called before any records are sent; otherwise, an IOException is thrown.
  4. Call sendRecord() repeatedly to send Record objects to the Dgraph.
  5. When finished sending records, call endIngest() to terminate the response thread and close the socket.
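The following sketch models this lifecycle without a Dgraph, to show why the order matters. FakeIngester is a hypothetical stand-in, not the Endeca class: a String replaces the Record object, an in-memory queue replaces the socket, and a reply thread plays the part of the response thread. sendRecord() fails with an IOException until begin() has started the reply thread, and endIngest() stops that thread only after it has processed every outstanding message.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Hypothetical stand-in for BulkIngester (NOT the Endeca class): no socket and
 * no Dgraph, only the documented order begin() -> sendRecord()* -> endIngest().
 */
class FakeIngester {
    private static final String END = "<end-of-ingest>"; // sentinel that stops the reply thread

    private final BlockingQueue<String> wire = new LinkedBlockingQueue<>(); // stands in for the socket
    private final List<String> acked = new ArrayList<>();                  // guarded by this
    private Thread replyThread;                                            // guarded by this

    /** Starts the reply thread; must be called before sendRecord(). */
    public synchronized void begin() {
        if (replyThread != null) {
            return; // already started
        }
        replyThread = new Thread(() -> {
            try {
                String msg;
                while (!END.equals(msg = wire.take())) {
                    synchronized (this) {  // 'this' is the enclosing FakeIngester
                        acked.add(msg);    // a real reply thread would invoke the callbacks here
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "reply-thread");
        replyThread.start();
    }

    /** "Sends" a record; throws IOException if begin() has not been called. */
    public synchronized void sendRecord(String record) throws IOException {
        if (replyThread == null) {
            throw new IOException("begin() has not been called");
        }
        wire.add(record);
    }

    /** Stops the reply thread after it has drained every message sent so far. */
    public void endIngest() throws InterruptedException {
        Thread t;
        synchronized (this) {
            t = replyThread;
            replyThread = null; // later sendRecord() calls fail until begin() is called again
        }
        if (t != null) {
            wire.add(END);      // queued after all earlier records (FIFO)
            t.join();
        }
    }

    /** Snapshot of the records the reply thread has processed. */
    public synchronized List<String> acknowledged() {
        return new ArrayList<>(acked);
    }
}
```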

You may also want to call the requestStatusUpdate() method periodically to poll the Dgraph for its current status.
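One way to poll periodically is with a ScheduledExecutorService. In this minimal sketch, StatusPoller and its StatusSource interface are hypothetical: StatusSource stands in for a BulkIngester, and the requestStatusUpdate() signature (including the IOException) is an assumption. The sketch also assumes that the Dgraph's answer arrives asynchronously through the StatusCallback, so the polling task only issues the request.

```java
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that calls requestStatusUpdate() at a fixed rate. */
class StatusPoller implements AutoCloseable {

    /** Hypothetical stand-in for BulkIngester; the method signature is assumed. */
    interface StatusSource {
        void requestStatusUpdate() throws IOException;
    }

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    StatusPoller(StatusSource source, long periodMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                // Only sends the request; the status itself is assumed to arrive
                // later through StatusCallback on the response thread.
                source.requestStatusUpdate();
            } catch (IOException e) {
                // Report and keep polling; the next run retries.
                System.err.println("status request failed: " + e.getMessage());
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops polling, e.g. before endIngest() closes the connection. */
    @Override
    public void close() throws InterruptedException {
        scheduler.shutdownNow();
        scheduler.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```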

Defining callback interfaces

The BulkIngester constructor requires an implementation of each of the four callback interfaces as parameters:
  • ErrorCallback handles error conditions. The handleError() method is called when errors occur during the ingest operation.
  • FinishedCallback is called when the Dgraph reports that it has finished with the ingestion. No further records will be accepted without calling begin() again.
  • AbortCallback handles abort conditions. An abort condition can happen either in BulkIngester or on the Dgraph.
  • StatusCallback handles status updates, including the number of successfully ingested records and the number of rejected records.
ErrorCallback is especially useful, as it reports the reason that a record was rejected. An example of defining this callback is:
ErrorCallback errorCallback = new ErrorCallback() {
    public void handleError(String reason, Record reject) {
        System.out.println("Record "
                + reject.getSpec().getName()
                + " rejected: " + reason);
    }
};
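Because the handler methods run on the response thread rather than on the thread that calls sendRecord(), any state they share with the rest of the program should be thread-safe. The sketch below has one object implement all four callbacks, in the spirit of the sample program's BulkLoadCallback. The interfaces are hypothetical stand-ins: only handleError() is named in this documentation, so handleFinished(), handleAbort(), handleStatus(), and all of the parameter lists are assumptions, and a String record name replaces the Record object.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class CallbackSketch {
    // Hypothetical stand-ins for the four callback interfaces. Only handleError
    // appears in the documentation; the other names and parameters are assumed.
    interface ErrorCallback    { void handleError(String reason, String rejectName); }
    interface FinishedCallback { void handleFinished(); }
    interface AbortCallback    { void handleAbort(String reason); }
    interface StatusCallback   { void handleStatus(long ingested, long rejected); }

    /** One object serving all four roles, like the sample's BulkLoadCallback. */
    static class BulkLoadCallback
            implements ErrorCallback, FinishedCallback, AbortCallback, StatusCallback {
        // Written on the response thread, read on others: use thread-safe types.
        final List<String> rejects = new CopyOnWriteArrayList<>();
        final AtomicLong lastIngested = new AtomicLong();
        volatile String abortReason; // stays null unless the ingest was aborted
        private final CountDownLatch done = new CountDownLatch(1);

        @Override public void handleError(String reason, String rejectName) {
            rejects.add(rejectName + ": " + reason);
        }

        @Override public void handleFinished() {
            done.countDown();
        }

        @Override public void handleAbort(String reason) {
            abortReason = reason;
            done.countDown(); // an abort also ends the ingest
        }

        @Override public void handleStatus(long ingested, long rejected) {
            lastIngested.set(ingested);
        }

        /** Lets the sending thread wait until the ingest finishes or aborts. */
        boolean awaitDone(long timeout, TimeUnit unit) throws InterruptedException {
            return done.await(timeout, unit);
        }
    }
}
```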

Instantiating a BulkIngester object

The BulkIngester constructor requires eleven parameters, in this order:
  • host: The name (a String) of the machine on which the Dgraph is running. The machine name can be obtained by using the allocateBulkLoadPort operation of the Manage Web Service.
  • port: The bulk load port (an int) of the Dgraph. The port can be obtained by using the allocateBulkLoadPort operation of the Manage Web Service.
  • socketFactory: A SocketFactory that creates the sockets for the bulk ingest operations. The sample program uses SocketFactory.getDefault() for non-SSL connections and SSLSocketFactory.getDefault() for SSL connections; these methods belong to the imported javax.net.SocketFactory and javax.net.ssl.SSLSocketFactory classes, respectively.
  • transactionId: The ID (a String) of an outer transaction, if one has been started by the Transaction Web Service. Use null if no outer transaction is being used.
  • doFinalMerge: A boolean that specifies whether a merge is forced immediately after the ingest. Setting it to true degrades ingest performance, so if you are performing multiple bulk load ingests, set it to false for all but the last ingest job.
  • doUpdateDictionary: A boolean that specifies whether the Aspell dictionary is updated immediately after the ingest. Setting it to true degrades ingest performance, so if you are performing multiple bulk load ingests, set it to false for all but the last ingest job.
  • socketTimeoutMillis: The timeout in milliseconds (an int) for the connection to the Dgraph.
  • errorCallback: The ErrorCallback object.
  • finishedCallback: The FinishedCallback object.
  • abortCallback: The AbortCallback object.
  • statusCallback: The StatusCallback object.
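For a load split across several bulk load ingests, the doFinalMerge and doUpdateDictionary guidance amounts to enabling both flags on the last job only. This minimal sketch computes those flags per job; IngestPlan and JobFlags are hypothetical helper names, not part of the Bulk Load Interface.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: post-ingest flags for each job of a multi-ingest load. */
class IngestPlan {

    /** The two costly post-ingest flags for one BulkIngester job. */
    static final class JobFlags {
        final boolean doFinalMerge;
        final boolean doUpdateDictionary;

        JobFlags(boolean doFinalMerge, boolean doUpdateDictionary) {
            this.doFinalMerge = doFinalMerge;
            this.doUpdateDictionary = doUpdateDictionary;
        }
    }

    /** Only the last of jobCount ingests forces a merge and updates the dictionary. */
    static List<JobFlags> plan(int jobCount) {
        if (jobCount < 1) {
            throw new IllegalArgumentException("jobCount must be at least 1");
        }
        List<JobFlags> jobs = new ArrayList<>(jobCount);
        for (int i = 0; i < jobCount; i++) {
            boolean last = (i == jobCount - 1);
            jobs.add(new JobFlags(last, last)); // both steps deferred to the final job
        }
        return Collections.unmodifiableList(jobs);
    }
}
```

Each JobFlags entry would then supply the doFinalMerge and doUpdateDictionary constructor arguments for one BulkIngester.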
The sample program constructs the BulkIngester parameters as follows:
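_host = host;
_port = port;
// One callback object implements all four callback interfaces.
BulkLoadCallback blcb = new BulkLoadCallback();

// Use SSL sockets only when an SSL connection is requested.
SocketFactory sf = ssl ? SSLSocketFactory.getDefault() : SocketFactory.getDefault();
boolean doFinalMerge = false;                // no forced merge after this ingest
boolean doUpdateSpellingDictionary = false;  // no dictionary update after this ingest
int socketTimeout = (int) TimeUnit.SECONDS.toMillis(90);  // 90 seconds, in milliseconds
String transactionId = null;                 // no outer transaction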
The call to set up a connection to the Dgraph is:
_bi = new BulkIngester(_host,
                       _port,
                       sf,
                       transactionId,
                       doFinalMerge,
                       doUpdateSpellingDictionary,
                       socketTimeout,
                       blcb,   // errorCallback
                       blcb,   // finishedCallback
                       blcb,   // abortCallback
                       blcb);  // statusCallback
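The socket factory and timeout arguments can be derived with standard JDK classes alone. This sketch uses only javax.net.SocketFactory, javax.net.ssl.SSLSocketFactory, and java.util.concurrent.TimeUnit; ConnectionParams is a hypothetical helper name. It assumes, as the socketTimeoutMillis description states, that the timeout is passed as an int, and it uses Math.toIntExact so that an oversized timeout fails loudly instead of silently wrapping around.

```java
import java.util.concurrent.TimeUnit;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;

/** Hypothetical helper for two of the BulkIngester constructor arguments. */
class ConnectionParams {

    /** SSL connections need an SSL socket factory; plain ones use the JDK default. */
    static SocketFactory socketFactoryFor(boolean ssl) {
        return ssl ? SSLSocketFactory.getDefault() : SocketFactory.getDefault();
    }

    /** Converts a timeout to int milliseconds; throws ArithmeticException on overflow. */
    static int timeoutMillis(long amount, TimeUnit unit) {
        return Math.toIntExact(unit.toMillis(amount));
    }
}
```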

Beginning and ending the ingest

After the client program has made a connection to the Endeca data domain, the ingest process uses these BulkIngester methods, in this order:
  1. The begin() method starts the ingest process.
  2. A series of sendRecord() calls actually sends the Record objects to the data domain.
  3. The endIngest() method terminates the ingest process.
The sample program, which ingests only one record, is coded as follows:
public BulkLoad(String host, int port, boolean ssl) throws IOException {
  ...
  _bi = new BulkIngester(_host, _port, sf, transactionId, doFinalMerge,
                         doUpdateSpellingDictionary, socketTimeout,
                         blcb, blcb, blcb, blcb);
  _bi.begin();  // start the response thread before any record is sent
}

public void addRecord(int val, String words, boolean square, boolean prime) throws IOException {
  Data.Record.Builder recordBuilder = Data.Record.newBuilder();
  Data.Assignment.Builder assignmentBuilder = Data.Assignment.newBuilder();
  ...
  Data.Record r = recordBuilder.build();
  _bi.sendRecord(r);  // send the built record to the Dgraph
}

public void done() throws IOException, InterruptedException {
  _bi.endIngest();  // terminate the response thread and close the socket
}
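In the sample program, nothing guarantees that done(), and therefore endIngest(), runs if addRecord() throws. A common Java remedy is try/finally, sketched below against a hypothetical Ingester interface that mirrors only the three lifecycle methods. Its checked exceptions are copied from the sample's method signatures and may not match the real BulkIngester; IngestRunner and ingestAll are illustrative names.

```java
import java.io.IOException;

/** Hypothetical helper that guarantees endIngest() after begin(). */
class IngestRunner {

    /** Hypothetical stand-in exposing only the three lifecycle methods. */
    interface Ingester<R> {
        void begin() throws IOException;
        void sendRecord(R record) throws IOException;
        void endIngest() throws IOException, InterruptedException;
    }

    /** Sends every record, then always ends the ingest, even if a send fails. */
    static <R> void ingestAll(Ingester<R> ingester, Iterable<R> records)
            throws IOException, InterruptedException {
        ingester.begin();
        try {
            for (R record : records) {
                ingester.sendRecord(record);
            }
        } finally {
            ingester.endIngest(); // releases the response thread and socket
        }
    }
}
```

One caveat of this design: if endIngest() itself throws inside the finally block, its exception replaces the original sendRecord() failure.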

Although not used in the sample program, the requestStatusUpdate() method can be used to retrieve the status of the ingest operation.