spark 调度模块详解及源码分析
@(SPARK)[spark]
一、概述
通过spark-submit向集群提交应用后,spark就开始了调度的过程,其调度模块主要包括2部分:
* DAGScheduler:负责将用户提交的计算任务分割成不同的stage。
* TaskScheduler & SchedulerBackend:负责将stage中的task提交到集群中。
先看一下2个核心的图:
(一)三个主要的类
与调度模块相关的三个主要的类均位于org.apache.spark.scheduler,包括:
1、class DAGScheduler
负责分析用户提交的应用,并根据计算任务的依赖关系建立DAG,然后将DAG划分成不同的Stage,其中每个stage由可以并发执行的一组task构成,这些task的逻辑完全相同,只是作用于不同的数据。
DAGScheduler在不同的资源管理框架下的实现是完全相同的。
2、trait TaskScheduler
TaskScheduler从DAGScheduler中接收不同的stage的任务,并且向集群调度这些任务。
yarn-cluster的具体实现为:YarnClusterScheduler
yarn-client的具体实现为:YarnScheduler
3、trait SchedulerBackend
SchedulerBackend向当前等待分配资源的task分配计算资源,并且在分配的executor中启动task。
yarn-cluster的具体实现为:YarnClusterSchedulerBackend。
yarn-client的具体实现为:YarnClientSchedulerBackend
(二)基本流程
图片来源于spark内幕技术P45.
(三)TaskScheduler & SchedulerBackend
图片来源于spark内幕技术P44.
TaskScheduler侧重于调度,而SchedulerBackend是实际运行。
每个SchedulerBackend都会对应一个唯一的TaskScheduler。注意图中的TaskScheduler & SchedulerBackend都会有针对于yarn的特定实现。
二、DAGScheduler
DAGScheduler在不同的资源管理框架下的实现是完全相同的。因为DAGScheduler实现的功能是将DAG划分为不同的stage,这是根据宽依赖进行划分的,每个宽依赖均会调用shuffle,以此作为一个新的stage。这与具体的资源管理框架无关。
每个stage由可以并发执行的一组task构成,这些task的执行逻辑完全相同,只是作用于不同的数据。
DAGScheduler与TaskScheduler都是在SparkContext创建的时候创建的。其中TaskScheduler是通过SparkContext#createTaskScheduler创建的,而DAGScheduler是直接调用它的构造函数创建的。只不过,DAGScheduler保存了TaskScheduler的引用,因此需要先创建TaskScheduler。
=================================
步骤一:用户代码中创建SparkContext对象,SparkContext中创建DAGScheduler与TaskScheduler/TaskSchedulerBackend对象
步骤二:用户代码构建各种tranformation及至少一个action,这个action会通过SparkContext#runJob调用DAGScheduler#runJob
步骤三:DAGScheduler提交作业到一队列,handleJobSubmitted从这个队列取出作业,划分stage,并开始生成最终的TaskSet,调用submitTasks()向TaskScheduler提交任务
(一)用户代码中创建SparkContext对象,SparkContext中创建DAGScheduler与TaskScheduler/TaskSchedulerBackend对象
1、用户代码中创建SparkContext对象
下面我们从一个简单的程进行出发,分析它的进行过程。程序如下:
package com.lujinhong.sparkdemo
import org.apache.spark.SparkContext
object GrepWord {
def grepCountLog(sc:SparkContext,path: String, keyWord: String) {
println("grep " + keyWord + " in " + path + ", the lineCount is: ")
val all = sc.textFile(path)
val ret = all.filter(line => line.contains(keyWord))
println(ret.count)
}
def main(args: Array[String]) {
val sc = new SparkContext();
grepCountLog(sc,"/src/20151201", "\"keyword\": \"20302\"");
}
}
上面的代码很简单,指定一个目录,搜索这个目录中的文件有多少个keyword。分为三步:读入文件,过滤关键字,count。
我们这里主要分析yarn-cluster/client的模式,根据前面的分析,我们向YARN提交应用后,YARN会返回分配资源,然后启动AM。在AM中的driver会开始执行用户的代码,开始进行调度。详细分析请见:
http://blog.csdn.net/lujinhong2/article/details/50344095
那我们这里就从用户的代码开始继续往下分析。
用户代码中开始的时候必须首先创建一个SparkContext,我们看一下SparkContext的代码,以及它被创建时执行了哪些操作。
先看一下官方的一个图。DriverProgram就是用户提交的程序,在用户代码中创建一个SparkContext的对象。SparkContext是所有Spark应用的入口,它负责和整个集群的交互,包括创建RDD,累积器、广播变量等。
每个JVM中只能有一个SparkContext,在创建一个新的Context前,你必须先stop()旧的。这个限制可能会在以后去掉,见SPARK-2243。
2、SparkContext源码简单分析
SparkContext完成了以下几个主要的功能:
(1)创建RDD,通过类似textFile等的方法。
(2)与资源管理器交互,通过runJob等方法启动应用。
(3)创建DAGScheduler、TaskScheduler等。
3、SparkContext创建DAGScheduler与TaskScheduler/TaskSchedulerBackend对象
创建一个SparkContext,只需要一个SparkConf参数,表示一些配置项。如果未指定参数,则会创建一个默认的SparkConf,如我们代码中的:
val sc = new SparkContext();
创建的代码为:
def this() = this(new SparkConf())
在SparkContext的一个try模块中,会进行一些初始化的工作,其中一部分是创建了DAGScheduler与TaskScheduler/TaskSchedulerBackend对象。
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's constructor
_taskScheduler.start()
其中TaskScheduler/TaskSchedulerBackend对象通过createTaskScheduler()方法进行创建,而DAGScheduler对象直接使用构建函数创建。
4、createTaskScheduler:创建TaskScheduler/TaskSchedulerBackend对象的具体过程
SparkContext通过createTaskScheduler来同时创建TaskScheduler/TaskSchedulerBackend对象。它接收的参数mater是一个url,或者一个yarn-cluster等的字符串,指明了使用哪种运行模式。
createTaskScheduler函数中主要就是match各种master,然后创建相应的TaskScheduler/TaskSchedulerBackend对象。
我们先看一下yarn-cluster的:
case "yarn-standalone" | "yarn-cluster" =>
if (master == "yarn-standalone") {
logWarning(
"\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")
}
val scheduler = try {
val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
} catch {
// TODO: Enumerate the exact reasons why it can fail
// But irrespective of it, it means we cannot proceed !
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
val backend = try {
val clazz =
Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
scheduler.initialize(backend)
(backend, scheduler)
TaskScheduler的实现类为:
org.apache.spark.scheduler.cluster.YarnClusterScheduler
TaskSchedulerBacked的实现类为:
org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend
其中后者需要前者来创建:
scheduler.initialize(backend)
最后返回一个元组:
(backend, scheduler)
再看看yarn-client的:
case "yarn-client" =>
val scheduler = try {
val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
} catch {
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
val backend = try {
val clazz =
Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
scheduler.initialize(backend)
(backend, scheduler)
TaskScheduler的实现类为:
org.apache.spark.scheduler.cluster.YarnScheduler
TaskSchedulerBacked的实现类为:
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend
(二)用户代码创建各种transformation与至少一个action,这个action会通过SparkContext#runJob调用DAGScheduler#runJob
在用户代码中创建了一个SparkContext对象后,就可以开始创建RDD,转换RDD等了。我们的代码只有3行:
val all = sc.textFile(path)
val ret = all.filter(line => line.contains(keyWord))
println(ret.count)
1、textFile
我们先看一下如何创建一个RDD:
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString)
}
读取一个hadoop支持的类型的文件,返回一个String类型的RDD。
它接收2个参数,一个是文件路径,一个是最小分区数,默认为:
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)。
注意,这里使用的是min,因此如果没指定分区数量,最大的情况下就是2个分区了,详细分析请见: https://github.com/mesos/spark/pull/718
最后是调用hadoopFile来创建RDD的:
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}
2、filter
仅返回符合条件的元素组成的RDD。
3、count
count的逻辑很简单:
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
sum用于统计一个集合中的元素数量。
但它调用了SparkContext的runJob开始执行任务了,我们分析一下这个过程。
SparkContext定义了多个runJob的形式,但它最后的调用为:
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
关键代码就是一行:
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
开始调用DAGScheduler的runJob了。
(三)步骤三:DAGScheduler提交作业,划分stage,并生成最终的TaskSet
经过上面的分析,我们知道了DAGScheduler与TaskScheduler/TaskSchedulerBackend对象的对象是如何创建的,并分析到了由用户代码出发,如果至调用dagScheduler.runJob。下面我们分析一下dagScheduler.runJob完成了什么功能。
1、创建DAGScheduler对象的详细实现
上面介绍过在SparkContext中会通过DAGScheduler的构建函数创建一个DAGScheduler对象,具体是如何实现的呢?
class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = new SystemClock())
extends Logging {
......
}
看一下DAGScheduler的主构造函数。
* SparkContext:就是前面创建的对象。
* taskScheduler:DAGScheduler保存一个taskScheduler,当最后处理完生成TaskSet时,需要调用submitMissingTasks,而在这个方法中会调用taskScheduler.submitTasks(),就是将TaskSet交由taskScheduler进行下一步的处理。
* mapOutputTracker:是运行在Driver端管理shuffle的中间输出位置信息的。
* blockManagerMaster:也是运行在Driver端的,它是管理整个Job的Bolck信息。
2、作业的提交
首先注意区分2个概述:
job: 每个action都是执行runJob方法,可以将之视为一个job。
stage:在这个job内部,会根据宽依赖,划分成多个stage。
前面说过,用户代码中存在一个action时,它最终会调用SparkContext#runJob(),而SparkContext#runJob()的最后一步都是调用DAGScheduler#runJob()
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
而DAGScheduler#runJob()的核心代码为:
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
即调用submitJob方法,我们进一步看看submitJob()
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
....
val jobId = nextJobId.getAndIncrement()
.....
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
submitJob()方法主要完成了以下3个工作:
* 获取一个新的jobId
* 生成一个JobWaiter,它会监听Job的执行状态,而Job是由多个Task组成的,因此只有当Job的所有Task均已完成,Job才会标记成功
* 最后调用eventProcessLoop.post()将Job提交到一个队列中,等待处理。这是一个典型的生产者消费者模式。这些消息都是通过handleJobSubmitted来处理。
简单看一下handleJobSubmitted是如何被调用的。
首先是DAGSchedulerEventProcessLoop#onReceive调用doOnReceive:
/**
* The main event loop of the DAG scheduler.
*/
override def onReceive(event: DAGSchedulerEvent): Unit = {
val timerContext = timer.time()
try {
doOnReceive(event)
} finally {
timerContext.stop()
}
}
DAGSchedulerEventProcessLoop是EventLoop的子类,它重写了EventLoop的onReceive方法。以后再分析这个EventLoop。
然后,doOnReceive会调用handleJobSubmitted。
3、stage的划分
刚才说到handleJobSubmitted会从eventProcessLoop中取出Job来进行处理,处理的第一步就是将Job划分成不同的stage。handleJobSubmitted主要2个工作,一是进行stage的划分,这是这部分要介绍的内容;二是创建一个activeJob,并生成一个任务,这在下一小节介绍。
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
...
finalStage = newResultStage(finalRDD, partitions.length, jobId, callSite)
.....
activeJobs += job
......
submitStage(finalStage)
}
submitWaitingStages()
}
newResultStage()经过多层调用后,最终会调用getParentStages()。
因为是从最终的stage往回推算的,这需要计算最终stage所依赖的各个stage。
4、任务的生成
回到handleJobSubmitted中的代码:
submitStage(finalStage)
submitStage会提交finalStage,如果这个stage的某些parentStage未提交,则递归调用submitStage(),直至所有的stage均已计算完成。
submitStage()会调用submitMissingTasks():
submitMissingTasks(stage, jobId.get)
而submitMissingTasks()会完成DAGScheduler最后的工作:它判断出哪些Partition需要计算,为每个Partition生成Task,然后这些Task就会封闭到TaskSet,最后提交给TaskScheduler进行处理。
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
三、TaskScheduler && TaskSchedulerBackend
上文分析到在DAGScheduler中最终会执行taskScheduler.submitTasks()方法,我们先简单看一下从这里开始往下的执行逻辑:
(1)taskScheduler#submitTasks()
(2) schedulableBuilder#addTaskSetManager()
(3)CoarseGrainedSchedulerBackend#reviveOffers()
(4)CoarseGrainedSchedulerBackend#makeOffers()
(5)TaskSchedulerImpl#resourceOffers
(6)CoarseGrainedSchedulerBackend#launchTasks
(7)executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
步骤一、二中主要将这组任务的TaskSet加入到一个TaskSetManager中。TaskSetManager会根据数据就近原则为task分配计算资源,监控task的执行状态等,比如失败重试,推测执行等。
步骤三、四逻辑较为简单。
步骤五为每个task具体分配资源,它的输入是一个Executor的列表,输出是TaskDescription的二维数组。TaskDescription包含了TaskID, Executor ID和task执行的依赖信息等。
步骤六、七就是将任务真正的发送到executor中执行了,并等待executor的状态返回。