第五十一讲 Spark优化之“钨丝计划”

时间:2021-06-25 06:14:25

第四十八讲  序列化和JVM性能调优

http://blog.sina.com.cn/s/blog_9ca9623b0102w8kp.html

第四十八讲  序列化和JVM性能调优

一:Spark性能调优之序列化

1,  之所以进行序列化,最重要的原因是内存空间有限(减少GC的压力,最大化的避免Full GC的产生,因为一旦产生Full GC则整个Task处于停止状态!!!)减少磁盘IO的压力,减少网络IO的压力。

2,  什么时候会必要的产生序列化和反序列化呢?发生磁盘IO和网络通信的时候会序列化和反序列化,更为重要的考虑序列化和发序列化的时候有另外两种情况:

A)      PersistCheckpoint的时候必须考虑序列化和反序列化,例如说cache到内存的时候,只能使用JVM分配的60%的内存空间,此时好的序列化机制就至关重要。

B)      编程的时候,使用算子的函数操作如果传入了外部数据,就必须序列化和反序列化。

Conf.set(spark.serializer

org.apache.spark.serializer.KryoSerializer);

conf.registryKryoClass(Array(classOf[Person]));

val person = new Person();

rdd.map(item =>person.add(item));//PersonDriver中实例化,而rddExecutor里,需要传输,所以需要序列化和反序列化。

3,  强烈建议使用Kryo序列化器进行序列化和反序列化,Spark默认情况下不是使用Kryo,而是Java自带的序列化器ObjectInputStreamObjectOutputStream。主要是考虑了方便性或者通用性。如果自定义了RDD中数据元素的类型则必须实现Serializable接口,当然也可以实现自己的序列化接口Externalizable来实现更加高效的Java序列化算法,采用默认的ObjectInputStreamObjectOutputStream会导致序列化后的数据占用大量的内存或者磁盘或者大量的消耗网络,并且在序列化和反序列化的时候比较消耗CPU

4,  强烈推荐大家采用Kryo序列化机制。Spark下使用Kryo序列化机制会比Java默认的序列化机制更加节省空间(节省近10倍的空间)以及更少的消耗CPU强烈建议在一切情况下尽可能使用Kryo序列化机制。

5,  使用Kryo的两种方式:

A)      spark-defaults.conf中配置。

B)      在程序的SparkConf中配置。

Conf.set(spark.serializer

org.apache.spark.serializer.KryoSerializer) 

使用Kryo可以更加快速,更低存储空间占用量以及更高性能的方式来进行序列化。

6,  SparkScala常用的类型自动的通过AllScalaRegister注册给了Kryo进行序列化管理。

7,  如果是自定义的类型,需要注册给序列化器,例如:

Conf.set(spark.serializer

org.apache.spark.serializer.KryoSerializer);

conf.registryKryoClass(Array(classOf[Person]));

8,  Kryo在序列化的时候缓存空间默认大小是2MB,可以更具具体的业务模型调整该大小,具体方式

Spark.kryoserializer.buffer 设置为10MB等。

9,使用Kryo强烈建议注册时写完整的包名和类名,否则的话每次序列化的时候都会保存一份整个包名和类名的完整信息,这就会不必要的消耗内存空间。

二:Spark JVM性能调优

1,  好消息是Spark的钨丝计划是用来解决JVM性能问题的,不好的消息是至少在Spark2.0以前钨丝计划功能不稳定且不完善且只能在特定的情况下发生作用,也就是说包括Spark1.6.0在内的Spark及其以前的版本我们大多数情况下没有使用钨丝计划的功能,所以此时就必须关注JVM性能调优。

2,  JVM性能调优的关键是调优GC!!!,为什么GC这么重要,主要是因为Spark热衷于RDD的持久化!!!GC本身的性能开销是和数据量成正比的。

3,  初步可以考虑是尽量多的使用arrayString,并且在序列化机制方面尽可能的采用Kryo,让每个partition都成为字节数组。

4,  监视GC的基本方式有两种:

A)配置

Spark.executor.extraJavaOptions =

-verbose:gc –XX:++PrintGCDetails –XX:+PrintGCDataTimeStramps

         B ) SparkUI

5,  Spark在默认情况下使用60%的空间来进行Cache缓存RDD的内容,也就是说Task在执行的时候只能使用剩下的40%的空间,如果空间不够用,就会触发(频繁的)GC

可以设置spark.memory.fraction参数来进行调整空间的使用。例如降低RDD的缓存空间比例,让Task使用更多的空间来创建对象和完成计算。

再一次,强烈建议进行RDDCache的时候使用Kryo序列化机制。从而给Task可以分配更大的空间来顺利完成计算(避免频繁的GC)。

