Apache Spark 1.0.0 Walkthrough (7): Resource Scheduling, Returning Results

Date: 2021-08-28 14:37:44

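A ResultTask simply applies func and finally reports whether the task finished. A ShuffleMapTask, by contrast, has to store its intermediate result in a DirectTaskResult instance so that the next task can use it, and it also returns a MapStatus instance.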

In Executor.run, execBackend.statusUpdate is called once the task has finished. CoarseGrainedExecutorBackend extends ExecutorBackend and overrides statusUpdate so that it sends a StatusUpdate message to the driver.

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  driver ! StatusUpdate(executorId, taskId, state, data)
}

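The driverActor defined in CoarseGrainedSchedulerBackend receives the message. It first calls scheduler.statusUpdate to update the task state; if the task has finished, it then returns the task's cores to the executor's free pool and makes new offers on that executor.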

case StatusUpdate(executorId, taskId, state, data) =>
  scheduler.statusUpdate(taskId, state, data.value)
  if (TaskState.isFinished(state)) {
    if (executorActor.contains(executorId)) {
      freeCores(executorId) += scheduler.CPUS_PER_TASK
      makeOffers(executorId)
    } else {
      // Ignoring the update since we don't know about the executor.
      val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
      logWarning(msg.format(taskId, state, sender, executorId))
    }
  }
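
The core accounting in that handler can be sketched in isolation. The following is a minimal, self-contained model rather than Spark's code: the names CoreAccounting, Driver, launch and free are hypothetical, and CpusPerTask is assumed to be 1.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the driver-side core bookkeeping.
object CoreAccounting {
  val CpusPerTask = 1 // assumed value, playing the role of scheduler.CPUS_PER_TASK

  sealed trait State
  case object Running extends State
  case object Finished extends State
  case object Failed extends State

  def isFinished(s: State): Boolean = s != Running

  final class Driver(initialCores: Map[String, Int]) {
    private val freeCores = mutable.Map.empty[String, Int] ++= initialCores

    /** Reserve cores when a task is launched on an executor. */
    def launch(executorId: String): Unit =
      freeCores(executorId) -= CpusPerTask

    /**
     * Apply a status update. Non-terminal states change nothing; a terminal
     * state gives the cores back. Returns false when the update is ignored
     * because the executor is unknown.
     */
    def statusUpdate(executorId: String, state: State): Boolean =
      if (!isFinished(state)) true
      else if (freeCores.contains(executorId)) {
        freeCores(executorId) += CpusPerTask
        true
      } else false

    def free(executorId: String): Int = freeCores.getOrElse(executorId, 0)
  }
}
```

In this model a failed task frees its cores just like a successful one, which mirrors the isFinished check in the excerpt above; only updates from unknown executors are dropped.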

scheduler.statusUpdate mainly removes the finished task from the bookkeeping maps and updates the corresponding task set.

def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  var failedExecutor: Option[String] = None
  synchronized {
    try {
      if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
        // We lost this entire executor, so remember that it's gone
        val execId = taskIdToExecutorId(tid)
        if (activeExecutorIds.contains(execId)) {
          removeExecutor(execId)
          failedExecutor = Some(execId)
        }
      }
      taskIdToTaskSetId.get(tid) match {
        case Some(taskSetId) =>
          if (TaskState.isFinished(state)) {
            taskIdToTaskSetId.remove(tid)
            taskIdToExecutorId.remove(tid)
          }
          activeTaskSets.get(taskSetId).foreach { taskSet =>
            if (state == TaskState.FINISHED) {
              taskSet.removeRunningTask(tid)
              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
              taskSet.removeRunningTask(tid)
              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
            }
          }
        case None =>
          logError(
            ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
             "likely the result of receiving duplicate task finished status updates)")
            .format(state, tid))
      }
    } catch {
      case e: Exception => logError("Exception in statusUpdate", e)
    }
  }
  // Update the DAGScheduler without holding a lock on this, since that can deadlock
  if (failedExecutor.isDefined) {
    dagScheduler.executorLost(failedExecutor.get)
    backend.reviveOffers()
  }
}
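
The bookkeeping part of statusUpdate can be illustrated with a much smaller model. This sketch is an assumption-laden simplification: StatusBookkeeping and its Scheduler class are hypothetical, the two buffers stand in for enqueueSuccessfulTask and enqueueFailedTask, and executor-loss handling is left out.

```scala
import scala.collection.mutable

// Hypothetical model of the task bookkeeping only; not Spark's TaskSchedulerImpl.
object StatusBookkeeping {
  sealed trait State
  case object Running extends State
  case object Finished extends State
  case object Failed extends State
  case object Killed extends State
  case object Lost extends State

  private val terminal: Set[State] = Set(Finished, Failed, Killed, Lost)

  final class Scheduler {
    val taskIdToTaskSetId = mutable.HashMap.empty[Long, String]
    val succeeded = mutable.ArrayBuffer.empty[Long] // stand-in for enqueueSuccessfulTask
    val failed = mutable.ArrayBuffer.empty[Long]    // stand-in for enqueueFailedTask

    /** Returns false when the task set is already gone, e.g. on a duplicate update. */
    def statusUpdate(tid: Long, state: State): Boolean = synchronized {
      taskIdToTaskSetId.get(tid) match {
        case Some(_) =>
          // A terminal state retires the task id, so later updates are ignored.
          if (terminal.contains(state)) taskIdToTaskSetId.remove(tid)
          if (state == Finished) succeeded += tid
          else if (state != Running) failed += tid
          true
        case None => false
      }
    }
  }
}
```

Removing the task id on the first terminal update is what makes a duplicate "finished" message fall into the None branch, which corresponds to the logError case in the excerpt.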

The key statement here is taskResultGetter.enqueueSuccessfulTask. It deserializes the result data, handles it as either a direct or an indirect result, and finally calls scheduler.handleSuccessfulTask.

def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
  getTaskResultExecutor.execute(new Runnable {
    override def run(): Unit = Utils.logUncaughtExceptions {
      try {
        val result = serializer.get().deserialize[TaskResult[_]](serializedData) match {
          case directResult: DirectTaskResult[_] => directResult
          case IndirectTaskResult(blockId) =>
            logDebug("Fetching indirect task result for TID %s".format(tid))
            scheduler.handleTaskGettingResult(taskSetManager, tid)
            val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
            if (!serializedTaskResult.isDefined) {
              /* We won't be able to get the task result if the machine that ran the task failed
               * between when the task ended and when we tried to fetch the result, or if the
               * block manager had to flush the result.
               */
              scheduler.handleFailedTask(
                taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
              return
            }
            val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
              serializedTaskResult.get)
            sparkEnv.blockManager.master.removeBlock(blockId)
            deserializedResult
        }
        result.metrics.resultSize = serializedData.limit()
        scheduler.handleSuccessfulTask(taskSetManager, tid, result)
      } catch {
        case cnf: ClassNotFoundException =>
          val loader = Thread.currentThread.getContextClassLoader
          taskSetManager.abort("ClassNotFound with classloader: " + loader)
        case ex: Exception =>
          taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
      }
    }
  })
}
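
The direct versus indirect distinction is easier to see without the serialization and threading around it. The sketch below is a hypothetical model: ResultFetching, Direct, Indirect and resolve are illustrative names, fetchBlock stands in for the block manager lookup, and a lost block is reported as a Left instead of going through handleFailedTask.

```scala
// Hypothetical model of direct vs. indirect task results; not Spark's classes.
object ResultFetching {
  sealed trait TaskResult[+T]
  final case class Direct[T](value: T) extends TaskResult[T]
  final case class Indirect(blockId: String) extends TaskResult[Nothing]

  /**
   * Resolve a result to its value. A direct result already carries the value;
   * an indirect one only names a block that must be fetched, and that fetch
   * can fail if the block is gone.
   */
  def resolve[T](result: TaskResult[T], fetchBlock: String => Option[T]): Either[String, T] =
    result match {
      case Direct(value) => Right(value)
      case Indirect(blockId) =>
        fetchBlock(blockId).toRight(s"result lost for block $blockId")
    }
}
```

With an in-memory Map as the block store, passing store.get as fetchBlock is enough to try it out: a Direct result resolves without touching the store, while an Indirect one resolves only if its block id is present.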

scheduler.handleSuccessfulTask is defined in TaskSchedulerImpl as follows; it does nothing but call taskSetManager.handleSuccessfulTask.

def handleSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskResult: DirectTaskResult[_]) = synchronized {
  taskSetManager.handleSuccessfulTask(tid, taskResult)
}

taskSetManager.handleSuccessfulTask marks the task as successful, removes it from the running tasks, and then calls sched.dagScheduler.taskEnded.

/**
 * Marks the task as successful and notifies the DAGScheduler that a task has ended.
 */
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = {
  val info = taskInfos(tid)
  val index = info.index
  info.markSuccessful()
  removeRunningTask(tid)
  sched.dagScheduler.taskEnded(
    tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
  if (!successful(index)) {
    tasksSuccessful += 1
    logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
      tid, info.duration, info.host, tasksSuccessful, numTasks))
    // Mark successful and stop if all the tasks have succeeded.
    successful(index) = true
    if (tasksSuccessful == numTasks) {
      isZombie = true
    }
  } else {
    logInfo("Ignorning task-finished event for TID " + tid + " because task " +
      index + " has already completed successfully")
  }
  failedExecutors.remove(index)
  maybeFinishTaskSet()
}
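
The successful(index) check above makes the success counter idempotent, so a task that reports success twice (for example a speculative copy) is counted once. A minimal sketch of just that logic, using the hypothetical names TaskSetProgress and Tracker:

```scala
// Hypothetical model of the per-task-set success counting; not Spark's TaskSetManager.
object TaskSetProgress {
  final class Tracker(numTasks: Int) {
    private val successful = new Array[Boolean](numTasks)
    private var tasksSuccessful = 0
    var isZombie = false // set once every task has succeeded

    /** Returns true if this call counted a new success, false for a duplicate. */
    def handleSuccessfulTask(index: Int): Boolean =
      if (successful(index)) false
      else {
        successful(index) = true
        tasksSuccessful += 1
        if (tasksSuccessful == numTasks) isZombie = true
        true
      }

    def progress: String = s"$tasksSuccessful/$numTasks"
  }
}
```

For a tracker created with two tasks, reporting index 0 twice leaves progress at "1/2", and isZombie only flips to true after index 1 succeeds as well.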

sched.dagScheduler.taskEnded sends a CompletionEvent message to eventProcessActor.

// Called by TaskScheduler to report task completions or failures.
def taskEnded(
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Map[Long, Any],
    taskInfo: TaskInfo,
    taskMetrics: TaskMetrics) {
  eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
}

The receive handler defined in DAGScheduler picks the message up and calls dagScheduler.handleTaskCompletion.

case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
  dagScheduler.handleTaskCompletion(completion)
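
The point of routing completions through a message is that any thread may report a finished task while a single thread mutates scheduler state. The sketch below shows that pattern with a plain blocking queue instead of an Akka actor; this is a deliberate substitution, and EventLoopSketch, its Scheduler class and the Stop event are hypothetical.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Hypothetical single-threaded event loop; a queue replaces the actor mailbox.
object EventLoopSketch {
  sealed trait Event
  final case class CompletionEvent(taskId: Long, succeeded: Boolean) extends Event
  case object Stop extends Event

  final class Scheduler {
    private val queue = new LinkedBlockingQueue[Event]()
    val completed = mutable.ArrayBuffer.empty[Long] // touched only by the loop thread

    // Safe to call from any thread: it only enqueues an event.
    def taskEnded(taskId: Long, succeeded: Boolean): Unit =
      queue.put(CompletionEvent(taskId, succeeded))

    def stop(): Unit = queue.put(Stop)

    // Runs on one thread, so the handlers may mutate state without locks.
    def runLoop(): Unit = {
      var running = true
      while (running) {
        queue.take() match {
          case CompletionEvent(id, true)  => completed += id
          case CompletionEvent(_, false)  => () // failures are ignored in this sketch
          case Stop                       => running = false
        }
      }
    }
  }
}
```

Events are handled in the order they were enqueued, so posting completions for tasks 1 and 3 followed by stop() and then calling runLoop() leaves exactly those two ids in completed.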

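dagScheduler.handleTaskCompletion first posts a SparkListenerTaskEnd to listenerBus for every completed task, looks up the stage the task belongs to, and defines a local helper, markStageAsFinished, for later use. It then matches on the event reason, which can be Success, Resubmitted, FetchFailed, ExceptionFailure, TaskResultLost, and so on, and finally calls submitWaitingStages() to submit the waiting (dependent) stages.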

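For a Success event it further checks whether the task is a ResultTask or a ShuffleMapTask. For a ResultTask, it marks the corresponding output partition of the job as finished. If that completes the whole job, it calls markStageAsFinished and posts a SparkListenerJobEnd to listenerBus. Finally it calls job.listener.taskSucceeded.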

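For a ShuffleMapTask, it records the executor on which the task finished and calls addOutputLoc to add the shuffle output location. If the stage is in runningStages and has no pending tasks left, it calls markStageAsFinished, then uses getMissingParentStages to find the waitingStages whose parents are no longer missing, and finally calls submitMissingTasks to submit the tasks of those stages.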

/**
 * Responds to a task finishing. This is called inside the event loop so it assumes that it can
 * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
 */
private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
  val task = event.task
  val stageId = task.stageId
  val taskType = Utils.getFormattedClassName(task)
  listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
    event.taskMetrics))
  if (!stageIdToStage.contains(task.stageId)) {
    // Skip all the actions if the stage has been cancelled.
    return
  }
  val stage = stageIdToStage(task.stageId)

  def markStageAsFinished(stage: Stage) = {
    val serviceTime = stageToInfos(stage).submissionTime match {
      case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
      case _ => "Unknown"
    }
    logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
    stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
    listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    runningStages -= stage
  }
  event.reason match {
    case Success =>
      logInfo("Completed " + task)
      if (event.accumUpdates != null) {
        Accumulators.add(event.accumUpdates)
        // TODO: do this only if task wasn't resubmitted
      }
      pendingTasks(stage) -= task
      task match {
        case rt: ResultTask[_, _] =>
          resultStageToJob.get(stage) match {
            case Some(job) =>
              if (!job.finished(rt.outputId)) {
                job.finished(rt.outputId) = true
                job.numFinished += 1
                // If the whole job has finished, remove it
                if (job.numFinished == job.numPartitions) {
                  markStageAsFinished(stage)
                  cleanupStateForJobAndIndependentStages(job, Some(stage))
                  listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
                }
                job.listener.taskSucceeded(rt.outputId, event.result)
              }
            case None =>
              logInfo("Ignoring result from " + rt + " because its job has finished")
          }

        case smt: ShuffleMapTask =>
          val status = event.result.asInstanceOf[MapStatus]
          val execId = status.location.executorId
          logDebug("ShuffleMapTask finished on " + execId)
          if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
            logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
          } else {
            stage.addOutputLoc(smt.partitionId, status)
          }
          if (runningStages.contains(stage) && pendingTasks(stage).isEmpty) {
            markStageAsFinished(stage)
            logInfo("looking for newly runnable stages")
            logInfo("running: " + runningStages)
            logInfo("waiting: " + waitingStages)
            logInfo("failed: " + failedStages)
            if (stage.shuffleDep.isDefined) {
              // We supply true to increment the epoch number here in case this is a
              // recomputation of the map outputs. In that case, some nodes may have cached
              // locations with holes (from when we detected the error) and will need the
              // epoch incremented to refetch them.
              // TODO: Only increment the epoch number if this is not the first time
              // we registered these map outputs.
              mapOutputTracker.registerMapOutputs(
                stage.shuffleDep.get.shuffleId,
                stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
                changeEpoch = true)
            }
            clearCacheLocs()
            if (stage.outputLocs.exists(_ == Nil)) {
              // Some tasks had failed; let's resubmit this stage
              // TODO: Lower-level scheduler should also deal with this
              logInfo("Resubmitting " + stage + " (" + stage.name +
                ") because some of its tasks had failed: " +
                stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", "))
              submitStage(stage)
            } else {
              val newlyRunnable = new ArrayBuffer[Stage]
              for (stage <- waitingStages) {
                logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
              }
              for (stage <- waitingStages if getMissingParentStages(stage) == Nil) {
                newlyRunnable += stage
              }
              waitingStages --= newlyRunnable
              runningStages ++= newlyRunnable
              for {
                stage <- newlyRunnable.sortBy(_.id)
                jobId <- activeJobForStage(stage)
              } {
                logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
                submitMissingTasks(stage, jobId)
              }
            }
          }
      }

    case Resubmitted =>
      logInfo("Resubmitted " + task + ", so marking it as still running")
      pendingTasks(stage) += task

    case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
      // Mark the stage that the reducer was in as unrunnable
      val failedStage = stageIdToStage(task.stageId)
      runningStages -= failedStage
      // TODO: Cancel running tasks in the stage
      logInfo("Marking " + failedStage + " (" + failedStage.name +
        ") for resubmision due to a fetch failure")
      // Mark the map whose fetch failed as broken in the map stage
      val mapStage = shuffleToMapStage(shuffleId)
      if (mapId != -1) {
        mapStage.removeOutputLoc(mapId, bmAddress)
        mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
      }
      logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
        "); marking it for resubmission")
      if (failedStages.isEmpty && eventProcessActor != null) {
        // Don't schedule an event to resubmit failed stages if failed isn't empty, because
        // in that case the event will already have been scheduled. eventProcessActor may be
        // null during unit tests.
        import env.actorSystem.dispatcher
        env.actorSystem.scheduler.scheduleOnce(
          RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
      }
      failedStages += failedStage
      failedStages += mapStage
      // TODO: mark the executor as failed only if there were lots of fetch failures on it
      if (bmAddress != null) {
        handleExecutorLost(bmAddress.executorId, Some(task.epoch))
      }

    case ExceptionFailure(className, description, stackTrace, metrics) =>
      // Do nothing here, left up to the TaskScheduler to decide how to handle user failures

    case TaskResultLost =>
      // Do nothing here; the TaskScheduler handles these failures and resubmits the task.

    case other =>
      // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
      // will abort the job.
  }
  submitWaitingStages()
}
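
The promotion of waiting stages in the ShuffleMapTask branch boils down to one filter: a waiting stage becomes runnable once none of its parents is missing. The following sketch models that with plain ids. It is a simplification under stated assumptions: StagePromotion, Stage and newlyRunnable are hypothetical names, and a parent counts as available simply when its id is in the finished set, whereas getMissingParentStages in Spark inspects the RDD lineage and the shuffle output locations.

```scala
// Hypothetical model of picking newly runnable stages; not Spark's DAGScheduler.
object StagePromotion {
  final case class Stage(id: Int, parents: Set[Int])

  /** Waiting stages whose parents have all finished, in ascending id order. */
  def newlyRunnable(waiting: Set[Stage], finished: Set[Int]): List[Stage] =
    waiting.filter(_.parents.subsetOf(finished)).toList.sortBy(_.id)
}
```

Sorting by id mirrors the newlyRunnable.sortBy(_.id) call in the excerpt, so that stages are submitted in a deterministic order.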

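A successful ResultTask triggers job.listener.taskSucceeded. JobWaiter extends JobListener and overrides taskSucceeded: when the number of finished tasks equals the total number of tasks, the job is complete, so it sets jobResult to JobSucceeded and calls notifyAll() to wake every thread waiting on the JobWaiter.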

override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
  if (_jobFinished) {
    throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
  }
  resultHandler(index, result.asInstanceOf[T])
  finishedTasks += 1
  if (finishedTasks == totalTasks) {
    _jobFinished = true
    jobResult = JobSucceeded
    this.notifyAll()
  }
}
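
The wait and notify handshake between the scheduler thread and the thread that submitted the job can be reproduced in a few lines. This is a sketch, not Spark's JobWaiter: JobWaiterSketch, Waiter and run are hypothetical names, failure handling is omitted, and the "tasks" merely square their index on a background thread.

```scala
// Hypothetical model of the JobWaiter handshake using a plain monitor.
object JobWaiterSketch {
  final class Waiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit) {
    private var finishedTasks = 0
    private var jobFinished = totalTasks == 0 // a job with no tasks is done at once

    def taskSucceeded(index: Int, result: T): Unit = synchronized {
      if (jobFinished) throw new UnsupportedOperationException("job already finished")
      resultHandler(index, result)
      finishedTasks += 1
      if (finishedTasks == totalTasks) {
        jobFinished = true
        notifyAll() // wake every thread blocked in awaitResult
      }
    }

    def awaitResult(): Unit = synchronized {
      while (!jobFinished) wait() // the loop guards against spurious wakeups
    }
  }

  /** Runs n fake tasks on a background thread and blocks until all have reported. */
  def run(n: Int): Array[Int] = {
    val results = new Array[Int](n)
    val waiter = new Waiter[Int](n, (i, r) => results(i) = r)
    val worker = new Thread(() => (0 until n).foreach(i => waiter.taskSucceeded(i, i * i)))
    worker.start()
    waiter.awaitResult()
    results
  }
}
```

Because the result handler runs inside the same synchronized block that awaitResult later enters, the caller is guaranteed to see every written result once awaitResult returns.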

Back in DAGScheduler.runJob, the waiter blocks until it receives JobSucceeded, at which point the whole job has finished.

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: String,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties = null)
{
  val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
  waiter.awaitResult() match {
    case JobSucceeded => {}
    case JobFailed(exception: Exception) =>
      logInfo("Failed to run " + callSite)
      throw exception
  }
}

 
