spark-streaming series ------- 2. spark-streaming Job Scheduling (Part 2)


    Continuing from the previous article, spark-streaming series ------- 1. spark-streaming Job Scheduling (Part 1), this post looks at how spark-streaming jobs are actually generated.

    Spark Streaming jobs are generated in JobGenerator.generateJobs. The code is as follows:

    

 private def generateJobs(time: Time) {
    // Set the SparkEnv in this thread, so that job generation code can access the environment
    // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
    // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
    SparkEnv.set(ssc.env)
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        val streamIdToNumRecords = streamIdToInputInfos.mapValues(_.numRecords)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToNumRecords)) // submit the generated jobs for execution
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }
Both job generation and submission happen in this method: once the jobs for a batch have been generated, they are submitted for execution.
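
For context, generateJobs is not called directly; inside JobGenerator a RecurringTimer fires once per batch interval and posts a GenerateJobs event, which the event loop then dispatches to this method. A simplified sketch of that wiring (paraphrasing the Spark 1.x JobGenerator; checkpoint and metadata-cleanup handling abbreviated):

  // A timer fires every batchDuration and posts a GenerateJobs event for that batch time.
  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

  // The event loop dispatches each event; GenerateJobs ends up in the method shown above.
  private def processEvent(event: JobGeneratorEvent) {
    event match {
      case GenerateJobs(time) => generateJobs(time)
      case DoCheckpoint(time, clearCheckpointDataLater) =>
        doCheckpoint(time, clearCheckpointDataLater)
      case _ => // ClearMetadata, ClearCheckpointData, ... omitted in this sketch
    }
  }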



graph.generateJobs above is DStreamGraph.generateJobs. The code is as follows:


  

def generateJobs(time: Time): Seq[Job] = { // time is the end time of the batch, counted from 1970-01-01 00:00:00 (the epoch)
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      outputStreams.flatMap(outputStream => outputStream.generateJob(time))
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }
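
Note that generateJobs only walks the output DStreams registered with the graph. Output DStreams are registered when the application calls an output operation such as print() or foreachRDD. A minimal, assumed application snippet to make that concrete (app name, host and port are placeholders):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf().setAppName("streaming-sketch")   // assumed app name
  val ssc  = new StreamingContext(conf, Seconds(2))           // batchDuration = 2 seconds

  val lines = ssc.socketTextStream("localhost", 9999)         // one input DStream (assumed host/port)
  lines.flatMap(_.split(" "))
       .map(word => (word, 1))
       .reduceByKey(_ + _)
       .print()     // output operation: registers a ForEachDStream as an output stream

  ssc.start()               // the JobGenerator timer now produces one JobSet per batch
  ssc.awaitTermination()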

Next, let's look at the definition of generateJob on the output DStream (DStream.generateJob):

 private[streaming] def generateJob(time: Time): Option[Job] = {
    getOrCompute(time) match {
      case Some(rdd) => {
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          context.sparkContext.runJob(rdd, emptyFunc) // the generated Job runs SparkContext.runJob
        }
        Some(new Job(time, jobFunc))
      }
      case None => None
    }
  }
This method first calls DStream.getOrCompute to produce the RDD for this batch; once the RDD has been generated, it creates the Job that processes the RDD. As the code above shows, the generated Job runs SparkContext.runJob, which hands the work over to the Spark core task scheduler for execution.
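
The Job created here is only a thin wrapper around the batch time and the closure; nothing runs until its run() method is invoked later by a JobHandler (shown further below). A simplified sketch of the wrapper, approximating the Spark 1.x org.apache.spark.streaming.scheduler.Job class:

  import scala.util.Try
  import org.apache.spark.streaming.Time

  // Simplified sketch: a streaming Job is just a batch time plus the function to execute.
  // run() executes the closure -- for an output DStream, the SparkContext.runJob call above.
  class Job(val time: Time, func: () => _) {
    private var _result: Try[_] = null
    def run() { _result = Try(func()) }
    def result: Try[_] = _result
  }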


DStream.getOrCompute walks a chain of RDD dependency computations and ultimately ends up creating the KafkaRDD. RDD dependency computation in Spark will be covered in a follow-up article.
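
The core idea of getOrCompute can still be sketched here (paraphrasing Spark 1.x DStream.getOrCompute; checkpointing, persistence and some of the time validation are omitted): every DStream caches the RDD already generated for each batch time and only calls compute(time) when nothing is cached yet.

  private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
    // Reuse the RDD already generated for this batch time, if any.
    generatedRDDs.get(time).orElse {
      if (isTimeValid(time)) {
        val rddOption = compute(time)   // for the Kafka direct stream: DirectKafkaInputDStream.compute below
        rddOption.foreach(rdd => generatedRDDs.put(time, rdd))
        rddOption
      } else {
        None
      }
    }
  }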

The KafkaRDD is ultimately created in DirectKafkaInputDStream.compute. The code is:

override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

  // Report the record number of this batch interval to InputInfoTracker.
  val inputInfo = InputInfo(id, rdd.count)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset) // update currentOffsets; these become each partition's starting offset the next time a KafkaRDD is computed
  Some(rdd)
}

DirectKafkaInputDStream is created only once, when the driver starts. Inside DirectKafkaInputDStream.compute, after the KafkaRDD for the batch has been created from the offsets, currentOffsets is updated so that the next batch uses those values as its starting offsets.
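
Since the direct approach tracks offsets inside DirectKafkaInputDStream itself, application code can also read the offset range chosen for each batch. A minimal usage sketch with the 0.8 direct API (broker address and topic name are assumed placeholders):

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

  val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")   // assumed broker
  val topics = Set("test-topic")                                      // assumed topic

  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics)

  stream.foreachRDD { rdd =>
    // Each batch's RDD is a KafkaRDD; it exposes the [fromOffset, untilOffset) range
    // that DirectKafkaInputDStream.compute picked for every topic partition.
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetRanges.foreach { o =>
      println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    }
  }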


Now let's look at how Spark Streaming Jobs are run.

Once the Jobs have been generated, they are submitted for execution by calling JobScheduler.submitJobSet. The code is as follows:

def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) // run each Job on its own thread from the jobExecutor pool; at this point the Jobs reach the Spark core task scheduler and processing of the batch's stream data begins
      logInfo("Added jobs for time " + jobSet.time)
    }
  }
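
Each JobHandler simply runs one Job on a thread from the jobExecutor pool and notifies the scheduler before and after. A simplified sketch (paraphrasing the Spark 1.x JobScheduler.JobHandler; local properties and error handling omitted):

  private class JobHandler(job: Job) extends Runnable {
    def run() {
      eventLoop.post(JobStarted(job))     // lets the scheduler mark the batch as started
      job.run()                           // executes jobFunc, i.e. SparkContext.runJob
      eventLoop.post(JobCompleted(job))   // triggers batch-completion bookkeeping
    }
  }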

Conclusion: everything up to the point where JobScheduler.submitJobSet submits the Jobs runs on the Driver. Spark Streaming's job scheduling essentially boils down to periodically generating spark-streaming Jobs and then handing those Jobs to the Spark core task scheduler for execution.