封装ORM.py与mysql_client.py代码

时间:2021-08-05 03:41:54

ORM.py

'''
ORM: 对象关系映射 ---> 映射到数据库MySQL中的数据表

类名 ---> 表名
对象 ---> 一条记录
对象.属性 ---> 字段

模拟Django的ORM,为了,将数据库的 增、删、改、查,全部封装成
一个个的方式,比如: save, delete, update, select。

优点:
    使用者无需 关心具体的SQL命令 如何编写。
    直接通过调用方法 来执行相对应的SQL命令。

缺点:
    1.更高级的封装导致“执行效率变低”。
    2.会逐渐遗忘SQL原生命令。

'''

from mysql_client import MySQLClient


# 1.创建字段的类型, 对应数据表中的一个个字段的创建规范
class Field:
    def __init__(self, name, column_type, primary_key, default):
        self.name = name
        self.column_type = column_type
        self.primary_key = primary_key
        self.default = default


# Integer
class IntegerField(Field):
    def __init__(self, name, column_type='int', primary_key=False, default=0):
        super().__init__(name, column_type, primary_key, default)


# String
class StringField(Field):
    def __init__(self, name, column_type='varchar(64)', primary_key=False, default=None):
        super().__init__(name, column_type, primary_key, default)


class OrmMetaClass(type):
    def __new__(cls, class_name, class_base, class_dict):
        # 过滤Models类
        if class_name == 'Models':
            # models类中,什么都不做,将类原路返回。
            return type.__new__(cls, class_name, class_base, class_dict)

        # 1.一张表必须要有表名
        # 假如table_name没有值,则将类名当做表名
        table_name = class_dict.get('table_name', class_name)  # get--> self.table_name

        # 2.主键名
        primary_key = None

        # 3.定义一个空字典, 专门用来存放字段对象
        mappings = {}

        # 遍历名称空间中所有的属性
        for key, value in class_dict.items():
            # print(key, value)
            # 除了有字段,还有其他字段以外的属性
            # 过滤字段对象以外的内容
            if isinstance(value, Field):
                mappings[key] = value

                # 判断字段对象primary_key是否为True
                if value.primary_key:

                    # 先判断初识的primary_key是否有值
                    # 判断主键是否已存在
                    if primary_key:
                        raise TypeError('只能有一个主键!')

                    # 若主键不存在,则给primary_key赋值
                    primary_key = value.name

        # 节省资源: 因为mappings与原类中名称空间中的属性重复,为了节省内存,剔除重复的属性。
        for key in mappings.keys():
            class_dict.pop(key)

        # 判断是否有主键
        if not primary_key:
            raise TypeError('必须有一个主键')

        # 给类的名称空间添加表名
        class_dict['table_name'] = table_name
        # 给类的名称空间添加主键名
        class_dict['primary_key'] = primary_key
        # 给类的名称空间添加一个mappings字典,字典中拥有所有字段属性
        class_dict['mappings'] = mappings
        return type.__new__(cls, class_name, class_base, class_dict)


