Hadoop - Find Number of Messages from Every Person in a Conversation Using MapReduce - Java @ Desk

Monday, November 24, 2014

Hadoop - Find Number of Messages from Every Person in a Conversation Using MapReduce



To start, suppose we have some sample data, for example a Skype conversation between Mike and Anderson, and let's say they were discussing the Hadoop technology.

For example, the conversation text will be in the following form:

[03:04 pm] Mike:hi!!
[03:05 pm] Anderson:hello.
[03:06 pm] Mike:how are you?
[03:07 pm] Anderson:it's great. doing well. it is very interesting working on hadoop.
[03:09 pm] Mike:hoo..what is this hadoop?
[03:10 pm] Mike:is it a technology or a platform?
[03:11 pm] Anderson:yes, this is a technology used for big data processing and storage.
[03:13 pm] Mike:i think it's very good because nowadays data is growing a lot. for processing of this huge amount of data, previous technologies like oracle and sql will not be good enough.
[03:14 pm] Mike:what are the sub projects it has Anderson?
[03:15 pm] Anderson:hadoop consists of MapReduce and HDFS, and its ecosystem includes projects like hive, pig, sqoop, oozie, hbase and many more.
[03:16 pm] Mike:thank you Anderson for the great information about hadoop.
[03:17 pm] Mike:bye,see you tomorrow.
[03:18 pm] Anderson:bye, take care.

Now, we have some tasks to perform on this data.


Task 1: Find the number of messages from Mike and Anderson

Explanation: For this task we use the sender names "Mike" and "Anderson" as keys and emit a count of 1 for each message they send; the reducer then sums these counts. The result is the number of messages from each person.
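
Before walking through the full program, here is a minimal sketch of the parsing step on its own, assuming every line has the form "[hh:mm am/pm] Name:message" as in the sample above (plain Java, no Hadoop required; the variable names are illustrative):

// Split on "]" to drop the timestamp, then on the first ":" to isolate the sender.
String line = "[03:04 pm] Mike:hi!!";
String afterTime = line.split("]")[1];             // " Mike:hi!!"
String sender = afterTime.split(":", 2)[0].trim(); // "Mike"
System.out.println(sender);                        // prints: Mike

The limit of 2 in split(":", 2) keeps any colons inside the message text from splitting the line further.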

package messages;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class NoOfMessages {

    public static class MapForMessages extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text name = new Text();

        @Override
        public void map(LongWritable key, Text val, Context con)
                throws IOException, InterruptedException {
            // Each line looks like "[03:04 pm] Mike:hi!!".
            String line = val.toString();
            String[] parts = line.split("]");
            if (parts.length < 2) {
                return; // skip malformed or empty lines
            }
            // The sender is everything between "]" and the first ":".
            String sender = parts[1].split(":", 2)[0].trim();
            name.set(sender);
            con.write(name, ONE); // emit (name, 1) for every message
        }
    }

    public static class ReduceForMessages extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text name, Iterable<IntWritable> values, Context con)
                throws IOException, InterruptedException {
            // Sum the 1s emitted by the mapper to get the message count per person.
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            con.write(name, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration c = new Configuration();
        String[] files = new GenericOptionsParser(c, args).getRemainingArgs();
        Path input = new Path(files[0]);
        Path output = new Path(files[1]);
        Job j = Job.getInstance(c, "NoOfMessages");
        j.setJarByClass(NoOfMessages.class);
        j.setMapperClass(MapForMessages.class);
        // Summing is associative and commutative, so the reducer can double as a combiner.
        j.setCombinerClass(ReduceForMessages.class);
        j.setReducerClass(ReduceForMessages.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(j, input);
        FileOutputFormat.setOutputPath(j, output);
        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}
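
To run the job, compile the class, package it into a jar, and submit it with the hadoop command, passing the conversation file and an output directory as arguments. The jar name and HDFS paths below are illustrative placeholders:

hadoop jar messages.jar messages.NoOfMessages /user/hadoop/conversation.txt /user/hadoop/messages-out
hadoop fs -cat /user/hadoop/messages-out/part-r-00000

The second command prints the reducer output; part-r-00000 is the default output file name for a single-reducer job.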


Sample output (MapReduce sorts the keys, and key and count are tab-separated):
Anderson	5
Mike	8





