day41-解决粘包问题

时间:2022-01-28 05:27:30

一、socket缓冲区

研究粘包之前先看看socket缓冲区的问题:

day41-解决粘包问题

二、socket缓存区的详细解释

每个socket被创建后,都会分配两个缓冲区,输入缓冲区和输出缓冲区。

write()/send() 并不立即向网络中传输数据,而是先将数据写入缓冲区中,再由TCP协议将数据从缓冲区发送到目标机器。一旦将数据写入到缓冲区,函数就可以成功返回,不管它们有没有到达目标机器,也不管它们何时被发送到网络,这些都是TCP协议负责的事情。

TCP协议独立于write()/send()函数,数据有可能刚被写入缓冲区就发送到网络,也可能在缓冲区中不断积压,多次写入的数据被一次性发送到网络,这取决于当时的网络情况、当前线程是否空闲等诸多因素,不由程序员控制。

read()/recv()函数也是如此,也从输入缓冲区中读取数据,而不是直接从网络中读取。

这些I/O缓冲区特性可整理如下:
1.I/O缓冲区在每个TCP套接字中单独存在;
2.I/O缓冲区在创建套接字时自动生成;
3.即使关闭套接字也会继续传送输出缓冲区中遗留的数据;
4.关闭套接字将丢失输入缓冲区中的数据。

输入输出缓冲区的默认大小一般都是8K,可以通过getsockopt()函数获取

须知:只有TCP有粘包现象,UDP永远不会粘包!

具体原因:
发送端可以是一K一K地发送数据,而接收端的应用程序可以两K两K地提走数据,当然也有可能一次提走3K或6K数据,或者一次只提走几个字节的数据,也就是说,应用程序所看到的数据是一个整体,或说是一个流(stream),一条消息有多少字节对应用程序是不可见的,因此TCP协议是面向流的协议,这也是容易出现粘包问题的原因。

而UDP是面向消息的协议,每个UDP段都是一条消息,应用程序必须以消息为单位提取数据,不能一次提取任意字节的数据,这一点和TCP是很不同的。

怎样定义消息呢?可以认为对方一次性write/send的数据为一个消息,需要明白的是当对方send一条信息的时候,无论底层怎样分段分片,TCP协议层会把构成整条消息的数据段排序完成后才呈现在内核缓冲区。

例如基于tcp的套接字客户端往服务端上传文件,发送时文件内容是按照一段一段的字节流发送的,在接收方看来,根本不知道该文件的字节流从何处开始,在何处结束

所谓粘包问题主要是因为接收方不知道消息之间的界限,不知道一次性提取多少字节的数据所造成的。

此外,发送方引起的粘包是由TCP协议本身造成的,TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一个TCP段。若连续几次需要send的数据都很少,通常TCP会根据优化算法把这些数据合成一个TCP段后一次发送出去,这样接收方就收到了粘包数据。

TCP和UDP的区别

TCP(transport control protocol,传输控制协议)是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的socket,因此,发送端为了将多个包更有效的发到对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样,接收端就难于分辨出来了,必须提供科学的拆包机制。 即面向流的通信是无消息保护边界的。

UDP(user datagram protocol,用户数据报协议)是无连接的,面向消息的,提供高效率服务。不会使用块的合并优化算法,由于UDP支持的是一对多的模式,所以接收端的skbuff(套接字缓冲区)采用了链式结构来记录每一个到达的UDP包,在每个UDP包中就有了消息头(消息来源地址,端口等信息),这样,对于接收端来说,就容易进行区分处理了。 即面向消息的通信是有消息保护边界的。

解决粘包需要注意的问题

tcp是基于数据流的,于是收发的消息不能为空,这就需要在客户端和服务端都添加空消息的处理机制,防止程序卡住,而udp是基于数据报的,即便是你输入的是空内容(直接回车),那也不是空消息,udp协议会帮你封装上消息头。

udp的recvfrom是阻塞的,一个recvfrom(x)必须对唯一一个sendinto(y),收完了x个字节的数据就算完成,若是y>x数据就丢失,这意味着udp根本不会粘包,但是会丢数据,不可靠

tcp的协议数据不会丢,没有收完包,下次接收,会继续上次继续接收,己端总是在收到ack时才会清除缓冲区内容。数据是可靠的,但是会粘包。

三、两种情况下会发生粘包。

1、接收方没有及时接收缓冲区的包,造成多个包接收(客户端发送了一段数据,服务端只收了一小部分,服务端下次再收的时候还是从缓冲区拿上次遗留的数据,产生粘包)

服务端

