如何让芹菜任务调用异步任务?

时间:2022-05-01 19:17:50

I have an django aplication that needs to run an optimization algorithm. This algorithm is composed by two parts. The first part is an evolutionary algorithm and this algorithm calls a certain number of tasks of the second part which is an simulated annealing algorithm. The problem is that celery dont allows a task calls an asynchronous task. I have tried this code below:

我有一个需要运行优化算法的django应用程序。该算法由两部分组成。第一部分是进化算法,该算法调用第二部分的一定数量的任务,即模拟退火算法。问题是芹菜不允许任务调用异步任务。我在下面尝试过以下代码:

            sa_list = []
            for cromossomo in self._populacao:
                sa_list.append(simulated_annealing_t.s(cromossomo.to_JSON(), self._NR, self._T, get_mutacao_str(self._mutacao_SA), self._argumentos))

            job = group(sa_list)

            result = job.apply_async()
            resultados = result.get()

This code is part of the evolutionary algorithm which is an celery task. When i tried to run it the celery shows this message:

此代码是进化算法的一部分,这是芹菜任务。当我试图运行它时,芹菜显示以下信息:

[2015-12-02 16:20:15,970: WARNING/Worker-1] /home/arthur/django-user/local/lib/python2.7/site-packages/celery/result.py:45: RuntimeWarning: Never call result.get() within a task! See http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks

[2015-12-02 16:20:15,970:警告/工人-1] /home/arthur/django-user/local/lib/python2.7/site-packages/celery/result.py:45:运行时警告:从不在任务中调用result.get()!见http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks

In Celery 3.2 this will result in an exception being raised instead of just being a warning.

在Celery 3.2中,这将导致异常被提出而不仅仅是一个警告。

despite being just a warning the celery seems to be full of tasks and locks.

尽管只是一个警告,芹菜似乎充满了任务和锁定。

I searched for a lot of solutions but none of them worked.

我搜索了很多解决方案,但没有一个有效。

2 个解决方案

#1


1  

one way to deal with this is to have a 2 stage pipeline:

解决这个问题的一种方法是建立一个2阶段管道:

def first_task():
    sa_list = []
    for cromossomo in self._populacao:
        sa_list.append(simulated_annealing_t.s(cromossomo.to_JSON(), self._NR, self._T, get_mutacao_str(self._mutacao_SA), self._argumentos))

    job = group(sa_list)

    result = job.apply_async()
    result.save()
    return result.id

then call it like this:

然后像这样称呼它:

from path.to.tasks import app, first_task

result_1 = first_task.apply_async()
result_2_id = result_1.get()
result_2 = app.GroupResult.restore(result_2_id)
resultados = result_2.get()

there are other ways to do this that involve more work - you could use a chord to gather the results of the group.

还有其他方法可以做更多工作 - 您可以使用和弦来收集组的结果。

#2


1  

The problem is not that celery doesn't allow the execution of async tasks in your example, but that you'll run into a deadlock, hence the warning:

问题不在于celery不允许在您的示例中执行异步任务,而是您将遇到死锁,因此警告:

