开启服务(-l info 用于指定日志级别):celery -A write_file worker -l info
# file name: write_file.py
# Celery task module: defines a demo task that writes a random number to a file.
import random
import time

from celery import Celery

celery_app = Celery("write_file", broker="redis://127.0.0.1:6379")

# NOTE(review): the numeric arguments of the original randint()/sleep() calls
# were lost from this snippet; the bounds below are placeholders — confirm.
RANDOM_MIN = 1
RANDOM_MAX = 100
SLEEP_MIN_SECONDS = 1
SLEEP_MAX_SECONDS = 5


@celery_app.task
def test():
    """Stand-in for slow work such as sending an SMS or another blocking call.

    Opens ``<random int>.txt`` for writing (relative path, UTF-8), sleeps
    while the file is open, then writes another random integer into it as
    text. Returns nothing.
    """
    file_name = f"{random.randint(RANDOM_MIN, RANDOM_MAX)}.txt"
    with open(file_name, "wt", encoding="utf-8") as f:
        # Presumably a random delay to imitate a blocking operation; the
        # original sleep argument was garbled — TODO confirm.
        time.sleep(random.randint(SLEEP_MIN_SECONDS, SLEEP_MAX_SECONDS))
        f.write(str(random.randint(RANDOM_MIN, RANDOM_MAX)))
python3 manage.py runserver
# file name: manage.py
from flask import Flask
from flask_script import Manager

# Import only the task this module uses instead of a wildcard import.
from write_file import test

app = Flask(__name__)
manager = Manager(app)


@app.route('/')
def index():
    """Queue the Celery ``test`` task and return a fixed response body."""
    # delay() submits the task to the broker; printing the returned object
    # shows the task id.
    t = test.delay()
    print(t)
    print(type(t))
    return 'test'


# Run the Flask-Script command dispatcher only when executed as a script
# (e.g. ``python3 manage.py runserver``), not when this module is imported.
if __name__ == "__main__":
    manager.run()