Spark性能调优--调度与分区优化

时间:2021-05-18 18:25:27

1.小分区合并问题

在用户使用Spark的过程中,常常会使用filter算子进行数据过滤。而频繁的过滤或者过滤掉的数据量过大就会产生问题,造成大量小分区的产生(每个分区数据量小)。由于Spark是每个数据分区都会分配一个任务执行,如果任务过多,则每个任务处理的数据量很小,会造成线程切换开销大,很多任务等待执行,并行度不高的问题,是很不经济的。

例如:

val rdd2 = rdd1.filter(line=>lines.contains("error"
)).filter(line=>line.contains(info).collect();

解决方式:可以采用RDD中重分区的函数进行数据紧缩,减少分区数,将小分区合并变为大分区。通过coalesce函数来减少分区,具体如下。

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit
ord: Ordering[T] = null): RDD[T]

这个函数会返回一个含有numPartitions数量个分区的新RDD,即将整个RDD重分区。以下几个情景请大家注意,当分区由10000重分区到100时,由于前后两个阶段的分区是窄依赖的,所以不会产生Shuffle的操作。但是如果分区数量急剧减少,如极端状况从10000重分区为一个分区时,就会造成一个问题:数据会分布到一个节点上进行计算,完全无法开掘集群并行计算的能力。为了规避这个问题,可以设置shuffle=true。请看源码:

new CoalescedRDD(newShuffledRDD[Int, T, T, (Int,T)]
(mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
numPartitions).values

由于Shuffle可以分隔Stage,这就保证了上一阶段Stage中的上游任务仍是10000个分区在并行计算。如果不加Shuffle,则两个上下游的任务合并为一个Stage计算,这个Stage便会在1个分区状况下进行并行计算。同时还会遇到另一个需求,即当前的每个分区数据量过大,需要将分区数量增加,以利用并行计算能力,这就需要把Shuffle设置为true,然后执行coalesce函数,将分区数增大,在这个过程中,默认使用Hash分区器将数据进行重分区。

def repartition(numPartitions: Int)(implicit ord: Ordering[T]
= null): RDD[T]
rePartition方法会返回一个含有numPartitions个分区的新RDD。
repartition的源码如下。
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null):
RDD[T] = {
coalesce(numPartitions, shuffle = true)
}

reparition本质上就是调用的coalesce方法。因此如果用户不想进行Shuffle,就需用coalese配置重分区,为了方便起见,可以直接用repartition进行重分区。

2.倾斜问题

倾斜(skew)问题是分布式大数据计算中的重要问题,很多优化研究工作都围绕该问题展开。倾斜有数据倾斜和任务倾斜两种情况,数据倾斜导致的结果即为任务倾斜,在个别分区上,任务执行时间过长。当少量任务处理的数据量和其他任务差异过大时,任务进度长时间维持在99%(或100%),此时,任务监控页面中有少量(1个或几个reduce子任务未完成。单一reduce的记录数与平均记录数差异过大,最长时长远大于平均时长,常可能达到3倍甚至更多。

(1)数据倾斜

产生数据倾斜的原因大致有以下几种。

1)key的数据分布不均匀(一般是分区key取得不好或者分区函数设计得不好)。

2)业务数据本身就会产生数据倾斜(像TPC-DS为了模拟真实环境负载特意用有倾斜的数据进行测试)。

3)结构化数据表设计问题。

4)某些SQL语句会产生数据倾斜。

(2)任务倾斜

产生任务倾斜的原因较为隐蔽,一般就是那台机器的正在执行的Executor执行时间过长,因为服务器架构,或JVM,也可能是来自线程池的问题,等等。解决方式:可以通过考虑在其他并行处理方式中间加入聚集运算,以减少倾斜数据量。数据倾斜一般可以通过在业务上将极度不均匀的数据剔除解决。这里其实还有SkewJoin的一种处理方式,将数据分两个阶段处理,倾斜的key数据作为数据源处理,剩下的key的数据再做同样的处理。二者分开做同样的处理。

