/*- * * This file is part of Oracle NoSQL Database * Copyright (C) 2011, 2015 Oracle and/or its affiliates. All rights reserved. * * Oracle NoSQL Database is free software: you can redistribute it and/or * modify it under the terms of the GNU Affero General Public License * as published by the Free Software Foundation, version 3. * * Oracle NoSQL Database is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public * License in the LICENSE file along with Oracle NoSQL Database. If not, * see . * * An active Oracle commercial licensing agreement for this product * supercedes this license. * * For more information please contact: * * Vice President Legal, Development * Oracle America, Inc. * 5OP-10 * 500 Oracle Parkway * Redwood Shores, CA 94065 * * or * * berkeleydb-info_us@oracle.com * * [This line intentionally left blank.] * [This line intentionally left blank.] * [This line intentionally left blank.] * [This line intentionally left blank.] * [This line intentionally left blank.] * [This line intentionally left blank.] * EOF * */ package bulkput; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import oracle.kv.BulkWriteOptions; import oracle.kv.EntryStream; import oracle.kv.FaultException; import oracle.kv.KVStore; import oracle.kv.KVStoreConfig; import oracle.kv.KVStoreFactory; import oracle.kv.Key; import oracle.kv.KeyValue; import oracle.kv.Value; import oracle.kv.table.Row; import oracle.kv.table.Table; import oracle.kv.table.TableAPI; /** * This is a simple example that demonstrates the bulk put feature of KV * interface KVStore.put(List>, BulkWriteOptions) and * Table interface TableAPI.put(List>, BulkWriteOptions). * * usage: bulkput.BulkPutExample * -store * -host * -port * [-load <# records to load>] (default: 1000) * [-streamParallelism ] (default: 1) * [-perShardParallelism ] (default: 2) * [-heapPercent ] (default: 70)] * [-useTable] * * If -useTable is specified, then it create table "users" and load records to * the table. Otherwise, the key/value records are loaded to store. * * The format of key/value entry: * - key: /bulk/ * - value: a fixed 128 bytes array. * * The schema of table "users" is like below: * create table if not exists users ( * id long, * name string, * data binary, * primary key(id)) * * - id: the index of row. * - name: "name" * - data: a fixed 128 bytes array. * * e.g. Load 10000 KVs with 3 input streams: * java -cp KVHOME/lib/kvclient.jar: * bulkput.BulkPutExample -store kvstore -host localhost -port 5000 * -load 10000 -streamParallelism 3 -perShardParallelism 3 * -heapPercent 50 * */ public class BulkPutExample { private final KVStore store; private final BulkWriteOptions bulkWriteOptions; private int streamParallelism = 1; private int perShardParallelism = 2; private int heapPercent = 70; private int nLoad = 1000; private boolean useTable = false; private final static String TABLE_NAME = "users"; private final static int BINARY_LEN = 128; private final static String CREATE_TABLE = "CREATE TABLE if not exists " + TABLE_NAME + "(id long, name string, data binary, primary key(id))"; private Table userTable = null; public static void main(final String args[]) { try { BulkPutExample runTest = new BulkPutExample(args); runTest.run(); } catch (Exception e) { e.printStackTrace(); } } BulkPutExample(final String[] argv) { String storeName = null; String hostName = null; int port = -1; final int nArgs = argv.length; int argc = 0; if (nArgs == 0) { usage(null); } while (argc < nArgs) { final String thisArg = argv[argc++]; if (thisArg.equals("-store")) { if (argc < nArgs) { storeName = argv[argc++]; } else { usage("-store requires an argument"); } } else if (thisArg.equals("-host")) { if (argc < nArgs) { hostName = argv[argc++]; } else { usage("-host requires an argument"); } } else if (thisArg.equals("-port")) { if (argc < nArgs) { port = Integer.parseInt(argv[argc++]); } else { usage("-port requires an integer argument"); } } else if (thisArg.equals("-streamParallelism")) { if (argc < nArgs) { streamParallelism = Integer.parseInt(argv[argc++]); } else { usage("-streamParallelism requires an argument"); } } else if (thisArg.equals("-perShardParallelism")) { if (argc < nArgs) { perShardParallelism = Integer.parseInt(argv[argc++]); } else { usage("-perShardParallelism requires an argument"); } } else if (thisArg.equals("-heapPercent")) { if (argc < nArgs) { heapPercent = Integer.parseInt(argv[argc++]); } else { usage("-heapPercent requires an argument"); } } else if (thisArg.equals("-load")) { if (argc < nArgs) { nLoad = Integer.parseInt(argv[argc++]); } else { usage("-load requires an argument"); } } else if (thisArg.equals("-useTable")) { useTable = true; } else { usage("Unknown argument: " + thisArg); } } if (hostName == null) { usage("Missing argument: -host"); } if (port == -1) { usage("Missing argument: -port"); } if (storeName == null) { usage("Missing argument: -store"); } final String fmt = "\thost:%s\n\tport:%d\n\tstore:%s\n" + "\tnumToload:%,d\n\tuseTable:%s\n" + "\tbulkWriteOptions:\n" + "\t\tstreamParallelism:%d\n" + "\t\tperShardParallelism:%d\n" + "\t\theapPercent:%d\n"; System.err.println(String.format(fmt, hostName, port, storeName, nLoad, String.valueOf(useTable), streamParallelism, perShardParallelism, heapPercent)); bulkWriteOptions = new BulkWriteOptions(null, 0, null); bulkWriteOptions.setStreamParallelism(streamParallelism); bulkWriteOptions.setPerShardParallelism(perShardParallelism); bulkWriteOptions.setBulkHeapPercent(heapPercent); store = KVStoreFactory.getStore (new KVStoreConfig(storeName, hostName + ":" + port)); } private void usage(final String message) { if (message != null) { System.err.println("\n" + message + "\n"); } System.err.println("usage: " + getClass().getName()); System.err.println ("\t-store \n" + "\t-host \n" + "\t-port \n" + "\t[-load <# records to load>] (default: 1000)\n" + "\t[-streamParallelism ]" + " (default: 1)\n" + "\t[-perShardParallelism ]" + " (default: 2)\n" + "\t[-heapPercent ]" + " (default: 70)]\n" + "\t[-useTable]"); System.exit(1); } private void run() { try { if (useTable) { createTable(); doLoadRows(); } else { doLoadKVs(); } } finally { store.close(); } } private void createTable() { final TableAPI tableImpl = store.getTableAPI(); try { store.executeSync(CREATE_TABLE); } catch (FaultException fe) { System.out.println("Create table failed: " + fe.getMessage()); throw fe; } userTable = tableImpl.getTable(TABLE_NAME); System.err.println("Created table " + TABLE_NAME + "."); } private void doLoadKVs() { System.err.println("Loading KVs..."); final List> streams = new ArrayList>(streamParallelism); final int num = (nLoad + (streamParallelism - 1)) /streamParallelism; for (int i = 0; i < streamParallelism; i++) { final int min = num * i; final int max = Math.min((min + num) , nLoad); streams.add(new LoadKVStream(i, min, max)); } store.put(streams, bulkWriteOptions); long total = 0; long keyExists = 0; for (EntryStream stream: streams) { total += ((LoadKVStream)stream).getCount(); keyExists += ((LoadKVStream)stream).getKeyExistsCount(); } final String fmt = "Loaded %,d records, %,d pre-existing."; System.err.println(String.format(fmt, total, keyExists)); } private void doLoadRows() { System.err.println("Loading rows to " + TABLE_NAME + "..."); final List> streams = new ArrayList>(streamParallelism); final int num = (nLoad + (streamParallelism - 1)) / streamParallelism; for (int i = 0; i < streamParallelism; i++) { final int min = num * i; final int max = Math.min((min + num) , nLoad); streams.add(new LoadRowStream(i, min, max)); } final TableAPI tableImpl = store.getTableAPI(); tableImpl.put(streams, bulkWriteOptions); long total = 0; long keyExists = 0; for (EntryStream stream: streams) { total += ((LoadRowStream)stream).getCount(); keyExists += ((LoadRowStream)stream).getKeyExistsCount(); } final String fmt = "Loaded %,d rows to %s, %,d pre-existing."; System.err.println(String.format(fmt, total, TABLE_NAME, keyExists)); } private class LoadKVStream extends LoadStream { LoadKVStream(long index, long min, long max) { super("LoadKVStream", index, min, max); } @Override KeyValue createEntry(long idx) { final Key key = Key.fromString("/bulk/" + idx); final Value value = Value.createValue(getBinaryData(BINARY_LEN)); return new KeyValue(key, value); } @Override String entryToString(KeyValue entry) { return entry.toString(); } } private class LoadRowStream extends LoadStream { LoadRowStream(long index, long min, long max) { super("LoadRowStream", index, min, max); } @Override Row createEntry(long idx) { return createUserRow(idx); } @Override String entryToString(Row entry) { return entry.toJsonString(false); } private Row createUserRow(long id) { final Row row = userTable.createRow(); final String name = "name" + id; row.put("id", id); row.put("name", name); row.put("data", getBinaryData(BINARY_LEN)); return row; } } private byte[] buffer = null; private byte[] getBinaryData(int len) { if (buffer != null) { return buffer; } buffer = new byte[len]; for (int i = 0; i < len; i++) { buffer[i] = (byte)('A' + i % 26); } return buffer; } private static abstract class LoadStream implements EntryStream { private final String name; private final long index; private final long max; private final long min; private long id; private long count; private final AtomicLong keyExistsCount; LoadStream(String name, long index, long min, long max) { this.index = index; this.max = max; this.min = min; this.name = name; id = min; count = 0; keyExistsCount = new AtomicLong(); } abstract E createEntry(long idx); abstract String entryToString(E entry); @Override public String name() { return name + index + "- [" + min + ", " + max + ")"; } @Override public E getNext() { if (id++ == max) { return null; } count++; return createEntry(id); } @Override public void completed() { System.err.println(name() + " completed, loaded: " + count); } @Override public void keyExists(E entry) { keyExistsCount.incrementAndGet(); } @Override public void catchException(RuntimeException exception, E entry) { System.err.println(name() + " catch exception: " + exception.getMessage() + " for entry: " + entryToString(entry)); throw exception; } public long getCount() { return count; } public long getKeyExistsCount() { return keyExistsCount.get(); } } }