Spark2.2 TaskScheduler原理剖析与源码分析

时间:2021-11-17 19:08:23

TaskScheduler的作用

TaskScheduler的主要作用是把TaskSet中的每一个task提交到executor上去执行;
其中最主要的就是task的分配算法。


TaskScheduler源码分析

TaskScheduler 提交tasks的入口 submitTasks

  • 为每一个taskSet创建一个taskSetManager
  • 调用CoarseGrainedSchedulerBackend 的 reviveOffers()方法进行task分配
  /**
* TaskScheduler 提交程序的入口
*/

override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {

/**
* leen
* 为每一个taskSet创建一个taskSetManager
* taskSetManager实际上,会负责它的TaskSet执行状况的监视与管理
* 当tasks失败的时候重试task,(直到超过重试次数限制)
*/

val manager = createTaskSetManager(taskSet, maxTaskFailures)

val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
ts.taskSet != taskSet && !ts.isZombie
}
if (conflictingTaskSet) {
throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
}
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}

/**
* 在sparkContext原理剖析的时候,讲过,创建TaskScheduler的时候,一件非常重要的事情就是
* TaskSchedulerImpl创建一个CoarseGrainedSchedulerBackend,这里的backend就是;
*/

backend.reviveOffers()
}

CoarseGrainedSchedulerBackend 的方法 reviveOffers

  • 1,过滤出活着的Executors
  • 2,新建WorkerOffer对象,调用scheduler.resourceOffers()分配资源到各个Executor上去;
  • 3,分配好task到Executor上之后,执行自己的lauchTasks(),将分配的task发送launchTasks信息;
/** reviveOffers() */
override def reviveOffers() {
driverEndpoint.send(ReviveOffers)
}

case ReviveOffers => makeOffers()

/************************************************************/

// 向所有的Executors提供的资源
private def makeOffers() {
/**
* 1,过滤出活着的Executors
* 2,新建WorkerOffer对象,调用scheduler.resourceOffers()分配资源到各个Executor上去;
* 3,分配好task到Executor上之后,执行自己的lauchTasks(),将分配的task发送launchTasks信息;
*/

val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
// 当tasks在正在启动的时候,确保没有Executor被杀掉,过滤掉被杀掉的Executor
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
/**
* 给workerOffers传入的是什么?
* 传入的是这个application所有可用的executor,并且封装成workOffer
* workOffer :代表了所有可用的每个Executor可用的CPU资源
*/

val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toIndexedSeq
// 调用scheduler.resourceOffers()分配资源到各个Executor上去;
scheduler.resourceOffers(workOffers)
}
/** 启动 tasks */
if (!taskDescs.isEmpty) {
launchTasks(taskDescs)
}
}

task分配算法 resourceOffers

  • 移除黑名单中的node
  • 将可用的Executor进行Shuffle,尽可能做到负载均衡
  • 从rootpool中取出排序之后的TaskSet,sortedTaskSets
  • 对于每一个taskSet,从最好的一个【本地化级别】 开始遍历,调用resourceOfferSingleTaskSet()方法实现task的分配到各个executor上
    • 本地化级别分类:
      • PROCESS_LOCAL : 进程本地化,RDD的Partition与task进入一个Executor内,速度最快
      • NODE_LOCAL : 节点本地化,RDD的Partition与task不在一个Executor,即不在一个进程,但是在一个Worker上
      • NO_PREF : 无所谓本地化级别
      • RACK_LOCAL : 机架本地化,至少RDD的Partition与task在一个机架上
      • ANY :任意的本地化级别
  /**
* leen
* 集群manager向slaves分配资源
* 优先响应激活状态的TaskSet
* 我们用递归遍历的方式给每一个node分配tasks,从而确保负载均衡
*/

