Spark源码剖析 - 任务提交与执行

时间:2023-03-09 13:21:44
Spark源码剖析 - 任务提交与执行

1. 任务概述

任务提交与执行过程:

1) build operator DAG:此阶段主要完成RDD的转换及DAG的构建;

2) split graph into stages of tasks:此阶段主要完成finalStage的创建与Stage的划分,做好Stage与Task的准备工作后,最后提交Stage与Task;

3) launch tasks via cluster manager:使用集群管理器(Cluster manager)分配资源与任务调度,对于失败的任务还会有一定的重试与容错机制;

4) execute tasks:执行任务,并将任务中间结果和最终结果存入存储体系

Spark源码剖析 - 任务提交与执行

2 广播Hadoop的配置信息

SparkContext的broadcast方法用于广播Hadoop的配置信息,其实现见代码如下:

Spark源码剖析 - 任务提交与执行

上面的代码通过使用BroadcastManager发送广播,广播结束将广播对象注册到ContextCleaner中,以便清理。BroadcastManager的newBroadcast方法实际代理了broadcastFactory的newBroadcast方法。而在BroadcastManager类初始化initialize方法里面创建TorrentBroadcastFactory,它实际上是broadcastFactory的子类,broadcastFactory的newBroadcast方法实际上调用了TorrentBroadcastFactory的生成newBroadcast方法生成TorrentBroadcast对象的代码如下:

Spark源码剖析 - 任务提交与执行

从下述代码中可以看到TorrentBroadcast的过程分为三步:

1) 设置广播配置信息。根据spark.broadcast.compress配置属性确认是否对广播消息进行压缩,并且生成CompressionCodec对象。根据spark.broadcast.blockSize配置属性确认块的大小,默认为4MB。

2) 生成BroadcastBlockId。

3) 块的写入操作,返回广播变量包含的块数。

Spark源码剖析 - 任务提交与执行

块的写入操作writeBlocks

从下述代码中,看到块写入操作writeBlocks的工作分三步:

1) 将要写入的对象在本地的存储体系中备份一份,以便于Task也可以在本地的Driver上运行;

2) 给ByteArrayChunkOutputStream指定压缩算法,并且将对象以序列化方式写入ByteArrayChunkOutputStream后转换为Array[ByteBuffer];

3) 将每一个ByteBuffer作为一个Block,使用putBytes方法写入存储体系;

Spark源码剖析 - 任务提交与执行

TorrentBroadcast.blockifyObject方法用于将对象序列化写入ByteArrayChunkOutputStream,并用CompressionCodec压缩,最终将ByteArrayChunkOutputStream转换为Array[ByteBuffer]。blockifyObject的实现如下:

Spark源码剖析 - 任务提交与执行

3. RDD转换及DAG构建

3.1 为什么需要RDD

以下从数据处理模型、依赖划分原则、数据处理效率及容错处理4个方面解释Spark发明RDD的原因。

1. 数据处理模型

RDD是一个容错的、并行的数据结构,可以控制将数据存储到磁盘或内存,能够获取数据的分区。RDD提供了一组类似于Scala的操作,比如map、flatMap、filter等,这些操作实际是对RDD进行转换(transformation)。此外,RDD还提供了join、groupBy、reduceByKey等操作完成数据计算(注意:reduceByKey是action,而非transformation)。

当前的大数据应用场景非常丰富,如流式计算、图计算、机器学习等。它们既有相似之处,又各有不同。为了能够对所有场景下的数据处理使用统一的方式,抽象出RDD这一模型。

通常数据处理的模型包括:迭代计算、关系查询、MapReduce、流式处理等。Hadoop采用MapReduce模型,Storm采用流式处理模型,而Spark则实现了以上所有模型。

2. 依赖划分原则

一个RDD包含一个或多个分区,每个分区实际是一个数据集合的片段。在构建DAG的过程中,会将RDD用依赖关系串联起来。每个RDD都有其依赖(除了最*RDD的依赖是空列表),这些依赖分为NarrowDependency和ShuffleDependency两种。为什么要对依赖进行区分?从功能角度讲它们是不一样的。NarrowDependency会被划分到同一Stage中,这样它们就能以管道的方式迭代执行。ShuffleDependency由于依赖的上游RDD不止一个,所以往往需要跨节点传输数据。从容灾角度讲,它们恢复计算结果的方式不同。NarrowDependency只需要重新执行父RDD的丢失分区的计算即可恢复。而ShuffleDependency则需要考虑恢复所有父RDD的丢失分区。

