一括格納操作

一括格納操作を使用すると、特別な目的のストリームで提供されたレコードをストアにロードできます。

エントリの一括ロードは、ハードウェア・リソースを効率的に使用するように最適化されています。結果として、この操作は、単一の格納APIと比較すると、より高いスループットを実現できます。

異なるストリームに含まれる重複エントリについて、一括格納操作の動作は定義されていません。重複エントリが単一のストリーム内に存在する場合、最初のエントリが挿入され(まだ存在しない場合)、2番目以降のエントリではEntryStream.keyExists(E)メソッドが呼び出されます。重複がストリームをまたいで存在する場合、競合で優先された最初のエントリが挿入され、後続の重複ではEntryStream.keyExists(E)が呼び出されます。

一括格納を使用するには、一括格納を提供するKVStore.put()メソッドのいずれかを使用します。これらは、一連のストリームを受け取ってデータを一括ロードします。各ストリーム内の行が異なる表に関連付けられている場合があります。

これらのメソッドを使用する場合、オプションでBulkWriteOptionsクラス・インスタンスを指定することもできます。これにより、一括格納操作を構成する永続性、タイムアウトおよびタイムアウト単位を指定できます。

たとえば、3つの入力ストリームで1000のキー/値ペアをロードするとします。

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;
    import oracle.kv.BulkWriteOptions;
    import oracle.kv.EntryStream;
    import oracle.kv.FaultException;
    import oracle.kv.KVStore;
    import oracle.kv.KVStoreConfig;
    import oracle.kv.KVStoreFactory;
    import oracle.kv.Key;
    import oracle.kv.KeyValue;
    import oracle.kv.Value;

    ...

    // KVStore handle creation is omitted for brevity

    ...
    Integer streamParallelism = 3;
    Integer perShardParallelism = 3;
    Integer heapPercent = 30;
    // In this case, sets the amount of key/value pairs to load
    int nLoad = 1000;

    BulkWriteOptions bulkWriteOptions =
                               new BulkWriteOptions(null, 0, null);
    // Set the number of streams. The default is 1 stream.
    bulkWriteOptions.setStreamParallelism(streamParallelism);
    // Set the number of writer threads per shard.
    // The default is 3 writer threads.
    bulkWriteOptions.setPerShardParallelism(perShardParallelism);
    // Set the percentage of max memory used for bulk put.
    // The default is 40 percent.
    bulkWriteOptions.setBulkHeapPercent(heapPercent);

    final List<EntryStream<KeyValue>> streams =
    new ArrayList<EntryStream<KeyValue>>(streamParallelism);
    final int num = (nLoad + (streamParallelism - 1)) /streamParallelism;
    for (int i = 0; i < streamParallelism; i++) {
     final int min = num * i;
     final int max = Math.min((min + num) , nLoad);
     streams.add(new LoadKVStream("Stream" + i, i, min, max));
    }

    store.put(streams, bulkWriteOptions);

    long total = 0;
    long keyExists = 0;
    for (EntryStream<KeyValue> stream: streams) {
     total += ((LoadKVStream)stream).getCount();
     keyExists += ((LoadKVStream)stream).getKeyExistsCount();
    }
    final String fmt = "Loaded %,d records, %,d pre-existing.";
    System.err.println(String.format(fmt, total, keyExists));
} 

ストリーム・インタフェースを実装して、バッチ処理され、ストアにロードされるデータを提供する必要があります。エントリは、EntryStreamインスタンスのリストによって提供されます。各ストリームは順番に読み取られます。つまり、次の操作が発行される前に、各EntryStream.getNext()は終了できます。ロード操作は、通常、BulkWriteOptions.getStreamParallelism()の設定に従って、これらのストリームから並列に読み取ります。

private class LoadKVStream implements EntryStream<KeyValue> {

    private final String name;
    private final long index;
    private final long max;
    private final long min;
    private long id;
    private long count;
    private final AtomicLong keyExistsCount;

    LoadKVStream(String name, long index, long min, long max) {
        this.index = index;
        this.max = max;
        this.min = min;
        this.name = name;
        id = min;
        count = 0;
        keyExistsCount = new AtomicLong();
    }

    @Override
    public String name() {
        return name + "-" + index + ": " + min + "~" + max;
    }

    @Override
    public KeyValue getNext() {
        if (id++ == max) {
            return null;
        }
        Key key = Key.fromString("/bulk/" + id);
        Value value = Value.createValue(("value"+ id).getBytes());
        KeyValue kv = new KeyValue(key, value);
        count++;
        return kv;
    }

    @Override
    public void completed() {
        System.err.println(name() + " completed, loaded: " + count);
    }

    @Override
    public void keyExists(KeyValue entry) {
        keyExistsCount.incrementAndGet();
    }

    @Override
    public void catchException
    (RuntimeException exception, KeyValue entry) {
        System.err.println(name() + " catch exception: " +
                           exception.getMessage() + ": " +
                           entry.toString());
        throw exception;
    }

    public long getCount() {
        return count;
    }

    public long getKeyExistsCount() {
        return keyExistsCount.get();
    }
}