hadoop.run

Starts the Hadoop engine and sends the mapper, reducer, and combiner R functions for execution. If the data is not already stored in HDFS, then hadoop.run first copies the data there.

Usage

hadoop.run(
        data, 
        mapper, 
        reducer, 
        combiner, 
        export,
        init,
        final,
        job.name,
        config)

Arguments

data

A data frame, Oracle R Enterprise frame (ore.frame), or an HDFS file name.

mapper

The name of a mapper function written in the R language.

reducer

The name of a reducer function written in the R language (optional).

combiner

Not supported in this release.

export

The names of R objects to export from the current R environment to the mapper and reducer execution environments (optional).

init

A function that is executed once before the mapper function begins (optional).

final

A function that is executed once after the reducer function completes (optional).

job.name

A descriptive name that you can use to track the progress of the job instead of the automatically generated job name (optional).

config

Sets the configuration parameters for the MapReduce job (optional).

This argument is an instance of the mapred.config class, and so it has this format:

config = new("mapred.config", param1, param2, ...)

See "ORCH mapred.config Class" for a description of this class.

Usage Notes

Oracle R Connector for Hadoop does not support mapper-only jobs. To simulate one, supply a pass-through reducer that uses orch.keyvals as its body. See the example in orch.keyvals.

The hadoop.run function returns the results from HDFS to the source of the input data. Thus, the results for HDFS input data are kept in HDFS, and the results for ore.frame input data are copied into an Oracle database.
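As a sketch of this behavior, the following hypothetical job runs on the built-in cars data frame. Because the input is not in HDFS, hadoop.run first copies it there, runs the job, and then returns the results as a data frame rather than as an HDFS object identifier:

# Sketch only: cars stands in for real input data; the mapper and
# reducer pass every record through unchanged.
out <- hadoop.run(
    cars,
    mapper  = function(k, v)  orch.keyval(k, v),
    reducer = function(k, vv) orch.keyvals(k, vv))
# out is a data frame because the input was a data frame.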

Return Value

An object in the same format as the input data.

Example

This sample script uses hdfs.attach to obtain the object identifier of a small sample data file in HDFS named ontime_R.

The MapReduce job calculates the average arrival delay of flights into San Francisco International Airport (SFO).

dfs <- hdfs.attach('ontime_R')
res <- hadoop.run(
    dfs,
    mapper = function(key, ontime) {
        # Emit only the records keyed by destination airport SFO
        if (key == 'SFO') {
            orch.keyval(key, ontime)
        }
    },
    reducer = function(key, vals) {
        # Average the non-missing arrival delays for this key
        sumAD <- 0
        count <- 0
        for (x in vals) {
            if (!is.na(x$ARRDELAY)) {
                sumAD <- sumAD + x$ARRDELAY
                count <- count + 1
            }
        }
        orch.keyval(key, sumAD / count)
    }
)

After the script runs, the location of the results is identified by the res variable, in an HDFS file named /user/oracle/xq/orch3d0b8218:

R> res
[1] "/user/oracle/xq/orch3d0b8218"
attr(,"dfs.id")
[1] TRUE
R> print(hdfs.get(res))
  val1     val2
1  SFO 27.05804

The next example shows a simulated mapper-only job, as described in "Usage Notes": the mapper increments every value, and the reducer passes each key-value pair through unchanged by using orch.keyvals:

hadoop.run(x,
    mapper = function(k, v) {
        orch.keyval(k, v + 1)  # increment all values
    },
    reducer = function(k, vv) {
        orch.keyvals(k, vv)    # pass-through reducer
    }
)