import socket
import subprocess phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
phone.bind(('127.0.0.1', 8080))
phone.listen(5) while 1: # 循环连接客户端
conn, client_addr = phone.accept()
print(client_addr) while 1:
try:
cmd = conn.recv(1024)
ret = subprocess.Popen(cmd.decode('utf-8'), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
correct_msg = ret.stdout.read()
error_msg = ret.stderr.read()
conn.send(correct_msg + error_msg)
except ConnectionResetError:
break conn.close()
phone.close()

客户端

import socket

phone = socket.socket(socket.AF_INET,socket.SOCK_STREAM)  # 买电话
phone.connect(('127.0.0.1',8080)) # 与客户端建立连接, 拨号 while 1:
cmd = input('>>>')
phone.send(cmd.encode('utf-8'))
from_server_data = phone.recv(1024)
print(from_server_data.decode('gbk'))
phone.close()
# 由于客户端发的命令获取的结果大小已经超过1024,那么下次在输入命令,会继续取上次残留到缓存区的数据。

2、发送端需要等缓冲区满才发送出去,造成粘包(发送数据时间间隔很短,数据也很小,会合到一起,产生粘包)

服务端

import socket

phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
phone.bind(('127.0.0.1', 8080))
phone.listen(5) conn, client_addr = phone.accept()
frist_data = conn.recv(1024)
print('1:',frist_data.decode('utf-8')) # 1: helloworld
second_data = conn.recv(1024)
print('2:',second_data.decode('utf-8')) conn.close()
phone.close()

客户端

import socket

phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
phone.connect(('127.0.0.1', 8080))
phone.send(b'hello')
phone.send(b'world') phone.close()
# 两次返送信息时间间隔太短,数据小,造成服务端一次收取

四、粘包的解决方案

粘包问题的根源在于,接收端不知道发送端将要传送的字节流的长度,所以解决粘包的方法就是围绕如何让发送端在发送数据前,把自己将要发送的字节流总数按照固定字节发送给接收端,后面跟上总数据,然后接收端先接收固定字节的总字节流,就可以知道需要接收的数据长度,再来一个死循环接收完所有数据。

方案一、低端版

服务端:

import socket
import subprocess
import struct server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 8082))
server.listen(5) while 1:
conn, addr = server.accept()
print(conn, addr) while 1:
try:
cmd = conn.recv(1024)
obj = subprocess.Popen(cmd.decode('utf-8'),
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
ret = obj.stdout.read()
ret_err = obj.stderr.read()
total_size = len(ret+ret_err)
total_size_bytes = struct.pack('i', total_size )
# print(total_size)
conn.send(total_size_bytes)
conn.send(ret)
conn.send(ret_err) except Exception:
break
conn.close()
server.close()

客户端:

import socket
import struct client = socket.socket()
client.connect(('127.0.0.1', 8082)) while 1:
msg = input('>>>').strip()
if not msg:continue
elif msg.upper() == 'Q':break
client.send(msg.encode('utf-8'))
header_size_bytes = client.recv(4)
header_size =struct.unpack('i', header_size_bytes)[0] client_recv_data = b''
size = 0
while size < header_size:
data = client.recv(1024)
client_recv_data += data
size += len(data) rst = client_recv_data.decode('gbk')
print(rst) client.close()

结果:

服务端显示
<socket.socket fd=236, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8082), raddr=('127.0.0.1', 14106)> ('127.0.0.1', 14106)
931
1790
5040
7802
8130 客户端:
执行dir,ipconfig,tasklist,netstat -an等命令都可以正常显示

这种方法当文件较小时可以使用,当文件太大时,total_size_bytes = struct.pack('i', total_size )也无法表示

方案二、高端版(可自定制报头版)

整体的流程解释:

我们可以把报头做成字典,字典里包含将要发送的真实数据的描述信息(大小之类的),然后json序列化,然后用struct将序列化后的数据长度打包成4个字节。
我们在网络上传输的所有数据都叫做数据包,数据包里的所有数据都叫做报文,报文里面不止有数据,还有ip地址、mac地址、端口号等等,其实所有的报文都有报头,这个报头是协议规定的。 发送时:
先发报头长度
再发报头,需要编码报头内容,其中包括数据的长度
最后发真实数据内容 接收时:
先接收报头长度,用struct取出来
根据取出的长度收取报头内容,然后解码,反序列化
从反序列化的结果中取出待接收数据的描述信息(数据长度),然后再接收真实的数据内容

服务端

