twsited(4)--不同模块用redis共享以及用web发送数据到tcpserver

时间:2022-12-21 12:57:08

  上一章开头我们说,要连接之前flask系列文章中的用户,结果篇幅不够,没有实现。

  今天我们把它实现一下。话说,不同模块之间,该如何联系在一起,通常都是mysql、redis、rabbitmq还有RPC等,之所以着重讲redis,因为我太喜欢这个内存数据库了。small stong simple。这就跟我喜欢flask、tornado而不太喜欢django和twisted一样(以后我们着重讲tornado,源码比较简单直观,哈哈,虽然自己用着twisted,但真不喜欢twisted这种大型库,好多虚类的继承,太过于注重设计模式。好了,废话说到这。)

  这篇文章的主要目的就如下图所示,不但在http和tcp之间共享数据,而且能在http端发送数据到tcpserver服务器,然后下发到tcpclient。

twsited(4)--不同模块用redis共享以及用web发送数据到tcpserver

  在twisted中访问redis,就是找一个redis版的twisted库就好了,在redis官网,有推荐,其源码在这 https://github.com/fiorix/txredisapi   我一直使用,很稳定,操作也极其简单。

  我们就用这个库,利用之前flask系列中,login的时候,返回的token进行验证,把http模块和tcp模块联系到一起。服务器端代码如下:

frontTCP.py

# coding:utf-8
from twisted.internet.protocol import Factory, Protocol
from twisted.internet import reactor, task, defer
import struct
import json
from twisted.python import log
import sys
import time
import txredisapi as redis
log.startLogging(sys.stdout) REDIS_HOST = 'localhost'
REDIS_PORT = 6380
REDIS_DB = 4
REDIS_PASSWORD = 'dahai123' redis_store = redis.lazyConnectionPool(dbid=4, host='localhost', port=6380, password='dahai123') @defer.inlineCallbacks
def check_token(phone_number, token):
token_in_redis = yield redis_store.hget('user:%s' % phone_number, 'token')
if token != token_in_redis:
defer.returnValue(False)
else:
defer.returnValue(True) class Chat(Protocol):
def __init__(self, factory):
self.factory = factory
self.phone_number = None
self.state = "VERIFY"
self.version = 0
self.last_heartbeat_time = 0
self.command_func_dict = {
1: self.handle_verify,
2: self.handle_single_chat,
3: self.handle_group_chat,
4: self.handle_broadcast_chat,
5: self.handle_heartbeat
}
self._data_buffer = bytes() def connectionMade(self):
log.msg("New connection, the info is:", self.transport.getPeer()) def connectionLost(self, reason):
log.msg("[%s]:断线" % self.phone_number.encode('utf-8'))
if self.phone_number in self.factory.users:
del self.factory.users[self.phone_number] def dataReceived(self, data):
"""
接受到数据以后的操作
"""
self._data_buffer += data while True:
length, self.version, command_id = struct.unpack('!3I', self._data_buffer[:12]) if length > len(self._data_buffer):
return content = self._data_buffer[12:length] if command_id not in [1, 2, 3, 4, 5]:
return if self.state == "VERIFY" and command_id == 1:
self.handle_verify(content) if self.state == "DATA":
self.handle_data(command_id, content) self._data_buffer = self._data_buffer[length:] if len(self._data_buffer) < 12:
return def handle_heartbeat(self, content):
"""
处理心跳包
"""
self.last_heartbeat_time = int(time.time()) @defer.inlineCallbacks
def handle_verify(self, content):
"""
验证函数
"""
content = json.loads(content)
phone_number = content.get('phone_number')
token = content.get('token') result = yield check_token(phone_number, token) if not result:
send_content = json.dumps({'code': 0})
self.send_content(send_content, 101, [phone_number])
length = 12 + len(send_content)
version = self.version
command_id = 101
header = [length, version, command_id]
header_pack = struct.pack('!3I', *header)
self.transport.write(header_pack + send_content)
return if phone_number in self.factory.users:
log.msg("电话号码<%s>存在老的连接." % phone_number.encode('utf-8'))
self.factory.users[phone_number].connectionLost("")
self.factory.users.pop(phone_number) log.msg("欢迎, %s!" % (phone_number.encode('utf-8'),))
self.phone_number = phone_number
self.factory.users[phone_number] = self
self.state = "DATA" send_content = json.dumps({'code': 1}) self.send_content(send_content, 101, [phone_number]) def handle_data(self, command_id, content):
"""
根据command_id来分配函数
"""
self.command_func_dict[command_id](content) def handle_single_chat(self, content):
"""
单播
"""
content = json.loads(content)
chat_from = content.get('chat_from')
chat_to = content.get('chat_to')
chat_content = content.get('chat_content')
send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) self.send_content(send_content, 102, [chat_to]) def handle_group_chat(self, content):
"""
组播
"""
content = json.loads(content)
chat_from = content.get('chat_from')
chat_to = content.get('chat_to')
chat_content = content.get('chat_content')
send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) phone_numbers = chat_to
self.send_content(send_content, 103, phone_numbers) def handle_broadcast_chat(self, content):
"""
广播
"""
content = json.loads(content)
chat_from = content.get('chat_from')
chat_content = content.get('chat_content')
send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) phone_numbers = self.factory.users.keys()
self.send_content(send_content, 104, phone_numbers) def send_content(self, send_content, command_id, phone_numbers):
"""
发送函数
"""
length = 12 + len(send_content)
version = self.version
command_id = command_id
header = [length, version, command_id]
header_pack = struct.pack('!3I', *header)
for phone_number in phone_numbers:
if phone_number in self.factory.users.keys():
self.factory.users[phone_number].transport.write(header_pack + send_content)
else:
log.msg("Phone_number:%s 不在线." % phone_number.encode('utf-8')) class ChatFactory(Factory):
def __init__(self):
self.users = {} def buildProtocol(self, addr):
return Chat(self) def check_users_online(self):
for key, value in self.users.items():
if value.last_heartbeat_time != 0 and int(time.time()) - value.last_heartbeat_time > 4:
log.msg("[%s]没有检测到心跳包,主动切断" % key.encode('utf-8'))
value.transport.abortConnection() cf = ChatFactory() task1 = task.LoopingCall(cf.check_users_online)
task1.start(3, now=False) reactor.listenTCP(8124, cf)
reactor.run()

  上一章有点地方有错误,这次在这一起更正了。比如users,在每个Protocol里面不保存,直接存储在Factory里面,每次引用的时候,直接去取就可以了;还有,如果经过验证之前,如果验证错误,users里面是没有连接的值的,只能self.transport.write(),不能通过send_content()来发送。

  好了,上面2处修改了,并且在此基础上,加了一个访问redis的函数,非常简单,跟http访问一样,就是要注意异步化的问题。

  再看看客户端的代码,客户端这边要获取token,首先要引用flask restful api 系列中,我们模拟的client.py,这次我也把它引用进来了,先登录,获取token,拿到token以后,再用tcpclient进行验证,其实这个在生产环境中也这么做的。

  下面是frontClient.py的代码。

