I have a setup with multiple EC2 instances, each running a celery worker, plus one box running celerybeat. I'm able to get celerybeat to schedule tasks that run on the remaining celery workers.
Is there a way to define a required task that every celery instance has to run? The use case would be clearing logs, running basic sanity checks on the boxes, etc.
I have read the following:
http://docs.celeryproject.org/en/latest/userguide/workers.html
2 Answers
#1
0
Actually, celery works not as a push-style system but as a pull-style one: you put a task into the queue, and one of the available workers picks it up and executes it.
From your question I assume that by instances you mean the servers on which the celery workers are running, so you would like to run the same task on all servers.
I think you can only enqueue as many copies of the task as there are servers, specifying an exact routing key for each one (matching the worker's id):
mytask.apply_async(kwargs={'a': 1, 'b': 2}, routing_key='aaabbc-dddeeff-243453')
mytask.apply_async(kwargs={'a': 1, 'b': 2}, routing_key='bbbbbb-fffddd-dabcfe')
...
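A minimal sketch of that fan-out idea, assuming you already know your worker hostnames (e.g. from `celery inspect ping`) and have started each worker consuming from its own dedicated queue; the `maintenance.` queue prefix, the hostnames, and the `cleanup_logs` task are all hypothetical:

```python
# Sketch: fan one task out to every worker by addressing a dedicated
# per-worker queue. Assumes each worker was started with something like
#   celery worker -Q maintenance.<hostname>
# The "maintenance." prefix and the hostnames below are made up.
workers = ["worker1@host-a", "worker2@host-b"]

def per_worker_queues(workers, prefix="maintenance."):
    # Build one dedicated queue name per worker.
    return [prefix + w for w in workers]

for queue in per_worker_queues(workers):
    # cleanup_logs.apply_async(queue=queue)  # needs a configured Celery app
    print(queue)
```

This avoids hard-coding worker ids in routing keys: each worker owns a predictable queue, and the dispatcher loops over the known hosts.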
#2
0
Broadcast
Celery also supports broadcast routing. Here is an example of an exchange, broadcast_tasks, that delivers copies of a task to all workers connected to it:
from kombu.common import Broadcast
CELERY_QUEUES = (Broadcast('broadcast_tasks'), )
CELERY_ROUTES = {'tasks.reload_cache': {'queue': 'broadcast_tasks'}}

Now the tasks.reload_cache task will be sent to every worker consuming from this queue.
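Putting the pieces above together, a hedged sketch of a complete broadcast setup (the module names `proj`/`tasks` and the broker URL are assumptions, and this requires celery and kombu installed plus a running broker, so the app wiring is shown as comments):

```python
# celeryconfig.py -- broadcast setup (sketch; module names are assumptions)
from kombu.common import Broadcast

# Declare a broadcast queue: each connected worker gets its own copy
# of any message published to it.
CELERY_QUEUES = (Broadcast('broadcast_tasks'),)

# Route the maintenance task onto the broadcast queue.
CELERY_ROUTES = {'tasks.reload_cache': {'queue': 'broadcast_tasks'}}

# tasks.py (sketch)
# from celery import Celery
# app = Celery('proj', broker='amqp://guest@localhost//')
# app.config_from_object('celeryconfig')
#
# @app.task
# def reload_cache():
#     ...  # runs on every worker consuming from broadcast_tasks
#
# Each worker must consume the broadcast queue, e.g.:
#   celery worker -Q broadcast_tasks
# Dispatch once; every connected worker receives a copy:
# reload_cache.delay()
```

This fits the original use case well: one scheduled `reload_cache`-style maintenance task from celerybeat reaches every box, instead of enqueueing one copy per server.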