一括格納操作

一括格納操作を使用すると、特別な目的のストリームで提供されたレコードをストアにロードできます。

エントリの一括ロードは、ハードウェア・リソースを効率的に使用するように最適化されています。結果として、この操作は、単一の格納APIと比較すると、より高いスループットを実現できます。

異なるストリームに含まれる重複エントリについて、一括格納操作の動作は定義されていません。重複エントリが単一のストリーム内に存在する場合、最初のエントリが挿入され(まだ存在しない場合)、2番目以降のエントリではEntryStream.keyExists(E)メソッドが呼び出されます。重複がストリームをまたいで存在する場合、競合で優先された最初のエントリが挿入され、後続の重複ではEntryStream.keyExists(E)が呼び出されます。

一括格納を使用するには、一括格納を提供するTableAPI.put()メソッドのいずれかを使用します。これらは、一連のストリームを受け取ってデータを一括ロードします。各ストリーム内の行が異なる表に関連付けられている場合があります。

これらのメソッドを使用する場合、オプションでBulkWriteOptionsクラス・インスタンスを指定することもできます。これにより、一括格納操作を構成する永続性、タイムアウトおよびタイムアウト単位を指定できます。

たとえば、3つの入力ストリームで1000行をロードするとします。

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;
    import oracle.kv.BulkWriteOptions;
    import oracle.kv.EntryStream;
    import oracle.kv.FaultException;
    import oracle.kv.KVStore;
    import oracle.kv.KVStoreConfig;
    import oracle.kv.KVStoreFactory;
    import oracle.kv.table.Row;
    import oracle.kv.table.Table;
    import oracle.kv.table.TableAPI;

    ...

    // KVStore handle creation is omitted for brevity

    ...
    Integer streamParallelism = 3;
    Integer perShardParallelism = 3;
    Integer heapPercent = 30;
    // In this case, sets the amount of 1000 rows to load
    int nLoad = 1000;

    BulkWriteOptions bulkWriteOptions =
                              new BulkWriteOptions(null, 0, null);
    // Set the number of streams. The default is 1 stream.
    bulkWriteOptions.setStreamParallelism(streamParallelism);
    // Set the number of writer threads per shard.
    // The default is 3 writer threads.
    bulkWriteOptions.setPerShardParallelism(perShardParallelism);
    // Set the percentage of max memory used for bulk put.
    // The default is 40 percent.
    bulkWriteOptions.setBulkHeapPercent(heapPercent);

    System.err.println("Loading rows to " + TABLE_NAME + "...");

    final List<EntryStream<Row>> streams =
        new ArrayList<EntryStream<Row>>(streamParallelism);
    final int num = (nLoad + (streamParallelism - 1)) / streamParallelism;
    for (int i = 0; i < streamParallelism; i++) {
        final int min = num * i;
        final int max = Math.min((min + num) , nLoad);
        streams.add(new LoadRowStream(i, min, max));
    }

    final TableAPI tableImpl = store.getTableAPI();
    tableImpl.put(streams, bulkWriteOptions);

    long total = 0;
    long keyExists = 0;
    for (EntryStream<Row> stream: streams) {
        total += ((LoadRowStream)stream).getCount();
        keyExists += ((LoadRowStream)stream).getKeyExistsCount();
    }
    final String fmt = "Loaded %,d rows to %s, %,d pre-existing.";
    System.err.println(String.format(fmt, total, TABLE_NAME, keyExists)); 

ストリーム・インタフェースを実装して、バッチ処理され、ストアにロードされるデータを提供する必要があります。エントリは、EntryStreamインスタンスのリストによって提供されます。各ストリームは順番に読み取られます。つまり、次の操作が発行される前に、各EntryStream.getNext()は終了できます。ロード操作は、通常、BulkWriteOptions.getStreamParallelism()の設定に従って、これらのストリームから並列に読み取ります。

private class LoadRowStream implements EntryStream<Row> {

    private final String name;
    private final long index;
    private final long max;
    private final long min;
    private long id;
    private long count;
    private final AtomicLong keyExistsCount;

    LoadRowStream(String name, long index, long min, long max) {
        this.index = index;
        this.max = max;
        this.min = min;
        this.name = name;
        id = min;
        count = 0;
        keyExistsCount = new AtomicLong();
    }

    @Override
    public String name() {
        return name + "-" + index + ": " + min + "~" + max;
    }

    @Override
    public Row getNext() {
        if (id++ == max) {
            return null;
        }
        final Row row = userTable.createRow();
        row.put("id", id);
        row.put("name", "name" + id);
        row.put("age", 20 + id % 50);
        count++;
        return row;
    }

    @Override
    public void completed() {
        System.err.println(name() + " completed, loaded: " + count);
    }

    @Override
    public void keyExists(Row entry) {
        keyExistsCount.incrementAndGet();
    }

    @Override
    public void catchException(RuntimeException exception, Row entry) {
        System.err.println(name() + " catch exception: " +
                           exception.getMessage() + ": " +
                           entry.toJsonString(false));
        throw exception;
    }

    public long getCount() {
        return count;
    }

    public long getKeyExistsCount() {
        return keyExistsCount.get();
    }
}