Python带动态参数功能的sqlite工具类

时间:2022-11-26 22:34:36

本文实例讲述了Python带动态参数功能的sqlite工具类。分享给大家供大家参考,具体如下:

最近在弄sqlite和python

在网上参考各教程后,结合以往java jdbc数据库工具类写出以下python连接sqlite的工具类

写得比较繁琐 主要是想保留一种类似java的Object…args动态参数写法 并兼容数组/list方式传递不定个数参数 并且返回值是List形式 dict字典 以便和JSON格式互相转换

在python中有一些区别 经过该工具类封装之后可以有以下用法:

?
1
2
3
db.executeQuery("s * f t w id=? and name=?", "id01", "name01");//动态参数形式
db.executeQuery("s * f t w id=? and name=?", ("id01", "name01"));//tuple元组式 等价上面 括号可省略
db.executeQuery("s * f t w id=? and name=?", ["id01", "name01"]);//list数组形式

完整Python代码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#!/usr/bin/python
#-*- coding:utf-8 -*- 
import sqlite3
import os
#
# 连接数据库帮助类
# eg:
#  db = database()
#  count,listRes = db.executeQueryPage("select * from student where id=? and name like ? ", 2, 10, "id01", "%name%")
#  listRes = db.executeQuery("select * from student where id=? and name like ? ", "id01", "%name%")
#  db.execute("delete from student where id=? ", "id01")
#  count = db.getCount("select * from student ")
#  db.close()
#
class database :
  dbfile = "sqlite.db"
  memory = ":memory:"
  conn = None
  showsql = True
  def __init__(self):
    self.conn = self.getConn()
  #输出工具
  def out(self, outStr, *args):
    if(self.showsql):
      for var in args:
        if(var):
          outStr = outStr + ", " + str(var)
      print("db. " + outStr)
    return
  #获取连接
  def getConn(self):
    if(self.conn is None):
      conn = sqlite3.connect(self.dbfile)
      if(conn is None):
        conn = sqlite3.connect(self.memory)
      if(conn is None):
        print("dbfile : " + self.dbfile + " is not found && the memory connect error ! ")
      else:
        conn.row_factory = self.dict_factory #字典解决方案
        self.conn = conn
      self.out("db init conn ok ! ")
    else:
      conn = self.conn
    return conn
  #字典解决方案
  def dict_factory(self, cursor, row):
    d = {}
    for idx, col in enumerate(cursor.description):
      d[col[0]] = row[idx]
    return d
  #关闭连接
  def close(self, conn=None):
    res = 2
    if(not conn is None):
      conn.close()
      res = res - 1
    if(not self.conn is None):
      self.conn.close()
      res = res - 1
    self.out("db close res : " + str(res))
    return res
  #加工参数tuple or list 获取合理参数list
  #把动态参数集合tuple转为list 并把单独的传递动态参数list从tuple中取出作为参数
  def turnArray(self, args):
    #args (1, 2, 3) 直接调用型 exe("select x x", 1, 2, 3)
    #return [1, 2, 3] <- list(args)
    #args ([1, 2, 3], ) list传入型 exe("select x x",[ 1, 2, 3]) len(args)=1 && type(args[0])=list
    #return [1, 2, 3]
    if(args and len(args) == 1 and (type(args[0]) is list) ):
      res = args[0]
    else:
      res = list(args)
    return res
  #分页查询 查询page页 每页num条 返回 分页前总条数 和 当前页的数据列表 count,listR = db.executeQueryPage("select x x",1,10,(args))
  def executeQueryPage(self, sql, page, num, *args):
    args = self.turnArray(args)
    count = self.getCount(sql, args)
    pageSql = "select * from ( " + sql + " ) limit 5 offset 0 "
    #args.append(num)
    #args.append(int(num) * (int(page) - 1) )
    self.out(pageSql, args)
    conn = self.getConn()
    cursor = conn.cursor()
    listRes = cursor.execute(sql, args).fetchall()
    return (count, listRes) 
  #查询列表array[map] eg: [{'id': u'id02', 'birth': u'birth01', 'name': u'name02'}, {'id': u'id03', 'birth': u'birth01', 'name': u'name03'}]
  def executeQuery(self, sql, *args):
    args = self.turnArray(args)
    self.out(sql, args)
    conn = self.getConn()
    cursor = conn.cursor()
    res = cursor.execute(sql, args).fetchall()
    return res 
  #执行sql或者查询列表 并提交
  def execute(self, sql, *args):
    args = self.turnArray(args)
    self.out(sql, args)
    conn = self.getConn()
    cursor = conn.cursor()
    #sql占位符 填充args 可以是tuple(1, 2)(动态参数数组) 也可以是list[1, 2] list(tuple) tuple(list)
    res = cursor.execute(sql, args).fetchall()
    conn.commit()
    #self.close(conn)
    return res 
  #查询列名列表array[str] eg: ['id', 'name', 'birth']
  def getColumnNames(self, sql, *args):
    args = self.turnArray(args)
    self.out(sql, args)
    conn = self.getConn()
    if(not conn is None):
      cursor = conn.cursor()
      cursor.execute(sql, args)
      res = [tuple[0] for tuple in cursor.description]
    return res 
  #查询结果为单str eg: 'xxxx'
  def getString(self, sql, *args):
    args = self.turnArray(args)
    self.out(sql, args)
    conn = self.getConn()
    cursor = conn.cursor()
    listRes = cursor.execute(sql, args).fetchall()
    columnNames = [tuple[0] for tuple in cursor.description]
    #print(columnNames)
    res = ""
    if(listRes and len(listRes) >= 1):
      res = listRes[0][columnNames[0]]
    return res  
  #查询记录数量 自动附加count(*) eg: 3
  def getCount(self, sql, *args):
    args = self.turnArray(args)
    sql = "select count(*) cc from ( " + sql + " ) "
    resString = self.getString(sql, args) 
    res = 0  
    if(resString):
      res = int(resString)
    return res
####################################测试
def main():
  db = database()
  db.execute(
    '''
    create table if not exists student(
      id   text primary key,
      name  text not null,
      birth  text
    )
    '''
  )
  for i in range(10):
    db.execute("insert into student values('id1" + str(i) + "', 'name1" + str(i) + "', 'birth1" + str(i) + "')")
  db.execute("insert into student values('id01', 'name01', 'birth01')")
  db.execute("insert into student values('id02', 'name02', 'birth01')")
  db.execute("insert into student values('id03', 'name03', 'birth01')")
  print(db.getColumnNames("select * from student"))
  print(db.getCount("select * from student " ))
  print(db.getString("select name from student where id = ? ", "id02" ))
  print(db.executeQuery("select * from student where 1=? and 2=? ", 1, 2 ))
  print(db.executeQueryPage("select * from student where id like ? ", 1, 5, "id0%"))
  db.execute("update student set name='nameupdate' where id = ? ", "id02")
  db.execute("delete from student where id = ? or 1=1 ", "id01")
  db.close()
if __name__ == '__main__':
  main()

希望本文所述对大家Python程序设计有所帮助。

原文链接:https://blog.csdn.net/u014303844/article/details/78963812