sqlAlchemy解读: http://www.zzvips.com/article/213710.html
sqlAlchemy解读:http://www.zzvips.com/article/213711.html
特点是操纵Python对象而不是SQL查询,也就是在代码层面考虑的是对象,而不是SQL,体现的是一种程序化思维,这样使得Python程序更加简洁易懂。
具体的实现方式是将数据库表转换为Python类,其中数据列作为属性,数据库操作作为方法。
- abstract # 辅助sqlAlchemy实现类的继承,自动继承属性,省去super()
- SQLAlchemy定义的ORM,在继承父级ORM时候,Foreign Key外键是不能继承的,它强制要求在子类中重新定义。
使用概述
在使用sqlalchemy访问数据库的时候,以类的形式表示表格,因此在使用之前,需要先定义类。
类的定义有三种:基类BASE、父类、子类
基类是sqlalchemy底层的;当需要一份数据切分为多个子表的时候,或多个表的字段一致时,可以使用一个父类定义字段的类型,多个子表继承父类的属性。
一、创建引擎和会话
通过创建引擎、绑定引擎来创建会话,实现数据库的访问。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
from sqlalchemy import create_engine # 引擎
from sqlalchemy.orm import sessionmaker # 创建orm的会话池,orm和sql均可以管理对象关系型数据库,需要绑定引擎才可以使用会话,
# 创建连接
engine = create_engine( "mysql+pymysql://root:1234;@127.0.0.1/test" , # 需要安装mysql和pymysql的模块,用户名:密码@ip地址/某个数据库
#echo=True, # 打印操作对应的SQL语句
pool_size = 8 , # 连接个数
pool_recycle = 60 * 30 # 不使用时断开
)
# 创建session
DbSession = sessionmaker(bind = engine) # 会话工厂,与引擎绑定。
session = DbSession() # 实例化
session.close() # 关闭会话
|
二、定义类来表示虚拟表格
在使用sqlalchemy访问数据库的时候,以类的形式表示表格,因此在使用之前,需要先定义类。使用类的名称而不是tablename实现之后的增删改查。
1
2
3
4
|
# 导入定义类需要的模块
from sqlalchemy.ext.declarative import declarative_base # 调用sqlalchemy的基类
from sqlalchemy import Column, Index, distinct, update # 指定字段属性,索引、唯一、DML
from sqlalchemy.types import * # 所有字段类型
|
1. 直接建立一个可调用的表格
需要先继承基类,在定义__init__函数,设置输入参数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
# 创建库表类型
Base = declarative_base() # 调用sqlalchemy的基类
class Users(Base):
'''继承基类'''
__tablename__ = "users" # 数据表的名字
__table_args__ = { 'extend_existing' : True } # 当数据库中已经有该表时,或内存中已声明该表,可以用此语句重新覆盖声明。
id = Column(Integer, primary_key = True )
name = Column(String( 64 ), unique = True )
#email = Column(String(64))
def __init__( self , name, email):
self .name = name
self .email = email # 声明需要调用的特征,可以只声明数据库中表格列的子集
Base.metadata.create_all(engine) # 表生效:将所有定义的类,使用引擎创建,此时可以在数据库中看到这些表。
|
2. 创建多个相同列属性的表格 先建立一个表格的父类,指定列的属性,再通过继承父类
不同的表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
# 创建库表类型
Base = declarative_base() # 调用sqlalchemy的基类
class model_data(BASE):
'''创建数据库表类:模型所需的基本字段'''
__abstract__ = True # 辅助sqlAlchemy实现类的继承,自动继承属性,省去super()
__table_args__ = { 'extend_existing' : True } # 若表的声明在内存中已存在,则重新声明表的名称,不然会报错
ai_xdr_id = Column(BigInteger(), primary_key = True , unique = True , autoincrement = True )
ai_sdk_id = Column(BigInteger())
class TrainData(model_data): # 训练集表
'''继承model_data的属性,并将表的名字定义为:'xxx_train_data'存入数据库 '''
__tablename__ = 'xxx_train_data'
class DevData(model_data): # 开发集表
'''表的名字定义为:'xxx_dev_data' '''
__tablename__ = 'xxx_dev_data'
class TestData(model_data): # 测试集表
__tablename__ = 'xxx_test_data'
Base.metadata.create_all(engine) # 表生效:将所有定义的类,使用引擎创建,此时可以在数据库中看到这些表。
|
三、增删改查
因为是会话操作,当某个语句,例如增加数据时,不成功的时候需要回滚。
增加数据
1
2
3
4
5
6
7
8
9
10
11
|
# 增加数据
add_user = Users( "test3" , "test123@qq.com" )
session.add(add_user)
session.commit()
# add_users = Users(("test", "test123@qq.com"),('a','b')))
# session.add(add_users)
# session.commit()
# 当上述语句出现执行错误时,需要执行回滚语句,才能继续操作
session.rollback()
|
删除数据
1
2
3
4
5
6
7
|
delete_users = session.query(Users). filter (Users.name = = "test" ).first()
if delete_users:
session.delete(delete_users)
session.commit()
session.query(Users). filter (Users.name = = "test" ).delete()
session.commit()
|
更改数据
1
2
3
4
5
|
# 改
session.query(Users).filter_by( id = 1 ).update({ 'name' : "Jack" })
users = session.query(Users).filter_by(name = "Jack" ).first()
users.name = "test"
|
查找数据
1
2
3
4
|
users = session.query(Users).filter_by( id = 5 ). all ()
for item in users:
print (item.name)
print (item.email) # 若未在类中声明,则无法访问数据库中该表的属性。
|
四、进阶技能
1. 将DataFrame格式的数据导入数据库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
class DataAccessLayer: # 数据连接层、定义了连接和关闭。
'''数据连接层、定义了连接和关闭。'''
def __init__( self ):
self .ENGINE = None # 引擎
self .SESSION = None # 会话
self .conn_string = "mysql+pymysql://root:1234;@127.0.0.1/test" ## 需要安装mysql和pymysql的模块,用户名:密码@ip地址/某个数据库
def connect( self ):
'''连接时建立引擎和会话。'''
self .ENGINE = create_engine( self .conn_string, encoding = 'utf-8' ,isolation_level = "AUTOCOMMIT" , connect_args = { 'connect_timeout' : 7200 })
# self.ENGINE = create_engine(self.conn_string, encoding='utf-8',connect_args={'connect_timeout': 7200})
self .SESSION = sessionmaker(bind = self .ENGINE)()
def disconnect( self ):
'''断开时,关闭引擎。'''
self .ENGINE.close()
def df_save_db(df,tablename):
'''将数据集DataFrame保存到数据库'''
db_ac = DataAccessLayer()
db_ac.connect()
conn = db_ac.ENGINE.connect()
df.to_sql(name = tablename, con = conn, if_exists = 'append' , index = False )
conn.close()
print ( '%s updated.' % tablename)
df = pd.read_csv( 'traindata_jiangsu_donghai.csv' )
df_save_db(df, 'traindata_jiangsu_donghai' )
|
到此这篇关于python实现sqlalchemy的使用的文章就介绍到这了,更多相关python sqlalchemy使用内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://blog.csdn.net/weixin_43899514/article/details/119652061