reduceByKey和groupByKey区别与用法

时间:2023-01-29 22:17:55

转自:https://blog.csdn.net/zongzhiyuan/article/details/49965021

在spark中,我们知道一切的操作都是基于RDD的。在使用中,RDD有一种非常特殊也是非常实用的format——pair RDD,即RDD的每一行是(key, value)的格式。这种格式很像Python的字典类型,便于针对key进行一些处理。


针对pair RDD这样的特殊形式,spark中定义了许多方便的操作,今天主要介绍一下reduceByKey和groupByKey,因为在接下来讲解《在spark中如何实现SQL中的group_concat功能?》时会用到这两个operations。


首先,看一看spark官网[1]是怎么解释的:

reduceByKey(func, numPartitions=None)

Merge the values for each key using an associative reduce function. This will also perform the merginglocally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce. Output will be hash-partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified.

也就是,reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义。

groupByKey(numPartitions=None)

Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will provide much better performance.

也就是,groupByKey也是对每个key进行操作,但只生成一个sequence。需要特别注意“Note”中的话,它告诉我们:如果需要对sequence进行aggregation操作(注意,groupByKey本身不能自定义操作函数),那么,选择reduceByKey/aggregateByKey更好。这是因为groupByKey不能自定义函数,我们需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作。


为了更好的理解上面这段话,下面我们使用两种不同的方式去计算单词的个数[2]:

[java]  view plain  copy
  1. val words = Array("one""two""two""three""three""three")  
  2.   
  3. val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))  
  4.   
  5. val wordCountsWithReduce = wordPairsRDD.reduceByKey(_ + _)  
  6.   
  7. val wordCountsWithGroup = wordPairsRDD.groupByKey().map(t => (t._1, t._2.sum))  
上面得到的wordCountsWithReduce和wordCountsWithGroup是完全一样的,但是,它们的内部运算过程是不同的。


(1)当采用reduceByKeyt时,Spark可以在每个分区移动数据之前将待输出数据与一个共用的key结合。借助下图可以理解在reduceByKey里究竟发生了什么。 注意在数据对被搬移前同一机器上同样的key是怎样被组合的(reduceByKey中的lamdba函数)。然后lamdba函数在每个区上被再次调用来将所有值reduce成一个最终结果。整个过程如下:

reduceByKey和groupByKey区别与用法


(2)当采用groupByKey时,由于它不接收函数,spark只能先将所有的键值对(key-value pair)都移动,这样的后果是集群节点之间的开销很大,导致传输延时。整个过程如下:

reduceByKey和groupByKey区别与用法

因此,在对大数据进行复杂计算时,reduceByKey优于groupByKey

另外,如果仅仅是group处理,那么以下函数应该优先于 groupByKey :
  (1)、combineByKey 组合数据,但是组合之后的数据类型与输入时值的类型不一样。
  (2)、foldByKey合并每一个 key 的所有值,在级联函数和“零值”中使用。


最后,对reduceByKey中的func做一些介绍:

如果是用Python写的spark,那么有一个库非常实用:operator[3],其中可以用的函数包括:大小比较函数,逻辑操作函数,数学运算函数,序列操作函数等等。这些函数可以直接通过“from operator import *”进行调用,直接把函数名作为参数传递给reduceByKey即可。如下:

[python]  view plain  copy
  1. <span style="font-size:14px;">from operator import add  
  2. rdd = sc.parallelize([("a"1), ("b"1), ("a"1)])  
  3. sorted(rdd.reduceByKey(add).collect())  
  4.   
  5. [('a'2), ('b'1)]</span>  

下面是附加源码更加详细的解释

转自:https://blog.csdn.net/ZMC921/article/details/75098903

