1.创建PairRDD
普通RDD转Pair RDD
val rdd = sc.parallelize(Array("java","scala"))
rdd.foreach(println)
val pairRdd = rdd.map(w => (w,"编程语言"))
pairRdd.foreach(print)
2.Pair RDD转化操作
2.1 单个Pair RDD转化操作
函数名 |
目的 |
reduceByKey(fuc) |
合并具有相同键的值 |
groupByKey() |
对具有相同键的值进行分组 |
combineByKey() |
使用不同的返回类型合并具有相同键的值 |
mapValues(func) |
对 pair RDD 中的每个值应用一个函数而不改变键 |
flatMapValues(func) |
对 pair RDD 中的每个值应用一个返回迭代器的函数,然后对返回的每个元素都生成一个对应原键的键值对记录。 |
keys() |
返回一个仅包含键的 RDD |
values() |
返回一个仅包含值的 RDD |
sortByKey() |
返回一个根据键排序的 RDD |
2.2 编码实例
# 创建测试RDD
val pairRdd = sc.parallelize(List((1,2),(1,3),(2,4),(3,5)))
# 1.reduceByKey()
# 合并相同键的值
val reduceByKeyRdd = pairRdd.reduceByKey((x,y) => x+y)
reduceByKeyRdd.foreach(println)
# (1,5)
# (3,5)
# (2,4)
# 2.groupByKey()
val groupByKeyRdd = pairRdd.groupByKey()
groupByKeyRdd.foreach(println)
# (1,CompactBuffer(2, 3))
# (3,CompactBuffer(5))
# (2,CompactBuffer(4))
# 3.combineByKey(createCombiner,mergeValue,mergeCombiners,partitioner)
# createCombiner:如果是一个新的元素,此时使用createCombiner()来创建那个键对应的累加器的初始值。
# 如果这是一个在处理当前分区中之前已经存在键,此时combineByKey()使用mergeValue()将该键的累加
# 器对应的当前值与这个新值进行合并。
# mergeValue: 合并重复的键(这个操作在每个分区内进行)
# mergeCombiners: 合并各个分区数据
# 计算相同键元素的平均值
val combineByKeyRdd = pairRdd.combineByKey((v) => (v, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(part1: (Int, Int), part2: (Int, Int)) => (part1._1 + part2._1, part1._2 + part2._2))
.map{ case (key, value) => (key, value._1 / value._2.toFloat) }
combineByKeyRdd.foreach(println)
# (1,2.5)
# (3,5.0)
# (2,4.0)
# 4.mapValues(func)
# 对每个键的值进行+1操作
val mapValuesRdd = pairRdd.mapValues(x => x+1)
mapValuesRdd.foreach(println)
# (1,3)
# (1,4)
# (2,5)
# (3,6)
# 5.flatMapValues(func)
# 对每个value值进行迭代到6的操作
val flatMapValuesRdd = pairRdd.flatMapValues(x => (x to 6))
flatMapValuesRdd.foreach(println)
# (1,2)
# (1,3)
# (1,4)
# (1,5)
# (1,6)
# (1,3)
# (1,4)
# (1,5)
# (1,6)
# (2,4)
# (2,5)
# (2,6)
# (3,5)
# (3,6)
# 6.keys()
pairRdd.keys()
# 1123
# 7.values()
pairRdd.values()
# 2345
# 8.sortByKey()
2.2 两个Pair RDD转化操作
函数名 |
目的 |
subtractByKey(other) |
删掉 RDD 中键与 other RDD 中的键相同的元素 |
join(other) |
对两个 RDD 进行内连接 |
rightOuterJoin(other) |
对两个 RDD 进行连接操作,确保第一个 RDD 的键必须存在(右外连接) |
leftOuterJoin(other) |
对两个 RDD 进行连接操作,确保第二个 RDD 的键必须存在(左外连接) |
cogroup(other) |
将两个 RDD 中拥有相同键的数据分组到一起 |
3.Pair RDD行动操作
函数 |
描述 |
countByKey() |
对每个键对应的元素分别计数 |
collectAsMap() |
将结果以映射表的形式返回,以便查询 |
lookup(key) |
返回给定键对应的所有值 |
# countByKey()
print(pairRdd.countByKey())
# Map(2 -> 1, 1 -> 2, 3 -> 1)
# collectAsMap()
print(pairRdd.collectAsMap())
# Map(2 -> 4, 1 -> 3, 3 -> 5)
# lookup()
print(pairRdd.lookup(1))
WrappedArray(2, 3)