class Models(dict, metaclass=OrmMetaClass):  # OrmMetaClass(Models, Models_name, base, class_dict)
    def __getattr__(self, item):
        # print(item, '调用没有的属性时会触发...')
        # 将字典的值,返回
        return self.get(item)

    def __setattr__(self, key, value):
        # print(key, value)
        self[key] = value

    # 查询方法
    #  User.orm_select(id=1) ----> user_obj ---》 修改对象的属性值 user_obj.user_name = '李小花' ---》 user_obj.orm_update()
    @classmethod
    def orm_select(cls, **kwargs):
        # kwargs --> {'id': 1}
        # 1.调用MySQLClient拿到mysql对象
        mysql = MySQLClient()

        if not kwargs:
            # 查询所有 sql: select * from User;
            sql = 'select * from %s' % cls.table_name

            res = mysql.my_select(sql)

        else:
            # dict.keys() ---> 返回的是一个对象,需要转成list类型
            key = list(kwargs.keys())[0]
            value = kwargs.get(key)

            # 条件查询 sql: select * from User where id=1;
            sql = 'select * from %s where %s=?' % (cls.table_name, key)
            sql = sql.replace('?', '%s')

            # 需要拿到mysql的游标,提交sql语句
            # res = mysql.cursor.execute(sql, value)
            res = mysql.my_select(sql, value)

        # d = {'user_id': 1, 'user_name': 'tank', 'pwd': '123'}
        # return [cls(**{'user_id': 1, 'user_name': 'tank', 'pwd': '123'}) for d in res]

        # MySQL ---> [dict, ]  --> [{}, {}]  ---> [obj, obj] obj.属性取值
        return [cls(**d) for d in res]

    # 插入方法  User(传关键字参数) ---》 user_obj.orm_insert(user_obj)
    def orm_insert(self):
        mysql = MySQLClient()
        # sql: insert into table(f1, f2, f3) values(?, ?, ?);

        # 存储字段名
        keys = []

        # 存字段对应的值
        values = []

        # 存放?号的,有几个字段,就存几个?号
        args = []

        for k, v in self.mappings.items():
            # 过滤掉主键,因为主键是自增的
            if not v.primary_key:
                # keys.append(k)
                # 存表中除了主键以外的字段名
                keys.append(v.name)

                # 存表中除了主键以外的字段值,若值没有,则使用默认值
                values.append(
                    getattr(self, v.name, v.default)
                )

                # 存放?号的,有几个字段,就存几个?号
                args.append('?')

        # sql: insert into table_name(v1, v2, v3) values(?, ?, ?)
        sql = 'insert into %s(%s) values(%s)' % (
            self.table_name,
            ','.join(keys),
            ','.join(args)
        )

        # sql: insert into table_name(v1, v2, v3) values(%s, %s, %s)
        sql = sql.replace('?', '%s')

        mysql.my_execute(sql, values)
        # mysql.cursor.execute(sql, values)

    # 更新方法  update  User(传关键字参数) ---》 user_obj.orm_insert(user_obj)
    def orm_update(self):
        mysql = MySQLClient()
        # 字段名
        keys = []
        # 字段值
        values = []
        # 主键: id=pk
        primary_key = None

        for k, v in self.mappings.items():
            if v.primary_key:
                # 只要思想不滑坡,方法总比问题多。
                primary_key = v.name   '= %s' % getattr(self, v.name)

            else:
                keys.append(v.name   '=?')

                values.append(
                    getattr(self, v.name)  # 小贱贱 ---> 大贱贱
                )

        # sql: update table set k1=v1, k2=v2 where id=pk; #
        # sql: update table set k1=?, k2=? where id=pk; #
        # 注意: tank的更新方法,更新条件固定使用主键。
        sql = 'update %s set %s where %s' % (
            self.table_name,
            ','.join(keys),
            primary_key
        )

        # sql: update table set k1=%s, k2=%s where id=pk; #
        sql = sql.replace('?', '%s')

        mysql.my_execute(sql, values)


# 用户表类
class User(Models):  # ---> 表名
    # table_name = 'user_info'
    # 强调: 最好与字段类型的name属性同名
    # user_id自增
    user_id = IntegerField(name='user_id', primary_key=True)
    user_name = StringField(name='user_name')
    pwd = StringField(name='pwd')


# 用户表类
class Movie(Models):  # ---> 表名
    # table_name = 'user_info'
    # 强调: 最好与字段类型的name属性同名
    user_id = IntegerField(name='user_id', primary_key=True)
    user_name = StringField(name='user_name')
    pwd = StringField(name='pwd')


if __name__ == '__main__':
    # ORM:类名 --> 表名     对象 ---> 一条记录     对象.属性 ---> 字段
    # user = User(user_name='tank', pwd='123')

    # 查  orm_select
    # 查询所有 sql: select * from User;
    # User.select()
    # Movie.orm_select()
    # 条件查询 sql: select * from User where id=1;
    # User.orm_select(id=1)
    # res = User.orm_select()[0]
    # print(res)
    # print(res.user_name)

    # 增  orm_insert
    # User表 ---> 添加字段对应的数据
    # user_obj = User(user_name='小贱贱', pwd='123')
    # user_obj.orm_insert()
    # user_obj = User.orm_select(user_name='小贱贱')[0]
    # print(user_obj.user_name)

    # 改
    # user_obj = User.orm_select(user_name='小贱贱')[0]
    # print(user_obj.user_name)
    # user_obj.user_name = '大贱贱'
    # user_obj.orm_update()
    user_obj = User.orm_select(user_name='大贱贱')[0]
    print(user_obj)

mysql_client.py

import pymysql


class MySQLClient:

    __instance = None

    # 单例模式1
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = object.__new__(cls)
        return cls.__instance

    # 单例模式2
    # @classmethod
    # def singleton(cls):
    #     if not cls.__instance:
    #         cls.__instance = cls()
    #     return cls.__instance

    # 1.创建连接并获取游标
    def __init__(self):
        # 连接客户端
        self.client = pymysql.connect(
            host='127.0.0.1',
            port=3306,
            user='root',
            password='123qwe',
            database='orm_demo',
            charset='utf8',
            autocommit=True
        )

        self.cursor = self.client.cursor(
            pymysql.cursors.DictCursor
        )

    # 2.提交查询sql命令
    def my_select(self, sql, value=None):
        # 提交查询的sql命令
        self.cursor.execute(sql, value)

        # 获取查询以后的结果
        res = self.cursor.fetchall()
        return res

    # 3.封装 “提交sql命令,插入或者更新操作”的方法
    def my_execute(self, sql, values):
        try:
            self.cursor.execute(sql, values)

        except Exception as e:
            print(e)

    # 4.关闭数据库
    def close(self):
        self.cursor.close()
        self.client.close()