前言
从语法上来看,协程和生成器类似,都是定义体中包含yield关键字的函数。
yield在协程中的用法:
- 在协程中yield通常出现在表达式的右边,例如:datum = yield,可以产出值,也可以不产出--如果yield关键字后面没有表达式,那么生成器产出none.
- 协程可能从调用方接受数据,调用方是通过send(datum)的方式把数据提供给协程使用,而不是next(...)函数,通常调用方会把值推送给协程。
- 协程可以把控制器让给中心调度程序,从而激活其他的协程
所以总体上在协程中把yield看做是控制流程的方式。
在前一篇《一文彻底搞懂python可迭代(iterable)、迭代器(iterator)和生成器(generator)的概念》 的文中,知道生成器(generator)可由以下两种方式定义:
- 列表生成器
- 使用yield定义的函数
在python早期的版本中协程也是通过生成器来实现的,也就是基于生成器的协程(generator-based coroutines)。在前一篇介绍生成器的文章末尾举了一个生产者-消费者的例子,就是基于生成器的协程来实现的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
def producer(c):
n = 0
while n < 5 :
n + = 1
print ( 'producer {}' . format (n))
r = c.send(n)
print ( 'consumer return {}' . format (r))
def consumer():
r = ''
while true:
n = yield r
if not n:
return
print ( 'consumer {} ' . format (n))
r = 'ok'
if __name__ = = '__main__' :
c = consumer()
next (c) # 启动consumer
producer(c)
|
看了这段代码,相信很多初学者和我一样对基于生成器的协程实现其实很难马上就能够根据业务写出自己的协程代码。python实现者们也注意到这个问题,因为它太不pythonic了。而基于生成器的协程也将被废弃,因此本文将重点介绍asyncio包的使用,以及涉及到的一些相关类概念。
注:我使用的python环境是3.7。
0x00 何为协程(coroutine)
协程(coroutine)是在线程中执行的,可理解为微线程,但协程的切换没有上下文的消耗,它比线程更加轻量些。一个协程可以随时中断自己让另一个协程开始执行,也可以从中断处恢复并继续执行,它们之间的调度是由程序员来控制的(可以看本文开篇处生产者-消费者的代码)。
定义一个协程
在python3.5+版本新增了aysnc和await关键字,这两个语法糖让我们非常方便地定义和使用协程。
在函数定义时用async声明就定义了一个协程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import asyncio
# 定义了一个简单的协程
async def simple_async():
print ( 'hello' )
await asyncio.sleep( 1 ) # 休眠1秒
print ( 'python' )
# 使用asynio中run方法运行一个协程
asyncio.run(simple_async())
# 执行结果为
# hello
# python
|
在协程中如果要调用另一个协程就使用await。要注意await关键字要在async定义的函数中使用,而反过来async函数可以不出现await
1
2
3
4
5
6
7
8
|
# 定义了一个简单的协程
async def simple_async():
print ( 'hello' )
asyncio.run(simple_async())
# 执行结果
# hello
|
asyncio.run()将运行传入的协程,负责管理asyncio事件循环。
除了run()方法可直接执行协程外,还可以使用事件循环loop
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
async def do_something(index):
print (f 'start {time.strftime("%x")}' , index)
await asyncio.sleep( 1 )
print (f 'finished at {time.strftime("%x")}' , index)
def test_do_something():
# 生成器产生多个协程对象
task = [do_something(i) for i in range ( 5 )]
# 获取一个事件循环对象
loop = asyncio.get_event_loop()
# 在事件循环中执行task列表
loop.run_until_complete(asyncio.wait(task))
loop.close()
test_do_something()
# 运行结果
# start 00:04:03 3
# start 00:04:03 4
# start 00:04:03 1
# start 00:04:03 2
# start 00:04:03 0
# finished at 00:04:04 3
# finished at 00:04:04 4
# finished at 00:04:04 1
# finished at 00:04:04 2
# finished at 00:04:04 0
|
可以看出几乎同时启动了所有的协程。
其实翻阅源码可知asyncio.run()的实现也是封装了loop对象及其调用。而asyncio.run()每次都会创建一个新的事件循环对象用于执行协程。
0x01 awaitable对象
在python中可等待(awaitable)对象有:协程(corountine)、任务(task)、future。即这些对象可以使用await关键字进行调用
1
|
await awaitable_object
|
1. 协程(coroutine)
协程由async def声明定义,一个协程可由另一个协程使用await进行调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
async def nested():
print ( 'in nested func' )
return 13
async def outer():
# 要使用await 关键字 才会执行一个协程函数返回的协程对象
print (await nested())
asyncio.run(outer())
# 执行结果
# in nested func
# 13
|
如果在outer()方法中直接调用nested()而不使用await,将抛出一个runtimewarning
1
2
3
4
5
|
async def outer():
# 直接调用协程函数不会发生执行,只是返回一个 coroutine 对象
nested()
asyncio.run(outer())
|
运行程序,控制台将输出以下信息
runtimewarning: coroutine 'nested' was never awaited
nested()
runtimewarning: enable tracemalloc to get the object allocation traceback
2. 任务(task)
任务(task)是可以用来并发地执行协程。可以使用asyncio.create_task()将一个协程对象封装成任务,该任务将很快被排入调度队列并执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
async def nested():
print ( 'in nested func' )
return 13
async def create_task():
# create_task 将一个协程对象打包成一个 任务时,该协程就会被自动调度运行
task = asyncio.create_task(nested())
# 如果要看到task的执行结果
# 可以使用await等待协程执行完成,并返回结果
ret = await task
print (f 'nested return {ret}' )
asyncio.run(create_task())
# 运行结果
# in nested func
# nested return 13
|
注:关于并发下文还会详细说明。
3. future
future是一种特殊的低层级(low-level)对象,它是异步操作的最终结果(eventual result)。
当一个 future 对象 被等待,这意味着协程将保持等待直到该 future 对象在其他地方操作完毕。
通常在应用层代码不会直接创建future对象。在某些库和asyncio模块中的会使用到该对象。
1
2
|
async def used_future_func():
await function_that_returns_a_future_object()
|
0x02 并发
1. task
前面我们知道task可以并发地执行。 asyncio.create_task()就是一个把协程封装成task的方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
async def do_after(what, delay):
await asyncio.sleep(delay)
print (what)
# 利用asyncio.create_task创建并行任务
async def corun():
task1 = asyncio.create_task(do_after( 'hello' , 1 )) # 模拟执行1秒的任务
task2 = asyncio.create_task(do_after( 'python' , 2 )) # 模拟执行2秒的任务
print (f 'started at {time.strftime("%x")}' )
# 等待两个任务都完成,两个任务是并行的,所以总时间两个任务中最大的执行时间
await task1
await task2
print (f 'finished at {time.strftime("%x")}' )
asyncio.run(corun())
# 运行结果
# started at 23:41:08
# hello
# python
# finished at 23:41:10
|
task1是一个执行1秒的任务,task2是一个执行2秒的任务,两个任务并发的执行,总共消耗2秒。
2. gather
除了使用asyncio.create_task()外还可以使用asyncio.gather(),这个方法接收协程参数列表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
async def do_after(what, delay):
await asyncio.sleep(delay)
print (what)
async def gather():
print (f 'started at {time.strftime("%x")}' )
# 使用gather可将多个协程传入
await asyncio.gather(
do_after( 'hello' , 1 ),
do_after( 'python' , 2 ),
)
print (f 'finished at {time.strftime("%x")}' )
asyncio.run(gather())
# 运行结果
# started at 23:47:50
# hello
# python
# finished at 23:47:52
|
两个任务消耗的时间为其中消耗时间最长的任务。
0x03 引用
docs.python.org/3/library/a…
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对服务器之家的支持。
原文链接:https://juejin.im/post/5ccf0d18e51d453b557dc340