解释了依赖划分的原因,实际也解释了为什么要划分Stage这个问题。

3. 数据处理效率

ShuffleDependency所依赖的上游RDD的计算过程允许在多个节点并发执行,如图所示,实际也就是后面将会讲到的ShuffleMapTask在多个节点上的多个实例。如果数据量很大,可以适当增加分区数量,这种根据硬件条件对并发任务数量的控制,能更好地利用各种资源,也能有效提高Spark的数据处理效率。

4. 容错处理

传统关系型数据库往往采用日志记录的方式来容灾容错,数据恢复都依赖于重新执行日志中的SQL。Hadoop为了避免单机故障概率较高的问题,通过将数据备份到其他机器容灾。由于所有备份机器同时出故障的概率比单机故障概率低很多,从而在宕机等问题发生时,从备份机读取数据。RDD本身是一个不可变的(Scala中称为immutable)数据集,当某个Worker节点上的任务失败时,可以利用DAG重新调度计算这个失败的任务。由于不用复制数据,也大大降低了网络通信。在流式计算的场景中,Spark需要记录日志和检查点(CheckPoint),以便利用CheckPoint和日志对数据进行恢复。

3.2 RDD实现分析

HadoopRDD的DAG。

hadoopFile方法创建完HadoopRDD后,会调用RDD的map方法。map方法将HadoopRDD封装为MappedRDD。

Spark源码剖析 - 任务提交与执行

这里调用了SparkContext的clean方法,实现如下:

Spark源码剖析 - 任务提交与执行

clean方法实际调用了ClosureCleaner的clean方法,这里意在清除闭包中的不能序列化的变量,防止RDD在网络传输过程中反序列化失败。

构造MapPartitionsRDD的步骤如下:

1) 调用MapPartitionsRDD的父类RDD的辅助构造器,RDD的辅助构造器如下:

Spark源码剖析 - 任务提交与执行

辅助构造器首先将oneParent封装为OneToOneDependency,OneToOneDependency继承自NarrowDependency,其实现如下:

Spark源码剖析 - 任务提交与执行

2) 调用RDD的主构造器,主构造器实现如下:

Spark源码剖析 - 任务提交与执行

构建完MapPartitionsRDD后,此时的DAG如图所示:

Spark源码剖析 - 任务提交与执行

MapPartitionsRDD在JavaSparkContext中会被隐式转换为JavaRDD。接着执行JavaRDD的flatMap方法,由于JavaRDD实现了JavaRDDLike特质,所以实际调用了JavaRDDLike的flatMap方法,它的实现如下:

此时,JavaRDD内部的rdd属性实质上还是MapPartitionsRDD,调用MapPartitionsRDD的构造器方法,其实现如下:

Spark源码剖析 - 任务提交与执行

接着执行JavaRDD的mapToPair方法时,JavaRDD由于实现了JavaRDDLike特质,所以实际调用了JavaRDDLike的mapToPair方法,代码实现如下:

此时,JavaRDD内部的rdd属性实际上还是MapPartitions,此时调用RDD的map,又被封装为MapPartitionsRDD。

然后执行PairRDDFunctions的reduceByKey方法,其实现见代码如下:

Spark源码剖析 - 任务提交与执行

defaultPartitioner方法的实现见代码如下,其功能实现如下:

1) 将RDD转换为Seq,然后对Seq按照RDD的partitions_:Array[Partition]的size倒序排列。

2) 创建HashPartitioner对象。如果配置了spark.default.parallelism属性,则用此属性值作为分区数量。否则使用Seq中所有RDD的partitions函数返回值的最大值作为分区数量。

Spark源码剖析 - 任务提交与执行

RDD的partitions方法的实现见代码如下:

Spark源码剖析 - 任务提交与执行

本例中,partitions方法实际调用了MapPartitionsRDD的getPartitions方法。MapPartitionsRDD的getPartitions方法调用了RDD的firstParent,见代码如下:

Spark源码剖析 - 任务提交与执行

