间Hadoop中map/reduce之WordCount实例——分解vs汇总一般的hadoop的编写,主要是编写Map和Reduce函数,也就是所谓的Map分解,Reduce汇总的过程,WordCount就是其典型。
3、程序示例及注释
package test;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class MyWordCount {//这是自己写的一个WordCount和源代码中是一样的
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
//这种写法是新版本才有的,在hadoop-0.20-之后的写法
//以前的版本mapper是个接口,hadoop-0.20的时候,将其升为抽象类,更便于程序的抒写。
//显然这里的Mapper<Object,Text,Text,IntWritable>是范型,其实是
//Mapper<input_Key_Type,input_Value_Type,output_key_type,output_value_type>也就是借此规定map中用到的数据类型
//这几种类型除Object之外,其它是jdk中没有的,这是hadoop对它相应的jdk中数据类型的封装,
//这里的Text相当于jdk中的String,IntWritable相当于jdk的int类型,这样做的原因主要是为了hadoop的数据序化而做的。
private final static IntWritable one = new IntWritable(1);//声时一个IntWritable变量,作计数用,每出现一个key,给其一个value=1的值
private Text word = new Text(); //用来暂存map输出中的key值,Text类型的,故有此声明
public void map(Object key, Text value, Context context//这里就是map函数,也用到了范型,它是和Mapper抽象类中的相对应的,此 处//的Object key,Text value的类型和上边的Object,Text是相对应的,而且最好一 // 样,不然的话,多数情况运行时会报错。
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());//Hadoop读入的value是以行为单位的,其key为该行所对应的行号
//因为我们要计算每个单词的数目,默认以空格作为间隔,故用StringTokenizer辅助做一下字符串的拆分,也可以用string.split("")来作。
while (itr.hasMoreTokens()) {//遍历一下每行字符串中的单词,
word.set(itr.nextToken());//出现一个单词就给它设成一个key并将其值设为1
context.write(word, one);//输出设成的key/value值。
//以上就是打散的过程
}
}
}
public static class IntSumReducer //reduce所在的静态类
extends Reducer<Text,IntWritable,Text,IntWritable> {//这里和Map中的作用是一样的,设定输入/输出的值的类型
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {//由于map的打散,这里会得到如,{key,values}={"hello",{1,1,1,1,1,1,....}},这样的集合
sum += val.get(); //这里需要逐一将它们的value取出来予以相加,取得总的出现次数,即为汇和
}
result.set(sum); //将values的和取得,并设成result对应的值
context.write(key, result); //此时的key即为map打散之后输出的key,没有变化,变化的时result,以前得到的是一个数字的集合,此时已 经//给算出和了,并做为key/value输出。
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(); //取得系统的参数
if (args.length != 2) { //判断一下命令行输入路径/输出路径是否齐全,即是否为两个参数
System.err.println("Usage: wordcount <in> <out>");
System.exit(2); //若非两个参数,即退出
}
Job job = new Job(conf, "My Word Count,Ha Ha ~");//此程序的执行,在hadoop看来是一个Job,故进行初始化job操作
job.setJarByClass(MyWordCount.class); //可以认为成,此程序要执行MyWordCount.class这个字节码文件
job.setMapperClass(TokenizerMapper.class); //在这个job中,我用TokenizerMapper这个类的map函数
job.setReducerClass(IntSumReducer.class); //在这个job中,我用IntSumReducer这个类的reduce函数
job.setOutputKeyClass(Text.class); //在reduce的输出时,key的输出类型为Text
job.setOutputValueClass(IntWritable.class); //在reduce的输出时,value的输出类型为IntWritable
FileInputFormat.addInputPath(job, new Path(args[0]));//初始化要计算word的文件的路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));//初始化要计算word的文件的之后的结果的输出路径
System.exit(job.waitForCompletion(true) ? 0 : 1);
//这里就是真正的去提交job到hadoop上去执行了,意思是指如果这个job真正的执行完了则主函数退出了,若没有真正的执行完就退出了,
//则为非法退出
}
}