Celery distributed applications and periodic tasks

Time: 2022-08-12 07:51:20
A distributed Celery application defines many task functions with different purposes in a single app; which worker executes which task is decided by the message-routing mechanism.
When an asynchronous task is created, the exchange delivers the task message to a queue based on its routing_key, and each worker takes tasks only from the queue(s) it has been told to watch.
How messages travel between exchange, broker, and worker is declared in a configuration file: it defines the message queues and the routes that bind tasks to queues, and each worker is then started against its corresponding queue.

1. The application
[root@localhost sbin]# cat tasks.py
#!/usr/bin/env python

from celery import Celery

app = Celery()
app.config_from_object('celeryconfig')

@app.task
def task1(x, y):
    return x*y

@app.task
def task2(x, y):
    return x/y

@app.task
def add(x, y):
    return x + y
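
The keys used later in CELERY_ROUTES ('tasks.task1', 'tasks.task2') must match the tasks' registered names, which Celery derives from the module and function names. If a fixed name is wanted regardless of import path, it can be pinned on the decorator; a minimal sketch (the name= value simply mirrors the default):

from celery import Celery

app = Celery()
app.config_from_object('celeryconfig')

@app.task(name='tasks.task1')    # pinned name, independent of how the module is imported
def task1(x, y):
    return x * y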


2. The configuration file

[root@localhost sbin]# cat celeryconfig.py
#!/usr/bin/env python

from kombu import Exchange, Queue

BROKER_URL = "redis://127.0.0.1:6379/0"
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/1"

CELERY_QUEUES = (                                                                    # define the message queues
    Queue("default", Exchange("default"), routing_key="default"),
    Queue("for_task_1", Exchange("for_task_1"), routing_key="for_task_1"),
    Queue("for_task_2",Exchange("for_task_2"),routing_key="for_task_2")
)

CELERY_ROUTES = {                                                                 # send each task to its queue; workers consume from the queues they watch
    'tasks.task1': {"queue":"for_task_1", "routing_key":"for_task_1"},
    'tasks.task2': {"queue":"for_task_2", "routing_key":"for_task_2"}
}
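
The routes above are fixed per task name. Routing can also be decided per call, since apply_async accepts queue and routing_key directly; a minimal sketch (assumes the tasks module from section 1):

from tasks import add

# override the route for this one call: send add to for_task_1
r = add.apply_async((2, 3), queue='for_task_1', routing_key='for_task_1')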


3. Start the workers

[root@localhost sbin]# celery -A tasks worker -l info -n worker1.%h -Q for_task_1               # -n sets the worker's name; -Q sets which queue it consumes from
[root@localhost sbin]# celery -A tasks worker -l info -n worker2.%h -Q for_task_2
[root@localhost sbin]# celery -A tasks worker -l info -n worker.%h -Q celery                    # tasks with no route (add) land in the default queue, named "celery"
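
-Q also accepts a comma-separated list, so one worker can watch several queues at once (the worker name here is an arbitrary choice):

[root@localhost sbin]# celery -A tasks worker -l info -n worker_all.%h -Q for_task_1,for_task_2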


4. Create asynchronous tasks

[root@localhost sbin]# cat client.py
#!/usr/bin/env python

from tasks import task1, task2, add

r1 = task1.delay(2, 3)
while True:
    if r1.ready() and r1.successful():
        print(r1.result)
        break

r2 = task2.delay(9, 3)
while True:
    if r2.ready() and r2.successful():
        print(r2.result)
        break

r3 = add.delay(1, 2)
while True:
    if r3.ready() and r3.successful():
        print(r3.result)
        break


[root@localhost sbin]# python client.py
6
3.0
3
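
Busy-polling ready() as above works but spins the CPU; AsyncResult.get() instead blocks until the result reaches the backend and re-raises the task's exception if it failed. A minimal sketch (the 10-second timeout is an arbitrary choice):

from tasks import task1

r = task1.delay(2, 3)
print(r.get(timeout=10))    # blocks up to 10 s, then raises celery.exceptions.TimeoutError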


Celery periodic tasks

1. Add a CELERYBEAT_SCHEDULE variable to the configuration file

[root@localhost sbin]# cat celeryconfig.py
#!/usr/bin/env python

from kombu import Exchange, Queue

BROKER_URL = "redis://127.0.0.1:6379/0"
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/1"

CELERY_QUEUES = (
    Queue("default", Exchange("default"), routing_key="default"),
    Queue("for_task_1", Exchange("for_task_1"), routing_key="for_task_1"),
    Queue("for_task_2",Exchange("for_task_2"),routing_key="for_task_2")
)

CELERY_ROUTES = {
    'tasks.task1': {"queue":"for_task_1", "routing_key":"for_task_1"},
    'tasks.task2': {"queue":"for_task_2", "routing_key":"for_task_2"}
}

CELERY_TIMEZONE = 'UTC'
CELERYBEAT_SCHEDULE = {
    'task1_schedule': {
        'task': 'tasks.task1',
        'schedule': 5,               # run every 5 seconds
        'args': (5, 6)
    },
    'task2_schedule': {
        'task': 'tasks.task2',
        'schedule': 10,              # run every 10 seconds
        'args': (6, 2)
    },
    'add_schedule': {
        'task': 'tasks.add',
        'schedule': 15,              # run every 15 seconds
        'args': (1, 2)
    }
}
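
A bare number in schedule means seconds; celery.schedules.crontab gives calendar-style scheduling instead. A minimal sketch (the entry name and the 02:30 time are arbitrary examples):

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'task1_nightly': {
        'task': 'tasks.task1',
        'schedule': crontab(hour=2, minute=30),   # every day at 02:30
        'args': (5, 6)
    }
}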


2. Start Redis and the corresponding workers
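
For reference, a sketch of this step, reusing the worker commands from section 3 (assumes a local Redis installation):

[root@localhost sbin]# redis-server --daemonize yes
[root@localhost sbin]# celery -A tasks worker -l info -n worker1.%h -Q for_task_1
[root@localhost sbin]# celery -A tasks worker -l info -n worker2.%h -Q for_task_2
[root@localhost sbin]# celery -A tasks worker -l info -n worker.%h -Q celery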


3. Start the beat scheduler

[root@localhost sbin]# celery -A tasks beat
celery beat v4.1.0 (latentcall) is starting.
__    -    ... __   -        _
LocalTime -> 2017-12-23 15:38:41
Configuration ->
    . broker -> redis://127.0.0.1:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%WARNING
    . maxinterval -> 5.00 minutes (300s)             # beat sleeps at most 300 seconds between schedule checks
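
During development, beat can also be embedded in a worker with the -B flag so a single command runs both; in production only one beat instance should ever run, so keep it separate:

[root@localhost sbin]# celery -A tasks worker -B -l info -n worker1.%h -Q for_task_1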