celery+RabbitMQ 实战记录2—工程化使用

时间:2021-08-19 13:57:12

上篇文章中,已经介绍了celery和RabbitMQ的安装以及基本用法。

本文将从工程的角度介绍如何使用celery。

1.配置和启动RabbitMQ

请参考celery+RabbitMQ实战记录

2. 安装和使用celery

2.1 创建虚拟环境,并安装celery

$ mkdir celery_demo
$ cd celery_demo
$ virtualenv -p python3 venv3
$ ./venv3/bin/pip install celery

项目的目录结构说明:

-- celery_demo
-- api.py
-- celeryconfig.py
-- rocket
-- celery.py
-- tasks.py

2.2 配置celery

创建配置文件 celeryconfig.py,里面包含CELERY_IMPORTSBROKER_URLCELERYD_LOG_FORMATCELERY_ROUTES.

# celeryconfig.py

RABBIT_MQ = {
'HOST': '127.0.0.1',
'PORT': 5672,
'USER': 'test',
'PASSWORD': '123456'
} CELERY_IMPORTS = ("rocket.tasks", ) BROKER_URL = 'amqp://%s:%s@%s:%s/myvhost' % (RABBIT_MQ['USER'], RABBIT_MQ['PASSWORD'], RABBIT_MQ['HOST'], RABBIT_MQ['PORT']) CELERYD_LOG_FORMAT = '[%(asctime)s] [%(levelname)s] %(message)s' CELERY_ROUTES = {
'rocket.tasks.add': {'queue': 'sunday'},
}

其中,参数定义如下:

  • CELERY_IMPORTS 导入的task

  • BROKER_URL指定了broker信息,即消息队列的地址。

  • CELERYD_LOG_FORMAT 指定了日志格式。

  • CELERY_ROUTES 指定了路由信息,即调用rocket.tasks.add后,消息具体放入哪个队列,这里是队列名称为sunday

2.3 创建celery实例

创建目录

mkdir rocket

在rocket目录下,创建文件celery.py

from celery import Celery

app = Celery("orange", backend='amqp')
app.config_from_object("celeryconfig")

创建实例,并读取配置。

2.4 定义tasks

在rocket目录下,创建文件tasks.py

from rocket.celery import app

@app.task
def add(x, y):
return x + y

定义tasks。

2.5 启动celery(即消费者)

确认当前所在的目录:

$ pwd
/workspace/celery_demo

启动消费者

$ ./venv3/bin/celery worker  -A rocket -Q sunday --loglevel=info  -f app.log

/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py:67: CPendingDeprecationWarning:
The AMQP result backend is scheduled for deprecation in version 4.0 and removal in version v5.0. Please use RPC backend or a persistent backend. alternative='Please use RPC backend or a persistent backend.') celery@admindeMacBook-Pro-2.local v4.3.0 (rhubarb) Darwin-18.2.0-x86_64-i386-64bit 2019-04-22 20:53:56 [config]
.> app: orange:0x10abdb6d8
.> transport: amqp://test:**@127.0.0.1:5672/myvhost
.> results: amqp://
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker) [queues]
.> sunday exchange=sunday(direct) key=sunday [tasks]
. rocket.tasks.add

2.6 生产者

创建文件api.py,会调用apply_async向队列中投放消息。

# api.py
from rocket.tasks import add print("start...") result = add.apply_async((1, 2), expires=10) print("result:", result)
print(result.ready()) print("end...")

执行api.py,启动生产者

./venv3/bin/python api.py
start...
result: 6ef453e6-1797-4dd7-a2fc-bd6ef8096fde
True
end...

2.6 查看结果

查看日志文件app.log

[2019-04-23 09:53:46,406] [INFO] Connected to amqp://test:**@127.0.0.1:5672/myvhost
[2019-04-23 09:53:46,420] [INFO] mingle: searching for neighbors
[2019-04-23 09:53:47,455] [INFO] mingle: all alone
[2019-04-23 09:53:47,474] [INFO] celery@admindeMacBook-Pro-2.local ready.
[2019-04-23 09:53:55,850] [INFO] Received task: rocket.tasks.add[6ef453e6-1797-4dd7-a2fc-bd6ef8096fde] expires:[2019-04-23 01:54:05.817853+00:00]
[2019-04-23 09:53:55,876] [INFO] Task rocket.tasks.add[6ef453e6-1797-4dd7-a2fc-bd6ef8096fde] succeeded in 0.023095715092495084s: 3

参考

using celery in project