协程 (Coroutine)
什么是协程
协程(微线程)是比线程更轻量化的存在,像一个进程可以拥有多个线程一样,一个线程也可以拥有多个协程
最重要的是,协程不是被操作系统内核所管理,而完全是由程序所控制
如何判断
- 必须在只有一个单线程里实现并发
- 修改共享数据不需加锁
- 用户程序里自己保存多个控制流的上下文栈
- 一个协程遇到 IO 操作自动切换到其它协程
协程的好处:
- 无需线程上下文切换的开销
- 无需原子操作锁定及同步的开销
"原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。 - 方便切换控制流,简化编程模型
- 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
缺点:
- 无法利用多核资源:协程的本质是个单线程,它不能同时将单个 CPU 的多个核用上,协程需要和进程配合才能运行在多 CPU 上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是 CPU 集型应用。
- 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
greenlet
greenlet 通过 greenlet(func)
启动一个协程,通过 switch()
手动切换协程
示例:
from greenlet import greenlet
def func1():
print('from func1: 1')
greenlet.switch(gr2)
print('from func1: 2')
greenlet.switch(gr2)
def func2():
print('from func2: 1')
greenlet.switch(gr1)
print('from func2: 2')
gr1 = greenlet(func1)
gr2 = greenlet(func2)
greenlet.switch(gr1)
输出结果:
from func1: 1
from func2: 1
from func1: 2
from func2: 2
gevent
gevent 封装了 greenlet,并实现了遇到 IO 自动切换
通过 gevent.spawn(func)
创建一个要执行 func 的 gevent 类,用 gevent.joinall()
等待执行完成
注意: gevent.sleep()
是用于模仿 IO 操作的,实际使用中不需要 gevent.sleep()
示例:
import gevent
def func1():
print('from func1: 1')
gevent.sleep(0)
print('from func1: 2')
gevent.sleep(1)
def func2():
print('from func2: 1')
gevent.sleep(2)
print('from func2: 2')
def func3():
print('from func3: 1')
gevent.sleep(1)
print('from func3: 2')
gevent.joinall([
gevent.spawn(func1),
gevent.spawn(func2),
gevent.spawn(func3),
])
输出结果:
from func1: 1
from func2: 1
from func3: 1
from func1: 2
from func3: 2
from func2: 2
通过运行结果可以看出:每次 sleep 都会自动切换
实际使用示例
注意: 如果不使用 monkey.patch_all()
就无法自动识别 IO 操作,无法自动切换,变成同步执行
import gevent
import time
from gevent import monkey
from urllib import request
monkey.patch_all() # 把当前程序的所有 IO 操作标记起来,否则模块无法知道 IO 操作
def func(url):
print('GET:', url)
resp = request.urlopen(url)
data = resp.read()
print('%i bytes received from %s' % (len(data), url))
urls = [
'http://www.python.org/',
'http://github.com/',
'http://cnblogs.com/dbf-/',
]
time_start = time.time()
for item in urls:
func(item)
print('同步耗时:', time.time() - time_start)
async_time_start = time.time()
gevent.joinall([
gevent.spawn(func, 'http://www.python.org/'),
gevent.spawn(func, 'http://www.github.com/'),
gevent.spawn(func, 'http://cnblogs.com/dbf-/'),
])
print('异步耗时:', time.time() - async_time_start)
通过结果可以看出异步明显更快
socket 并发连接
server:
import gevent
from gevent import socket, monkey
monkey.patch_all()
def server(port):
s = socket.socket()
s.bind(('0.0.0.0', port))
s.listen(500)
while True:
cli, addr = s.accept()
gevent.spawn(handle_request, cli)
def handle_request(conn):
try:
while True:
data = conn.recv(1024)
print("recv:", data)
conn.send(data)
if not data:
conn.shutdown(socket.SHUT_WR)
except Exception as ex:
print(ex)
finally:
conn.close()
if __name__ == '__main__':
server(8001)
client:
import socket
HOST = 'localhost'
PORT = 8001
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
msg = bytes(input(">>:"), encoding="utf8")
s.sendall(msg)
data = s.recv(1024)
print('Received', repr(data))
# s.close()