在Django项目中使用djcelery model api创建和更新定时任务--实际使用版本

时间:2021-07-05 07:49:41

在Django项目中使用djcelery model api创建和更新定时任务--实际使用版本

环境说明

1. 依赖

python 2.7
django==1.8.16
celery==3.1.25
Django==1.8.16
django-celery==3.1.17
djangorestframework==3.5.3
django-filter==1.0.0
django-crispy-forms

2. 设置

在实际使用中我们使用分离式的设置,即celery的设置和初始化与tasks分开
首先,要设置django中的django.conf,即settings.py

djcelery.setup_loader()
# BROKER_URL = 'django://' # 直接使用django做broker生产环境不建议,建议使用redis或者rabbitMQ
BROKER_URL = 'redis://10.xx.xx.xx:6379/0' # broker使用reids
# 允许的格式
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'yaml']
# CELERY_TASK_SERIALIZER = 'json'
# CELERY_RESULT_SERIALIZER = 'json'

# 定时任务
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERY_RESULT_BACKEND = 'redis://10.xx.xx.xx:6379/1'
# 不用UTC
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = 'Asia/Shanghai'
# 任务结果的时效时间,默认一天
CELERY_TASK_RESULT_EXPIRES = 10
# log路径
CELERYD_LOG_FILE = BASE_DIR + "/logs/celery/celery.log"
# beat log路径
CELERYBEAT_LOG_FILE = BASE_DIR + "/logs/celery/beat.log"

以上是我在实际使用时的设置,更多详细设置可以参考:celery文档

创建celery.py文件

在django的项目目录中创建celery.py(与settings.py在同一级目录)

# -*- coding: UTF-8 -*-

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'syslog.settings')

from django.conf import settings # noqa

app = Celery('xxx')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

这种设置方法可以让celery自动在所有app中查找tasks文件,比较适合多人多APP同时开发的中大型项目
详情参考:Using Celery with Django

创建实际任务tasks.py

首先,创建一个django的app,然后在app目录下创建tasks.py文件
我们先简单定义一个加法,用以说明后面的例子足够了

from xxx.celery import app
@app.task
def add(x, y):
return x + y

创建调用djcelery model api的视图

from djcelery import models as celery_models

def _create_task(name, task, task_args, crontab_time):
'''
name # 任务名字
task # 执行的任务 "myapp.tasks.add"
task_args # 任务参数 {"x":1, "Y":1}

crontab_time # 定时任务时间 格式:
{
'month_of_year': * # 月份
'day_of_month': * # 日期
'hour': * # 小时
'minute':*/2 # 分钟
}
'''

# task任务, created是否定时创建
task, created = celery_models.PeriodicTask.objects.get_or_create(name=name, task=task)
# 获取 crontab
crontab = celery_models.CrontabSchedule.objects.filter(**crontab_time).first()
if crontab is None:
# 如果没有就创建,有的话就继续复用之前的crontab
crontab = celery_models.CrontabSchedule.objects.create(**crontab_time)
task.crontab = crontab # 设置crontab
task.enabled = True # 开启task
# task.args = [int(x) for x in json.loads(task_args)] # 传入task参数
task.args = [int(x) for x in task_args] # 传入task参数
task.save()
return True

def create(self, request, *args, **kwargs):
tasks = ['event_alarm', 'continue_event_alarm', 'condition_alarm']
blog_item_id = request.data['blogitem_id']
event_type = request.data['event_type']
intval = request.data['intval']
name = str(blog_item_id)
if event_type == 0:
task = '.'.join(['syslogalery.tasks', tasks[int(event_type)]])
elif event_type == 1:
task = '.'.join(['syslogalery.tasks', tasks[int(event_type)]])
elif event_type == 2:
task = '.'.join(['syslogalery.tasks', tasks[int(event_type)]])
task_args = [blog_item_id, intval]
crontab_time = {
'month_of_year': '*', # 月份
'day_of_month': '*', # 日期
'hour': '*', # 小时
'minute': '*/' + str(intval) # 分钟
}
if _create_task(name, task, task_args, crontab_time):
return JsonResponse({'code': 200, 'msg': 'create success'})
else:
return JsonResponse({'code': 400, 'msg': 'failed'})

遇到的问题

在上述开发中,直接使用celery -A xxx worker -l info -B,自定义的定时不起作用,所有任务都是以5s为定时执行,不太明白为什么
原因:见2017.2.24更新

解决方法:
使用以下命令启动:

python2.7 manage.py celery -A syslog worker -B -l info

2017.2.24 更新

定时不生效问题解决方法

偶然一个机会,再次部署前面写的使用celery定时调度的程序,发现又出现了定时不起作用,全是每个5s执行一次的问题,正好有时间就仔细查google下发现好多人遇到这个问题,并测试了解决方法:
针对UTC的设置:CELERY_ENABLE_UTC = True
celery issues#943: Celerybeat runs periodic tasks every 5 seconds regardless of interval

There’s definitely a bug somewhere in celery.schedules.crontab.remaining_delta() or surrounding it and it is related to timezones. I just have no idea what it is exactly.
The bug does not seem to reproduce when I specify CELERY_ENABLE_UTC=True so there’s might be a workaround for this bug.

Always make timezones aware in the schedule even if UTC is disabled #2666

Fixes #943.
@monax Please verify that this works for you.
In any event, setting CELERY_UTC_ENABLE to true is a good idea if you are using a Django version newer than 1.4.

上述更新后,也可以直接使用celery -A xxx worker -l info -B方式启动了

supervisor 部署celery注意事项

  1. 推荐使用/usr/bin/celery -A xxx worker -B –logfile=/opt/logfilter/syslog/logs/celery/celery.log –loglevel=INFO
    -l 方式已经不在推荐,在celery 4.x中将禁用-l

  2. 遇到错误提示且celery没有启动成功:
    Running a worker with superuser privileges when the
    worker accepts messages serialized with pickle is a very bad idea!
    解决方法:要用普通用户启动celery

部署时遇到django.core.exceptions.ImproperlyConfigured: The SECRET_KEY must not be empty.

  • 原因: celery 默认找settings.py文件没有找到
  • 解决方案: 开发过程中通常会根据情况生成多个配置文件,但最后部署时一定要还原settings.py