hadoop.exec

Starts the Hadoop engine and sends the mapper, reducer, and combiner R functions for execution. You must load the data into HDFS first.

Usage

hadoop.exec(
        dfs.id, 
        mapper, 
        reducer, 
        combiner, 
        export,
        init,
        final,
        job.name,
        config)

Arguments

dfs.id

The name of a file in HDFS containing data to be processed. The file name can include a path that is either absolute or relative to the current path.

mapper

Name of a mapper function written in the R language.

reducer

Name of a reducer function written in the R language (optional).

combiner

Not supported in this release.

export

Names of exported R objects from your current R environment that are referenced by any of the mapper, reducer, or combiner functions (optional).

init

A function that is executed once before the mapper function begins (optional).

final

A function that is executed once after the reducer function completes (optional).

job.name

A descriptive name that you can use to track the progress of the MapReduce job instead of the automatically generated job name (optional).

config

Sets the configuration parameters for the MapReduce job (optional).

This argument is an instance of the mapred.config class and thus has this format:

config = new("mapred.config", param1, param2,...)

See "ORCH mapred.config Class" for a description of this class.

Usage Notes

Oracle R Connector for Hadoop does not support mapper-only jobs. To get the same effect, use orch.keyvals as the reducer body, which passes the mapper output through. See the example in orch.keyvals.
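
A minimal sketch of this pattern follows; it assumes that orch.keyvals takes its key and value arguments in the same order as keyval, and that dfs identifies existing HDFS data.

res <- hadoop.exec(
    dfs,
    mapper = function(key, val) {
        keyval(key, val)          # the real per-record work goes here
    },
    reducer = function(key, vals) {
        orch.keyvals(key, vals)   # pass-through reducer body
    })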

This function provides more control over the data flow than the hadoop.run function. You must use hadoop.exec when chaining several mappers and reducers in a pipeline, because the intermediate data does not leave HDFS. The results are stored in HDFS files.
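
For example, the identifier returned by one hadoop.exec call can be passed directly as the dfs.id of the next call, so only the final result needs to be retrieved. In this sketch, mapper1, reducer1, mapper2, and reducer2 are placeholder functions.

# Stage 1: the result stays in HDFS.
stage1 <- hadoop.exec(dfs, mapper = mapper1, reducer = reducer1)

# Stage 2: feed the HDFS identifier from stage 1 straight into the next job.
stage2 <- hadoop.exec(stage1, mapper = mapper2, reducer = reducer2)

# Pull only the final result back into the local R session.
result <- hdfs.get(stage2)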

Return Value

Data object identifier in HDFS

Example

This sample script uses hdfs.attach to obtain the object identifier of a small sample data file in HDFS named ontime_R.

The MapReduce job calculates the average arrival delay of flights into San Francisco International Airport (SFO).

dfs <- hdfs.attach('ontime_R')
res <- NULL
res <- hadoop.exec(
    dfs,
    mapper = function(key, ontime) {
        if (key == 'SFO') {
            keyval(key, ontime)
        }
    },
    reducer = function(key, vals) {
        sumAD <- 0
        count <- 0
        for (x in vals) {
            if (!is.na(x$ARRDELAY)) {sumAD <- sumAD + x$ARRDELAY; count <- count + 1}
        }
        res <- sumAD / count
        keyval(key, res)
    }
)

After the script runs, the res variable identifies the location of the results in an HDFS file named /user/oracle/xq/orch3d0b8218:

R> res
[1] "/user/oracle/xq/orch3d0b8218"
attr(,"dfs.id")
[1] TRUE
R> print(hdfs.get(res))
  val1     val2
1  SFO 27.05804

This code fragment is extracted from example-kmeans.R. The export option uses orch.export to make the ncenters variable from the current R environment available to the mapper function. The config argument gives the MapReduce job the name k-means.1 and declares the format of the mapper output as a data frame (mapf).

mapf <- data.frame(key=0, val1=0, val2=0)
dfs.points <- hdfs.put(points)
dfs.centers <- hadoop.exec(
    dfs.id = dfs.points,
    mapper = function(k, v) {
        # Assign each point to one of the ncenters groups at random
        keyval(sample(1:ncenters, 1), v)
    },
    reducer = function(k, vv) {
        # Compute the mean of the points assigned to this group
        vv <- sapply(vv, unlist)
        keyval(NULL, c(mean(vv[1,]), mean(vv[2,])))
    },
    export = orch.export(ncenters),
    config = new("mapred.config",
        job.name = "k-means.1",
        map.output = mapf))
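
As in the first example, the returned identifier points to an HDFS file, so the computed centers can be brought back into the local R session with hdfs.get:

centers <- hdfs.get(dfs.centers)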