在分布式程序中,网络通信的开销是很大的,因此控制数据分布以获得最少的网络传输开销可以极大地提升整体性能.Spark程序可以通过控制RDD分区方式来减少通信开销。Spark中所有的RDD都可以进行分区,系统会根据一个针对键的函数对元素进行分区。虽然Spark不能控制每个键具体划分到哪个节点上,但是可以确保相同的键出现在同一个分区上。RDD的分区原则是分区的个数尽量等于集群中的CPU核心(Core)数目。对于不同的Spark部署模式而言,都可以通过设置spark.default.prallien这个参数值来配置默认的分区数目。一般而言,各种模式下的默认分区数目如下。
(1) Local模式:默认为本地机器的CPU数目,若设置了loca[N],则默认为N。
(2) Standalone或者Yarn模式:在“集群中所有CPU核数总和”和“2”这两者中取较大值作为默认值。
(3) Mesos 模式:默认的分区数是8。
Spark框架为RDD提供了两种分区方式,分别是哈希分区(HashPartitioner)和范围分区(RangePartitioner)。其中,哈希分区是根据哈希值进行分区;范围分区是将一定范围的数据映射到一个分区中。这两种分区方式已经可以满足大多数应用场景的需求。与此同时,Spark也支持自定义分区方式,即通过一个自定义的Partitioner对象来控制RDD的分区,从而进一步减少通信开销。 需要注意的是,RDD的分区函数是针对(Key, Value)类型的RDD,分区函数根据Key对RDD元素进行分区。因此,当需要对一些非(Key,Value)类型的RDD进行自定义分区时,需要先把RDD元素转换为(Key,Value)类型,再通过分区函数进行分区操作。
如果想要实现自定义分区,就需要定义一个类,使得这个自定义的类继承org. apache.spark. Partitioner类,并实现其中的3个方法,具体如下。
(1) def numPartitions:Int:用于返回创建的分区个数。
(2) def getPartition(Key:Any):用于对输人的Key做处理,并返回该Key的分区ID,分区ID的范围是0~ numPartitions 1。
(3) equals (other: Any):用于Spark判断自定义的Partitioner对象和其他的Partitioner 对象是否相同,从而判断两个RDD的分区方式是否相同。其中,equals()方法中的参数other表示其他的Partitioner 对象,该方法的返回值是一个Boolean类型,当返回值为true时表示自定义的Pritioer对象和其他Pritioer对象相同,则两个RDD的分区方式也是相同的;反之,自定义的Pritoner对象和其他Prtitioer对象不相同,则两个RDD的分区方式也不相同。
配套资料链接:https://pan.baidu.com/s/1VZBDzDrcQWsb69muIdiLmg 提取码:r5q4