[Flask]celery异步任务队列的使用

时间:2022-03-22 09:08:11

Celery异步任务队列

目录结构树:

(原文此处的目录结构图缺失;以下结构根据下文代码中的导入路径推断,仅供参考)

celery_tasks/
├── config.py
├── main.py
└── email/
    └── tasks.py

配置文件config.py:

# Set the broker (middleman) address
broker_url = 'redis://127.0.0.1:6379/1'

  

主main.py:

import os
import sys

from celery import Celery
from flask import Flask
from flask_mail import Mail

# Put the parent of the current working directory at the front of the import
# path (presumably so that the `config` module imported below resolves from
# there -- verify against the actual project layout).
CELERY_DIR = os.path.dirname(os.getcwd())
sys.path.insert(0, CELERY_DIR)

import config  # noqa: E402  -- must stay after the sys.path change above

mail = Mail()
app = Flask(__name__)
# Select the configuration object keyed by the `env` environment variable.
app.config.from_object(config.config.get(os.environ.get('env')))
mail.init_app(app)


def make_celery(app):
    """Create a Celery instance whose tasks execute inside ``app``'s context.

    Args:
        app: The Flask application; its ``import_name`` names the Celery
            instance and its ``app_context()`` wraps every task call.

    Returns:
        The configured Celery instance.
    """
    # Create the Celery object and configure it
    celery = Celery(app.import_name)
    # Load the configuration
    celery.config_from_object('celery_tasks.config')
    # celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        """Task base class that runs each call within ``app.app_context()``."""

        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    # Automatically discover tasks when the celery worker starts
    celery.autodiscover_tasks(['celery_tasks.email',])
    return celery


celery = make_celery(app)

  

任务函数tasks.py:

from flask_mail import Message

import config
from celery_tasks.main import celery, mail


# Use the decorator to turn send_email into a task function
@celery.task(name='send_email')
def send_email(to, subject, html_message):
    """Build an HTML e-mail message and send it via the shared ``mail`` object.

    Args:
        to: Recipient address; wrapped in a one-element ``recipients`` list.
        subject: Subject passed as the first argument to ``Message``.
        html_message: Value passed as the message's ``html`` argument.
    """
    msg = Message(
        subject,
        sender=config.Config.MAIL_USERNAME,
        html=html_message,
        recipients=[to],
    )
    mail.send(msg)


if __name__ == '__main__':
    send_email.delay('xx@xx.com', 'xx', 'xx')

  

启动命令:

# -A is a global option and must come before the sub-command (required since Celery 5.0)
celery -A main.celery worker -l info

  

发出任务函数:

# Dispatch the send_email task; arguments are (to, subject, html_message)
send_email.delay('xx@xx.com', 'xx', 'xx')