import socket
import subprocess
import struct
import json server_ip = ('192.168.10.1', 8088)
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(server_ip)
server_socket.listen(5) while 1:
conn, ipaddr = server_socket.accept()
print(conn, ipaddr) while 1:
try:
cmd = conn.recv(1024)
ret = subprocess.Popen(cmd.decode('utf-8'),
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,)
msg = ret.stdout.read()
err_msg = ret.stderr.read() # 1 数据总长度
total_size = len(msg) + len(err_msg)
# 2 制作报头字典
header_dict = {
'md5': 'fdsaf2143254f',
'file_name': 'f1.txt',
'total_size':total_size,
}
# 3 将报头字典转换成json再转换成bytes格式
header_dict_json = json.dumps(header_dict)
header_dict_bytes = header_dict_json.encode('utf-8')
# 4 将报头长度转换成固定长度的struct格式
header_dict_size = len(header_dict_bytes)
header_dict_size_struct = struct.pack('i', header_dict_size)
# 5 发送报头长度
conn.send(header_size_struct)
# 6 发送报头
conn.send(header_dict)
# 7 发送真实数据:
conn.send(msg)
conn.send(error_msg)
except ConnectionResetError:
break conn.close()
phone.close()

客户端

import socket
import struct
import json
phone = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server_ip = ('192.168.10.1', 8088)
client_socket.connect(server_ip) while 1:
cmd = input('>>>')
if not cmd:continue
elif cmd.strip().upper() == 'Q':break
client_socket.send(cmd.encode('utf-8')) # 1,接收固定报头长度的struct格式
header_size_struct = client_socket.recv(4) #2,将struct格式的报头长度转换成int类型
head_size = struct.unpack('i', header_size_struct)[0] #3,接收报头的字典的bytes类型,也就是json格式
header_bytes = client_socket.recv(head_size) #4,将报头由bytes类型解码成字典
header_dict = json.loads(header_bytes.decode('utf-8')) #5,从字典中获取数据总大小
total_size = header_dict['total_size'] #6,根据报头信息,循环接收真实数据
recv_size = 0
msg = b''
while recv_size < total_size:
recv_data = client_socket.recv(1024)
msg += recv_data
recv_size += len(recv_data)
#由于msg是服务器在windows下执行命令的结果,默认编码为gbk,所以客户端显示需要使用gbk解码
print(msg.decode('gbk')) client_socket.close()

五、例子:FTP上传下载文件(简单版)

FTP上传下载文件(简单版)

server端

import socket
import struct
import json
sk = socket.socket()
# buffer = 4096 # 当双方接收发送的大小比较大的时候,比如4096,就会丢数据,改小了就ok,在linux上也是ok的。
buffer = 1024 #每次接收数据的大小
sk.bind(('127.0.0.1',8081))
sk.listen() conn,addr = sk.accept()
print(conn, addr)
#接收文件头部大小,获取文件大小值,接收文件
head_len = conn.recv(4)
head_len = struct.unpack('i',head_len)[0] #解包
json_head = conn.recv(head_len).decode('utf-8') #反序列化
head = json.loads(json_head)
filesize = head['filesize'] with open(head['filename'],'wb') as f:
recv_size = 0
while recv_size < filesize:
content = conn.recv(buffer)
f.write(content)
recv_size += len(content)
print('文件名称:%s,文件大小:%s,已接收%s'%(head['filename'], filesize, recv_size))
conn.close()
sk.close()

client端

import os
import json
import socket
import struct
sk = socket.socket()
sk.connect(('127.0.0.1',8081))
buffer = 1024 #读取文件的时候,每次读取的大小
head = {
'filepath':r'G:\share#需要下载的文件路径,也就是文件所在的文件夹
'filename':'1.py', #改成上面filepath下的一个文件
'filesize':None,
} file_path = os.path.join(head['filepath'],head['filename'])
filesize = os.path.getsize(file_path)
head['filesize'] = filesize
# json_head = json.dumps(head,ensure_ascii=False) #字典转换成字符串
json_head = json.dumps(head) #字典转换成字符串
bytes_head = json_head.encode('utf-8') #字符串转换成bytes类型 #计算head的长度,因为接收端先接收我们自己定制的报头
head_len = len(bytes_head) #报头长度
pack_len = struct.pack('i',head_len)
sk.send(pack_len) #先发送报头长度
sk.send(bytes_head) #再发送bytes类型的报头 #即便是视频文件,也是可以按行来读取的,也可以readline,也可以for循环,但是读取出来的数据大小就不固定了,影响效率,有可能读的比较小,也可能很大,像视频文件一般都是一行的二进制字节流。
#所有我们可以用read,设定一个一次读取内容的大小,一边读一边发,一边收一边写
with open(file_path,'rb') as f:
send_size = 0
while send_size < filesize:
content = f.read(buffer)
sk.send(content)
send_size += len(content)
print('文件大小:%s,已发送%s'%(filesize, send_size))
sk.close()

FTP上传下载文件(面向对象升级版)

server端

