本文从一个初学者的角度出发,用通俗易懂的语言介绍Hadoop中MapReduce的工作原理。在介绍MapReduce工作原理前,本文先介绍HDFS的工作原理及架构,再介绍MapReduce的工作原理以及Shuffle的过程。
HDFS
HDFS是Hadoop的分布式文件系统,HDFS中的文件会默认存储3份,存储在不同的机器上,提供容错机制,副本丢失或者宕机的自动恢复。HDFS的架构如下图所示,总体上采用Master/Slave的架构。整个HDFS架构由Client、NameNode、Secondary NameNode和DataNode构成,NameNode负责存储整个集群的元数据信息,Client可以根据元数据信息找到对应的文件。DataNode负责数据的实际存储,当一个文件上传到HDFS的时候,DataNode会按照Block为基本单位分布在各个DataNode中,而且为了保护数据的一致性和容错性,一般一份数据会在不同的DataNode上默认存储三份。如下架构图所示,当Client端上传了一个文件,这个文件被分成了4个block存在4个不同的DataNode里头,每个block会默认的存储三份,分别存在不同的DataNode里。而NameNode中的Block Map则维护着每个block和DataNode的配置关系,这样,整个文件在DataNode中的存储结构就在NameNode中记录下来。SecondaryNameNode不是NameNode的热备份,主要是为了承担一部分NameNode中比较消耗内存的工作而设置。
MapReduce
MapReduce的工作过程分成两个阶段,map阶段和reduce阶段。每个阶段都有键值对作为输入输出,map函数和reduce函数的具体实现由程序员完成。MapReduce的框架和HDFS分布式文件系统是运行在一组相同的节点上,也就是说计算节点和存储节点通常在一起,这种配置就允许框架在那些已经存好数据的节点上高效的调度任务,所以一般情况下,map运行的节点通常与其所要处理的那个block数据的存储节点是同一个。MapReduce的框架也是采用Master/Slave的方式组织,如下图所示,也是由四部分组成,分别为Client、JobTracker、TaskTracker以及Task。JobTracker主要负责资源监控和作业调度。JobTracker监控TaskTracker是否存活,任务执行的状态以及资源的使用情况,并且把得到的信息交给TaskSceduler。TaskSceduler根据每个TaskTracker的情况给分配响应的任务,在Hadoop系统中,TaskSceduler是一个可插拔的模块,Hadoop的使用者可以根据自己的需求定制TaskSceduler,实现自己的目标。
TaskTracker会周期性的通过heartbeats向JobTracker发送资源的使用情况,任务的执行状况等信息,同时会接收JobTracker的指令,TaskTracker把自己可支配的资源分成若干个Slot,Task只有拿到一个Slot资源才能执行任务。
Task任务分成Map Task和Reduce Task两种任务,都是由TaskTracker进行调度的。Map 任务的数据来源于HDFS,而数据在HDFS中的存储方式是block,而在Map 任务中,处理的数据单元是Split,一般情况下,Split是一个逻辑概念,包括数据的起始位置,数据长度等信息,Split和block数据的对应关系可以由用户定义,Split的多少决定了Map Task的多少,一个Split只由一个Map Task进行处理。Map Task先把需要处理的Split转化成key/value对,然后调用用户自定义的map()函数,生成map的临时结果,分成partition存在本地磁盘中,Reduce Task会对每个partition的数据进行Reduce操作,Reduce的输出结果会输出到HDFS上。
MapReduce的技术核心在于Shuffle,Shuffle这个过程存在的最基本的目的在于减少在跨节点拉取数据的过程中的带宽开销以及IO的损耗,因为Map Task最终处理的结果会产生很多中间的临时文件,而且Map Task和Reduce Task很有可能不在同一个节点上执行,很多情况下Reduce Task执行的时候需要跨节点的拉取数据,所以必须使拉取的数据量尽量小,以免整个集群的资源大量消耗在IO中。Shuffle的过程横跨Map Task和Reduce Task。Shuffle在Map端的过程如下图所示。Map Task读取Split的数据之后,经过用户的Map()函数之后输出的是一个一个的键值对,假设这个MapReduce任务中有8个MapTask,3个Reduce Task,Map Task的输出最终由哪个Reduce来处理,可以通过Partition来实现。在MapReduce默认的实现中,Partition只是对Key进行哈希操作然后取模,默认取模的方式是为了平衡每个Reduce之间的任务量,如果用户对Partition有特殊的需求,也可以定制并设置到Job上。Partition的输出结果会被写入临时内存缓冲区,内存缓冲区的大小是有限制的,一般情况下默认为100MB,在达到缓冲区的临界值的情况下,需要将缓冲区的数据写入磁盘,才能再重新启用这个缓冲区。从内存往磁盘写数据的过程称为Spill(溢写)。在溢写操作启动后,需要溢写的这部分内存被锁定,被锁定的内存数据将会被记录到磁盘中去,而溢写的过程不会影响Map()的输出结果往缓冲区输入。在溢写操作过程中,数据写入磁盘前,会对数据进行一个二次快速排序,首先是对数据所属的Partition排序,然后对每个Partition中再按照Key排序。Sort的输出包括一个索引文件和数据文件。如果用户定义了Combiner,则会对Sort后的数据进行一个简单的Reduce,使得Map端得到的数据更加紧凑。每当内存数据达到Spill的临界值的时候,都会进行Spill操作,从而会产生多个Spill文件,在Map任务完成之前,所有的Spill文件都会被归并排序为一个索引文件和数据文件。
当Spill归并完毕之后,Map端删除所有的临时文件并且告知TaskTracker其Map任务已经完成,Reduce端可以开展工作,工作流程如下图所示。Reduce端一开始的主要工作是把Map端的输出结果拷贝,同样,Map端的输出结果通常会先输出到缓冲区中,如果缓冲区不足,会溢写到磁盘中,溢写到磁盘过程中,merge的操作一直在进行,如果Map端的输出结果计较小,没有填满缓冲区,这数据会直接从缓冲区直接进入Reduce操作,Reduce操作完成后,输出结果会直接放到HDFS上。