package Spark_GraphX
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object 属性图 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SimpleGraphX").setMaster("local[4]")
    val sc = new SparkContext(conf)
    // Define the vertices: (VertexId, (name, occupation))
    val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array(
      (3L, ("soyo", "student")), (7L, ("soyo2", "postdoc")),
      (5L, ("xiaozhou", "professor")), (2L, ("xiaocui", "professor"))))
    // Define the edges
    val relationships: RDD[Edge[String]] = sc.parallelize(Array(
      Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
      Edge(2L, 5L, "colleague"), Edge(5L, 7L, "parent")))
    // Define a default user, in case an edge refers to a vertex id that is not in users
    val defaultUser = ("Jone", "Dance")
    val graph = Graph(users, relationships, defaultUser)
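    // A minimal sketch of the default-vertex behavior (commented out, so the printed output
    // below is unchanged): if relationships also contained a hypothetical Edge(4L, 3L, "friend")
    // with no vertex 4L in users, the built graph would contain vertex 4 with the attribute defaultUser.
    // graph.vertices.filter { case (id, _) => id == 4L }.collect()   // would then yield (4,(Jone,Dance))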
println("*****************")
println("找到图中属性是student的点")
graph.vertices.filter{case (id,(name,occupation))=>occupation=="student"}.collect.foreach{case(id,(name,occupation))=>println(s"$name is $occupation")}
println("--------------------------")
println("找到途中边的属性是advisor的边")
graph.edges.filter(x=>x.attr=="advisor").collect().foreach(x=>println(s"${x.srcId} to ${x.dstId} 属性为 ${x.attr}"))
println("--------------------------")
println("找到图中的最大出度,入度,度数")
println("最大的出度:"+graph.outDegrees.reduce(max))
println("最大的入度:"+graph.inDegrees.reduce(max))
println("最大的度数:"+graph.degrees.reduce(max))
//Scala 可直接调用Java程序
// System.out.print("hello word")
//属性操作
println("------------------------")
println("给图中每个顶点的职业属性上加上“spark字符串")
graph.mapVertices{case (id,(name,occupation))=>(id,(name,occupation+"Spark"))}.vertices.collect.foreach(x=>println(s"${x._2._1} is ${x._2._2} : ${x._1} : ${x._2}"))
println("------------------------")
println("给途中每个元组的Edge的属性值设置为源顶点属性值+边的属性值+目标定点属性值:")
graph.mapTriplets(x=>x.srcAttr._2+"+"+x.attr+"+"+x.dstAttr._2).edges.collect().foreach(println)
//可以证明:属性操作下,图的结构都不受影响.
graph.mapTriplets(x=>x.srcId+x.dstId).edges.collect().foreach(println)
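    // A minimal check of that claim (commented out so the printed output below stays as shown):
    // graph.mapTriplets(x => x.srcAttr._2 + "+" + x.attr + "+" + x.dstAttr._2).edges.count()   // 4
    // graph.edges.count()                                                                      // 4
    // the edge count is the same before and after: mapTriplets only rewrites edge attributes.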
    // Structural operators; a triplet pairs an edge with its source and destination vertex attributes
    /*
    The reverse operator returns a new graph with all edge directions reversed. It does not modify
    vertex or edge attributes, and it does not change the number of edges (see the sketch below).
    The subgraph operator takes vertex and edge predicates and returns a new graph containing only
    the vertices and edges that satisfy them. It is commonly used, for example, to restrict the graph
    to the vertices and edges of interest, or to remove broken links.
    */
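    // A minimal sketch of reverse (commented out so that the printed output below is unchanged):
    // graph.reverse.edges.collect().foreach(println)
    // e.g. Edge(3,7,collab) would come back as Edge(7,3,collab) in the reversed graph.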
println("------结构操作---------")
graph.triplets.map(x=>x.srcAttr._1+" is the "+x.attr+" of "+x.dstAttr._1).foreach(println)
println("-------删除职业是postdoc的节点,构建子图----------")
val validGraph=graph.subgraph(vpred=(id,attr)=>attr._2!="postdoc")
validGraph.vertices.foreach(println)
validGraph.triplets.map(x=>x.srcAttr._1+" is the "+x.attr+" of "+x.dstAttr._1).foreach(println)
println("----------构建职业是professor的子图,并打印子图的顶点--------")
val subGraph=graph.subgraph(vpred = (id,attr)=>attr._2=="professor")
subGraph.vertices.collect().foreach(x=>println(s"${x._2._1} is ${x._2._2}"))
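    // subgraph also accepts an edge predicate (epred); a minimal sketch, commented out:
    // val collabGraph = graph.subgraph(epred = t => t.attr == "collab")
    // this would keep all the vertices but only the edges whose attribute is "collab".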
  }
  // VertexId: the vertex id, Int: its degree
  def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
    if (a._2 > b._2) a else b
  }
}
Output:
*****************
Find the vertices in the graph whose occupation is student
soyo is student
--------------------------
Find the edges in the graph whose attribute is advisor
5 to 3, attr: advisor
--------------------------
Find the max out-degree, in-degree, and degree in the graph
Max out-degree: (5,2)
Max in-degree: (7,2)
Max degree: (5,3)
------------------------
Append the string Spark to the occupation attribute of every vertex
5 is (xiaozhou,professorSpark) : 5 : (5,(xiaozhou,professorSpark))
2 is (xiaocui,professorSpark) : 2 : (2,(xiaocui,professorSpark))
3 is (soyo,studentSpark) : 3 : (3,(soyo,studentSpark))
7 is (soyo2,postdocSpark) : 7 : (7,(soyo2,postdocSpark))
------------------------
Set each triplet's edge attribute to source vertex attribute + edge attribute + destination vertex attribute:
Edge(3,7,student+collab+postdoc)
Edge(5,3,professor+advisor+student)
Edge(2,5,professor+colleague+professor)
Edge(5,7,professor+parent+postdoc)
Edge(3,7,10)
Edge(5,3,8)
Edge(2,5,7)
Edge(5,7,12)
------Structural operators---------
xiaozhou is the parent of soyo2
soyo is the collab of soyo2
xiaozhou is the advisor of soyo
xiaocui is the colleague of xiaozhou
-------Drop the vertices whose occupation is postdoc and build the subgraph----------
(5,(xiaozhou,professor))
(2,(xiaocui,professor))
(3,(soyo,student))
xiaozhou is the advisor of soyo
xiaocui is the colleague of xiaozhou
----------Build the subgraph of professors and print its vertices--------
xiaozhou is professor
xiaocui is professor