Celery工具

时间:2024-09-21 22:06:56

什么是Celery

Celery的功能

Celery是基于python实现的第三方组件,可以实现定时任务、周期任务等。

Celery的组成

Celery的角色

- 任务,创建或发布任务。

- 使用redis/rabbitMQ进行任务记录(broker)和任务返回结果(backend)的缓存。

- worker,主动执行任务,主公返回结果。

简单实例

from celery import Celery
import time my_task = Celery(
"task",
broker = "redis://127.0.0.1:6379",
backend = "redis://127.0.0.1:6379",
)
@my_task
def func1(x,y):
time.sleep(15)
return x+y
from work import func1
from celery.result import AsyncResult
from work import my_task res = func1.delay(2,4) async_task = AsyncResult(
app = "my_task",
id = res.id,
)
if async_task.successiful():
result = async_task.get()
print(result)
else:
print("等待任务完成")

注意:如果是在windows平台使用celery需要下载eventlet

work的启动启动不是直接python work.py,而是先celery worker -A work -l INFO -P eventlet

Celery的项目目录规范

在celery中,项目中需要一个文件夹,文件夹的名字没有强制规范,但是其中必须要有一个配置worker的名叫celery的文件。

celey.py实例

from celery import Celery
celery_task = Celery(
"task",
broker = "redis://127.0.0.1:6379",
backend = "redis://127.0.0.1:6379",
include = ["Celery_task.task_one","Celery_task.task_two"]
) #Celery_task是我定义的文件夹名

celery.py的同级文件还有将要运行的脚本,与文件夹同级的还有一个调用文件,用来调用不同任务。

此时使用celery就通过Celery文件夹进行启动。

celery worker -A Celery_task -l INFO -p eventlet -c 2

celery 默认启动五个线程,-c参数就是设置线程数的

Celery的定时任务和周期任务

celery的定时任务

celery的定时任务是通过apply_async来实现的,apply_async接受两个参数,一个是任务需要的参数,另一个是任务的定时时间,这里的时间指的是utc时间。

utc_time = datatime.datatime.utcfromtimestamp(time.time())
add_time = datatime.timedelta(second=2) #定时的时间
utc_time+=add_time
res = task_one.apply_async(args=(1,2),eta = utc_time)

celery的周期任务

周期任务就是周期性的生产出任务交给worker去执行,增加celery.py文件的内容如下。

from celery.schedules import crontab
celery_task.conf.beat_schedule = {
"ten_task":{
"task":"Celery_task.task_one.one", #这是目录
"schedule":crontab(hour=24),
"args":(2,4),
}
}

当我们使用celery实现周期任务时,不能直接创建worker,需要先创建创建任务的beat工厂。

celery beat -A Celery_task