# coding:utf-8
from twisted.internet import reactor, task
from twisted.internet.protocol import Protocol, ClientFactory
import struct
from twisted.python import log
import sys
import json
from client import API_1_1 log.startLogging(sys.stdout) class EchoClient(Protocol):
def __init__(self):
self.command_func_dict = {
101: self.handle_verify_s,
102: self.handle_single_chat_s,
103: self.handle_group_chat_s,
104: self.handle_broadcast_chat_s
}
self.version = 0
self.state = "VERIFY"
self.phone_number = "" def connectionMade(self):
log.msg("New connection", self.transport.getPeer()) def dataReceived(self, data):
length, self.version, command_id = struct.unpack('!3I', data[:12])
content = data[12:length]
if self.state == "VERIFY" and command_id == 101:
self.handle_verify_s(content)
else:
self.handle_data(command_id, content) def handle_data(self, command_id, pack_data):
self.command_func_dict[command_id](pack_data) def connectionLost(self, reason):
log.msg("connection lost") def handle_verify_s(self, pack_data):
"""
接受验证结果
"""
content = json.loads(pack_data)
code = content.get('code')
if code == 1:
log.msg('验证通过')
else:
log.msg('验证没有通过,请重新输入,程序暂停')
reactor.stop()
self.state = "Data" def handle_single_chat_s(self, pack_data):
"""
接受单聊
"""
content = json.loads(pack_data)
chat_from = content.get('chat_from')
chat_content = content.get('chat_content')
log.msg("[单聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8'))) def handle_group_chat_s(self, pack_data):
"""
接受组聊
"""
content = json.loads(pack_data)
chat_from = content.get('chat_from')
chat_content = content.get('chat_content')
log.msg("[组聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8'))) def handle_broadcast_chat_s(self, pack_data):
"""
接受广播
"""
content = json.loads(pack_data)
chat_from = content.get('chat_from')
chat_content = content.get('chat_content')
log.msg("[群聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8'))) def send_verify(self, phone_number, token):
"""
发送验证
"""
content = json.dumps(dict(phone_number=phone_number, token=token))
self.send_data(content, 1) def send_single_chat(self, chat_from, chat_to, chat_content):
"""
发送单聊内容
"""
content = json.dumps(dict(chat_from=chat_from, chat_to=chat_to, chat_content=chat_content))
self.send_data(content, 2) def send_group_chat(self, chat_from, chat_to, chat_content):
"""
发送组聊内容
"""
content = json.dumps(dict(chat_from=chat_from, chat_to=chat_to, chat_content=chat_content))
self.send_data(content, 3) def send_broadcast_chat(self, chat_from, chat_content):
"""
发送群聊内容
"""
content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))
self.send_data(content, 4) def send_data(self, send_content, command_id):
"""
发送函数
"""
length = 12 + len(send_content)
version = self.version
command_id = command_id
header = [length, version, command_id]
header_pack = struct.pack('!3I', *header)
self.transport.write(header_pack + send_content) def send_heartbeat(self):
"""
发送心跳包
"""
length = 12
version = self.version
command_id = 5
header = [length, version, command_id]
header_pack = struct.pack('!3I', *header)
self.transport.write(header_pack) class EchoClientFactory(ClientFactory):
def __init__(self):
self.p = EchoClient() def startedConnecting(self, connector):
log.msg("Started to connect") def buildProtocol(self, addr):
log.msg("Connected.")
return self.p def clientConnectionFailed(self, connector, reason):
log.msg("Lost connection. Reason:", reason) def clientConnectionLost(self, connector, reason):
log.msg("Connection failed. Reason:", reason) if __name__ == '__main__':
api = API_1_1()
chat_from = sys.argv[1]
chat_password = sys.argv[2] u = api.login(chat_from, chat_password)
token = api.token cf = EchoClientFactory()
chat_from = sys.argv[1] all_phone_numbers = ['', '', '1390854961g']
all_phone_numbers.remove(chat_from)
import random task_send_heartbeat = task.LoopingCall(cf.p.send_heartbeat)
task_send_heartbeat.start(2, now=False) reactor.callLater(10, cf.p.send_verify, chat_from, token)
reactor.callLater(20, cf.p.send_group_chat, chat_from, all_phone_numbers, '你好,这是10秒的时候发送')
reactor.callLater(30, cf.p.send_group_chat, chat_from, all_phone_numbers, '你好,这是20秒的时候发送') reactor.connectTCP('192.168.5.60', 8124, cf) reactor.run()

  分别把之前项目中的账号拉出来运行一下吧。客户端的认证函数也改变了,先引用之前的api客户端,直接获取正确的token,把token拿来发给tcp服务器端,tcp服务器端再到redis里面去找,如果正确,就验证通过,否则,返回code=0给客户端,这时候服务器端的记录当前客户端状态还是未验证通过,因此下面的客户端再发其他请求,服务器端全部丢弃。这跟http的思想是一样的。

  这是一个客户端的调试结果,看,一切都正常。 

yudahai@yudahaiPC:tcpserver$ python frontClient.py 13565208554 123456
2016-06-24 13:28:36+0800 [-] Log opened.
2016-06-24 13:28:36+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7fe377d25440>
2016-06-24 13:28:36+0800 [-] Started to connect
2016-06-24 13:28:36+0800 [Uninitialized] Connected.
2016-06-24 13:28:36+0800 [Uninitialized] New connection IPv4Address(TCP, '192.168.5.60', 8124)
2016-06-24 13:28:46+0800 [EchoClient,client] 验证通过
2016-06-24 13:28:53+0800 [EchoClient,client] [组聊][13764408552]:你好,这是10秒的时候发送
2016-06-24 13:29:03+0800 [EchoClient,client] [组聊][13764408552]:你好,这是20秒的时候发送

  其实利用redis把http模块和tcp模块集合起来比较简单,但难的地方在于设计思想,如何很好的通过redis把几个模块联系起来。

  

  上面其实本质上讲的就是如何通过redis来共享状态。

  下面我们再深入一下,通过纯web端发送命令到后台,然后后台接受到以后,通过redis来做消息系统,本来这部分应该是rabbitmq的事,毕竟rabbitmq是专门做消息系统的。但简单的消息系统可以用redis做,redis中有个list模型,每个消息发送的时候从左边push进来,接受的时候从右边pop,这样就是一个简单的消息系统。这边用redis先做一个简单的,主要可以让大家非常直观的看到twisted如何作为消费者客户端运行的,下一章讲rabbitmq的时候,就更简单了。

  好了,废话少说,现在之前flask restful api那一个系列的项目中,加一个web页面,进入web页面,只有一个按钮,输入内容,发送一次,就广播一次。

  下面是代码和具体的页面。

  先是原来的flask项目中,我们增加一个web蓝图,这个我在flask restful api的第七篇  http://www.cnblogs.com/yueerwanwan0204/p/5522749.html 中讲过,增加相应的文件夹web,然后在里面添加2个文件__init__.py,view.py。

  结构图下下所示:

twsited(4)--不同模块用redis共享以及用web发送数据到tcpserver

  编辑原来的run.py文件,添加蓝图指向

    from app_1_0 import api as api_1_0_blueprint
app.register_blueprint(api_1_0_blueprint, url_prefix='/api/v1000') from api_1_1 import api as api_1_1_blueprint
app.register_blueprint(api_1_1_blueprint, url_prefix='/api/v1100') from web import web as web_blueprint
app.register_blueprint(web_blueprint, url_prefix='/web')

  然后在web/__init__.py下面添加蓝图对象

# coding:utf-8
from flask import Blueprint web = Blueprint('web', __name__) from . import view

  web/view.py就跟简单了,就渲染一个页面,同时具有get和post方法

# coding:utf-8
from flask import Flask, request, jsonify, g, render_template, redirect, url_for, session, current_app
from app.model import User, db_session
import json
from . import web @web.teardown_request
def handle_teardown_request(exception):
db_session.remove() @web.route('/send-command', methods=['GET', 'POST'])
def send_command():
if request.method == 'GET':
users = User.query.all()
return render_template('web/send-command.html', users=users)
else:
data = request.get_json()
command_id = data.get('command_id')
chat_from = ''
chat_to = data.get('chat_to')
chat_content = data.get('content') print data if not chat_to or not chat_content or not command_id:
return jsonify({'code': 0, 'message': '信息不完整'}) send_data = json.dumps(dict(command_id=command_id, chat_from=chat_from, chat_to=chat_to, chat_content=chat_content))
current_app.redis.lpush('front_tcp', send_data) return jsonify({'code': 1, 'message': '发送成功'})

  get的时候,就渲染;post的时候,接受页面上传的数据。

  页面就一个页面,主要使用ajax上传,由于本人好长时间没有开发html了,所以页面丑了一点,js代码也丑了一点,但是能用,以后有空,我优化一下,大家先看吧,功能达到了。

templates/web/send-command.html的代码如下:

<!DOCTYPE html>
<html lang="zh_CN">
<head>
<meta charset="UTF-8">
<title>发送命令</title>
<script src="../../static/js/jquery-2.1.4.min.js"></script>
</head>
<body>
<div>
<select id="single_object">
{% for user in users %}
<option value="{{ user.phone_number }}">{{ user.phone_number }}</option>
{% endfor %}
</select>
<input type="text" name="single_content">
<button id="single_chat">单聊</button>
</div>
<br>
<br>
<br>
<br>
<div>
<select multiple id="group_object">
{% for user in users %}
<option value="{{ user.phone_number }}">{{ user.phone_number }}</option>
{% endfor %}
</select>
<input type="text" name="group_content">
<button id="group_chat">组聊</button>
</div>
<br>
<br>
<br>
<br>
<div>
<input type="text" name="broadcast_content">
<button id="broadcast_chat">群聊</button>
</div>
<script>
var baseurl = '/web/';
$(function(){
$("#single_chat").click(function(){
var chat_to = [];
chat_to.push($("#single_object option:selected").val());
var content = $("input[name=single_content]").val();
console.log("chat_to:" + chat_to + " content:" + content);
$.ajax({
type: "POST",
url: baseurl + "send-command",
data: JSON.stringify({chat_to:chat_to, content:content, command_id:102}),
dataType: "json",
contentType: "application/json",
success: function(data){
if (data["code"] == 1){
$("input[name=single_content]").val("");
console.log(data["message"]);
}else
{
console.log(data["message"]);
} }
});
}); $("#group_chat").click(function(){
var chat_tos = [];
var chat_to = $("#group_object option:selected").each(function(){
chat_tos.push($(this).val());
});
var content = $("input[name=group_content]").val();
console.log("chat_to:" + chat_tos + " content:" + content);
$.ajax({
type: "POST",
url: baseurl + "send-command",
data: JSON.stringify({chat_to:chat_tos, content:content, command_id:103}),
dataType: "json",
contentType: "application/json",
success: function(data){
if (data["code"] == 1){
$("input[name=group_content]").val("");
console.log(data["message"]);
}else
{
console.log(data["message"]);
}
}
});
}); $("#broadcast_chat").click(function(){
var chat_to = [];
{% for user in users %}
chat_to.push("{{ user.phone_number }}");
{% endfor %}
var content = $("input[name=broadcast_content]").val();
console.log("content:" + content);
$.ajax({
type: "POST",
url: baseurl + "send-command",
data: JSON.stringify({chat_to:chat_to, content:content, command_id:104}),
dataType: "json",
contentType: "application/json",
success: function(data){
if (data["code"] == 1){
$("input[name=broadcast_content]").val("");
console.log(data["message"]);
}else
{
console.log(data["message"]);
} }
});
});
}); </script>
</body>
</html>

  效果有点丑,

twsited(4)--不同模块用redis共享以及用web发送数据到tcpserver

  这样就可以直接发送了,发送到http服务器端,http服务器再把数据打包成json格式,发送到frontTCP端,那自然,frontTCP需要增加一点代码,之前的Protocol不变,只是在Factory里面增加2个函数,再增加一个循环任务,不停的接受redis的消息。

  frontTCP新增代码如下:

class ChatFactory(Factory):
def __init__(self):
self.users = {} def buildProtocol(self, addr):
return Chat(self) def check_users_online(self):
for key, value in self.users.items():
if value.last_heartbeat_time != 0 and int(time.time()) - value.last_heartbeat_time > 4:
log.msg("[%s]没有检测到心跳包,主动切断" % key.encode('utf-8'))
value.transport.abortConnection() @defer.inlineCallbacks
def receive_from_mq(self):
data = yield redis_store.rpop('front_tcp')
if data:
log.msg("接受到来自消息队列的消息:", data)
self.process_data_from_mq(data) def process_data_from_mq(self, data):
loads_data = json.loads(data)
command_id = loads_data.get('command_id')
phone_numbers = loads_data.get('chat_to')
chat_from = loads_data.get('chat_from')
chat_content = loads_data.get('chat_content') content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) self.send_content(content, command_id, phone_numbers) def send_content(self, send_content, command_id, phone_numbers):
"""
发送函数
"""
length = 12 + len(send_content)
version = 1100
command_id = command_id
header = [length, version, command_id]
header_pack = struct.pack('!3I', *header)
for phone_number in phone_numbers:
if phone_number in self.users.keys():
self.users[phone_number].transport.write(header_pack + send_content)
else:
log.msg("Phone_number:%s 不在线." % phone_number.encode('utf-8')) cf = ChatFactory() task1 = task.LoopingCall(cf.check_users_online)
task1.start(3, now=False) task_receive_data_from_mq = task.LoopingCall(cf.receive_from_mq)
task_receive_data_from_mq.start(0.1, now=False) reactor.listenTCP(8124, cf)
reactor.run() 
    receive_from_mq就是接受来之redis的消息,异步化一下,然后建一个循环任务task_receive_data_from_mq,这个循环任务,每0.1秒触发一次(以后rabbitmq也是这样),
