Python 连接 MySQL 及 SQL增删改查(主要使用sqlalchemy)

时间:2024-06-07 12:25:51

目录

一、环境

二、MySQL的连接和使用

2.1方式一:sql为主

2.1.1创建连接

2.1.2 表结构

2.1.3 新增数据

​编辑

2.1.4 查看数据

​编辑

2.1.5 修改数据

2.1.6 删除数据

2.2方式二:orm对象关系映射

2.2.1 mysql连接

2.2.2 创建表

2.2.3 新增数据

​编辑

2.2.4 查询数据

2.2.5 修改数据

​编辑

2.2.6 删除数据


一、环境

工作中需要用到python和mysql数据库,本次文档记录相关操作。

环境:windows10、python 3.11.7

mysql版本:5.7

二、MySQL的连接和使用

本人使用过的两种方式

2.1方式一:sql为主

2.1.1创建连接
import sqlalchemy
from sqlalchemy.orm import scoped_session, sessionmaker

USERNAME = "root"  # 用户名
PASSWORD = "123456"  # 密码
ADDR = "localhost"  # 连接地址 本地localhost 服务器就是服务器的地址XXX
PORT = "3306"  # mysql端口号
DATABASE = "test"  # 连接的数据库名


class MysqlSession:
    def __init__(self):
        self.create_connection()

    def create_connection(self):
        engine = sqlalchemy.create_engine(
            f"mysql+pymysql://{USERNAME}:{PASSWORD}@{ADDR}:{PORT}/{DATABASE}?charset=utf8",
            max_overflow=50,  # 超过连接池大小外最多创建的连接
            pool_size=50,  # 连接池大小
            pool_recycle=30  # 多久之后对线程池中的线程进行一次连接的回收(重置),
            , echo=False
        )
        self.connection = scoped_session(sessionmaker(bind=engine))

    def get_connection(self):
        return self.connection

    def ins(self, sql):  # 新增数据
        return self.connection.execute(sql).lastrowid

    def arr(self, sql):  # 查询多条数据
        return self.connection.execute(sql).fetchall()

    def obj(self, sql):  # 查询单个数据
        return self.connection.execute(sql).fetchone()

    def upd(self, sql):  # 修改单个数据
        return self.connection.execute(sql)

    def dlt(self, sql):  # 删除数据
        return self.connection.execute(sql)

    def commit(self):
        self.connection.commit()  # 提交
        self.connection.remove()  # 结束会话
2.1.2 表结构

2.1.3 新增数据
mysql_session = MysqlSession()
connection = mysql_session.get_connection()
table_name = 'test.user'  # 表名(这里是数据库名+表名)
# 新增数据,返回的该条数据的id
name_remark = [
    {'name': 'Alice', 'remark': '可能是个女生'},
    {'name': 'Bob', 'remark': "可能是个男生"},
    {'name': 'Tammi'}
]
new_ids = []
for p in name_remark:
    name = p.get('name')
    remark = p.get('remark')
    sql = f"""insert into {table_name} (name,remark) values ('{name}','{remark}')"""
    user_id = mysql_session.ins(sql)
    new_ids.append(user_id)
print(new_ids)
mysql_session.commit()
2.1.4 查看数据
# 查看所有数据
sql = f"""select * from {table_name}"""
users = mysql_session.arr(sql)
for u in users:
    print(u)
# 查看指定数据
sql = f"""select * from {table_name} where name='Alice'"""
user = mysql_session.obj(sql)
print(user)

2.1.5 修改数据
# 修改指定数据
sql = f"""update {table_name} set name='Alice_changed' where id=16"""
mysql_session.upd(sql)
mysql_session.commit()
# 查看刚才修改的数据
sql = f"""select * from {table_name} where id=16 """
user = mysql_session.obj(sql)
print(user)

2.1.6 删除数据
# 删除指定数据
sql = f"""delete from {table_name} where id=16"""
mysql_session.dlt(sql)
mysql_session.commit()
# 查看所有数据
users = show_test(mysql_session, table_name)
print(users)

2.2方式二:orm对象关系映射

因为现目前工作中没有用到这个(以前用django的时候有用到过orm),这里就简单记录一下测试情况。

2.2.1 mysql连接

mysql_session.py

import sqlalchemy
from sqlalchemy.orm import sessionmaker

USERNAME = "root"  # 用户名
PASSWORD = "123456"  # 密码
ADDR = "localhost"  # 连接地址 本地localhost 服务器就是服务器的地址XXX
PORT = "3306"  # mysql端口号
DATABASE = "test"  # 连接的数据库名


class MysqlSession:
    def __init__(self):
        self.create_connection()

    def create_connection(self):
        self.engine = sqlalchemy.create_engine(
            f"mysql+pymysql://{USERNAME}:{PASSWORD}@{ADDR}:{PORT}/{DATABASE}?charset=utf8",
            max_overflow=50,  # 超过连接池大小外最多创建的连接
            pool_size=50,  # 连接池大小
            pool_recycle=30,  # 多久之后对线程池中的线程进行一次连接的回收(重置),
            echo=False
        )
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    def get_session(self):
        return self.session

    def get_engine(self):
        return self.engine
2.2.2 创建表

models.py

from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base

from mysql_session import MysqlSession

Base = declarative_base()


class User(Base):
    __tablename__ = 'person'

    id = Column(Integer, primary_key=True, autoincrement=True)  # 主键 自增
    name = Column(String(255))
    age = Column(Integer)


engine = MysqlSession().get_engine()
Base.metadata.create_all(engine) #创建上面的表,运行一次即可

2.2.3 新增数据
from models import Person
from mysql_session import MysqlSession

session = MysqlSession().get_session()

# 新增几条数据
new_user = Person(name='张三', age=30)
session.add(new_user)
new_user = Person(name='李四', age=40)
session.add(new_user)
new_user = Person(name='王五', age=50)
session.add(new_user)
session.commit()
2.2.4 查询数据
# 查询所有数据
users = session.query(Person).all()
for user in users:
    print(user.name, user.age)
print("**************************************")
# 指定信息
user = session.query(Person).filter_by(name='李四').first()
print(user.name, user.age)

2.2.5 修改数据
# 修改数据
user = session.query(Person).filter_by(name='李四').first()
if user:
    user.age = 400  # 将年龄修改为400
    session.commit()
    print("updated success.")
    print(user.name, user.age)
else:
    print("not found.")

2.2.6 删除数据
# 删除所有年龄大于100的用户
users = session.query(Person).filter(Person.age > 100).all()
for u in users:
    print(u.name, u.age)
for user in users:
    session.delete(user)
session.commit()