使用asyncio时,如何在关闭事件循环之前完成所有正在运行的任务

时间:2021-02-19 18:08:56

I have the following code:

我有以下代码:

@asyncio.coroutine
def do_something_periodically():
    while True:
        asyncio.async(my_expensive_operation())
        yield from asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

I run this function until complete. The problem occurs when shutdown is set - the function completes and any pending tasks are never run. (You see this as an error

我运行此功能直到完成。设置关闭时会出现问题 - 函数完成,并且永远不会运行任何挂起的任务。 (你认为这是一个错误

task: <Task pending coro=<report() running at script.py:33> wait_for=<Future pending cb=[Task._wakeup()]>>

). How do I schedule a shutdown correctly?

)。如何正确安排关机?

To give some context, I'm writing a system monitor which reads from /proc/stat every 5 seconds, computes the cpu usage in that period, and then sends the result to a server. I want to keep scheduling these monitoring jobs until I receive sigterm, when I stop scheduling, wait for all current jobs to finish, and exit gracefully.

为了给出一些上下文,我正在编写一个系统监视器,它每隔5秒从/ proc / stat读取一次,计算该时间段内的CPU使用情况,然后将结果发送到服务器。我想继续安排这些监视作业,直到我收到sigterm,当我停止调度,等待所有当前作业完成,然后正常退出。

1 个解决方案

#1


35  

You can retrieve unfinished tasks and run the loop again until they finished, then close the loop or exit your program.

您可以检索未完成的任务并再次运行循环直到完成,然后关闭循环或退出程序。

pending = asyncio.Task.all_tasks()
loop.run_until_complete(asyncio.gather(*pending))
  • pending is a list of pending tasks.
  • pending是待处理任务的列表。
  • asyncio.gather() allows to wait on several tasks at once.
  • asyncio.gather()允许一次等待几个任务。

If you want to ensure all the tasks are completed inside a coroutine (maybe you have a "main" coroutine), you can do it this way, for instance:

如果你想确保在协程内完成所有任务(也许你有一个“主”协程),你可以这样做,例如:

@asyncio.coroutine
def do_something_periodically():
    while True:
        asyncio.async(my_expensive_operation())
        yield from asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

    yield from asyncio.gather(*asyncio.Task.all_tasks())

Also, in this case, since all the tasks are created in the same coroutine, you already have access to the tasks:

此外,在这种情况下,由于所有任务都在同一个协程中创建,因此您已经可以访问任务:

@asyncio.coroutine
def do_something_periodically():
    tasks = []
    while True:
        tasks.append(asyncio.async(my_expensive_operation()))
        yield from asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

    yield from asyncio.gather(*tasks)

#1


35  

You can retrieve unfinished tasks and run the loop again until they finished, then close the loop or exit your program.

您可以检索未完成的任务并再次运行循环直到完成,然后关闭循环或退出程序。

pending = asyncio.Task.all_tasks()
loop.run_until_complete(asyncio.gather(*pending))
  • pending is a list of pending tasks.
  • pending是待处理任务的列表。
  • asyncio.gather() allows to wait on several tasks at once.
  • asyncio.gather()允许一次等待几个任务。

If you want to ensure all the tasks are completed inside a coroutine (maybe you have a "main" coroutine), you can do it this way, for instance:

如果你想确保在协程内完成所有任务(也许你有一个“主”协程),你可以这样做,例如:

@asyncio.coroutine
def do_something_periodically():
    while True:
        asyncio.async(my_expensive_operation())
        yield from asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

    yield from asyncio.gather(*asyncio.Task.all_tasks())

Also, in this case, since all the tasks are created in the same coroutine, you already have access to the tasks:

此外,在这种情况下,由于所有任务都在同一个协程中创建,因此您已经可以访问任务:

@asyncio.coroutine
def do_something_periodically():
    tasks = []
    while True:
        tasks.append(asyncio.async(my_expensive_operation()))
        yield from asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

    yield from asyncio.gather(*tasks)