Bulk Put Operations

Bulk put operations allow you to load records supplied by special-purpose streams into the store.

The bulk loading of the entries is optimized to make efficient use of hardware resources. As a result, this operation can achieve much higher throughput when compared with single put APIs.

The behavior of the bulk put operation with respect to duplicate entries depends on where the duplicates occur. If the duplicate entries are present in a single stream, the first entry is inserted (if it is not already present in the store), and each subsequent duplicate results in the invocation of the EntryStream.keyExists(E) method. If duplicates exist across streams, which of them is inserted is undefined: the first entry to win the race is inserted, and EntryStream.keyExists(E) is invoked for each subsequent duplicate.
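
The rule above can be pictured with a small toy model. This is only a sketch under stated assumptions: the class DuplicateModel, its in-memory map, and the rejected queue are hypothetical stand-ins and are not part of the Oracle NoSQL API. It uses an atomic put-if-absent to mimic "the first entry wins" and records the keys for which a keyExists callback would have fired.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

// Toy model of the duplicate-handling rule; NOT the store's implementation.
class DuplicateModel {
    // Hypothetical stand-in for the store: key -> value.
    final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();
    // Keys for which the keyExists callback would have been invoked.
    final Queue<String> rejected = new ConcurrentLinkedQueue<>();

    // Insert only if the key is absent; otherwise record it as a duplicate.
    void put(String key, String value) {
        if (store.putIfAbsent(key, value) != null) {
            rejected.add(key); // models EntryStream.keyExists(E)
        }
    }

    public static void main(String[] args) {
        DuplicateModel model = new DuplicateModel();
        model.put("k1", "first");
        model.put("k2", "only");
        model.put("k1", "second"); // duplicate within the same "stream"
        System.out.println(model.store.get("k1")); // prints "first"
        System.out.println(model.rejected);        // prints "[k1]"
    }
}
```

If several threads called put() with the same key, the atomic putIfAbsent would let exactly one of them insert, which corresponds to the cross-stream case where the winner is not predictable.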

To perform a bulk put, call one of the TableAPI.put() methods that accept a list of streams from which to bulk load data. The rows within each stream may be associated with different tables.

When using these methods, you can optionally supply a BulkWriteOptions instance, which specifies the durability, timeout, and timeout unit for the bulk put operation.

For example, suppose you are loading 1000 rows with 3 input streams:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;
    import oracle.kv.BulkWriteOptions;
    import oracle.kv.EntryStream;
    import oracle.kv.FaultException;
    import oracle.kv.KVStore;
    import oracle.kv.KVStoreConfig;
    import oracle.kv.KVStoreFactory;
    import oracle.kv.table.Row;
    import oracle.kv.table.Table;
    import oracle.kv.table.TableAPI;

    ...

    // KVStore handle creation is omitted for brevity

    ...
    int streamParallelism = 3;
    int perShardParallelism = 3;
    int heapPercent = 30;
    // Total number of rows to load
    int nLoad = 1000;

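    // Passing null, 0, null leaves the durability, timeout, and
    // timeout unit at their defaults.
    BulkWriteOptions bulkWriteOptions =
                              new BulkWriteOptions(null, 0, null);
    // Set the maximum number of streams read in parallel.
    // The default is 1 stream.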
    bulkWriteOptions.setStreamParallelism(streamParallelism);
    // Set the number of writer threads per shard.
    // The default is 3 writer threads.
    bulkWriteOptions.setPerShardParallelism(perShardParallelism);
    // Set the percentage of max memory used for bulk put.
    // The default is 40 percent.
    bulkWriteOptions.setBulkHeapPercent(heapPercent);

    System.err.println("Loading rows to " + TABLE_NAME + "...");

    final List<EntryStream<Row>> streams =
        new ArrayList<EntryStream<Row>>(streamParallelism);
    final int num = (nLoad + (streamParallelism - 1)) / streamParallelism;
    for (int i = 0; i < streamParallelism; i++) {
        final int min = num * i;
        final int max = Math.min((min + num) , nLoad);
        // The constructor takes a stream name, an index, and the row range.
        streams.add(new LoadRowStream("LoadRowStream", i, min, max));
    }

    final TableAPI tableImpl = store.getTableAPI();
    tableImpl.put(streams, bulkWriteOptions);

    long total = 0;
    long keyExists = 0;
    for (EntryStream<Row> stream: streams) {
        total += ((LoadRowStream)stream).getCount();
        keyExists += ((LoadRowStream)stream).getKeyExistsCount();
    }
    final String fmt = "Loaded %,d rows to %s, %,d pre-existing.";
    System.err.println(String.format(fmt, total, TABLE_NAME, keyExists)); 
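
The range arithmetic in the loop above can be checked in isolation. The following is a minimal sketch, assuming the same ceiling-division scheme as the example; the class RangeSplit and its split() method are hypothetical names introduced only for illustration. Unlike the example, it also clamps the lower bound so that a stream never receives a range whose minimum exceeds its maximum when there are more streams than rows.

```java
// Sketch of the per-stream range computation; names are illustrative only.
class RangeSplit {
    // Returns {min, max} bounds for each of nStreams streams.
    static int[][] split(int nLoad, int nStreams) {
        // Ceiling division: rows handled by each stream except possibly the last.
        int perStream = (nLoad + nStreams - 1) / nStreams;
        int[][] ranges = new int[nStreams][2];
        for (int i = 0; i < nStreams; i++) {
            // Clamp so that surplus streams get an empty range (assumption,
            // not in the original example).
            int min = Math.min(perStream * i, nLoad);
            ranges[i][0] = min;
            ranges[i][1] = Math.min(min + perStream, nLoad);
        }
        return ranges;
    }

    public static void main(String[] args) {
        // For 1000 rows and 3 streams this prints 0..334, 334..668, 668..1000.
        for (int[] r : split(1000, 3)) {
            System.out.println(r[0] + ".." + r[1]);
        }
    }
}
```

With 1000 rows and 3 streams, the first two streams each cover 334 rows and the last covers the remaining 332.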

You must implement the EntryStream interface, which supplies the data to be batched and loaded into the store. Entries are supplied by a list of EntryStream instances. Each stream is read sequentially, that is, each EntryStream.getNext() call is allowed to finish before the next operation is issued on that stream. The load operation typically reads from these streams in parallel, as determined by BulkWriteOptions.getStreamParallelism(). The LoadRowStream class used in the example above is one such implementation:

private class LoadRowStream implements EntryStream<Row> {

    private final String name;
    private final long index;
    private final long max;
    private final long min;
    private long id;
    private long count;
    private final AtomicLong keyExistsCount;

    LoadRowStream(String name, long index, long min, long max) {
        this.index = index;
        this.max = max;
        this.min = min;
        this.name = name;
        id = min;
        count = 0;
        keyExistsCount = new AtomicLong();
    }

    @Override
    public String name() {
        return name + "-" + index + ": " + min + "~" + max;
    }

    @Override
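    public Row getNext() {
        // Return null once the range is exhausted. Using >= rather than ==
        // also terminates the stream if it was created with min > max.
        if (id >= max) {
            return null;
        }
        id++;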
        final Row row = userTable.createRow();
        row.put("id", id);
        row.put("name", "name" + id);
        row.put("age", 20 + id % 50);
        count++;
        return row;
    }

    @Override
    public void completed() {
        System.err.println(name() + " completed, loaded: " + count);
    }

    @Override
    public void keyExists(Row entry) {
        keyExistsCount.incrementAndGet();
    }

    @Override
    public void catchException(RuntimeException exception, Row entry) {
        System.err.println(name() + " catch exception: " +
                           exception.getMessage() + ": " +
                           entry.toJsonString(false));
        throw exception;
    }

    public long getCount() {
        return count;
    }

    public long getKeyExistsCount() {
        return keyExistsCount.get();
    }
}
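
To illustrate the reading contract described above (each stream is read sequentially, while several streams may be read in parallel), here is a minimal, self-contained sketch. It does not use the Oracle NoSQL library: SimpleStream, RangeStream, and drain() are hypothetical stand-ins, and the actual loader's batching and write logic is replaced by a simple counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class StreamDrainSketch {
    // Hypothetical stand-in for two of the EntryStream callbacks.
    interface SimpleStream<E> {
        E getNext();      // returns null when the stream is exhausted
        void completed(); // invoked after the stream has been drained
    }

    // Yields the values min+1 .. max, like the ids produced by LoadRowStream.
    static class RangeStream implements SimpleStream<Long> {
        private final long max;
        private long id;
        boolean done;
        RangeStream(long min, long max) { this.id = min; this.max = max; }
        @Override public Long getNext() { return id >= max ? null : ++id; }
        @Override public void completed() { done = true; }
    }

    // Reads at most 'parallelism' streams at once; each stream is read
    // sequentially by a single task. Returns the number of entries read.
    static <E> long drain(List<? extends SimpleStream<E>> streams, int parallelism) {
        AtomicLong total = new AtomicLong();
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        for (SimpleStream<E> s : streams) {
            pool.execute(() -> {
                while (s.getNext() != null) {
                    total.incrementAndGet(); // a real loader would write the entry
                }
                s.completed();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return total.get();
    }

    public static void main(String[] args) {
        List<RangeStream> streams = new ArrayList<>();
        streams.add(new RangeStream(0, 334));
        streams.add(new RangeStream(334, 668));
        streams.add(new RangeStream(668, 1000));
        System.out.println(drain(streams, 3)); // prints 1000
    }
}
```

Because each stream is consumed by exactly one task, a stream implementation in this model does not need to make getNext() thread-safe, which matches the plain long counter used in LoadRowStream.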