(3)任务执行速度倾斜

产生原因可能是数据倾斜,也可能是执行任务的机器在架构,OS、JVM各节点配置不同或其他原因。解决方式:设置spark.speculation=true把那些执行时间过长的节点去掉,重新调度分配任务,这个方式和Hadoop MapReduce的speculation是相通的。同时可以配置多长时间来推测执行,spark.speculation.interval用来设置执行间隔进行配置。在源码中默认是配置的100,示例如下。

val SPECULATION_INTERVAL = conf.getLong("spark.speculation.interval", 100)

(4)解决方案

1)增大任务数,减少每个分区数据量:增大任务数,也就是扩大分区量,同时减少单个分区的数据量。

2)对特殊key处理:空值映射为特定Key,然后分发到不同节点,对空值不做处理。

3)广播。

①小数据量表直接广播。

②数据量较大的表可以考虑切分为多个小表,多阶段进行Map Side Join。

4)聚集操作可以Map端聚集部分结果,然后Reduce端合并,减少Reduce端压力。

5)拆分RDD:将倾斜数据与原数据分离,分两个Job进行计算。

3.并行度

在分布式计算的环境下,如果不能正确配置并行度,就不能够充分利用集群的并行计算能力,浪费计算资源。Spark会根据文件的大小,默认配置Map阶段任务数量,也就是分区数量(也可以通过SparkContext.textFile等方法进行配置)。而Reduce的阶段任务数量配置可以有两种方式,下面分别进行介绍。

第一种方式:写函数的过程中通过函数的第二个参数进行配置。

def reduceByKey(func: (V, V) => V, numPartitions: Int):
RDD[(K, V)] = {
reduceByKey(new HashPartitioner(numPartitions), func)
}

第二种方式:通过配置spark.default.parallelism来进行配置。它们的本质原理一致,均是控制Shuffle过程的默认任务数量。下面介绍通过配置spark.default.parallelism来配置默认任务数量(如groupByKey、reduceByKey等操作符需要用到此参数配置任务数),这里的数量选择也是权衡的过程,需要在具体生产环境中调整,Spark官方推荐选择每个CPU Core分配2~3个任务,即cpu corenum*2(或3)数量的并行度。如果并行度太高,任务数太多,就会产生大量的任务启动和切换开销。如果并行度太低,任务数太小,就会无法发挥集群的并行计算能力,任务执行过慢,同时可能会造成内存combine数据过多占用内存,而出现内存溢出(out of memory)的异常。

下面通过源码介绍这个参数是怎样发挥作用的。可以通过分区器的代码看到,分区器函数式决定分区数量和分区方式,因为Spark的任务数量由分区个数决定,一个分区对应一个任务。

object Partitioner {
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner= {
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
for (r <- bySize if r.partitioner.isDefined) {
return r.partitioner.get
}
if (rdd.context.conf.contains("spark.default.parallelism"))
{
new HashPartitioner(rdd.context.defaultParallelism)
} else {
new HashPartitioner(bySize.head.partitions.size)
}
}
}

从RDD的代码中可以看到,默认的分区函数设置了groupBy的分区数量。

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]):
RDD[(K, Iterable[T])] =
groupBy[K](f, defaultPartitioner(this))
reduceByKey中也是通过默认分区器设置分区数量。
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
reduceByKey(defaultPartitioner(self), func)
}

4.DAG调度执行优化

1)同一个Stage中尽量容纳更多的算子,以减少Shuffle的发生。由于Stage中的算子是按照流水线方式执行的,所以更多的Transformation放在一起执行能够减少Shuffle的开销和任务启动和切换的开销。

2)复用已经cache过的数据。可以使用cache和persist函数将数据缓存在内存,其实用户可以按单机的方式理解,存储仍然是多级存储,数据存储在访问快的存储设备中,提高快速存储命中率会提升整个应用程序的性能。