研究学习主题
sqlAlchemy架构图
测试练习代码编写
连接数据库
看代码
# Connection settings for the MySQL server; the values are placeholders.
db_config = dict(
    host='xxx.xxx.xxx.xx',
    user='root',
    passwd='xxxxxxxx',
    db='test',
    charset='utf8',
)

# Fill the URL template in the order user, passwd, host, db, charset and
# create the engine from it.
url_fields = tuple(
    db_config[key] for key in ('user', 'passwd', 'host', 'db', 'charset')
)
engine = create_engine('mysql://%s:%s@%s/%s?charset=%s' % url_fields, echo=True)
创建查询
分页
# Pagination: skip 0 rows (OFFSET) and return at most 10 rows (LIMIT) of the
# users whose joined UserInfo.income is above 5000.
searchResult = (
    self.db.query(User)
    .join(UserInfo, User.uid == UserInfo.uid)
    .filter(UserInfo.income > 5000)
    .offset(0)
    .limit(10)
)
filter中实现isnull(id,0)=0
实现的办法是使用 or_ 函数, 下面的例子同时实现了result转json
SysNote = self.db.query(Note.note_id.label('msg_id'), Note.content).filter(Note.deleted == 0).\
outerjoin(NoteBox, Note.note_id == NoteBox.note_id).\
filter(or_(NoteBox.nb_id == None, NoteBox.deleted == 0)).\
filter(or_(NoteBox.nb_id == None, NoteBox.uid == self.uid))
msgs = []
# print(dir(SysNote), ...)
for msg in SysNote:
row = {}
# msg is tuple
for field in msg._fields:
row[field] = eval('msg.'+field) #dynamicly get the field value
msgs.append(row)
SysNote = json.dumps(msgs, check_circular=False)
return JsonResponse(self, 50000, msg='Success', data=msgs)
排除None值
from sqlalchemy import not_

# Option 1: isnot(None) renders "brand_id IS NOT NULL".
# filter() is required here: filter_by() only accepts keyword arguments
# (column=value) and cannot take a SQL expression. The original also spelled
# the entity "employ", presumably a typo for the queried "employee".
session.query(employee).filter(employee.brand_id.isnot(None))

# Option 2: negate an "IS NULL" comparison with not_().
session.query(employee).filter(not_(employee.brand_id.is_(None)))
列别名
# label() gives the selected column an alias in the result set.
users = session.query(User.name.label("user_name"))
for record in users:
    # The value is read back through the alias, not the column name.
    print("label test:", record.user_name)
join使用
sqlalchemy各个模块主要功能了解
1. 在sqlalchemy.schema包里有数据库关系的描述,列举几个最常用的:
字段:Column
索引:Index
表:Table
2. 数据类型在sqlalchemy.types包,列举几个最常用的:
大整数:BIGINT
布尔:BOOLEAN
字符:CHAR
可变字符:VARCHAR
日期:DATETIME
3. 操作方法在sqlalchemy.sql包里,列举几个最常用的:
execute,update,insert,select,delete,join等
执行原生SQL,格式化参数
from sqlalchemy import create_engine
from sqlalchemy import text
from sqlalchemy.orm import sessionmaker

engine = create_engine('mysql://user:passwd@ip:port/db', echo=True)
Session = sessionmaker(bind=engine)
session = Session()

# Raw SQL is wrapped in text(): newer SQLAlchemy releases no longer accept a
# plain string in Session.execute(). With text(), bind parameters are written
# as :name in the SQL and supplied as a dict in the second argument.
session.execute(text('show databases'))
如何选取指定的列,使用别名
联表统计,聚合函数的使用,sum,rownumber, avg
如何使用group by, having
创建表结构
创建名为users的表,有四个字段:id,name,fullname,password.
String在mysql里其实就是varchar
# Describe the "users" table: an integer primary key plus three string
# columns (String is stored as VARCHAR on MySQL).
metadata = MetaData()
user_columns = [
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('fullname', String(50)),
    Column('password', String(100)),
]
users_table = Table('users', metadata, *user_columns)

# Emit CREATE TABLE for everything registered on this metadata.
metadata.create_all(engine)
从数据表获取映射对象
看到这里我们没有使用Model对象
from sqlalchemy.orm import mapper

# Bind the metadata to the engine and load the column definitions of the
# existing "users" table from the database instead of declaring them by hand.
# NOTE(review): MetaData(engine), autoload=True, insert.execute() and mapper()
# are legacy APIs -- confirm they exist in the SQLAlchemy version in use.
metadata = MetaData(engine)
users_table = Table('users', metadata, autoload=True)
print(users_table.columns)

