Python 协程总结
理解
协程,又称为微线程,看上去像是子程序,但是它和子程序又不太一样,它在执行的过程中,可以在中断当前的子程序后去执行别的子程序,再返回来执行之前的子程序,但是它的相关信息还是之前的。
优点:
- 极高的执行效率,因为子程序切换而不是线程切换,没有了线程切换的开销;
- 不需要多线程的锁机制,因为只有一个线程在执行;
如果要充分利用CPU多核,可以通过使用多进程+协程的方式
使用
打开asyncio的源代码,可以发现asyncio中的需要用到的文件如下:
下面的则是接下来要总结的文件
文件 | 解释 |
---|---|
base_events | 基础的事件,提供了BaseEventLoop事件 |
coroutines | 提供了封装成协程的类 |
events | 提供了事件的抽象类,比如BaseEventLoop继承了AbstractEventLoop |
futures | 提供了Future类 |
tasks | 提供了Task类和相关的方法 |
coroutines
函数 | 解释 |
---|---|
coroutine(func) | 为函数加上装饰器 |
iscoroutinefunction(func) | 判断函数是否使用了装饰器 |
iscoroutine(obj) | 判断该对象是否是装饰器 |
如果在函数使用了coroutine
装饰器,就可以通过yield from
去调用async def
声明的函数,如果已经使用async def
声明,就没有必要再使用装饰器了,这两个功能是一样的。
import asyncio
@asyncio.coroutine
def hello_world():
print("Hello World!")
async def hello_world2():
print("Hello World2!")
print('------hello_world------')
print(asyncio.iscoroutinefunction(hello_world))
print('------hello_world2------')
print(asyncio.iscoroutinefunction(hello_world2))
print('------event loop------')
loop = asyncio.get_event_loop()
# 一直阻塞该函数调用到函数返回
loop.run_until_complete(hello_world())
loop.run_until_complete(hello_world2())
loop.close()
上面的代码分别使用到了coroutine
装饰器和async def
,其运行结果如下:
------hello_world------True------hello_world2------True------event loop------Hello World!Hello World2!
注意:不可以直接调用协程,需要一个event loop
去调用。
如果想要在一个函数中去得到另外一个函数的结果,可以使用yield from
或者await
,例子如下:
import asyncio
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
函数print_sum
会一直等到函数compute
返回结果,执行过程如下:
base_events
这个文件里面漏出来的只有BaseEventLoop
一个类,它的相关方法如下:
函数 | 解释 |
---|---|
create_future() | 创建一个future对象并且绑定到事件上 |
create_task() | 创建一个任务 |
run_forever() | 除非调用stop,否则事件会一直运行下去 |
run_until_complete(future) | 直到future对象执行完毕,事件才停止 |
stop() | 停止事件 |
close() | 关闭事件 |
is_closed() | 判断事件是否关闭 |
time() | 返回事件运行时的时间 |
call_later(delay, callback, *args) | 设置一个回调函数,并且可以设置延迟的时间 |
call_at(when, callback, *args) | 同上,但是设置的是绝对时间 |
call_soon(callback, *args) | 马上调用 |
events
函数 | 解释 |
---|---|
get_event_loop() | 返回一个异步的事件 |
... | ... |
返回的就是BaseEventLoop的对象。
future
Future类的相关方法如下:
方法 | 解释 |
---|---|
cancel() | 取消掉future对象 |
cancelled() | 返回是否已经取消掉 |
done() | 如果future已经完成则返回true |
result() | 返回future执行的结果 |
exception() | 返回在future中设置了的exception |
add_done_callback(fn) | 当future执行时执行回调函数 |
remove_done_callback(fn) | 删除future的所有回调函数 |
set_result(result) | 设置future的结果 |
set_exception(exception) | 设置future的异常 |
设置future的例子如下:
import asyncio
async def slow_operation(future):
await asyncio.sleep(1) # 睡眠
future.set_result('Future is done!') # future设置结果
loop = asyncio.get_event_loop()
future = asyncio.Future() # 创建future对象
asyncio.ensure_future(slow_operation(future)) # 创建任务
loop.run_until_complete(future) # 阻塞直到future执行完才停止事件
print(future.result())
loop.close()
run_until_complete
方法在内部通过调用了future的add_done_callback
,当执行future完毕的时候,就会通知事件。
下面这个例子则是通过使用future的add_done_callback
方法实现和上面例子一样的效果:
import asyncio
async def slow_operation(future):
await asyncio.sleep(1)
future.set_result('Future is done!')
def got_result(future):
print(future.result())
loop.stop() # 关闭事件
loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
future.add_done_callback(got_result) # future执行完毕就执行该回调
try:
loop.run_forever()
finally:
loop.close()
一旦slow_operation
函数执行完毕的时候,就会去执行got_result
函数,里面则调用了关闭事件,所以不用担心事件会一直执行。
task
Task类是Future的一个子类,也就是Future中的方法,task都可以使用,类方法如下:
方法 | 解释 |
---|---|
current_task(loop=None) | 返回指定事件中的任务,如果没有指定,则默认当前事件 |
all_tasks(loop=None) | 返回指定事件中的所有任务 |
cancel() | 取消任务 |
并行执行三个任务的例子:
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number+1):
print("Task %s: Compute factorial(%s)..." % (name, i))
await asyncio.sleep(1)
f *= i
print("Task %s: factorial(%s) = %s" % (name, number, f))
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
))
loop.close()
执行结果为
Task A: Compute factorial(2)...Task B: Compute factorial(2)...Task C: Compute factorial(2)...Task A: factorial(2) = 2Task B: Compute factorial(3)...Task C: Compute factorial(3)...Task B: factorial(3) = 6Task C: Compute factorial(4)...Task C: factorial(4) = 24
可以发现,ABC同时执行,直到future执行完毕才退出。
下面一些方法是和task相关的方法
方法 | 解释 |
---|---|
as_completed(fs, *, loop=None, timeout=None) | 返回是协程的迭代器 |
ensure_future(coro_or_future, *, loop=None) | 调度执行一个 coroutine object:并且它封装成future。返回任务对象 |
async(coro_or_future, *, loop=None) | 丢弃的方法,推荐使用ensure_future |
wrap_future(future, *, loop=None) | Wrap a concurrent.futures.Future object in a Future object. |
gather(*coros_or_futures, loop=None, return_exceptions=False) | 从给定的协程或者future对象数组中返回future汇总的结果 |
sleep(delay, result=None, *, loop=None) | 创建一个在给定时间(以秒为单位)后完成的协程 |
shield(arg, *, loop=None) | 等待future,屏蔽future被取消 |
wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED) | 等待由序列futures给出的Futures和协程对象完成。协程将被包裹在任务中。返回含两个集合的Future:(done,pending) |
wait_for(fut, timeout, *, loop=None) | 等待单个Future或coroutine object完成超时。如果超时为None,则阻止直到future完成 |
参考文章