firstParent用于返回依赖的第一个父RDD,代码实现如下:

Spark源码剖析 - 任务提交与执行

4. 任务提交

4.1 任务提交的准备

现在要执行JavaPairRDD的word count例子方法了。collect中调用了RDD的collect方法后转成Array,其代码实现如下:

RDD的collect方法调用了SparkContext的runJob,见代码如下:

Spark源码剖析 - 任务提交与执行

SparkContext的runJob又调用了重载的runJob,见代码如下:

Spark源码剖析 - 任务提交与执行

接着又调用两个重载的runJob,见代码如下:

Spark源码剖析 - 任务提交与执行

最终调用的runJob方法里又一次调用clean方法防止闭包的反序列化错误,然后运行dagScheduler的runJob,见代码如下:

Spark源码剖析 - 任务提交与执行

dagScheduler的runJob方法主要调用submitJob方法,之后的waiter.awaitResult()说明了任务的运行是异步的,见代码如下:

Spark源码剖析 - 任务提交与执行

1. 提交Job

submitJob方法用来将一个Job提交到job scheduler,见代码如下:

Spark源码剖析 - 任务提交与执行

根据上述代码分析,submitJob的处理步骤如下:

1) 调用RDD的partitions函数来获取当前Job的最大分区数,即maxPartitions。根据maxPartitions,确认我们没有在一个不存在的partition上运行任务。

2) 生成当前Job的jobId。

3) 创建JobWaiter,望文生义,即Job的服务员。此JobWaiter被阻塞,直到job完成或者被取消。

4) 向eventProcessLoop发送JobSubmitted事件。

5) 返回JobWaiter。

Spark源码剖析 - 任务提交与执行

2. 处理Job提交

DAGSchedulerEventProcessLoop收到JobSubmitted事件,会调用dagScheduler的handleJobSubmitted方法。handleJobSubmitted的具体执行过程如下:

1) 创建finalStage及Stage的划分。创建Stage的过程可能发生异常。比如,运行在HadoopRDD上的任务所依赖的底层HDFS文件被删除了。所以当异常发生时需要主动调用JobWaiter的jobFailed方法。

2) 创建ActiveJob并更新jobIdToActiveJob = new HashMap[Int, ActiveJob]、activeJobs = new HashSet[ActiveJob] 和 finalStage.resultOfJob。

3) 向listenerBus发送SparkListenerJobStart事件。

4) 提交finalStage。

5) 提交等待中的Stage。

Spark源码剖析 - 任务提交与执行

4.2 finalStage的创建与Stage的划分

在Spark中,一个Job可能被划分为一个或多个Stage,各个之间存在依赖关系,其中最下游的Stage也称为最终的Stage,用来处理Job最后阶段的工作。

1. createResultStage的实现分析

handleJobSubmitted方法使用createResultStage方法创建finalStage,createResultStage的处理步骤如下:

1) 调用getOrCreateParentStages获取所有的父Stage的列表,父Stage主要是宽依赖(如ShuffleDependency)对应的Stage,此列表内的Stage包含以下几种:

①当前RDD的直接或间接的依赖是ShuffleDependency且已经注册过的Stage。

②当前RDD的直接或间接的依赖是ShuffleDependency且没有注册过Stage的,则根据ShuffleDependency本身的RDD,找到它的直接或间接的依赖是ShuffleDependency且没有注册过Stage的所有ShuffleDependency,为他们生成Stage并注册。

③当前RDD的直接或间接的依赖是ShuffleDependency且没有注册过Stage的,为它们生成Stage且注册,最后也添加此Stage到List。

2) 生成Stage的Id,并创建Stage。

3) 将Stage注册到stageIdToStage = new HashMap[Int,Stage]中。

4) 调用updateJobIdStageIdMaps方法Stage及其祖先Stage与jobId的对应关系。

Spark源码剖析 - 任务提交与执行

2. 获取父Stage列表

Spark中Job会被划分为一到多个Stage,这些Stage的划分是从finalStage开始,从后往前边划分边创建的。getOrCreateParentStages方法用于获取或者创建给定RDD的所有父Stage,这些Stage将被分配到jobId对应的job,其处理步骤如下:

1) 通过调用RDD的getShuffleDependencies方法获取RDD的所有Dependency的序列。

