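Spark Customization Class, Lesson 13: Spark Streaming Source Code Walkthrough, Driver Fault Tolerance

Time: 2021-07-09 20:49:13

Topics in this lesson:

1. Fault tolerance of ReceivedBlockTracker

2. Fault tolerance of DStreamGraph and JobGenerator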


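  At the data level, ReceivedBlockTracker records the metadata of received blocks for the whole Spark Streaming application.

  At the scheduling level, DStreamGraph and JobGenerator are the core of Spark Streaming scheduling. They record how far scheduling has progressed, which is tied to the business logic.

  When ReceivedBlockTracker receives block metadata, its addBlock method is called. The record is first written to the write ahead log on disk (if enabled), and only then added to the in-memory queue.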

  ReceivedBlockTracker:


  /** Add received block. This event will get written to the write ahead log (if enabled). */
  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    try {
      val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
      if (writeResult) {
        synchronized {
          getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
        }
        logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
          s"block ${receivedBlockInfo.blockStoreResult.blockId}")
      } else {
        logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
          s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
      }
      writeResult
    } catch {
      case NonFatal(e) =>
        logError(s"Error adding block $receivedBlockInfo", e)
        false
    }
  }
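
  The write-to-log-first ordering in addBlock can be illustrated with a small standalone sketch. It is not Spark code: SketchLog and SketchTracker are hypothetical stand-ins, block metadata is reduced to a string id, and the "log" is an in-memory buffer with a flag that simulates a failed disk write.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a write ahead log: records go into a buffer,
// and `failing` simulates a disk write error.
final class SketchLog(failing: Boolean = false) {
  val records = mutable.ArrayBuffer.empty[String]
  def write(record: String): Boolean =
    if (failing) false else { records += record; true }
}

// Simplified tracker: the in-memory queue changes only after the log write succeeds.
final class SketchTracker(log: Option[SketchLog]) {
  private val queue = mutable.Queue.empty[String]

  def addBlock(blockId: String): Boolean = {
    // With no log configured, the write counts as successful.
    val written = log.forall(_.write(s"add:$blockId"))
    if (written) synchronized { queue += blockId }
    written
  }

  def pending: List[String] = synchronized { queue.toList }
}

object WalFirstDemo {
  def main(args: Array[String]): Unit = {
    val healthy = new SketchTracker(Some(new SketchLog()))
    println((healthy.addBlock("block-1"), healthy.pending))

    val broken = new SketchTracker(Some(new SketchLog(failing = true)))
    println((broken.addBlock("block-2"), broken.pending))
  }
}
```

  Because the queue is only updated after a successful write, a block that never reached the log is also never visible in memory, so the log is never behind the in-memory state.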


  ReceivedBlockTracker:


  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]

  // ReceivedBlocks not yet allocated to a batch
  private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
  // ReceivedBlocks already allocated to a batch
  private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
  private val writeAheadLogOption = createWriteAheadLog()

  Based on batchTime, the blocks that belong to the current batch interval are moved into the timeToAllocatedBlocks data structure.


  ReceivedBlockTracker:

  /**
   * Allocate all unallocated blocks to the given batch.
   * This event will get written to the write ahead log (if enabled).
   */
  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
...
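
  The allocation step can be sketched in isolation as follows. This is a simplified illustration, not the Spark implementation: SketchAllocator is a hypothetical class, blocks are plain strings, batch times are milliseconds as Long, and the write ahead log step is left out.

```scala
import scala.collection.mutable

final class SketchAllocator(streamIds: Seq[Int]) {
  // Blocks received but not yet assigned to a batch, per stream id.
  private val unallocated = mutable.HashMap.empty[Int, mutable.Queue[String]]
  // Blocks assigned to a batch, keyed by batch time in milliseconds.
  private val allocated = mutable.HashMap.empty[Long, Map[Int, List[String]]]
  private var lastAllocated: Option[Long] = None

  private def queueFor(streamId: Int): mutable.Queue[String] =
    unallocated.getOrElseUpdate(streamId, mutable.Queue.empty[String])

  def addBlock(streamId: Int, blockId: String): Unit = synchronized {
    queueFor(streamId) += blockId
  }

  def allocate(batchTime: Long): Unit = synchronized {
    // Only a batch time newer than the last allocated one is accepted.
    if (lastAllocated.forall(batchTime > _)) {
      // dequeueAll(_ => true) empties each queue and returns what it held.
      val blocks = streamIds.map { id =>
        id -> queueFor(id).dequeueAll(_ => true).toList
      }.toMap
      allocated.put(batchTime, blocks)
      lastAllocated = Some(batchTime)
    }
  }

  def blocksOf(batchTime: Long): Map[Int, List[String]] = synchronized {
    allocated.getOrElse(batchTime, Map.empty[Int, List[String]])
  }
}

object AllocateDemo {
  def main(args: Array[String]): Unit = {
    val a = new SketchAllocator(Seq(0, 1))
    a.addBlock(0, "b0-1"); a.addBlock(0, "b0-2"); a.addBlock(1, "b1-1")
    a.allocate(5000L)
    a.addBlock(0, "b0-3")
    a.allocate(5000L) // ignored: not newer than the last allocated batch
    println(a.blocksOf(5000L))
  }
}
```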


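  Time is a case class that records a timestamp in milliseconds. It defines comparison and arithmetic operators, and its companion object provides an implicit Ordering.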


case class Time(private val millis: Long) {

  def milliseconds: Long = millis

  def < (that: Time): Boolean = (this.millis < that.millis)

  def <= (that: Time): Boolean = (this.millis <= that.millis)

  def > (that: Time): Boolean = (this.millis > that.millis)

  def >= (that: Time): Boolean = (this.millis >= that.millis)

  def + (that: Duration): Time = new Time(millis + that.milliseconds)

  def - (that: Time): Duration = new Duration(millis - that.millis)

  def - (that: Duration): Time = new Time(millis - that.milliseconds)

  // Java-friendlier versions of the above.

  def less(that: Time): Boolean = this < that

  def lessEq(that: Time): Boolean = this <= that

  def greater(that: Time): Boolean = this > that

  def greaterEq(that: Time): Boolean = this >= that

  def plus(that: Duration): Time = this + that

  def minus(that: Time): Duration = this - that

  def minus(that: Duration): Time = this - that


  def floor(that: Duration): Time = {
    val t = that.milliseconds
    new Time((this.millis / t) * t)
  }

  def floor(that: Duration, zeroTime: Time): Time = {
    val t = that.milliseconds
    new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
  }

  def isMultipleOf(that: Duration): Boolean =
    (this.millis % that.milliseconds == 0)

  def min(that: Time): Time = if (this < that) this else that

  def max(that: Time): Time = if (this > that) this else that

  def until(that: Time, interval: Duration): Seq[Time] = {
    (this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_))
  }

  def to(that: Time, interval: Duration): Seq[Time] = {
    (this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_))
  }


  override def toString: String = (millis.toString + " ms")

}

object Time {
  implicit val ordering = Ordering.by((time: Time) => time.millis)
}
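
  To see how these operators read at the call site, here is a cut-down sketch. SketchTime and SketchDuration are hypothetical stand-ins that copy only a few of the methods above, so the example runs without Spark on the classpath.

```scala
case class SketchDuration(milliseconds: Long)

case class SketchTime(millis: Long) {
  def <(that: SketchTime): Boolean = millis < that.millis
  def +(that: SketchDuration): SketchTime = SketchTime(millis + that.milliseconds)
  def -(that: SketchTime): SketchDuration = SketchDuration(millis - that.millis)
  // Round down to a multiple of the given duration.
  def floor(that: SketchDuration): SketchTime =
    SketchTime((millis / that.milliseconds) * that.milliseconds)
  // All times from this one (inclusive) to `that` (exclusive), one interval apart.
  def until(that: SketchTime, interval: SketchDuration): Seq[SketchTime] =
    (millis until that.millis by interval.milliseconds).map(SketchTime(_))
}

object SketchTime {
  // The implicit ordering lets collections of times be sorted or compared with max/min.
  implicit val ordering: Ordering[SketchTime] = Ordering.by((t: SketchTime) => t.millis)
}

object TimeOpsDemo {
  def main(args: Array[String]): Unit = {
    val batch = SketchDuration(5000L)
    println(SketchTime(12345L).floor(batch))
    println(SketchTime(20000L) - SketchTime(5000L))
    println(SketchTime(0L).until(SketchTime(15000L), batch))
    println(List(SketchTime(3L), SketchTime(1L), SketchTime(2L)).sorted)
  }
}
```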

  Tracing where the Time object comes from: the batchTime argument of ReceiverTracker's allocateBlocksToBatch method is passed in by JobGenerator's generateJobs method.

  ReceiverTracker:


  /** Allocate all unallocated blocks to the given batch. */
  def allocateBlocksToBatch(batchTime: Time): Unit = {
    if (receiverInputStreams.nonEmpty) {
      receivedBlockTracker.allocateBlocksToBatch(batchTime)
    }
  }


  JobGenerator's generateJobs method is invoked when the timer posts a GenerateJobs message.

  JobGenerator:


  /** Generate jobs and perform checkpoint for the given `time`.  */
  private def generateJobs(time: Time) {
    // Set the SparkEnv in this thread, so that job generation code can access the environment
    // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
    // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
    SparkEnv.set(ssc.env)
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }

  JobGenerator:

  /** Processes all events */
  private def processEvent(event: JobGeneratorEvent) {
    logDebug("Got event " + event)
    event match {
      case GenerateJobs(time) => generateJobs(time)
      case ClearMetadata(time) => clearMetadata(time)
      case DoCheckpoint(time, clearCheckpointDataLater) =>
        doCheckpoint(time, clearCheckpointDataLater)
      case ClearCheckpointData(time) => clearCheckpointData(time)
    }
  }
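
  The message-driven call chain (a GenerateJobs event triggers job generation, which in turn posts a DoCheckpoint event) can be sketched with a tiny single-threaded loop. All names here (SketchEvent, GenerateJobsEvt, DoCheckpointEvt, SketchLoop) are hypothetical, and the loop is drained on the calling thread, whereas the real event loop runs on its own thread.

```scala
import scala.collection.mutable

sealed trait SketchEvent
case class GenerateJobsEvt(time: Long) extends SketchEvent
case class DoCheckpointEvt(time: Long, clearLater: Boolean) extends SketchEvent

final class SketchLoop {
  private val inbox = mutable.Queue.empty[SketchEvent]
  // Records the order in which handlers ran, for inspection.
  val trace = mutable.ArrayBuffer.empty[String]

  def post(event: SketchEvent): Unit = inbox.enqueue(event)

  private def process(event: SketchEvent): Unit = event match {
    case GenerateJobsEvt(time) =>
      trace += s"generateJobs($time)"
      // After generating jobs, a checkpoint request for the same time is posted.
      post(DoCheckpointEvt(time, clearLater = false))
    case DoCheckpointEvt(time, _) =>
      trace += s"doCheckpoint($time)"
  }

  // Handle events until the inbox is empty, including events posted by handlers.
  def drain(): Unit = while (inbox.nonEmpty) process(inbox.dequeue())
}

object EventLoopDemo {
  def main(args: Array[String]): Unit = {
    val loop = new SketchLoop
    loop.post(GenerateJobsEvt(5000L))
    loop.drain()
    println(loop.trace.mkString(" -> "))
  }
}
```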

  JobGenerator:


  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

  The time carried by GenerateJobs is nextTime. After each callback, nextTime += period, where period is ssc.graph.batchDuration.milliseconds.

  RecurringTimer:


  private def triggerActionForNextInterval(): Unit = {
    clock.waitTillTime(nextTime)
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
    logDebug("Callback for " + name + " called at time " + prevTime)
  }

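  The initial value of nextTime is the startTime passed to the start method, which is the return value of RecurringTimer's getStartTime: the smallest multiple of period that is strictly greater than the current time, i.e. (floor(currentTime / period) + 1) * period.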

  RecurringTimer:


  /**
   * Start at the given start time.
   */
  def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    logInfo("Started timer for " + name + " at time " + nextTime)
    nextTime
  }

  JobGenerator:

  /** Starts the generator for the first time */
  private def startFirstTime() {
    val startTime = new Time(timer.getStartTime())
    graph.start(startTime - graph.batchDuration)
    timer.start(startTime.milliseconds)
    logInfo("Started JobGenerator at " + startTime)
  }

  RecurringTimer:

  /**
   * Get the time when this timer will fire if it is started right now.
   * The time will be a multiple of this timer's period and more than
   * current system time.
   */
  def getStartTime(): Long = {
    (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
  }
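
  A worked example makes the arithmetic concrete. The sketch below reuses the getStartTime formula but takes the clock reading as a parameter; the clock values and the 5000 ms period are made up for illustration, and StartTimeSketch is a hypothetical name.

```scala
object StartTimeSketch {
  // Same arithmetic as getStartTime, with the clock reading passed in explicitly.
  def startTime(nowMillis: Long, periodMillis: Long): Long =
    (math.floor(nowMillis.toDouble / periodMillis) + 1).toLong * periodMillis

  // First `n` firing times: the start time, then one period later each time.
  def firings(nowMillis: Long, periodMillis: Long, n: Int): List[Long] =
    List.iterate(startTime(nowMillis, periodMillis), n)(_ + periodMillis)

  def main(args: Array[String]): Unit = {
    // 12345 / 5000 = 2.469, floor gives 2, plus 1 gives 3, times 5000 gives 15000.
    println(startTime(12345L, 5000L))
    // A clock reading exactly on a multiple still moves to the next multiple.
    println(startTime(15000L, 5000L))
    println(firings(12345L, 5000L, 3))
  }
}
```

  Since every firing time is the previous one plus period, all batch times stay aligned to multiples of the batch duration.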


  The period value is the Duration we pass in when constructing the StreamingContext with new StreamingContext.

  DStreamGraph:

  def setBatchDuration(duration: Duration) {
    this.synchronized {
      require(batchDuration == null,
        s"Batch duration already set as $batchDuration. Cannot set it again.")
      batchDuration = duration
    }
  }

  StreamingContext:

  private[streaming] val graph: DStreamGraph = {
    if (isCheckpointPresent) {
      cp_.graph.setContext(this)
      cp_.graph.restoreCheckpointData()
      cp_.graph
    } else {
      require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
      val newGraph = new DStreamGraph()
      newGraph.setBatchDuration(batchDur_)
      newGraph
    }
  }


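  ReceivedBlockTracker also cleans up expired metadata by removing it from the HashMap. Here too, the event is written to disk first and the in-memory state is updated afterwards.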

  StreamingContext:


class StreamingContext private[streaming] (
    sc_ : SparkContext,
    cp_ : Checkpoint,
    batchDur_ : Duration
  ) extends Logging {

  /**
   * Create a StreamingContext using an existing SparkContext.
   * @param sparkContext existing SparkContext
   * @param batchDuration the time interval at which streaming data will be divided into batches
   */
  def this(sparkContext: SparkContext, batchDuration: Duration) = {
    this(sparkContext, null, batchDuration)
  }


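  Metadata creation, consumption (allocation to batches), and cleanup are all recorded in the WAL, so the state can be recovered from the log after a failure. The source shows that the WAL is used only when a checkpoint directory has been set.

  ReceiverTracker: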


class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {

  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
  private val receiverInputStreamIds = receiverInputStreams.map { _.id }
  private val receivedBlockTracker = new ReceivedBlockTracker(
    ssc.sparkContext.conf,
    ssc.sparkContext.hadoopConfiguration,
    receiverInputStreamIds,
    ssc.scheduler.clock,
    ssc.isCheckpointPresent,
    Option(ssc.checkpointDir)
  )
  private val listenerBus = ssc.scheduler.listenerBus


  The WAL log directory is derived from the checkpoint directory that is passed in.

  ReceivedBlockTracker:


  /** Optionally create the write ahead log manager only if the feature is enabled */
  private def createWriteAheadLog(): Option[WriteAheadLog] = {
    checkpointDirOption.map { checkpointDir =>
      val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
      WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
    }
  }
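
  The reason an unset checkpoint directory disables the WAL comes down to how Option treats null. The sketch below assumes the checkpoint directory is null when none has been set; WalDirSketch and the example path are hypothetical, and plain string concatenation stands in for the Hadoop Path used in the real code.

```scala
object WalDirSketch {
  // Stand-in for the log directory derivation; the real code builds a Hadoop Path.
  def logDirFor(checkpointDir: String): String =
    checkpointDir.stripSuffix("/") + "/receivedBlockMetadata"

  // Option(null) is None, so an unset checkpoint directory yields no log directory,
  // and mapping over None never runs the function.
  def walDir(checkpointDir: String): Option[String] =
    Option(checkpointDir).map(logDirFor)

  def main(args: Array[String]): Unit = {
    println(walDir(null))
    println(walDir("/tmp/streaming-checkpoint")) // hypothetical path
  }
}
```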


  Here a folder named receivedBlockMetadata is created under the checkpoint directory to hold the data recorded by the WAL.

  ReceivedBlockTracker:


private[streaming] object ReceivedBlockTracker {
  def checkpointDirToLogDir(checkpointDir: String): String = {
    new Path(checkpointDir, "receivedBlockMetadata").toString
  }
}

  ReceivedBlockTracker:


  /** Write an update to the tracker to the write ahead log */
  private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
    if (isWriteAheadLogEnabled) {
      logTrace(s"Writing record: $record")
      try {
        writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
          clock.getTimeMillis())
        true
      } catch {
        case NonFatal(e) =>
          logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
          false
      }
    } else {
      true
    }
  }
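
  Because every state change is logged before it is applied, the in-memory state can in principle be rebuilt by replaying the log in order. The following is a minimal sketch of that idea under simplifying assumptions, not the recovery code in Spark: the record types (Added, Allocated, CleanedUp) and TrackerState are hypothetical, and the log is just an in-memory sequence.

```scala
sealed trait LogRecord
case class Added(streamId: Int, blockId: String) extends LogRecord
case class Allocated(batchTime: Long, blocks: Map[Int, List[String]]) extends LogRecord
case class CleanedUp(batchTimes: List[Long]) extends LogRecord

case class TrackerState(
    unallocated: Map[Int, List[String]] = Map.empty,
    allocated: Map[Long, Map[Int, List[String]]] = Map.empty)

object ReplaySketch {
  // Fold over the records in log order, applying each one to the state.
  def replay(records: Seq[LogRecord]): TrackerState =
    records.foldLeft(TrackerState()) {
      case (s, Added(id, block)) =>
        val queue = s.unallocated.getOrElse(id, Nil) :+ block
        s.copy(unallocated = s.unallocated.updated(id, queue))
      case (s, Allocated(time, blocks)) =>
        // Allocation takes every pending block, so the pending queues become empty.
        s.copy(unallocated = Map.empty, allocated = s.allocated.updated(time, blocks))
      case (s, CleanedUp(times)) =>
        s.copy(allocated = s.allocated -- times)
    }

  def main(args: Array[String]): Unit = {
    val log = Seq(
      Added(0, "b1"),
      Added(0, "b2"),
      Allocated(5000L, Map(0 -> List("b1", "b2"))),
      Added(0, "b3"))
    println(replay(log))
  }
}
```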


  The current state of the DStreams and JobGenerator is checkpointed by doCheckpoint, which is invoked through the DoCheckpoint message posted at the end of generateJobs.


  JobGenerator:

  /** Perform checkpoint for the give `time`. */
  private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
    if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
      logInfo("Checkpointing graph for time " + time)
      ssc.graph.updateCheckpointData(time)
      checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
    }
  }
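
  The timing condition in doCheckpoint means that not every batch produces a checkpoint. The sketch below mirrors only the (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration) test, using milliseconds as Long; the zero time, batch duration, and checkpoint duration are made-up values, and the separate shouldCheckpoint flag is ignored.

```scala
object CheckpointTimingSketch {
  // True when the elapsed time since zeroTime is a whole number of checkpoint intervals.
  def isCheckpointTime(time: Long, zeroTime: Long, checkpointMs: Long): Boolean =
    (time - zeroTime) % checkpointMs == 0

  def main(args: Array[String]): Unit = {
    val zeroTime = 10000L     // hypothetical zero time of the graph
    val batchMs = 5000L       // hypothetical batch duration
    val checkpointMs = 10000L // hypothetical checkpoint duration
    // Batch times 15000, 20000, 25000, 30000.
    val batchTimes = (1 to 4).map(i => zeroTime + i * batchMs)
    println(batchTimes.filter(isCheckpointTime(_, zeroTime, checkpointMs)))
  }
}
```

  With a checkpoint duration twice the batch duration, only every second batch time passes the test.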

  JobGenerator:

  /** Processes all events */
  private def processEvent(event: JobGeneratorEvent) {
    logDebug("Got event " + event)
    event match {
      case GenerateJobs(time) => generateJobs(time)
      case ClearMetadata(time) => clearMetadata(time)
      case DoCheckpoint(time, clearCheckpointDataLater) =>
        doCheckpoint(time, clearCheckpointDataLater)
      case ClearCheckpointData(time) => clearCheckpointData(time)
    }
  }

  JobGenerator:

  /** Generate jobs and perform checkpoint for the given `time`.  */
  private def generateJobs(time: Time) {
    // Set the SparkEnv in this thread, so that job generation code can access the environment
    // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
    // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
    SparkEnv.set(ssc.env)
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }


  Summary:

  ReceivedBlockTracker achieves fault tolerance through the WAL.

  DStreamGraph and JobGenerator achieve fault tolerance through checkpointing.


Notes:

Source: DT_大数据梦工厂 (Spark customization class course)
