Spark Master Resource Allocation Algorithm (Spark 1.3)
The Master's resource scheduling is implemented by the schedule() method of the Master class in the org.apache.spark.deploy.master package.
The steps are as follows:
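First, check whether the Master is in the ALIVE state and return immediately if it is not. Only the active Master schedules resources; a standby Master never does.
Pass the previously registered workers that are in the ALIVE state to Random.shuffle, which randomizes their order and returns the shuffled sequence.
Get the number of workers returned, i.e. the number of alive workers.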
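Drivers are scheduled first, in a for loop. Only drivers submitted in standalone cluster deploy mode are registered with the Master and need to be scheduled; in client mode the driver runs inside the submitting client process, so there is nothing for the Master to schedule.
The for loop iterates over the waitingDrivers ArrayBuffer. For each driver, an inner while loop keeps visiting workers as long as some alive worker has not been visited yet and the driver has not been launched.
If the current worker's free memory is >= the memory the driver requires and its free cores are >= the cores the driver requires, the driver is launched there and removed from the waitingDrivers queue. Either way the position pointer then moves on to the next worker, so drivers are assigned round-robin.
Drivers are launched by launchDriver. It adds the driver to the worker's internal cache, which subtracts the driver's memory and cores from the worker's free resources; records the worker in the driver's own state; sends a LaunchDriver message to the worker's actor so that the worker starts the driver; and finally sets the driver's state to RUNNING.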
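The driver placement loop described above can be sketched as a small standalone model. This is not Spark's code: `Worker`, `Driver` and `scheduleDrivers` are hypothetical stand-ins for WorkerInfo, DriverInfo and the driver half of schedule(), resources are plain integers, and "launching" a driver is reduced to deducting its memory and cores from the chosen worker. A seeded `Random` is passed in so the result is reproducible.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Hypothetical stand-ins for Spark's WorkerInfo and DriverInfo (not Spark classes)
final class Worker(val id: String, var memoryFree: Int, var coresFree: Int, val alive: Boolean = true)
final case class Driver(id: String, mem: Int, cores: Int)

// Places waiting drivers round-robin over the shuffled alive workers.
// Returns driverId -> workerId for launched drivers; drivers that fit nowhere stay in `waiting`.
def scheduleDrivers(workers: Seq[Worker], waiting: ArrayBuffer[Driver], rnd: Random): Map[String, String] = {
  val shuffled = rnd.shuffle(workers.filter(_.alive).toVector)
  val numAlive = shuffled.size
  var curPos = 0
  var placed = Map.empty[String, String]
  for (driver <- waiting.toList) { // iterate over a copy, since `waiting` is modified
    var launched = false
    var visited = 0
    while (visited < numAlive && !launched) {
      val w = shuffled(curPos)
      visited += 1
      if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
        // "launch": deduct the driver's resources from the worker
        w.memoryFree -= driver.mem
        w.coresFree -= driver.cores
        waiting -= driver
        placed += (driver.id -> w.id)
        launched = true
      }
      curPos = (curPos + 1) % numAlive // move on to the next worker, round-robin
    }
  }
  placed
}
```

Because curPos keeps advancing after each launch, two small drivers land on different workers even when one worker could host both, which is the round-robin behaviour the Spark comment describes. A driver that fits on no worker simply stays in the queue for the next call.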
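After the drivers are launched, applications are scheduled. There are two algorithms, spreadOutApps and non-spreadOutApps, selected by the configuration property spark.deploy.spreadOut (read via conf.getBoolean("spark.deploy.spreadOut", true)). The default is true, which enables spreadOutApps.
A for loop iterates over the applications in waitingApps, using an if guard to keep only those that still need cores. For each such application, the alive workers it can use are selected and sorted by free cores in descending order. A worker is usable if its free memory is at least the memory one executor of the application needs and it does not already run an executor for this application.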
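The worker filter in this step can be written as a small pure function. Assumption: `Worker`, `Application`, `appsWithExecutor` and `usableWorkers` are illustrative names only; Spark 1.3's own check is the canUse method called in the source below, and this sketch models just the two conditions listed in the text.

```scala
// Hypothetical immutable stand-ins with only the fields the filter needs
final case class Worker(id: String, alive: Boolean, memoryFree: Int, coresFree: Int, appsWithExecutor: Set[String])
final case class Application(id: String, memoryPerExecutor: Int)

// The two conditions from the text: enough free memory for one executor,
// and no executor of this application on the worker yet
def canUse(app: Application, w: Worker): Boolean =
  w.memoryFree >= app.memoryPerExecutor && !w.appsWithExecutor.contains(app.id)

// Alive, usable workers, most free cores first (sort ascending, then reverse, as schedule() does)
def usableWorkers(app: Application, workers: Seq[Worker]): Seq[Worker] =
  workers.filter(_.alive).filter(canUse(app, _)).sortBy(_.coresFree).reverse
```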
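An array with one slot per usable worker records how many cores each worker is assigned. The number of cores to assign is the minimum of the cores the application still needs and the total free cores of the usable workers.
A while loop then cycles over the workers: if the current worker still has free cores that have not been assigned, the remaining count to assign is decremented by 1 and that worker's assigned count is incremented by 1; the pointer then moves to the next worker. The loop runs until all cores have been assigned. As a result, the application's cores are spread as evenly as possible across the workers, so the application runs on as many nodes as possible.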
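The core-count calculation and the one-core-at-a-time loop described above can be modeled with plain arrays. A minimal sketch, assuming the input array holds the free cores of the usable workers already sorted in descending order; `spreadOut` is a hypothetical name, while the loop body mirrors the while loop in Spark 1.3's schedule().

```scala
// Spread-out assignment: hand out one core at a time, cycling over the usable workers.
// coresFree(i) is the free core count of usable worker i; returns cores assigned per worker.
def spreadOut(coresFree: Array[Int], coresLeft: Int): Array[Int] = {
  val numUsable = coresFree.length
  val assigned = new Array[Int](numUsable)           // one slot per usable worker
  var toAssign = math.min(coresLeft, coresFree.sum)  // never more than the workers can offer
  var pos = 0
  while (toAssign > 0) {
    if (coresFree(pos) - assigned(pos) > 0) {         // this worker still has an unassigned core
      toAssign -= 1
      assigned(pos) += 1
    }
    pos = (pos + 1) % numUsable                       // move on to the next worker
  }
  assigned
}
```

For example, with free cores 8, 4 and 2 and an application needing 6 cores, each worker ends up with 2 cores instead of the first worker taking all 6. Capping toAssign at the total free cores is what guarantees the loop terminates.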
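Once the cores have been assigned, the loop goes over the workers that received at least one core. For each one, an executor is added to the application's internal cache by creating an ExecutorDesc object that records how many cores the executor gets; the executor is then launched on the worker, and the application's state is set to RUNNING.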
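This last step turns the per-worker core counts into executors: in each pass the application gets at most one new executor per worker, holding all the cores assigned there. The sketch below models only that bookkeeping; `PlannedExecutor` and `planExecutors` are hypothetical, and numbering executors from 0 within one call is a simplification of how the application actually hands out executor IDs.

```scala
// Hypothetical record for an executor the Master decided to launch
final case class PlannedExecutor(appId: String, execId: Int, workerId: String, cores: Int)

// One executor per worker that received cores, carrying all of that worker's assigned cores.
// workerIds(i) and assigned(i) refer to the same usable worker.
def planExecutors(appId: String, workerIds: Array[String], assigned: Array[Int]): Seq[PlannedExecutor] = {
  var nextExecId = 0 // simplified: IDs counted from 0 within this call
  for (pos <- workerIds.indices if assigned(pos) > 0) yield {
    val exec = PlannedExecutor(appId, nextExecId, workerIds(pos), assigned(pos))
    nextExecId += 1
    exec
  }
}
```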
The non-spreadOutApps algorithm does the opposite: it uses up all of a worker's free cores before moving on to the next worker.
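For contrast, the non-spreadOut branch can be sketched like this. Assumptions: `Worker`, `Application` and `packApps` are hypothetical mutable stand-ins that track only cores, and the memory and existing-executor check that Spark performs through canUse is left out to keep the example short.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical mutable stand-ins that only track cores
final class Worker(val id: String, var coresFree: Int)
final class Application(val id: String, var coresLeft: Int)

// Non-spreadOut: walk the workers in order; on each worker, every waiting application
// takes as many cores as it can before the next worker is considered.
// Returns (appId, workerId, cores) for every executor that would be launched.
def packApps(workers: Seq[Worker], apps: Seq[Application]): List[(String, String, Int)] = {
  val launched = ArrayBuffer.empty[(String, String, Int)]
  for (worker <- workers if worker.coresFree > 0; app <- apps if app.coresLeft > 0) {
    val coresToUse = math.min(worker.coresFree, app.coresLeft)
    if (coresToUse > 0) { // the worker may already be exhausted by an earlier app
      worker.coresFree -= coresToUse
      app.coresLeft -= coresToUse
      launched += ((app.id, worker.id, coresToUse))
    }
  }
  launched.toList
}
```

With two 4-core workers and applications needing 6 and 2 cores, the first application takes all 4 cores of w1 before touching w2; under spreadOut it would instead receive 3 cores on each worker.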
Here is the core source code:
private def schedule() {
  /*
   * First check whether the Master is in the ALIVE state
   */
  if (state != RecoveryState.ALIVE) { return }
  // First schedule drivers, they take strict precedence over applications
  // Randomization helps balance drivers
  // Pass the alive workers (registered earlier) to Random.shuffle to randomize their order
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  // Number of currently alive workers (after shuffling)
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  /*
   * Driver scheduling: iterate over the waitingDrivers ArrayBuffer.
   * Drivers are only registered with the Master, and therefore scheduled, when an application
   * is submitted in standalone cluster deploy mode; in client mode the driver is started in the
   * submitting process and is never registered or scheduled here
   */
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    // Keep visiting workers while some alive worker is unvisited and the driver is not launched
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      // The worker's free memory >= the driver's memory and its free cores >= the driver's cores
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // Launch the driver
        launchDriver(worker, driver)
        // and remove it from the waitingDrivers queue
        waitingDrivers -= driver
        launched = true
      }
      // Move the pointer to the next worker
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  /*
   * Application scheduling
   * Two algorithms, selected by spark.deploy.spreadOut (default true: spreadOutApps)
   */
  if (spreadOutApps) {
    // Try to spread out each app among all the nodes, until it has all its cores
    // Iterate over waitingApps, using an if guard to keep only apps that still need cores
    for (app <- waitingApps if app.coresLeft > 0) {
      // Keep the alive workers this app can use, sorted by free cores in descending order.
      // A usable worker has enough free memory for one executor of the app
      // and does not already run an executor for this app
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(canUse(app, _)).sortBy(_.coresFree).reverse
      val numUsable = usableWorkers.length
      // One slot per usable worker: how many cores it is assigned
      val assigned = new Array[Int](numUsable) // Number of cores to give on each node
      // Cores to assign: the minimum of what the app still needs and the workers' total free cores
      var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
      var pos = 0
      // Hand out one core at a time, cycling over the workers, until none are left to assign
      while (toAssign > 0) {
        if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
          toAssign -= 1
          assigned(pos) += 1
        }
        pos = (pos + 1) % numUsable
      }
      // Now that we've decided how many cores to give on each node, let's actually give them
      // Iterate over the workers now that the cores have been assigned
      for (pos <- 0 until numUsable) {
        // Only workers that were assigned at least one core
        if (assigned(pos) > 0) {
          // Add an executor to the application's internal cache, creating an
          // ExecutorDesc that records how many cores this executor gets
          val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
          // Launch the executor on the worker
          launchExecutor(usableWorkers(pos), exec)
          // Set the application's state to RUNNING
          app.state = ApplicationState.RUNNING
        }
      }
    }
  } else {
    // Pack each app into as few nodes as possible until we've assigned all its cores
    for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
      for (app <- waitingApps if app.coresLeft > 0) {
        if (canUse(app, worker)) {
          val coresToUse = math.min(worker.coresFree, app.coresLeft)
          if (coresToUse > 0) {
            val exec = app.addExecutor(worker, coresToUse)
            launchExecutor(worker, exec)
            app.state = ApplicationState.RUNNING
          }
        }
      }
    }
  }
}
// Launching an executor
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  // Add the executor to the worker's internal cache
  worker.addExecutor(exec)
  // Send a LaunchExecutor message to the worker
  worker.actor ! LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
  // Send an ExecutorAdded message to the driver of the executor's application
  exec.application.driver ! ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
// Launching a driver
def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  // Add the driver to the worker's internal cache;
  // this subtracts the driver's memory and cores from the worker's free resources
  worker.addDriver(driver)
  // Also record the worker in the driver's internal state
  driver.worker = Some(worker)
  // Send a LaunchDriver message to the worker's actor so that it starts the driver
  worker.actor ! LaunchDriver(driver.id, driver.desc)
  // Set the driver's state to RUNNING
  driver.state = DriverState.RUNNING
}