Combiner
A combiner is a local aggregation function applied to repeated keys produced by the same map task. It suits associative and commutative operations such as sum, count, and max. Because it shrinks the intermediate data sent from mappers to reducers, it is purely an optimization: Hadoop does not guarantee how many times (if at all) it will call the combiner for a given map output record, so the job must produce the same result whether the combiner runs zero, one, or several times.
Set the combiner in the driver class by calling the "job.setCombinerClass(Combiner.class)" method.
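To make the "any number of calls" point concrete, here is a minimal plain-Java sketch (no Hadoop involved). The class name, the sample values, and the split into two "spills" are assumptions made for illustration only; the sketch simply checks that max yields the same answer whether it is combined zero, one, or two times.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration (no Hadoop): max gives the same answer however often it is combined. */
final class CombinerInvarianceSketch {

    // Stands in for both the combiner and the reducer: max over a group of values.
    static int max(List<Integer> values) {
        int result = Integer.MIN_VALUE;
        for (int v : values) {
            result = Math.max(result, v);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical values emitted by one map task for a single key.
        List<Integer> mapOutput = Arrays.asList(42, 55, 43, 0);

        // Combiner never runs: the reducer sees every raw value.
        int noCombine = max(mapOutput);

        // Combiner runs once over the whole map output, then the reducer runs.
        int combinedOnce = max(Arrays.asList(max(mapOutput)));

        // Combiner runs on two separate chunks, then the reducer merges the partial results.
        int combinedPerSpill = max(Arrays.asList(
                max(mapOutput.subList(0, 2)),
                max(mapOutput.subList(2, 4))));

        System.out.println(noCombine + " " + combinedOnce + " " + combinedPerSpill); // 55 55 55
    }
}
```

All three numbers agree, which is why max can safely be used as a combiner.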
Program
We have climate data in the following format.
1950	55
1950	45
1940	40
1950	50
1950	55
1970	45
1950	40
and so on.
The data is tab separated: each line holds a year and a temperature recorded in that year. We have to find the maximum global temperature recorded in each year.
The reducer and the combiner are identical here: taking a maximum is associative and commutative, so the reduce function can double as the combiner and find the maximum temperature in each mapper's output. For example, following the key 1950:
Input to the mappers = (1950 42), (1950 55), (1950 43), (1950 0)
1st mapper emits (1950 42), (1950 55)
2nd mapper emits (1950 43), (1950 0)
The combiner is then applied to each mapper's output:
MAX(1950 42, 1950 55) = (1950 55)
MAX(1950 43, 1950 0) = (1950 43)
The final input to the reducer is (1950, [55, 43]).
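The flow in this example can be traced with a small plain-Java simulation. This is only a sketch under stated assumptions: the class and method names (CombinerFlowSketch, combine, shuffle) are hypothetical, and ordinary collections stand in for Hadoop's map output, shuffle, and reduce phases.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java simulation of the example: two mappers, a per-mapper combiner, one reducer. */
final class CombinerFlowSketch {

    // Combiner: collapse one mapper's (year, temperature) pairs to a single max per year.
    static Map<String, Integer> combine(List<String[]> mapperOutput) {
        Map<String, Integer> maxPerYear = new TreeMap<>();
        for (String[] pair : mapperOutput) {
            maxPerYear.merge(pair[0], Integer.parseInt(pair[1]), Integer::max);
        }
        return maxPerYear;
    }

    // Shuffle: group the combined outputs of all mappers by year.
    static Map<String, List<Integer>> shuffle(List<Map<String, Integer>> combinedOutputs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map<String, Integer> output : combinedOutputs) {
            output.forEach((year, temp) ->
                    grouped.computeIfAbsent(year, y -> new ArrayList<>()).add(temp));
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String[]> mapper1 = Arrays.asList(
                new String[] {"1950", "42"}, new String[] {"1950", "55"});
        List<String[]> mapper2 = Arrays.asList(
                new String[] {"1950", "43"}, new String[] {"1950", "0"});

        Map<String, List<Integer>> reducerInput =
                shuffle(Arrays.asList(combine(mapper1), combine(mapper2)));
        System.out.println(reducerInput); // {1950=[55, 43]}

        // Reducer: the same max logic, applied to the already-combined values.
        reducerInput.forEach((year, temps) ->
                System.out.println(year + "\t" + Collections.max(temps))); // 1950	55
    }
}
```

The reducer receives two values for 1950 instead of four, which is the saving the combiner provides.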
Let’s first analyze the Driver class.
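Here we are using KeyValueTextInputFormat, which splits each input line at the first tab: the text before the tab (the year) becomes the key and the rest (the temperature) becomes the value, both of type Text.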
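As a rough picture of what the mapper receives, the following plain-Java sketch mimics the split at the first tab. It is not Hadoop's implementation; the class and method names are hypothetical, and the separator is assumed to be the default tab.

```java
/** Plain-Java sketch of splitting a line into key and value at the first tab. */
final class KeyValueSplitSketch {

    // Returns {key, value}; if the line has no tab, the whole line is the key and the value is empty.
    static String[] splitAtFirstTab(String line) {
        int pos = line.indexOf('\t');
        if (pos < 0) {
            return new String[] {line, ""};
        }
        return new String[] {line.substring(0, pos), line.substring(pos + 1)};
    }

    public static void main(String[] args) {
        String[] kv = splitAtFirstTab("1950\t55");
        System.out.println("key=" + kv[0] + " value=" + kv[1]); // key=1950 value=55
    }
}
```

With this input format the mapper does not need to parse the year itself; it only has to convert the value to an integer.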
MaxtemperatureDriver.java
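import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MaxtemperatureDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.printf("Usage: MaxtemperatureDriver <input dir> <output dir>\n");
            return -1;
        }

        // Create the job object, using getConf() to obtain the configuration.
        Job job = Job.getInstance(getConf());
        job.setJarByClass(MaxtemperatureDriver.class);
        job.setJobName("Max Temperature Driver");

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Read each line as a tab-separated (key, value) pair
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set up the MapReduce job. Despite its name, SumCombiner computes a maximum,
        // and the same class serves as both combiner and reducer.
        job.setMapperClass(MaxtemperatureMapper.class);
        job.setCombinerClass(SumCombiner.class);
        job.setReducerClass(SumCombiner.class);

        if (job.getCombinerClass() == null) {
            throw new Exception("Combiner not set");
        }

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new MaxtemperatureDriver(), args);
        System.exit(exitCode);
    }
}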
MaxtemperatureMapper.java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxtemperatureMapper extends Mapper<Text, Text, Text, IntWritable> {

    /*
     * The map method runs once for each line of the input file.
     * KeyValueTextInputFormat has already split the line at the first tab,
     * so the method receives the year as a Text key, the temperature as a
     * Text value, and a Context object.
     */
    @Override
    public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {

        // Convert the temperature field to a String and strip surrounding whitespace.
        String temperature = value.toString().trim();

        // Skip records that have no temperature field.
        if (temperature.length() > 0) {
            // Call the write method on the Context object to emit (year, temperature).
            // Parsing the whole field keeps the sign of negative temperatures.
            context.write(key, new IntWritable(Integer.parseInt(temperature)));
        }
    }
}
SumCombiner.java
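import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Despite its name, this class computes the maximum value per key.
// It is used as both the combiner and the reducer.
public class SumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }

        // Emit (year, maximum temperature seen for that year).
        context.write(key, new IntWritable(maxValue));
    }
}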
Run the job using the command below.
$ hadoop jar MaxTempCombiner.jar MaxtemperatureDriver InputPath OutputPath
This post is written by
Shashank Rai - Linkedin, Google+
He is a freelance writer, loves to explore latest features in Java technology.