Python 3.x MySQL 功能封装

时间:2022-03-12 02:54:18

摘要

最近要用Python写一个Linux中继,需要用到MySQL,然后就封装了一下。

感兴趣的小伙伴可以试试连接池,业务量大的时候还是很有用的:

http://www.tuicool.com/articles/U3ymUb7

手册

配置与安装

Python 3.x 参考教程(本封装依赖 PyMySQL,可先执行 pip install pymysql 安装):
http://www.runoob.com/python3/python3-mysql.html

# After installation, put your own database connection settings into the
# config dictionary in db.py.
# Every entry must be separated by a comma: without one between the
# 'database' value and the 'charset' key, Python joins the two adjacent
# string literals and the 'charset' entry silently disappears.
__config = {
    'host': "123.32.32.18",
    'port': 3306,
    'username': "root",
    'password': "qq32",
    'database': "WechatCard",
    'charset': "utf8",
}

使用

使用SQL语句直接操作

# Import the wrapper class from db.py and create an instance; the
# constructor opens the database connection.
from db import MySQL
database = MySQL()

# Read the record id from the user.
# NOTE(review): `id` shadows the builtin of the same name, and the raw input
# is concatenated into the SQL text without any escaping — only safe with
# trusted input.
id = input('>>')
sql = "SELECT stu_uid FROM student WHERE id="+id
# query() returns the fetched rows, or False if the statement failed.
result = database.query(sql)

使用query_dic进行操作

# Dictionary form of a SELECT. When 'where' is a dict, every key/value pair
# becomes an equality condition and the conditions are joined with AND.
result = database.query_dic({
    'select': 'stu_uid', 'from': 'student', 'where': { 'id':id, 'iii':3 } })

where也可以直接这么写,以实现复杂的条件判断

# When 'where' is a string it is placed after the WHERE keyword as written,
# which allows conditions other than simple equality.
result = database.query_dic({
    'select': 'stu_uid', 'from': 'student', 'where': "id>2 and id<5" })

删除操作

# Dictionary form of a DELETE: 'delete' names the table and 'where' gives
# the condition selecting the rows to remove.
result = database.query_dic({
   'delete': 'student',
   'where': "iii>5"
})

插入操作

# Dictionary form of an INSERT: 'insert' names the table, 'domain_array'
# lists the column names and 'value_array' holds the values in the same
# order (string values are wrapped in single quotes, integers are not).
database.query_dic({
    'insert': 'student',
    'domain_array':[
        'stu_uid', 'iii'
    ],
    'value_array':[
        'asdf',33232
    ]
})

源代码

(1) db.py

import pymysql
import types

