Spark Streaming源码分析 – Checkpoint

时间:2021-12-18 20:47:06

Persistence
Streaming没有做特别的事情,DStream最终还是以其中的每个RDD作为job进行调度的,所以persistence就以RDD为单位按照原先Spark的方式去做就可以了,不同的是Streaming是无限,需要考虑Clear的问题
在clearMetadata时,在删除过期的RDD的同时,也会做相应的unpersist
比较特别的是,NetworkInputDStream,是一定会做persistence的,因为会事先将流数据转化为persist block,然后NetworkInputDStream直接从block中读到数据
在design中看到NetworkInputDStream会将source data存两份,防止丢失,但在代码中没有找到这段逻辑,只看到往blockManager写入一份

Checkpoint
在Streaming中Checkpoint有特殊的意义
对于普通的Spark,没有cp不会影响正确性,因为任何数据都是可以从source replay出来的,而source data往往在HDFS上,所以cp只是一种优化。
并且Spark也只在worker级别做了failover,worker挂了,没事把上面的tasks换个worker重新replay出来即可, 但是并没有做driver的failover,driver挂了就失败了
因为Spark本身就看成是个query engine,query失败了没什么损失,again就ok

但是对于SparkStreaming,这个问题就没有那么简单了,如果driver挂掉,不做任何处理,恢复以后到底从哪里开始做?
首先一定会丢数据,影响正确性,因为流数据是无限的,你不可能像Spark一样把所有数据replay一遍,即使source支持replay,比如kafka

所以对于Streaming的checkpoint分为两部分,RDD的cp和DStreamGraph的cp
对于RDD的cp和Spark是一致的,没有区别
下面谈谈对于DStreamGraph的cp,目的就是在StreamingContext被重启后,可以从cp中恢复出之前Graph的执行时状况
a. Graph对象是会整个被序列化到文件,而其中最关键的是outputStreams,看似这里只会persist最终的outputStreams,其实会persist整个graph上所有的DStream
因为在def dependencies: List[DStream[_]]会包含所有的上一层DStream,依次递归,就会包含所有的DStream对象
在恢复出DStream对象后,如何恢复当时的RDD状况,可以看到generatedRDDs是@transient的,并不会被persist
答案在DStream.DStreamCheckpointData中,通过currentCheckpointFiles可以记录下cp时,generatedRDDs中所有完成cp的RDD的(times,cpfilename)
所以在恢复时只需要将RDD从cpfile中读出来,并加入到generatedRDDs即可
并且cpfile是需要清理的,当每次完成DStreamGraph的cp时,在该graph中的最老的RDD之前的所有RDD的cpfile都可以删掉,因为这些老的RDD不可能再被用到
b. 在Checkpoint对象中除了graph对象,还有该比较重要的是pendingTimes,这个记录在cp时,有多少的jobs没有被提交
这样当JobScheduler重新启动的时候会重新提交这些jobs,这里是at-least once逻辑,因为不知道在cp完多久后crash,所以其中某些job有可能已经被成功执行

创建cp的过程,
1. 在JobGenerator中,每次提交一组jobs到Spark后,会执行对DoCheckpoint将Checkpoint对象序列化写入文件(其中Checkpoint对象包含graph对象等信息)
2. 在完成DoCheckpoint后,会调用ClearCheckpointData清除过期的RDD的checkpoint文件

使用cp的过程,
1. 调用StreamingContext.getOrCreate,使用CheckpointReader.read从文件中反序列化出Checkpoint对象, 并使用Checkpoint对象去初始化StreamingContext对象
2. 在StreamingContext中调用cp_.graph.restoreCheckpointData来恢复每个DStream.generatedRDDs
3. 在JobGenerator中调用Restart,重新提交哪些在cp中未被提交的jobs

 

DStreamGraph

Spark Streaming源码分析 – CheckpointSpark Streaming源码分析 – Checkpoint

DStreamCheckpointData

Spark Streaming源码分析 – CheckpointSpark Streaming源码分析 – Checkpoint

DStream

Spark Streaming源码分析 – CheckpointSpark Streaming源码分析 – Checkpoint

JobGenerator
1. 在每次runJobs结束,即每次新提交一组jobs后,会执行对DoCheckpoint将Checkpoint对象写入文件
2. 在restart的时候,会重新run pendingTimes + downTimes的jobs,保证at-least once逻辑

Spark Streaming源码分析 – CheckpointSpark Streaming源码分析 – Checkpoint

StreamingContext
在有checkpoint文件时,需要先读出Checkpoint对象,然后去初始化StreamingContext
从而使用Checkpoint去恢复graph中所有的DStream

Spark Streaming源码分析 – CheckpointSpark Streaming源码分析 – Checkpoint

Checkpoint (org.apache.spark.streaming)
Checkpoint主要是为了cp DStreamGraph对象,通过CheckpointWriter将Checkpoint序列化到文件

Spark Streaming源码分析 – CheckpointSpark Streaming源码分析 – Checkpoint

CheckpointWriter,用于将CP对象写入文件

Spark Streaming源码分析 – CheckpointSpark Streaming源码分析 – Checkpoint

CheckpointReader

Spark Streaming源码分析 – CheckpointSpark Streaming源码分析 – Checkpoint