用 Python 元类的特性实现 ORM 框架

时间:2021-07-30 02:26:58

orm是什么

o是 object,也就 类对象 的意思,r是 relation,翻译成中文是 关系,也就是关系数据库中 数据表 的意思,m是 mapping,是映射的意思。在orm框架中,它帮我们把类和数据表进行了一个映射,可以让我们通过类和类对象就能操作它所对应的表格中的数据。orm框架还有一个功能,它可以根据我们设计的类自动帮我们生成数据库中的表,省去了我们自己建表的过程。

一个句话理解就是:创建一个实例对象,用创建它的类名当做数据表名,用创建它的类属性对应数据表的字段,当对这个实例对象操作时,能够对应 mysql 语句。

在 django 中就内嵌了一个 orm 框架,不需要直接面向数据库编程,而是定义模型类,通过模型类和对象完成数据表的增删改查操作。还有第三方库 sqlalchemy 都是 orm框架。

用 Python 元类的特性实现 ORM 框架

先看看我们大致要实现什么功能

?
1
2
3
4
5
6
7
8
9
10
11
12
13
class user(父类省略):
    uid = ('uid', "int unsigned")
    name = ('username', "varchar(30)")
    email = ('email', "varchar(30)")
    password = ('password', "varchar(30)")
    ...省略...
 
 
user = user(uid=123, name='hui', email='huidbk@163.com', password='123456')
user.save()
 
# 对应如下sql语句
# insert into user (uid,username,email,password) values (123,hui,huidbk@163.com,123456)

所谓的 orm 就是让开发者在操作数据库的时候,能够像操作对象时通过xxxx.属性=yyyy一样简单,这是开发orm的初衷。

实现orm中的insert功能

通过 python 中 元类 简单实现 orm 中的 insert 功能

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# !/usr/bin/python3
# -*- coding: utf-8 -*-
# @author: hui
# @desc: { 利用python元类简单实现orm框架的insert插入功能 }
# @date: 2021/05/17 17:02
 
 
class modelmetaclass(type):
    """数据表模型元类"""
 
    def __new__(mcs, cls_name, bases, attrs):
 
        print(f'cls_name -> {cls_name}')    # 类名
        print(f'bases -> {bases}')          # 继承类
        print(f'attrs -> {attrs}')          # 类中所有属性
        print()
 
        # 数据表对应关系字典
        mappings = dict()
 
        # 过滤出对应数据表的字段属性
        for k, v in attrs.items():
            # 判断是否是指定的stringfield或者integerfield的实例对象
            # 这里就简单判断字段是元组
            if isinstance(v, tuple):
                print('found mapping: %s ==> %s' % (k, v))
                mappings[k] = v
 
        # 删除这些已经在字典中存储的字段属性
        for k in mappings.keys():
            attrs.pop(k)
 
        # 将之前的uid/name/email/password以及对应的对象引用、类名字
        # 用其他类属性名称保存
        attrs['__mappings__'] = mappings  # 保存属性和列的映射关系
        attrs['__table__'] = cls_name     # 假设表名和类名一致
        return type.__new__(mcs, cls_name, bases, attrs)
 
 
class user(metaclass=modelmetaclass):
    """用户模型类"""
    # 类属性名    表字段    表字段类型
    uid =      ('uid', 'int unsigned')
    name =     ('username', 'varchar(30)')
    email =    ('email', 'varchar(30)')
    password = ('password', 'varchar(30)')
 
    def __init__(self, **kwargs):
        for name, value in kwargs.items():
            setattr(self, name, value)
 
    def save(self):
        fields = []
        args = []
        for k, v in self.__mappings__.items():
            fields.append(v[0])
            args.append(getattr(self, k, none))
 
        # 表名
        table_name = self.__table__
        # 数据表中的字段
        fields = ','.join(fields)
        # 待插入的数据
        args = ','.join([str(i) for i in args])
        
        # 生成sql语句
        sql = f"""insert into {table_name} ({fields}) values ({args})"""
        print(f'sql: {sql}')
 
 
def main():
    user = user(uid=123, name='hui', email='huidbk@163.com', password='123456')
    user.save()
 
 