# Insert a row through the reflected table object.
insert = users_table.insert()
insert.execute(name='leon', fullname='leon liang', password='leon123')

# Map the User class onto the reflected table, then build an instance.
# NOTE(review): User is not defined in this snippet; it presumably takes
# (name, fullname, password) positionally -- verify against its definition.
mapper(User, users_table)
ed_user = User('crackpot', 'Crackpot', 'password')

# print() calls replace the Python 2 print statements, which are syntax
# errors under Python 3 and inconsistent with the rest of these notes.
print('username:', ed_user.name)
print('fullname:', ed_user.fullname)
print('password:', ed_user.password)
print('id:', str(ed_user.id))
ORM操作
单表操作
# coding: utf8
import sqlalchemy
from sqlalchemy import Column, Integer, String
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

print(sqlalchemy.__version__)

# SQLite database stored in the file dbyuan1.db.
engine = create_engine('sqlite:///dbyuan1.db', echo=True)
# Base class that every ORM model below inherits from.
Base = declarative_base()


# ORM model for one row of the "users" table.
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    password = Column(String)

    def __repr__(self):
        shown = (self.name, self.fullname, self.password)
        return "<User(name='%s', fullname='%s', password='%s')>" % shown


# Create the table structure for every model registered on Base.
Base.metadata.create_all(engine)

ed_user = User(name='xiaoyu', fullname='Xiaoyu Liu', password='123')
print(ed_user)

# sessionmaker(...) builds a factory; calling the factory goes through its
# __call__ and returns a Session instance, which is why `session` offers
# add(), add_all() and the other Session methods.
MySession = sessionmaker(bind=engine)
session = MySession()
session.add(ed_user)

# Further examples, left disabled:
# our_user = session.query(User).filter_by(name='ed').first()
# SELECT * FROM users WHERE name="ed" LIMIT 1;
# session.add_all([
#     User(name='alex', fullname='Alex Li', password='456'),
#     User(name='alex', fullname='Alex old', password='789'),
#     User(name='peiqi', fullname='Peiqi Wu', password='sxsxsx')])
session.commit()

# print(">>>", session.query(User).filter_by(name='ed').first())
# print(session.query(User).all())
# for row in session.query(User).order_by(User.id):
#     print(row)
# The names passed to in_() must match exactly:
# for row in session.query(User).filter(User.name.in_(['alex', 'wendy', 'jack'])):
#     print(row)
# for row in session.query(User).filter(~User.name.in_(['ed', 'wendy', 'jack'])):
#     print(row)
# print(session.query(User).filter(User.name == 'ed').count())
# from sqlalchemy import and_, or_
# for row in session.query(User).filter(and_(User.name == 'ed', User.fullname == 'Ed Jones')):
#     print(row)
# for row in session.query(User).filter(or_(User.name == 'ed', User.name == 'wendy')):
#     print(row)
一对多关联操作
http://www.cnblogs.com/yuanchenqi/articles/5638282.html
# coding: utf8
import sqlalchemy
from sqlalchemy import Column, Integer, String
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

print(sqlalchemy.__version__)

# SQLite database stored in the file dbyuan1.db.
engine = create_engine('sqlite:///dbyuan1.db', echo=True)
# Base class that every ORM model below inherits from.
Base = declarative_base()


# ORM model for one row of the "users" table.
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    password = Column(String)

    def __repr__(self):
        shown = (self.name, self.fullname, self.password)
        return "<User(name='%s', fullname='%s', password='%s')>" % shown


# Create the table structure for every model registered on Base.
Base.metadata.create_all(engine)

ed_user = User(name='xiaoyu', fullname='Xiaoyu Liu', password='123')
print(ed_user)

# sessionmaker(...) builds a factory; calling the factory goes through its
# __call__ and returns a Session instance, which is why `session` offers
# add(), add_all() and the other Session methods.
MySession = sessionmaker(bind=engine)
session = MySession()
session.add(ed_user)

# Further examples, left disabled:
# our_user = session.query(User).filter_by(name='ed').first()
# SELECT * FROM users WHERE name="ed" LIMIT 1;
# session.add_all([
#     User(name='alex', fullname='Alex Li', password='456'),
#     User(name='alex', fullname='Alex old', password='789'),
#     User(name='peiqi', fullname='Peiqi Wu', password='sxsxsx')])
session.commit()

