This section traces how Spark Streaming generates RDDs, which is also the process by which it generates Jobs. DStream is Spark Streaming's abstract data representation, implemented on top of RDDs. Since RDDs exist to serve Jobs, the responsibility for triggering RDD creation belongs to the JobGenerator. In other words, RDDs are created as part of Job generation, so walking through the Job-generation path is exactly walking through the RDD-generation path.
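For concreteness, here is a minimal (hypothetical) streaming program whose batches drive the Job generation traced below; every 5 seconds the output operation print() forces one Job, and hence one RDD chain, to be generated:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WordCountDemo")
    val ssc = new StreamingContext(conf, Seconds(5))    // batchDuration = 5 seconds
    val lines = ssc.socketTextStream("localhost", 9999) // assumed data source
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()            // JobGenerator (and its timer) are started from here
    ssc.awaitTermination()
  }
}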
Browsing the JobGenerator source, we find the following member:
//TODO the timer that generates Jobs, firing once per batch interval
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
It is this member that periodically posts the GenerateJobs message to the EventLoop event processor to generate Jobs. RecurringTimer is a timer backed by a dedicated thread; its implementation is short and worth reading in the source (a sketch of the idea follows startFirstTime below). The timer is started during JobGenerator's own startup (note: eventLoop is started there as well), as follows:
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    //TODO onReceive is invoked from the run method of the EventLoop's thread
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start() //TODO start the event loop processor

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    //TODO start the timer that periodically generates jobs
    startFirstTime()
  }
}
The code of org.apache.spark.streaming.scheduler.JobGenerator#startFirstTime is as follows:
/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  //TODO start the timer that periodically generates jobs
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}
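RecurringTimer itself is just a daemon thread that fires a callback once per period. A minimal sketch of the idea, assuming nothing beyond the standard library (this is not Spark's actual implementation, which uses a Clock abstraction for testability):

class SimpleRecurringTimer(period: Long, callback: Long => Unit, name: String) {
  @volatile private var stopped = false
  private var nextTime = 0L

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          val now = System.currentTimeMillis()
          if (now < nextTime) Thread.sleep(nextTime - now) // wait for the next tick
          callback(nextTime) // e.g. eventLoop.post(GenerateJobs(new Time(nextTime)))
          nextTime += period // the next batch boundary
        }
      } catch {
        case _: InterruptedException => // stop() was called; exit quietly
      }
    }
  }

  def start(startTime: Long): Unit = {
    nextTime = startTime
    thread.start()
  }

  def stop(): Unit = {
    stopped = true
    thread.interrupt()
  }
}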
JobGenerator's event loop processor is implemented as an anonymous inner class that overrides the onReceive and onError methods. First, the abstract EventLoop class:
/**
 * An event loop to receive events from the caller and process all events in the event thread. It
 * will start an exclusive event thread to process all events.
 *
 * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can
 * handle events in time to avoid the potential OOM.
 *
 * A single internal thread is responsible for processing messages.
 */
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  /** eventQueue is the queue of pending events */
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  /** Flag marking whether the event loop has been stopped */
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            //TODO as soon as an event arrives, onReceive is called to handle it
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      var onStopCalled = false
      try {
        eventThread.join()
        // Call onStop after the event thread exits to make sure onReceive happens before onStop
        onStopCalled = true
        onStop()
      } catch {
        case ie: InterruptedException =>
          Thread.currentThread().interrupt()
          if (!onStopCalled) {
            // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
            // it's already called.
            onStop()
          }
      }
    } else {
      // Keep quiet to allow calling `stop` multiple times.
    }
  }

  /**
   * Put the event into the event queue. The event thread will process it later.
   */
  def post(event: E): Unit = {
    eventQueue.put(event)
  }

  /**
   * Return if the event thread has already been started but not yet stopped.
   */
  def isActive: Boolean = eventThread.isAlive

  /**
   * Invoked when `start()` is called but before the event thread starts.
   */
  protected def onStart(): Unit = {}

  /**
   * Invoked when `stop()` is called and the event thread exits.
   */
  protected def onStop(): Unit = {}

  /**
   * Invoked in the event thread when polling events from the event queue.
   *
   * Note: Should avoid calling blocking actions in `onReceive`, or the event thread will be blocked
   * and cannot process events in time. If you want to call some blocking actions, run them in
   * another thread.
   */
  protected def onReceive(event: E): Unit

  /**
   * Invoked if `onReceive` throws any non fatal error. Any non fatal error thrown from `onError`
   * will be ignored.
   */
  protected def onError(e: Throwable): Unit
}
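To make the contract concrete, a toy usage sketch (illustrative only: EventLoop is private[spark], so code outside the org.apache.spark package cannot actually extend it):

val loop = new EventLoop[String]("demo") {
  override protected def onReceive(event: String): Unit =
    println(s"processing $event on ${Thread.currentThread().getName}")

  override protected def onError(e: Throwable): Unit =
    println(s"failed: ${e.getMessage}")
}
loop.start()              // starts the daemon event thread
loop.post("GenerateJobs") // returns immediately; handled asynchronously
loop.post("DoCheckpoint")
Thread.sleep(100)         // give the event thread time to drain the queue
loop.stop()               // interrupts and joins the event thread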
The anonymous implementation of JobGenerator's event loop processor:
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  //TODO onReceive is invoked from the run method of the EventLoop's thread
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

  override protected def onError(e: Throwable): Unit = {
    jobScheduler.reportError("Error in job generator", e)
  }
}
At this point, the onReceive method of JobGenerator's anonymous event loop is invoked each time the timer posts GenerateJobs, so execution proceeds to processEvent(event):
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    //TODO handle the event that generates Jobs
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
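For reference, the events matched here are plain case classes defined alongside JobGenerator (as in the Spark 2.x source):

/** Event classes for JobGenerator */
private[scheduler] sealed trait JobGeneratorEvent
private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
private[scheduler] case class ClearMetadata(time: Time) extends JobGeneratorEvent
private[scheduler] case class DoCheckpoint(time: Time, clearCheckpointDataLater: Boolean)
  extends JobGeneratorEvent
private[scheduler] case class ClearCheckpointData(time: Time) extends JobGeneratorEvent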
Next, org.apache.spark.streaming.scheduler.JobGenerator#generateJobs is invoked:
/** Generate jobs and perform checkpointing for the given `time`. */
private def generateJobs(time: Time) {
  // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
  // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    //TODO generate the set of Jobs from the DStreamGraph
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      //TODO fetch the input source information for this batch
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      //TODO submit the job set
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  //TODO after every generateJobs, a checkpoint is triggered
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

This method calls org.apache.spark.streaming.DStreamGraph#generateJobs to generate the Jobs. Its implementation is as follows:
/**
 * Generate the Jobs for batch `time`.
 */
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      //TODO calls DStream.generateJob, which in turn calls getOrCompute and
      // ultimately compute to materialize the RDD for this batch
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
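Where do these outputStreams come from? Every output operation (print, foreachRDD, the saveAs* family) wraps the DStream in a ForEachDStream and registers it with the graph. Condensed from DStream.scala (Spark 2.x):

private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}

private[streaming] def register(): DStream[T] = {
  ssc.graph.addOutputStream(this) // becomes one of the outputStreams iterated above
  this
}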
Next, the org.apache.spark.streaming.dstream.DStream#generateJob method:
/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  //TODO get the RDD corresponding to the current batch time
  getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        //TODO exactly like a plain Spark RDD job, it ultimately runs via sparkContext.runJob
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
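Note that the Job returned here is only a thin wrapper around the batch time and the closure; nothing runs yet. A condensed sketch of org.apache.spark.streaming.scheduler.Job:

// The closure is executed only later, when JobScheduler's JobHandler calls
// run() on a worker thread; only then does sparkContext.runJob actually fire.
private[streaming] class Job(val time: Time, func: () => _) {
  private var _result: Try[_] = null

  def run(): Unit = {
    _result = Try(func())
  }
}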
Next, org.apache.spark.streaming.dstream.DStream#getOrCompute:
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          //TODO the key call
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
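How does getOrCompute build the full RDD DAG for a batch? A transformed DStream's compute simply recurses into its parent's getOrCompute. MappedDStream, for example, reads as follows (from MappedDStream.scala, Spark 2.x):

private[streaming]
class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    // recurse into the parent, then apply an ordinary RDD map
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}

The recursion bottoms out at an input DStream, whose compute builds an RDD directly from the source data.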
For the Kafka direct API, org.apache.spark.streaming.kafka010.DirectKafkaInputDStream overrides compute as follows:

/**
 * Generates a KafkaRDD for the given batch time.
 *
 * @param validTime
 * @return Option[KafkaRDD[K, V]]
 */
override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
  //TODO the core step: determine the per-partition offset ranges, with rate control applied
  val untilOffsets = clamp(latestOffsets())
  // An OffsetRange carries: topic, partition, fromOffset, untilOffset
  val offsetRanges = untilOffsets.map { case (tp, uo) =>
    val fo = currentOffsets(tp) // fo: where this batch starts (equals uo when no new data arrived)
    OffsetRange(tp.topic, tp.partition, fo, uo)
  }
  //TODO the third constructor argument matters: it defines which offsets of each
  // Kafka partition belong to this RDD
  val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams,
    offsetRanges.toArray, getPreferredHosts, true)

  // Report the record number and metadata of this batch interval to InputInfoTracker.
  val description = offsetRanges.filter { offsetRange =>
    // Don't display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")
  // Copy offsetRanges to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  //TODO InputInfoTracker runs on the Driver and tracks statistics about the input data
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  // Record the newest consumed offsets in currentOffsets
  currentOffsets = untilOffsets
  commitAll()
  Some(rdd)
}
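The clamp(latestOffsets()) call above is where rate control happens. A sketch of what it achieves (not Spark's actual code, which also consults the backpressure rate estimator when spark.streaming.backpressure.enabled is set): cap each partition's untilOffset so that one batch never reads more than maxRatePerPartition * batchSeconds records.

import org.apache.kafka.common.TopicPartition

def clampSketch(
    currentOffsets: Map[TopicPartition, Long], // fromOffsets of this batch
    latestOffsets: Map[TopicPartition, Long],  // newest offsets on the brokers
    maxRatePerPartition: Long,                 // spark.streaming.kafka.maxRatePerPartition
    batchSeconds: Long): Map[TopicPartition, Long] = {
  latestOffsets.map { case (tp, latest) =>
    val from = currentOffsets(tp)
    val cap = from + maxRatePerPartition * batchSeconds
    tp -> math.min(latest, cap) // this partition's untilOffset for the batch
  }
}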
This completes the trace of Job generation, which is equally the trace of RDD generation: the timer posts GenerateJobs, JobGenerator asks the DStreamGraph for the batch's Jobs, each output stream's generateJob calls getOrCompute, and compute materializes the RDD for the batch.