Hadoop中map/reduce之WordCount实例——分解vs汇总

时间:2022-12-04 21:19:33

Hadoop中map/reduce之WordCount实例——分解vs汇总一般的hadoop的编写,主要是编写Map和Reduce函数,也就是所谓的Map分解,Reduce汇总的过程,WordCount就是其典型。

3、程序示例及注释

 package test;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MyWordCount {//这是自己写的一个WordCount和源代码中是一样的

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{

  //这种写法是新版本才有的,在hadoop-0.20-之后的写法

 //以前的版本mapper是个接口,hadoop-0.20的时候,将其升为抽象类,更便于程序的抒写。

//显然这里的Mapper<Object,Text,Text,IntWritable>是范型,其实是

  //Mapper<input_Key_Type,input_Value_Type,output_key_type,output_value_type>也就是借此规定map中用到的数据类型

//这几种类型除Object之外,其它是jdk中没有的,这是hadoop对它相应的jdk中数据类型的封装,

//这里的Text相当于jdk中的String,IntWritable相当于jdk的int类型,这样做的原因主要是为了hadoop的数据序化而做的。  
    private final static IntWritable one = new IntWritable(1);//声时一个IntWritable变量,作计数用,每出现一个key,给其一个value=1的值
    private Text word = new Text();                                     //用来暂存map输出中的key值,Text类型的,故有此声明
    public void map(Object key, Text value, Context context//这里就是map函数,也用到了范型,它是和Mapper抽象类中的相对应的,此                                                                                       处//的Object key,Text value的类型和上边的Object,Text是相对应的,而且最好一                                                                                        //       样,不然的话,多数情况运行时会报错。
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());//Hadoop读入的value是以行为单位的,其key为该行所对应的行号

//因为我们要计算每个单词的数目,默认以空格作为间隔,故用StringTokenizer辅助做一下字符串的拆分,也可以用string.split("")来作。
      while (itr.hasMoreTokens()) {//遍历一下每行字符串中的单词,
        word.set(itr.nextToken());//出现一个单词就给它设成一个key并将其值设为1
        context.write(word, one);//输出设成的key/value值。

//以上就是打散的过程
      }
    }
  }
  public static class IntSumReducer                                    //reduce所在的静态类
       extends Reducer<Text,IntWritable,Text,IntWritable> {//这里和Map中的作用是一样的,设定输入/输出的值的类型
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {//由于map的打散,这里会得到如,{key,values}={"hello",{1,1,1,1,1,1,....}},这样的集合
        sum += val.get();               //这里需要逐一将它们的value取出来予以相加,取得总的出现次数,即为汇和
      }
      result.set(sum);                  //将values的和取得,并设成result对应的值
      context.write(key, result);  //此时的key即为map打散之后输出的key,没有变化,变化的时result,以前得到的是一个数字的集合,此时已                                              经//给算出和了,并做为key/value输出。
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();                      //取得系统的参数
    if (args.length != 2) {                                                      //判断一下命令行输入路径/输出路径是否齐全,即是否为两个参数
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);                                                            //若非两个参数,即退出
    }
    Job job = new Job(conf, "My Word Count,Ha Ha ~");//此程序的执行,在hadoop看来是一个Job,故进行初始化job操作
    job.setJarByClass(MyWordCount.class);                  //可以认为成,此程序要执行MyWordCount.class这个字节码文件
    job.setMapperClass(TokenizerMapper.class);          //在这个job中,我用TokenizerMapper这个类的map函数
    job.setReducerClass(IntSumReducer.class);            //在这个job中,我用IntSumReducer这个类的reduce函数
    job.setOutputKeyClass(Text.class);                           //在reduce的输出时,key的输出类型为Text
    job.setOutputValueClass(IntWritable.class);              //在reduce的输出时,value的输出类型为IntWritable
    FileInputFormat.addInputPath(job, new Path(args[0]));//初始化要计算word的文件的路径
    FileOutputFormat.setOutputPath(job, new Path(args[1]));//初始化要计算word的文件的之后的结果的输出路径
    System.exit(job.waitForCompletion(true) ? 0 : 1);          

//这里就是真正的去提交job到hadoop上去执行了,意思是指如果这个job真正的执行完了则主函数退出了,若没有真正的执行完就退出了,

//则为非法退出
  }
}