SPARK的MAster资源调度原理(源码)分析

时间:2022-12-03 19:31:42

SPARK的MAster资源分配算法(SPARK1.3)

master资调度通过源码中的 org.apache.spark.deploy.master包下的schedule()方法实现

步骤如下:

  1. 首先判断master是否是alive状态,如果不是alive则返回,也就是只有活动的master才会进行资源调度,standby master是不会进行资源调度的

  2. 把之前注册的worker中的alive状态的worker传入 Random.shuffer方法,该方法主要是把worker顺序打乱,返回一个数组

  3. 获取返回的worker的数量

  4. 用for循环进行driver调度,只有启用yarn-cluster模式提交application才会进行driver调度,因为yarn-client和 standalone模式都是在提交的客户端启动driver,不需要调度

  5. for循环遍历WaittingDrivers ArrayBuffer,里面用while循环判断如果有alive的worker没有遍历,并且driver为为启动状态,则继续遍历

  6. 如果这个worker的内存>=driver需要的内存并且CPU>=driver需要的CPU,则启动driver,将driver从WaittingDrivers队列中移除

  7. 启动driver的方法为launchDriver,将driver加入worker的内部缓存,将worker剩余的内存、CPU减去driver需要的内存、CPU,worker也被加入到driver缓存结构中,然后调用worker的actor方法,给worker发送LaunchDriver消息,让它把driver启动起来,然后将driver状态改为RUNNING

  8. driver启动后,进行application的调度,这里有两个算法,spreadOutApps和非spreadOutApps算法,这个在代码的SparkConf里可以设置, ("spark.deploy.spreadOut", true),默认是为true,启用spreadoutApps

  9. for遍历WaittingApps中的application,并且用if守卫过滤出还需要进行CPU分配的application,for循环里再次过滤状态为alive并且可以被application使用的worker,然后按照其剩余的CPU数量倒序排序(可以被application使用的worker必须是可用内存大于等于application最小executor需要的需要的内存,并且没有被application启用过)

  10. 把需要分配的application数量放入一个数组,然后获取最终需要分配的CPU数量=application需要分配的CPU和worker总CPU的最小值

  11. while遍历worker,如果worker还有可分配的CPU,将总的需要分配的CPU-1,给这个worker分配的CPU+1,指针移到下一个CPU。循环一直到CPU分配完,这种分配算法的结果是application的CPU尽可能的平均分配到了各个worker上,应用程序尽可能多的运行在所有的Node上

  12. 给worker分配完CPU后,遍历分配到CPU的worker,在每个application内部缓存结构中,添加executor,创建executorDSC对象,其中封装了给这个executor分配多少 CPU core,然后在worker上启动executor,将application状态改为RUNNING

  13. 如果是非spreadOutApps算法,刚好相反,先把每个worker的CPU全部分配完,在分配下一个worker的CPU,

    以下是核心源码:


  private def schedule() {

    /*

     * 首先判断 master是否是alive状态

     */

    if (state != RecoveryState.ALIVE) { return }


    // First schedule drivers, they take strict precedence over applications

    // Randomization helps balance drivers

    //把alive状态的worker(之前注册的)传入Random.shuffle方法,把worker随机打乱

    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))

    //获取当前可用worker的数量(随机打乱后)

    val numWorkersAlive = shuffledAliveWorkers.size

    var curPos = 0

    /*

     * driver的调度机制,遍历waitingDrivers这个ArrayBuffer

     * 只有用 yarn-cluster模式提交的时候,才会注册driver,并导致driver被调度,因为standalone和yarn-client模式

     * 都会在本地启动driver,而不会注册,更不会调度

     */ 

    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers

      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we

      // start from the last worker that was assigned a driver, and continue onwards until we have

      // explored all alive workers.

      var launched = false

      var numWorkersVisited = 0

      //当还有alive的worker没有遍历,并且driver没有启动,则继续遍历worker

      while (numWorkersVisited < numWorkersAlive && !launched) {

        val worker = shuffledAliveWorkers(curPos)

        numWorkersVisited += 1

        //如果这个worker空闲内存>=driver需要的内存并且worker的空闲CPU>=driver需要的CPU

        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {

          //启动driver

          launchDriver(worker, driver)

          //并且经driver从waitingDrivers队列中移除

          waitingDrivers -= driver

          launched = true

        }

        //指针指向下一个worker

        curPos = (curPos + 1) % numWorkersAlive

      }

    }


    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app

    // in the queue, then the second app, etc.

    /*

     * application的调度机制

     * 这里两种算法,可以在sparkconf设置,默认为true(spreadOutApps算法)

     */

    if (spreadOutApps) {

      // Try to spread out each app among all the nodes, until it has all its cores

      //遍历waitingApps 中的application,并且用if守卫过滤出还需要进行CPU分配的application

      for (app <- waitingApps if app.coresLeft > 0) {

        //再次过滤状态为alive并且可以被application使用的worker,然后按照其剩余的CPU数量倒序排序

        //可以被application使用的worker必须是可用内存大于application最小Executor需要的内存,并且没有被该application启用过

        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)

          .filter(canUse(app, _)).sortBy(_.coresFree).reverse

          //创建一个数组,存储需要分配的CPU数量

        val numUsable = usableWorkers.length

        val assigned = new Array[Int](numUsable) // Number of cores to give on each node

        //获取到底需要分配多少CPU,取application需要分配的CPU和worker总共CPU数量的最小值

        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

        var pos = 0

        while (toAssign > 0) { }

        // Now that we've decided how many cores to give on each node, let's actually give them

        //给worker分配完CPU后,遍历worker

        for (pos <- 0 until numUsable) {

          //只要worker分配到了CPU

          if (assigned(pos) > 0) {

            //首先在每个application内部缓存结构中,添加executor,

            //创建executorDSC对象,其中封装了给这个executor分配多少 CPU core

            val exec = app.addExecutor(usableWorkers(pos), assigned(pos))

            //那么就在worker上启动Executor

            launchExecutor(usableWorkers(pos), exec)

            //将application状态设置为RUNNING

            app.state = ApplicationState.RUNNING

          }

        }

      }

    } else {

      // Pack each app into as few nodes as possible until we've assigned all its cores

      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {

        for (app <- waitingApps if app.coresLeft > 0) {

          if (canUse(app, worker)) {

            val coresToUse = math.min(worker.coresFree, app.coresLeft)

            if (coresToUse > 0) {

              val exec = app.addExecutor(worker, coresToUse)

              launchExecutor(worker, exec)

              app.state = ApplicationState.RUNNING

            }

          }

        }

      }

    }

  }


  //executor的启动

def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {

    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)

    //将executor加入worker内部缓存

    worker.addExecutor(exec)

    //向worker发送LaunchExecutor消息

    worker.actor ! LaunchExecutor(masterUrl,

      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)

      //向executor对应的application发送ExecutorAdded消息

    exec.application.driver ! ExecutorAdded(

      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)

  }


//Driver的启动

  def launchDriver(worker: WorkerInfo, driver: DriverInfo) {

    logInfo("Launching driver " + driver.id + " on worker " + worker.id)

    //将driver加入到worker的内部缓存结构

    //将worker剩余的内存、CPU减去driver使用的内存和CPU

    worker.addDriver(driver)

    //worker也被加入到driver内部缓存结构中

    driver.worker = Some(worker)

    //然后调用worker的actor方法,给worker发送LaunchDriver消息,让它把driver启动起来

    worker.actor ! LaunchDriver(driver.id, driver.desc)

    //将driver状态改为RUNNING

    driver.state = DriverState.RUNNING

  }