本节目录
常用函数一:redis操作
常用函数二:mongodb操作
常用函数三:数据库连接池操作
常用函数四:pandas连接数据库
常用函数五:异步连接数据库
常用函数一:redis操作
# -*- coding: utf-8 -*- """
Datetime: 2020/07/06
Author: Zhang Yafei
Description:
"""
import redis def get_redis_conn():
conn = redis.Redis(host='127.0.0.1', port=6379)
return conn def get_redis_conn_pool():
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, max_connections=1000)
# max_connection最多创建1000个连接
conn = redis.Redis(connection_pool=pool)
return conn def redis_string_practice():
conn = get_redis_conn_pool()
# 添加
conn.set('str_k', 'hello') # 为指定key设置value
# {'str_k':'hello'}
conn.mset({'str_k': 'hello', 'str_k1': 'world'}) # 设置多个key/value
# {'str_k':'hello', 'str_k1':'world'}
conn.msetnx({'str_k': 'msetnx_hello'}) # 若当前key未设定, 则基于mapping设置key/value,结果返回True或False
# {'str_k':'hello'}
conn.setex('str_k2', 'str_v2', 2) # 秒 conn.decr('num', amount=1)
conn.incr('num', amount=1)
conn.incrbyfloat('num', amount='1.5') # 删除
conn.delete('str_k1') # 修改
conn.append('str_k', ' world') # 为指定key添加value
# {'str_k':'hello world'}
conn.setrange('str_k', 5, 'world') # 在key对应的的value指定位置上设置值
# b'helloworld' # 查询
print(conn.get('str_k'))
print(conn.get('num'))
print(conn.getrange('str_k', 0, 100))
print(conn.keys())
print(conn.strlen('str_k')) # 长度
print(conn.exists('str_k'))
conn.expire('str_k1', 5)
print(conn.get('str_k1')) # 添加并查询
print(conn.getset('str_k2', 'str_v2'))
# b'str_v2' def redis_dict_practice():
"""
redis dict
redis = {
k4:{
'username': 'zhangyafei',
'age': 23,
}
}
"""
conn = get_redis_conn_pool()
# 1. 创建字典
conn.hset('k4','username','zhangyafei')
conn.hset('k4','age',23)
conn.hsetnx('k4','username','root') # 若key不存在则将value赋值给key, 如果赋值成功则返回1,否则返回0
conn.hsetnx('k4', 'hobby', 'basketball')
conn.hmset('k4',{'username':'zhangyafei','age':23}) # 2. 获取字典的值
# 获取一个值
val = conn.hget('k4', 'username') # b'zhangyafei'
# print(val)
# 获取多个值
vals = conn.mget('k4', ['username','age'])
vals = conn.mget('k4', 'username','age') # {b'username': b'zhangyafei', b'age': b'23'}
# 获取所有值
vals = conn.hgetall('k4') # {b'username': b'zhangyafei', b'age': b'23'}
print(vals)
# 获取长度
lens = conn.hlen('k4') # 2
str_lens = conn.hstrlen('k4', 'username') # 10
keys = conn.hkeys('k4') # [b'username', b'age']
values = conn.hvals('k4') # [b'zhangyafei', b'23']
judge = conn.hexists('k4', 'username') # True
# conn.hdel('k4', 'age', 'username')
# print(conn.hkeys('k4')) # [] # 计算器
# print(conn.hget('k4', 'age'))
# conn.hincrby('k4','age',amount=2)
# conn.hincrbyfloat('k4','age',amount=-1.5)
# print(conn.hget('k4', 'age')) # 问题:如果redis的k4对应的字典中有1000w条数据,请打印所有数据
# 不可取:redis取到数据之后,服务器内存无法承受,爆栈
# result = conn.hgetall('k4')
# print(result) for item in conn.hscan_iter('k4'):
print(item) def redis_list_practice():
"""
redis list
redis = {
k1: [1,2,3,]
}
"""
conn = get_redis_conn_pool()
# 左插入
conn.lpush('k1', 11) conn.lpush('k1', 22)
# 右插入
conn.rpush('k1', 33) # 左获取
val = conn.lpop('k1')
val = conn.blpop('k1', timeout=10) # 夯住
# 右获取
val = conn.rpop('k1')
val = conn.brpop('k1', timeout=10) # 夯住 conn.lpush('k1',*[12,3,1,21,21,1,212,11,1,1,1,2,2,34,5,5,5]) def list_iter(key, count=3):
index = 0
while True:
data_list = conn.lrange(key, index, index + count - 1)
if not data_list:
return
index += count for item in data_list:
yield item result = conn.lrange('k1', 0, 100)
print(result) # [b'22', b'11', b'33'] for item in list_iter('k1', 3):
print(item) def redis_pipeline_practice():
"""
pipeline:管道,也即事务。一次放多个值,一次执行所有管道中的操作,要么全部成功,要么全部失
"""
conn = get_redis_conn_pool()
pipe = conn.pipeline(transaction=True)
pipe.multi() pipe.set('k2', '123')
pipe.hset('k3', 'n1', 666)
pipe.lpush('k4', 'laonanhai') pipe.execute() def redis_set_practice():
"""
{
'set_k':{v1,v2,v3},
}
"""
conn = get_redis_conn_pool()
# 添加
conn.sadd('set_k', 3, 4, 5, 6)
conn.sadd('set_k1', 3, 4, 5, 6) # 删除
print(conn.spop('set_k'))
conn.srem('set_k', 2) # 修改
conn.smove('set_k', 'set_k1', 1) # 查询
print(conn.smembers('set_k'))
print(conn.smembers('set_k1'))
print(conn.srandmember('set_k', 3))
print(conn.scard('set_k'))
print(conn.sismember('set_k', 2)) print(conn.sdiff('set_k', 'set_k1')) # 集合之差
conn.sdiffstore('set_k_k1', 'set_k', 'set_k1')
print(conn.smembers('set_k_k1')) print(conn.sinter('set_k', 'set_k1')) # 集合交集
conn.sinterstore('set_k_k1_inter', 'set_k', 'set_k1')
print(conn.smembers('set_k_k1_inter')) print(conn.sunion('set_k', 'set_k1')) # 集合并集
conn.sunionstore('set_k_k1_union', 'set_k', 'set_k1')
print(conn.smembers('set_k_k1_union')) def redis_zset_practice():
"""
{
'set_k':{
{v1: score1},
{v2: score2},
{v3: score3},
},
}
"""
conn = get_redis_conn_pool()
# # 添加
# conn.zadd('zset_k', 'math', 99, 'english', 80, 'chinese', 85, 'sport', 100, 'music', 60)
#
# # 删除
# conn.zrem('zset_k', 'music')
# conn.zremrangebyrank('zset_k', 0, 0) # 按等级大小删除, 删除等级在第min-max个值
# conn.zremrangebyscore('zset_k', 0, 90) # 按分数范围删除, Min < x < max之间的删除 # 查询
print(conn.zrange('zset_k', 0, 100))
print(conn.zrevrange('zset_k', 0, 100))
# score从小到大排序, 默认小值先出, 广度优先
results = conn.zrangebyscore('zset_k', 0, 100)
print(results)
print(conn.zcard('zset_k'))
print(conn.zcount('zset_k', 0, 90))
print(conn.zrank('zset_k', 'chinese'))
print(conn.zscore('zset_k', 'chinese'))
print(conn.zrange('zset_k', 0, 100)) if __name__ == '__main__':
redis_string_practice()
常用函数二:mongodb操作
import json
import pymongo
import pandas as pd class MongoPipeline(object):
"""
mongodb:
save(self, data, collection): 将数据保存到数据库
read(self, data): 读取数据库中指定表格
insert(self, table, dict_data): 插入数据
delete(self, table, condition): 删除指定数据
update(self, table, condition, new_dict_data): 更新指定数据
dbFind(self, table, condition=None): 按条件查找
findAll(self, table): 查找全部
close(self): 关闭连接
""" def __init__(self, mongo_db, mongo_uri='localhost'):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db] def close(self):
"""
关闭连接
:return:
"""
self.client.close() def save(self, data, collection):
"""
将数据保存到数据库表
:param data:
:param collection:
:return: None
"""
self.collection = self.db[collection]
try:
if self.collection.insert(json.loads(data.T.to_json()).values()):
print('mongodb insert {} sucess.'.format(collection))
return
except Exception as e:
print('insert error:', e)
import traceback
traceback.print_exc(e) def read(self, table):
"""
读取数据库中的数据
:param table:
:return: dataframe
"""
try:
# 连接数据库
table = self.db[table]
# 读取数据
data = pd.DataFrame(list(table.find()))
return data
except Exception as e:
import traceback
traceback.print_exc(e) def insert(self, table, dict_data):
"""
插入
:param table:
:param dict_data:
:return: None
"""
try:
self.db[table].insert(dict_data)
print("插入成功")
except Exception as e:
print(e) def update(self,table, condition, new_dict_data):
"""
更新
:param table:
:param dict_data:
:param new_dict_data:
:return: None
"""
try:
self.db[table].update(condition, new_dict_data)
print("更新成功")
except Exception as e:
print(e) def delete(self,table, condition):
"""
删除
:param table:
:param dict_data:
:return: None
"""
try:
self.db[table].remove(condition)
print("删除成功")
except Exception as e:
print(e) def dbFind(self, table, condition=None):
"""
按条件查找
:param table:
:param dict_data:
:return: generator dict
"""
data = self.db[table].find(condition)
for item in data:
yield item def findAll(self, table):
"""
查找全部
:param table:
:return: generator dict
"""
for item in self.db[table].find():
yield item if __name__ == '__main__':
mongo = MongoPipeline('flask')
# data = mongo.read('label')
# print(data.head())
condition = {"药品ID": 509881}
data = mongo.dbFind('label', condition)
print(data)
for i in data:
print(i)
# mongo.findAll()
常用操作三:数据连接池操作
# -*- coding: utf-8 -*- """
Datetime: 2020/07/06
Author: Zhang Yafei
Description: DButils连接池
"""
from DBUtils.PooledDB import PooledDB class DBPoolHelper(object):
def __init__(self, dbname, user=None, password=None, db_type='postgressql', host='localhost', port=5432):
"""
# sqlite3
# 连接数据库文件名,sqlite不支持加密,不使用用户名和密码
import sqlite3
config = {"datanase": "path/to/your/dbname.db"}
pool = PooledDB(sqlite3, maxcached=50, maxconnections=1000, maxusage=1000, **config)
# mysql
import pymysql
pool = PooledDB(pymysql,5,host='localhost', user='root',passwd='pwd',db='myDB',port=3306) #5为连接池里的最少连接数
# postgressql
import psycopg2
POOL = PooledDB(creator=psycopg2, host="127.0.0.1", port="5342", user, password, database)
# sqlserver
import pymssql
pool = PooledDB(creator=pymssql, host=host, port=port, user=user, password=password, database=database, charset="utf8")
:param type:
"""
if db_type == 'postgressql':
import psycopg2
pool = PooledDB(creator=psycopg2, host=host, port=port, user=user, password=password, database=dbname)
elif db_type == 'mysql':
import pymysql
pool = PooledDB(pymysql, 5, host='localhost', user='root', passwd='pwd', db='myDB',
port=3306) # 5为连接池里的最少连接数
elif db_type == 'sqlite':
import sqlite3
config = {"database": dbname}
pool = PooledDB(sqlite3, maxcached=50, maxconnections=1000, maxusage=1000, **config)
else:
raise Exception('请输入正确的数据库类型, db_type="postgresql" or db_type="mysql" or db_type="sqlite"')
self.__conn = pool.connection()
self.__cursor = self.__conn.cursor() def __connect_close(self):
"""关闭连接"""
self.__cursor.close()
self.__conn.close() def commit(self):
self.__conn.commit() def execute_without_commit(self, sql, params=tuple()):
self.__cursor.execute(sql, params) def execute(self, sql, params=tuple()):
self.__cursor.execute(sql, params)
self.__conn.commit() def execute_many(self, sql, params=tuple()):
self.__cursor.executemany(sql, params)
self.__conn.commit() def fetchone(self, sql, params=tuple()):
self.__cursor.execute(sql, params)
data = self.__cursor.fetchone()
return data def fetchall(self, sql, params=tuple()):
self.__cursor.execute(sql, params)
data = self.__cursor.fetchall()
return data def __del__(self):
print("connect close ----------------")
self.__connect_close()
常用操作四:pandas连接数据库
# -*- coding: utf-8 -*- """
Datetime: 2020/07/06
Author: Zhang Yafei
Description: pandas连接数据库
"""
from sqlalchemy import create_engine
from pandas import read_sql def pandas_db_helper():
"""
'postgresql://postgres:0000@127.0.0.1:5432/xiaomuchong'
"mysql+pymysql://root:0000@127.0.0.1:3306/srld?charset=utf8mb4"
"sqlite: ///sqlite3.db"
"""
engine = create_engine("sqlite:///sqlite3.db")
conn = engine.connect()
return conn if __name__ == '__main__':
conn = pandas_db_helper()
data = read_sql("select * from articles", con=conn)
print(data.info())
常用函数五:异步连接数据库
- redis
# -*- coding: utf-8 -*- """
Datetime: 2020/07/26
Author: Zhang Yafei
Description:
"""
import asyncio
import aioredis async def execute(address, password):
print("开始执行", address)
# 网络IO操作:创建redis连接
redis = await aioredis.create_redis(address, password=password)
# 网络IO操作:在redis中设置哈希值car,内部在设三个键值对,即: redis = { car:{key1:1,key2:2,key3:3}}
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# 网络IO操作:去redis中获取值
result = await redis.hgetall('car', encoding='utf-8')
print(result)
redis.close()
# 网络IO操作:关闭redis连接
await redis.wait_closed()
print("结束", address) asyncio.run(execute('redis://127.0.0.1:6379', "0000"))
- redis_pool
# -*- coding: utf-8 -*- """
Datetime: 2020/07/26
Author: Zhang Yafei
Description:
"""
import asyncio import aioredis async def execute(address, password):
print("开始执行", address)
# 网络IO操作:先去连接 127.0.0.1:6379,遇到IO则自动切换任务,去连接127.0.0.1:6379
redis = await aioredis.create_redis_pool(address, password=password)
# 网络IO操作:遇到IO会自动切换任务
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# 网络IO操作:遇到IO会自动切换任务
result = await redis.hgetall('car', encoding='utf-8')
print(result)
redis.close()
# 网络IO操作:遇到IO会自动切换任务
await redis.wait_closed()
print("结束", address) task_list = [
execute('redis://127.0.0.1:6379', "0000"),
execute('redis://127.0.0.1:6379', "0000")
] asyncio.run(asyncio.wait(task_list))
- mysql
# -*- coding: utf-8 -*- """
Datetime: 2020/07/26
Author: Zhang Yafei
Description:
"""
import asyncio import aiomysql async def execute():
# 网络IO操作:连接MySQL
conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='0000', db='mysql', )
# 网络IO操作:创建CURSOR
cur = await conn.cursor()
# 网络IO操作:执行SQL
await cur.execute("SELECT Host,User FROM user")
# 网络IO操作:获取SQL结果
result = await cur.fetchall()
print(result)
# 网络IO操作:关闭链接
await cur.close()
conn.close() asyncio.run(execute())
- mysql_pool
# -*- coding: utf-8 -*- """
Datetime: 2020/07/26
Author: Zhang Yafei
Description:
"""
import asyncio import aiomysql async def execute(host, password):
print("开始", host)
# 网络IO操作:先去连接 188.176.202.180,遇到IO则自动切换任务,去连接188.176.202.181
conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')
# 网络IO操作:遇到IO会自动切换任务
cur = await conn.cursor()
# 网络IO操作:遇到IO会自动切换任务
await cur.execute("SELECT Host,User FROM user")
# 网络IO操作:遇到IO会自动切换任务
result = await cur.fetchall()
print(result)
# 网络IO操作:遇到IO会自动切换任务
await cur.close()
conn.close()
print("结束", host) task_list = [
execute('127.0.0.1', "0000"),
execute('127.0.0.1', "0000")
]
asyncio.run(asyncio.wait(task_list))