1.wordcount
1) Read a local file
scala> sc.textFile("file:///opt/stufile/wordcount.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect.foreach(println)
(hello,3)
(java,1)
(world,1)
(spark,1)
2) Read a file on HDFS
scala> sc.textFile("hdfs://lxm147:9000/data/wordcount.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect.foreach(println)
(hello,3)
(java,1)
(world,1)
(spark,1)
2.Partitioning: parallelize
1) Define a sequence variable, split into 3 partitions
scala> val data = sc.parallelize(1 to 20,3)
scala> data.glom.collect.foreach((x)=>println(x.toList))
List(1, 2, 3, 4, 5, 6)
List(7, 8, 9, 10, 11, 12, 13)
List(14, 15, 16, 17, 18, 19, 20)
// glom gathers all elements of each partition into an in-memory array of the same element type; the partitioning itself is unchanged
// i.e. RDD[Int] => RDD[Array[Int]] (converted to List above only for printing)
2) Define a sequence variable, split into 5 partitions
scala> val data = sc.parallelize(1 to 20,5)
scala> data.glom.collect.foreach((x)=>println(x.toList))
List(1, 2, 3, 4)
List(5, 6, 7, 8)
List(9, 10, 11, 12)
List(13, 14, 15, 16)
List(17, 18, 19, 20)
3.Partitioning: partitions
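A minimal sketch of inspecting an RDD's partitions (assuming the data RDD from section 2 is still in scope; not part of the original transcript):
scala> data.partitions.size      // number of partitions, 5 for parallelize(1 to 20, 5)
scala> data.getNumPartitions     // equivalent convenience method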
4.Partitioning: makeRDD
scala> val data2 = sc.makeRDD(1 to 10,2)
data2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[37] at makeRDD at <console>:24
scala> data2.glom.collect.foreach((x)=>println(x.toList))
List(1, 2, 3, 4, 5)
List(6, 7, 8, 9, 10)
5.Using map
1) Turn every element into a pair
scala> data2.map((_,"a")).collect.foreach(println)
(1,a)
(2,a)
(3,a)
(4,a)
(5,a)
(6,a)
(7,a)
(8,a)
(9,a)
(10,a)
scala> data2.map(("a",_)).collect.foreach(println)
(a,1)
(a,2)
(a,3)
(a,4)
(a,5)
(a,6)
(a,7)
(a,8)
(a,9)
(a,10)
6.mapPartitions
In x => for (e <- x), x stands for the iterator over one whole partition, not a single element
scala> data.mapPartitions(x=>for(e <- x) yield (e,"a")).collect.foreach(println)
(1,a)
(2,a)
(3,a)
(4,a)
(5,a)
(6,a)
(7,a)
(8,a)
(9,a)
(10,a)
(11,a)
(12,a)
(13,a)
(14,a)
(15,a)
(16,a)
(17,a)
(18,a)
(19,a)
(20,a)
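Equivalently, a sketch (not taken from the transcript above) that maps the partition iterator directly:
scala> data.mapPartitions(iter => iter.map((_, "a"))).collect.foreach(println)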
7.mapPartitionsWithIndex
Find out which partition each element ends up in
scala> data.mapPartitionsWithIndex((index,iter)=>for(e <- iter) yield (index,e)).collect.foreach(println)
(0,1)
(0,2)
(0,3)
(0,4)
(1,5)
(1,6)
(1,7)
(1,8)
(2,9)
(2,10)
(2,11)
(2,12)
(3,13)
(3,14)
(3,15)
(3,16)
(4,17)
(4,18)
(4,19)
(4,20)
8.Filter: filter
Keep only the even numbers
scala> data.filter(_%2==0).collect.foreach(println)
// An alternative way to write it
scala> data.filter((x)=>{if(x%2==0) true else false}).collect.foreach(println)
2
4
6
8
10
12
14
16
18
20
9.Take the first element: first
scala> data.first
res35: Int = 1
10.Maximum: max
scala> data.max
res36: Int = 20
11.Mean: mean
scala> data.mean
res37: Double = 10.5
12.Create a key for each element: keyBy
scala> data.mapPartitionsWithIndex((index,iter)=>for(e <- iter) yield (index,e)).keyBy(_._1).foreach(println)
(0,(0,1))
(0,(0,2))
(0,(0,3))
(0,(0,4))
(3,(3,13))
(3,(3,14))
(3,(3,15))
(3,(3,16))
(2,(2,9))
(2,(2,10))
(2,(2,11))
(2,(2,12))
(1,(1,5))
(1,(1,6))
(1,(1,7))
(1,(1,8))
(4,(4,17))
(4,(4,18))
(4,(4,19))
(4,(4,20))
13.Aggregate values that share the same key: reduceByKey
In reduceByKey((x, y) => ...), x and y are two values that share the same key; the key itself is never passed to the function.
Here the values are the (partition, element) pairs produced by keyBy, e.g. for key 4:
x => (4,19)
y => (4,20)
Conceptually, x carries the result accumulated so far and y is the next value merged into it.
def reduceByKey(func: ((Int, Int), (Int, Int)) => (Int, Int)): org.apache.spark.rdd.RDD[(Int, (Int, Int))]
def reduceByKey(func: ((Int, Int), (Int, Int)) => (Int, Int),numPartitions: Int): org.apache.spark.rdd.RDD[(Int, (Int, Int))]
def reduceByKey(partitioner: org.apache.spark.Partitioner,func: ((Int, Int), (Int, Int)) => (Int, Int)): org.apache.spark.rdd.RDD[(Int, (Int, Int))]
Sum the elements within each partition
scala> data.mapPartitionsWithIndex((index,iter)=>for(e <- iter) yield (index,e)).keyBy(_._1).reduceByKey((x,y)=>(x._1,x._2+y._2)).collect.foreach(println)
(0,(0,10))
(1,(1,26))
(2,(2,42))
(3,(3,58))
(4,(4,74))
Breaking down reduceByKey((x,y)=>(x._1,x._2+y._2)):
x and y are the pairs being merged, e.g. (4,19) and (4,20)
x._1 is the first element of the pair, i.e. the partition index, which equals the key
x._2 and y._2 are the second elements, i.e. the values
Adding them sums the elements that belong to the same partition
scala> data3.collect
res137: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5)
Turn data3 into pairs (data3 itself is defined later, in section 17)
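pair1 is presumably built from data3 along these lines (a hypothetical reconstruction; the defining command is missing from the transcript):
scala> val pair1 = data3.map((_, 1))   // (element, 1) pairs; swapping them yields the (1, element) output below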
scala> pair1.map((_.swap)).collect
res135: Array[(Int, Int)] = Array((1,1), (1,2), (1,3), (1,4), (1,5), (1,6), (1,7), (1,8), (1,9), (1,10), (1,1), (1,2), (1,3), (1,4), (1,5))
If the first element of the pair is even, replace it with "a"; if odd, with "b"
scala> pair1.map(x=>{if(x._1%2==0) ("a",x._2) else ("b",x._2)}).collect
res142: Array[(String, Int)] = Array((b,1), (a,1), (b,1), (a,1), (b,1), (a,1), (b,1), (a,1), (b,1), (a,1), (b,1), (a,1), (b,1), (a,1), (b,1))
Sum the values for each of the two keys
scala> pair1.map(x=> (if(x._1%2==0) ("a",x._2) else ("b",x._2))).reduceByKey(_+_).collect
scala> pair1.map(x=> (if(x._1%2==0) ("a",x._2) else ("b",x._2))).reduceByKey((x,y)=>x+y).collect
res149: Array[(String, Int)] = Array((a,7), (b,8))
14.Exercises:
1) Compute each student's total score across the three subjects
val score=sc.parallelize(Array(("张三","数学",100),("李四","数学",97),("王五","数学",98),("张三","语文",95),("李四","语文",87),("王五","语文",69),("张三","英语",100),("李四","英语",95),("王五","英语",87)))
Build the key (the student name)
scala> score.keyBy(_._1).collect.foreach(println)
(张三,(张三,数学,100))
(李四,(李四,数学,97))
(王五,(王五,数学,98))
(张三,(张三,语文,95))
(李四,(李四,语文,87))
(王五,(王五,语文,69))
(张三,(张三,英语,100))
(李四,(李四,英语,95))
(王五,(王五,英语,87))
For triples sharing the same key, add up the third element (the score)
scala> score.keyBy(_._1).reduceByKey((x,y)=>(x._1,"总成绩:",(x._3+y._3))).collect.foreach(println)
(张三,(张三,总成绩:,295))
(李四,(李四,总成绩:,279))
(王五,(王五,总成绩:,254))
scala> score.keyBy(_._1).reduceByKey((x,y)=>(x._1,"总成绩:",(x._3+y._3))).values.collect.foreach(println)
(张三,总成绩:,295)
(李四,总成绩:,279)
(王五,总成绩:,254)
2) Compute the average score for each subject
scala> score.foreach(println)
(张三,数学,100)
(李四,数学,97)
(李四,语文,87)
(王五,语文,69)
(王五,数学,98)
(张三,语文,95)
(张三,英语,100)
(李四,英语,95)
(王五,英语,87)
scala> score.map(x=>(x._2,x._3,1)).foreach(println)
(语文,87,1)
(语文,69,1)
(数学,98,1)
(语文,95,1)
(数学,100,1)
(数学,97,1)
(英语,100,1)
(英语,95,1)
(英语,87,1)
scala> score.map(x=>(x._2,x._3,1)).keyBy(x=>x._1).foreach(println)
(语文,(语文,87,1))
(语文,(语文,69,1))
(英语,(英语,100,1))
(英语,(英语,95,1))
(英语,(英语,87,1))
(数学,(数学,98,1))
(语文,(语文,95,1))
(数学,(数学,100,1))
(数学,(数学,97,1))
scala> score.map(x=>(x._2,x._3,1)).keyBy(x=>x._1).reduceByKey((x,y)=>(x._1,x._2+y._2,x._3+y._3)).foreach(println)
(英语,(英语,282,3))
(数学,(数学,295,3))
(语文,(语文,251,3))
Use map to pull the second and third elements out of the value tuple above and divide them (integer division, so the averages are truncated; an exact variant is sketched after the output)
scala> score.map(x=>(x._2,x._3,1)).keyBy(x=>x._1).reduceByKey((x,y)=>(x._1,x._2+y._2,x._3+y._3)).map(x=>(x._1,(x._2._2/x._2._3))).foreach(println)
(英语,94)
(数学,98)
(语文,83)
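For an exact, non-truncated average, a sketch (not from the original transcript) that carries a (sum, count) pair per subject and divides with toDouble:
scala> score.map(x => (x._2, (x._3, 1))).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues(x => x._1.toDouble / x._2).collect.foreach(println)
// expected shape: (英语,94.0), (数学,98.33...), (语文,83.66...)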
15.Sampling: sample
scala> val data = sc.makeRDD(1 to 10,3)
scala> data.sample
def sample(withReplacement: Boolean,fraction: Double,seed: Long): org.apache.spark.rdd.RDD[Int]
Parameters: withReplacement (whether sampled elements are put back, i.e. can be picked more than once), fraction (the expected fraction of data to sample; with replacement it is the expected number of times each element is picked), and seed (a fixed seed makes the sample reproducible)
Without a seed, the sampled data may differ from run to run
1) With replacement, a larger fraction, and a fixed seed: the sample is the same on every run
scala> data.sample(true,1,102).collect
res111: Array[Int] = Array(2, 2, 4, 4, 9, 9, 10)
scala> data.sample(true,1,102).collect
res111: Array[Int] = Array(2, 2, 4, 4, 9, 9, 10)
2) With replacement, a larger fraction, no seed: the sample may differ between runs
scala> data.sample(true,1).collect
res113: Array[Int] = Array(1, 2, 2, 3, 3, 4, 8, 9)
scala> data.sample(true,1).collect
res114: Array[Int] = Array(1, 1, 1, 3, 4, 4, 5, 6, 6, 8, 10)
3) Without replacement, a fixed seed: the sample is the same on every run
scala> data.sample(false,0.5,100).collect
res122: Array[Int] = Array(1, 5, 6, 7)
scala> data.sample(false,0.5,100).collect
res123: Array[Int] = Array(1, 5, 6, 7)
4) Without replacement, no seed: the sample may differ between runs
scala> data.sample(false,0.5).collect
res119: Array[Int] = Array(7, 9)
scala> data.sample(false,0.5).collect
res120: Array[Int] = Array(1, 3, 4, 7, 9, 10)
16.sortBy
sortBy(x=>x) sorts in ascending order; passing false as the second argument sorts in descending order (the RDD sorted here is data.intersection(data2))
scala> data.intersection(data2).sortBy(x=>x).collect.foreach(println)
1
2
3
4
5
scala> data.intersection(data2).sortBy(x=>x,false).collect.foreach(println)
5
4
3
2
1
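sortBy can also sort by a derived key; a small sketch (not from the original transcript) sorting pairs by their second element:
scala> sc.makeRDD(Seq(("a", 3), ("b", 1), ("c", 2))).sortBy(_._2).collect
// expected: Array((b,1), (c,2), (a,3))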
17.Deduplication: distinct
scala> val data3 = sc.makeRDD(Array(1,2,3,4,5,6,7,8,9,10,1,2,3,4,5),3)
scala> data3.distinct
def distinct(): org.apache.spark.rdd.RDD[Int]
def distinct(numPartitions: Int)(implicit ord: Ordering[Int]): org.apache.spark.rdd.RDD[Int]
scala> data3.distinct().glom.collect
res131: Array[Array[Int]] = Array(Array(6, 3, 9), Array(4, 1, 7, 10), Array(8, 5, 2))
// The argument sets the number of partitions after deduplication; if omitted, the parent RDD's number of partitions is kept
scala> data3.distinct(1).glom.collect
res132: Array[Array[Int]] = Array(Array(4, 1, 6, 3, 7, 9, 8, 10, 5, 2))
scala> data3.distinct(2).glom.collect
res133: Array[Array[Int]] = Array(Array(4, 6, 8, 10, 2), Array(1, 3, 7, 9, 5))
18.groupByKey: with no arguments it treats the first element of each pair as the key and everything else as the value
scala> pair3.collect
res155: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1), (8,1), (9,1), (10,1), (1,1), (2,1), (3,1), (4,1), (5,1), (1,1), (2,1), (3,1), (4,1), (5,1))
Group the elements above by the first element of each pair
scala> pair3.groupByKey.collect
res156: Array[(Int, Iterable[Int])] = Array((10,CompactBuffer(1)), (5,CompactBuffer(1, 1, 1)), (1,CompactBuffer(1, 1, 1)), (6,CompactBuffer(1)), (7,CompactBuffer(1)), (2,CompactBuffer(1, 1, 1)), (3,CompactBuffer(1, 1, 1)), (8,CompactBuffer(1)), (4,CompactBuffer(1, 1, 1)), (9,CompactBuffer(1)))
19.groupBy
Use the second element of each pair as the key; since every second element is 1, the result contains only one group
scala> pair3.groupBy(_._2).collect
res157: Array[(Int, Iterable[(Int, Int)])] = Array((1,CompactBuffer((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1), (8,1), (9,1), (10,1), (1,1), (2,1), (3,1), (4,1), (5,1), (1,1), (2,1), (3,1), (4,1), (5,1))))
The difference between groupBy and groupByKey:
both group the elements,
but groupByKey keeps only the value part of each pair in the groups,
while groupBy keeps the whole element (see the small sketch below)
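A minimal contrast on a tiny pair RDD (a sketch, not part of the original transcript):
scala> val p = sc.makeRDD(Seq(("a", 1), ("a", 2), ("b", 3)))
scala> p.groupByKey.collect     // e.g. (a,CompactBuffer(1, 2)): the groups hold values only
scala> p.groupBy(_._1).collect  // e.g. (a,CompactBuffer((a,1), (a,2))): the groups hold the whole pairs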
// Merge (sum) the values that share the same key
scala> pair3.reduceByKey((x,y)=>x+y).collect
scala> pair3.reduceByKey(_+_).collect
res158: Array[(Int, Int)] = Array((10,1), (5,3), (1,3), (6,1), (7,1), (2,3), (3,3), (8,1), (4,3), (9,1))
// reduce over the whole RDD: add up all the keys and all the values separately
scala> pair3.reduce((x,y)=>(x._1+y._1,x._2+y._2))
res162: (Int, Int) = (85,20)
20.Count the occurrences of each key: countByKey
scala> pair3.countByKey
res166: scala.collection.Map[Int,Long] = Map(5 -> 3, 10 -> 1, 1 -> 3, 6 -> 1, 9 -> 1, 2 -> 3, 7 -> 1, 3 -> 3, 8 -> 1, 4 -> 3)
21.Different operations within and across partitions: aggregate
scala> val data = sc.makeRDD(1 to 10,3)
scala> data.glom.collect
res174: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
1) With the initial value 5, each of the three partitions sums its elements starting from the initial value, giving three partial results; the partial results are then summed together, again starting from the initial value
scala> data.aggregate(5)((x,y)=>{println("seqOp",x,y);x+y},(x,y)=>{println("combOp",x,y);x+y})
(seqOp,5,4)
(seqOp,9,5)
(seqOp,14,6)
(seqOp,5,7)
(seqOp,12,8)
(seqOp,20,9)
(seqOp,29,10)
(seqOp,5,1)
(seqOp,6,2)
(seqOp,8,3)
(combOp,5,20)
(combOp,25,39)
(combOp,64,11)
res171: Int = 75
2) Sum within each partition, take the maximum across partitions
scala> data.aggregate(5)((x,y)=>{println("seqOp",x,y);x+y},(x,y)=>{println("combOp",x,y);if(x>y) x else y})
(seqOp,5,4)
(seqOp,9,5)
(seqOp,14,6)
(seqOp,5,7)
(seqOp,12,8)
(seqOp,20,9)
(seqOp,29,10)
(seqOp,5,1)
(seqOp,6,2)
(seqOp,8,3)
(combOp,5,39)
(combOp,39,20)
(combOp,39,11)
res173: Int = 39
22.The same operation within and across partitions: fold
scala> data.fold(0)((x,y)=>{println("aa",x,y);x+y})
(aa,0,1)
(aa,1,2)
(aa,3,3)
(aa,0,7)
(aa,7,8)
(aa,15,9)
(aa,24,10)
(aa,0,4)
(aa,4,5)
(aa,9,6)
(aa,0,6)
(aa,6,34)
(aa,40,15)
res176: Int = 55
Compared with fold, aggregate is more flexible: fold applies a single function (with a zero value of the element type) both within and across partitions, whereas aggregate lets the two steps use different functions and allows a result type different from the element type (see the sketch below)
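For example, a sketch (not from the original transcript) that computes (sum, count) in one pass with aggregate, something fold cannot express because fold's result type must equal the element type:
scala> val sumCount = data.aggregate((0, 0))((acc, v) => (acc._1 + v, acc._2 + 1), (a, b) => (a._1 + b._1, a._2 + b._2))
// seqOp folds each element into a (sum, count) accumulator; combOp merges the partition accumulators
scala> sumCount._1.toDouble / sumCount._2   // the mean; 5.5 for data = 1 to 10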