# print(">>>", session.query(User).filter_by(name='ed').first())
# print(session.query(User).all())
# for row in session.query(User).order_by(User.id):
#     print(row)
# The names passed to in_() must match exactly:
# for row in session.query(User).filter(User.name.in_(['alex', 'wendy', 'jack'])):
#     print(row)
# for row in session.query(User).filter(~User.name.in_(['ed', 'wendy', 'jack'])):
#     print(row)
# print(session.query(User).filter(User.name == 'ed').count())
# from sqlalchemy import and_, or_
# for row in session.query(User).filter(and_(User.name == 'ed', User.fullname == 'Ed Jones')):
#     print(row)
# for row in session.query(User).filter(or_(User.name == 'ed', User.name == 'wendy')):
#     print(row)
# coding: utf8
import sqlalchemy
from sqlalchemy import Column, Integer, String
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

print(sqlalchemy.__version__)

# SQLite database stored in the file dbyuan1.db.
engine = create_engine('sqlite:///dbyuan1.db', echo=True)
# Base class that every ORM model below inherits from.
Base = declarative_base()


# ORM model for one row of the "users" table.
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    password = Column(String)

    def __repr__(self):
        shown = (self.name, self.fullname, self.password)
        return "<User(name='%s', fullname='%s', password='%s')>" % shown


# Create the table structure for every model registered on Base.
Base.metadata.create_all(engine)

ed_user = User(name='xiaoyu', fullname='Xiaoyu Liu', password='123')
print(ed_user)

# sessionmaker(...) builds a factory; calling the factory goes through its
# __call__ and returns a Session instance, which is why `session` offers
# add(), add_all() and the other Session methods.
MySession = sessionmaker(bind=engine)
session = MySession()
session.add(ed_user)

# Further examples, left disabled:
# our_user = session.query(User).filter_by(name='ed').first()
# SELECT * FROM users WHERE name="ed" LIMIT 1;
# Bulk insert:
# session.add_all([
#     User(name='alex', fullname='Alex Li', password='456'),
#     User(name='alex', fullname='Alex old', password='789'),
#     User(name='peiqi', fullname='Peiqi Wu', password='sxsxsx')])
session.commit()

# print(">>>", session.query(User).filter_by(name='ed').first())
# print(session.query(User).all())
# for row in session.query(User).order_by(User.id):
#     print(row)
# The names passed to in_() must match exactly:
# for row in session.query(User).filter(User.name.in_(['alex', 'wendy', 'jack'])):
#     print(row)
# for row in session.query(User).filter(~User.name.in_(['ed', 'wendy', 'jack'])):
#     print(row)
# print(session.query(User).filter(User.name == 'ed').count())
# from sqlalchemy import and_, or_
# for row in session.query(User).filter(and_(User.name == 'ed', User.fullname == 'Ed Jones')):
#     print(row)
# for row in session.query(User).filter(or_(User.name == 'ed', User.name == 'wendy')):
#     print(row)
ORM查询
http://www.cnblogs.com/aylin/p/5770888.html
# LIMIT/OFFSET: rows at index 1 and 2 (0-based), i.e. the second and third
# rows. slice() applies the window in SQL instead of loading every row with
# all() and slicing the list in Python.
session.query(Person).slice(1, 3).all()
# ORDER BY id, largest first. The original misspelled order_by as "ordre_by"
# and sorted ascending although the note asks for descending order.
session.query(Person).order_by(Person.id.desc())
# Comparison operators: equal / not equal / LIKE / IN / NOT IN / NULL test
query = session.query(Person)
query.filter(Person.id==1).all()  # id = 1
query.filter(Person.id!=1).all()  # id != 1
query.filter(Person.name.like('%ay%')).all()  # name LIKE '%ay%'
query.filter(Person.id.in_([1,2,3])).all()  # id IN (1, 2, 3)
query.filter(~Person.id.in_([1,2,3])).all()  # ~ negates the IN test
query.filter(Person.name==None).all()  # comparing with None tests for NULL
# AND / OR
from sqlalchemy import and_
from sqlalchemy import or_
# The next three statements pass the same two conditions in three ways:
# and_(), several arguments to one filter(), and chained filter() calls.
query.filter(and_(Person.id==1, Person.name=='张岩林')).all()
query.filter(Person.id==1, Person.name=='张岩林').all()
query.filter(Person.id==1).filter(Person.name=='张岩林').all()
query.filter(or_(Person.id==1, Person.id==2)).all()  # id = 1 OR id = 2
# count: number of rows
session.query(Person).count()
# UPDATE: set the name of every Person whose id is greater than 2.
# The criterion must use the mapped column Person.id; a bare ``id`` is
# Python's built-in function, so ``id > 2`` never builds a SQL condition
# (and raises TypeError on Python 3).
session.query(Person).filter(Person.id > 2).update({'name': '张岩林'})
# Ordering: by name descending, then by name descending with id ascending
# as the second sort key.
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
# Grouping
from sqlalchemy.sql import func
ret = session.query(Users).group_by(Users.extra).all()
# Aggregates (max / sum / min of id) computed per name group.
ret = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).all()
# Same aggregates, keeping only the groups whose smallest id is above 2.
ret = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()
# Joining tables
# Two entities related through a WHERE condition.
ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()
# join() without an explicit condition.
# NOTE(review): presumably relies on a foreign key between Person and Favor
# -- neither model is shown here, confirm.
ret = session.query(Person).join(Favor).all()
# isouter=True requests an outer join instead of an inner join.
ret = session.query(Person).join(Favor, isouter=True).all()
# Combining result sets: union() first, then union_all() on the same two
# single-column queries.
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()
relationship和join使用
转载自:http://www.cnblogs.com/coder2012/p/4746941.html
- relationship
#!/usr/bin/env python
# encoding: utf-8
from sqlalchemy import Column
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import backref
from sqlalchemy.orm import relationship
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


