How to convert a Map of (vertexId, edgeId) into GraphX RDDs

Time: 2020-12-06 23:11:19

After parsing the graph from a file, I get a Map whose keys represent vertices (by id) and whose values represent edges (by id). In order to create the edges (Vx -> Vy) we need to join the Map entries on their values (the edge ids). The goal is to create a GraphX graph from this representation.

Here is what I have so far:

tempLHM.foreach(x=>println(x))

(A.L0,A)
(B.L0,B)
(C.L0,C)
(D.L0,D)
(E.L0,E)
(a.L0M1,A)
(b.L0M1,B)
(c.L0M1,n4)
(a.L0M2,n4)
(b.L0M2,D)
(c.L0M2,n5)
(a.L0M3,n5)
(b.L0M3,C)
(c.L0M3,E)

Is there a direct way to map this hashmap to vertex and edge RDD?

tempLHM is a mutable LinkedHashMap[String,String]. In the hashmap above, in the entries (A.L0,A) and (a.L0M1,A), the keys A.L0 and a.L0M1 are vertices that are joined by the common value A (the edge).

Here is what I want to derive:

val vertex: RDD[(vertexId, VertexName)], i.e. ((A.L0).toLong, A.L0), ((a.L0M1).toLong, a.L0M1), etc.

val edge: RDD[((vertexId1, vertexId2), EdgeName)], i.e. (((A.L0).toLong, (a.L0M1).toLong), A)
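To make the pairing step concrete, here is a rough driver-side sketch of what I mean (assuming each edge id appears under exactly two keys; the data below is a small hypothetical subset of tempLHM):

import scala.collection.mutable.LinkedHashMap

// Hypothetical subset of the parsed data, keyed by vertex with the edge id as value.
val tempLHM = LinkedHashMap("A.L0" -> "A", "a.L0M1" -> "A", "B.L0" -> "B", "b.L0M1" -> "B")

// Group entries by their value (the edge id) to recover (vertex1, vertex2, edgeName) triples.
val triples = tempLHM.toList
  .groupBy { case (_, edgeId) => edgeId }
  .collect { case (edgeId, List((v1, _), (v2, _))) => (v1, v2, edgeId) }

triples.foreach(println)   // e.g. (A.L0,a.L0M1,A) and (B.L0,b.L0M1,B)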

1 Answer

#1



Assume you have this structure for your data.

val d = Map("v1" -> "e1", "v2" -> "e1", "v3" -> "e2", "v4" -> "e2")

Two edges here ("v1","v2") and ("v3","v4")

Assume you have a simple graph (not a hyper-graph, in which a single edge can connect more than two nodes). The assumption for this solution is therefore that an edge connects exactly two nodes and that each edge appears just once.

import collection.mutable.{ HashMap, MultiMap, Set }
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.Graph

// a hacky way to go from String to Long since GraphX needs Longs to
// represent vertex IDs. You might want to do something different 
// here to make sure that your IDs are unique.
def str2Long(s: String) = s.##.toLong

val d = Map("v1" -> "e1", "v2" -> "e1", "v3" -> "e2", "v4" -> "e2")

// We use a multi-map to create an inverse map (Edge->Set(Vertices))
val mm = new HashMap[String, Set[String]] with MultiMap[String, String]
d.foreach{ x => mm.addBinding(x._2,x._1) }

val edges = mm.map{ case(k,v) => Edge[String](str2Long(v.head),str2Long(v.last), k) }.toList
val vertices = d.keys.map(x => (str2Long(x), x)).toList

val edgeRdd = sc.parallelize(edges)
val vertexRdd = sc.parallelize(vertices)

val g = Graph(vertexRdd, edgeRdd)

If you print the edges and the vertices you get:

g.vertices.foreach(println)
g.edges.foreach(println)


(3709,v3)
(3707,v1)
(3708,v2)
(3710,v4)
Edge(3709,3710,e2)
Edge(3707,3708,e1)
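
On the uniqueness caveat in the str2Long comment: since String.## can collide, a safer alternative (just a sketch, not what the code above uses) is to assign IDs from an explicit index over the distinct vertex names:

// Sketch: collision-free vertex IDs via an explicit index map (hypothetical idOf helper).
val idOf: Map[String, Long] =
  d.keys.toList.zipWithIndex.map { case (name, i) => (name, i.toLong) }.toMap

// It can then replace str2Long when building the vertices and edges, e.g.:
val verticesSafe = d.keys.map(x => (idOf(x), x)).toList
val edgesSafe = mm.map { case (k, v) => Edge[String](idOf(v.head), idOf(v.last), k) }.toList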

Note: The solution here will only work for data that fit in the memory of a single node. From your question I see that you load the data into a local Map, so the solution above should work for you. If you want to run this on a huge dataset spread over multiple nodes, however, it will not; in that case use the updated solution below.


Updated Solution

This solution is more scalable than the one above. It makes sure that you always stay in the RDD domain, without the need to collect the graph at the driver (for example, above we loaded all the raw data into a Scala Map, which we avoid here). It also covers the case where a common edge ID is shared between different nodes (in a hyper-graph-like way).

Let's assume that the text file has this format:

v1,e1 
v2,e1 
v3,e2
v4,e2

In the code below, we first read the raw data and then transform it into the proper vertex and edge RDDs.

import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.Graph
import org.apache.spark.rdd.RDD

def str2Long(s: String) = s.##.toLong

val rawData: RDD[String] = sc.textFile("...")

val toBeJoined: RDD[(String, String)] 
  = rawData.map(_.split(",")).map{ case Array(x,y) => (y,x) }

Note here that our resulting graph will be bidirectional: If we have edge (v1,v2) we also have edge (v2,v1).

val biDirectionalEdges: RDD[(String, (String, String))] 
  = toBeJoined.join(toBeJoined).filter{ case(e,(v1,v2)) => v1 != v2 }

val edgeRdd = 
  biDirectionalEdges.map{ case(e,v) => Edge[String](str2Long(v._1),str2Long(v._2), e) }
val vertexRdd = 
  toBeJoined.map(_._1).distinct.map(x => (str2Long(x), x))

val g = Graph(vertexRdd, edgeRdd)

// Verify that this is the right graph
g.vertices.take(10).foreach(println)
g.edges.take(10).foreach(println)
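
Since the join above emits both directions of every edge, one small optional step (just a sketch, assuming plain string ordering on vertex names is an acceptable tie-breaker) keeps a single direction per edge:

// Optional: drop the mirrored (v2,v1) copies and keep each edge once.
val oneDirectionalEdges: RDD[(String, (String, String))] =
  biDirectionalEdges.filter { case (_, (v1, v2)) => v1 < v2 }

val edgeRddOneWay =
  oneDirectionalEdges.map { case (e, (v1, v2)) => Edge[String](str2Long(v1), str2Long(v2), e) }

val gOneWay = Graph(vertexRdd, edgeRddOneWay)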
