手撸orm框架

时间:2022-11-03 07:32:49

一 前言

1 我在实例化一个user对象的时候,可以user=User(name='lqz',password='123')

2 也可以 user=User()

    user['name']='lqz'
    user['password']='123'
3 也可以 user=User()

    user.name='lqz'
    user.password='password'
前两种,可以通过继承字典dict来实现,第三种,用getattr和setattr

__getattr__ 拦截点号运算。当对未定义的属性名称和实例进行点号运算时,就会用属性名作为字符串调用这个方法。如果继承树可以找到该属性,则不调用此方法
    __setattr__会拦截所有属性的的赋值语句。如果定义了这个方法,self.arrt = value 就会变成self,__setattr__("attr", value).这个需要注意。当在__setattr__方法内对属性进行赋值是,不可使用self.attr = value,因为他会再次调用self,__setattr__("attr", value),则会形成无穷递归循环,最后导致堆栈溢出异常。应该通过对属性字典做索引运算来赋值任何实例属性,也就是使用self.__dict__['name'] = value

二 定义Model基类

class Model(dict):
def __init__(self, **kw):
super(Model, self).__init__(**kw) def __getattr__(self, key):# .访问属性触发
try:
return self[key]
except KeyError:
raise AttributeError('没有属性:%s' % key) def __setattr__(self, key, value):
self[key] = value

三 定义Field

数据库中每一列数据,都有:列名,列的数据类型,是否是主键,默认值

class Field:
def __init__(self, name, column_type, primary_key, default_value):
self.name = name
self.column_type = column_type
self.primary_key = primary_key
self.default_value = default_value class StringField(Field):
def __init__(self, name, column_type='varchar(100)', primary_key=False, default_value=None):
super().__init__(name, column_type, primary_key, default_value) class IntegerField(Field):
def __init__(self, name, primary_key=False, default_value=0):
super().__init__(name, 'int', primary_key, default_value)

四 定义元类

元类介绍

数据库中的每个表,都有表名,每一列的列名,以及主键是哪一列

既然我要用数据库中的表,对应这一个程序中的类,那么我这个类也应该有这些类属性

但是不同的类这些类属性又不尽相同,所以我应该怎么做?在元类里拦截类的创建过程,然后把这些东西取出来,放到类里面

所以用到了元类

class ModelMetaclass(type):
def __new__(cls, name, bases,attrs):
if name=='Model':
return type.__new__(cls,name,bases,attrs)
table_name=attrs.get('table_name',None)
if not table:
table_name=name
primary_key=None
mappings=dict()
for k,v in attrs.items():
if isinstance(v,Field):#v 是不是Field的对象
mappings[k]=v
if v.primary_key:
#找到主键
if primary_key:
raise TypeError('主键重复:%s'%k)
primary_key=k for k in mappings.keys():
attrs.pop(k)
if not primary_key:
raise TypeError('没有主键')
attrs['table_name']=table_name
attrs['primary_key']=primary_key
attrs['mappings']=mappingsreturn type.__new__(cls,name,bases,attrs)

五 继续Model基类

Model类是所有要对应数据库表类的基类,所以,Model的元类应该是咱上面写的那个,而每个数据库表对应类的对象,都应该有查询,插入,保存,方法

所以:

class Model(dict, metaclass=ModelMetaclass):
def __init__(self, **kw):
super(Model, self).__init__(**kw) def __getattr__(self, key): # .访问属性触发
try:
return self[key]
except KeyError:
raise AttributeError('没有属性:%s' % key) def __setattr__(self, key, value):
self[key] = value @classmethod
def select_all(cls, **kwargs):
ms = mysql_singleton.Mysql().singleton()
if kwargs: # 当有参数传入的时候
key = list(kwargs.keys())[0]
value = kwargs[key]
sql = "select * from %s where %s=?" % (cls.table_name, key)
sql = sql.replace('?', '%s')
re = ms.select(sql, value)
else: # 当无参传入的时候查询所有
sql = "select * from %s" % cls.table_name
re = ms.select(sql)
return [cls(**r) for r in re] @classmethod
def select_one(cls, **kwargs):
# 此处只支持单一条件查询
key = list(kwargs.keys())[0]
value = kwargs[key]
ms = mysql_singleton.Mysql().singleton()
sql = "select * from %s where %s=?" % (cls.table_name, key) sql = sql.replace('?', '%s')
re = ms.select(sql, value)
if re:
return cls(**re[0])
else:
return None def save(self):
ms = mysql_singleton.Mysql().singleton()
fields = []
params = []
args = []
for k, v in self.mapping.items():
fields.append(v.name)
params.append('?')
args.append(getattr(self, k, v.default))
sql = "insert into %s (%s) values (%s)" % (self.table_name, ','.join(fields), ','.join(params))
sql = sql.replace('?', '%s')
ms.execute(sql, args) def update(self):
ms = mysql_singleton.Mysql().singleton()
fields = []
args = []
pr = None
for k, v in self.mapping.items():
if v.primary_key:
pr = getattr(self, k, v.default)
else:
fields.append(v.name + '=?')
args.append(getattr(self, k, v.default))
sql = "update %s set %s where %s = %s" % (
self.table_name, ', '.join(fields), self.primary_key, pr) sql = sql.replace('?', '%s')
print(sql)
ms.execute(sql, args)

六 基于pymsql的数据库操作类(单例)

from conf import setting
import pymysql class Mysql:
__instance = None
def __init__(self):
self.conn = pymysql.connect(host=setting.host,
user=setting.user,
password=setting.password,
database=setting.database,
charset=setting.charset,
autocommit=setting.autocommit)
self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor) def close_db(self):
self.conn.close() def select(self, sql, args=None):
self.cursor.execute(sql, args)
rs = self.cursor.fetchall()
return rs def execute(self, sql, args):
try:
self.cursor.execute(sql, args)
affected = self.cursor.rowcount
# self.conn.commit()
except BaseException as e:
print(e)
return affected @classmethod
def singleton(cls):
if not cls.__instance:
cls.__instance = cls()
return cls.__instance

七 数据库连接池版的数据库操作类

在此之前,要先学习数据库链接池:链接

db_pool.py

import pymysql
from conf import setting
from DBUtils.PooledDB import PooledDB POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=6, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3,
# 链接池中最多共享的链接数量,0和None表示全部共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=0,
# ping MySQL服务端,检查是否服务可用。 host=setting.host,
port=setting.port,
user=setting.user,
password=setting.password,
database=setting.database,
charset=setting.charset,
autocommit=setting.autocommit
)

mysql_pool.py

import pymysql
from ormpool import db_pool
from threading import current_thread class MysqlPool:
def __init__(self):
self.conn = db_pool.POOL.connection()
# print(db_pool.POOL)
# print(current_thread().getName(), '拿到连接', self.conn)
# print(current_thread().getName(), '池子里目前有', db_pool.POOL._idle_cache, '\r\n')
self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor) def close_db(self):
self.cursor.close()
self.conn.close() def select(self, sql, args=None):
self.cursor.execute(sql, args)
rs = self.cursor.fetchall()
return rs def execute(self, sql, args): try:
self.cursor.execute(sql, args)
affected = self.cursor.rowcount
# self.conn.commit()
except BaseException as e:
print(e)
finally:
self.close_db()
return affected

setting.py

host = '127.0.0.1'
port = 3306
user = 'root'
password = '123456'
database = 'youku2'
charset = 'utf8'
autocommit = True