GraphX算法模型:PageRank
一:算法介绍
PageRank是Google专有的算法,用于衡量特定网页相对于搜索引擎索引中的其他网页而言的重要程度。
一个页面的“得票数”由所有链向它的页面的重要性来决定,到一个页面的超链接相当于对该页投一票。一个页面的PageRank是由所有链向它的页面(“链入页面”)的重要性经过递归算法得到的。一个有较多链入的页面会有较高的得分,相反如果一个页面没有任何链入页面,那么它没有得分。
二:源码分析
I:PregelPageRank
文件位置:spark-1.0.1\graphx\src\main\scala\org\apache\spark\graphx\lib\PageRank.scala
1.1:代码简介
该PageRank模型提供了两种调用方式:
第一种:(静态)在调用时提供一个参数number,用于指定迭代次数,即无论结果如何,该算法在迭代number次后停止计算,返回图结果。
第二种:(动态)在调用时提供一个参数tol,用于指定前后两次迭代的结果差值应小于tol,以达到最终收敛的效果时才停止计算,返回图结果。
这是GraphX提供的用Pregel的模型改进后产生的图算法,通常我们在进行使用PageRank的代码编写时并不涉及去改动这份源码,而是直接调用:
1.2:源码解析
首先解释下列代码中run()的几个参数:
graph:进行PageRank计算的图模型
numIter:固定的PageRank计算的迭代次数
resetProb:随机重置的概率,通常都是0.15
Graph:返回值,以图的形式包括最终的顶点值(pagerank值)和边值(权重值),进而得到最终的排名结果
def run[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED],numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] =
{
//下列这段代码用于初始化PageRank图模型,具体内容是赋予每个顶点属性为值1.0,赋予每条边属性为值“1/该边的出发顶点的出度数”。
val pagerankGraph: Graph[Double, Double] = graph
//将每个顶点进行连接(度的传递)得到顶点属性值为出度数
.outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) =>deg.getOrElse(0) }
//通过顶点的出度数为每条边设置权重值;这里是Triplet型的迭代器不停地执行一个map函数来遍历得到每条边的权重值,值为1.0/顶点出度数
.mapTriplets( e => 1.0 / e.srcAttr )
//设置每个顶点的初始属性值为1.0
.mapVertices( (id,attr) => 1.0 )
.cache() //将完成初始化的图缓存操作
//以下将定义三个所需函数来完成GraphX对PageRank的算法实现
//用作 Pregel的message //第一个函数用于返回一个考虑“随机事件”发生后的计算结果
def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double=
resetProb + (1.0 - resetProb) * msgSum
//第二个函数用于得到一个迭代器,里面包含了两个信息:该边的目的ID、该边的源属性值和权重的乘积(该边传递的实际PR值)
def sendMessage(edge: EdgeTriplet[Double, Double]) =
Iterator((edge.dstId, edge.srcAttr* edge.attr))
//第三个函数用于将顶点属性值和传递的值进行累加
def messageCombiner(a: Double, b: Double): Double = a + b //在该PageRank模型中每个顶点接受到的初始传递信息都是0.0
val initialMessage = 0.0 // 执行 pregel 模型算法(固定的迭代次数)
Pregel(pagerankGraph, initialMessage, numIter, activeDirection= EdgeDirection.Out)(
vertexProgram,sendMessage, messageCombiner)
}
至此第一种(静态)PageRank模型计算结束
以下是第二种(动态)PageRank模型计算,相同代码就不再累赘解释
初始化参数和上面不同的是少了numIter(迭代次数),多了tol(比较两次迭代的结果差)
def runUntilConvergence[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], tol: Double, resetProb:Double = 0.15): Graph[Double,Double] =
{
// 下段代码同样用于初始化图形
val pagerankGraph: Graph[(Double, Double), Double] = graph
//同上,将每个顶点进行连接(度的传递)得到顶点属性值为出度数
.outerJoinVertices(graph.outDegrees) {
(vid, vdata, deg)=> deg.getOrElse(0)
}
//边属性值(权重)的初始化,值为1.0/顶点出度数
.mapTriplets( e => 1.0 / e.srcAttr )
// 顶点属性值的初始化,但是属性值带两个参数即(初始PR值,两次迭代结果的差值)
.mapVertices( (id,attr) => (0.0, 0.0) )
.cache()
同样需要定义以下三个函数来完成GraphX对PageRank的实现
同样用作Pregel的message
// 第一个函数多了一个返回值delta(newPR-oldPR)
def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
val (oldPR, lastDelta) = attr
val newPR = oldPR + (1.0 - resetProb) * msgSum
(newPR, newPR - oldPR)
}
// 第二个函数同样用于得到一个迭代器,但是多了一个条件判定:如果源顶点的delta值小于tol就清空迭代器即返回空迭代。
def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
if (edge.srcAttr._2 > tol) {
Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
} else {
Iterator.empty
}
} def messageCombiner(a: Double, b: Double): Double = a + b // 每个顶点接受到的初始传递信息值不是0,而是resetProb / (1.0 - resetProb)
val initialMessage = resetProb / (1.0 - resetProb) // 动态执行 Pregel 模型(直至结果最终收敛)
Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)(
vertexProgram, sendMessage, messageCombiner)
.mapVertices((vid, attr) => attr._1)
}
至此第二种(动态)PageRank模型计算结束
1.3:涉及代码
spark-1.0.1\graphx\src\main\scala\org\apache\spark\graphx:
GraphOps.scala
Pregel.scala
应该说明一下:
Pregel相当于图计算的引擎,用于图计算的大框架(对顶点的消息计算、消息发送、消息合并),它是图迭代的执行者。lib中的所有算法模型最后都会调用Pregel。
GraohOps 则相当于一个可以快速调用方法的清单,里面给出了很多类或方法的入口;在此例中的PageRank()、Pregel()方法都是从这启动的。
II:PageRank简例
2.1:代码简介
这段代码是官方GraphX guide 提供的,由于使用的就是spark包中的自带数据,所以用于测试非常简单。另外,这个例子确实非常简短,因为自带数据量非常非常小(但这并不意味我们不可以修改原始数据)!
下面是全部的源码,除开注释可看出代码非常的少,那是因为GraphX为了让开发者方便直接提供了多个算法模型(如上述的PageRank.scala文件),只需代入数据直接调用就行。
2.2:源码解析
// 从特定的边列表文件中读取数据生成图框架
val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
// 用上面的图框架来调用pageRank(动态)算法
//特别注意:静态调用的方法名是staticPageRank(Int)
// vertices将返回顶点属性
val ranks = graph.pageRank(0.0001).vertices
// 将上面得到的ranks(顶点属性)和用户进行关系连接
// 首先也是读取一个包含了用户信息的文件,然后调用了一个map函数,即将文件里的每行数据按 ”,” 切开并返回存储了处理后数据的RDD
val users = sc.textFile("graphx/data/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
// 这里具体实现了将ranks和用户列表一一对应起来
// 从map函数的内容可以看出是按id来进行连接,但返回的结果只含用户名和它的相应rank值
val ranksByUsername = users.join(ranks).map {
case (id, (username, rank)) => (username, rank)
}
// 收集上面RDD里的数据并打印出来
println(ranksByUsername.collect().mkString("\n"))
2.3:输入数据
在spark-1.0.1\graphx\data 目录下有两份数据文件:
followers.txt
Users.txt
这两份数据的数据量非常小,但是作为测试可以更好的分析其算法原理。
2.4:输出结果
㈠动态调用
结果1:
下面变更下参数(tol值)
结果2:
可以看出和上面的结果相差较大,事实上,参数值越小得到的结果越有说服力。
⑵静态调用
结果1:
增加迭代次数:
结果2:
结果依然不够准确,再次增加迭代次数:
结果3:
结果依然不够准确,再次增加迭代次数:
结果4:
可以看出对于该数据,用静态算法很难得到准确的结果
三:问题及改进
I:格式问题
①不同的方法对数据格式有不同的要求,例如edgeListFile 读取边列表文件时,要求数据格式必须为:
vId vId
vId vId
... ...
②数据的提取,对于一行数据,哪些才是我们真正使用和想表现的,需要自定义方法来达成。同时要注意包含相应的标识数据(如VertexID)
③有个问题要特别注意:即两种调用方式的方法名,他们除了参数类型不同,名字也是不同的:
动态调用:pageRank(double) //开头不大写
静态调用:staticPageRank(int) //开头不大写,但P处要大写
II:模型(代码)问题
①PageRank本身是存在多种计算漏洞的,如“黑洞效应”:当一个顶点只有入度而没有出度时将不断的吞噬掉该有向图其他顶点的PR值,最终使得所有顶点的PR值都变成0。
不过上述的计算模型用阻尼系数resetProb解决了这个问题。
②代入参数不同而造成的结果不同,例如静态和动态调用哪种更适合,又或者迭代次数的选择、前后两次迭代的差值限定又该选择多少,这些都是没有固定标准的。
另外,从测试结果可以看出目前静态调用方式(即设置固定迭代次数)的结果是存在问题的,但这个问题究竟是我的使用方式不对,还是数据本身的原因或者是其他因素还有待确认。
III:应用场景
Google的网页排名并非如此单纯的PageRank算法,它考虑的综合因素至少有10点以上。
但该算法仍然可以为排名计算(网页排名、用户排名等)提供其中一个可靠的依据。在目前,我们应该不会去改动模型代码,而是合理的安排需要处理的数据。该算法处理的场景原型很容易看出:有相互联系的事物网中,评选出最受“欢迎”的事物。什么叫受欢迎?——被其他事物选择、依赖、信任、消费等等。
有任何错误,欢迎指出