Hive学习笔记一

时间:2024-07-17 17:36:56

1. Load的使用

//在1.x版本中定义long数据类型会报错(用bigint代替)
create table t_load_stu(name string,age bigint)
row format
delimited fields terminated by ',';

1.1本地在本地导入数据,本地数据不会删除(相当于复制)

//overwrite覆盖表中所有的数据
load data local inpath 文件路径 [overwrite] into table 表名
eg:load data local inpath '/root/apps/hive-data/local_load_stu.dat'
into table t_load_stu;

1.2在hadoop中导入数据,hadoop上的数据被删除(相当于剪切)

load data inpath hadoop上文件目录 into table tab_name
eg:load data inpath '/hivedata/load_stu.dat' into table t_load_stu;
load data inpath '/hivedata/load_stu.dat' overwrite into table t_load_stu;

2.Insert的使用

 (1)创建源数据库
create external table org_stu(phone string,real_name string,age int,
gender int,country string,ip string,creat_time date,creator string)
row format
delimited fields terminated by ','
location '/hivedata';
//修改表中某个字段的类型
alter table org_stu change gender gender string;
//加入本地数据
load data local inpath '/root/apps/hive-data/org_stu.dat' into table org_stu;
(2)为查询出来的数据插入创建表
create table t_copy_stu1(phone string,real_name string,age int)
row format
delimited fields terminated by ',';
(3)创建分区表
create table t_copy_partition_stu(phone string,real_name string,ip string)
partitioned by(creat_time string)
row format
delimited fields terminated by ',';
(4)多重插入准备两张表
create table t_mult_stu_1(phone string,real_name string)
row format
delimited fields terminated by ',';
create table t_mult_stu_2(phone string,gender string,age int)
row format
delimited fields terminated by ',';

2.1将查询出来的结果插入到一个表中

 语法:insert [overwrite]/into table  表名
select 字段 from 表名 where 条件
eg:insert overwrite table t_copy_stu1 select phone,real_name,age from org_stu;

2.2将查询出来的结果作为插入到表中的某个分区中(自动分区模式)

设置自动分区模式:set hive.exec.dynamic.partition.mode=true;
语法:insert overwrite table 表名 partition (分区字段名)
select 字段 from 表名 where 条件
eg:insert overwrite table t_copy_partition_stu partition(creat_time)
select phone,real_name,ip,substring(creat_time,0,10) as creat_time from org_stu;

2.3多重插入

语法:from 表名
insert into table 表名 select 字段名称 where 查询条件
...
eg:from org_stu
insert into table t_mult_stu_1 select phone,real_name
insert into table t_mult_stu_2 select phone,gender,age;

3Select

 表:
create table t_stu_limit_20(sno int,name string,age int,sdp string)
row format
delimited fields terminated by ',';
注:1、set hive.exec.reducers.bytes.per.reducer=<number>,设置每个reducer最大处理的数据大小(单位:字节)来计算出需要多少个reducer去处理,默认大小是:256000000
2、set hive.exec.reducers.max=<number>,设置reducer的最大值,如果上面设置的算出来的reducer的个数超过max的值,以max的值为准.如果上面的计算出reudcer小于max,以小的为准。(默认值是:1009)
3、set mapreduce.job.reduces=<number>固定的设置reducer的个数

3.1 distribute by(字段)根据指定的字段将数据分到不同的reducer,且分发算法是hash散列算法

  注:跟reducer的个数有关,不具有排序
eg:select * from t_stu_limit_20 distribute by sno;

3.2 sort by(字段) 不是全局排序,其在数据进入reducer前完成排序。

  注:如果用sort by 进行排序并且设置mapred.reduce.tasks>1,则sort by 只能保证每个reducer的输出有序,不保证全局有序
eg:select * from t_stu_limit_20 distribute by sno sort by age;

3.3 order by(字段) 会对输入做全局排序

  注:只有一个reducer,缺点是当输入规模较大时需要较长的计算时间
