本文要解决的问题:
从scheduler各个类的具体方法阅读源码,进一步了解Spark的scheduler的工作原理和过程。
Scheduler的基本过程
用户提交的Job到达DAGScheduler后,会被封装成ActiveJob,同时启动JobWaiter监听作业的完成情况。DAGScheduler依据Job中RDD的dependency属性(窄依赖NarrowDependency、宽依赖ShuffleDependency),按照依赖关系的先后划分出不同的stage,形成stage DAG(包括result stage和shuffle map stage)。在每一个stage内部,DAGScheduler根据RDD中partition的数量和分布产生出一组相应的task(ResultTask或ShuffleMapTask),并将其包装为TaskSet提交给TaskScheduler。
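为了直观说明上面的流程,下面给出一个最小的示例程序(这里假设的SchedulerDemo并非Spark源码,仅作演示):reduceByKey会引入ShuffleDependency,因此count()这个action触发的job会被划分成一个shuffle map stage和一个result stage,每个stage再按partition数量各产生4个task。
```scala
import org.apache.spark.{SparkConf, SparkContext}

object SchedulerDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("scheduler-demo").setMaster("local[2]"))

    val result = sc.parallelize(1 to 100, 4)   // 4 个 partition,每个 stage 产生 4 个 task
      .map(i => (i % 10, 1))                   // 窄依赖,仍属于同一个 stage
      .reduceByKey(_ + _)                      // 宽依赖(ShuffleDependency),在此划分 stage 边界
      .count()                                 // action,触发 Job 提交给 DAGScheduler

    println(s"distinct keys: $result")
    sc.stop()
  }
}
```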
DAGScheduler
DAGScheduler是高层调度器,实现了面向stage(stage-oriented)的调度:它为每个Job计算出由stage组成的DAG,跟踪哪些stage的输出已经物化,并最终把每个stage以TaskSet的方式提交给TaskScheduler。
DAGScheduler需要接收上下层发来的消息,这些消息由它内部的事件处理循环统一处理(早期版本中由actor实现)。这里主要看看它所处理的一些事件,如下所示。
private[scheduler] case class JobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
  extends DAGSchedulerEvent
private[scheduler] case class StageCancelled(stageId: Int) extends DAGSchedulerEvent
private[scheduler] case class JobCancelled(jobId: Int) extends DAGSchedulerEvent
private[scheduler] case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent
private[scheduler] case object AllJobsCancelled extends DAGSchedulerEvent
private[scheduler]
case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
private[scheduler]
case class GettingResultEvent(taskInfo: TaskInfo) extends DAGSchedulerEvent
还有很多,不一一罗列。
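这些事件由DAGScheduler内部的事件处理循环统一接收和分发。以JobSubmitted为例,RDD的action最终会走到DAGScheduler.submitJob,其核心逻辑大致如下(示意性节选,省略了分区合法性检查等细节,具体以所用版本的源码为准):
```scala
// DAGScheduler.submitJob 的示意性节选:创建 JobWaiter 监听作业完成情况,
// 并把 JobSubmitted 事件投递给事件处理循环,由 handleJobSubmitted 异步处理
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  val jobId = nextJobId.getAndIncrement()
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func.asInstanceOf[(TaskContext, Iterator[_]) => _],
    partitions.toArray, callSite, waiter, properties))
  waiter
}
```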
JobSubmitted
// 处理提交的作业,完成Job到stage的转换,生成finalStage,并调用submitStage提交
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)
}
接下来进入submitStage方法。submitStage负责提交stage,最先被提交的是没有未完成父stage依赖的那些stage。
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // 没有缺失的父stage,直接提交该stage的task
        submitMissingTasks(stage, jobId.get)
      } else {
        // 有缺失的父stage,先递归提交父stage,当前stage进入等待队列
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
如果发现当前stage没有任何未完成的父stage依赖,则直接提交该stage的task;否则先递归提交父stage。其中的getMissingParentStages用于获取尚未计算完成的父stage,源码如下:
// 查找尚未计算完成(未物化)的父stage,封装成List返回
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        for (dep <- rdd.dependencies) {
          dep match {
            // 如果是ShuffleDependency(表示shuffle过程的依赖),则获取对应的ShuffleMapStage,
            // 若该stage尚不可用(输出未全部物化),则加入missing中
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            // 窄依赖则继续向上遍历其父RDD
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  waitingForVisit.push(stage.rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  missing.toList
}
继续submitStage,进入submitMissingTasks方法。该方法将stage按partition拆分成task,然后生成TaskSet并提交到TaskScheduler。该方法之前已经贴出来过,这里就不再重复。
DAGScheduler的主要功能:
1、接收用户提交的job。
2、将job划分为stage,并跟踪哪些stage的输出已经物化;每个stage内产生的task以TaskSet的方式提交给TaskScheduler。
TaskScheduler
TaskScheduler是低级别的任务调度器接口,目前由TaskSchedulerImpl实现。该接口允许插入不同的任务调度器实现。TaskScheduler接收DAGScheduler提交的TaskSet,并负责把任务发送到集群上运行。
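TaskScheduler这个trait定义的主要接口大致如下(示意性节选,方法列表随版本略有差异):
```scala
// TaskScheduler trait 的主要方法(示意性节选)
private[spark] trait TaskScheduler {
  def rootPool: Pool                                    // 调度池的根节点
  def schedulingMode: SchedulingMode                    // FIFO 或 FAIR
  def start(): Unit
  def stop(): Unit
  def submitTasks(taskSet: TaskSet): Unit               // 接收 DAGScheduler 提交的 TaskSet
  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
  def setDAGScheduler(dagScheduler: DAGScheduler): Unit // 任务结果需要回调通知 DAGScheduler
  def defaultParallelism(): Int
}
```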
TaskScheduler会根据部署方式而选择不同的SchedulerBackend来处理。针对不同部署方式会有不同的TaskScheduler与SchedulerBackend进行组合:
Local模式:TaskSchedulerImpl+ LocalBackend
Spark集群模式:TaskSchedulerImpl + SparkDeploySchedulerBackend
Yarn-Cluster模式:YarnClusterScheduler + CoarseGrainedSchedulerBackend
Yarn-Client模式:YarnClientClusterScheduler + YarnClientSchedulerBackend
TaskScheduler负责任务调度和资源的分配,SchedulerBackend负责与Master、Worker通信,收集Worker上分配给该应用使用的资源情况(二者如何组合可参考下面的示意代码)。
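这种组合是SparkContext在创建调度器时根据master URL确定的,其逻辑大致如下(示意性伪码,仅列出local与standalone两个分支,细节随版本不同):
```scala
// SparkContext.createTaskScheduler 的示意性伪码:
// 根据 master URL 选择 TaskScheduler 与 SchedulerBackend 的组合
master match {
  case "local" =>
    val scheduler = new TaskSchedulerImpl(sc, maxTaskFailures = 1, isLocal = true)
    val backend = new LocalBackend(sc.getConf, scheduler, totalCores = 1)
    scheduler.initialize(backend)
    (backend, scheduler)

  case sparkUrl if sparkUrl.startsWith("spark://") =>
    val scheduler = new TaskSchedulerImpl(sc)
    val backend = new SparkDeploySchedulerBackend(scheduler, sc, Array(sparkUrl))
    scheduler.initialize(backend)
    (backend, scheduler)

  // yarn-cluster / yarn-client 等分支则分别创建 YarnClusterScheduler、
  // YarnClientClusterScheduler 及对应的 SchedulerBackend
}
```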
TaskSchedulerImpl
TaskSchedulerImpl类负责为Task分配资源。CoarseGrainedSchedulerBackend获取到可用资源后,会通过makeOffers方法通知TaskSchedulerImpl对资源进行分配。
TaskSchedulerImpl的resourceOffers方法负责为Task分配计算资源,分配完成后再通过launchTasks方法发送LaunchTask消息,通知Worker上的Executor执行Task。
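makeOffers的核心逻辑大致如下(示意性节选,来自CoarseGrainedSchedulerBackend内部的DriverEndpoint,细节随版本略有差异):
```scala
// 将所有活跃 executor 的空闲 core 封装成 WorkerOffer,
// 交给 TaskSchedulerImpl.resourceOffers 做分配,再把分配结果通过 launchTasks 发给各 Executor
private def makeOffers() {
  val workOffers = executorDataMap.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}
```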
下面看下TaskSchedulerImpl中的几个方法。
1.initialize:
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
    }
  }
  schedulableBuilder.buildPools()
}
initialize方法主要完成SchedulerBackend的设置,并按调度模式(FIFO或FAIR,可由用户自行配置)构建相应的调度池。
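调度模式由spark.scheduler.mode配置项决定,例如可以这样设置(示例代码,非源码):
```scala
import org.apache.spark.SparkConf

// 将调度模式从默认的 FIFO 切换为 FAIR,通常配合公平调度池(fairscheduler.xml)使用
val conf = new SparkConf()
  .setAppName("fair-scheduling-demo")
  .set("spark.scheduler.mode", "FAIR")
```
等价地,也可以通过spark-submit的--conf spark.scheduler.mode=FAIR传入。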
2.start
override def start() {
  backend.start()

  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    speculationScheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}
3.submitTasks
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}
TaskScheduler提交task之后,实际的资源分配和任务下发由backend.reviveOffers触发;Spark中针对不同部署方式有多个不同的SchedulerBackend实现。
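以CoarseGrainedSchedulerBackend为例,reviveOffers的实现非常简单:只是向driver endpoint发送一条ReviveOffers消息,DriverEndpoint收到消息后再调用makeOffers()触发一轮资源分配(示意性节选):
```scala
// CoarseGrainedSchedulerBackend 中的示意性节选:
// reviveOffers 只负责发消息,真正的分配在 DriverEndpoint 收到消息后进行
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
```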
Stage
一个stage是一组并行任务的集合,这些任务计算相同的函数,作为Spark job的一部分运行,并且具有相同的shuffle依赖。调度器把任务DAG在发生shuffle的边界处切分成一个个stage,再按拓扑顺序依次运行。每个stage要么是ShuffleMapStage(它的任务输出作为后续stage的输入),要么是ResultStage(它的任务直接执行发起该job的action,例如count()、save()等)。对于ShuffleMapStage,还会跟踪每个输出partition所在的节点。
Stage的构造如下:
private[scheduler] abstract class Stage(
    val id: Int,
    val rdd: RDD[_],
    val numTasks: Int,
    val parents: List[Stage],
    val firstJobId: Int,
    val callSite: CallSite)
Task
Task: 一个执行单元,在Spark有两种实现:
org.apache.spark.scheduler.ShuffleMapTask
org.apache.spark.scheduler.ResultTask
一个Spark job会包含一个或多个stage。ResultTask执行任务并把结果返回给driver端的应用;ShuffleMapTask执行任务,并把输出按照partitioner划分到多个bucket中,供后续stage读取。
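两者的差别主要体现在runTask的返回值上,可以用下面的示意性节选来概括(省略了实际构造参数等细节):
```scala
// 示意性节选:Task 抽象类定义统一的 runTask 入口,两种实现返回不同的结果
private[spark] abstract class Task[T](val stageId: Int, val partitionId: Int) {
  def runTask(context: TaskContext): T
}

// ShuffleMapTask extends Task[MapStatus]:
//   对自己负责的 partition 执行计算,把输出按下游分区写入 shuffle 文件,
//   返回的 MapStatus 记录输出所在位置与大小,供后续 stage 拉取
// ResultTask[T, U] extends Task[U]:
//   直接在对应 partition 上执行用户函数 func(context, rdd.iterator(partition, context)),
//   并把结果返回给 driver
```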
1.TaskSet
一起提交给底层TaskScheduler的一组Task的集合,通常对应某个stage中缺失(尚未计算)的partition。
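TaskSet本身只是一个很薄的封装,大致如下(示意性节选):
```scala
// 示意性节选:TaskSet 只是把一个 stage(的某次 attempt)中待运行的 task 打包在一起
private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val stageAttemptId: Int,
    val priority: Int,          // FIFO 模式下用于排序(取 jobId)
    val properties: Properties) {
  val id: String = stageId + "." + stageAttemptId
}
```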
2.TaskSetManager
在TaskSchedulerImpl内部对单个TaskSet中的任务进行调度。该类跟踪每一个task,在task失败时进行重试(最多重试有限次数),并通过延迟调度(delay scheduling)为这个TaskSet处理基于数据本地性的调度。它的主要接口是resourceOffer(询问这个TaskSet是否愿意在某个节点上运行一个task)和statusUpdate(通知它某个task的状态发生了改变)。
private[spark] class TaskSetManager(
    sched: TaskSchedulerImpl,
    val taskSet: TaskSet,
    val maxTaskFailures: Int,
    clock: Clock = new SystemClock())
  extends Schedulable with Logging {

  val conf = sched.sc.conf
方法addPendingTask:
把一个task添加到它所应属于的各个pending任务列表中(按executor、host、rack以及无本地性偏好等维度维护)。源码如下:
private def addPendingTask(index: Int) {
  for (loc <- tasks(index).preferredLocations) {
    loc match {
      case e: ExecutorCacheTaskLocation =>
        pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
      case e: HDFSCacheTaskLocation =>
        val exe = sched.getExecutorsAliveOnHost(loc.host)
        exe match {
          case Some(set) =>
            for (e <- set) {
              pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
            }
            logInfo(s"Pending task $index has a cached location at ${e.host} " +
              ", where there are executors " + set.mkString(","))
          case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
            ", but there are no executors alive there.")
        }
      case _ =>
    }
    pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
    for (rack <- sched.getRackForHost(loc.host)) {
      pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
    }
  }

  if (tasks(index).preferredLocations == Nil) {
    pendingTasksWithNoPrefs += index
  }

  allPendingTasks += index  // No point scanning this whole list to find the old task there
}
resourceOffer
resourceOffer决定如何在TaskSet内部调度一个task。下面贴出的是Mesos模式下SchedulerBackend收到资源offer时的resourceOffers处理,可以看到offer在进入任务分配之前会先按约束条件过滤:
override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) {
  stateLock.synchronized {
    if (stopCalled) {
      logDebug("Ignoring offers during shutdown")
      // Driver should simply return a stopped status on race
      // condition between this.stop() and completing here
      offers.asScala.map(_.getId).foreach(d.declineOffer)
      return
    }

    logDebug(s"Received ${offers.size} resource offers.")

    val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
      val offerAttributes = toAttributeMap(offer.getAttributesList)
      matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
    }

    declineUnmatchedOffers(d, unmatchedOffers)
    handleMatchedOffers(d, matchedOffers)
  }
}
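作为对照,TaskSetManager.resourceOffer本身的签名大致如下(示意):它在收到某个executor的资源offer时,结合允许的最大本地性级别决定是否从这个TaskSet中选出一个task去运行:
```scala
// 示意:TaskSetManager.resourceOffer 的签名。
// 返回 Some(TaskDescription) 表示选出了一个 task 在该 executor 上启动,None 表示本轮不启动
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality): Option[TaskDescription]
```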
Conf
Property Name | Default | Meaning |
---|---|---|
spark.task.cpus | 1 | Number of cores to allocate for each task. |
spark.task.maxFailures | 4 | Number of individual task failures before giving up on the job. Should be greater than or equal to 1. Number of allowed retries = this value - 1. |
spark.scheduler.mode | FIFO | The scheduling mode between jobs submitted to the same SparkContext. Can be set to FAIR to use fair sharing instead of queueing jobs one after another. Useful for multi-user services. |
spark.cores.max | (not set) | When running on a standalone deploy cluster or a Mesos cluster in "coarse-grained" sharing mode, the maximum amount of CPU cores to request for the application from across the cluster (not from each machine). If not set, the default will be spark.deploy.defaultCores on Spark's standalone cluster manager, or infinite (all available cores) on Mesos. |
spark.mesos.coarse | false | If set to "true", runs over Mesos clusters in "coarse-grained" sharing mode, where Spark acquires one long-lived Mesos task on each machine instead of one Mesos task per Spark task. This gives lower-latency scheduling for short queries, but leaves resources in use for the whole duration of the Spark job. |
spark.speculation | false | If set to “true”, performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched. |
spark.speculation.interval | 100 | How often Spark will check for tasks to speculate, in milliseconds. |
spark.speculation.quantile | 0.75 | Percentage of tasks which must be complete before speculation is enabled for a particular stage. |
spark.speculation.multiplier | 1.5 | How many times slower a task is than the median to be considered for speculation. |
spark.locality.wait | 3000 | Number of milliseconds to wait to launch a data-local task before giving up and launching it on a less-local node. The same wait will be used to step through multiple locality levels (process-local, node-local, rack-local and then any). It is also possible to customize the waiting time for each level by setting spark.locality.wait.node, etc. You should increase this setting if your tasks are long and see poor locality, but the default usually works well. |
spark.locality.wait.process | spark.locality.wait | Customize the locality wait for process locality. This affects tasks that attempt to access cached data in a particular executor process. |
spark.locality.wait.node | spark.locality.wait | Customize the locality wait for node locality. For example, you can set this to 0 to skip node locality and search immediately for rack locality (if your cluster has rack information). |
spark.locality.wait.rack | spark.locality.wait | Customize the locality wait for rack locality. |
spark.scheduler.revive.interval | 1000 | The interval length for the scheduler to revive the worker resource offers to run tasks (in milliseconds). |
spark.scheduler.minRegisteredResourcesRatio | 0.0 for Mesos and Standalone mode, 0.8 for YARN | The minimum ratio of registered resources (registered resources / total expected resources) (resources are executors in yarn mode, CPU cores in standalone mode) to wait for before scheduling begins. Specified as a double between 0.0 and 1.0. Regardless of whether the minimum ratio of resources has been reached, the maximum amount of time it will wait before scheduling begins is controlled by config spark.scheduler.maxRegisteredResourcesWaitingTime. |
spark.scheduler.maxRegisteredResourcesWaitingTime | 30000 | Maximum amount of time to wait for resources to register before scheduling begins (in milliseconds). |
spark.localExecution.enabled | false | Enables Spark to run certain jobs, such as first() or take() on the driver, without sending tasks to the cluster. This can make certain jobs execute very quickly, but may require shipping a whole partition of data to the driver. |
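上表中的配置既可以通过spark-submit的--conf传入,也可以在代码中通过SparkConf设置,例如(示例代码,取值仅作演示):
```scala
import org.apache.spark.SparkConf

// 一些调度相关配置的设置示例
val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")       // 作业间公平调度
  .set("spark.task.maxFailures", "8")        // 单个 task 最多允许失败 8 次
  .set("spark.speculation", "true")          // 开启推测执行
  .set("spark.speculation.quantile", "0.9")  // 90% 的 task 完成后才开始推测
  .set("spark.locality.wait", "6000")        // 本地性等待时间(毫秒)
```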