hive sql的参数调优

时间:2021-06-05 23:53:01

shuffle优化之减少shuffle数据量

1.谓词下推 

hive.optimize.ppd ,默认为true。

所谓谓词下推就是过滤条件如果写在shuffle操作后面,就提前过滤掉,减少参与shuffle的数据量

如 select * from a join b on a.id=b.id where a.age>10  ,这里执行计划会优先执行 a.age>10 再执行 a join b ,是一种自动优化

但是如下sql就无法自动优化 select * from a join b on a.id=b.id where a.age+b.age>10  , 该where条件只能等到join之后才能执行

2.预聚合

hive.map.aggr,默认为true。

所谓预聚合就是聚合之前,先在map端做部分聚合,减小数据量之后再做shuffle操作,如下图所示(做count计算)

hive sql的参数调优

但是有些函数是没有办法做预聚合的,比如count(distinct),你不能在本地先计算出去重后的数量,再和其他机器上去重的数量相加,因为这会偏大。

类似的还有平均数等。

3.自动使用map join

hive.auto.convert.join,默认值true

当小表的行数或者文件大小小到一定程度的时候,就会自动启动map join,将小表广播出去。广播后的join操作不需要进行shuffle,在本地机器就能完成。

用该参数 hive.mapjoin.cache.numrows 控制触发map join的行数大小

用该参数 hive.mapjoin.smalltable.filesize 控制触发map join的文件大小

数据倾斜优化

4.group by数据倾斜自动均衡

hive.groupby.skewindata,默认值false

原理还是预聚合,为了实现预聚合,它实行了两阶段聚合法,生成了2个MapReduce。

这个参数并不常用。

5.join数据倾斜自动均衡

hive.optimize.skewjoin,默认为false

原理是再生成一个MapReduce将倾斜的key单独处理。参考我们处理join的数据倾斜的时候用union的方法

可以用hive.skewjoin.key来判断该key是否属于倾斜,如果该key的行数超过这个参数,则认为该key倾斜,需要在第二个MapReduce中单独处理

还可以用hive.skewjoin.mapjoin.map.tasks控制第二个MapReduce里的map的数量。

调整task的数量

6.调整map的数量

可以通过切分的数据大小来控制map的数量

mapred.max.split.size    如果dfs.block.size的大小超过了该参数,每个map的文件大小就是该参数的大小

mapred.min.split.size     如果dfs.block.size的大小比该参数还小,每个map的大小就是该参数的大小

如果dfs.block.size在该参数两者之间,那每个map的文件大小就是dfs.block.size的大小,即每个block一个map

设定了每个map的大小,根据整个文件的大小,就可以得到map的数量。

你也可以指定参数mapred.map.tasks,来控制map的数量,但是这不一定起效果

原因在于,block的文件大小在mapred.max.split.size和mapred.min.split.size 之间的时候

该参数不起作用mapred.map.tasks,map的数量还是block的数量

当block的文件大小不在mapred.max.split.size和mapred.min.split.size 之间的时候

若block比min还小:map的大小以mapred.min.split.size来定(确定了大小也就确定了数量)

若block比max还大:若该参数的值得到一个map的大小(总文件大小除以该参数),比block大,则无效,比max小,也无效

只有当该map大小,在block和max之间的时候,才会有效,总结一下,就是该参数是mapred.max.split.size的修正。

7.调整reduce的数量

mapred.reduce.tasks可以直接指定reduce的时候的数量

也可以指定reduce能够处理的最大数据量hive.exec.reducers.bytes.per.reducer

或者是指定每个job的最大reducer数量hive.exec.reducers.max,

以小的那个为准:reducer_num = MIN(total_input_size / reducers.bytes.per.reducer, reducers.max)

如果reduce的数量过多,就会产生多个小文件,如果reduce过少,就会增大每个reduce的负担

MR整体优化

8.合并小文件

hive.merge.mapfiles 如果该MapReduce里没有reduce,该参数为true时,程序写入硬盘时会自动合并小文件

hive.merge.mapredfiles 如果该MapReduce里有reduce,则使用该参数。

hive.merge.size.per.task可以指定每个task输出后合并文件大小的期望值

hive.merge.size.smallfiles.avgsize可以指定所有输出文件大小的均值阈值,默认值都是1GB,如果平均大小不足的话,就会另外启动一个任务来进行合并。

9.启动压缩

压缩是以时间换空间的一种做法,压缩带来的坏处是数据的压缩和解压带来的消耗,带来的好处是可以大大减小数据传输和存储的大小。

分布式系统的网络资源比较可贵,尤其是遇到shuffle操作的时候,需要大量的网络传输,此时大大减小数据量的大小是非常有帮助的。

而且压缩和解压都是map操作,在自己机器上就可以完成。

有些压缩算法甚至在压缩后还支持某些计算(大部分的计算都要将数据解压后才能进行)。

hive.exec.compress.intermediate,默认为true

指定压缩方式:hive.intermediate.compression.codec

10.JVM重用

JVM利用率低是MapReduce比spark慢的原因之一。MR默认每执行一个task就启动一个jvm,而spark是启动一个jvm称之为executor,不断的接收task任务,避免了JVM不断启动销毁的资源开销。

所以MR也对此进行了改良,可以指定一个jvm重复处理几个task

通过指定参数mapred.job.reuse.jvm.num.tasks,可以指定一个MR中,一个jvm操作多少个task。

11.严格模式

hive.mapred.mode,可以设置值为strict或者nostrict

严格模式阻止了下面3种查询方式:分区表全表查询不指定分区,全局排序不指定limit,在非map join 的join中使用笛卡尔积

这3种操作都是有可能严重拖垮资源的行为。

12.本地模式

使用单台机器实现数据的计算,减小集群计算的消耗,适用于小数据量小计算量的任务

通过hive.exec.mode.local.auto=true开启。

可以通过hive.exec.mode.local.auto.inputbytes.max和hive.exec.mode.local.auto.tasks.max 设置数据文件大小和map数来决定是否触发本地模式

另外,reduce数量不能超过1个,否则也触发不了。

13.不同的job之间并发运行

将hive.exec.parallel设置为true即可。通过hive.exec.parallel.thread.number可以设定并行执行的线程数。

但是要保证job之间没有依赖关系,如果两个job是由同一个sql生成的, 或者MR之间指定了顺序,则无法并发进行。