Spark Streaming Job Generation
How many jobs does Spark Streaming submit for each batch?
DStreamGraph
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
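A job is generated separately for each outputStream, so a batch produces as many jobs as there are outputStreams (more precisely, one for every outputStream whose generateJob returns Some).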
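The one-job-per-output-stream rule can be illustrated with a toy model that has no Spark dependency. This is only a sketch: `ToyJob`, `ToyOutputStream`, and the `hasData` flag are hypothetical names invented here, not Spark classes. The only thing borrowed from the source above is the `flatMap` over `Option` results.

```scala
// Toy model of DStreamGraph.generateJobs; every type here is a hypothetical stand-in.
object GenerateJobsSketch {
  final case class ToyJob(time: Long, streamName: String)

  // hasData mimics getOrCompute returning Some(rdd) or None for a batch.
  final case class ToyOutputStream(name: String, hasData: Boolean) {
    def generateJob(time: Long): Option[ToyJob] =
      if (hasData) Some(ToyJob(time, name)) else None
  }

  // flatMap drops the None results, so each output stream yields at most one job.
  def generateJobs(streams: Seq[ToyOutputStream], time: Long): Seq[ToyJob] =
    streams.flatMap(_.generateJob(time))

  def main(args: Array[String]): Unit = {
    val streams = Seq(
      ToyOutputStream("print", hasData = true),
      ToyOutputStream("save", hasData = true),
      ToyOutputStream("empty", hasData = false))
    // Three streams are registered, but only two of them produce a job.
    println(generateJobs(streams, 1000L).map(_.streamName))
  }
}
```

In this model the job count is bounded by the number of output streams, and a stream that has nothing to compute for the batch contributes no job.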
How is an outputStream created?
DStream
private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
Here the register method is what registers the outputStream.
DStream
/**
 * Register this streaming as an output stream. This would ensure that RDDs of this
 * DStream will be generated.
 */
private[streaming] def register(): DStream[T] = {
  ssc.graph.addOutputStream(this)
  this
}
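At the code level, every call to foreachRDD (the method that other output operations such as print are built on) registers one more outputStream, so one more job is submitted per batch. If the whole application contains no such call, i.e. there is no outputStream, the require check in validate fails with an IllegalArgumentException.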
DStreamGraph
def validate() {
  this.synchronized {
    require(batchDuration != null, "Batch duration has not been set")
    // assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration +
    // " is very low")
    require(getOutputStreams().size > 0, "No output operations registered, so nothing to execute")
  }
}
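The interplay between registration and validation can be reproduced without Spark. The sketch below is an assumption-laden toy: `ToyGraph` and its string-named output streams are hypothetical, and only the `require` message is copied from the source above.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

// Toy model of output-stream registration and validation; ToyGraph is hypothetical.
object ValidateSketch {
  final class ToyGraph {
    private val outputStreams = ArrayBuffer.empty[String]

    // Plays the role of register() calling addOutputStream(this).
    def addOutputStream(name: String): Unit = this.synchronized {
      outputStreams += name
    }

    // Same require-based check as in the source above.
    def validate(): Unit = this.synchronized {
      require(outputStreams.nonEmpty,
        "No output operations registered, so nothing to execute")
    }
  }

  def main(args: Array[String]): Unit = {
    val graph = new ToyGraph
    println(Try(graph.validate()))   // a Failure wrapping IllegalArgumentException
    graph.addOutputStream("foreachRDD")
    println(Try(graph.validate()))   // a Success once one output stream exists
  }
}
```

`require` throws `IllegalArgumentException` when its condition is false, which is why an application without any output operation is rejected before any batch is scheduled.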
Generating the job
DStream
/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}
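The jobFunc here calls Spark's runJob method directly; for an analysis of runJob, see my other blog post. Note that this is the default implementation: the registered outputStreams are ForEachDStream instances, which override generateJob so that jobFunc invokes the user's foreachFunc on the RDD, and runJob is then reached through the actions called inside that function.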
Submitting the job
JobGenerator
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // The jobs are generated here
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      // The jobs are submitted here
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
JobScheduler
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
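Internally, JobScheduler holds a thread pool, and each job is wrapped in a JobHandler and submitted to it. The pool size comes from spark.streaming.concurrentJobs, which defaults to 1.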
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._
  def run() {
    try {
      ....
      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
        // This is where the job finally runs
        job.run()
      }
      ....
    }
  }
}
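The effect of the pool size can be seen in a small standalone sketch. It is not Spark's code: it uses the plain JDK `Executors.newFixedThreadPool` as a stand-in for `ThreadUtils.newDaemonFixedThreadPool`, and `runAll` with its string-named jobs is a hypothetical helper made up for illustration.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Stand-in for the streaming job executor; runAll is a hypothetical helper.
object JobExecutorSketch {
  // Submits one Runnable per job name and returns the order in which they completed.
  def runAll(jobNames: Seq[String], numConcurrentJobs: Int): Seq[String] = {
    val pool = Executors.newFixedThreadPool(numConcurrentJobs)
    val finished = ArrayBuffer.empty[String]
    jobNames.foreach { name =>
      pool.execute(new Runnable {
        // Mirrors JobHandler.run: the pool thread is what actually runs the job.
        def run(): Unit = finished.synchronized { finished += name }
      })
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    finished.synchronized { finished.toList }
  }

  def main(args: Array[String]): Unit = {
    // With a pool of size 1, jobs run one at a time in submission order.
    println(runAll(Seq("job-0", "job-1", "job-2"), numConcurrentJobs = 1))
  }
}
```

With a single worker thread the jobs of a batch are executed strictly one after another, which matches the default of 1 shown above. A larger pool lets jobs overlap, and then their completion order is no longer guaranteed.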
Job
def run() {
  _result = Try(func())
}
// func here is the jobFunc built when the job was generated; it is what ends up calling runJob.
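Two properties of this design are worth spelling out: the function is deferred until `run()` is called, and `Try` stores a failure instead of letting it propagate. The sketch below shows both with a hypothetical `ToyJob` class. It is a simplified stand-in, not Spark's `Job`, and it uses an `Option` where this illustration needs an explicit "not run yet" state.

```scala
import scala.util.Try

// ToyJob is a hypothetical, simplified stand-in for the streaming Job class.
object JobRunSketch {
  final class ToyJob(func: () => Any) {
    private var _result: Option[Try[Any]] = None

    // Same shape as Job.run: execute the deferred function and capture the outcome.
    def run(): Unit = { _result = Some(Try(func())) }

    def result: Option[Try[Any]] = _result
  }

  def main(args: Array[String]): Unit = {
    var ran = false
    val ok = new ToyJob(() => { ran = true; 42 })
    println(ran)        // constructing the job has not executed func yet
    ok.run()
    println(ok.result)  // the outcome is now recorded as a Success

    val bad = new ToyJob(() => throw new RuntimeException("boom"))
    bad.run()           // does not throw; the exception is stored as a Failure
    println(bad.result)
  }
}
```

Because the outcome is captured rather than thrown, the thread that called `run()` can inspect the result afterwards and decide how to report success or failure.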