起步
在 《分布式任务队列celery使用说明》 中介绍了在 python 中使用 celery 来实验异步任务和定时任务功能。本文介绍如何在 django 中使用 celery。
安装
1
|
pip install django - celery
|
这个命令使用的依赖是 celery 3.x 的版本,所以会把我之前安装的 4.x 卸载,不过对功能上并没有什么影响。我们也完全可以仅用celery在django中使用,但使用 django-celery 模块能更好的管理 celery。
使用
可以把有关 celery 的配置放到 settings.py 里去,但我比较习惯单独一个文件来放,然后在 settings.py 引入进来:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
# celery_config.py
import djcelery
import os
os.environ.setdefault( 'forked_by_multiprocessing' , '1' )
djcelery.setup_loader()
broker_url = 'redis://127.0.0.1:6379/1'
celery_result_backend = 'redis://127.0.0.1:6379/2'
# utc
celery_enable_utc = true
celery_timezone = 'asia/shanghai'
celery_imports = (
'app.tasks' ,
)
# 有些情况可以防止死锁
celery_force_execv = true
# 设置并发的worker数量
celeryd_concurrency = 4
# 任务发送完成是否需要确认,这一项对性能有一点影响
celery_acks_late = true
# 每个worker执行了多少任务就会销毁,防止内存泄露,默认是无限的
celeryd_max_tasks_per_child = 40
# 规定完成任务的时间
celeryd_task_time_limit = 15 * 60 # 在15分钟内完成任务,否则执行该任务的worker将被杀死,任务移交给父进程
# 设置默认的队列名称,如果一个消息不符合其他的队列就会放在默认队列里面,如果什么都不设置的话,数据都会发送到默认的队列中
celery_default_queue = "default"
# 设置详细的队列
celery_queues = {
"default" : { # 这是上面指定的默认队列
"exchange" : "default" ,
"exchange_type" : "direct" ,
"routing_key" : "default"
},
"beat_queue" : {
"exchange" : "beat_queue" ,
"exchange_type" : "direct" ,
"routing_key" : "beat_queue"
}
}
|
配置文件中设置了 celery_imports 导入的任务,所以在django app中创建相应的任务文件:
1
2
3
4
5
6
7
8
9
10
11
12
|
# app/tasks.py
from celery.task import task
import time
class testtask(task):
name = 'test-task' # 给任务设置个自定义名称
def run( self , * args, * * kwargs):
print ( 'start test task' )
time.sleep( 4 )
print ( 'args={}, kwargs={}' . format (args, kwargs))
print ( 'end test task' )
|
在 settings.py 添加:
1
2
3
4
5
6
7
|
installed_apps = [
# ...
'djcelery' ,
]
# celery
from learn_django.celery_config import *
|
触发任务或提交任务可以在view中来调用:
1
2
3
4
5
6
7
8
9
10
11
|
# views.py
from django.http import httpresponse
from app.tasks import testtask
def test_task(request):
# 执行异步任务
print ( 'start do request' )
t = testtask()
t.delay()
print ( 'end do request' )
return httpresponse( 'ok' )
|
启动 woker 的命令是:
1
|
python manage.py celery worker - l info
|
再启动django,访问该view,可以看到任务在worker中被消费了。
定时任务
在celery的配置文件 celery_config.py 文件中添加:
1
2
3
4
5
6
7
8
9
10
11
|
celerybeat_schedule = {
'task1-every-1-min' : { # 自定义名称
'task' : 'test-task' , # 与任务中name名称一致
'schedule' : datetime.timedelta(seconds = 5 ),
'args' : ( 2 , 15 ),
'options' : {
'queue' : 'beat_queue' , # 指定要使用的队列
}
},
}
|
通过 options 的 queque 来指定要使用的队列,这里需要单独的队列是因为,如果所有任务都使用同一队列,对于定时任务来说,任务提交后会位于队列尾部,任务的执行时间会靠后,所以对于定时任务来说,使用单独的队列。
启动 beat:
1
|
python manage.py celery beat - l info
|
监控工具 flower
如果celery中的任务执行失败了,有些场景是需要对这些任务进行监控, flower 是基于 tornado 开发的web应用。安装用 pip install flower ;启动它可以是:
1
2
3
|
python manage.py celery flower
# python manage.py celery flower --basic_auth=admin:admin
|
用浏览器访问 http://localhost:5555 即可查看:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://www.hongweipeng.com/index.php/archives/1680/