Sqlalchemy join连表查询

时间:2025-02-24 15:44:05

摘要1:/weixin_33804582/article/details/92471702
摘要2:/juandx/p/
摘要3:/wuheng-123/p/

model存在外键做join连接

首先创建数据库,在这里一个user对应多个address,因此需要在address上增加user_id这个外键(一对多)。


from sqlalchemy import create_engine
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import ForeignKey
from  import backref
from  import sessionmaker
from  import relationship, backref
from  import declarative_base
 
Base = declarative_base()
 
 
class User(Base):
    __tablename__ = 'users'
 
    id = Column(Integer, primary_key=True)
    name = Column(String(32))

    addresses = relationship("Address", order_by="", backref="user")
 
class Address(Base):
    __tablename__ = 'addresses'
    id = Column(Integer, primary_key=True)
    email_address = Column(String(32), nullable=False)
    user_id = Column(Integer, ForeignKey(''))
 
    #user = relationship("User", backref=backref('addresses', order_by=id))
 
engine  = create_engine('mysql://root:root@localhost:3306/test', echo=True)
#.create_all(engine)

1.如果不使用join的话,可以直接联表查询

>>> (, Address.email_address).filter(==Address.user_id).filter(Address.email_address=='test@').all()
2015-08-19 14:02:02,877 INFO  SELECT  AS users_name, addresses.email_address AS addresses_email_address 
FROM users, addresses 
WHERE  = addresses.user_id AND addresses.email_address = %s
2015-08-19 14:02:02,878 INFO  ('test@',)
[('jack', 'test@')]

2.使用sqlalchemy中提供了()函数

>>> (User).join(Address).filter(Address.email_address=='test@').first()
2015-08-19 14:06:56,624 INFO  SELECT  AS users_id,  AS users_name 
FROM users INNER JOIN addresses ON  = addresses.user_id 
WHERE addresses.email_address = %s 
 LIMIT %s
2015-08-19 14:06:56,624 INFO  ('test@', 1)
< object at 0x7f9a74139a10>

model不存在外键做join连接

1.上面的用法的前提是存在外键的情况下,如果没有外键,如何做join连接

from sqlalchemy import create_engine
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import ForeignKey
from  import backref
from  import sessionmaker
from  import relationship, backref
from  import declarative_base
 
Base = declarative_base()
 
class User(Base):
    __tablename__ = 'users'
 
    id = Column(Integer, primary_key=True)
    name = Column(String(32)) 
 
class Address(Base):
    __tablename__ = 'addresses'
    id = Column(Integer, primary_key=True)
    email_address = Column(String(32), nullable=False)
    user_id = Column(Integer, ForeignKey(''))
 
    #user = relationship("User", backref=backref('addresses', order_by=id))
 
engine  = create_engine('mysql://root:root@localhost:3306/test', echo=True)
#.create_all(engine)
# 上面model对应示例
(Address, ==Address.user_id)


# 其他sql示例
result = (,()).join(Article,==).\
    group_by().order_by(().desc()).all()
print(result)#[('ketang', 2), ('zhiliao', 1)]

'''
SELECT  AS user_username, count() AS count_1 
FROM user INNER JOIN article ON  =  GROUP BY  ORDER BY count()
'''

子查询示例

# 原生sql,子表查询
mysql> SELECT users.*, adr_count.address_count FROM users LEFT OUTER JOIN
    ->     (SELECT user_id, count(*) AS address_count
    ->         FROM addresses GROUP BY user_id) AS adr_count
    ->     ON =adr_count.user_id;
+----+------+---------------+
| id | name | address_count |
+----+------+---------------+
|  1 | jack |             2 |
+----+------+---------------+
1 row in set (0.00 sec)

使用sqlalchemy子查询


# 生成子句,等同于(select user_id ... group_by user_id)
>>> sbq = (Address.user_id, ('*').label('address_count')).group_by(Address.user_id).subquery()
 
# 联接子句,注意子句中需要使用c来调用字段内容
>>> (, .address_count).outerjoin(sbq, ==.user_id).all()
2015-08-19 14:42:53,425 INFO  SELECT  AS users_name, anon_1.address_count AS anon_1_address_count
FROM users LEFT OUTER JOIN (SELECT addresses.user_id AS user_id, count(%s) AS address_count
FROM addresses GROUP BY addresses.user_id) AS anon_1 ON  = anon_1.user_id
2015-08-19 14:42:53,425 INFO  ('*',)
[('jack', 2L)]
>>>