Contents of this session:
1. A thorough study of the relationship between DStream and RDD
2. A thorough study of how RDDs are generated in Spark Streaming
It is worth thinking through three important questions:
1. How exactly does a DStream generate RDDs?
2. What exactly is the relationship between DStream and RDD?
3. How are RDDs managed after they are generated?
Many Spark Streaming applications end with a print output, so let's start from the source of DStream.print():
/**
* Print the first ten elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and there materialized.
*/
def print(): Unit = ssc.withScope {
print(10)
}
When no argument is given, the first 10 elements are printed.
/**
* Print the first num elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and there materialized.
*/
def print(num: Int): Unit = ssc.withScope {
def foreachFunc: (RDD[T], Time) => Unit = {
(rdd: RDD[T], time: Time) => {
val firstNum = rdd.take(num + 1)
// scalastyle:off println
println("-------------------------------------------")
println("Time: " + time)
println("-------------------------------------------")
firstNum.take(num).foreach(println)
if (firstNum.length > num) println("...")
println()
// scalastyle:on println
}
}
foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
Explanation: internally, print uses foreachRDD to apply the (RDD, Time) function built by foreachFunc to each generated RDD.
foreachRDD in turn creates a ForEachDStream, a traversal operation over the DStream. ForEachDStream performs no action, so it is a transformation-level operation. From this we can draw a conclusion: a ForEachDStream does not by itself trigger job execution; it merely produces Jobs (the real trigger for execution is the Timer that fires at each batch interval).
In fact, even saying that ForEachDStream produces the Job is something of an illusion: with or without a ForEachDStream, when the Timer fires, job generation runs and the resulting Jobs are executed.
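For reference, a sketch of that timer-driven path, with method names as they appear in the Spark Streaming scheduler source (simplified; event posting and error handling omitted):
// RecurringTimer fires once per batchDuration
//   -> JobGenerator.generateJobs(time)      // reacts to the GenerateJobs event
//   -> DStreamGraph.generateJobs(time)      // asks every registered output stream for a Job
//   -> ForEachDStream.generateJob(time)     // builds the Job; still nothing is executed here
//   -> JobScheduler.submitJobSet(...)       // Jobs run later on a thread pool, and only then
//                                           // do the RDD actions inside them fire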
Let's take another look at foreachRDD.
DStream.foreachRDD:
private def foreachRDD(
foreachFunc: (RDD[T], Time) => Unit,
displayInnerRDDOps: Boolean): Unit = {
new ForEachDStream(this,
context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
You could say that foreachRDD is Spark Streaming's backdoor: it lets you operate on the underlying RDDs directly and arbitrarily (even though on the surface you are working with DStream discretized stream data).
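A small usage sketch of this backdoor (dstream here stands for any DStream[(String, Long)], and saveRecord is a hypothetical sink function, not part of Spark):
dstream.foreachRDD { (rdd, time) =>
  // this block runs on the driver, once per batch
  rdd.foreachPartition { partition =>
    // this block runs on the executors, once per RDD partition
    partition.foreach { case (key, value) =>
      saveRecord(key, value, time.milliseconds) // hypothetical external write
    }
  }
}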
To understand how a DStream generates RDDs, let's look at part of the class comment in the DStream source:
* DStreams internally is characterized by a few basic properties:
* - A list of other DStreams that the DStream depends on
* - A time interval at which the DStream generates an RDD
* - A function that is used to generate an RDD after each time interval
*/
A DStream has three key characteristics:
1. Except for the first DStream, every DStream depends on the DStream before it.
2. A DStream generates an RDD at every batch interval.
3. The class has a function that produces an RDD after each interval.
To stress it again: a DStream is a template for RDDs and is responsible for producing RDDs batch after batch. Next, let's look at the concrete process in depth.
One more point worth emphasizing: why does a DStream, like an RDD, trace its dependencies backwards, building from the last DStream? Because a DStream must produce RDDs at every batch interval, it has to stay in lockstep with the RDD lineage (strictly speaking it does not have to, but diverging would cause many problems).
This again shows that a DStream is a template for RDDs, and the DStreamGraph is a template for the DAG.
A code snippet from DStream:
// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
This is a time-indexed data structure of RDDs.
Each RDD in it (each entry actually represents the last RDD of a batch's lineage) corresponds to one job to be executed.
If we can work out how generatedRDDs gets populated, we will know how RDDs are actually produced.
DStream.getOrCompute:
/**
* Get the RDD corresponding to the given time; either retrieve it from cache
* or compute-and-cache it.
*/
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
// If RDD was already generated, then retrieve it from HashMap,
// or else compute the RDD
generatedRDDs.get(time).orElse {
// Compute the RDD if time is valid (e.g. correct time in a sliding window)
// of RDD generation, else generate nothing.
if (isTimeValid(time)) {
val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details. We need to have this call here because
// compute() might cause Spark jobs to be launched.
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
compute(time)
}
}
rddOption.foreach { case newRDD =>
// Register the generated RDD for caching and checkpointing
if (storageLevel != StorageLevel.NONE) {
newRDD.persist(storageLevel)
logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
}
if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
newRDD.checkpoint()
logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
}
generatedRDDs.put(time, newRDD)
}
rddOption
} else {
None
}
}
}
At this point the RDD "variables" have been generated, but nothing has been executed; this is purely logical-level, framework-level organization of the code.
Note: Spark Streaming still generates an RDD (an empty BlockRDD) even when no input data arrived for a batch, which is a spot where one could modify the source to improve performance. Looked at the other way around, stream processing is really just batch processing performed over very short time intervals.
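As an application-level alternative to patching the source (an illustration only, assuming the per-batch work is done in foreachRDD), one can at least skip the heavy processing for those empty BlockRDDs:
dstream.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) { // isEmpty itself runs a tiny job, but avoids the full per-batch processing
    // ... real per-batch processing ...
  }
}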
The above was the logical-level background of RDD generation. Next, let's walk through the physical level, where RDDs are actually generated.
Take the official NetworkWordCount example:
object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
// Create the context with a 1 second batch size
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
All four steps are in fact transformations (only the last one looks like an action on the surface).
First, an outline of the main flow by which the DStreams in NetworkWordCount generate RDDs:
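As a textual stand-in for that flow diagram, the DStream lineage that NetworkWordCount builds is (class names as in the Spark 1.x source):
// socketTextStream -> SocketInputDStream (a ReceiverInputDStream)
// flatMap          -> FlatMappedDStream
// map              -> MappedDStream
// reduceByKey      -> ShuffledDStream
// print            -> ForEachDStream (registered as an output stream)
//
// At every batch interval the scheduler asks the ForEachDStream for a Job; generateJob then walks
// this chain backwards via getOrCompute, so RDD creation bottoms out at the SocketInputDStream.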
The code of DStreamGraph.generateJobs:
def generateJobs(time: Time): Seq[Job] = {
logDebug("Generating jobs for time " + time)
val jobs = this.synchronized {
outputStreams.flatMap { outputStream =>
val jobOption = outputStream.generateJob(time)
jobOption.foreach(_.setCallSite(outputStream.creationSite))
jobOption
}
}
logDebug("Generated " + jobs.length + " jobs for time " + time)
jobs
}
This calls generateJob on every registered ForEachDStream; ForEachDStream overrides generateJob.
ForEachDStream.generateJob:
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
case None => None
}
}
In fact, generateJob, through the base class DStream's getOrCompute together with the compute methods of the various DStream subclasses used in this example, forms a chain-of-responsibility pattern.
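Before looking at the real methods, here is a minimal, self-contained sketch (not Spark code; all names are invented for illustration) of how this chain of responsibility fits together:
import scala.collection.mutable

abstract class MiniDStream[T] {
  // analogous to DStream.generatedRDDs: batches already computed, keyed by time
  private val generated = mutable.HashMap.empty[Long, Seq[T]]

  // each subclass knows how to build its own batch, usually from its parent's batch
  def compute(time: Long): Option[Seq[T]]

  // analogous to DStream.getOrCompute: return the cached batch, or compute and cache it
  final def getOrCompute(time: Long): Option[Seq[T]] =
    generated.get(time).orElse {
      val result = compute(time)
      result.foreach(batch => generated(time) = batch)
      result
    }
}

// analogous to MappedDStream: delegate to the parent, then transform the parent's batch
class MiniMapped[T, U](parent: MiniDStream[T], f: T => U) extends MiniDStream[U] {
  def compute(time: Long): Option[Seq[U]] = parent.getOrCompute(time).map(_.map(f))
}

// analogous to ReceiverInputDStream: no parent, so it produces the batch itself
class MiniSource(receive: Long => Seq[String]) extends MiniDStream[String] {
  def compute(time: Long): Option[Seq[String]] = Some(receive(time))
}

object ChainDemo extends App {
  // walking the chain: calling getOrCompute on the last stream pulls data back from the source
  val source = new MiniSource(time => Seq(s"event@$time"))
  val upper  = new MiniMapped[String, String](source, _.toUpperCase)
  println(upper.getOrCompute(1000L)) // Some(List(EVENT@1000))
}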
DStream.getOrCompute (abridged):
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
generatedRDDs.get(time).orElse {
if (isTimeValid(time)) {
...
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
compute(time)
}
}
...
} else {
None
}
}
}
MappedDStream.compute:
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.map[U](mapFunc))
}
FlatMappedDStream.compute:
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
}
ReceiverInputDStream is the first DStream; it depends on no other DStream, so it has to generate the RDD by itself.
The code of ReceiverInputDStream.compute:
/** Generates RDDs with blocks received by the receiver of this stream. */
override def compute(validTime: Time): Option[RDD[T]] = {
val blockRDD = {
if (validTime < graph.startTime) {
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
// driver failure without any write ahead log to recover pre-failure data.
new BlockRDD[T](ssc.sc, Array.empty)
} else {
// Otherwise, ask the tracker for all the blocks that have been allocated to this stream
// for this batch
val receiverTracker = ssc.scheduler.receiverTracker
val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
// Register the input blocks information into InputInfoTracker
val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
// Create the BlockRDD
createBlockRDD(validTime, blockInfos)
}
}
Some(blockRDD)
}
The code of ReceiverInputDStream.createBlockRDD:
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
if (blockInfos.nonEmpty) {
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
// Are WAL record handles present with all the blocks
val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
if (areWALRecordHandlesPresent) {
// If all the blocks have WAL record handle, then create a WALBackedBlockRDD
val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
new WriteAheadLogBackedBlockRDD[T](
ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
} else {
// Else, create a BlockRDD. However, if there are some blocks with WAL info but not
// others then that is unexpected and log a warning accordingly.
if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
logError("Some blocks do not have Write Ahead Log information; " +
"this is unexpected and data may not be recoverable after driver failures")
} else {
logWarning("Some blocks have Write Ahead Log information; this is unexpected")
}
}
val validBlockIds = blockIds.filter { id =>
ssc.sparkContext.env.blockManager.master.contains(id)
}
if (validBlockIds.size != blockIds.size) {
logWarning("Some blocks could not be recovered as they were not found in memory. " +
"To prevent such data loss, enabled Write Ahead Log (see programming guide " +
"for more details.")
}
new BlockRDD[T](ssc.sc, validBlockIds)
}
} else {
// If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
// according to the configuration
if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
new WriteAheadLogBackedBlockRDD[T](
ssc.sparkContext, Array.empty, Array.empty, Array.empty)
} else {
new BlockRDD[T](ssc.sc, Array.empty)
}
}
}
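To summarize the branches of createBlockRDD as they read above:
// blocks present, all with WAL record handles -> WriteAheadLogBackedBlockRDD over those handles
// blocks present, some or none with handles   -> BlockRDD over the block IDs still known to the
//                                                BlockManager (blocks lost from memory are dropped
//                                                with a warning)
// no blocks at all for this batch             -> an empty WriteAheadLogBackedBlockRDD or an empty
//                                                BlockRDD, depending on whether the receiver WAL
//                                                is enabled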
Finally, the chain of responsibility comes back to ForEachDStream.generateJob.
ForEachDStream.generateJob:
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
case None => None
}
}
The RDD, captured by jobFunc, is thus packaged into the newly created Job.
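A minimal sketch of what such a Job amounts to (simplified; not Spark's actual Job class): the batch time plus a closure that, when the JobScheduler finally runs it, calls foreachFunc(rdd, time). For print(), that is where rdd.take(num + 1) executes, which is the first real RDD action and therefore the point at which a Spark job is actually submitted.
class MiniJob(val time: Long, body: () => Unit) {
  def run(): Unit = body() // invoked later on the JobScheduler's job-executor thread pool
}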