Hadoopエンジンを起動し、実行用にマッパー、リデューサおよびコンバイナR関数を送信します。まず、データをHDFSにロードする必要があります。
引数
処理するデータを含む、HDFS内のファイルの名前。ファイル名には、現在のパスに絶対的または相対的なパスを使用できます。
R言語で記述されたマッパー関数の名前。
R言語で記述されたリデューサ関数の名前(オプション)。
このリリースではサポートされません。
マッパー、リデューサまたはコンバイナ関数によって参照されている、現在のR環境からエクスポートされるRオブジェクトの名前(オプション)。
マッパー関数の開始前に一度実行される関数(オプション)。
リデューサ関数の完了後に一度実行される関数(オプション)。
MapReduceジョブの構成パラメータを設定します(オプション)。
この引数は、mapred.config
クラスのインスタンスであるため、次の形式になります。
config = new("mapred.config", param1, param2,...)
このクラスの説明については、「ORCH mapred.configクラス」を参照してください。
使用上の注意
Oracle R Advanced Analytics for Hadoopでは、マッパーのみのジョブはサポートされません。orch.keyvalsをリデューサの本文として使用します。orch.keyvals
の例を参照してください。
この関数は、hadoop.run
関数よりもさらに正確にデータ・フローを制御します。複数のマッパーとリデューサをパイプラインでつなぐ場合、結果がHDFS内に保持されるため、hadoop.exec
を使用する必要があります。結果はHDFSファイルに格納されます。
例
このサンプル・スクリプトでは、hdfs.attach
を使用して、ontime_Rという小規模なサンプル・データ・ファイルのオブジェクト識別子をHDFSで取得します。
MapReduc関数は、サンフランシスコ国際空港(SFO)に到着する定時運行数をカウントします。
dfs <- hdfs.attach('ontime_R') res <- NULL res <- hadoop.exec( dfs, mapper = function(key, ontime) { if (key == 'SFO') { keyval(key, ontime) } }, reducer = function(key, vals) { sumAD <- 0 count <- 0 for (x in vals) { if (!is.na(x$ARRDELAY)) {sumAD <- sumAD + x$ARRDELAY; count <- count + 1} } res <- sumAD / count keyval(key, res) } )
スクリプトの実行後、res
変数は、/user/oracle/xq/orch3d0b8218というHDFSファイルで、結果の場所を特定します。
R> res [1] "/user/oracle/xq/orch3d0b8218" attr(,"dfs.id") [1] TRUE R> print(hdfs.get(res)) val1 val2 1 SFO 27.05804
このコード・フラグメントはexample-kmeans.Rから抽出されます。エクスポート・オプションでは、HDFSファイルからエクスポートされるncenters
生成データセットの場所を特定します。config
オプションでは、k-means.1
のMapReduceジョブ名と、データ・フレームのマッパー出力形式が提供されます。
mapf <- data.frame(key=0, val1=0, val2=0) dfs.points <- hdfs.put(points) dfs.centers <- hadoop.exec( dfs.id = dfs.points, mapper = function(k,v) { keyval(sample(1:ncenters,1), v) }, reducer = function(k,vv) { vv <- sapply(vv, unlist) keyval(NULL, c(mean(vv[1,]), mean(vv[2,]))) }, export = orch.export(ncenters), config = new("mapred.config", job.name = "k-means.1", map.output = mapf)