Implementazione dell'applicazione MapReduce

Implementare un'applicazione MapReduce.

  1. Creare src\main\java\org\apache\hadoop\examples\wordcount.java.
  2. Copiare e incollare nel nuovo file il seguente codice Java.
    package org.apache.hadoop.examples;
     
    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
     
    /**
     * Simple Word count application that defines Mapper and Reducer classes that counts the words in a give input file
     */
    public class WordCount {
     
        public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable>{
     
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
     
        // map function is called for every line in the input file
        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
             word.set(itr.nextToken());
     
            // Update count as 1 for every word read from input line
             context.write(word, one);
            }
        }
    }
     
    public static class IntSumReducer
            extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();
     
       // reduce function is called with list of values sent by map function
        public void reduce(Text key, Iterable<IntWritable> values,
                            Context context) throws IOException, InterruptedException {
    
            // perform necessary aggregate operation and write result to the context
    
            int sum = 0;
            for (IntWritable val : values) {
              sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
     
    /**
     * Driver code to provide the mapper & reducer function classes for Hadoop Map Reduce framework
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
  3. Chiude il file.

Creazione del file JAR dell'applicazione

Eseguire mvn clean package dalla cartella radice del progetto per creare il file JAR dell'applicazione nella cartella di destinazione. Ad esempio, wordcountjava-1.0-SNAPSHOT.jar.

mvn clean package

Questo comando pulisce tutti gli artifact di build precedenti, scarica tutte le dipendenze non ancora installate, quindi crea e crea un package per l'applicazione.

Al termine del comando, la directory wordcountjava/target contiene un file denominato wordcountjava-1.0-SNAPSHOT.jar.

Nota

Il file wordcountjava-1.0-SNAPSHOT.jar è un uberjar che contiene non solo il job WordCount, ma anche le dipendenze richieste dal job in runtime.

Esecuzione del file JAR dell'applicazione MapReduce

Copiare il file JAR in qualsiasi nodo. Ad esempio:

- yarn jar /tmp/wordcountjava-1.0-SNAPSHOT.jar org.apache.hadoop.examples.WordCount /example/data/input.txt /example/data/wordcountout

Parametri:

  • /example/data/input.txt: percorso HDFS per il file di testo di input
  • /example/data/wordcountout: percorso HDFS per il risultato (output del riduttore)

Output di esempio:

 >> hdfs dfs -cat /example/data/wordcountout/* 

          zeal    1
          zelus   1
          zenith  2
Nota

Il file wordcountjava-1.0-SNAPSHOT.jar è un file JAR grasso se creato tramite maven-shade-plugin. In alternativa, utilizzare l'opzione -libjars per fornire JAR aggiuntivi al percorso della classe di job. In un cluster sicuro sono necessari kinit e i criteri Ranger appropriati prima di sottomettere il job.