内容概要:
1.flask
- 蓝图
- 中间件
- 闪现
2.扩展
- session
- wtfrom
3.上下文管理
- local-threading
4.websocket
- 轮训
- 长轮训
- websocket
一.谈谈你对面向对象的理解
1.三大特性,继承,封装,多态(初级水平)
封装分两种:
(1).函数的封装
class db1():
def func():
pass
def func2():
pass
(2).数据的封装(一个类的对象)
2.__str__...方法
__str__,
__new__,
__init__,
__getattr__,
__setattr__,
__delattr__,
__getitem__,
__setitem__,
__delitem__,
__enter__,
__exit__,
__del__,
__call__,
__dict__,
这些方法有特有的执行场景,类加()执行__init__方法,对象加(),__call__方法....
****一个对象和一个对象能不能相加、减、乘、除
可以 加用__add__方法
class Foo(object):
def __add__(self, other):
pass obj1 = Foo()
obj2 = Foo() res = obj1 + obj2
3.metaclass
1.创建类的两种方法
#创建类的方式一
class Foo(object):
pass #创建类的方式二
#第一个参数是类名,第二个元祖是继承谁,第三个字典是构造字段
Bar = type("MyFoo",(object,),{})
g = Bar()
print(g)
2.验证类是由type创建
class Mytype(type):
def __init__(self,*args,**kwargs):
print("from my type")
super(Mytype,self).__init__(*args,**kwargs) class Foo(metaclass=Mytype):
pass g = Foo() """result
from my type
"""
二、flask-蓝图
1.目录结构:
2.__init__.py
from flask import Flask app = Flask(__name__) from .views import account
from .views import user
app.register_blueprint(account.ac) #注册之后可以进行路由分发
app.register_blueprint(user.us) """
这里特殊的装饰器,是全局app,每个function都需要经过
@app.before_request
def check_login():
print('.....')
"""
3.user.py
from flask import Blueprint us = Blueprint('us',__name__,url_prefix='/xx') #生成蓝图对象,url_prefix是前缀,加上之后请求index就变为/xx/index #这里是局部的装饰器,用于特定视图需要权限等
# @us.before_request
# def check_login():
# print('.....') @us.route('/index')
def index():
return 'index'
4.manage.py
from pro_flask import app if __name__ == '__main__':
app.run()
三、flask-闪现
1.用于1次请求之后删除,基于session,取的时候用pop
from flask import Blueprint,redirect,request,flash,get_flashed_messages ac = Blueprint('ac',__name__) @ac.route('/login')
def login():
flash("登录成功1",category="x1") #category多了一层分组
flash("登录成功2",category="x2")
return redirect("/logout") @ac.route('/logout')
def logout():
res = get_flashed_messages(category_filter="x1") #默认不加category_filter取所有
print(res)
return 'logout'
2.get_flask_message源码
def get_flashed_messages(with_categories=False, category_filter=[]): flashes = _request_ctx_stack.top.flashes
if flashes is None:
#存入local里的stack里对象的flashes
_request_ctx_stack.top.flashes = flashes = session.pop('_flashes') \
if '_flashes' in session else []
if category_filter:
flashes = list(filter(lambda f: f[0] in category_filter, flashes))
if not with_categories:
return [x[1] for x in flashes]
return flashes
三、flask-middleware
from flask import Flask
app = Flask(__name__) @app.route('/user',methods=['GET','POST'],endpoint='xxx')
def user():
return "login" class MiddleWare(object): #主要理解对象加括号执行__call__方法
def __init__(self,old_wsgi_app): self.old_wsgi_app = old_wsgi_app def __call__(self, *args, **kwargs):
#这是个时候还没有request
print("我做一些数据库连接check,或者清理缓存操作")
return self.old_wsgi_app(*args,**kwargs) if __name__ == '__main__':
app.wsgi_app = MiddleWare(app.wsgi_app)
app.run("0.0.0.0",9999)
四、flask-session
from flask import Flask,session
app = Flask(__name__) #如下几行操作就成功将session写入redis中了
from flask.ext.session import Session
from redis import Redis
app.config["SESSION_TYPE"] = 'redis'
app.config["SESSION_REDIS"] = Redis(host='192.16.1.1',port="6379",)
Session(app) @app.route('/user',methods=['GET','POST'],endpoint='xxx')
def user():
return "login" if __name__ == '__main__':
app.run("0.0.0.0",9999)
源码剖析:
#session_interface = RedisSessionInterface() #程序刚开始加载时候执行
def _get_interface(self, app):
config = app.config.copy()
config.setdefault('SESSION_TYPE', 'null')
config.setdefault('SESSION_PERMANENT', True)
config.setdefault('SESSION_USE_SIGNER', False)
config.setdefault('SESSION_KEY_PREFIX', 'session:')
config.setdefault('SESSION_REDIS', None)
config.setdefault('SESSION_MEMCACHED', None)
config.setdefault('SESSION_FILE_DIR',
os.path.join(os.getcwd(), 'flask_session'))
config.setdefault('SESSION_FILE_THRESHOLD', 500)
config.setdefault('SESSION_FILE_MODE', 384)
config.setdefault('SESSION_MONGODB', None)
config.setdefault('SESSION_MONGODB_DB', 'flask_session')
config.setdefault('SESSION_MONGODB_COLLECT', 'sessions')
config.setdefault('SESSION_SQLALCHEMY', None)
config.setdefault('SESSION_SQLALCHEMY_TABLE', 'sessions') if config['SESSION_TYPE'] == 'redis':
session_interface = RedisSessionInterface(
config['SESSION_REDIS'], config['SESSION_KEY_PREFIX'],
config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT'])
elif config['SESSION_TYPE'] == 'memcached':
session_interface = MemcachedSessionInterface(
config['SESSION_MEMCACHED'], config['SESSION_KEY_PREFIX'],
config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT'])
elif config['SESSION_TYPE'] == 'filesystem':
session_interface = FileSystemSessionInterface(
config['SESSION_FILE_DIR'], config['SESSION_FILE_THRESHOLD'],
config['SESSION_FILE_MODE'], config['SESSION_KEY_PREFIX'],
config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT'])
elif config['SESSION_TYPE'] == 'mongodb':
session_interface = MongoDBSessionInterface(
config['SESSION_MONGODB'], config['SESSION_MONGODB_DB'],
config['SESSION_MONGODB_COLLECT'],
config['SESSION_KEY_PREFIX'], config['SESSION_USE_SIGNER'],
config['SESSION_PERMANENT'])
elif config['SESSION_TYPE'] == 'sqlalchemy':
session_interface = SqlAlchemySessionInterface(
app, config['SESSION_SQLALCHEMY'],
config['SESSION_SQLALCHEMY_TABLE'],
config['SESSION_KEY_PREFIX'], config['SESSION_USE_SIGNER'],
config['SESSION_PERMANENT'])
else:
session_interface = NullSessionInterface() return session_interface
程序执行session["xx"]=123时候先执行session_interface的open_session方法:
def open_session(self, app, request):
sid = request.cookies.get(app.session_cookie_name)
if not sid:
sid = self._generate_sid() #根据uuid生成一个随机字符串
#第一次登陆进,返回{"sid":"asdasdada","xx":123}
return self.session_class(sid=sid, permanent=self.permanent)
if self.use_signer:
signer = self._get_signer(app)
if signer is None:
return None
try:
sid_as_bytes = signer.unsign(sid)
sid = sid_as_bytes.decode()
except BadSignature:
sid = self._generate_sid()
return self.session_class(sid=sid, permanent=self.permanent) if not PY2 and not isinstance(sid, text_type):
sid = sid.decode('utf-8', 'strict')
val = self.redis.get(self.key_prefix + sid)
if val is not None:
try:
data = self.serializer.loads(val)
return self.session_class(data, sid=sid)
except:
return self.session_class(sid=sid, permanent=self.permanent)
return self.session_class(sid=sid, permanent=self.permanent)
程序返回之前会执行save_session动作:
def save_session(self, app, session, response):
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
if not session:
if session.modified:
self.redis.delete(self.key_prefix + session.sid)
response.delete_cookie(app.session_cookie_name,
domain=domain, path=path)
return
httponly = self.get_cookie_httponly(app)
secure = self.get_cookie_secure(app)
expires = self.get_expiration_time(app, session)
val = self.serializer.dumps(dict(session))
self.redis.setex(name=self.key_prefix + session.sid, value=val,
time=total_seconds(app.permanent_session_lifetime)) #往redis里写数据ex为过期时间
if self.use_signer:
session_id = self._get_signer(app).sign(want_bytes(session.sid))
else:
session_id = session.sid
#最后将随机字符串写到cookie里
response.set_cookie(app.session_cookie_name, session_id,
expires=expires, httponly=httponly,
domain=domain, path=path, secure=secure)
五、上下文管理
1.threading.local
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading #本地线程,保证即使是多个线程,自己的值也是互相隔离。
local_values = threading.local() def func(num):
#创建数据库连接
local_values.name = num
import time
time.sleep(1)
print(local_values.name, threading.current_thread().name) for i in range(20):
th = threading.Thread(target=func, args=(i,), name='线程%s' % i)
th.start()
2.自定义local类实现本地线程,也可以用setitem
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
from threading import get_ident #通过自定义模拟本地线程原理
class Local(object):
def __init__(self):
object.__setattr__(self, 'storage', {})
#self.storage = {} #这样设置就会产生递归,触发settattr def __setattr__(self, key, value):
ident = get_ident()
if ident in self.storage:
self.storage[ident][key] = value
else:
self.storage[ident] = {key: value} def __getattr__(self, item):
ident = get_ident()
return self.storage[ident][item] obj = Local()
def func(num):
#创建数据库连接
obj.value = num
import time
time.sleep(1)
print(obj.value, threading.current_thread().name) for i in range(20):
th = threading.Thread(target=func, args=(i,), name='线程%s' % i)
th.start()
3.带协程判断加入
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
from threading import get_ident try: #协程获取唯一id方法
from greenlet import getcurrent as get_ident
except ImportError:
try:
from thread import get_ident
except ImportError:
from _thread import get_ident #通过自定义模拟本地线程原理
class Local(object):
def __init__(self):
object.__setattr__(self, 'storage', {})
#self.storage = {} #这样设置就会产生递归,触发settattr def __setattr__(self, key, value):
ident = get_ident()
if ident in self.storage:
self.storage[ident][key] = value
else:
self.storage[ident] = {key: value} def __getattr__(self, item):
ident = get_ident()
return self.storage[ident][item] obj = Local()
def func(num):
#创建数据库连接
obj.value = num
import time
time.sleep(1)
print(obj.value, threading.current_thread().name) for i in range(20):
th = threading.Thread(target=func, args=(i,), name='线程%s' % i)
th.start()