Oracle NoSQL Database Examples
version 12cR1.3.3.4

Package hadoop.table

The Table API MapReduce Cookbook: example code for a MapReduce job, along with supporting code and scripts, that can be run against data written via the Oracle NoSQL Database Table API.

See: Description

Package hadoop.table Description

The Table API MapReduce Cookbook: example code for a MapReduce job, along with supporting code and scripts, that can be run against data written via the Oracle NoSQL Database Table API.

Introduction

Prior to the introduction of the Table API to Oracle NoSQL Database, in order to run a Hadoop MapReduce job against data in an Oracle NoSQL Database KVStore, one would employ the interfaces and classes that integrate the Oracle NoSQL Database key/value API (the KV API) with Hadoop MapReduce as described here. With the addition of the Table API it became necessary to specify new interfaces and classes that would allow Hadoop MapReduce jobs to be run against so-called table data; that is, against data written to a KVStore via the Table API rather than the KV API. The information presented below describes how to run such a Hadoop MapReduce job against table data in a given KVStore. In addition to describing the core interfaces and classes involved in this process, this document also walks through the example currently provided with the Oracle NoSQL Database software distribution; which demonstrates how to use the Table API Hadoop integration classes with Hadoop MapReduce.

Prerequisites

Before attempting to execute the example that demonstrates the concepts presented in this document, you should first satisfy the following prerequisites: Using specific values for items such as the host names and admin port described above should allow you to more easily follow the example that is presented. Combined with the information contained in the Oracle NoSQL Database Getting Started Guide, as well as the Oracle NoSQL Database Admin Guide, you should then be able to generalize and extend the example to your own particular development scenario; substituting the values specific to the given environment where necessary.

A Brief Hadoop Primer

Hadoop can be thought of as consisting of two primary components: The various Hadoop distributions that are available (for example, Cloudera or Hortonworks) each provide an infrastructure which orchestrates the MapReduce processing that is performed. It does this by marshalling the distributed servers that run the various tasks in parallel, by managing all communications and data transfers between the various parts of the system, and by providing for redundancy and fault tolerance.

In addition, the Hadoop infrastructure provides a number of interactive tools — such as a command line interface (the Hadoop CLI) — that provide access to the data stored in HDFS. But the typical way application developers read, write, and process data stored in HDFS is via MapReduce jobs; which are programs that adhere to the Hadoop MapReduce programming model. For more detailed information on Hadoop HDFS and MapReduce, consult the Hadoop MapReduce tutorial.

As indicated above, with the introduction of the Table API, a new set of interfaces and classes that satisfy the Hadoop MapReduce programming model have been provided which support writing MapReduce jobs that can be run against table data contained in a KVStore. These new classes are located in the oracle.kv.hadoop.table package, and consist of the following types:

As described below, it is through the specific implementation of the InputFormat class provided in the Oracle NoSQL Database distribution that the Hadoop MapReduce infrastructure obtains access to a given KVStore and the desired table data that the store contains.

The Oracle NoSQL Database Table API Hadoop MapReduce Integration Classes

When writing a MapReduce job to process data stored in an Oracle NoSQL Database table, you should employ the following classes: For more detail about the semantics of the classes listed above, refer to the javadoc of each respective class.

Currently, Oracle NoSQL Database does not define a subclass of the Hadoop OutputFormat class. This means that it is not currently possible to write data from a MapReduce job into a KVStore. That is, from within a MapReduce job, you can only retrieve data from a desired KVStore table and process that data.

The CountTableRows Example

The hadoop.table example package is contained in the following location within your Oracle NoSQL Database distribution:
  <KVHOME>/examples/
      hadoop/table/
        create_vehicle_table.kvs
        CountTableRows.java
        LoadVehicleTable.java
The example script create_vehicle_table.kvs creates a table in a KVStore with the name and schema expected by the CountTableRows example program. Similarly, the standalone Java program LoadVehicleTable populates the table created by create_vehicle_table.kvs with rows of data consistent with the table's schema. Once the table is created and populated with example data, the MapReduce program CountTableRows can be executed to count the number of rows of data in the table.

This section explains how to use create_vehicle_table.kvs to create the example table, how to compile and execute LoadVehicleTable to populate the table with data, and finally, how to compile, build (JAR), and then execute the CountTableRows MapReduce job on the Hadoop cluster that was deployed for this example.

Create the example table 'vehicleTable'

