1 引言
在了解GraphX之前,需要先了解关于通用的分布式图计算框架的两个常见问题:图存储模式和图计算模式。
1.1 图存储模式
巨型图的存储总体上有边分割和点分割两种存储方式。2013年,GraphLab2.0将其存储方式由边分割变为点分割,在性能上取得重大提升,目前基本上被业界广泛接受并使用。
边分割(Edge-Cut):每个顶点都存储一次,但有的边会被打断分到两台机器上。这样做的好处是节省存储空间;坏处是对图进行基于边的计算时,对于一条两个顶点被分到不同机器上的边来说,要跨机器通信传输数据,内网通信流量大。
点分割(Vertex-Cut):每条边只存储一次,都只会出现在一台机器上。邻居多的点会被复制到多台机器上,增加了存储开销,同时会引发数据同步问题。好处是可以大幅减少内网通信量。
虽然两种方法互有利弊,但现在是点分割占上风,各种分布式图计算框架都将自己底层的存储形式变成了点分割。主要原因有以下两个。
磁盘价格下降,存储空间不再是问题,而内网的通信资源没有突破性进展,集群计算时内网带宽是宝贵的,时间比磁盘更珍贵。这点就类似于常见的空间换时间的策略。
在当前的应用场景中,绝大多数网络都是“无尺度网络”,遵循幂律分布,不同点的邻居数量相差非常悬殊。而边分割会使那些多邻居的点所相连的边大多数被分到不同的机器上,这样的数据分布会使得内网带宽更加捉襟见肘,于是边分割存储方式被渐渐抛弃了。
1.2 图计算模式
目前的图计算框架基本上都遵循BSP(Bulk Synchronous Parallell)计算模式。Bulk Synchronous Parallell,即整体同步并行,它将计算分成一系列的超步(superstep)的迭代(iteration)。从纵向上看,它是一个串行模型,而从横向上看,它是一个并行的模型,每两个superstep之间设置一个栅栏(barrier),即整体同步点,确定所有并行的计算都完成后再启动下一轮superstep。
每一个超步(superstep)包含三部分内容:
1.计算compute,每一个processor利用上一个superstep传过来的消息和本地的数据进行本地计算;
2.消息传递,每一个processor计算完毕后,将消息传递个与之关联的其它processors;
3.整体同步点,用于整体同步,确定所有的计算和消息传递都进行完毕后,进入下一个superstep。
1.2.1 Pregel模型——像顶点一样思考
Pregel借鉴MapReduce的思想,采用消息在点之间传递数据的方式,提出了“像顶点一样思考”(Think Like A Vertex)的图计算模式,采用消息在点之间传递数据的方式,让用户无需考虑并行分布式计算的细节,只需要实现一个顶点更新函数,让框架在遍历顶点时进行调用即可。
常见的代码模板如下:
上图简要地描述了Pregel的计算模型:
1.master将图进行分区,然后将一个或多个partition分给worker;
2.worker为每一个partition启动一个线程,该线程轮询partition中的顶点,为每一个active状态的顶点调用compute方法;
3.compute完成后,按照edge的信息将计算结果通过消息传递方式传给其它顶点;
4.完成同步后,重复执行2,3操作,直到没有active状态顶点或者迭代次数到达指定数目。
这个模型虽然简洁,但很容易发现它的缺陷。对于邻居数很多的顶点,它需要处理的消息非常庞大,而且在这个模式下,它们是无法被并发处理的。所以对于符合幂律分布的自然图,这种计算模型下很容易发生假死或者崩溃。
作为第一个通用的大规模图处理系统,pregel已经为分布式图处理迈进了不小的一步,这点不容置疑,但是pregel在一些地方也不尽如人意:
1.在图的划分上,采用的是简单的hash方式,这样固然能够满足负载均衡,但是hash方式并不能根据图的连通特性进行划分,导致超步之间的消息传递开销可能会是影响性能的最大隐患。
2.简单的checkpoint机制只能向后式地将状态恢复到当前S超步的几个超步之前,要到达S还需要重复计算,这其实也浪费了很多时间,因此如何设计checkpoint,使得只需重复计算故障worker的partition的计算节省计算甚至可以通过checkpoint直接到达故障发生前一超步S,也是一个很需要研究的地方。
3.BSP模型本身有其局限性,整体同步并行对于计算快的worker长期等待的问题无法解决。
4.由于pregel目前的计算状态都是常驻内存的,对于规模继续增大的图处理可能会导致内存不足,如何解决尚待研究。
1.2.2 GAS模型——邻居更新模型
相比Pregel模型的消息通信范式,GraphLab的GAS模型更偏向共享内存风格。它允许用户的自定义函数访问当前顶点的整个邻域,可抽象成Gather、Apply和Scatter三个阶段,简称为GAS。相对应,用户需要实现三个独立的函数gather、apply和scatter。常见的代码模板如下所示:
由于gather/scatter函数是以单条边为操作粒度,所以对于一个顶点的众多邻边,可以分别由相应的worker独立调用gather/scatter函数。这一设计主要是为了适应点分割的图存储模式,从而避免Pregel模型会遇到的问题。
1.Gather阶段
工作顶点的边(可能是所有边,也有可能是入边或者出边)从领接顶点和自身收集数据,记为gather_data_i,各个边的数据graphlab会求和,记为sum_data。这一阶段对工作顶点、边都是只读的。
2.Apply阶段
Mirror将gather计算的结果sum_data发送给master顶点,master进行汇总为total。Master利用total和上一步的顶点数据,按照业务需求进行进一步的计算,然后更新master的顶点数据,并同步mirror。Apply阶段中,工作顶点可修改,边不可修改。
3.Scatter阶段
工作顶点更新完成之后,更新边上的数据,并通知对其有依赖的邻结顶点更新状态。这scatter过程中,工作顶点只读,边上数据可写。
在执行模型中,graphlab通过控制三个阶段的读写权限来达到互斥的目的。在gather阶段只读,apply对顶点只写,scatter对边只写。并行计算的同步通过master和mirror来实现,mirror相当于每个顶点对外的一个接口人,将复杂的数据通信抽象成顶点的行为。
下面这个例子说明GraphLab的执行模型:
2 GraphX实现分析
如同Spark本身,每个子模块都有一个核心抽象。GraphX的核心抽象是Resilient Distributed Property Graph,一种点和边都带属性的有向多重图。它扩展了Spark RDD的抽象,有Table和Graph两种视图,而只需要一份物理存储。两种视图都有自己独有的操作符,从而获得了灵活操作和执行效率。
GraphX的底层设计有以下几个关键点。
对Graph视图的所有操作,最终都会转换成其关联的Table视图的RDD操作来完成。这样对一个图的计算,最终在逻辑上,等价于一系列RDD的转换过程。因此,Graph最终具备了RDD的3个关键特性:Immutable、Distributed和Fault-Tolerant。其中最关键的是Immutable(不变性)。逻辑上,所有图的转换和操作都产生了一个新图;物理上,GraphX会有一定程度的不变顶点和边的复用优化,对用户透明。
两种视图底层共用的物理数据,由RDD[Vertex-Partition]和RDD[EdgePartition]这两个RDD组成。点和边实际都不是以表Collection[tuple]的形式存储的,而是由VertexPartition/EdgePartition在内部存储一个带索引结构的分片数据块,以加速不同视图下的遍历速度。不变的索引结构在RDD转换过程中是共用的,降低了计算和存储开销。
图的分布式存储采用点分割模式,而且使用partitionBy方法,由用户指定不同的划分策略(PartitionStrategy)。划分策略会将边分配到各个EdgePartition,顶点Master分配到各个VertexPartition,EdgePartition也会缓存本地边关联点的Ghost副本。划分策略的不同会影响到所需要缓存的Ghost副本数量,以及每个EdgePartition分配的边的均衡程度,需要根据图的结构特征选取最佳策略。目前有EdgePartition2d、EdgePartition1d、RandomVertexCut和CanonicalRandomVertexCut这四种策略。
2.1 GraphX的存储模式
Graphx借鉴PowerGraph,使用的是Vertex-Cut(点分割)方式存储图,用三个RDD存储图数据信息:
VertexTable(id, data):id为Vertex id,data为Edge data
EdgeTable(pid, src, dst, data):pid为Partion id,src为原定点id,dst为目的顶点id
RoutingTable(id, pid):id为Vertex id,pid为Partion id
点分割存储实现如下图所示:
2.2 GraphX的图运算符
如同Spark一样,GraphX的Graph类提供了丰富的图运算符,大致结构如下图所示。可以在官方GraphX Programming Guide中找到每个函数的详细说明,本文仅讲述几个需要注意的方法。
2.2.1 图的cache
每个图是由3个RDD组成,所以会占用更多的内存。相应图的cache、unpersist和checkpoint,更需要注意使用技巧。出于最大限度复用边的理念,GraphX的默认接口只提供了unpersistVertices方法。如果要释放边,调用g.edges.unpersist()方法才行,这给用户带来了一定的不便,但为GraphX的优化提供了便利和空间。参考GraphX的Pregel代码,对一个大图,目前最佳的实践是:
大体之意是根据GraphX中Graph的不变性,对g做操作并赋回给g之后,g已不是原来的g了,而且会在下一轮迭代使用,所以必须cache。另外,必须先用prevG保留住对原来图的引用,并在新图产生后,快速将旧图彻底释放掉。否则,十几轮迭代后,会有内存泄漏问题,很快耗光作业缓存空间。
2.2.2 mrTriplets——邻边聚合
mrTriplets(mapReduceTriplets)是GraphX中最核心的一个接口。Pregel也基于它而来,所以对它的优化能很大程度上影响整个GraphX的性能。mrTriplets运算符的简化定义是:
它的计算过程为:map,应用于每一个Triplet上,生成一个或者多个消息,消息以Triplet关联的两个顶点中的任意一个或两个为目标顶点;reduce,应用于每一个Vertex上,将发送给每一个顶点的消息合并起来。
mrTriplets最后返回的是一个VertexRDD[A],包含每一个顶点聚合之后的消息(类型为A),没有接收到消息的顶点不会包含在返回的VertexRDD中。
在最近的版本中,GraphX针对它进行了一些优化,对于Pregel以及所有上层算法工具包的性能都有重大影响。主要包括以下几点。
· Caching for Iterative mrTriplets & Incremental Updates for Iterative mrTriplets:在很多图分析算法中,不同点的收敛速度变化很大。在迭代后期,只有很少的点会有更新。因此,对于没有更新的点,下一次mrTriplets计算时EdgeRDD无需更新相应点值的本地缓存,大幅降低了通信开销。
· Indexing Active Edges:没有更新的顶点在下一轮迭代时不需要向邻居重新发送消息。因此,mrTriplets遍历边时,如果一条边的邻居点值在上一轮迭代时没有更新,则直接跳过,避免了大量无用的计算和通信。
· Join Elimination:Triplet是由一条边和其两个邻居点组成的三元组,操作Triplet的map函数常常只需访问其两个邻居点值中的一个。例如,在PageRank计算中,一个点值的更新只与其源顶点的值有关,而与其所指向的目的顶点的值无关。那么在mrTriplets计算中,就不需要VertexRDD和EdgeRDD的3-way join,而只需要2-way join。
所有这些优化使GraphX的性能逐渐逼近GraphLab。虽然还有一定差距,但一体化的流水线服务和丰富的编程接口,可以弥补性能的微小差距。
2.2.3 Triplets运算符
Triplets运算符,实际是对VertexTable和EdgeTable进行3-way Join操作,得到一组键值对: ((i; j); (Pv(i); PE(i; j); PV(j)))
关系型查询语句表达如下:
SELECT s.Id, t.Id, s.P, e.P, t.P
FROM edges AS e
JOIN vertices AS s, vertices AS t
ON e.srcId = s.Id AND e.dstId = d.Id