django celery的分布式异步之路(一) 起步

时间:2023-03-09 03:19:39
django celery的分布式异步之路(一) 起步

如果你看完本文还有兴趣的话,可以看看进阶篇http://www.cnblogs.com/kangoroo/p/7300433.html

设想你遇到如下场景:

1)高并发

2)请求的执行相当消耗机器资源,流量峰值的时候可能超出单机界限

3)请求返回慢,客户长时间等在页面等待任务返回

4)存在耗时的定时任务

这时你就需要一个分布式异步的框架了。

celery会是一个不错的选择。本文将一步一步的介绍如何使用celery和django进行集成,并进行分布式异步编程。

1、安装依赖

默认你已经有了python和pip。我使用的版本是:

python 2.7.
pip 9.0.1
virtualenv 15.1.0

创建沙盒环境,我们生产过程中通过沙盒环境来使用各种python包的版本,各个应用的沙盒环境之间互不干扰。

$ mkdir kangaroo
$ cd kangaroo
$ virtualenv kangaroo.env
# 沙盒下面有什么,可以看到有python的bin、include和pip
$ ll kangaroo.env
total
drwxrwxr-x data_monitor data_monitor Aug : bin
drwxrwxr-x data_monitor data_monitor Aug : include
drwxrwxr-x data_monitor data_monitor Aug : lib
-rw-rw-r-- data_monitor data_monitor Aug : pip-selfcheck.json
# 让沙盒环境在当前session(shell)中生效
$ source kangaroo.env/bin/activate
# 可以看到命令行首多了(kangaroo.env),而且python的路径已经变了
(kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ which python
/data/home/data_monitor/yangfan/test/kangaroo/kangaroo.env/bin/python

下面我们开始在kangaroo环境下安装相应版本的django和celery,以及django-celery集成包。

(kangaroo.env) [XXX@XXX kangaroo]$ pip install django==1.10.
(kangaroo.env) [XXX@XXX kangaroo]$ pip install celery==3.1.
(kangaroo.env) [XXX@XXX kangaroo]$ pip install django-celery==3.2.

我在安装的时候写明了版本号,是因为这套版本号在我们的生产环境过玩转过。

如果你换了对应的版本号的话,可能会引发冲突,出现意想不到的问题。亲测还是有一些版本之间是有怪问题的。

2、创建工程

创建工程kangaroo:django-admin startproject kangaroo

# 在kangaroo.env同级目录下创建工程kangaroo
cd /home/data_monitor/yangfan/test/kangaroo
# 创建工程kangaroo
(kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ django-admin startproject kangaroo
(kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ ll
total
drwxrwxr-x data_monitor data_monitor Aug : kangaroo
drwxrwxr-x data_monitor data_monitor Aug : kangaroo.env

创建APP foot:python manage.py startapp foot

# 进入工程目录,你会看到manage.py文件
cd kangaroo
# 创建APP foot
(kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ python manage.py startapp foot
(kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ ll
total
drwxrwxr-x data_monitor data_monitor Aug : foot
drwxrwxr-x data_monitor data_monitor Aug : kangaroo
-rwxrwxr-x data_monitor data_monitor Aug : manage.py
# 进入foot app目录,看一下有什么
(kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ cd foot/
(kangaroo.env) [data_monitor@bigdata-arch-client10 foot]$ ll
total
-rw-rw-r-- data_monitor data_monitor Aug : admin.py
-rw-rw-r-- data_monitor data_monitor Aug : apps.py
-rw-rw-r-- data_monitor data_monitor Aug : __init__.py
drwxrwxr-x data_monitor data_monitor Aug : migrations
-rw-rw-r-- data_monitor data_monitor Aug : models.py
-rw-rw-r-- data_monitor data_monitor Aug : tests.py
-rw-rw-r-- data_monitor data_monitor Aug : views.py

至此我们创建了工程kangaroo和app foot,下面我们介绍如何集成celery。

3、django-celery的集成配置

我们这里集成的方式是使用django-celery包。

集成配置要注意以下几个地方就好了,配置起来还是比较简单的。

1)修改kangaroo/settings.py文件

让djcelery模块生效

import os
import djcelery
djcelery.setup_loader()
...
INSTALLED_APPS = (
...
'djcelery',
'kombu.transport.django',
...
)

配置broker和backend

# Celery settings
# redis做broker, 第二个":"前后是redis的用户名密码,后面的2是db
# BROKER_URL = 'redis://:password@10.93.84.53:6379/2'
# rabbitMQ做broker,第二个":"前后是rabbitMQ的用户名密码
BROKER_URL = 'amqp://admin:bigdata123@10.93.21.21:5672//'
# Celery的backend记录地址,这里只给出redis的配置
CELERY_RESULT_BACKEND = 'redis://:bigdata123@10.93.84.53:6379/3'

4、第一个task

其实应该在创建app的时候就将appName添加到settings.py的INSTALLED_APPS中,我们没有这样做事留到现在好说明问题。

我们修改settings.py加入foot。

import os
import djcelery
djcelery.setup_loader()
...
INSTALLED_APPS = (
...
'djcelery',
'kombu.transport.django',
...
'foot',
...
)

当settings.py中的djcelery.setup_loader()运行时, Celery便会查看所有INSTALLED_APPS中app目录中的tasks.py文件, 找到标记为@task的function, 并将它们注册为celery task。

所以我们在foot包下创建tasks.py文件,并且添加我们的task。

foot/tasks.py

# -*- coding:utf-8 -*-
import time
import logging from celery import task
logger = logging.getLogger(__name__) @task()
def foot_task(param_dict):
logger.info('foot task start! param_dict:%s' % param_dict)
time.sleep(10)
logger.info('foot task finished!')
return

这个task等待接收一个参数字典,只是简单的打印参数,然后sleep10s就退出了。

让task在后台worker中注册,当有任务分发下来的时候就开始执行。只需执行

python manage.py celery worker --loglevel=info

5、分发任务dispatch

任务触发的两种方式:

1)定时调度,可以适用celery beat,这里没有详述,因为在实际生产中,我们使用了apschedule做了定时器。

2)请求异步执行,这里给出了例子,服务接收http请求,直接返回,任务异步的丢给worker执行。

我这里写了一个通用的函数,这个函数用于分发任务

def dispatch(task, param_dict):
param_json = json.dumps(param_dict)
try:
task.apply_async(
[param_json],
retry=True,
retry_policy={
'max_retries': 1,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
},
)
except Exception, ex:
logger.info(traceback.format_exc())
raise

如何触发foot.tasks中的任务呢,只需要

import logging
import traceback
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt from foot.tasks import foot_task
from common.dispatcher import dispatch logger = logging.getLogger(__name__) @csrf_exempt
def hello(request):
if request.method == 'GET':
try:
user = request.GET.get('username')
dispatch(test_task, {'hello': user})
return JsonResponse({'code': 0, 'msg':'success'})
except Exception, ex:
return JsonResponse({'code: -1, 'msg': traceback.format_exc()})

1)当你在客户端发送请求:hello?username='kangaroo'时

2)服务瞬间返回:{'code': 0, 'msg':'success'}

3)后端sleep10秒后执行成功,打印hello:kangaroo

这就是异步的效果。