asyncio介绍
熟悉c#的同学可能知道,在c#中可以很方便的使用 async 和 await 来实现异步编程,那么在python中应该怎么做呢,其实python也支持异步编程,一般使用 asyncio 这个库,下面介绍下什么是 asyncio :
asyncio 是用来编写 并发 代码的库,使用 async/await 语法。 asyncio 被用作多个提供高性能 python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。 asyncio 往往是构建 io 密集型和高层级 结构化 网络代码的最佳选择。
asyncio中的基本概念
可以看见,使用asyncio库我们也可以在python代码中使用 async 和 await 。在 asyncio 中,有四个基本概念,分别是:
eventloop
eventloop 可以说是 asyncio 应用的核心,*总控, eventloop 实例提供了注册、取消、执行任务和回调 的方法。 简单来说,就是我们可以把一些异步函数注册到这个事件循环上,事件循环回循环执行这些函数(每次只能执行一个),如果当前正在执行的函数在等待i/o返回,那么事件循环就会暂停它的执行去执行其他函数。当某个函数完成i/o后会恢复,等到下次循环到它的时候就会继续执行。
coroutine
协程本质就是一个函数,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import asyncio
import time
async def a():
print ( 'suspending a' )
await asyncio.sleep( 3 )
print ( 'resuming a' )
async def b():
print ( 'suspending b' )
await asyncio.sleep( 1 )
print ( 'resuming b' )
async def main():
start = time.perf_counter()
await asyncio.gather(a(), b())
print (f '{main.__name__} cost: {time.perf_counter() - start}' )
if __name__ = = '__main__' :
asyncio.run(main())
|
执行上述代码,可以看到类似这样的输出:
suspending a
suspending b
resuming b
resuming a
main cost: 3.0023356619999997
关于协程的具体介绍,可以参考我以前的文章python中的协程 不过以前的那种写法,需要使用装饰器,已经过时了。
future
future 是表示一个“未来”对象,类似于 javascript 中的 promise ,当异步操作结束后会把最终结果设置到这个 future 对象上, future 是对协程的封装。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
>>> import asyncio
>>> def fun():
... print ( "inner fun" )
... return 111
...
>>> loop = asyncio.get_event_loop()
>>> future = loop.run_in_executor(none, fun) #这里没有使用await
inner fun
>>> future #可以看到,fun方法状态是pending
<future pending cb = [_chain_future.< locals >._call_check_cancel() at / usr / local / cellar / python / 3.7 . 3 / frameworks / python.framework / versions / 3.7 / lib / python3. 7 / asyncio / futures.py: 348 ]>
>>> future.done() # 还没有完成
false
>>> [m for m in dir (future) if not m.startswith( '_' )]
[ 'add_done_callback' , 'cancel' , 'cancelled' , 'done' , 'exception' , 'get_loop' , 'remove_done_callback' , 'result' , 'set_exception' , 'set_result' ]
>>> future.result() #这个时候如果直接调用result()方法会报错
traceback (most recent call last):
file "<input>" , line 1 , in <module>
asyncio.base_futures.invalidstateerror: result is not set .
>>> async def runfun():
... result = await future
... print (result)
...
>>>loop.run_until_complete(runfun()) #也可以通过 loop.run_until_complete(future) 来执行,这里只是为了演示await
111
>>> future
<future finished result = 111 >
>>> future.done()
true
>>> future.result()
111
task
|
eventloop 除了支持协程,还支持注册 future 和 task 2种类型的对象,而 future 是协程的封装, future 对象提供了很多任务方法(如完成后的回调,取消,设置任务结果等等),但是一般情况下开发者不需要操作 future 这种底层对象,而是直接用 future 的子类 task 协同的调度协程来实现并发。那么什么是 task 呢?下面介绍下:
一个与 future 类似的对象,可运行 python 协程。非线程安全。 task 对象被用来在事件循环中运行协程。如果一个协程在等待一个 future 对象, task 对象会挂起该协程的执行并等待该 future 对象完成。当该 future 对象完成被打包的协程将恢复执行。 事件循环使用协同日程调度: 一个事件循环每次运行一个 task 对象。而一个 task 对象会等待一个 future 对象完成,该事件循环会运行其他 task 、回调或执行io操作。
下面看看用法:
1
2
3
4
5
6
7
8
9
|
>>> async def a():
... print ( 'suspending a' )
... await asyncio.sleep( 3 )
... print ( 'resuming a' )
...
>>> task = asyncio.ensure_future(a())
>>> loop.run_until_complete(task)
suspending a
resuming a
|
asyncio中一些常见用法的区别
asyncio.gather和asyncio.wait
我们在上面的代码中用到过 asyncio.gather ,其实还有另外一种用法是 asyncio.wait ,他们都可以让多个协程并发执行,那么他们有什么区别呢?下面介绍下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
>>> import asyncio
>>> async def a():
... print ( 'suspending a' )
... await asyncio.sleep( 3 )
... print ( 'resuming a' )
... return 'a'
...
...
... async def b():
... print ( 'suspending b' )
... await asyncio.sleep( 1 )
... print ( 'resuming b' )
... return 'b'
...
>>> async def fun1():
... return_value_a, return_value_b = await asyncio.gather(a(), b())
... print (return_value_a,return_value_b)
...
>>> asyncio.run(fun1())
suspending a
suspending b
resuming b
resuming a
a b
>>> async def fun2():
... done,pending = await asyncio.wait([a(),b()])
... print (done)
... print (pending)
... task = list (done)[ 0 ]
... print (task)
... print (task.result())
...
>>> asyncio.run(fun2())
suspending b
suspending a
resuming b
resuming a
{<task finished coro = <a() done, defined at < input >: 1 > result = 'a' >, <task finished coro = <b() done, defined at < input >: 8 > result = 'b' >}
set ()
<task finished coro = <a() done, defined at < input >: 1 > result = 'a' >
a
|
根据上述代码,我们可以看出两者的区别:
asyncio.gather 能收集协程的结果,而且会按照输入协程的顺序保存对应协程的执行结果,而 asyncio.wait 的返回值有两项,第一项是完成的任务列表,第二项表示等待完成的任务列表。
asyncio.wait 支持接受一个参数 return_when ,在默认情况下, asyncio.wait 会等待全部任务完成 (return_when='all_completed') ,它还支持 first_completed (第一个协程完成就返回)和 first_exception (出现第一个异常就返回):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
>>> async def fun2():
... done,pending = await asyncio.wait([a(),b()],return_when = asyncio.tasks.first_completed)
... print (done)
... print (pending)
... task = list (done)[ 0 ]
... print (task)
... print (task.result())
...
>>> asyncio.run(fun2())
suspending a
suspending b
resuming b
{<task finished coro = <b() done, defined at < input >: 8 > result = 'b' >}
{<task pending coro = <a() running at < input >: 3 > wait_for = <future pending cb = [<taskwakeupmethwrapper object at 0x10757bf18 >()]>>}
<task finished coro = <b() done, defined at < input >: 8 > result = 'b' >
b
|
一般情况下,用 asyncio.gather 就足够了。
asyncio.create_task和loop.create_task以及asyncio.ensure_future
这三种方法都可以创建 task ,从python3.7开始可以统一的使用更高阶的 asyncio.create_task .其实 asyncio.create_task 就是用的 loop.create_task . loop.create_task 接受的参数需要是一个协程,但是 asyncio.ensure_future 除了接受协程,还可以是 future 对象或者 awaitable 对象:
- 如果参数是协程,其底层使用 loop.create_task ,返回 task 对象
- 如果是 future 对象会直接返回
- 如果是一个 awaitable 对象,会 await 这个对象的 __await__ 方法,再执行一次 ensure_future ,最后返回 task 或者 future 。
所以 ensure_future 方法主要就是确保这是一个 future 对象,一般情况下直接用 asyncio.create_task 就可以了。
注册回调和执行同步代码
可以使用 add_done_callback 来添加成功回调:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
def callback(future):
print (f 'result: {future.result()}' )
def callback2(future, n):
print (f 'result: {future.result()}, n: {n}' )
async def funa():
await asyncio.sleep( 1 )
return "funa"
async def main():
task = asyncio.create_task(funa())
task.add_done_callback(callback)
await task
#这样可以为callback传递参数
task = asyncio.create_task(funa())
task.add_done_callback(functools.partial(callback2, n = 1 ))
await task
if __name__ = = '__main__' :
asyncio.run(main())
|
执行同步代码
如果有同步逻辑,想要用 asyncio 来实现并发,那么需要怎么做呢?下面看看:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
def a1():
time.sleep( 1 )
return "a"
async def b1():
await asyncio.sleep( 1 )
return "b"
async def main():
loop = asyncio.get_running_loop()
await asyncio.gather(loop.run_in_executor(none, a1), b1())
if __name__ = = '__main__' :
start = time.perf_counter()
asyncio.run(main())
print (f 'main method cost: {time.perf_counter() - start}' )
# 输出: main method cost: 1.0050589740000002
|
可以使用 run_into_executor
来将同步函数逻辑转化成一个协程,第一个参数是要传递 concurrent.futures.executor 实例的,传递 none 会选择默认的 executor 。
总结
以上所述是小编给大家介绍的python中的asyncio代码详解,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对服务器之家网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!原文链接:https://www.lylinux.net/article/2019/6/9/57.html