声明:代码是从项目中截取的, 为进行测试
使用Celery任务队列,Celery 只是一个任务队列,需要一个broker媒介,将耗时的任务传递给Celery任务队列执行,执行完毕将结果通过broker媒介返回。官方推荐使用RabbitMQ作为消息传递,redis也可以
一、Celery 介绍:
注意:
1、当使用RabbitMQ时,需要按照pika第三方库,pika0.10.0存在bug,无法获得回调信息,需要按照0.9.14版本即可
2、tornado-celery 库比较旧,无法适应Celery的最新版,会导致报无法导入task Producter包错误,只需要将celery版本按照在3.0.25就可以了
二、配置
单个参数配置:
app.conf.CELERY_RESULT_BACKEND = ‘redis://localhost:6379/0‘
多个参数配置:
app.conf.update(
CELERY_BROKER_URL = ‘amqp://guest@localhost//‘,
CELERY_RESULT_BACKEND = ‘redis://localhost:6379/0‘
)
从配置文件中获取:(将配置参数写在文件app.py中)
BROKER_URL=‘amqp://guest@localhost//‘
CELERY_RESULT_BACKEND=‘redis://localhost:6379/0‘
app.config_from_object(‘celeryconfig‘)
三、案例
启动一个Celery 任务队列,也就是消费者:
from celery import Celery
celery = Celery(‘tasks‘, broker=‘amqp://guest:guest@119.29.151.45:5672‘, backend=‘amqp‘) 使用RabbitMQ作为载体, 回调也是使用rabbit作为载体 @celery.task(name=‘doing‘) #异步任务,需要命一个独一无二的名字
def doing(s, b):
print(‘开始任务‘)
logging.warning(‘开始任务--{}‘.format(s))
time.sleep(s)
return s+b
命令行启动任务队列守护进程,当队列中有任务时,自动执行 (命令行可以放在supervisor中管理)
--loglevel=info --concurrency=5
记录等级,默认是concurrency:指定工作进程数量,默认是CPU核心数
启动任务生产者
#!/usr/bin/env python
# -*- coding:utf-8 -*- import tcelery
from tornado.web import RequestHandler
import tornado tcelery.setup_nonblocking_producer() # 设置为非阻塞生产者,否则无法获取回调信息 class MyMainHandler(RequestHandler):
@tornado.web.asynchronous
@tornado.gen.coroutine
def get(self, *args, **kwargs):
print('begin')
result = yield tornado.gen.Task(sleep.apply_async, args=[10]) # 使用yield 获取异步返回值,会一直等待但是不阻塞其他请求
print('ok - -{}'.format(result.result)) # 返回值结果 # sleep.apply_async((10, ), callback=self.on_success)
# print(‘ok -- {}‘.format(result.get(timeout=100)))#使用回调的方式获取返回值,发送任务之后,请求结束,所以不能放在处理tornado的请求任务当中,因为请求已经结束了,与客户端已经断开连接,无法再在获取返回值的回调中继续向客户端返回数据 # result = sleep.delay(10) #delay方法只是对apply_async方法的封装而已
# data = result.get(timeout=100) #使用get方法获取返回值,会导致阻塞,相当于同步执行 def on_success(self, response): # 回调函数
print('Ok - - {}'.format(response))
=======================
#!/usr/bin/env python # -*- coding:utf-8 -*- from tornado.web import Application from tornado.ioloop import IOLoop import tcelery from com.analysis.handlers.data_analysis_handlers import * from com.analysis.handlers.data_summary_handlers import * from com.analysis.handlers.data_cid_sumjson_handler import Cid_Sumjson_Handler from com.analysis.handlers.generator_handlers import GeneratorCsv, GeneratorSpss Handlers = [ (r"/single_factor_variance_analysis/(.*)", SingleFactorVarianceAnalysis), # 单因素方差检验 ] if __name__ == "__main__": tcelery.setup_nonblocking_producer() application = Application(Handlers) application.listen(port=8888, address="0.0.0.0") IOLoop.instance().start()
server
#!/usr/bin/env python # -*- coding:utf-8 -*- import tornado.gen import tornado.web from com.analysis.core.base import BaseAnalysisRequest from com.analysis.tasks.data_analysis import * class SingleFactorVarianceAnalysis(BaseAnalysisRequest): @tornado.gen.coroutine def get(self, *args, **kwargs): response = yield self.celery_task(single_factor_variance_analysis.apply_async, params=args) print(response.result) self.write(response.result[2])
handler
#!/usr/bin/env python # -*- coding:utf-8 -*- from collections import defaultdict import pandas as pd import numpy as np import pygal import tornado.gen from pygal.style import LightStyle from tornado.web import RequestHandler import json from com.analysis.db.db_engine import DBEngine from com.analysis.utils.log import LogCF from com.analysis.handlers.data_cid_sumjson_handler import cid_sumjson class BaseRequest(RequestHandler): def __init__(self, application, request, **kwargs): super(BaseRequest, self).__init__(application, request, **kwargs) class BaseAnalysisRequest(BaseRequest): def __init__(self, application, request, **kwargs): super(BaseAnalysisRequest, self).__init__(application, request, **kwargs) @tornado.gen.coroutine def celery_task(self, func, params, queue="default_analysis"): args_list = list(params) args_list.insert(0, "") response = yield tornado.gen.Task(func, args=args_list, queue=queue) raise tornado.gen.Return(response)
basehandler
#!/usr/bin/env python # -*- coding:utf-8 -*- from celery import Celery from com.analysis.core.chi_square_test import CST from com.analysis.generator.generator import GeneratorCsv, GeneratorSpss celery = Celery( 'com.analysis.tasks.data_analysis', broker='amqp://192.168.1.1:5672', include='com.analysis.tasks.data_analysis' ) celery.conf.CELERY_RESULT_BACKEND = "amqp://192.168.1.1:5672" celery.conf.CELERY_ACCEPT_CONTENT = ['application/json'] celery.conf.CELERY_TASK_SERIALIZER = 'json' celery.conf.CELERY_RESULT_SERIALIZER = 'json' celery.conf.BROKER_HEARTBEAT = 30 celery.conf.CELERY_IGNORE_RESULT = False # this is less important logger = Logger().getLogger() @celery.task() def single_factor_variance_analysis(*args): return SFV().do_(*args)
task