Spark的WordCount到底产生了多少个RDD

时间:2021-08-19 00:59:15


Spark的WordCount到底产生了多少个RDD

不少的同学在面试中会被问到:这样的一句标准的sparkcore的wordcount的代码到底能要产生几个RDD呢。相信大家对于一个标准的WordCount的代码一定不陌生:

sc.textFile("hdfs://myha01/wc/input/words.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.saveAsTextFile("hdfs://myha01/wc/output/")

这局代码:

1、开始使用了一个textFile用来读取数据的方法

2、中间使用了三个标准的RDD的操作算子:

flatMap(_.split(" ")) 负责把由每一行组成的RDD按照空格切开压平成标准的由单词组成的RDD

map((_,1)) 负责把每个单词word变成(word,1)每个单词出现一次

reduceByKey(_+_) 负责把按照key相同也就是单词相同的key-value划分成一组,然后每一组做count聚合,最终就得出了输入文件中,每个单词出现了多少次。

3、最后,使用了一个saveAsTextFile的方法来存储数据

那到底这句代码中执行过程中,是不是刚好每个算子生成一个RDD呢? 很不幸,不是的。如果需要知晓答案,最好的方式,就是翻阅参与运算的每个算子到底做了什么事情。

 

接下来是详细分析:

1、首先看sc.textFile("hdfs://myha01/wc/input/words.txt"):textFile方法在SparkContext类中

Spark的WordCount到底产生了多少个RDD

接着看textFile中的hadoopFile方法的实现:

Spark的WordCount到底产生了多少个RDD

通过这个代码可以得知,在hadoopFile的内部产生了第一个RDD:HadoopRDD

接着回到textFile方法:

Spark的WordCount到底产生了多少个RDD

发现,其实返回的HadoopRDD又调用了map算子,看map算子的实现:

Spark的WordCount到底产生了多少个RDD

map算子的内部实现中,又创建了一个RDD,这就是第二个RDD: MapPartitionsRDD

那也就是说,textFile算子的最终返回值就是第二个RDD:MapPartitionsRDD

 

接着看:flatMap(_.split(" "))算子的操作实现:flatMap算子在RDD中

Spark的WordCount到底产生了多少个RDD

所以flatMap(_.split(" "))算子操作产生了第三个RDD:MapPartitionsRDD

 

接着看map((_,1))算子操作:map算子在RDD类中

Spark的WordCount到底产生了多少个RDD

map((_,1))算子的具体实现依然是简单的new MapPartitionRDD的方式生成第四个RDD:MapPartitionsRDD

 

接着看:reduceByKey(_+_)算子的具体实现:reduceByKey在PairRDDFunctions类中

Spark的WordCount到底产生了多少个RDD

跳到:

Spark的WordCount到底产生了多少个RDD

跳到:

Spark的WordCount到底产生了多少个RDD

到这个地方说明:reduceByKey算子的返回值其实是创建了第五个RDD:ShuffledRDD

 

接着看:saveAsTextFile("hdfs://myha01/wc/output/")算子的具体实现:saveAsTextFile算子在RDD类中

Spark的WordCount到底产生了多少个RDD

this.mapPartitions这句代码在调用的时候,在mapPartitions的内部,其实又创建了第六个RDD:MapPartitionRDD

Spark的WordCount到底产生了多少个RDD

接着回到:saveAsTextFile方法的实现,其实返现,最后一句话在调用中,也会生成一个RDD

这就是第七个RDD:MapPartitionRDD

Spark的WordCount到底产生了多少个RDD

到底为止,其他的地方,是没有再产生RDD的。

所以按照刚才的分析得出的最终结论是:

第一个RDD:HadoopRDD

第二个RDD:MapPartitionsRDD

第三个RDD:MapPartitionsRDD

第四个RDD:MapPartitionsRDD

第五个RDD:ShuffledRDD

第六个RDD:MapPartitionRDD

第七个RDD:MapPartitionRDD

其实,在执行saveAsTextFile之前,我们可以通过RDD提供的toDebugString看到这些个算子在调用的时候到底产生了多少个RDD:

Spark的WordCount到底产生了多少个RDD

望各位仁兄牢记。如果不记得,请翻阅源码。本篇文章是基于最新的Spark-2.3.1的版本