celery 快速入门教程 celery 定时器

时间:2021-08-22 14:22:20

当然首先得安装celery和rabbitmq-server,如果有redis需要安装redis

安装Redis
$ yum install redis
启动 Redis $redis-server
检查Redis是否在工作? $redis-cli
这将打开一个Redis提示,如下图所示: redis 127.0.0.1:6379>
上面的提示127.0.0.1是本机的IP地址,6379为Redis服务器运行的端口。现在输入PING命令,如下图所示。 redis 127.0.0.1:6379> ping
PONG
这说明你已经成功地安装Redis在您的机器上。

  

tasks.py

from celery import Celery
from time import sleep app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
#app = Celery('tasks', backend='redis://localhost', broker='amqp://guest@localhost//')
#app = Celery('tasks', backend='redis://localhost:6379/1', broker='amqp://guest@localhost//')
#app = Celery('tasks', backend='redis://localhost:6379/1', broker='redis://localhost:6379/0')
#app = Celery('taskwb', backend='amqp://guest@localhost:5672/0', broker='amqp://guest@localhost:5672/1') @app.task
def add(x, y):
sleep(5)
return x + y

保存后执行celery -A tasks worker --loglevel=info,可以看到提示页面

[wenbin celery]$ celery -A tasks worker --loglevel=info
[-- ::,: WARNING/MainProcess] /usr/lib/python2./site-packages/celery/apps/worker.py:: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default. The pickle serializer is a security concern as it may give attackers
the ability to execute any command. It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice. If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2:: CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml'] You must only enable the serializers that you will actually use. warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED)) -------------- celery@wb-test-multiple-portal v3.1.13 (Cipater)
---- **** -----
--- * *** * -- Linux-2.6.-.el6.x86_64-x86_64-with-centos-6.6-Final
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x1071e50
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: amqp
- *** --- * --- .> concurrency: (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery

然后新开一个窗口进入python命令行:

>>>from tasks import add

>>>res = add.delay(4, 3)

可以看到之前启动celery的那边有信息输出,并且过5s后执行了结果(因为我们add中sleep了5秒)

[2016-03-07 15:21:43,153: INFO/MainProcess] Received task: tasks.add[fd523296-8b99-4530-a77e-3fa56cc19a5d]
[2016-03-07 15:21:48,184: INFO/MainProcess] Task tasks.add[fd523296-8b99-4530-a77e-3fa56cc19a5d] succeeded in 5.02967603202s: 7

说明我们的demo已经成功了。

有一个小错误自己发现的,我之后又开了一个celery运行的窗口(其实是忘记关了之前的),发现在python命令行执行delay的时候只有一半成功了。。。这是个神奇的东西,能够自动检测到另一个的存在,然后分配任务;然后我又再运行一个celery,这样发现原来3个celery都能轮流来进行任务的执行,简直是个神奇的东西。。。

定时器功能

使用定时器功能首先得配置一下schedule,刚刚我们是直接在Celery函数加入配置,现在我们专门用一个文件来放配置文件,schedule也会写在这里面。

修改tasks.py

from celery import Celery
from time import sleep
import celeryconfig app = Celery('tasks')#, backend='amqp', broker='amqp://guest@localhost//')
app.config_from_object('celeryconfig') @app.task
def add(x, y):
sleep()
return x + y

增加配置文件celeryconfig.py

from celery.schedules import crontab

BROKER_URL = 'amqp://guest@localhost//'
CELERY_RESULT_BACKEND = 'amqp://'
CELERYBEAT_SCHEDULE={
"every-1-minute": {
'task': 'tasks.add',
'schedule': crontab(minute='*/1'),
'args': (,)
}
}

表示一分钟触发一次add的函数,args是传入的参数,表示一分钟执行一次add(5,6),注意如果再添加一个任务,不能与every-1-minute重复,不然只有最后一个生效了。

然后执行celery -A tasks worker -B --loglevel=info    就能够增加触发beat任务了,会在本地生成一个celerybeat-schedule文件。最好在-B后面加一个 -s /tmp/celerybeat-schedule  ,不然很可能导致当前目录没有写权限而报permission refused

[wenbin@wb-test-multiple-portal celery]$ celery -A tasks worker -B --loglevel=info
[-- ::,: WARNING/MainProcess] /usr/lib/python2./site-packages/celery/apps/worker.py:: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default. The pickle serializer is a security concern as it may give attackers
the ability to execute any command. It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice. If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2:: CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml'] You must only enable the serializers that you will actually use. warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED)) -------------- celery@wb-test-multiple-portal v3.1.13 (Cipater)
---- **** -----
--- * *** * -- Linux-2.6.-.el6.x86_64-x86_64-with-centos-6.6-Final
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x2a095d0
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: amqp://
- *** --- * --- .> concurrency: (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery [tasks]
. tasks.add [-- ::,: INFO/Beat] beat: Starting...
[-- ::,: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[-- ::,: INFO/MainProcess] mingle: searching for neighbors
[-- ::,: INFO/MainProcess] mingle: all alone
[-- ::,: WARNING/MainProcess] celery@wb-test-multiple-portal ready.
[-- ::,: INFO/Beat] Scheduler: Sending due task every--minute (tasks.add)
[-- ::,: INFO/MainProcess] Received task: tasks.add[a81182bd--4d4a-b3cd-a81f7900a12b]
[-- ::,: INFO/MainProcess] Task tasks.add[a81182bd--4d4a-b3cd-a81f7900a12b] succeeded in .02911209001s:

当然也可以分开执行celery和beat,当然需要两个窗口了

celery -A tasks worker --loglevel=info

celery -A tasks beat

参考文献:

http://docs.jinkan.org/docs/celery/index.html

http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html

http://my.oschina.net/hochikong/blog/419191?p={{currentPage-1}}

http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html