Spark Graphx图计算之aggregateUsingIndex实操!
aggregateUsingIndex:按相同的 VertexId 对另一个 RDD 中的值执行 reduce 函数(复用当前 VertexRDD 的索引)。例如 (1,1.0) 和 (1,2.0) 经 `_ + _` 归并后变为 (1,3.0)。
// Suppress verbose framework logging so the demo output stays readable.
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

// Set up a local Spark environment with 4 worker threads.
val conf = new SparkConf().setAppName("SNSAnalysisGraphX").setMaster("local[4]")
val sc = new SparkContext(conf)

// --- aggregateUsingIndex demo ---
println("*************************************************************")
// setA: vertex ids 0..99, each carrying the attribute 2.
val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 2)))
setA.collect().foreach(println)
println("*************************************************************")
// rddB: two entries per vertex id — (id, 1.0) and (id, 2.0) — i.e. duplicate keys.
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
rddB.collect().foreach(println)
println("*************************************************************")
// aggregateUsingIndex reuses setA's vertex index and reduces entries sharing the
// same id with `_ + _`, so each vertex ends up with 1.0 + 2.0 = 3.0.
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
setB.collect().foreach(println)
println("*************************************************************")
// innerJoin keeps ids present in both VertexRDDs and sums the attributes:
// 2 (from setA) + 3.0 (from setB) = 5.0 per vertex.
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
setC.collect().foreach(println)
// stop() is side-effecting — call it with parentheses per Scala convention.
sc.stop()