上篇文章中,已经介绍了celery和RabbitMQ的安装以及基本用法。
本文将从工程的角度介绍如何使用celery。
1.配置和启动RabbitMQ
2. 安装和使用celery
2.1 创建虚拟环境,并安装celery
$ mkdir celery_demo
$ cd celery_demo
$ virtualenv -p python3 venv3
$ ./venv3/bin/pip install celery
项目的目录结构说明:
-- celery_demo
-- api.py
-- celeryconfig.py
-- rocket
-- celery.py
-- tasks.py
2.2 配置celery
创建配置文件 celeryconfig.py,里面包含CELERY_IMPORTS
、BROKER_URL
、CELERYD_LOG_FORMAT
、CELERY_ROUTES
.
# celeryconfig.py
RABBIT_MQ = {
'HOST': '127.0.0.1',
'PORT': 5672,
'USER': 'test',
'PASSWORD': '123456'
}
CELERY_IMPORTS = ("rocket.tasks", )
BROKER_URL = 'amqp://%s:%s@%s:%s/myvhost' % (RABBIT_MQ['USER'], RABBIT_MQ['PASSWORD'], RABBIT_MQ['HOST'], RABBIT_MQ['PORT'])
CELERYD_LOG_FORMAT = '[%(asctime)s] [%(levelname)s] %(message)s'
CELERY_ROUTES = {
'rocket.tasks.add': {'queue': 'sunday'},
}
其中,参数定义如下:
CELERY_IMPORTS
导入的taskBROKER_URL
指定了broker信息,即消息队列的地址。CELERYD_LOG_FORMAT
指定了日志格式。CELERY_ROUTES
指定了路由信息,即调用rocket.tasks.add
后,消息具体放入哪个队列,这里是队列名称为sunday
。
2.3 创建celery实例
创建目录
mkdir rocket
在rocket目录下,创建文件celery.py
from celery import Celery
app = Celery("orange", backend='amqp')
app.config_from_object("celeryconfig")
创建实例,并读取配置。
2.4 定义tasks
在rocket目录下,创建文件tasks.py
from rocket.celery import app
@app.task
def add(x, y):
return x + y
定义tasks。
2.5 启动celery(即消费者)
确认当前所在的目录:
$ pwd
/workspace/celery_demo
启动消费者
$ ./venv3/bin/celery worker -A rocket -Q sunday --loglevel=info -f app.log
/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py:67: CPendingDeprecationWarning:
The AMQP result backend is scheduled for deprecation in version 4.0 and removal in version v5.0. Please use RPC backend or a persistent backend.
alternative='Please use RPC backend or a persistent backend.')
celery@admindeMacBook-Pro-2.local v4.3.0 (rhubarb)
Darwin-18.2.0-x86_64-i386-64bit 2019-04-22 20:53:56
[config]
.> app: orange:0x10abdb6d8
.> transport: amqp://test:**@127.0.0.1:5672/myvhost
.> results: amqp://
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> sunday exchange=sunday(direct) key=sunday
[tasks]
. rocket.tasks.add
2.6 生产者
创建文件api.py,会调用apply_async
向队列中投放消息。
# api.py
from rocket.tasks import add
print("start...")
result = add.apply_async((1, 2), expires=10)
print("result:", result)
print(result.ready())
print("end...")
执行api.py,启动生产者
./venv3/bin/python api.py
start...
result: 6ef453e6-1797-4dd7-a2fc-bd6ef8096fde
True
end...
2.6 查看结果
查看日志文件app.log
[2019-04-23 09:53:46,406] [INFO] Connected to amqp://test:**@127.0.0.1:5672/myvhost
[2019-04-23 09:53:46,420] [INFO] mingle: searching for neighbors
[2019-04-23 09:53:47,455] [INFO] mingle: all alone
[2019-04-23 09:53:47,474] [INFO] celery@admindeMacBook-Pro-2.local ready.
[2019-04-23 09:53:55,850] [INFO] Received task: rocket.tasks.add[6ef453e6-1797-4dd7-a2fc-bd6ef8096fde] expires:[2019-04-23 01:54:05.817853+00:00]
[2019-04-23 09:53:55,876] [INFO] Task rocket.tasks.add[6ef453e6-1797-4dd7-a2fc-bd6ef8096fde] succeeded in 0.023095715092495084s: 3