shuffle优化之减少shuffle数据量
1.谓词下推
hive.optimize.ppd ,默认为true。
所谓谓词下推就是过滤条件如果写在shuffle操作后面,就提前过滤掉,减少参与shuffle的数据量
如 select * from a join b on a.id=b.id where a.age>10 ,这里执行计划会优先执行 a.age>10 再执行 a join b ,是一种自动优化
但是如下sql就无法自动优化 select * from a join b on a.id=b.id where a.age+b.age>10 , 该where条件只能等到join之后才能执行
2.预聚合
hive.map.aggr
,默认为true。
所谓预聚合就是聚合之前,先在map端做部分聚合,减小数据量之后再做shuffle操作,如下图所示(做count计算)
但是有些函数是没有办法做预聚合的,比如count(distinct),你不能在本地先计算出去重后的数量,再和其他机器上去重的数量相加,因为这会偏大。
类似的还有平均数等。
3.自动使用map join
hive.auto.convert.join
,默认值true
当小表的行数或者文件大小小到一定程度的时候,就会自动启动map join,将小表广播出去。广播后的join操作不需要进行shuffle,在本地机器就能完成。
用该参数 hive.mapjoin.cache.numrows 控制触发map join的行数大小
用该参数 hive.mapjoin.smalltable.filesize 控制触发map join的文件大小
数据倾斜优化
4.group by数据倾斜自动均衡
hive.groupby.skewindata
,默认值false
原理还是预聚合,为了实现预聚合,它实行了两阶段聚合法,生成了2个MapReduce。
这个参数并不常用。
5.join数据倾斜自动均衡
hive.optimize.skewjoin,默认为false
原理是再生成一个MapReduce将倾斜的key单独处理。参考我们处理join的数据倾斜的时候用union的方法
可以用hive.skewjoin.key来判断该key是否属于倾斜,如果该key的行数超过这个参数,则认为该key倾斜,需要在第二个MapReduce中单独处理
还可以用hive.skewjoin.mapjoin.map.tasks控制第二个MapReduce里的map的数量。
调整task的数量
6.调整map的数量
可以通过切分的数据大小来控制map的数量
mapred.max.split.size 如果dfs.block.size的大小超过了该参数,每个map的文件大小就是该参数的大小
mapred.min.split.size 如果dfs.block.size的大小比该参数还小,每个map的大小就是该参数的大小
如果dfs.block.size在该参数两者之间,那每个map的文件大小就是dfs.block.size的大小,即每个block一个map
设定了每个map的大小,根据整个文件的大小,就可以得到map的数量。
你也可以指定参数mapred.map.tasks,来控制map的数量,但是这不一定起效果
原因在于,block的文件大小在mapred.max.split.size和mapred.min.split.size 之间的时候
该参数不起作用mapred.map.tasks,map的数量还是block的数量
当block的文件大小不在mapred.max.split.size和mapred.min.split.size 之间的时候
若block比min还小:map的大小以mapred.min.split.size来定(确定了大小也就确定了数量)
若block比max还大:若该参数的值得到一个map的大小(总文件大小除以该参数),比block大,则无效,比max小,也无效
只有当该map大小,在block和max之间的时候,才会有效,总结一下,就是该参数是mapred.max.split.size的修正。
7.调整reduce的数量
mapred.reduce.tasks可以直接指定reduce的时候的数量
也可以指定reduce能够处理的最大数据量hive.exec.reducers.bytes.per.reducer
或者是指定每个job的最大reducer数量hive.exec.reducers.max,
以小的那个为准:reducer_num = MIN(total_input_size / reducers.bytes.per.reducer, reducers.max)
如果reduce的数量过多,就会产生多个小文件,如果reduce过少,就会增大每个reduce的负担
MR整体优化
8.合并小文件
hive.merge.mapfiles 如果该MapReduce里没有reduce,该参数为true时,程序写入硬盘时会自动合并小文件
hive.merge.mapredfiles 如果该MapReduce里有reduce,则使用该参数。
hive.merge.size.per.task可以指定每个task输出后合并文件大小的期望值
hive.merge.size.smallfiles.avgsize
可以指定所有输出文件大小的均值阈值,默认值都是1GB,如果平均大小不足的话,就会另外启动一个任务来进行合并。
9.启动压缩
压缩是以时间换空间的一种做法,压缩带来的坏处是数据的压缩和解压带来的消耗,带来的好处是可以大大减小数据传输和存储的大小。
分布式系统的网络资源比较可贵,尤其是遇到shuffle操作的时候,需要大量的网络传输,此时大大减小数据量的大小是非常有帮助的。
而且压缩和解压都是map操作,在自己机器上就可以完成。
有些压缩算法甚至在压缩后还支持某些计算(大部分的计算都要将数据解压后才能进行)。
hive.exec.compress.intermediate,默认为true
指定压缩方式:hive.intermediate.compression.codec
10.JVM重用
JVM利用率低是MapReduce比spark慢的原因之一。MR默认每执行一个task就启动一个jvm,而spark是启动一个jvm称之为executor,不断的接收task任务,避免了JVM不断启动销毁的资源开销。
所以MR也对此进行了改良,可以指定一个jvm重复处理几个task
通过指定参数mapred.job.reuse.jvm.num.tasks,可以指定一个MR中,一个jvm操作多少个task。
11.严格模式
hive.mapred.mode,可以设置值为strict或者nostrict
严格模式阻止了下面3种查询方式:分区表全表查询不指定分区,全局排序不指定limit,在非map join 的join中使用笛卡尔积
这3种操作都是有可能严重拖垮资源的行为。
12.本地模式
使用单台机器实现数据的计算,减小集群计算的消耗,适用于小数据量小计算量的任务
通过hive.exec.mode.local.auto=true开启。
可以通过hive.exec.mode.local.auto.inputbytes.max和hive.exec.mode.local.auto.tasks.max 设置数据文件大小和map数来决定是否触发本地模式
另外,reduce数量不能超过1个,否则也触发不了。
13.不同的job之间并发运行
将hive.exec.parallel设置为
true即可。通过hive.exec.parallel.thread.number
可以设定并行执行的线程数。
但是要保证job之间没有依赖关系,如果两个job是由同一个sql生成的, 或者MR之间指定了顺序,则无法并发进行。