class MySQL:
    """Small convenience wrapper around a single PyMySQL connection.

    The connection is opened when the object is created and closed when it
    is destroyed. Statements can be run either as raw SQL via ``query`` or
    described by a dictionary via ``query_dic``.
    """

    # Connection object; created on first use by __connect().
    __db = None

    # Configure your own SQL server here.
    # (Each entry needs its own trailing comma: adjacent string literals are
    # concatenated by Python, which would silently drop the 'charset' key.)
    __config = {
        'host': "123.32.32.18",
        'port': 3306,
        'username': "root",
        'password': "qq32",
        'database': "WechatCard",
        'charset': "utf8",
    }

    # Characters that have to be backslash-escaped inside a single-quoted
    # MySQL string literal.
    __escape_table = {
        ord('\0'): '\\0',
        ord('\n'): '\\n',
        ord('\r'): '\\r',
        ord('\x1a'): '\\Z',
        ord('"'): '\\"',
        ord("'"): "\\'",
        ord('\\'): '\\\\',
    }

    def __init__(self):
        """Open the database connection using the class-level configuration."""
        self.__connect()

    def __del__(self):
        """Close the connection, if one was opened, when the object goes away."""
        if self.__db is not None:
            try:
                self.__db.close()
            except Exception:
                # Best effort only: the connection may already be closed or
                # broken, and there is nothing useful to do about it here.
                pass

    def __connect(self):
        """Return the connection, creating it on the first call."""
        if self.__db is None:
            self.__db = pymysql.connect(
                host=self.__config['host'],
                port=self.__config['port'],
                user=self.__config['username'],
                passwd=self.__config['password'],
                db=self.__config['database'],
                charset=self.__config['charset']
            )
        return self.__db

    def __literal(self, value):
        """Render a Python value as SQL literal text.

        ``None`` becomes ``NULL``, numbers are written as-is, and anything
        else is converted to a string, escaped and wrapped in single quotes.
        """
        if value is None:
            return "NULL"
        if isinstance(value, (int, float)):
            return str(value)
        return "'" + str(value).translate(self.__escape_table) + "'"

    def query(self, _sql, _args=None):
        """Execute one SQL statement and commit it.

        Args:
            _sql: The SQL text to execute.
            _args: Optional parameters for placeholders in ``_sql``; they are
                handed to the cursor's ``execute`` so the driver does the
                escaping. Omit it to run ``_sql`` exactly as given.

        Returns:
            The rows returned by ``fetchall()`` on success, or ``False`` if
            executing or committing the statement raised an error (the
            transaction is rolled back in that case).
        """
        connection = self.__connect()
        cursor = connection.cursor()
        try:
            cursor.execute(_sql, _args)
            data = cursor.fetchall()
            # Commit so that changes are actually written to the database.
            connection.commit()
        except Exception:
            # Roll back on any failure and report it through the return
            # value, which is what existing callers check for.
            connection.rollback()
            return False
        finally:
            # Always release the cursor, whether the statement worked or not.
            cursor.close()
        return data

    def query_dic(self, _sql_dic):
        """Build a statement from a dictionary description and execute it.

        Supported shapes, checked in this order:
            * ``{'select': columns, 'from': table, 'where': cond}``
              (``'where'`` may be omitted to select every row),
            * ``{'insert': table, 'domain_array': [...], 'value_array': [...]}``,
            * ``{'delete': table, 'where': cond}`` (``'where'`` is required).

        ``cond`` is either a condition string or a dict of column/value
        pairs; see ``where``.

        Returns:
            The result of ``query`` for the generated SQL, or ``None`` when
            the dictionary contains none of the supported keys.
        """
        if 'select' in _sql_dic:
            sql = "SELECT " + _sql_dic['select'] + " FROM " + _sql_dic['from']
            if _sql_dic.get('where') is not None:
                sql += self.where(_sql_dic['where'])
        elif 'insert' in _sql_dic:
            sql = (
                "INSERT INTO " + _sql_dic['insert']
                + self.quote(_sql_dic['domain_array'], type_filter=False)
                + " VALUES " + self.quote(_sql_dic['value_array'])
            )
        elif 'delete' in _sql_dic:
            # 'where' stays mandatory here so that a forgotten condition can
            # never turn into "delete every row".
            sql = "DELETE FROM " + _sql_dic['delete'] + self.where(_sql_dic['where'])
        else:
            return None
        print(sql)
        return self.query(sql)

    def where(self, _sql):
        """Build a ``" WHERE ..."`` clause.

        Args:
            _sql: Either a condition (anything that is not a dict is
                converted with ``str`` and used unchanged), or a dict mapping
                column names to values. Dict entries become equality tests
                joined with ``AND``; string values are quoted and escaped,
                and a ``None`` value becomes ``column IS NULL``.

        Returns:
            The clause text, starting with a space. An empty dict yields
            ``" WHERE "`` with no condition, which is not valid SQL.
        """
        if not isinstance(_sql, dict):
            return " WHERE " + str(_sql)
        conditions = []
        for column, value in _sql.items():
            if value is None:
                # "= NULL" never matches in SQL; IS NULL is the correct test.
                conditions.append(str(column) + " IS NULL")
            else:
                conditions.append(str(column) + "=" + self.__literal(value))
        return " WHERE " + " AND ".join(conditions)

    # Wrap the items in parentheses and join them into one string.
    def quote(self, _data_array, type_filter=True):
        """Join the items into a parenthesised, comma-separated list.

        Args:
            _data_array: The items to join.
            type_filter: When true the items are treated as values and
                written as SQL literals (strings quoted and escaped, numbers
                bare, ``None`` as ``NULL``). When false the items are treated
                as identifiers such as column names and written unquoted.

        Returns:
            Text such as ``"('asdf', 33232)"`` or ``"(stu_uid, iii)"``.
        """
        if type_filter:
            parts = [self.__literal(item) for item in _data_array]
        else:
            parts = [str(item) for item in _data_array]
        return "(" + ", ".join(parts) + ")"