I'm using Celery (3.0.15) with Redis as a broker.
我正在使用Celery(3.0.15)和Redis作为经纪人。
Is there a straightforward way to query the number of tasks with a given name that exist in a Celery queue?
是否有一种直接的方法来查询Celery队列中存在的具有给定名称的任务数量?
And, as a followup, is there a way to cancel all tasks with a given name that exist in a Celery queue?
并且,作为后续,有没有办法取消Celery队列中存在的具有给定名称的所有任务?
I've been through the Monitoring and Management Guide and don't see a solution there.
我已经阅读了“监控和管理指南”,但没有看到解决方案。
4 个解决方案
#1
26
# Retrieve tasks
# Reference: http://docs.celeryproject.org/en/latest/reference/celery.events.state.html
query = celery.events.state.tasks_by_type(your_task_name)
# Kill tasks
# Reference: http://docs.celeryproject.org/en/latest/userguide/workers.html#revoking-tasks
for uuid, task in query:
celery.control.revoke(uuid, terminate=True)
#2
10
There is one issue that earlier answers have not addressed and may throw off people if they are not aware of it.
有一个问题,早期的答案没有解决,如果他们不知道它可能会甩掉他们。
Among those solutions already posted, I'd use Danielle's with one minor modification: I'd import the task into my file and use its .name
attribute to get the task name to pass to .tasks_by_type()
.
在已发布的解决方案中,我将Danielle用于一个小修改:我将任务导入到我的文件中并使用其.name属性将任务名称传递给.tasks_by_type()。
app.control.revoke(
[uuid for uuid, _ in
celery.events.state.State().tasks_by_type(task.name)])
However, this solution will ignore those tasks that have been scheduled for future execution. Like some people who commented on other answers, when I checked what .tasks_by_type()
return I had an empty list. And indeed my queues were empty. But I knew that there were tasks scheduled to be executed in the future and these were my primary target. I could see them by executing celery -A [app] inspect scheduled
but they were unaffected by the code above.
但是,此解决方案将忽略已安排用于将来执行的那些任务。像一些评论其他答案的人一样,当我检查.tasks_by_type()返回时,我有一个空列表。事实上,我的队列是空的。但我知道有些任务计划在未来执行,这些是我的主要目标。我可以通过执行芹菜-A [app]检查预定来看到它们,但它们不受上述代码的影响。
I managed to revoke the scheduled tasks by doing this:
我设法通过这样做来撤销计划任务:
app.control.revoke(
[scheduled["request"]["id"] for scheduled in
chain.from_iterable(app.control.inspect().scheduled()
.itervalues())])
app.control.inspect().scheduled()
returns a dictionary whose keys are worker names and values are lists of scheduling information (hence, the need for chain.from_iterable
which is imported from itertools
). The task information is in the "request"
field of the scheduling information and "id"
contains the task id. Note that even after revocation, the scheduled task will still show among the scheduled tasks. Scheduled tasks that are revoked won't get removed from the list of scheduled tasks until their timers expire or until Celery performs some cleanup operation. (Restarting workers triggers such cleanup.)
app.control.inspect()。scheduled()返回一个字典,其键是工作者名称,值是调度信息列表(因此,需要从itertools导入的chain.from_iterable)。任务信息在调度信息的“请求”字段中,“id”包含任务id。请注意,即使在撤销后,计划任务仍将显示在计划任务中。已撤销的计划任务将不会从计划任务列表中删除,直到计时器到期或Celery执行某些清理操作。 (重新启动工作程序会触发此类清理。)
#3
3
You can do this in one request:
您可以在一个请求中执行此操作:
app.control.revoke([
uuid
for uuid, _ in
celery.events.state.State().tasks_by_type(task_name)
])
#4
1
It looks like flower
provides monitoring:
看起来像花提供监控:
https://github.com/mher/flower
Real-time monitoring using Celery Events
使用Celery Events进行实时监控
Task progress and history Ability to show task details (arguments, start time, runtime, and more) Graphs and statistics Remote Control
任务进度和历史记录能够显示任务详细信息(参数,开始时间,运行时等)图形和统计信息远程控制
View worker status and statistics Shutdown and restart worker instances Control worker pool size and autoscale settings View and modify the queues a worker instance consumes from View currently running tasks View scheduled tasks (ETA/countdown) View reserved and revoked tasks Apply time and rate limits Configuration viewer Revoke or terminate tasks HTTP API
查看工作程序状态和统计信息关闭并重新启动工作程序实例控制工作程序池大小和自动缩放设置查看和修改工作程序实例消耗的队列查看当前正在运行的任务查看计划任务(ETA /倒计时)查看保留和已撤销的任务应用时间和速率限制配置viewer撤消或终止任务HTTP API
OpenID authentication
#1
26
# Retrieve tasks
# Reference: http://docs.celeryproject.org/en/latest/reference/celery.events.state.html
query = celery.events.state.tasks_by_type(your_task_name)
# Kill tasks
# Reference: http://docs.celeryproject.org/en/latest/userguide/workers.html#revoking-tasks
for uuid, task in query:
celery.control.revoke(uuid, terminate=True)
#2
10
There is one issue that earlier answers have not addressed and may throw off people if they are not aware of it.
有一个问题,早期的答案没有解决,如果他们不知道它可能会甩掉他们。
Among those solutions already posted, I'd use Danielle's with one minor modification: I'd import the task into my file and use its .name
attribute to get the task name to pass to .tasks_by_type()
.
在已发布的解决方案中,我将Danielle用于一个小修改:我将任务导入到我的文件中并使用其.name属性将任务名称传递给.tasks_by_type()。
app.control.revoke(
[uuid for uuid, _ in
celery.events.state.State().tasks_by_type(task.name)])
However, this solution will ignore those tasks that have been scheduled for future execution. Like some people who commented on other answers, when I checked what .tasks_by_type()
return I had an empty list. And indeed my queues were empty. But I knew that there were tasks scheduled to be executed in the future and these were my primary target. I could see them by executing celery -A [app] inspect scheduled
but they were unaffected by the code above.
但是,此解决方案将忽略已安排用于将来执行的那些任务。像一些评论其他答案的人一样,当我检查.tasks_by_type()返回时,我有一个空列表。事实上,我的队列是空的。但我知道有些任务计划在未来执行,这些是我的主要目标。我可以通过执行芹菜-A [app]检查预定来看到它们,但它们不受上述代码的影响。
I managed to revoke the scheduled tasks by doing this:
我设法通过这样做来撤销计划任务:
app.control.revoke(
[scheduled["request"]["id"] for scheduled in
chain.from_iterable(app.control.inspect().scheduled()
.itervalues())])
app.control.inspect().scheduled()
returns a dictionary whose keys are worker names and values are lists of scheduling information (hence, the need for chain.from_iterable
which is imported from itertools
). The task information is in the "request"
field of the scheduling information and "id"
contains the task id. Note that even after revocation, the scheduled task will still show among the scheduled tasks. Scheduled tasks that are revoked won't get removed from the list of scheduled tasks until their timers expire or until Celery performs some cleanup operation. (Restarting workers triggers such cleanup.)
app.control.inspect()。scheduled()返回一个字典,其键是工作者名称,值是调度信息列表(因此,需要从itertools导入的chain.from_iterable)。任务信息在调度信息的“请求”字段中,“id”包含任务id。请注意,即使在撤销后,计划任务仍将显示在计划任务中。已撤销的计划任务将不会从计划任务列表中删除,直到计时器到期或Celery执行某些清理操作。 (重新启动工作程序会触发此类清理。)
#3
3
You can do this in one request:
您可以在一个请求中执行此操作:
app.control.revoke([
uuid
for uuid, _ in
celery.events.state.State().tasks_by_type(task_name)
])
#4
1
It looks like flower
provides monitoring:
看起来像花提供监控:
https://github.com/mher/flower
Real-time monitoring using Celery Events
使用Celery Events进行实时监控
Task progress and history Ability to show task details (arguments, start time, runtime, and more) Graphs and statistics Remote Control
任务进度和历史记录能够显示任务详细信息(参数,开始时间,运行时等)图形和统计信息远程控制
View worker status and statistics Shutdown and restart worker instances Control worker pool size and autoscale settings View and modify the queues a worker instance consumes from View currently running tasks View scheduled tasks (ETA/countdown) View reserved and revoked tasks Apply time and rate limits Configuration viewer Revoke or terminate tasks HTTP API
查看工作程序状态和统计信息关闭并重新启动工作程序实例控制工作程序池大小和自动缩放设置查看和修改工作程序实例消耗的队列查看当前正在运行的任务查看计划任务(ETA /倒计时)查看保留和已撤销的任务应用时间和速率限制配置viewer撤消或终止任务HTTP API
OpenID authentication