I'm currently trying to bulk load data into HBase from Spark and am primarily working with the following examples:
http://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase-using-spark/
http://zeyuanxy.github.io/hbase_bulk_loading/
However, my aggregation of the data at the start is a fair bit more complex.
The source files are about 40 GB of Avro records with a large number of fields (more than 200), many of which may be null. The whole job runs through, but during saveAsNewAPIHadoopFile containers start getting killed for exceeding memory limits. I have tried higher numbers of partitions (up to 4000), but containers still fail; the same happens when I give the executors more memory (4 GB each). I also get very high GC times, which in turn makes the whole thing awfully slow.
Here are some questions:
Does anyone know how I could further profile the job to find out exactly why the executors need so much memory? Or what I could do to mitigate it?
Do I need to execute an action before calling saveAsNewAPIHadoopFile to narrow down the problem and maybe avoid needless redistribution of data (part of my workflow is a repartitionAndSortWithinPartitions)?
1 Answer
First of all, you could try tuning spark.yarn.executor.memoryOverhead and the "memory fraction"-related settings.
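To make the overhead setting more concrete, here is a rough sketch of how the YARN container request follows from the executor heap size. It assumes the defaults documented for Spark on YARN in the 1.4 to 2.x line (overhead of 10% of executor memory, with a 384 MiB floor); the function names are made up for illustration, so check the factor against the documentation for your Spark version.

```python
# Sketch: how large a YARN container Spark asks for per executor.
# Assumption: default overhead = max(10% of executor memory, 384 MiB),
# as documented for spark.yarn.executor.memoryOverhead in Spark 1.4-2.x.

OVERHEAD_FACTOR = 0.10
OVERHEAD_MIN_MIB = 384


def default_overhead_mib(executor_memory_mib):
    """Overhead used when spark.yarn.executor.memoryOverhead is not set."""
    return max(int(OVERHEAD_FACTOR * executor_memory_mib), OVERHEAD_MIN_MIB)


def container_request_mib(executor_memory_mib, overhead_mib=None):
    """Heap plus overhead: the amount YARN enforces as the container limit."""
    if overhead_mib is None:
        overhead_mib = default_overhead_mib(executor_memory_mib)
    return executor_memory_mib + overhead_mib


heap_mib = 4096  # the 4 GB executors mentioned in the question
print("default overhead (MiB):", default_overhead_mib(heap_mib))
print("container request with default overhead (MiB):", container_request_mib(heap_mib))
print("container request with 1024 MiB overhead (MiB):", container_request_mib(heap_mib, 1024))
```

Anything the executor allocates outside the JVM heap (direct buffers, thread stacks, native libraries) has to fit into that overhead, so raising the heap alone does not necessarily stop YARN from killing containers. YARN may also round the request up to its own allocation increment.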
Regarding profiling, there are a few options, depending on how close you can get to the actual nodes, their JVMs, and their logs:
- If possible, enable JMX on the executor JVMs and connect to any of them with a tool like VisualVM to see the actual stats.
- If your access is limited, you can take or request memory dumps from an executor JVM.
- As a last resort, enable memory profiling via spark.executor.extraJavaOptions and adjust it with the following options (check whether they are suitable for the GC of your choice):
-XX:+UnlockDiagnosticVMOptions -XX:+PrintGCDetails -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+G1SummarizeConcMark
This gives you diagnostic output in your executor logs.
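As a sketch of how these options could be passed to a job, the snippet below assembles the `--conf` arguments for `spark-submit`. The helper names, the optional heap-dump flags, and the `/tmp/dumps` path are illustrative assumptions, not part of the original answer. The GC flags are the JDK 8 style ones listed above; to my knowledge several of them were removed in JDK 9 and later, so a newer JVM is likely to reject them.

```python
import shlex

# GC diagnostic flags from the answer above (JDK 8 era, G1-oriented).
GC_DIAGNOSTIC_FLAGS = [
    "-XX:+UnlockDiagnosticVMOptions",
    "-XX:+PrintGCDetails",
    "-XX:+PrintFlagsFinal",
    "-XX:+PrintReferenceGC",
    "-XX:+PrintGCTimeStamps",
    "-XX:+PrintAdaptiveSizePolicy",
    "-XX:+G1SummarizeConcMark",
]


def executor_java_options(flags, heap_dump_dir=None):
    """Join JVM flags into the single string Spark expects.

    heap_dump_dir is an optional, hypothetical addition: it only produces
    a dump if the JVM itself throws OutOfMemoryError.
    """
    opts = list(flags)
    if heap_dump_dir is not None:
        opts += [
            "-XX:+HeapDumpOnOutOfMemoryError",
            "-XX:HeapDumpPath=" + heap_dump_dir,
        ]
    return " ".join(opts)


def spark_submit_conf_args(java_options, overhead_mib=None):
    """Build the --conf arguments for spark-submit."""
    args = ["--conf", "spark.executor.extraJavaOptions=" + java_options]
    if overhead_mib is not None:
        args += ["--conf", "spark.yarn.executor.memoryOverhead=%d" % overhead_mib]
    return args


args = spark_submit_conf_args(
    executor_java_options(GC_DIAGNOSTIC_FLAGS), overhead_mib=1024
)
# Print a shell-safe fragment to paste after `spark-submit`.
print(" ".join(shlex.quote(a) for a in args))
```

Keeping all flags in one `spark.executor.extraJavaOptions` value matters, because a second `--conf` with the same key would override the first rather than append to it.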