Threading.local
作用:为每个线程创建一个独立的空间,使得线程对自己的空间中的数据进行操作(数据隔离)。
应用:
- flask上下文管理中的local中比这更高级,为协程。
- SQLAchemy
DBUtils线程池的模式一:为每个线程创建一个连接
"""
import threading
from threading import local
import time obj = local() def task(i):
obj.xxxxx = i
time.sleep(2)
print(obj.xxxxx,i) for i in range(10):
t = threading.Thread(target=task,args=(i,))
t.start()
"""
"""
import threading
from threading import local def task(i):
print(threading.get_ident(),i) for i in range(10):
t = threading.Thread(target=task,args=(i,))
t.start()
"""
"""
import time
import threading
import greenlet DIC = {} def task(i): # ident = threading.get_ident()
ident = greenlet.getcurrent()
if ident in DIC:
DIC[ident]['xxxxx'] = i
else:
DIC[ident] = {'xxxxx':i }
time.sleep(2) print(DIC[ident]['xxxxx'],i) for i in range(10):
t = threading.Thread(target=task,args=(i,))
t.start() """
import time
import threading try:
import greenlet
get_ident = greenlet.getcurrent
except Exception:
get_ident = threading.get_ident class Local(object):
DIC = {} def __getattr__(self, item):
ident = get_ident()
if ident in self.DIC:
return self.DIC[ident].get(item)
return None def __setattr__(self, key, value):
ident = get_ident()
if ident in self.DIC:
self.DIC[ident][key] = value
else:
self.DIC[ident] = {key: value} obj = Local() def task(i):
obj.xxxxx = i
time.sleep(2)
print(obj.xxxxx, i) for i in range(10):
t = threading.Thread(target=task, args=(i, ))
t.start()
threading.local.py
- 如何获取一个线程的唯一标记? threading.get_ident()
- 根据字典自定义一个类似于threading.local功能?
import time
import threading DIC = {} def task(i):
ident = threading.get_ident()
if ident in DIC:
DIC[ident]['xxxxx'] = i
else:
DIC[ident] = {'xxxxx':i }
time.sleep(2) print(DIC[ident]['xxxxx'],i) for i in range(10):
t = threading.Thread(target=task,args=(i,))
t.start() - 根据字典自定义一个为每个协程开辟空间进行存取数据。 import time
import threading
import greenlet DIC = {} def task(i): # ident = threading.get_ident()
ident = greenlet.getcurrent()
if ident in DIC:
DIC[ident]['xxxxx'] = i
else:
DIC[ident] = {'xxxxx':i }
time.sleep(2) print(DIC[ident]['xxxxx'],i) for i in range(10):
t = threading.Thread(target=task,args=(i,))
t.start() - 通过getattr/setattr 构造出来 threading.local的加强版(协程)
import time
import threading
try:
import greenlet
get_ident = greenlet.getcurrent
except Exception as e:
get_ident = threading.get_ident class Local(object):
DIC = {} def __getattr__(self, item):
ident = get_ident()
if ident in self.DIC:
return self.DIC[ident].get(item)
return None def __setattr__(self, key, value):
ident = get_ident()
if ident in self.DIC:
self.DIC[ident][key] = value
else:
self.DIC[ident] = {key:value} obj = Local() def task(i):
obj.xxxxx = i
time.sleep(2)
print(obj.xxxxx,i) for i in range(10):
t = threading.Thread(target=task,args=(i,))
t.start()
问题
应用:DBUtils中为每个线程创建一个数据库连接时使用
一、上下文管理
两类请求:请求上下文,app上下文
上下文管理的实现(重点)
当请求到来的时候,
flask会把请求相关和session相关信息封装到一个ctx = RequestContext对象中,
会把app和g封装到app_ctx = APPContext对象中去,
通过localstack对象将ctx、app_ctx对象封装到local对象中 问题:
local是什么?作用?
为每个线程创建一个独立的空间,使得线程对自己的空间中的数据进行操作(数据隔离)
localstack是什么?作用
storage = {
1234:{stack:[ctx,]}
}
localstack将local中的stack中的维护成一个栈
获取数据(执行视图函数的时候)
通过导入一个模块,用模块.的方式获取我们想要的数据
实现细节:
通过localproxy对象+偏函数,调用localstack去local中获取对应的响应ctx、app_ctx中封装值
问题:为什么要把ctx = request/session app_ctx = app/g
因为离线脚本需要使用app_ctx
请求结束
调用localstk的pop方法,将ctx和app_ctx移除
1. 请求上下文管理
基础知识
- 偏函数:给函数取一个默认的参数,这样可以使函数不必每一次都传参。
应用:request = LocalProxy(partial(_lookup_req_object, 'request'))
session = LocalProxy(partial(_lookup_req_object, 'session'))
import functools def index(a1,a2):
return a1 + a2 # 原来的调用方式
# ret = index(1,23)
# print(ret) # 偏函数,帮助开发者自动传递参数
new_func = functools.partial(index,666)
ret = new_func(1)
print(ret)
偏函数
- super和执行类的区别?
"""
class Base(object): def func(self):
print('Base.func') class Foo(Base): def func(self):
# 方式一:根据mro的顺序执行方法
# super(Foo,self).func()
# 方式二:主动执行Base类的方法
# Base.func(self) print('Foo.func') obj = Foo()
obj.func()
"""
####################################
class Base(object): def func(self):
super(Base, self).func()
print('Base.func') class Bar(object):
def func(self):
print('Bar.func') class Foo(Base,Bar):
pass # 示例一
# obj = Foo()
# obj.func()
# print(Foo.__mro__) # 示例二
# obj = Base()
# obj.func()
super和执行类的区别
# -*- coding: utf-8 -*-
'''
# @Datetime: 2018/12/23
# @author: Zhang Yafei
'''
'''
class Base(object):
def func(self):
print('Base_func') class Foo(Base):
def func(self):
# 方式一:根据mro的顺序执行方法
# super(Foo, self).func()
# 方式二:主动执行Base类的方法
# Base.func(self)
print('Foo_func') obj = Foo()
obj.func()
''' ############################### super和执行父类的方法的区别 ###################
class Base(object):
def func(self):
super(Base, self).func()
print('Base_func') class Bar(object):
def func(self):
print('Bar_func') class Foo(Base, Bar):
pass obj = Foo()
obj.func()
print(Foo.__mro__) # obj = Base()
print(Base.__mro__)
# obj.func()
执行父类方法
# -*- coding: utf-8 -*-
'''
# @Datetime: 2018/12/23
# @author: Zhang Yafei
''' class Foo(object):
def __init__(self):
object.__setattr__(self, 'storage', {}) def __setattr__(self, key, value):
print(key, value, self.storage) obj = Foo()
obj.xx = 123
特殊方法_getattr__和__setattr__
class Stack(object):
"""栈"""
def __init__(self):
self.__list = [] #私有变量,不允许外部调用者对其进行操作 def push(self,item):
"""添加一个新的元素item到栈顶"""
self.__list.append(item) #顺序表尾部插入时间复杂度O(1),头部插入O(n),故尾部方便
#self.__list.insert(0,item) #链表表尾部插入时间复杂度O(n),头部插入O(1),故链表用头插方便 def pop(self):
"""弹出栈顶元素"""
return self.__list.pop() if __name__ == '__main__':
s = Stack()
s.push(1)
s.push(2)
s.push(3)
s.push(4) print(s.pop())
print(s.pop())
print(s.pop())
print(s.pop())
基于列表实现栈
'''
# @Datetime: 2018/12/25
# @author: Zhang Yafei
''' class Obj(object):
def func(self):
pass # 执行方法一
# obj = Obj()
# obj.func() # 方法 # 执行方法二
# obj = Obj()
# Obj.func(obj) # 函数 from types import FunctionType, MethodType obj = Obj()
print(isinstance(obj.func, FunctionType)) # False
print(isinstance(obj.func, MethodType)) # True print(isinstance(Obj.func, FunctionType)) # True
print(isinstance(Obj.func, MethodType)) # Flase
"""
总结:函数和方法的区别
1.被类调用的是函数,被对象调用的是方法
2.全部参数自己传的是函数,自动传的是方法(比如self)
"""
方法和函数的区别
# -*- coding: utf-8 -*- """
@Datetime: 2018/12/29
@Author: Zhang Yafei
"""
"""方式一"""
# my_singleton.py
#
# class Singleton(object):
# pass
#
# singleton = Singleton()
#
# from mysingleton import singleton """方式二:使用装饰器"""
# def Singleton(cls):
# _instance = {}
#
# def _singleton(*args, **kargs):
# if cls not in _instance:
# _instance[cls] = cls(*args, **kargs)
# return _instance[cls]
#
# return _singleton
#
#
# @Singleton
# class A(object):
# a = 1
#
# def __init__(self, x=0):
# self.x = x
#
#
# a1 = A(2)
# a2 = A(3) """方式三:使用类"""
# import threading
# import time
#
#
# class Singleton(object):
#
# def __init__(self):
# # time.sleep(1)
# pass
# @classmethod
# def instance(cls, *args, **kwargs):
# if not hasattr(Singleton, "_instance"):
# Singleton._instance = Singleton(*args, **kwargs)
# return Singleton._instance
#
#
# def task(arg):
# obj = Singleton.instance()
# print(obj)
#
#
# for i in range(10):
# t = threading.Thread(target=task,args=[i,])
# t.start() """解决方法:加锁"""
# import time
# import threading
#
#
# class Singleton(object):
# _instance_lock = threading.Lock()
#
# def __init__(self):
# time.sleep(1)
#
# @classmethod
# def instance(cls, *args, **kwargs):
# if not hasattr(Singleton, "_instance"):
# with Singleton._instance_lock:
# if not hasattr(Singleton, "_instance"):
# Singleton._instance = Singleton(*args, **kwargs)
# return Singleton._instance
#
#
#
#
# def task(arg):
# obj = Singleton.instance()
# print(obj)
#
# for i in range(10):
# t = threading.Thread(target=task,args=[i,])
# t.start()
# time.sleep(20)
# obj = Singleton.instance()
# print(obj) """方法四:基于__new__方法"""
#
# import threading
#
# class Singleton(object):
# _instance_lock = threading.Lock()
#
# def __init__(self):
# pass
#
# def __new__(cls, *args, **kwargs):
# if not hasattr(Singleton, "_instance"):
# with Singleton._instance_lock:
# if not hasattr(Singleton, "_instance"):
# Singleton._instance = object.__new__(cls)
# return Singleton._instance
#
#
# obj1 = Singleton()
# obj2 = Singleton()
# print(obj1,obj2)
#
#
# def task(arg):
# obj = Singleton()
# print(obj)
#
#
# for i in range(10):
# t = threading.Thread(target=task,args=[i,])
# t.start()
#
"""方法五:元类实现单例模式"""
import threading class SingletonType(type):
_instance_lock = threading.Lock() def __call__(cls, *args, **kwargs):
if not hasattr(cls, "_instance"):
with SingletonType._instance_lock:
if not hasattr(cls, "_instance"):
cls._instance = super(SingletonType,cls).__call__(*args, **kwargs)
return cls._instance class Foo(metaclass=SingletonType):
def __init__(self,name):
self.name = name obj1 = Foo('name')
obj2 = Foo('name')
print(obj1,obj2)
单例模式
执行流程
请求到来时候:wsgi会触发__call__方法,由__call__方法再次调用wsgi_app方法
-在wsgi_app方法中:
首先将 请求相关+空session 封装到一个RequestContext对象中,即ctx
将ctx交给LocalStack中,再由localstack将ctx添加到local中,local结构:
__storage__ = {
1231:{stack:[ctx,]}
}
-根据请求中的cookie中提取名称为sessionnid对应的值,对cookie进行加密+反序列化,再次赋值给ctx中的session
-> 视图函数
- 把session中的数据再次写入到cookie中
-将ctx删除
-结果返回用户浏览器
-断开socket连接 # ctx = RequestContext(self, environ) # self是app对象,environ请求相关的原始数据
# ctx.request = Request(environ)
# ctx.session = None # 将包含了request/session的ctx对象放到“空调”
{
1232:{ctx:ctx对象}
1231:{ctx:ctx对象}
1211:{ctx:ctx对象}
1111:{ctx:ctx对象}
1261:{ctx:ctx对象}
} 视图函数:
from flask import reuqest,session request.method 请求结束:
根据当前线程的唯一标记,将“空调”上的数据移除。
请求上下文执行流程
主要内容
a. 温大爷:wsig,
run_simple() 自动执行__call__方法,__call__方法调用wsgi_app()方法
b. 赵毅:
ctx = ReuqestContext(session,request) # 将request和session封装成一个ctx对象
ctx.push() # 通过localstack添加到local(空调)中
c. 刘松:LocalStack,把ctx对象添加到local中
LocalStack的作用:线程标记作为字典的key,字典里面包含key为stack的字典,value为一个列表,
localstack的作用就是将这个列表维护成一个stack.
d. 空调:Local
__storage__={
1321:{stack:[ctx,]}
}
上下文管理request
a. 温大爷:wsig
b. 赵毅:
ctx = ReuqestContext(session=None,request)
ctx.push()
c. 刘松:LocalStack,把ctx对象添加到local中
d. 空调:Local
__storage__={
1321:{stack:[ctx,]}
}
e. 郭浩:通过刘松获取ctx中的session,给session赋值(从cookie中读取数据) => open_session
上下文管理session
pip3 install flask-session 掌握:
- 使用
import redis
from flask import Flask,request,session
from flask.sessions import SecureCookieSessionInterface
from flask_session import Session app = Flask(__name__) # app.session_interface = SecureCookieSessionInterface()
# app.session_interface = RedisSessionInterface()
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = redis.Redis(host='140.143.227.206',port=6379,password='')
Session(app) @app.route('/login')
def login():
session['user'] = 'alex'
return 'asdfasfd' @app.route('/home')
def index():
print(session.get('user')) return '...' if __name__ == '__main__':
app.run()
- 原理:
- session数据保存到redis
session:随机字符串1:q23asifaksdfkajsdfasdf
session:随机字符串2:q23asifaksdfkajsdfasdf
session:随机字符串3:q23asifaksdfkajsdfasdf
session:随机字符串4:q23asifaksdfkajsdfasdf
session:随机字符串5:q23asifaksdfkajsdfasdf
- 随机字符串返回给用户,浏览器。
随机字符串 源码:
from flask_session import RedisSessionInterface
flask-session
2. 应用上下文管理
- - 请求上下文(ctx=RequestContext()):request/session
- - App上下文(app_ctx=AppContext()): app/g
- 程序启动:
两个Local:
local1 = { } local2 = { } 两个LocalStack:
_request_ctx_stack
_app_ctx_stack
- 请求到来
对数据进行封装:
ctx = RequestContext(request,session)
app_ctx = AppContext(app,g)
保存数据:
将包含了(app,g)数据的app_ctx对象,利用 _app_ctx_stack(贝贝,LocalStack())将app_ctx添加到Local中
storage = {
1231:{stack:[app_ctx(app,g),]}
}
将包含了request,session数据的ctx对象,利用_request_ctx_stack(刘淞,LocalStack()),将ctx添加到Local中
storage = {
1231:{stack:[ctx(request,session),]} - 视图函数处理:
from flask import Flask,request,session,current_app,g app = Flask(__name__) @app.route('/index')
def index():
# 去请求上下文中获取值 _request_ctx_stack
request.method # 找小东北获取值
session['xxx'] # 找龙泰获取值 # 去app上下文中获取值:_app_ctx_stack
print(current_app)
print(g) return "Index" if __name__ == '__main__':
app.run()
app.wsgi_app - 结束
_app_ctx_stack.pop()
_request_ctx_stack.pop() app上下文管理执行流程
应用上下文管理执行流程
3.视图函数中获取上下文信息注意点
请求上下文和应用上下文需要先放入Local中,才能获取到
# -*- coding: utf-8 -*- """
@Datetime: 2019/1/4
@Author: Zhang Yafei
"""
from flask import Flask, current_app, request, session, g app = Flask(__name__) # 错误:程序解释的时候还没有执行__call__方法,还没有将上下文信息放入local中,所以取不到会报错
# print(current_app.config) @app.route('/index')
def index():
# 正确
print(current_app.config)
return 'index' if __name__ == '__main__':
app.run()
视图函数中使用
# -*- coding: utf-8 -*- """
@Datetime: 2018/12/31
@Author: Zhang Yafei
"""
from web import db, create_app
from flask import current_app # 错误,还没有放入local中
# print(current_app.config) # RuntimeError: Working outside of application context. app = create_app() # app_ctx = app/g
app_ctx = app.app_context() with app_ctx: # __enter__,通过LocalStack放入Local中
# db.create_all() # 调用LocalStack放入Local中获取app,再去app中获取配置
# 正确
print(current_app.config)
离线脚本中使用
4.问题总结
1. Flask中g的生命周期?
(1) 当一个请求到来时,flask会把app和g封装成一个AppContext对象,
(2) 通过_app_ctx_stack = LocalStack()(作用是将local维护成一个栈),将g存放到local中.
app_ctx = _app_ctx_stack.top
app_ctx.push()
_app_ctx_stack.push(self)
rv = getattr(self._local, 'stack', None)
rv.append(obj)
(3) 执行视图函数:
g = LocalProxy(partial(_lookup_app_object, 'g'))
g.x 通过localProxy对象去local中取g
(4) _app_ctx_stack.pop() # 将g清空
- 一次请求声明周期结束
注:两次请求的g不一样
2. g和session一样吗?
g只能在同一次请求起作用,请求结束之后就销毁了,昙花一现
但session请求完之后不销毁,不同请求都可以使用
3. g和全局变量一样吗?
g是项目启动的时候创建的,且一次请求结束之后删除,可以通过线程标记实现多线程
全局变量多次请求不删除,可以共用,但不能多线程
问题
二、进阶功能扩展
1. 链接和版本管理器:方便对url和版本进行管理
class UrlManager(object):
@staticmethod
def buildUrl(path):
return path @staticmethod
def buildStaticUrl(path):
path = path + '?ver=' + '201901292105'
return UrlManager.buildUrl(path) url = url_for('index') # /index
url_1 = UrlManager.buildUrl('/api') # /api
url_2 = UrlManager.buildStaticUrl('/css/bootstrap.css') # /css/bootstrap.css?ver=201901292105
msg = 'Hello World,url:{} url_1:{} url_2:{}'.format(url, url_1, url_2) # .....
2.日志系统
A logging.Logger object for this application. The default configuration is to log to stderr if the application is in debug mode. This logger can be used to (surprise) log messages. Here some examples: app.logger.debug('A value for debugging')
app.logger.warning('A warning occurred (%d apples)', 42)
app.logger.error('An error occurred')
3.错误处理:使用户有更好的使用体验
@app.errorhandler(404)
def page_not_found(error):
app.logger.error(error)
return 'This page does not exist', 404
三、第三方组件
1. 数据库连接池:DBUtils
from DBUtils.PooledDB import PooledDB, SharedDBConnection POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='',
database='flask_code',
charset='utf8'
) -注意:使用数据库连接池
from settings import POOL
conn = POOL.connection()
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
2.wtforms
-使用
生成HTML标签
form表单验证
-安装
pip install wtforms
-使用
示例:用户登录
用户注册
实时更新:从数据库获取字段,静态字段只在程序扫描第一遍时执行。
解决方法:1.重启 2. 重写form的构造方法,每一次实例化都动态更新需要更新的字段
问题:
1. form对象为什么可以被for循环?
答:变为可迭代对象:
class Obj(object):
def __iter__(self):
# return iter([1,2,3])
yield 1
yield 2
yield 3 obj = Obj() for item in obj:
print(item)
2. new方法的返回值决定对象到底是什么?
class BAR(object):
def __init__(self, cls):
self.cls = cls class NEW_OBJ(object): def __new__(cls, *args, **kwargs):
# return super(NEW_OBJ, cls).__new__(cls, *args, **kwargs) # <__main__.NEW_OBJ object at 0x000000D445061CF8>
# return 123 # 123
# return BAR # <class '__main__.BAR'>
# return BAR() # <__main__.BAR object at 0x000000AD77141C50>
return BAR(cls) # <__main__.BAR object at 0x0000003BFFA31D68> obj = NEW_OBJ()
print(obj) 3. metaclass
1. 创建类时,先执行metaclass(默认为type)的__init__方法
2. 类在实例化时, 执行metaclass(默认为type)的__call__方法,__call__方法的返回值就是实例化的对象
__call__内部调用:
- 类.__new__方法:创建对象
_ 类.__init__方法:对象的初始化 class MyType(type):
def __init__(self, *args, **kwargs):
print('MyType的__init__')
super(MyType, self).__init__(*args, **kwargs) def __call__(cls, *args, **kwargs):
print('MyType的__call__')
obj3 = cls.__new__(cls)
cls.__init__(obj3)
return obj class Obj3(object, metaclass=MyType):
x = 123 def __init__(self):
print('Obj3的__init__') def __new__(cls, *args, **kwargs):
print('Obj3的__new__')
return object.__new__(cls) def func(self):
return 666 源码:
-类的创建
type.__init__
-对象的创建:
type.__call__
类.__new__
类.__init__ Meta
- __mro__
- 应用:wtforms中meta使用(meta作用定制csrf token)
钩子函数
class LoginForm(Form):
name = simple.StringField(
validators=[
validators.DataRequired(message='用户名不能为空.'),
],
widget=widgets.TextInput(),
render_kw={'placeholder':'请输入用户名'}
)
pwd = simple.PasswordField(
validators=[
validators.DataRequired(message='密码不能为空.'), ],
render_kw={'placeholder':'请输入密码'}
) def validate_name(self, field):
"""
自定义name字段规则
:param field:
:return:
"""
# 最开始初始化时,self.data中已经有所有的值
print('钩子函数获取的值',field.data)
if not field.data.startswith('old'):
raise validators.ValidationError("用户名必须以old开头") # 继续后续验证
# raise validators.StopValidation("用户名必须以old开头") # 不再继续后续验证 总结:
记住: https://www.cnblogs.com/wupeiqi/articles/8202357.html
理解:
- metaclass
- __new__
- __mro__
3.flask-sqlalchemy
pip install flask-sqlalchemy
下载安装
导入并实例化SQLAlchemy
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy() 注意事项:
- 必须在导入蓝图之前
- 必须导入models.py
web.__init__.py
db.init_app(app)
初始化
# ##### SQLALchemy配置文件 #####
SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:0000@127.0.0.1:3306/flask_web?charset=utf8"
SQLALCHEMY_POOL_SIZE = 10
SQLALCHEMY_MAX_OVERFLOW = 5
配置文件配置
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column
from sqlalchemy import Integer,String,Text,Date,DateTime
from sqlalchemy import create_engine
from chun import db class Users(db.Model):
__tablename__ = 'users' id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False)
depart_id = Column(Integer)
models.py
from web import db,create_app app = create_app()
app_ctx = app.app_context() # app_ctx = app/g
with app_ctx: # __enter__,通过LocalStack放入Local中
db.create_all() # 调用LocalStack放入Local中获取app,再去app中获取配置
生成表
from flask import Blueprint
from web import db
from web import models
us = Blueprint('us',__name__) @us.route('/index')
def index():
# 使用SQLAlchemy在数据库中插入一条数据
# db.session.add(models.Users(name='高件套',depart_id=1))
# db.session.commit()
# db.session.remove()
result = db.session.query(models.Users).all()
print(result)
db.session.remove() return 'Index'
基于orm对数据库进行操作
4.flask_script
# python manage.py runserver -h 127.0.0.1 -p 8001 from web import create_app
from flask_script import Manager app = create_app()
manager = Manager(app) if __name__ == '__main__':
# app.run()
manager.run()
增加runserver
from web import create_app
from flask_script import Manager app = create_app()
manager = Manager(app) @manager.command
def custom(arg):
"""
自定义命令
python manage.py custom 123
:param arg:
:return:
"""
print(arg) if __name__ == '__main__':
# app.run()
manager.run()
位置传参
from web import create_app
from flask_script import Manager app = create_app()
manager = Manager(app) @manager.option('-n', '--name', dest='name')
@manager.option('-u', '--url', dest='url')
def cmd(name, url):
"""
自定义命令
执行: python manage.py cmd -n wupeiqi -u http://www.oldboyedu.com
:param name:
:param url:
:return:
"""
print(name, url) if __name__ == '__main__':
# app.run()
manager.run()
关键字传参
5. flask-sqlacodegen
使用 flask-sqlacodegen 扩展 方便快速生成 ORM model
(1) 安装
pip install flask-sqlacodegen
(2)使用
flask-sqlacodegen "mysql://root:0000@127.0.0.1/food_db" --outfile "common/models/model.py" --flask
flask-sqlacodegen "mysql://root:0000@127.0.0.1/food_db" --tables user --outfile "common/models/user.py" --flask
6. flask-migrate:利用sqlalchemy对数据库中的表进行迁移操作
依赖:flask-script
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand
from web import create_app
from web import db app = create_app()
manager = Manager(app)
Migrate(app, db) """
# 数据库迁移命名
python manage.py db init
python manage.py db migrate
python manage.py db upgrade
"""
manager.add_command('db', MigrateCommand) if __name__ == '__main__':
manager.run()
# app.run()
manage.py
7.找到项目使用的所有组件及版本
第一步:pip install pipreqs
第二步:
进入项目目录:pipreqs ./ --encoding=utf-8 从文件中安装包:pip install -r requirements.txt 补充:
虚拟环境导出包:pip freeze>requeriments.txt
8.虚拟环境
方式一:pip3 install virtualenv
virtualenv env1 --no-site-packages 方式二::python -m venv mysite_venv
进入script目录:
激活虚拟环境:activate
退出虚拟环境:deactivate
四、打造高可用的MVC框架
五、信号
信号即就是框架给我们预留出一些位置,让我们能通过几行简单的代码在生命周期的某个位置执行一些操作
1.内置信号
request_started = _signals.signal('request-started') # 请求到来前执行
request_finished = _signals.signal('request-finished') # 请求结束后执行 before_render_template = _signals.signal('before-render-template') # 模板渲染前执行
template_rendered = _signals.signal('template-rendered') # 模板渲染后执行 got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行 request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否) appcontext_pushed = _signals.signal('appcontext-pushed') # 应用上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped') # 应用上下文pop时执行
message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发
源码示例
class Flask(_PackageBoundObject): def full_dispatch_request(self): self.try_trigger_before_first_request_functions()
try:
# ############### 触发request_started 信号 ###############
request_started.send(self)
rv = self.preprocess_request()
if rv is None:
rv = self.dispatch_request()
except Exception as e:
rv = self.handle_user_exception(e)
response = self.make_response(rv)
response = self.process_response(response) # ############### request_finished 信号 ###############
request_finished.send(self, response=response)
return response def wsgi_app(self, environ, start_response): ctx = self.request_context(environ)
ctx.push()
error = None
try:
try:
response = self.full_dispatch_request()
except Exception as e:
error = e
response = self.make_response(self.handle_exception(e))
return response(environ, start_response)
finally:
if self.should_ignore_error(error):
error = None
ctx.auto_pop(error)
request_started
def render_template(template_name_or_list, **context):
"""Renders a template from the template folder with the given
context. :param template_name_or_list: the name of the template to be
rendered, or an iterable with template names
the first one existing will be rendered
:param context: the variables that should be available in the
context of the template.
"""
ctx = _app_ctx_stack.top
ctx.app.update_template_context(context)
return _render(ctx.app.jinja_env.get_or_select_template(template_name_or_list),
context, ctx.app) def _render(template, context, app):
"""Renders the template and fires the signal""" # ############### before_render_template 信号 ###############
before_render_template.send(app, template=template, context=context)
rv = template.render(context) # ############### template_rendered 信号 ###############
template_rendered.send(app, template=template, context=context)
return rv before_render_template
before_render_template
class Flask(_PackageBoundObject): def handle_exception(self, e): exc_type, exc_value, tb = sys.exc_info() # ############### got_request_exception 信号 ###############
got_request_exception.send(self, exception=e)
handler = self._find_error_handler(InternalServerError()) if self.propagate_exceptions:
# if we want to repropagate the exception, we can attempt to
# raise it with the whole traceback in case we can do that
# (the function was actually called from the except part)
# otherwise, we just raise the error again
if exc_value is e:
reraise(exc_type, exc_value, tb)
else:
raise e self.log_exception((exc_type, exc_value, tb))
if handler is None:
return InternalServerError()
return handler(e) def wsgi_app(self, environ, start_response): ctx = self.request_context(environ)
ctx.push()
error = None
try:
try:
response = self.full_dispatch_request()
except Exception as e:
error = e
# 这里这里这里这里这里这里这里这里这里这里这里这里 #
response = self.make_response(self.handle_exception(e))
return response(environ, start_response)
finally:
if self.should_ignore_error(error):
error = None
ctx.auto_pop(error) got_request_exception
got_request_exception
class AppContext(object):
def push(self):
"""Binds the app context to the current context."""
self._refcnt += 1
if hasattr(sys, 'exc_clear'):
sys.exc_clear()
_app_ctx_stack.push(self)
# ############## 触发 appcontext_pushed 信号 ##############
appcontext_pushed.send(self.app) def pop(self, exc=_sentinel):
"""Pops the app context."""
try:
self._refcnt -= 1
if self._refcnt <= 0:
if exc is _sentinel:
exc = sys.exc_info()[1]
# ############## 触发 appcontext_tearing_down 信号 ##############
self.app.do_teardown_appcontext(exc)
finally:
rv = _app_ctx_stack.pop()
assert rv is self, 'Popped wrong app context. (%r instead of %r)' \
% (rv, self) # ############## 触发 appcontext_popped 信号 ##############
appcontext_popped.send(self.app) class RequestContext(object):
def push(self):
top = _request_ctx_stack.top
if top is not None and top.preserved:
top.pop(top._preserved_exc) app_ctx = _app_ctx_stack.top
if app_ctx is None or app_ctx.app != self.app: # ####################################################
app_ctx = self.app.app_context()
app_ctx.push()
self._implicit_app_ctx_stack.append(app_ctx)
else:
self._implicit_app_ctx_stack.append(None) if hasattr(sys, 'exc_clear'):
sys.exc_clear() _request_ctx_stack.push(self) # Open the session at the moment that the request context is
# available. This allows a custom open_session method to use the
# request context (e.g. code that access database information
# stored on `g` instead of the appcontext).
self.session = self.app.open_session(self.request)
if self.session is None:
self.session = self.app.make_null_session() class Flask(_PackageBoundObject): def wsgi_app(self, environ, start_response): ctx = self.request_context(environ)
ctx.push()
error = None
try:
try:
response = self.full_dispatch_request()
except Exception as e:
error = e
response = self.make_response(self.handle_exception(e))
return response(environ, start_response)
finally:
if self.should_ignore_error(error):
error = None
ctx.auto_pop(error) def pop(self, exc=_sentinel):
app_ctx = self._implicit_app_ctx_stack.pop() try:
clear_request = False
if not self._implicit_app_ctx_stack:
self.preserved = False
self._preserved_exc = None
if exc is _sentinel:
exc = sys.exc_info()[1] # ################## 触发 request_tearing_down 信号 ##################
self.app.do_teardown_request(exc) # If this interpreter supports clearing the exception information
# we do that now. This will only go into effect on Python 2.x,
# on 3.x it disappears automatically at the end of the exception
# stack.
if hasattr(sys, 'exc_clear'):
sys.exc_clear() request_close = getattr(self.request, 'close', None)
if request_close is not None:
request_close()
clear_request = True
finally:
rv = _request_ctx_stack.pop() # get rid of circular dependencies at the end of the request
# so that we don't require the GC to be active.
if clear_request:
rv.request.environ['werkzeug.request'] = None # Get rid of the app as well if necessary.
if app_ctx is not None:
# ####################################################
app_ctx.pop(exc) assert rv is self, 'Popped wrong request context. ' \
'(%r instead of %r)' % (rv, self) def auto_pop(self, exc):
if self.request.environ.get('flask._preserve_context') or \
(exc is not None and self.app.preserve_context_on_exception):
self.preserved = True
self._preserved_exc = exc
else:
self.pop(exc)
request_tearing_down
def flash(message, category='message'):
"""Flashes a message to the next request. In order to remove the
flashed message from the session and to display it to the user,
the template has to call :func:`get_flashed_messages`. .. versionchanged:: 0.3
`category` parameter added. :param message: the message to be flashed.
:param category: the category for the message. The following values
are recommended: ``'message'`` for any kind of message,
``'error'`` for errors, ``'info'`` for information
messages and ``'warning'`` for warnings. However any
kind of string can be used as category.
"""
# Original implementation:
#
# session.setdefault('_flashes', []).append((category, message))
#
# This assumed that changes made to mutable structures in the session are
# are always in sync with the session object, which is not true for session
# implementations that use external storage for keeping their keys/values.
flashes = session.get('_flashes', [])
flashes.append((category, message))
session['_flashes'] = flashes # ############### 触发 message_flashed 信号 ###############
message_flashed.send(current_app._get_current_object(),
message=message, category=category)
message_flashed
自定义信号
# -*- coding: utf-8 -*- """
@Datetime: 2019/1/7
@Author: Zhang Yafei
"""
from flask import Flask, current_app, flash, render_template
from flask.signals import _signals app = Flask(import_name=__name__) # 自定义信号
xxxxx = _signals.signal('xxxxx') def func(sender, *args, **kwargs):
print(sender) # 自定义信号中注册函数
xxxxx.connect(func) @app.route("/x")
def index():
# 触发信号
xxxxx.send('', k1='v1')
return 'Index' if __name__ == '__main__':
app.run()
自定义信号
重点:
1. 上下文管理
当请求到来的时候,
flask会把请求相关和session相关信息封装到一个ctx = RequestContext对象中,
会把app和g封装到app_ctx = APPContext对象中去,
通过localstack对象将ctx、app_ctx对象封装到local对象中 问题:
local是什么?作用?
为每个线程创建一个独立的空间,使得线程对自己的空间中的数据进行操作(数据隔离)
localstack是什么?作用
storage = {
1234:{stack:[ctx,]}
}
localstack将local中的stack中的维护成一个栈
获取数据
通过localproxy对象+偏函数,调用localstack去local中获取响应ctx、app_ctx中封装值
问题:为什么要把ctx = request/session app_ctx = app/g
因为离线脚本需要使用app_ctx
请求结束
调用localstk的pop方法,将ctx和app_ctx移除
2.程序中有多少个local和localstack?
请求localstack,帮助开发在对stack对应列表操作
请求local = {
1212:{
stack:[ctx,]
}
}
应用localstack,帮助开发在对stack对应列表操作
应用local = {
1212:{
stack:[app_ctx,]
}
} 3.为什么要创建两个local和两个localstack?
- 编写离线脚本时,需要配置文件,而配置文件存放在app中。
- 编写离线脚本时,不需要请求相关的数据 所以,将app和请求相关的数据分开:
应用上下文(app,g)
请求上下文(request,session) 4. Local中作用?
为每个协程创建一个独立的空间,使得协程对自己的空间中的数据进行操作(数据隔离)
类似于threading.local的作用,但是是他升级版(greentlet.get_currt())
__storage__ = {
1231:{},
1232:{},
}
5.LocalStack作用?
将local中存储数据的列表stack维护成一个栈。协程标记作为字典的key,字典里面包含key为stack的字典,
value为一个列表,localstack的作用就是将这个列表维护成一个stack.
__storage__ = {
1231:{stack:[],},
1232:{stack:[],},
}
6.为什么要维护成一个栈?
web运行时(web runtime):
请求Local = {
1111:{
stack:[ctx1,]
},
1112:{
stack:[ctx2,]
},
1113:{
stack:[ctx3,]
}
} 应用Local = {
1111:{
stack:[app_ctx1,]
}
1112:{
stack:[app_ctx2,]
},
1113:{
stack:[app_ctx3,]
}
}
多app离线脚本:
from flask import current_app
app1 = create_app()
app_ctx1 = app1.app_context() # app_ctx = app/g app2 = create_app()
app_ctx2 = app2.app_context() # app_ctx = app/g with app_ctx1: # __enter__,通过LocalStack放入Local中
print(current_app) # app1
with app_ctx2:
print(current_app) # app2 print(current_app) # app1 """
请求Local = {
} 应用Local = {
1111:{
stack:[app_ctx1,app_ctx2]
}
}
"""
7. threading.local作用?
为每个线程创建一个独立的空间,使得线程对自己的空间中的数据进行操作(数据隔离)
8. LocalProxy作用?
执行视图函数的时候,LocalProxy通过Localstack从local中取数据
9. g的作用?
类似每次请求中的全局变量,但要注意的是g只存在于一次请求中,请求结束之后就销毁了。
10.为什么要导入request,就可以使用?
每次执行request.xx方法时,会触发LocalProxy对象的__getattr__等方法,由方法每次动态的使用LocalStack去Local中获取数据。
10. before_request、after_request 实现原理?
将视图函数添加到一个列表中,然后循环执行其中的函数,after_request将列表reverse一下。
11. 对面向对象的理解?
python支持函数式编程和面向对象编程。java和c#只支持面向对象编程。
基础:谈面向对象就要从他的三大特性开始说起,如:封装、继承、多态。
封装:
- 方法封装到来类中:某一类功能相似的方法
class File:
def file_add():pass def file_update():pass def file_del():pass def file_fetch():pass - 数据封装到对象中
class File:
def __init__(self,name,age,email):
self.name = name
self.age = age
self.email = email
def file_add():pass def file_update():pass def file_del():pass def file_fetch():pass obj1 = File('oldboy',19,"asdf@live.com")
obj2 = File('oldboy1',119,"asdf12@live.com") 应用:
- session/request封装到了RequestContext对象中
- app/g封装到了AppContext中 继承:如果多个类中有相同的方法,为了避免重复编写,可以将其放在父类(基类)中。
python支持多继承,继承顺序按__mro__的顺序执行,新式类按广度优先,旧式类类广度优先,Python3都是新式类。先执行左边,在执行右边 class Base(object):
def xxxx():pass class File(Base):
def __init__(self,name,age,email):
self.name = name
self.age = age
self.email = email
def file_add():pass def file_update():pass def file_del():pass def file_fetch():pass class DB(Base):
def db_add():pass def db_update():pass def db_del():pass def xxxx():pass def db_fetch():pass 应用:
rest framework中的视图类的继承 多态(鸭子模型):天生支持多态,对于参数来说可以传入任何类型的对象,只要保证有想要的send方法即可。 class Msg(object):
def send():
pass class WX(object):
def send():
pass def func(arg):
arg.send() 进阶:
__init__,初始化
__new__,创建对象
__call__,对象()
__getattr__,对象.xx 且xx不存在
__getattribute__, 对象.xx xx为任何属性
__setattr__. 对象.xx = yy
__delattr__, del 对象.xx
__setitem__,对象['xx'] = yy
__getitem__, 对象['xx']
__delitem__, del 对象['xx'] __mro__,查找成员顺序
__str__, 对象返回值
__repr__,
__iter__, 实现此方法,且返回一个迭代器,此对象可迭代
__dict__, 类的成员
__add__, 类 + xx
__del__, 对象的生命周期结束之后 高级:metaclass
1. 类创建
class Foo(object):pass Foo = type('Foo',(object,),{})
2. 如何指定类由自定义type创建?
class MyType(type):
pass class Foo(object,metaclass=MyType):
# __metaclass__ = MyType # py2
pass Foo = MyType('Foo',(object,),{})
3. 默认执行顺序 class Foo(object,metaclass=MyType):
pass obj = Foo() class MyType(type):
def __init__(self,*args,**kwargs):
print('')
super(MyType,self).__init__(*args,**kwargs) class Base(object, metaclass=MyType):
pass class Foo(Base):
pass 如果一类自己或基类中指定了metaclass,那么该类就是由metaclass指定的type或mytype创建。 同:
class MyType(type):
def __init__(self,*args,**kwargs):
print('')
super(MyType,self).__init__(*args,**kwargs) # class Base(object, metaclass=MyType):
# pass Base = MyType('Base',(object,),{}) class Foo(Base):
pass
同:
class MyType(type):
def __init__(self,*args,**kwargs):
print('')
super(MyType,self).__init__(*args,**kwargs) # class Base(object, metaclass=MyType):
# pass
def with_metaclass(arg):
Base = MyType('Base',(arg,),{})
return Base class Foo(with_metaclass(object)):
pass
flask常见问题