Spark定制班第8课:Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考

时间:2022-07-22 20:48:32

本期内容:

1 DStream与RDD关系的彻底的研究

2 StreamingRDD的生成彻底研究


  我们有必要来思考三个重要有价值的问题:

  1 DStream生成RDD的过程,DStream到底是怎么生成RDD的?

  2 DStream和RDD到底什么关系?

  3 RDD生成后是怎么管理的?


  有些Spark Streaming应用程序中的最后部分,会有print输出。

  进入源码DStream.print()


   /**
   * Print the first ten elements of each RDD generated in this DStream. This is an output
   * operator, so this DStream will be registered as an output stream and there materialized.
   */
  def print(): Unit = ssc.withScope {
    print(10)
  }

  缺省时打印前10个。

  /**
   * Print the first num elements of each RDD generated in this DStream. This is an output
   * operator, so this DStream will be registered as an output stream and there materialized.
   */
  def print(num: Int): Unit = ssc.withScope {
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum = rdd.take(num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println("Time: " + time)
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
     foreachRDD (context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }


  解释:这里的print源码内部用foreachRDD将通过foreachFunc构建的(RDD,Time)遍历操作。

  foreachRDD其实也是要产生ForEachDStream,对DStream遍历操作,ForEachDStream不会产生action操作,所以ForEachDStream操作是transform级别操作。所以我们得出一个结论:ForEachDStream并不一定会触发job的执行,但是会产生Job,(不会触发执行)(真正的job触发是Timer定时产生的额)

  ForEachDStream会产生Job其实也是假象,因为没有ForEachDStream,也会产生Job,定时器Timer时间到了,管你有没有ForEachDStream,还是会产生Job并执行。


  我们再来看一下foreachRDD。

  DStream.foreachRDD代码:


  private def foreachRDD(
      foreachFunc: (RDD[T], Time) => Unit,
      displayInnerRDDOps: Boolean): Unit = {
    new ForEachDStream(this,
      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
  }

  可以这样说:foreachRDD是Spark Streaming的后门,实际上可以任意操作RDD(表面上是DStream离散流数据)

  为了弄清楚DStream怎样生成RDD的,我们需要看DStream的源代码部分注释,如下所示


 * DStreams internally is characterized by a few basic properties:
 *  - A list of other DStreams that the DStream depends on
 *  - A time interval at which the DStream generates an RDD
 *  - A function that is used to generate an RDD after each time interval
 */


  DStream一共有三个关键重点:

  1 除了第一个DStream,后面的DStream都要依赖前一个DStream.

  2 DStream在每一个interval都会生成一个RDD。

  3 这个类里有个function可以在每一个interval后产生一个RDD.

  这里再次强调:DStream是RDD的模板,负责批量产生RDD。那么接下来,我们彻底深入查看具体过程。


  额外强调一下:为什么DStream要像RDD一样回溯,从后往前依赖,构建最后一个DStream?因为DStream要根据batch interval每隔一定时间产生RDD,必须和RDD高度步调一致(其实可以不一致,只不过会有很多问题)。

  这样又说明了:DStream是RDD模板,DStream Graph是DAG的模板。


  DStream的代码片段:

  

  // RDDs generated, marked as private[streaming] so that testsuites can access it
  @transient
  private[streaming] var generatedRDDs = new HashMap[Time,  RDD [T]] ()

  这是基于时间的RDD数据结构。

  其中的每一个RDD(实际代表最后一个RDD)意味着会执行一个job。

  如果弄清楚GeneratedRDD是怎么实例化的,就可以弄清楚RDD到底是怎么产生的了。

  DStream.getOrCompute


  /**
   * Get the RDD corresponding to the given time; either retrieve it from cache
   * or compute-and-cache it.
   */
  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // If RDD was already generated, then retrieve it from HashMap,
    // or else compute the RDD
    generatedRDDs.get(time).orElse {
      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
      // of RDD generation, else generate nothing.
      if (isTimeValid(time)) {

        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details. We need to have this call here because
          // compute() might cause Spark jobs to be launched.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
             compute (time)
          }
        }

        rddOption.foreach { case newRDD =>
          // Register the generated RDD for caching and checkpointing
          if (storageLevel != StorageLevel.NONE) {
            newRDD.persist(storageLevel)
            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
          }
          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
            newRDD.checkpoint()
            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
          }
           generatedRDDs.put (time, newRDD)
        }
        rddOption
      } else {
        None
      }
    }
  }


  RDD变量生成了,但是并没有执行,只是在逻辑级别进行了代码的框架级别的优化管理。

  注意:Spark Streaming实际上在没有输入数据的时候仍然会产生RDD(空的BlockRDD),所以可以在此修改源码,提升性能。反过来仔细思考一下,流处理实际上就是时间极短的情况下完成的批处理。


  之前是RDD生成的逻辑级别的背景铺垫。接下来,我们着手物理级别的实际RDD生成过程。


  以官方NetworkWordCount代码为例:


