Spark入门(五):键值对RDD

时间:2023-01-29 18:17:24

1.创建PairRDD

普通RDD转Pair RDD


val rdd = sc.parallelize(Array("java","scala"))
rdd.foreach(println)
//java
//scala
val pairRdd = rdd.map(w => (w,"编程语言"))
pairRdd.foreach(print)
//(java,编程语言)
//(scala,编程语言)

2.Pair RDD转化操作

2.1 单个Pair RDD转化操作

函数名 目的
reduceByKey(fuc) 合并具有相同键的值
groupByKey() 对具有相同键的值进行分组
combineByKey() 使用不同的返回类型合并具有相同键的值
mapValues(func) 对 pair RDD 中的每个值应用一个函数而不改变键
flatMapValues(func) 对 pair RDD 中的每个值应用一个返回迭代器的函数,然后对返回的每个元素都生成一个对应原键的键值对记录。
keys() 返回一个仅包含键的 RDD
values() 返回一个仅包含值的 RDD
sortByKey() 返回一个根据键排序的 RDD

2.2 编码实例

# 创建测试RDD
val pairRdd = sc.parallelize(List((1,2),(1,3),(2,4),(3,5)))
# 1.reduceByKey()
# 合并相同键的值
val reduceByKeyRdd = pairRdd.reduceByKey((x,y) => x+y)
reduceByKeyRdd.foreach(println)
# (1,5)
# (3,5)
# (2,4)

# 2.groupByKey()
val groupByKeyRdd = pairRdd.groupByKey()
groupByKeyRdd.foreach(println)
# (1,CompactBuffer(2, 3))
# (3,CompactBuffer(5))
# (2,CompactBuffer(4))

# 3.combineByKey(createCombiner,mergeValue,mergeCombiners,partitioner)
# createCombiner:如果是一个新的元素,此时使用createCombiner()来创建那个键对应的累加器的初始值。
# 如果这是一个在处理当前分区中之前已经存在键,此时combineByKey()使用mergeValue()将该键的累加
# 器对应的当前值与这个新值进行合并。
# mergeValue: 合并重复的键(这个操作在每个分区内进行)
# mergeCombiners: 合并各个分区数据
# 计算相同键元素的平均值
val combineByKeyRdd = pairRdd.combineByKey((v) => (v, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(part1: (Int, Int), part2: (Int, Int)) => (part1._1 + part2._1, part1._2 + part2._2))
.map{ case (key, value) => (key, value._1 / value._2.toFloat) }
combineByKeyRdd.foreach(println)
# (1,2.5)
# (3,5.0)
# (2,4.0)


# 4.mapValues(func)
# 对每个键的值进行+1操作
val mapValuesRdd = pairRdd.mapValues(x => x+1)
mapValuesRdd.foreach(println)
# (1,3)
# (1,4)
# (2,5)
# (3,6)

# 5.flatMapValues(func)
# 对每个value值进行迭代到6的操作
val flatMapValuesRdd = pairRdd.flatMapValues(x => (x to 6))
flatMapValuesRdd.foreach(println)
# (1,2)
# (1,3)
# (1,4)
# (1,5)
# (1,6)
# (1,3)
# (1,4)
# (1,5)
# (1,6)
# (2,4)
# (2,5)
# (2,6)
# (3,5)
# (3,6)

# 6.keys()
pairRdd.keys()
# 1123

# 7.values()
pairRdd.values()
# 2345

# 8.sortByKey()

2.2 两个Pair RDD转化操作

函数名 目的
subtractByKey(other) 删掉 RDD 中键与 other RDD 中的键相同的元素
join(other) 对两个 RDD 进行内连接
rightOuterJoin(other) 对两个 RDD 进行连接操作,确保第一个 RDD 的键必须存在(右外连接)
leftOuterJoin(other) 对两个 RDD 进行连接操作,确保第二个 RDD 的键必须存在(左外连接)
cogroup(other) 将两个 RDD 中拥有相同键的数据分组到一起

3.Pair RDD行动操作

函数 描述
countByKey() 对每个键对应的元素分别计数
collectAsMap() 将结果以映射表的形式返回,以便查询
lookup(key) 返回给定键对应的所有值
# countByKey()
print(pairRdd.countByKey())
# Map(2 -> 1, 1 -> 2, 3 -> 1)

# collectAsMap()
print(pairRdd.collectAsMap())
# Map(2 -> 4, 1 -> 3, 3 -> 5)

# lookup()
print(pairRdd.lookup(1))
WrappedArray(2, 3)