django实现websocket实时数据推送。
应用场景
- 群组/单对单语音聊天
- 页面数据实时推送(后端主导)
技术
django + channels
django >=2
wsgi
Web服务器网关接口(Python Web Server Gateway Interface,缩写为WSGI)是为Python语言定义的Web服务器和Web应用程序或框架之间的一种简单而通用的接口。自从WSGI被开发出来以后,许多其它语言中也出现了类似接口
asgi
ASGI(Asynchronous Server Gateway Interface,异步服务器网关接口)是WSGI的继任者,是为了规范支持异步的Python网络服务器、框架和应用之间的通信而制定的
传统的http请求使用wsgi,而websocket需要使用asgi(asgi同样可以处理http,见下文asgi.py中的'http'路由)
项目目录架构
- project
- project
- settings.py
- wsgi.py
- asgi.py
- urls.py
- ws
- routings.py
- consumers.py
配置
django - settings.py
注意
: redis版本必须 >= 5.x
# Redis connection URL for the "default" user; host, port and password are
# expected to be defined earlier in settings.py.
redis_url = 'redis://default:{}@{}:{}'.format(redis_password, redis_host, redis_port)

# Dotted path to the ASGI application object defined in project/asgi.py.
ASGI_APPLICATION = 'project.asgi.application'

# Channel layer stored in Redis database 0.
CHANNEL_LAYERS = {
    'default': dict(
        BACKEND='channels_redis.core.RedisChannelLayer',
        CONFIG=dict(
            hosts=[redis_url + '/0'],
            # The Django SECRET_KEY is passed as the symmetric encryption key.
            symmetric_encryption_keys=[SECRET_KEY],
        ),
    ),
}
asgi
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
# Point Django at the project settings unless the environment already sets
# DJANGO_SETTINGS_MODULE.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')
# Django's own ASGI application; it is used for the 'http' protocol below.
django_asgi_app = get_asgi_application()
# Imported explicitly so the package can be found when deployed.
# NOTE(review): presumably kept below get_asgi_application() so that Django is
# initialised before the consumers are imported — confirm before moving it.
import ws.routings
# Dispatch each connection by protocol type: HTTP to Django, WebSocket to the
# URL patterns declared in ws/routings.py.
application = ProtocolTypeRouter(
{
'http': django_asgi_app,
'websocket': URLRouter(ws.routings.websocket_urlpatterns)
}
)
routings
类似Django的urls
from django.urls import re_path
from ws import consumers
# WebSocket URL table, the counterpart of Django's ``urlpatterns``.
websocket_urlpatterns = [
    # Heartbeat endpoint: the captured ``room_name`` is handed to BaseConsumer
    # through the connection scope's URL kwargs.
    re_path(
        route=r'^heartbeat/(?P<room_name>\w+)/$',
        view=consumers.BaseConsumer.as_asgi(),
    ),
]
视图
consumers.py 类似django视图
使用异步消费者
import json

from channels.generic.websocket import AsyncWebsocketConsumer
class BaseConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer that relays JSON payloads inside a named room group.

    The room name is taken from the ``room_name`` URL kwarg. Every socket that
    connects with the same name joins the channel-layer group
    ``chat_<room_name>`` and receives whatever is sent to that group with
    ``type: 'chat_data'``.
    """

    async def connect(self):
        """Join the room group, accept the socket and broadcast a heartbeat."""
        self.group_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = 'chat_%s' % self.group_name
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name,
        )
        await self.accept()
        # Connection-established notice; it goes to the whole group, so the
        # members already connected see it as well.
        await self.msg_send([], msg='heartbeat')

    async def disconnect(self, close_code):
        """Leave the room group when the socket closes."""
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name,
        )

    async def receive(self, text_data=None, bytes_data=None):
        """Relay the ``message`` field of an incoming JSON text frame.

        Demo send/receive logic; adapt it to the application's protocol.

        Args:
            text_data: Text frame content, expected to be a JSON object that
                contains a ``message`` key.
            bytes_data: Binary frame content. Channels delivers binary frames
                through this keyword; they are ignored here.

        A text frame that is not valid JSON, or has no ``message`` key, is
        answered with a ``code: 400`` envelope sent to this socket only, so a
        bad client frame does not raise out of the consumer.
        """
        if text_data is None:
            return
        try:
            message = json.loads(text_data)['message']
        except (ValueError, KeyError, TypeError):
            # ValueError covers json.JSONDecodeError; TypeError covers JSON
            # values that are not objects (lists, numbers, ...).
            await self.send(text_data=json.dumps(
                {'msg': 'invalid message', 'code': 400, 'data': []}
            ))
            return
        # Fan the message out to every socket in the room group.
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_data',
                'data': {'message': message},
            },
        )

    async def chat_data(self, event):
        """Handle a ``chat_data`` group event by writing it to the socket.

        Args:
            event: Group message; its ``data`` entry is serialised to JSON and
                sent to the client as a text frame.
        """
        await self.send(text_data=json.dumps(event['data']))

    async def msg_send(self, data, msg='success', code=200):
        """Push ``data`` to the whole room group in a msg/code/data envelope.

        Args:
            data: Payload placed under the envelope's ``data`` key.
            msg: Human-readable status text.
            code: Status code reported to the clients.
        """
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_data',
                'data': {
                    'msg': msg,
                    'code': code,
                    'data': data,
                },
            },
        )
如果在异步消费者中使用django的queryset(ORM是同步的),需要用database_sync_to_async包装后再await
from channels.db import database_sync_to_async
await database_sync_to_async(self.get_data)(text_data_json)
如果不在consumers中,
而是在其他业务逻辑中使用websocket
class SendMessage:
    """Server-to-client push helper for code that runs outside a consumer."""

    def __init__(self, room_group_name):
        """Bind to the default channel layer and the target room group.

        Args:
            room_group_name: Room name without the ``chat_`` prefix; the
                prefix is added here to build the group name.
        """
        # Function-scope import: this snippet lives outside consumers.py and
        # has no import block of its own for get_channel_layer.
        from channels.layers import get_channel_layer

        self.channel_layer = get_channel_layer()
        self.room_group_name = 'chat_%s' % room_group_name

    async def send(self, val: dict):
        """Send ``val`` to the room group.

        Args:
            val: Payload dict. It must contain ``chat_type``, the channel
                message type in ``chat.chat_name`` form (e.g. ``chat.data``).
                If it carries a truthy ``code`` it is sent as-is, minus
                ``chat_type``; otherwise it is wrapped in a
                ``{'data': ..., 'msg': 'success', 'code': 200}`` envelope.

        Raises:
            KeyError: If ``val`` has no ``chat_type`` key.
        """
        # Work on a shallow copy so the caller's dict keeps its 'chat_type'.
        payload = dict(val)
        chat_type = payload.pop('chat_type')
        if not payload.get('code'):
            payload = {
                'data': payload,
                'msg': 'success',
                'code': 200,
            }
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': chat_type,
                'data': payload,
            },
        )
部署
部署使用daphne
daphne -b 0.0.0.0 -p 8010 --proxy-headers project.asgi:application