2) 逐个访问每个RDD及其依赖的非Shuffle的RDD,遍历每个RDD的ShuffleDependency依赖,并调用getOrCreateShuffleMapStage获取或者创建Stage,并将这些返回的Stage都放入parents:HashSet[Stage]。由此可见,Stage的划分是以ShuffleDependency为分界线的。

Spark源码剖析 - 任务提交与执行

Spark源码剖析 - 任务提交与执行

3. 获取map任务对应Stage

getOrCreateShuffleMapStage方法用于获取或者创建Stage并注册到shuffleToMapStage:HashMap[Int, Stage]中,处理步骤如下:

1) 如果已经注册了ShuffleDependency对应的Stage,则直接返回此Stage。

2) 否则调用getMissingAncestorShuffleDependencies方法找到所有祖先中,还没有为其注册过Stage的ShuffleDependency,调用方法createShuffleMapStage创建Stage并注册。最后还会为当前ShuffleDependency,调用方法createShuffleMapStage创建、注册并返回此Stage。

Spark源码剖析 - 任务提交与执行

getMissingAncestorShuffleDependencies用来找到RDD直接或者间接依赖的所有祖先中,还没有为其注册过Stage的ShuffleDependency,见代码如下:

Spark源码剖析 - 任务提交与执行

createShuffleMapStage方法:首先调用ShuffleMapStage创建Stage,然后将ShuffleDependency的shuffleId和partitions的length注册到MapOutputTrackerMaster的mapStatuses = new ConcurrentHashMap[Int,Array[MapStatus]]() 中。

Spark源码剖析 - 任务提交与执行

很多地方都调用了createShuffleMapStage创建Stage,从下述代码中来看看Stage的数据结构。

Spark源码剖析 - 任务提交与执行

Stage的构造过程中调用了StageInfo的fromStage方法创建StageInfo。

Spark源码剖析 - 任务提交与执行

创建StageInfo的步骤如下:

1) 调用getNarrowAncestors方法获取RDD的所有直接或者间接的NarrowDependency的RDD,见代码如下:

Spark源码剖析 - 任务提交与执行

返回的Seq[RDD[]]全部map到RDDInfo.fromRdd方法,生成RddInfo,代码如下:

Spark源码剖析 - 任务提交与执行

2) 对当前Stage的RDD调用RDDInfo.fromRdd方法,也生成RDDInfo,然后所有生成的RDDInfo都合入rddInfos中。

3) 创建当前Stage的StageInfo。

回头看看4.2.1节点中调用的updateJobIdStageIdMaps方法,它的功能如下:

通过迭代调用内部的updateJobIdStageIdMapsList函数,最终将jobId添加到Stage及它的所有祖先Stage的映射jobIds = new HashSet[Int]中,将jobId和Stage及它的所有祖先Stage的id,更新到jobIdToStageIds = new HashMap[Int,HashSet[Int]]中。updateJobIdStageIdMaps的实现见代码如下:

Spark源码剖析 - 任务提交与执行

4.3 创建Job

ActiveJob的定义见代码如下,这里对其中的一些定义做些解释:

  • numPartitions:任务的分区数量。
  • finished:标识每个partition相关的任务是否完成。
  • numFinished:已经完成的任务数。

Spark源码剖析 - 任务提交与执行

我们回头看看SparkListenerJobStart事件的处理,SparkListenerBus的sparkListeners(比如JobProgressListener)中,凡是实现了onJobStart方法的,将被处理。

4.4 提交Stage

在提交finalStage之前,如果存在没有提交的祖先Stage,则需要先提交所有没有提交的祖先Stage。每个Stage提交之前,如果存在没有提交的祖先Stage,都会先提交祖先Stage,并且将子Stage放入waitingStages = new HashSet[Stage]中等待,如果不存在没有提交的祖先Stage,则提交所有未提交的Task。提交Stage的实现见代码如下:

Spark源码剖析 - 任务提交与执行

getMissingParentStages方法用来找到Stage的所有不可用的祖先Stage,见代码如下:

Spark源码剖析 - 任务提交与执行

如何判断Stage可用?它的判断十分简单:如果Stage不是Map任务,那么它是可用的;否则它的已经输出计算结果的分区任务数量要和分区数一样,即所有分区上的子任务都要完成。判断逻辑如下:

