django-celery动态添加定时任务

时间:2021-03-25 07:50:22
    为了使用celery替代crontab并做到实时添加定时任务的效果,需要使用django-celery,效果如下图,
django-celery动态添加定时任务

    要使用django-celery,需要安装python的以下包:django,celery,django-celery。其中django安装比较麻烦,首先它和python版本相关,django1.7.9和1.8.3都是支持python 2.7,所以打算安装这俩版本中其一;其次,django需要sqlite2或sqlite3的支持,而python 2.7中sqlite是已经包含在内、却没有编译的,因此先要安装sqlite、然后重新安装python 2.7。整个开发环境安装和配置如下


一. 环境配置
    首先安装sqlite3。 先手动从 http://www.sqlite.org/download.html下载软件包 sqlite-autoconf-3081002.tar.gz (也可直接使用wget),
tar -xzvf sqlite-autoconf-3081002.tar.gz
./configure --prefix=/home/panxiaofeng/install   #指定安装目录

make 

make install

    接着重新安装python 2.7,首先,

tar -zxf Python-2.7.9.tgz

cd ~/tools/Python-2.7.9

修改setup.py,

 sqlite_inc_paths = [ '/usr/include',
                             '/usr/include/sqlite',
                             '/usr/include/sqlite3',
                             '/usr/local/include',
                             '/usr/local/include/sqlite',
                             '/usr/local/include/sqlite3',
                             '~/install/include',
                             '~/install/include/sqlite3',
                           ]

也就是加上sqlite3的路径,然后,

./configure --prefix=/home/panxiaofeng/install/ && make -j4 && make install  #4核并行编译
ln -s ~/install/bin/python ~/bin/python
完成。

    测试安装成功否,

~/bin/python

>>>import sqlite3

>>>

没报错即可。

    配置python开发环境,不仅要安装python 2.7,还需要安装一些工具,
unzip setuptools-12.0.3.zip 2>/dev/null 1>/dev/null
cd ~/tools/setuptools-12.0.3
~/bin/python setup.py build install   #用哪个python来安装包,包就会安装到哪个python的目录下,其他版本的python无法使用该包
tar -zxf pip-6.0.6.tar.gz
cd ~/tools/pip-6.0.6
~/bin/python setup.py build install
ln -s ~/install/bin/pip ~/bin/pip
其中pip工具用来安装python包很方便,但是默认的源下载很慢经常超时,需要采用一些方法加速,具体可以参考 http://blog.csdn.net/sasoritattoo/article/details/10020547  。本文安装加速的手段采用了最简单的换源:
sudo ~/bin/pip install -i http://pypi.douban.com/simple django,用豆瓣的源,暴快! 注意,用哪个python安装的pip工具来安装包,包就会安装到哪个python的目录下,其他版本的python无法使用该包 。然后,celery和django-celery也采用该源来安装,快得很,不像之前用默认的源,很卡很慢还经常失败。
    记录一下,之前也手动安装过Django,手动安装唯一需要注意的一点就是 sudo  python setup.py时搞清楚你用的是哪个版本的python,不要搞错版本了,如果默认的python不能用,就自己指定,比如, sudo  /usr/local/bin/python2.7 setup.py install。还有手动安装比较利于卸载包:
setup.py 帮助你纪录安装细节方便你卸载
python setup.py install --record log
这时所有的安装细节都写到 log 里了
想要卸载的时候
cat log | xargs rm -rf
就可以干净卸载了

    最后记录下解决pip下载失败的一些可能有用的方法:
