celery的分布式应用,就是一个应用里定义了很多不同功能的函数,而哪个任务由哪个worker执行是通过消息路由机制实现的
创建异步任务后,exchange通过routing_key把任务消息发送到不同的消息队列,worker根据routing_key判断取哪个消息队列取任务
消息在exchange、broker和worker之间传递,是在一个配置文件里定义的,配置文件定义消息队列和worker监控对应的消息队列
1. 应用
[root@localhost sbin]# cat tasks.py #!/usr/bin/env python from celery import Celery app = Celery() app.config_from_object('celeryconfig') @app.task def task1(x, y): return x*y @app.task def task2(x, y): return x/y @app.task def add(x,y ): return x+y
2. 配置文件
[root@localhost sbin]# cat celeryconfig.py #!/usr/bin/env python from kombu import Exchange,Queue BROKER_URL = "redis://127.0.0.1:6379/0" CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/1" CELERY_QUEUES = ( //定义消息队列 Queue("default", Exchange("default"), routing_key="default"), Queue("for_task_1", Exchange("for_task_1"), routing_key="for_task_1"), Queue("for_task_2",Exchange("for_task_2"),routing_key="for_task_2") ) CELERY_ROUTES = { //定义worker去哪个消息队列里取任务 'tasks.task1': {"queue":"for_task_1", "routing_key":"for_task_1"}, 'tasks.task2': {"queue":"for_task_2", "routing_key":"for_task_2"} }
3. 启动worker
[root@localhost sbin]# celery -A tasks worker -l info -n worker1.%h -Q for_task_1 //-n指定一个名字 -Q执行worker监控哪个消息队列 [root@localhost sbin]# celery -A tasks worker -l info -n worker2.%h -Q for_task_2 [root@localhost sbin]# celery -A tasks worker -l info -n worker.%h -Q celery //没有指定消息队列,就监控默认的消息队列
4. 创建异步任务
[root@localhost sbin]# cat client.py #!/usr/bin/env python from tasks import task1, task2, add r1 = task1.delay(2, 3) while 1: if r1.ready() and r1.successful(): print r1.result break r2 = task2.delay(9, 3) while 1: if r2.ready() and r2.successful(): print r2.result break r3 = add.delay(1, 2) while 1: if r3.ready() and r3.successful(): print r3.result break [root@localhost sbin]# python client.py 6 3 3
celery的定时任务
1. 配置文件增加CELERYBEAT_SCHEDULE变量
[root@localhost sbin]# cat celeryconfig.py #!/usr/bin/env python from kombu import Exchange,Queue BROKER_URL = "redis://127.0.0.1:6379/0" CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/1" CELERY_QUEUES = ( Queue("default", Exchange("default"), routing_key="default"), Queue("for_task_1", Exchange("for_task_1"), routing_key="for_task_1"), Queue("for_task_2",Exchange("for_task_2"),routing_key="for_task_2") ) CELERY_ROUTES = { 'tasks.task1': {"queue":"for_task_1", "routing_key":"for_task_1"}, 'tasks.task2': {"queue":"for_task_2", "routing_key":"for_task_2"} } CELERY_TIMEZONE = 'UTC' CELERYBEAT_SCHEDULE = { 'task1_schedule' : { 'task':'tasks.task1', 'schedule':5, //5秒执行一次 'args':(5,6) }, 'task2_scheduler' : { 'task':"tasks.task2", "schedule":10, "args":(6, 2) }, 'add_schedule': { "task":"tasks.add", "schedule":15, "args":(1,2) } }
2. 启动对应的worker 启动redis
3. 启动定时任务
[root@localhost sbin]# celery -A tasks beat celery beat v4.1.0 (latentcall) is starting. __ - ... __ - _ LocalTime -> 2017-12-23 15:38:41 Configuration -> . broker -> redis://127.0.0.1:6379/0 . loader -> celery.loaders.app.AppLoader . scheduler -> celery.beat.PersistentScheduler . db -> celerybeat-schedule . logfile -> [stderr]@%WARNING . maxinterval -> 5.00 minutes (300s) //定时任务间隔最大300秒