celery是一个基于python开发的简单、灵活且可靠的分布式任务队列框架,支持使用任务队列的方式在分布式的机器/进程/线程上执行任务调度。采用典型的生产者-消费者模型,主要由三部分组成:
1. 消息队列broker:broker实际上就是一个MQ队列服务,可以使用redis、rabbitmq等作为broker
2. 处理任务的消费者workers:broker通知worker队列中有任务,worker去队列中取出任务执行,每一个worker就是一个进程
3. 存储结果的backend:执行结果存储在backend,默认也会存储在broker使用的MQ队列服务中,也可以单独配置用何种服务做backend
flask,django是同步框架,所有的请求以队列形式完成。这样的话效率极差,用户体验不好,为了解决这个问题引入celery异步方式在后台执行这些任务(这里使用到了redis,3.0以下兼容性更好)
1,安装依赖
pip install celery pip install celery-with-redis pip install django-celery
2,settings.py设置
#配置celery
import djcelery
djcelery.setup_loader()
BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_IMPORTS = ('mymac.tasks') #需执行异步的子应用 #将djcelery安装到应用中
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'mysg',
'lianxi',
"rest_framework",
'corsheaders',
#异步
'djcelery',
]
3,将异步的应用中注册celery.py
import os
import django
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mymac.settings')
django.setup()
app = Celery('mymac')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
4,建立异步任务和定时任务的(tasks.py)
- 注意tasks.py必须建在各app的根目录下,且只能叫tasks.py,不能随意命名
异步任务
import time,random
from celery import task
#发邮件
from django.core.mail import send_mail
from django.http import HttpResponse
#定义异步写文件方法
@task
def file_task():
#写文件操作 文件对象
file_object = open("./data.text",'a+',encoding='utf-8')
file_object.write("hello")
file_object.close()
print("ok") #定义异步发邮件的方法
@task
def email_():
captcha_text = []
for i in range(4):
#定义验证码字符
str = 'qwertyuiopasdfghjklzxcvbnm1234567890'
c = random.choice(str)
captcha_text.append(c)
#返回随机生成的字符串
captcha = "".join(captcha_text)
res = send_mail("欢迎注册",'您的验证码是:'+ captcha,['396961930@qq.com'],DEFAULT_FROM_EMAIL)
if res:
return HttpResponse("发送成功")
else:
return HttpResponse("发送失败")
views.py中引用使用这个tasks异步处理
from mymac.tasks import email,file_task #异步发邮件
def email(request):
print(email_.delay())
return HttpResponse("异步发邮件")
#异步写入文件
def failtask(request):
print(file_task.delay())
return HttpResponse("success") #配好路由触发任务即可
定时任务
#导入定时任务库
from celery.decorators import periodic_task
from celery.schedules import crontab
#发短信
from twilio.rest import Client #定义20点10分发送
#@periodic_task(run_every=crontab(minute=10,hour=20))
#定义10秒发送一次
@periodic_task(run_every=10)
def mail():
#定义短信sid
account_sid = 'ACbccc4d2127e888e6f6654dc8128c019e'
#定义秘钥
auth_token = 'a0f31b24c76c65c20c6400dc94537ac6'
#定义客户端对象
clinet = Client(account_sid,auth_token)
#定义短信内容 1,发给谁 2,发信人 3,内容
status = clinet.messages.create(to="+8616637712137",from_="+12016361207",body="hello world")
if status:
print("发送成功") #注意时区 例如中国
#settings.py 设置 语言相关配置
LANGUAGE_CODE = 'zh-hans'
TIME_ZONE = 'Asia/Shanghai' crontab的参数有:
month_of_year:月份
day_of_month:日期
day_of_week:周
hour:小时
minute:分钟
5,启动任务
启动服务的命令:
celery -A mymac beat -l info 定时任务
celery -A mymac worker -l info 异步任务