io类型

时间:2021-07-08 07:11:05

非阻塞io

from socket import *
import time
s=socket(AF_INET,SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)
s.setblocking(False) #设置socket的接口为非阻塞
conn_l=[]
del_l=[]
while True:
try:
conn,addr=s.accept()
conn_l.append(conn)
except BlockingIOError:
print(conn_l)
for conn in conn_l:
try:
data=conn.recv(1024)
if not data:
del_l.append(conn)
continue
conn.send(data.upper())
except BlockingIOError:
pass
except ConnectionResetError:
del_l.append(conn) for conn in del_l:
conn_l.remove(conn)
conn.close()
del_l=[]

特点:实现了非阻塞,提高了cpu占用率,但由于一直监听 accept ,cpu占用率过高!

多路复用

select 模型:

from socket import *
import select s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('127.0.0.1',8081))
s.listen(5)
s.setblocking(False) #设置socket的接口为非阻塞
read_l=[s,]
while True:
r_l,w_l,x_l=select.select(read_l,[],[])
print(r_l)
for ready_obj in r_l:
if ready_obj == s:
conn,addr=ready_obj.accept() #此时的ready_obj等于s
read_l.append(conn)
else:
try:
data=ready_obj.recv(1024) #此时的ready_obj等于conn
if not data:
ready_obj.close()
read_l.remove(ready_obj)
continue
ready_obj.send(data.upper())
except ConnectionResetError:
ready_obj.close()
read_l.remove(ready_obj)

select 模型过程:

用户进程创建socket对象,拷贝监听的fd到内核空间,每一个fd会对应一张系统文件表,内核空间的fd响应到数据后,就会发送信号给用户进程数据已到;

#用户进程再发送系统调用,比如(accept)将内核空间的数据copy到用户空间,同时作为接受数据端内核空间的数据清除,这样重新监听时fd再有新的数据又可以响应到了(发送端因为基于TCP协议所以需要收到应答后才会清除)。

缺点:因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。

epoll 更好的选择,但是windows不支持。

selector模型——根据系统自动选择

 from socket import *
import selectors sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
conn,addr=server_fileobj.accept()
sel.register(conn,selectors.EVENT_READ,read) def read(conn,mask):
try:
data=conn.recv(1024)
if not data:
print('closing',conn)
sel.unregister(conn)
conn.close()
return
conn.send(data.upper()+b'_SB')
except Exception:
print('closing', conn)
sel.unregister(conn)
conn.close() server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #设置socket的接口为非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #相当于网select的读列表里append了一个文件句柄server_fileobj,并且绑定了一个回调函数accept while True:
events=sel.select() #检测所有的fileobj,是否有完成wait data的
for sel_obj,mask in events:
callback=sel_obj.data #callback=accpet
callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)
from socket import *
import time
s=socket(AF_INET,SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)
s.setblocking(False) #设置socket的接口为非阻塞
conn_l=[]
del_l=[]
while True:
try:
conn,addr=s.accept()
conn_l.append(conn)
except BlockingIOError:
print(conn_l)
for conn in conn_l:
try:
data=conn.recv(1024)
if not data:
del_l.append(conn)
continue
conn.send(data.upper())
except BlockingIOError:
pass
except ConnectionResetError:
del_l.append(conn) for conn in del_l:
conn_l.remove(conn)
conn.close()
del_l=[]