
Flask是一个基于Python开发并且依赖jinja2模板和Werkzeug WSGI服务的一个微型框架,对于Werkzeug本质是Socket服务端,其用于接收http请求并对请求进行预处理,然后触发Flask框架,开发人员基于Flask框架提供的功能对请求进行相应的处理,并返回给用户,如果要返回给用户复杂的内容时,需要借助jinja2模板来实现对模板的处理,即:将模板和数据进行渲染,将渲染后的字符串返回给用户浏览器。
“微”(micro) 并不表示你需要把整个 Web 应用塞进单个 Python 文件(虽然确实可以 ),也不意味着 Flask 在功能上有所欠缺。微框架中的“微”意味着 Flask 旨在保持核心简单而易于扩展。Flask 不会替你做出太多决策——比如使用何种数据库。而那些 Flask 所选择的——比如使用何种模板引擎——则很容易替换。除此之外的一切都由可由你掌握。如此,Flask 可以与您珠联璧合。
默认情况下,Flask 不包含数据库抽象层、表单验证,或是其它任何已有多种库可以胜任的功能。然而,Flask 支持用扩展来给应用添加这些功能,如同是 Flask 本身实现的一样。众多的扩展提供了数据库集成、表单验证、上传处理、各种各样的开放认证技术等功能。Flask 也许是“微小”的,但它已准备好在需求繁杂的生产环境中投入使用。
1
|
pip3 install flask |
from werkzeug.wrappers import Request, Response @Request.application
def hello(request):
return Response('Hello World!') if __name__ == '__main__':
from werkzeug.serving import run_simple
run_simple('localhost', 4000, hello)
werkzeug
一. 基本使用
1
2
3
4
5
6
7
8
9
|
from flask import Flask
app = Flask(__name__)
@app .route( '/' )
def hello_world():
return 'Hello World!'
if __name__ = = '__main__' :
app.run()
|
二、配置文件
flask中的配置文件是一个flask.config.Config对象(继承字典),默认配置为:
{
'DEBUG': get_debug_flag(default=False), 是否开启Debug模式
'TESTING': False, 是否开启测试模式
'PROPAGATE_EXCEPTIONS': None,
'PRESERVE_CONTEXT_ON_EXCEPTION': None,
'SECRET_KEY': None,
'PERMANENT_SESSION_LIFETIME': timedelta(days=),
'USE_X_SENDFILE': False,
'LOGGER_NAME': None,
'LOGGER_HANDLER_POLICY': 'always',
'SERVER_NAME': None,
'APPLICATION_ROOT': None,
'SESSION_COOKIE_NAME': 'session',
'SESSION_COOKIE_DOMAIN': None,
'SESSION_COOKIE_PATH': None,
'SESSION_COOKIE_HTTPONLY': True,
'SESSION_COOKIE_SECURE': False,
'SESSION_REFRESH_EACH_REQUEST': True,
'MAX_CONTENT_LENGTH': None,
'SEND_FILE_MAX_AGE_DEFAULT': timedelta(hours=),
'TRAP_BAD_REQUEST_ERRORS': False,
'TRAP_HTTP_EXCEPTIONS': False,
'EXPLAIN_TEMPLATE_LOADING': False,
'PREFERRED_URL_SCHEME': 'http',
'JSON_AS_ASCII': True,
'JSON_SORT_KEYS': True,
'JSONIFY_PRETTYPRINT_REGULAR': True,
'JSONIFY_MIMETYPE': 'application/json',
'TEMPLATES_AUTO_RELOAD': None,
} 方式一:
app.config['DEBUG'] = True PS: 由于Config对象本质上是字典,所以还可以使用app.config.update(...) 方式二:
app.config.from_pyfile("python文件名称")
如:
settings.py
DEBUG = True app.config.from_pyfile("settings.py") import os
os.environ['FLAKS-SETTINGS'] = 'settings.py'
app.config.from_envvar("环境变量名称")
环境变量的值为python文件名称名称,内部调用from_pyfile方法 app.config.from_json("json文件名称")
JSON文件名称,必须是json格式,因为内部会执行json.loads app.config.from_mapping({'DEBUG':True})
字典格式 app.config.from_object("python类或类的路径") app.config.from_object('pro_flask.settings.TestingConfig') settings.py class Config(object):
DEBUG = False
TESTING = False
DATABASE_URI = 'sqlite://:memory:' class ProductionConfig(Config):
DATABASE_URI = 'mysql://user@localhost/foo' class DevelopmentConfig(Config):
DEBUG = True class TestingConfig(Config):
TESTING = True PS: 从sys.path中已经存在路径开始写 PS: settings.py文件默认路径要放在程序root_path目录,如果instance_relative_config为True,则就是instance_path目录
三、路由系统
- @app.route('/user/<username>')
- @app.route('/post/<int:post_id>')
- @app.route('/post/<float:post_id>')
- @app.route('/post/<path:path>')
- @app.route('/login', methods=['GET', 'POST'])
常用路由系统有以上五种,所有的路由系统都是基于以下对应关系来处理:
1
2
3
4
5
6
7
8
9
|
DEFAULT_CONVERTERS = {
'default' : UnicodeConverter,
'string' : UnicodeConverter,
'any' : AnyConverter,
'path' : PathConverter,
'int' : IntegerConverter,
'float' : FloatConverter,
'uuid' : UUIDConverter,
} |
def auth(func):
def inner(*args, **kwargs):
print('before')
result = func(*args, **kwargs)
print('after')
return result return inner @app.route('/index.html',methods=['GET','POST'],endpoint='index')
@auth
def index():
return 'Index' 或 def index():
return "Index" self.add_url_rule(rule='/index.html', endpoint="index", view_func=index, methods=["GET","POST"])
or
app.add_url_rule(rule='/index.html', endpoint="index", view_func=index, methods=["GET","POST"])
app.view_functions['index'] = index 或
def auth(func):
def inner(*args, **kwargs):
print('before')
result = func(*args, **kwargs)
print('after')
return result return inner class IndexView(views.View):
methods = ['GET']
decorators = [auth, ] def dispatch_request(self):
print('Index')
return 'Index!' app.add_url_rule('/index', view_func=IndexView.as_view(name='index')) # name=endpoint 或 class IndexView(views.MethodView):
methods = ['GET']
decorators = [auth, ] def get(self):
return 'Index.GET' def post(self):
return 'Index.POST' app.add_url_rule('/index', view_func=IndexView.as_view(name='index')) # name=endpoint @app.route和app.add_url_rule参数:
rule, URL规则
view_func, 视图函数名称
defaults=None, 默认值,当URL中无参数,函数需要参数时,使用defaults={'k':'v'}为函数提供参数
endpoint=None, 名称,用于反向生成URL,即: url_for('名称')
methods=None, 允许的请求方式,如:["GET","POST"] strict_slashes=None, 对URL最后的 / 符号是否严格要求,
如:
@app.route('/index',strict_slashes=False),
访问 http://www.xx.com/index/ 或 http://www.xx.com/index均可
@app.route('/index',strict_slashes=True)
仅访问 http://www.xx.com/index
redirect_to=None, 重定向到指定地址
如:
@app.route('/index/<int:nid>', redirect_to='/home/<nid>')
或
def func(adapter, nid):
return "/home/888"
@app.route('/index/<int:nid>', redirect_to=func)
subdomain=None, 子域名访问
from flask import Flask, views, url_for app = Flask(import_name=__name__)
app.config['SERVER_NAME'] = 'wupeiqi.com:5000' @app.route("/", subdomain="admin")
def static_index():
"""Flask supports static subdomains
This is available at static.your-domain.tld"""
return "static.your-domain.tld" @app.route("/dynamic", subdomain="<username>")
def username_index(username):
"""Dynamic subdomains are also supported
Try going to user1.your-domain.tld/dynamic"""
return username + ".your-domain.tld" if __name__ == '__main__':
app.run()
a.注册路由原理
from flask import Flask, views, url_for
from werkzeug.routing import BaseConverter app = Flask(import_name=__name__) class RegexConverter(BaseConverter):
"""
自定义URL匹配正则表达式
"""
def __init__(self, map, regex):
super(RegexConverter, self).__init__(map)
self.regex = regex def to_python(self, value):
"""
路由匹配时,匹配成功后传递给视图函数中参数的值
:param value:
:return:
"""
return int(value) def to_url(self, value):
"""
使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
:param value:
:return:
"""
val = super(RegexConverter, self).to_url(value)
return val # 添加到flask中
app.url_map.converters['regex'] = RegexConverter @app.route('/index/<regex("\d+"):nid>')
def index(nid):
print(url_for('index', nid=''))
return 'Index' if __name__ == '__main__':
app.run()
b. 自定制正则路由匹配
四、视图函数
在Flask中视图函数也分为CBV和FBV
FBV
def auth(func):
def inner(*args, **kwargs):
result = func(*args, **kwargs)
return result
return inner 方式一:
@app.route('/index',endpoint='xx')
@auth
def index(nid):
url_for('xx',nid=123)
return "Index" 方式二:
@auth
def index(nid):
url_for('xx',nid=123)
return "Index" app.add_url_rule('/index',index)
FBV
CBV
def auth(func):
def inner(*args, **kwargs):
result = func(*args, **kwargs)
return result
return inner class IndexView(views.MethodView):
# methods = ['POST'] decorators = [auth,]
def get(self):
v = url_for('index')
print(v)
return "GET" def post(self):
return "GET" app.add_url_rule('/index', view_func=IndexView.as_view(name='index'))
CBV
五、模板
1、模板的使用
Flask使用的是Jinja2模板,所以其语法和Django无差别
2、自定义模板方法
Flask中自定义模板方法的方式和Bottle相似,创建一个函数并通过参数的形式传入render_template,如:
<!DOCTYPE html>
<html>
<head lang="en">
<meta charset="UTF-8">
<title></title>
</head>
<body>
<h1>自定义函数</h1>
{{ww()|safe}} </body>
</html>
html
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask,render_template
app = Flask(__name__) def wupeiqi():
return '<h1>Wupeiqi</h1>' @app.route('/login', methods=['GET', 'POST'])
def login():
return render_template('login.html', ww=wupeiqi) app.run()
run.py
from flask import Flask,url_for,request,redirect,render_template,jsonify,make_response,Markup
from urllib.parse import urlencode,quote,unquote
app = Flask(__name__) def test(a1,a2):
return a1+a2 #全局视图函数,无需在render_template时传入也可在前端调用
#前端使用方式{{sb(1,2)}}
@app.template_global()
def sb(a1, a2):
return a1 + a2 + 100 #与上面的全局视图函数相似,无需在render_template时传入也可在前端调用
#前端使用方式{{ 1|db(2,3)}}
@app.template_filter()
def db(a1, a2, a3):
return a1 + a2 + a3 @app.route('/index',endpoint='xx')
def index():
v1 = "字符串"
v2 = [11,22,33]
v3 = {'k1':'v1','k2':'v2'}
v4 = Markup("<input type='text' />")#若不使用Markup,前端需{{v4|safe}}
return render_template('index.html',v1=v1,v2=v2,v3=v3,v4=v4,test=test) if __name__ == '__main__':
# app.__call__
app.run()
视图函数扩展
{% extends 'layout.html'%} {%block body %}
{{v1}} <ul>
{% for item in v2 %}
<li>{{item}}</li>
{% endfor %}
</ul>
{{v2.1}} <ul>
{% for k,v in v3.items() %}
<li>{{k}} {{v}}</li>
{% endfor %}
</ul>
{{v3.k1}}
{{v3.get('k1')}} {{v4}}
<!--{{v4|safe}}--> <h1>{{test(1,19)}}</h1> {{sb(1,2)}} {{ 1|db(2,3)}} {% macro xxxx(name, type='text', value='') %}
<input type="{{ type }}" name="{{ name }}" value="{{ value }}">
<input type="{{ type }}" name="{{ name }}" value="{{ value }}">
<input type="{{ type }}" name="{{ name }}" value="{{ value }}">
<input type="{{ type }}" name="{{ name }}" value="{{ value }}">
{% endmacro %} {{ xxxx('n1') }} {%endblock%}
index.html
注意:Markup等价django的mark_safe
六、请求和响应
from flask import Flask
from flask import request
from flask import render_template
from flask import redirect
from flask import make_response app = Flask(__name__) @app.route('/login.html', methods=['GET', "POST"])
def login(): # 请求相关信息
# request.method
# request.args
# request.form
# request.values
# request.cookies
# request.headers
# request.path
# request.full_path
# request.script_root
# request.url
# request.base_url
# request.url_root
# request.host_url
# request.host
# request.files
# obj = request.files['the_file_name']
# obj.save('/var/www/uploads/' + secure_filename(f.filename)) # 响应相关信息
# return "字符串"
# return render_template('html模板路径',**{})
# return redirect('/index.html') # response = make_response(render_template('index.html'))
# response是flask.wrappers.Response类型
# response.delete_cookie('key')
# response.set_cookie('key', 'value')
# response.headers['X-Something'] = 'A value'
# return response return "内容" if __name__ == '__main__':
app.run()
七、Session
除请求对象之外,还有一个 session 对象。它允许你在不同请求间存储特定用户的信息。它是在 Cookies 的基础上实现的,并且对 Cookies 进行密钥签名要使用会话,你需要设置一个密钥。
设置:session['username'] = 'xxx'
- 删除:session.pop('username', None)
from flask import Flask, session, redirect, url_for, escape, request app = Flask(__name__) @app.route('/')
def index():
if 'username' in session:
return 'Logged in as %s' % escape(session['username'])
return 'You are not logged in' @app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
session['username'] = request.form['username']
return redirect(url_for('index'))
return '''
<form action="" method="post">
<p><input type=text name=username>
<p><input type=submit value=Login>
</form>
''' @app.route('/logout')
def logout():
# remove the username from the session if it's there
session.pop('username', None)
return redirect(url_for('index')) # set the secret key. keep this really secret:
app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX/,?RT'
基本使用
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
pip3 install redis
pip3 install flask-session """ from flask import Flask, session, redirect
from flask.ext.session import Session app = Flask(__name__)
app.debug = True
app.secret_key = 'asdfasdfasd' app.config['SESSION_TYPE'] = 'redis'
from redis import Redis
app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='')
Session(app) @app.route('/login')
def login():
session['username'] = 'alex'
return redirect('/index') @app.route('/index')
def index():
name = session['username']
return name if __name__ == '__main__':
app.run()
第三方session
#session.py中用两个类自定义:
import uuid
import json
from flask.sessions import SessionInterface
from flask.sessions import SessionMixin
from itsdangerous import Signer, BadSignature, want_bytes class MySession(dict, SessionMixin):
def __init__(self, initial=None, sid=None):
self.sid = sid
self.initial = initial
super(MySession, self).__init__(initial or ()) def __setitem__(self, key, value):
super(MySession, self).__setitem__(key, value) def __getitem__(self, item):
return super(MySession, self).__getitem__(item) def __delitem__(self, key):
super(MySession, self).__delitem__(key) class MySessionInterface(SessionInterface):
session_class = MySession
container = {
# 'asdfasdfasdfas':{'k1':'v1','k2':'v2'}
# 'asdfasdfasdfas':"{'k1':'v1','k2':'v2'}"
} def __init__(self):
pass
# import redis
# self.redis = redis.Redis() def _generate_sid(self):
return str(uuid.uuid4()) def _get_signer(self, app):
if not app.secret_key:
return None
return Signer(app.secret_key, salt='flask-session',
key_derivation='hmac') def open_session(self, app, request):
"""
程序刚启动时执行,需要返回一个session对象
"""
sid = request.cookies.get(app.session_cookie_name)
if not sid:
# 生成随机字符串,并将随机字符串添加到 session对象中
sid = self._generate_sid()
return self.session_class(sid=sid) signer = self._get_signer(app)
try:
sid_as_bytes = signer.unsign(sid)
sid = sid_as_bytes.decode()
except BadSignature:
sid = self._generate_sid()
return self.session_class(sid=sid) # session保存在redis中
# val = self.redis.get(sid)
# session保存在内存中
val = self.container.get(sid) if val is not None:
try:
data = json.loads(val)
return self.session_class(data, sid=sid)
except:
return self.session_class(sid=sid)
return self.session_class(sid=sid) def save_session(self, app, session, response):
"""
程序结束前执行,可以保存session中所有的值
如:
保存到resit
写入到用户cookie
"""
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
httponly = self.get_cookie_httponly(app)
secure = self.get_cookie_secure(app)
expires = self.get_expiration_time(app, session) val = json.dumps(dict(session)) # session保存在redis中
# self.redis.setex(name=session.sid, value=val, time=app.permanent_session_lifetime)
# session保存在内存中
self.container.setdefault(session.sid, val) session_id = self._get_signer(app).sign(want_bytes(session.sid)) response.set_cookie(app.session_cookie_name, session_id,
expires=expires, httponly=httponly,
domain=domain, path=path, secure=secure) #run.py:
from flask import Flask,request,session
app = Flask(__name__)
app.secret_key = 'sdfsdfsd'
app.session_interface = MySessionInterface()
# app.session_interface
# app.make_null_session()
@app.route('/index')
def index():
print('网站的所有session',MySessionInterface.container)
print(session)
session['k1'] = 'v1'
session['k2'] = 'v2'
del session['k1'] # 在内存中操作字典....
# session['k1'] = 'v1'
# session['k2'] = 'v2'
# del session['k1'] return "xx" if __name__ == '__main__':
app.run()
自定义session
补充:
Flask的session与Django有所不同,Flask默认服务器不保存session,当用户来访问时若需要用到session则在内存中生成字典,字典中存放着相应的键值对,然后经过加盐加密后返回给浏览器,浏览器以cookie的形式保存,访问结束后服务器立即删除内存中的session。下次访问时浏览器携带cookie,其中键值对的值就是加密形态的session,经过服务器解密后得到session内容。
而Django的session则是写入数据库中的,浏览器的cookie只存有sessionid:随机字符串,待下次访问时由随机字符串匹配数据库中的session的键,得到session的值(字典)
八、跨域请求
跨域请求问题一直都是web项目的第一门槛,Flask 处理跨域请求需要导入一个模块 flask_cors
pip install flask_cors
from flask import Flask
from flask_cors import * app = Flask(__name__)
CORS(app, supports_credentials=True) # 用于允许跨域请求
九、flash(闪现)
flash是session的一种特殊形式,是一个基于Session实现的用于保存数据的集合,其与普通session的不同之处:使用一次就删除。
from flask import Flask,session,Session,flash,get_flashed_messages,redirect,render_template,request
app = Flask(__name__)
app.secret_key ='sdfsdfsdf' @app.route('/users')
def users():
# msg = request.args.get('msg','')
# msg = session.get('msg')
# if msg:
# del session['msg'] v = get_flashed_messages()
print(v)
msg = ''
return render_template('users.html',msg=msg) @app.route('/useradd')
def user_add():
# 在数据库中添加一条数据
# 假设添加成功,在跳转到列表页面时,显示添加成功
# return redirect('/users?msg=添加成功')
# session['msg'] = '添加成功' flash('添加成功')
return redirect('/users') if __name__ == '__main__':
app.run()
flash用法
十、蓝图
蓝图用于为应用提供目录划分:
小型应用程序:示例
大型应用程序:示例
其他:
- 蓝图URL前缀:xxx = Blueprint('xxx', __name__,url_prefix='/xxx')
- 蓝图子域名:xxx = Blueprint('xxx', __name__,subdomain='admin')
# 使用子域名前提需要给配置SERVER_NAME: app.config['SERVER_NAME'] = 'helloworld.com:5000'
# 访问时:admin.helloworld.com:5000/login.html
实例说明:
import fcrm if __name__ == '__main__':
fcrm.app.run(port=8001)
manage.py
from flask import Flask
from .views import account
from .views import order app = Flask(__name__)
print(app.root_path)
app.register_blueprint(account.account)
app.register_blueprint(order.order)
__init__.py
from flask import Blueprint,render_template account = Blueprint('account',__name__) @account.route('/login')
def login():
# return 'Login'
return render_template('login.html')
account.py
from flask import Blueprint order = Blueprint('order',__name__) @order.route('/order')
def login():
return 'Order'
order.py
十一、请求扩展(Flask中的类似于Django中间件的功能)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask, Request, render_template app = Flask(__name__, template_folder='templates')
app.debug = True @app.before_first_request
def before_first_request1():
print('before_first_request1') @app.before_first_request
def before_first_request2():
print('before_first_request2') @app.before_request
def before_request1():
Request.nnn = 123
print('before_request1') @app.before_request
def before_request2():
print('before_request2') @app.after_request
def after_request1(response):
print('before_request1', response)
return response @app.after_request
def after_request2(response):
print('before_request2', response)
return response @app.errorhandler(404)
def page_not_found(error):
return 'This page does not exist', 404 @app.template_global()
def sb(a1, a2):
return a1 + a2 @app.template_filter()
def db(a1, a2, a3):
return a1 + a2 + a3 @app.route('/')
def hello_world():
return render_template('hello.html') if __name__ == '__main__':
app.run()
方式一
from flask import Flask,session,Session,flash,get_flashed_messages,redirect,render_template,request
app = Flask(__name__)
app.secret_key ='sdfsdfsdf' @app.before_request
def process_request1():
print('process_request1') @app.after_request
def process_response1(response):
print('process_response1')
return response @app.before_request
def process_request2():
print('process_request2') @app.after_request
def process_response2(response):
print('process_response2')
return response @app.route('/index')
def index():
print('index')
return 'Index' @app.route('/order')
def order():
print('order')
return 'order' @app.route('/test')
def test():
print('test')
return 'test' if __name__ == '__main__':
app.run()
方式二
十二、数据库连接池(DButils)
在Flask中创建多线程,在程序运行时若想对数据库进行操作需建立连接。若每次都进行连接的创建,就会使连接数太多,费时费资源;若直接加锁则不能支持并发。所以这里我们引入数据库连接池来解决问题。
安装
pip3 install dbutils
使用方法:
from flask import Flask
import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='',
database='pooldb',
charset='utf8'
) app = Flask(__name__)
app.secret_key ='sdfsdfsdf' @app.route('/index')
def index():
# 第一步:缺点:每次请求反复创建数据库连接,连接数太多
# conn = pymysql.connect()
# cursor = conn.cursor()
# cursor.execute('select * from tb where id > %s',[5,])
# result = cursor.fetchall()
# cursor.close()
# conn.close()
# print(result) # 第二步:缺点,不能支持并发
# pymysql.threadsafety
# with LOCK:
# cursor = CONN.cursor()
# cursor.execute('select * from tb where id > %s', [5, ])
# result = cursor.fetchall()
# cursor.close()
#
# print(result) # 第三步:基于DBUtils实现数据连接池
# - 为每个线程创建一个连接,该线程关闭时,不是真正关闭;本线程再次调用时,还是使用的最开始创建的连接。直到线程终止,数据库连接才关闭。
# - 创建一个连接池(10个连接),为所有线程提供连接,使用时来进行获取,使用完毕后,再次放回到连接池。
# PS: #检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
# 否则
# 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
# 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
# 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
# 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute('select * from tb1')
result = cursor.fetchall()
conn.close() return '执行成功' if __name__ == '__main__': app.run()
数据库连接池
扩展:用类的自定义方法解耦数据库连接池的数据库操作
#用类的自定义方法解耦数据库连接池的数据库操作 #创建数据库连接池
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='47.93.4.198',
port=3306,
user='root',
password='',
database='s6',
charset='utf8'
) #定义用于解耦操作的类
class SQLHelper(object): def __init__(self):
self.conn = None
self.cursor = None def open(self,cursor=pymysql.cursors.DictCursor):
self.conn = db_pool.POOL.connection()
self.cursor = self.conn.cursor(cursor=cursor) def close(self):
self.cursor.close()
self.conn.close() def fetchone(self,sql,params):
cursor = self.cursor
cursor.execute(sql,params)
result = cursor.fetchone() return result def fetchall(self, sql, params):
cursor = self.cursor
cursor.execute(sql, params)
result = cursor.fetchall()
return result def __enter__(self):
self.open()
return self def __exit__(self, exc_type, exc_val, exc_tb):
self.close() #调用(可以是在其他文件下,只要引入上述代码所在文件即可)
with SQLHelper() as helper:
result = helper.fetchone('select * from users where name=%s and pwd = %s',[request.form.get('user'),request.form.get('pwd'),])
if result:
#做操作
用类的自定义方法解耦数据库连接池的数据库操作
十三、Flask的__call__源码解析
Flask执行app.run()时即执行Flask类的__call__方法
import sys
from itertools import chain
from .globals import _request_ctx_stack, _app_ctx_stack #flask\app.py:
class Flask(_PackageBoundObject):
#1、执行__call__方法
def __call__(self, environ, start_response):
return self.wsgi_app(environ, start_response)
#2、执行Flask类的wsgi_app方法
def wsgi_app(self, environ, start_response):
#2.1、执行Flask类的request_context(environ),最后得到封装了request和session的对象
ctx = self.request_context(environ)#RequestContext类的对象
#3、在ctx.py中的RequestContext类中执行push堆栈操作
ctx.push()
error = None
try:
try:
#4、执行Flask类的full_dispatch_request方法
response = self.full_dispatch_request()
except Exception as e:
error = e
response = self.handle_exception(e)
except:
error = sys.exc_info()[1]
raise
return response(environ, start_response)
finally:
if self.should_ignore_error(error):
error = None
ctx.auto_pop(error) #2.1
def request_context(self, environ):
#2.2、实例化了ctx.py中的RequestContext类赋值给2中的ctx,执行其构造方法
return RequestContext(self, environ)#self=app #2.3.1、得到Werkzeug 模块提供的Request
request_class = Request #2.4、封装request
def create_url_adapter(self, request):
"""Creates a URL adapter for the given request. The URL adapter
is created at a point where the request context is not yet set up
so the request is passed explicitly. .. versionadded:: 0.6 .. versionchanged:: 0.9
This can now also be called without a request object when the
URL adapter is created for the application context.
"""
if request is not None:
return self.url_map.bind_to_environ(request.environ,
server_name=self.config['SERVER_NAME'])
# We need at the very least the server name to be set for this
# to work.
if self.config['SERVER_NAME'] is not None:
return self.url_map.bind(
self.config['SERVER_NAME'],
script_name=self.config['APPLICATION_ROOT'] or '/',
url_scheme=self.config['PREFERRED_URL_SCHEME']) #4、
def full_dispatch_request(self):
#4.1、执行before_first_request方法,只执行一次
self.try_trigger_before_first_request_functions()
try:
#执行信号
request_started.send(self)
#5、执行Flask类的preprocess_request方法,类似Django中间件的装饰器函数
rv = self.preprocess_request()#rv是装饰器函数的返回值
if rv is None:#装饰器函数没有返回值,执行视图函数
# 6、执行Flask类的dispatch_request方法,即视图函数
rv = self.dispatch_request()
except Exception as e:
rv = self.handle_user_exception(e)
#7、执行Flask类的finalize_request方法,试图函数结束后执行的装饰器
return self.finalize_request(rv) # 5、执行Flask类的preprocess_request方法,类似Django中间件的装饰器函数
def preprocess_request(self):
#top读栈中的request
bp = _request_ctx_stack.top.request.blueprint funcs = self.url_value_preprocessors.get(None, ())
if bp is not None and bp in self.url_value_preprocessors:
funcs = chain(funcs, self.url_value_preprocessors[bp])
for func in funcs:
func(request.endpoint, request.view_args) funcs = self.before_request_funcs.get(None, ())
if bp is not None and bp in self.before_request_funcs:
funcs = chain(funcs, self.before_request_funcs[bp])
for func in funcs:
rv = func()
if rv is not None:
return rv #6、执行Flask类的dispatch_request方法,即视图函数
def dispatch_request(self):
req = _request_ctx_stack.top.request
if req.routing_exception is not None:
self.raise_routing_exception(req)
rule = req.url_rule
# if we provide automatic options for this URL and the
# request came with the OPTIONS method, reply automatically
if getattr(rule, 'provide_automatic_options', False) \
and req.method == 'OPTIONS':
return self.make_default_options_response()
# otherwise dispatch to the handler for that endpoint
return self.view_functions[rule.endpoint](**req.view_args) # 7、执行Flask类的finalize_request方法,试图函数结束后执行的装饰器
def finalize_request(self, rv, from_error_handler=False): response = self.make_response(rv)
try:
#8、执行Flask类的process_response方法,处理session
response = self.process_response(response)
#信号
request_finished.send(self, response=response)
except Exception:
if not from_error_handler:
raise
self.logger.exception('Request finalizing failed with an '
'error while handling an error')
return response # 8、执行Flask类的process_response方法,处理session
def process_response(self, response): ctx = _request_ctx_stack.top
bp = ctx.request.blueprint
funcs = ctx._after_request_functions
if bp is not None and bp in self.after_request_funcs:
funcs = chain(funcs, reversed(self.after_request_funcs[bp]))
if None in self.after_request_funcs:
funcs = chain(funcs, reversed(self.after_request_funcs[None]))
for handler in funcs:
response = handler(response)
if not self.session_interface.is_null_session(ctx.session):
self.save_session(ctx.session, response)
return response #ctx.py:
class RequestContext(object):
#2.2、
def __init__(self, app, environ, request=None):
#self=ctx,app=app
self.app = app
# 2.3.1、如果没有request的话,执行Flask类下的request_class静态字段
if request is None:
request = app.request_class(environ)
#2.3.2、有request
self.request = request
#2.4、执行Flask类下的create_url_adapter方法封装request
self.url_adapter = app.create_url_adapter(self.request)
#2.5、默认flashes和session为None
self.flashes = None
self.session = None #3、堆栈操作
def push(self):
# _request_ctx_stack=LocalStack(),由文件导入过来,是一个字典
top = _request_ctx_stack.top#top是读栈
if top is not None and top.preserved:
top.pop(top._preserved_exc)
#_app_ctx_stack = LocalStack()
app_ctx = _app_ctx_stack.top
if app_ctx is None or app_ctx.app != self.app:
app_ctx = self.app.app_context()
app_ctx.push()
self._implicit_app_ctx_stack.append(app_ctx)
else:
self._implicit_app_ctx_stack.append(None) if hasattr(sys, 'exc_clear'):
sys.exc_clear()
#3.1 堆栈操作
_request_ctx_stack.push(self)
#查看session,如果没有的话赋予NullSession
self.session = self.app.open_session(self.request)
if self.session is None:
self.session = self.app.make_null_session() #global.py:
from functools import partial
from werkzeug.local import LocalStack, LocalProxy
_request_ctx_stack = LocalStack()
_app_ctx_stack = LocalStack()
current_app = LocalProxy(_find_app)
request = LocalProxy(partial(_lookup_req_object, 'request'))
session = LocalProxy(partial(_lookup_req_object, 'session'))
g = LocalProxy(partial(_lookup_app_object, 'g')) #local.py:
class LocalStack(object): def __init__(self):
#_request_ctx_stack._local=self._local=__storage__: {线程或协程唯一标识: {"stack": [request]}, }
self._local = Local()#形成牛逼的字典 def __release_local__(self):
self._local.__release_local__() def _get__ident_func__(self):
return self._local.__ident_func__ def _set__ident_func__(self, value):
object.__setattr__(self._local, '__ident_func__', value)
__ident_func__ = property(_get__ident_func__, _set__ident_func__)
del _get__ident_func__, _set__ident_func__ def __call__(self):
def _lookup():
rv = self.top
if rv is None:
raise RuntimeError('object unbound')
return rv
return LocalProxy(_lookup)
#3.1、堆栈操作
def push(self, obj):
"""Pushes a new item to the stack"""
rv = getattr(self._local, 'stack', None)#找牛逼的字典中有没有叫stack的键,默认为None
if rv is None:#如果是None的话,在牛逼的字典中创建"stack":[]
self._local.stack = rv = []
rv.append(obj)#将obj放入[],真正的堆栈操作
return rv def pop(self):
"""Removes the topmost item from the stack, will return the
old value or `None` if the stack was already empty.
"""
stack = getattr(self._local, 'stack', None)
if stack is None:
return None
elif len(stack) == 1:
release_local(self._local)
return stack[-1]
else:
return stack.pop() @property
def top(self):
"""The topmost item on the stack. If the stack is empty,
`None` is returned.
"""
try:
return self._local.stack[-1]
except (AttributeError, IndexError):
return None class Local(object):
__slots__ = ('__storage__', '__ident_func__') def __init__(self):
#最后得到__storage__:{线程或协程唯一标识:{"stack":[request]},}.赋值给LocalStack()._local,即_request_ctx_stack
object.__setattr__(self, '__storage__', {})
object.__setattr__(self, '__ident_func__', get_ident) def __iter__(self):
return iter(self.__storage__.items()) def __call__(self, proxy):
"""Create a proxy for a name."""
return LocalProxy(self, proxy) def __release_local__(self):
self.__storage__.pop(self.__ident_func__(), None) def __getattr__(self, name):
try:
return self.__storage__[self.__ident_func__()][name]
except KeyError:
raise AttributeError(name) def __setattr__(self, name, value):
ident = self.__ident_func__()
storage = self.__storage__
try:
storage[ident][name] = value
except KeyError:
storage[ident] = {name: value} def __delattr__(self, name):
try:
del self.__storage__[self.__ident_func__()][name]
except KeyError:
raise AttributeError(name)
flask执行app.run()时源码流程
流程分析:
- 执行__call__方法时即执行Flask类的wsgi_app方法。
- 在该方法中调用request_context方法用于封装request和session,默认flashed和session为None。
- 然后,在ctx.py中的RequestContext类中执行push堆栈操作,堆栈操作中会查看session,如果没有的话会赋予NullSession。堆栈操作中会形成一个很大的字典,字典的形式为{线程或协程的唯一id: {"stack": [request]},...}。
- 堆栈操作完毕后执行full_dispatch_request方法,该方法主要执行装饰器(中间件或信号)和视图函数。首先会执行before_first_request方法,只执行一次。然后执行信号request_started.send(self)。
- 接下来执行preprocess_request(),其作用类似于Django的中间件。
- 若preprocess_request()没有返回值即表明通过中间件,执行dispatch_request()即视图函数。
- 视图函数结束后执行finalize_request()方法,为自定义的装饰器。
- 执行Flask类的process_response方法,处理session。
- 最后得到__storage__:{线程或协程唯一标识:{"stack":[request]},}.赋值给LocalStack()._local,即_request_ctx_stack
十四、信号 blinker
安装 pip3 install blinker
Flask有10个内置信号,信号是什么呢,是请求执行时会自动执行的一些方法,没有返回值。信号在源码中靠send触发。
内置信号:
2. request_started = _signals.signal('request-started') # 请求到来前执行
5. request_finished = _signals.signal('request-finished') # 请求结束后执行 3. before_render_template = _signals.signal('before-render-template') # 模板渲染前执行
4. template_rendered = _signals.signal('template-rendered') # 模板渲染后执行 2/3/4/5或不执行 got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行 6. request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否)
7. appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 请求上下文执行完毕后自动执行(无论成功与否) 1. appcontext_pushed = _signals.signal('appcontext-pushed') # 请求app上下文push时执行 8. appcontext_popped = _signals.signal('appcontext-popped') # 请求上下文pop时执行 最后.message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发
内置十个信号
信号用于做什么?
主要是用于降低代码之间的耦合
特殊的装饰器和信号有什么区别?
装饰器返回值有意义
自定义信号:
作用:发送短信,邮件,微信
from flask import Flask,flash
from flask.signals import _signals
app = Flask(__name__)
mysignal = _signals.signal('mysignal')#实质上是一个列表 # 定义函数
def fun1(*args,**kwargs):
print('函数一',args,kwargs) # 定义函数
def fun2(*args,**kwargs):
print('函数二',args,kwargs) # 将函数注册到request_started信号中: 添加到这个列表
mysignal.connect(fun1)
mysignal.connect(fun2) @app.route('/index')
def index():
# 触发这个信号:执行注册到列表中的所有函数
# 自定义信号的作用:发送短信,邮件,微信
mysignal.send(sender='xxx',a1=123,a2=456)
return "xx" if __name__ == '__main__':
app.__call__
app.run()
自定义信号
十五、Flask插件
- flask-script
- wtforms
- SQLAlchemy
- 等... http://flask.pocoo.org/extensions/