Spark Graphx图计算之二跳邻算法实战!
def sendMsgFunc(edge:EdgeTriplet[Int, Int]) = {
if(edge.srcAttr <= 0){
if(edge.dstAttr <= 0){
// 如果双方都小于0,则不发送信息
Iterator.empty
}else{
// srcAttr小于0,dstAttr大于零,则将dstAttr-1后发送
Iterator((edge.srcId, edge.dstAttr - 1))
}
}else{
if(edge.dstAttr <= 0){
// srcAttr大于0,dstAttr<0,则将srcAttr-1后发送
Iterator((edge.dstId, edge.srcAttr - 1))
}else{
// 双方都大于零,则将属性-1后发送
val toSrc = Iterator((edge.srcId, edge.dstAttr - 1))
val toDst = Iterator((edge.dstId, edge.srcAttr - 1))
toDst ++ toSrc
}
}
}
val friends = Pregel(
graph.mapVertices((vid, value)=> if(vid == 1) 2 else -1),
// 发送初始值
-1,
// 指定阶数
2,
// 双方向发送
EdgeDirection.Either
)(
// 将值设为大的一方
vprog = (vid, attr, msg) => math.max(attr, msg),
//
sendMsgFunc,
//
(a, b) => math.max(a, b)
).subgraph(vpred = (vid, v) => v >= 0)
println("\n\n~~~~~~~~~ Confirm Vertices of friends ")
friends.vertices.collect.foreach(println(_))
// (4,1)
// (8,0)
// (2,1)
// (1,2)
// (3,0)
// (5,0)
sc.stop