Why can't coroutines be used with run_in_executor?

Time: 2021-09-11 20:43:00

I want to run a service that requests URLs using coroutines and multiple threads. However, I cannot pass coroutines to the workers in the executor. See the code below for a minimal example of this issue:

import time
import asyncio
import concurrent.futures

EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=5)

async def async_request(loop):
    await asyncio.sleep(3)

def sync_request(_):
    time.sleep(3)

async def main(loop):
    futures = [loop.run_in_executor(EXECUTOR, async_request,loop) 
               for x in range(10)]

    await asyncio.wait(futures)

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))

This results in the following error:

Traceback (most recent call last):
  File "co_test.py", line 17, in <module>
    loop.run_until_complete(main(loop))
  File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
    return future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "co_test.py", line 10, in main
    futures = [loop.run_in_executor(EXECUTOR, req,loop) for x in range(10)]
  File "co_test.py", line 10, in <listcomp>
    futures = [loop.run_in_executor(EXECUTOR, req,loop) for x in range(10)]
  File "/usr/lib/python3.5/asyncio/base_events.py", line 541, in run_in_executor
    raise TypeError("coroutines cannot be used with run_in_executor()")
TypeError: coroutines cannot be used with run_in_executor()
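The error makes more sense once you look at what a worker thread would do with an async def function: it simply calls it, and calling a coroutine function only creates a coroutine object without running any of its body. The worker thread has no event loop to drive that object, which is why run_in_executor refuses it up front. A minimal sketch that bypasses run_in_executor by using plain ThreadPoolExecutor.submit shows this; async_request here is a simplified, argument-less variant of the one above.

```python
import asyncio
import concurrent.futures


async def async_request():
    await asyncio.sleep(0.01)
    return "done"


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    # The worker thread just calls async_request(). For an async def function
    # that only builds a coroutine object; nothing in the worker thread
    # ever drives it, so its body never runs.
    result = pool.submit(async_request).result()

print(type(result).__name__)        # coroutine
print(asyncio.iscoroutine(result))  # True
result.close()  # close the unstarted coroutine explicitly so it is not left dangling
```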

I know that I could use the sync_request function instead of async_request; in that case I would get coroutine-based concurrency by sending the blocking function to another thread.
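For comparison, the sync_request route does work with run_in_executor. A minimal sketch, assuming Python 3.7+ (for asyncio.run and asyncio.get_running_loop), with time.sleep standing in for a blocking HTTP call and sync_request changed to return its argument so the results can be checked:

```python
import asyncio
import concurrent.futures
import time


def sync_request(i):
    time.sleep(0.1)  # placeholder for a blocking HTTP call
    return i


async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Each call wraps the executor job in an asyncio future we can await.
        futures = [loop.run_in_executor(executor, sync_request, i)
                   for i in range(10)]
        return await asyncio.gather(*futures)


results = asyncio.run(main())
print(results)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

With five workers the ten calls complete in two rounds, and gather returns the results in submission order.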

I also know that I could call async_request ten times in the event loop, as in the code below:

loop = asyncio.get_event_loop()
futures = [async_request(loop) for i in range(10)]
loop.run_until_complete(asyncio.wait(futures))

But in this case I would be using a single thread.
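A single thread does not mean the requests run one after another, though: while one coroutine is suspended at an await, the loop runs the others. A minimal sketch (Python 3.7+, with asyncio.sleep as a placeholder for non-blocking I/O) records which thread each coroutine ran on and the total wall time:

```python
import asyncio
import threading
import time


async def async_request(i):
    await asyncio.sleep(0.2)  # placeholder for a non-blocking HTTP call
    return threading.get_ident()


async def main():
    start = time.perf_counter()
    idents = await asyncio.gather(*(async_request(i) for i in range(10)))
    return idents, time.perf_counter() - start


idents, elapsed = asyncio.run(main())
print(len(set(idents)))  # 1: all ten coroutines ran on the same thread
print(f"elapsed: {elapsed:.2f}s")  # expect roughly 0.2 s (the sleeps overlap), not 10 x 0.2 s
```

This only holds while every request is genuinely non-blocking; a blocking call inside one coroutine would stall all the others.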

How could I combine both scenarios, i.e. coroutines running across multiple threads? As you can see in the code, I am passing (but not using) the loop to async_request, in the hope that I can write something that tells the worker to create a future, send it to the pool, and wait for the result asynchronously (freeing the worker).
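What is described here is close to what awaiting loop.run_in_executor from inside a coroutine already does: the blocking call goes to the pool, the coroutine is suspended until the job finishes, and the event loop thread stays free to run other coroutines in the meantime. A worker thread is busy only for the blocking call itself. A sketch under those assumptions, where blocking_fetch is a hypothetical stand-in for a blocking HTTP client and the URLs are placeholders (Python 3.7+):

```python
import asyncio
import concurrent.futures
import threading
import time


def blocking_fetch(url):
    # Hypothetical stand-in for a blocking HTTP client call.
    time.sleep(0.1)
    return url, threading.get_ident()


async def async_request(url, executor):
    loop = asyncio.get_running_loop()
    # Hand the blocking call to the pool and suspend this coroutine until it
    # finishes; the event loop keeps serving other coroutines meanwhile.
    return await loop.run_in_executor(executor, blocking_fetch, url)


async def main(urls):
    loop_thread = threading.get_ident()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        results = await asyncio.gather(*(async_request(u, executor) for u in urls))
    return loop_thread, results


urls = [f"http://example.com/{i}" for i in range(10)]  # placeholder URLs
loop_thread, results = asyncio.run(main(urls))
print([u for u, _ in results] == urls)            # True
print(any(t == loop_thread for _, t in results))  # False: fetches ran on pool threads
```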

The reason I want to do this is to make the application scalable. Is it an unnecessary step? Should I simply have one thread per URL and leave it at that? Something like:

LEN = len(list_of_urls)
EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=LEN)

Is that good enough?
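One way to reason about that choice is to time it. A rough sketch (Python 3.7+; sync_request just sleeps 0.3 s in place of a real request, and list_of_urls is a hypothetical list of ten placeholder URLs) runs the same ten blocking calls through a 5-worker pool and through a pool with one thread per URL:

```python
import asyncio
import concurrent.futures
import time


def sync_request(_):
    time.sleep(0.3)  # placeholder for one blocking request


async def timed_run(n_tasks, max_workers):
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        start = time.perf_counter()
        await asyncio.gather(*(loop.run_in_executor(executor, sync_request, i)
                               for i in range(n_tasks)))
        return time.perf_counter() - start


list_of_urls = [f"http://example.com/{i}" for i in range(10)]  # hypothetical
LEN = len(list_of_urls)
t_pool = asyncio.run(timed_run(LEN, 5))    # five workers: two rounds of requests
t_full = asyncio.run(timed_run(LEN, LEN))  # one thread per URL: one round
print(f"5 workers: {t_pool:.2f}s, {LEN} workers: {t_full:.2f}s")
```

With five workers the calls need two rounds, so t_pool should be at least about 0.6 s, while the one-thread-per-URL pool finishes in roughly one round. The cost is one OS thread per URL, which grows with the length of list_of_urls.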

1 solution

#1

You have to create and set a new event loop in the thread context in order to run coroutines:

import asyncio
from concurrent.futures import ThreadPoolExecutor


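def run(corofn, *args):
    # Each worker thread gets its own, fresh event loop.
    loop = asyncio.new_event_loop()
    try:
        coro = corofn(*args)
        asyncio.set_event_loop(loop)
        # Drive the coroutine to completion inside this worker thread.
        return loop.run_until_complete(coro)
    finally:
        loop.close()


async def main():
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(max_workers=5)
    # Each job calls run(asyncio.sleep, 1, x) in a worker thread;
    # asyncio.sleep(1, x) returns x after one second.
    futures = [
        loop.run_in_executor(executor, run, asyncio.sleep, 1, x)
        for x in range(10)]
    print(await asyncio.gather(*futures))
    # Prints: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]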


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
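On Python 3.7 and later, the same per-thread-loop idea can be written with asyncio.run, which creates a new event loop, runs the coroutine to completion and closes the loop. A sketch under that assumption, reusing the name run from the answer and a hypothetical async_request that reports which thread it ran on:

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


async def async_request(x):
    await asyncio.sleep(0.1)  # placeholder for real async work
    return x, threading.get_ident()


def run(corofn, *args):
    # asyncio.run() creates a new event loop in this worker thread,
    # runs the coroutine to completion and then closes the loop.
    return asyncio.run(corofn(*args))


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [loop.run_in_executor(executor, run, async_request, x)
                   for x in range(10)]
        return await asyncio.gather(*futures)


main_thread = threading.get_ident()
results = asyncio.run(main())
print([x for x, _ in results])  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

As with the original run helper, every job still creates and closes a separate event loop.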
