文章目录
- SQLAlchemy是什么
- 为什么使用orm
- 定义
- 安装
- 组成部分
- SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件
- SQLAlchemy的使用
- 原生sql
- 使用orm映射数据表
- 外键关系
- 一对多(ForeignKey)
- 多对多
- 使用orm操作记录
- 简单表操作
- 基于scoped_session实现线程安全
- CRUD
- 基础操作
- 进阶操作
- Flask集成sqlalchemy
- 构建模型类
- 常用的字段类型
- 常用的字段选项
- 增
- 查
- 改
- 先查询, 再更新
- 基于过滤条件的更新 (推荐方案)
- 删
- 先查询, 再删除
- 基于过滤条件的删除 (推荐方案)
SQLAlchemy是什么
为什么使用orm
优点
- 有语法提示, 省去自己拼写SQL,保证SQL语法的正确性
- orm提供方言功能(dialect, 可以转换为多种数据库的语法), 减少学习成本
- 防止sql注入攻击
- 搭配数据迁移, 更新数据库方便
- 面向对象, 可读性强, 开发效率高
缺点
- 需要语法转换, 效率比原生sql低
- 复杂的查询往往语法比较复杂 (可以使用原生sql替换)
定义
SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
安装
pip3 install sqlalchemy
组成部分
- Engine,框架的引擎
- Connection Pooling ,数据库连接池
- Dialect,选择连接数据库的DB API种类
- Schema/Types,架构和类型
- SQL Exprression Language,SQL表达式语言
SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件
MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
更多:/en/latest/dialects/
SQLAlchemy的使用
原生sql
from sqlalchemy import create_engine
from import Engine
from urllib import parse
import threading
user = "root"
password = "xxx@000"
pwd = parse.quote_plus(password) # 解决密码中含@符导致报错
host = "127.0.0.1:"
# 第一步: 创建engine
engine = create_engine(
f"mysql+pymysql://{user}:{pwd}@{host}3306/test1?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 第二步:使用
def task():
conn = engine.raw_connection() # 从连接池中取一个连接
cursor = ()
sql = "select * from signer"
(sql)
print(())
if __name__ == '__main__':
for i in range(20):
t = (target=task)
()
使用orm映射数据表
import datetime
from import declarative_base
from model import engine # 用的简单使用里面的engine
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
Base = declarative_base() # 基类
# 表模型
class Users(Base):
__tablename__ = 'users' # 数据库表名称, 必须写
id = Column(Integer, primary_key=True) # id 主键
name = Column(String(32), index=True, nullable=False) # name列,索引,不可为空
email = Column(String(32), unique=True)
# 不能加括号,加了括号,以后永远是当前时间
ctime = Column(DateTime, default=)
extra = Column(Text, nullable=True)
__table_args__ = ( # 可选
UniqueConstraint('id', 'name', name='uix_id_name'), # 联合唯一
Index('ix_id_name', 'name', 'email'), # 索引
)
def init_db():
"""
根据类创建继承base类的表
:return:
"""
.create_all(engine)
def drop_db():
"""
根据类删除继承base类的表
:return:
"""
.drop_all(engine)
if __name__ == '__main__':
# sqlalchemy 只支持 创建和删除表,不支持修改表(django orm支持)。sqlalchemy 需要借助第三方实现
init_db() # 创建表
# drop_db() # 删除表
外键关系
一对多(ForeignKey)
class Hobby(Base):
__tablename__ = 'hobby'
id = Column(Integer, primary_key=True)
caption = Column(String(50), default='篮球')
class Person(Base):
__tablename__ = 'person'
nid = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=True)
# hobby指的是tablename而不是类名
hobby_id = Column(Integer, ForeignKey(""))
# 跟数据库无关,不会新增字段,只用于快速链表操作
# 类名,backref用于反向查询 参数 uselist=False , 设置就变成了一对一,其他和一对多一样
hobby=relationship('Hobby',backref='pers')
# hobby=relationship('Hobby',backref='pers', uselist=False) # 一对一关系
多对多
class Boy2Girl(Base):
__tablename__ = 'boy2girl'
id = Column(Integer, primary_key=True, autoincrement=True)
girl_id = Column(Integer, ForeignKey(''))
boy_id = Column(Integer, ForeignKey(''))
class Girl(Base):
__tablename__ = 'girl'
id = Column(Integer, primary_key=True)
name = Column(String(64), unique=True, nullable=False)
class Boy(Base):
__tablename__ = 'boy'
id = Column(Integer, primary_key=True, autoincrement=True)
hostname = Column(String(64), unique=True, nullable=False)
# 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
servers = relationship('Girl', secondary='boy2girl', backref='boys')
'''
girl_id = Column(Integer, ForeignKey("", ondelete='SET NULL')) # 一般用SET NULL
外键约束
1. RESTRICT:若子表中有父表对应的关联数据,删除父表对应数据,会阻止删除。默认项
2. NO ACTION:在MySQL中,同RESTRICT。
3. CASCADE:级联删除。
4. SET NULL:父表对应数据被删除,子表对应数据项会设置为NULL。
'''
扩充:在 django 中,外键管理有个参数
db_contraint=False
用来在逻辑上关联表,但实体不建立约束。同样在SQLAlchemy
中也可以通过配值relationship
参数来实现同样的效果
class Boy2Girl(Base):
__tablename__ = 'boy2girl'
id = Column(Integer, primary_key=True, autoincrement=True)
girl_id = Column(Integer) # 不用ForeignKey
boy_id = Column(Integer)
gitl = (
"Girl",
# uselist=False, # 一对一设置
backref=backref("to_course", uselist=False), # backref用于反向查询 uselist 作用同上
lazy="subquery", # 懒加载 用来指定sqlalchemy 什么时候加载数据
primaryjoin="==Boy2Girl.girl_id", # 指定对应关系
foreign_keys="Boy2Girl.girl_id" # 指定表的外键字段
)
'''
lazy 可选值
select:就是访问到属性的时候,就会全部加载该属性的数据 默认值
joined:对关联的两个表使用联接
subquery:与joined类似,但使用子子查询
dynamic:不加载记录,但提供加载记录的查询,也就是生成query对象
'''
使用orm操作记录
简单表操作
from import sessionmaker
from model import engine
from db_model import Users
# 定义一个 session, 以后操作数据都用 session 来执行
Session = sessionmaker(bind=engine)
session = Session()
# 创建User对象
usr = Users(name="yxh", email="152@", extra="xxx")
# 通过 user对象 添加到session中
(usr)
# 提交,才会刷新到数据库中,不提交不会执行
()
基于scoped_session实现线程安全
session 如果是一个全局对象。那么在多线程的情况下,并发使用同一个变量 session 是不安全的,解决方案如下:
-
将session定义在局部,每一个view函数都定义一个session。 代码冗余,不推荐
-
基于scoped_session 实现线程安全。原理同 request对象,g对象一致。也是基于local,给每一个线程创造一个session
from import sessionmaker, scoped_session
from model import engine
from db_model import Users
# 定义一个 session
Session = sessionmaker(bind=engine)
# session = Session()
session = scoped_session(Session) # 后续使用这个session就是线程安全的
# 创建User对象
usr = Users(name="yxh", email="152@", extra="xxx")
# 通过 user对象 添加到session中
(usr)
# 提交,才会刷新到数据库中,不提交不会执行
()
CRUD
创建(Create)、读取(Read)、更新(Update)和删除(Delete)
基础操作
from import sessionmaker, scoped_session
from import text
from model import engine
from db_model import Users
# 定义一个 session
Session = sessionmaker(bind=engine)
# session = Session()
session = scoped_session(Session) # 后续使用这个session就是线程安全的
# 1 增加操作
obj1 = Users(name="yxh003")
(obj1)
# 增加多个,不同对象
session.add_all([
Users(name="yxh009"),
Users(name="yxh008"),
])
()
# 2 删除操作---》查出来再删---》
(Users).filter( > 2).delete()
()
# 3 修改操作--》查出来改 传字典
(Users).filter( > 0).update({"name": "yxh"})
# 类似于django的F查询
# 字符串加
(Users).filter( > 0).update({: + "099"}, synchronize_session=False)
# 数字加
(Users).filter( > 0).update({"age": + 1}, synchronize_session="evaluate")
()
# 4 查询操作----》
r1 = (Users).all() # 查询所有
# 只取age列,把name重命名为xx
# select name as xx,age from user;
r2 = (('xx'), ).all()
# filter传的是表达式,filter_by传的是参数
r3 = (Users).filter( == "yxh").all()
r3 = (Users).filter( >= 1).all()
r4 = (Users).filter_by(name='yxh').all()
r5 = (Users).filter_by(name='yxh').first()
# :value 和:name 相当于占位符,用params传参数
r6 = (Users).filter(text("id<:value and name=:name")).params(value=224, name='yxh').order_by(
).all()
# 自定义查询sql
r7 = (Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='yxh').all()
# 执行原生sql
# 查询
cursor = ("select * from users")
result = ()
# 添加
cursor = ('insert into users(name) values(:value)', params={"value": 'yxh'})
()
print()
进阶操作
from import sessionmaker, scoped_session
from import text, func
from sqlalchemy import and_, or_
from model import engine
from db_model import Users, Person, Favor
# 定义一个 session
Session = sessionmaker(bind=engine)
# session = Session()
session = scoped_session(Session) # 后续使用这个session就是线程安全的
# 条件
# select * form user where name =lqz
ret = (Users).filter_by(name='lqz').all()
# 表达式,and条件连接
# select * from user where id >1 and name = lqz
ret = (Users).filter( > 1, == 'lqz').all()
# select * from user where id between 1,3 and name = lqz
ret = (Users).filter((1, 3), == 'lqz').all()
# # 注意下划线
# select * from user where id in (1,3,4)
ret = (Users).filter(.in_([1, 3, 4])).all()
# # ~非,除。。外
# select * from user where id not in (1,3,4)
ret = (Users).filter(~.in_([1, 3, 4])).all()
# # 二次筛选
ret = (Users).filter(.in_(().filter_by(name='lqz'))).all()
# # or_包裹的都是or条件,and_包裹的都是and条件
ret = (Users).filter(and_( > 3, == 'eric')).all()
ret = (Users).filter(or_( < 2, == 'eric')).all()
ret = (Users).filter(
or_(
< 2,
and_( == 'eric', > 3),
!= ""
)).all()
# 通配符,以e开头,不以e开头
ret = (Users).filter(('e%')).all()
ret = (Users).filter(~('e%')).all()
# # 限制,用于分页,区间
ret = (Users)[1:2]
# # 排序,根据name降序排列(从大到小)
ret = (Users).order_by(()).all()
# # 第一个条件重复后,再按第二个条件升序排
ret = (Users).order_by((), ()).all()
# 分组
# select * from user group by ;
ret = (Users).group_by().all()
# # 分组之后取最大id,id之和,最小id
# select max(id),sum(id),min(id) from user group by name ;
ret = (
(),
(),
()).group_by().all()
# haviing筛选
# select max(id),sum(id),min(id) from user group by name having min(id)>2;
ret = (
(),
(),
()).group_by().having(() > 2).all()
# select max(id),sum(id),min(id) from user where id >=1 group by name having min(id)>2;
ret = (
(),
(),
()).filter( >= 1).group_by().having(() > 2).all()
# 连表(默认用orm中forinkey关联)
# select * from user,favor where =
ret = (Users, Favor).filter( == ).all()
# join表,默认是inner join
# select * from Person inner join favor on =;
ret = (Person).join(Favor).all()
# isouter=True 外连,表示Person left join Favor,没有右连接,反过来即可
ret = (Person).join(Favor, isouter=True).all()
ret = (Favor).join(Person, isouter=True).all()
# 打印原生sql
aa = (Person).join(Favor, isouter=True)
# print(aa)
# 自己指定on条件(连表条件),第二个参数,支持on多个条件,用and_,同上
# select * from person left join favor on =;
ret = (Person).join(Favor, == , isouter=True).all()
# 组合 UNION 操作符用于合并两个或多个 SELECT 语句的结果集 多用于分表后 上下连表
# union和union all union 去重, union all 不去重
q1 = ().filter( > 2)
q2 = ().filter( < 2)
ret = (q2).all()
q1 = ().filter( > 2)
q2 = ().filter( < 2)
ret = q1.union_all(q2).all()
Flask集成sqlalchemy
'''
flask_migrate 中使用了flask_sqlalchemy 下载时,会自动帮你下载flask_sqlalchemy
flask_migrate 3.0之前和之后使用方法有区别。这里以 做演示
'''
# flask_migrate使用步骤
from flask_sqlalchemy import SQLAlchemy
# Flask_SQLAlchemy给你包装了基类,和session,以后拿到db
db = SQLAlchemy() # 全局SQLAlchemy,就是线程安全的,内部就是上述那么实现的
app = Flask(__name__)
# SQLAlchemy 连接数据库配置是在 config 配置字典中获取的,所以需要我们将配置添加进去
.from_object('DevelopmentConfig')
'''
基本写这些就够了
"SQLALCHEMY_DATABASE_URI"
"SQLALCHEMY_POOL_SIZE"
"SQLALCHEMY_POOL_TIME"
"SQLALCHEMY_POOL_RECYCLE"
"SQLALCHEMY_TRACK_MODIFICATIONS"
"SQLALCHEMY_ENGINE_OPTIONS"
'''
# 将db注册到app中,加载配置文件,flask-session,用一个类包裹一下app
db.init_app(app)
# 将需要使用orm操作的表通过继承实现类
# 下面三句会创建出两个命令:runserver db 命令(flask_migrate)
manager=Manager(app)
Migrate(app, db)
manager.add_command('db', MigrateCommand) # 添加一个db命令
使用命令:
1. python db init # 初始化,刚开始干,生成一个migrate文件夹(迁移文件夹)
2. python db migrate # 生成迁移版本,保存到迁移文件夹,同django makemigartions
3. python db upgrade # 执行迁移,同django migrate
配置选项 | 示例值 | 说明 |
---|---|---|
SQLALCHEMY_DATABASE_URI | 'sqlite:///' |
数据库连接字符串,指定要连接的数据库类型、用户名、密码、主机和数据库名称等信息 |
SQLALCHEMY_TRACK_MODIFICATIONS | False |
是否跟踪对象修改。默认为True。建议在生产环境中将其设置为False,以提高性能 |
SQLALCHEMY_ECHO | True |
是否将生成的SQL语句输出到控制台上。主要用于调试和开发,默认为False |
SQLALCHEMY_POOL_SIZE | 10 |
数据库连接池的大小,默认为5 |
SQLALCHEMY_POOL_TIMEOUT | 20 |
等待数据库连接的超时时间(秒),默认为10 |
SQLALCHEMY_POOL_RECYCLE | 3600 |
连接在重新使用之前的最大时间(秒)。默认为-1,表示禁用连接回收 |
SQLALCHEMY_NATIVE_UNICODE | False |
控制是否使用数据库驱动程序的本机Unicode支持。默认为False |
SQLALCHEMY_COMMIT_ON_TEARDOWN | True |
是否在请求结束时自动提交事务。默认为False。在某些情况下,可以将其设置为True,以自动提交更改到数据库 |
SQLALCHEMY_BINDS | {'users': 'sqlite:///'} |
定义多个数据库连接的绑定。 |
SQLALCHEMY_ENGINE_OPTIONS | {'pool_size':10, 'connect_timeout':20, 'encoding':'utf8mb4'} |
具体参数和效果取决于所使用的数据库驱动程序和 SQLAlchemy 版本 |
构建模型类
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
# 相关配置
['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31'
['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
['SQLALCHEMY_ECHO'] = True
# 创建组件对象
db = SQLAlchemy(app)
# 构建模型类 类->表 类属性->字段 实例对象->记录
# 模型类必须继承 , 其中 db 指对应的组件对象
class User():
__tablename__ = 't_user' # 设置表名, 表名默认为类名小写
id = (, primary_key=True) # 设置主键, 默认自增
name = ('username', (20), unique=True) # 设置字段名 和 唯一约束
age = (, default=10, index=True) # 设置默认值约束 和 索引
if __name__ == '__main__':
# 删除所有继承自的表
db.drop_all()
# 创建所有继承自的表
db.create_all()
(debug=True)
常用的字段类型
字段类型 | 示例 | 说明 |
---|---|---|
Integer | age = () |
整数类型 |
String | name = ((100)) |
字符串类型 |
Text | content = () |
长文本类型 |
Float | height = () |
浮点数类型 |
Boolean | is_active = (, default=False) |
布尔类型 |
DateTime | created_at = () |
日期和时间类型 |
Date | birth_date = () |
日期类型 |
Time | meeting_time = () |
时间类型 |
JSON | data = () |
存储 JSON 数据的字段类型 |
PickleType | data = () |
存储 Python 对象的字段类型 |
Enum | status = (('active', 'inactive', name='status_enum')) |
枚举类型,用于限制字段取值范围 |
ForeignKey | user_id = (, ('')) |
外键类型,用于与其他模型类建立关联 |
relationship | user = ('User', backref='posts') |
定义模型之间的关系 |
backref | - | 在关系另一侧创建反向引用 |
primary_key=True | id = (, primary_key=True) |
将字段设置为主键 |
unique=True | email = ((100), unique=True) |
将字段设置为唯一值 |
nullable=False | name = ((100), nullable=False) |
设置字段为非空 |
default | is_active = (, default=False) |
设置字段的默认值 |
index=True | username = ((100), index=True) |
为字段创建索引 |
server_default | created_at = (, server_default=()) |
在数据库服务器上设置字段的默认值 |
常用的字段选项
字段选项 | 示例 | 说明 |
---|---|---|
primary_key=True | id = (, primary_key=True) |
将字段设置为主键 |
unique=True | email = ((100), unique=True) |
将字段设置为唯一值 |
nullable=False | name = ((100), nullable=False) |
设置字段为非空 |
default | is_active = (, default=False) |
设置字段的默认值 |
index=True | username = ((100), index=True) |
为字段创建索引 |
autoincrement=True | id = (, primary_key=True, autoincrement=True) |
设置字段自增 |
server_default | created_at = (, server_default=()) |
在数据库服务器上设置字段的默认值 |
onupdate | updated_at = (, onupdate=()) |
在字段更新时设置新的值 |
foreign_key | user_id = (, ('')) |
定义字段与其他表的外键关系 |
backref | posts = ('Post', backref='user') |
在关系的另一侧创建反向引用 |
lazy | posts = ('Post', lazy='dynamic') |
控制字段加载的时机 |
uselist | users = ('User', backref='role', uselist=False) |
控制关系字段是否作为列表返回 |
cascade | posts = ('Post', cascade='delete') |
控制级联操作的行为 |
passive_deletes | posts = ('Post', passive_deletes=True) |
控制删除时的级联行为 |
passive_updates | posts = ('Post', passive_updates=True) |
控制更新时的级联行为 |
ondelete | post_id = (, ('', ondelete='CASCADE')) |
在删除关联记录时的处理方式 |
primaryjoin | posts = ('Post', primaryjoin='and_( == Post.user_id, User.is_active == True)') |
指定关系字段之间的条件表达式 |
secondary | users = ('User', secondary='user_role', backref='roles') |
定义多对多关系中的中间表 |
secondaryjoin | users = ('User', secondary='user_role', secondaryjoin='and_( == user_role.user_id, user_role.is_active == True)') |
指定多对多关系中中间表与主表之间的条件表达式 |
增
# 1.创建模型对象
user1 = User(name='zs', age=20)
# = 'zs'
# = 20
# 2.将模型对象添加到会话中
(user1)
# 添加多条记录
# .add_all([user1, user2, user3])
# 3.提交会话 (会提交事务)
# sqlalchemy会自动创建隐式事务
# 事务失败会自动回滚
()
查
# 查询所有用户数据
() 返回列表, 元素为模型对象
# 查询有多少个用户
()
# 查询第1个用户
() 返回模型对象/None
# 查询id为4的用户[3种方式]
# 方式1: 根据id查询 返回模型对象/None
(4)
# 方式2: 等值过滤器 关键字实参设置字段值 返回BaseQuery对象
# BaseQuery对象可以续接其他过滤器/执行器 如 all/count/first等
.filter_by(id=4).all()
# 方式3: 复杂过滤器 参数为比较运算/函数引用等 返回BaseQuery对象
( == 4).first()
# 查询名字结尾字符为g的所有用户[开始 / 包含]
(("g")).all()
(("w")).all()
(("n")).all()
(("w%n%g")).all() # 模糊查询
# 查询名字和邮箱都以li开头的所有用户[2种方式]
(('li'), ('li')).all()
from sqlalchemy import and_
(and_(('li'), ('li'))).all()
# 查询age是25 或者 `email`以``结尾的所有用户
from sqlalchemy import or_
(or_(==25, (""))).all()
# 查询名字不等于wang的所有用户[2种方式]
from sqlalchemy import not_
(not_( == 'wang')).all()
( != 'wang').all()
# 查询id为[1, 3, 5, 7, 9]的用户
(.in_([1, 3, 5, 7, 9])).all()
# 所有用户先按年龄从小到大, 再按id从大到小排序, 取前5个
.order_by(, ()).limit(5).all()
# 查询年龄从小到大第2-5位的数据 2 3 4 5
.order_by().offset(1).limit(4).all()
# 分页查询, 每页3个, 查询第2页的数据 paginate(页码, 每页条数)
pn = (2, 3)
总页数 当前页码 当前页的数据 总条数
# 查询每个年龄的人数 select age, count(name) from t_user group by age 分组聚合
from sqlalchemy import func
data = (, ().label("count")).group_by().all()
for item in data:
# print(item[0], item[1])
print(, ) # 建议通过label()方法给字段起别名, 以属性方式获取数据
# 只查询所有人的姓名和邮箱 优化查询 () # 相当于select *
from import load_only
data = (load_only(, )).all() # flask-sqlalchem的语法
for item in data:
print(, )
data = (, ).all() # sqlalchemy本体的语法
for item in data:
print(, )
改
先查询, 再更新
对应SQL中的 先select, 再commit()
-
缺点
- 查询和更新分两条语句, 效率低
- 如果并发更新, 可能出现更新丢失问题(Lost Update)
-
示例
# 1.执行查询语句, 获取目标模型对象 goods = ( == '方便面').first() # 2.对模型对象的属性进行赋值 (更新数据) = - 1 # 3.提交会话 ()
基于过滤条件的更新 (推荐方案)
对应SQL中的 update xx where xx = xx (也称为 update子查询 )
-
优点
- 一条语句, 被网络IO影响程度低, 执行效率更高
- 查询和更新在一条语句中完成, 单条SQL具有原子性, 不会出现更新丢失问题
- 会对满足过滤条件的所有记录进行更新, 可以实现批量更新处理
-
示例
( == '方便面').update({'count': - 1}) # 提交会话 ()
删
先查询, 再删除
# 方式1: 先查后删除
goods = ( == '方便面').first()
# 删除数据
(goods)
# 提交会话 增删改都要提交会话
()
基于过滤条件的删除 (推荐方案)
# 方式2: delete子查询
( == '方便面').delete()
# 提交会话
()