一、首先他们都是要经过shuffle的,groupByKey在方法shuffle之间不会合并原样进行shuffle,。reduceByKey进行shuffle之前会先做合并,这样就减少了shuffle的io传送,所以效率高一点。
案例:
[plain] view plain copy
  1. object GroupyKeyAndReduceByKeyDemo {  
  2.   def main(args: Array[String]): Unit = {  
  3.     Logger.getLogger("org").setLevel(Level.WARN)  
  4.     val config = new SparkConf().setAppName("GroupyKeyAndReduceByKeyDemo").setMaster("local")  
  5.     val sc = new SparkContext(config)  
  6.     val arr = Array("val config", "val arr")  
  7.     val socketDS = sc.parallelize(arr).flatMap(_.split(" ")).map((_, 1))  
  8.     //groupByKey 和reduceByKey 的区别:  
  9.     //他们都是要经过shuffle的,groupByKey在方法shuffle之间不会合并原样进行shuffle,  
  10.     //reduceByKey进行shuffle之前会先做合并,这样就减少了shuffle的io传送,所以效率高一点  
  11.     socketDS.groupByKey().map(tuple => (tuple._1, tuple._2.sum)).foreach(x => {  
  12.       println(x._1 + " " + x._2)  
  13.     })  
  14.     println("----------------------")  
  15.     socketDS.reduceByKey(_ + _).foreach(x => {  
  16.       println(x._1 + " " + x._2)  
  17.     })  
  18.     sc.stop()  
  19.   }  
  20. }  
二 、首先groupByKey有三种
reduceByKey和groupByKey区别与用法
查看源码groupByKey()实现了 groupByKey(defaultPartitioner(self))
[java] view plain copy
  1. /** 
  2.    * Group the values for each key in the RDD into a single sequence. Hash-partitions the 
  3.    * resulting RDD with the existing partitioner/parallelism level. The ordering of elements 
  4.    * within each group is not guaranteed, and may even differ each time the resulting RDD is 
  5.    * evaluated. 
  6.    * 
  7.    * @note This operation may be very expensive. If you are grouping in order to perform an 
  8.    * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` 
  9.    * or `PairRDDFunctions.reduceByKey` will provide much better performance. 
  10.    */  
  11.   def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {  
  12.     groupByKey(defaultPartitioner(self))  
  13.   }  

查看源码 groupByKey(numPartitions: Int) 实现了 groupByKey(new HashPartitioner(numPartitions))

[java] view plain copy
  1. /** 
  2.    * Group the values for each key in the RDD into a single sequence. Hash-partitions the 
  3.    * resulting RDD with into `numPartitions` partitions. The ordering of elements within 
  4.    * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated. 
  5.    * 
  6.    * @note This operation may be very expensive. If you are grouping in order to perform an 
  7.    * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` 
  8.    * or `PairRDDFunctions.reduceByKey` will provide much better performance. 
  9.    * 
  10.    * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any 
  11.    * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`. 
  12.    */  
  13.   def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {  
  14.     groupByKey(new HashPartitioner(numPartitions))  
  15.   }  

其实上面两个都是实现了groupByKey(partitioner: Partitioner)

[java] view plain copy
  1. /** 
  2.    * Group the values for each key in the RDD into a single sequence. Allows controlling the 
  3.    * partitioning of the resulting key-value pair RDD by passing a Partitioner. 
  4.    * The ordering of elements within each group is not guaranteed, and may even differ 
  5.    * each time the resulting RDD is evaluated. 
  6.    * 
  7.    * @note This operation may be very expensive. If you are grouping in order to perform an 
  8.    * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` 
  9.    * or `PairRDDFunctions.reduceByKey` will provide much better performance. 
  10.    * 
  11.    * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any 
  12.    * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`. 
  13.    */  
  14.   def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {  
  15.     // groupByKey shouldn't use map side combine because map side combine does not  
  16.     // reduce the amount of data shuffled and requires all map side data be inserted  
  17.     // into a hash table, leading to more objects in the old gen.  
  18.     val createCombiner = (v: V) => CompactBuffer(v)  
  19.     val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v  
  20.     val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2  
  21.     val bufs = combineByKeyWithClassTag[CompactBuffer[V]](  
  22.       createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)  
  23.     bufs.asInstanceOf[RDD[(K, Iterable[V])]]  
  24.   }  
而groupByKey(partitioner: Partitioner)有实现了combineByKeyWithClassTag,所以可以说groupByKey其实底层都是combineByKeyWithClassTag的实现,只是实现的方式不同。


三、再查看reduceByKey也有三种方式
reduceByKey和groupByKey区别与用法

