IO模型
IO阻塞模型分类:
- 阻塞IO
- 非阻塞IO
- 多路复用IO
- 异步IO(爬虫阶段)
- 信号驱动IO(了解)
1、阻塞IO模型
socket模块默认是阻塞的,一个读操作流程如下:
问题:
同一时间只能服务一个客户端
解决办法:
1. 多线程
优点:如果并发量不高,效率是较高的,因为每个客户端都有单独线程来处理
缺点:不可能无限的开启线程,线程也需要占用资源
2. 多进程
优点:可以多个CPU并行处理
弊端:占用资源非常大,一旦客户端稍微多一点,立马就慢了
3.线程池
优点:保证了服务器正常运行,还帮你负责创建和销毁线程,以及任务分配
缺点:一旦并发量超出最大线程量,就只能等签名的运行完毕。
4. 协程
优点:不需要创建一段线程,也不需要在线程间做切换,没有数量限制
缺点:不能利用多核优势
结果:真正倒是效率低的是阻塞问题,但上述办法并没有真正的解决阻塞问题。
2、非阻塞IO模型
遇到IO操作也不阻塞,会继续执行。意味着即使遇到IO操作CPU执行权也不会被剥夺
从图中看出,非阻塞的recv系统调用之后,进程没有被阻塞,操作系统立马把结果返回给进程,如果数据还没准备好,则抛出异常,进程可以去做其他的事,然后在发起recv系统调用,重复上述过程(这个过程通常被称为轮询),一直到数据准备好,再拷贝数据到进程进行数据处理。需要注意,拷贝数据的整个过程,进程仍然是属于阻塞状态。
缺点: 占用CPU太多,原因是需要无限的循环去向操作系统拿数据。
import socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', ))
server.listen()
server.setblocking(False)
# 所有客户端的socket
conns = []
# 所有需要返回数据的客户端
send_cs = [] while True:
try:
conn, client_addr = server.accept()
print(client_addr,'已连接')
conns.append(conn)
except BlockingIOError:
# 接收数据
for conn in conns[:]:# 和conns.copy()一样,原因:迭代过程不能删除元素
try:
data = conn.recv()
print(data)
send_cs.append((data, conn))
# send也是IO操作,在一些极端情况下,如系统缓存满了,肯定也会抛出异常
# 所以,send要单拿出来处理
except BlockingIOError:
continue
except ConnectionResetError:
conn.close()
conns.remove(conn)
# 发送数据
for item in send_cs[:]:
data, conn = item
try:
conn.send(data.upper())
# 如果发送成功就把数据从列表中删除
send_cs.remove(item)
except BlockingIOError: # 如果缓冲区满了 就下次再发
continue
except ConnectionResetError:
conn.close()
send_cs.remove(item)
conns.remove(conn)
服务端代码
3、多路复用IO
用一个线程来处理并发所有的客户端。
需要使用select模块,select原理:把所有的socket交给select,select会不断轮询所负责的所有socket,当某个socket有数据到达,就通知进程继续执行后面代码。
流程:程序发起一个select调用,select使整个进程阻塞,直到有socket准备就绪,select就返回,这个时候进程在调用read操作,直接从缓冲中把数据拷贝到进程。流程图如下:
import socket
import select server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", ))
server.listen() r_list = [server] # 监测是否收到数据的客户端
w_list = [] # 监测是否需要发送数据的客户端
x_list = [] # 用来发送数据
data_dic = {} while True:
readables, writeables, _ = select.select(r_list, w_list, x_list)
# 接收数据以及建立连接
for conn in readables:
if conn == server:
new_conn, _ = conn.accept()
r_list.append(new_conn)
else:
try:
data = conn.recv()
if not data:
conn.close()
r_list.remove(conn)
continue
print(data)
# 发送数据
w_list.append(conn)
data_dic[conn] = data
except ConnectionResetError:
conn.close()
r_list.remove(conn)
# 发送数据
for conn in writeables:
try:
conn.send(data_dic[conn].upper())
except ConnectionResetError:
conn.close()
finally:
data_dic.pop(conn)
w_list.remove(conn)
服务端代码
import socket
import threading
from threading import Thread def communication():
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', ))
while True:
msg = "%s say hello for you"%threading.current_thread()
if not msg:
continue
client.send(msg.encode("utf-8"))
data = client.recv()
print(data.decode("utf-8")) for i in range():
Thread(target=communication).start()
客户端代码
强调:select的优势在于可以处理多个连接,并不适用于单个连接
优点:占用资源少,不消耗太多CPU,同时能够为多个客户端提供服务。(适用于简单的事件驱动服务器)
缺点:需要消耗大量时间区轮询各个socket,更好的选择时epoll,其次把事件探测和响应夹杂在一起,耦合性增加