spark graphx 图计算官网实例练习:
http://spark.apache.org/docs/latest/graphx-programming-guide.html
import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.GraphLoader
val graph = GraphLoader.edgeListFile(sc, "/data/graphx/followers.txt")
val temp = graph.mapVertices((id,attr) => attr.toInt * 2)
temp.vertices.take(10)
或者
val temp : Graph[Int,Int] = graph.mapVertices((_,attr) => attr * 2)
temp.vertices.take(10)
//140M的数据:
val graph = GraphLoader.edgeListFile(sc, "hdfs://server1:9000/data/graphx/followers.txt",numEdgePartitions=4)
graph.vertices.count
graph.edges.count
------properties operators---end====================================
structural operators-----start
把所有的边的方向对调一下!!!!reverse
subgraph-----对边或者顶点帅选
val subGraph = graph.subgraph(epred = e =>e.srcId > e.dstId)
val subGraph = graph.subgraph(epred = e =>e.srcId > e.dstId,vpred=(id,_) => id>500000)
mask---合并一个子集
groupEdges----多个边合并
structural operators-----end=============
computing degree-----start=============
val tmp = graph.degrees
val temp = graph.inDegrees//(作为目标节点的数量!)
temp.take(10)
val temp = graph.outDegrees//(作为原节点的数量!)
temp.take(10)
val tmp = graph.degrees
def max(a : (VertexId,Int),b: (VertexId,Int) ): (VertexId,Int) = if (a._2 > b._2) a else b
def max(a : {VertexId,Int},b: {VertexId,Int)}): (VertexId,Int) = if (a._2 > b._2) a else b
graph.degrees.reduce(max)
//业务含义:哪个完整的导航页数最多: 500个,说明可能是123的导航网站
computing degree-----end========================
collectint Neighbors-------start=============
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = {
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] = {
collectint Neighbors-------end=============
join operators-------start=============
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
: Graph[VD, ED] = {
def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
(mapFunc: (VertexId, VD, Option[U]) => VD2)(implicit eq: VD =:= VD2 = null)
: Graph[VD2, ED]
val rawGraph = graph.mapVertices((id,attr) => 0)//顶点设置为0
val outDeg = rawGraph.outDegrees
val tmp = rawGraph.joinVertices[Int](outDeg){(_,_,optDeg) => optDeg}
val tmp = rawGraph.outerJoinVertices[int,Int](outDeg){_,_,optDeg => optDeg.getOrElse(0)}
join operators-------end=============
地图----挖掘工具!
map reduce triplets-------start=============
图上做mr操作
http://spark.apache.org/docs/latest/graphx-programming-guide.html
import org.apache.spark.graphx.util.GraphGenerators
// Create a graph with "age" as the vertex property.
// Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
triplet => { // Map Function
if (triplet.srcAttr > triplet.dstAttr) {
// Send message to destination vertex containing counter and age
triplet.sendToDst(1, triplet.srcAttr)
}
},
// Add counter and age
(a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
olderFollowers.mapValues( (id, value) =>
value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))
map reduce triplets-------end=============
pregel api-------start=============
为何graphx需要提供调用pregel api操作?
为了让大家更方便处理一些迭代操作!graphx需手动cache---控制比较难:点边分别cache;
会自动的处理cache!!!!
def pregel[A: ClassTag](
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)(
vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
}
采用一步处理方式,适用于迭代多的场景
pregel api--------end=============
graphX设计--------start=============
edge cut 为了分布式
vertex cut 为了分布式 这个!!!!
partitionStrategy 类!4种!!!方式
graph 实用的是vertex cut 所以一个顶点可能在多个partition上,而一条边只会在一个partition!!!
graphX设计--------end=============
pagerank triangleCount--------start=============
pagerank不讲了,最简单的!
val ranks = graph.pageRank(0.01).vertices //0.01越小越精确,数据量大要设置大一点。。直接实现好了
ranks.take(10).mkString("\n")
很方便实用:社交网络的推荐。。。。0.01不算太大! 排序!!!!!
triangleCount : 关系紧密:三角形个数!!!!srcid < desid
val graph = GraphLoader.edgeListFile(sc, "hdfs://server1:9000/data/graphx/followers.txt",true)
val c = graph.triangleCount().vertices
c.take(10)
pagerank triangleCount--------end=============