别被官方文档迷惑了!这篇文章帮你详解yarn公平调度

时间:2021-11-16 14:37:33
**欢迎大家前往[腾讯云+社区](https://cloud.tencent.com/developer/?fromSource=waitui),获取更多腾讯海量技术实践干货哦~** > 本文由[@edwinhzhang](https://cloud.tencent.com/developer/user/2277860?fromSource=waitui)发表于[云+社区专栏](https://cloud.tencent.com/developer/column/1028?fromSource=waitui) FairScheduler是yarn常用的调度器,但是仅仅参考官方文档,有很多参数和概念文档里没有详细说明,但是这些参明显会影响到集群的正常运行。本文的主要目的是通过梳理代码将关键参数的功能理清楚。下面列出官方文档中常用的参数: | yarn.scheduler.fair.preemption.cluster-utilization-threshold | The utilization threshold after which preemption kicks in. The utilization is computed as the maximum ratio of usage to capacity among all resources. Defaults to 0.8f. | | ------------------------------------------------------------ | ------------------------------------------------------------ | | yarn.scheduler.fair.update-interval-ms | The interval at which to lock the scheduler and recalculate fair shares, recalculate demand, and check whether anything is due for preemption. Defaults to 500 ms. | | maxAMShare | limit the fraction of the queue’s fair share that can be used to run application masters. This property can only be used for leaf queues. For example, if set to 1.0f, then AMs in the leaf queue can take up to 100% of both the memory and CPU fair share. The value of -1.0f will disable this feature and the amShare will not be checked. The default value is 0.5f. | | minSharePreemptionTimeout | number of seconds the queue is under its minimum share before it will try to preempt containers to take resources from other queues. If not set, the queue will inherit the value from its parent queue. | | fairSharePreemptionTimeout | number of seconds the queue is under its fair share threshold before it will try to preempt containers to take resources from other queues. If not set, the queue will inherit the value from its parent queue. | | fairSharePreemptionThreshold | If the queue waits fairSharePreemptionTimeout without receiving fairSharePreemptionThreshold*fairShare resources, it is allowed to preempt containers to take resources from other queues. If not set, the queue will inherit the value from its parent queue. | 在上述参数描述中,timeout等参数值没有给出默认值,没有告知不设置会怎样。minShare,fairShare等概念也没有说清楚,很容易让人云里雾里。关于这些参数和概念的详细解释,在下面的分析中一一给出。 # FairScheduler整体结构 ![img](https://ask.qcloudimg.com/draft/2277860/j2c4zpkzoy.png?imageView2/2/w/1620) 图(1) FairScheduler 运行流程图 公平调度器的运行流程就是RM去启动FairScheduler,SchedulerDispatcher两个服务,这两个服务各自负责update线程,handle线程。 update线程有两个任务:(1)更新各个队列的资源(Instantaneous Fair Share),(2)判断各个leaf队列是否需要抢占资源(如果开启抢占功能) handle线程主要是处理一些事件响应,比如集群增加节点,队列增加APP,队列删除APP,APP更新container等。 # FairScheduler类图 ![img](https://ask.qcloudimg.com/draft/2277860/f8fgsyx2d6.png?imageView2/2/w/1620)图(2) FairScheduler相关类图 **队列继承模块**:yarn通过树形结构来管理队列。从管理资源角度来看,树的根节点root队列(FSParentQueue),非根节点(FSParentQueue),叶子节点(FSLeaf),app任务(FSAppAttempt,公平调度器角度的App)都是抽象的资源,它们都实现了Schedulable接口,都是一个**可调度资源对象**。它们都有自己的fair share(队列的资源量)方法(这里又用到了fair share概念),weight属性(权重)、minShare属性(最小资源量)、maxShare属性(最大资源量),priority属性(优先级)、resourceUsage属性(资源使用量属性)以及资源需求量属性(demand),同时也都实现了preemptContainer抢占资源的方法,assignContainer方法(为一个**ACCEPTED**的APP分配AM的container)。 ```js public interface Schedulable { /** * Name of job/queue, used for debugging as well as for breaking ties in * scheduling order deterministically. */ public String getName(); /** * Maximum number of resources required by this Schedulable. This is defined as * number of currently utilized resources + number of unlaunched resources (that * are either not yet launched or need to be speculated). */ public Resource getDemand(); /** Get the aggregate amount of resources consumed by the schedulable. */ public Resource getResourceUsage(); /** Minimum Resource share assigned to the schedulable. */ public Resource getMinShare(); /** Maximum Resource share assigned to the schedulable. */ public Resource getMaxShare(); /** Job/queue weight in fair sharing. */ public ResourceWeights getWeights(); /** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/ public long getStartTime(); /** Job priority for jobs in FIFO queues; meaningless for QueueSchedulables. */ public Priority getPriority(); /** Refresh the Schedulable's demand and those of its children if any. */ public void updateDemand(); /** * Assign a container on this node if possible, and return the amount of * resources assigned. */ public Resource assignContainer(FSSchedulerNode node); /** * Preempt a container from this Schedulable if possible. */ public RMContainer preemptContainer(); /** Get the fair share assigned to this Schedulable. */ public Resource getFairShare(); /** Assign a fair share to this Schedulable. */ public void setFairShare(Resource fairShare); } ``` 队列运行模块:从类图角度描述公平调度的工作原理。SchedulerEventDispatcher类负责管理handle线程。FairScheduler类管理update线程,通过QueueManager获取所有队列信息。 我们从Instantaneous Fair Share 和Steady Fair Share 这两个yarn的基本概念开始进行代码分析。 # **Instantaneous Fair Share & Steady Fair Share** Fair Share指的都是Yarn根据每个队列的权重、最大,最小可运行资源计算的得到的可以分配给这个队列的最大可用资源。本文描述的是公平调度,公平调度的**默认策略FairSharePolicy**的规则是**single-resource**,即只关注内存资源这一项指标。 **Steady Fair Share**:是每个队列内存资源量的固定理论值。Steady Fair Share在RM初期工作后不再轻易改变,只有后续在增加节点(addNode)时才会重新计算。RM的初期工作也是handle线程把集群的每个节点添加到调度器中(addNode)。 **Instantaneous Fair Share**:是每个队列的内存资源量的实际值,是在动态变化的。yarn里的fair share如果没有专门指代,都是指的的Instantaneous Fair Share。 ## **1 Steady Fair Share计算方式** ![img](https://ask.qcloudimg.com/draft/2277860/w8bugr2sc7.png?imageView2/2/w/1620) 图(3) steady fair share 计算流程 handle线程如果接收到**NODE_ADDED**事件,会去调用addNode方法。 ```java private synchronized void addNode(RMNode node) { FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName); nodes.put(node.getNodeID(), schedulerNode); //将该节点的内存加入到集群总资源 Resources.addTo(clusterResource, schedulerNode.getTotalResource()); //更新available资源 updateRootQueueMetrics(); //更新一个container的最大分配,就是UI界面里的MAX(如果没有记错的话) updateMaximumAllocation(schedulerNode, true); //设置root队列的steadyFailr=clusterResource的总资源 queueMgr.getRootQueue().setSteadyFairShare(clusterResource); //重新计算SteadyShares queueMgr.getRootQueue().recomputeSteadyShares(); LOG.info("Added node " + node.getNodeAddress() + " cluster capacity: " + clusterResource); } ``` recomputeSteadyShares 使用**广度优先遍历**计算每个队列的内存资源量,直到叶子节点。 ```js public void recomputeSteadyShares() { //广度遍历整个队列树 //此时getSteadyFairShare 为clusterResource policy.computeSteadyShares(childQueues, getSteadyFairShare()); for (FSQueue childQueue : childQueues) { childQueue.getMetrics().setSteadyFairShare(childQueue.getSteadyFairShare()); if (childQueue instanceof FSParentQueue) { ((FSParentQueue) childQueue).recomputeSteadyShares(); } } } ``` computeSteadyShares方法计算每个队列应该分配到的内存资源,总体来说是根据每个队列的权重值去分配,权重大的队列分配到的资源更多,权重小的队列分配到得资源少。但是实际的细节还会受到其他因素影响,是因为每队列有minResources和maxResources两个参数来限制资源的上下限。computeSteadyShares最终去调用computeSharesInternal方法。比如以下图为例: 图中的数字是权重,假如有600G的总资源,parent=300G,leaf1=300G,leaf2=210G,leaf3=70G。 ![img](https://ask.qcloudimg.com/draft/2277860/fwthyxmzxv.png?imageView2/2/w/1620)图(4) yarn队列权重 computeSharesInternal方法概括来说就是通过二分查找法寻找到一个资源比重值R(weight-to-slots),使用这个R为每个队列分配资源(在该方法里队列的类型是Schedulable,再次说明队列是一个资源对象),公式是**steadyFairShare=R \* QueueWeights**。 **computeSharesInternal是计算Steady Fair Share 和Instantaneous Fair Share共用的方法,根据参数isSteadyShare来区别计算。** 之所以要做的这么复杂,是因为队列不是单纯的按照比例来分配资源的(单纯按权重比例,需要maxR,minR都不设置。maxR的默认值是0x7fffffff,minR默认值是0)。如果设置了maxR,minR,按比例分到的资源小于minR,那么必须满足minR。按比例分到的资源大于maxR,那么必须满足maxR。因此想要找到一个R(weight-to-slots)来尽可能满足: - *R\*(Queue1Weights + Queue2Weights+...+QueueNWeights) <=totalResource* - *R\*QueueWeights >= minShare* - *R\*QueueWeights <= maxShare* **注:QueueNWeights为队列各自的权重,minShare和maxShare即各个队列的minResources和maxResources** computcomputeSharesInternal详细来说分为四个步骤: 1. 确定可用资源:*totalResources = min(totalResources-takenResources(fixedShare), totalMaxShare)* 2. 确定R上下限 3. 二分查找法逼近R 4. 使用R设置fair Share ```js private static void computeSharesInternal( Collection allSchedulables, Resource totalResources, ResourceType type, boolean isSteadyShare) { Collection