celery异步任务、定时任务

时间:2022-04-27 14:24:33

阅读目录

一 什么是Celery?

1、介绍

  Celery是一个简单、灵活且可靠的,并且可以处理大量消息的分布式系统!专注于两个方面,一是实时处理的异步任务队列,二是同时也支持任务调度,任务调度其实就是定时任务。

2、Celery架构

celery异步任务、定时任务

  Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。首先是由用户User向broker提交任务,然后works里面的多个work去broker里面取任务执行,最后如果有结果就返回。

消息中间件:

  celery本身不提供消息服务,但是可以很方便地和第三方提供的消息中间件集成。上述图中的broker代表的就是消息中间件,我们可以把broker'比喻成一个箱子,箱子里面装满了任务,专业术语叫做消息队列。消息中间件包括著名的RabbitMQ,Redis(redis不只是用来做缓存数据库,其实他还可以用来做消息队列)等...

任务执行单元:

  worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中,一个节点指的是一台机器,worker里面可以包含多个work,我们可以把每个work当做是一台机器,这样就是把多个work分布式部署在多台机器上。

任务结果存储:

  Task result store是用来存储worker执行任务返回的结果,Celery支持以下不同方式存储任务的结果,包括AMQP,redis等。由此可见,我们既可以使用redis来做broker,也可以用redis来做Task result。因为我们目前还没有学rabbitMQ,所以我们将采用的是redis。

版本支持情况:

Celery version 4.0 runs on
Python ❨2.7, 3.4, 3.5❩
PyPy ❨5.4, 5.5❩
This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required. If you’re running an older version of Python, you need to be running an older version of Celery: Python 2.6: Celery series 3.1 or earlier.
Python 2.5: Celery series 3.0 or earlier.
Python 2.4 was Celery series 2.2 or earlier.

Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

二 Celery的使用场景

  celery的应用场景只有两种,它也只能干这两种活,其他的一律不行。比如你们刚写完的bbs项目,里面注册功能有个发送邮件的功能,发送邮件的原理其实就是往另一台的服务器发送一个请求,等待另一台服务器发给你返回结果才真正能知道邮件是否发送成功!那这个过程有可能因为网络的原因导致极其漫长,所以我们可以通过异步任务,让发送邮件这个耗时的任务异步提交,先返回给用户一个发送成功,其实有没有成功你不需要管,只要等最后拿到的结果就可以了。

  

一 异步任务:

  异步任务指的是将耗时操作的任务交给Celery去异步执行,比如发短信/邮件、消息推送、音/视频处理等等...

  例如: cnbolgs.com 博客园内的发布随笔时,后台其实是通过异步任务去把博主发布的随笔渲染到主页上

  

二 定时任务

  定时任务指的是定时执行某件事情,比如定时统计每天的数据、定时收集程序的日志操作等...

三 Celery的安装配置

# pip3 install celery

# 消息中间件: RabbitMQ/Redis

# 创建celery对象,通过对象来调用相应的方法进行操作.
# 1.参数backend指的是接受执行任务返回的结果
# 2.broker指的是集成任务的一个箱子。
app=Celery('任务名称', backend="Task result", broker="broker")

四 Celery异步任务

1、基本使用

celery_demo_stak.py

'''
基本使用 创建项目: celery_demo 创建py文件: celery_demo_stak.py
'''
import celery import time backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/2' cel = celery.Celery('demo1', backend=backend, broker=broker) # 设置任务
@cel.task
def add(x, y): for line in y:
x += line
time.sleep(1)
return x

创建任务

add_task.py

'''

添加任务进broker

'''

from celery_demo_task import add

# 给broker添加任务
result = add.delay(1, 10) # 每添加一个任务都会产生一个id,可以通过id查看任务是否执行完毕并返回结果
print(result.id)

添加任务,并获取任务ID

run.py

'''
创建run.py文件:
用来执行任务!相当于发送命令让work去执行broker里面的任务! 或者使用命令执行: celery worker -A celery_demo_task -l info
  
注意:
  在windows下执行命令为: celery worker -A celery_demo_task -l info -P eventlet
  eventlet: 因为celery是一个小公司开发出来的软件,只支持linux系统,windows系统他本身是不支持的,但是有些开发人员会在windows的环境下进行开发,所以推出了一个第三方的插件,叫做eventlet。用来在windows的环境下可以执行celery。 ''' from celery_demo_task import cel if __name__ == "__main__":
cel.worker_main()
# cel.worker_main(argv=['--loglevel=info'])

work执行任务

result.py

'''
用于查看用户执行返回的结果
''' from celery_demo_task import cel
from celery.result import AsyncResult # 实例化得到一个异步接受结果对象,实例化的过程中把任务的id传进去即可查看执行结果
async = AsyncResult(id='...', app=cel) if async.successful():
result = async.get()
# 打印返回的结果
print(result) # 将返回的结果删除
result.forget() elif async.failed():
print('执行失败!') elif async.status == 'PENDING':
print('任务等待中被执行') elif async.status == 'RETRY':
print('任务异常后正在重试') elif async.status == 'STARTED':
print('任务已经开始被执行')

查看任务状态并取结果

2、多任务结构

  上面celery的基本使用是把一个任务写在一个文件夹内,多任务结构指的是把每一个任务都拆分到一个文件里。

