Continuing from the previous article, spark-streaming series ------- 1. Job scheduling in spark-streaming (part 1), this post looks at how a spark-streaming job is actually generated.
Spark-streaming jobs are generated in JobGenerator.generateJobs. The code is as follows:
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      val streamIdToNumRecords = streamIdToInputInfos.mapValues(_.numRecords)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToNumRecords)) // submit the jobs for execution
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

Jobs are both generated and submitted in this method: once the Jobs have been produced, they are submitted for execution.
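For context, generateJobs is driven by a recurring timer that fires once per batch interval and posts a GenerateJobs(time) event to the JobGenerator event loop. The snippet below is only a simplified, self-contained sketch of that timer-driven pattern; SimpleJobTimer and batchIntervalMs are invented names for illustration, not Spark's actual classes.

import java.util.concurrent.{Executors, TimeUnit}

// Invented illustration of the timer-driven generation loop; Spark's real
// JobGenerator uses a RecurringTimer plus an event loop rather than this code.
class SimpleJobTimer(batchIntervalMs: Long)(generate: Long => Unit) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def start(): Unit = {
    scheduler.scheduleAtFixedRate(new Runnable {
      // Each tick corresponds to one batch; hand the batch time to the generator.
      override def run(): Unit = generate(System.currentTimeMillis())
    }, batchIntervalMs, batchIntervalMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = scheduler.shutdown()
}

// Usage sketch: fire a "generateJobs" callback every 5 seconds.
// new SimpleJobTimer(5000L)(time => println(s"generateJobs($time)")).start()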
The graph.generateJobs call above is DStreamGraph.generateJobs. Its code is as follows:
def generateJobs(time: Time): Seq[Job] = {
  // time is the end time of the batch, measured from the epoch (1970-01-01 00:00:00)
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap(outputStream => outputStream.generateJob(time))
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
Next, let's look at the definition of the output stream's generateJob method:
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc) // the generated Job runs SparkContext.runJob
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}

This method first calls DStream.getOrCompute to generate the RDD. Once the RDD has been produced, it creates the Job that processes the RDD. As the code above shows, the generated Job runs SparkContext.runJob, which hands the Job over to Spark Core's main task scheduling flow for execution.
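The Job created here is little more than a batch time plus a closure to run later. A minimal stand-in for that idea (SimpleStreamingJob is an invented name; the real org.apache.spark.streaming.scheduler.Job class carries more bookkeeping) could look like this:

// Invented stand-in for the streaming Job abstraction: it only records the
// batch time and the function to run; run() simply invokes that function.
class SimpleStreamingJob(val time: Long, func: () => Unit) {
  def run(): Unit = func()   // later executed on a JobScheduler worker thread
}

// Usage sketch: wrap a SparkContext.runJob call in a closure and run it later.
// val job = new SimpleStreamingJob(batchTimeMs, () => sc.runJob(rdd, (_: Iterator[String]) => ()))
// job.run()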
DStream.getOrCompute walks through a chain of RDD dependency computations and ultimately ends up generating the KafkaRDD. RDD dependency computation in Spark will be covered in an upcoming article.
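Conceptually, getOrCompute memoizes one RDD per batch time: if the RDD for this batch was already generated it is returned from a cache, otherwise compute(time) is called and the result is remembered. The sketch below shows only that memoization pattern; BatchCache, RDDLike and computeBatch are invented placeholders, and the real DStream additionally validates the time against the slide duration and handles persistence and checkpointing.

import scala.collection.mutable

// Invented memoization sketch of the idea behind DStream.getOrCompute:
// at most one RDD is generated per batch time, and later lookups reuse it.
class BatchCache[RDDLike](computeBatch: Long => Option[RDDLike]) {
  private val generatedRDDs = mutable.HashMap.empty[Long, RDDLike]

  def getOrCompute(time: Long): Option[RDDLike] =
    generatedRDDs.get(time) match {
      case cached @ Some(_) => cached                     // already generated for this batch
      case None =>
        val newRDD = computeBatch(time)                   // delegate to the subclass's compute(time)
        newRDD.foreach(rdd => generatedRDDs(time) = rdd)  // remember it for later calls on the same batch
        newRDD
    }
}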
The generation of the KafkaRDD ultimately ends up in DirectKafkaInputDStream.compute.
The code is:
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

  // Report the record number of this batch interval to InputInfoTracker.
  val inputInfo = InputInfo(id, rdd.count)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  // update currentOffsets: the next time a KafkaRDD is computed, these become each partition's starting offset
  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}
DirectKafkaInputDStream is created only once, when the driver starts. Inside DirectKafkaInputDStream.compute, after the KafkaRDD has been created from the offsets, currentOffsets is updated so that the next batch uses it as the starting offset for consumption.
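To make the offset bookkeeping concrete, here is a toy, Spark-free illustration of how a per-partition currentOffsets map advances from one batch to the next; the partition numbers and offset values are made up for the example.

// Toy model of the direct approach's offset bookkeeping: each batch reads the
// range [currentOffsets, untilOffsets) per partition, then currentOffsets is
// moved forward so the next batch starts where this one ended.
object OffsetAdvanceDemo extends App {
  var currentOffsets = Map(0 -> 100L, 1 -> 250L)   // partition -> next offset to read (invented values)
  val latestOffsets  = Map(0 -> 180L, 1 -> 300L)   // pretend "latest leader offsets"

  val offsetRanges = currentOffsets.map { case (p, from) =>
    p -> (from, latestOffsets(p))                  // range consumed by this batch's KafkaRDD
  }
  println(s"this batch reads:       $offsetRanges")    // Map(0 -> (100,180), 1 -> (250,300))

  currentOffsets = offsetRanges.map { case (p, (_, until)) => p -> until }
  println(s"next batch starts from: $currentOffsets")  // Map(0 -> 180, 1 -> 300)
}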
Now let's look at how Spark Streaming Jobs are run.
Once the Jobs have been generated, JobScheduler.submitJobSet is called to submit them for execution. The code is as follows:
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    // start an execution thread for each Job in turn; at this point the Jobs are handed to
    // Spark Core's main task scheduler and start receiving and processing streaming data
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
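JobHandler is essentially a Runnable that runs a single streaming Job on the jobExecutor thread pool. A stripped-down sketch of that submit-and-run pattern (an invented SimpleJobSubmitter, omitting the listener-bus notifications the real JobScheduler sends) might look like this:

import java.util.concurrent.Executors

// Invented stand-in for JobScheduler.submitJobSet + JobHandler: each job in the
// batch is wrapped in a Runnable and handed to a fixed-size thread pool (whose
// size is what spark.streaming.concurrentJobs controls in the real scheduler).
object SimpleJobSubmitter {
  private val jobExecutor = Executors.newFixedThreadPool(1)

  def submit(jobs: Seq[() => Unit]): Unit =
    jobs.foreach { job =>
      jobExecutor.execute(new Runnable {
        // In the real code this is where the Job's closure ends up calling SparkContext.runJob.
        override def run(): Unit = job()
      })
    }
}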
Conclusion: until JobScheduler.submitJobSet submits the Jobs for execution, all of the work happens on the driver. Spark Streaming's job scheduling mainly consists of generating spark-streaming Jobs on a timer and then handing those generated Jobs over to Spark Core's main task scheduler, which actually executes them.
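For completeness, this is roughly what the user-side code that drives all of the above looks like, assuming the Spark 1.x spark-streaming-kafka (Kafka 0.8) direct API; the broker address and topic name are placeholders.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("direct-kafka-example")
    // The batch interval passed here determines how often generateJobs fires.
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // placeholder broker
    val topics = Set("events")                                        // placeholder topic

    // Creates the DirectKafkaInputDStream; its compute() builds one KafkaRDD per batch.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // An output operation such as foreachRDD registers the output DStream whose
    // generateJob produces the streaming Job for each batch.
    stream.foreachRDD { rdd => println(s"records in batch: ${rdd.count()}") }

    ssc.start()
    ssc.awaitTermination()
  }
}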