基本参数
要对你的MapReduce Task进行优化,首先我们需要了解一些基本的参数:
dfs.block.size, dfs.blocksize: 默认的HDFS文件系统的block大小,单位为byte。mapred.compress.map.output: map的输出是否压缩
mapred.map/reduce.tasks.speculative.execution: 推测执行的配置项,如果一个task的执行进度比总体的平均执行要慢,Job Tracker会启动一个新的任务(duplicate task),原有任务和新任务哪个先执行完就把另外一个kill掉,这也是我们经常在Job Tracker页面看到任务执行成功,但是总有些任务被kill,就是这个原因。
mapred.tasktracker.map/reduce.tasks.maximum:一个task tracker所能同时执行的最大map/reduce任务数
io.sort.mb:当Map任务运行时,所产生的中间数据并非直接写入disk,而是利用内存buffer来存储。而当buffer达到一定阈值,一个后台线程会被启动,并对buffer的内容进行排序并写入disk(一个spill文件)。该默认值是100M, 而大的集群可以设置为200M.
io.sort.factor: 当Map任务执行完之后,本地磁盘上一般会产生若干spill文件,而map task最后会对spill文件进行merge排序将其整合成一个文件(partition)。而当map task进行merge sort的时候,一次打开多少个spill文件则由该参数决定。
mapred.job.reuse.jvm.num.tasks: 默认每一个JVM只运行一个任务,使用JVM重用后一个JVM可以顺序执行多个任务,减少了启动时间。对于一个job,该参数表明可利用相同的JVM顺序执行多少个task。默认为1,该参数是客户端参数,修改不需要重启tasktracker,可以在提交job的shell或者代码中设置。
mapred.reduce.parallel.copies: Reduce阶段的copier线程数。Reduce一般来说分成三个阶段,copy, sort和reduce. Copy也等同于shuffle, 当job完成5%的map task数量之后reducer则启动,从不同的已经完成的mapper上下载属于自己的reduce部分数据,由于mapper的数量很多,对于reduce来说,可以并行下载。该参数默认值为5,而大型集群可调整为16-25.
工作流程
在了解以上主要参数后,我们来看看Map和Reduce的工作流程。顾名思义,MapReduce大体上分成Map Operation和Reduce Operation两大操作。而具体来说,Map Operation包含了:
- Map Processing: HDFS系统将输入文件进行分割,根据block的大小(dfs.block.size)。而输入文件数量则由mapred.min.split.size和mapred.max.split.size决定(如果你有进行定义的话)。输入文件将会被以key value数据对的形式读入,每一个key value数据对将会唤醒一个map函数,而产生的中间数据首先被写入内存的缓存。
- Spill:当缓存的读写超出了一个阈值之后(io.sort.spill.percent控制),一个后台线程将会被自动启动,将缓存里的内容写入磁盘,该部分内容以文件的形式存储,每一个文件称之为一个spill。Spill文件的写入方式为round-robin。写入磁盘的路径为由mapred.local.dir控制。Spill的写入和缓存的写入可以同时进行,每次缓存的写入超出阈值,那么一个新的spill文件就会产生。
- Partitioning:map task最后会对spill文件进行merge排序将其整合成一个文件(partition)。
- Sorting: 在数据写入内存的时候,Map task其实就已经根据key来对中间数据进行了排序。
- Merging: 当Map任务执行完之后,本地磁盘上一般会产生若干spill文件,而map task最后会对spill文件进行merge排序将其整合成一个文件(partition)。而当map task进行merge sort的时候,一次打开多少个spill文件则由io.sort.factor决定。
- Compression: Map任务所产生的输出文件在写入磁盘之前可以被压缩,这样可以节省一定的磁盘空间,但是当被压缩的输出文件在reduce之前需要被解压缩。是否对输出文件进行压缩由mapred.compress.map.output控制。而压缩函数的函数库由mapred.map.output.compression.codec控制。输出文件以HTTP协议传输至reducer,并行传输是可以被允许的,但传输线程数由tracker.http.threads控制。
Reduce Operation包括:
- Copy: Map任务的输出文件将会被copy到对应的reducer,reducer可以多线程copy数据,默认的最大线程数是5,由mapred.reduce.parallel.copies控制。类似于Map,Reducer也有一个内存缓存(大小由mapred.job.shuffle.input.buffer.percent控制,percent是针对于Java Heap大小),来自于map的输入文件先写入内存缓存,然后超出阈值之后才会被写入磁盘 (mapred.job.shuffle.merge.percent控制,或者超过总的map output的数量,由mapred.inmem.merge.threshold控制)。
- Merge: 由于在map任务执行的过程中,输出值已经被进行了排序,所以Reduce任务只需根据排序的结果进行merge。当所有的map任务结束和输出文件被copy到reducer之后,该任务启动。根据io.sort.factor的设置情况和map的输出文件,reduce会进行>=1轮的merge。举个例子,如果有40个map输出文件,而io.sort.factor设定为10,那么将会有四轮merge。第一轮,将会有4个输出文件进行排序(1个merge文件,30个未merge文件),接下来的三轮每轮将会有10个文件被merge。此时将会有4个文件,这4个文件将会直接进入Reduce阶段。
- Reduce: Reduce函数运行并将Merge过程所产生的剩余文件进行最终处理,把结果写入output文件夹(HDFS).
进行优化
清楚整个MapReduce的过程之后,我们就可以对MapReduce的过程进行优化:
- dfs.block.size: Hadoop默认block size是64M,对于一个160G的input文件来说,其产生的map数量为(160*1024)/64 = 2560maps。如果调整block大小为128M,那么map数量将会减少至1280. 对于小型集群(6-7个服务器),最好避免产生太多的map,所以根据实际情况,dfs.block.size不要太小,也不要太大而影响到整个集群的数据存储。dfs.block.size没有一个固定的设置,而应该根据集群的大小,map任务的复杂度以及平均输入文件的大小来调整。
- mapred.compress.map.output: 是否对map的输出文件进行压缩。如果设置成true,那么可以节省磁盘空间,加快数据写入时间,缩短数据传输时间。但是压缩数据需要时间,并且在Reduce阶段需要对数据进行解压缩。建议,对于大型集群,和大型的mapReduce任务,将该参数设置为true。
- mapred.map/reduce.tasks.speculative.execution: 推测执行的配置项,如果一个task的执行进度比总体的平均执行要慢,Job Tracker会启动一个新的任务(duplicate task),原有任务和新任务哪个先执行完就把另外一个kill掉。默认是true,建议是对于大型的mapReduce任务(每个task的执行时间较长),设置为false。
- mapred.tasktracker.map/reduce.tasks.maximum: 一个tasktracker所能执行的最大map/reduce任务数。默认为2,建议,根据集群的硬件配置进行设置,例如,一个8G内存+8core的服务器,可以设置为10,因为一个task所需求的最大memory为500M,而tasktracker和datanode以及其它程序所需的内存约为3G,所以(8-3)G/500MB=10.注意,每个task(JVM)的内存由mapred.child.java.opts控制,默认为200M.
- io.sort.mb: 当Map任务运行时,所产生的中间数据并非直接写入disk,而是利用内存buffer来存储。而当buffer达到一定阈值,一个后台线程会被启动,并对buffer的内容进行排序并写入disk(一个spill文件)。该默认值是100M, 而大的集群可以设置为200M.
- io.sort.factor:当Map任务执行完之后,本地磁盘上一般会产生若干spill文件,而map task最后会对spill文件进行merge排序将其整合成一个文件(partition)。而当map task进行merge sort的时候,一次打开多少个spill文件则由该参数决定。建议,对于大型mapReduce任务(map的输出文件很大),考虑增加此参数。
- mapred.job.reuse.jvm.num.tasks: 默认每一个JVM只运行一个任务,使用JVM重用后一个JVM可以顺序执行多个任务,减少了启动时间。对于一个job,该参数表明可利用相同的JVM顺序执行多少个task。默认为1,该参数是客户端参数,修改不需要重启tasktracker,可以在提交job的shell或者代码中设置。
- mapred.reduce.parallel.copies: Copy数据到Reducer时可以启动的最大数量的并行线程。
- Reducer数量: 通常reducer的数量可以根据这个公式:0.95 or 1.75 * (nodes * mapred.tasktracker.tasks.maximum)来确定。0.95表示reduce任务可以在map任务完成之后立即启动,而1.75表示高性能,速度快的服务器先进行reduce任务。其它系统参数的优化:
- Temporary space: 对于大型的任务,确保有足够的tmp空间保存中间数据。
- JVM大小: 对于datanode,至少分配1G的内存给TaskTracker和datanode。