I have two RDDs, each in the format of
(String, HashMap[Long, Object])
I want to perform a join operation on them in Scala such that the HashMaps for the same key get merged.
RDD1 -> {string1, HashMap[{long a, object}, {long b, object}]}
RDD2 -> {string1, HashMap[{long c, object}]}
After joining the two RDDs, the result should look like:
RDD -> {string1, HashMap[{long a, object}, {long b, object}, {long c, object}]}
Any help would be appreciated; I am also fairly new to Scala and Spark.
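For concreteness, a minimal sketch of the setup in the spark-shell (sc is the shell's SparkContext; placeholder string values stand in for the objects):

import scala.collection.mutable.HashMap

// Two pair RDDs of (String, HashMap[Long, String]); the values are
// placeholders for the question's objects.
val rdd1 = sc.parallelize(Seq(("string1", HashMap(1L -> "a", 2L -> "b"))))
val rdd2 = sc.parallelize(Seq(("string1", HashMap(3L -> "c"))))

// Desired result after merging by key:
// ("string1", HashMap(1L -> "a", 2L -> "b", 3L -> "c"))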
2 Answers
#1
You can do this by joining the two RDDs and applying a merge function to the resulting tuples of maps:
def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] Return an RDD containing all pairs of elements with matching keys in this and other. Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in this and (k, v2) is in other. Performs a hash join across the cluster.
def mapValues[U](f: (V) ⇒ U): RDD[(K, U)] Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD's partitioning.
Assume there is a merge function like the one discussed in Best way to merge two maps and sum the values of same key?
def merge[K](a: K, b: K): K = ???
which could look like:
def merge[K, V](a: Map[K, V], b: Map[K, V]): Map[K, V] = a ++ b
Given that, the RDDs can be joined first:
val joined = RDD1.join(RDD2)
and then mapped:
val mapped = joined.mapValues(v => merge(v._1, v._2))
The result is an RDD of (key, merged Map) pairs.
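Putting it together, a minimal end-to-end sketch, assuming immutable Map values and placeholder sample data (sc is a live SparkContext):

def merge[K, V](a: Map[K, V], b: Map[K, V]): Map[K, V] = a ++ b

val RDD1 = sc.parallelize(Seq(("string1", Map(1L -> "a", 2L -> "b"))))
val RDD2 = sc.parallelize(Seq(("string1", Map(3L -> "c"))))

// join pairs up the values of matching keys: RDD[(String, (Map, Map))]
val joined = RDD1.join(RDD2)
// mapValues merges each pair of Maps without changing the keys
val mapped = joined.mapValues(v => merge(v._1, v._2))

mapped.collect.foreach(println)
// (string1,Map(1 -> a, 2 -> b, 3 -> c))

Note that a ++ b keeps b's value when both Maps contain the same Long key; if such values need to be combined instead, merge is the place to do it.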
#2
Update: a simpler way is just to take the union and then reduce by key:
(rdd1 union rdd2).reduceByKey(_++_)
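For example, with the sample rdd1 and rdd2 defined further below, this one-liner gives the same merged result (a sketch; union concatenates the two RDDs, and reduceByKey then merges the Maps that share a key):

val merged = (rdd1 union rdd2).reduceByKey(_ ++ _)
merged foreach println
// > (a,Map(1 -> one, 2 -> two, 3 -> three))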
Older solution, just for reference. This can also be done with cogroup, which collects values for keys present in one or both RDDs (whereas join will omit values whose key appears in only one of the original RDDs). See the ScalaDoc.
We then concatenate the lists of values using ++ to form a single list of values, and finally reduce the values (Maps) to a single Map.
The last two steps can be combined into a single mapValues operation:
Using this data...
val rdd1 = sc.parallelize(List("a"->Map(1->"one", 2->"two")))
val rdd2 = sc.parallelize(List("a"->Map(3->"three")))
...in the spark shell:
val x = (rdd1 cogroup rdd2).mapValues{ case (a,b) => (a ++ b).reduce(_++_)}
x foreach println
> (a,Map(1 -> one, 2 -> two, 3 -> three))
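The practical difference between join and the union/cogroup approaches shows up when a key exists in only one RDD. A small sketch with a hypothetical extra key "b" (output order may vary):

val left  = sc.parallelize(List("a" -> Map(1 -> "one"), "b" -> Map(2 -> "two")))
val right = sc.parallelize(List("a" -> Map(3 -> "three")))

// join is an inner join, so the unmatched key "b" is dropped:
(left join right).collect
// Array((a,(Map(1 -> one),Map(3 -> three))))

// union + reduceByKey keeps it:
(left union right).reduceByKey(_ ++ _).collect
// Array((a,Map(1 -> one, 3 -> three)), (b,Map(2 -> two)))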