2021年7月份加入了当前项目组,以一个原汁原味的Java开发工程师的身份进来的,来了没多久,项目组唯一一名大数据开发工程师要离职了,一时间一大堆的数据需求急需人来接手,此刻又招不来新的数据开发。没辙,我和同组的另一位Java开发同事算是临危受命,接下了大数据方面的工作,开启了Java工程师从0到1搞大数据的漫长旅途,开始的磕磕碰碰叫苦不堪到如今的还算得心应手,已经整整16个月了,16个月期间双向支持着数据分析和后端开发的工作,两者时而穿插时而并行处理,大数据工作占得比重之多,有时让我怀疑我还是不是一名纯粹的Java开发工作者,当我看见假期值班表中我的角色填写一项变成“B端后端/数据”时,我就知道我已经不纯粹了。
1.Sql -- 大数据分析的灵魂
搞大数据究竟每天在做些什么?坦白讲,情况和我想象的不太一样,因为做大数据开发时最最主要的工作居然写Sql,曾经我还以为它是有一套刁钻困难冷门的牛逼技术,将海量数据玩弄于股掌之中。现在看来,我是每天和各种各样的大数据表打交道,在大数据平台用sql提取出业务方想要的信息,有时会出各式各样的数据报表,有时是为C端项目服务,提供底层海量数据计算的支持,有时是为各种数据看板服务,提供他们想要的销量排行了、人群覆盖情况了诸类。工作前两年纯粹写java时也是对sql有所研究的,毕竟数据持久层的交互离不开sql,然后搞了大数据才明白,之前写的sql都是小儿科,现在一条sql写上百行那都是常有的事,而且最开始解读大sql时总是慢半拍,好久才能搞明白前辈留下的交接文档表达的是什么,现在不一样了,看见那些sql都亲切很多,很多需求提出来总能迅速想到sql解决的方案,下面呢,我就开始分享一些我在写大sql时经常会使用的一些语法,这些语法可能针对于只做Java的人并不会经常性的熟练使用。
1.1with.. as..
with temp1 as (
select * from ... where ..
),
temp2 as (
select * from ... where...
),
...
tempn as (
select * from ... inner join ... where
)
select a.*,b.*,c.* from temp1 a inner join temp2 b on a.id = b.id left join ..tempn c on a.iid = c.iid where ...
模板中的temp1,temp2,tempn都可以看做这个sql执行过程中的临时表,存在周期仅限于执行这条sql期间,sql执行完毕临时表也销毁,并且和其他的sql是相互隔离的,下面的sql都可以使用之前的产生的临时表(temp2就可以使用temp1的结果),使用with时最后一定跟的select语句,当然,跟的是insert into table ...... select * from也是可以的。
使用with..as..语法大大提高了长Sql的解读性。
之前一直以为这个HiveSql特有的语法,后来才发现在mysql中也可以使用,只不过是mysql8.0以后的版本可以使用,之前的版本是没有这个语法的。
1.2开窗函数:row_number() over(partition by file order by file2 desc/asc)
select row_number() over(partition by userid order by pay_time desc) as rn,userid,name,order_cd,goods_name,pay_time
from db_dw.table _order
having rn = 1
这个sql的作用就是找出每个用户的最新付款的那笔订单的订单信息。
实现思路就是利用开窗函数按照用户id分组,再按照付款时间倒叙排序,给每组的数据加上一个rn的编号,每组的第一条rn 都等于 1 ,第二条rn = 2,以此类推,再通过having函数将结果中rn = 1的数据全取出来,这样就能通过单条sql完成取每一个用户最新一条订单的数据需求。
1.3开窗函数lag(field, num, defaultvalue) over(partition by ..order by ..) 与 lead() over()
select lag(pay_time,1,NULL) over(partition by userid order by pay_time asc) as last_pay_time,userid,name,order_cd,goods_name,pay_time
from db_dw.table _order
select lead(pay_time,1,NULL) over(partition by userid order by pay_time asc) as next_pay_time,userid,name,order_cd,goods_name,pay_time
from db_dw.table _order
- lag(field, num, defaultvalue),其中fied是要查询的字段,num是向前取几行,defaultvalue是取不到值时的默认值。向上面案例中那样,假设按照userid分组后又按照pay_time排序了,第一个查出来的用户刚好也有多条不同pay_time的数据,那么查询结果应该是第一行数据为last_pay_time为NULL,pay_time为该用户的最小的时间,第二行数据的last_pay_time等于第一行数据的pay_time值,而pay_time为第二小的时间。
- lead(field, num, defaultvalue),其中fied是要查询的字段,num是向后取几行,defaultvalue是取不到值时的默认值。向上面案例中那样,假设按照userid分组后又按照pay_time排序了,第一个查出来的用户刚好也有多条不同pay_time的数据,那么查询结果应该是第一行数据为next_pay_time为第二行的pay_time,pay_time为该用户的最小的时间,第二行数据的next_pay_time等于第三行数据的pay_time值,而pay_time为第二小的时间,而该用户的最后一行的next_pay_time则为NULL。
1.4case when <条件1> then <结果1> when <条件2> then <结果2> else <剩余数据的结果> end as 字段名
-- 将用户年龄按照18岁及以下,18岁至65岁,65岁以上分类
select case when age<=18 then '未成年' when age>18 and age <=65 then '青中年' else '老年' end as age_group,name,age,sex
from user
case when 语法其实就是java语言的if...else if ...else if...else,当满足条件时就进入该分支,不满足的话就一直进入下面的分支,最后所有条件都不满足则进入else分支,通常在Sql中我们使用case when then进行一些归纳分类,譬如我们的电商涉及到的商品种类众多,可能需要按照某些规则进行分类,就免不了使用该语法。
1.5union,union all
select name from A
union
select name from B
select name from A
union all
select name from B
- 想去重使用union,不去重完全放一起使用union all
- 假设A表中某列有重复数据,然后A表和B表进行union,A表中的那列数据自动的去重,不仅仅是把B表中的那列和A表重复的数据去重。像案例中的union后的结果一样,所得的name不会有一条重复数据,相当于整体的distinct了一下。
- union 和 union all查询数据结果只以第一句sql的字段名称为准,后续的sql只按照顺序匹配,不会识别字段名称
1.6partition分区使用
-- 创建hive分区表
create table db_demo.tb_demo (
filed1 string comment '字段1',
filed2 int comment '字段2'
)PARTITIONED BY(l_date string) ;
-- 删除表分区
alter table db_demo.tb_demo drop if exists partition(l_date = '${v_date}')
--将数据写入表分区
insert into table db_demo.tb_demo partition(l_date = '${v_date}')
select * from db_demo.tb_demo_v0 where ......
--覆盖指定分区表数据
insert overwrite table db_demo.tb_demo partition(l_date = '${v_date}')
select * from db_demo.tb_demo_v0 where ......
- 分区表指的是在创建表时指定的partition的分区空间。
- 一个表可以拥有一个或者多个分区,每个分区以文件夹的形式单独存在表文件夹的目录下。
- 分区字段会作为表的最后一个字段出现。
1.7JSON处理
-- 取出JSON串中指定key的value值
-- 语法
get_json_object('{key1:value1,key2:value2}','$.key')
--比如取出JSON串中的name信息
select get_json_object('{"age":1089,"name":"tom"}','$.name')
1.8日期函数
-- to_date:日期时间转日期
select to_date(create_time) from demo_db.demo_table;
-- current_date :当前日期
select current_date
-- date_sub : 返回日期前n天的日期
select date_sub(pay_time,9) from demo_db.demo_table
-- date_add : 返回日期后n天的日期,即使放入时间参数,得到的也是日期,上一个同理,只比较日期位。
select date_add(pay_time,9) from demo_db.demo_table
-- unix_timestamp:获取当前unix时间戳
select unix_timestamp('2022-10-10 10:22:11')
-- datediff:返回开始日期减去结束日期的天数,只比较日期位
select datediff('2022-10-10 23:22:11','2022-10-09 00:22:11')
-- 获取当前月
select substr(current_date,1,7);
--获取上个月最后一天
select DATE_SUB(FROM_UNIXTIME(UNIX_TIMESTAMP()),DAY(FROM_UNIXTIME(UNIX_TIMESTAMP())))
1.9炸裂函数
Hive版本:
select
id,
type_id_new
from table_one
lateral view explode(split(type_id,",")) table_one_temp as type_id_new
;
Mysql版本:
SELECT
a.id, substring_index(substring_index(a.type_id,',',b.help_topic_id + 1 ), ',' ,- 1 ) AS type_id
FROM
(select id, type_id from table_one) a
JOIN mysql.help_topic b ON b.help_topic_id <
(length(a.type_id) - length( replace(a.type_id, ',', '') ) + 1)
简而言之,炸裂函数从命名上就可以看出,这是一个由1到多的过程,由一个裂变成多个。具体场景大概是某条数据的某个字段里面存放的是被相同符号分割的字符串,我们暂时用逗号分割来讲述,拿我们案例来讲,假设一条数据的id = 1,type_id = 1,2,3 ,通过以上的炸裂函数处理之后,该条查询结果将变成3条,分别为id=1、type_id_new =1,id=1、type_id_new =2,id=1、type_id_new =3,也就是被炸裂的字段数据分割,剩余字段全部保持不变。
1.10 COALESCE ( expression,value1,value2……,valuen)
select coalesce(demo_id1,demo_id2,demo_id3) from demo_db.demo_table ;
select coalesce(case when demo_name like '%杰伦%' then '杰粉' when demo_name like '%许嵩% ' then '嵩鼠' else '小泷包' end,
demo_name2);
coalesce函数其实就是找到第一个不为NULL的表达式,将其结果返回,假设全部为NULL,最后只能返回NULL,从我以上案例可以看出来,每个参数不仅仅可以写字段,也可以嵌入其他的表达式,像第二行嵌入了一串case when then,那也仅仅是一个参数而已。
注意点,coalesce函数只是判断是否为NULL,它不会判断空串,假设第一个不为NULL的参数为空串‘’,那么它也会将这个空串当做有值查出来的。
1.11 group by field1,field2 having
select year,sex,goods_name,sum(goods_number) as num
from demo_db.demo_table
group by year,sex,goods_name
having num>1000
group by 用法确实比较常见,写在这里也是因为平时做数据统计基本每次都会用得到,想着写上吧显得没什么技术含量,不写又对不起这个好用的聚合语法,以上的demo呢便是统计每一年男女各对每种商品的购买量是多少,并把销量在1000以上的数据找出来,这个写法便是先对年份分组,再对性别分组,然后又对商品名称分组,分好组后便使用sum函数对商品销量进行求和。
1.12 随机抽样:distribute by rand() sort by rand()
select * from ods_user_bucket_log distribute by rand() sort by rand() limit 10;
rand函数前的distribute和sort关键字可以保证数据在mapper和reducer阶段是随机分布的,像用例中写的那样就是随机抽样取10条数据。
1.13 A left join B on a.field1= b.filed1 where B.field1 is null
select a.*
from demo_db.demo_tableA a
left join demo_db.demo_tableB b
on a.demo_id = b.demo_id where b.demo_id is null
left join 这个写法一让我拿出来介绍属实让人笑掉大牙,左连接么,谁不会,左表全查白。当然,我列出来这个当然不是通俗的告诉大家一下我会左连接哎,我好棒棒啊,其实就是因为平时工作时总是需要把某表A和另一表B中做比较,将A中存在的属于B的部分给排除掉,此刻,我上述写的万能公式便能用到了,别笑,语法简单,但是遇到这种情况时,不经常写Sql的人可能都想不太到。
再补充一个知识点,在使用连接时,主表越小,那么查询效率越高,因此如果遇到一些inner join场景,主表次表换一下位置对查询结果没影响的话,可以记着将数据量字段量小的表放在主表位置上。
1.14 创建函数调用Jave-Jar中的方法
--语法
create temporary function 方法名 as 'java类的全限定名' using jar 'jar包在hdfs上的位置';
--案例
create temporary function decryption as 'com.zae.aes.Decrypt' using jar 'hdfs://namenodeha/user/zae/secret/decryption_demo.jar';
1.15 摆出一条大SQL看看
这个是最近写的一条中等规模大小的SQL吧,只有50行左右而已,其他太长的也不好截屏,里面就用到了一些前面讲述的SQL语法,当然,这个SQL的业务场景需求我就不再赘述了,因为原本的SQL已经被我大批量的用新随便定义的表名和字段给替换了,目前已经面目全非了,毕竟不能暴露公司的一些业务的东西吧,之所以粘出来还是想实际的介绍下我前面那14条是怎样的结合着嵌入到一个SQL中的,随便看看就好,不需要深究其意思。
2.Presto/Spark/Mapreduce 计算引擎对比
平时一直使用大数据平台进行一些数据的处理,在执行查询语句时,是可以选择使用Presto,Spark,MapReduce不同的计算引擎进行工作,坦白讲,初次接触时也没搞明白它们的区别是什么,只知道有些SQL放在Presto引擎上执行准报错,但是放在Spark上就不会报错,它们的语法还是有差异的,presto没有spark内嵌的函数多。据数据前辈给交接介绍时,大致就说,如果是单表查询,查询一些单表的数据量,聚合分组诸类,就使用Presto,相对来讲是比较快的;但是要同时使用到多个大数据表查询,那就使用Spark和MR是比较快些的;另外,Spark和MR相比,Spark运算速度应该有一些优势,但是遇到了特别特别大的计算量级时,资源再不够用,那么可能就会发生一些job abort,time out等比较让人牙疼的报错,毕竟这些报错不是由于SQL本身的编写出现的问题,而是和资源不够用相关,而且往往出现这个问题都是发生在SQL运行很久之后,记着我刚接手大数据没一个月时,曾经写了一条SQL执行了三个小时,最后给我了报了个time out,气得我没把键盘摔了!反过来看,使用MR的话好像很少会发生以上陈述问题,它可能会慢些,但它最后一定会不辱使命帮你执行完毕。
2.1 Presto
Presto我讲不了特别深,毕竟我平时对于它的使用也仅仅是选择了这个引擎,然后执行了我写下的单表执行的SQL,不过有一点可以确定,他对你填写的类型的要求是苛刻的,比如假设你定义了一个字段叫做user_id,给定的它的类型为string(不是误写,在hive中就是定义为string,你可以理解为mysql中的varchar类型),于是你写了一条SQL:select * from demo_db.demo_user where user_id = 1001,那么它对于presto引擎执行那将会报错的,因为他检查语法时会发现你输入的1001是个整型数值,和string类型不匹配,但是对于Spark引擎执行时就不会出这个问题,我觉得Spark底层应该是对1001做了转化,将这个整型数值1001转化为了字符串‘1001’,故可以去做正常的查询。下面我将整理几条关于Presto的介绍吧放在这里,也是我从各类网站中了解来的,可能对实际开发用处不大,但起码我们知道自己用了个什么计算引擎吧。
- Presto是一个facebook开源的分布式SQL查询引擎,适用于交互式分析查询,数据量支持GB到PB字节
- Presto是一款内存计算型的引擎,所以对于内存管理必须做到精细,才能保证query有序、顺利的执行,部分发生饿死、死锁等情况。也正是因为它是基于内存计算的,它的速度也是很快的。
- Presto采用典型的master-slave模型,master主要负责对从节点的一些管理以及query的解析和调度,而slave则是负责一些计算和读写。
2.2 Spark
Spark是一个并行计算框架,适用于大规模的数据处理。Spark也是一个基于内存的计算引擎,专门解决大数据的分布式问题,它是hadoop的一个补充,因此可以在Haddop文件系统中并行运行。
平时使用最多的还是SparkSql,用于一些查询,或者是搭建一些定时Job,使用SparkSql组件去完成对数据的抽取、转换、加载,但是查阅了相关介绍,Spark的应用远远不止这些,它还有一些流计算、机器学习、图计算的场景应用,下面有一段关于Spark应用场景介绍,是从<https://help.aliyun.com/document_detail/441938.html>
中看来了,我就不用我笨拙的语言来编排了,放在下面供大家探讨一下:
-
离线ETL
离线ETL主要应用于数据仓库,对大规模的数据进行抽取(Extract)、转换(Transform)和加载(Load),其特点是数据量大,耗时较长,通常设置为定时任务执行。
-
在线数据分析(OLAP)
在线数据分析主要应用于BI(Business Intelligence)。分析人员交互式地提交查询作业,Spark可以快速地返回结果。除了Spark,常见的OLAP引擎包括Presto和Impala等。Spark 3.0的主要特性在EMR中的Spark 2.4版本已支持,更多特性详情请参见Spark SQL Guide。
-
流计算
流计算主要应用于实时大屏、实时风控、实时推荐和实时报警监控等。流计算主要包括Spark Streaming和Flink引擎,Spark Streaming提供DStream和Structured Streaming两种接口,Structured Streaming和Dataframe用法类似,门槛较低。Flink适合低延迟场景,而Spark Streaming更适合高吞吐的场景,详情请参见Structured Streaming Programming Guide。
-
机器学习
Spark的MLlib提供了较丰富的机器学习库,包括分类、回归、协同过滤、聚合,同时提供了模型选择、自动调参和交叉验证等工具来提高生产力。MLlib主要支持非深度学习的算法模块,详情请参见Machine Learning Library (MLlib) Guide。
-
图计算
Spark的GraphX支持图计算的库,支持丰富的图计算的算子,包括属性算子、结构算子、Join算子和邻居聚合等。详情请参见GraphX Programming Guide。
2.3 MapReduce
Hadoop MapReduce:一个分布式的离线并行计算框架,它是Hadoop全家桶的一部分,它的思想是分而治之,也就是说将一个大的复杂的问题切割成一个个小的问题加以解决,最后再汇总,这从MapReduce的字面就可以看出来。MapReduce处理任务过程是分为两个阶段的:
- Map阶段:Map阶段的主要作用是“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。Map阶段的这些任务可以并行计算,彼此间没有依赖关系。
- Reduce阶段:Reduce阶段的主要作用是“合”,即对map阶段的结果进行全局汇总。
下面从一张图例来看下MapReduce计算的处理过程:
3.由数据同步想到的
所谓大数据,必然和形形色色的数据表打交道,但是要清楚一点,对于一个规模还算可以的企业来讲,那下面的项目组肯定是一片一片的,他们之间的数据没有办法做到百分之百的共享,有时你想要的去做一些数据的分析,可能就需要其他项目组甚至第三方企业的支持,从别的渠道去拿到数据进行使用,因此,数据同步接入变成了大数据开发中必不可少的工作。
其实数据同步接入的方式有很多,如果有数据库的权限,可以直接使用大数据平台自带的同步组件,编写一定的规则,配置好接入的频率,将数据接入过来;如果是第三方外部企业的数据,为了安全起见,我们通常也会选择接口的方式进行数据的接入,再同步至大数据平台;当然,使用消息中间件也是很不错的方式,比如Kafka,但是这东西总归有些严格意义上的限制,很多企业为了安全是不会对外暴露自身的Kafka服务地址的;还有一些数据量过大的情况,可以考虑sftp服务器的方式,直接将数据上传到指定的服务器的文件夹里,不过这个总归有些依赖于手工支持的弊端在里面,不过据说好像也可以编写脚本完成自动化的上传和拉取,对于我这个搞Java的来讲,这方面的解决策略还是不太懂的;其他方法也可以使用DolphinScheduler(DS)里面的一些小组件,去执行一些脚本来完成同步,当然脚本的编写就类似于hadoop distcp -update hdfs://主机名/源数据路径 hdfs://主机名/目标数据路径
,这是将数据表从hdfs的一个文件目录下复制到指定位置,同样,flinkX也支持类似的功能。
总之,方案不少,具体场景具体分析,数据同步的问题也有很多,比如上游数据源断了,导致目标日期的数据没有过来;使用的同步的服务器宕机了,那时候就需要详细的排查了,尽快将数据同步修复。
4.任重道远,仍需砥砺前行
我清楚,要是彻头彻尾的搞明白大数据,除了会写写复杂SQL是远远不够的,我记着有些归类中将ES和Kafka也作为大数据开发的范畴,当然,这两块的知识点我也是有所涉猎的,只不过是Java后端代码中使用的,也许这两块还有其他用法可以用于大数据,比如结合着Scala语言使用。Scala语言是函数式编程,因为在Java方向已经沉浸多年,所以看了几天scala语言的语法也没有那么抗拒,都大致了解了下,但是了解语法和实际使用这门语言进行工作上的开发又是另外一回事,由于各种原因没有深入的去研究下去略表遗憾。 总的来讲,目前我还是比较喜欢java的,但是因为最近这一年里也做了不少大数据相关工作,所以总觉得不为它写一篇博客总归对不起这一年的收获,所以还是找个地方记录下来吧,将来有一天如果我在java方向钻研透了,想再探索大数据的广袤无垠时,我想,我会认认真真系系统统的去学一遍,像scala,spark,flink,hadoop他们深层次技术,我一定要每一个都好好品尝下。