ORM框架之SQLAchemy

时间:2023-07-20 08:12:14

SQLAchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,即:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

1.安装

pip3 install SQLAlchemy

SQLAchemy本身无法操作数据库,必须依赖pymysql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作:

MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> 例如:
mysql+pymysql://root:123@127.0.0.1:3306/db1?charset=utf8 pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]

2.ORM功能使用

2.1创建表

#导入模块
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,Integer,String,ForeignKey,UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker,relationship
from sqlalchemy import create_engine Base=declarative_base()#继承Base类
#创建表单
class UserType(Base):
__tablename__='usertype'##新建表名称
id=Column(Integer,primary_key=True,autoincrement=True)##设置表列名,类型,主键,自增
title=Column(String(32),nullable=True,index=True)##设置表列名,类型,是否可以为空,索引 class User(Base):
__tablename__='users'##新建表名称
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(32), nullable=True, index=True)
email = Column(String(16), unique=True)##设置表列名,类型,唯一索引
user_type_id = Column(Integer, ForeignKey('usertype.id'))#设置表列名,类型,外键
# __table_args__ = (
# UniqueConstraint('id', 'name', name='uix_id_name'),
# Index('ix_n_ex','name', 'email',),
# )
  
user_type=relationship('UserType',backref='xxoo')##设置relationship,将两个类关联,在查询User表格时,也会查询到
# UserType表格,触发查询UserType表格是在循环遍历结果时调用UserType表格的属性
# 设置backref为反向查询
def create_db():
Base.metadata.create_all(engine)
def drop_db():
Base.metadata.create_all(engine) engine=create_engine('mysql+pymysql://root:123@127.0.0.1:3306/db1?charset=utf8',max_overflow=5)
#使用第三方模块连接数据库,
#设置最大连接数
#engine的定义包含了三部分的内容,一是具体数据库类型的实现,二是连接池,三是策略(即engine自己的实现)
#所谓的数据库类型即是MYSQL,Postgresql,SQLite这些不同的数据库,
#一般创建engine是使用create_engine的方法
Session=sessionmaker(bind=engine)#生成一个会话
session=Session()
create_db()#创建表格 user_list=session.query(User,UserType).join(UserType,isouter=True)##isouter 连表(left join) print(user_list) for row in user_list: print(row[0].id,row[0].name,row[0].email,row[0].user_type_id,row[1].title) type_list=session.query(UserType)##获取用户类型 for row in type_list:##遍历列表 if len(row.xxoo)>0:##调用backref,触发获取用户信息的执行
print(row.id,row.title,row.xxoo[0].name)##取到其中的数据 ##其中row.xxoo是一个用户对象 [<__main__.User object at 0x03C67CF0>] ##print(row.xxoo[0].id,row.xxoo[0].name,row.xxoo[0].email) ##获取到用户表中的数据信息,这就是relationship

2.2操作表

##创建一个
obj1=UserType(title='')#实例化对象
session.add(obj1) ##创建多个
objs=[
UserType(title='egon'),
UserType(title='eric'),
UserType(title='aldx')]
session.add_all(objs) 

session.query(UserType.id,UserType.title).filter(UserType.id>2).delete() 

# 以下两种方法都是在原来基础上操作值
session.query(UserType.id,UserType.title).filter(UserType.id>0).update({UserType.title:UserType.title+'x'},synchronize_session='False')
##字符串操作
session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate")
##数字类型操作

print(session.query(UserType))
user_type_list=session.query(UserType).all()##user_type_list是一个列表 user_type_list=session.query(UserType).filter(UserType.id>2)##filter是过滤
#该语句相当于where条件语句 for row in user_type_list:
print(row.id)

其他

# 条件
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(
or_(
Users.id < 2,
and_(Users.name == 'eric', Users.id > 3),
Users.extra != ""
)).all() # 通配符
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all() # 限制
ret = session.query(Users)[1:2] # 排序
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 分组
from sqlalchemy.sql import func ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).all() ret = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() # 连表 ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() ret = session.query(Person).join(Favor).all() ret = session.query(Person).join(Favor, isouter=True).all() # 组合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all() q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()