Hadoop MapReduce解析

时间:2024-04-14 14:17:10

Hadoop MapReduce是一个用于处理大量数据的编程模型和一个相应的实现框架。MapReduce作业通常分为两个阶段:Map阶段和Reduce阶段。

Map阶段

在Map阶段,你编写的Map函数会对输入数据进行处理。每个输入数据片段(例如一行文本)都会被Map函数处理,并产生中间键值对。

以单词计数为例,如果输入数据是一句话,如 “hello world hello”,Map函数会产生以下中间键值对:

(hello, 1) (world, 1) (hello, 1) 

Reduce阶段

在Reduce阶段,所有Map阶段输出的中间键值对会被组合起来。具有相同键的值会一起处理。Reduce函数接收一个键和与该键相关的值的集合,然后合并这些值。

在单词计数的例子中,“hello” 这个键有两个值 “1”,Reduce函数将它们加起来,得到最终结果:

(hello, 2) (world, 1) 

Hadoop MapReduce的工作流程

  1. 输入:输入数据被分割成固定大小的片段,每个片段由一个Map任务处理。
  2. Map任务:每个Map任务读取输入片段,并执行Map函数,输出中间键值对。
  3. Shuffle和Sort:系统会自动将所有Map任务的输出按键排序,并将相同键的值发送到同一个Reduce任务。
  4. Reduce任务:每个Reduce任务处理一组具有相同键的值,执行Reduce函数,并输出最终结果。
  5. 输出:Reduce任务的输出被写入到文件系统(通常是HDFS)。

示例代码

以下是一个简单的MapReduce程序示例,该程序计算文本文件中各个单词的出现频率。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // Map类
    public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // Reduce类
    public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    // 主方法,配置MapReduce作业
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

让我们对上面给出的WordCount MapReduce示例代码进行更加深入的分析:

主要组件

  • TokenizerMapper: 这是自定义的Mapper类,它继承自Hadoop的Mapper类。它的作用是读取文本数据,分割成单词,并为每个单词输出一个键值对,键是单词本身,值是整数1。
  • IntSumReducer: 这是自定义的Reducer类,继承自Hadoop的Reducer类。它的作用是将Mapper输出的键值对中,相同键(单词)的值(出现次数)进行累加,得到每个单词的总出现次数。

Mapper内部逻辑
TokenizerMappermap方法接收输入数据,每次调用处理一行文本。map方法使用StringTokenizer将文本行分割成单词,并为每个单词输出一个(Text, IntWritable)键值对,其中Text是单词,IntWritable是数字1。

Reducer内部逻辑
IntSumReducerreduce方法接收所有具有相同键的值的集合(Iterable<IntWritable>),并对这些值进行累加,得到这个键(单词)的总出现次数。之后,它输出一个(Text, IntWritable)键值对,其中Text是单词,IntWritable是该单词的总出现次数。

Job配置
main方法中配置了MapReduce作业:

  • setJarByClass: 设置作业的JAR文件,这样Hadoop可以将其发送到集群中的节点上执行。
  • setMapperClass: 设置Map阶段使用的Mapper类。
  • setCombinerClass: 设置Combiner类,用于在Map阶段之后和Reduce阶段之前对输出进行局部合并。在这个例子中,它和Reducer类是相同的,因为单词计数的合并操作是可交换的和可结合的。
  • setReducerClass: 设置Reduce阶段使用的Reducer类。
  • setOutputKeyClass: 设置作业输出的键类型。
  • setOutputValueClass: 设置作业输出的值类型。
  • FileInputFormat.addInputPath: 设置输入数据的路径。
  • FileOutputFormat.setOutputPath: 设置输出数据的路径。
    在MapReduce中,Combiner的作用是对Map阶段的输出进行局部合并,以减少网络传输的数据量。Reducer的作用是对所有Map任务的输出进行全局合并,得到最终结果。
    WordCount的例子中,Combiner和Reducer可以是同一个类,因为它们执行的操作是相同的:简单的整数求和。这个操作是可结合的(combine-able)和可交换的(commutative),意味着无论操作的顺序如何,或者如何分组执行这些操作,最终结果都是一样的。

运行作业
job.waitForCompletion(true)方法会提交作业到Hadoop集群并等待其完成。它返回一个布尔值,表示作业是否成功完成。

输入和输出

  • 输入: MapReduce作业的输入通常是文本文件或其他文件格式,存储在HDFS上。在这个例子中,每一行文本都会被作为一个记录传递给一个TokenizerMapper实例。
  • 输出: 输出是处理过后的结果,通常也是存储在HDFS上的文本文件。在这个例子中,输出文件包含了单词和对应的出现次数。

注意事项

  • 数据类型: 在MapReduce中使用的键值对数据类型必须实现Writable接口,因此Hadoop提供了一些基本数据类型的Writable版本,例如IntWritable用于整数,Text用于字符串。
  • 性能优化: Combiner可以显著减少Map和Reduce之间数据传输量,因为它在Map端进行局部合并,减少了网络传输的数据。
  • 容错与可扩展性: MapReduce框架本身具有处理节点失败的能力。如果某个节点执行失败,作业会被重新调度到另一个节点上执行。

这个WordCount程序是学习MapReduce的一个经典入门示例,它展示了MapReduce编程模型的基本概念:分解任务、处理数据并在集群中并行执行。

要运行这个MapReduce作业,需要将代码打包成一个JAR文件,并将其提交到Hadoop集群上运行。命令行参数提供输入和输出路径。例如:

hadoop jar wordcount.jar WordCount /input/path /output/path

这个例子展示了MapReduce编程模型的基本概念,实际的MapReduce作业可能会更复杂,包括自定义数据类型、多个Map和Reduce阶段、数据排序和分区等。