How to join two HashMaps by a key in Spark RDDs

Time: 2021-08-28 20:48:53

I have two RDDs, each in the format of

{string, HashMap[long, object]}

I want to perform a join operation on them such that the HashMaps for the same key get merged, in Scala.

RDD1 -> {string1, HashMap[{long a, object}, {long b, object}]}
RDD2 -> {string1, HashMap[{long c, object}]}

After joining the two RDDs, the result should look like

RDD -> {string1, HashMap[{long a, object}, {long b, object}, {long c, object}]}

Any help would be appreciated; I am also fairly new to Scala and Spark.
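In concrete Scala terms, the setup might look roughly like this (a sketch only; it assumes a spark-shell session with sc in scope, and uses String values upcast to Object to stand in for the question's object):

import scala.collection.immutable.HashMap
import org.apache.spark.rdd.RDD

// Two RDDs of (String, HashMap[Long, Object]) pairs, as described above
val rdd1: RDD[(String, HashMap[Long, Object])] =
  sc.parallelize(Seq("string1" -> HashMap[Long, Object](1L -> "a", 2L -> "b")))
val rdd2: RDD[(String, HashMap[Long, Object])] =
  sc.parallelize(Seq("string1" -> HashMap[Long, Object](3L -> "c")))

// Desired: one RDD where the maps sharing a key are merged, i.e.
// ("string1", HashMap(1L -> "a", 2L -> "b", 3L -> "c"))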

2 solutions

#1

You can do this by joining the two RDDs and applying a merge function to the resulting tuples of maps:

def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))]
Return an RDD containing all pairs of elements with matching keys in this and other. Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in this and (k, v2) is in other. Performs a hash join across the cluster.

def mapValues[U](f: (V) ⇒ U): RDD[(K, U)]
Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD's partitioning.

Assume there is a merge function like the one discussed in "Best way to merge two maps and sum the values of same key?":

def merge[K](a: K, b: K): K = ???

which could be implemented as

def merge[K, V](a: Map[K, V], b: Map[K, V]): Map[K, V] = a ++ b  // on duplicate keys, values from b win

Given that, the RDDs can be joined first

val joined = RDD1.join(RDD2) 

and then mapped

val mapped = joined.mapValues(v => merge(v._1, v._2))

The result is an RDD of (key, merged Map) pairs.
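Putting the pieces together, here is a minimal end-to-end sketch (untested; it assumes a spark-shell session with sc in scope and uses String values for readability):

import org.apache.spark.rdd.RDD

def merge[K, V](a: Map[K, V], b: Map[K, V]): Map[K, V] = a ++ b

val RDD1: RDD[(String, Map[Long, String])] =
  sc.parallelize(Seq("string1" -> Map(1L -> "a", 2L -> "b")))
val RDD2: RDD[(String, Map[Long, String])] =
  sc.parallelize(Seq("string1" -> Map(3L -> "c")))

// join pairs up the values that share a key: (String, (Map, Map))
val joined = RDD1.join(RDD2)

// mapValues merges each pair of maps without touching the keys
val mapped = joined.mapValues(v => merge(v._1, v._2))

mapped foreach println
// (string1,Map(1 -> a, 2 -> b, 3 -> c))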

#2

Update: a simpler way is just to take the union and then reduce by key:

(rdd1 union rdd2).reduceByKey(_ ++ _)

Older solution, just for reference. This can also be done by cogroup, which collects values for keys in one or both RDDs (whereas join will omit values that only have a key in one of the original RDDs). See the ScalaDoc.

After the cogroup, we concatenate the two collections of values using ++ to form a single collection, and finally reduce the values (Maps) to a single Map.
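For intuition, the intermediate shape cogroup produces looks like this (a sketch, written against the sample data defined just below):

// (rdd1 cogroup rdd2): RDD[(String, (Iterable[Map[Int, String]], Iterable[Map[Int, String]]))]
// For key "a" the value pair is roughly:
//   (Seq(Map(1 -> "one", 2 -> "two")), Seq(Map(3 -> "three")))
// a ++ b concatenates the two Iterables; reduce(_ ++ _) merges the Maps.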

The last two steps can be combined into a single mapValues operation:

Using this data...

val rdd1 = sc.parallelize(List("a"->Map(1->"one", 2->"two")))
val rdd2 = sc.parallelize(List("a"->Map(3->"three")))

...in the spark shell:

val x = (rdd1 cogroup rdd2).mapValues { case (a, b) => (a ++ b).reduce(_ ++ _) }

x foreach println

> (a,Map(1 -> one, 2 -> two, 3 -> three))
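
For comparison, the union/reduceByKey approach from the update, applied to the same rdd1 and rdd2, should print the same result (a sketch, not verified against a cluster):

// union concatenates the two RDDs; reduceByKey then merges all
// Maps that share a key into a single Map
val y = (rdd1 union rdd2).reduceByKey(_ ++ _)

y foreach println
// > (a,Map(1 -> one, 2 -> two, 3 -> three))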
