MapReduce之如何处理失败的task

时间:2022-12-21 02:16:59

一 常见容错场景分析

1.1作业某个任务阻塞了,长时间占用资源不释放

1.2在MapTask任务运行完毕,ReduceTask运行过程中,某个MapTask节点挂了,或者某个MapTask结果存放的那磁盘坏掉了

 

二 作业某个任务阻塞了,长时间占用资源不释放

这种问题通常是由于程序bug,数据特性造成的,会让程序阻塞,任务运行停滞不前。在我们表面上看来是任务停滞不前。

 

这种问题经常发生,任务长时间占用着资源不释放,如果不采取一定手段,可能会永远被占用,造成资源泄露。

 

在TaskTracker,每个任务会定期向TaskTracker汇报进度,如果进度不变则不汇报,这样一旦达到超时限制,TaskTracker会杀掉该任务,并将任务状态KILLED汇报给YARN,从而重新调度该任务。

 

在实际应用场景中,有些正常的作业,其任务可能长时间没有读入或者输出,比如读取数据库的MapTask或者需要连接其他外部系统的Task,对于这类应用,在编写Mapper或Reducer时,应当启动一个额外的线程通过Reporter组件定期向TaskTracker汇报心跳(只是告诉TaskTracker自己还活着,不要把我杀了)。

 

三 在MapTask任务运行完毕,ReduceTask运行过程中,某个MapTask节点挂了,或者某个MapTask结果存放的那磁盘坏掉了

Case1:如果节点挂掉,JobTracker通过心跳机制知道TaskTracker死掉了,会重新调度之前正在运行的Task和正在运行的作业中已经运行完成的MapTask

 

Case2:如果节点没有挂,只是存放MapTask结果的磁盘损坏了,则分两种情况

#所有的ReduceTask已经完成shuffle阶段

#尚有部分ReduceTask没有完成shuffle阶段,需要读取该MapTask任务

 

对于第一种情况,如果所有ReduceTask一路顺风地运行下去,则无需对已经运行完成的MapTask作任何处理,如果某些ReduceTask一段时间后运行失败了,则处理方式与第二种一样。

 

对于第二种情况,当ReduceTask远程读取那个已经运行完成的MapTask结果(但结果已经损坏)时,会尝试读取若干次,如果尝试次数超过了某个上限值,则会通过RPC告诉所在的TaskTracker该MapTask结果已经损坏,而TaskTracker则进一步通过RPC告诉JobTracker,JobTracker收到该消息后,会重新调度该MapTask,进而重新计算生成结果。

问题一:在hadoop2.2也有个常见错误。输入60G的数据做waordcount(example)时,yarn在运行shuffle阶段时,container的物理内存溢出。

 

后来通过调整mapreduce.reduce.java.opts=-Xmx5000m任务才执行通过。也就是说shuffle4G内存

调整一下参数
mapreduce.reduce.shuffle.merge.percent=0.4
mapreduce.reduce.shuffle.parallelcopies=5
mapreduce.reduce.shuffle.input.buffer.percent=0.6
mapreduce.reduce.shuffle.memory.limit.percent=0.17
shuffle
任然要4G内存。感觉shuffle.memory.limit.percent没有起到作用

如果是物理内存溢出,需要调整mapreduce.reduce.memory.mb参数,默认是1024,如果是虚拟内存溢出,需要调整yarn.nodemanager.vmem-pmem-ratio,默认是2.1,调大即可,或者直接不限制虚拟内存,即yarn.nodemanager.vmem-check-enabled置为false(在yarn-site.xml中,默认是true),物理内存也可以不检查,将yarn.nodemanager.pmem-check-enabled置为false(默认是true)。

 

每次处理100G左右的数据,但是程序一运行,CPU使用率会达到100%,直到任务结束,内存使用不会达到最大。在NodeMangager机器上,使用jps命令查看,会发现有8个YarnChild进程,是否可以通过更改配置文件来减少YarnChild进程数量,来达到减少CPU占用。

 

