Fair Scheduler调度器同步心跳分配任务的过程简单来讲会经历以下环节:
1、 对map/reduce是否已经达到资源上限的循环判断
2、 对pool队列根据Fair算法排序
3、然后循环pool队列,在pool中的job队列根据Fair算法排序,循环job,选择task
4、如果选择到一个task,跳出pool的循环,然后重新对pool排序,重复步骤2
Fair Scheduler调度策略对于pool和job的调度算法都是一致的,先解释在调度算法中的几个重要变量:
minshare : Minimum share slots assigned to the schedulable 最小共享量,pool池的最小共享量为每个资源池需要分配的最小的map或reducer slots数,也就是配置文件中的minMaps或minReduces,Job的minShare定义为0
demand : This is defined as number of currently running tasks + number of unlaunched tasks (tasks that are either not yet launched or need to be speculated)。简单概括就是JOB的slot需求量,其结果通过(正在运行的Tasks数量 + 未运行的Tasks数量)计算得出,pool池的demand是池中所有Job的demand之和。FairScheduler会启动一个UpdateThread线程来定时更新Demand值,更新间隔可以通过mapred.fairscheduler.update.interval配置,未配置或默认情况下是2500毫秒
runningTasks: 正在运行的Tasks数量,pool池的runningTasks值等于之中所有job的runningTasks之和。
weight:权重。Pool的权重为配置文件中weight配置项,job的权重为和优先级相关,例如normal=1.0,high=2.0,very high=4.0等,但是job的权重在运行过程中还会重算,如果开启了mapred.fairscheduler.sizebasedweight配置项,那么weight会重新计算:
weight = Math.log1p(demand) / Math.log(2); weight *= getPriorityFactor(job.getPriority());
也就是说job权重会随着job的slot需求量的变化而变化。如果开启了mapred.fairscheduler.weightadjuster配置项,那么weight的计算还会得到重新计算,Weightadjuster还需要配合mapred.newjobweightbooster.factor和mapred.newjobweightbooster.duration两个配置项来使用,factor为权重因子,duration为权重期限,如果满足:【当前时间(currentTime)-jobStartTime < duration】也就是说还在权重调整期限内,那么weight = weight*factor。
Fair Scheduler核心思想就是为了让尽可能保证所有的作业都能够获得等量的资源份额,首先会考虑作业的资源亏欠度来选择作业,然后才是考虑优先级,所谓资源亏欠度大体上可以理解为所得到的资源和当前所需资源的比值,下面就详细介绍Fair Scheduler核心调度算法,本质上其实就是对pool或job进行一定规则的排序操作,过程如下:
1、 先计算job的minShare,minShare等于minShare和demand值两者取小
2、 判断runningTasks是否小于minShare,如果是的话,优先级高
3、 步骤2不满足的话,那么判断runningTasks/max(mindshare,1.0)的值,值小的优先
4、 如果步骤3中的runningTasks/max(mindshare,1.0)值一样,判断runningTasks/weight的值,前面讲到如果开启了sizebasedweight配置项,那么作业就会随着作业的运行过程不断的变化其weight权重值,对于大作业权重值的变化曲线会类似于一个抛物线。
5、 如果权重值还一致,最后对比startTime值。
欢迎加入Hadoop技术群进行交流:147681830