6,  因为在老年代空间满的时候会发生Full GC操作,而老年代空间中基本都是活的比较久的对象(经历了数次GC,依旧存在)。还有一些不正常的情况,s1s2交换的时候,内存不够,直接进入老年代,然后容易进行Full GC,停下所有程序线程,对老年代进行整理,非常严重的影响性能。发生Full GC的时候,表明Eden区的空间不够大。需要给新生代更多的空间。

         可以考虑设置spark.memory.fraction参数来进行调整空间的使用来给年轻代更多的空间用于存放短时间的存活的对象。

         -Xmn调整Eden区域。对RDD中操作的对象和数据进行大小评估,如果在HDFS上解压后一般体积会变成原有体积的3倍左右的样子,根据数据的大小来设置Eden。如果有10Task,每个Task处理的HDFS上的数据是128MB,则需要设置-Xmn10*128*3*4/3的大小。(新生代大小。通常为 Xmx  1/3  1/4。新生代 = Eden + 2 Survivor 空间。实际可用空间为 = Eden + 1  Survivor,即 90% 

    -XXSupervisorRatio

    -XXNewRatio  调整新生代和老年代的比例。

http://www.th7.cn/Program/java/201409/276272.shtml

-Xms

初始堆大小。如:-Xms256m

-Xmx

最大堆大小。如:-Xmx512m

-Xmn

新生代大小。通常为 Xmx  1/3  1/4。新生代 = Eden + 2  Survivor 空间。实际可用空间为 = Eden + 1  Survivor,即 90% 

-Xss

JDK1.5+ 每个线程堆栈大小为 1M,一般来说如果栈不是很深的话, 1M 是绝对够用了的。

-XX:NewRatio

新生代与老年代的比例,如 –XX:NewRatio=2,则新生代占整个堆空间的1/3,老年代占2/3

-XX:SurvivorRatio

新生代中 Eden  Survivor 的比值。默认值为 8。即 Eden 占新生代空间的8/10,另外两个 Survivor 各占 1/10 

-XX:PermSize

永久代(方法区)的初始大小

-XX:MaxPermSize

永久代(方法区)的最大值

-XX:+PrintGCDetails

打印 GC 信息

-XX:+HeapDumpOnOutOfMemoryError

让虚拟机在发生内存溢出时 Dump 出当前的内存堆转储快照,以便分析用



第四十九讲  Spark性能优化 数据本地性和RDD自定义

http://blog.sina.com.cn/s/blog_9ca9623b0102w8pc.html

第四十九讲  Spark性能优化 数据本地性和RDD自定义

一:数据本地性

1,  数据本地性对分布式系统的性能而言是一件最为重要的事情(之一),程序运行本身包含代码和数据两部分,单机版本一般情况下很少考虑数据本地性的问题(因为数据在本地)。但是对于单机版本的程序,由于数据本地性有PROCESS_LOCALNODE_LOCAL之分,所以我们还是尽量的让数据处于PROCESS_LOCALSpark作为分布式系统更加注意数据本地性,在Spark中数据本地性分为PROCESS_LOCALNODE_LOCALNO_PREFRACK_LOCALANY(数据可能在任何地方,包括在其他网络环境中,例如说百度云中,数据和计算集群不在同样的集群中,此时就是ANY的一种表现);

2,  对于ANY的情况,默认情况状态下性能会非常低下,此时强烈建议使用Tachyon,例如在百度云上,为了确保计算速度,就在计算集群和存储集群之间加入了Tachyon,通过Tachyon来从远程抓取数据,而Spark基于Tachyon来进行计算,这就更好的满足了数据本地性。

3,  如果数据是PROCESS_LOCAL,但是此时并没有空闲的Core来运行我们的Task,此时Task就要等待,例如等待3000ms3000ms内如果能够运行待运行的Task则直接运行,如果超过了3000ms,此时数据本地性就要退而求其次采用NODE_LOCAL的方式。同样的道理,NODE_LOCAL也会有等待的超时时间,以此类推

4,  如何配置Locality呢?可以统一采用spark.locality.wait来设置(例如设置5000ms)。当然可以分别设置spark.locality.wait.processspark.locality.wait.nodespark.locality.wait.rack等;一般的具体设置是Locality优先级越高则可以设置越高的等待超时时间。

二:RDD自定义(以Spark on HBase为例)

1,  第一步是定义RDD.getPartitions的实现:

A)      createRelation具体确定HBase的链接方式和具体访问的表;

B)      然后通过HBaseAPI来获取RegionList

C)      可以过滤出有效的数据;

D)      最后返回RegionArray[Partition];也就是说一个Partition处理一个Region的数据,为更佳数据本地性打下基础。

2,  第二步是RDD.getPreferedLocations