(Number of reduce tasks determined at compile time: 1)
eg:select * from t_stu_limit_20 order by age;

3.4 cluster by(字段) 除了具有distribute by 功能外还可以对该字段进行排序

  注:如果分桶和sort是同一个字段,此时cluster by =distribute by +sort by
eg:insert overwrite local directory '/root/test' select * from t_stu_limit_20 cluster by sno;

分桶的作用:最大的作用是用来提高join操作的效率

补充:如何才能在yarn的管理控制台上查看已经完成的job信息?

需要在hadoop集群上启动jobhistory服务器,执行的命令是:mr-jobhistory-daemon.sh start
historyserver

4 join

注:

1)目前hive只支持等值得join,不支持非等值的连接,因为非等值join很难转化为map/reducer任务

2)可以join多于2个表,执行流程分析

 情况一:如果join多个表时,join key 是同一个,则join会被转化为单个map/reduce任务
eg:select a.val,b.val,c.val from a join b on(a.key=b.key1) join c
on (c.key=b.key1)
情况二:如果join key非同一个,则join会被转化为多个map/reduce的任务
eg:select a.val,b.val,c.val from a join b on(a.key=b.key1) join c
on(c.key=b.key2)
分析:join被转化为2个map/reduce任务。因为b.key1用于第一次join条件,而b.key2用于第二次join.

3)join时,每次map/reduce的任务逻辑

  说明:reducer会缓存join序列中除了最后一个表所有表的记录,再通过最后一个表将结果序列化到文件系统中。
优点:这一实现有助于在reduce端减少内存的使用量。
注:在实践中应该把最大的那个表放在最后,否则会因为缓存浪费大量的缓存。
eg:1)select a.val,b.val,c.val from a join b on(a.key=b.key1) join c
on (c.key=b.key1)
说明:所有的表都使用同一个join key(使用一次map/reduce任务计算)。reduce端会缓存a表和b表的记录,然后每次取得一个c表记录就计算一次join结果。
2)select a.val,b.val,c.val from a join b on(a.key=b.key1) join c
on(c.key=b.key2)
说明:join key不同,这里用了2次map/reduce任务。第一次缓存a表用b表序列化,第二次缓存第一次map/reduce任务的结果,然后用c表序列化。

4)LEFT、RIGHT、FULL 、OUTER关键字用于处理join中空记录的情况

  说明:和数据库处理的差不多

5)join发生在where字句之前

  实际场景问题:select a.val,b.val from a left outer join(a.key=b.key) where a.ds='2016-12-30' and b.ds='2016-12-30'
问题描述:如果b找不到对应的a表的记录,b表所有列都会列出null,包括ds列。也就是说join会过滤b表中不能找到匹配a表join key的所有记录,导致LEFT OUTER与where子句无关。
解决方案:在left out时使用条件
select a.val,b.val from a left outer join on(a.key=b.key and b.ds='2016-12-29' and a.ds='2016-12-29')
说明:这一查询的结果是预先在join阶段过滤的,所以不会存在上述的问题。这一逻辑可以用于right 和full类型的join中。

6)join是不能交换位置的

 说明:无论是left还是right都是通过左连接的。
eg:select a.val1,a.val2,b.val,c.val from a join b on(a.key=b.key) left outer join c on(a.key=b.key)
分析:先join a表到b表的记录,丢弃掉所有join key中不匹配的记录,然后用这中间结果和c表做join.也就是说,就是当一个key在a表和c表都存在,但在b表不存在的时候,整个记录在第一次join,即a join b的时候被丢弃掉了(包括a.val1,a.val2和a.key),然后我们在跟c表join的时候,如果c.key与a.key或b.key相等,就会得到这样的结果:null,null,null,c.val

5hive参数配置

5.1hive的命令行

  语法结构:
