深入浅出Hive(二)

时间:2022-05-09 16:22:47

1. Hive工作原理详解

1.1 部件

  • 元存储(Metastore):存储“系统目录以及关于表、列、分区等的元数据”的组件,可以通过thrift接口查询得到,由于需要快速的提供到编译器中,所以使用关系型数据库管理系统(RDBMS)。
  • 驱动(Driver):控制HiveQL生命周期的组件,当HiveQL查询穿过Hive时。该驱动管理着会话句柄以及任何会话的统计。
  • 查询编译器(Query Compiler):将HiveQL编译成有向无环图(directed acyclic graph, DAG)形式的map/reduce任务。
  • 执行引擎(Execution Engine):依相依性顺序(dependency order)执行由编译器产生的任务。
  • Hive 服务器(HiveServer):提供“健壮的接口(thrift interface)、JDBC/ODBC 服务器以及提供一种整合Hive和其它应用的”组件。
  • 客户端组件:类似命令行接口CLI(Command Line Interface), web UI以及JDBC/ODBC驱动。包含了正反序列化(SerDe)以及对象观察器(ObjectInspector)接口的可扩展接口,类似于前述用户定义函数 UDF (User Defined Function)以及用户定义聚合函数UDAF(User Defined AggregateFunction)接口,允许用户定义自己的列函数。

1.2 工作流程简述

HiveQL通过CLI/web UI或者thrift 、odbc 或 jdbc接口的外部接口提交,经过complier编译器,运用Metastore中的云数据进行类型检测和语法分析,生成一个逻辑方案(logical plan),然后通过简单的优化处理,产生一个以有向无环图DAG数据结构形式展现的map-reduce任务。

深入浅出Hive(二)

1.3 查询编译器(Query Compiler)

用云存储中的元数据来生成执行计划,步骤如下:

  • 解析(parse):将HQL转化为抽象语法树AST;
  • 类型检查和语法分析(type checking and semantic analysis): 将抽象语法树转换此查询块(query block tree),并将查询块转换成逻辑查询计划(logic plan Generator);
  • 优化(optimization):重写查询计划(logical optimizer)-->将逻辑查询计划转成物理计划(physical plan generator)-->选择最佳的join策略(physical optimizer)
Hive完成列以下转换,作为优化阶段的一部分:
  • 列剪辑(column pruning):查询处理中唯一需要的列从行中投射出去。
  • 谓语下推(Predicate pushdown): 将只与一张表有关的过滤操作下推至TableScanOperator之后。
  • 分区剪辑(Partition pruning):过滤掉分区上不符合条件的字段。
  • Map 端的连接(Map side joins):当join的表很小时,在map段先复制它然后再进行join,格式如下:
     
    SELECT /*+ MAPJOIN(t2) */ t1.c1, t2.c1 FROM t1 JOIN t2 ON(t1.c2 = t2.c2);
     由hive.mapjoin.size.key以及hive.mapjoin.cache.numrows控制“任何时间保存在内存中的”表中行的数量,以及提供给系统联合键的大小。
  • 连接再排序(Join reordering):把较小的表保存在内存中,较大的表进行遍历操作,保证系统内存不溢出。

1.4 MapJoin进一步优化

  • 数据再分区以把控GROUP BY形成的非对称(skews):用两个MapReduce来做,第一个阶段将数据随机分发(或者按DISTINCT列分发在DISTINCT聚合的情况下)至reducers,并且计算聚合值;然后这些聚合结果按照GROUP BY 列分发给在第二个Reducer。
set hive.groupby.skewindata= true ;
SELECT t1.c1, sum(t1.c2)
FROM t1
GROUP BY t1.c1;
  • mappers中的基于哈希的局部聚合:相当于combiner,在map端内存中进行聚合,然后发送给reducers,参数hive.map.aggr.hash.percentmemory说明了mapper内存中可用于把控哈希表那部分的数量。如0.5能确保哈希表大小一旦超过用于mapper的最大内存的一半,存储在那儿的部分聚合就被发送到reducers了。hive.map.aggr.hash.min.reduction参数同样也用来控制用于mappers的内存数量。