if __name__ == '__main__':
    main()

当 user 指定元类之后,uid、name、email、password 类属性将不在类中,而是在 __mappings__ 属性指定的字典中存储。 user 类的这些属性将转变为如下

?
1
2
3
4
5
6
7
__mappings__ = {
    "uid": ('uid', "int unsigned")
    "name": ('username', "varchar(30)")
    "email": ('email', "varchar(30)")
    "password": ('password', "varchar(30)")
}
__table__ = "user"

执行的效果如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
cls_name -> user
bases -> ()
attrs -> {
    '__module__': '__main__', '__qualname__': 'user', '__doc__': '用户模型类',
    'uid': ('uid', 'int unsigned'),
    'name': ('username', 'varchar(30)'),
    'email': ('email', 'varchar(30)'),
    'password': ('password', 'varchar(30)'),
    '__init__': <function user.__init__ at 0x0000026d520c1048>,
    'save': <function user.save at 0x0000026d520c10d8>
}
 
found mapping: uid ==> ('uid', 'int unsigned')
found mapping: name ==> ('username', 'varchar(30)')
found mapping: email ==> ('email', 'varchar(30)')
found mapping: password ==> ('password', 'varchar(30)')
 
sql: insert into user (uid,username,email,password) values (123,hui,huidbk@163.com,123456)

完善对数据类型的检测

上面转成的 sql 语句如下:

?
1
insert into user (uid,username,email,password) values (12345,hui,huidbk@163.com,123456)

发现没有,在 sql 语句中字符串类型没有没有引号 ''

正确的 sql 语句应该是:

?
1
insert into user (uid,username,email,password) values (123, 'hui', 'huidbk@163.com', '123456')

因此修改 user 类完善数据类型的检测

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class modelmetaclass(type):
    # 此处和上文一样, 故省略....
    pass
    
class user(metaclass=modelmetaclass):
    """用户模型类"""
 
    uid = ('uid', "int unsigned")
    name = ('username', "varchar(30)")
    email = ('email', "varchar(30)")
    password = ('password', "varchar(30)")
 
    def __init__(self, **kwargs):
        for name, value in kwargs.items():
            setattr(self, name, value)
 
    # 在这里完善数据类型检测
    def save(self):
        fields = []
        args = []
        for k, v in self.__mappings__.items():
            fields.append(v[0])
            args.append(getattr(self, k, none))
 
        # 把参数数据类型对应数据表的字段类型
        args_temp = list()
        for temp in args:
            if isinstance(temp, int):
                args_temp.append(str(temp))
            elif isinstance(temp, str):
                args_temp.append(f"'{temp}'")
 
        # 表名
        table_name = self.__table__
        # 数据表中的字段
        fields = ','.join(fields)
        # 待插入的数据
        args = ','.join(args_temp)
 
        # 生成sql语句
        sql = f"""insert into {table_name} ({fields}) values ({args})"""
        print(f'sql: {sql}')
 
 
def main():
    user = user(uid=123, name='hui', email='huidbk@163.com', password='123456')
    user.save()
 
 
if __name__ == '__main__':
    main()

运行效果如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
cls_name -> user
bases -> ()
attrs -> {
    '__module__': '__main__', '__qualname__': 'user', '__doc__': '用户模型类',
    'uid': ('uid', 'int unsigned'),
    'name': ('username', 'varchar(30)'),
    'email': ('email', 'varchar(30)'),
    'password': ('password', 'varchar(30)'),
    '__init__': <function user.__init__ at 0x0000026d520c1048>,
    'save': <function user.save at 0x0000026d520c10d8>
}
 
found mapping: uid ==> ('uid', 'int unsigned')
found mapping: name ==> ('username', 'varchar(30)')
found mapping: email ==> ('email', 'varchar(30)')
found mapping: password ==> ('password', 'varchar(30)')
    
sql: insert into user (uid,username,email,password) values(123,'hui','huidbk@163.com','123456')

