一、RDD基础
1.RDD分布式数据集的五大特性
(1)A list of partitions
(2)A function for computing each split
(3)A list of dependencies on other RDDs
(4)Optionally,a Partitioner for key-value RDDs
(5)Optionally,a list of preferred locations to compute each split
2.RDD的操作类型
Transformations:转换,lazy型,不会触发计算
Action:触发job
Persist:缓存也不会触发job,在第一次触发job之后才会真正进行缓存
3.RDD的计算
RDD的计算实际上我们可以分为两大部分。
1)Driver端的计算
主要是stage划分,task的封装,task调度执行
2)Executor端的计算
真正的计算开始,默认情况下每个cpu运行一个task。一个task实际上就是一个分区,我们的方法无论是转换算子里封装的,还是action算子里封装的都是此时在一个task里面计算一个分区的数据。
二、源码相关
1.第一次封装
可以看到方法通过clean操作(清理闭包,为序列化和网络传输做准备),进行了一次匿名函数的封装,
针对foreach方法,是我们的方法被传入了迭代器foreach(每个元素遍历执行一次函数),
而对于foreachpartition方法是迭代器被传入了我们的方法(每个分区执行一次函数,我们获取迭代器后需要自行进行迭代处理)
2.第二次封装
就是讲上述封装的方法进一步按照匿名函数封装
(ctx:TaskContext,it:Iterator[T] => cleanFunc(it))
3.执行的时候
Spark的Task类型我们用到的也就两个
1)shuffleMapTask
2)ResultTask
Action算子的方法是在ResultTask中执行的,也即ResultTask的runTask方法。
首先反序列化得到我们的方法和RDD,然后执行。传入的是迭代器
三、总结
RDD.foreach(foreachFunction)
RDD.foreachPatition(foreachPartitionFunction)
经过第二部分析我们可以理解,展开之后实际上就是
RDD的每个分区的iterator:
iterator.foreach(foreachFunction)
foreachPartitionFunction(iterator)
这就很明显了,假如我们的Function中有数据库,网络TCP等IO连接,文件流等等的创建关闭操作,采用foreachPartition方法,针对每个分区集合进行计算,更能提高我们的性能。