2、Reduce端shuffle
Reduce端的shuffle主要包括三个阶段,copy,sort(merge),reduce
Map的输出文件放置在运行MapTask的NodeManager的本地磁盘上,它是运行ReduceTask的TaskTracker所需要的输入数据,但是Reduce输出不是这样的,它一般写到HDFS中(Reduce阶段)。
Copy阶段
Reduce进程启动一些数据copy线程,通过HTTP方式请求MapTask所在的NodeManager以获取输出文件。
NodeManager需要为分区文件运行reduce任务。并且reduce任务需要集群上若干个map任务的map输出作为其特殊的分区文件。而每个map任务的完成时间可能不同,因此只要有一个任务完成,reduce任务就开始复制其输出。
reduce任务有少量复制线程,因此能够并行取得map输出。默认线程数为5,但这个默认值可以通过mapreduce.reduce.shuffle.parallelcopies属性进行设置。
【Reducer如何知道自己应该处理哪些数据呢?】
因为Map端进行partition的时候,实际上就相当于指定了每个Reducer要处理的数据(partition就对应了Reducer),所以Reducer在拷贝数据的时候只需拷贝与自己对应的partition中的数据即可。每个Reducer会处理一个或者多个partition。
【reducer如何知道要从哪台机器上去的map输出呢?】
map任务完成后,它们会使用心跳机制通知它们的application master、因此对于指定作业,application master知道map输出和主机位置之间的映射关系。reducer中的一个线程定期询问master以便获取map输出主机的位置。知道获得所有输出位置。
由于reducer可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们。相反,tasktracker会等待,直到jobtracker告知它可以删除map输出,这是作业完成后执行的。
1、由于job的每一个map都会根据reduce(n)数将数据分成map 输出结果分成n个partition,所以map的中间结果中是有可能包含每一个reduce需要处理的部分数据的。所以,为了优化reduce的执行时间,hadoop中是等job的第一个map结束后,所有的reduce就开始尝试从完成的map中下载该reduce对应的partition部分数据,因此map和reduce是交叉进行的,其实就是shuffle。Reduce任务通过HTTP向各个Map任务拖取(下载)它所需要的数据(网络传输),Reducer是如何知道要去哪些机器取数据呢?一旦map任务完成之后,就会通过常规心跳通知应用程序的Application Master。reduce的一个线程会周期性地向master询问,直到提取完所有数据(如何知道提取完?)数据被reduce提走之后,map机器不会立刻删除数据,这是为了预防reduce任务失败需要重做。因此map输出数据是在整个作业完成之后才被删除掉的。
2、reduce进程启动数据copy线程(Fetcher),通过HTTP方式请求maptask所在的TaskTracker获取maptask的输出文件。由于map通常有许多个,所以对一个reduce来说,下载也可以是并行的从多个map下载,那到底同时到多少个Mapper下载数据??这个并行度是可以通过mapreduce.reduce.shuffle.parallelcopies(default5)调整。默认情况下,每个Reducer只会有5个map端并行的下载线程在从map下数据,如果一个时间段内job完成的map有100个或者更多,那么reduce也最多只能同时下载5个map的数据,所以这个参数比较适合map很多并且完成的比较快的job的情况下调大,有利于reduce更快的获取属于自己部分的数据。 在Reducer内存和网络都比较好的情况下,可以调大该参数;
3、reduce的每一个下载线程在下载某个map数据的时候,有可能因为那个map中间结果所在机器发生错误,或者中间结果的文件丢失,或者网络瞬断等等情况,这样reduce的下载就有可能失败,所以reduce的下载线程并不会无休止的等待下去,当一定时间后下载仍然失败,那么下载线程就会放弃这次下载,并在随后尝试从另外的地方下载(因为这段时间map可能重跑)。reduce下载线程的这个最大的下载时间段是可以通过mapreduce.reduce.shuffle.read.timeout(default180000秒)调整的。如果集群环境的网络本身是瓶颈,那么用户可以通过调大这个参数来避免reduce下载线程被误判为失败的情况。一般情况下都会调大这个参数,这是企业级最佳实战。
Merge阶段
中间输出结果的合并与溢出
将Map端复制过来的数据先放入内存缓冲区中
Merge有3种形式,分别是内存到内存,内存到磁盘,磁盘到磁盘。默认情况下第一种形式不启用,第二种Merge方式一直在运行(spill阶段)直到结束,然后启用第三种磁盘到磁盘的Merge方式生成最终的文件。(注意:为了合并,压缩的map输出都必须在内存中被解压缩)
如果map输出相当小,则会被复制到reduce任务的JVM内存,否则map输出被复制到磁盘。任务占用的缓存空间在堆栈空间中的占用比例由mapreduce.reduce.shuffle.input.buffer.percent属性控制(默认0.70)
一旦内存缓冲区达到缓存溢出到磁盘的阈值时(由mapreduce.reduce.shuffle.merge.percent决定,默认0.66),或达到Map任务在缓存溢出前能够保留在内存中的输出个数的阈值(由mapreduce.reduce.merge.inmem.threshold控制,默认1000),则合并后溢出写到磁盘中。如果指定combiner,则在合并期间运行它以降低写入硬盘的数据量。
随着磁盘上副本的增多,后台线程会将它们合并为更大的、排好序的文件。这会为后面的合并节省一些时间。
复制完所有map输出后,reduce任务进入sort排序阶段(更恰当的说法是merge合并阶段,因为这个阶段的主要工作是执行了归并排序),循环进行归并排序(维持其顺序排序)。同时合并的文件流的数量由mapreduce.task.io.sort.factor属性决定(默认10)。
比如,如果有40个map输出,而合并因子是10(10为默认设置,由mapreduce.task.io.sort.factor属性设置,与map的合并类似),合并将进行4趟,最后有4个中间文件。其中每趟合并的文件数实际上并不是每次合并10个文件。这40个文件并不会在四趟中每趟合并10个文件从而得到4个文件。相反,第一趟只合并4个文件,随后的三趟合并完整的10个文件。在最后一趟中,4个已合并的文件和余下的6个(未合并)文件总共10个文件。目标是合并最小数量的文件来满足最后一趟的合并系数。
这并没有改变合并次数,它只是一个优化措施,目的是尽量减少写到磁盘的数据量,因为最后一趟总是直接将数据输入reduce函数中,从而省略了一次磁盘往返行程。
这里的merge和map端的merge动作类似,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,然后当使用内存达到一定量的时候才spill磁盘。这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置。
这个内存大小的控制就不像map一样可以通过io.sort.mb来设定了,而是通过另外一个参数 mapreduce.reduce.shuffle.input.buffer.percent(default 0.7f 源码里面写死了) 来设置,这个参数其实是一个百分比,意思是说,shuffile在reduce内存中的数据最多使用内存量为:0.7 × maxHeap of reduce task。JVM的heapsize的70%。内存到磁盘merge的启动门限可以通过mapreduce.reduce.shuffle.merge.percent(default0.66)配置。
也就是说,如果该reduce task的最大heap使用量(通常通过mapreduce.admin.reduce.child.java.opts来设置,比如设置为-Xmx1024m)的一定比例用来缓存数据。默认情况下,reduce会使用其heapsize的70%来在内存中缓存数据。假设 mapreduce.reduce.shuffle.input.buffer.percent 为0.7,reducetask的max heapsize为1G,那么用来做下载数据缓存的内存就为大概700MB左右。这700M的内存,跟map端一样,也不是要等到全部写满才会往磁盘刷的,而是当这700M中被使用到了一定的限度(通常是一个百分比),就会开始往磁盘刷(刷磁盘前会先做sortMerge)。
这个限度阈值也是可以通过参数 mapreduce.reduce.shuffle.merge.percent(default0.66)来设定。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。这种merge方式一直在运行,直到没有map端的数据时才结束,然后启动磁盘到磁盘的merge方式生成最终的那个文件。
merge的三种形式
1)内存到内存(memToMemMerger)
2)内存中Merge(inMemoryMerger)
3)磁盘上的Merge(onDiskMerger)具体包括两个:(一)Copy过程中磁盘合并(二)磁盘到磁盘。
(1)内存到内存Merge(memToMemMerger)
Hadoop定义了一种MemToMem合并,这种合并将内存中的map输出合并,然后再写入内存。这种合并默认关闭,可以通过mapreduce.reduce.merge.memtomem.enabled(default:false)打开,当map输出文件达到mapreduce.reduce.merge.memtomem.threshold时,触发这种合并。
(2)内存中Merge(inMemoryMerger)
当缓冲中数据达到配置的阈值时,这些数据在内存中被合并、写入机器磁盘。阈值有2种配置方式:
配置内存比例:前面提到reduceJVM堆内存的一部分用于存放来自map任务的输入,在这基础之上配置一个开始合并数据的比例。假设用于存放map输出的内存为500M,mapreduce.reduce.shuffle.merge.percent配置为0.66,则当内存中的数据达到330M的时候,会触发合并写入。
配置map输出数量: 通过mapreduce.reduce.merge.inmem.threshold配置。在合并的过程中,会对被合并的文件做全局的排序。如果作业配置了Combiner,则会运行combine函数,减少写入磁盘的数据量。
(3)磁盘上的Merge(onDiskMerger):
3.1 Copy过程中磁盘Merge:在copy过来的数据不断写入磁盘的过程中,一个后台线程会把这些文件合并为更大的、有序的文件。如果map的输出结果进行了压缩,则在合并过程中,需要在内存中解压后才能给进行合并。这里的合并只是为了减少最终合并的工作量,也就是在map输出还在拷贝时,就开始进行一部分合并工作。合并的过程一样会进行全局排序。
3.2 最终磁盘中Merge:当所有map输出都拷贝完毕之后,所有数据被最后合并成一个整体有序的文件,作为reduce任务的输入。这个合并过程是一轮一轮进行的,最后一轮的合并结果直接推送给reduce作为输入,节省了磁盘操作的一个来回。最后(所以map输出都拷贝到reduce之后)进行合并的map输出可能来自合并后写入磁盘的文件,也可能来及内存缓冲,在最后写入内存的map输出可能没有达到阈值触发合并,所以还留在内存中。
每一轮合并不一定合并平均数量的文件数,指导原则是使用整个合并过程中写入磁盘的数据量最小,为了达到这个目的,则需要最终的一轮合并中合并尽可能多的数据,因为最后一轮的数据直接作为reduce的输入,无需写入磁盘再读出。因此我们让最终的一轮合并的文件数达到最大,即合并因子的值,通过mapreduce.task.io.sort.factor(default:10)配置
Reduce阶段中一个Reduce过程可能的合并方式为:假设现在有20个map输出文件,合并因子配置为5,则需要4轮的合并。最终的一轮确保合并5个文件,其中包括2个来自前2轮的合并结果,因此原始的20个中,再留出3个给最终一轮。
reduce阶段
在reduce阶段,对已排序输出中的每个键都要调用reduce函数。此阶段的输出直接写到输出文件系统,一般为HDFS。如果采用HDFS,由于NodeManager也运行数据节点,所以第一个块副本将被写到本地磁盘。
1、当reduce将所有的map上对应自己partition的数据下载完成后,reducetask真正进入reduce函数的计算阶段。由于reduce计算时同样是需要内存作为buffer,可以用mapreduce.reduce.input.buffer.percent(default 0.0)(源代码MergeManagerImpl.java:674行)来设置reduce的缓存。
这个参数默认情况下为0,也就是说,reduce是全部从磁盘开始读处理数据。如果这个参数大于0,那么就会有一定量的数据被缓存在内存并输送给reduce,当reduce计算逻辑消耗内存很小时,可以分一部分内存用来缓存数据,可以提升计算的速度。所以默认情况下都是从磁盘读取数据,如果内存足够大的话,务必设置该参数让reduce直接从缓存读数据,这样做就有点Spark Cache的感觉。
2、Reduce在这个阶段,框架为已分组的输入数据中的每个键值对对调用一次 reduce(WritableComparable,Iterator, OutputCollector, Reporter)方法。Reduce任务的输出通常是通过调用 OutputCollector.collect(WritableComparable,Writable)写入文件系统的。Reducer的输出是没有排序的。