目录
- 一致性测试
- 数据量比对
- 金额字段SUM比对
- 主键唯一
- 空值校验
- 枚举值
- 脚本框架
- 建表语句
- 跑后查数
一致性测试
在做集市迁移时,甲方比较看重数据的一致性测试,一般会要求做新表与旧表的数据量比对,以及部分金额字段的字段级比对。
下面给出的参考SQL都可以先在Excel中做好拼接语句,再整合到Python脚本框架中。一般整合之前可以先调试好SQL语句,也就是直接在SQL查询平台sqldbx,先跑一遍SQL,调试完毕再整合为Python脚本。
数据量比对
/* 表1与表2数据量比对 */
insert into 库名.结果表名
(
table_name -- 表名
,new_cnt -- 新表记录数
,org_cnt -- 旧表记录数
,res -- 新旧表差额
)
partition
(
data_dt = cast('2021-11-11' as varchar(10))
,type_name = cast('表1与表2数据量比对' as varchar(50))
)
'base_info' as table_name
,t1.cnt as new_cnt
,t2.cnt as org_cnt
,t1.cnt-t2.cnt as res
from
(select count(*) as cnt from 库名.表名1 where data_dt = '2021-11-11') t1
left join
(select count(*) as cnt from 库名.表名2 where data_dt = '2021-11-11') t2 on 1=1
金额字段SUM比对
/* 表1与表2重要金额字段sum值比对 */
insert into 库名.结果表名
(
table_name -- 表名
,column_name -- 字段名
,new_sum_cnt -- 新表字段求和
,org_sum_cnt -- 旧表字段求和
,res -- 新旧表字段差额
)
partition
(
data_dt = cast('2021-11-11' as varchar(10))
,type_name = cast('表1与表2数据量比对' as varchar(50))
)
'base_info' as table_name
,'amt' as column_name
,t1.cnt as new_cnt
,t2.cnt as org_cnt
,t1.cnt-t2.cnt as res
from
(select sum(amt) as cnt from 库名.表名1 where data_dt = '2021-11-11') t1
left join
(select sum(amt) as cnt from 库名.表名2 where data_dt = '2021-11-11') t2 on 1=1
主键唯一
/* 主键重复 */
insert into 库名.结果表名
(
table_name -- 表名
,pk_name -- 主键名
,pk_res -- 结果值
)
partition
(
data_dt = cast('2021-11-11' as varchar(10))
,type_name = cast('主键重复' as varchar(50))
)
select
'base_info' as table_name
,'主键1,主键2' as pk_name
count(1)-count(distinct 主键1,主键2) as pk_res
from 库名.表名 t1
where t1.data_dt = '2021-11-11'
空值校验
/* 字段空值检查 */
insert into 库名.结果表名
(
nb -- 序号
,table_name -- 表名
,column_name -- 字段名
,column_count -- 次数
)
partition (data_dt,type_name)
select
'kn001' as nb
,'字段空值检查' as type_name
,'base_info' as table_name
,'ident_id' as column_name
,cast(count(*) as bigint) as column_count
from 库名.表名 t1
where t1.data_dt = '2021-11-11'
and cast(t1.ident_id as string) = ''
group by null --这句可省略
having count(*) > 0
枚举值
/* 枚举值 */
insert into 库名.结果表名
(
table_name -- 表名
,column_name -- 字段名
,column_val -- 字段对应的值
,column_count -- 字段的值对应出现的次数
)
partition
(
data_dt = cast('2021-11-11' as varchar(10))
,type_name = cast('枚举值' as varchar(50))
)
select
cast('base_info' as varchar(50)) as table_name
,cast('ident_id' as varchar(50)) as column_name
,cast(ident_id as varchar(50)) as column_val
,cast(count(*) as bigint as column_count
from 库名.表名
where ident_id not in
(
select val --码值
from 库名.映射表名 --码值映射表
where table_id = 'base_info' --表名
and clmn_id = 'ident_id' --字段名
)
group by 1,2,3,5
可以根据需要,考虑是否做格式转换CAST('BASE_INFO' AS VARCHAR(50)) AS TABLE_NAME
,如果查询出来的结果与结果表的字段类型一致,就不需要格式转换的。
脚本框架
# 框架
# 注释
import sys
from job.base.JobBase import ExitCode
import job.base.ClientUtil as util
def checkArgs(length):
util.debug('参数检查')
util.checkArgsEx(length)
try:
checkArgs(1)
各种参数
SQL主代码
util.exit(ExitCode.EXIT_SUCCESS)
except Exception as e:
util.exit(ExitCode.EXIT_ERROR)
finally:
util.destory()
建表语句
drop table 库名.结果表名;
create table if not exists 库名.结果表名
(
table_name varchar(50) comment '表名'
,column_name varchar(50) comment '字段名'
,column_val varchar(50) comment '字段对应的值'
,column_count bigint comment '字段的值对应出现的次数'
)
partitioned by (
data_dt varchar(10) comment '数据日期'
type_name varchar(50) comment '类型'
)
comment '结果表名'
stored as parquet
这里有个小技巧,如果测试建表没有特殊要求,可选择灵活建表。可根据select查询出的字段的类型,以此建表,省去cast(字段,as 字段类型)
转换类型的麻烦
跑后查数
跑完脚本之后,多张表查数,可用union all
来看
select '表名1' as table_name, count(*) from 库名.表名1 where data_dt = '2021-11-11' union all
select '表名2' as table_name, count(*) from 库名.表名2 where data_dt = '2021-11-11' union all
select '表名3' as table_name, count(*) from 库名.表名3 where data_dt = '2021-11-11' union all
select '表名4' as table_name, count(*) from 库名.表名4 where data_dt = '2021-11-11' union all
select '表名5' as table_name, count(*) from 库名.表名5 where data_dt = '2021-11-11'
参考:数据测试全流程总结(小白易上手)