出现 Bad md5 hash for package https://here/package/path错误时,尝试,
1. Y ou can use wheels for installing python packages: pip install wheel. Then try to pip the package you want again.
2. pip失败后,会看到你需要的包信息,那么直接下载安装,如下面的例子,
[root@master conn]# pip install chardet==2.2.1
Collecting chardet==2.2.1
/usr/lib/python2.6/site-packages/pip/_vendor/requests/packages/urllib3/util/ssl_.py:79: InsecurePlatformWarning: A true SSLContext object is not available. This prevents urllib3 from configuring SSL appropriately and may cause certain SSL connections to fail. For more information, see https://urllib3.readthedocs.org/en/latest/security.html#insecureplatformwarning.
  InsecurePlatformWarning
  Downloading chardet-2.2.1-py2.py3-none-any.whl (180kB)
    72% |███████████████████████▎        | 131kB 357bytes/s eta 0:02:19
  Hash of the package https://pypi.python.org/packages/2.7/c/chardet/chardet-2.2.1-py2.py3-none-any.whl#md5=556de73cc5d2d14233b3512798423da1 (from https://pypi.python.org/simple/chardet/) (be001cd2dbe90bb1f1dd4ab4b008c169) doesn't match the expected hash 556de73cc5d2d14233b3512798423da1!
  Bad md5 hash for package https://pypi.python.org/packages/2.7/c/chardet/chardet-2.2.1-py2.py3-none-any.whl#md5=556de73cc5d2d14233b3512798423da1 (from https://pypi.python.org/simple/chardet/)
解决方案:
wget https://pypi.python.org/packages/2.7/c/chardet/chardet-2.2.1-py2.py3-none-any.whl --no-check-certificate
md5sum chardet-2.2.1-py2.py3-none-any.whl
pip install chardet-2.2.1-py2.py3-none-any.whl



二. 简单测试django
~/bin/python ~/install/bin/django-admin.py startproject djtest   #使用上面安装了相关包的python
cd djtest
~/bin/python manage.py runserver 0.0.0.0:10501
然后最便打开一个浏览器输入:10.121.84.90:10501/,得到响应的页面就成功了。 10.121.84.90是执行上面命令的主机ip,即django server的主机ip。
 
~/bin/python manage.py createsuperuser来创建admin账户。

三. django-celery替代crontab
测试:
    测试djcelery的动态添加任务函数并crontab部署的功能。目录结构,
djtest
├── apps
│   ├── app1
│   │   ├── __init__.py
│   │   └── tasks.py
│   ├── app2
│   │   ├── __init__.py
│   │   └── tasks.py
│   ├── __init__.py
│   ├── tasks.py
├── djtest
│   ├── celery.py
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   ├── wsgi.py
└── manage.py
    其中 manage.py、 __init__.py、 settings.py、 urls.py、 wsgi.py是 ~/bin/python ~/install/bin/django-admin.py startproject djtest命令自动生成的,但部分文件需要修改。首先修改settings.py,在里面添加,
 
   
import djcelery ###
djcelery.setup_loader() ###
CELERY_TIMEZONE='Asia/Shanghai' #并没有北京时区,与下面TIME_ZONE应该一致
BROKER_URL='redis://10.121.84.90:16379/8' #任何可用的redis都可以,不一定要在django server运行的主机上
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' ###
# Application definition
 
INSTALLED_APPS = (
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'djcelery', ###
    'apps', ###
)
TIME_ZONE='Asia/Shanghai' ###
上面代码首先导出djcelery模块,并调用setup_loader方法加载有关配置; 注意配置时区,不然默认使用UTC时间会比东八区慢8个小时 。其中INSTALLED_APPS末尾添加两项,分别表示添加celery服务和自己定义的apps服务。接下来编写celery.py文件,
 
   
#!/bin/python
from __future__ import absolute_import
 
import os
 
from celery import Celery
 
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djtest.settings')
#Specifying the settings here means the celery command line program will know where your Django project is. 
#This statement must always appear before the app instance is created, which is what we do next:  
from django.conf import settings
 
app = Celery('djtest')
 