Spark源码剖析 - 任务提交与执行

回头看看handleJobSubmitted方法中调用的submitStages方法,submitStages实际上循环missingStages中的Stage并调用submitStage,实现如下:

Spark源码剖析 - 任务提交与执行

4.5 提交Task

提交Task的入口是submitMissingTask函数,此函数在Stage没有不可用的祖先Stage时,被调用处理当前Stage未提交的任务。

1.提交还未计算的任务

submitMissingTasks用于提交还未计算的任务。在分析submitMissingTasks之前,先对一些定义进行描述:

  • pendingTasks:类型时HashSet[Task[_]],存储有待处理的Task。
  • MapStatus:包括执行Task的BlockManager的地址和要传给reduce任务的Block的估算大小。
  • outputLocs:如果Stage是map任务,则outputLocs记录每个Partition的MapStatus。

submitMissingTasks的执行过程如下:

1) 清空pendingTasks。由于当前Stage的任务刚开始提交,所以需要清空,便于记录需计算的任务。

2) 找出还未计算的partition(如果Stage是map任务,那么outputLocs中partition对应的List[MapStatus]为Nil,说明此partition还未计算。如果Stage不是map任务,那么需要获取Stage的finalJob,并调用finished方法判断每个partition的任务是否完成)。

3) 将当前Stage加入运行中的Stage集合(runningStages:HashSet[Stage])中。

4) 使用StageInfo.fromStage方法创建当前Stage的latestInfo(StageInfo)。

5) 向listenerBus发送SparkListenerStageSubmitted事件。

6) 如果Stage是map任务,那么序列化Stage的RDD及ShuffleDependency。如果Stage不是map任务,那么序列化Stage的RDD及resultOfJob的处理函数。这些序列化得到的字节数组最后需要使用sc.broadcast进行广播。

7)  如果Stage是map任务,则创建ShuffleMapTask,否则创建ResultTask。还未计算的partition个数决定了最终创建的Task个数。并将创建的所有Task都添加到Stage的pendingTasks中。

8) 利用上一步创建的所有Task、当前Stage的id、jobId等信息创建TaskSet,并调用taskScheduler的submitTasks,批量提交Stage及其所有Task。

Spark源码剖析 - 任务提交与执行

submitTasks方法,提交任务主要分为以下步骤:

1) 构建任务集管理器。即将TaskScheduler、TaskSet及最大失败次数(maxTaskFailures)封装为TaskSetManager。

2) 设置任务集调度策略(调度模式有FAIR和FIFO两种,此处以默认的FIFO为例)。将TaskSetManager添加到FIFOSchedulableBuilder中,代码如下:

Spark源码剖析 - 任务提交与执行

实际上是把TaskSetManager加入rootPool的先进先出(FIFO)的调度队列schedulableQueue和schedulableNameToSchedulable中,并且设置TaskSetManager的parent是Pool。

注意:由于同时可能有多个任务提交,所以需要一种调度策略来决定究竟先提交哪个任务集,例如本例中的FIFO调度策略。

3) 资源分配。调用LocalSchedulerBackend的reviveOffers方法,实际向localEndpoint发送ReviveOffers消息。localEndpoint对ReviveOffers消息的匹配执行reviveOffers方法。

Spark源码剖析 - 任务提交与执行

Spark源码剖析 - 任务提交与执行

reviveOffers的处理步骤如下:

1) 使用ExecutorId、ExecutorHostName、freeCores(空闲CPU核数)创建WorkerOffer;

2) 调用TaskSchedulerImpl的resourceOffers方法分配资源;

3) 调用Executor 的launchTask方法运行任务。

发送消息

Spark源码剖析 - 任务提交与执行

Spark源码剖析 - 任务提交与执行

接收到消息启动

Spark源码剖析 - 任务提交与执行

2. 资源分配

resourceOffers方法用于Task任务的资源分配,其处理步骤如下:

1) 标记Executor与host的关系,增加激活的Executor的id,按照host对Executor分组,并向DAGSchedulerEventProcessActor发送ExecutorAdded事件等。

2) 计算资源的分配与计算。对所有WorkerOffer随机洗牌,避免将任务总是分配给同样的WorkerOffer。

