Python 进阶 异步async/await

时间:2021-03-14 19:16:09

一,前言

  本文将会讲述Python 3.5之后出现的async/await的使用方法,我从上看到一篇不错的博客,自己对其进行了梳理。该文章原地址https://www.cnblogs.com/dhcn/p/9032461.html

二,Python常见的函数形式

  2.1 普通函数

def fun():
return 1 if __name__ == '__main__':
fun()

  普通函数,没有什么特别的,直接函数名加括号调用即可。

  2.2 生成器函数

def generator_fun():
yield 1 if __name__ == '__main__':
print(generator_fun()) #<generator object generator_fun at 0x00000000005E6E08>
print(generator_fun().send(None))  
for i in generator_fun():
print(i)

  与普通函数不同的是,生成器函数返回的是一个生成器对象。我们可以通过send()方法或者是for循环的方法来对其进行取值。

  2.3 (异步)协程函数

async def async_fun():
return 1 if __name__ == '__main__':
print(async_fun()) # <coroutine object fun at 0x0000000002156E08>
print(async_fun().send(None)) # StopIteration: 1
try:
async_fun().send(None)
except StopIteration as e:
print(e.value)

  异步函数的调用返回的是一个协程对象,若改对象通过send()进行调用,会报一个StopIteration错,若要取值,就需要捕获该异常,e.value的形式进行获取。

  在协程函数中,可以通过await语法来挂起自身的协程,并等待另一个协程完成直到返回结果:

async def async_function():
return 1 async def await_coroutine():
result = await async_function()
print(result) run(await_coroutine())
#

  2.4 异步生成器

async def async_fun():
async for i in generator_async_fun():
print(i) async def generator_async_fun():
yield 1 if __name__ == '__main__':
async_fun().send(None)

  异步生成器的调用比较特殊,它需要依赖别的异步函数进行调用。

三,await的使用

  await语法可以挂起自声协程,等待另一个协程完成直到返回结果。但是有一些地方需要注意:

  3.1 await注意事项

  await语法只能出现在通过async修饰的函数中,否则会报SyntaxError错误。

  而且await后面的对象需要是一个Awaitable,或者实现了相关的协议。

  3.2 关于await的源代码解析

  查看Awaitable抽象类的代码,表明了只要一个类实现了__await__方法,那么通过它构造出来的实例就是一个Awaitable:

class Awaitable(metaclass=ABCMeta):
__slots__ = () @abstractmethod
def __await__(self):
yield @classmethod
def __subclasshook__(cls, C):
if cls is Awaitable:
return _check_methods(C, "__await__")
return NotImplemented

  而且可以看到,Coroutine类也继承了Awaitable,而且实现了send,throw和close方法。所以await一个调用异步函数返回的协程对象是合法的。

class Coroutine(Awaitable):
__slots__ = () @abstractmethod
def send(self, value):
... @abstractmethod
def throw(self, typ, val=None, tb=None):
... def close(self):
... @classmethod
def __subclasshook__(cls, C):
if cls is Coroutine:
return _check_methods(C, '__await__', 'send', 'throw', 'close')
return NotImplemented

  3.3 关于异步生成器的实例

  案例:假如我要到一家超市去购买土豆,而超市货架上的土豆数量是有限的:

class Potato:
@classmethod
def make(cls, num, *args, **kws):
potatos = []
for i in range(num):
potatos.append(cls.__new__(cls, *args, **kws))
return potatos all_potatos = Potato.make(5)

  现在我想要买50个土豆,每次从货架上拿走一个土豆放到篮子:

def take_potatos(num):
count = 0
while True:
if len(all_potatos) == 0:
sleep(.1)
else:
potato = all_potatos.pop()
yield potato
count += 1
if count == num:
break def buy_potatos():
bucket = []
for p in take_potatos(50):
bucket.append(p)

  对应到代码中,就是迭代一个生成器的模型,显然,当货架上的土豆不够的时候,这时只能够死等,而且在上面例子中等多长时间都不会有结果(因为一切都是同步的),也许可以用多进程和多线程解决,而在现实生活中,更应该像是这样的:

