"GC overhead limit exceeded" when caching a large dataset into Spark memory (via sparklyr and RStudio)

Posted: 2020-12-28 04:49:35

I am very new to the Big Data technologies I am attempting to work with, but have so far managed to set up sparklyr in RStudio to connect to a standalone Spark cluster. Data is stored in Cassandra, and I can successfully bring large datasets into Spark memory (cache) to run further analysis on them.

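For context, a minimal sketch of the kind of connection and caching call I am using (the master URL, keyspace, and table names are placeholders, and the config file is the yml shown further down):

library(sparklyr)
library(dplyr)

# Connect to the standalone cluster, picking up settings from the yml config file
sc <- spark_connect(master = "spark://<master_ip>:7077",
                    config = spark_config(file = "config.yml"))

# Register the Cassandra table in Spark and pull it into Spark memory (cache)
my_tbl <- spark_read_source(
  sc,
  name    = "my_table",                                         # placeholder name
  source  = "org.apache.spark.sql.cassandra",
  options = list(keyspace = "my_keyspace", table = "my_table"),
  memory  = TRUE                                                # cache into Spark memory
)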

However, recently I have been having a lot of trouble bringing in one particularly large dataset into Spark memory, even though the cluster should have more than enough resources (60 cores, 200GB RAM) to handle a dataset of its size.


I thought that by limiting the data being cached to just a few select columns of interest I could overcome the issue (using the answer code from my previous question here), but it does not. What happens is that the jar process on my local machine ramps up and takes over all the local RAM and CPU resources, the whole process freezes, and on the cluster executors keep getting dropped and re-added. Weirdly, this happens even when I select only 1 row for caching (which should make this dataset much smaller than other datasets that I have had no problem caching into Spark memory).

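For reference, the column-limiting approach looks roughly like this (a sketch only; the column names are placeholders and this only approximates the answer code from my earlier question):

# Register the Cassandra table without caching it yet
my_tbl_lazy <- spark_read_source(
  sc,
  name    = "my_table_lazy",
  source  = "org.apache.spark.sql.cassandra",
  options = list(keyspace = "my_keyspace", table = "my_table"),
  memory  = FALSE                        # register only, don't cache yet
)

# Keep only the columns of interest, then cache the reduced table
my_tbl_small <- my_tbl_lazy %>%
  select(col_a, col_b, col_c) %>%        # placeholder column names
  compute("my_table_small")              # materialises the result in Spark memory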

I've had a look through the logs, and these seem to be the only informative errors/warnings early on in the process:


17/03/06 11:40:27 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 33813 because its task set is gone (this is likely the result of receiving duplicate task finished status updates) or its executor has been marked as failed.
17/03/06 11:40:27 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 8167), so marking it as still running
...
17/03/06 11:46:59 WARN TaskSetManager: Lost task 3927.3 in stage 0.0 (TID 54882, 213.248.241.186, executor 100): ExecutorLostFailure (executor 100 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 167626 ms
17/03/06 11:46:59 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 3863), so marking it as still running
17/03/06 11:46:59 WARN TaskSetManager: Lost task 4300.3 in stage 0.0 (TID 54667, 213.248.241.186, executor 100): ExecutorLostFailure (executor 100 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 167626 ms
17/03/06 11:46:59 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 14069), so marking it as still running

And then after 20 minutes or so the whole job crashes with:


java.lang.OutOfMemoryError: GC overhead limit exceeded

I've changed my connection config to increase the heartbeat interval (spark.executor.heartbeatInterval: '180s'), and have seen how to increase memoryOverhead on a YARN cluster (using spark.yarn.executor.memoryOverhead), but I haven't found an equivalent for a standalone cluster.


In my config file, I have experimented by adding each of the following settings one at a time (none of which have worked):


spark.memory.fraction: 0.3
spark.executor.extraJavaOptions: '-Xmx24g'
spark.driver.memory: "64G"
spark.driver.extraJavaOptions: '-XX:MaxHeapSize=1024m'
spark.driver.extraJavaOptions: '-XX:+UseG1GC'

UPDATE: my full current yml config file is as follows:


default:
# local settings
  sparklyr.sanitize.column.names: TRUE
  sparklyr.cores.local: 3
  sparklyr.shell.driver-memory: "8G"

# remote core/memory settings
  spark.executor.memory: "32G"
  spark.executor.cores: 5
  spark.executor.heartbeatInterval: '180s'
  spark.ext.h2o.nthreads: 10
  spark.cores.max: 30
  spark.memory.storageFraction: 0.6
  spark.memory.fraction: 0.3
  spark.network.timeout: 300
  spark.driver.extraJavaOptions: '-XX:+UseG1GC'

# other configs for spark
  spark.serializer: org.apache.spark.serializer.KryoSerializer
  spark.executor.extraClassPath: /var/lib/cassandra/jar/guava-18.0.jar

# cassandra settings
  spark.cassandra.connection.host: <cassandra_ip>
  spark.cassandra.auth.username: <cassandra_login>
  spark.cassandra.auth.password: <cassandra_pass>
  spark.cassandra.connection.keep_alive_ms: 60000

# spark packages to load
  sparklyr.defaultPackages: 
  - "com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M1"
  - "com.databricks:spark-csv_2.11:1.3.0"
  - "com.datastax.cassandra:cassandra-driver-core:3.0.2"
  - "com.amazonaws:aws-java-sdk-pom:1.10.34"

So my questions are:


  1. Does anyone have any ideas about what to do in this instance?

  2. Are there config settings I can change to help with this issue?

  3. Alternatively, is there a way to import the Cassandra data in batches with RStudio/sparklyr as the driver?

  4. Or, alternatively again, is there a way to munge/filter/edit the data as it is brought into cache, so that the resulting table is smaller (similar to using an SQL query, but with more complex dplyr syntax)? A rough sketch of what I mean is shown below.
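To illustrate what I mean in (4), reusing the lazily registered table from the sketch above, something along these lines, where the filter runs before the table is materialised in Spark memory (the column and value are hypothetical):

my_tbl_filtered <- my_tbl_lazy %>%
  filter(event_date >= "2017-01-01") %>%   # hypothetical filter condition
  select(col_a, col_b, col_c) %>%          # placeholder column names
  compute("my_table_filtered")             # cache only the filtered result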

1 Answer

#1


OK, I've finally managed to make this work!


I'd initially tried @user6910411's suggestion to decrease the Cassandra input split size, but this failed in the same way. After playing around with LOTS of other things, today I tried changing that setting in the opposite direction:


spark.cassandra.input.split.size_in_mb: 254 

By INCREASING the split size, there were fewer spark tasks, and thus less overhead and fewer calls to the GC. It worked!

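For anyone who prefers to set this from R rather than in the yml file, a sketch of applying it through spark_config() before connecting (the master URL is a placeholder):

conf <- spark_config(file = "config.yml")
conf$spark.cassandra.input.split.size_in_mb <- 254   # larger splits -> fewer tasks, less GC pressure
sc <- spark_connect(master = "spark://<master_ip>:7077", config = conf)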
