celery是一个基于python开发的简单、灵活且可靠的分布式任务队列框架,支持使用任务队列的方式在分布式的机器/进程/线程上执行任务调度。采用典型的生产者-消费者模型,主要由三部分组成:
1. 消息队列broker:broker实际上就是一个MQ队列服务,可以使用redis、rabbitmq等作为broker
2. 处理任务的消费者workers:broker通知worker队列中有任务,worker去队列中取出任务执行,每一个worker就是一个进程
3. 存储结果的backend:执行结果存储在backend,默认也会存储在broker使用的MQ队列服务中,也可以单独配置用何种服务做backend
flask,django是同步框架,所有的请求以队列形式完成。这样的话效率极差,用户体验不好,为了解决这个问题引入celery异步方式在后台执行这些任务(这里使用到了redis,3.0以下兼容性更好)
1,安装依赖
pip install celery pip install celery-with-redis pip install django-celery
2,settings.py设置
#配置celery import djcelery djcelery.setup_loader() BROKER_URL = 'redis://127.0.0.1:6379' CELERY_IMPORTS = ('mymac.tasks') #需执行异步的子应用 #将djcelery安装到应用中 INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'mysg', 'lianxi', "rest_framework", 'corsheaders', #异步 'djcelery', ]
3,将异步的应用中注册celery.py
import os import django from celery import Celery from django.conf import settings os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mymac.settings') django.setup() app = Celery('mymac') app.config_from_object('django.conf:settings') app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
4,建立异步任务和定时任务的(tasks.py)
- 注意tasks.py必须建在各app的根目录下,且只能叫tasks.py,不能随意命名
异步任务
import time,random from celery import task #发邮件 from django.core.mail import send_mail from django.http import HttpResponse #定义异步写文件方法 @task def file_task(): #写文件操作 文件对象 file_object = open("./data.text",'a+',encoding='utf-8') file_object.write("hello") file_object.close() print("ok") #定义异步发邮件的方法 @task def email_(): captcha_text = [] for i in range(4): #定义验证码字符 str = 'qwertyuiopasdfghjklzxcvbnm1234567890' c = random.choice(str) captcha_text.append(c) #返回随机生成的字符串 captcha = "".join(captcha_text) res = send_mail("欢迎注册",'您的验证码是:'+ captcha,['396961930@qq.com'],DEFAULT_FROM_EMAIL) if res: return HttpResponse("发送成功") else: return HttpResponse("发送失败")
views.py中引用使用这个tasks异步处理
from mymac.tasks import email,file_task #异步发邮件 def email(request): print(email_.delay()) return HttpResponse("异步发邮件") #异步写入文件 def failtask(request): print(file_task.delay()) return HttpResponse("success")
#配好路由触发任务即可
定时任务
#导入定时任务库 from celery.decorators import periodic_task from celery.schedules import crontab #发短信 from twilio.rest import Client #定义20点10分发送 #@periodic_task(run_every=crontab(minute=10,hour=20)) #定义10秒发送一次 @periodic_task(run_every=10) def mail(): #定义短信sid account_sid = 'ACbccc4d2127e888e6f6654dc8128c019e' #定义秘钥 auth_token = 'a0f31b24c76c65c20c6400dc94537ac6' #定义客户端对象 clinet = Client(account_sid,auth_token) #定义短信内容 1,发给谁 2,发信人 3,内容 status = clinet.messages.create(to="+8616637712137",from_="+12016361207",body="hello world") if status: print("发送成功") #注意时区 例如中国 #settings.py 设置 语言相关配置 LANGUAGE_CODE = 'zh-hans' TIME_ZONE = 'Asia/Shanghai' crontab的参数有: month_of_year:月份 day_of_month:日期 day_of_week:周 hour:小时 minute:分钟
5,启动任务
启动服务的命令:
celery -A mymac beat -l info 定时任务
celery -A mymac worker -l info 异步任务