一括格納操作
一括格納操作を使用すると、特別な目的のストリームで提供されたレコードをストアにロードできます。
エントリの一括ロードは、ハードウェア・リソースを効率的に使用するように最適化されています。結果として、この操作は、単一の格納APIと比較すると、より高いスループットを実現できます。
異なるストリームに含まれる重複エントリについて、一括格納操作の動作は定義されていません。重複エントリが単一のストリーム内に存在する場合、最初のエントリが挿入され(まだ存在しない場合)、2番目以降のエントリではEntryStream.keyExists(E)
メソッドが呼び出されます。重複がストリームをまたいで存在する場合、競合で優先された最初のエントリが挿入され、後続の重複ではEntryStream.keyExists(E)
が呼び出されます。
一括格納を使用するには、一括格納を提供するKVStore.put()
メソッドのいずれかを使用します。これらは、一連のストリームを受け取ってデータを一括ロードします。各ストリーム内の行が異なる表に関連付けられている場合があります。
これらのメソッドを使用する場合、オプションでBulkWriteOptions
クラス・インスタンスを指定することもできます。これにより、一括格納操作を構成する永続性、タイムアウトおよびタイムアウト単位を指定できます。
たとえば、3つの入力ストリームで1000のキー/値ペアをロードするとします。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import oracle.kv.BulkWriteOptions;
import oracle.kv.EntryStream;
import oracle.kv.FaultException;
import oracle.kv.KVStore;
import oracle.kv.KVStoreConfig;
import oracle.kv.KVStoreFactory;
import oracle.kv.Key;
import oracle.kv.KeyValue;
import oracle.kv.Value;
...
// KVStore handle creation is omitted for brevity
...
Integer streamParallelism = 3;
Integer perShardParallelism = 3;
Integer heapPercent = 30;
// In this case, sets the amount of key/value pairs to load
int nLoad = 1000;
BulkWriteOptions bulkWriteOptions =
new BulkWriteOptions(null, 0, null);
// Set the number of streams. The default is 1 stream.
bulkWriteOptions.setStreamParallelism(streamParallelism);
// Set the number of writer threads per shard.
// The default is 3 writer threads.
bulkWriteOptions.setPerShardParallelism(perShardParallelism);
// Set the percentage of max memory used for bulk put.
// The default is 40 percent.
bulkWriteOptions.setBulkHeapPercent(heapPercent);
final List<EntryStream<KeyValue>> streams =
new ArrayList<EntryStream<KeyValue>>(streamParallelism);
final int num = (nLoad + (streamParallelism - 1)) /streamParallelism;
for (int i = 0; i < streamParallelism; i++) {
final int min = num * i;
final int max = Math.min((min + num) , nLoad);
streams.add(new LoadKVStream("Stream" + i, i, min, max));
}
store.put(streams, bulkWriteOptions);
long total = 0;
long keyExists = 0;
for (EntryStream<KeyValue> stream: streams) {
total += ((LoadKVStream)stream).getCount();
keyExists += ((LoadKVStream)stream).getKeyExistsCount();
}
final String fmt = "Loaded %,d records, %,d pre-existing.";
System.err.println(String.format(fmt, total, keyExists));
}
ストリーム・インタフェースを実装して、バッチ処理され、ストアにロードされるデータを提供する必要があります。エントリは、EntryStream
インスタンスのリストによって提供されます。各ストリームは順番に読み取られます。つまり、次の操作が発行される前に、各EntryStream.getNext()
は終了できます。ロード操作は、通常、BulkWriteOptions.getStreamParallelism()
の設定に従って、これらのストリームから並列に読み取ります。
private class LoadKVStream implements EntryStream<KeyValue> {
private final String name;
private final long index;
private final long max;
private final long min;
private long id;
private long count;
private final AtomicLong keyExistsCount;
LoadKVStream(String name, long index, long min, long max) {
this.index = index;
this.max = max;
this.min = min;
this.name = name;
id = min;
count = 0;
keyExistsCount = new AtomicLong();
}
@Override
public String name() {
return name + "-" + index + ": " + min + "~" + max;
}
@Override
public KeyValue getNext() {
if (id++ == max) {
return null;
}
Key key = Key.fromString("/bulk/" + id);
Value value = Value.createValue(("value"+ id).getBytes());
KeyValue kv = new KeyValue(key, value);
count++;
return kv;
}
@Override
public void completed() {
System.err.println(name() + " completed, loaded: " + count);
}
@Override
public void keyExists(KeyValue entry) {
keyExistsCount.incrementAndGet();
}
@Override
public void catchException
(RuntimeException exception, KeyValue entry) {
System.err.println(name() + " catch exception: " +
exception.getMessage() + ": " +
entry.toString());
throw exception;
}
public long getCount() {
return count;
}
public long getKeyExistsCount() {
return keyExistsCount.get();
}
}