spark on yarn 中的延迟调度(delay scheduler)

时间:2022-06-30 23:43:40

延迟调度算法思想十分简单,为了实现data locality(即该task所需数据就在其运行的机器上),会尽量将task分布到有其所需数据的机器或者jvm中去,如果机器或者jvm已被占用就进行延迟等待,直到该机器或者jvm可以运行该task或者超过等待时限则将task运行到其他机器上。

这个想法基于以下几点:

1.往往数据比程序要大得多,分布式上处理的数据都是GB为单位的,将程序放到数据所在机器去执行,大大减少网络传输时间。

2.在集群上面task一般都是运行时间较短的,即整个集群上面不断有task完成,释放其占用的资源,延迟调度的task能够有极大的机会获得分配。

总之,就是延迟调度节省的网络传输时间远远大于task等待花费的时间。


延迟调度的思想是相通的,本文讨论的是spark在yarn集群上的延迟调度情况,故分为两层,第一层是yarn的延迟调度,第二层则是spark内部的延迟调度。

1.yarn级别的Delay Scheduler

spark在yarn上面的Delay Scheduler其实就是觉得spark的executor分配在哪些NodeManager上面,这是由yarn根据application的输入文件而定。尽量将executor分布到有数据的NodeManager上。因为,在这一层上如果executor无法做到data locality,那么到了spark的级别分配task到executor的时候,更加无法实现data locality。

在yarn中配置yarn.scheduler.capacity.node-locality-delay配置延迟等待次数。(通常设置机架数量)。


2.spark内部Task的Delay Scheduler

这个级别的Delay Scheduler是面临的问题,是将task分到有数据的executor上去,上面已经说了,这一层次的Delay Scheduler依赖于yarn对executor的分配。另外,在运算过程中,有task 的Delay Scheduler是因为我们在spark中对数据进行了cache或者persist。在shuffle中是不用考虑Delay Scheduler的,因为shuffle中的read task 是需要去所有的write task的disk上拉取数据的,故也就不存在通过延迟调度来选择data locality的问题了。

在spark中会有3个配置项:

spark.locality.wait.process default 3000ms

spark.locality.wait.node default spark.locality.wait.process

spark.locality.wait.rack default spark.locality.wait.process

目前就还有最后一个问题,需要解决了,配置项该以什么标准进行配置?

在这篇论文中Delay Scheduling: A Simple Technique for AchievingLocality and Fairness in Cluster Scheduling  有一个详细的介绍,这里我直接给出公式:

spark on yarn 中的延迟调度(delay scheduler)

Job等待一次task实现data locality所花的最长时间 W= (D/S)*T=D/(L*M) * T

D是实现Data Locality,需要延迟等待的次数

M是本次计算用到的集群节点数

L为每个节点能用的core数量

S即为集群能用的总的core

N为本次job的task数量

R为文件的备份数量(HDFS默认为3)

λ为期望本次job达到的数据本地率

T为单个task运行所需要的时间。

通过上面两个公式,我们就能计算出yarn和spark中的延迟调度项如何配置了。

(D则为yarn配置的延迟等待次数,W则为spark中配置中的等待时间。)