Let's walk through the source code step by step. Please, please do trace the code along with us; you really have to follow along yourself to understand it.
First, let's see which member variables are initialized when a StreamingContext is created.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

object StreamingWordCountSelfScala {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("StreamingWordCountSelfScala")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5)) // cut a new batch of data every 5 seconds
    val lines = ssc.socketTextStream("localhost", 9999) // listen on local socket port 9999
    val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // flatMap, then reduce by key
    words.print() // print the results
    ssc.start() // start the streaming computation
    ssc.awaitTermination()
    ssc.stop(true)
  }
}
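One practical note about the master URL in this example: it points at a standalone cluster. If you run the same program with a local master instead, give it more than one thread, because the socket receiver permanently occupies one of them; the StreamingContext constructor we trace below warns about exactly this case. A minimal sketch of such a configuration (the app name here is made up):

import org.apache.spark.SparkConf

// Hypothetical local-mode configuration: at least 2 threads, so that one thread
// can run the socket receiver while the other processes the received batches.
val localConf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("StreamingWordCountLocal")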
// Cut a new batch of data every 5 seconds. This only defines the batch interval: data produced within the same 5-second window belongs to the same batch.
val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
// StreamingContext line 80
def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
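Incidentally, the other public constructor, the one that takes an already-created SparkContext, funnels into the same private primary constructor and simply skips createNewSparkContext. Roughly (exact line numbers vary between Spark versions):

// StreamingContext.scala, constructor for an existing SparkContext (approximate)
def this(sparkContext: SparkContext, batchDuration: Duration) = {
  this(sparkContext, null, batchDuration)
}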
Let's look at StreamingContext.createNewSparkContext(conf).
It simply news up a SparkContext, exactly as in an ordinary program written against Spark Core. Spark Streaming is just an application on top of Spark Core, only a bit more complex.
// StreamingContext.scala line 873
private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
  new SparkContext(conf)
}
Back in the StreamingContext primary constructor, the code fragment below lists the initialization of several of the more important member variables.
// StreamingContext.scala line 49
class StreamingContext private[streaming] (
    sc_ : SparkContext,
    cp_ : Checkpoint,
    batchDur_ : Duration
  ) extends Logging {
  ...
  // line 128
  if (sc_ == null && cp_ == null) {
    throw new Exception("Spark Streaming cannot be initialized with " +
      "both SparkContext and checkpoint as null")
  }
  private[streaming] val isCheckpointPresent = (cp_ != null)
  // line 136
  private[streaming] val sc: SparkContext = {
    if (sc_ != null) {
      sc_
    } else if (isCheckpointPresent) {
      // rebuild the SparkContext from the checkpointed SparkConf; one way or another,
      // everything must enter through a SparkContext, it is the gateway
      SparkContext.getOrCreate(cp_.createSparkConf())
    } else {
      throw new SparkException("Cannot create StreamingContext without a SparkContext")
    }
  }
  if (sc.conf.get("spark.master") == "local" || sc.conf.get("spark.master") == "local[1]") {
    // in local mode there must be more than one thread, because one thread is dedicated to
    // receiving data; with a single thread there would be nothing left to process what was received
    logWarning("spark.master should be set as local[n], n > 1 in local mode if you have receivers" +
      " to get data, otherwise Spark jobs will not get resources to process the received data.")
  }
  private[streaming] val conf = sc.conf
  private[streaming] val env = sc.env
  // line 155
  private[streaming] val graph: DStreamGraph = {
    if (isCheckpointPresent) { // if a checkpoint exists, restore the graph directly from it
      cp_.graph.setContext(this)
      cp_.graph.restoreCheckpointData()
      cp_.graph
    } else {
      require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
      val newGraph = new DStreamGraph() // otherwise create a fresh DStreamGraph
      newGraph.setBatchDuration(batchDur_)
      newGraph
    }
  }
  // an AtomicInteger; where is it used? see the short note after this listing
  private val nextInputStreamId = new AtomicInteger(0)
  // line 170
  private[streaming] var checkpointDir: String = {
    if (isCheckpointPresent) {
      sc.setCheckpointDir(cp_.checkpointDir)
      cp_.checkpointDir
    } else {
      null
    }
  }
  // line 179
  // another duration shows up here; if it is not set separately, the batchDuration value is used,
  // which is 5 seconds in our example
  private[streaming] val checkpointDuration: Duration = {
    if (isCheckpointPresent) cp_.checkpointDuration else graph.batchDuration
  }
  // line 183
  // important; explained in detail separately later
  private[streaming] val scheduler = new JobScheduler(this)
  private[streaming] val waiter = new ContextWaiter
  private[streaming] val progressListener = new StreamingJobProgressListener(this)
  private[streaming] val uiTab: Option[StreamingTab] =
    if (conf.getBoolean("spark.ui.enabled", true)) {
      Some(new StreamingTab(this))
    } else {
      None
    }
  /* Initializing a streamingSource to register metrics */
  private val streamingSource = new StreamingSource(this)
  private var state: StreamingContextState = INITIALIZED
  private val startSite = new AtomicReference[CallSite](null)
  private[streaming] def getStartSite(): CallSite = startSite.get()
  private var shutdownHookRef: AnyRef = _
  conf.getOption("spark.streaming.checkpoint.directory").foreach(checkpoint)
  ...
}
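A quick look ahead at nextInputStreamId, since the listing asks where it is used: the context hands out the next value whenever an input DStream is constructed, so every InputDStream (socket, Kafka and so on) gets a unique id. A simplified sketch of the two relevant pieces (paraphrased from Spark 1.x; line numbers omitted on purpose):

// StreamingContext.scala: hand out a fresh id for each new input stream
private[streaming] def getNewInputStreamId() = nextInputStreamId.getAndIncrement()

// InputDStream.scala: each InputDStream asks the context for its id when constructed
val id = ssc.getNewInputStreamId()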
A few member variables stand out, notably DStreamGraph and JobScheduler.
Let's dig into DStreamGraph first.
// DStreamGraph.scala line 27
// Note that DStreamGraph extends Serializable, which means it can be serialized (it is, for example, restored from a checkpoint above).
final private[streaming] class DStreamGraph extends Serializable with Logging {
  // a growable array of InputDStream; analyzed later, and see the short sketch after this listing
  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  private val outputStreams = new ArrayBuffer[DStream[_]]()
  var rememberDuration: Duration = null
  var checkpointInProgress = false
  var zeroTime: Time = null
  var startTime: Time = null
  var batchDuration: Duration = null
  // ...
  // the other methods defined on the graph
}
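How do inputStreams and outputStreams get filled? When an input DStream such as the one returned by ssc.socketTextStream is created, it registers itself with the graph, and calling an output operation such as print() registers an output stream. The registration methods on DStreamGraph look roughly like this (simplified from Spark 1.x, not a verbatim quote):

// DStreamGraph.scala (simplified): streams register themselves with the graph
def addInputStream(inputStream: InputDStream[_]) {
  this.synchronized {
    inputStream.setGraph(this)
    inputStreams += inputStream
  }
}

def addOutputStream(outputStream: DStream[_]) {
  this.synchronized {
    outputStream.setGraph(this)
    outputStreams += outputStream
  }
}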
Next, let's look at how JobScheduler is constructed.
// JobScheduler.scala line 37
/**
 * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
 * the jobs and runs them using a thread pool.
 */
// The ScalaDoc says it plainly: the JobGenerator generates the jobs, and a thread pool runs them.
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {
  // Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff
  // https://gist.github.com/AlainODea/1375759b8720a3f9f094
  private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet] // keyed by batch time
  // the number of concurrent jobs defaults to 1
  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
  // jobs are executed on a dedicated daemon thread pool
  private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
  // line 50
  // the JobGenerator is created here; it will be analyzed in detail later
  private val jobGenerator = new JobGenerator(this)
  val clock = jobGenerator.clock
  val listenerBus = new StreamingListenerBus()
  // These two are created only when scheduler starts.
  // eventLoop not being null means the scheduler has been started and not stopped
  var receiverTracker: ReceiverTracker = null
  // A tracker to track all the input stream information as well as processed record number
  var inputInfoTracker: InputInfoTracker = null
  private var eventLoop: EventLoop[JobSchedulerEvent] = null
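To see how jobSets and jobExecutor work together: for every batch interval the scheduler receives a JobSet (all the jobs of one batch), records it under its Time key, and hands each job to the jobExecutor pool. A condensed sketch of that path (simplified from JobScheduler.submitJobSet in Spark 1.x, not a verbatim quote):

// JobScheduler.scala (simplified): one JobSet per batch time, run on the job thread pool
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    jobSets.put(jobSet.time, jobSet) // remember the batch by its time
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) // run each job on the pool
  }
}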
Then let's take a look at JobGenerator.
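JobGenerator is where the batch-interval timing actually lives. Below is a condensed sketch of the parts of its constructor that matter for this walkthrough (simplified from Spark 1.x's JobGenerator.scala; member order and details are approximate): it holds a clock, plus a RecurringTimer that fires once per batchDuration and posts a GenerateJobs event to the generator's event loop.

// JobGenerator.scala (simplified sketch)
private[streaming]
class JobGenerator(jobScheduler: JobScheduler) extends Logging {

  private val ssc = jobScheduler.ssc
  private val conf = ssc.conf
  private val graph = ssc.graph

  // the clock used to measure batch intervals (SystemClock by default)
  val clock: Clock = ...

  // fires every batchDuration milliseconds and posts GenerateJobs(time), which eventually
  // leads to graph.generateJobs(time) and jobScheduler.submitJobSet(...)
  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

  // created only when the generator starts
  private var eventLoop: EventLoop[JobGeneratorEvent] = null
}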
Let's recap the whole StreamingContext instantiation process:
- A SparkContext is created; the TaskScheduler, SchedulerBackend, DAGScheduler and so on created along the way belong to Spark Core.
- The StreamingContext instantiates a DStreamGraph and a JobScheduler.
- The JobScheduler instantiates a JobGenerator and a job execution thread pool (jobExecutor) whose size defaults to 1.
- The JobGenerator defines the function to be triggered periodically and passes it into the constructor of a RecurringTimer (a recurring timer).
At this point, the StreamingContext has been fully instantiated.