import socket
import struct
import json
import subprocess
import os class MYTCPServer:
address_family = socket.AF_INET
socket_type = socket.SOCK_STREAM
allow_reuse_address = False
max_packet_size = 1024
coding='utf-8'
request_queue_size = 5
server_dir='file_upload' def __init__(self, server_address, bind_and_activate=True):
"""Constructor. May be extended, do not override."""
self.server_address=server_address
self.socket = socket.socket(self.address_family,
self.socket_type)
if bind_and_activate:
try:
self.server_bind()
self.server_activate()
except:
self.server_close()
raise def server_bind(self):
"""Called by constructor to bind the socket.
"""
if self.allow_reuse_address:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(self.server_address)
self.server_address = self.socket.getsockname() def server_activate(self):
"""Called by constructor to activate the server.
"""
self.socket.listen(self.request_queue_size) def server_close(self):
"""Called to clean-up the server.
"""
self.socket.close() def get_request(self):
"""Get the request and client address from the socket.
"""
return self.socket.accept() def close_request(self, request):
"""Called to clean up an individual request."""
request.close() def run(self):
while True:
self.conn,self.client_addr=self.get_request()
print('from client ',self.client_addr)
while True:
try:
head_struct = self.conn.recv(4)
if not head_struct:break head_len = struct.unpack('i', head_struct)[0]
head_json = self.conn.recv(head_len).decode(self.coding)
head_dic = json.loads(head_json) print(head_dic)
#head_dic={'cmd':'put','filename':'a.txt','filesize':123123}
cmd=head_dic['cmd']
if hasattr(self,cmd):
func=getattr(self,cmd)
func(head_dic)
except Exception:
break def put(self,args):
file_path=os.path.normpath(os.path.join(
self.server_dir,
args['filename']
)) filesize=args['filesize']
recv_size=0
print('----->',file_path)
with open(file_path,'wb') as f:
while recv_size < filesize:
recv_data=self.conn.recv(self.max_packet_size)
f.write(recv_data)
recv_size+=len(recv_data)
print('recvsize:%s filesize:%s' %(recv_size,filesize)) tcpserver1=MYTCPServer(('127.0.0.1',8080))
tcpserver1.run()

client端

import socket
import struct
import json
import os class MYTCPClient:
address_family = socket.AF_INET
socket_type = socket.SOCK_STREAM
allow_reuse_address = False
max_packet_size = 8192
coding='utf-8'
request_queue_size = 5 def __init__(self, server_address, connect=True):
self.server_address=server_address
self.socket = socket.socket(self.address_family,
self.socket_type)
if connect:
try:
self.client_connect()
except:
self.client_close()
raise def client_connect(self):
self.socket.connect(self.server_address) def client_close(self):
self.socket.close() def run(self):
while True:
inp=input(">>: ").strip()
if not inp:continue
l=inp.split()
cmd=l[0]
if hasattr(self,cmd):
func=getattr(self,cmd)
func(l) def put(self,args):
cmd=args[0]
filename=args[1]
if not os.path.isfile(filename):
print('file:%s is not exists' %filename)
return
else:
filesize=os.path.getsize(filename) head_dic={'cmd':cmd,'filename':os.path.basename(filename),'filesize':filesize}
print(head_dic)
head_json=json.dumps(head_dic)
head_json_bytes=bytes(head_json,encoding=self.coding) head_struct=struct.pack('i',len(head_json_bytes))
self.socket.send(head_struct)
self.socket.send(head_json_bytes)
send_size=0
with open(filename,'rb') as f:
for line in f:
self.socket.send(line)
send_size+=len(line)
print(send_size)
else:
print('upload successful') client=MYTCPClient(('127.0.0.1',8080))
client.run()

打印进度条示例

#=========知识储备==========
#进度条的效果
[# ]
[## ]
[### ]
[#### ] #指定宽度
print('[%-15s]' %'#')
print('[%-15s]' %'##')
print('[%-15s]' %'###')
print('[%-15s]' %'####') #打印%
print('%s%%' %(100)) #第二个%号代表取消第一个%的特殊意义 #可传参来控制宽度
print('[%%-%ds]' %50) #[%-50s]
print(('[%%-%ds]' %50) %'#')
print(('[%%-%ds]' %50) %'##')
print(('[%%-%ds]' %50) %'###') #=========实现打印进度条函数==========
import sys
import time def progress(percent,width=50):
if percent >= 1:
percent=1
show_str = ('%%-%ds' % width) % (int(width*percent)*'|')
print('\r%s %d%%' %(show_str, int(100*percent)), end='') #=========应用==========
data_size=1025
recv_size=0
while recv_size < data_size:
time.sleep(0.1) #模拟数据的传输延迟
recv_size+=1024 #每次收1024 percent=recv_size/data_size #接收的比例
progress(percent,width=70) #进度条的宽度70