Inserting Bulk Data into Oracle NoSQL Database


Before You Begin

Purpose

In this tutorial, you perform a bulk insert across different shards of an Oracle NoSQL Database.

Time to Complete

15 minutes (approximately).

Background

Oracle NoSQL Database is a scalable, distributed NoSQL database, designed to provide highly reliable, flexible and available data management across a configurable set of storage nodes.

Oracle NoSQL Database can perform a bulk insert (bulk put) of records across different shards. This functionality allows developers to work effectively with large data sets.

The BulkPut API is available for both the table and the key/value data models. The API provides significant performance gains over single-row inserts: it reduces the number of network round trips and performs ordered inserts in batches, on internally sorted data, across different shards in parallel.
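The idea of sorted, per-shard batches written in parallel can be pictured with a small JDK-only sketch. It is a conceptual illustration, not the library's implementation: the class name ShardBatchSketch, the hash-modulo shard assignment in shardOf, and the simulated write are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ShardBatchSketch {

    // Assumption for illustration only: a key's shard is its hash modulo the shard count.
    static int shardOf(String key, int nShards) {
        return Math.floorMod(key.hashCode(), nShards);
    }

    // Groups keys by shard, then sorts each group so every batch is ordered.
    static Map<Integer, List<String>> toSortedBatches(List<String> keys, int nShards) {
        Map<Integer, List<String>> batches = new TreeMap<>();
        for (String key : keys) {
            batches.computeIfAbsent(shardOf(key, nShards), s -> new ArrayList<>()).add(key);
        }
        for (List<String> batch : batches.values()) {
            Collections.sort(batch);
        }
        return batches;
    }

    // Simulates writing each batch on its own worker thread and returns the total count.
    static int writeInParallel(Map<Integer, List<String>> batches) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, batches.size()));
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (List<String> batch : batches.values()) {
                // A real client would send one request per batch here instead of counting.
                Callable<Integer> task = batch::size;
                results.add(pool.submit(task));
            }
            int written = 0;
            for (Future<Integer> result : results) {
                written += result.get();
            }
            return written;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("simulated batch write failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            keys.add("/bulk/" + i);
        }
        Map<Integer, List<String>> batches = toSortedBatches(keys, 3);
        System.out.println(batches.size() + " batches, "
            + writeInParallel(batches) + " keys written");
    }
}
```

The point of the sketch is the round-trip count: ten single puts would mean ten requests, whereas the batched version issues one simulated request per shard.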

Scenario

Consider a shipping company that uses Oracle NoSQL Database for its ship management application. The application tracks the movement of the company's container ships, which move cargo from port to port. The ships are equipped with GPS and other tracking devices, which relay each ship's location to the application every few seconds. The application is then queried for:

  1. The location of all the ships (displayed on the map)
  2. A specific ship's trajectory over a given period of time (displayed on the map)

As the volume of location data increases each day, the company is finding it difficult to scale the application and is now looking for a back-end system that can ingest this large data set efficiently.

What Do You Need?

Pinging the KVStore

In this section, you verify that Oracle NoSQL Database is installed and the KVStore is running.

  1. Log in to the system where Oracle NoSQL Database is installed and running. Open a terminal window and navigate to your working directory.

    $ cd /home/oracle/kvstore
  2. Confirm that the configured kvstore is running by using the ping command.

    $ java -jar $KVHOME/lib/kvstore.jar ping -host localhost -port 5000
    Pinging components of store kvstore based upon topology sequence #14
    10 partitions and 1 storage nodes
    Time: 2016-12-07 13:03:17 UTC   Version: 12.1.4.2.14
    Shard Status: healthy:1 writable-degraded:0 read-only:0 offline:0
    Admin Status: healthy
    Zone [name=KVLite id=zn1 type=PRIMARY allowArbiters=false]   RN Status: online:1 offline:0
    Storage Node [sn1] on host01:5000    Zone: [name=KVLite id=zn1 type=PRIMARY allowArbiters=false]    Status: RUNNING   Ver: 12cR1.4.2.14 2016-11-24 03:59:15 UTC  Build id: 4c309251a012
    	Admin [admin1]		Status: RUNNING,MASTER
    	Rep Node [rg1-rn1]	Status: RUNNING,MASTER sequenceNumber:46 haPort:5006

Inserting Bulk Data

In this section, you create a Java class to insert bulk data across different shards. You inspect the key lines of the code and then compile and run the Java class.

Saving the Java Class File

In this section, you save a Java class file to your working machine.

  1. Download the class file (if not already done).

  2. Create a directory called bulkput and save the downloaded file to this directory.

    [Image: BulkPutExample.java file in the File Browser]

Understanding the Java Class File

In this section, you review the key code lines in the Java class file.

  1. Open the Java class file in an editor and review the code. The file starts with all the required import statements.

    package bulkput;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;
    
    import oracle.kv.BulkWriteOptions;
    import oracle.kv.EntryStream;
    import oracle.kv.FaultException;
    import oracle.kv.KVStore;
    import oracle.kv.KVStoreConfig;
    import oracle.kv.KVStoreFactory;
    import oracle.kv.Key;
    import oracle.kv.KeyValue;
    import oracle.kv.Value;
    import oracle.kv.table.Row;
    import oracle.kv.table.Table;
    import oracle.kv.table.TableAPI;
  2. In this example, the table name is set to users and the CREATE_TABLE string is defined.

    public class BulkPutExample {
    
        private final KVStore store;
        private final BulkWriteOptions bulkWriteOptions;
    
        private int streamParallelism = 1;
        private int perShardParallelism = 2;
        private int heapPercent = 70;
        private int nLoad = 1000;
        private boolean useTable = false;
    
        private final static String TABLE_NAME = "users";
        private final static int BINARY_LEN = 128;
        private final static String CREATE_TABLE = "CREATE TABLE if not exists " +
            TABLE_NAME + "(id long, name string, data binary, primary key(id))";
        private Table userTable = null;
    
        public static void main(final String args[]) {
            try {
                BulkPutExample runTest =
                    new BulkPutExample(args);
                runTest.run();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
  3. The class constructor parses the command-line arguments, creates the BulkWriteOptions object, and obtains the KVStore handle that is required to perform any operation on the store. The storeName and hostName variables are initialized to null and must be supplied with the -store and -host arguments.

    • BulkWriteOptions(Durability durability, long timeout, TimeUnit timeoutUnit): The options used to configure the bulk put operation. In this example, durability and timeoutUnit are set to null, and timeout is set to 0.

    • setStreamParallelism: Sets the maximum number of streams that can be read concurrently. Each stream is read by a dedicated thread from a thread pool. This setting determines the size of the thread pool used for reading streams.

      The default parallelism is 1. For streams with high overheads, say because the I/O device underlying the stream is slow and there are different I/O devices underlying each stream, a higher value would be appropriate.

    • setPerShardParallelism: Sets the maximum number of threads that can concurrently write their batches of entries to a single shard in the store.

      The default value is 3, which allows the reading of the next batch to overlap with the processing of the current batch at a server node. Higher-capacity networks and storage nodes can allow for higher parallelism.

    • setBulkHeapPercent: The percentage of Runtime.maxMemory() that can be used for the operation. This heap is used to assemble batches of entries associated with specific shards and partitions.

      The default is 40%.

        BulkPutExample(final String[] argv) {
    
            String storeName = null;
            String hostName = null;
            int port = -1;
    
            final int nArgs = argv.length;
            int argc = 0;
    
            if (nArgs == 0) {
                usage(null);
            }
    
            while (argc < nArgs) {
                final String thisArg = argv[argc++];
    
                if (thisArg.equals("-store")) {
                    if (argc < nArgs) {
                        storeName = argv[argc++];
                    } else {
                        usage("-store requires an argument");
                    }
                } else if (thisArg.equals("-host")) {
                    if (argc < nArgs) {
                        hostName = argv[argc++];
                    } else {
                        usage("-host requires an argument");
                    }
                } else if (thisArg.equals("-port")) {
                    if (argc < nArgs) {
                        port = Integer.parseInt(argv[argc++]);
                    } else {
                        usage("-port requires an integer argument");
                    }
                } else if (thisArg.equals("-streamParallelism")) {
                    if (argc < nArgs) {
                        streamParallelism = Integer.parseInt(argv[argc++]);
                    } else {
                        usage("-streamParallelism requires an argument");
                    }
                } else if (thisArg.equals("-perShardParallelism")) {
                    if (argc < nArgs) {
                        perShardParallelism = Integer.parseInt(argv[argc++]);
                    } else {
                        usage("-perShardParallelism requires an argument");
                    }
                } else if (thisArg.equals("-heapPercent")) {
                    if (argc < nArgs) {
                        heapPercent = Integer.parseInt(argv[argc++]);
                    } else {
                        usage("-heapPercent requires an argument");
                    }
                } else if (thisArg.equals("-load")) {
                    if (argc < nArgs) {
                        nLoad = Integer.parseInt(argv[argc++]);
                    } else {
                        usage("-load requires an argument");
                    }
                } else if (thisArg.equals("-useTable")) {
                    useTable = true;
                } else {
                    usage("Unknown argument: " + thisArg);
                }
            }
    
            if (hostName == null) {
                usage("Missing argument: -host");
            }
            if (port == -1) {
                usage("Missing argument: -port");
            }
            if (storeName == null) {
                usage("Missing argument: -store");
            }
    
            final String fmt = "\thost:%s\n\tport:%d\n\tstore:%s\n" +
            		           "\tnumToload:%,d\n\tuseTable:%s\n" +
            		           "\tbulkWriteOptions:\n" +
            		           "\t\tstreamParallelism:%d\n" +
            		           "\t\tperShardParallelism:%d\n" +
            		           "\t\theapPercent:%d\n";
            System.err.println(String.format(fmt, hostName, port, storeName, nLoad,
                                             String.valueOf(useTable),
                                             streamParallelism, perShardParallelism,
                                             heapPercent));
    
            bulkWriteOptions = new BulkWriteOptions(null, 0, null);
            bulkWriteOptions.setStreamParallelism(streamParallelism);
            bulkWriteOptions.setPerShardParallelism(perShardParallelism);
            bulkWriteOptions.setBulkHeapPercent(heapPercent);
    
            store = KVStoreFactory.getStore
                    (new KVStoreConfig(storeName, hostName + ":" + port));
        }
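To make the setBulkHeapPercent description in step 3 concrete, the following JDK-only sketch computes how many bytes a given percentage of Runtime.maxMemory() represents. The class name HeapBudgetSketch, the budgetBytes helper, and its 1..100 range check are assumptions made for this illustration; how the library accounts for this memory internally is not covered in this tutorial.

```java
public class HeapBudgetSketch {

    // Returns floor(maxMemoryBytes * heapPercent / 100) without overflowing a long.
    static long budgetBytes(long maxMemoryBytes, int heapPercent) {
        // The 1..100 range check is this sketch's own choice, not a documented rule.
        if (heapPercent < 1 || heapPercent > 100) {
            throw new IllegalArgumentException("heapPercent must be in 1..100: " + heapPercent);
        }
        long whole = maxMemoryBytes / 100 * heapPercent;
        long remainder = maxMemoryBytes % 100 * heapPercent / 100;
        return whole + remainder;
    }

    public static void main(String[] args) {
        long max = Runtime.getRuntime().maxMemory();
        System.out.println("maxMemory:           " + max + " bytes");
        // 40 is the default quoted in step 3; 70 is the value this example class uses.
        System.out.println("budget at 40 percent: " + budgetBytes(max, 40) + " bytes");
        System.out.println("budget at 70 percent: " + budgetBytes(max, 70) + " bytes");
    }
}
```

Running it with different -Xmx settings shows how the same -heapPercent value translates into very different batch-assembly budgets.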
  4. Review the usage and run methods. The usage method prints the command-line help and exits. The run method creates a table and loads rows into it if the -useTable flag is set; otherwise, it loads key/value pairs. In both cases, it closes the store handle when it finishes.

        private void usage(final String message) {
            if (message != null) {
                System.err.println("\n" + message + "\n");
            }
    
            System.err.println("usage: " + getClass().getName());
            System.err.println
                ("\t-store <instance name>\n" +
                 "\t-host <host name>\n" +
                 "\t-port <port number>\n" +
                 "\t[-load <# records to load>] (default: 1000)\n" +
                 "\t[-streamParallelism <the number of streams>]" +
                 " (default: 1)\n" +
                 "\t[-perShardParallelism <the number of writer threads per shard>]" +
                 " (default: 2)\n" +
                 "\t[-heapPercent <percentage of max memory used for bulk put>]" +
                 " (default: 70)\n" +
                 "\t[-useTable]");
            System.exit(1);
        }
    
        private void run() {
            try {
                if (useTable) {
                    createTable();
                    doLoadRows();
                } else {
                    doLoadKVs();
                }
            } finally {
                store.close();
            }
        }
  5. The createTable() method is executed to create a table when the -useTable flag is used. It creates a table called users.

        private void createTable() {
            final TableAPI tableImpl = store.getTableAPI();
            try {
                store.executeSync(CREATE_TABLE);
            } catch (FaultException fe) {
                System.out.println("Create table failed: " + fe.getMessage());
                throw fe;
            }
            userTable = tableImpl.getTable(TABLE_NAME);
            System.err.println("Created table " + TABLE_NAME + ".");
        }
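  6. The doLoadKVs() method loads key/value pairs into the NoSQL Database. It splits the requested number of records evenly across the streams and passes the streams to store.put. This method is executed when the -useTable flag is not used.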

        private void doLoadKVs() {
            System.err.println("Loading KVs...");
            final List<EntryStream<KeyValue>> streams =
                new ArrayList<EntryStream<KeyValue>>(streamParallelism);
            final int num = (nLoad + (streamParallelism - 1)) /streamParallelism;
            for (int i = 0; i < streamParallelism; i++) {
                final int min = num * i;
                final int max = Math.min((min + num) , nLoad);
                streams.add(new LoadKVStream(i, min, max));
            }
    
            store.put(streams, bulkWriteOptions);
    
            long total = 0;
            long keyExists = 0;
            for (EntryStream<KeyValue> stream: streams) {
                total += ((LoadKVStream)stream).getCount();
                keyExists += ((LoadKVStream)stream).getKeyExistsCount();
            }
            final String fmt = "Loaded %,d records, %,d pre-existing.";
            System.err.println(String.format(fmt, total, keyExists));
        }
  7. The doLoadRows() method loads the rows into the table. This method is executed when the -useTable flag is used.

        private void doLoadRows() {
            System.err.println("Loading rows to " + TABLE_NAME + "...");
    
            final List<EntryStream<Row>> streams =
                new ArrayList<EntryStream<Row>>(streamParallelism);
            final int num = (nLoad + (streamParallelism - 1)) / streamParallelism;
            for (int i = 0; i < streamParallelism; i++) {
                final int min = num * i;
                final int max = Math.min((min + num) , nLoad);
                streams.add(new LoadRowStream(i, min, max));
            }
    
            final TableAPI tableImpl = store.getTableAPI();
            tableImpl.put(streams, bulkWriteOptions);
    
            long total = 0;
            long keyExists = 0;
            for (EntryStream<Row> stream: streams) {
                total += ((LoadRowStream)stream).getCount();
                keyExists += ((LoadRowStream)stream).getKeyExistsCount();
            }
            final String fmt = "Loaded %,d rows to %s, %,d pre-existing.";
            System.err.println(String.format(fmt, total, TABLE_NAME, keyExists));
        }
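Both load methods divide the records among the streams with the same arithmetic: a ceiling division for the per-stream size, then a half-open range [min, max) per stream. The sketch below isolates that calculation so it can be checked by hand. The class name StreamRangesSketch and the ranges helper are hypothetical; the three assignment lines are copied from the methods above.

```java
public class StreamRangesSketch {

    // Returns one {min, max} pair per stream, using the tutorial's own arithmetic.
    static int[][] ranges(int nLoad, int streamParallelism) {
        int[][] out = new int[streamParallelism][2];
        // Ceiling division: the size of every range except possibly the last.
        final int num = (nLoad + (streamParallelism - 1)) / streamParallelism;
        for (int i = 0; i < streamParallelism; i++) {
            final int min = num * i;
            final int max = Math.min((min + num), nLoad);
            out[i][0] = min;
            out[i][1] = max;
        }
        return out;
    }

    public static void main(String[] args) {
        // Same values as the table run later in this tutorial: -load 2000 -streamParallelism 3
        for (int[] r : ranges(2000, 3)) {
            System.out.println("[" + r[0] + ", " + r[1] + ") -> " + (r[1] - r[0]) + " entries");
        }
        // prints:
        // [0, 667) -> 667 entries
        // [667, 1334) -> 667 entries
        // [1334, 2000) -> 666 entries
    }
}
```

One consequence of this arithmetic: when there are more streams than records, trailing ranges come out empty or even inverted. For example, ranges(1, 3) yields [0, 1), [1, 1), and [2, 1), so a stream should treat min >= max as already exhausted.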
  8. The LoadKVStream class creates entries in the NoSQL Database for key value pairs.

        private class LoadKVStream extends LoadStream<KeyValue> {
    
            LoadKVStream(long index, long min, long max) {
                super("LoadKVStream", index, min, max);
            }
    
            @Override
            KeyValue createEntry(long idx) {
                final Key key = Key.fromString("/bulk/" + idx);
                final Value value = Value.createValue(getBinaryData(BINARY_LEN));
                return new KeyValue(key, value);
            }
    
            @Override
            String entryToString(KeyValue entry) {
                return entry.toString();
            }
        }
  9. The LoadRowStream class creates rows in the users table.

        private class LoadRowStream extends LoadStream<Row> {
            LoadRowStream(long index, long min, long max) {
                super("LoadRowStream", index, min, max);
            }
    
            @Override
            Row createEntry(long idx) {
                return createUserRow(idx);
            }
    
            @Override
            String entryToString(Row entry) {
                return entry.toJsonString(false);
            }
    
            private Row createUserRow(long id) {
                final Row row = userTable.createRow();
                final String name = "name" + id;
                row.put("id", id);
                row.put("name", name);
                row.put("data", getBinaryData(BINARY_LEN));
                return row;
            }
        }
    
        private byte[] buffer = null;
    
        private byte[] getBinaryData(int len) {
            if (buffer != null) {
                return buffer;
            }
            buffer = new byte[len];
            for (int i = 0; i < len; i++) {
                buffer[i] = (byte)('A' + i % 26);
            }
            return buffer;
        }
    
        private static abstract class LoadStream<E> implements EntryStream<E> {
    
            private final String name;
            private final long index;
            private final long max;
            private final long min;
            private long id;
            private long count;
            private final AtomicLong keyExistsCount;
    
            LoadStream(String name, long index, long min, long max) {
                this.index = index;
                this.max = max;
                this.min = min;
                this.name = name;
                id = min;
                count = 0;
                keyExistsCount = new AtomicLong();
            }
    
            abstract E createEntry(long idx);
            abstract String entryToString(E entry);
    
            @Override
            public String name() {
                return name + index + "- [" + min + ", " + max + ")";
            }
    
            @Override
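            public E getNext() {
                // Stop at max; ">=" also ends streams whose range is empty.
                if (id >= max) {
                    return null;
                }
                count++;
                // Return the current id, then advance, so ids cover [min, max).
                return createEntry(id++);
            }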
    
            @Override
            public void completed() {
                System.err.println(name() + " completed, loaded: " + count);
            }
    
            @Override
            public void keyExists(E entry) {
                keyExistsCount.incrementAndGet();
            }
    
            @Override
            public void catchException(RuntimeException exception, E entry) {
                System.err.println(name() + " catch exception: " +
                    exception.getMessage() + " for entry: " +
                    entryToString(entry));
                throw exception;
            }
    
            public long getCount() {
                return count;
            }
    
            public long getKeyExistsCount() {
                return keyExistsCount.get();
            }
        }
    }
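The LoadStream class follows a pull model: entries are requested with getNext() until it returns null, and completed() is called afterward. The following JDK-only sketch imitates that contract so that it can be run without a store. The SimpleStream interface, the RangeStream class, and the drain loop that stands in for the store are all hypothetical; the sketch does not use the oracle.kv classes and does not show how the library consumes streams internally.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeStreamSketch {

    // Hypothetical stand-in that keeps only two of the callbacks used by LoadStream.
    interface SimpleStream<E> {
        E getNext();       // next entry, or null at the end of the stream
        void completed();  // invoked once all supplied entries have been consumed
    }

    // Supplies the ids in the half-open range [min, max).
    static class RangeStream implements SimpleStream<Long> {
        private final long max;
        private long id;
        private long count;
        private boolean done;

        RangeStream(long min, long max) {
            this.id = min;
            this.max = max;
        }

        @Override
        public Long getNext() {
            if (id >= max) {
                return null;   // also covers empty ranges (min >= max)
            }
            count++;
            return id++;       // hand out the current id, then advance
        }

        @Override
        public void completed() {
            done = true;
        }

        long getCount() {
            return count;
        }

        boolean isDone() {
            return done;
        }
    }

    // Stand-in for the store: pulls entries until null, then signals completion.
    static <E> List<E> drain(SimpleStream<E> stream) {
        List<E> out = new ArrayList<>();
        for (E entry = stream.getNext(); entry != null; entry = stream.getNext()) {
            out.add(entry);
        }
        stream.completed();
        return out;
    }

    public static void main(String[] args) {
        RangeStream stream = new RangeStream(3, 6);
        System.out.println(drain(stream) + " count=" + stream.getCount());
        // prints "[3, 4, 5] count=3"
    }
}
```

Keeping the end-of-stream check as id >= max means that getNext() keeps returning null if it is called again after the stream is exhausted.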

Compiling and Running the Java Class

  1. In the terminal window, ensure that the present working directory is set to /home/oracle/kvstore and compile the Java class file.

    $ javac -classpath $KVHOME/lib/kvstore.jar bulkput/BulkPutExample.java
  2. Run the Java class to bulk insert into a table.

    $ java -cp $KVHOME/lib/kvclient.jar:. bulkput.BulkPutExample \
    -store kvstore -host localhost -port 5000 -streamParallelism 3 \
    -load 2000 -perShardParallelism 2 -heapPercent 70 -useTable
    	host:localhost
    	port:5000
    	store:kvstore
    	numToload:2,000
    	useTable:true
    	bulkWriteOptions:
    		streamParallelism:3
    		perShardParallelism:2
    		heapPercent:70
    
    Created table users.
    Loading rows to users...
    LoadRowStream0- [0, 667) completed, loaded: 667
    LoadRowStream1- [667, 1334) completed, loaded: 667
    LoadRowStream2- [1334, 2000) completed, loaded: 666
    Loaded 2,000 rows to users, 0 pre-existing.
  3. Run the Java class to bulk insert key values.

    $ java -cp $KVHOME/lib/kvclient.jar:. bulkput.BulkPutExample \
    -store kvstore -host localhost -port 5000 -load 1000
    	host:localhost
    	port:5000
    	store:kvstore
    	numToload:1,000
    	useTable:false
    	bulkWriteOptions:
    		streamParallelism:1
    		perShardParallelism:2
    		heapPercent:70
    
    Loading KVs...
    LoadKVStream0- [0, 1000) completed, loaded: 1000
    Loaded 1,000 records, 0 pre-existing.

Want to Learn More?