基于底层数据来开发不难,无非是将用户输入变量作为筛选条件,将参数映射到 sql 语句,并生成一个 sql 语句然后再去数据库执行
最后再利用 qt 开发一个 gui 界面,用户界面的点击和筛选条件,信号触发对应按钮与绑定的传参槽函数执行
具体思路:
一、数据库连接类
此处利用 pandas 读写操作 oracle 数据库
二、主函数模块
1)输入参数模块,外部输入条件参数,建立数据库关键字段映射
--注:读取外部 txt 文件,将筛选字段可能需要进行键值对转换
2)sql 语句集合模块,将待执行的业务 sql 语句统一存放到这里
3)数据处理函数工厂
4)使用多线程提取数据
一、数据库连接类
cx_oracle 是一个 python 扩展模块,相当于 python 的 oracle 数据库的驱动,通过使用所有数据库访问模块通用的数据库 api 来实现 oracle 数据库的查询和更新
pandas 是基于 numpy 开发,为了解决数据分析任务的模块,pandas 引入了大量库和一些标准的数据模型,提供了高效地操作大型数据集所需的方法类和函数
pandas 调用数据库主要有 read_sql_table,read_sql_query,read_sql 三种方式
本文主要介绍一下 pandas 中 read_sql_query 方法的使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
1 :pd.read_sql_query()
读取自定义数据,返还dataframe格式,通过sql查询脚本包括增删改查。
pd.read_sql_query(sql, con, index_col = none,coerce_float = true, params = none, parse_dates = none,chunksize = none)
sql:要执行的sql脚本,文本类型
con:数据库连接
index_col:选择返回结果集索引的列,文本 / 文本列表
coerce_float:非常有用,将数字形式的字符串直接以 float 型读入
parse_dates:将某一列日期型字符串转换为datetime型数据,与pd.to_datetime函数功能类似。
params:向sql脚本中传入的参数,官方类型有列表,元组和字典。用于传递参数的语法是数据库驱动程序相关的。
chunksize:如果提供了一个整数值,那么就会返回一个generator,每次输出的行数就是提供的值的大小
read_sql_query()中可以接受sql语句,delete,insert into、update操作没有返回值(但是会在数据库中执行),程序会抛出sourcecodecloseerror,并终止程序。select会返回结果。如果想继续运行,可以 try 捕捉此异常。
2 :pd.read_sql_table()
读取数据库中的表,返还dataframe格式(通过表名)
import pandas as pd
pd.read_sql_table(table_name, con, schema = none,index_col = none, coerce_float = true, parse_dates = none, columns = none,chunksize = none)
3 :pd.read_sql()
读数据库通过sql脚本或者表名
import pandas as pd
pd.read_sql(sql, con, index_col = none,coerce_float = true, params = none, parse_dates = none, columns = none, chunksize = none)
|
以下创建连接 oracel 数据库的连接类 oracle_db
主要提供 2 种操作数据的函数方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
import cx_oracle
# pandas读写操作oracle数据库
import pandas as pd
# 避免编码问题带来的乱码
import os
os.environ[ 'nls_lang' ] = 'simplified chinese_china.utf8'
class oracle_db( object ):
def __init__( self ):
try :
# 连接oracle
# 方法1:sqlalchemy 提供的create_engine()
# from sqlalchemy import create_engine
# engine = create_engine('oracle+cx_oracle://username:password@ip:1521/orcl')
# #方法2:cx_oracle.connect()
self .engine = cx_oracle.connect( 'username' , 'password' , 'ip:1521/database' )
except cx_oracle.error as e:
print ( "error %d:%s" % (e.args[ 0 ], e.args[ 1 ]))
exit()
# 查询部分信息
def search_one( self , sql,sparm):
try :
# #查询获取数据用sql语句
# 代传参数:sparm--查询指定字段参数
df = pd.read_sql_query(sql, self .engine,params = sparm)
self .engine.close()
except exception as e:
return "error " + e.args[ 0 ]
return df
# 查询全部信息
def search_all( self , sql):
try :
# #查询获取数据用sql语句
df = pd.read_sql_query(sql, self .engine)
self .engine.close()
except exception as e:
return "error " + e.args[ 0 ]
return df
|
二、数据提取主函数模块
cx_oracle 是一个 python 扩展模块,相当于 python 的 oracle 数据库的驱动,通过使用所有数据库访问模块通用的数据库 api 来实现 oracle 数据库的查询和更新。
1)外部输入参数模块
txt 文本中,就包含一列数据,第一行列名,读取的时候忽略第一行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
#建立id——编号字典
def buildid():
sqlid = """select * from b_build_info"""
db = oracle_db() # 实例化一个对象
b_build_info = db.search_all(sqlid)
id_buildcode = b_build_info.set_index( "buildcode" )[ "id" ].to_dict()
return id_buildcode
#通过文本传入待导出数据清单
def read_task_list():
build_code = buildid()
tasklist = []
is_first_line = true
with open ( "./b_lst.txt" ) as lst:
for line in lst:
if is_first_line:
is_first_line = false
continue
tasklist.append(build_code.get(line.strip( '\n' ))) #键值对转换
return tasklist
|
2)业务 sql 语句集合
注意in后面{0}不要加引号,这里传入为元组,params 参数传入sparm
= {'start_time':'2021-04-01','end_time':'2021-05-01'},此处参数可根据需要改变
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
def sql_d(lst):
# 逐月数据
sql_d_energy_item_month = """select * from d_energy_item_month
where recorddate >= to_date(:start_time, 'yyyy-mm-dd')
and recorddate < to_date(:end_time, 'yyyy-mm-dd')
and buildid in {0}
order by recorddate asc""" . format (lst)
# 逐月数据
sql_d_energy_month = """select d.*,t.name from d_energy_month d join t_device_info t on d.branchid = t.id
where d.recorddate >= to_date(:start_time, 'yyyy-mm-dd')
and d.recorddate < to_date(:end_time, 'yyyy-mm-dd')
and d.buildid = '{0}'
order by d.recorddate asc""" . format (lst)
# 查询当日数据
sql_energy_item_hour_cheak = """select * from d_energy_item_hour
where trunc(sysdate)=trunc(recorddate)
order by recorddate asc""" . format (lst)
sql_collection = [sql_d_energy_item_month, sql_d_energy_item_day, sql_d_energy_item_hour, sql_d_energy_month,
sql_d_energy_day, sql_d_energy_hour, sql_energy_hour_cheak]
#此处省略部分sql语句
return sql_collection
|
3)业务数据处理
业务数据处理流程,原始数据后处理,这里不作介绍:
1
2
3
4
5
6
7
8
9
10
|
def db_extranction(lst,sparm,sql_type):
"""sql_type--输入需要操作的sql业务序号"""
sql_ = sql_d(lst)[sql_type] #输出sql语句
db = oracle_db() # 实例化一个对象
res = db.search_one(sql_,sparm)
# 数据处理加工
res = data_item_factory(res) #此处省略
# res = db.search_all(sql_d_energy_item_month)
print (res)
return res
|
多线程提取数据部分,这里 tasklist 列表多线程提取数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
import threading
# pandas读写操作oracle数据库
from tools.data_update_oracle import oracle_db
import pandas as pd
from concurrent import futures
if __name__ = = '__main__' :
#外部传入
tasklist = read_task_list()
print (tasklist)
# 输入时间查找范围参数,可手动修改
sparm = { 'start_time' : '2021-04-01' , 'end_time' : '2021-05-01' }
lst = tuple ( list (tasklist))
#业务类型序号,可手动修改
sql_type = 0
#全部提取
db_extranction(lst,sparm,sql_type)
#多线程按字段分批提取
方法一:使用threading模块的thread类的构造器创建线程
#threads=[threading.thread(target=db_extranction,args=(lst,sparm,sql_type)) for lst in tasklist]
# [threads[i].start() for i in range(len(threads))]
方法二:使用python的concurrent库,这是官方基于 threading 封装,先安装该库
# with futures.threadpoolexecutor(len(tasklist)) as executor:
# executor.map([db_extranction(lst,sparm,sql_type) for lst in tasklist],tasklist)
|
到此整个数据库取数工具开发流程介绍完毕,就差最后一步分享给小伙伴使用了,做成 gui 应用此处不做详细介绍,构建独立的 python 环境,快速发布你的应用
以上就是python实现一个自助取数查询工具的详细内容,更多关于python 自助取数查询的资料请关注服务器之家其它相关文章!
原文链接:https://mp.weixin.qq.com/s/uUHECcIqbMuImGsGg7IsXQ