spark优化设置

时间:2021-05-14 17:42:44
-》》》配置参数优化
  SparkConf sc = new SparkConf().setAppName("com.sp.test.GroupTop3").setMaster("local")
.set("spark.shuffle.consolidateFiles", "true")//优化1:开启shuffleGroup,避免shuffleMapTask创建过多的bucket文件
//优化2:设置并行度(rdd的同时partition的数量,每个partition都会被一个task执行,那么在不同节点的不同executor中同时执行的task为5)
//如果这个时候cpu core=6的话,那么资源就有一个core浪费了;
//如果cpu core为6的话,那么这个值可以设置成12 ~ 18(spark官方推荐task数量大约是core的3倍左右是比较合适的)这样可以充分的利用cpu资源,因为不知道task什么时间之行结束
.set("spark.default.parallelism", "5")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")//优化3:使用kyro序列化机制,默认的jdk序列化占用内存空间大,并且速度慢
//.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]));//kyro需要设置序列化自定义类型
.set("spark.kryoserializer.buffer.mb", "10")//优化4:如果遇到非常大的java对象需要增加kryoserializer的缓存(默认为:2m,这里设置为10m)
.set("spark.storage.memoryFraction", "0.3")//优化5:jvm的内存控制,让RDD partition cache 所占用的内存数量仅仅站用20%,更多内存留给task执行时的需要
//shuffle级别的优化
.set("spark.shuffle.file.buffer", "128k")//优化6:将数据写入磁盘的缓冲区大小 (默认值:32k)
.set("spark.reducer.maxSizeInFlight", "96m")//优化7:resultTask从bucket缓冲区拉取数据的最大大小,值过小会导致多次网络通信(默认值:48m)
.set("spark.shuffle.io.maxRetries", "6")//优化9:拉取数据失败后的重试次数;默认3次
.set("spark.shuffle.io.retryWait", "10s")//优化10:拉去数据失败时的重试间隔;默认5秒
.set("spark.shuffle.memoryFraction", "0.5")//优化11:Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是0.2
;
JavaSparkContext javaSparkContext = new JavaSparkContext(sc);
eden区域分配的大小(-Xmx)是: 单独数据块的大小(如果为hdfs压缩的文件的话,那么解压后大概为压缩的3倍,这样需要在乘以3) * task的数量 * 3/4

JavaSparkContext javaSparkContext = new JavaSparkContext(sc);
-》》》数据结构优化
     在数据结构上也可以进行优化!如果你的spark应用程序对内存及其敏感,那么需要你使用更为轻量级的类型,在数据结构上进行优化,如1:Map类型使用特定格式的字符串代替,2:使用int类型代替UUID等。

-》》》持久化RDD
JavaPairRDD<String, Iterable<Integer>> rdd_group = rdd_tuple.groupByKey().cache();//直接持久化到内存
rdd_group.persist(StorageLevel.MEMORY_ONLY_SER());//虽然存在于内存,但是将其序列化,减小空间
rdd_group.persist(StorageLevel.MEMORY_AND_DISK_SER());//也可以序列化到内存和磁盘(会对数据进行分区,不适合放在内存的将要放入硬盘)