1. 定义
shuffle是指map任务输出到reduce任务输入的过程。
2. 目的
在Hadoop集群中,大部分map任务与reduce任务在不同的节点执行。因此,reduce任务在执行时需要跨节点去获取map的输出结果。如果集群上有很多作业,那么网络资源消耗会很严重,需要最大化减少不必要的资源。另外,如果map的输出和reduce的输入只是简单的磁盘读写操作,那么磁盘IO时间将对作业完成时间产生较大影响,应该减少磁盘IO的影响。
所以,shuffle阶段的目的有两个:
<1>在跨节点获取map输出时,尽可能减少网络带宽不必要的消耗。
<2>优化内存使用,减少磁盘IO操作。
3. shuffle过程解析
图 1
3.1 Map输出
上图是官方给出的shuffle的流程图,但是上图中的“partition, sort amd spill to disk”过程并没有说明partition、sort和spill在哪个过程进行,难以理解。因此,我画了另外一张图,解释上图左半边的流程(map task),如图:
图 2
(1) Partition操作
map的输出结果是多个键值对(key/value),将由reduce合并,而集群中有多个reduce,该由哪个reduce负责处理这些键值对?答案是MapReduce提供了Partitioner接口。
Partitioner接口可以根据key或value、以及reduce的数量来决定当前的map输出结果由哪个reduce处理。(默认方式是,计算key的哈希值,再对reduce数量取模。比如,计算得到的值是0,则指定第一个reduce处理)
所以,得到map输出后,进行partition操作,从而指定某个reduce处理该输出结果。
(2) Spill操作
map输出时使用的内存缓冲区有大小限制,默认是100MB。当输出结果很多时,内存就会被填满,因此需要将缓冲区的数据写入磁盘,然后重新使用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill(溢出写)。
溢出写由单独一个线程负责,不影响往缓冲区写map结果的线程。所以在溢出写的过程中,map输出结果会继续被写入内存。当缓冲区的数据达到阈值(默认是80%,由属性 io.sort.spill.percent 设定),溢出写线程启动,对这80%的内存进行溢出写操作。
在溢出写把map结果写到磁盘之前,需要进行两个重要的操作:sort(排序)和combine(合并)。
sort是把这80%的map结果按照key排序。
另外,因为一个map的输出结果会由不同的reduce处理(不同的key通过partition操作计算出来的值不同),所以要将属于同一个reduce处理的输出结果进行combine操作。
(3)Merge操作
每次溢出写都会在磁盘上生成一个溢出写文件,如果map输出结果很大,那么将会生成多个溢出写文件。(当map任务完成时,会把内存缓冲区中最后的结果也写到一个溢出写文件中)。为了方便后续阶段reduce来获取输出结果,这些溢出写文件将会被合并成一个文件,这就是merge操作。
最后,合并后的溢出写文件被放在TaskTracker的本地目录下,map端的工作结束。
3.2 Reduce输入
reduce端不断通过RPC从JobTracker获取map任务是否完成的信息。如果reduce端得到通知,shuffle的后半段就开始了。
(1) copy阶段
reduce任务启动一些copy线程(默认值是5个线程,可设置mapred.reduce.parallel.copies属性),通过HTTP方式把TaskTracker目录下的map输出结果复制到内存缓冲区(这里缓冲区大小比map端灵活,是基于JVM的heap size设置的。因为在copy阶段不执行reduce操作,所以绝大部分内存都给copy线程使用)。当缓冲区中的数据达到阈值,就进行溢出写操作(与map端类似)。
(2) sort阶段
其实,这里的所说的sort更恰当的说是merge,因为排序是在map端进行的,而这个阶段的任务是合并来自多个map端的输出结果。比如,有50个map输出,而合并因子是10(由io.sort.factor属性设置),那么将进行5趟合并,每趟合并10个文件。最后合并成5个文件。
(3) reduce阶段
对sort阶段生成的文件执行reduce操作,把输出结果放到HDFS。