Wednesday, May 2, 2012

Hadoop Demo

Create MapReduce application:
  a. specify input and output locations
      Input/Output => <key,value> 
      Key and value classes must implement "Writable"; key classes must additionally implement "WritableComparable" so that the framework can sort them.
  b. specify map and reduce functions
  c. The job client submits the job and its configuration to the JobTracker.

Example1: WordCount
  public class WordCount{
         public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable{     
                private final static IntWritable one = new IntWritable(1); //value
                private Text word = new Text();//use word as key
                public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException{
                          String line = value.toString();
                          StringTokenizer tokenizer = new StringTokenizer(line);
                          while(tokenizer.hasMoreTokens()){
                                word.set(tokenizer.nextToken()); //set key
                                output.collect(word,one); //value will always be one
                          }
               }
        } 
        public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>{
               public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException{
                         int sum=0
                         //for values with the same key, sum up the value (1) and output <key, sum>
                         while(values.hasNext()){
                                 sum+=values.next().get(); 
                         }
                         output.collect(key,new IntWritable(sum));
               }
        }

        public static void main(String[] args) throws Exception{
              JobConf conf = new JobConf(WordCount.class);
              conf.setJobName("wordcount");
              conf.setOutputKeyClass(Text.class); //word
              conf.setOutputValueClass(IntWritable.class);//sum
              conf.setMapperClass(Map.class);
              //conf.setOutputKeyComparatorClass(Comparator.class));  //control the grouping of output
              conf.setCombinerClass(Reduce.class);
              conf.setReducerClass(Reduce.class);
              conf.setInputFormat(TextInputFormat.class);//textfile
              conf.setOutputFormat(TextOutputFormat.class);//textfile
              FileInputFormat.setInputPaths(conf, new Path(args[0]));//location of input file
              FileOutputFormat.setOutputPath(conf, new Path(args[1]));//location of output file
              JobClient.run(conf);
        }
   }


No comments:

Post a Comment