Hadoopエンジンを起動し、実行用にマッパー、リデューサおよびコンバイナR関数を送信します。まず、データをHDFSにロードする必要があります。
引数
処理するデータを含む、HDFS内のファイルの名前。ファイル名には、現在のパスに絶対的または相対的なパスを使用できます。
R言語で記述されたマッパー関数の名前。
R言語で記述されたリデューサ関数の名前(オプション)。
このリリースではサポートされません。
マッパー、リデューサまたはコンバイナ関数によって参照されている、現在のR環境からエクスポートされるRオブジェクトの名前(オプション)。
マッパー関数の開始前に一度実行される関数(オプション)。
リデューサ関数の完了後に一度実行される関数(オプション)。
MapReduceジョブの構成パラメータを設定します(オプション)。
この引数は、mapred.configクラスのインスタンスであるため、次の形式になります。
config = new("mapred.config", param1, param2,...)
このクラスの説明については、「ORCH mapred.configクラス」を参照してください。
使用上の注意
Oracle R Connector for Hadoopでは、マッパーのみのジョブはサポートされません。orch.keyvalsをリデューサの本文として使用します。orch.keyvalsの例を参照してください。
この関数は、hadoop.run関数よりもさらに正確にデータ・フローを制御します。複数のマッパーとリデューサをパイプラインでつなぐ場合、結果がHDFS内に保持されるため、hadoop.execを使用する必要があります。結果はHDFSファイルに格納されます。
例
このサンプル・スクリプトでは、hdfs.attachを使用して、ontime_Rという小規模なサンプル・データ・ファイルのオブジェクト識別子をHDFSで取得します。
MapReduc関数は、サンフランシスコ国際空港(SFO)に到着する定時運行数をカウントします。
dfs <- hdfs.attach('ontime_R')
res <- NULL
res <- hadoop.exec(
dfs,
mapper = function(key, ontime) {
if (key == 'SFO') {
keyval(key, ontime)
}
},
reducer = function(key, vals) {
sumAD <- 0
count <- 0
for (x in vals) {
if (!is.na(x$ARRDELAY)) {sumAD <- sumAD + x$ARRDELAY; count <- count + 1}
}
res <- sumAD / count
keyval(key, res)
}
)
スクリプトの実行後、res変数は、/user/oracle/xq/orch3d0b8218というHDFSファイルで、結果の場所を特定します。
R> res [1] "/user/oracle/xq/orch3d0b8218" attr(,"dfs.id") [1] TRUE R> print(hdfs.get(res)) val1 val2 1 SFO 27.05804
このコード・フラグメントはexample-kmeans.Rから抽出されます。エクスポート・オプションでは、HDFSファイルからエクスポートされるncenters生成データセットの場所を特定します。configオプションでは、k-means.1のMapReduceジョブ名と、データ・フレームのマッパー出力形式が提供されます。
mapf <- data.frame(key=0, val1=0, val2=0)
dfs.points <- hdfs.put(points)
dfs.centers <- hadoop.exec(
dfs.id = dfs.points,
mapper = function(k,v) {
keyval(sample(1:ncenters,1), v)
},
reducer = function(k,vv) {
vv <- sapply(vv, unlist)
keyval(NULL, c(mean(vv[1,]), mean(vv[2,])))
},
export = orch.export(ncenters),
config = new("mapred.config",
job.name = "k-means.1",
map.output = mapf)