芹菜迫使所有芹菜进程运行任务

时间:2022-05-01 19:17:56

I have a setup with multiple EC2's and I have celery running on all of them. I also have one box with celerybeat running on it. I'm able to get celerybeat to run with tasks running on the remaining celery clients.

我有一个多个EC2的设置,我有芹菜在所有这些上运行。我还有一个装有celerybeat的盒子。我能够让celerybeat在剩下的芹菜客户端上运行任务。

Is there a way to make a required task that all celery instances have to run? The use case would be clearing logs, running basic sanity checks on the boxes, etc.

有没有办法让所有celery实例都必须运行所需的任务?用例是清除日志,在盒子上运行基本的健全性检查等。

I have read the below:

我看过以下内容:

http://docs.celeryproject.org/en/latest/userguide/workers.html

2 个解决方案

#1


0  

Actually celery works not as a push-like system, but as a pull-like one.

实际上,芹菜不像推式系统那样工作,而是像拉式一样。

You just put a task into the queue and one of available workers get it and execute.

您只需将一个任务放入队列,其中一个可用的工作人员就可以获得并执行。

From your question I assume that under instances you mean servers on whose celery's workers are running. So you would like to run the same task on all servers.

根据您的问题,我假设在实例中,您指的是芹菜工人正在运行的服务器。所以你想在所有服务器上运行相同的任务。

I think you can only put some tasks (corresponding to servers number) and specify exact routing number for each task (the same as worker's id).

我认为您只能放置一些任务(对应于服务器编号)并为每个任务指定确切的路由编号(与工作人员的ID相同)。

mytask.apply_async(kwargs={'a': 1, 'b': 2}, routing_key='aaabbc-dddeeff-243453')
mytask.apply_async(kwargs={'a': 1, 'b': 2}, routing_key='bbbbbb-fffddd-dabcfe')
...

#2


0  

Broadcast

Celery can also support broadcast routing. Here is an example exchange broadcast_tasks that delivers copies of tasks to all workers connected to it:

芹菜也可以支持广播路由。以下是交换broadcast_tasks的示例,它将任务副本提供给与其连接的所有工作人员:

from kombu.common import Broadcast

来自kombu.common进口广播

CELERY_QUEUES = (Broadcast('broadcast_tasks'), )

CELERY_QUEUES =(广播('broadcast_tasks'),)

CELERY_ROUTES = {'tasks.reload_cache': {'queue': 'broadcast_tasks'}} Now the tasks.reload_cache task will be sent to every worker consuming from this queue.

CELERY_ROUTES = {'tasks.reload_cache':{'queue':'broadcast_tasks'}}现在,tasks.reload_cache任务将被发送给从该队列消耗的每个工作人员。

#1


0  

Actually celery works not as a push-like system, but as a pull-like one.

实际上,芹菜不像推式系统那样工作,而是像拉式一样。

You just put a task into the queue and one of available workers get it and execute.

您只需将一个任务放入队列,其中一个可用的工作人员就可以获得并执行。

From your question I assume that under instances you mean servers on whose celery's workers are running. So you would like to run the same task on all servers.

根据您的问题,我假设在实例中,您指的是芹菜工人正在运行的服务器。所以你想在所有服务器上运行相同的任务。

I think you can only put some tasks (corresponding to servers number) and specify exact routing number for each task (the same as worker's id).

我认为您只能放置一些任务(对应于服务器编号)并为每个任务指定确切的路由编号(与工作人员的ID相同)。

mytask.apply_async(kwargs={'a': 1, 'b': 2}, routing_key='aaabbc-dddeeff-243453')
mytask.apply_async(kwargs={'a': 1, 'b': 2}, routing_key='bbbbbb-fffddd-dabcfe')
...

#2


0  

Broadcast

Celery can also support broadcast routing. Here is an example exchange broadcast_tasks that delivers copies of tasks to all workers connected to it:

芹菜也可以支持广播路由。以下是交换broadcast_tasks的示例,它将任务副本提供给与其连接的所有工作人员:

from kombu.common import Broadcast

来自kombu.common进口广播

CELERY_QUEUES = (Broadcast('broadcast_tasks'), )

CELERY_QUEUES =(广播('broadcast_tasks'),)

CELERY_ROUTES = {'tasks.reload_cache': {'queue': 'broadcast_tasks'}} Now the tasks.reload_cache task will be sent to every worker consuming from this queue.

CELERY_ROUTES = {'tasks.reload_cache':{'queue':'broadcast_tasks'}}现在,tasks.reload_cache任务将被发送给从该队列消耗的每个工作人员。