Some context: I'm building a Django app that allows a user to pre-save an action and schedule the exact date/time in the future they want that action to execute. E.g., scheduling a post to be programmatically pushed to one's Facebook wall next week at 5:30 am.
I'm looking for a task scheduling system that could handle a thousand instances of a one-off task, all set to execute near-simultaneously (error margin plus or minus a minute).
I'm considering django-celery/RabbitMQ for this, but I noticed the Celery docs do not address tasks meant for one-time use. Is django-celery the right choice here (perhaps by subclassing CrontabSchedule), or is my energy better spent researching some other approach? Perhaps hacking something together with the sched module and cron.
2 answers
#1
8
Edit 2:
For some reason, my head was originally stuck in the realm of recurring tasks. Here is a simpler solution.
All you really need is to define one task for each user action. You can skip storing tasks to be executed in your database--that's what celery is here for!
Reusing your Facebook post example, and again assuming you have a function post_to_facebook somewhere that takes a user and some text, does some magic, and posts the text to that user's Facebook wall, you can just define it to be a task like this:
    # Task to send one update.
    @celery.task(ignore_result=True)
    def post_to_facebook(user, text):
        # perform magic
        return whatever_you_want
When a user is ready to enqueue such a post, you just tell celery when to run the task:
    post_to_facebook.apply_async(
        (user, text),  # args
        eta=datetime.datetime(2012, 9, 15, 11, 45, 4, 126440),  # pass execution options as kwargs
    )
This is all detailed here, among a whole bunch of available call options: http://docs.celeryproject.org/en/latest/userguide/calling.html#eta-and-countdown
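The eta in the call above is a naive datetime, and how Celery interprets a naive value depends on its timezone settings, so it may be safer to convert the user's local time to an aware UTC value first. Below is a minimal sketch of that conversion using only the standard library; to_utc_eta and the fixed UTC offset are assumptions made for illustration, not part of Celery.

```python
from datetime import datetime, timedelta, timezone


def to_utc_eta(local_naive, utc_offset_hours):
    """Attach a fixed UTC offset to a naive local datetime and convert it to UTC."""
    local_tz = timezone(timedelta(hours=utc_offset_hours))
    return local_naive.replace(tzinfo=local_tz).astimezone(timezone.utc)


# A user at UTC-4 asks for 5:30 am on 15 September 2012.
eta = to_utc_eta(datetime(2012, 9, 15, 5, 30), utc_offset_hours=-4)
print(eta.isoformat())  # 2012-09-15T09:30:00+00:00
```

The resulting value could then be passed as the eta keyword argument of apply_async.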
If you need the result of the call, omit the ignore_result param from the task definition; apply_async returns an AsyncResult object that you can then check for the result (this requires a result backend to be configured). More here: http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#keeping-results
Some of the answer below is still relevant. You still want a task for each user action, you still want to think about task design, etc., but this is a much simpler road to doing what you asked about.
Original answer using recurring tasks follows:
Dannyroa has the right idea. I'll build upon that a bit here.
Edit / TLDR: The answer is yes, Celery is suited to your needs. You may just need to rethink your task definition.
I assume you aren't allowing your users to write arbitrary Python code to define their tasks. Short of that, you will have to predefine some actions users can schedule, and then allow them to schedule those actions as they like. Then, you can just run one scheduled task for each user action, checking for entries and performing the action for each entry.
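One way to picture "predefined actions" is a small registry that maps an action name to the function implementing it, so users can only pick from names you have registered. This is a plain-Python sketch; ACTIONS, register, run_action, and the two stand-in handlers are hypothetical names, not Celery or Django APIs.

```python
# Hypothetical registry of the actions users are allowed to schedule.
ACTIONS = {}


def register(name):
    """Decorator that records a handler under a public action name."""
    def decorator(func):
        ACTIONS[name] = func
        return func
    return decorator


@register("facebook_post")
def post_to_facebook(user, text):
    # Stand-in for the real posting code.
    return f"posted for {user}: {text}"


@register("email_to_mom")
def email_to_mom(user, text):
    # Stand-in for the real mail-sending code.
    return f"emailed mom for {user}: {text}"


def run_action(name, user, text):
    """Look up a predefined action; unknown names are rejected."""
    try:
        handler = ACTIONS[name]
    except KeyError:
        raise ValueError(f"unknown action: {name!r}") from None
    return handler(user, text)


print(run_action("facebook_post", "alice", "hello"))  # posted for alice: hello
```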
One user action:
Using your Facebook example, you would store users' updates in a table:
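    from django.db.models import (
        CASCADE, BooleanField, DateTimeField, ForeignKey, Model, TextField,
    )

    class ScheduledPost(Model):
        user = ForeignKey('auth.User', on_delete=CASCADE)
        text = TextField()
        time = DateTimeField()
        sent = BooleanField(default=False)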
Then you would run a task every minute, checking for entries in that table scheduled to be posted in the last minute (based on the error margin you mentioned). If it is very important that you hit your one-minute window, you might schedule the task more often, say, every 30 seconds. The task might look like this (in myapp/tasks.py):
    from django.utils import timezone

    @celery.task
    def post_scheduled_updates():
        # The 'sent' flag already excludes handled posts, so no lower time
        # bound is used (a task object has no last_run_at attribute to filter on).
        scheduled_posts = ScheduledPost.objects.filter(
            sent=False,
            time__lte=timezone.now(),
        )
        for post in scheduled_posts:
            if post_to_facebook(post.user, post.text):
                post.sent = True
                post.save()
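The selection rule used by the periodic task can be checked in isolation. The sketch below is a plain-Python stand-in for the ORM query (Post and due_posts are hypothetical names); it suggests that filtering on the sent flag plus "time is not in the future" also picks up entries that an earlier run missed.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class Post:
    text: str
    time: datetime
    sent: bool = False


def due_posts(posts, now):
    """Return unsent posts whose scheduled time has arrived."""
    return [p for p in posts if not p.sent and p.time <= now]


now = datetime(2012, 9, 15, 5, 30, tzinfo=timezone.utc)
posts = [
    Post("due now", now),
    Post("missed by an earlier run", now - timedelta(minutes=3)),
    Post("already sent", now - timedelta(minutes=1), sent=True),
    Post("not yet due", now + timedelta(minutes=1)),
]
print([p.text for p in due_posts(posts, now)])
# ['due now', 'missed by an earlier run']
```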
The config might look like this:
    from datetime import timedelta

    CELERYBEAT_SCHEDULE = {
        'fb-every-30-seconds': {
            'task': 'tasks.post_scheduled_updates',
            'schedule': timedelta(seconds=30),
        },
    }
Additional user actions:
For each user action in addition to posting to Facebook, you can define a new table and a new task:
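    from django.core.mail import send_mail
    from django.db.models import (
        CASCADE, BooleanField, CharField, DateTimeField, ForeignKey, Model, TextField,
    )
    from django.utils import timezone

    class EmailToMom(Model):
        user = ForeignKey('auth.User', on_delete=CASCADE)
        text = TextField()
        subject = CharField(max_length=255)
        sent = BooleanField(default=False)
        time = DateTimeField()

    @celery.task
    def send_emails_to_mom():
        scheduled_emails = EmailToMom.objects.filter(
            sent=False,
            time__lt=timezone.now(),
        )
        for email in scheduled_emails:
            # send_mail returns the number of messages sent (0 or 1 here).
            sent = send_mail(
                email.subject,
                email.text,
                email.user.email,
                [email.user.mom.email],
            )
            if sent:
                email.sent = True
                email.save()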
    CELERYBEAT_SCHEDULE = {
        'fb-every-30-seconds': {
            'task': 'tasks.post_scheduled_updates',
            'schedule': timedelta(seconds=30),
        },
        'mom-every-30-seconds': {
            'task': 'tasks.send_emails_to_mom',
            'schedule': timedelta(seconds=30),
        },
    }
Speed and optimization:
To get more throughput, instead of iterating over the updates to post and sending them serially during a post_scheduled_updates call, you could spawn a bunch of subtasks and run them in parallel (given enough workers). Then the call to post_scheduled_updates runs very quickly and schedules a whole bunch of tasks, one for each Facebook update, to run as soon as possible. That would look something like this:
    # Task to send one update. This will be called by post_scheduled_updates.
    @celery.task
    def post_one_update(update_id):
        # A missing row raises ScheduledPost.DoesNotExist and fails the task.
        update = ScheduledPost.objects.get(id=update_id)
        if update.sent:
            # A later scheduler run may have dispatched this post again.
            return True
        sent = post_to_facebook(update.user, update.text)
        if sent:
            update.sent = True
            update.save()
        return sent

    @celery.task
    def post_scheduled_updates():
        # Posts still in flight remain sent=False until their subtask finishes.
        scheduled_posts = ScheduledPost.objects.filter(
            sent=False,
            time__lte=timezone.now(),
        )
        for post in scheduled_posts:
            post_one_update.delay(post.id)
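The fan-out shape can be tried without a broker. The following sketch swaps in a standard-library thread pool as a stand-in for Celery workers, so it only illustrates the "one small job per update" structure, not Celery itself; the odd/even failure rule is made up for the demonstration.

```python
from concurrent.futures import ThreadPoolExecutor


def post_one_update(update_id):
    """Stand-in for the per-update task; pretends odd ids fail to post."""
    return update_id, update_id % 2 == 0


def post_scheduled_updates(update_ids, workers=4):
    """Dispatch one job per id and collect which ids were sent."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(post_one_update, update_ids))
    return sorted(uid for uid, sent in results if sent)


print(post_scheduled_updates(range(1, 7)))  # [2, 4, 6]
```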
The code I have posted is not tested and certainly not optimized, but it should get you on the right track. In your question you implied some concern about throughput, so you'll want to look closely at places to optimize. One obvious one is using bulk updates instead of iteratively calling post.sent=True; post.save().
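To illustrate the bulk-update idea, here is a small sketch that marks a whole batch as sent with a single UPDATE statement, using an in-memory SQLite table as a stand-in for the real database (the table and column names are assumptions). In Django, the analogous call would be along the lines of ScheduledPost.objects.filter(id__in=sent_ids).update(sent=True).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE scheduled_post (id INTEGER PRIMARY KEY, sent INTEGER NOT NULL DEFAULT 0)"
)
conn.executemany("INSERT INTO scheduled_post (id) VALUES (?)", [(i,) for i in range(1, 6)])

sent_ids = [1, 3, 4]  # ids whose posts went out successfully

# One UPDATE for the whole batch instead of one statement per row.
placeholders = ", ".join("?" for _ in sent_ids)
cursor = conn.execute(
    f"UPDATE scheduled_post SET sent = 1 WHERE id IN ({placeholders})",
    sent_ids,
)
conn.commit()

print(cursor.rowcount)  # 3
unsent = [row[0] for row in conn.execute("SELECT id FROM scheduled_post WHERE sent = 0 ORDER BY id")]
print(unsent)  # [2, 5]
```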
More info:
More info on periodic tasks: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html.
A section on task design strategies: http://docs.celeryproject.org/en/latest/userguide/tasks.html#performance-and-strategies
There is a whole page about optimizing celery here: http://docs.celeryproject.org/en/latest/userguide/optimizing.html.
This page about subtasks may also be interesting: http://docs.celeryproject.org/en/latest/userguide/canvas.html.
In fact, I recommend reading all the celery docs.
#2
0
What I'd do is create a model called ScheduledPost.
I'd have a PeriodicTask that runs every 5 minutes or so.
The task would check the ScheduledPost table for any posts that need to be pushed to Facebook.