常用Transformation
注:某些函数只有PairRDD只有,而普通的RDD则没有,比如gropuByKey、reduceByKey、sortByKey、join、cogroup等函数要根据Key进行分组或直接操作
RDD基本转换: |
|||
RDD[U] T:原RDD中元素类型 U:新RDD中元素类型 |
函数将T元素转换为新的U元素 |
rdd.map(x |
{1, 2, 3, 3} =>{2, |
RDD[U] TraversableOnce:集合与迭代器的父类 |
函数将T元素转换为含有新类型U元素的集合,并将这些集合展平(两层转换成一层)后的元素形成新的RDD |
rdd.flatMap(x |
{1, 2, 3, 3} =>{1, |
RDD[T] |
函数对每个元素进行过滤,通过的元素形成新的RDD |
rdd.filter(x |
{1, 2, 3, 3} =>{2, |
RDD[T] |
去重 |
rdd.distinct() |
{1, 2, 3, 3} =>{1, |
RDD[U] |
与map一样,只是转换时是以分区为单位,将一个分区所有元素包装成Iterator一次性传入函数进行处理,而不像map函数那样每个元素都会调用一个函数,即这里有几个分区则才调用几次函数 假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次 |
val arr = Array(1, 2, 3, 4, 5) val rdd = sc.parallelize(arr, 2) rdd.mapPartitions((it: |
=>{2, 4, 6, 8, 10} |
RDD[U] |
与mapPartitions类似,不同的时函数多了个分区索引的参数 |
||
RDD[T] |
两个RDD 并集,包括重复的元素 |
rdd.union(otherRdd) |
{ 1, 2, 2, 3, 3} { 3, 4, 5} =>{1, |
RDD[T] |
两个RDD 交集 |
rdd.intersection(otherRdd) |
{ 1, 2, 2, 3, 3} { 3, 4, 5} =>{3} |
RDD[T] |
两个RDD相减 |
rdd.subtract(otherRdd) |
{ 1, 2, 2, 3, 3} { 3, 4, 5} =>{1, |
RDD[(T, |
两个RDD相减笛卡儿积 |
rdd.cartesian(otherRdd) |
{ 1, 2 } { 3, 4} =>{(1,3),(1,4),(2,3),(2,4)} |
RDD[T] |
根据转换后的值进行排序,传入的是一个(T) => K 转换函数 |
rdd.sortBy(_._2, 这里根据value进行降序排序 |
{("leo", 65), ("tom", 50), ("marry", 100), =>{("marry", |
RDD[Array[T]] |
将RDD的每个分区中的类型为T的元素转换换数组Array[T] |
val arr = Array(1, 2, 3, 4, 5) val rdd = sc.parallelize(arr, 2) val arrRDD = rdd.glom()arrRDD.foreach { =>[ 1 2 ], [ 3 4 5 ] |
|
键-值RDD转换: |
|||
RDD[(K, K:key类型 V:value类型 |
将value转换为新的U元素,Key不变 |
rdd.mapValues(_ |
{"class1", 80), ("class2", 70)} =>{"class1", |
RDD[(K, |
对[K,V]型数据中的V值flatmap操作 |
rdd.flatMapValues(_.toCharArray()) |
{ (1, "ab"), (2, "bc")} =>{(1, |
RDD[(K, |
根据key进行分组,同一组的元素组成Iterable<V>,并以(key, Iterable<V>)元组类型为元素作为新的RDD返回 |
rdd.groupByKey() |
{("class1", 80), ("class2", 75), =>{("class1",[80,90]),("class2",[75,60])} |
RDD[(K, T:原RDD元素类型 K:新RDD中元素Key的类型 |
根据函数将元素T映射成相应K后,以此K进行分组 |
rdd.groupBy({ |
{ 1, 2, "二" =>{(1,[1]),(2,[2, |
RDD[(K, |
先根据key进行分组,再对同一组中的的value进行reduce操作:第一次调用函数时传入的是两个Key所对应的value,从第二次往后,传入的两个参数中的第一个为上次函数计算的结果,第二个参数为其它Key的value |
rdd. |
{("class1", 80), ("class2", 75), =>{("class1", |
RDD[(K, |
根据key的大小进行排序(注:并不是先以Key进行分组,再对组类进行排序,而是直接根据Key的值进行排序) |
rdd.sortByKey(false) |
{(65, "leo"), (50, "tom"),(100, =>{(100, |
|
|||
RDD[(K, zeroValue:每个分区相同Key累计时的初始值,以及不同分区相同Key合并时的初始值 e.g., Nil for list concatenation, 0 |
对每个value先进行func操作,且funcfoldByKey函数是通过调用comineByKey函数实现的。 zeroVale:对V进行初始化,实际上是通过CombineByKey的createCombiner实现的 V => func: Value将通过func函数按Key值进行合并(实际上是通过CombineByKey的mergeValue,mergeCombiners函数实现的,只不过在这里,这两个函数是相同的) |
val people = List(("Mobin", 1), ("Lucy", 2), ("Amy", 3), ("Amy", 4), ("Lucy", 5)) val rdd = sc.parallelize(people,2) val foldByKeyRDD = rdd.foldByKey(10)((v1, v2) foldByKeyRDD.foreach(println) |
//处理第一个分区数据 10 + 1 = 11 // ("Mobin", 10 + 2 = 12 // ("Lucy", ===================== //处理第二个分区数据 10 + 3 = 13 // ("Amy", 3) 13 + 4 10 + 5 = 15 // ("Lucy", ===================== //将不同分区相同Key的Value合并起来 12 + (Amy,17) (Mobin,11) (Lucy,27) |
RDD[(K, |
左外连接,包含左RDD的所有数据,如果右边没有与之匹配的用None表示 |
val arr = List(("A", 1), ("A", 2), ("B", 1)) val arr1 = List(("A", "A1"), ("A", "A2")) val rdd = sc.parallelize(arr, 2) val rdd1=sc.parallelize(arr1, 2) val leftOutJoinRDD = rdd.leftOuterJoin(rdd1) leftOutJoinRDD.foreach(println) |
=> (B,(1,None)) (A,(1,Some(A1))) (A,(1,Some(A2))) (A,(2,Some(A1))) (A,(2,Some(A2))) |
RDD[(K, |
右外连接,包含右RDD的所有数据,如果左边没有与之匹配的用None表示 |
val arr = List(("A", 1), ("A", 2)) val arr1 = List(("A", "A1"), ("A", "A2"), ("B", 1)) val rdd = sc.parallelize(arr, 2) val rdd1 = sc.parallelize(arr1, 2) val leftOutJoinRDD = rdd.rightOuterJoin(rdd1) leftOutJoinRDD.foreach(println) |
(B,(None,1)) (A,(Some(1),A1)) (A,(Some(1),A2)) (A,(Some(2),A1)) (A,(Some(2),A2)) |
RDD[(K, W:另一RDD元素的value的类型 |
对两个包含<key,value>对的RDD根据key进行join操作,返回类型<key,Tuple2(key,value)> |
rdd.join(otherRdd) |
{(1, "leo"),(2, "jack"),(3, "tom")} {(1, 100), (2, 90), (3, 60), (1, 70), (2, 80), (3, 50)} =>{(1,("leo",100)),(1,("leo",70)),(2, |
RDD[(K, |
同join,也是根据key进行join,只不过相同key的value分别存放到Iterable<value>中 |
rdd.cogroup(otherRdd) |
{(1, "leo"),(2, "jack"),(3, "tom")} {(1, 100), (2, 90), (3, 60), (1, 70), (2, 80), (3, 50)} =>{(1,(["leo"],[100,70])),(2, |
|
常用Action
T reduce(f: (T, T) => T) |
对所有元素进行reduce操作 |
rdd.reduce(_ |
{1, 2, 2, 3, 3, 3} =>14 |
Array[T] |
将RDD中所有元素返回到一个数组里 注意:This method should only |
rdd.collect() |
{1, 2, 3, 3} =>[1, |
Map[K, |
作用于K-V类型的RDD上,作用与collect不同的是collectAsMap函数不包含重复的key,对于重复的key,后面的元素覆盖前面的元素 |
rdd.collectAsMap() |
{ ("leo", 65), ("tom", 50), ("tom", =>{ |
Long count() |
统计RDD 中的元素个数 |
rdd.count() |
{1, 2, 3, 3} =>4 |
Map[T, |
各元素在 RDD 中出现的次数 注意:This method should only To handle |
rdd.countByValue() |
{1, 2, 3, 3} =>Map(1 |
Map[K, |
先根据Key进行分组,再对每组里的value分别进行计数统计 注意:This method should only To handle |
{ ("leo", 65), ("tom", 50), ("tom", 100), =>Map(leo |
|
T first() |
取第一个元素,实质上是调用take(1)实现的 |
rdd.first() |
{3, 2, =>3 |
Array[T] |
从 RDD 中返回前 num 个元素 注意:This method should only |
rdd.take(2) |
{3, 2, 1, 4} =>[3, |
Array[T] 如果没有传递 ord参数,则使用隐式参数,且提供的默认隐式参数为升序排序,可以传递一个自定义的Ordering来覆盖默认提供。 top实现是将Ordering反序后再调用 takeOrdered的:takeOrdered(num)(ord.reverse) |
默认从 RDD 中返回最最大的 num个元素 注意:This method should only |
rdd.top(2) |
{3, 2, 1, 4} =>[4, |
Array[T] 如果没有传递 ord参数,则使用隐式参数,且提供的默认隐式参数为升序排序,可以传递一个自定义的Ordering来覆盖默认提供 |
与top相反,默认取的是前面最小的num个元素 注意:This method should only |
rdd.takeOrdered(2)(myOrdering) |
{3, 2, 1, 4} =>[1, |
T fold(zeroValue: T)(op: (T, T) => T) zeroValue:为每个分区累计的初始值,以及不同分区累计的初始值 e.g., Nil for list concatenation, 0 |
和 reduce() 一 提供初始值。注意:每个分区应用op函数时,都会以zeroValue为初始值进行计算,然后将每个分区的结果合并时,还是会以zeroValue为初始值进行合并计算 |
val arr = Array(1, 2, 3, 4, 5); val rdd = sc.parallelize(arr, 2) //分成两分区[1, println(rdd.fold(10)((v1, v2) |
//处理第一个分区数据 10 + 1 = 11 11 + 2 ===================== //处理第一个分区数据 10 + 3 = 13 13 + 4 17 + 5 ===================== //将各分区汇总起来 10 + 13 = 23 // 汇总时还会使用初始值来作起始 23 + 45 |
U aggregate (zeroValue: U)(seqOp: (U, T) => U, 初始值类型与原始数据类型可以不同,但初始值类型决定了返回值类型 |
与fold一样,计算时需要提供初始值,不同的是,分区的计算函数(seqOp)与分区合并计算函数(combOp)是不同的,但fold分区计算函数与分区合并计算函数是同一函数 |
rdd.fold(5)(_ |
val val println(rdd.aggregate(5)( (v1, (v1, ) 过程与结果与上面的fold函数一样 |
Unit saveAsTextFile(path: String) |
将RDD元素保存到文件中,对每个元素调用toString方法 |
||
Unit foreach(f: T => Unit) |
遍历RDD中的每个元素 |
rdd.foreach(println(_)) |
无 |
comineByKey
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)]
createCombiner:在第一次遇到Key时创建组合器函数,将RDD数据集中的V类型值转换C类型值(V => C),
mergeValue:合并值函数,再次遇到相同的Key时,将createCombiner道理的C类型值与这次传入的V类型值合并成一个C类型值(C,V)=>C
mergeCombiners:合并组合器函数,将C类型值两两合并成一个C类型值
partitioner:使用已有的或自定义的分区函数,默认是HashPartitioner
mapSideCombine:是否在map端进行Combine操作,默认为true
例:统计男性和女生的个数,并以(性别,(名字,名字....),个数)的形式输出
object CombineByKey {
def main(args:
Array[String]) {
val conf = new
SparkConf().setMaster("local").setAppName("combinByKey")
val sc = new SparkContext(conf)
val people = List(("male", "Mobin"), ("male", "Kpop"), ("female", "Lucy"), ("male", "Lufei"), ("female", "Amy"))
val rdd = sc.parallelize(people)
val combinByKeyRDD = rdd.combineByKey(
(x: String) => (List(x), 1),
(peo: (List[String], Int), x: String) => (x :: peo._1, peo._2 + 1),
(sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2))
combinByKeyRDD.foreach(println)
sc.stop()
}
}
输出:
(male,(List(Lufei, Kpop,
Mobin),3))
(female,(List(Amy,
Lucy),2))
计算过程:
Partition1:
K="male" -->
("male","Mobin") -->
createCombiner("Mobin") => peo1 = (
List("Mobin") , 1 )
K="male" -->
("male","Kpop") -->
mergeValue(peo1,"Kpop") => peo2 = (
"Kpop" :: peo1_1 , 1 + 1 ) //Key相同调用mergeValue函数对值进行合并
K="female" -->
("female","Lucy") -->
createCombiner("Lucy") => peo3 = (
List("Lucy") , 1 )
Partition2:
K="male" -->
("male","Lufei") -->
createCombiner("Lufei") => peo4 = ( List("Lufei")
, 1 )
K="female" -->
("female","Amy") -->
createCombiner("Amy") => peo5 = (
List("Amy") , 1 )
Merger Partition:
K="male" --> mergeCombiners(peo2,peo4) =>
(List(Lufei,Kpop,Mobin))
K="female" --> mergeCombiners(peo3,peo5)
=> (List(Amy,Lucy))