Hadoop MapReduce是一个用于处理大量数据的编程模型和一个相应的实现框架。MapReduce作业通常分为两个阶段:Map阶段和Reduce阶段。
Map阶段
在Map阶段,你编写的Map函数会对输入数据进行处理。每个输入数据片段(例如一行文本)都会被Map函数处理,并产生中间键值对。
以单词计数为例,如果输入数据是一句话,如 “hello world hello”,Map函数会产生以下中间键值对:
(hello, 1) (world, 1) (hello, 1)
Reduce阶段
在Reduce阶段,所有Map阶段输出的中间键值对会被组合起来。具有相同键的值会一起处理。Reduce函数接收一个键和与该键相关的值的集合,然后合并这些值。
在单词计数的例子中,“hello” 这个键有两个值 “1”,Reduce函数将它们加起来,得到最终结果:
(hello, 2) (world, 1)
Hadoop MapReduce的工作流程
- 输入:输入数据被分割成固定大小的片段,每个片段由一个Map任务处理。
- Map任务:每个Map任务读取输入片段,并执行Map函数,输出中间键值对。
- Shuffle和Sort:系统会自动将所有Map任务的输出按键排序,并将相同键的值发送到同一个Reduce任务。
- Reduce任务:每个Reduce任务处理一组具有相同键的值,执行Reduce函数,并输出最终结果。
- 输出:Reduce任务的输出被写入到文件系统(通常是HDFS)。
示例代码
以下是一个简单的MapReduce程序示例,该程序计算文本文件中各个单词的出现频率。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
// Map类
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
// Reduce类
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
// 主方法,配置MapReduce作业
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
让我们对上面给出的WordCount MapReduce示例代码进行更加深入的分析:
主要组件
-
TokenizerMapper
: 这是自定义的Mapper类,它继承自Hadoop的Mapper
类。它的作用是读取文本数据,分割成单词,并为每个单词输出一个键值对,键是单词本身,值是整数1。 -
IntSumReducer
: 这是自定义的Reducer类,继承自Hadoop的Reducer
类。它的作用是将Mapper输出的键值对中,相同键(单词)的值(出现次数)进行累加,得到每个单词的总出现次数。
Mapper内部逻辑TokenizerMapper
的map
方法接收输入数据,每次调用处理一行文本。map
方法使用StringTokenizer
将文本行分割成单词,并为每个单词输出一个(Text, IntWritable)
键值对,其中Text
是单词,IntWritable
是数字1。
Reducer内部逻辑IntSumReducer
的reduce
方法接收所有具有相同键的值的集合(Iterable<IntWritable>
),并对这些值进行累加,得到这个键(单词)的总出现次数。之后,它输出一个(Text, IntWritable)
键值对,其中Text
是单词,IntWritable
是该单词的总出现次数。
Job配置main
方法中配置了MapReduce作业:
-
setJarByClass
: 设置作业的JAR文件,这样Hadoop可以将其发送到集群中的节点上执行。 -
setMapperClass
: 设置Map阶段使用的Mapper类。 -
setCombinerClass
: 设置Combiner类,用于在Map阶段之后和Reduce阶段之前对输出进行局部合并。在这个例子中,它和Reducer类是相同的,因为单词计数的合并操作是可交换的和可结合的。 -
setReducerClass
: 设置Reduce阶段使用的Reducer类。 -
setOutputKeyClass
: 设置作业输出的键类型。 -
setOutputValueClass
: 设置作业输出的值类型。 -
FileInputFormat.addInputPath
: 设置输入数据的路径。 -
FileOutputFormat.setOutputPath
: 设置输出数据的路径。
在MapReduce中,Combiner的作用是对Map阶段的输出进行局部合并,以减少网络传输的数据量。Reducer的作用是对所有Map任务的输出进行全局合并,得到最终结果。
WordCount的例子中,Combiner和Reducer可以是同一个类,因为它们执行的操作是相同的:简单的整数求和。这个操作是可结合的(combine-able)和可交换的(commutative),意味着无论操作的顺序如何,或者如何分组执行这些操作,最终结果都是一样的。
运行作业job.waitForCompletion(true)
方法会提交作业到Hadoop集群并等待其完成。它返回一个布尔值,表示作业是否成功完成。
输入和输出
- 输入: MapReduce作业的输入通常是文本文件或其他文件格式,存储在HDFS上。在这个例子中,每一行文本都会被作为一个记录传递给一个
TokenizerMapper
实例。 - 输出: 输出是处理过后的结果,通常也是存储在HDFS上的文本文件。在这个例子中,输出文件包含了单词和对应的出现次数。
注意事项
- 数据类型: 在MapReduce中使用的键值对数据类型必须实现
Writable
接口,因此Hadoop提供了一些基本数据类型的Writable
版本,例如IntWritable
用于整数,Text
用于字符串。 - 性能优化: Combiner可以显著减少Map和Reduce之间数据传输量,因为它在Map端进行局部合并,减少了网络传输的数据。
- 容错与可扩展性: MapReduce框架本身具有处理节点失败的能力。如果某个节点执行失败,作业会被重新调度到另一个节点上执行。
这个WordCount程序是学习MapReduce的一个经典入门示例,它展示了MapReduce编程模型的基本概念:分解任务、处理数据并在集群中并行执行。
要运行这个MapReduce作业,需要将代码打包成一个JAR文件,并将其提交到Hadoop集群上运行。命令行参数提供输入和输出路径。例如:
hadoop jar wordcount.jar WordCount /input/path /output/path
这个例子展示了MapReduce编程模型的基本概念,实际的MapReduce作业可能会更复杂,包括自定义数据类型、多个Map和Reduce阶段、数据排序和分区等。