Hadoopエンジンを起動し、実行用にマッパー、リデューサおよびコンバイナR関数を送信します。データがまだHDFSに格納されていない場合、hadoop.run
は、まずデータをHDFSにコピーします。
使用方法
hadoop.run( data, mapper, reducer, combiner, export, init, final, job.name, config)
引数
R言語で記述されたマッパー関数の名前。
R言語で記述されたリデューサ関数の名前(オプション)。
このリリースではサポートされません。
エクスポートされるRオブジェクトの名前。
マッパー関数の開始前に一度実行される関数(オプション)。
リデューサ関数の完了後に一度実行される関数(オプション)。
MapReduceジョブの構成パラメータを設定します(オプション)。
この引数は、mapred.config
クラスのインスタンスであるため、次の形式になります。
config = new("mapred.config", param1, param2,...
このクラスの説明については、「ORCH mapred.configクラス」を参照してください。
使用上の注意
Oracle R Connector for Hadoopでは、マッパーのみのジョブはサポートされません。orch.keyvalsをリデューサの本文として使用します。orch.keyvals
の例を参照してください。
hadoop.run
関数は、HDFSから入力データのソースへ結果を返します。たとえば、HDFS入力データの結果はHDFSに保持され、ore.frame
入力データの結果はOracle Databaseにコピーされます。
戻り値
入力データと同じ形式のオブジェクト
関連項目
「hadoop.exec」
、「orch.dryrun」
、「orch.keyvals」
例
このサンプル・スクリプトでは、hdfs.attach
を使用して、ontime_Rという小規模なサンプル・データ・ファイルのオブジェクト識別子をHDFSで取得します。
MapReduc関数は、サンフランシスコ国際空港(SFO)に到着する定時運行数をカウントします。
dfs <- hdfs.attach('ontime_R') res <- NULL res <- hadoop.run( dfs, mapper = function(key, ontime) { if (key == 'SFO') { keyval(key, ontime) } }, reducer = function(key, vals) { sumAD <- 0 count <- 0 for (x in vals) { if (!is.na(x$ARRDELAY)) {sumAD <- sumAD + x$ARRDELAY; count <- count + 1} } res <- sumAD / count keyval(key, res) } )
スクリプトの実行後、結果の場所は、res
変数によって/user/oracle/xq/orch3d0b8218というHDFSファイルで特定されます。
R> res [1] "/user/oracle/xq/orch3d0b8218" attr(,"dfs.id") [1] TRUE R> print(hdfs.get(res)) val1 val2 1 SFO 27.05804
次に例を示します。
hadoop.run(x, mapper = function(k,v) { orch.keyval(k, v+1) # increment all values }, reducer = function(k, vv) { orch.keyvals(k, vv) # pass-through }