app.config_from_object('django.conf:settings')
#This means that you don’t have to use multiple configuration files, and instead configure Celery directly from the Django settings.
#You can pass the object directly here, but using a string is better since then the worker doesn’t have to serialize the object.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
#With the line above Celery will automatically discover tasks in reusable apps if you define all tasks in a separate tasks.py module.
#The tasks.py should be in dir which is added to INSTALLED_APP in settings.py. 
#So you do not have to manually add the individual modules to the CELERY_IMPORT in settings.py.
 
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request)) # dumps its own request information
然后修改djtest/__init__.py,
 
   
#!/bin/python
from __future__ import absolute_import
 
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
    接下来编写你希望django去完成的app,本文中要编写的就是在INSTALLED_APPS中注册的apps。在celery.py中设定了对settings.py中INSTALLED_APPS做autodiscover_tasks,本文希望apps中能够接受这样的目录组织:所有的app都可以放到apps下面,而且每个app都有独立的目录,就和上面的app1、app2一样,每个app各自有各自的__init__.py和tasks.py( 注意,每个app都需要__init__.py文件,可以是空白的 )。但是这样的结构组织在启动时会报错说module apps找不到。然后在apps下增加了一个 __init__.py文件,这时报错没了,但是apps下每个app的tasks.py中的任务函数还是无法被django和celery worker找到。
     然后尝试了在apps下面写一个__init__.py(空白)和task.py,所有的task function都写到tasks.py中,如下,
 
   
from __future__ import absolute_import
 
from celery import task
 
from celery import shared_task
 
#from celery.task import tasks 
#from celery.task import Task 
 
@task()
#@shared_task
def add(x, y):
    print "%d + %d = %d"%(x,y,x+y)
    return x+y
#class AddClass(Task):
#    def run(x,y):
#        print "%d + %d = %d"%(x,y,x+y)
#        return x+y
#tasks.register(AddClass)
 
@shared_task
def mul(x, y):
    print "%d * %d = %d"%(x,y,x*y)
    return x*y
 
 
@shared_task
def sub(x, y):
    print "%d - %d = %d"%(x,y,x-y)
    return x-y
上面代码中,decorator可以用@task()也可以用@shared_task。 然后,同步django数据库,
~/bin/python manage.py makemigrations
~/bin/python manage.py migrate
运行django web server和celery beat、celery worker,
~/bin/python manage.py createsuperuser   #如果没管理员账号,先建一个
~/bin/python manage.py runserver 0.0.0.0:10501    #重新指定server使用的端口,0.0.0.0 运行该命令的主机ip:  10.121.84.90
~/bin/python manage.py celery beat
~/bin/python manage.py celery worker -c 6 -l debug   #正式上线-l改成info
~/bin/celery flower --port=10201 --broker=redis://10.121.84.90:16379/8       #如果必要开个flower,从而通过网页 10.121.84.90:10201 来监控任务情况
这时候,在浏览器输入 10.121.84.90:10501/admin/ 可以登录管理员界面,
django-celery动态添加定时任务

在该界面可以点P eriodic tasks 来添加crontab任务,
django-celery动态添加定时任务
点击右上角的Add periodic task,
django-celery动态添加定时任务

在该页面上可以通过上图中的Task(registered)或者Task(custom)来增加crontab任务, 前者会在下拉列表中显示apps/tasks.py里定义的task function 除了动态增加、删除任务,上面的Enabled选项可以用来暂停任务。这时,在 django web server和celery beat、celery worker都启动着的情况下,修改apps/tasks.py文件,往其中增减task function,会实时反映在上图中 Task(registered)的下拉列表上,验证了django的 autodiscover_tasks功能的确实有效的 但是在不重启celery worker的情况下,添加该 task function的crontab任务,celery worker无法调用该task function,从而报错,
django-celery动态添加定时任务

