spark的task调度器(FAIR公平调度算法)

时间:2022-01-02 14:36:34

FAIR公平调度器

代码流程

在fair调度器实例生成时,与fifo的调度器生成时有些许的区别,首先看看fair调度器生成时,需要的流程,由类FairSchedulableBuilder进行的调度器的实现:

在使用fair的调度器时,schedulableBuilder的实现为FairSchedulableBuilder.

schedulableBuilder.buildPools()

 

接下来看看这个函数的内部实现:

override def buildPools() {

读取spark.scheduler.allocation.file配置项配置的fair调度器的配置文件路径.这个配置必须是xml配置文件.

如果这个配置没有配置,默认从class path中读取fairscheduler.xml配置文件.
  var is: Option[InputStream] = None
  try {
    is = Option {
      schedulerAllocFile.map { f =>
        new FileInputStream(f)
      }.getOrElse {
        Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
      }
    }
这里解析配置文件中的内容,解析文件过程(这个过程可能没有,没读取到配置文件),可以配置多个,这里会迭代生成:

1,先解析根路径pool,解析这个路径的name属性,得到这个属性的值为poolName.

2,根据pool路径下的子路径schedulingMode,并得到配置的text的值.默认值FIFO,

3,读取pool路径下的子路径minShare.并得到配置的text的值,默认为0.

4,读取pool路径下的子路径weight,并得到配置的text的值,默认值为1.

5,根据上面读取到的配置信息,再生成一个Pool实例,并添加到初始的Pool实例中,当成一个子的调度器(FIFO时,子调度器就是TaskSetManager)
    is.foreach { i => buildFairSchedulerPool(i) }
  } finally {

最后关闭文件的读取流.
    is.foreach(_.close())
  }

生成一个默认的Pool实例.

通过default为poolName,检查初始的Pool实例中是否存在这个名称的实例,如果没有,生成一个,并添加到初始的Pool实例中.默认的Pool实例中:

poolName=default,调度算法=FIFO,minShare=0,weight=1
  // finally create "default" pool
  buildDefaultPool()
}

 

在TaskSchedulerImpl进行submitTasks操作时,调用addTaskSetManager函数时:

下面看看在Fair的调度器中,在submitTasks时addTaskSetManager函数的实现流程.

override def addTaskSetManager(manager: Schedulableproperties: Properties) {

1,先取出默认的的default为poolName的调度器,
  var poolName = DEFAULT_POOL_NAME
  var parentPool = rootPool.getSchedulableByName(poolName)
  if (properties != null) {

如果传入的properties中,包含有spark.scheduler.pool配置时,得到配置的poolName的名称.

通过这个poolName得到对应的调度器,如果这个poolName对应的调度器实例不存在时,根据这个poolName生成一个FIFO的调度器.

如果需要一个任务中每一个action中对应的调度器都不相同时,在每个action操作前,可以通过如下的代码来实现对每个action前的poolName的修改:saprkContext.setLocalProperty.


    poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIESDEFAULT_POOL_NAME)
    parentPool = rootPool.getSchedulableByName(poolName)
    if (parentPool == null) {
      // we will create a new pool that user has configured in app
      // instead of being defined in xml file
      parentPool = new Pool(poolNameDEFAULT_SCHEDULING_MODE,
        DEFAULT_MINIMUM_SHAREDEFAULT_WEIGHT)
      rootPool.addSchedulable(parentPool)
      logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
        poolNameDEFAULT_SCHEDULING_MODEDEFAULT_MINIMUM_SHAREDEFAULT_WEIGHT))
    }
  }

根据poolName得到的调度器实例(子实例通常是一个Pool实例),调用其addSchedulable把manager添加到调度器的队列中.
  parentPool.addSchedulable(manager)
  logInfo("Added task set " + manager.name + " tasks to pool " + poolName)
}

 

接下来看看Fair调度器的算法如何实现对TaskSetManager的排序:

这个在task向executor通知执行时,进行的操作.会调用Pool实例中的getSortedTaskSetQueue函数,这个函数根据调度算法来得到一组需要进行调度的TaskSetManager.

使用的计算排序的算法FairSchedulingAlgorithm,下面看看这个代码的实现:

在fair的调度器中,传入的s1,s2这两个Schedulable的实例在第一次调用时为Pool实例,第二次递归调用getSortedTaskSetQueue函数时,s1,s2对应的实例才是TaskSetManager的实例.

override def comparator(s1: Schedulables2: Schedulable): Boolean = {

这里得到的minShare的值,默认为0,除非通过fair的配置文件进行了配置指定.
  val minShare1 = s1.minShare
  val minShare2 = s2.minShare

这到这两个调度器中正在运行的task的个数,如果是TaskSetManager时,就是taskSet中运行的task的个数,如果是Pool实例是表示是所有使用这个poolName的所有的TaskSetManager正在运行的task的个数.
  val runningTasks1 = s1.runningTasks
  val runningTasks2 = s2.runningTasks

 

这里的比较,只有在minShare在fair的配置文件中显示配置,同时大于正在运行的task的个数时,才会为true.
  val s1Needy = runningTasks1 < minShare1
  val s2Needy = runningTasks2 < minShare2

 

得到运行的task的个数针对于minShare的比重,
  val minShareRatio1 = runningTasks1.toDouble / math.max(minShare11.0).toDouble
  val minShareRatio2 = runningTasks2.toDouble / math.max(minShare21.0).toDouble

 

得到正在运行的task个数针对于pool的weight的比重.
  val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
  val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
  var compare: Int 0

这里首先根据正在运行的task的个数是否已经达到调度队列中最小的分片的个数来进行排序,如果s1中运行运行的个数小于s1的pool的配置的minShare,返回true,表示s1排序在前面.

如果s2中运行的task的个数小于s2的pool中配置的minShare(最小分片数)的值,表示s1小于s2,这时s2排序应该靠前.
  if (s1Needy && !s2Needy) {
    return true
  else if (!s1Needy && s2Needy) {
    return false
  else if (s1Needy && s2Needy) {

这种情况表示s1与s2两个队列中,正在运行的task的个数都已经大于(不小于)了两个子调度器中配置的minShare的个数时,根据两个子调度器队列中正在运行的task的个数对应此调度器中最小分片的值所占的比重最小的一个排序更靠前.
    compare = minShareRatio1.compareTo(minShareRatio2)
  } else {

这种情况表示s1与s2两个子调度器的队列中,正在运行的task的个数都还没有达到配置的最小分片的个数的情况,比较两个队列中正在运行的task的个数对应调度器队列的weigth的占比,最小的一个排序更靠前.
    compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
  }

  if (compare < 0) {
    true
  else if (compare > 0) {
    false
  else {

如果两个根据上面的计算,排序值都相同,就看看这两个调度器的名称,按名称的字节序来排序了.
    s1.name < s2.name
  }
}