celery是一个python的分布式任务队列框架,支持 分布的 机器/进程/线程的任务调度。采用典型的生产者-消费者模型
包含三部分:
1. 队列 broker :可使用redis ,rabbitmq ,或关系数据库作为broker
2.处理任务的消费者workers : 队列中有任务时就发出通知,worker收到通知就去处理
3.任务结果存储 backend: 存储任务的返回值
celery 4.2,django 1.11.7 ,rabbitmq 3.7.3,centos 6.5
事先需安装 rabbitmq , (安装rabbitmq前需安装erlang ,esl-erlang )
yum install rabbitmq-server
(开启 rabbitmq : systemctl start rabbitmq-server ; 查看rabbitmq的状态 ,切换到合适的目录与用户(一般为安装时的用户,目录一般为/usr/lib/bin): rabbitmqctl status ;)
安装celery包
pip install celery ==4.2
通常django的项目目录为
- proj /
- manage.py
-proj/
-__init__.py
-settings.py
- urls.py
-myapp/
-urls.py
-views.py
-models.py
首先要创建 一个celery实例 proj/proj/celery.py
from __future__import absolute_import,unicode_literals
import os
from celery import Celery
#为这个celery项目设置系统环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE','proj.settings')
#实例化Celery对象
app=Celery('proj')
#配置传入字符串而不是配置对象,使worker不需要在子进程中序列化配置对象;命名空间设置为’CELERY‘意味着所有celery相关的配置键值都应该以'CELERY_'开头
app.config_from_object('django.conf:settings',namespace='CELERY')
#自动加载task模块
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
#debug_task是一个复制它自己的request的信息的task ,使用bind=True简单地将其绑定到当前的task实例上。
2.在 proj/proj/__init__.py中导入第一步中创建的app,是为了保证django项目开启时就加载这个app,让后续的@shared_task装饰器可以使用它。
from __future__ import absolute_import,unicode_literals
from .celery import app as celery_app
__all__=('celery_app',)
3.创建myapp/tasks.py ,使用@shared_task装饰器
from __future__ import absolute_import,unicode_literals
from celery import shared_task
@shared_task
def add(x,y):
return x+y
@shared_task
def mul(x,y):
return x*y
@shared_task
def xsum(numbers):
return sum(numbers)
#一般将比较耗时的操作,定义在task.py中,这样在view中使用这个函数时,就可以异步调用,不必等操作完成再返回页面结果,而是可以异步调用完后直接进行下一步。
而这个操作会在子进程中继续执行,执行结果保存在队列中,也可指定保存在django的orm中
4.将celery队列的任务执行结果保存到 django的orm或缓存框架中
*需要安装 django-celery-results
pip install django-celery-results
*将其('django_celery_results')添加到settings.py的INSTALL_APPS中
*在数据库中创建保存celery结果的表
python manage.py migrate django_celery_results
*在settings.py为celery配置后端存储
CELERY_RESULT_BACKEND='django-db'
或
CELERY_RESULT_BACKEND='django-cache'
启动完django 的 manage.py runserver后运行 celery -A proj worker -l info 开启celery队列