Flask 蓝图,数据库链接

时间:2023-03-09 00:41:57
Flask 蓝图,数据库链接

蓝图

使用场景

如果代码非常多,要进行归类。不同的功能放在不同的文件,把相关的视图函数也放进去。

蓝图也就是对flask的目录结构进行分配(应用于小,中型的程序)

当然对于大型项目也可以通过   url_prefix 加前缀的方式实现

使用方法

# __init__.py
from .views.account import ac
from .views.user import us app.register_blueprint(ac)
app.register_blueprint(us) # account.py
from flask import Blueprint,render_template
ac = Blueprint("ac" ,__name__,template_folder="xxxx",static_url_path="xxxx") # template_folder 优先在 templates 文件夹找。找不到再去 xxxx 里找
# static_url_path 优先在 static 文件夹找。找不到再去 xxxx 里找
# url_prefix="/xx" 为当前蓝图的url里加前缀

目录结构

crm
crm
view
account.py
user.py
static
templates
login.html
__init__.py
manage.py

 __init__.py

只要一导入crm就会执行__init__.py文件

在此文件实现app 对象的生成,以及所有蓝图的注册功能

from flask import Flask
from .views.account import ac
from .views.user import us
def create_app():
app = Flack(__name__) @app.before_request # 对全局的视图有效
def xx():
print("app.before_request") app.register_blueprint(ac)
app.register_blueprint(us) return app

account.py

各自的视图文件,创建蓝图对象

自己视图的使用为自己的蓝图对象

注意: 视图函数的名字不能和蓝图对象重名

from flask import Blueprint,render_template
ac = Blueprint("ac" ,__name__,template_folder="xxxx",static_url_path="xxxx")
# template_folder 优先在 templates 文件夹找。找不到再去 xxxx 里找
# static_url_path 优先在 static 文件夹找。找不到再去 xxxx 里找
# url_prefix="/xx" 为当前蓝图的url里加前缀
@ac.route("/login")
def login():
return render_template("login.html")

user.py

from flask import Blueprint
us = Blueprint("us" ,__name__) @us.before_request # 仅对当前的视图有效
def xx():
print("us.before_request") @us.route("/user")
def user():
return "user"

pymysql

方式一  数据库链接放在视图中

  每次视图的执行进行数据库连接查询关闭。

  反复创建数据库链接,多次链接数据库会非常耗时

解决办法:放在全局,单例模式

#!usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
from flask import Flask app = Flask(__name__) @app.route('/index')
def index():
# 链接数据库
conn = pymysql.connect(host="127.0.0.1",port=3306,user='root',password='', database='pooldb',charset='utf8')
cursor = conn.cursor()
cursor.execute("select * from td where id=%s", [5, ])
result = cursor.fetchall() # 获取数据
cursor.close()
conn.close() # 关闭链接
print(result)
return "执行成功" if __name__ == '__main__':
app.run(debug=True)

方式二  放在全局

  不在频繁链接数据库。

  如果是单线程,这样没什么问题,

  但是如果是多线程,就得加把锁。这样就成串行的了

  为了支持并发,此方法依旧不可取

#!usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
from flask import Flask
from threading import RLock app = Flask(__name__)
CONN = pymysql.connect(host="127.0.0.1",port=3306,user='root',password='', database='pooldb',charset='utf8') @app.route('/index')
def index():
with RLock:
cursor = CONN.cursor()
cursor.execute("select * from td where id=%s", [5, ])
result = cursor.fetchall() # 获取数据
cursor.close()
print(result)
return "执行成功"
if __name__ == '__main__':
app.run(debug=True)

为此。为了解决方式一二的问题,实现不频繁操作且可以并行的数据库链接,我们需要用到 DBUtils

方式三 DBUtils + thread.local

为每一个线程创建一个链接(是基于本地线程来实现的。thread.local),

每个线程独立使用自己的数据库链接,该线程关闭不是真正的关闭,本线程再次调用时,还是使用的最开始创建的链接,直到线程终止,数据库链接才关闭

#!usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask
app = Flask(__name__)
from DBUtils.PersistentDB import PersistentDB
import pymysql
POOL = PersistentDB(
creator=pymysql, # 使用链接数据库的模块
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
closeable=False,
# 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
host='127.0.0.1',
port=3306,
user='root',
password='',
database='pooldb',
charset='utf8'
) @app.route('/func')
def func():
  conn = POOL.connection()
  cursor = conn.cursor()
  cursor.execute('select * from tb1')
  result = cursor.fetchall()
  cursor.close()
  conn.close() # 不是真的关闭,而是假的关闭。 conn = pymysql.connect() conn.close()   conn = POOL.connection()
  cursor = conn.cursor()
  cursor.execute('select * from tb1')
  result = cursor.fetchall()
  cursor.close()
  conn.close()