A)根据Split包含的Region信息来确定Region具体在什么节点上,这样Task在调度的时候就可以优先被调度到Region所在的机器上,最大化的提高数据本地性。

3,  第三步是RDD.compute

A)根据split中的Region等信息调用HBaseAPI来进行操作(主要是查询)




第五十讲  Spark性能优化 Shuffle性能优化

http://blog.sina.com.cn/s/blog_9ca9623b0102w8qi.html

第五十讲  Spark性能优化 Shuffle性能优化

一:Shuffle性能调优

1,  问题:Shuffle output file lost? 真正的最重要的原因是GC导致的!!!下一个Stage向上一个Stage要数据,需要消耗CPU 导致GCFull GC的时候,线程不再工作,向上一个Stage的线程请求数据,就请求不到,请求不到的时候就会重试。

第五十一讲  Spark优化之“钨丝计划”

2,  如果GC尤其是Full GC产生通常会导致线程停止工作,这个时候下一个StageTask在默认情况下就会尝试重试来获取数据,一般重试3次,每次重试的时间为5s,也就是说默认情况下,如果15s内还是无法抓取数据的话,就会 Shuffle output file lost等情况,进而会导致Task重试,甚至会导致Stage重试,最严重的是会导致Application失败,在这个时候首先就要采用高效的内存数据结构和序列化机制,JVM的调优来减少Full GC的产生。

3,  Shuffle的时候,Reducer端获取数据会有一个指定大小的缓存空间,如果内存足够大的情况下,可以适当的增大缓存空间,否则会spill到磁盘上影响效率。

此时可以增大spark.reducer.maxSizeInFlight参数,可以调到128MB

4,  ShuffleMapTask端通常也会增大Map任务的写磁盘的缓存,默认情况下是32K

spark.shuffle.file.buffer 32k

5,  调整获取Shuffle中读取数据的重试次数,默认是3次,通常建议增大重试次数。

6,  调整获取Shuffle读取数据重试的时间间隔,默认为5s,强烈建议提高该时间。

spark.shuffle.io.retryWait 5s

7,  reducer端做Aggregation的时候,默认是20%的内存用来做Aggregation,如果超出了这个大小,就会溢出到磁盘上,建议调大百分比来提高性能。



第五十一讲  Spark优化之“钨丝计划”

http://blog.sina.com.cn/s/blog_9ca9623b0102w90u.html


第五十一讲  Spark优化之“钨丝计划”

1,“钨丝计划”产生背景

2,“钨丝计划”内幕详解

3,“钨丝计划”下的Shuffle

 

一:“钨丝计划”产生的本质原因

1,  Spark作为一个一体化多元化的数据处理通用平台,性能一直是其根本性的追求之一,Spark基于内存迭代(部分基于磁盘迭代)的模型极大的满足了人们对分布式系统处理性能的渴望。但是由于Spark是采用Scala+Java语言编写的,所以运行在了JVM平台,淡然JVM是一个绝对伟大的平台,因为JVM让整个离散的主机融为了一体(网络即OS),但是JVM的死穴GC反过来限制了Spark(也就是说平台限制了Spark)。所以Tungsten聚焦于CPUMemory的使用,以期望达到对分布式硬件潜能的终极压榨!

2,  Memory的使用,Tungsten使用了Off-Heap,也就是在JVM之外的内存空间(这就好像C语言对内存的分配,使用和销毁)。C语言和C++可以分配堆外内存。此时Spark实现了自己的独立的内存管理,就避免了JVMGC引发的性能问题。其实还包含避免序列化和发序列化。OS的角度就不需要考虑序列化和发序列化。

3,  对于Memory管理方面一个至关重要的内容CacheTungsten提出了Cache-awarecomputation,也就是说使用对缓存友好的算法和数据结构来完成数据的存储和复用。

4,  对于CPU而言,Tungsten提出了Code Generation,其首先在Spark SQL使用,通过Tungsten要把该功能普及到Spark的所有功能中;

总结:Tungsten的内存管理机制独立于JVM,所以Spark操作数据的时候具体操作的是BinaryData,而不是JVM Object!!!JavaScala同样能操作BinaryData,而且还免去了序列化和发序列化的过程。

 

二:“钨丝计划”内幕讲解

1,  内存管理方面:Off-Heap,在JVM外面分配内存,不受JVM管理,所以就没有GC的问题。Spark使用了sun.misc.Unsafe来进行Off-heap级别的内存分配,指针使用及内存释放。Spark为了统一管理Off-HeapOn-Heap而提出了Page。如果是在Off-Heap,指针的形式,如果是On-Heap的形式,不仅有指针的形式,还有JVMJava Object。首先找到哪个Page,再找偏移量,找到具体的物理地址。