object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc. socketTextStream (args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines. flatMap (_.split(" "))
    val wordCounts = words. map (x => (x, 1)).reduceByKey(_ + _)
    wordCounts. print ()
    ssc.start()
    ssc.awaitTermination()
  }
}

  四个步骤实际上都是transform(表面上最后一个是action)。


  先揭示一下NetworkWordCount中的DStream生成RDD的主流程图:

Spark定制班第8课:Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考


  DStreamGraph.generateJobs的代码:


  def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      outputStreams.flatMap { outputStream =>
        val jobOption =  outputStream . generateJob (time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }

  会调用每个 ForEachDStream genarateJobForEachDStream覆写了genarateJob
  ForEachDStream.genarateJob:

   override def generateJob(time: Time): Option[Job] = {
    parent. getOrCompute (time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

  事实上, genarateJob中通过父类DStream的getOrCompute与例程中各个DStream子类的compute方法组成了职责链模式。

  DStream.getOrCompute


  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {

    generatedRDDs.get(time).orElse {
      if (isTimeValid(time)) {
        ...
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
             compute (time)
          }
        }
        ...
      } else {
        None
      }
    }
  }

  MappedDStream.compute:

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent. getOrCompute (validTime).map(_.map[U](mapFunc))
  }

  FlatMappedDStream.compute:

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent. getOrCompute (validTime).map(_.flatMap(flatMapFunc))
  }

   ReceiverInputDStream 因为是第一个DStream,不 依赖其它 DStream ,所以必须要自己生成RDD。

  ReceiverInputDStream.compute的代码:


  /**
   * Generates RDDs with blocks received by the receiver of this stream. */
  override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {

      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
         createBlockRDD (validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }

  ReceiverInputDStream.createBlockRDD的代码:


  private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]):  RDD [T] = {

    if (blockInfos.nonEmpty) {
      val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

      // Are WAL record handles present with all the blocks
      val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

      if (areWALRecordHandlesPresent) {
        // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
        val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
        val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
        new WriteAheadLogBackedBlockRDD[T](
          ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
      } else {
        // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
        // others then that is unexpected and log a warning accordingly.
        if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
          if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
            logError("Some blocks do not have Write Ahead Log information; " +
              "this is unexpected and data may not be recoverable after driver failures")
          } else {
            logWarning("Some blocks have Write Ahead Log information; this is unexpected")
          }
        }
        val validBlockIds = blockIds.filter { id =>
          ssc.sparkContext.env.blockManager.master.contains(id)
        }
        if (validBlockIds.size != blockIds.size) {
          logWarning("Some blocks could not be recovered as they were not found in memory. " +
            "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
            "for more details.")
        }
        new BlockRDD[T](ssc.sc, validBlockIds)
      }
    } else {
      // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
      // according to the configuration
      if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
        new WriteAheadLogBackedBlockRDD[T](
          ssc.sparkContext, Array.empty, Array.empty, Array.empty)
      } else {
        new BlockRDD[T](ssc.sc, Array.empty)
      }
    }
  }

  最后职责链又回到ForEachDStream.generateJob。

  ForEachDStream.generateJob:

   override def generateJob(time: Time): Option[Job] = {
    parent. getOrCompute (time) match {
      case Some( rdd ) =>
        val  jobFunc  = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc( rdd , time)
        }
        Some(new  Job (time,  jobFunc ))
      case None => None
    }
  }

  RDD会随jobFunc封装在了新生成的Job中。