也就是说django的确可以autodiscover_tasks,并且celery beat也会准确无误的发布动态增加的任务消息,只是celery worker无法动态调用新的task function(这是不是说django的autodiscover_tasks功能只管django而不管celery worker,从而只能看到而无法执行,其实并没有什么卵用? 。。。然后验证 Task(custom)选项的作用: 我自己写一个app3,放在apps同级目录和apps的子目录各自试验,不过不往settings.py中INSTALLED_TASKS中注册子目录(注册了不就是Task(registered)了吗),然后Add periodic task时 Task(custom)填写"apps.app3.tasks.function"或者"app3.tasks.function"或者"目录的绝对路径.tasks.function",celery beat会发布相关的任务消息,但是celery worker无法调用function(这就表示,想绕开INSTALLED_APPS和celery_worker是不行的, Task(custom)似乎没有什么卵用 。。。总的来说,就是 Task(registered)或者Task(custom)并不能做到动态添加task function并部署该task function的相关任务,虽然官方文档似乎表示可以。
    现在能做到的就是,需要的task function写好,然后通过上面的例子可以动态的部署写好的task function的crontab任务(只可以动态传参数),似乎有点寒碜,不过勉强算是够用了,只要把task function写得通用一点,比如说开一个进程去调用用户自己编写的脚本。上面提到的失败的尝试,今后有空再去探索吧。

后续:
(一)
    经过实验,celery worker使用 --autoreload选项启动可以解决上面apps/tasks.py中新增、减函数无法被worker检测到的问题,具体见 http://docs.celeryproject.org/en/latest/userguide/workers.html#autoreloading。不过该功能属于实验阶段,官方不推荐使用,而且,
When running the worker with the --autoreload option, it correctly reloads if the tasks.py module is changed. But if I change a module that is imported and used by the tasks module, nothing is reloaded and the worker still uses the old code. Currently autoreloader doesn't monitor the dependency tree .  来源: < https://github.com/celery/celery/issues/1025 >
不过,用来实现apps/tasks.py里面增减函数、自动被worker检测,是足够了的。 在原生的celery中,--autoreload要搭配CELERY_IMPORT或include设定来用(autoreload这两项设定中的module文件),不过在django中,INSTALLED_APPS设定可以用来替代CELERY_IMPORT,所以django中可以使celery worker autoreload INSTALLED_APPS设定目录下的tasks.py文件 最终,django的autodiscover_tasks用来使django admin页面动态检测apps/tasks.py里task function的变化,celery的autoreload用来使celery worker动态检测apps/tasks.py里task function的变化 ,从而实现动态增减task function并正确执行的功能。 注意,动态新增的task function的decorator使用@tasks会报"NotRegistered: u'apps.tasks.funcName'"的错误,用@shared_task才可以正确被worker执行 不少问题出在celery worker上,建议看看Workers Guide:  http://docs.celeryproject.org/en/latest/userguide/workers.html#autoreloading

    之前的尝试中,apps下每个app一个子目录,如果只在INSTALLED_APPS中添加"apps"这一项而不注册子目录,django无法autodiscover_tasks到子目录(google出来的例子是每个子目录都注册),看来 autodiscover_tasks只支持注册目录下tasks.py文件的监测、而不支持注册目录下子目录的tasks.py文件的监测(如果支持,那么celery worker的--autoreload也可以检测到子目录的tasks.py文件,那么apps子目录支持的问题就解决了 。不过,要实现apps下 每个app一个子目录,可以考虑换种途径:添加新的子目录时动态修改settings.py中的INSTALLED_APPS,因为django似乎有支持这种操作的函数(参考笔记./ 【转】python - dynamically loading django apps at runtime ,每增加一次调用一次用来增加的代码 。实验等有空再做。

    综上所述,现在动态修改apps/tasks.py、添加apps目录下新的任务子目录(子目录目录结构和 最上面 app1、app2一样)的功能都可以了。唯一没有起作用的就是django admin页面上的 Task(custom) 选项,该选项设计的初衷想必是:你写一个类似app1的app,直接通过该选项就可以调用。想法很好,只是目前来看,这个设计绕过了INSTALLED_APPS设定,似乎并不能被django和celery worker检测到新增的app,起不了作用。

(二)
    django正式上线后,需要将settings.py中的"DEBUG = True"注释掉免得内存泄漏,同时在下面加上"ALLOWED_HOSTS = ['10.121.84.90']",使得admin网页可用 10.121.84.90是django server运行的主机ip 这时候admin网页会比较丑陋而且有些功能不正常 ,因为非debug模式下django server不会帮忙处理静态文件 ,解决方案是在启动django server的时候添加--insecure选项 http://*.com/questions/5836674/why-does-debug-false-setting-make-my-django-static-files-access-fail  ),使用该选项要确定 settings.py的INSTALLED_APPS中有 'django.contrib.staticfiles'

参考:
djcelery入门:实现运行定时任务!!!: http://my.oschina.net/kinegratii/blog/292395
Django中如何使用django-celery完成异步任务 (1): http://www.weiguda.com/blog/73/