Django+Celery 执行异步任务和定时任务

时间:2021-03-25 07:45:10

celery是一个基于python开发的简单、灵活且可靠的分布式任务队列框架,支持使用任务队列的方式在分布式的机器/进程/线程上执行任务调度。采用典型的生产者-消费者模型,主要由三部分组成:

1. 消息队列broker:broker实际上就是一个MQ队列服务,可以使用redis、rabbitmq等作为broker
2. 处理任务的消费者workers:broker通知worker队列中有任务,worker去队列中取出任务执行,每一个worker就是一个进程
3. 存储结果的backend:执行结果存储在backend,默认也会存储在broker使用的MQ队列服务中,也可以单独配置用何种服务做backend

flask,django是同步框架,所有的请求以队列形式完成。这样的话效率极差,用户体验不好,为了解决这个问题引入celery异步方式在后台执行这些任务(这里使用到了redis,3.0以下兼容性更好)

1,安装依赖

pip install celery

pip install celery-with-redis

pip install django-celery

2,settings.py设置

#配置celery
import djcelery
djcelery.setup_loader()
BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_IMPORTS = ('mymac.tasks') #需执行异步的子应用

#将djcelery安装到应用中
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'mysg',
    'lianxi',
    "rest_framework",
    'corsheaders',
    #异步
    'djcelery',
]

3,将异步的应用中注册celery.py

import os
import django
from celery import Celery
from django.conf import settings 
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mymac.settings')
django.setup()
app = Celery('mymac')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

4,建立异步任务和定时任务的(tasks.py)

  • 注意tasks.py必须建在各app的根目录下,且只能叫tasks.py,不能随意命名

  异步任务

import time,random
from celery import task
#发邮件
from django.core.mail import send_mail
from django.http import HttpResponse
#定义异步写文件方法
@task
def file_task():
        #写文件操作  文件对象
        file_object = open("./data.text",'a+',encoding='utf-8')
        file_object.write("hello")
        file_object.close()
        print("ok")

#定义异步发邮件的方法
@task
def email_():
        captcha_text = []
        for i in range(4):
        #定义验证码字符
                str = 'qwertyuiopasdfghjklzxcvbnm1234567890'
                c = random.choice(str)
                captcha_text.append(c)
        #返回随机生成的字符串         
        captcha = "".join(captcha_text)
        res = send_mail("欢迎注册",'您的验证码是:'+ captcha,['396961930@qq.com'],DEFAULT_FROM_EMAIL)
        if res:
                return HttpResponse("发送成功")
        else:
                return HttpResponse("发送失败")

   views.py中引用使用这个tasks异步处理

from mymac.tasks import email,file_task

#异步发邮件
def email(request):
    print(email_.delay())
    return HttpResponse("异步发邮件")
#异步写入文件
def failtask(request):
    print(file_task.delay())
    return HttpResponse("success")

#配好路由触发任务即可

  定时任务

#导入定时任务库
from celery.decorators import periodic_task
from celery.schedules import crontab
#发短信
from twilio.rest import Client

#定义20点10分发送
#@periodic_task(run_every=crontab(minute=10,hour=20))
#定义10秒发送一次
@periodic_task(run_every=10)
def mail():   
        #定义短信sid
        account_sid = 'ACbccc4d2127e888e6f6654dc8128c019e'
        #定义秘钥
        auth_token = 'a0f31b24c76c65c20c6400dc94537ac6'
        #定义客户端对象  
        clinet = Client(account_sid,auth_token)
        #定义短信内容 1,发给谁 2,发信人 3,内容
        status = clinet.messages.create(to="+8616637712137",from_="+12016361207",body="hello world")
        if status:
                print("发送成功")

#注意时区 例如中国 
#settings.py 设置  语言相关配置
LANGUAGE_CODE = 'zh-hans'
TIME_ZONE = 'Asia/Shanghai'

crontab的参数有:
month_of_year:月份
day_of_month:日期
day_of_week:周
hour:小时
minute:分钟

5,启动任务

启动服务的命令:

celery -A mymac beat -l info 定时任务
celery -A mymac worker -l info 异步任务