hive [-hiveconf x=y]*[<-i filename>]*[<-f filename>|-e query-string][-s]
说明:
1、-i从文件中初始化hql;
2、-e从命令行执行指定的hql
3、-f执行hql脚本
4、-v输出执行的hql语句到控制台
eg:命令:hive -e 'use db2;select * from t_load_stu';
结果:Time taken: 1.02 seconds
OK
zs 12
ls 15
wu 16

5.2参数配置方式

1)配置文件(全局有效)
说明:用户自定义配置文件:hive-site.xml
默认配置文件:hive-default.xml
注: 1>用户自定义配置文会覆盖默认配置;
2>hive会读取hadoop的配置,因为hive作为hadoop的客户端启动的,hive配置会覆盖hadoop的配置
2)命令行参数(对hive启动实例有效)
说明:-hiveconf param=value
eg:-hiveconf hive.root.logger=info,console
3)参数声明(对hive的连接session有效)
说明:可以在hql中使用set关键字来设定参数
eg:1>set hive.exec.reducers.bytes.per.reducer=>每个reduce task的平均负载数据量,hive会估算我们的总数据量,然后用总数据量除以上述参数值,就能得到需要运行的reduce task的数量
2>set hive.exec.reducers.max=>设置reduce task的上限
3>set mapreduce.job.reduces=>指定固定的reduce task的数量
注:这个参数在必要时<业务逻辑决定只能用一个reduce task>会忽略的(例如order by)

6hive函数

创建测试需要的表,方便函数的测试
1、create table dual(id string);
2、load一个文件(一行,一个空格)到dual表中

6.1自定义hive的函数(基于java开发语言)

 步骤:第一步:先开发一个java类,继承UDF,并重载evaluate方法;
第二步:打成jar包上传到服务器;
第三步:将jar包添加到hive的classpath(命令:add jar jar包的全路径);
第四步:创建临时函数与开发好的java class关联(命令:create temporary function 方法名称 as '定义方法类名的全路径');
eg:
1)创建一个java类,集成UDF
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
public class MyHiveFunctionUdf extends UDF {
// 重载方法
public String evaluate(String inStr) {
if (StringUtils.isEmpty(inStr)) {
return inStr;
}
return inStr.toLowerCase();
} public int evaluate(Integer... sumParams) {
if (sumParams == null || sumParams.length == 0)
return 0;
int totalValue = 0;
for (Integer sumParam : sumParams) {
if (sumParam != null) {
totalValue += sumParam;
}
}
return totalValue;
} public String evaluate(String... inStrs) {
StringBuffer sb = new StringBuffer();
if (inStrs == null || inStrs.length == 0) {
return "";
}
for (String inStr : inStrs) {
sb.append(inStr);
}
return sb.toString();
}
}
2)打成jar添加到hive的classpath中去
命令:hive> add jar /root/hive-udf.jar;
结果:Added [/root/hive-udf.jar] to class path(加载本地的classpath)
Added resources: [/root/hive-udf.jar](加载到distribute cache中分发到各个map/reduce中)
3)创建临时函数myfuns与com.hive.udf.MyHiveFunctionUdf类映射
命令:create temporary function myfuns as 'com.hive.udf.MyHiveFunctionUdf';
结果:OK
Time taken: 1.194 seconds
4)在hql中使用函数
命令:select myfuns('1','2','3') from dual;
结果:123
命令:select myfuns(1,2) from dual;
结果:3

6.2Transform实现hive自定义的函数

