Hive数据倾斜总结

时间:2023-11-09 20:28:56

倾斜的原因:

  使map的输出数据更均匀的分布到reduce中去,是我们的最终目标。由于Hash算法的局限性,按key Hash会或多或少的造成数据倾斜。大量经验表明数据倾斜的原因是人为的建表疏忽或业务逻辑可以规避的。

解决思路:

  Hive的执行是分阶段的,map处理数据量的差异取决于上一个stage的reduce输出,所以如何将数据均匀的分配到各个reduce中,就是解决数据倾斜的根本所在

具体办法:

内存优化和I/O优化:

  驱动表:使用大表做驱动表,以防止内存溢出;Join最右边的表是驱动表;Mapjoin无视join顺序,用大表做驱动表;StreamTable。

 1. Mapjoin是一种避免避免数据倾斜的手段

  允许在map阶段进行join操作,MapJoin把小表全部读入内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在map是进行了join操作,省去了reduce运行的效率也会高很多

在《hive:join遇到问题》有具体操作

  在对多个表join连接操作时,将小表放在join的左边,大表放在Jion的右边,

  在执行这样的join连接时小表中的数据会被缓存到内存当中,这样可以有效减少发生内存溢出错误的几率

  没有一个表足够小到能够放进内存:用到bucket map join。其方法是两个join表在join key上都做hash bucket,并且把你打算复制的那个(相对)小表的bucket数设置为大表的倍数。这样数据就会按照join key做hash bucket。小表依然复制到所有节点,map join的时候,小表的每一组bucket加载成hashtable,与对应的一个大表bucket做局部join,这样每次只需要加载部分hashtable就可以了。

 2. 设置参数 参考这里

  hive.map.aggr = true

  hive.groupby.skewindata=true 还有其他参数 

3.SQL语言调节

  比如: group by维度过小时:采用sum() group by的方式来替换count(distinct)完成计算

4.StreamTable

  将在reducer中进行join操作时的小table放入内存,而大table通过stream方式读取

5.索引

  Hive从0.80开始才有,提供了一个Bitmap位图索引,索引可以加快GROUP BY查询语句的执行速度,用的较少。

set mapreduce.map.memory.mb=1025;
set mapreduce.reduce.memory.mb=1025;

其他优化:

1、 列裁剪(Column pruning):只有需要用到的列才进行输出

2、 谓词下推(Predicate pushdown):尽早进行数据过滤(见图表 7中,下面为先处理的逻
辑),减少后续处理的数据量

3、 分区裁剪(Partition pruning):只读取满足分区条件的文件 
4、 map-join:对于join中一些小文件,可以在map阶段进行join操作,见3.2.2节map-join部分 
5、 join-reordering:将在reducer中进行join操作时的小table放入内存,而大table通过
stream方式读取 
6、 Group-by优化: 进行局部聚合进行优化(包括hash-based和sort-based),对于skew
的key(key的row num和size在reduce时非常不均)可以进行两次map-reduce的方式优化

Hive的配置参数比较保守,所以效率会比较差一点,修改配置会让查询效率有比较大的提升,记录几个对查询效率影响比较重要的参数。

元数据:

嵌套SQL并行执行优化:

set hive.exec.parallel=true;

set hive.exec.parallel.thread.number=16;

四、排序优化

Order by 实现全局排序,一个reduce实现,效率低

Sort by 实现部分有序,单个reduce输出的结果是有序的,效率高,通常和DISTRIBUTE BY关键字一起使用(DISTRIBUTE BY关键字 可以指定map 到 reduce端的分发key)

CLUSTER BY col1 等价于DISTRIBUTE BY col1 SORT BY col1.

五、合并小文件

文件数目过多,会给 HDFS 带来压力,并且会影响处理效率,可以通过合并 Map 和 Reduce 的结果文件来尽量消除这样的影响

hive.merge.mapfiles = true是否和并 Map 输出文件,默认为 True

hive.merge.mapredfiles = false是否合并 Reduce 输出文件,默认为 False

hive.merge.size.per.task = 256*1000*1000合并文件的大小。

这里的参数没有写到上面的表格里是因为这是可以根据任务不同临时设置的,而不一定非要是全局设置。有时候全局设置了反而对大文件的操作有性能影响。

六、使用分区,RCFile,lzo,ORCFile等

Hive中的每个分区都对应hdfs上的一个目录,分区列也不是表中的一个实际的字段,而是一个或者多个伪列,在表的数据文件中实际上并不保存分区列的信息与数据。Partition关键字中排在前面的为主分区(只有一个),后面的为副分区

静态分区:静态分区在加载数据和使用时都需要在sql语句中指定

例:(stat_date='20120625',province='hunan')

动态分区:使用动态分区需要设置hive.exec.dynamic.partition参数值为true,默认值为false,在默认情况下,hive会假设主分区时静态分区,副分区使用动态分区;如果想都使用动态分区,需要设置set hive.exec.dynamic.partition.mode=nostrick,默认为strick

例:(stat_date='20120625',province)