算子调优
目录
-
算子调优
* map与mapPartitions
* filter过后使用coalesce
* foreachPartition优化写数据库
* repartition解决Spark SQL并行度过低
* reduceByKey的Map端本地聚合
map与mapPartitions
- 问题
如果是普通的map,比如一个partition中有1万条数据;那么你的function要执行和计算1万次。使用MapPartitions操作之后,一个task仅仅会执行一次function,function一次接收所有的partition数据。只要执行一次就可以了,性能比较高。
缺点也很显而易见,就是一个partition中数据量太大会导致OOM。
- 调优
所以说在数据量不是很大的时候可以用这个。
filter过后使用coalesce
- 问题
1、每个partition数据量变少了,但是在后面进行处理的时候,还是要跟partition数量一样数量的task,来进行处理;有点浪费task计算资源。
2、每个partition的数据量不一样,会导致后面的每个task处理每个partition的时候,每个task要处理的数据量就不同,这个时候很容易发生数据倾斜
- 调优
coalesce算子主要就是用于在filter操作之后,针对每个partition的数据量各不相同的情况,来压缩partition的数量。减少partition的数量,而且让每个partition的数据量都尽量均匀紧凑。
foreachPartition优化写数据库
- 问题
foreach会对每条数据都创建一个数据库连接,很消耗性能
- 调优
使用foreachPartition,只会对一个partition中的数据创建一个连接
repartition解决Spark SQL并行度过低
- 问题
在调节并行度中介绍了并行度应该怎么设置,如果没有使用Spark SQL,那么整个spark application默认所有stage的并行度都是你设置的那个参数(除非使用coalesce算子缩减过partition数量)。
如果用了Spark SQL,那个stage的并行度,你没法自己指定。Spark SQL自己会默认根据hive表对应的hdfs文件的block,自动设置Spark SQL查询所在的那个stage的并行度。通过spark.default.parallelism参数指定的并行度,只会在没有Spark SQL的stage中生效。
- 调优
可以将你用Spark SQL查询出来的RDD,使用repartition算子,去重新进行分区,此时可以分区成多个partition,比如从20个partition,分区成100个。
reduceByKey的Map端本地聚合
reduceByKey,相较于普通的shuffle操作(比如groupByKey),它的一个特点,会进行map端的本地聚合。下一个stage,拉取数据的量,也就变少了。减少网络的数据传输的性能消耗。
要实现类似于wordcount程序一样的,对每个key对应的值,进行某种数据公式或者算法的计算(累加、类乘)就可以使用reduceByKey。
参考资料
《北风网Spark项目实战》
github: https://github.com/yangtong123/StudySpark