if __name__ == '__main__': app.run(debug=True)

方式四 DBUtils + 链接池

创建一个链接池,为所有线程提供连接,使用时来进行获取,使用完毕后在放回到连接池。

PS:

  假设最大链接数有10个,其实也就是一个列表,当你pop一个,人家会在append一个,链接池的所有的链接都是按照排队的这样的方式来链接的。

  链接池里所有的链接都能重复使用,共享的, 即实现了并发,又防止了链接次数太多

#!usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask
app = Flask(__name__)
from DBUtils.PersistentDB import PersistentDB
import pymysql POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。
# PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列 AQ 表。如:["set datestyle to ...", "set time zone ..."]
threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
ping=0,
# ping MySQL服务端,检查是否服务可用。
# 取值:
# 0 = None = never,
# 1 = default = whenever it is requested,
# 2 = when a cursor is created,
# 4 = when a query is executed,
# 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='',
database='core_master',
charset='utf8'
)
@app.route('/func')
def func():
  conn = POOL.connection()
  cursor = conn.cursor()
  cursor.execute('select * from tb1')
  result = cursor.fetchall()
  cursor.close()
  conn.close() # 不是真的关闭,而是假的关闭。 conn = pymysql.connect() conn.close()   conn = POOL.connection()
  cursor = conn.cursor()
  cursor.execute('select * from tb1')
  result = cursor.fetchall()
  cursor.close()
  conn.close()
if __name__ == '__main__': app.run(debug=True)

其他

  pymysql 的操作很是繁琐,大量的重复代码,可以进一部封装

# 创建 链接池的操作封装
import pymysql from settings import Config def connect():
conn = Config.POOL.connection()
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
return conn,cursor def connect_close(conn,cursor):
cursor.close()
conn.close() def fetch_all(sql,args):
conn,cursor = connect() cursor.execute(sql, args)
record_list = cursor.fetchall()
connect_close(conn,cursor) return record_list def fetch_one(sql, args):
conn, cursor = connect()
cursor.execute(sql, args)
result = cursor.fetchone()
connect_close(conn, cursor) return result def insert(sql, args):
conn, cursor = connect()
row = cursor.execute(sql, args)
conn.commit()
connect_close(conn, cursor)
return row

封装代码

DBUtils 内部原理

在 DBUtils 中为每个线程创建一个数据库连接的时候,

对每个线程单独创建内存空间来保存数据,实现数据的空间分离

初始的多线程

import threading
from threading import local
import time def task(i):
global v
time.sleep(1)
print(v) for i in range(10):
t = threading.Thread(target=task,args=(i,))
t.start()
# 9 9 9 9 9 9 9 9 9

实现数据隔离的多线程

import threading
from threading import local
import time obj = local() # 为每个线程创建一个独立的空间。数据空间隔离 def task(i):
obj.xxxxx = i
time.sleep(2)
print(obj.xxxxx,i) for i in range(10):
t = threading.Thread(target=task,args=(i,))
t.start()
# 0-9 打乱顺序

获取线程的唯一标记示例

import threading
from threading import local def task(i):
print(threading.get_ident(),i) # 获取线程的唯一标记 for i in range(10):
t = threading.Thread(target=task,args=(i,))
t.start()

threading local 的内部实现原理

import time
import threading
import greenlet DIC = {} # 用线程的唯一标识作为 key 来创建一个大字典分别保存每个线程的数据 def task(i): # ident = threading.get_ident() # 获取进程的 唯一id
ident = greenlet.getcurrent() # 获取协程的 唯一id
if ident in DIC:
DIC[ident]['xxxxx'] = i
else:
DIC[ident] = {'xxxxx':i }
time.sleep(2) print(DIC[ident]['xxxxx'],i) for i in range(10):
t = threading.Thread(target=task,args=(i,))
t.start()

最终完整版

import time
import threading
try:
import greenlet
get_ident = greenlet.getcurrent
except Exception as e:
get_ident = threading.get_ident class Local(object):
DIC = {} def __getattr__(self, item):
ident = get_ident()
if ident in self.DIC:
return self.DIC[ident].get(item)
return None def __setattr__(self, key, value):
ident = get_ident()
if ident in self.DIC:
self.DIC[ident][key] = value
else:
self.DIC[ident] = {key:value} obj = Local() def task(i):
obj.xxxxx = i
time.sleep(2)
print(obj.xxxxx,i) for i in range(10):
t = threading.Thread(target=task,args=(i,))
t.start()