mapreduce入门之wordcount注释详解

时间:2021-10-12 11:40:56

mapreduce版本:0.2.0之前

说明:  

  该注释为之前学习时找到的一篇,现在只是在入门以后对该注释做了一些修正以及添加。

  由于版本问题,该代码并没有在集群环境中运行,只将其做为理解mapreduce的参考吧。

  切记,该版本是0.2.0之前的版本,请分辨清楚!

正文:

  

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat; public class WordCount
{
//Map类继承自MapReduceBase,并且实现了Mapper接口,此接口是一个规范类型.
//它有4种形式的参数,分别用来指定map的输入key、value值类型,输出key、value值类型
public static class Map
extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text(); //实现map方法,对输入值进行处理。(此处用来去掉空格)
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException
{
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens())
{
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
} /*
//Reduce类也是继承自MapReduceBase的,需要实现Reducer接口。
//Reduce类以map的输出作为输入,因此Reduce的输入类型是<Text,Intwritable>。
//而Reduce的输出是单词和它的数目,因此,它的输出类型是<Text,IntWritable>。
//Reduce类也要实现reduce方法,在此方法中,reduce函数将输入的key值作为输出的key值,然后将获得多个value值加起来,作为输出的值。
*/
public static class Reduce
extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable>
{
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException
{
int sum = 0;
while (values.hasNext())
{
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
} public static void main(String[] args) throws Exception
{ //1.用JobConf类对 MapReduce job进行初始化
JobConf conf = new JobConf(WordCount.class);
// 调用setJobName()方法命名这个Job
conf.setJobName("wordcount"); //setup2:设置Job输出结果<key,value>的中key和value数据类型,因为结果是<单词,个数>
//所以key设置为"Text"类型,相当于Java中String类型。
conf.setOutputKeyClass(Text.class);
//Value设置为"IntWritable",相当于Java中的int类型。
conf.setOutputValueClass(IntWritable.class); //setup3:指定job的MapReduce,以及combiner
//设置Job处理的Map(拆分)
conf.setMapperClass(Map.class);
//设置Job处理的Combiner(中间结果合并,这里用Reduce类来进行Map产生的中间结果合并,避免给网络数据传输产生压力。)
也可以不用设置(已默认)
conf.setCombinerClass(Reduce.class);
//设置Job处理的Reduce(合并)
conf.setReducerClass(Reduce.class); //指定输入输出路径,可在项目上右键->Run As->Run Configuration->arguments->program arguments中配置
即为main(String[] args)中String[] args赋值
//指定InputPaths
eg:hdfs://master:9000/input1/
FileInputFormat.setInputPaths(conf, new Path(args[0]));
//指定outputPaths
eg:hdfs://master:9000/input1/
FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf);
}
}