Spark编程的基本的算子之:combineByKey,reduceByKey,groupByKey

时间:2021-10-26 19:02:13

Spark编程的基本的算子之:combineByKey,reduceByKey,groupByKey


  • 1) combineByKey。其他的算子比如说reduceByKey,groupByKey都是基于combineByKey实现的。

首先来看看API 定义:

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializerClass: String = null): RDD[(K, C)]

首先这个算子作用的是关于键值对(key, value)类型的数据。对有相同key的键值对进行操作。
在这个算子中,最后的返回值的value类型为C类型,总共接收三个参数.其他的算子为这个算子的重载,如分区数,添加partitioner。

1,第一个参数,createCombiner: V => C。。这个表示当combineByKey第一次遇到值为k的key时,调用createCombiner函数,将V转换为C。 (这一步类似于初始化操作)
2,第二个参数,mergeValue: (C, V) => C。。这个表示当combineByKey不是第一次遇到值为k的Key时,调用mergeValue函数,将v累加到c中。。(这个操作在每个分区内进行)
3,第三个参数,mergeCombiners: (C, C) => C。 这个表示将两个C合并为一个C类型。 (这个操作在不同分区间进行)
4,算子的返回值最后为RDD[(K,C)]类型。表示根据相同的k,将value值由原来的V类型最后转换为C类型。

看几个例子

val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val c = b.zip(a) //利用拉练操作将两个rdd合并为一个值为pair类型的rdd。

val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
//在这个combineByKey中,可以看到首先每次遇到第一个值,就将其变为一个加入到一个List中去。
//第二个函数指的是在key相同的情况下,当每次遇到新的value值,就把这个值添加到这个list中去。
//最后是一个merge函数,表示将key相同的两个list进行合并。

d.collect
res16: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))
val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))  
val d1 = sc.parallelize(initialScores)
type MVType = (Int, Double) //定义一个元组类型(科目计数器,分数) 。type的意思是以后再这个代码中所有的类型为(Int, Double)都可以被记为MVType。
d1.combineByKey(
score => (1, score),
//score => (1, score),我们把分数作为参数,并返回了附加的元组类型。 以"Fred"为列,当前其分数为88.0 =>(1,88.0) 1表示当前科目的计数器,此时只有一个科目
(c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),
//注意这里的c1就是createCombiner初始化得到的(1,88.0)。在一个分区内,我们又碰到了"Fred"的一个新的分数91.0。当然我们要把之前的科目分数和当前的分数加起来即//c1._2 + newScore,然后把科目计算器加1即c1._1 + 1

(c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)
//注意"Fred"可能是个学霸,他选修的科目可能过多而分散在不同的分区中。所有的分区都进行mergeValue后,接下来就是对分区间进行合并了,分区间科目数和科目数相加分数和分数相加就得到了总分和总科目数
).map
{
case (name, (num, socre))
=> (name, socre / num)
}.collect

这个例子来源于: combineByKey


2) 接下来看看reduceByKey:

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

作用于键值对类型的数据,根据有相同键的数据,进行汇总。传入一个函数,这个函数作用于有两个相同的key的键值对,然后对value值进行函数操作

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length, x)) //生成一个键值对类型的数据,键为字符串长度,值为字符串。
b.reduceByKey(_ + _).collect //对于有相同的键的元祖进行累加,由于所有的数据的长度都是3,所以最后得到了如下的结果
res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x)) //同样的,将数据变为元祖。
b.reduceByKey(_ + _).collect //长度为3的数据有dog,cat,长度为4的数据有lion。长度为5的有tiger和eagle。长度为7的有一个panther。
res87: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))

reduceByKey的内部实现是通过combineByKey实现的。


3) groupByKey算子:

def groupByKey(): RDD[(K, Iterable[V])]  //讲一个rdd进行有键值,进行group操作,最后返回的value值是一个迭代器,其内容包含所有key值为K的元祖的value值。

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
val b = a.keyBy(_.length) //keyBy算子的意思是以_.length这个值作为key。其中value的返回值为ArrayBuffer。
b.groupByKey.collect

res11: Array[(Int, Seq[String])] = Array((4,ArrayBuffer(lion)), (6,ArrayBuffer(spider)), (3,ArrayBuffer(dog, cat)), (5,ArrayBuffer(tiger, eagle))) //