3) 根据每个WorkerOffer的可用的CPU核数创建同等尺寸的任务描述(TaskDescription)数组。

4) 将每个WorkerOffer的可用的CPU核数统计到可用CPU(availableCpus)数组中。

5) 对rootPool中的所有TaskSetManager按照调度算法排序(本例中为FIFO调度算法)。

6) 调用每个TaskSetManager的resourceOffer方法,根据WorkerOffer的ExecutorId和host找到需要执行的任务并进一步进行资源处理。

7) 任务分配到相应的host和Executor后,将taskId和TaskSetId的关系、taskId与ExecutorId的关系、executors与Host的分组关系等更新并且将availableCpus数目减去每个任务分配的CPU核数(CPUS_PER_TASK)。

8) 返回第3)步生成的TaskDescription列表。

Spark源码剖析 - 任务提交与执行

DAGSchedulerEventProcessLoop会将ExecutorAdded事件匹配执行DagScheduler的handleExecutorAdded方法,用于将跟踪失败的节点重新恢复正常和提交等待中的Stage,见代码如下:

Spark源码剖析 - 任务提交与执行

3. Worker任务分配

resourceOffer方法用于给Worker分配Task,其处理步骤如下:

1) 获取当前任务集允许使用的本地化级别。

2) 调用findTask寻找Executor、Host、pendingTasksWithNoPrefs中有待运行的task。

3) 创建TaskInfo,并对task,addedFiles、addedJars进行序列化。

4) 调用DagScheduler的taskStarted方法,笔者认为此处方法名不当,因为taskStarted的功能是向DAGSchedulerEventProcessLoop发送BeginEvent事件,它的实现如下:

Spark源码剖析 - 任务提交与执行

DAGSchedulerEventProcessLoop在接收BeginEvent事件后,调用了dagScheduler的方法handleBeginEvent。handleBeginEvent方法通过发送SparkListenerTaskStart事件给listenerBus,用以各种监听器更新SparkUI的显示。

5) 最终封装TaskDescription对象并返回。

Spark源码剖析 - 任务提交与执行

Spark源码剖析 - 任务提交与执行

local模式下,任务提交的过程可以用图5-9来表示:

Spark源码剖析 - 任务提交与执行

4. 本地化分析

与Hadoop类似,Spark中任务的处理也要考虑数据的本地性(Locality)。Spark目前支持PROCESS_LOCAL(本地进程)、NODE_LOCAL(本地节点)、NO_PREF(没有喜好)、RACK_LOCAL(本地机架)、ANY(任何)几种。

Spark涉及本地性的数据只有两种、HadoopRDD和数据源于存储体系的RDD(即由CacheManager从BlockManager中读取,或者Streaming数据源RDD)。

除了NO_PREF,其他定义都比较好理解。什么是NO_PREF?

当Driver应用程序刚刚启动,Driver分配获得Executor很可能还没有初始化完毕。所以会有一部分任务的本地化级别被设置为NO_PREF。如果是ShuffleRDD,其本地性始终为NO_PREF。对于这两种本地化级别是NO_PREF的情况,在任务分配时会被优先分配到非本地节点执行,达到一定的优化效果。

调用getAllowedLocalityLevel方法来获取任务集允许使用的本地化级别。在讲解getAllowedLocalityLevel之前,我们先介绍本地化的几个概念。

  • myLocalityLevels:当前TaskSetManager允许使用的本地化级别。

myLocalityLevels实际是对函数computeValidLocalityLevels的引用,代码如下:

Spark源码剖析 - 任务提交与执行

computeValidLocalityLevels方法用于计算有效的本地化级别。以PROCESS_LOCAL为例,如果存在Executor中有待执行的任务(pendingTasksForExecutor步为空)且PROCESS_LOCAL本地化的等待时间不为0(调用getLocalityWait方法获得)且存在Executor已被激活(pendingTasksForExecutor中的ExecutorId有存在于TaskScheduler的activeExecutorIds中的),那么允许的本地化级别里包括PROCESS_LOCAL。

Spark源码剖析 - 任务提交与执行

getLocalityWait方法用于获取各个本地化级别的等待时间,这些配置如下所示:

