Spark调优

时间:2022-01-13 16:10:25

因为Spark是内存当中的计算框架,集群中的任何资源都会让它处于瓶颈,CPU、内存、网络带宽。通常,内存足够的情况之下,网络带宽是瓶颈,这时我们就需要进行一些调优,比如用一种序列化的方式来存储RDD来减少内存使用,这边文章就讲两种方式,数据序列化和内存调优,接下来我们会分几个主题来谈论这个调优问题。

1、数据序列化

(1) Spark默认是使用Java的 ObjectOutputStream框架,它支持所有的继承于java.io.Serializable序列化,如果想要进行调优的话,可以通过继承java.io.Externalizable。这种格式比较大,而且速度慢。

(2)Spark还支持这种方式Kryo serialization,它的速度快,而且压缩比高于 Java的序列化,但是它不支持所有的 Serializable格式,并且需要在程序里面注册。它需要在实例化 SparkContext之前进行注册, 下面是它的使用例子:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator class MyRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register(classOf[MyClass1])
kryo.register(classOf[MyClass2])
}
} // Make sure to set these properties *before* creating a SparkContext!
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "mypackage.MyRegistrator")
val sc = new SparkContext(...)

如果对象很大,需要设置这个参数 spark.kryoserializer.buffer.mb,默认是2。

想了解更多关于这个格式的,可以查看这个网址https://github.com/EsotericSoftware/kryo

2、内存调化

这里面需要考虑3点,对象使用的内存、访问这些对象的开销、垃圾回收器的管理开销。

通常,对象访问的速度都很快,但是需要2-5x的空间来存储,因为下面的原因:

1)每一个独立的Java对象,都有一个16字节的“ object header”和关于这个对象的信息,比如指针。

2)Java String类型有40字节的“object header”,然后因为 Unicode,每个字符要存储2个字节,这样10个字符要消耗掉大概60个字节。

3)普通的容器类,比如HashMap和LinkedList,它们采用的是链式的数据结构,它需要封装每个实体,不仅需要头信息,还要有个指针指向下一个实体。

4)原始容器类型通常存储它们为装箱类型,比如java.lang.Integer。

下面我们就来讨论如何确定这些对象的内存开销并且如何进行调优,比如改变数据结构或者序列化存储数据。下面我们讲谈论如何调优Spark的Cache大小以及Java的垃圾回收器。

(1)确定内存使用情况

首先我们要确定内存使用情况,确定数据集的内存使用情况,最好的方法就是创建一个RDD,然后缓存它,然后查看日志,日志会记录出来它的每个分片使用的大小,然后我们可以找个这些分片的大小计算出总大小,如下:

INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)
(2)数据结构调优

1) 优先使用数组和原生类型来替代容器类,或者使用fastutil找个包提供的容器类型,fastutil的官方链接是http://fastutil.di.unimi.it/。

2)避免大量的小对象的嵌套结构。

3)使用数字的ID来表示,而不是使用字符串的ID。

4)如果内存小于32GB,设置JVM参数 -XX:+UseCompressedOops为4个字节而不是8个字节;在Java7或者之后的,尝试使用 -XX:+UseCompressedStrings存储ASCII字符串8个比特一个字符 。这些参数可以添加到spark-env.sh,根据我的观察,应该是设置到 SPARK_JAVA_OPTS这个参数上。

(3)序列化RDD存储

强烈建议使用Kryo进行序列化,这也是降低内存使用最简单的方式。

(4)垃圾回收器调优

当我们只使用一次RDD的时候,不会存在这方面的问题。当java需要清除旧的对象给新的对象腾出空间的时候,它需要遍历所有对象,然后找出那些没有使用的。这里最中要的一点是记住,垃圾回收器的代价是和它里面的对象的数量相关的。查看GC是不是一个问题,第一件事就是使用序列化的缓存方式。

GC还可以出现的问题就是执行任务所需要的内存大小,下面我们讲讨论如何控制分配给RDD缓存的空间大小来减轻这个问题。

1)确定GC的影响

添加这些参数到 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps到SPARK_JAVA_OPTS这个参数,让它出书GC的信息,然后运行任务。

2)缓存大小调优

影响GC的一个重要配置参数是分配给缓存RDD的内存大小,Spark默认是使用 66%的可配置内存大小(通过 spark.executor.memory or SPARK_MEM来配置 )来存储RDD,也即是说,只有33%是给任务执行过程当中执行过程当中创建的对象的。

当你的程序慢下来,你发现GC很频繁或内存不够等现象,降低它的值会起到一些效果,我们可以通过这个参数 System.setProperty("spark.storage.memoryFraction", "0.5")来达到这个效果。

3)高级内存调优

java的堆内存是分为两个区间,Young和Old,Young是用来存储短生命周期的对象,Old是用来存储长生命周期的对象。 Young又可以进一步细分为 [Eden, Survivor1, Survivor2]。 一个简单的垃圾过程可以描述为:当 Eden满的时候,一个简单的GC会运行在Eden和依赖它的对象, Survivor1被复制到Survivor2。 Survivor区域进行了交换。如果一个对象足够老或者Survivor2满了,它就会被移到Old区。当Old区也满的时候,一个完整的GC就会触发。

Spark里面的GC调优目标是确保RDD存储在Old区间,并且Young区有足够的空间去存储那些短生命周期的对象。这样可以减少完全的GC去回收那些任务执行中的临时对象。 下面的的这些步骤可能是有用的:

1)检查 GC的统计信息,查看在任务执行完成之前是不是执行过多次的GC,这意味着内存不足以执行任务。

2)当Old区快满的时候,我们可以通过调整这个参数 spark.storage.memoryFraction来减少缓存使用的内存量,少缓存一点对象比拖慢作业执行更好一些。

3)当发生了很多次小的GC,而不是重要的GC时候,我们可以考虑多分配点内存给 Eden,假设一个任务需要使用E大小的内存,我们可以分配给Eden的内存大小为: -Xmn=4/3*E,这个大小同样适用于 survivor区间 。

4)当从HDFS上读取数据的时候,任务的所需内存可以估计为block的大小,一个反压缩的块是2-3倍的大小,我们考虑用3-4个任务来执行,这样我们可以考虑设置Eden的大小为4*3*64MB。

3、其它的考虑

(1)并行的水平

建议是1个CPU核心2-3个任务,可以通过程序的函数的时候传入 numPartitions 参数,或者通过系统变量 spark.default.parallelism来改变。

(2)Reduce任务的内存使用情况

有时候出现 OutOfMemoryError并不是因为RDD太大内存装不下,而是因为执行Reduce任务执行的 groupByKey的结果太大。Spark的 shuffle操作( sortByKey , groupByKey , reduceByKey , join , etc)它会为每一个任务建立一个hash表来执行grouping操作,简单的处理方式就是增加并行水平,这样每个任务的输入集变小。Spark能够支持每个任务200ms的速度,因为它在所有任务共享了JVMs,减小了发布任务的开销,所有可以安全的增加并行水平超过核心数。

(3)使用broadcast存储大的变量

使用Spark里面的broadcast的变量来存储大的变量可以大大减少每个序列化任务的大小和集群发布任务的开销。大对象的任务都可以考虑使用broadcast变量,Spark在master上会打印每个序列化任务的大小,当大小超过20KB的时候,可以考虑调优。

4、总结

这里简短的指出了我们调优的时候需要注意的一些重要的点,通常我们把序列化方式调整为 Kryo并且缓存方式改为序列化存储方式就可以解决大部分的问题了。