spark RDD操作算子详解(汇总)

时间:2021-10-04 20:49:47
一、aggregateByKey [Pair]

像聚合函数一样工作,但聚合应用于具有相同键的值。 也不像聚合函数,初始值不应用于第二个reduce。


列表变式

(1)def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
ps:
使用给定的组合函数和中性“零值”汇总每个键的值。此函数可返回不同的结果类型U,而不是此RDD中的值的类型,
因此,我们需要一个用于将V合并成U的操作和用于合并两个U的一个操作,如在scala.TraversableOnce中。 前一个操作用于合并a中的值
分区,后者用于合并分区之间的值。 避免记忆分配,这两个函数都允许修改并返回其第一个参数而不是创建一个新的U.

(2)def aggregateByKey[U](zeroValue: U, numPartitions: Int)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]

ps:

使用给定的组合函数和中性“零值”汇总每个键的值。此函数可返回不同的结果类型U,而不是此RDD中的值的类型,
因此,我们需要一个用于将V合并成U的操作和用于合并两个U的一个操作,如在scala.TraversableOnce中。 前一个操作用于合并a中的值
分区,后者用于合并分区之间的值。 避免记忆分配,这两个函数都允许修改并返回其第一个参数而不是创建一个新的U.


(3)def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
ps:

使用给定的组合函数和中性“零值”汇总每个键的值。此函数可返回不同的结果类型U,而不是此RDD中的值的类型,

因此,我们需要一个用于将V合并成U的操作和用于合并两个U的一个操作,如在scala.TraversableOnce中。 前一个操作用于合并a中的值
分区,后者用于合并分区之间的值。 避免记忆分配,这两个函数都允许修改并返回其第一个参数而不是创建一个新的U.

参数说明:

              U: ClassTag==>表示这个最终的RDD的返回值类型.

              zeroValue: U==>表示在每个分区中第一次拿到key值时,用于创建一个返回类型的函数,这个函数最终会被包装成先生成一个返回类型,

                                    然后通过调用seqOp函数,把第一个key对应的value添加到这个类型U的变量中,下面代码的红色部分.

              seqOp: (U,V) => U ==> 这个用于把迭代分区中key对应的值添加到zeroValue创建的U类型实例中.

              combOp: (U,U) => U ==> 这个用于合并每个分区中聚合过来的两个U类型的值.


举例:

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)

// lets have a look at what is in the partitions
def myfunc(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
  iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
}
pairRDD.mapPartitionsWithIndex(myfunc).collect

res2: Array[String] = Array([partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)])

pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res3: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))

pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect

res4: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))



可参考:http://blog.csdn.net/u014393917/article/details/50602456


二、reduceByKey

        此函数提供了Spark中众所周知的reduce功能。 请注意,您提供的任何功能都应该是可交换的,以便产生可重复的结果


列表变式:

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

举例:

        val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
        val b = a.map(x => (x.length, x))
        b.reduceByKey(_ + _).collect
       res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))

       val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
       val b = a.map(x => (x.length, x))
       b.reduceByKey(_ + _).collect
       res87: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))


注:

简单理解reduceByKey((x,y)=>x+y),即不要看元祖中的key,永远只考虑value即可.(x,y)代表的是value的操作!!


可参考:http://blog.csdn.net/qq_23660243/article/details/51435257


未完待续