
测试使用环境:
1、Python==3.6.1
2、MongoDB==3.6.2
3、celery==4.1.1
4、eventlet==0.23.0
Celery分为3个部分
(1)worker部分负责任务的处理,即工作进程(我的理解工作进程就是你写的python代码,当然还包括python调用系统工具功能)
(2)broker部分负责任务消息的分发以及任务结果的存储,这部分任务主要由中间数据存储系统完成,比如消息队列服务器RabbitMQ、redis、
Amazon SQS、MongoDB、IronMQ等或者关系型数据库,使用关系型数据库依赖sqlalchemy或者django的ORM
(3)Celery主类,进行任务最开始的指派与执行控制,他可以是单独的python脚本,也可以和其他程序结合,应用到django或者flask等web框架里面以及你能想到的任何应用
上代码
这里将celery封装成一个Python包,结构如下图
celery.py
#!/usr/bin/env python
# -*- coding: utf-8 -*- """
Celery主类
启动文件名必须为celery.py!!!
""" from __future__ import absolute_import # 为兼容Python版本
from celery import Celery, platforms platforms.C_FORCE_ROOT = True # linux环境下,用于开启root也可以启动celery服务,默认是不允许root启动celery的
app = Celery(
main='celery_tasks', # celery启动包名称
# broker='redis://localhost',
# backend='redis://localhost',
include=['celery_tasks.tasks', ] # celery所有任务
)
app.config_from_object('celery_tasks.config') # celery使用文件配置 if __name__ == '__main__':
app.start()
config.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import CELERY_TIMEZONE = 'Asia/Shanghai'
# CELERY_RESULT_BACKEND='redis://localhost:6379/1'
# BROKER_URL='redis://localhost:6379/2'
BROKER_BACKEND = 'mongodb' # mongodb作为任务队列(或者说是缓存)
BROKER_URL = 'mongodb://localhost:27017/for_celery' # 队列地址
CELERY_RESULT_BACKEND = 'mongodb://localhost:27017/for_celery' # 消息结果存储地址
CELERY_MONGODB_BACKEND_SETTINGS = { # 消息结果存储配置
'host': 'localhost',
'port': 27017,
'database': 'for_celery',
# 'user':'root',
# 'password':'root1234',
'taskmeta_collection': 'task_meta', # 任务结果的存放collection
}
CELERY_ROUTES = { # 配置任务的先后顺序
'celery_task.tasks.add': {'queue': 'for_add', 'router_key': 'for_add'},
'celery_task.tasks.subtract': {'queue': 'for_subtract', 'router_key': 'for_subtract'}
}
tasks.py
#!/usr/bin/env python
# -*- coding: utf-8 -*- """
worker部分
""" from __future__ import absolute_import
from celery import Celery, group
from .celery import app
from time import sleep @app.task
def add(x, y):
sleep(5)
return x + y @app.task
def substract(x, y):
sleep(5)
return x - y
接下来演示,演示之前先把config中mongdb的用到的database和collection配置好,并启动mongodb服务
首先启动consumer
注意启动目录为celery_tasks同一级,启动命令为
celery -A celery_tasks worker --loglevel=info -P eventlet
参数解释,命令中-A参数表示的是Celery APP的名称celery_tasks,这个实例中指的就是tasks.py,后面的tasks就是APP的名称,worker是一个执行任务角色,后面的loglevel=info记录日志类型默认是info,这个命令启动了一个worker,用来执行程序中add这个加法任务(task),-P eventlet是防止在windows环境下出现
[2018-06-02 15:08:15,550: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)',)
Traceback (most recent call last):
File "d:\programmingsoftware\python35\lib\site-packages\billiard\pool.py", line 358, in workloop result = (True, prepare_result(fun(*args, **kwargs)))
File "d:\programmingsoftware\python35\lib\site-packages\celery\app\trace.py", line 525, in _fast_trace_task tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)
若启动成功,结果如下
再启动produce
在另一个终端terminal,首先启动Python
如下
再导入并调用任务,使用delay方法
如下
调用之后,回到consumer终端,发现
收到任务。
再到mongodb中查看任务结果