Spark Streaming Source Code Walkthrough

Posted: 2023-02-15 20:46:59


Let's parse the source code step by step. Please, please follow the code along with us; you only really understand it by tracing it yourself.

First, let's see which member variables get initialized when a StreamingContext is created.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

object StreamingWordCountSelfScala {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("StreamingWordCountSelfScala")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5)) // harvest the data every 5 seconds
    val lines = ssc.socketTextStream("localhost", 9999) // listen on local socket port 9999
    val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // flatMap, then reduce
    words.print() // print the results
    ssc.start() // start the computation
    ssc.awaitTermination()
    ssc.stop(true)
  }
}
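A practical note on running this example (not part of the original listing): the program reads from a TCP socket, so something must already be listening on port 9999 before the application connects. The usual companion is netcat, started as nc -lk 9999 in a separate terminal, after which every line you type there flows into the stream.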


// Harvest the data every 5 seconds. This only defines the batch interval: all data produced within the same 5-second window belongs to the same batch.
val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
// StreamingContext line 80
def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
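As a side note, Durations.seconds(5) is just a convenience factory for a millisecond-based Duration; judging from the public API, the following forms are equivalent:

import org.apache.spark.streaming.{Duration, Durations, Seconds}

val a = Durations.seconds(5) // the factory used above
val b = Seconds(5)           // the idiomatic Scala factory
val c = Duration(5000)       // Duration simply wraps a millisecond count
assert(a == b && b == c)     // all three denote the same 5-second batch interval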

Let's look at StreamingContext.createNewSparkContext(conf).

It simply news up a SparkContext, exactly as in any ordinary program written against Spark Core. Spark Streaming is just an application running on top of Spark Core, only a bit more complex.

// StreamingContext.scala line 873
private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
  new SparkContext(conf)
}



Back in the StreamingContext constructor, the fragment below shows the initialization of several key member variables.


// StreamingContext.scala line 49
class StreamingContext private[streaming] (
    sc_ : SparkContext,
    cp_ : Checkpoint,
    batchDur_ : Duration
  ) extends Logging {
  ...
// line 128
if (sc_ == null && cp_ == null) {
  throw new Exception("Spark Streaming cannot be initialized with " +
    "both SparkContext and checkpoint as null")
}


private[streaming] val isCheckpointPresent = (cp_ != null)
// line 136
private[streaming] val sc: SparkContext = {
  if (sc_ != null) {
    sc_
  } else if (isCheckpointPresent) {
    SparkContext.getOrCreate(cp_.createSparkConf()) // create a SparkContext from the checkpoint's SparkConf; one way or another, SparkContext is the entry point, the gate of heaven
  } else {
    throw new SparkException("Cannot create StreamingContext without a SparkContext")
  }
}


if (sc.conf.get("spark.master") == "local" || sc.conf.get("spark.master") == "local[1]") {
// In local mode there must be more than one thread, because one thread is dedicated to receiving data; if only one thread existed, none would be left to process the received data (hence local[n] with n > 1, e.g. local[2])
  logWarning("spark.master should be set as local[n], n > 1 in local mode if you have receivers" +
    " to get data, otherwise Spark jobs will not get resources to process the received data.")
}


private[streaming] val conf = sc.conf


private[streaming] val env = sc.env


// line 155
private[streaming] val graph: DStreamGraph = {
  if (isCheckpointPresent) { // if a checkpoint is present, restore the graph directly from it
    cp_.graph.setContext(this)
    cp_.graph.restoreCheckpointData()
    cp_.graph
  } else {
    require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
    val newGraph = new DStreamGraph() // otherwise create a fresh DStreamGraph
    newGraph.setBatchDuration(batchDur_)
    newGraph
  }
}
// An AtomicInteger. Where will it come into play?
private val nextInputStreamId = new AtomicInteger(0)
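// (Answering that question: each new InputDStream obtains its unique id from this
// counter, via a method defined further down in StreamingContext.scala; paraphrased
// from the Spark 1.x source, exact location varies by version:
//   private[streaming] def getNewInputStreamId() = nextInputStreamId.getAndIncrement())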
// line 170 
private[streaming] var checkpointDir: String = {
  if (isCheckpointPresent) {
    sc.setCheckpointDir(cp_.checkpointDir)
    cp_.checkpointDir
  } else {
    null
  }
}
// line 179
// Here another duration appears. If not set separately (via a checkpoint), it simply takes the value of batchDuration, 5 seconds in our example.
private[streaming] val checkpointDuration: Duration = {
  if (isCheckpointPresent) cp_.checkpointDuration else graph.batchDuration
}
// line 183
// The key member; examined in detail separately below.
private[streaming] val scheduler = new JobScheduler(this)


private[streaming] val waiter = new ContextWaiter


private[streaming] val progressListener = new StreamingJobProgressListener(this)


private[streaming] val uiTab: Option[StreamingTab] =
  if (conf.getBoolean("spark.ui.enabled", true)) {
    Some(new StreamingTab(this))
  } else {
    None
  }


/* Initializing a streamingSource to register metrics */
private val streamingSource = new StreamingSource(this)


private var state: StreamingContextState = INITIALIZED


private val startSite = new AtomicReference[CallSite](null)


private[streaming] def getStartSite(): CallSite = startSite.get()


private var shutdownHookRef: AnyRef = _


conf.getOption("spark.streaming.checkpoint.directory").foreach(checkpoint)
  ...
}
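The last statement above calls the checkpoint(directory) method for the configured directory, if any. For reference, that method looks roughly like this (paraphrased from the Spark 1.x source; Path comes from org.apache.hadoop.fs, and exact line numbers vary by version):

// StreamingContext.scala (paraphrased)
def checkpoint(directory: String) {
  if (directory != null) {
    val path = new Path(directory)
    val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
    fs.mkdirs(path)
    val fullPath = fs.getFileStatus(path).getPath().toString
    sc.setCheckpointDir(fullPath) // RDD checkpoints go to the same directory
    checkpointDir = fullPath
  } else {
    checkpointDir = null
  }
}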

Two member variables deserve special attention: DStreamGraph and JobScheduler.

Let's take a deep look at DStreamGraph first.

// DStreamGraph.scala line 27
final private[streaming] class DStreamGraph extends Serializable with Logging { // note that it extends Serializable, meaning the whole graph can be serialized
  // a growable array of InputDStreams; analyzed later
  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  private val outputStreams = new ArrayBuffer[DStream[_]]()


  var rememberDuration: Duration = null
  var checkpointInProgress = false


  var zeroTime: Time = null
  var startTime: Time = null
  var batchDuration: Duration = null
    // ...
    // the other methods are defined here
}
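How do those two buffers get filled? When ssc.socketTextStream(...) is called, the resulting InputDStream registers itself with the graph, and words.print() ultimately registers an output stream. The registration methods look roughly like this (paraphrased from the Spark 1.x source; exact line numbers vary by version):

// DStreamGraph.scala (paraphrased)
def addInputStream(inputStream: InputDStream[_]) {
  this.synchronized {
    inputStream.setGraph(this) // give the stream a back-reference to the graph
    inputStreams += inputStream
  }
}

def addOutputStream(outputStream: DStream[_]) {
  this.synchronized {
    outputStream.setGraph(this)
    outputStreams += outputStream
  }
}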


Next, let's look at the construction of JobScheduler.

// JobScheduler.scala line 37
/**
 * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
 * the jobs and runs them using a thread pool.
 * (Translation: this class schedules the jobs that run on Spark. It could not say it
 * more clearly: jobs are generated by the JobGenerator and run in a thread pool.)
 */
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {


  // Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff
  // https://gist.github.com/AlainODea/1375759b8720a3f9f094
  private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet] // keyed by batch time
  // the number of concurrent jobs defaults to 1
  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
  // jobs are executed on a daemon thread pool
  private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
  // line 50
  // create the JobGenerator; analyzed in detail shortly
  private val jobGenerator = new JobGenerator(this)
  val clock = jobGenerator.clock
  val listenerBus = new StreamingListenerBus()


  // These two are created only when scheduler starts.
  // eventLoop not being null means the scheduler has been started and not stopped
  var receiverTracker: ReceiverTracker = null
  // A tracker to track all the input stream information as well as processed record number
  var inputInfoTracker: InputInfoTracker = null


  private var eventLoop: EventLoop[JobSchedulerEvent] = null

Now let's analyze JobGenerator.
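The members initialized in its constructor, paraphrased from the Spark 1.x source (exact line numbers vary by version, and the clock lookup is simplified here), center on a pluggable clock and a RecurringTimer that fires once per batch interval:

// JobGenerator.scala (paraphrased and simplified)
private[streaming]
class JobGenerator(jobScheduler: JobScheduler) extends Logging {

  private val ssc = jobScheduler.ssc
  private val conf = ssc.conf
  private val graph = ssc.graph

  // the clock is pluggable via spark.streaming.clock: SystemClock by default,
  // a ManualClock in tests
  val clock = {
    val clockClass = ssc.sc.conf.get(
      "spark.streaming.clock", "org.apache.spark.util.SystemClock")
    Utils.classForName(clockClass).newInstance().asInstanceOf[Clock]
  }

  // the heartbeat of the framework: every batchDuration (5000 ms in our example)
  // the timer posts a GenerateJobs event, which eventually produces a JobSet
  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

  // like JobScheduler's eventLoop, created only when the generator starts
  private var eventLoop: EventLoop[JobGeneratorEvent] = null
  // ...
}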


Let's recap the entire instantiation process of the StreamingContext:

  1. A SparkContext is created; the TaskScheduler, SchedulerBackend, DAGScheduler, etc. created along the way belong to Spark Core.

  2. The StreamingContext instantiates a DStreamGraph and a JobScheduler.

  3. The JobScheduler instantiates a JobGenerator and a job execution thread pool (jobExecutor) with a default size of 1.

  4. The JobGenerator defines the periodically triggered function and passes it into the constructor of RecurringTimer (a recurring timer); see the sketch below.
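A sketch of that timer, paraphrased and simplified from RecurringTimer in the Spark 1.x source: a daemon thread sleeps until the next batch boundary and then invokes the callback.

// RecurringTimer.scala (paraphrased and simplified)
private def triggerActionForNextInterval(): Unit = {
  clock.waitTillTime(nextTime) // block until the next batch boundary
  callback(nextTime)           // here: post GenerateJobs(new Time(nextTime))
  prevTime = nextTime
  nextTime += period           // period == batchDuration.milliseconds, 5000 in our example
}

private def loop() {
  try {
    while (!stopped) {
      triggerActionForNextInterval()
    }
  } catch {
    case e: InterruptedException => // interrupted while waiting; exit quietly
  }
}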

With that, the StreamingContext has been fully instantiated.