Spark源码剖析 - 任务提交与执行

  • localityWaits:本地化级别等待时间。

localityWaits实际是对myLocalityLevels应用getLocalityWait方法获得,代码如下:

Spark源码剖析 - 任务提交与执行

现在一起分析getAllowedLocalityLevel方法,它的处理步骤如下:

1) 根据当前本地化级别索引(currentLocalityIndex刚开始为0),获取此本地化级别的等待时长;

2) 如果当前时间与上次运行本地化时间(lastLaunchTime)之差大于等于上一步获得的时长并且当前本地化级别索引小于myLocalityLevels的索引范围,那么将第1)步的时长增加到lastLaunchTime中,然后使currentLocalityIndex增加1,最后重新从第1)步开始执行(这个过程也称为本地化级别跳级)。

Spark源码剖析 - 任务提交与执行

5. 执行任务

调用Executor的launchTask方法时,标志着任务执行阶段的开始。launchTask的执行过程如下:

1) 创建TaskRunner,并将其与taskId、taskName及serializedTask添加到runningTasks = new ConcurrentHashMap[Long, TaskRunner]中。

2) TaskRunner实现了Runnable接口(Scala中称为Runnable特质),最后使用线程池执行TaskRunner。

Spark源码剖析 - 任务提交与执行

我们知道线程执行时,会调用TaskRunner的run方法。run方法的处理动作包括状态更新、任务反序列化、任务运行。

5.1 状态更新

调用execBackend的statusUpdate方法更新任务状态,代码如下:

Spark源码剖析 - 任务提交与执行

以LocalSchedulerBackend为例,实际向LocalEndpoint发送statusUpdate消息,代码如下:

Spark源码剖析 - 任务提交与执行

LocalEndpoint在接收到statusUpdate事件时,匹配执行TaskSchedulerImpl的statusUpdate方法,并根据Task的最新状态做一系列处理。

5.2 任务还原

所谓任务还原就是将Driver提交的Task在Executor上通过反序列化、更新依赖达到Task还原效果的过程

对4.5.3序列化的serializedTask执行反序列化操作,代码如下:

Spark源码剖析 - 任务提交与执行

更新依赖的文件或者jar包,代码如下:

Spark源码剖析 - 任务提交与执行

updateDependencies方法获取依赖是利用了Utils.fetchFile方法实现的。下载的jar文件还会添加到Executor自身类加载器的URL中。

Spark源码剖析 - 任务提交与执行

最后将Task的ByteBuffer反序列化为Task实例,实现如下:

Spark源码剖析 - 任务提交与执行

5.3 任务运行

TaskRunner最终调用Task的run方法来运行任务,实现如下:

Spark源码剖析 - 任务提交与执行

run方法中创建了TaskContextImpl,并且设置到TaskContext的ThreadLocal中。最后调用runTask方法,见代码如下:

Spark源码剖析 - 任务提交与执行

在word count的例子中,首先执行的Task是ShuffleMapTask,那么ShuffleMapTask的runTask方法都做了什么?曾经介绍submitMissingTasks的时候,其中对任务的RDD和ShuffleDependency进行过序列化操作,现在是时候反序列化了,这样可以得到RDD和ShuffleDependency。接下来调用SortShuffleManager的getWriter方法获取partitionId指定分区的SortShuffleWriter。之后便利用此Writer将计算的中间结果写入文件。

Spark源码剖析 - 任务提交与执行

SortShuffleManager的getWriter实现,参数mapId实际传入的是partitionId,由此我们可以看到partition与map任务的关系。

SortShuffleWriter负责计算结果的缓存处理持久化。我们暂时只需理解的是map任务的Stage的任务执行结果将通过SortShuffleManager持久化到存储体系即可。RDD的iterator方法触发任务计算。

Spark源码剖析 - 任务提交与执行

6. 任务执行后续处理

6.1 计量统计与执行结果序列化

分析下述代码,可以看到任务执行结束后,还会有以下处理。

1) 任务执行结果的简单序列化

2) 计量统计,需要更新的指标有:

  • Executor反序列化消耗的时间;
  • Executor实际执行任务消耗的时间;
  • Executor执行垃圾回收消耗的时间;
  • Executor执行结果序列化消耗的时间。