1.5 其他优化

  • Left Semi Join实现in/exists子查询:
SELECT A.* FROM A LEFT SEMI JOIN B ON(A.KEY = B.KEY AND B.KEY > 100);

等同于

SELECT A.* FROM A WHERE A.KEY IN(SELECT B.KEY FORM B WHERE B.KEY > 100);
作用:map端用group by减少流入reduce端的数据量

  • Bucket Map Join:
set hive.optimize.bucketmapjoin = true;
和Map join一起工作;
所有join的表都做列分桶,同时大表桶的数量是小表桶的整数倍;
做bucket的列必须是join的列;

SELECT /*+MAPJOIN(a,c)*/ a.*, b.*, c.*
a join b on a.key = b.key
join c on a.key=c.key;
在现实的生产环境中,会有成百上千个buckets;

  • Skew join:
join时数据倾斜,造成Reduce端OOM
set hive.optimize.skewjoin = true;
set hive.skewjoin.key = 阀值;

当JOIN得到的map超过阀值时,将内存中的a-k1/b-k1数据分别存入hdfs中,然后遍历完后再对hdfs上的两块数据做Map Join,和其它key一起组成最后结果

2.Hive体系结构

Hive体系结构全解

2.1 数据模型结构

  • Hive数据库:类似传统数据库的DataBase,在第三方数据库里实际是一张表。简单示例命令行 hive > create database test_database;
  • 内部表:
    Hive的内部表与数据库中的Table在概念上是类似。每一个Table在Hive中都有一个相应的目录存储数据。例如一个表pvs,它在HDFS中的路径为/wh/pvs,其中wh是在hive-site.xml中由${hive.metastore.warehouse.dir} 指定的数据仓库的目录,所有的Table数据(不包括External Table)都保存在这个目录中。删除表时,元数据与数据都会被删除。
    内部表简单示例:
    创建数据文件:test_inner_table.txt
    创建表:create table test_inner_table (key string)
    加载数据:LOAD DATA LOCAL INPATH ‘filepath’ INTO TABLE test_inner_table
    查看数据:select * from test_inner_table;  select count(*) from test_inner_table
    删除表:drop table test_inner_table

  • 外部表:
    外部表指向已经在HDFS中存在的数据,可以创建Partition。它和内部表在元数据的组织上是相同的,而实际数据的存储则有较大的差异。内部表的创建过程和数据加载过程这两个过程可以分别独立完成,也可以在同一个语句中完成,在加载数据的过程中,实际数据会被移动到数据仓库目录中;之后对数据对访问将会直接在数据仓库目录中完成。删除表时,表中的数据和元数据将会被同时删除。而外部表只有一个过程,加载数据和创建表同时完成(CREATE EXTERNAL TABLE ……LOCATION),实际数据是存储在LOCATION后面指定的 HDFS 路径中,并不会移动到数据仓库目录中。当删除一个External Table时,仅删除该链接。
    外部表简单示例:
    创建数据文件:test_external_table.txt
    创建表:create external table test_external_table (key string)
    加载数据:LOAD DATA INPATH ‘filepath’ INTO TABLE test_inner_table
    查看数据:select * from test_external_table;  •select count(*) from test_external_table
    删除表:drop table test_external_table

  • 分区
    Partition对应于数据库中的Partition列的密集索引,但是Hive中Partition的组织方式和数据库中的很不相同。在Hive中,表中的一个Partition对应于表下的一个目录,所有的Partition的数据都存储在对应的目录中。例如pvs表中包含ds和city两个Partition,则对应于ds = 20090801, ctry = US 的HDFS子目录为/wh/pvs/ds=20090801/ctry=US;对应于 ds = 20090801, ctry = CA 的HDFS子目录为/wh/pvs/ds=20090801/ctry=CA。
    分区表简单示例:
    创建数据文件:test_partition_table.txt
    创建表:create table test_partition_table (key string) partitioned by (dt string)
    加载数据:LOAD DATA INPATH ‘filepath’ INTO TABLE test_partition_table partition (dt=‘2006’)
    查看数据:select * from test_partition_table;  select count(*) from test_partition_table
    删除表:drop table test_partition_table


  • Buckets是将表的列通过Hash算法进一步分解成不同的文件存储。它对指定列计算hash,根据hash值切分数据,目的是为了并行,每一个Bucket对应一个文件。例如将user列分散至32个bucket,首先对user列的值计算hash,对应hash值为0的HDFS目录为/wh/pvs/ds=20090801/ctry=US/part-00000;hash值为20的HDFS目录为/wh/pvs/ds=20090801/ctry=US/part-00020。如果想应用很多的Map任务这样是不错的选择。
    桶的简单示例:
    创建数据文件:test_bucket_table.txt
    创建表:create table test_bucket_table (key string) clustered by (key) into 20 buckets
    加载数据:LOAD DATA INPATH ‘filepath’ INTO TABLE test_bucket_table
    查看数据:select * from test_bucket_table;  set hive.enforce.bucketing = true;

  • Hive的视图
    视图与传统数据库的视图类似。视图是只读的,它基于的基本表,如果改变,数据增加不会影响视图的呈现;如果删除,会出现问题。如果不指定视图的列,会根据select语句后的生成。
    示例:create view test_view as select * from test

