Demos of Spark's Transformation and Action operators

Date: 2022-12-19 20:46:51
  • Spark has two kinds of operators: Transformations and Actions

Transformation: a Transformation is lazily evaluated. When you call a Transformation operator, Spark does not compute anything right away; it only records the metadata describing the computation. For example, if you run:

sc.textFile("hdfs://cdhnode1:8030/wordcount")

Spark does not read the data into the RDD; it only records where the data should be read from. The real execution happens only when an Action operator is encountered.

Action: as the name suggests, an Action is different from a Transformation: calling an Action operator triggers execution immediately.
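A minimal sketch of this lazy behaviour, reusing the HDFS path from above (the operations are illustrative, not from a captured session):

//Transformations: nothing is read or computed yet, Spark only records the lineage
val lines = sc.textFile("hdfs://cdhnode1:8030/wordcount")
val lengths = lines.map(_.length)

//Action: this call triggers a job that actually reads the file and computes the result
val total = lengths.reduce(_ + _)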

  • What is an RDD?

RDD is short for Resilient Distributed Dataset. In the Spark source code, the RDD class carries the following comment from the authors:

/**
 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
 * partitioned collection of elements that can be operated on in parallel. This class contains the
 * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
 * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
 * pairs, such as `groupByKey` and `join`;
 * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
 * Doubles; and
 * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
 * can be saved as SequenceFiles.
 * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]
 * through implicit.
 *
 * Internally, each RDD is characterized by five main properties:
 *
 *  - A list of partitions
 *  - A function for computing each split
 *  - A list of dependencies on other RDDs
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 *    an HDFS file)
 *
 * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
 * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
 * reading data from a new storage system) by overriding these functions. Please refer to the
 * [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details
 * on RDD internals.
 */
  • The middle of that comment lists the five main properties of an RDD:
 * Internally, each RDD is characterized by five main properties:
 *
 * - A list of partitions
 * //the RDD is made up of a list of partitions

 * - A function for computing each split
 * //a function computes each split (partition)

 * - A list of dependencies on other RDDs
 * //it tracks its dependencies on other RDDs
 *
 * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 * //key-value RDDs can carry a Partitioner (e.g. hash partitioning)

 * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 *   an HDFS file)
 * //preferred locations enable data-locality-aware computation
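These five properties can be inspected directly from the shell; a small sketch (the concrete values depend on the cluster, so no output is shown):

val rdd = sc.textFile("hdfs://cdhnode1:8030/wordcount").map((_, 1)).reduceByKey(_ + _)

rdd.partitions.length                        //the list of partitions
rdd.dependencies                             //dependencies on the parent RDDs
rdd.partitioner                              //Some(HashPartitioner) after reduceByKey
rdd.preferredLocations(rdd.partitions(0))    //preferred (data-local) hosts for one split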
  • Ways to create an RDD:

1. From a data source supported by Spark, e.g. a file on HDFS:

sc.textFile("hdfs://cdhnode1:8030/wordcount")

2. By parallelizing a collection or an array:

sc.parallelize(Array(1,2,3,4,5,6,7,8))
  • Below are demos of the Transformation and Action operators.

1. Start the Spark cluster

2. Launch the spark-shell

/home/hadoop/app/spark-1.3.1-bin-hadoop2.6/bin/spark-shell --master spark://cdhnode1:7077 --executor-memory 512m --total-executor-cores 2

I. Transformation demos

map

//create an rdd
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:21

//map
scala> rdd1.map(_*10)
res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at map at <console>:24

//view the result (collect is an Action operator)
scala> res0.collect
res1: Array[Int] = Array(10, 20, 30, 40, 50)

sortBy

//create an rdd
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:21

//sortBy
scala> rdd1.sortBy(x=>x,true)
res2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at sortBy at <console>:24

//view the result (collect is an Action operator)
scala> res2.collect
res3: Array[Int] = Array(1, 2, 3, 4, 5)

flatMap

//create an rdd
scala> val rdd1 = sc.parallelize(Array("hello world", "hello word count", "hello lijie"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:21

//flatMap
scala> rdd1.flatMap(_.split(" ")).collect

//result
res4: Array[String] = Array(hello, world, hello, word, count, hello, lijie)

//for a nested Array, apply flatMap twice

//create an rdd
scala> val rdd1 = sc.parallelize(Array(Array("hello world", "hello word count", "hello lijie")))
rdd1: org.apache.spark.rdd.RDD[Array[String]] = ParallelCollectionRDD[13] at parallelize at <console>:21

//flatMap
scala> rdd1.flatMap(_.flatMap(_.split(" "))).collect

//result
res5: Array[String] = Array(hello, world, hello, word, count, hello, lijie)

union

scala> val rdd1 = sc.parallelize(List(5,6,4,7))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:21

scala> val rdd1 = sc.parallelize(List(1,2,3,0))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:21

scala> val rdd2 = sc.parallelize(List(5,3,2,1))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:21

scala> val ddr3 = rdd1.union(rdd2)
ddr3: org.apache.spark.rdd.RDD[Int] = UnionRDD[18] at union at <console>:25

scala> ddr3.collect
res6: Array[Int] = Array(1, 2, 3, 0, 5, 3, 2, 1)

intersection

scala> val ddr3 = rdd1.intersection(rdd2)
ddr3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[24] at intersection at <console>:25

scala> ddr3.collect
res7: Array[Int] = Array(2, 1, 3)

join

scala> val rdd1 = sc.parallelize(List(("lijie", 24), ("zhangsan", 28), ("lisi", 39)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[25] at parallelize at <console>:21

scala> val rdd2 = sc.parallelize(List(("lijie", 99), ("wangwu", 88), ("zhangsan", 100)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[26] at parallelize at <console>:21

scala> rdd1.join(rdd2).collect
res8: Array[(String, (Int, Int))] = Array((zhangsan,(28,100)), (lijie,(24,99)))

leftOuterJoin

scala> rdd1.leftOuterJoin(rdd2).collect
res9: Array[(String, (Int, Option[Int]))] = Array((zhangsan,(28,Some(100))), (lijie,(24,Some(99))), (lisi,(39,None)))

rightOuterJoin

scala> rdd1.rightOuterJoin(rdd2).collect
res10: Array[(String, (Option[Int], Int))] = Array((zhangsan,(Some(28),100)), (wangwu,(None,88)), (lijie,(Some(24),99)))

groupByKey

scala> val rdd1 = sc.parallelize(List(("lijie", 24), ("zhangsan", 28), ("lisi", 39)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[36] at parallelize at <console>:21

scala> val rdd2 = sc.parallelize(List(("lijie", 99), ("wangwu", 88), ("zhangsan", 100)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[37] at parallelize at <console>:21

scala> rdd1.union(rdd2).groupByKey.collect
res11: Array[(String, Iterable[Int])] = Array((zhangsan,CompactBuffer(28, 100)), (wangwu,CompactBuffer(88)), (lijie,CompactBuffer(24, 99)), (lisi,CompactBuffer(39)))

//use it to compute a word count
scala> rdd1.union(rdd2).groupByKey.map(x=>(x._1,x._2.sum)).collect
res12: Array[(String, Int)] = Array((zhangsan,128), (wangwu,88), (lijie,123), (lisi,39))

reduceByKey

scala> rdd1.union(rdd2).reduceByKey(_+_).collect

res13: Array[(String, Int)] = Array((zhangsan,128), (wangwu,88), (lijie,123), (lisi,39))

cogroup

scala> val rdd1 = sc.parallelize(List(("lijie", 24), ("zhangsan", 28), ("lijie", 999),("haha", 123)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[45] at parallelize at <console>:21

scala> val rdd2 = sc.parallelize(List(("lijie", 24), ("zhangsan", 28), ("lisi", 39)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at <console>:21

scala> val rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[48] at cogroup at <console>:25

scala> rdd3.collect
res14: ArrayBuffer((zhangsan,(CompactBuffer(28),CompactBuffer(28))), (lijie,(CompactBuffer(24, 999),CompactBuffer(24))), (haha,(CompactBuffer(123),CompactBuffer())), (lisi,(CompactBuffer(),CompactBuffer(39))))

//use it to compute a word count
scala> rdd1.cogroup(rdd2).groupByKey.mapValues(_.map(x => {x._1.sum + x._2.sum})).collect()

//or (this second form yields plain (String, Int) pairs instead of single-element Lists)
scala> rdd1.cogroup(rdd2).map(x =>{(x._1,x._2._1.sum+x._2._2.sum)}).collect

//result of the first form:
res16: ArrayBuffer((zhangsan,List(56)), (lijie,List(1047)), (haha,List(123)), (lisi,List(39)))

cartesian

scala> val rdd2 = sc.parallelize(List(("lijie", 99), ("wangwu", 88), ("zhangsan", 100)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[52] at parallelize at <console>:21

scala> val rdd1 = sc.parallelize(List(("lijie", 24), ("zhangsan", 28), ("lisi", 39)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[53] at parallelize at <console>:21

scala> rdd1.cartesian(rdd2).collect
res17: Array[((String, Int), (String, Int))] = Array(((lijie,24),(lijie,99)), ((lijie,24),(wangwu,88)), ((lijie,24),(zhangsan,100)), ((zhangsan,28),(lijie,99)), ((lisi,39),(lijie,99)), ((zhangsan,28),(wangwu,88)), ((zhangsan,28),(zhangsan,100)), ((lisi,39),(wangwu,88)), ((lisi,39),(zhangsan,100)))

mapPartitionsWithIndex

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

//it takes a function of (partition index, element iterator) that returns a new iterator
scala> val func = (index: Int, iter: Iterator[(Int)]) => {
| iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
| }
func: (Int, Iterator[Int]) => Iterator[String] = <function2>

scala> rdd1.mapPartitionsWithIndex(func)
res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at mapPartitionsWithIndex at <console>:26

scala> res1.collect
res2: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])

# sum the values handled by each partition
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
val func = (a: Int, b: Iterator[(Int)]) => {
b.toList.map(x => {
(a,x)
}).toIterator
}
println(rdd1.mapPartitionsWithIndex(func).reduceByKey(_+_).collect().toBuffer)

# result
ArrayBuffer((0,6), (1,15), (2,24))

aggregateByKey

//1. total count per animal
scala> val rdd1 = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[8] at parallelize at <console>:21

scala> rdd1.aggregateByKey(0)(_ + _, _ + _).collect

res11: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))

//2. take the maximum per key within each partition, then add the per-partition maxima together
scala> rdd1.aggregateByKey(0)(math.max(_, _), _ + _).collect

res12: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))

//3. with an initial value of 100, the per-partition maximum of every key becomes 100,
//   so keys present in both partitions (cat, mouse) sum to 200 while dog stays at 100
scala> rdd1.aggregateByKey(100)(math.max(_, _), _ + _).collect

res13: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
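aggregateByKey also shines when the aggregated value has a different type from the input values. A minimal sketch computing per-key averages with the rdd1 defined above (the comments show the expected arithmetic, not captured shell output):

//zeroValue is a (sum, count) pair; the first function folds one value into the pair
//within a partition, the second merges the pairs coming from different partitions
val avg = rdd1
  .aggregateByKey((0, 0))(
    (acc, v) => (acc._1 + v, acc._2 + 1),
    (a, b) => (a._1 + b._1, a._2 + b._2))
  .mapValues { case (sum, cnt) => sum.toDouble / cnt }

avg.collect   //cat -> 19/3 ≈ 6.33, dog -> 12.0, mouse -> 3.0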

combineByKey


//1.
scala> val rdd1 = sc.parallelize(List("hello world","hello lijie","lijie test"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[12] at parallelize at <console>:21

//x => x creates the combiner from the first value seen for a key; (a: Int, b: Int) => a + b merges further values into it within a partition; (m: Int, n: Int) => m + n merges the per-partition results
scala> rdd1.flatMap(_.split(" ")).map((_,1)).combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n).collect

res15: Array[(String, Int)] = Array((hello,2), (world,1), (test,1), (lijie,2))


//2. another example: collect the letters that share the same number into one List
scala> val rdd1 = sc.parallelize(List("a","b","c","d"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:21

scala> val rdd2 = sc.parallelize(List(1,2,4,2),2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:21

scala> val rdd3 = rdd2.zip(rdd1)
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[23] at zip at <console>:25


scala> rdd3.collect
res19: Array[(Int, String)] = Array((1,a), (2,b), (4,c), (2,d))

scala> rdd3.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ::: n)
res20: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[24] at combineByKey at <console>:28

scala> res20.collect
res21: Array[(Int, List[String])] = Array((4,List(c)), (2,List(b, d)), (1,List(a)))

repartition(num) is equivalent to coalesce(num, shuffle = true)

scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

scala> val rdd2 = rdd1.repartition(5)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at repartition at <console>:23
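A quick sketch to see the effect on partition counts (not captured from a real session):

rdd1.partitions.length                     //2
rdd2.partitions.length                     //5 after repartition (always shuffles)
rdd1.coalesce(5, true).partitions.length   //5, equivalent to repartition(5)
rdd1.coalesce(1).partitions.length         //1, no shuffle when only reducing partitions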

flatMapValues

scala> val rdd1 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[17] at parallelize at <console>:21

scala> val rdd2 = rdd1.flatMapValues(_.split(" "))
rdd2: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[18] at flatMapValues at <console>:23

scala> rdd2.collect

res6: Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))

foldByKey

scala> val rdd1 = sc.parallelize(List("lijie", "hello", "lisi", "hehe"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[19] at parallelize at <console>:21

scala> val rdd2 = rdd1.map(x => (x.length, x)).foldByKey("")(_+_)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[21] at foldByKey at <console>:23

scala> rdd2.collect
res7: Array[(Int, String)] = Array((4,lisihehe), (5,lijiehello))

keyBy

scala> val rdd1 = sc.parallelize(List("lijie", "hello", "lisi", "hehe"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at parallelize at <console>:21

//use the string length as the key
scala> rdd1.keyBy(_.length)
res10: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[24] at keyBy at <console>:24

scala> res10.collect
res11: Array[(Int, String)] = Array((5,lijie), (5,hello), (4,lisi), (4,hehe))

keys

scala> val rdd1 = sc.parallelize(List("lijie", "hello", "lisi", "hehe"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[26] at parallelize at <console>:21

scala> rdd1.map(x => (x.length,x))
res13: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[27] at map at <console>:24

scala> res13.keys.collect
res14: Array[Int] = Array(5, 5, 4, 4)

values


scala> val rdd1 = sc.parallelize(List("lijie", "hello", "lisi", "hehe"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[26] at parallelize at <console>:21

scala> rdd1.map(x => (x.length,x))
res13: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[27] at map at <console>:24

scala> res13.values.collect
res15: Array[String] = Array(lijie, hello, lisi, hehe)

II. Action demos

collect

sc.textFile("hdfs://cdhnode2:8030/wordcount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect

res26: Array[(String, Int)] = Array(("",18), (the,8), (and,6), (of,5), (The,4), (cryptographic,3), (encryption,3), (for,3), (this,3), (Software,2), (on,2), (which,2), (software,2), (at:,2), (includes,2), (import,,2), (use,,2), (or,2), (software.,2), (software,,2), (more,2), (to,2), (distribution,2), (using,2), (re-export,2), (information,2), (possession,,2), (our,2), (please,2), (Export,2), (under,1), (country,1), (is,1), (Technology,1), (Jetty,1), (currently,1), (check,1), (permitted.,1), (have,1), (Security,1), (U.S.,1), (with,1), (BIS,1), (This,1), (mortbay.org.,1), ((ECCN),1), (security,1), (Department,1), (export,1), (reside,1), (any,1), (algorithms.,1), (from,1), (details,1), (has,1), (SSL,1), (Industry,1), (Administration,1), (provides,1), (http://hadoop.apache.org/core/,1), (cou...

collectAsMap

scala> val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[5] at parallelize at <console>:21

scala> rdd.collectAsMap

res0: scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)
saveAsTextFile

sc.textFile("hdfs://cdhnode2:8030/wordcount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).saveAsTextFile("hdfs://cdhnode2:8030/wordcount/myout")

reduce

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[97] at parallelize at <console>:21

scala> rdd1.reduce(_+_)
res28: Int = 15

count
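The count call itself was not captured in the original notes; a minimal sketch of what it looks like on the same rdd1 (the output is written as a comment, not a real session):

//count is an Action that returns the number of elements in the RDD
rdd1.count   //5 for the five-element rdd1 above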

top

scala> rdd1.top(3)
res29: Array[Int] = Array(5, 4, 3)

take

scala> rdd1.take(3)
res30: Array[Int] = Array(1, 2, 3)

first

scala> rdd1.first
res31: Int = 1

takeOrdered

scala> rdd1.takeOrdered(3)
res33: Array[Int] = Array(1, 2, 3)

aggregate

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:21

//1. use it to compute a sum
//the first argument is the initial (zero) value; the second argument list takes two functions: the first merges the elements within each partition, the second merges the per-partition results
scala> rdd1.aggregate(0)(_+_, _+_)

//result
res3: Int = 45

//2. sum the maximum of each partition (4 + 9 = 13 for the two partitions above)
scala> rdd1.aggregate(0)(math.max(_, _), _ + _)

//result
res4: Int = 13

//3. more examples, with strings
scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:21

scala> rdd2.aggregate("")(_+_,_+_)

res5: String = abcdef

//the initial value "*" is used once per partition and once more in the final combine, hence three "*" in the result
scala> rdd2.aggregate("*")(_ + _, _ + _)

res7: String = **abc*def

//4. another example
scala> val rdd3 = sc.parallelize(List("a","bb","ccc","dddd"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:21

scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)

res8: String = 24 or 42 (the order in which the partition results are combined is not deterministic)

//5. another example
scala> val rdd4 = sc.parallelize(List("aa","bb","ccc",""),2)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:21

scala> rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)

res9: String = 01 or 10

//6. another example
scala> val rdd5 = sc.parallelize(List("aa","bb","","ddd"),2)
rdd5: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[6] at parallelize at <console>:21

scala> rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)

res10: String = 11

countByKey

scala> val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:21

scala> rdd1.countByKey

res1: scala.collection.Map[String,Long] = Map(b -> 2, a -> 1, c -> 2)

countByValue

scala> val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:21

scala> rdd1.countByValue

res2: scala.collection.Map[(String, Int),Long] = Map((b,2) -> 2, (c,2) -> 1, (a,1) -> 1, (c,1) -> 1)

foreachPartition

scala> val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:21

//applies the function to each partition (it receives the iterator of that partition's elements); there is no return value
scala> rdd1.foreachPartition(x => println(x.reduce(_ + _)))
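A typical reason to use foreachPartition instead of foreach is to set up one expensive resource per partition rather than per element; a sketch, with the resource left hypothetical:

rdd1.foreachPartition { iter =>
  //open a connection / writer here, once per partition (hypothetical resource)
  iter.foreach(x => println(x))   //process every element of this partition
  //close the resource here
}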

  • The difference between groupByKey and groupBy
# part of a groupByKey result:
ArrayBuffer(
(20160321102223,CompactBuffer(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)),
(20160321102510,CompactBuffer(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1))

# part of the corresponding groupBy(_._1) result:
ArrayBuffer((20160321102223,CompactBuffer((20160321102223,1), (20160321102223,1),
(20160321102223,1), (20160321102223,1), (20160321102223,1), (20160321102223,1),
(20160321102223,1), (20160321102223,1), (20160321102223,1), (20160321102223,1),
(20160321102223,1), (20160321102223,1), (20160321102223,1)

# the difference is clear:
groupByKey yields tuples whose first element is the key and whose second element is the collection of all the values that had that key.
groupBy(_._1) groups on the same key, but the second element of each tuple is the collection of the whole (key, value) pairs.

So after groupByKey, mapValues operates directly on the values of each key:
groupByKey.mapValues(_.sum)

After groupBy, mapValues operates on the collection of (key, value) pairs of each key:
groupBy(_._1).mapValues(_.foldLeft(0)(_+_._2))
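A self-contained comparison of the two styles on a tiny dataset made up for illustration (output order may vary):

val logs = sc.parallelize(List(("20160321102223", 1), ("20160321102223", 1), ("20160321102510", 1)))

logs.groupByKey.mapValues(_.sum).collect
//e.g. Array((20160321102223,2), (20160321102510,1))

logs.groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2)).collect
//e.g. Array((20160321102223,2), (20160321102510,1))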