spark-streaming系列------- 5. Spark-Streaming checkpoint的原理和实现

时间:2021-12-11 20:49:51

    本文以KafkaDirectDStream方式为例说明Spark-Streaming checkpoint的原理

    JobGenrerator.generateJobs负责Streaming Job的产生,产生并且提交执行Job之后,会发送DoCheckpoint事件,源码如下:

private def generateJobs(time: Time) {
// Set the SparkEnv in this thread, so that job generation code can access the environment
// Example: BlockRDDs are created in this thread, and it needs to access BlockManager
// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
SparkEnv.set(ssc.env)
Try {
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =>
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
val streamIdToNumRecords = streamIdToInputInfos.mapValues(_.numRecords)
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToNumRecords))//提交Job执行
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))//发送执行CheckPoint时间,发送周期为streaming batch接收数据的时间
}

从上面代码可知道, 每次产生Streaming Job都会触发Checkpoint的执行


    JobGenerator.processEvent方法接收到DoCheckpoint事件后,调用JobGenerator.doCheckpoint方法进行Checkpoint处理

    JobGenerator.doCheckpoint方法调用DStreamGraph.updateCheckpointData对输出DStream进行Checkpoint,

然后调用CheckpointWriter将Checkpoint信息写到Checkpoint目录,源码如下:

private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
logInfo("Checkpointing graph for time " + time)
ssc.graph.updateCheckpointData(time)//输出DStream进行Checkpoint
checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)//将Checkpoint信息写到Checkpoint目录
}
}

下面来看看如何对DStream进行Checkpoint的

def updateCheckpointData(time: Time) {
logInfo("Updating checkpoint data for time " + time)
this.synchronized {
outputStreams.foreach(_.updateCheckpointData(time))//将输出流中的每个DStream信息转化成相应的Checkpoint信息
}
logInfo("Updated checkpoint data for time " + time)
}

可见DStreamGraph.updateCheckpointData方法所作的工作是将输出流中的每个DStream信息转化成相应的Checkpoint信息

对每个DStream信息转化成Checkpoint发生在DStream.updateCheckpointData方法,这个方法更新DStream的Checkpoint信息,并且更新DStream依赖的所有DStream的Checkpoint信息,源码如下:

private[streaming] def updateCheckpointData(currentTime: Time) {
logDebug("Updating checkpoint data for time " + currentTime)
checkpointData.update(currentTime)//更新DStream的Checkpoint信息
dependencies.foreach(_.updateCheckpointData(currentTime))//更新所有依赖的DStream的Checkpoint信息
logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
}


DirectKafkaInputDStreamCheckpointData的Checkpoint信息更新如下:

<pre name="code" class="java">def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
}

override def update(time: Time) {
batchForTime.clear()//删除老的Checkpoint信息
generatedRDDs.foreach { kv =>
val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray//a是一个数组,数组的一个元素是一个RDD的所有分区信息,一个分区信息包含了这个分区的起始数据和终止数据在Kafka的某个分区的offset
batchForTime += kv._1 -> a//将分区信息存储在DirectKafkaInputDStreamCheckpointData.data中
}
}

 

在Spark中,将所有没有成功完成的Job放在了JobScheduler.jobSets中,Job成功完成之后再将它从JobScheduler.jobSets删除,源码如下:

def submitJobSet(jobSet: JobSet) {
if (jobSet.jobs.isEmpty) {
logInfo("No jobs added for time " + jobSet.time)
} else {
listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
jobSets.put(jobSet.time, jobSet)//将任务放到了jobSets
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
logInfo("Added jobs for time " + jobSet.time)
}
}
private def handleJobCompletion(job: Job) {  job.result match {    case Success(_) =>      val jobSet = jobSets.get(job.time)      jobSet.handleJobCompletion(job)      logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)      if (jobSet.hasCompleted) {        jobSets.remove(jobSet.time)//从任务等待队列中删除这个任务        jobGenerator.onBatchCompletion(jobSet.time)        logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(          jobSet.totalDelay / 1000.0, jobSet.time.toString,          jobSet.processingDelay / 1000.0        ))        listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))      }    case Failure(e) =>      reportError("Error running job " + job, e)  }}

Checkpoint.graph对应于Spark-streaming应用的DStreamGraph,DStreamGraph.outputStreams包含了要Checkpoint的DStream信息。C heckpoint.pendingTimes对应没有成功完成的Job,因此在将Checkpoint信息保存到HDFS的时候,这些信息都会被Checkpoint。

要想上一次Spark-streaming Application产生的Checkpoint信息有用,在创建StreamingContext的时候,必须要传入Checkpoint信息。上一次Spark-streaming Application产生的Checkpoint信息的读取可以通过调用CheckpointReader.read方法。

如果创建StreamingContext传入上次执行产生的Checkpoint信息则会使用Checkpoint包含的DStreamGraph作为本次Application的DStreamGraph,它里面包含了需要Checkpoint的DStream信息。然后根据DStreamGraph恢复上一次执行时的DStream信息。源码如下:

private[streaming] val graph: DStreamGraph = {
if (isCheckpointPresent) {
cp_.graph.setContext(this)//使用Checkpoint信息里面的DStreamGraph作为本次Application的DStreamGraph,它里面包含了需要Checkpoint的DStream信息
cp_.graph.restoreCheckpointData()//恢复上一次执行时的DStream信息
cp_.graph
} else {
require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
val newGraph = new DStreamGraph()
newGraph.setBatchDuration(batchDur_)
newGraph
}
}

JobGenerator.start开始Streaming Job的产生,如果存在Checkpoint信息,则调用JobGenerator.restart开始Spark-streaming Job的执行,在这个方法里面会将上一次Application执行时已经产生但是还没有成功执行完成的Streaming Job先恢复出来,然后再把从崩溃到重新执行的时间之间没有产生Job补上,然后让Spark先执行这些丢失的Job,源码如下:

def start(): Unit = synchronized {
if (eventLoop != null) return // generator has already been started

eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

override protected def onError(e: Throwable): Unit = {
jobScheduler.reportError("Error in job generator", e)
}
}
eventLoop.start()

if (ssc.isCheckpointPresent) {
restart()
} else {
startFirstTime()
}
}


private def restart() {
// If manual clock is being used for testing, then
// either set the manual clock to the last checkpointed time,
// or if the property is defined set it to that time
if (clock.isInstanceOf[ManualClock]) {
val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
}

val batchDuration = ssc.graph.batchDuration

// Batches when the master was down, that is,
// between the checkpoint and current restart time
val checkpointTime = ssc.initialCheckpoint.checkpointTime
val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
val downTimes = checkpointTime.until(restartTime, batchDuration)
logInfo("Batches during down time (" + downTimes.size + " batches): "
+ downTimes.mkString(", "))

// Batches that were unprocessed before failure
val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)//上一次Application执行时已经产生但是还没有成功执行完成的Streaming Job先恢复出来
logInfo("Batches pending processing (" + pendingTimes.size + " batches): " +
pendingTimes.mkString(", "))
// Reschedule jobs for these times
val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)//崩溃到重新执行的时间之间没有产生Job补上
logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
timesToReschedule.mkString(", "))
timesToReschedule.foreach { time =>
// Allocate the related blocks when recovering from failure, because some blocks that were
// added but not allocated, are dangling in the queue after recovering, we have to allocate
// those blocks to the next batch, which is the batch they were supposed to go.
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))//先执行丢失的Job
}

// Restart the timer
timer.start(restartTime.milliseconds)//进入新任务的产生
logInfo("Restarted JobGenerator at " + restartTime)
}