MapReduce to implement Combiner in Hadoop - Java @ Desk

Thursday, January 1, 2015

MapReduce to implement Combiner in Hadoop



Combiner
A Combiner is a local aggregation function applied to repeated keys produced by the same map task. It is suitable for associative operations such as sum, count, and max. Because the combiner shrinks the intermediate data shipped to the reducers, it is purely an optimization: Hadoop makes no guarantee about how many times (if at all) it will invoke the combiner for a particular map output record.
Set the combiner in the driver class by calling the "job.setCombinerClass(Combiner.class)" method.
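To see why only associative (and commutative) operations are combiner-safe, here is a minimal plain-Java sketch (no Hadoop dependency, names are illustrative): computing the max in two stages, as a combiner would, gives the same answer as computing it in one pass, which is why Hadoop can run the combiner any number of times.

```java
import java.util.Arrays;

public class CombinerSafetyDemo {
    public static void main(String[] args) {
        int[] temps = {42, 55, 43, 0};

        // Single-pass max, as if one reducer saw every value directly.
        int direct = Arrays.stream(temps).max().getAsInt();

        // Two-stage max: each "map side" combines its own partial
        // output first, then the reducer combines the partial maxima.
        int partial1 = Math.max(42, 55);   // combiner on mapper 1
        int partial2 = Math.max(43, 0);    // combiner on mapper 2
        int twoStage = Math.max(partial1, partial2);

        // Because max is associative and commutative, both paths agree.
        System.out.println(direct == twoStage);  // true
    }
}
```

An operation like arithmetic mean would fail this test, which is why it cannot be used directly as a combiner.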

Program
We have climate data in the following format.
1950 55
1950 45
1940 40
1950 50
1950 55
1970 45
1950 40
... and so on.

The data is tab separated; each line holds a year and a temperature recorded in that year. The task is to find the maximum temperature recorded in each year.
Here the reducer and the combiner are identical: both take the maximum of the values for a key. So we can reuse the reduce function as the combiner to find the max temperature in each mapper's output. For example:
Input records: 1950 42, 1950 55, 1960 55, 1950 43, 1950 0
1st mapper emits: (1950, 42), (1950, 55)
2nd mapper emits: (1950, 43), (1950, 0)
(The single 1960 record is keyed separately and is not shown below.)

Applying the combiner to each mapper's output:
MAX((1950, 42), (1950, 55)) = (1950, 55)
MAX((1950, 43), (1950, 0)) = (1950, 43)
So the final input to the reducer is (1950, [55, 43]), and it emits (1950, 55).
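The flow above can be simulated end to end in plain Java (no Hadoop dependency; the `max` helper mirrors the loop used by the SumCombiner class later in this post):

```java
import java.util.Arrays;
import java.util.List;

public class CombinerFlowDemo {
    // Combine/reduce step: max of the values for one key, written
    // the same way as the SumCombiner.reduce loop in this post.
    static int max(List<Integer> values) {
        int maxValue = Integer.MIN_VALUE;
        for (int v : values) {
            maxValue = Math.max(maxValue, v);
        }
        return maxValue;
    }

    public static void main(String[] args) {
        // Intermediate values for key "1950" on each mapper.
        List<Integer> mapper1 = Arrays.asList(42, 55);
        List<Integer> mapper2 = Arrays.asList(43, 0);

        // The combiner runs locally on each mapper's output.
        int combined1 = max(mapper1);  // 55
        int combined2 = max(mapper2);  // 43

        // The reducer then sees (1950, [55, 43]) instead of all four values.
        List<Integer> reducerInput = Arrays.asList(combined1, combined2);
        System.out.println("1950\t" + max(reducerInput));  // 1950	55
    }
}
```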


Let’s first analyze the driver class.
Note that we are using the KeyValueTextInputFormat input format.
MaxtemperatureDriver.java
public class MaxtemperatureDriver extends Configured implements Tool {

 @Override
 public int run(String[] args) throws Exception {
  if (args.length != 2) {
  System.out.printf("Usage: MaxtemperatureDriver <input dir> <output dir>\n");
  return -1;
 }

 // Create the Job object, using getConf() to obtain the Configuration.
 Job job = Job.getInstance(getConf());
 job.setJarByClass(MaxtemperatureDriver.class);
 job.setJobName("Max Temperature Driver");
 // Input or Output path
 FileInputFormat.setInputPaths(job, new Path(args[0]));
 FileOutputFormat.setOutputPath(job, new Path(args[1]));

 // specifying KeyValue Class
 job.setInputFormatClass(KeyValueTextInputFormat.class);
 job.setOutputKeyClass(Text.class);
 job.setOutputValueClass(IntWritable.class);

 //Setup MapReduce Job
 job.setMapperClass(MaxtemperatureMapper.class);
 job.setCombinerClass(SumCombiner.class); // the combiner reuses the reducer's max logic
 job.setReducerClass(SumCombiner.class);

 if (job.getCombinerClass() == null) {
   throw new Exception("Combiner not set");
 }

 boolean success = job.waitForCompletion(true);
 return success ? 0 : 1;
 }

 public static void main(String[] args) throws Exception {
  int exitCode = ToolRunner.run(new Configuration(), new MaxtemperatureDriver(), args);
  System.exit(exitCode);
 }
}


MaxtemperatureMapper.java
public class MaxtemperatureMapper extends Mapper<Text, Text, Text, IntWritable> {

 /*
 * The map method runs once for each line of input. Because the job
 * uses KeyValueTextInputFormat, both the key and the value arrive as
 * Text: the key is the year and the value is the temperature.
 */
 @Override
 public void map(Text key, Text value, Context context)
 throws IOException, InterruptedException {

   /*
   * The value holds the temperature recorded for the year in the key.
   * Convert it from Text to an int and emit (year, temperature).
   */
   String temperature = value.toString().trim();
   if (temperature.length() > 0) {
    context.write(key, new IntWritable(Integer.parseInt(temperature)));
   }
  }
 }
}
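Why does the mapper receive a Text key rather than the usual LongWritable byte offset? KeyValueTextInputFormat splits each input line at the first separator character (a tab by default, which matches our data). A minimal plain-string sketch of that splitting behavior:

```java
public class KeyValueSplitDemo {
    public static void main(String[] args) {
        // KeyValueTextInputFormat splits each line at the first tab
        // (the default separator), so the mapper receives the year as
        // the key and the temperature as the value, both as Text.
        String line = "1950\t55";

        int sep = line.indexOf('\t');
        String key = line.substring(0, sep);      // "1950"
        String value = line.substring(sep + 1);   // "55"

        System.out.println(key + " -> " + value); // 1950 -> 55
    }
}
```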


SumCombiner.java
public class SumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

 @Override
 public void reduce(Text key, Iterable<IntWritable> values, Context context)
   throws IOException, InterruptedException {
  int maxValue = Integer.MIN_VALUE;
  for (IntWritable value : values) {
   maxValue = Math.max(maxValue, value.get());
  }
  context.write(key, new IntWritable(maxValue));
 }
}


Run the job using the command below:
$ hadoop jar MaxTempCombiner.jar MaxtemperatureDriver InputPath OutputPath



This post is written by
Shashank Rai - LinkedIn, Google+
He is a freelance writer who loves to explore the latest features in Java technology.







