Starts the Hadoop engine and sends the mapper, reducer, and combiner R functions for execution. You must load the data into HDFS first.
dfs.id
The name of a file in HDFS containing the data to be processed. The file name can include a path that is either absolute or relative to the current path.

mapper
The name of a mapper function written in the R language.

reducer
The name of a reducer function written in the R language (optional).

combiner
Not supported in this release.

export
The names of exported R objects from your current R environment that are referenced by any of the mapper, reducer, or combiner functions (optional).

init
A function that is executed once before the mapper function begins (optional).

final
A function that is executed once after the reducer function completes (optional).

job.name
A descriptive name that you can use to track the progress of the MapReduce job instead of the automatically generated job name (optional).

config
Sets the configuration parameters for the MapReduce job (optional).
This argument is an instance of the mapred.config class, and thus it has this format:

    config = new("mapred.config", param1, param2,...)

See "ORCH mapred.config Class" for a description of this class.
Oracle R Connector for Hadoop does not support mapper-only jobs. To achieve the same effect, use orch.keyvals as the reducer body, which passes the mapper output through unchanged. See the example in orch.keyvals.
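For illustration, the workaround can be sketched as follows. This is a sketch only: it assumes a running ORCH session and a hypothetical HDFS data set named ontime_R that has already been loaded.

```r
# Sketch: a mapper-only filter emulated with orch.keyvals as the reducer body.
# Assumes 'ontime_R' is already loaded into HDFS (hypothetical data set).
dfs <- hdfs.attach('ontime_R')
res <- hadoop.exec(
    dfs,
    mapper = function(key, ontime) {
        # Keep only the SFO records; no aggregation is needed
        if (key == 'SFO') {
            keyval(key, ontime)
        }
    },
    # orch.keyvals passes the mapper output through unchanged,
    # standing in for the unsupported mapper-only mode
    reducer = function(key, vals) {
        orch.keyvals(key, vals)
    }
)
```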
This function provides more control over the data flow than the hadoop.run function. You must use hadoop.exec when chaining several mappers and reducers in a pipeline, because the data does not leave HDFS. The results are stored in HDFS files.
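Because hadoop.exec returns an HDFS object identifier, the result of one job can be passed directly as the input of the next, so intermediate data never returns to the R client. A sketch of such a pipeline, with placeholder mapper and reducer bodies (dfs.input is a hypothetical attached input):

```r
# Sketch: chaining two MapReduce jobs; the intermediate result stays in HDFS.
# The mapper and reducer bodies are placeholders, not a real workload.
step1 <- hadoop.exec(
    dfs.input,                                  # hypothetical attached input
    mapper  = function(k, v) keyval(k, v),
    reducer = function(k, vv) keyval(k, length(vv))
)
# step1 is an HDFS object identifier, so it can be passed straight
# to the next job without pulling data back into the R client
step2 <- hadoop.exec(
    step1,
    mapper  = function(k, v) keyval(NULL, v),
    reducer = function(k, vv) keyval(k, sum(unlist(vv)))
)
print(hdfs.get(step2))   # fetch the final results into the client only at the end
```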
This sample script uses hdfs.attach to obtain the object identifier of a small, sample data file in HDFS named ontime_R. The MapReduce function computes the average arrival delay of flights into San Francisco International Airport (SFO).
    dfs <- hdfs.attach('ontime_R')
    res <- NULL
    res <- hadoop.exec(
        dfs,
        mapper = function(key, ontime) {
            if (key == 'SFO') {
                keyval(key, ontime)
            }
        },
        reducer = function(key, vals) {
            sumAD <- 0
            count <- 0
            for (x in vals) {
                if (!is.na(x$ARRDELAY)) {
                    sumAD <- sumAD + x$ARRDELAY
                    count <- count + 1
                }
            }
            res <- sumAD / count
            keyval(key, res)
        }
    )
After the script runs, the res variable identifies the location of the results in an HDFS file named /user/oracle/xq/orch3d0b8218:

    R> res
    [1] "/user/oracle/xq/orch3d0b8218"
    attr(,"dfs.id")
    [1] TRUE
    R> print(hdfs.get(res))
      val1     val2
    1  SFO 27.05804
This code fragment is extracted from example-kmeans.R. The export option makes the ncenters variable from the client R session available to the mapper function. The config options provide a MapReduce job name of k-means.1 and specify a data frame as the mapper output format.
    mapf <- data.frame(key=0, val1=0, val2=0)
    dfs.points <- hdfs.put(points)
    dfs.centers <- hadoop.exec(
        dfs.id = dfs.points,
        mapper = function(k, v) {
            keyval(sample(1:ncenters, 1), v)
        },
        reducer = function(k, vv) {
            vv <- sapply(vv, unlist)
            keyval(NULL, c(mean(vv[1,]), mean(vv[2,])))
        },
        export = orch.export(ncenters),
        config = new("mapred.config",
                     job.name   = "k-means.1",
                     map.output = mapf)
    )