Python 3.5异步/等待实际代码示例

时间:2022-08-24 04:23:59

I've read tons of articles and tutorial about Python's 3.5 async/await thing. I have to say I'm pretty confused, because some use get_event_loop() and run_until_complete(), some use ensure_future(), some use asyncio.wait(), and some use call_soon().

我已经阅读了大量有关Python的3.5异步/等待事物的文章和教程。我不得不说我很困惑,因为有些人使用get_event_loop()和run_until_complete(),有些人使用ensure_future(),有些人使用asyncio.wait(),有些人使用call_soon()。

It seems like I have a lot choices, but I have no idea if they are completely identical or there are cases where you use loops and there are cases where you use wait().

看起来我有很多选择,但我不知道它们是完全相同还是有使用循环的情况,有些情况下你使用wait()。

But the thing is all examples work with asyncio.sleep() as simulation of real slow operation which returns an awaitable object. Once I try to swap this line for some real code the whole thing fails. What the heck are the differences between approaches written above and how should I run a third-party library which is not ready for async/await. I do use the Quandl service to fetch some stock data.

但事实是所有的例子都使用asyncio.sleep()作为真实慢速操作的模拟,它返回一个等待的对象。一旦我尝试将这一行换成一些真正的代码,整个事情就会失败。上面介绍的方法与我应该如何运行尚未准备好进行异步/等待的第三方库之间存在差异。我确实使用Quandl服务来获取一些股票数据。

 import asyncio
 import quandl

 async def slow_operation(n):
     # await asyncio.sleep(1) # Works because it's await ready.
     await quandl.Dataset(n) # Doesn't work because it's not await ready.


 async def main():
     await asyncio.wait([
         slow_operation("SIX/US9884981013EUR4"),
         slow_operation("SIX/US88160R1014EUR4"),
     ])

 # You don't have to use any code for 50 requests/day.
 quandl.ApiConfig.api_key = "MY_SECRET_CODE"

 loop = asyncio.get_event_loop()
 loop.run_until_complete(main())

I hope you get the point how lost I feel and how simple thing I would like to have running in parallel.

我希望你能明白我的感受是多么的失落,以及我希望并行运行的简单方法。

1 个解决方案

#1


31  

If a third-party library is not compatible with async/await then obviously you can't use it easily. There are two cases:

如果第三方库与async / await不兼容,那么显然你不能轻易使用它。有两种情况:

  1. Let's say that the function in the library is asynchronous and it gives you a callback, e.g.

    假设库中的函数是异步的,它会给你一个回调,例如

    def fn(..., clb):
        ...
    

    So you can do:

    所以你可以这样做:

    def on_result(...):
        ...
    
    fn(..., on_result)
    

    In that case you can wrap such functions into the asyncio protocol like this:

    在这种情况下,您可以将这些函数包装到asyncio协议中,如下所示:

    from asyncio import Future
    
    def wrapper(...):
        future = Future()
        def my_clb(...):
            future.set_result(xyz)
        fn(..., my_clb)
        return future
    

    (use future.set_exception(exc) on exception)

    (在异常时使用future.set_exception(exc))

    Then you can simply call that wrapper in some async function with await:

    然后你可以通过await简单地在一些异步函数中调用该包装器:

    value = await wrapper(...)
    

    Note that await works with any Future object. You don't have to declare wrapper as async.

    请注意,await适用于任何Future对象。您不必将包装器声明为异步。

  2. If the function in the library is synchronous then you can run it in a separate thread (probably you would use some thread pool for that). The whole code may look like this:

    如果库中的函数是同步的,那么你可以在一个单独的线程中运行它(可能你会使用一些线程池)。整个代码可能如下所示:

    import asyncio
    import time
    from concurrent.futures import ThreadPoolExecutor
    
    # Initialize 10 threads
    THREAD_POOL = ThreadPoolExecutor(10)
    
    def synchronous_handler(param1, ...):
        # Do something synchronous
        time.sleep(2)
        return "foo"
    
    # Somewhere else
    async def main():
        loop = asyncio.get_event_loop()
        futures = [
            loop.run_in_executor(THREAD_POOL, synchronous_handler, param1, ...),
            loop.run_in_executor(THREAD_POOL, synchronous_handler, param1, ...),
            loop.run_in_executor(THREAD_POOL, synchronous_handler, param1, ...),
        ]
        await asyncio.wait(futures)
        for future in futures:
            print(future.result())
    
    with THREAD_POOL:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    

If you can't use threads for whatever reason then using such a library simply makes entire asynchronous code pointless.

如果由于某种原因无法使用线程,那么使用这样的库只会使整个异步代码毫无意义。

Note however that using synchronous library with async is probably a bad idea. You won't get much and yet you complicate the code a lot.

但请注意,使用带异步的同步库可能是一个坏主意。你不会得到太多,但你会使代码复杂化很多。

#1


31  

If a third-party library is not compatible with async/await then obviously you can't use it easily. There are two cases:

如果第三方库与async / await不兼容,那么显然你不能轻易使用它。有两种情况:

  1. Let's say that the function in the library is asynchronous and it gives you a callback, e.g.

    假设库中的函数是异步的,它会给你一个回调,例如

    def fn(..., clb):
        ...
    

    So you can do:

    所以你可以这样做:

    def on_result(...):
        ...
    
    fn(..., on_result)
    

    In that case you can wrap such functions into the asyncio protocol like this:

    在这种情况下,您可以将这些函数包装到asyncio协议中,如下所示:

    from asyncio import Future
    
    def wrapper(...):
        future = Future()
        def my_clb(...):
            future.set_result(xyz)
        fn(..., my_clb)
        return future
    

    (use future.set_exception(exc) on exception)

    (在异常时使用future.set_exception(exc))

    Then you can simply call that wrapper in some async function with await:

    然后你可以通过await简单地在一些异步函数中调用该包装器:

    value = await wrapper(...)
    

    Note that await works with any Future object. You don't have to declare wrapper as async.

    请注意,await适用于任何Future对象。您不必将包装器声明为异步。

  2. If the function in the library is synchronous then you can run it in a separate thread (probably you would use some thread pool for that). The whole code may look like this:

    如果库中的函数是同步的,那么你可以在一个单独的线程中运行它(可能你会使用一些线程池)。整个代码可能如下所示:

    import asyncio
    import time
    from concurrent.futures import ThreadPoolExecutor
    
    # Initialize 10 threads
    THREAD_POOL = ThreadPoolExecutor(10)
    
    def synchronous_handler(param1, ...):
        # Do something synchronous
        time.sleep(2)
        return "foo"
    
    # Somewhere else
    async def main():
        loop = asyncio.get_event_loop()
        futures = [
            loop.run_in_executor(THREAD_POOL, synchronous_handler, param1, ...),
            loop.run_in_executor(THREAD_POOL, synchronous_handler, param1, ...),
            loop.run_in_executor(THREAD_POOL, synchronous_handler, param1, ...),
        ]
        await asyncio.wait(futures)
        for future in futures:
            print(future.result())
    
    with THREAD_POOL:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    

If you can't use threads for whatever reason then using such a library simply makes entire asynchronous code pointless.

如果由于某种原因无法使用线程,那么使用这样的库只会使整个异步代码毫无意义。

Note however that using synchronous library with async is probably a bad idea. You won't get much and yet you complicate the code a lot.

但请注意,使用带异步的同步库可能是一个坏主意。你不会得到太多,但你会使代码复杂化很多。