shuffle的主要工作是从Map结束到Reduce开始之间的过程。shuffle阶段又可以分为Map端的shuffle和Reduce端的shuffle。
1.Map端的Shuffler
每个map task都有一个内存缓冲区(如上图中的buffer in memory默认为100MB),存储着map的输出数据(格式为Key-Value键值对),当缓冲区快满的时候,需要将缓冲区中的数据以一个临时文件的方式存放到磁盘上,溢写是单线程来完成的,他不影响往该缓存中写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent,这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。这里需要注意在写入磁盘之前,需要进行partition,sort,combine操作。随着数据不断地输出,溢写的文件越来越多,还要在磁盘上进行合并操作,最后合成一个溢出文件。
1)partition
每个输出数据要经过partition这个程序得到一个分区号。默认情况下,partition根据每一对Key-Value键值对的键的哈希值对reduce的个数取模得到一个分区号(hashcode%reduce个数)。partition的目的是把数据分配到一个reduce task中去。
2)sort
sort程序的目的是排序,排序的目标是map输出的数据。默认情况下,按照键值对的key的ASCII码值进行排序(也就是字典排序)。
3)combine
combine函数是将具有相同Key的多个Key-Value键值对合并成一个键值对,这里需要注意,若在map端使用combine操作,则首先需要在map中也调用group程序(在后面将会被介绍),因为在combine中需要用到相同的键。这样做的目的是为了减少网络传输
2.Reduce端的Shuffler
1)fetch
Reduce端按照各个Map端的partition程序得出的分区号进行抓取,抓取的数据同样存在于内存中。
2)sort
将Reduce抓到的数据再次进行排序,排序调用的还是Map端使用的Sort程序。
3)group
接下来进行的是分组,默认的分组模式是根据每个数据(Key-Value)的Key是否相同进行分组的。键相等就在同一个组中。每一组数据传给Reduce程序中(每组数据的特点都是相同的Key键)。
3)MapReduce的split大小
分片大小要趋向与一个HDFS的一个数据块的大小,默认是64MB,这里我们需要叙述一下为何要趋向与一个数据块的大小:
hadoop在存储输入数据的节点上运行map任务,可以获得最佳性能。这就是所谓的“数据本地化优化”,因为它无需使用宝贵的集群带宽资源。分片趋向于数据块大小的目的也是为了节省集群带宽资源,hdfs数据块的大小就等于存储在单个节点上的最大传输输入块的大小。若分片大于HDFS数据块(跨越两个数据块),对于任何一个节点,基本上都不可能同时存储这两个数据块,因此分片中的部分数据需要通过网络传输到map任务节点,这样就要消耗集群带宽资源。
split大小是根据以下算法确定的:
- max.split(100M)
- min.split(10M)
- block(64M)
- max(min.split,min(max.split,block))(任何一个split都不能大于block)