django+celery +rabbitmq

时间:2021-10-28 09:06:56

celery是一个python的分布式任务队列框架,支持 分布的 机器/进程/线程的任务调度。采用典型的生产者-消费者模型

包含三部分:
1. 队列 broker :可使用redis ,rabbitmq ,或关系数据库作为broker

2.处理任务的消费者workers : 队列中有任务时就发出通知,worker收到通知就去处理

3.任务结果存储 backend:  存储任务的返回值

celery 4.2,django 1.11.7 ,rabbitmq 3.7.3,centos 6.5

事先需安装 rabbitmq  , (安装rabbitmq前需安装erlang ,esl-erlang )

yum install rabbitmq-server

(开启 rabbitmq :    systemctl start rabbitmq-server   ;  查看rabbitmq的状态 ,切换到合适的目录与用户(一般为安装时的用户,目录一般为/usr/lib/bin): rabbitmqctl status   ;)

安装celery包

pip install celery ==4.2

通常django的项目目录为

- proj /

- manage.py

-proj/

-__init__.py

-settings.py

- urls.py

-myapp/

-urls.py

-views.py

-models.py

首先要创建 一个celery实例  proj/proj/celery.py

from __future__import absolute_import,unicode_literals

import os

from celery import Celery

#为这个celery项目设置系统环境变量

os.environ.setdefault('DJANGO_SETTINGS_MODULE','proj.settings')

#实例化Celery对象

app=Celery('proj')

#配置传入字符串而不是配置对象,使worker不需要在子进程中序列化配置对象;命名空间设置为’CELERY‘意味着所有celery相关的配置键值都应该以'CELERY_'开头

app.config_from_object('django.conf:settings',namespace='CELERY')

#自动加载task模块

app.autodiscover_tasks()

@app.task(bind=True)

def debug_task(self):

print('Request: {0!r}'.format(self.request))

#debug_task是一个复制它自己的request的信息的task ,使用bind=True简单地将其绑定到当前的task实例上。

2.在 proj/proj/__init__.py中导入第一步中创建的app,是为了保证django项目开启时就加载这个app,让后续的@shared_task装饰器可以使用它。

from __future__ import absolute_import,unicode_literals

from .celery import app as celery_app

__all__=('celery_app',)

3.创建myapp/tasks.py ,使用@shared_task装饰器

from __future__ import absolute_import,unicode_literals

from celery import shared_task

@shared_task

def add(x,y):

return x+y

@shared_task

def mul(x,y):

return x*y

@shared_task

def xsum(numbers):

return sum(numbers)

#一般将比较耗时的操作,定义在task.py中,这样在view中使用这个函数时,就可以异步调用,不必等操作完成再返回页面结果,而是可以异步调用完后直接进行下一步。

而这个操作会在子进程中继续执行,执行结果保存在队列中,也可指定保存在django的orm中

4.将celery队列的任务执行结果保存到 django的orm或缓存框架中

*需要安装 django-celery-results

pip install django-celery-results

*将其('django_celery_results')添加到settings.py的INSTALL_APPS中

*在数据库中创建保存celery结果的表

python manage.py migrate django_celery_results

*在settings.py为celery配置后端存储

CELERY_RESULT_BACKEND='django-db'

CELERY_RESULT_BACKEND='django-cache'

启动完django 的 manage.py runserver后运行 celery -A proj worker -l info 开启celery队列