pro_cel
├── celery_task #celery相关文件夹
│ ├── celery.py # celery连接和配置相关文件,必须叫这个名字
│ └── tasks1.py # 所有任务函数
│ └── tasks2.py # 所有任务函数
├── check_result.py # 检查结果
└── send_task.py # 触发任务

celery.py

用来绑定多任务结构的celery文件

task1.py

from celery_task.celery import cel

@cel.task
def add1(x, y): print('这是task1里面的add任务')
return x + y

第一个任务

task2.py

from celery_task.celery import cel

@cel.task
def add2(x, y):
print('这是task2中的add任务!') return x * y

第二个任务

send_tasks.py

from celery_task.task1 import add1
from celery_task.task2 import add2
from datetime import datetime # 1、普通任务
result1 = add1.delay(5, 7)
print(result1) result2 = add2.delay(5, 8)
print(result2)

发送多个任务获取id

check_result.py

from celery.result import AsyncResult
from celery_task.celery import cel async = AsyncResult(id='ca3c39f7-c7bc-4716-8053-ad0f20bde9fd', app=cel) if async.successful():
result = async.get()
print(result)
# result.forget() # 将结果删除,执行完成,结果不会自动删除
# async.revoke(terminate=True) # 无论现在是什么时候,都要终止
# async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')

查看任务执行返回结果

五 Celery定时任务

1、指定时间发送定时任务

send_tasks.py

发送定时任务设置
from celery_task.task1 import add1
from datetime import datetime # 3、延时发送
# 当前时间
ctime = datetime.now() # 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta # 延时10秒后发送
time_delay = timedelta(seconds=10) # 发送时间 = 当前时间 + 10秒时间
task_time = utc_ctime + time_delay
'''
这里的timedelta是啥意思,里面有很多参数,
有days,seconds等等,我是用当前时间 + 10秒之后的时间执行你的任务。
上面我们是自己指定一个时间,下面我们是指定当前时候之后的多长时间执行任务,
比如三个小时之后,我就设置hour=3。
''' # 使用apply_async并设定时间
result = add1.apply_async(args=[4, 3], eta=task_time)
print(result.id)

延迟时间发送定时任务

2、每隔多久发送一次定时任务

celery.py

from celery import Celery
from datetime import timedelta from celery.schedules import crontab cel = Celery('celery_test',
broker='redis://127.0.0.1:6379/1',
backend='redis://127.0.0.1:6379/2',
# include下包含两个任务文件,去相应的.py文件中找任务,对多个任务做分类
include=['celery_task.task1', 'celery_task.task2']) # 时区
cel.conf.timezone = 'Asia/Shanghai' # 是否使用UTC
cel.conf.enable_utc = False cel.conf.beat_schedule = {
# 任务的名字随意命名
'add-every-3-seconds': {
# 执行tasks1下的add1函数
'task': 'celery_task.task1.add1',
# 每隔3秒执行一次
# 'schedule': 1.0,
# 'schedule': crontab(minute="*/1"),
'schedule': timedelta(seconds=3),
# 传递参数
'args': ('test', '')
},
}

每隔多久发送一次定时任务

from celery import Celery

from celery.schedules import crontab

cel = Celery('celery_test',
broker='redis://127.0.0.1:6379/1',
backend='redis://127.0.0.1:6379/2',
# include下包含两个任务文件,去相应的.py文件中找任务,对多个任务做分类
include=['celery_task.task1', 'celery_task.task2']) # 时区
cel.conf.timezone = 'Asia/Shanghai' # 是否使用UTC
cel.conf.enable_utc = False cel.conf.beat_schedule = {
# 名字随意命名
'add-every-12-seconds': {
'task': 'celery_task.task1.add1',
# 指定每5月4号,0点50分执行
# 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
'schedule': crontab(minute=50, hour=0, day_of_month=4, month_of_year=5),
# 传递给task1.add1的参数
'args': (16, 16)
},
}

每年的指定时间发送定时任务

六 在Django中使用Celery

  Celery的异步任务和定时任务都是在Django中使用,Django中自带一个Django-celery模块。他可以让你在Django项目中使用celery异步任务以及定时任务。在Django中使用异步任务和定时任务就相当于我们测试的时候写的send_task.py一样。

1、Django中使用异步任务

app01.task.py

from celery import task

'''
使用celery自带的task方法装饰在设置的函数任务中
''' @task
def add(a,b):
# with open('a.text', 'a', encoding='utf-8') as f:
# f.write('a')
# print(a+b)
return a+b

task任务

app01.views.py

from django.shortcuts import render,HttpResponse
from app01.task import add
from datetime import datetime
def test(request):
result=add.delay(2,3)
print(result)
return HttpResponse('ok')

视图函数里面提交异步任务

Django项目.django_celery.celeryconfig.py

import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
'app01.tasks',
)
#有些情况可以防止死锁
CELERYD_FORCE_EXECV=True
# 设置并发worker数量
CELERYD_CONCURRENCY=4
#允许重试
CELERY_ACKS_LATE=True
# 每个worker最多执行100个任务被销毁,可以防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超时时间
CELERYD_TASK_TIME_LIMIT=12*30

在Django项目下设置异步任务配置

2、Django中使用定时任务

from django.shortcuts import render,HttpResponse
from app01.task import add
from datetime import datetime
def test(request):
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
# 延迟5秒发送一次任务
time_delay = timedelta(seconds=5)
task_time = utc_ctime + time_delay result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)
return HttpResponse('ok')

Django中使用定时任务