Let's assume you have a task A that spawns a number of subtasks B through apply_async(). Every one of those tasks is executed by a worker. The problem is that if the number of tasks B is larger than the amount of available workers, task A is still waiting for their results (in your example, at least - it's not by default). When task A is still running, the workers that have executed a task B will not execute another one, they are blocked until task A is finished. (I don't know exactly why, but I had this problem just a few weeks ago.)

假设您有一个任务A,它通过apply_async()生成许多子任务B.这些任务中的每一个都由工人执行。问题是,如果任务B的数量大于可用工作量,则任务A仍在等待其结果(在您的示例中,至少 - 默认情况下不是这样)。当任务A仍在运行时,执行任务B的工作人员将不执行另一个任务,他们将被阻止,直到任务A完成。 (我不确切知道为什么,但几周前我遇到了这个问题。)

This means that celery can't execute anything until you manually shut down the workers.

这意味着在您手动关闭工人之前,芹菜不能执行任何操作。

Solutions

This depends entirely what you will do with your task results. If you need them to execute the next subtask, you can chain them through Linking with callbacks or by hardcoding it into the respective tasks (so that you call the first, that calls the second, and so on).

这完全取决于您对任务结果的处理方式。如果您需要它们来执行下一个子任务,您可以通过链接回调或将其硬编码到相应的任务中来链接它们(这样您就可以调用第一个,调用第二个,依此类推)。

If you only need to see if they are executed and are successful or not, you can use flower to monitor your tasks.

如果您只需要查看它们是否已执行且是否成功,您可以使用花来监控您的任务。

If you need to process the output of all the subtasks further, I recommend writing the results to an xml file: Have task A call all tasks B, and once they are done you execute task C that processes the results. Maybe there are more elegant solutions, but this avoids the deadlock for sure.

如果需要进一步处理所有子任务的输出,我建议将结果写入xml文件:让任务A调用所有任务B,完成任务后执行处理结果的任务C.也许有更优雅的解决方案,但这确实避免了僵局。

#1


1  

one way to deal with this is to have a 2 stage pipeline:

解决这个问题的一种方法是建立一个2阶段管道:

def first_task():
    sa_list = []
    for cromossomo in self._populacao:
        sa_list.append(simulated_annealing_t.s(cromossomo.to_JSON(), self._NR, self._T, get_mutacao_str(self._mutacao_SA), self._argumentos))

    job = group(sa_list)

    result = job.apply_async()
    result.save()
    return result.id

then call it like this:

然后像这样称呼它:

from path.to.tasks import app, first_task

result_1 = first_task.apply_async()
result_2_id = result_1.get()
result_2 = app.GroupResult.restore(result_2_id)
resultados = result_2.get()

there are other ways to do this that involve more work - you could use a chord to gather the results of the group.

还有其他方法可以做更多工作 - 您可以使用和弦来收集组的结果。

#2


1  

The problem is not that celery doesn't allow the execution of async tasks in your example, but that you'll run into a deadlock, hence the warning:

问题不在于celery不允许在您的示例中执行异步任务,而是您将遇到死锁,因此警告:

Let's assume you have a task A that spawns a number of subtasks B through apply_async(). Every one of those tasks is executed by a worker. The problem is that if the number of tasks B is larger than the amount of available workers, task A is still waiting for their results (in your example, at least - it's not by default). When task A is still running, the workers that have executed a task B will not execute another one, they are blocked until task A is finished. (I don't know exactly why, but I had this problem just a few weeks ago.)

假设您有一个任务A,它通过apply_async()生成许多子任务B.这些任务中的每一个都由工人执行。问题是,如果任务B的数量大于可用工作量,则任务A仍在等待其结果(在您的示例中,至少 - 默认情况下不是这样)。当任务A仍在运行时,执行任务B的工作人员将不执行另一个任务,他们将被阻止,直到任务A完成。 (我不确切知道为什么,但几周前我遇到了这个问题。)

This means that celery can't execute anything until you manually shut down the workers.

这意味着在您手动关闭工人之前,芹菜不能执行任何操作。

Solutions

This depends entirely what you will do with your task results. If you need them to execute the next subtask, you can chain them through Linking with callbacks or by hardcoding it into the respective tasks (so that you call the first, that calls the second, and so on).

这完全取决于您对任务结果的处理方式。如果您需要它们来执行下一个子任务,您可以通过链接回调或将其硬编码到相应的任务中来链接它们(这样您就可以调用第一个,调用第二个,依此类推)。

If you only need to see if they are executed and are successful or not, you can use flower to monitor your tasks.

如果您只需要查看它们是否已执行且是否成功,您可以使用花来监控您的任务。

If you need to process the output of all the subtasks further, I recommend writing the results to an xml file: Have task A call all tasks B, and once they are done you execute task C that processes the results. Maybe there are more elegant solutions, but this avoids the deadlock for sure.

如果需要进一步处理所有子任务的输出,我建议将结果写入xml文件:让任务A调用所有任务B,完成任务后执行处理结果的任务C.也许有更优雅的解决方案,但这确实避免了僵局。