# "One" side of the one-to-many relationship: a user owns many addresses.
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    # Addresses of this user, ordered by Address.id; backref="user" names the
    # reverse attribute on Address.
    addresses = relationship("Address", order_by="Address.id", backref="user")


# "Many" side: each address row points at its user through user_id.
class Address(Base):
    __tablename__ = 'addresses'

    id = Column(Integer, primary_key=True)
    email_address = Column(String(32), nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'))
    # Alternative (disabled): declare the relationship from this side.
    # user = relationship("User", backref=backref('addresses', order_by=id))


engine = create_engine('mysql://root:root@localhost:3306/test', echo=True)
# Base.metadata.create_all(engine)

# The original used `session` without ever creating it, although it imported
# sessionmaker and built an engine; bind a session to that engine here.
Session = sessionmaker(bind=engine)
session = Session()

# Assigning the list to jack.addresses attaches both Address objects to the
# user before the user is added to the session and committed.
jack = User(name='jack')
jack.addresses = [
    Address(email_address='test@test.com'),
    Address(email_address='test1@test1.com'),
]
session.add(jack)
session.commit()
join
不使用join可以直接联表查询
# Implicit join: select from both entities and relate them in the WHERE
# clause instead of calling join().
(
    session.query(User.name, Address.email_address)
    .filter(User.id == Address.user_id)
    .filter(Address.email_address == 'test@test.com')
    .all()
)
# Emitted SQL:
#   SELECT users.name AS users_name,
#          addresses.email_address AS addresses_email_address
#   FROM users, addresses
#   WHERE users.id = addresses.user_id AND addresses.email_address = %s
# Result:
#   [('jack', 'test@test.com')]
在sqlalchemy中提供了Query.join()函数
# With a foreign key between the tables, join() needs no explicit condition.
# The three statements below run the same query and read, in turn, the User
# object, its name, and its addresses.
session.query(User).join(Address).filter(Address.email_address=='test@test.com').first()
session.query(User).join(Address).filter(Address.email_address=='test@test.com').first().name
session.query(User).join(Address).filter(Address.email_address=='test@test.com').first().addresses
# Without a foreign key, state how to join explicitly.
# NOTE(review): the two-argument relationship form and the string form below
# are legacy call styles -- confirm the SQLAlchemy version in use accepts them.
query.join(Address, User.id==Address.user_id) # explicit condition
query.join(User.addresses) # specify relationship from left to right
query.join(Address, User.addresses) # same, with explicit target
query.join('addresses') # relationship given by its attribute name as a string
表的别名
from sqlalchemy.orm import aliased
# Create an alias of the Address entity so it can be referenced under a
# separate name within a query.
adalias1 = aliased(Address)
子查询
例如我们需要如下的查询
SELECT users.*, adr_count.address_count
FROM users
LEFT JOIN
(SELECT user_id, count(*) AS address_count
FROM addresses
GROUP BY user_id
) AS adr_count ON users.id=adr_count.user_id;
# Build the subquery, equivalent to
# (SELECT user_id, count(*) AS address_count FROM addresses GROUP BY user_id).
sbq = (
    session.query(Address.user_id, func.count('*').label('address_count'))
    .group_by(Address.user_id)
    .subquery()
)

# LEFT OUTER JOIN users to the subquery; columns of a subquery are reached
# through its .c collection.
(
    session.query(User.name, sbq.c.address_count)
    .outerjoin(sbq, User.id == sbq.c.user_id)
    .all()
)
包含contains
# Keep the users whose `addresses` collection contains the given object.
# NOTE(review): `someaddress` is presumably an Address instance -- it is not
# defined in this snippet.
query.filter(User.addresses.contains(someaddress))
sqlalchemy(一)基本操作 TODO
http://www.cnblogs.com/coder2012/p/4741081.html
# Fetch all rows
session.query(Person).all()
# Fetch a single row (similar to Django's get); raises an error when more
# than one row matches
session.query(Person).filter(Person.name=='jack').one()
# Fetch the first row of the result
session.query(Person).first()
# Filter rows: only the name column, for ids greater than 1
session.query(Person.name).filter(Person.id>1).all()
# LIMIT/OFFSET: rows at index 1 and 2 (0-based). slice() applies the window
# in SQL instead of loading every row with all() and slicing in Python.
session.query(Person).slice(1, 3).all()
# ORDER BY id descending. The original misspelled order_by as "ordre_by";
# desc() states the direction explicitly instead of negating the column.
session.query(Person).order_by(Person.id.desc())
# Comparison operators: equal / not equal / LIKE / IN / NOT IN / NULL test
query = session.query(Person)
query.filter(Person.id==1).all()  # id = 1
query.filter(Person.id!=1).all()  # id != 1
query.filter(Person.name.like('%ac%')).all()  # name LIKE '%ac%'
query.filter(Person.id.in_([1,2,3])).all()  # id IN (1, 2, 3)
query.filter(~Person.id.in_([1,2,3])).all()  # ~ negates the IN test
query.filter(Person.name==None).all()  # comparing with None tests for NULL
# AND / OR
from sqlalchemy import and_
# The next three statements pass the same two conditions in three ways:
# and_(), several arguments to one filter(), and chained filter() calls.
query.filter(and_(Person.id==1, Person.name=='jack')).all()
query.filter(Person.id==1, Person.name=='jack').all()
query.filter(Person.id==1).filter(Person.name=='jack').all()
from sqlalchemy import or_
query.filter(or_(Person.id==1, Person.id==2)).all()  # id = 1 OR id = 2
# Using text() for literal SQL fragments
from sqlalchemy import text
query.filter(text("id>1")).all()
query.filter(Person.id>1).all() # same as the line above
query.filter(text("id>:id")).params(id=1).all() # ":id" is a bind parameter; params() supplies its value
# A complete SELECT statement given as text, with a bound :name parameter.
query.from_statement(
text("select * from person where name=:name")).\
params(name='jack').all()
# Counting rows
# NOTE(review): `func` is not imported in this snippet; presumably
# `from sqlalchemy import func` is needed -- confirm.
query.filter(Person.id>1).count()
session.query(func.count('*')).select_from(Person).scalar()
session.query(func.count(Person.id)).scalar()
Query
resultProxy=db.execute("select * from users")
resultProxy.close(), resultProxy 用完之后, 需要close
resultProxy.scalar(), 可以返回一个标量查询的值
ResultProxy 类是对Cursor类的封装(在文件sqlalchemy\engine\base.py),
ResultProxy 类有个属性cursor即对应着原来的cursor.
ResultProxy 类有很多方法对应着Cursor类的方法, 另外有扩展了一些属性/方法.
resultProxy.fetchall()
resultProxy.fetchmany()
resultProxy.fetchone()
resultProxy.first()
resultProxy.scalar()
resultProxy.returns_rows #True if this ResultProxy returns rows.
resultProxy.rowcount #return rows affected by an UPDATE or DELETE statement. It is not intended to provide the number of rows present from a SELECT.
**遍历ResultProxy时, 得到的每一个行都是RowProxy对象, 获取字段的方法非常灵活, 下标和字段名甚至属性都行. rowproxy[0] == rowproxy['id'] == rowproxy.id, 看得出 RowProxy 已经具备基本 POJO 类特性.**
http://blog.csdn.net/mmx/article/details/48064109
delete and remove
# Delete one loaded object through the session.
session.delete(jack)
# Count the rows still named 'jack'.
session.query(User).filter_by(name='jack').count()
# Bulk delete of every matching row. Query has no remove() method; the
# deleting call is delete().
session.query(User).filter_by(name='jack').delete()
session.commit()