A First Look at the Spark Streaming Source Code (3)

Date: 2023-02-15 20:47:35

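This part walks through how Spark Streaming generates RDDs, which is also how it generates Jobs. DStream is Spark Streaming's data abstraction and is implemented on top of RDDs. Because RDDs exist to serve Jobs, the responsibility for triggering RDD generation falls to JobGenerator. In other words, RDDs are generated as part of Job generation, so tracing Job generation also traces RDD generation.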

Reading the JobGenerator source, we find the following member:

  // NOTE: timer that triggers Job generation, firing once per batch interval
  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

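This member is what periodically posts a GenerateJobs message to the EventLoop so that Jobs get generated. RecurringTimer is a timer built on a plain thread; its implementation is short, so read the source for the details. The timer is started as part of starting the JobGenerator (note that eventLoop is started there as well), as follows: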

  /** Start generation of jobs */
  def start(): Unit = synchronized {
    if (eventLoop != null) return // generator has already been started

    // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
    // See SPARK-10125
    checkpointWriter

    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
      // NOTE: onReceive is invoked from the run method of the EventLoop's thread
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = {
        jobScheduler.reportError("Error in job generator", e)
      }
    }
    eventLoop.start() // NOTE: start the event loop

    if (ssc.isCheckpointPresent) {
      restart()
    } else {
      // NOTE: first start; this starts the timer that periodically generates jobs
      startFirstTime()
    }
  }

The code of org.apache.spark.streaming.scheduler.JobGenerator#startFirstTime is as follows:

  /** Starts the generator for the first time */
  private def startFirstTime() {
    val startTime = new Time(timer.getStartTime())
    graph.start(startTime - graph.batchDuration)
    // NOTE: start the timer that periodically generates jobs
    timer.start(startTime.milliseconds)
    logInfo("Started JobGenerator at " + startTime)
  }
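
To make the timer's role concrete, here is a minimal sketch of a thread-based recurring timer. It is not Spark's RecurringTimer: the class name SimpleRecurringTimer and its parameters are hypothetical, and it reads System.currentTimeMillis directly instead of going through Spark's Clock abstraction.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical, simplified stand-in for a recurring timer; not Spark's RecurringTimer.
class SimpleRecurringTimer(periodMs: Long, callback: Long => Unit, name: String) {
  private val stopped = new AtomicBoolean(false)
  // Next trigger time in milliseconds; set by start() and advanced by the timer thread.
  @volatile private var nextTime = 0L

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val sleepMs = nextTime - System.currentTimeMillis()
          if (sleepMs > 0) Thread.sleep(sleepMs)
          callback(nextTime) // e.g. post a "generate jobs" event for this batch time
          nextTime += periodMs
        }
      } catch {
        case _: InterruptedException => // stop() was called while sleeping
      }
    }
  }

  /** First trigger time: the next multiple of the period after "now". */
  def getStartTime(): Long =
    (math.floor(System.currentTimeMillis().toDouble / periodMs) + 1).toLong * periodMs

  def start(startTime: Long): Unit = { nextTime = startTime; thread.start() }

  def stop(): Unit = { stopped.set(true); thread.interrupt(); thread.join() }
}
```

Passing the scheduled time (rather than the wall-clock time at which the callback happens to run) to the callback keeps batch times aligned to exact multiples of the period, which is the property the startFirstTime code relies on.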


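Once the timer has started, eventLoop.post(GenerateJobs(new Time(longTime))) runs once per batch interval. Next, let's look at how the event loop is implemented:

JobGenerator implements its event loop as an anonymous inner class that overrides onReceive and onError. First, here is the abstract event loop class: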

/**
  * An event loop to receive events from the caller and process all events in the event thread. It
  * will start an exclusive event thread to process all events.
  *
  * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can
  * handle events in time to avoid the potential OOM.
  * <br><br>
  * A single internal thread is responsible for processing the events.
  *
  */
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  /** The event queue. */
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  /** Flag marking whether the event loop has been stopped. */
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            // NOTE: as soon as an event arrives, call onReceive to handle it
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      var onStopCalled = false
      try {
        eventThread.join()
        // Call onStop after the event thread exits to make sure onReceive happens before onStop
        onStopCalled = true
        onStop()
      } catch {
        case ie: InterruptedException =>
          Thread.currentThread().interrupt()
          if (!onStopCalled) {
            // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
            // it's already called.
            onStop()
          }
      }
    } else {
      // Keep quiet to allow calling `stop` multiple times.
    }
  }

  /**
    * Put the event into the event queue. The event thread will process it later.
    */
  def post(event: E): Unit = {
    eventQueue.put(event)
  }

  /**
    * Return if the event thread has already been started but not yet stopped.
    */
  def isActive: Boolean = eventThread.isAlive

  /**
    * Invoked when `start()` is called but before the event thread starts.
    */
  protected def onStart(): Unit = {}

  /**
    * Invoked when `stop()` is called and the event thread exits.
    */
  protected def onStop(): Unit = {}

  /**
    * Invoked in the event thread when polling events from the event queue.
    *
    * Note: Should avoid calling blocking actions in `onReceive`, or the event thread will be blocked
    * and cannot process events in time. If you want to call some blocking actions, run them in
    * another thread.
    */
  protected def onReceive(event: E): Unit

  /**
    * Invoked if `onReceive` throws any non fatal error. Any non fatal error thrown from `onError`
    * will be ignored.
    */
  protected def onError(e: Throwable): Unit

}
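
The pattern in this class (one blocking queue, one consumer thread, two callbacks) can be tried outside Spark. The following is a trimmed-down sketch, not Spark's EventLoop, which is private[spark]; the names MiniEventLoop and MiniEventLoopDemo are hypothetical, and the onStart/onStop hooks are left out for brevity.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, LinkedBlockingDeque, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// Hypothetical, trimmed-down version of the pattern; not Spark's EventLoop.
abstract class MiniEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = queue.take() // blocks until a producer posts an event
          try onReceive(event)
          catch { case NonFatal(e) => onError(e) }
        }
      } catch {
        case _: InterruptedException => // stop() interrupts a blocked take()
      }
    }
  }

  def start(): Unit = thread.start()
  def stop(): Unit =
    if (stopped.compareAndSet(false, true)) { thread.interrupt(); thread.join() }
  def post(event: E): Unit = queue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

object MiniEventLoopDemo {
  /** Posts the given events and returns them in the order the event thread handled them. */
  def run(events: Seq[String]): Seq[String] = {
    val handled = new ConcurrentLinkedQueue[String]()
    val done = new CountDownLatch(events.size)
    val loop = new MiniEventLoop[String]("demo-loop") {
      override protected def onReceive(event: String): Unit = {
        handled.add(event)
        done.countDown()
      }
      override protected def onError(e: Throwable): Unit = done.countDown()
    }
    loop.start()
    events.foreach(loop.post)
    done.await(5, TimeUnit.SECONDS) // wait until every event has been handled
    loop.stop()
    handled.toArray(new Array[String](0)).toSeq
  }
}
```

Because a single thread drains the queue, events are handled strictly in posting order, so handlers such as processEvent do not need their own locking against each other.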


JobGenerator's anonymous implementation of the event loop:

eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  // NOTE: onReceive is invoked from the run method of the EventLoop's thread
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

  override protected def onError(e: Throwable): Unit = {
    jobScheduler.reportError("Error in job generator", e)
  }
}

At this point, onReceive in JobGenerator's anonymous event loop class is also invoked once per batch interval, so the next method to run is processEvent(event):


  /** Processes all events */
  private def processEvent(event: JobGeneratorEvent) {
    logDebug("Got event " + event)
    event match {
        // NOTE: handle the generate-Job event
      case GenerateJobs(time) => generateJobs(time)
      case ClearMetadata(time) => clearMetadata(time)
      case DoCheckpoint(time, clearCheckpointDataLater) =>
        doCheckpoint(time, clearCheckpointDataLater)
      case ClearCheckpointData(time) => clearCheckpointData(time)
    }
  }
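
The dispatch above relies on the events forming a closed family of case classes. A minimal sketch of that shape follows; the enclosing object EventDispatchSketch and the describe method are hypothetical, and a Long of milliseconds stands in for Spark's Time class.

```scala
object EventDispatchSketch {
  // Simplified: a Long of milliseconds stands in for Spark's Time class.
  sealed trait JobGeneratorEvent
  case class GenerateJobs(time: Long) extends JobGeneratorEvent
  case class ClearMetadata(time: Long) extends JobGeneratorEvent
  case class DoCheckpoint(time: Long, clearCheckpointDataLater: Boolean) extends JobGeneratorEvent
  case class ClearCheckpointData(time: Long) extends JobGeneratorEvent

  /** Hypothetical handler: returns a description instead of doing the real work. */
  def describe(event: JobGeneratorEvent): String = event match {
    case GenerateJobs(t) => s"generate jobs for batch $t"
    case ClearMetadata(t) => s"clear metadata for batch $t"
    case DoCheckpoint(t, later) => s"checkpoint batch $t (clear later: $later)"
    case ClearCheckpointData(t) => s"clear checkpoint data for batch $t"
  }
}
```

Because the trait is sealed, the compiler can warn when a match leaves out one of the event types.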

This in turn calls org.apache.spark.streaming.scheduler.JobGenerator#generateJobs:


  /** Generate jobs and perform checkpointing for the given `time`. */
  private def generateJobs(time: Time) {
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      // NOTE: generate the collection of Jobs from the DStreamGraph
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        // NOTE: fetch the input source info for this batch
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        // NOTE: submit the job set
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
    }
    // NOTE: a DoCheckpoint event is posted after every generateJobs call
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }
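
The Try / Success / Failure structure in this method is what keeps one failed batch from killing the event thread. A minimal sketch of the same control flow, with hypothetical names (GenerateJobsSketch, generateAndSubmit) and with strings standing in for Jobs:

```scala
import scala.util.{Failure, Success, Try}

object GenerateJobsSketch {
  /** Runs `generate` for the batch time and returns either the jobs to submit (Right)
    * or the error message that would be reported (Left). Jobs are plain strings here. */
  def generateAndSubmit(time: Long)(generate: Long => Seq[String]): Either[String, Seq[String]] =
    Try(generate(time)) match {
      case Success(jobs) => Right(jobs)
      case Failure(e) => Left(s"Error generating jobs for time $time: ${e.getMessage}")
    }
}
```

Note that Try only catches non-fatal exceptions, so fatal errors such as OutOfMemoryError still propagate.
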
The method above calls org.apache.spark.streaming.DStreamGraph#generateJobs to generate the Jobs. Its implementation is as follows:

  /**
    * Generates the Jobs for the given batch time.
    * @param time the batch time
    * @return the Jobs generated by all output streams
    */
  def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      outputStreams.flatMap { outputStream =>

        // NOTE: calls DStream.generateJob, which calls getOrCompute, which in turn calls compute to generate the RDD
        val jobOption = outputStream.generateJob(time)

        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }

Next comes org.apache.spark.streaming.dstream.DStream#generateJob:

  /**
    * Generate a SparkStreaming job for the given time. This is an internal method that
    * should not be called directly. This default implementation creates a job
    * that materializes the corresponding RDD. Subclasses of DStream may override this
    * to generate their own jobs.
    */
  private[streaming] def generateJob(time: Time): Option[Job] = {
    // NOTE: get the RDD corresponding to the given time
    getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          // NOTE: as with any Spark RDD job, execution ultimately goes through sparkContext.runJob
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))

      case None => None
    }
  }
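
Two ideas from the listings above are worth isolating: a Job only wraps a function to be run later, and the graph uses flatMap so that output streams with no RDD for the batch simply contribute nothing. The sketch below uses hypothetical stand-ins (JobSketch, OutputStream, a Seq[Int] in place of an RDD) and is not the Spark API.

```scala
object JobSketch {
  // Hypothetical stand-in: a Job wraps a deferred function; nothing runs until run() is called.
  final class Job(val time: Long, func: () => Any) {
    def run(): Any = func()
  }

  // Hypothetical stand-in for an output DStream: a "batch" is a Seq[Int] keyed by time.
  class OutputStream(data: Map[Long, Seq[Int]]) {
    def getOrCompute(time: Long): Option[Seq[Int]] = data.get(time)

    def generateJob(time: Long): Option[Job] =
      getOrCompute(time).map(batch => new Job(time, () => batch.sum))
  }

  /** Streams that return None for this time are dropped by flatMap. */
  def generateJobs(streams: Seq[OutputStream], time: Long): Seq[Job] =
    streams.flatMap(_.generateJob(time))
}
```

Generating a Job is therefore cheap; the actual computation happens only when the scheduler later invokes the wrapped function.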

Next comes org.apache.spark.streaming.dstream.DStream#getOrCompute:

 /**
    * Get the RDD corresponding to the given time; either retrieve it from cache or compute-and-cache it.
    * <br><br>
    * Returns the RDD corresponding to the batch time `time`.
    */
  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // If RDD was already generated, then retrieve it from HashMap,
    // or else compute the RDD
    generatedRDDs.get(time).orElse {
      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
      // of RDD generation, else generate nothing.
      if (isTimeValid(time)) {

        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details. We need to have this call here because
          // compute() might cause Spark jobs to be launched.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            // NOTE: the key call
            compute(time)
          }
        }

        rddOption.foreach { case newRDD =>
          // Register the generated RDD for caching and checkpointing
          if (storageLevel != StorageLevel.NONE) {
            newRDD.persist(storageLevel)
            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
          }
          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
            newRDD.checkpoint()
            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
          }
          generatedRDDs.put(time, newRDD)
        }
        rddOption
      } else {
        None
      }
    }
  }
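
Stripped of persistence and checkpointing, getOrCompute is a per-batch-time memoization guarded by a validity check. A minimal sketch under those simplifications follows; GetOrComputeSketch and CachedStream are hypothetical names, a String stands in for the RDD, and the computeCalls counter exists only to make the caching visible.

```scala
import scala.collection.mutable

object GetOrComputeSketch {
  // Hypothetical stream: batches are valid every `slideMs` milliseconds after `zeroTime`.
  class CachedStream(zeroTime: Long, slideMs: Long, compute: Long => Option[String]) {
    val generated = mutable.HashMap.empty[Long, String]
    var computeCalls = 0 // for illustration only

    def isTimeValid(time: Long): Boolean =
      time > zeroTime && (time - zeroTime) % slideMs == 0

    def getOrCompute(time: Long): Option[String] =
      generated.get(time).orElse {
        if (isTimeValid(time)) {
          computeCalls += 1
          val result = compute(time)
          result.foreach(r => generated.put(time, r)) // cache for later lookups
          result
        } else {
          None
        }
      }
  }
}
```

The cache matters when several output operations share an upstream DStream: each asks for the same batch time, but compute runs only once.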


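Finally, the compute method implemented by the concrete DStream subclass is called to generate the RDD (compute is abstract in DStream and must be implemented for each data source). For example, org.apache.spark.streaming.kafka010.DirectKafkaInputDStream#compute is implemented as follows: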

 /**
    *
    * Method that generates an RDD for the given time
    *
    * @param validTime
    * @return  Option[KafkaRDD[K, V]]
    */
  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {

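    val untilOffsets = clamp(latestOffsets()) // NOTE: key step: determines the offset range for this batch and applies rate limiting
    // An OffsetRange holds: topic, partition, from offset, until offset
    val offsetRanges = untilOffsets.map { case (tp, uo) =>
      val fo = currentOffsets(tp) // fo is where the previous batch ended; fo equals uo when no new messages have arrived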
      OffsetRange(tp.topic, tp.partition, fo, uo)
    }

    // NOTE: the third constructor argument of KafkaRDD matters: it defines the offset range of each Kafka partition that belongs to this RDD
    val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true)

    // Report the record number and metadata of this batch interval to InputInfoTracker.
    val description = offsetRanges.filter { offsetRange =>
      // Don't display empty ranges.
      offsetRange.fromOffset != offsetRange.untilOffset
    }.map { offsetRange =>
      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
    }.mkString("\n")
    // Copy offsetRanges to immutable.List to prevent from being modified by the user
    val metadata = Map(
      "offsets" -> offsetRanges.toList,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    // NOTE: InputInfoTracker runs on the driver and tracks input data statistics for monitoring
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
    // Record the latest consumed offsets in currentOffsets
    currentOffsets = untilOffsets
    commitAll()
    Some(rdd)
  }
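
The core of this method is offset arithmetic: cap how far each partition may advance, then turn (current, until) pairs into ranges. The sketch below illustrates that with hypothetical, self-contained types (OffsetRangeSketch and its case classes, not the Kafka or Spark classes). It assumes a fixed per-partition cap passed in as maxPerPartition, whereas Spark derives its limit from rate settings.

```scala
object OffsetRangeSketch {
  // Hypothetical stand-ins for the Kafka/Spark classes of the same names.
  final case class TopicPartition(topic: String, partition: Int)
  final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
    def count: Long = untilOffset - fromOffset
  }

  /** Caps each partition's until-offset at `maxPerPartition` messages past its current offset.
    * Assumption: a single fixed cap; the real clamp computes per-partition limits. */
  def clamp(current: Map[TopicPartition, Long],
            latest: Map[TopicPartition, Long],
            maxPerPartition: Long): Map[TopicPartition, Long] =
    latest.map { case (tp, end) => tp -> math.min(end, current(tp) + maxPerPartition) }

  /** Builds one range per partition: from the current offset up to the clamped until-offset. */
  def ranges(current: Map[TopicPartition, Long],
             until: Map[TopicPartition, Long]): Seq[OffsetRange] =
    until.toSeq.map { case (tp, uo) => OffsetRange(tp.topic, tp.partition, current(tp), uo) }
}
```

Since each range knows its own size, the record count of a batch can be obtained by summing the range counts, without reading any data from Kafka.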

This completes the trace of how a Job is generated, which is also how an RDD is generated.