抽取到基类中

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# !/usr/bin/python3
# -*- coding: utf-8 -*-
# @author: hui
# @desc: { 利用python元类实现orm框架的insert插入功能 }
# @date: 2021/05/17 17:02
 
 
class modelmetaclass(type):
    """数据表模型元类"""
 
    def __new__(mcs, cls_name, bases, attrs):
 
        print(f'cls_name -> {cls_name}'# 类名
        print(f'bases -> {bases}'# 继承类
        print(f'attrs -> {attrs}'# 类中所有属性
        print()
 
        # 数据表对应关系字典
        mappings = dict()
 
        # 过滤出对应数据表的字段属性
        for k, v in attrs.items():
            # 判断是否是对应数据表的字段属性, 因为attrs中包含所有的类属性
            # 这里就简单判断字段是元组
            if isinstance(v, tuple):
                print('found mapping: %s ==> %s' % (k, v))
                mappings[k] = v
 
        # 删除这些已经在字典中存储的字段属性
        for k in mappings.keys():
            attrs.pop(k)
 
        # 将之前的uid/name/email/password以及对应的对象引用、类名字
        # 用其他类属性名称保存
        attrs['__mappings__'] = mappings  # 保存属性和列的映射关系
        attrs['__table__'] = cls_name  # 假设表名和类名一致
        return type.__new__(mcs, cls_name, bases, attrs)
 
 
class model(object, metaclass=modelmetaclass):
    """数据表模型基类"""
 
    def __init__(self, **kwargs):
        for name, value in kwargs.items():
            setattr(self, name, value)
 
    def save(self):
        fields = []
        args = []
        for k, v in self.__mappings__.items():
            fields.append(v[0])
            args.append(getattr(self, k, none))
 
        # 把参数数据类型对应数据表的字段类型
        args_temp = list()
        for temp in args:
            if isinstance(temp, int):
                args_temp.append(str(temp))
            elif isinstance(temp, str):
                args_temp.append(f"'{temp}'")
 
        # 表名
        table_name = self.__table__
        # 数据表中的字段
        fields = ','.join(fields)
        # 待插入的数据
        args = ','.join(args_temp)
 
        # 生成sql语句
        sql = f"""insert into {table_name} ({fields}) values ({args})"""
        print(f'sql: {sql}')
 
        # 执行sql语句
        # ...
 
 
class user(model):
    """用户表模型类"""
 
    uid = ('uid', "int unsigned")
    name = ('username', "varchar(30)")
    email = ('email', "varchar(30)")
    password = ('password', "varchar(30)")
 
 
def main():
    user = user(uid=123, name='hui', email='huidbk@163.com', password='123456')
    user.save()
 
 
if __name__ == '__main__':
    main()

添加数据库驱动执行sql语句

这里我们使用 pymysql 数据库驱动,来执行 sql 语句

在 model 类中新增一个 get_connection 的静态方法用于获取数据库连接

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import pymysql
 
 
class model(object, metaclass=modelmetaclass):
    """数据表模型基类"""
 
    def __init__(self, **kwargs):
        for name, value in kwargs.items():
            setattr(self, name, value)
 
    @staticmethod
    def get_connection():
        """
        获取数据库连接与数据游标
        :return: conn, cursor
        """
        conn = pymysql.connect(
            database='testdb',
            host='localhost',
            port=3306,
            user='root',
            password='123456'
        )
        return conn, conn.cursor()
 
    def save(self):
        fields = []
        args = []
        for k, v in self.__mappings__.items():
            fields.append(v[0])
            args.append(getattr(self, k, none))
 
        # 把参数数据类型对应数据表的字段类型
        args_temp = list()
        for temp in args:
            if isinstance(temp, int):
                args_temp.append(str(temp))
            elif isinstance(temp, str):
                args_temp.append(f"'{temp}'")
 
        # 表名
        table_name = self.__table__
        # 数据表中的字段
        fields = ','.join(fields)
        # 待插入的数据
        args = ','.join(args_temp)
 
        # 生成sql语句
        sql = f"""insert into {table_name} ({fields}) values ({args})"""
        print(f'sql: {sql}')
 
        # 执行sql语句
        conn, cursor = self.get_connection()
        ret = cursor.execute(sql)
        print(ret)
        conn.commit()
        cursor.close()
        conn.close()
       

添加数据库驱动执行sql语句

这里我们使用 pymysql 数据库驱动,来执行 sql 语句

在 model 类中新增一个 get_connection 的静态方法用于获取数据库连接

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import pymysql
 
 
class model(object, metaclass=modelmetaclass):
    """数据表模型基类"""
 
    def __init__(self, **kwargs):
        for name, value in kwargs.items():
            setattr(self, name, value)
 
    @staticmethod
    def get_connection():
        """
        获取数据库连接与数据游标
        :return: conn, cursor
        """
        conn = pymysql.connect(
            database='testdb',
            host='localhost',
            port=3306,
            user='root',
            password='123456'
        )
        return conn, conn.cursor()
 
    def save(self):
        fields = []
        args = []
        for k, v in self.__mappings__.items():
            fields.append(v[0])
            args.append(getattr(self, k, none))
 
        # 把参数数据类型对应数据表的字段类型
        args_temp = list()
        for temp in args:
            if isinstance(temp, int):
                args_temp.append(str(temp))
            elif isinstance(temp, str):
                args_temp.append(f"'{temp}'")
 
        # 表名
        table_name = self.__table__
        # 数据表中的字段
        fields = ','.join(fields)
        # 待插入的数据
        args = ','.join(args_temp)
 
        # 生成sql语句
        sql = f"""insert into {table_name} ({fields}) values ({args})"""
        print(f'sql: {sql}')
 
        # 执行sql语句
        conn, cursor = self.get_connection()
        ret = cursor.execute(sql)
        print(ret)
        conn.commit()
        cursor.close()
        conn.close()
       

测试功能

准备数据库

先准备数据库 testdb 和 user 数据表

?
1
2
3
4
5
6
7
8
9
10
create database testdb charset=utf8;
 
use testdb;
 
create table user(
    uid int unsigned auto_increment primary key,
    username varchar(30) not null,
    email varchar(30),
    password varchar(30) not null
);

user 表结构如下

?
1
2
3
4
5
6
7
8
+----------+------------------+------+-----+---------+----------------+
| field    | type             | null | key | default | extra          |
+----------+------------------+------+-----+---------+----------------+
| uid      | int(10) unsigned | no   | pri | null    | auto_increment |
| username | varchar(30)      | no   |     | null    |                |
| email    | varchar(30)      | yes  |     | null    |                |
| password | varchar(30)      | no   |     | null    |                |
+----------+------------------+------+-----+---------+----------------+

创建模型类测试

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class user(model):
    """用户表模型类"""
 
    uid = ('uid', "int unsigned")
    name = ('username', "varchar(30)")
    email = ('email', "varchar(30)")
    password = ('password', "varchar(30)")
 
 
def main():
    user = user(uid=1, name='hui', email='huidbk@163.com', password='123456')
    user.save()
 
    for i in range(2, 10):
        user = user(
            uid=i,
            name=f'name{i}',
            email=f'huidbk@16{i}.com',
            password=f'12345{i}'
        )
        user.save()
    
 
if __name__ == '__main__':
    main()
   

查看数据库 user 表数据

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
mysql> select * from user;
+-----+----------+----------------+----------+
| uid | username | email          | password |
+-----+----------+----------------+----------+
|   1 | hui      | huidbk@163.com | 123456   |
|   2 | name2    | huidbk@162.com | 123452   |
|   3 | name3    | huidbk@163.com | 123453   |
|   4 | name4    | huidbk@164.com | 123454   |
|   5 | name5    | huidbk@165.com | 123455   |
|   6 | name6    | huidbk@166.com | 123456   |
|   7 | name7    | huidbk@167.com | 123457   |
|   8 | name8    | huidbk@168.com | 123458   |
|   9 | name9    | huidbk@169.com | 123459   |
+-----+----------+----------------+----------+
9 rows in set (0.00 sec)

源代码

源代码已上传到 gitee pythonknowledge: python知识宝库,欢迎大家来访。

以上就是用 python 元类的特性实现 orm 框架的详细内容,更多关于python 实现 orm 框架的资料请关注服务器之家其它相关文章!

原文链接:https://juejin.cn/post/6963443372266618917