第6课:Spark Streaming源码解读之Job动态生成和深度思考

时间:2022-07-24 20:48:59

上一节我们从总体上讲解了Spark Streaming job的运行机制。本节我们针对job如何生成进行详细的阐述,请看下图:

第6课:Spark Streaming源码解读之Job动态生成和深度思考

在Spark Streaming里,总体负责动态作业调度的具体类是JobScheduler:

/**
 * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
 * the jobs and runs them using a thread pool.
  * 这个类调度jobs在Spark上运行,它使用JobGenerator产生jobs,并且使用线程池来运行jobs
 */
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging


JobScheduler有两个非常重要的成员:

  • JobGenerator 

  • ReceiverTracker

JobScheduler 将每个batch的RDD DAG的具体生成工作委托给JobGenerator,将源头数据输入的记录工作委托给ReceiverTracker 。


在JobGenerator中有两个至关重要的成员就是RecurringTimer和EventLoop;RecurringTimer它控制了job的触发。每到batchInterval时间,就往EventLoop的队列中放入一个消息。而EventLoop则不断的查看消息队列,一旦有消息就处理;


在Spark Streaming应用程序中都会调用

ssc.start() //ssc 代表StreamingContext

这将隐含的导致一系列的模块的启动:

ssc.start() 

    --> scheduler.start() 

       --> jobGenerator.start() 


我们来具体的看看JobGenerator.start()的代码:

def start(): Unit = synchronized {  ...   eventLoop.start()  //启动RPC处理线程  if (ssc.isCheckpointPresent) {    restart()       // 如果不是第一次启动,就从Checkpoint中恢复  } else {    startFirstTime() //第一次启动  }}

在startFirstTime中将DStreamGraph、定时器启动

private def startFirstTime() {  val startTime = new Time(timer.getStartTime())  graph.start(startTime - graph.batchDuration)  timer.start(startTime.milliseconds)  logInfo("Started JobGenerator at " + startTime)}


定时器RecurringTimer启动后,使用线程每到一个新的batchInterval,就会向EventLoop中发生一个消息。

private def triggerActionForNextInterval(): Unit = {  clock.waitTillTime(nextTime)  callback(nextTime)  prevTime = nextTime  nextTime += period  logDebug("Callback for " + name + " called at time " + prevTime)}


这里的callback函数就是RecurringTimer初始化时传入的匿名函数:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")


当EventLoop收到消息后:

override def run(): Unit = {  try {    while (!stopped.get) {      val event = eventQueue.take()      try {        onReceive(event)      } catch {        case NonFatal(e) => {          try {            onError(e)          } catch {            case NonFatal(e) => logError("Unexpected error in " + name, e)          }        }      }    }  } catch {    case ie: InterruptedException => // exit even if eventQueue is not empty    case NonFatal(e) => logError("Unexpected error in " + name, e)  }}

不断的去处理事件:

/** Processes all events */private def processEvent(event: JobGeneratorEvent) {  logDebug("Got event " + event)  event match {    case GenerateJobs(time) => generateJobs(time)    case ClearMetadata(time) => clearMetadata(time)    case DoCheckpoint(time, clearCheckpointDataLater) =>      doCheckpoint(time, clearCheckpointDataLater)    case ClearCheckpointData(time) => clearCheckpointData(time)  }}

这里调用generateJobs方法:

private def generateJobs(time: Time) {  // Set the SparkEnv in this thread, so that job generation code can access the environment  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.  SparkEnv.set(ssc.env)  Try {    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch    graph.generateJobs(time) // generate jobs using allocated block  } match {    case Success(jobs) =>      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))    case Failure(e) =>      jobScheduler.reportError("Error generating jobs for time " + time, e)  }  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))}

这段代码异常精悍,包含了JobGenerator主要工作4个步骤

  1. 要求ReceiverTracker将目前已收到的数据进行一次allocate,即将上次batch切分后的数据切分到到本次新的batch里

  2. 要求DStreamGraph复制出一套新的 RDD DAG 的实例。整个DStreamGraph.generateJobs(time)遍历结束的返回值是Seq[Job]

  3. 将第2步生成的本 batch 的 RDD DAG,和第1步获取到的 meta 信息,一同提交给JobScheduler异步执行这里我们提交的是将 (a) time (b) Seq[job] (c) 块数据的meta信息。这三者包装为一个JobSet,然后调用JobScheduler.submitJobSet(JobSet)提交给JobScheduler。这里的向JobScheduler提交过程与JobScheduler接下来在jobExecutor里执行过程是异步分离的,因此本步将非常快即可返回。

  4. 只要提交结束(不管是否已开始异步执行),就马上对整个系统的当前运行状态做一个checkpoint这里做checkpoint也只是异步提交一个DoCheckpoint消息请求,不用等 checkpoint 真正写完成即可返回这里也简单描述一下 checkpoint 包含的内容,包括已经提交了的、但尚未运行结束的JobSet等实际运行时信息。



备注:

1、DT大数据梦工厂微信公众号DT_Spark 
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains


本文出自 “叮咚” 博客,请务必保留此出处http://lqding.blog.51cto.com/9123978/1772958