Problem
I've segmented a long-running task into logical subtasks, so I can report the results of each subtask as it completes. However, I'm trying to report the results of a task that will effectively never complete (instead yielding values as it goes), and am struggling to do so with my existing solution.
Background
I'm building a web interface to some Python programs I've written. Users can submit jobs through web forms, then check back to see the job's progress.
Let's say I have two functions, each accessed via separate forms:
- med_func: Takes ~1 minute to execute; results are passed off to render(), which produces additional data.
- long_func: Returns a generator. Each yield takes on the order of 30 minutes and should be reported to the user. There are so many yields that we can consider this iterator infinite (terminating only when revoked).
Code, current implementation
With med_func, I report results as follows:
On form submission, I save an AsyncResult to a Django session:
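# Note: storing the AsyncResult object in the session needs a session
# serializer that can pickle it; with Django's default JSONSerializer, store
# task_result.id instead and rebuild it with AsyncResult(task_id) in the view.
task_result = med_func.apply_async([form], link=render.s())
request.session["task_result"] = task_result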
The Django view for the results page accesses this AsyncResult. When a task has completed, results are saved into an object that is passed as context to a Django template.
from django.shortcuts import render


def results(request):
    """ Serve (possibly incomplete) results of a session's latest run. """
    session = request.session
    try:  # Load most recent task
        task_result = session["task_result"]
    except KeyError:  # Already cleared, or doesn't exist
        if "results" not in session:
            session["status"] = "No job submitted"
    else:  # Extract data from Asynchronous Tasks
        session["status"] = task_result.status
        if task_result.ready():
            session["results"] = task_result.get()
            render_task = task_result.children[0]
            # Decorate with rendering results
            session["render_status"] = render_task.status
            if render_task.ready():
                session["results"].render_output = render_task.get()
                del session["task_result"]  # Don't need any more
    # render_to_response() was removed in Django 3.0, and templates expect
    # a plain dict (not the session object) as context.
    return render(request, "results.html", dict(session))
This solution only works when the function actually terminates. I can't chain together logical subtasks of long_func, because there is an unknown number of yields (not every iteration of long_func's loop produces a result).
Question
Is there any sensible way to access yielded objects from an extremely long-running Celery task, so that they can be displayed before the generator is exhausted?
6 answers
#1
23
In order for Celery to know what the current state of the task is, it sets some metadata in whatever result backend you have. You can piggy-back on that to store other kinds of metadata.
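from celery import shared_task
from celery.result import AsyncResult


def yielder():
    for i in range(2**100):
        yield i


@shared_task(bind=True)
def report_progress(self):
    for progress in yielder():
        # Set current progress on the task; it is stored in the result
        # backend as the task's STARTED metadata.
        self.backend.mark_as_started(self.request.id, progress=progress)


def view_function(request):
    task_id = request.session['task_id']
    task = AsyncResult(task_id)
    # info is None before the first update (and an exception on failure).
    progress = task.info.get('progress') if isinstance(task.info, dict) else None
    # do something with your current progress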
I wouldn't throw a ton of data in there, but it works well for tracking the progress of a long-running task.
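To keep that metadata small, one option is to publish only a running count, the latest value, and a short window of recent values, rather than everything the generator has produced. The sketch below is plain Python so it runs without a broker; `yielder`, `progress_meta`, `run`, and the `publish` callback are hypothetical names. Inside a bound Celery task, `publish` could be something like `lambda meta: self.update_state(state='PROGRESS', meta=meta)`.

```python
from collections import deque
from itertools import islice


def yielder():
    """Hypothetical stand-in for a generator that effectively never finishes."""
    i = 0
    while True:
        yield i
        i += 1


def progress_meta(recent, count):
    """Build a small, JSON-friendly metadata dict for the result backend."""
    return {"count": count, "latest": recent[-1], "recent": list(recent)}


def run(max_items, window=5, publish=print):
    # deque(maxlen=window) drops the oldest value on each append, so the
    # published payload never grows beyond `window` recent values.
    recent = deque(maxlen=window)
    count = 0
    # islice() bounds the loop for demonstration; a real task would keep
    # iterating until it is revoked.
    for value in islice(yielder(), max_items):
        recent.append(value)
        count += 1
        publish(progress_meta(recent, count))
    return count
```

Because the window is fixed, the stored payload has the same upper size after ten yields or ten thousand.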
#2
7
Paul's answer is great. As an alternative to using mark_as_started, you can use Task's update_state method. They ultimately do the same thing, but the name "update_state" is a little more appropriate for what you're trying to do. You can optionally define a custom state that indicates your task is in progress (I've named my custom state 'PROGRESS'):
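from celery import shared_task
from celery.result import AsyncResult


def yielder():
    for i in range(2**100):
        yield i


@shared_task(bind=True)
def report_progress(self):
    for progress in yielder():
        # set current progress on the task under the custom 'PROGRESS' state
        self.update_state(state='PROGRESS', meta={'progress': progress})


def view_function(request):
    task_id = request.session['task_id']
    task = AsyncResult(task_id)
    # Only read the meta dict once the task has reported progress.
    progress = task.info.get('progress') if task.state == 'PROGRESS' else None
    # do something with your current progress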
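On the view side, indexing `task.info['progress']` directly fails while the task is still PENDING (`info` is `None` then), and `info` holds the exception instance if the task failed. A minimal sketch of a helper that branches on the state first, assuming the custom 'PROGRESS' state above; `describe_task` and the keys it returns are hypothetical:

```python
def describe_task(state, info):
    """Turn a task's state and info into something a template can display.

    `state` and `info` are the values of AsyncResult.state and AsyncResult.info.
    """
    if state == "PENDING":
        # Unknown task ids also report PENDING, so there is nothing to show yet.
        return {"status": "waiting", "progress": None}
    if state == "PROGRESS":
        # Custom state set via update_state(state='PROGRESS', meta={...}).
        return {"status": "running", "progress": info.get("progress")}
    if state == "FAILURE":
        # For failed tasks, info holds the raised exception.
        return {"status": "failed", "error": repr(info)}
    # SUCCESS, REVOKED, STARTED and so on: report the state name as-is.
    return {"status": state.lower(), "progress": None}
```

In a view it might be called as `describe_task(task.state, task.info)`, with the result passed to the template.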
#3
5
Celery part:
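from celery import shared_task
from django.core.cache import cache


def long_func(*args, **kwargs):
    i = 0
    while True:
        yield i
        do_something_here(*args, **kwargs)
        i += 1


@shared_task(bind=True)
def test_yield_task(self, **kwargs):
    # Key the entry on Celery's own task id, so the web side can find it
    # with the id returned by apply_async().
    for the_progress in long_func(**kwargs):
        # timeout=None keeps the value until it is overwritten; the default
        # 300-second timeout could expire between 30-minute yields.
        cache.set('celery-task-%s' % self.request.id, the_progress, timeout=None)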
Web client side, starting the task:
r = test_yield_task.apply_async()
request.session['task_id'] = r.task_id
Testing the last yielded value:
v = cache.get('celery-task-%s' % request.session.get('task_id'))
if v is not None:  # 0 is a valid first value, so don't test truthiness
    do_something()
If you don't want to use the cache, or can't, you can use a database, a file, or any other place that both the Celery worker and the web server can access. The cache is the simplest solution, but the workers and the server have to use the same cache backend (a per-process local-memory cache won't work).
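As a sketch of the database alternative, assuming an SQLite file that both the worker and the web server can open (the table and function names are hypothetical; in a Django project a model would be the more natural fit), each write simply overwrites the latest value for a task id:

```python
import sqlite3


def open_store(path):
    """Open (creating if needed) a table shared by the worker and web server."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS task_progress ("
        " task_id TEXT PRIMARY KEY,"
        " value TEXT NOT NULL)"
    )
    return conn


def record_progress(conn, task_id, value):
    """Worker side: overwrite the latest value recorded for this task."""
    with conn:  # commits the transaction on success
        conn.execute(
            "INSERT OR REPLACE INTO task_progress (task_id, value) VALUES (?, ?)",
            (task_id, str(value)),
        )


def latest_progress(conn, task_id):
    """Web side: return the latest value, or None if nothing was recorded."""
    row = conn.execute(
        "SELECT value FROM task_progress WHERE task_id = ?", (task_id,)
    ).fetchone()
    return row[0] if row is not None else None
```

`":memory:"` only works for trying this out inside one process; the worker and web server need the same file path. Values are stored as text here, so structured results would need something like `json.dumps` before writing.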
#4
4
A couple options to consider:
1 -- task groups. If you can enumerate all the subtasks at invocation time, you can apply the group as a whole. That returns a TaskSetResult object (GroupResult in Celery 3.0 and later) that you can query as needed to monitor the results of the group as a whole, or of individual tasks in the group, whenever you need to check status.
2 -- callbacks. If you can't enumerate all the subtasks (or even if you can!), you can define a web hook/callback as the last step in the task, called when the rest of the task completes. The hook would point at a URI in your app that ingests the result and makes it available via the DB or an app-internal API.
Some combination of these could solve your challenge.
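For a generator that never finishes, a callback that only runs as the task's last step would never fire. One adaptation of the callback idea is to invoke it once per yielded value. The sketch below is plain Python; `drain`, `on_item`, and `long_func_standin` are hypothetical, and in a real deployment `on_item` might write to the database or POST to an endpoint in your app:

```python
from typing import Callable, Iterable, TypeVar

T = TypeVar("T")


def drain(items: Iterable[T], on_item: Callable[[int, T], None]) -> int:
    """Consume `items`, handing each yielded value to `on_item` as it arrives.

    Returns how many items were seen (only reached if the iterable ends).
    """
    count = 0
    for index, item in enumerate(items):
        on_item(index, item)
        count += 1
    return count


def long_func_standin(n):
    """Hypothetical stand-in: not every loop iteration yields a result."""
    for i in range(n):
        if i % 3 == 0:
            yield i * 10
```

Because the callback fires per item, it does not matter how many yields there will be in total, which is exactly what makes chaining a fixed set of subtasks impractical here.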
#5
3
See also this great PyCon presentation by one of the Instagram engineers:
http://blogs.vmware.com/vfabric/2013/04/how-instagram-feeds-work-celery-and-rabbitmq.html
At the 16:00 mark in the video, he discusses how they structure long lists of subtasks.
#6
-1
Personally, I would want to see the start time, duration, progress (number of items yielded), stop time (or ETA), status, and any other helpful information. It would be nice if it looked similar to a related display, maybe like ps on Linux. It is, after all, a process status.
You could include some options to pause or kill the task, and/or to "open" it and display detailed information about the children or results.
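A rough sketch of such a ps-style summary line, assuming you record the start time, the number of items yielded, and (once stopped) the stop time somewhere the web server can read. `TaskStatus` and its fields are hypothetical. Because the generator is effectively infinite, the sketch reports a yield rate instead of an ETA:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class TaskStatus:
    task_id: str
    state: str
    started_at: datetime
    items_yielded: int
    stopped_at: Optional[datetime] = None

    def summary(self, now: datetime) -> str:
        """One ps-style line: id, state, start time, elapsed, items, rate."""
        end = self.stopped_at or now
        elapsed = end - self.started_at
        hours = elapsed.total_seconds() / 3600
        rate = self.items_yielded / hours if hours > 0 else 0.0
        # Drop microseconds so the elapsed time prints as H:MM:SS.
        elapsed_str = str(timedelta(seconds=int(elapsed.total_seconds())))
        return (
            f"{self.task_id:<8} {self.state:<8} "
            f"{self.started_at:%Y-%m-%d %H:%M} {elapsed_str:>9} "
            f"{self.items_yielded:>6} {rate:>6.1f}/h"
        )
```

Pause, kill, and "open" actions would then hang off the task id shown in the first column.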