spark streaming job生成与运行

时间:2022-02-10 20:55:15

spark streaming job生成

Spark Streaming在每个批次提交job的时候,一次会提交几个job呢?

DStreamGraph
  /**
   * Builds the jobs for one batch by asking every registered output stream for its job.
   *
   * @param time the batch time the jobs are generated for
   * @return the jobs produced by the output streams; a stream whose `generateJob`
   *         yields `None` contributes nothing to the result
   */
  def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    // Job generation runs while holding this graph's monitor.
    val generated = this.synchronized {
      outputStreams.flatMap { stream =>
        stream.generateJob(time).map { job =>
          // Tag the job with the call site at which its output stream was created.
          job.setCallSite(stream.creationSite)
          job
        }
      }
    }
    logDebug("Generated " + generated.length + " jobs for time " + time)
    generated
  }

这里对每个outputStream分别调用generateJob来生成job。由于generateJob返回的是Option[Job],所以有多少个outputStream,每个批次就最多会有多少个job(返回None的outputStream不会产生job)。

outputStream如何生成呢

DStream
  /**
   * Wraps `foreachFunc` in a `ForEachDStream` built on top of this stream and registers
   * that new stream as an output stream.
   *
   * @param foreachFunc        function applied to each RDD together with its batch time
   * @param displayInnerRDDOps flag forwarded unchanged to the `ForEachDStream` constructor
   */
  private def foreachRDD(
      foreachFunc: (RDD[T], Time) => Unit,
      displayInnerRDDOps: Boolean): Unit = {
    // Pass the closure through SparkContext.clean before handing it to the new stream.
    val cleanedFunc = context.sparkContext.clean(foreachFunc, false)
    val outputStream = new ForEachDStream(this, cleanedFunc, displayInnerRDDOps)
    outputStream.register()
  }

这里通过register方法来注册outputStream

DStream
  /**
   * Registers this DStream as an output stream of the streaming context's graph, which
   * ensures that the RDDs of this DStream will be generated.
   *
   * @return this DStream, so that calls can be chained
   */
  private[streaming] def register(): DStream[T] = {
    val graph = ssc.graph
    graph.addOutputStream(this)
    this
  }

体现在代码层面上:每调用一次foreachRDD这样的输出操作,就会注册一个outputStream,每个批次提交时也就会多出一个job。如果整个应用中没有任何输出操作,也就是说没有注册任何outputStream,那么下面validate()中的require会抛出IllegalArgumentException。

DStreamGraph
  /**
   * Checks, while holding this graph's monitor, that the graph can be executed:
   * the batch duration must be set and at least one output stream must be registered.
   *
   * @throws IllegalArgumentException if either requirement is not met
   */
  def validate(): Unit = {
    this.synchronized {
      require(batchDuration != null, "Batch duration has not been set")
      // assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration +
      // " is very low")
      require(getOutputStreams().size > 0, "No output operations registered, so nothing to execute")
    }
  }

生成job

DStream
  /**
   * Generate a SparkStreaming job for the given time. This is an internal method that
   * should not be called directly. This default implementation creates a job
   * that materializes the corresponding RDD. Subclasses of DStream may override this
   * to generate their own jobs.
   */
  private[streaming] def generateJob(time: Time): Option[Job] = {
    // A job is produced only when getOrCompute yields an RDD for this time.
    getOrCompute(time).map { rdd =>
      val jobFunc = () => {
        // The per-partition function does nothing; runJob is invoked on the RDD with it.
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      new Job(time, jobFunc)
    }
  }

此处的jobFunc直接调用了SparkContext的runJob方法(注意这只是DStream的默认实现,正如注释所说,子类可以重写generateJob来生成自己的job)。runJob的分析,可以参考我的另一篇博客。

job提交

JobGenerator
  /**
   * Generate jobs and perform checkpoint for the given `time`.
   *
   * Received blocks are first allocated to the batch, then the graph generates the jobs.
   * On success the jobs are submitted to the job scheduler as one `JobSet`; on failure
   * the error is reported to the job scheduler. In both cases a `DoCheckpoint` event is
   * posted afterwards.
   *
   * @param time the batch time to generate jobs for
   */
  private def generateJobs(time: Time): Unit = {
    // Set the SparkEnv in this thread, so that job generation code can access the environment
    // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
    // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
    SparkEnv.set(ssc.env)
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      // The jobs are generated here.
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        // The jobs are submitted here.
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }
JobScheduler
  /**
   * Submits every job of `jobSet` to the job executor.
   *
   * An empty job set is only logged. Otherwise a `StreamingListenerBatchSubmitted` event
   * is posted, the set is recorded in `jobSets` under its time, and each job is wrapped
   * in a `JobHandler` and handed to `jobExecutor`.
   *
   * @param jobSet the jobs generated for one batch time
   */
  def submitJobSet(jobSet: JobSet): Unit = {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }

JobScheduler内部有一个线程池jobExecutor,线程数由spark.streaming.concurrentJobs决定(默认为1);每个job会被包装成JobHandler提交到这个线程池中执行。

  // Size of the job executor pool, read from "spark.streaming.concurrentJobs";
  // falls back to 1 when the key is not set.
  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
  
  // Thread pool with numConcurrentJobs threads on which JobHandlers are executed.
  // NOTE(review): the second argument is presumably the thread-name prefix — confirm
  // against ThreadUtils.newDaemonFixedThreadPool.
  private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
  /**
   * Runnable wrapper around a single `Job`. Its `run()` calls `job.run()` inside
   * `PairRDDFunctions.disableOutputSpecValidation.withValue(true)`.
   * The `....` lines mark code that was elided from this excerpt.
   */
  private class JobHandler(job: Job) extends Runnable with Logging {
    import JobScheduler._

    def run() {
      try {
            ....
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          // This is where the job is finally run.
            job.run()
          }
          ....
    }
  }
Job
  /**
   * Invokes the job's function and stores the outcome in `_result`.
   * The call is wrapped in `Try`, so an exception thrown by `func` is captured as a
   * `Failure` instead of propagating to the caller.
   */
  def run(): Unit = {
    _result = Try(func())
  }

此处的func就是生成job时传入的jobFunc,也就是说最终调用的是runJob方法。