如果队列消息里面有数据,就处理,否则继续循环。
  process_data_from_mq这是拿到具体的data,然后处理的过程,基本就解包、打包,然后发送。
  send_content这就是发送函数,我基本就把Protocol里面的发送函数重新抄了一遍,以后我们会做一个虚类,然后具体的处理函数来继承它,这次我就直接抄了。
  好了,整个过程就这样,我们来运行一下,启动2个客户端,看看客户端接受情况吧。
yudahai@yudahaiPC:tcpserver$ python frontClient.py
-- ::+ [-] Log opened.
-- ::+ [-] Starting factory <__main__.EchoClientFactory instance at 0x7fd9a7eff3b0>
-- ::+ [-] Started to connect
-- ::+ [Uninitialized] Connected.
-- ::+ [Uninitialized] New connection IPv4Address(TCP, '192.168.5.60', )
-- ::+ [EchoClient,client] 验证通过
-- ::+ [EchoClient,client] [单聊][]:ddddddddd
-- ::+ [EchoClient,client] [组聊][]:fghytjhnuyjuyjmuikiuk
-- ::+ [EchoClient,client] [群聊][]:ffffffffffffffffffffffffffffff
-- ::+ [EchoClient,client] [组聊][]:你好,这是web组聊
-- ::+ [EchoClient,client] [群聊][]:你好,这是web群聊

  看,是不是全接受到了?

  

  这章就讲到这,主要讲到了如果通过redis把不同的模块联系在一起,其实本质上就是把客户端的状态在模块之间共享;之后我们讲了如何通过redis做一个简单的消息队列,这个其实是rabbitmq的特性,之所以要先讲一下,就是用最简单的方式来预热一下,因为rabbitmq的应用很广,可能一下子接受不了。还有就是把上一章的一些小bug解决掉。至于异步化,这个概念稍微有点大(好吧,我也不是研究特别的深,以后我会专门抽出一章讲这个内容)。