本文以KafkaDirectDStream方式为例说明Spark-Streaming checkpoint的原理
JobGenrerator.generateJobs负责Streaming Job的产生,产生并且提交执行Job之后,会发送DoCheckpoint事件,源码如下:
private def generateJobs(time: Time) {
// Set the SparkEnv in this thread, so that job generation code can access the environment
// Example: BlockRDDs are created in this thread, and it needs to access BlockManager
// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
SparkEnv.set(ssc.env)
Try {
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =>
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
val streamIdToNumRecords = streamIdToInputInfos.mapValues(_.numRecords)
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToNumRecords))//提交Job执行
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))//发送执行CheckPoint时间,发送周期为streaming batch接收数据的时间
}
从上面代码可知道, 每次产生Streaming Job都会触发Checkpoint的执行
JobGenerator.processEvent方法接收到DoCheckpoint事件后,调用JobGenerator.doCheckpoint方法进行Checkpoint处理
JobGenerator.doCheckpoint方法调用DStreamGraph.updateCheckpointData对输出DStream进行Checkpoint,
然后调用CheckpointWriter将Checkpoint信息写到Checkpoint目录,源码如下:
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
logInfo("Checkpointing graph for time " + time)
ssc.graph.updateCheckpointData(time)//输出DStream进行Checkpoint
checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)//将Checkpoint信息写到Checkpoint目录
}
}
下面来看看如何对DStream进行Checkpoint的
def updateCheckpointData(time: Time) {
logInfo("Updating checkpoint data for time " + time)
this.synchronized {
outputStreams.foreach(_.updateCheckpointData(time))//将输出流中的每个DStream信息转化成相应的Checkpoint信息
}
logInfo("Updated checkpoint data for time " + time)
}
可见DStreamGraph.updateCheckpointData方法所作的工作是将输出流中的每个DStream信息转化成相应的Checkpoint信息
对每个DStream信息转化成Checkpoint发生在DStream.updateCheckpointData方法,这个方法更新DStream的Checkpoint信息,并且更新DStream依赖的所有DStream的Checkpoint信息,源码如下:
private[streaming] def updateCheckpointData(currentTime: Time) {
logDebug("Updating checkpoint data for time " + currentTime)
checkpointData.update(currentTime)//更新DStream的Checkpoint信息
dependencies.foreach(_.updateCheckpointData(currentTime))//更新所有依赖的DStream的Checkpoint信息
logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
}
DirectKafkaInputDStreamCheckpointData的Checkpoint信息更新如下:
<pre name="code" class="java">def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
}
override def update(time: Time) {
batchForTime.clear()//删除老的Checkpoint信息
generatedRDDs.foreach { kv =>
val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray//a是一个数组,数组的一个元素是一个RDD的所有分区信息,一个分区信息包含了这个分区的起始数据和终止数据在Kafka的某个分区的offset
batchForTime += kv._1 -> a//将分区信息存储在DirectKafkaInputDStreamCheckpointData.data中
}
}
在Spark中,将所有没有成功完成的Job放在了JobScheduler.jobSets中,Job成功完成之后再将它从JobScheduler.jobSets删除,源码如下:
def submitJobSet(jobSet: JobSet) {
if (jobSet.jobs.isEmpty) {
logInfo("No jobs added for time " + jobSet.time)
} else {
listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
jobSets.put(jobSet.time, jobSet)//将任务放到了jobSets
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
logInfo("Added jobs for time " + jobSet.time)
}
}
private def handleJobCompletion(job: Job) { job.result match { case Success(_) => val jobSet = jobSets.get(job.time) jobSet.handleJobCompletion(job) logInfo("Finished job " + job.id + " from job set of time " + jobSet.time) if (jobSet.hasCompleted) { jobSets.remove(jobSet.time)//从任务等待队列中删除这个任务 jobGenerator.onBatchCompletion(jobSet.time) logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format( jobSet.totalDelay / 1000.0, jobSet.time.toString, jobSet.processingDelay / 1000.0 )) listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo)) } case Failure(e) => reportError("Error running job " + job, e) }}
Checkpoint.graph对应于Spark-streaming应用的DStreamGraph,DStreamGraph.outputStreams包含了要Checkpoint的DStream信息。C heckpoint.pendingTimes对应没有成功完成的Job,因此在将Checkpoint信息保存到HDFS的时候,这些信息都会被Checkpoint。
要想上一次Spark-streaming Application产生的Checkpoint信息有用,在创建StreamingContext的时候,必须要传入Checkpoint信息。上一次Spark-streaming Application产生的Checkpoint信息的读取可以通过调用CheckpointReader.read方法。
如果创建StreamingContext传入上次执行产生的Checkpoint信息则会使用Checkpoint包含的DStreamGraph作为本次Application的DStreamGraph,它里面包含了需要Checkpoint的DStream信息。然后根据DStreamGraph恢复上一次执行时的DStream信息。源码如下:
private[streaming] val graph: DStreamGraph = {
if (isCheckpointPresent) {
cp_.graph.setContext(this)//使用Checkpoint信息里面的DStreamGraph作为本次Application的DStreamGraph,它里面包含了需要Checkpoint的DStream信息
cp_.graph.restoreCheckpointData()//恢复上一次执行时的DStream信息
cp_.graph
} else {
require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
val newGraph = new DStreamGraph()
newGraph.setBatchDuration(batchDur_)
newGraph
}
}
JobGenerator.start开始Streaming Job的产生,如果存在Checkpoint信息,则调用JobGenerator.restart开始Spark-streaming Job的执行,在这个方法里面会将上一次Application执行时已经产生但是还没有成功执行完成的Streaming Job先恢复出来,然后再把从崩溃到重新执行的时间之间没有产生Job补上,然后让Spark先执行这些丢失的Job,源码如下:
def start(): Unit = synchronized {
if (eventLoop != null) return // generator has already been started
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = {
jobScheduler.reportError("Error in job generator", e)
}
}
eventLoop.start()
if (ssc.isCheckpointPresent) {
restart()
} else {
startFirstTime()
}
}
private def restart() {
// If manual clock is being used for testing, then
// either set the manual clock to the last checkpointed time,
// or if the property is defined set it to that time
if (clock.isInstanceOf[ManualClock]) {
val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
}
val batchDuration = ssc.graph.batchDuration
// Batches when the master was down, that is,
// between the checkpoint and current restart time
val checkpointTime = ssc.initialCheckpoint.checkpointTime
val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
val downTimes = checkpointTime.until(restartTime, batchDuration)
logInfo("Batches during down time (" + downTimes.size + " batches): "
+ downTimes.mkString(", "))
// Batches that were unprocessed before failure
val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)//上一次Application执行时已经产生但是还没有成功执行完成的Streaming Job先恢复出来
logInfo("Batches pending processing (" + pendingTimes.size + " batches): " +
pendingTimes.mkString(", "))
// Reschedule jobs for these times
val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)//崩溃到重新执行的时间之间没有产生Job补上
logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
timesToReschedule.mkString(", "))
timesToReschedule.foreach { time =>
// Allocate the related blocks when recovering from failure, because some blocks that were
// added but not allocated, are dangling in the queue after recovering, we have to allocate
// those blocks to the next batch, which is the batch they were supposed to go.
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))//先执行丢失的Job
}
// Restart the timer
timer.start(restartTime.milliseconds)//进入新任务的产生
logInfo("Restarted JobGenerator at " + restartTime)
}