I have a CPU intensive Celery task. I would like to use all the processing power (cores) across lots of EC2 instances to get this job done faster (a celery parallel distributed task with multiprocessing - I think).
我有一个CPU密集型的芹菜任务。我希望使用跨许多EC2实例的所有处理能力(核心)来更快地完成这项工作(我认为是具有多处理的芹菜并行分布式任务)。
The terms, threading, multiprocessing, distributed computing, distributed parallel processing are all terms I'm trying to understand better.
术语、线程、多处理、分布式计算、分布式并行处理都是我想更好地理解的术语。
Example task:
示例任务:
@app.task
for item in list_of_millions_of_ids:
id = item # do some long complicated equation here very CPU heavy!!!!!!!
database.objects(newid=id).save()
Using the code above (with an example if possible) how one would ago about distributed this task using Celery by allowing this one task to be split up utilising all the computing CPU power across all available machine in the cloud?
使用上面的代码(如果可能的话,可以举个例子),如何在使用芹菜分配这个任务之前,让这个任务在云中的所有可用机器上使用所有的CPU计算能力来进行分割?
4 个解决方案
#1
94
Your goals are:
你的目标是:
- Distribute your work to many machines (distributed computing/distributed parallel processing)
- 将您的工作分配给许多机器(分布式计算/分布式并行处理)
- Distribute the work on a given machine across all CPUs (multiprocessing/threading)
- 在给定的机器上跨所有cpu分配工作(多处理/线程)
Celery can do both of these for you fairly easily. The first thing to understand is that each celery worker is configured by default to run as many tasks as there are CPU cores available on a system:
芹菜很容易做到这两点。首先要理解的是,每个芹菜工作人员默认配置为运行系统中可用的CPU内核数量一样多的任务:
Concurrency is the number of prefork worker process used to process your tasks concurrently, when all of these are busy doing work new tasks will have to wait for one of the tasks to finish before it can be processed.
并发是用于同时处理您的任务的prefork worker进程的数量,当所有这些任务都忙于工作时,新任务将不得不等待其中一个任务完成,然后才能处理。
The default concurrency number is the number of CPU’s on that machine (including cores), you can specify a custom number using -c option. There is no recommended value, as the optimal number depends on a number of factors, but if your tasks are mostly I/O-bound then you can try to increase it, experimentation has shown that adding more than twice the number of CPU’s is rarely effective, and likely to degrade performance instead.
默认的并发数是该机器上CPU的数量(包括内核),您可以使用-c选项指定一个定制的数量。不存在推荐值,因为最优值取决于许多因素,但是如果您的任务大部分是I/ o绑定的,那么您可以尝试增加它,实验表明添加超过CPU数量两倍的值很少有效,而且很可能降低性能。
This means each individual task doesn't need to worry about using multiprocessing/threading to make use of multiple CPUs/cores. Instead, celery will run enough tasks concurrently to use each available CPU.
这意味着每个单独的任务不需要担心使用多处理/线程来使用多个cpu /内核。芹菜可以同时运行足够的任务来使用每个可用的CPU。
With that out of the way, the next step is to create a task that handles processing some subset of your list_of_millions_of_ids
. You have a couple of options here - one is to have each task handle a single ID, so you run N tasks, where N == len(list_of_millions_of_ids)
. This will guarantee that work is evenly distributed amongst all your tasks, since there will never be a case where one worker finishes early and is just waiting around; if it needs work, it can pull an id off the queue. You can do this (as mentioned by John Doe) using the a celery group
.
这样,下一步就是创建一个任务来处理您的list_of_million - s_of_ids的某些子集。这里有几个选项—一个是让每个任务处理一个ID,因此运行N个任务,其中N = len(list_of_millions - of_ids)。这将保证工作在您的所有任务中都是平均分配的,因为从来不会有一个工人提前完成工作并只是等待;如果它需要工作,它可以从队列中拉出一个id。你可以用芹菜组来做(正如John Doe提到的)。
tasks.py:
tasks.py:
@app.task
def process_id(item):
id = item #long complicated equation here
database.objects(newid=id).save()
And to execute the tasks:
并执行任务:
from celery import group
from tasks import process_id
jobs = group(process_id.s(item) for item in list_of_millions_of_ids)
result = jobs.apply_async()
Another option is to break the list into smaller pieces, and distribute the pieces to your workers. This approach runs the risk of wasting some cycles, because you may end up with some workers waiting around while others are still doing work. However, the celery documentation notes that this concern is often unfounded:
另一种选择是将列表分成更小的部分,并将这些部分分发给您的员工。这种方法存在浪费一些周期的风险,因为您可能会看到一些工人在等待,而另一些人仍在工作。然而,芹菜文件指出,这种关切往往是毫无根据的:
Some may worry that chunking your tasks results in a degradation of parallelism, but this is rarely true for a busy cluster and in practice since you are avoiding the overhead of messaging it may considerably increase performance.
有些人可能会担心将任务分块会导致并行性的降低,但对于繁忙的集群来说,这种情况很少出现,实际上,由于您避免了消息传递的开销,因此它可能会显著提高性能。
So, you may find that chunking the list and distributing the chunks to each task performs better, because of the reduced messaging overhead. You can probably also lighten the load on the database a bit this way, by calculating each id, storing it in a list, and then adding the whole list into the DB once you're done, rather than doing it one id at a time. The chunking approach would look something like this
因此,您可能会发现对列表进行分块并将块分发给每个任务执行得更好,因为消息传递开销减少了。通过计算每个id,将其存储在一个列表中,然后在完成之后将整个列表添加到DB中,而不是一次只执行一个id,您可能还可以通过这种方式减轻数据库的负载。分块方法看起来是这样的
tasks.py:
tasks.py:
@app.task
def process_ids(items):
for item in items:
id = item #long complicated equation here
database.objects(newid=id).save() # Still adding one id at a time, but you don't have to.
And to start the tasks:
并开始任务:
from tasks import process_ids
jobs = process_ids.chunks(list_of_millions_of_ids, 30) # break the list into 30 chunks. Experiment with what number works best here.
jobs.apply_async()
You can experiment a bit with what chunking size gives you the best result. You want to find a sweet spot where you're cutting down messaging overhead while also keeping the size small enough that you don't end up with workers finishing their chunk much faster than another worker, and then just waiting around with nothing to do.
你可以尝试一下什么块大小能给你最好的结果。你想要找到一个可以减少信息开销的好地方,同时也要保持足够小的数量,这样你就不会以比其他员工更快的速度完成工作,然后无所事事地等待。
#2
10
In the world of distribution there is only one thing you should remember above all :
在分配的世界里,只有一件事你应该记住:
Premature optimization is the root of all evil. By D. Knuth
过早的优化是万恶之源。由d Knuth
I know it sounds evident but before distributing double check you are using the best algorithm (if it exists...). Having said that, optimizing distribution is a balancing act between 3 things:
我知道这听起来很明显,但在发布复查之前,你正在使用最好的算法(如果存在的话…)。话虽如此,优化分布是三件事之间的平衡:
- Writing/Reading data from a persistent medium,
- 从持久介质中写入/读取数据,
- Moving data from medium A to medium B,
- 移动数据从A到中B,
- Processing data,
- 处理数据,
Computers are made so the closer you get to your processing unit (3) the faster and more efficient (1) and (2) will be. The order in a classic cluster will be : network hard drive, local hard drive, RAM, inside processing unit territory... Nowadays processors are becoming sophisticated enough to be considered as an ensemble of independent hardware processing units commonly called cores, these cores process data (3) through threads (2). Imagine your core is so fast that when you send data with one thread you are using 50% of the computer power, if the core has 2 threads you will then use 100%. Two threads per core is called hyper threading, and your OS will see 2 CPUs per hyper threaded core.
电脑的制造使得你离你的处理单元(3)越近,(1)和(2)的速度就越快,效率也就越高。经典集群的顺序是:网络硬盘驱动器、本地硬盘驱动器、RAM、处理单元区域内……现在处理器正变得足够成熟,被视为一个独立的硬件处理单元通常称为核心,这些核心流程数据(2)(3)通过线程。想象你的核心是如此之快,当你与一个线程发送数据使用的是50%的计算机能力,如果你会用100%的核心有两个线程。每个内核有两个线程称为hyper threading,您的操作系统将看到每个hyper thread内核有两个cpu。
Managing threads in a processor is commonly called multi-threading. Managing CPUs from the OS is commonly called multi-processing. Managing concurrent tasks in a cluster is commonly called parallel programming. Managing dependent tasks in a cluster is commonly called distributed programming.
在处理器中管理线程通常称为多线程。从操作系统管理cpu通常被称为多处理。在集群中管理并发任务通常称为并行编程。管理集群中的相关任务通常称为分布式编程。
So where is your bottleneck ?
那么瓶颈在哪里呢?
- In (1): Try to persist and stream from the upper level (the one closer to your processing unit, for example if network hard drive is slow first save in local hard drive)
- 在(1)中:尝试从上层(靠近处理单元的那个,例如如果网络硬盘驱动器速度慢,首先在本地硬盘驱动器中保存)
- In (2): This is the most common one, try to avoid communication packets not needed for the distribution or compress "on the fly" packets (for example if the HD is slow, save only a "batch computed" message and keep the intermediary results in RAM).
- 在(2)中:这是最常见的一种,尽量避免不需要发送或压缩“on the fly”数据包的通信数据包(例如,如果HD是慢的,只保存一个“批量计算的”消息,并保持中间结果在RAM中)。
- In (3): You are done! You are using all the processing power at your disposal.
- In(3):你完蛋了!您正在使用您可以使用的所有处理能力。
What about Celery ?
芹菜呢?
Celery is a messaging framework for distributed programming, that will use a broker module for communication (2) and a backend module for persistence (1), this means that you will be able by changing the configuration to avoid most bottlenecks (if possible) on your network and only on your network. First profile your code to achieve the best performance in a single computer. Then use celery in your cluster with the default configuration and set CELERY_RESULT_PERSISTENT=True
:
芹菜是一个用于分布式编程的消息传递框架,它将使用一个用于通信(2)的代理模块和一个用于持久性(1)的后端模块,这意味着您可以通过更改配置来避免网络上的大多数瓶颈(如果可能的话),并且只在您的网络上。首先剖析您的代码,以在单个计算机中实现最佳性能。然后在集群中使用默认配置的芹菜并设置CELERY_RESULT_PERSISTENT=True:
from celery import Celery
app = Celery('tasks',
broker='amqp://guest@localhost//',
backend='redis://localhost')
@app.task
def process_id(all_the_data_parameters_needed_to_process_in_this_computer):
#code that does stuff
return result
During execution open your favorite monitoring tools, I use the default for rabbitMQ and flower for celery and top for cpus, your results will be saved in your backend. An example of network bottleneck is tasks queue growing so much that they delay execution, you can proceed to change modules or celery configuration, if not your bottleneck is somewhere else.
在执行期间,打开您最喜欢的监视工具,我使用rabbitMQ和flower作为芹菜,top作为cpu,您的结果将保存在后端。网络瓶颈的一个例子是任务队列增长得如此之快以至于延迟执行,您可以继续更改模块或芹菜配置,如果您的瓶颈不在其他地方的话。
#3
9
Why not use group
celery task for this?
为什么不使用组芹菜的任务呢?
http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups
http://celery.readthedocs.org/en/latest/userguide/canvas.html组
Basically, you should divide ids
into chunks (or ranges) and give them to a bunch of tasks in group
.
基本上,您应该将id划分为块(或范围),并将它们分配给组中的一组任务。
For smth more sophisticated, like aggregating results of particular celery tasks, I have successfully used chord
task for similar purpose:
对于更复杂的smth,例如聚合特定芹菜任务的结果,我已经成功地将chord task用于类似的目的:
http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords
http://celery.readthedocs.org/en/latest/userguide/canvas.html和弦
Increase settings.CELERYD_CONCURRENCY
to a number that is reasonable and you can afford, then those celery workers will keep executing your tasks in a group or a chord until done.
增加设置。CELERYD_CONCURRENCY并发性对一个合理的数字,您可以负担得起,然后这些芹菜工作人员将继续在一个组或一个和弦中执行您的任务,直到完成为止。
Note: due to a bug in kombu
there were trouble with reusing workers for high number of tasks in the past, I don't know if it's fixed now. Maybe it is, but if not, reduce CELERYD_MAX_TASKS_PER_CHILD.
注意:由于kombu的一个bug,过去在大量的任务中重复使用工人有问题,我不知道现在是否已经修复。也许是这样,但如果不是这样,就减少CELERYD_MAX_TASKS_PER_CHILD。
Example based on simplified and modified code I run:
基于简化和修改代码的示例:
@app.task
def do_matches():
match_data = ...
result = chord(single_batch_processor.s(m) for m in match_data)(summarize.s())
summarize
gets results of all single_batch_processor
tasks. Every task runs on any Celery worker, kombu
coordinates that.
汇总所有single_batch_processor任务的结果。每一项任务都在任何一个芹菜工人身上运行,kombu负责协调。
Now I get it: single_batch_processor
and summarize
ALSO have to be celery tasks, not regular functions - otherwise of course it will not be parallelized (I'm not even sure chord constructor will accept it if it's not a celery task).
现在我明白了:single_batch_processor和summary也必须是芹菜任务,而不是常规函数——否则它当然不会被并行化(如果不是芹菜任务,我甚至不确定chord构造函数会接受它)。
#4
2
Adding more celery workers will certainly speed up executing the task. You might have another bottleneck though: the database. Make sure it can handle the simultaneous inserts/updates.
增加更多的芹菜工人肯定会加快执行任务。不过,您可能还有另一个瓶颈:数据库。确保它能够同时处理插入/更新。
Regarding your question: You are adding celery workers by assigning another process on your EC2 instances as celeryd
. Depending on how many workers you need you might want to add even more instances.
关于您的问题:通过在EC2实例上分配另一个进程celeryd,您正在添加芹菜worker。根据您需要多少工作人员,您可能需要添加更多的实例。
#1
94
Your goals are:
你的目标是:
- Distribute your work to many machines (distributed computing/distributed parallel processing)
- 将您的工作分配给许多机器(分布式计算/分布式并行处理)
- Distribute the work on a given machine across all CPUs (multiprocessing/threading)
- 在给定的机器上跨所有cpu分配工作(多处理/线程)
Celery can do both of these for you fairly easily. The first thing to understand is that each celery worker is configured by default to run as many tasks as there are CPU cores available on a system:
芹菜很容易做到这两点。首先要理解的是,每个芹菜工作人员默认配置为运行系统中可用的CPU内核数量一样多的任务:
Concurrency is the number of prefork worker process used to process your tasks concurrently, when all of these are busy doing work new tasks will have to wait for one of the tasks to finish before it can be processed.
并发是用于同时处理您的任务的prefork worker进程的数量,当所有这些任务都忙于工作时,新任务将不得不等待其中一个任务完成,然后才能处理。
The default concurrency number is the number of CPU’s on that machine (including cores), you can specify a custom number using -c option. There is no recommended value, as the optimal number depends on a number of factors, but if your tasks are mostly I/O-bound then you can try to increase it, experimentation has shown that adding more than twice the number of CPU’s is rarely effective, and likely to degrade performance instead.
默认的并发数是该机器上CPU的数量(包括内核),您可以使用-c选项指定一个定制的数量。不存在推荐值,因为最优值取决于许多因素,但是如果您的任务大部分是I/ o绑定的,那么您可以尝试增加它,实验表明添加超过CPU数量两倍的值很少有效,而且很可能降低性能。
This means each individual task doesn't need to worry about using multiprocessing/threading to make use of multiple CPUs/cores. Instead, celery will run enough tasks concurrently to use each available CPU.
这意味着每个单独的任务不需要担心使用多处理/线程来使用多个cpu /内核。芹菜可以同时运行足够的任务来使用每个可用的CPU。
With that out of the way, the next step is to create a task that handles processing some subset of your list_of_millions_of_ids
. You have a couple of options here - one is to have each task handle a single ID, so you run N tasks, where N == len(list_of_millions_of_ids)
. This will guarantee that work is evenly distributed amongst all your tasks, since there will never be a case where one worker finishes early and is just waiting around; if it needs work, it can pull an id off the queue. You can do this (as mentioned by John Doe) using the a celery group
.
这样,下一步就是创建一个任务来处理您的list_of_million - s_of_ids的某些子集。这里有几个选项—一个是让每个任务处理一个ID,因此运行N个任务,其中N = len(list_of_millions - of_ids)。这将保证工作在您的所有任务中都是平均分配的,因为从来不会有一个工人提前完成工作并只是等待;如果它需要工作,它可以从队列中拉出一个id。你可以用芹菜组来做(正如John Doe提到的)。
tasks.py:
tasks.py:
@app.task
def process_id(item):
id = item #long complicated equation here
database.objects(newid=id).save()
And to execute the tasks:
并执行任务:
from celery import group
from tasks import process_id
jobs = group(process_id.s(item) for item in list_of_millions_of_ids)
result = jobs.apply_async()
Another option is to break the list into smaller pieces, and distribute the pieces to your workers. This approach runs the risk of wasting some cycles, because you may end up with some workers waiting around while others are still doing work. However, the celery documentation notes that this concern is often unfounded:
另一种选择是将列表分成更小的部分,并将这些部分分发给您的员工。这种方法存在浪费一些周期的风险,因为您可能会看到一些工人在等待,而另一些人仍在工作。然而,芹菜文件指出,这种关切往往是毫无根据的:
Some may worry that chunking your tasks results in a degradation of parallelism, but this is rarely true for a busy cluster and in practice since you are avoiding the overhead of messaging it may considerably increase performance.
有些人可能会担心将任务分块会导致并行性的降低,但对于繁忙的集群来说,这种情况很少出现,实际上,由于您避免了消息传递的开销,因此它可能会显著提高性能。
So, you may find that chunking the list and distributing the chunks to each task performs better, because of the reduced messaging overhead. You can probably also lighten the load on the database a bit this way, by calculating each id, storing it in a list, and then adding the whole list into the DB once you're done, rather than doing it one id at a time. The chunking approach would look something like this
因此,您可能会发现对列表进行分块并将块分发给每个任务执行得更好,因为消息传递开销减少了。通过计算每个id,将其存储在一个列表中,然后在完成之后将整个列表添加到DB中,而不是一次只执行一个id,您可能还可以通过这种方式减轻数据库的负载。分块方法看起来是这样的
tasks.py:
tasks.py:
@app.task
def process_ids(items):
for item in items:
id = item #long complicated equation here
database.objects(newid=id).save() # Still adding one id at a time, but you don't have to.
And to start the tasks:
并开始任务:
from tasks import process_ids
jobs = process_ids.chunks(list_of_millions_of_ids, 30) # break the list into 30 chunks. Experiment with what number works best here.
jobs.apply_async()
You can experiment a bit with what chunking size gives you the best result. You want to find a sweet spot where you're cutting down messaging overhead while also keeping the size small enough that you don't end up with workers finishing their chunk much faster than another worker, and then just waiting around with nothing to do.
你可以尝试一下什么块大小能给你最好的结果。你想要找到一个可以减少信息开销的好地方,同时也要保持足够小的数量,这样你就不会以比其他员工更快的速度完成工作,然后无所事事地等待。
#2
10
In the world of distribution there is only one thing you should remember above all :
在分配的世界里,只有一件事你应该记住:
Premature optimization is the root of all evil. By D. Knuth
过早的优化是万恶之源。由d Knuth
I know it sounds evident but before distributing double check you are using the best algorithm (if it exists...). Having said that, optimizing distribution is a balancing act between 3 things:
我知道这听起来很明显,但在发布复查之前,你正在使用最好的算法(如果存在的话…)。话虽如此,优化分布是三件事之间的平衡:
- Writing/Reading data from a persistent medium,
- 从持久介质中写入/读取数据,
- Moving data from medium A to medium B,
- 移动数据从A到中B,
- Processing data,
- 处理数据,
Computers are made so the closer you get to your processing unit (3) the faster and more efficient (1) and (2) will be. The order in a classic cluster will be : network hard drive, local hard drive, RAM, inside processing unit territory... Nowadays processors are becoming sophisticated enough to be considered as an ensemble of independent hardware processing units commonly called cores, these cores process data (3) through threads (2). Imagine your core is so fast that when you send data with one thread you are using 50% of the computer power, if the core has 2 threads you will then use 100%. Two threads per core is called hyper threading, and your OS will see 2 CPUs per hyper threaded core.
电脑的制造使得你离你的处理单元(3)越近,(1)和(2)的速度就越快,效率也就越高。经典集群的顺序是:网络硬盘驱动器、本地硬盘驱动器、RAM、处理单元区域内……现在处理器正变得足够成熟,被视为一个独立的硬件处理单元通常称为核心,这些核心流程数据(2)(3)通过线程。想象你的核心是如此之快,当你与一个线程发送数据使用的是50%的计算机能力,如果你会用100%的核心有两个线程。每个内核有两个线程称为hyper threading,您的操作系统将看到每个hyper thread内核有两个cpu。
Managing threads in a processor is commonly called multi-threading. Managing CPUs from the OS is commonly called multi-processing. Managing concurrent tasks in a cluster is commonly called parallel programming. Managing dependent tasks in a cluster is commonly called distributed programming.
在处理器中管理线程通常称为多线程。从操作系统管理cpu通常被称为多处理。在集群中管理并发任务通常称为并行编程。管理集群中的相关任务通常称为分布式编程。
So where is your bottleneck ?
那么瓶颈在哪里呢?
- In (1): Try to persist and stream from the upper level (the one closer to your processing unit, for example if network hard drive is slow first save in local hard drive)
- 在(1)中:尝试从上层(靠近处理单元的那个,例如如果网络硬盘驱动器速度慢,首先在本地硬盘驱动器中保存)
- In (2): This is the most common one, try to avoid communication packets not needed for the distribution or compress "on the fly" packets (for example if the HD is slow, save only a "batch computed" message and keep the intermediary results in RAM).
- 在(2)中:这是最常见的一种,尽量避免不需要发送或压缩“on the fly”数据包的通信数据包(例如,如果HD是慢的,只保存一个“批量计算的”消息,并保持中间结果在RAM中)。
- In (3): You are done! You are using all the processing power at your disposal.
- In(3):你完蛋了!您正在使用您可以使用的所有处理能力。
What about Celery ?
芹菜呢?
Celery is a messaging framework for distributed programming, that will use a broker module for communication (2) and a backend module for persistence (1), this means that you will be able by changing the configuration to avoid most bottlenecks (if possible) on your network and only on your network. First profile your code to achieve the best performance in a single computer. Then use celery in your cluster with the default configuration and set CELERY_RESULT_PERSISTENT=True
:
芹菜是一个用于分布式编程的消息传递框架,它将使用一个用于通信(2)的代理模块和一个用于持久性(1)的后端模块,这意味着您可以通过更改配置来避免网络上的大多数瓶颈(如果可能的话),并且只在您的网络上。首先剖析您的代码,以在单个计算机中实现最佳性能。然后在集群中使用默认配置的芹菜并设置CELERY_RESULT_PERSISTENT=True:
from celery import Celery
app = Celery('tasks',
broker='amqp://guest@localhost//',
backend='redis://localhost')
@app.task
def process_id(all_the_data_parameters_needed_to_process_in_this_computer):
#code that does stuff
return result
During execution open your favorite monitoring tools, I use the default for rabbitMQ and flower for celery and top for cpus, your results will be saved in your backend. An example of network bottleneck is tasks queue growing so much that they delay execution, you can proceed to change modules or celery configuration, if not your bottleneck is somewhere else.
在执行期间,打开您最喜欢的监视工具,我使用rabbitMQ和flower作为芹菜,top作为cpu,您的结果将保存在后端。网络瓶颈的一个例子是任务队列增长得如此之快以至于延迟执行,您可以继续更改模块或芹菜配置,如果您的瓶颈不在其他地方的话。
#3
9
Why not use group
celery task for this?
为什么不使用组芹菜的任务呢?
http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups
http://celery.readthedocs.org/en/latest/userguide/canvas.html组
Basically, you should divide ids
into chunks (or ranges) and give them to a bunch of tasks in group
.
基本上,您应该将id划分为块(或范围),并将它们分配给组中的一组任务。
For smth more sophisticated, like aggregating results of particular celery tasks, I have successfully used chord
task for similar purpose:
对于更复杂的smth,例如聚合特定芹菜任务的结果,我已经成功地将chord task用于类似的目的:
http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords
http://celery.readthedocs.org/en/latest/userguide/canvas.html和弦
Increase settings.CELERYD_CONCURRENCY
to a number that is reasonable and you can afford, then those celery workers will keep executing your tasks in a group or a chord until done.
增加设置。CELERYD_CONCURRENCY并发性对一个合理的数字,您可以负担得起,然后这些芹菜工作人员将继续在一个组或一个和弦中执行您的任务,直到完成为止。
Note: due to a bug in kombu
there were trouble with reusing workers for high number of tasks in the past, I don't know if it's fixed now. Maybe it is, but if not, reduce CELERYD_MAX_TASKS_PER_CHILD.
注意:由于kombu的一个bug,过去在大量的任务中重复使用工人有问题,我不知道现在是否已经修复。也许是这样,但如果不是这样,就减少CELERYD_MAX_TASKS_PER_CHILD。
Example based on simplified and modified code I run:
基于简化和修改代码的示例:
@app.task
def do_matches():
match_data = ...
result = chord(single_batch_processor.s(m) for m in match_data)(summarize.s())
summarize
gets results of all single_batch_processor
tasks. Every task runs on any Celery worker, kombu
coordinates that.
汇总所有single_batch_processor任务的结果。每一项任务都在任何一个芹菜工人身上运行,kombu负责协调。
Now I get it: single_batch_processor
and summarize
ALSO have to be celery tasks, not regular functions - otherwise of course it will not be parallelized (I'm not even sure chord constructor will accept it if it's not a celery task).
现在我明白了:single_batch_processor和summary也必须是芹菜任务,而不是常规函数——否则它当然不会被并行化(如果不是芹菜任务,我甚至不确定chord构造函数会接受它)。
#4
2
Adding more celery workers will certainly speed up executing the task. You might have another bottleneck though: the database. Make sure it can handle the simultaneous inserts/updates.
增加更多的芹菜工人肯定会加快执行任务。不过,您可能还有另一个瓶颈:数据库。确保它能够同时处理插入/更新。
Regarding your question: You are adding celery workers by assigning another process on your EC2 instances as celeryd
. Depending on how many workers you need you might want to add even more instances.
关于您的问题:通过在EC2实例上分配另一个进程celeryd,您正在添加芹菜worker。根据您需要多少工作人员,您可能需要添加更多的实例。