说明:hive的transform关键字提供了在sql中调用自写脚本的功能
eg:
1、创建容纳json数据的表
create table t_json(json_line string)
row format
delimited fields terminated by '\001';
2、使用get_json_object对json表中的json字符串进行解析并保存到新的表中
create table t_rating
as select get_json_object(json_line,'$.movie') as movieid,
get_json_object(json_line,'$.rate') as rate,
get_json_object(json_line,'$.timeStamp') as timestring,
get_json_object(json_line,'$.uid') as userid
from t_json
3、通过transform方式创建通过时间戳获取weekday
python的脚本:
#!/bin/python
import sys
import datetime
for line in sys.stdin:
line = line.strip()
movieid, rating, unixtime,userid = line.split('\t')
weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
print '\t'.join([movieid, rating, str(weekday),userid])
创建一张记录表:
CREATE TABLE u_data_new (
movieid INT,
rating INT,
weekday INT,
userid INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
命令:add file /root/apps/hive-data/weekday-mapper.py(向每个map/reduce发送该脚本)
执行:
INSERT OVERWRITE TABLE u_data_new
SELECT
TRANSFORM (movieid , rate, timestring,userid)
USING 'python weekday-mapper.py'---使用脚本函数
AS (movieid, rating, weekday,userid)
FROM t_rating;
校验是否成功:
select distinct(weekday) from u_data_new limit 10;

7hvie特殊分割符处理

hive读取数据的机制:1、首选用InputFormat<默认是:org.apache.hadoop.mapred.TextInputFormat>的具体实现类读入文件数据,返回一条一条的记录(可以是行或自定义逻辑中的行)
2、然后用SerDe<默认是:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe>的一个具体实现类,对上面返回一条一条的记录进行字段切割

7.1通过自定义InputFormat解决特殊分隔符的问题

eg:
1、原始数据格式:zs||24
ls||27
2、重写InputFormat
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import com.google.common.base.Charsets;
public class TextInputFormatWrapper extends TextInputFormat {
@Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException {
reporter.setStatus(genericSplit.toString());
String delimiter = job.get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter) {
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
}
LineRecordReader lineRecordReader = new LineRecordReader(job, (FileSplit) genericSplit, recordDelimiterBytes);
return new LineRecordReaderWrapper(lineRecordReader);
}
public static class LineRecordReaderWrapper implements RecordReader<LongWritable, Text> {
private LineRecordReader lineRecordReader;
public LineRecordReaderWrapper(LineRecordReader lineRecordReader) {
super();
this.lineRecordReader = lineRecordReader;
}
@Override
public void close() throws IOException {
lineRecordReader.close();
}
@Override
public LongWritable createKey() {
return lineRecordReader.createKey();
}
@Override
public Text createValue() {
return lineRecordReader.createValue();
}
@Override
public long getPos() throws IOException {
return lineRecordReader.getPos();
}
@Override
public float getProgress() throws IOException {
return lineRecordReader.getProgress();
}
@Override
public boolean next(LongWritable key, Text value) throws IOException {
boolean hasNext = lineRecordReader.next(key, value);
if (hasNext && value != null) {
String replaceResult = value.toString().replaceAll("\\|\\|", "\\|");
value.set(replaceResult);
}
return hasNext;
}
}
}
3、创建表使用该InputFormat
create table t_bi(name string,age int)
row format delimited fields terminated by '|'
stored as inputformat 'com.hadoop.extend.TextInputFormatWrapper'
outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
4、加载数据
load data local inpath '/root/apps/hive-data/bi.dat' into table t_bi;
5、测试结果是
select * from t_bi;
zs 24
ls 27

7.2使用RegexSerDe通过正则表达式来抽取字段

eg:
1、 创建表,使用serde为org.apache.hadoop.hive.serde2.RegexSerDe
create table t_bi_reg(name string,age int)
row format
serde 'org.apache.hadoop.hive.serde2.RegexSerDe'
with serdeproperties(
'input.regex'='(.*)\\|\\|(.*)',
'output.format.string'='%1$s %2$s'
)
stored as textfile;
2、加载数据
load data local inpath '/root/apps/hive-data/bi.dat' into table t_bi_reg;
3、测试结果是:
select * from t_bi_reg;
zs 24
ls 27
注:在数据量大的情况下上述方式比该方式效率高