Spark GraphX 属性图操作

时间:2023-02-03 10:12:11
package Spark_GraphX

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object 属性图 {
def main(args: Array[String]): Unit
= {
val conf
=new SparkConf().setAppName("SimpleGraphX").setMaster("local[4]")
val sc
=new SparkContext(conf)
//定义顶点
val users:RDD[(VertexId,(String,String))]=sc.parallelize(Array((3L,("soyo","student")),(7L,("soyo2","postdoc")),(5L,("xiaozhou","professor")),(2L,("xiaocui","professor"))))
//定义边
val relationships:RDD[Edge[String]]=sc.parallelize(Array(Edge(3L,7L,"collab"),Edge(5L,3L,"advisor"),Edge(2L,5L,"colleague"),Edge(5L,7L,"parent")))
//定义默认的作者,以防与不存在的作者有边
val defaultUser=("Jone","Dance")
val graph
=Graph(users,relationships,defaultUser)
println(
"*****************")
println(
"找到图中属性是student的点")
graph.vertices.filter{
case (id,(name,occupation))=>occupation=="student"}.collect.foreach{case(id,(name,occupation))=>println(s"$name is $occupation")}
println(
"--------------------------")
println(
"找到途中边的属性是advisor的边")
graph.edges.filter(x
=>x.attr=="advisor").collect().foreach(x=>println(s"${x.srcId} to ${x.dstId} 属性为 ${x.attr}"))
println(
"--------------------------")
println(
"找到图中的最大出度,入度,度数")
println(
"最大的出度:"+graph.outDegrees.reduce(max))
println(
"最大的入度:"+graph.inDegrees.reduce(max))
println(
"最大的度数:"+graph.degrees.reduce(max))
//Scala 可直接调用Java程序
// System.out.print("hello word")
//属性操作
println("------------------------")
println(
"给图中每个顶点的职业属性上加上“spark字符串")
graph.mapVertices{
case (id,(name,occupation))=>(id,(name,occupation+"Spark"))}.vertices.collect.foreach(x=>println(s"${x._2._1} is ${x._2._2} : ${x._1} : ${x._2}"))
println(
"------------------------")
println(
"给途中每个元组的Edge的属性值设置为源顶点属性值+边的属性值+目标定点属性值:")
graph.mapTriplets(x
=>x.srcAttr._2+"+"+x.attr+"+"+x.dstAttr._2).edges.collect().foreach(println)
//可以证明:属性操作下,图的结构都不受影响.
graph.mapTriplets(x=>x.srcId+x.dstId).edges.collect().foreach(println)
//结构操作 :triplets(表示边)
/*
reverse操作返回一个所有边方向取反的新图.该反转操作并没有修改图中顶点,边的属性,更没有增加边的数量.
subgraph操作主要利用顶点和边进行判断,返回的新图中包含满足判断要求的顶点,边.该操作常用于一些情景,比如:限制感兴趣的图顶点和边,删除损坏连接.
*/
println(
"------结构操作---------")
graph.triplets.map(x
=>x.srcAttr._1+" is the "+x.attr+" of "+x.dstAttr._1).foreach(println)
println(
"-------删除职业是postdoc的节点,构建子图----------")
val validGraph
=graph.subgraph(vpred=(id,attr)=>attr._2!="postdoc")
validGraph.vertices.
foreach(println)
validGraph.triplets.map(x
=>x.srcAttr._1+" is the "+x.attr+" of "+x.dstAttr._1).foreach(println)
println(
"----------构建职业是professor的子图,并打印子图的顶点--------")
val subGraph
=graph.subgraph(vpred = (id,attr)=>attr._2=="professor")
subGraph.vertices.collect().
foreach(x=>println(s"${x._2._1} is ${x._2._2}"))

}
//VertexId:顶点,Int:度数
def max(a:(VertexId,Int),b:(VertexId,Int)):(VertexId,Int)={
if(a._2>b._2)a else b
}

}

结果:

*****************
找到图中属性是student的点
soyo
is student
--------------------------
找到途中边的属性是advisor的边
5 to 3 属性为 advisor
--------------------------
找到图中的最大出度,入度,度数
最大的出度:(
5,2)
最大的入度:(
7,2)
最大的度数:(
5,3)
------------------------
给图中每个顶点的职业属性上加上“spark字符串
5 is (xiaozhou,professorSpark) : 5 : (5,(xiaozhou,professorSpark))
2 is (xiaocui,professorSpark) : 2 : (2,(xiaocui,professorSpark))
3 is (soyo,studentSpark) : 3 : (3,(soyo,studentSpark))
7 is (soyo2,postdocSpark) : 7 : (7,(soyo2,postdocSpark))
------------------------
给途中每个元组的Edge的属性值设置为源顶点属性值
+边的属性值+目标定点属性值:
Edge(
3,7,student+collab+postdoc)
Edge(
5,3,professor+advisor+student)
Edge(
2,5,professor+colleague+professor)
Edge(
5,7,professor+parent+postdoc)
Edge(
3,7,10)
Edge(
5,3,8)
Edge(
2,5,7)
Edge(
5,7,12)
------结构操作---------
xiaozhou
is the parent of soyo2
soyo
is the collab of soyo2
xiaozhou
is the advisor of soyo
xiaocui
is the colleague of xiaozhou
-------删除职业是postdoc的节点,构建子图----------
(
5,(xiaozhou,professor))
(
2,(xiaocui,professor))
(
3,(soyo,student))
xiaozhou
is the advisor of soyo
xiaocui
is the colleague of xiaozhou
----------构建职业是professor的子图,并打印子图的顶点--------
xiaozhou
is professor
xiaocui
is professor