Spark 2.1: usage example of rdd.combineByKeyWithClassTag

Date: 2021-07-29 20:27:02

Spark version used for the test:

Spark context Web UI available at http://192.168.1.1:32735
Spark context available as 'sc' (master = local[*], app id = local-1380172893828).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_72)
Type in expressions to have them evaluated.
Type :help for more information.

Note: Spark 1.5 does not provide the rdd.combineByKeyWithClassTag operator, but it does provide rdd.combineByKey (which is still available in Spark 2.1). A combineByKey version of the aggregation used below is sketched right after this note.
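For comparison, the same List-and-count aggregation can be written with the older rdd.combineByKey operator. The sketch below is not part of the original shell session; it assumes the sample_rdd and FModel defined in the usage example that follows.

// A minimal sketch, assuming sample_rdd: RDD[(Int, FModel)] from the example below.
val combineByKeyRDD = sample_rdd.combineByKey(
  (x: FModel) => (List(x), 1),                                        // createCombiner: start a combiner from the first value of a key in a partition
  (acc: (List[FModel], Int), x: FModel) => (x :: acc._1, acc._2 + 1), // mergeValue: fold another value of the same key into the combiner
  (a: (List[FModel], Int), b: (List[FModel], Int)) =>
    (a._1 ::: b._1, a._2 + b._2)                                      // mergeCombiners: merge per-partition combiners of the same key
)

The result type is the same RDD[(Int, (List[FModel], Int))]; judging from the signatures, the *WithClassTag variant differs mainly in additionally taking an implicit ClassTag for the combiner type.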

Usage example:

scala> case class FModel(cgridid: Int, angle: Double, drsrp: Double, distance: Double)
defined class FModel

scala> val sample_rdd=sc.makeRDD(
| Array(
| (1,FModel(1,2.0,2.1,2.2)),
| (1,FModel(2,2.2,2.11,23.2)),
| (2,FModel(1,2.0,2.1,2.2)),
| (1,FModel(3,2.0,42.1,22.2)),
| (2,FModel(2,2.2,2.11,23.2)),
| (3,FModel(3,2.0,42.1,22.2))
| )
| )
sample_rdd: org.apache.spark.rdd.RDD[(Int, FModel)] = ParallelCollectionRDD[0] at makeRDD at <console>:26

scala> val combinByKeyRDD = sample_rdd.combineByKeyWithClassTag(
| (x: FModel) => (List(x), 1),
| (peo: (List[FModel], Int), x: FModel) => (x :: peo._1, peo._2 + 1),
| (sex1: (List[FModel], Int), sex2: (List[FModel], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2))
combinByKeyRDD: org.apache.spark.rdd.RDD[(Int, (List[FModel], Int))] = ShuffledRDD[1] at combineByKeyWithClassTag at <console>:28

scala> combinByKeyRDD.foreach(println)
(3,(List(FModel(3,2.0,42.1,22.2)),1))
(2,(List(FModel(1,2.0,2.1,2.2), FModel(2,2.2,2.11,23.2)),2))
(1,(List(FModel(1,2.0,2.1,2.2), FModel(2,2.2,2.11,23.2), FModel(3,2.0,42.1,22.2)),3))
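For convenience, here is the same example rewritten as a self-contained application rather than a shell session. It is only a sketch, assuming a local-mode SparkSession; the object name and app name are illustrative and not part of the original post.

import org.apache.spark.sql.SparkSession

// Standalone sketch of the shell session above.
object CombineByKeyWithClassTagDemo {
  case class FModel(cgridid: Int, angle: Double, drsrp: Double, distance: Double)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("combineByKeyWithClassTag-demo").getOrCreate()
    val sc = spark.sparkContext

    val sample_rdd = sc.makeRDD(Array(
      (1, FModel(1, 2.0, 2.1, 2.2)),
      (1, FModel(2, 2.2, 2.11, 23.2)),
      (2, FModel(1, 2.0, 2.1, 2.2)),
      (1, FModel(3, 2.0, 42.1, 22.2)),
      (2, FModel(2, 2.2, 2.11, 23.2)),
      (3, FModel(3, 2.0, 42.1, 22.2))
    ))

    // Collect all values of a key into a List and count them in a single pass.
    val combinByKeyRDD = sample_rdd.combineByKeyWithClassTag(
      (x: FModel) => (List(x), 1),
      (acc: (List[FModel], Int), x: FModel) => (x :: acc._1, acc._2 + 1),
      (a: (List[FModel], Int), b: (List[FModel], Int)) => (a._1 ::: b._1, a._2 + b._2)
    )

    combinByKeyRDD.foreach(println)
    spark.stop()
  }
}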