coalesce算子,相当绕口的一个英文单词,来闭上眼睛回忆一下编程手册,咋说的来着?
coalesce(numPartitions):
? ? Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
翻译一下:? 把一个RDD的分区数降低到指定的分区个数(即numPartitions个),主要用途是在大数据集的过滤后,使得后续操作更加搞笑,啊,不是,是更加高效。解释一下:原始数据集非常的大,所以我们需要把原始数据集切的很细(partition个数非常多),这样就可以充分利用spark的分布式、高并发的特性,来加快数据的过滤。巴特,过滤完之后,可能数据集就非常小了,比如把10亿过滤成了1000条,以前10亿需要1000个partition并行处理来满足时间要求,但是1000条数据如果还是用1000个partition,那意味着神马?意味着资源的极大浪费!因为rdd的partition是和task对应的,一个partition就会启动一个task进行处理,1000条数据,1000个partition,1000个task,基本上一个task处理一条数据,那就真的是有点搞笑了。虽然在一个executor内,使用线程池来减少启动task的开销,但是浪费依然是存在的,这个task占用资源期间,其他job的task就得等待了。所以这时就需要减少partition的数量了,于是coalesce应运而生。
好了,这时又需要大家再闭上眼睛回忆一下,网上咋评价这个算子来着?对,这个算子和repartition相比,某些情况下不会带来shuffle的开销。大家再跟着我脑补一下,假如有1000个partition,有100台服务器,那么最理想的分配方式就是平均分配,每台服务器处理10个partition,那么如果在调用coalesce时,传入的numPartitions是100,那么我直接把每个机器上的10个partition合并成一个,不就可以达到减少partition数但是又没有在服务器间传输大量数据(shuffle)的目的了吗?这个过程也和coalesce的中文释义吻合,即“合并”,而非repartition的“重分发”。那我们来看看coalesce神器是不是这么做的。首先来看主入口方法:
要进行分区合并,spark提供了一个叫DefaultPartitionCoalescer分区合并器的类来完成这个工作,分区合并器中一个重要的概念是PartitionGroup,即分区组,一个分区组就对应了一个把多个父rdd分区合并后的子rdd的分区,即CoalescedRDD的分区,但是为什么叫分区组而不直接就叫CoalescedRDD分区呢,那是因为PartitionGroup还是一个中间状态,无法表达一个真正的RDD的partition。
我们来倒着进行源码分析,看看getPartitions这个方法,这是一个接口方法,driver端在执行调度时,会调用这个方法,获取到一个rdd有哪些partition,然后进行task的分配(就是把task分配到哪些executor机器上去执行):
看了上图的执行分析,我们就会想知道父rdd的partition是怎么被分配到一个PartitionGroup中去的,那么就来看看coalesce方法吧:?
在setupGroups方法中,就涉及到一个我们本篇文章要发散的一个点了,那就是preferedLocation的概念,先说location,其实location就是一个ip地址或者主机名,用来标识一个partition要被分配到哪个服务器去处理。如果一个partition相关的数据在A节点,但是被分配到B节点执行,那存在两种情况,如果是HDFS这种共享磁盘的文件系统,那么就需要HDFS底层来把A节点上的数据拉到B节点,这就带来了磁盘读(从A节点读)磁盘写(在B节点写)以及网络开销;另外一种情况就是本地文件系统,那么就直接会报错了,因为不可能在B节点找到A节点上存储的文件数据。但是如果能让task直接就在A节点去执行,就可以解决上边的两种问题了,这就是传说中的“让计算去靠近存储”,而调度系统要完成这样一个优化,就需要RDD告诉调度系统,一个分区的preferedLocation具体是什么。?
好了,回到上边setupGroups方法中,如果父partition都没有这个preferedLocation,那么所有的父partition根据其在数组中的位置,会被平均的分配到一个PartitionGroup中,进行CoalescedRDD分区的构造。如果父partition有preferedLocation,那么就要进行额外的处理了,我们来看throwBalls方法:?
好了,我们知道了CoalescedRDD是如何选择分区的最优执行节点了,那么这个获取最优分区的过程会在哪里被调用呢?我们思考一下,spark的进程主要分为driver和executor,executor只需要根据driver发送过来的信息就行执行就行了,driver端才需要知道这些全局相关的信息,所以自然,RDD的preferredLocations方法会在调度阶段被调用,好了就不贴代码了,快去DAGScheduler中印证一下吧。?
那么我们顺着这个思路来看看还有哪些子RDD实现了这个getPreferredLocations方法呢?
HadoopPartition中存储了一个partition的三个副本都在哪些节点上,调度系统尅选择其中的一个进行任务的分发。
Union是把多个RDD合并成一个,但是其中每个RDD在物理上海市完全独立的,所以UnionRDD的partition在进行选择preferedlocation的原则就是,父rdd选择的是哪个,unionrdd这个儿子的partition的preferedlocation就是哪个节点。
shuffle过程中,中间结果会写入到本地的内存或者本地磁盘,所以这里一定要明确的告诉调度系统,地洞shuffleread时,需要到哪个节点上去读shufflewrite已经写好的中间结果。
通过coalesce算子源码的阅读,我们可以了解到父RDD的parititon子RDD的partition的对应关系,英语作文格式有利于我们理解整个spark job的计算流程。通过coalesceRDD在对父RDD的多个partition进行打包的过程,我们看到了preferedlocation的使用,这可以让我们在整体上调度系统的执行流程有一个了解。
更多spark学习资源和经验分享,加入spark技术学院,BAT一线工程师为你答疑解惑:
文章来源:https://blog.csdn.net/rlnLo2pNEfx9c/article/details/80731302