版权声明:本文为博主原创文章,未经博主允许不得转载。
Spark GraphX是一个分布式图处理框架,Spark GraphX基于Spark平台提供对图计算和图挖掘简洁易用的而丰富多彩的接口,极大的方便了大家对分布式图处理的需求。Spark GraphX由于底层是基于Spark来处理的,所以天然就是一个分布式的图处理系统。图的分布式或者并行处理其实是把这张图拆分成很多的子图,然后我们分别对这些子图进行计算,计算的时候可以分别迭代进行分阶段的计算,即对图进行并行计算。
Spark GraphX基本操作:
- import org.apache.spark.SparkContext
- import org.apache.spark._
- import org.apache.spark.graphx._
- import org.apache.spark.graphx.Graph
- import org.apache.spark.graphx.Edge
- import org.apache.spark.graphx.VertexRDD
- import org.apache.spark.graphx.util.GraphGenerators
- import org.apache.spark.graphx.GraphLoader
- import org.apache.spark.storage.StorageLevel
- import org.apache.spark.rdd.RDD
- object SparkGraphx1 {
- def main(args: Array[String]) {
- val sc = new SparkContext("spark://centos.host1:7077", "Spark Graphx")
- //创建点RDD
- val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array(
- (3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
- (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
- //创建边RDD
- val relationships: RDD[Edge[String]] = sc.parallelize(Array(
- Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
- Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
- //定义一个默认用户,避免有不存在用户的关系
- val defaultUser = ("John Doe", "Missing")
- //构造Graph
- val graph = Graph(users, relationships, defaultUser)
- //点RDD、边RDD过滤
- val fcount1 = graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
- println("postdocs users count: " + fcount1)
- val fcount2 = graph.edges.filter(edge => edge.srcId > edge.dstId).count
- println("srcId > dstId edges count: " + fcount2)
- val fcount3 = graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
- println("srcId > dstId edges count: " + fcount3)
- //Triplets(三元组),包含源点、源点属性、目标点、目标点属性、边属性
- val triplets: RDD[String] = graph.triplets.map(triplet => triplet.srcId + "-" +
- triplet.srcAttr._1 + "-" + triplet.attr + "-" + triplet.dstId + "-" + triplet.dstAttr._1)
- triplets.collect().foreach(println(_))
- //度、入度、出度
- val degrees: VertexRDD[Int] = graph.degrees;
- degrees.collect().foreach(println)
- val inDegrees: VertexRDD[Int] = graph.inDegrees
- inDegrees.collect().foreach(println)
- val outDegrees: VertexRDD[Int] = graph.outDegrees
- outDegrees.collect().foreach(println)
- //构建子图
- val subGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
- subGraph.vertices.collect().foreach(println(_))
- subGraph.triplets.map(triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
- .collect().foreach(println(_))
- //Map操作,根据原图的一些特性得到新图,原图结构是不变的,下面两个逻辑是等价的,但是第一个不会被graphx系统优化
- val newVertices = graph.vertices.map { case (id, attr) => (id, (attr._1 + "-1", attr._2 + "-2")) }
- val newGraph1 = Graph(newVertices, graph.edges)
- val newGraph2 = graph.mapVertices((id, attr) => (id, (attr._1 + "-1", attr._2 + "-2")))
- //构造一个新图,顶点属性是出度
- val inputGraph: Graph[Int, String] =
- graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
- //根据顶点属性为出度的图构造一个新图,依据PageRank算法初始化边与点
- val outputGraph: Graph[Double, Double] =
- inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
- //图的反向操作,新的图形的所有边的方向相反,不修改顶点或边性属性、不改变的边的数目,它可以有效地实现不必要的数据移动或复制
- var rGraph = graph.reverse
- //Mask操作也是根据输入图构造一个新图,达到一个限制制约的效果
- val ccGraph = graph.connectedComponents()
- val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
- val validCCGraph = ccGraph.mask(validGraph)
- //Join操作,原图外连出度点构造一个新图 ,出度为顶点属性
- val degreeGraph2 = graph.outerJoinVertices(outDegrees) { (id, attr, outDegreeOpt) =>
- outDegreeOpt match {
- case Some(outDeg) => outDeg
- case None => 0 //没有出度标识为零
- }
- }
- //缓存。默认情况下,缓存在内存的图会在内存紧张的时候被强制清理,采用的是LRU算法
- graph.cache()
- graph.persist(StorageLevel.MEMORY_ONLY)
- graph.unpersistVertices(true)
- //GraphLoader构建Graph
- var path = "/user/hadoop/data/temp/graph/graph.txt"
- var minEdgePartitions = 1
- var canonicalOrientation = false // if sourceId < destId this value is true
- val graph1 = GraphLoader.edgeListFile(sc, path, canonicalOrientation, minEdgePartitions,
- StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY)
- val verticesCount = graph1.vertices.count
- println(s"verticesCount: $verticesCount")
- graph1.vertices.collect().foreach(println)
- val edgesCount = graph1.edges.count
- println(s"edgesCount: $edgesCount")
- graph1.edges.collect().foreach(println)
- //PageRank
- val pageRankGraph = graph1.pageRank(0.001)
- pageRankGraph.vertices.sortBy(_._2, false).saveAsTextFile("/user/hadoop/data/temp/graph/graph.pr")
- pageRankGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)
- //Connected Components
- val connectedComponentsGraph = graph1.connectedComponents()
- connectedComponentsGraph.vertices.sortBy(_._2, false).saveAsTextFile("/user/hadoop/data/temp/graph/graph.cc")
- connectedComponentsGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)
- //TriangleCount主要用途之一是用于社区发现 保持sourceId小于destId
- val graph2 = GraphLoader.edgeListFile(sc, path, true)
- val triangleCountGraph = graph2.triangleCount()
- triangleCountGraph.vertices.sortBy(_._2, false).saveAsTextFile("/user/hadoop/data/temp/graph/graph.tc")
- triangleCountGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)
- sc.stop()
- }
- }
Spark学习笔记-GraphX-1的更多相关文章
-
Spark学习笔记--Graphx
浅谈Graphx: http://blog.csdn.net/shangwen_/article/details/38645601 Pregel: http://blog.csdn.net/shang ...
-
spark学习笔记总结-spark入门资料精化
Spark学习笔记 Spark简介 spark 可以很容易和yarn结合,直接调用HDFS.Hbase上面的数据,和hadoop结合.配置很容易. spark发展迅猛,框架比hadoop更加灵活实用. ...
-
Spark学习笔记0——简单了解和技术架构
目录 Spark学习笔记0--简单了解和技术架构 什么是Spark 技术架构和软件栈 Spark Core Spark SQL Spark Streaming MLlib GraphX 集群管理器 受 ...
-
Spark学习笔记之SparkRDD
Spark学习笔记之SparkRDD 一. 基本概念 RDD(resilient distributed datasets)弹性分布式数据集. 来自于两方面 ① 内存集合和外部存储系统 ② ...
-
Spark学习笔记2(spark所需环境配置
Spark学习笔记2 配置spark所需环境 1.首先先把本地的maven的压缩包解压到本地文件夹中,安装好本地的maven客户端程序,版本没有什么要求 不需要最新版的maven客户端. 解压完成之后 ...
-
Spark学习笔记3(IDEA编写scala代码并打包上传集群运行)
Spark学习笔记3 IDEA编写scala代码并打包上传集群运行 我们在IDEA上的maven项目已经搭建完成了,现在可以写一个简单的spark代码并且打成jar包 上传至集群,来检验一下我们的sp ...
-
Spark学习笔记3——RDD(下)
目录 Spark学习笔记3--RDD(下) 向Spark传递函数 通过匿名内部类 通过具名类传递 通过带参数的 Java 函数类传递 通过 lambda 表达式传递(仅限于 Java 8 及以上) 常 ...
-
Spark学习笔记2——RDD(上)
目录 Spark学习笔记2--RDD(上) RDD是什么? 例子 创建 RDD 并行化方式 读取外部数据集方式 RDD 操作 转化操作 行动操作 惰性求值 Spark学习笔记2--RDD(上) 笔记摘 ...
-
Spark学习笔记1——第一个Spark程序:单词数统计
Spark学习笔记1--第一个Spark程序:单词数统计 笔记摘抄自 [美] Holden Karau 等著的<Spark快速大数据分析> 添加依赖 通过 Maven 添加 Spark-c ...
随机推荐
-
解决JQuery中datatables设置隐藏显示列多次提交后台刷新数据的问题
此次项目开发过程中用到了Jquery的Datatables插件,无疑他是数据列表展示,解决MVC中同步过程中先走控制器后返回视图,查询数据过程中无法提示等待的弊端, 而且他所提供的各种方法也都有较强的 ...
-
SQL Server存储内幕系列
http://blog.itpub.net/355374/list/1/?cid=75087
-
C++ STL之set常用指令
set,关联容器,元素不允许有重复,数据被组织成一棵红黑树,以牺牲插入和删除元素的效率换来了查找元素的高效率(O(logN)). 1.初始化 set<int> st; 2.begin返回第 ...
-
zookeeper集群配置
zookeeper集群网上demo一大堆,补充一下一些不明白的地方 1 复制2份zookeeper,savle作为备份节点 2.配置zoo.cfg # The number of millisecon ...
-
【集美大学1411_助教博客】团队作业7——Alpha冲刺之事后诸葛亮
写在前面的话 alpha阶段都顺利完成了,大家这次作业完成得都很认真.我觉得通过这些问题,大家既可以回顾自己的alpha阶段,又可以给beta阶段做一些指引.但看了所有组的博客,没有一个组在这些问题之 ...
-
DOM&;JavaScript示例&;练习
以下示例均为html文件,保存至本地就可直接用浏览器打开以查看效果\(^o^)/~ 练习一:设置新闻字体 <!DOCTYPE html PUBLIC "-//W3C//DTD XHTM ...
-
HBase事务
众所周知,ACID是指原子性(Atomicity),一致性(Consistency),隔离性(Isolation)和持久性(Durability). HBase对同一行数据的操作提供ACID保证.HB ...
-
Oracle的nvl函数和nvl2函数
一.基本语法 介绍一下oracle的nvl函数和nvl2函数. nvl函数 nvl函数基本语法为nvl(E1,E2),意思是E1为null就返回E2,不为null就返回E1. nvl2函数 nvl2函 ...
-
【工具相关】Web-Sublime Text2-通过Package Control安装插件
一,Sublime Text2--->Preferences--->Package Control-->输入install---> 下方就会提示“Package Control ...
-
Image Based Lighting In UE3
"IBL"全称为"Image-based Lighint",是一种伪装全局光照的方法.使用该方法可以获得较好的视觉效果并且可以达到实时渲染的目的. 实现的方法之 ...