RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上,作用有二:增加并行度和减少通信开销(连接操作),例如下图:
RDD分区原则:
RDD分区的一个原则是使得分区的个数尽量等于集群中的CPU核心(core)数目
对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目,一般而言:
*本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N,local[*]则自动判断
*Apache Mesos:默认的分区数为8
*Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者中取较大值作为默认值
设置分区的个数的方法:
- 创建RDD时手动指定分区个数
- 使用reparititon方法重新设置分区个数
自定义分区方法:
Spark提供了自带的HashPartitioner(哈希分区)与RangePartitioner(区域分区),能够满足大多数应用场景的需求。与此同时,Spark也支持自定义分区方式,即通过提供一个自定义的Partitioner对象来控制RDD的分区方式,从而利用领域知识进一步减少通信开销。
//自定义分区类
class MyPartitioner(numParts:Int) extends Partitioner{
override def getPartition(key: Any): Int = {
key.toString().toInt % 10
}
override def numPartitions: Int = numParts}
object TestPartitioner {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Partitioner").setMaster("local[1]")
val sc = new SparkContext(conf)
//模拟5个分区的数据
val data = sc.parallelize(1 to 10, 5)
//根据数据的尾号变为10个分区,分别写到10个文件中
data.map((_,1)).partitionBy(new MyPartitioner(10)).map(_._1).saveAsTextFile("E:\\tmp\\spark\\testpartitioner")
}
}
那么问题来了:改变分区数会影响集群的效率吗?