2.2 操作符

Hive编译器将一个Hive QL转换操作符。操作符Operator是Hive的最小的处理单元,每个操作符代表HDFS的一个操作或者一道MapReduce作业。Operator都是hive定义的一个处理过程。

操作符如下:
TableScanOperator:扫描hive表数据
ReduceSinkOperator:创建将发送到Reducer端的<Key,Value>对
JoinOperator:Join两份数据
SelectOperator:选择输出列
FileSinkOperator:建立结果数据,输出至文件
FilterOperator:过滤输入数据
GroupByOperator:GroupBy语句
MapJoinOperator:/*+mapjoin(t) */
LimitOperator:Limit语句
UnionOperator:Union语句

Hive通过ExecMapper和ExecReducer执行MapReduce任务。在执行MapReduce时有两种模式,即本地模式和分布式模式 。

2.3 Join详解

 Join操作的一些注意事项:
    (1)Hive只支持等值连接(equality joins)、外连接(outer joins)和(left/right joins)。Hive不支持所有非等值的连接,因为非等值连接非常难转化到map/reduce任务。
    (2)Hive 支持多于2个表的连接。
    (3)join时,每次 map/reduce 任务的逻辑: reducer 会缓存 join 序列中除了最后一个表的所有表的记录, 再通过最后一个表将结果序列化到文件系统。这一实现有助于在reduce端减少内存的使用量。实践中,应该把最大的那个表写在最后(否则会因为缓存浪费大量内存)。
    (4)LEFT,RIGHT 和 FULL OUTER 关键字用于处理 join 中空记录的情况。
    (5)LEFT SEMI JOIN 是 IN/EXISTS 子查询的一种更高效的实现。Hive 当前没有实现 IN/EXISTS 子查询,所以你可以用 LEFT SEMI JOIN 重写你的子查询语句。LEFT SEMI JOIN的限制是, JOIN子句中右边的表只能在ON子句中设置过滤条件,在WHERE子句、SELECT子句或其他地方过滤都不行。
join的缓存和任务转换

  • hive转换多表join时,如果每个表在join字句中,使用的都是同一个列,只会转换为一个单独的map/reduce。
  • 在join的每个map/reduce阶段,序列中的最后一个表,当其他被缓存时,它会流到reducers。所以,reducers需要缓存join关键字的特定值组成的行,通过组织最大的表出现在序列的最后,有助于减少reducers的内存
  • 在join的每个map/reduce阶段,通过关键字,可以指定哪个表从流接收。
SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)