To execute the CountTableRows MapReduce job, a table named vehicleTable must be created in the KVStore that was deployed for this example; where the table must consist of the following schema:
  • A field named "type" of type STRING
  • A field named "make" of type STRING
  • A field named "model" of type STRING
  • A field named "class" of type STRING
  • A field named "color" of type STRING
  • A field named "price" of type DOUBLE
  • A field named "count" of type INTEGER
  • A PrimaryKey consisting of the fields: "type", "make", "model", "class", and "color"
  • A ShardKey consisting fo the fields: "type", "make", and "model"
Thus, the example table named vehicleTable consists of rows representing a particular vehicle a dealer might have in stock for purchase. Each such row contains fields representing the "type" of vehicle (for example, car, truck, SUV, etc.), the "make" of the vehicle (Ford, GM, Chrysler, etc.), the "model" (Explorer, Camaro, Lebaron, etc.), the "class" (4WheelDrive, FrontWheelDrive, etc.), the "color", the "price", and finally the number of vehicles in stock (the "count").

To create the table described above, connect the Oracle NoSQL Database administrative CLI to the administrative service of the KVStore that was deployed for this example. To do so, type the following at the command line:

  > java -jar <KVHOME>/lib/kvcli.jar -host kv-host-1 -port 13230 -store example-store
Then, from the CLI command prompt, create the table by either entering the appropriate commands individually, or by loading the create_vehicle_table.kvs script that is supplied with the example as a convenience. That is, at the administrative CLI command prompt, enter the following:
  kv-> load -file <KVHOME>/kv/examples/hadoop/table/create_vehicle_table.kvs
  Table vehicleTable built.
  Executed plan 9, waiting for completion...
  Plan 9 ended successfully
At this point, the KVStore should contain an empty table named vehicleTable that has the schema described above. To verify this, type the following at the CLI command prompt, and observe that the output indicates that the table exists but is empty:
  kv-> show tables
  Tables: 
          vehicleTable

  kv-> get table -name vehicleTable
  0 row returned

  kv-> exit

Populate 'vehicleTable' with example data

After the table creation has occurred, the example table can be populated; either manually from the CLI (one put command per row), or by executing the LoadVehicleTable program that is also supplied with the example as a convenience. But before executing LoadVehicleTable, that program must first be compiled. To do this, exit the administrative CLI and type the following from the command line:
  > cd <KVHOME>
  > javac -classpath lib/kvstore.jar:examples examples/hadoop/table/LoadVehicleTable.java
which should produce the file <KVHOME>/examples/hadoop/table/LoadVehicleTable.class. To then execute the LoadVehicleTable program and populate the table named vehicleTable with example data, type the following at the command line:
  > cd <KVHOME>
  > java -classpath lib/kvstore.jar:examples hadoop.table.LoadVehicleTable \
             -store example-store -host kv-host-1 -port 13230 -nops 79 [-delete]
where the parameters -store, -host, -port, and -nops are required.

In the example command line above, the value -nops 79 specifies that 79 rows be written to the vehicleTable. If more or less than that number of rows is desired, then the value of the -nops parameter should be changed.

If LoadVehicleTable is executed a second time and the optional -delete parameter is specified, then all rows added by any previous executions of LoadVehicleTable are deleted from the table prior to adding the new rows. Otherwise, all pre-existing rows are left in place, and the number of rows in the table will be increased by the specified -nops number of new rows.

Compile and build the CountTableRows example program

After table creation and propulation has occurred, before the example MapReduce program can be executed, it must first be compiled and built for deployment to the Hadoop infrastructure. In order to compile the CountTableRows program, a number of Hadoop JAR files must be installed and available in the build environment for inclusion in the program classpath. Those JAR files are:
  • commons-logging-<version>.jar
  • hadoop-common-<version>.jar
  • hadoop-mapreduce-client-core-<version>.jar
  • hadoop-annotations-<version>.jar
where the <version> token represents the particular version number of the corresponding JAR file contained in the Hadoop distribution installed in the build environment.

For example, suppose that the 2.3.0 version of Hadoop that is delivered via the 5.1.0 package provided by Cloudera (cdh) is installed under the <HADOOPROOT> base directory. And suppose that the classes from that version of Hadoop use the 1.1.3 version of commons-logging. Then, to compile the CountTableRows program, type the following at the command line:

  > cd <KVHOME>
  > javac -classpath <HADOOPROOT>/hadoop/share/hadoop/common/lib/commons-logging-1.1.3.jar: \
                     <HADOOPROOT>/hadoop/share/hadoop/common/hadoop-common-2.3.0-cdh5.1.0.jar: \
                     <HADOOPROOT>/hadoop/share/hadoop/mapreduce2/hadoop-mapreduce-client-core-2.3.0-cdh5.1.0.jar: \
                     <HADOOPROOT>/hadoop/share/hadoop/common/lib/hadoop-annotations-2.3.0-cdh5.1.0.jar: \
                     lib/kvstore.jar:examples
