编写好Python操作数据库的脚本后,运行报错如下:
报错1:“AttributeError: 'NoneType' object has no attribute 'encoding'”
解决办法:设置charset时要用utf8,不能用utf-8
if link_type == 0:
# 创建数据,返回字典
self.conn = pymysql.connect(host=host_db, user=user_db, password=password_db, db=name_db, port=port_db,
charset='utf8', cursorclass=pymysql.cursors.DictCursor)
else:
# 创建数据库,返回元祖
self.conn = pymysql.connect(host=host_db, user=user_db, password=password_db, db=name_db, port=port_db,
charset='utf8')
报错2:“AttributeError: 'OperationDbInterface' object has no attribute 'cur'”
错误原因:编写代码时,换行时没有对齐,导致调用cur时报错
贴上Python操作mysql数据库的源码:
# coding:utf8 '''
定义对mysql数据库操作的封装
1、包括基本的单条语句操作,如删除、修改、插入
2、独立地查询单条、多条数据
3、独立地添加多条数据
''' import logging, os, pymysql
from public import config class OperationDbInterface(object):
# 定义初始化数据库连接
def __init__(self, host_db='127.0.0.1', user_db='root', password_db='',
name_db='interface_test', port_db=3306, link_type=0):
'''
:param host_db: 数据库服务主机IP
:param user_db: 数据库连接用户名
:param password_db: 数据库密码
:param name_db: 数据库名称
:param port_db: 数据库端口号,整型数据
:param link_type: 连接类型,用于设置输出数据是元祖还是字典,默认是字典,link_type=0
:return:游标
'''
try:
if link_type == 0:
# 创建数据,返回字典
self.conn = pymysql.connect(host=host_db, user=user_db, password=password_db, db=name_db, port=port_db,
charset='utf8', cursorclass=pymysql.cursors.DictCursor)
else:
# 创建数据库,返回元祖
self.conn = pymysql.connect(host=host_db, user=user_db, password=password_db, db=name_db, port=port_db,
charset='utf8')
self.cur = self.conn.cursor()
print("输出:%s" % self.cur)
except pymysql.Error as e:
print("创建数据库连接失败|Mysql Error %d: %s" % (e.args[0], e.args[1]))
logging.basicConfig(filename=config.src_path + '/log/syserror.log', level=logging.DEBUG,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
logger = logging.getLogger(__name__)
logger.exception(e) # 定义单条数据操作,包含删除和更新操作
def op_sql(self, condition):
'''
:param condition: sql语句,该通用方法可用来替代updateone,deleteone
:return: 字典形式
''' try:
self.cur.execute(condition) # 执行sql语句
self.conn.commit() # 提交游标数据
result = {'code': '', 'message': '执行通用操作成功', 'data': []}
except pymysql.Error as e:
self.conn.rollback() # 执行回滚操作
result = {'code': '', 'message': '执行通用操作异常', 'data': []}
print("数据库错误|op_sql %d:%s" % (e.args[0], e.args[1]))
logging.basicConfig(filename=config.src_path + '/log/syserror.log', level=logging.DEBUG,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
logger = logging.getLogger(__name__)
logger.exception(e)
return result # 查询表中单条数据
def select_one(self, condition):
'''
:param condition: 查询单条sql语句
:return: 字典形式的单条查询语句
''' try:
rows_affect = self.cur.execute(condition)
if rows_affect > 0:
results = self.cur.fetchone() # 获取一条数据
result = {'code': '', 'message': '执行单条查询操作成功', 'data': results}
else:
result = {'code': '', 'message': '执行单条查询操作成功', 'data': []}
except pymysql.Error as e:
self.conn.rollback() # 执行回滚操作
result = {'code': '', 'message': '执行单条查询异常', 'data': []}
print("数据库错误|select_one %d: %s" % (e.args[0], e.args[1]))
logging.basicConfig(filename=config.src_path + '/log/syserror.log', level=logging.DEBUG,
format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s')
logger = logging.getLogger(__name__)
logger.exception(e)
return result # 查询表中多条数据
def select_all(self, condition):
'''
:param condition: sql语句
:return: 字典形式的批量查询结果
''' try:
rows_affect = self.cur.execute(condition)
if rows_affect > 0:
# 将鼠标光标放回到初始位置
self.cur.scroll(0, mode='absolute')
# 返回游标中所有结果
results = self.cur.fetchall()
result = {'code': '', 'message': '执行批量查询操作成功', 'data': results}
else:
result = {'code': '', 'message': '执行批量查询操作成功', 'data': []}
except pymysql.Error as e:
# 执行回滚操作
self.conn.rollback()
result = {'code': '', 'message': '执行批量查询异常', 'data': []}
logging.basicConfig(filename=config.src_path + '/log/syserror.log', level=logging.DEBUG,
format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s')
logger = logging.getLogger(__name__)
logger.exception(e)
return result # 定义表中插入数据操作的方法
def insert_data(self, condition, params):
'''
:param condition: 插入语句
:param params: 插入数据,列表形式[(,,,),(,,,)]
:return: 字典形式批量插入数据结果
'''
try:
results = self.cur.executemany(condition, params) # 返回插入数据的条数
self.conn.commit()
result = {'code': '', 'message': '执行批量插入数据成功', 'data': results}
except pymysql.Error as e:
self.conn.rollback() # 执行回滚操作
result = {'code': '', 'message': '执行批量插入数据异常', 'data': []}
print("数据库错误|insert_data %d: %s" % (e.args[0], e.args[1]))
logging.basicConfig(filename=config.src_path + '/log/syserror.log', level=logging.DEBUG,
format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(massage)s')
logger = logging.getLogger(__name__)
logger.exception(e)
return result # 关闭数据库
def __del__(self):
if self.cur is not None:
self.cur.close() # 关闭游标
if self.conn is not None:
self.conn.close() # 释放数据库资源 if __name__ == "__main__":
test = OperationDbInterface() # 实例化类
result_select_all = test.select_all("select * from config_test") # 查询所有数据
result_select_one = test.select_one("select * from config_test where id=1") # 查询单条数据
result_oP_sql = test.op_sql("UPDATE config_test SET value_config='修改后的valuetest' WHERE id=1") # 通用操作,修改数据库
result__insert_sql = test.insert_data(
"insert into config_test (key_config, value_config, description, status) values (%s, %s, %s, %s)",
[('mytest1', 'mytestvalue1', '我插入的测试数据1', 1), ('mytest2', 'mytestvalue2', '我插入的测试数据2', 0)])
print(result_select_all['data'], result_select_all['message'])
print(result_select_one['data'], result_select_one['message'])
print(result_oP_sql['data'], result_oP_sql['message'])
print(result__insert_sql['data'], result__insert_sql['message'])