有几种方法,一种是使用多租户资源调度器,比如fairschedulercapacity scheduler,配置多类型资源调度功能,这样,你可以使用参数mapreduce.map.cpu.vcoresmapreduce.reduce.cpu.vcores指定每个任务使用的CPU数目,默认是1个,另外,默认只支持内存调度,你可以通过参数mapreduce.map.memory.mbmapreduce.reduce.memory.mb增大任务的内存使用量,减少一个节点上的并发任务数目

 

 

Error:org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shufflein fetcher#1
        at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:134)
        atorg.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:376)
        atorg.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
        atjava.security.AccessController.doPrivileged(Native Method)
        atjavax.security.auth.Subject.doAs(Subject.java:396)
        atorg.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
        atorg.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: java.lang.OutOfMemoryError: Java heap space
        atorg.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:56)
        atorg.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:46)
        atorg.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.<init>(InMemoryMapOutput.java:63)
        atorg.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.unconditionalReserve(MergeManagerImpl.java:297)
        at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:287)
        atorg.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:411)
        atorg.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:341)
        at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:165)

 

参数mapreduce.reduce.shuffle.input.buffer.percent控制运行reduce任务的机子上多少比例的内存用作上述buffer(默认值为0.70),参数mapreduce.reduce.shuffle.parallelcopies控制shuffle过程的并行度(默认值为5)
那么"mapreduce.reduce.shuffle.input.buffer.percent"* "mapreduce.reduce.shuffle.parallelcopies" 必须小于等于1,否则就会出现如上错误
因此,我将mapreduce.reduce.shuffle.input.buffer.percent设置成值为0.1,就可以正常运行了(设置成0.2,还是会抛同样的错)


另外,可以发现如果使用两个参数的默认值,那么两者乘积为3.5,大大大于1了,为什么没有经常抛出以上的错误呢?
1)首先,把默认值设为比较大,主要是基于性能考虑,将它们设为比较大,可以大大加快从map复制数据的速度

2)其次,要抛出如上异常,还需满足另外一个条件,就是map任务的数据一下子准备好了等待shuffle去复制,在这种情况下,就会导致shuffle过程的“线程数量”和“内存buffer使用量”都是满负荷的值,自然就造成了内存不足的错误;而如果map任务的数据是断断续续完成的,那么没有一个时刻shuffle过程的“线程数量”和“内存buffer使用量”是满负荷值的,自然也就不会抛出如上错误

 

另外,如果在设置以上参数后,还是出现错误,那么有可能是运行Reduce任务的进程的内存总量不足,可以通过mapred.child.Java.opts参数来调节,比如设置mapred.child.java.opts=-Xmx2024m

 

我也遇到了任务一直在pending状态,不能往下运行,经过几天的倒腾的,总算解决了现在把我的解决方法跟大家分享下,这几天在网上也查了很多资料,没有比较靠谱的回答因为我设置了yarn.nodemanager.resource.memory-mb这个的大小为1024MB
即每个节点上的内存大小为1024,但是我运行的wordcount需要的内存比我设置的要大,导致我的任务状态一直在pending状态中

如果你配置了yarn.nodemanager.resource.memory-mb这个配置项,你把值改大些,或者直接就用默认的然后再根据需要去调整

 

问题:发现每次reduce阶段跑到98%,相关的Container被杀,报出的log大概的意思:container600s未报告进度超时被杀

mapreduce程序里加上conf.set("mapred.task.timeout","0");

 //不检查超时

重新打包运行,最后没有出现错误,但是运行很长时间,总算运行完了,怀疑程序问题有大循环
最后检查reduce发现会有双层循环,有可能会非常大,导致卡在reduce内部,长时间没有进行任何的读写,也就没有汇报他的进度情况

总结

问题的原因起于对于悬挂的task,如果NMr在一段时间(默认是10min,可以通过mapred.task.timeout属性值来设置,单位是毫秒)内一直没有收到它的进度报告,则把它标记为失效