[java] view plain copy
  1. /** 
  2.    * Merge the values for each key using an associative and commutative reduce function. This will 
  3.    * also perform the merging locally on each mapper before sending results to a reducer, similarly 
  4.    * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ 
  5.    * parallelism level. 
  6.    */  
  7.   def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {  
  8.     reduceByKey(defaultPartitioner(self), func)  
  9.   }  
[java] view plain copy
  1. /** 
  2.    * Merge the values for each key using an associative and commutative reduce function. This will 
  3.    * also perform the merging locally on each mapper before sending results to a reducer, similarly 
  4.    * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. 
  5.    */  
  6.   def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {  
  7.     reduceByKey(new HashPartitioner(numPartitions), func)  
  8.   }  
[java] view plain copy
  1. /** 
  2.    * Merge the values for each key using an associative and commutative reduce function. This will 
  3.    * also perform the merging locally on each mapper before sending results to a reducer, similarly 
  4.    * to a "combiner" in MapReduce. 
  5.    */  
  6.   def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {  
  7.     combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)  
  8.   }  
通过查看这三种reduceByKey不难发现,前两种是最后一种的实现。而最后一种是又实现了combineByKeyWithClassTag。

### groupByKey是这样实现的

combineByKeyWithClassTag[CompactBuffer[V]](createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)

### reduceByKey是这样实现的
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)

对比上面发现,groupByKey设置了mapSideCombine = false,在map端不进行合并,那就是在shuffle前不合并。而reduceByKey没有设置

难道reduceByKey默认合并吗????

四、接下来,我们仔细看一下combineByKeyWithClassTag
[java] view plain copy
  1. /** 
  2.    * :: Experimental :: 
  3.    * Generic function to combine the elements for each key using a custom set of aggregation 
  4.    * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C 
  5.    * 
  6.    * Users provide three functions: 
  7.    * 
  8.    *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list) 
  9.    *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list) 
  10.    *  - `mergeCombiners`, to combine two C's into a single one. 
  11.    * 
  12.    * In addition, users can control the partitioning of the output RDD, and whether to perform 
  13.    * map-side aggregation (if a mapper can produce multiple items with the same key). 
  14.    * 
  15.    * @note V and C can be different -- for example, one might group an RDD of type 
  16.    * (Int, Int) into an RDD of type (Int, Seq[Int]). 
  17.    */  
  18.   @Experimental  
  19.   def combineByKeyWithClassTag[C](  
  20.       createCombiner: V => C,  
  21.       mergeValue: (C, V) => C,  
  22.       mergeCombiners: (C, C) => C,  
  23.       partitioner: Partitioner,  
  24.       mapSideCombine: Boolean = true,  
  25.       serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {  
  26.     require(mergeCombiners != null"mergeCombiners must be defined"// required as of Spark 0.9.0  
  27.     if (keyClass.isArray) {  
  28.       if (mapSideCombine) {  
  29.         throw new SparkException("Cannot use map-side combining with array keys.")  
  30.       }  
  31.       if (partitioner.isInstanceOf[HashPartitioner]) {  
  32.         throw new SparkException("HashPartitioner cannot partition array keys.")  
  33.       }  
  34.     }  
  35.     val aggregator = new Aggregator[K, V, C](  
  36.       self.context.clean(createCombiner),  
  37.       self.context.clean(mergeValue),  
  38.       self.context.clean(mergeCombiners))  
  39.     if (self.partitioner == Some(partitioner)) {  
  40.       self.mapPartitions(iter => {  
  41.         val context = TaskContext.get()  
  42.         new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))  
  43.       }, preservesPartitioning = true)  
  44.     } else {  
  45.       new ShuffledRDD[K, V, C](self, partitioner)  
  46.         .setSerializer(serializer)  
  47.         .setAggregator(aggregator)  
  48.         .setMapSideCombine(mapSideCombine)  
  49.     }  
  50.   }  
通过查看combineByKeyWithClassTag的,发现reduceByKey默认在map端进行合并,那就是在shuffle前进行合并,如果合并了一些数据,那在shuffle时进行溢写则减少了磁盘IO,所以reduceByKey会快一些。