async def take_potatos(num):
count = 0
while True:
if len(all_potatos) == 0:
await ask_for_potato()
potato = all_potatos.pop()
yield potato
count += 1
if count == num:
break

  当货架上的土豆没有了之后,我可以询问超市请求需要更多的土豆,这时候需要等待一段时间直到生产者完成生产的过程:

async def ask_for_potato():
await asyncio.sleep(random.random())
all_potatos.extend(Potato.make(random.randint(1, 10)))

  当生产者完成和返回之后,这是便能从await挂起的地方继续往下跑,完成消费的过程。而这整一个过程,就是一个异步生成器迭代的流程:

async def buy_potatos():
bucket = []
async for p in take_potatos(50):
bucket.append(p)
print(f'Got potato {id(p)}...')

  async for语法表示我们要后面迭代的是一个异步生成器。

def main():
import asyncio
loop = asyncio.get_event_loop()
res = loop.run_until_complete(buy_potatos())
loop.close()

  用asyncio运行这段代码,结果是这样的:

Got potato 4338641384...
Got potato 4338641160...
Got potato 4338614736...
Got potato 4338614680...
Got potato 4338614568...
Got potato 4344861864...

  既然是异步的,在请求之后不一定要死等,而是可以做其他事情。比如除了土豆,我还想买番茄,这时只需要在事件循环中再添加一个过程:

def main():
import asyncio
loop = asyncio.get_event_loop()
res = loop.run_until_complete(asyncio.wait([buy_potatos(), buy_tomatos()]))
loop.close()

  再来运行这段代码:

Got potato 4423119312...
Got tomato 4423119368...
Got potato 4429291024...
Got potato 4421640768...
Got tomato 4429331704...
Got tomato 4429331760...
Got tomato 4423119368...

  3.4 AsyncGenerator的定义

  它需要实现__aiter__和__anext__两个核心方法,以及asend,athrow,aclose方法。

class AsyncGenerator(AsyncIterator):
__slots__ = () async def __anext__(self):
... @abstractmethod
async def asend(self, value):
... @abstractmethod
async def athrow(self, typ, val=None, tb=None):
... async def aclose(self):
... @classmethod
def __subclasshook__(cls, C):
if cls is AsyncGenerator:
return _check_methods(C, '__aiter__', '__anext__',
'asend', 'athrow', 'aclose')
return NotImplemented

  异步生成器是在3.6之后才有的特性,同样的还有异步推导表达式,因此在上面的例子中,也可以写成这样:

bucket = [p async for p in take_potatos(50)]

  类似的,还有await表达式:

result = [await fun() for fun in funcs if await condition()]

四,async的其他用法

  4.1 async修饰类普通方法

  除了函数之外,类实例的普通方法也能用async语法修饰:

class ThreeTwoOne:
async def begin(self):
print(3)
await asyncio.sleep(1)
print(2)
await asyncio.sleep(1)
print(1)
await asyncio.sleep(1)
return async def game():
t = ThreeTwoOne()
await t.begin()
print('start')

  实例方法的调用同样是返回一个coroutine:

function = ThreeTwoOne.begin
method = function.__get__(ThreeTwoOne, ThreeTwoOne())
import inspect
assert inspect.isfunction(function)
assert inspect.ismethod(method)
assert inspect.iscoroutine(method())

  4.2 async 修饰类方法

class ThreeTwoOne:
@classmethod
async def begin(cls):
print(3)
await asyncio.sleep(1)
print(2)
await asyncio.sleep(1)
print(1)
await asyncio.sleep(1)
return async def game():
await ThreeTwoOne.begin()
print('start')

  4.3 async的上下文管理器应用

  根据PEP 492中,async也可以应用到上下文管理器中,__aenter__和__aexit__需要返回一个Awaitable:

