水善利万物而不争,处众人之所恶,故几于道????
文章目录
- 1. map()
- 2. flatMap()
- 3. filter()
- 4. mapPartitions()
- 5. mapPartitionsWithIndex()
- 6. groupBy()
- 7. distinct()
- 8. coalesce()
- 9. repartition()
- 10. sortBy()
- 11. intersection()
- 12.union()
- 13. subtract()
- 14. zip()
- 15. partitionBy()
- 16. groupByKey()
- 17. reduceByKey()
- 18. aggregateByKey()()
- 19. sortByKey()
- 20. mapValues()
- 21. join()
- 22. cogroup()
1. map()
用于对数据进行映射转换,返回一个新的RDD
操作的是RDD中的每个元素
例:创建一个List集合的RDD,将其中的每个数字映射为二元元组,(“偶数”, 4)、(“奇数”, 1)这样的形式
@Test
def map(): Unit ={
// 创建一个RDD,2个分区
val rdd1 = sc.parallelize(List(1, 4, 6, 7),2)
// 使用map()转换结构
val rdd2 = rdd1.map(x => {
if (x % 2 == 0)
("偶数", x)
else ("奇数", x)
})
// 遍历RDD并打印
rdd2.foreach(println)
}
运行结果:
2. flatMap()
转化+压平操作,先对RDD中的每个元素进行转换,然后将转换的结果(数组、列表等)进行压平,返回一个新的RDD
操作的单位是RDD中的每个元素
例:将RDD中的元素进行切割,然后进行扁平化处理:
@Test
def flatMap(): Unit ={
val rdd1 = sc.parallelize(List("spark,scala", "python,java", "hadoop,java"),3)
val rdd2 = rdd1.flatMap(x=>{
// 切割后的每个元素都是一个数组,然后将多个数组进行扁平化处理
x.split(",")
})
// 将扁平化后的结果数组,收集到Driver端,转成List然后打印(不转换为List也能打印,只不过打印的是地址值,没有重写toString方法)
println(rdd2.collect().toList)
}
结果:
3. filter()
过滤数据,参数是一个返回值是Boolean类型的函数,将返回为true的元素保留
操作的单位是每一个元素
例:过滤出RDD中奇数元素
@Test
def filter(): Unit ={
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 7, 8))
val rdd2: RDD[Int] = rdd1.filter(x => {
// 过滤奇数
x % 2!=0
})
println(rdd2.collect().toList)
}
结果:
4. mapPartitions()
以分区为单位,执行map,它拿到的是一个分区内所有数据的迭代器对象(iterator类型的迭代器),你要遍历这个迭代器对象才能拿到这个分区里面的每个数据
这个的应用场景一般是读取数据库操作的时候用,可以减少数据库连接的创建、销毁次数,提高效率
普通的map是一个元素一个元素的操作,如果操作每个元素的时候都创建、销毁一次数据库连接,效率太差了,可以在每个分区创建一个连接,分区内的数据操作都用这一个连接,然后处理数据,并以批的形式执行操作这样效率比较高。
为啥不能将数据库的连接抽取出来在map函数外面创建,map里面使用连接?因为函数体外的代码是在Driver执行的(Driver负责执行main方法),函数体内处理数据的逻辑是在Executor中task去执行的task和Driver不在一台机器上,如果task想用Driver上的对象,就要Driver把这个对象通过网络传给task,网络传递肯定要序列化,执行sql的PrepareStatement对象是根本没有继承序列化接口,无法序列化,所以就会报错
例:RDD里的是学生id,要求从数据库中查出学生姓名和家庭住址并打印 - (3,test3,陕西)
map写法:
@Test
def map(): Unit = {
// 学生id
val rdd1 = sc.parallelize(List(1, 3, 5, 77, 6, 34))
// 对每个元素进行操作
val rdd2 = rdd1.map(id=>{
var connection: Connection = null
var statement:PreparedStatement = null
var name:String = null
var address:String = null
try{
// 在map()函数体里面创建连接对象,因为map()是针对每个元素进行操作的,
// 所以处理每个元素的时候都会进行连接的创建、销毁,效率低的要命
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","123456")
statement = connection.prepareStatement("select id,name,address from student where id=?")
statement.setInt(1,id)
val resultSet = statement.executeQuery()
while (resultSet.next()) {
name = resultSet.getString("name")
address = resultSet.getString("address")
}
}catch {
case e:Exception => e.printStackTrace()
}finally {
if (connection != null) {
connection.close()
}
if (statement != null) {
statement.close()
}
}
(id,name,address)
})
println(rdd2.collect().toList)
}
mapPartitions写法:
@Test
def mapPartitions(): Unit = {
val rdd1 = sc.parallelize(List(1, 3, 5, 77, 6, 34))
// 每个分区进行操作
val rdd2 = rdd1.mapPartitions(idIterator => {
// 每个分区创建一个对象
var connection: Connection = null
var statement: PreparedStatement = null
// 将查到的数据缓存起来
val listBuffer: ListBuffer[(Int, String, String)] = ListBuffer[(Int, String, String)]()
try {
// 这个连接对象在每个分区只会创建一次
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
statement = connection.prepareStatement("select id,name,address from student where id=?")
println(connection)
var name: String = null
var address: String = null
// idIterator.map(id=>{ 不能用map,因为用statement的时候连接已经被关闭了
// map()是转换算子,是延迟执行的,执行到这的时候还没有执行,而main方法已经把连接关闭了
idIterator.foreach(id => {
statement.setInt(1, id)
val resultSet = statement.executeQuery()
while (resultSet.next()) {
name = resultSet.getString("name")
address = resultSet.getString("address")
}
listBuffer += ((id, name, address))
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (connection != null) {
connection.close()
}
if (statement != null) {
statement.close()
}
}
listBuffer.toIterator
})
println(rdd2.collect().toList)
}
结果:
5. mapPartitionsWithIndex()
和mapPartitions()类似,只不过它可以取到分区号
例:rdd1设置为2个分区,打印分区号和每个分区内的数据
@Test
def mapPartitionsWithIndex(): Unit = {
val rdd1 = sc.parallelize(List(1, 3, 99, 56, 76, 7), 2)
val rdd2 = rdd1.mapPartitionsWithIndex((index, iterator) => {
println(s"分区号: ${index} === 区内数据:${iterator.toList}")
iterator
})
// 这个为啥输出空,是因为iterator迭代器只能调用一次,用过后里面就没有数据了 前面println的时候已经toList()用过了,所以后面返回的iterator本来就是个空的....
println(rdd2.collect().toList)
}
结果:
这个为啥输出空,是因为iterator迭代器只能调用一次,用过后里面就没有数据了 前面println的时候已经toList()用过了,所以后面返回的iterator本来就是个空的…
6. groupBy()
通过传入函数的参数进行分组,分组后的value还是完整的整个元素
例:按照三元元组中第三个元素进行分组
@Test
def groupBy(): Unit = {
val rdd1 = sc.parallelize(List(("zhangsan", "man", "beijing"), ("lisi", "woman", "xian"), ("zhaoliu", "man", "xian")))
val rdd2 = rdd1.groupBy(_._3)
println(rdd2.collect().toList)
}
结果:
7. distinct()
对RDD中的元素进行去重,返回一个去重后的RDD,新的RDD默认的分区数与原RDD分区数相同,也可以指定新RDD的分区数
去重也可以用groupBy实现,直接按照元素分组,然后把元组中的第一个元素取出来就是去重后的结果
@Test
def distinct(): Unit = {
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 3, 2, 1, 1))
val rdd2 = rdd1.distinct()
/*
val rdd3 = rdd1.groupBy(w => w)
val rdd4 = rdd3.map(_._1)
*/
println(rdd2.collect().toList)
}
结果:
8. coalesce()
合并分区,不会走shuffle,分区数比原来的小才能生效,如果想要将分区变多,要开启第二个参数,它会走shuffle将分区变多
@Test
def coalesce(): Unit = {
// 将算子分区数设置为6个分区
val rdd1 = sc.parallelize(List(3, 454, 566, 7, 5657, 6734545, 4, 5), 6)
// 将分区合并为4个
// val rdd2 = rdd1.coalesce(4)
val rdd2 = rdd1.coalesce(8,true)
println(rdd2.collect().toList)
Thread.sleep(900000000)
}
结果:通过web页面查看,可以看到分区变为了8个
9. repartition()
重分区,它可以增大或者减少分区数,它底层调用的就是coalesce() 只不过把是否shuffle恒设置为了true
@Test
def repartition(): Unit = {
val rdd1 = sc.parallelize(List(2, 3, 4, 5, 6, 3, 3, 4, 5, 4), 4)
val rdd2 = rdd1.repartition(6)
println(s"rdd1分区数:${rdd1.getNumPartitions}\nrdd2分区数:${rdd2.getNumPartitions}")
}
结果:
10. sortBy()
排序,默认升序排序,他会走shuffle,它使用的分区器是RangePartitioner
@Test
def sortBy(): Unit = {
val rdd1 = sc.parallelize(List(1, 4, 5, 6, 7, 4, 2, 2, 5, 7, 8), 6)
// 第二个参数默认是true,也就是升序排序,降序的话就false
val rdd2 = rdd1.sortBy(x => x, false)
println(rdd2.collect().toList)
}
结果:
11. intersection()
交集,取两个RDD的相同元素,会有两次shuffle,因为要想取出交集,就要把相同的元素聚在一起才能知道有没有相同的元素,那就rdd1落盘,rdd2落盘,然后rdd3再把两个数据拉过来,所以有两次shuffle
@Test
def intersection(): Unit = {
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5), 6)
val rdd2 = sc.parallelize(List(4, 5, 6, 7, 8), 4)
val rdd3 = rdd1.intersection(rdd2)
println(rdd3.collect().toList)
Thread.sleep(10000000)
}
结果:
12.union()
并集,并集没有shuffle,他只是单纯的将两个RDD的数据放到一起,不关心有没有相同的数据。新RDD的分区数是原来两个RDD分区数之和
@Test
def union(): Unit = {
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5), 5)
val rdd2 = sc.parallelize(List(4, 5, 6, 7, 8), 4)
val rdd3 = rdd1.union(rdd2)
println(rdd3.collect().toList)
println(rdd3.getNumPartitions) // 并集的分区数是两个RDD集合的分区数之和
Thread.sleep(10000000)
}
结果:
13. subtract()
差集合,它会产生shuffle,取方法调用者的分区数,因为衍生RDD的分区数取决于依赖的第一个RDD的分区数
@Test
def subtract(): Unit = {
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5), 3)
val rdd2 = sc.parallelize(List(4, 5, 6, 7, 8), 4)
val rdd3 = rdd1.subtract(rdd2)
println(rdd3.collect().toList)
println(rdd3.getNumPartitions) //取方法调用者的分区数,衍生RDD的分区数取决于依赖的第一个RDD的分区数
Thread.sleep(10000000)
}
结果:
14. zip()
拉链,spark中的拉链要求两个RDD的分区数和数据条数都必须一样才能拉起来
@Test
def zip(): Unit = {
val rdd1 = sc.parallelize(List("hello", "spark", "xian", "beijign"), 5)
val rdd2 = sc.parallelize(List(1, 2, 3, 4), 5)
val rdd3 = rdd1.zip(rdd2)
println(rdd3.collect().toList)
}
结果:
15. partitionBy()
它的参数是一个分区器,按照给定的分区器重新分区。注意他这个要求操作的RDD必须是k-v键值对才能使用
@Test
def partitionBy(): Unit = {
val rdd1 = sc.parallelize(List(1, 3, 4, 5, 6, 7, 9), 4)
// 4个分区,集合长度为7 0: (0*7)/4 - (1*7)/4 => 0-1 => 1
// 1: (1*7)/4 - (2*7)/4 => 1-3 => 3 4
// 2: 3 - (3*7)/4 => 3-5 => 5 6
// 3: 5 - (4*7)/4 => 5-6 => 7 9
//
val rdd2 = rdd1.map(x => {
(x, null)
})
val rdd3 = rdd2.partitionBy(new HashPartitioner(5))
// 1%5 = 1 1在1号分区
// 3%5 = 3
//
// 0: 5
// 1: 1 6
// 2: 7
// 3: 3
// 4: 4 9
//
rdd3.mapPartitionsWithIndex((index, it) => {
println(s"${index} == ${it.toList}")
it
}).collect()
}
结果:
16. groupByKey()
根据key分组,返回的新的RDD是KV键值对,value是所有key相同的value值。会走shuffle
@Test
def groupByKey(): Unit = {
val rdd1 = sc.parallelize(List(("spark", 200), ("spark", 10), ("hadoop", 700), ("scala", 50), ("flink", 90000)), 3)
val rdd2 = rdd1.groupByKey(2)
// println(rdd2.getNumPartitions)
rdd2.mapPartitionsWithIndex((index, it) => {
println(s"groupByKey: ${index} == ${it.toList}")
it
}).collect()
// groupBy实现groupByKey的功能
val rdd3 = rdd1.groupBy(x => x._1)
val rdd4 = rdd3.map