1. Hive工作原理详解
1.1 部件
- 元存储(Metastore):存储“系统目录以及关于表、列、分区等的元数据”的组件,可以通过thrift接口查询得到,由于需要快速的提供到编译器中,所以使用关系型数据库管理系统(RDBMS)。
- 驱动(Driver):控制HiveQL生命周期的组件,当HiveQL查询穿过Hive时。该驱动管理着会话句柄以及任何会话的统计。
- 查询编译器(Query Compiler):将HiveQL编译成有向无环图(directed acyclic graph, DAG)形式的map/reduce任务。
- 执行引擎(Execution Engine):依相依性顺序(dependency order)执行由编译器产生的任务。
- Hive 服务器(HiveServer):提供“健壮的接口(thrift interface)、JDBC/ODBC 服务器以及提供一种整合Hive和其它应用的”组件。
- 客户端组件:类似命令行接口CLI(Command Line Interface), web UI以及JDBC/ODBC驱动。包含了正反序列化(SerDe)以及对象观察器(ObjectInspector)接口的可扩展接口,类似于前述用户定义函数 UDF (User Defined Function)以及用户定义聚合函数UDAF(User Defined AggregateFunction)接口,允许用户定义自己的列函数。
1.2 工作流程简述
HiveQL通过CLI/web UI或者thrift 、odbc 或 jdbc接口的外部接口提交,经过complier编译器,运用Metastore中的云数据进行类型检测和语法分析,生成一个逻辑方案(logical plan),然后通过简单的优化处理,产生一个以有向无环图DAG数据结构形式展现的map-reduce任务。
1.3 查询编译器(Query Compiler)
用云存储中的元数据来生成执行计划,步骤如下:
- 解析(parse):将HQL转化为抽象语法树AST;
- 类型检查和语法分析(type checking and semantic analysis): 将抽象语法树转换此查询块(query block tree),并将查询块转换成逻辑查询计划(logic plan Generator);
- 优化(optimization):重写查询计划(logical optimizer)-->将逻辑查询计划转成物理计划(physical plan generator)-->选择最佳的join策略(physical optimizer)
- 列剪辑(column pruning):查询处理中唯一需要的列从行中投射出去。
- 谓语下推(Predicate pushdown): 将只与一张表有关的过滤操作下推至TableScanOperator之后。
- 分区剪辑(Partition pruning):过滤掉分区上不符合条件的字段。
-
Map 端的连接(Map side joins):当join的表很小时,在map段先复制它然后再进行join,格式如下:
SELECT /*+ MAPJOIN(t2) */ t1.c1, t2.c1 FROM t1 JOIN t2 ON(t1.c2 = t2.c2);
由hive.mapjoin.size.key以及hive.mapjoin.cache.numrows控制“任何时间保存在内存中的”表中行的数量,以及提供给系统联合键的大小。
- 连接再排序(Join reordering):把较小的表保存在内存中,较大的表进行遍历操作,保证系统内存不溢出。
1.4 MapJoin进一步优化
- 数据再分区以把控GROUP BY形成的非对称(skews):用两个MapReduce来做,第一个阶段将数据随机分发(或者按DISTINCT列分发在DISTINCT聚合的情况下)至reducers,并且计算聚合值;然后这些聚合结果按照GROUP BY 列分发给在第二个Reducer。
set hive.groupby.skewindata= true ; SELECT t1.c1, sum(t1.c2) FROM t1 GROUP BY t1.c1;
-
mappers中的基于哈希的局部聚合:相当于combiner,在map端内存中进行聚合,然后发送给reducers,参数hive.map.aggr.hash.percentmemory说明了mapper内存中可用于把控哈希表那部分的数量。如0.5能确保哈希表大小一旦超过用于mapper的最大内存的一半,存储在那儿的部分聚合就被发送到reducers了。hive.map.aggr.hash.min.reduction参数同样也用来控制用于mappers的内存数量。
1.5 其他优化
- Left Semi Join实现in/exists子查询:
SELECT A.* FROM A LEFT SEMI JOIN B ON(A.KEY = B.KEY AND B.KEY > 100);
等同于
SELECT A.* FROM A WHERE A.KEY IN(SELECT B.KEY FORM B WHERE B.KEY > 100);作用:map端用group by减少流入reduce端的数据量
- Bucket Map Join:
set hive.optimize.bucketmapjoin = true;和Map join一起工作;
所有join的表都做列分桶,同时大表桶的数量是小表桶的整数倍;
做bucket的列必须是join的列;
SELECT /*+MAPJOIN(a,c)*/ a.*, b.*, c.* a join b on a.key = b.key join c on a.key=c.key;在现实的生产环境中,会有成百上千个buckets;
- Skew join:
set hive.optimize.skewjoin = true;
set hive.skewjoin.key = 阀值;
当JOIN得到的map超过阀值时,将内存中的a-k1/b-k1数据分别存入hdfs中,然后遍历完后再对hdfs上的两块数据做Map Join,和其它key一起组成最后结果
2.Hive体系结构
2.1 数据模型结构
- Hive数据库:类似传统数据库的DataBase,在第三方数据库里实际是一张表。简单示例命令行 hive > create database test_database;
-
内部表:
Hive的内部表与数据库中的Table在概念上是类似。每一个Table在Hive中都有一个相应的目录存储数据。例如一个表pvs,它在HDFS中的路径为/wh/pvs,其中wh是在hive-site.xml中由${hive.metastore.warehouse.dir} 指定的数据仓库的目录,所有的Table数据(不包括External Table)都保存在这个目录中。删除表时,元数据与数据都会被删除。内部表简单示例:
创建数据文件:test_inner_table.txt创建表:create table test_inner_table (key string)
加载数据:LOAD DATA LOCAL INPATH ‘filepath’ INTO TABLE test_inner_table
查看数据:select * from test_inner_table; select count(*) from test_inner_table
删除表:drop table test_inner_table
-
外部表:
外部表指向已经在HDFS中存在的数据,可以创建Partition。它和内部表在元数据的组织上是相同的,而实际数据的存储则有较大的差异。内部表的创建过程和数据加载过程这两个过程可以分别独立完成,也可以在同一个语句中完成,在加载数据的过程中,实际数据会被移动到数据仓库目录中;之后对数据对访问将会直接在数据仓库目录中完成。删除表时,表中的数据和元数据将会被同时删除。而外部表只有一个过程,加载数据和创建表同时完成(CREATE EXTERNAL TABLE ……LOCATION),实际数据是存储在LOCATION后面指定的 HDFS 路径中,并不会移动到数据仓库目录中。当删除一个External Table时,仅删除该链接。
外部表简单示例:
创建数据文件:test_external_table.txt
创建表:create external table test_external_table (key string)
加载数据:LOAD DATA INPATH ‘filepath’ INTO TABLE test_inner_table
查看数据:select * from test_external_table; •select count(*) from test_external_table
删除表:drop table test_external_table
-
分区
Partition对应于数据库中的Partition列的密集索引,但是Hive中Partition的组织方式和数据库中的很不相同。在Hive中,表中的一个Partition对应于表下的一个目录,所有的Partition的数据都存储在对应的目录中。例如pvs表中包含ds和city两个Partition,则对应于ds = 20090801, ctry = US 的HDFS子目录为/wh/pvs/ds=20090801/ctry=US;对应于 ds = 20090801, ctry = CA 的HDFS子目录为/wh/pvs/ds=20090801/ctry=CA。分区表简单示例:
创建数据文件:test_partition_table.txt
创建表:create table test_partition_table (key string) partitioned by (dt string)
加载数据:LOAD DATA INPATH ‘filepath’ INTO TABLE test_partition_table partition (dt=‘2006’)
查看数据:select * from test_partition_table; select count(*) from test_partition_table
删除表:drop table test_partition_table
-
桶
Buckets是将表的列通过Hash算法进一步分解成不同的文件存储。它对指定列计算hash,根据hash值切分数据,目的是为了并行,每一个Bucket对应一个文件。例如将user列分散至32个bucket,首先对user列的值计算hash,对应hash值为0的HDFS目录为/wh/pvs/ds=20090801/ctry=US/part-00000;hash值为20的HDFS目录为/wh/pvs/ds=20090801/ctry=US/part-00020。如果想应用很多的Map任务这样是不错的选择。
桶的简单示例:
创建数据文件:test_bucket_table.txt
创建表:create table test_bucket_table (key string) clustered by (key) into 20 buckets
加载数据:LOAD DATA INPATH ‘filepath’ INTO TABLE test_bucket_table
查看数据:select * from test_bucket_table; set hive.enforce.bucketing = true;
-
Hive的视图
视图与传统数据库的视图类似。视图是只读的,它基于的基本表,如果改变,数据增加不会影响视图的呈现;如果删除,会出现问题。如果不指定视图的列,会根据select语句后的生成。
示例:create view test_view as select * from test
2.2 操作符
Hive编译器将一个Hive QL转换操作符。操作符Operator是Hive的最小的处理单元,每个操作符代表HDFS的一个操作或者一道MapReduce作业。Operator都是hive定义的一个处理过程。
操作符如下:
TableScanOperator:扫描hive表数据
ReduceSinkOperator:创建将发送到Reducer端的<Key,Value>对
JoinOperator:Join两份数据
SelectOperator:选择输出列
FileSinkOperator:建立结果数据,输出至文件
FilterOperator:过滤输入数据
GroupByOperator:GroupBy语句
MapJoinOperator:/*+mapjoin(t) */
LimitOperator:Limit语句
UnionOperator:Union语句
Hive通过ExecMapper和ExecReducer执行MapReduce任务。在执行MapReduce时有两种模式,即本地模式和分布式模式 。
2.3 Join详解
-
hive转换多表join时,如果每个表在join字句中,使用的都是同一个列,只会转换为一个单独的map/reduce。
- 在join的每个map/reduce阶段,序列中的最后一个表,当其他被缓存时,它会流到reducers。所以,reducers需要缓存join关键字的特定值组成的行,通过组织最大的表出现在序列的最后,有助于减少reducers的内存
- 在join的每个map/reduce阶段,通过关键字,可以指定哪个表从流接收。
SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)