3) 将前两步得到的简单序列化结果和计量统计内容封装为DirectTaskResult,然后序列化。

Spark源码剖析 - 任务提交与执行

6.2 内存回收

TaskRunner的run方法最后还会在finally中做一些清理工作,包括:

1) 释放当前线程通过ShuffleMemoryManager获得的内存;

2) 释放当前线程在MemoryStore的unrollMemoryMap中展开占用的内存;

3) 释放当前线程用于聚合计算占用的内存;

4) 将当前Task从runningTasks中移除。

暂略

6.3 执行结果处理

任务完成的时候会发送一次statusUpdate消息,LocalEndpoint会先匹配执行TaskSchedulerImpl的statusUpdate方法,然后调用reviveOffers方法调用其他的任务。

TaskSchedulerImpl的statusUpdate方法会从taskIdToTaskSetId、taskIdToExecutorId中移除此任务,并且调用taskResultGetter的enqueueSuccessfulTask方法。

Spark源码剖析 - 任务提交与执行

taskResultGetter的enqueueSuccessfulTask和enqueueFailedTask方法,分别用于处理执行成功任务的返回结果和执行失败任务的返回结果。我们以enqueueSuccessfulTask方法为例:

Spark源码剖析 - 任务提交与执行

从enqueueSuccessfulTask的实现不难看出其中另起的线程,主要调用了TaskSchedulerImpl的handleSuccessfulTask方法。TaskSchedulerImpl的handleSuccessfulTask方法的实现如下:

Spark源码剖析 - 任务提交与执行

TaskSetManager的handleSuccessfulTask方法对TaskSet中的任务信息进行成功状态标记,然后调用DagScheduler的taskEnded方法。

Spark源码剖析 - 任务提交与执行

DagScheduler的taskEnded方法的实现如下:

Spark源码剖析 - 任务提交与执行

DAGSchedulerEventProcessLoop接收CompletionEvent消息,将处理交给了handleTaskCompletion。handleTaskCompletion方法首先向listenerBus发送SparkListenerTaskEnd,代码如下:

Spark源码剖析 - 任务提交与执行

1. ResultTask任务的结果处理

如果是ResultTask,那么将执行下面的代码,其处理步骤如下:

1) 标识ActiveJob的finished里对应分区的任务为完成状态,并且将已完成的任务数numFinished加1。

2) 如果ActiveJob的所有任务都完成,则标记当前Stage完成并向listenerBus发送SparkListenerJobEnd事件。

3) 调用JobWaiter的taskSucceeded方法,以便通知JobWaiter有任务成功。

Spark源码剖析 - 任务提交与执行

JobWaiter的taskSucceeded方法,其处理步骤如下:

1) JobWaiter中的resultHandler实际是代码清单5-20里的匿名函数(index,res) => results(index) = res,通过回调此匿名函数,将当前任务的结果加入最终结果集。

2) finishedTasks自增,当完成任务数finishedTasks等于全部任务数totalTasks时,标记job完成,并且唤醒等待的线程,即执行代码清单5-22中调用awaitResult方法的线程。

2. ShuffleMapTask任务的结果处理

如果是ShuffleMapTask,那么将执行下述代码所示的代码分支,其处理步骤如下:

1) 将Task的partitionId和MapStatus追加到Stage的outputLocs中。

2) 将当前Stage标记为完成,然后将当前Stage的shuffleId和outputLocs中的MapStatus注册到mapOutputTracker。根据3.2.3节的内容,这里注册的map任务状态将最终被reduce任务所用。

3) 如果Stage的outputLocs中某个分区的输出为Nil,那么说明有任务失败了,这时需要再次提交此Stage。

4) 如果不存在Stage的outputLocs中某个分区的输出为Nil,那么说明所有任务执行成功了,这时需要遍历waitingStages中的Stage并将它们放入runningStages,最后调用submitMissingTasks方法逐个提交这些准备运行的Stage的任务。在word count例子里,由于map任务的Stage已经运行完成,现在运行的是reduce任务的Stage,所以此时调用submitMissingTasks方法则创建了ResultTask。

Spark源码剖析 - 任务提交与执行

ResultTask的runTask方法与ShuffleMapTask有很多不同,见如下代码:

Spark源码剖析 - 任务提交与执行

参考资料:

暂无