起步
celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。
运行模式是生产者消费者模式:
任务队列:任务队列是一种在线程或机器间分发任务的机制。
消息队列:消息队列的输入是工作的一个单元,称为任务,独立的职程(worker)进程持续监视队列中是否有需要处理的新任务。
celery 用消息通信,通常使用中间人(broker)在客户端和职程间斡旋。这个过程从客户端向队列添加消息开始,之后中间人把消息派送给职程,职程对消息进行处理。
celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
消息中间件:celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成,包括,rabbitmq, redis, mongodb等,本文使用 redis 。
任务执行单元:worker是celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中
任务结果存储:task result store用来存储worker执行的任务的结果,celery支持以不同方式存储任务的结果,包括redis,mongodb,django orm,amqp等,这里我先不去看它是如何存储的,就先选用redis来存储任务执行结果。
安装
通过 pip 命令即可安装:
1
|
pip install celery
|
本文使用 redis 做消息中间件,所以需要在安装:
1
|
pip install redis
|
redis软件也要安装,官网只提供了 linux 版本的下载:https://redis.io/download,windows 的可以到 https://github.com/MicrosoftArchive/redis 下载 exe 安装包。
简单的demo
为了运行一个简单的任务,从中说明 celery 的使用方式。在项目文件夹内创建 app.py 和 tasks.py 。tasks.py 用来定义任务:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
# tasks.py
import time
from celery import celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery( 'my_tasks' , broker = broker, backend = backend)
@app .task
def add(x, y):
print ( 'enter task' )
time.sleep( 3 )
return x + y
|
这些代码做了什么事。 broker 指定任务队列的消息中间件,backend 指定了任务执行结果的存储。app 就是我们创建的 celery 对象。通过 app.task 修饰器将 add 函数变成一个一部的任务。
1
2
3
4
5
6
7
8
|
# app.py
from tasks import add
if __name__ = = '__main__' :
print ( 'start task' )
result = add.delay( 2 , 18 )
print ( 'end task' )
print (result)
|
add.delay 函数将任务序列化发送到消息中间件。终端执行 python app.py 可以看到输出一个任务的唯一识别:
start task
end task
79ef4736-1ecb-4afd-aa5e-b532657acd43
这个只是将任务推送到 redis,任务还没被消费,任务会在 celery 队列中。
开启 celery woker 可以将任务进行消费:
1
|
celery worker - a tasks - l info # -a 后是模块名
|
a 参数指定了celery 对象的位置,l 参数指定woker的日志级别。
如果此命令在终端报错:
file "e:\workspace\.env\lib\site-packages\celery\app\trace.py", line 537, in _fast_trace_task
tasks, accept, hostname = _loc
valueerror: not enough values to unpack (expected 3, got 0)
这是win 10 在使用 celery 4.x 的时候会有这个问题,解决方式可以是改用 celery 3.x 版本,或者按照 unable to run tasks under windows 上提供的方式,该issue提供了两种方式解决,一种是安装 eventlet 扩展:
1
2
|
pip install eventlet
celery - a <mymodule> worker - l info - p eventlet
|
另一种方式是添加个 forked_by_multiprocessing = 1 的环境变量(推荐这种方式):
1
2
|
import os
os.environ.setdefault( 'forked_by_multiprocessing' , '1' )
|
如果一切顺利,woker 正常启动,就能在终端看到任务被消费了:
[2018-11-27 13:59:27,830: info/mainprocess] received task: tasks.add[745e5be7-4675-4f84-9d57-3f5e91c33a19]
[2018-11-27 13:59:27,831: warning/spawnpoolworker-2] enter task
[2018-11-27 13:59:30,835: info/spawnpoolworker-2] task tasks.add[745e5be7-4675-4f84-9d57-3f5e91c33a19] succeeded in 3.0s: 20
说明我们的demo已经成功了。
使用配置文件
在上面的demo中,是将broker和backend直接写在代码中的,而 celery 还有其他配置,最好是写出配置文件的形式,基本配置项有:
- celery_default_queue:默认队列
- broker_url : 代理人的网址
- celery_result_backend:结果存储地址
- celery_task_serializer:任务序列化方式
- celery_result_serializer:任务执行结果序列化方式
- celery_task_result_expires:任务过期时间
- celery_accept_content:指定任务接受的内容序列化类型(序列化),一个列表;
整理一下目录结构,将我们的任务封装成包:
内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
# __init__.py
import os
from celery import celery
os.environ.setdefault( 'forked_by_multiprocessing' , '1' )
app = celery( 'demo' )
# 通过 celery 实例加载配置模块
app.config_from_object( 'celery_app.celery_config' )
# celery_config.py
broker_url = 'redis://127.0.0.1:6379/1'
celery_result_backend = 'redis://127.0.0.1:6379/2'
# utc
celery_enable_utc = true
celery_timezone = 'asia/shanghai'
# 导入指定的任务模块
celery_imports = (
'celery_app.task1' ,
'celery_app.task2' ,
)
# task1.py
import time
from celery_app import app
@app .task
def add(x, y):
print ( 'enter task' )
time.sleep( 3 )
return x + y
# task2.py
import time
from celery_app import app
@app .task
def mul(x, y):
print ( 'enter task' )
time.sleep( 4 )
return x * y
# app.py
from celery_app import task1
if __name__ = = '__main__' :
pass
print ( 'start task' )
result = task1.add.delay( 2 , 18 )
print ( 'end task' )
print (result)
|
提交任务与启动worker:
1
2
|
$ python app.py
$ celery worker - a celery_app - l info
|
result = task1.add.delay(2, 18) 返回的是一个任务对象,通过 delay 函数的方式可以发现这个过程是非阻塞的,这个任务对象有一个方法:
1
2
3
4
5
6
7
8
|
r.ready() # 查看任务状态,返回布尔值, 任务执行完成, 返回 true, 否则返回 false.
r.wait() # 等待任务完成, 返回任务执行结果,很少使用;
r.get(timeout = 1 ) # 获取任务执行结果,可以设置等待时间
r.result # 任务执行结果.
r.state # pending, start, success,任务当前的状态
r.status # pending, start, success,任务当前的状态
r.successful # 任务成功返回true
r.traceback # 如果任务抛出了一个异常,你也可以获取原始的回溯信息
|
定时任务
定时任务的功能类似 crontab,可以完成每日统计任务等。首先我们需要配置一下 schedule,通过改造上面的配置文件,添加 celerybeat_schedule 配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import datetime
from celery.schedules import crontab
celerybeat_schedule = {
'task1-every-1-min' : {
'task' : 'celery_app.task1.add' ,
'schedule' : datetime.timedelta(seconds = 60 ),
'args' : ( 2 , 15 ),
},
'task2-once-a-day' : {
'task' : 'celery_app.task2.mul' ,
'schedule' : crontab(hour = 15 , minute = 23 ),
'args' : ( 3 , 6 ),
}
}
|
task 指定要执行的任务;schedule 表示计划的时间,datetime.timedelta(seconds=60) 表示间隔一分钟,这里其实也可以是 crontab(minute='*/1') 来替换;args 表示要传递的参数。
启动 celery beat:
1
|
$ celery worker - a celery_app - l info
|
我们目前是用两个窗口来执行 woker 和 beat 。当然也可以只使用一个窗口来运行(仅限linux系统):
1
|
$ celery - b - a celery_app worker - l info
|
celery.task 装饰器
1
2
3
|
@celery .task()
def name():
pass
|
task() 方法将任务修饰成异步, name 可以显示指定的任务名字;serializer 指定序列化的方式;bind 一个bool值,若为true,则task实例会作为第一个参数传递到任务方法中,可以访问task实例的所有的属性,即前面反序列化中那些属性。
1
2
3
|
@task (bind = true) # 第一个参数是self,使用self.request访问相关的属性
def add( self , x, y):
logger.info( self .request. id )
|
base 可以指定任务积累,可以用来定义回调函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
import celery
class mytask(celery.task):
# 任务失败时执行
def on_failure( self , exc, task_id, args, kwargs, einfo):
print ( '{0!r} failed: {1!r}' . format (task_id, exc))
# 任务成功时执行
def on_success( self , retval, task_id, args, kwargs):
pass
# 任务重试时执行
def on_retry( self , exc, task_id, args, kwargs, einfo):
pass
@task (base = mytask)
def add(x, y):
raise keyerror()
exc:失败时的错误的类型;
task_id:任务的 id ;
args:任务函数的参数;
kwargs:参数;
einfo:失败时的异常详细信息;
retval:任务成功执行的返回值;
|
总结
网上找了一份比较常用的配置文件,需要的时候可以参考下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
# 注意,celery4版本后,celery_broker_url改为broker_url
broker_url = 'amqp://username:passwd@host:port/虚拟主机名'
# 指定结果的接受地址
celery_result_backend = 'redis://username:passwd@host:port/db'
# 指定任务序列化方式
celery_task_serializer = 'msgpack'
# 指定结果序列化方式
celery_result_serializer = 'msgpack'
# 任务过期时间,celery任务执行结果的超时时间
celery_task_result_expires = 60 * 20
# 指定任务接受的序列化类型.
celery_accept_content = [ "msgpack" ]
# 任务发送完成是否需要确认,这一项对性能有一点影响
celery_acks_late = true
# 压缩方案选择,可以是zlib, bzip2,默认是发送没有压缩的数据
celery_message_compression = 'zlib'
# 规定完成任务的时间
celeryd_task_time_limit = 5 # 在5s内完成任务,否则执行该任务的worker将被杀死,任务移交给父进程
# celery worker的并发数,默认是服务器的内核数目,也是命令行-c参数指定的数目
celeryd_concurrency = 4
# celery worker 每次去rabbitmq预取任务的数量
celeryd_prefetch_multiplier = 4
# 每个worker执行了多少任务就会死掉,默认是无限的
celeryd_max_tasks_per_child = 40
# 这是使用了django-celery默认的数据库调度模型,任务执行周期都被存在你指定的orm数据库中
# celerybeat_scheduler = 'djcelery.schedulers.databasescheduler'
# 设置默认的队列名称,如果一个消息不符合其他的队列就会放在默认队列里面,如果什么都不设置的话,数据都会发送到默认的队列中
celery_default_queue = "default"
# 设置详细的队列
celery_queues = {
"default" : { # 这是上面指定的默认队列
"exchange" : "default" ,
"exchange_type" : "direct" ,
"routing_key" : "default"
},
"topicqueue" : { # 这是一个topic队列 凡是topictest开头的routing key都会被放到这个队列
"routing_key" : "topic.#" ,
"exchange" : "topic_exchange" ,
"exchange_type" : "topic" ,
},
"task_eeg" : { # 设置扇形交换机
"exchange" : "tasks" ,
"exchange_type" : "fanout" ,
"binding_key" : "tasks" ,
},
}
|
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://www.hongweipeng.com/index.php/archives/1676/