which produces the following files: If your specific environment has a different, compatible Hadoop distribution installed, then simply replace the versions referenced in the example command line above with the specific versions that are installed.

Once CountTableRows has been compiled, the resulting class files should be placed in a JAR file so that the program can be deployed to the example Hadoop cluster. For example,

  > cd <KVHOME>/examples
  > jar cvf count-table-rows.jar hadoop
which should produce the file count-table-rows.jar in the current working directory.

Deploy and execute the CountTableRows MapReduce job in the Hadoop cluster

After CountTableRows has been compiled and built, the example MapReduce job can be deployed and executed by typing the following at the command line:
  > cd <KVHOME>
  > hadoop jar examples/count-table-rows.jar hadoop.table.CountTableRows \
               -libjars <KVHOME>/lib/kvclient.jar \
               example-store kv-host-1:13230 vehicleTable \
               /user/<username>/CountTableRows/vehicleTable/<000N>
where the contents of the example command above are displayed on separate lines in this document for readability, but in practice, will typically be entered as a single, continuous command with no line breaks.

Note that in the last argument, the token <username> represents a directory under the HDFS /user top-level directory, and typically corresponds to the user who has initiated the MapReduce job. This directory is usually created in HDFS by the Hadoop cluster administrator. Additionally, the <000N> token represents a string such as 0000, 0001, 0002, etc. Although any string can be used for this token, using a different number for "N" on each execution of the job makes it easier to keep track of results when multiple executions of the job occur.

As the job runs, assuming no errors, the output from the job will look like the following:

  ...
  2014-12-04 08:59:47,996 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1344)) - Running job: job_1409172332346_0024
  2014-12-04 08:59:54,107 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1372)) -  map 0% reduce 0%
  2014-12-04 09:00:16,148 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1372)) -  map 7% reduce 0%
  2014-12-04 09:00:17,368 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1372)) -  map 26% reduce 0%
  2014-12-04 09:00:18,596 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1372)) -  map 56% reduce 0%
  2014-12-04 09:00:19,824 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1372)) -  map 100% reduce 0%
  2014-12-04 09:00:23,487 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1372)) -  map 100% reduce 100%
  2014-12-04 09:00:23,921 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1383)) - Job job_1409172332346_0024 completed successfully
  2014-12-04 09:00:24,117 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1390)) - Counters: 49
        File System Counters
                FILE: Number of bytes read=2771
                FILE: Number of bytes written=644463
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=2660
                HDFS: Number of bytes written=32
                HDFS: Number of read operations=15
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters 
                Launched map tasks=6
                Launched reduce tasks=1
                Rack-local map tasks=6
                Total time spent by all maps in occupied slots (ms)=136868
                Total time spent by all reduces in occupied slots (ms)=2103
                Total time spent by all map tasks (ms)=136868
                Total time spent by all reduce tasks (ms)=2103
                Total vcore-seconds taken by all map tasks=136868
                Total vcore-seconds taken by all reduce tasks=2103
                Total megabyte-seconds taken by all map tasks=140152832
                Total megabyte-seconds taken by all reduce tasks=2153472
        Map-Reduce Framework
                Map input records=79
                Map output records=79
                Map output bytes=2607
                Map output materialized bytes=2801
                Input split bytes=2660
                Combine input records=0
                Combine output records=0
                Reduce input groups=1
                Reduce shuffle bytes=2801
                Reduce input records=79
                Reduce output records=1
                Spilled Records=158
                Shuffled Maps =6
                Failed Shuffles=0
                Merged Map outputs=6
                GC time elapsed (ms)=549
                CPU time spent (ms)=9460
                Physical memory (bytes) snapshot=1888358400
                Virtual memory (bytes) snapshot=6424895488
                Total committed heap usage (bytes)=1409286144
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=0
        File Output Format Counters 
                Bytes Written=32
To see the results of the job — that is, to verify that the program actually counted the correct number of rows in the table — the Hadoop CLI is used to display the contents of the MapReduce results file located in HDFS. That is, type the following at the command line:
  > hadoop fs -cat /user/<username>/CountTableRows/vehicleTable/<000N>/part-r-00000
where <username> and <000N> are replaced with the appropriate string values that were used when the job was run. If the job was successful, then the output should look like:
  /type/make/model/class/color  79
where /type/make/model/class/color are the names of the fields making up the PrimaryKey of the vehicleTable; and 79 is the number of rows in the table.
Oracle NoSQL Database Examples
version 12cR1.3.3.4

Copyright (c) 2011, 2015 Oracle and/or its affiliates. All rights reserved.