一括格納操作
一括格納操作を使用すると、特別な目的のストリームで提供されたレコードをストアにロードできます。
エントリの一括ロードは、ハードウェア・リソースを効率的に使用するように最適化されています。結果として、この操作は、単一の格納APIと比較すると、より高いスループットを実現できます。
異なるストリームに含まれる重複エントリについて、一括格納操作の動作は定義されていません。重複エントリが単一のストリーム内に存在する場合、最初のエントリが挿入され(まだ存在しない場合)、2番目以降のエントリではEntryStream.keyExists(E)
メソッドが呼び出されます。重複がストリームをまたいで存在する場合、競合で優先された最初のエントリが挿入され、後続の重複ではEntryStream.keyExists(E)
が呼び出されます。
一括格納を使用するには、一括格納を提供するTableAPI.put()
メソッドのいずれかを使用します。これらは、一連のストリームを受け取ってデータを一括ロードします。各ストリーム内の行が異なる表に関連付けられている場合があります。
これらのメソッドを使用する場合、オプションでBulkWriteOptions
クラス・インスタンスを指定することもできます。これにより、一括格納操作を構成する永続性、タイムアウトおよびタイムアウト単位を指定できます。
たとえば、3つの入力ストリームで1000行をロードするとします。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import oracle.kv.BulkWriteOptions;
import oracle.kv.EntryStream;
import oracle.kv.FaultException;
import oracle.kv.KVStore;
import oracle.kv.KVStoreConfig;
import oracle.kv.KVStoreFactory;
import oracle.kv.table.Row;
import oracle.kv.table.Table;
import oracle.kv.table.TableAPI;
...
// KVStore handle creation is omitted for brevity
...
Integer streamParallelism = 3;
Integer perShardParallelism = 3;
Integer heapPercent = 30;
// In this case, sets the amount of 1000 rows to load
int nLoad = 1000;
BulkWriteOptions bulkWriteOptions =
new BulkWriteOptions(null, 0, null);
// Set the number of streams. The default is 1 stream.
bulkWriteOptions.setStreamParallelism(streamParallelism);
// Set the number of writer threads per shard.
// The default is 3 writer threads.
bulkWriteOptions.setPerShardParallelism(perShardParallelism);
// Set the percentage of max memory used for bulk put.
// The default is 40 percent.
bulkWriteOptions.setBulkHeapPercent(heapPercent);
System.err.println("Loading rows to " + TABLE_NAME + "...");
final List<EntryStream<Row>> streams =
new ArrayList<EntryStream<Row>>(streamParallelism);
final int num = (nLoad + (streamParallelism - 1)) / streamParallelism;
for (int i = 0; i < streamParallelism; i++) {
final int min = num * i;
final int max = Math.min((min + num) , nLoad);
streams.add(new LoadRowStream(i, min, max));
}
final TableAPI tableImpl = store.getTableAPI();
tableImpl.put(streams, bulkWriteOptions);
long total = 0;
long keyExists = 0;
for (EntryStream<Row> stream: streams) {
total += ((LoadRowStream)stream).getCount();
keyExists += ((LoadRowStream)stream).getKeyExistsCount();
}
final String fmt = "Loaded %,d rows to %s, %,d pre-existing.";
System.err.println(String.format(fmt, total, TABLE_NAME, keyExists));
ストリーム・インタフェースを実装して、バッチ処理され、ストアにロードされるデータを提供する必要があります。エントリは、EntryStream
インスタンスのリストによって提供されます。各ストリームは順番に読み取られます。つまり、次の操作が発行される前に、各EntryStream.getNext()
は終了できます。ロード操作は、通常、BulkWriteOptions.getStreamParallelism()
の設定に従って、これらのストリームから並列に読み取ります。
private class LoadRowStream implements EntryStream<Row> {
private final String name;
private final long index;
private final long max;
private final long min;
private long id;
private long count;
private final AtomicLong keyExistsCount;
LoadRowStream(String name, long index, long min, long max) {
this.index = index;
this.max = max;
this.min = min;
this.name = name;
id = min;
count = 0;
keyExistsCount = new AtomicLong();
}
@Override
public String name() {
return name + "-" + index + ": " + min + "~" + max;
}
@Override
public Row getNext() {
if (id++ == max) {
return null;
}
final Row row = userTable.createRow();
row.put("id", id);
row.put("name", "name" + id);
row.put("age", 20 + id % 50);
count++;
return row;
}
@Override
public void completed() {
System.err.println(name() + " completed, loaded: " + count);
}
@Override
public void keyExists(Row entry) {
keyExistsCount.incrementAndGet();
}
@Override
public void catchException(RuntimeException exception, Row entry) {
System.err.println(name() + " catch exception: " +
exception.getMessage() + ": " +
entry.toJsonString(false));
throw exception;
}
public long getCount() {
return count;
}
public long getKeyExistsCount() {
return keyExistsCount.get();
}
}