继上一篇中间表的数据是动态的,图表展示的数据才比较准确。这里用到一个新的模块djcelery,安装配置步骤如下:
1.安装
redis==2.10.6
celery==3.1.23
django-celery==3.1.17
flower==0.9.2
supervisor==3.3.4
flower用于监控定时任务,supervisor管理进程,可选
2.配置
settings.py中添加以下几行:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
#最顶头加上
from __future__ import absolute_import
# celery settings
import djcelery
djcelery.setup_loader()
broker_url = 'redis://localhost:6379'
# broker_url = 'redis://:密码@主机地址:端口号/数据库号'
celerybeat_scheduler = 'djcelery.schedulers.databasescheduler' # 定时任务
celery_result_backend = 'djcelery.backends.database:databasebackend'
celery_result_backend = 'redis://localhost:6379'
celery_accept_content = [ 'application/json' ]
celery_task_serializer = 'json'
celery_result_serializer = 'json'
celeryd_max_tasks_per_child = 40
celery_timezone = 'asia/shanghai'
installed_apps = [
'djcelery' , # 添加djcelery
]
|
3.注册定时任务的几个表
1
2
3
4
5
6
7
8
9
10
11
|
from __future__ import absolute_import, unicode_literals
from djcelery.models import (
taskstate, workerstate,
periodictask, intervalschedule, crontabschedule,
)
from xadmin.sites import site
site.register(intervalschedule) # 存储循环任务设置的时间
site.register(crontabschedule) # 存储定时任务设置的时间
site.register(periodictask) # 存储任务
site.register(taskstate) # 存储任务执行状态
site.register(workerstate) # 存储执行任务的worker
|
4.主应用下添加celery.py
__init__.py修改如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
# __init__.py
from __future__ import absolute_import
from .celery import app as celery_app
# celery.py
from __future__ import absolute_import
import os
from celery import celery, platforms
from django.conf import settings
# set the default django settings module for the 'celery' program.
os.environ.setdefault( 'django_settings_module' , 'hermes.settings' )
# hermes主应用名
app = celery( 'hermes' )
platforms.c_force_root = true
app.config_from_object( 'django.conf:settings' )
app.autodiscover_tasks( lambda : settings.installed_apps)
@app .task(bind = true)
def debug_task( self ):
print ( 'request: {0!r}' . format ( self .request))
|
5.添加任务 应用下添加tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
from __future__ import absolute_import
from celery import task
import time
from .channels import cache_data_to_redis
# 更新指定日期数据到sms_organizationcount
@task
def readandwrite(begin,end):
begin = str (begin)[: 4 ] + '-' + str (begin)[ 4 : 6 ] + '-' + str (begin)[ 6 : 8 ]
end = str (end)[: 4 ] + '-' + str (end)[ 4 : 6 ] + '-' + str (end)[ 6 : 8 ]
i = 0
begin_time = time.time()
read = cache_data_to_redis().connection
rcursor = read.cursor()
query = "select id from sms_organizationcount where alia_date_time between '"
query + = begin
query + = "' and '"
query + = end
query + = "'"
readsql = "select alia_month_time, alia_date_time, count( * ) as total_nums, count(t.`status` = 2 or null) as error_nums, name from \
(select * , date_format(req_time, '%y-%m' ) as alia_month_time, date_format(req_time, '%y-%m-%d' ) as alia_date_time, \
left(body,locate( '】' ,body)) as name from sms_smslog where locate( '】' ,body) > 0 \
and left(body, 1 ) = '【' and date_format(req_time, '%y-%m-%d' ) between '"
readsql + = begin
readsql + = "' and '"
readsql + = end
readsql + = "')"
readsql + = " as t group by alia_date_time , name;"
rcursor.execute(readsql)
readresult = rcursor.fetchall()
rcursor.execute(query)
query_result = rcursor.fetchall()
deletesql = "delete from sms_organizationcount where alia_date_time between '%s' and '%s'" % (begin,end)
if query_result:
delete_record = cache_data_to_redis().connection
dcursor = delete_record.cursor()
dcursor.execute(deletesql)
delete_record.commit()
delete_record.close()
for value in readresult:
write = cache_data_to_redis().connection
wcursor = write.cursor()
writesql = "insert into sms_organizationcount (alia_month_time, alia_date_time, total_nums, error_nums, `name`) " \
" values ('%s', '%s', '%s', '%s', '%s' )" % \
(value[ 'alia_month_time' ], value[ 'alia_date_time' ], value[ 'total_nums' ], value[ 'error_nums' ], value[ 'name' ])
try :
wcursor.execute(writesql)
i + = 1
write.commit()
except :
write.rollback()
write.close()
read.close()
end_time = time.time()
pass_time = end_time - begin_time
return i, pass_time
|
6.最终效果如下图:
7.终端启动celery命令:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
# 查看注册的task
celery - a hermes inspect registered
# 启动
python manage.py celery - a django_celery_demo worker - b # django_celery_demo为celery和setting所在文件夹名
#celery_beat起不来
# 动态的输出启动进程时的输出
supervisorctl tail programname stdout
# flower监控celery
python manage.py celery flower
ip: 5555
|
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://www.cnblogs.com/NolaLi/p/9469994.html