Spark——性能调优——执行模型与分区

时间:2021-05-18 18:25:33

一、序引
    考虑到性能问题,而言Spark基本原理、执行模型、描述数据被shuffle(洗牌),乃是前提条件。
    掌握数据序列化,缓存机制,以及内存管理、垃圾回收,亦十分必须。
二、Spark执行模型
    在大言Spark应用的性能改善之前,十分有必要先了解Spark在集群上分布式执行程序的基础知识。
        →当启动一个Spark应用时,driver进程会随着集群worker节点上的一系列executor进程一起启动。
        →driver负责运行用户的应用程序,当有action被触发时,driver负责管理所需执行的所有工作。
        →同时,executor进程以任务(task)的形式执行实际的工作以及保存结果。
        →问题:这些任务是如何分配给executor?
    对于Spark应用内部触发的每个action,DAG调度器都会创建一个执行计划来完成它。
        →执行计划:将尽可能多的窄依赖(narrow dependency)转换(transformation)装配到各步骤(stage)中。
        →RDD间的窄依赖是指父RDD的每一个分区最多能被一个子RDD的分区使用。
        →当有一些宽依赖需要做shuffle操作时,stage会受限制。
        →当多个子RDD的分区使用同一个父RDD的分区时,RDD间就会产生宽依赖。
        →事实上,stage是一组任务,对数据的不同子集执行相同转换(transformation)。任务调度器将基于可用资源及数据局c部性把这些任务分配给executor。
三、分区
    分区可极大地影响执行计划的创建方式 。因此,会间接影响性能。
    分区(partition),实质是RDD中的数据被切分后形成的片段。
        →当DAG调度器将job转换为stage时,每个分区将被处理成一个task,每个task需要一个CPU核来执行。
        →证明:Spark应用的并行度取决于RDD的分区数。
        →故而,在对Spark应用进行性能调优时,RDD的分区数将会是需要考虑的最重要的事情之一(甚至是首要地位)。
四、控制并行度
    RDD的分区数与创建方式高度相关。
        →从文件创建的RDD都有默认的分区数。如,文件存储在HDFS中,分区数将等于文件块数目(文件块与分区一一对应)。即表明,可通过在HDFS上写文件时的块大小,或者通过配置InputFormat创建的分片(split)的多少,来控制分区数。
        →通过并行化集合来创建RDD。默认分区数由spark.default.parallelism属性决定。此值由集群管理器决定:对于运行在local模式的Spark 1.5.2来说,其值为CPU核的数目;对于细粒度模式的Mesos来说,其值为8;在其它情况下,分区数取2与所有executor上的CPU核总数的最大值。
    最常见的创建RDD的方式,是对已有的RDD进行一些转换操作(transformation)。
        →通常,一个RDD的分区数与它所依赖的RDD的分区数相同。
        →例外,union转换不受此规则的制约。它所创建的RDD其分区数等于父RDD所有分区数的总和。
    另一种会引起数据shuffle的转换操作,即计算RDD的一个分区需要处理父RDD的多个分区的数据。如果不特别指定,默认分区数将是所依赖的RDD的最大分区数。既,此一类操作,皆为宽依赖。
    欲实现一个高效的Spark应用程序,必须设置最优的分区数。
        →假设job生成的任务数少于可c用的CPU数,可能会面临两个性能问题:1)、不能充分发挥整体计算能力;2)、如果分区数量少,单个分区内的数据量将会比分区数量更多时大很多。
        →如果一个分区的数据太大,无法加载到内存,数据将不得不溢写到磁盘,以避免出现out-of-memory异常。但写到磁盘的I/O操作,会带来不可估量的开销。
    所以,为了充分利用集群的计算资源,分区数至少应当等于集群分配给应用程序的CPU数。只不过,分区过大的问题依然没有得到解决。
    尽量避免“分区数过多”的极端。分区数过多,将会发生许多需要发送到worker节点执行的小任务,无疑又增加了任务调度的开销。
    But,启动任务带来的性能损失比数据溢写到磁盘的损失要小很多。
    If,如何确定出一个最佳的分区数,仍是难解之题,需要具体问题具体分析。建议,把每个RDD的分区数量设置为CPU数的2到4倍。
五、分区器
    了解了如何控制RDD的分区数量,接下来,需要了解数据在这些分区上是怎样分布的,将是关键。
    为了让分区中的数据分散到集群上,Spark使用分区器(partitioner)。目前有两种:HashPartitioner、RangePartitioner。
    选择分区器的默认方式是,对这两个参数中的一个进行设置来决定使用何种分区器:
        →1、当任何输入的RDD用到了某一分区器,输出的RDD也会用此分区器来分区。
        →2、否则,在Pair RDD(键值对)情形下,默认使用HashPartitioner。
        →HashPartitioner,基于键(Key)的哈希码(hash code)把值分布到各个分区上。通过计算键的hash code与分区数的模,得到分区索引值。计算中,需要考虑hash code的正负情况。
        →RangePartitioner根据范围对可排序项进行分区。对RDD内容进行取样能决定大致的范围区间。最终的分区数可能小于配置的数。
        →可自定义分区器,但需建立在对领域知识十分丰富的基础上。若只需要简单的,只需对分区器org.apache.spark.Partitioner进行扩展,并现实方法:
            →gcetPartition(key: Any): Int——为特定的键提供分区ID。
            →numPartitions:Int——指定分区器创建的分区数。
            →equals and hashcode——用来将你的分区器同其他分区器进行比较的方法。