def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
// 标记每一个slave为ALIVE,并且记住他们的主机名;
// 同时跟踪是否有新的Executor被添加
var newExecAvail = false
for (o <- offers) {
if (!hostToExecutors.contains(o.host)) {
hostToExecutors(o.host) = new HashSet[String]()
}
if (!executorIdToRunningTaskIds.contains(o.executorId)) {
hostToExecutors(o.host) += o.executorId
executorAdded(o.executorId, o.host)
executorIdToHost(o.executorId) = o.host
executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
newExecAvail = true
}
for (rack <- getRackForHost(o.host)) {
hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
}
}

// 在开始分配资源之前,移除黑名单中的node;
// 这样做是为了避免一个单独线程和添加的同步过度消耗;
// 而且因为更新黑名单只与任务提供时相关。
blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())

val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
offers.filter { offer =>
!blacklistTracker.isNodeBlacklisted(offer.host) &&
!blacklistTracker.isExecutorBlacklisted(offer.executorId)
}
}.getOrElse(offers)

/**
* leen
* 首先,将可用的Executor进行Shuffle,尽可能做到负载均衡
*/

val shuffledOffers = shuffleOffers(filteredOffers)
// 构建分配给每个worker的任务列表。
// tasks可以理解为是一个二维数组arrayBuffer,元素又是一个arrayBuffer
// 每个子arrayBuffer的数量是固定的,也就是Executor可用的核数
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray

/**
* 这个很很重要,从rootpool中取出排序之后的TaskSet
* 之前讲解的TaskScheduler初始化的时候,创建完TaskSchedulerImpl/SparkDeploySchedulerBackend之后
* 执行的一个initialize()这个方法中会创建一个调度池
* 这里相当于是说,所提交的taskset,首先会放到这个调度池中,
* 之后调度task的分配算法的时候,会从这个调度池中,取出排序好队的taskset
*/

val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
if (newExecAvail) {
taskSet.executorAdded()
}
}

/**
* leen
* 任务分配算法的核心
* 双重循环遍历所有的taskset,已经每一种本地化级别
* 本地化级别分类:
* 1. PROCESS_LOCAL : 进程本地化,RDD的Partition与task进入一个Executor内,速度最快
* 2. NODE_LOCAL : 节点本地化,RDD的Partition与task不在一个Executor,即不在一个进程,但是在一个Worker上
* 3. NO_PREF : 无所谓本地化级别
* 4. RACK_LOCAL : 机架本地化,至少RDD的Partition与task在一个机架上
* 5. ANY :任意的本地化级别
*
*
* 对于每一个taskSet,从最好的一个本地化级别 开始遍历
*/


for (taskSet <- sortedTaskSets) {
var launchedAnyTask = false
var launchedTaskAtCurrentMaxLocality = false
for (currentMaxLocality <- taskSet.myLocalityLevels) {
do {
/**
* leen
* 对于当前的taskSet
* 尝试优先使用最小的本地化级别,将taskSet的task,在executor上启动
* 如果启动不了,就跳出do while 循环,进入下一种本地化级别,也就是放大本地化级别
* 以此类推,直到尝试将taskSet在某种本地化级别下,将task在executor上全部启动
*/

launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}
if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}
}

if (tasks.size > 0) {
hasLaunchedTask = true
}
return tasks
}

提交task到相应的executor上 launchTasks

  • 首先将每一个要执行的task信息,统一进行序列化
  • 找到对应的executor
  • 将executor的资源减去使用的cpu资源
  • 向executor上发LaunchTask的信息,在executor上启动task
/**
* leen
* 根据分配好的情况,去executor上启动相应的task
*/

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
//首先将每一个要执行的task信息,统一进行序列化
val serializedTask = TaskDescription.encode(task)
if (serializedTask.limit >= maxRpcMessageSize) {
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
"spark.rpc.message.maxSize (%d bytes). Consider increasing " +
"spark.rpc.message.maxSize or using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
}
}
else {
//找到对应的executor
val executorData = executorDataMap(task.executorId)
//将executor的资源减去使用的cpu资源
executorData.freeCores -= scheduler.CPUS_PER_TASK

logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
s"${executorData.executorHost}.")
//向executor上发LaunchTask的信息,在executor上启动task
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}