class GameContext:
async def __aenter__(self):
print('game loading...')
await asyncio.sleep(1) async def __aexit__(self, exc_type, exc, tb):
print('game exit...')
await asyncio.sleep(1) async def game():
async with GameContext():
print('game start...')
await asyncio.sleep(2)

  在3.7版本,contextlib中会新增一个asynccontextmanager装饰器来包装一个实现异步协议的上下文管理器:

from contextlib import asynccontextmanager

@asynccontextmanager
async def get_connection():
conn = await acquire_db_connection()
try:
yield
finally:
await release_db_connection(conn)

五,await和yield from

  Python3.3的yield from语法可以把生成器的操作委托给另一个生成器,生成器的调用方可以直接与子生成器进行通信:

def sub_gen():
yield 1
yield 2
yield 3 def gen():
return (yield from sub_gen()) def main():
for val in gen():
print(val)
#
#
#

  利用这一特性,使用yield from能够编写出类似协程效果的函数调用,在3.5之前,asyncio正是使用@asyncio.coroutine和yield from语法来创建协程:

# https://docs.python.org/3.4/library/asyncio-task.html
import asyncio @asyncio.coroutine
def compute(x, y):
print("Compute %s + %s ..." % (x, y))
yield from asyncio.sleep(1.0)
return x + y @asyncio.coroutine
def print_sum(x, y):
result = yield from compute(x, y)
print("%s + %s = %s" % (x, y, result)) loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

  然而,用yield from容易在表示协程和生成器中混淆,没有良好的语义性,所以在Python 3.5推出了更新的async/await表达式来作为协程的语法。因此类似以下的调用是等价的:

async with lock:
... with (yield from lock):
...
######################
def main():
return (yield from coro()) def main():
return (await coro())

  那么,怎么把生成器包装为一个协程对象呢?这时候可以用到types包中的coroutine装饰器(如果使用asyncio做驱动的话,那么也可以使用asyncio的coroutine装饰器),@types.coroutine装饰器会将一个生成器函数包装为协程对象:

import asyncio
import types @types.coroutine
def compute(x, y):
print("Compute %s + %s ..." % (x, y))
yield from asyncio.sleep(1.0)
return x + y async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result)) loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

  尽管两个函数分别使用了新旧语法,但他们都是协程对象,也分别称作native coroutine以及generator-based coroutine,因此不用担心语法问题。下面观察一个asyncio中Future的例子:

import asyncio

future = asyncio.Future()

async def coro1():
await asyncio.sleep(1)
future.set_result('data') async def coro2():
print(await future) loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
coro1(),
coro2()
]))
loop.close()

  两个协程在在事件循环中,协程coro1在执行第一句后挂起自身切到asyncio.sleep,而协程coro2一直等待future的结果,让出事件循环,计时器结束后coro1执行了第二句设置了future的值,被挂起的coro2恢复执行,打印出future的结果'data'。

  future可以被await证明了future对象是一个Awaitable,进入Future类的源码可以看到有一段代码显示了future实现了__await__协议:

class Future:
...
def __iter__(self):
if not self.done():
self._asyncio_future_blocking = True
yield self # This tells Task to wait for completion.
assert self.done(), "yield from wasn't used with future"
return self.result() # May raise too. if compat.PY35:
__await__ = __iter__ # make compatible with 'await' expression

  当执行await future这行代码时,future中的这段代码就会被执行,首先future检查它自身是否已经完成,如果没有完成,挂起自身,告知当前的Task(任务)等待future完成。

  当future执行set_result方法时,会触发以下的代码,设置结果,标记future已经完成:

def set_result(self, result):
...
if self._state != _PENDING:
raise InvalidStateError('{}: {!r}'.format(self._state, self))
self._result = result
self._state = _FINISHED
self._schedule_callbacks()

  最后future会调度自身的回调函数,触发Task._step()告知Task驱动future从之前挂起的点恢复执行,不难看出,future会执行下面的代码:

class Future:
...
def __iter__(self):
...
assert self.done(), "yield from wasn't used with future"
return self.result() # May raise too.

  最终返回结果给调用方。