如何跨分区平衡数据?

时间:2021-04-09 13:46:53

Edit: The answer helps, but I described my solution in: memoryOverhead issue in Spark.

编辑:这个答案很有帮助,但是我在Spark中描述了我的解决方案:内存开销问题。


I have an RDD with 202092 partitions, which reads a dataset created by others. I can manually see that the data is not balanced across the partitions, for example some of them have 0 images and other have 4k, while the mean lies at 432. When processing the data, I got this error:

我有一个带有202092个分区的RDD,它读取其他人创建的数据集。我可以手动地看到数据在各个分区之间是不平衡的,例如有些分区有0个图像,另一些分区有4k,而平均值是432。在处理数据时,我得到了这个错误:

Container killed by YARN for exceeding memory limits. 16.9 GB of 16 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

while memoryOverhead is already boosted. I feel that some spikes are happening which make Yarn kill my container, because that spike overflows the specified borders.

内存开销已经增加。我觉得有些尖峰会让纱线杀死我的容器,因为尖峰会超过指定的边界。

So what should I do make sure that my data are (roughly) balanced across partitions?

那么,我应该如何确保我的数据(大致)在各个分区之间保持平衡呢?


My idea was that repartition() would work, it invokes shuffling:

我的想法是repartition()能起作用,它会引发洗牌:

dataset = dataset.repartition(202092)

but I just got the very same error, despite the programming-guide's instructions:

但是我得到了同样的错误,尽管编程指南的说明:

repartition(numPartitions)

重新分区(numPartitions)

Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

随机重组RDD中的数据以创建更多或更少的分区,并在它们之间进行平衡。这总是使网络上的所有数据混乱。


Check my toy example though:

看看我的玩具例子:

data = sc.parallelize([0,1,2], 3).mapPartitions(lambda x: range((x.next() + 1) * 1000))
d = data.glom().collect()
len(d[0])     # 1000
len(d[1])     # 2000
len(d[2])     # 3000
repartitioned_data = data.repartition(3)
re_d = repartitioned_data.glom().collect()
len(re_d[0])  # 1854
len(re_d[1])  # 1754
len(re_d[2])  # 2392
repartitioned_data = data.repartition(6)
re_d = repartitioned_data.glom().collect()
len(re_d[0])  # 422
len(re_d[1])  # 845
len(re_d[2])  # 1643
len(re_d[3])  # 1332
len(re_d[4])  # 1547
len(re_d[5])  # 211
repartitioned_data = data.repartition(12)
re_d = repartitioned_data.glom().collect()
len(re_d[0])  # 132
len(re_d[1])  # 265
len(re_d[2])  # 530
len(re_d[3])  # 1060
len(re_d[4])  # 1025
len(re_d[5])  # 145
len(re_d[6])  # 290
len(re_d[7])  # 580
len(re_d[8])  # 1113
len(re_d[9])  # 272
len(re_d[10]) # 522
len(re_d[11]) # 66

1 个解决方案

#1


4  

The memory overhead limit exceeding issue I think is due to DirectMemory buffers used during fetch. I think it's fixed in 2.0.0. (We had the same issue, but stopped digging much deeper when we found that upgrading to 2.0.0 resolved it. Unfortunately I don't have Spark issue numbers to back me up.)

我认为超过问题的内存开销限制是由于在获取期间使用的DirectMemory缓冲区造成的。固定在2。0。(我们也遇到了同样的问题,但当我们发现升级到2.0.0解决了这个问题时,就不再深入研究了。不幸的是,我没有足够的问题数据来支持我。


The uneven partitions after repartition are surprising. Contrast with https://github.com/apache/spark/blob/v2.0.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L443. Spark even generates random keys in repartition, so it is not done with a hash that could be biased.

重新分区后的不均匀分区令人惊讶。与https://github.com/apache/spark/blob/v2.0.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala L443。Spark甚至会在重新分区中生成随机键,因此不会使用可能有偏差的散列。

I tried your example and get the exact same results with Spark 1.6.2 and Spark 2.0.0. But not from Scala spark-shell:

我试用了您的示例,并得到了与Spark 1.6.2和Spark 2.0.0完全相同的结果。但不是来自Scala spark-shell:

scala> val data = sc.parallelize(1 to 3, 3).mapPartitions { it => (1 to it.next * 1000).iterator }
data: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:24

scala> data.mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res1: Seq[Int] = WrappedArray(1000, 2000, 3000)

scala> data.repartition(3).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res2: Seq[Int] = WrappedArray(1999, 2001, 2000)

scala> data.repartition(6).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res3: Seq[Int] = WrappedArray(999, 1000, 1000, 1000, 1001, 1000)

scala> data.repartition(12).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res4: Seq[Int] = WrappedArray(500, 501, 501, 501, 501, 500, 499, 499, 499, 499, 500, 500)

Such beautiful partitions!

这样美丽的分区!


(Sorry this is not a full answer. I just wanted to share my findings so far.)

对不起,这不是一个完整的答案。我只是想分享我到目前为止的发现。

#1


4  

The memory overhead limit exceeding issue I think is due to DirectMemory buffers used during fetch. I think it's fixed in 2.0.0. (We had the same issue, but stopped digging much deeper when we found that upgrading to 2.0.0 resolved it. Unfortunately I don't have Spark issue numbers to back me up.)

我认为超过问题的内存开销限制是由于在获取期间使用的DirectMemory缓冲区造成的。固定在2。0。(我们也遇到了同样的问题,但当我们发现升级到2.0.0解决了这个问题时,就不再深入研究了。不幸的是,我没有足够的问题数据来支持我。


The uneven partitions after repartition are surprising. Contrast with https://github.com/apache/spark/blob/v2.0.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L443. Spark even generates random keys in repartition, so it is not done with a hash that could be biased.

重新分区后的不均匀分区令人惊讶。与https://github.com/apache/spark/blob/v2.0.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala L443。Spark甚至会在重新分区中生成随机键,因此不会使用可能有偏差的散列。

I tried your example and get the exact same results with Spark 1.6.2 and Spark 2.0.0. But not from Scala spark-shell:

我试用了您的示例,并得到了与Spark 1.6.2和Spark 2.0.0完全相同的结果。但不是来自Scala spark-shell:

scala> val data = sc.parallelize(1 to 3, 3).mapPartitions { it => (1 to it.next * 1000).iterator }
data: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:24

scala> data.mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res1: Seq[Int] = WrappedArray(1000, 2000, 3000)

scala> data.repartition(3).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res2: Seq[Int] = WrappedArray(1999, 2001, 2000)

scala> data.repartition(6).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res3: Seq[Int] = WrappedArray(999, 1000, 1000, 1000, 1001, 1000)

scala> data.repartition(12).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res4: Seq[Int] = WrappedArray(500, 501, 501, 501, 501, 500, 499, 499, 499, 499, 500, 500)

Such beautiful partitions!

这样美丽的分区!


(Sorry this is not a full answer. I just wanted to share my findings so far.)

对不起,这不是一个完整的答案。我只是想分享我到目前为止的发现。