4.1Python与数据库的交互
在没有DB-API之前,各数据库之间的应用接口非常混乱,实现各不相同,如果项目需要更换数据库,基本上需要把所有和数据库相关的代码都进行改动,十分不方便
4.2DB-API
DB-API的出现就是为了解决这个问题,python 所有的数据库接口在一定程度上都要遵守python DB-API规范,它定义了一系列的操作数据库的方式,为各种数据库提供了一致的访问接口,项目中更换使用数据库变得更加方便。
4.3MySQL
开始
创建connection
获取cursor
?
执行查询执行命令获取数据处理数据
?
关闭cursor
关闭connection
结束
###################
conn = pymysql.connect(
?
‘user‘: ‘账号‘,
‘password‘: ‘密码‘,
‘db‘: ‘数据库名‘,
‘charset‘: ‘utf8‘, #不是utf-8
)
#####################
db_config = {
‘user‘: ‘账号‘,
‘password‘: ‘密码‘,
‘db‘: ‘数据库名‘,
‘charset‘: ‘utf8‘,
}
conn = pymysql.connect(**db_config)
1.使用步骤
-
1.导入模块:
import pymysql
-
2.建立连接:
pymysql.connect(**dbconfig) 连接是不能操作数据库的,需要用连接生成游标来操作
-
3.创建游标:
connection.cursor()
-
4.执行SQL语句:
cursor.execute(sql) SQL语句都是通过这个方法执行
-
5.获取结果:
cur.fetchall()
-
6.注意:
在pymysql中执行的SQL语句不需要加 ;
execute执行完后不是直接得到结果,需要你主动去获取
和文件一样,别忘了关闭游标与连接
事务的回滚和提交 rollbck 与commit
2.获取
print( cur .fetchone( )) #一行
print( cur .fetchall( )) #全部 返回二维元组
print( cur .fetchmany(3)) # 几条
3.with 语法 (默认将数据保存到库中)
with conn.cursor() as cur:
cur .execute( ‘select * from student‘)
print( cur .fetchone( )) #一行
?
conn.close()
?
########
?
with pymysql.connect(**mysql_ config) as cur:
cur .execute( ‘select * from student‘)
print( cur .fetchone( )) #一行
cur 此时为游标对象
4. 插入
with pymysql.connect(**mysql_ config) as cur:
cur .execute(
‘insert into student values (6,"患者",16,22)‘
)
5. 事务 (pymysql 默认使用事务模式)
begin ; 开始事务
rollback ; 回滚 (撤回)
commit ; 保存
开始之后 只有commit 才能保存的数据库中
rollback 是在提交之前可以回滚 commit 之前 写的都是在临时环境下
?
conn = pymysql. connect(**mysql_ config)
cur = conn.cursor( )
cur .execute(
‘insert into student values (6,"患者",16,22)‘
)
conn.commit()
cur.close( )
conn.close()
优化: 插入多条数据:
?
conn = pymysql. connect(**mysql_ config)
with conn.cursor() as cur:
cur .execute(
‘insert into student values (6,"患者",16,22)‘
)
conn.commit()
?
?
with conn.cursor() as cur:
cur .execute(
‘insert into student values (7,"7者",17,27)‘
)
conn.commit()
conn.close()
6.fetchone fetchall
数据过多时会一条一条显示 不会爆内存 如果直接 print(cur .fetchall()) 内存爆了
conn = pymysql. connect(**mysql_ config)
?
with conn.cursor() as cur:
cur .execute( ‘select * from student‘)
?
row = cur .fetchone()
while row:
print (‘Row‘:row)
row = cur .fetchone()
#################
?
db_config = {
‘user‘: ‘账号‘,
‘password‘: ‘密码‘,
‘db‘: ‘数据库名‘,
‘charset‘: ‘utf8‘,
}
?
conn = pymysql. connect(**db_config)
cur = conn.cursor( )
cur .execute( ‘select * from student‘)
?
print( cur .fetchone( )) #一行
print( cur .fetchall( )) #全部 返回二维元组
print( cur .fetchmany(3)) # 几条
?
cur.close( )
conn.close()
?
?
#with conn.cursor() as cur:
#cur .execute( ‘select * from student‘)
#print( cur .fetchone( )) #一行
?
#conn.close()
?
?
#with pymysql.connect(**mysql_ config) as cur:
#cur .execute( ‘select * from student‘)
#print( cur .fetchone( )) #一行
#cur 此时为游标对象
##########################
?
?
conn = pymysql. connect(**db_config) #连接pymysql
cur = conn.cursor( ) #建立游标,利用游标来执行sq_语句
try:
#执行sq1语句,不会返回结果,返回共影响的行数
executes = cur .execute( ‘select * from student‘)
#获取结果
values = cur. fetchall()
for value in values : #循环打印结果
print (value)
#提交到数据片,自正把数据插入或者更新到数据白
conn. commit ()
exvept Exceplion as e:
print (e)
#发生了异常,同滚
conn. rollback ()
finally:
#在最后使用完关闭游标和连接#关闭游林
cur. close()
conn. close()
?
4.4. Redis
在python中操作redis的命令和命令行的用户几乎一模一样
-
1.安装python包redis:
pip install redis
-
2.连接redis:
redis.Redis() 为了兼容旧版本(不推荐使用)
-
3.连接redis:
Redis.StrictRedis()
-
4.在程序操作的数据为bytes类型,加入decode_responses=True,写入的数据为str类型
import redis
?
conn = redis. StrictRedis( db=5)
conn.set( ‘ name‘, ‘短视的‘) #以二进制格式插入
res=conn.get( ‘name‘)
print(res) # 二进制
?
print(res.decode(‘utf-8‘))
?
################
?
conn = redis. StrictRedis( db=5,decode_responses=True)
res=conn.get( ‘name‘)
print(res)
?
#连接远程数据库
conn = redis. StrictRedis(host =‘‘,port=‘‘,db=5)
?
import redis
class RedisList(object):
def __init__(self,db=0,decode_responses=False):
self.conn=redis.StrictRedis(db=db,decode_responses=decode_responses)
def push(self,key,*value,dire=‘r‘):
if dire==‘r‘:
self.conn.lpush(key,*value)
else:
self.conn.rpush(key,*value)
def get(self,key,start_index,end_index=None):
if end_index:
return self.conn.lrange(key,start_index,end_index)
else:
return self.conn.lindex(key,start_index)
def set(self,key,index,value):
self.conn.lset(key,index,value)
def pop(self,key,value,dire=‘r‘):
if dire==‘r‘:
self.conn.lpop(key,value)
else:
self.conn.rpop(key,value)
if __name__==‘__main__‘:
rlist=RedisList(db=1,decode_responses=True)
print(rlist.get(‘name‘,0,3))
?
?
?
?
?