通过Python3.5来学习几种不同的IO模型

时间:2021-11-07 03:30:41

计算机的核心资源,基本上就是CPU和内存。我们下面的讨论可以假定CPU只有一个物理核心。


从目前的情况看,CPU很快,IO很慢,即使是物理内存也很慢,否则就不需要CPU设置多层的高速cache了。


CPU主要快在哪里?1、频率;2、指令执行效率,这里主要是硬件级别的指令分阶段并行优化。


所以要充分利用CPU的指令来完成我们的计算任务。对于一个物理CPU来说,每时每刻只能执行一条指令。


那我们的故事开始了:

要完成的任务:创建一个HTTP服务器,启动,并能够响应外部的请求。


1. 阻塞式IO

流程:用户空间的应用程序通过系统调用去读取数据,如果内核中对应的文件描述符没有准备好数据,就会阻塞,等待数据的到来;数据到来的时候,从内核空间复制到用户空间,继而返回给应用程序来处理


服务器版本1.0: 首先,我们写了一个单线程版本的“HTTP服务器”,这个服务器仅仅是能够返回一个合法的HTTP响应而已,并且是单线程的:

#coding=utf-8
'''
Created on Nov 25, 2016

@author: Felix
@summary: 在一个线程中使用阻塞式IO
'''
import socket
import time

HOST, PORT = '', 7777

# HTTP协议中定义的分隔符
HEADER_BODY_SPLITER = b'\r\n\r\n'
HEADER_ITEM_SPLITER = b'\r\n'

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((HOST, PORT))
s.listen(2)

while True:
conn, addr = s.accept()

# 针对本次请求的一些变量定义
content_length = 0
request_line = b''
header_bytes, header_length, header_complete = b'', 0, False
body_bytes, body_length, body_complete = b'', 0, False

with conn:
print('Got Request From: {!r}'.format(addr))
while True:
received = conn.recv(1024) # 如果连接有效,等待数据到来的时候,将会在这里等待,有数据到来的时候会从内核态复制到用户态,进一步返回给received参数
if not received: # 连接关闭
break
if not header_complete:
header_bytes += received
header_length += len(received) # 可以做头大小的限制
if header_bytes.find(HEADER_BODY_SPLITER) >= 0:
header_complete = True # HTTP头已经传输完毕
header_bytes, body_bytes = header_bytes.split(HEADER_BODY_SPLITER)
body_length += len(body_bytes) # 用于同Content-Length做比较,以标记HTTP请求报文结束

# 解析HTTP头
HEAD_ITEMS_LIST = header_bytes.split(HEADER_ITEM_SPLITER)
request_line = HEAD_ITEMS_LIST[0]
HEAD_ITEMS_LIST = HEAD_ITEMS_LIST[1:]
HEAD_ITEM_DICT = {h[0].decode('utf-8'):h[1].decode('utf-8').strip()
for h in [item.split(b':') for item in HEAD_ITEMS_LIST]}
content_length = int(HEAD_ITEM_DICT.get('Content-Length', '0'))
else:
# 请求头已经读取完毕,读到的数据都属于请求体了
body_bytes += received
body_length += len(received)

if header_complete: # 如果HTTP请求头已经解析完毕,进一步解析HTTP请求体
if content_length <= body_length:
body_bytes = body_bytes[:content_length+1]
time.sleep(5) # 模拟处理过程需要花费的时间
message = 'hello {!r}'.format(addr)
response = 'HTTP/1.0 200 OK\r\nContent-Length: {}\r\nContent-Type: text/plain\r\n\r\n{}'.format(len(message), message)
response = response.encode('utf-8')
conn.sendall(response)
break


PS:这个服务器程序非常简单,返回一个合法的HTTP应答,其中的那个time.sleep可以模拟业务逻辑处理的时间。

优点:简单

缺点:一次只能服务一个客户,加入业务逻辑处理时间过长,其它的客户只能等待。

举例:如果两个客户端同时连进来,只有一个客户端能获得服务,等5秒之后,第二个客户端获得服务,服务完两个客户端需要10秒的时间


服务器版本1.1: 多线程版本,这也是现在主流的HTTP服务方式:

#coding=utf-8
'''
Created on Nov 28, 2016

@author: Felix
@summary: 在多个线程中使用阻塞式IO
'''
import concurrent.futures
import socket
import time

def serv_client(conn):
# 针对本次请求的一些变量定义
content_length = 0
request_line = b''
header_bytes, header_length, header_complete = b'', 0, False
body_bytes, body_length, body_complete = b'', 0, False

with conn:
print('Got Request From: {!r}'.format(addr))
while True:
received = conn.recv(1024) # 如果连接有效,等待数据到来的时候,将会在这里等待,有数据到来的时候会从内核态复制到用户态,进一步返回给received参数
if not received: # 连接关闭
break
if not header_complete:
header_bytes += received
header_length += len(received) # 可以做头大小的限制
if header_bytes.find(HEADER_BODY_SPLITER) >= 0:
header_complete = True # HTTP头已经传输完毕
header_bytes, body_bytes = header_bytes.split(HEADER_BODY_SPLITER)
body_length += len(body_bytes) # 用于同Content-Length做比较,以标记HTTP请求报文结束

# 解析HTTP头
HEAD_ITEMS_LIST = header_bytes.split(HEADER_ITEM_SPLITER)
request_line = HEAD_ITEMS_LIST[0]
HEAD_ITEMS_LIST = HEAD_ITEMS_LIST[1:]
HEAD_ITEM_DICT = {h[0].decode('utf-8'):h[1].decode('utf-8').strip()
for h in [item.split(b':') for item in HEAD_ITEMS_LIST]}
content_length = int(HEAD_ITEM_DICT.get('Content-Length', '0'))
else:
# 请求头已经读取完毕,读到的数据都属于请求体了
body_bytes += received
body_length += len(received)

if header_complete: # 如果HTTP请求头已经解析完毕,进一步解析HTTP请求体
if content_length <= body_length:
body_bytes = body_bytes[:content_length+1]
time.sleep(5) # 模拟处理过程需要花费的时间
message = 'hello {!r}'.format(addr)
response = 'HTTP/1.0 200 OK\r\nContent-Length: {}\r\nContent-Type: text/plain\r\n\r\n{}'.format(len(message), message)
response = response.encode('utf-8')
conn.sendall(response)
break

HOST, PORT = '', 7777

# HTTP协议中定义的分隔符
HEADER_BODY_SPLITER = b'\r\n\r\n'
HEADER_ITEM_SPLITER = b'\r\n'

executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((HOST, PORT))
s.listen(2)

while True:
conn, addr = s.accept()
executor.submit(serv_client, conn)

PS: 这个程序逻辑非常简单,需要注意:1.为了不在交付给另外的线程后,连接被关闭,需要将上下文管理器去掉,连接的关闭在线程中执行;2.开启了一个4个任务的线程池,最多同时服务4个客户端,当然这个是可以调整的。

优点:终于可以同时服务多个客户端了,虽然最多只能同时服务4个,但是这个是可以调整的。

缺点:客户端的数量巨大的时候,只同时服务4个客户端肯定是不够的;增加服务器线程能够缓解这个问题,但是同时又带来了CPU在多线程之间来回切换的开销。

举例:如果有两个客户端同时连进来,两个客户端能够同时(当然肯定是有一个先后顺序的,只不过这个间隔非常小,可以忽略)得到服务,服务完两个客户需要5秒的时间。


2. 非阻塞式IO

流程:用户通过系统调用获取某个文件描述符上的数据的时候,假如,内核中此文件描述符上没有数据,或者没有连接时,线程不会进入睡眠状态,而是内核直接返回一个EWOULDBLOCK错误给用户,用户自己来处理错误;当数据准备好的时候,用户应用等待数据从内核复制到用户空间,然后处理数据。

服务器2.0版本:非阻塞IO的单线程版本

#coding=utf-8
'''
Created on Nov 25, 2016

@author: Felix
@summary: 在一个线程中使用非阻塞式IO
'''
import socket
import time

HOST, PORT = '', 7777

# HTTP协议中定义的分隔符
HEADER_BODY_SPLITER = b'\r\n\r\n'
HEADER_ITEM_SPLITER = b'\r\n'

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((HOST, PORT))
s.setblocking(False) #### 将套接字设置为非阻塞
s.listen()

while True:
try:
conn, addr = s.accept() # 由于套接字是非阻塞的,再没有连接时会直接报错,需要做一些处理
except BlockingIOError as e:
print('conn not ready...', e)
continue

# 针对本次请求的一些变量定义
content_length = 0
request_line = b''
header_bytes, header_length, header_complete = b'', 0, False
body_bytes, body_length, body_complete = b'', 0, False

with conn:
print('Got Request From: {!r}'.format(addr))
while True:
try:
received = conn.recv(1024) # 由于套接字是非阻塞的,再没有数据时会直接报错,需要做一些处理
except BlockingIOError as e:
print('no data, wait....', e)
continue
if not received: # 连接关闭
break
if not header_complete:
header_bytes += received
header_length += len(received) # 可以做头大小的限制
if header_bytes.find(HEADER_BODY_SPLITER) >= 0:
header_complete = True # HTTP头已经传输完毕
header_bytes, body_bytes = header_bytes.split(HEADER_BODY_SPLITER)
body_length += len(body_bytes) # 用于同Content-Length做比较,以标记HTTP请求报文结束

# 解析HTTP头
HEAD_ITEMS_LIST = header_bytes.split(HEADER_ITEM_SPLITER)
request_line = HEAD_ITEMS_LIST[0]
HEAD_ITEMS_LIST = HEAD_ITEMS_LIST[1:]
HEAD_ITEM_DICT = {h[0].decode('utf-8'):h[1].decode('utf-8').strip()
for h in [item.split(b':') for item in HEAD_ITEMS_LIST]}
content_length = int(HEAD_ITEM_DICT.get('Content-Length', '0'))
else:
# 请求头已经读取完毕,读到的数据都属于请求体了
body_bytes += received
body_length += len(received)

if header_complete: # 如果HTTP请求头已经解析完毕,进一步解析HTTP请求体
if content_length <= body_length:
body_bytes = body_bytes[:content_length+1]
time.sleep(5) # 模拟处理过程需要花费的时间
message = 'hello {!r}'.format(addr)
response = 'HTTP/1.0 200 OK\r\nContent-Length: {}\r\nContent-Type: text/plain\r\n\r\n{}'.format(len(message), message)
response = response.encode('utf-8')
conn.sendall(response)
break

PS:此版本的HTTP服务器和1.0版本的HTTP服务器,在效果上时相同的。它只能同时服务一个客户端,假如两个客户端同时请求,则完成这两个请求需要10秒的时间。可以按照1.1的方法,将程序改成多线程版本。

优点:实现简单、容易理解。非阻塞的特性给我们提供了一些机会,用户来做其它的事情,而不是傻等,可以更充分利用CPU资源。

缺点:要判断一个套接字是不是有数据,还是需要应用不断对套接字去做“测试”,这样明显是低效的

举例:如果两个客户端同时连进来,只有一个客户端能获得服务,等5秒之后,第二个客户端获得服务,服务完两个客户端需要10秒的时间


3. IO复用模型

用户会在关心的文件描述符上注册关心的事件:描述符可读、可写、发生异常等等。当对应的描述符有对应的事件发生的时候,系统内核就会通知到用户,用户就可以调用系统调用,来读取或者写入对应的描述符了。

服务器3.0版本:

#coding=utf-8
'''
Created on Nov 25, 2016

@author: Felix
@summary: IO复用模型
'''
import selectors
import socket
import time

HOST, PORT = '', 7777

# HTTP协议中定义的分隔符
HEADER_BODY_SPLITER = b'\r\n\r\n'
HEADER_ITEM_SPLITER = b'\r\n'

def conn_ready(s, mask):
'''这个函数会在描述符s可读的时候被调用,故,虽然s在下面的代码中被设置成nonblocking的,所以可以在这里放心的去读取,而不用做错误处理'''
conn, addr = s.accept()
print('New Connection Comming......{!r}'.format(conn))
conn.setblocking(False) # 设置成非阻塞套接字,防止在后续的处理中,读取或者写入数据的时候阻塞
default_selector.register(conn, selectors.EVENT_READ, data_ready) # 如果这个新建立的套接字已经可读,调用回调函数data_ready

def data_ready(conn, mask):
print('Data ready......{!r}'.format(conn))
# 针对本次HTTP请求的一些变量定义。注意,这里没有处理HTTP请求报文数据无法一次性读取完毕的情况,如果想实现完善的HTTP服务器,这里必须做好分块数据接收的处理
content_length = 0
request_line = b''
header_bytes, header_length, header_complete = b'', 0, False
body_bytes, body_length, body_complete = b'', 0, False

received = conn.recv(16)
if not received:
default_selector.unregister(conn) # 客户端关闭了连接
conn.close()
else:
addr = conn.getpeername()
if not header_complete:
header_bytes += received
print(header_bytes)
header_length += len(received) # 可以做头大小的限制
if header_bytes.find(HEADER_BODY_SPLITER) >= 0:
header_complete = True # HTTP头已经传输完毕
header_bytes, body_bytes = header_bytes.split(HEADER_BODY_SPLITER)
body_length += len(body_bytes) # 用于同Content-Length做比较,以标记HTTP请求报文结束

# 解析HTTP头
HEAD_ITEMS_LIST = header_bytes.split(HEADER_ITEM_SPLITER)
request_line = HEAD_ITEMS_LIST[0]
HEAD_ITEMS_LIST = HEAD_ITEMS_LIST[1:]
HEAD_ITEM_DICT = {h[0].decode('utf-8'):h[1].decode('utf-8').strip()
for h in [item.split(b':') for item in HEAD_ITEMS_LIST]}
content_length = int(HEAD_ITEM_DICT.get('Content-Length', '0'))
else:
# 请求头已经读取完毕,读到的数据都属于请求体了
body_bytes += received
body_length += len(received)

if header_complete: # 如果HTTP请求头已经解析完毕,进一步解析HTTP请求体
if content_length <= body_length:
body_bytes = body_bytes[:content_length+1]
time.sleep(5) # 模拟处理过程需要花费的时间
message = 'hello {!r}'.format(addr)
response = 'HTTP/1.0 200 OK\r\nContent-Length: {}\r\nContent-Type: text/plain\r\n\r\n{}'.format(len(message), message)
response = response.encode('utf-8')
conn.sendall(response)

default_selector = selectors.DefaultSelector()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setblocking(False) # 设置成非阻塞套接字,防止在后续的处理中,读取或者写入数据的时候阻塞
s.bind((HOST, PORT))
s.listen()
default_selector.register(s, selectors.EVENT_READ, conn_ready) # 如果此套接字可读,回调connection_ready函数

print('starting server...')
i = 0
while True:
i += 1
for conn, mask in default_selector.select(timeout=1):
print(i, conn)
callback = conn.data
callback(conn.fileobj, mask) # 如果此处阻塞,则会阻塞其它请求的处理
print('shutting down')
default_selector.close()
优点:此版本的服务器,只有单个的进程,但是能够同时监控许多socket对象的状态,selectors这个Python库在Linux上会选用epoll,在macOS上会选用kqueue,以达到最好的性能。

缺点:如果业务处理逻辑本身就是CPU密集型的操作,那么由于还是在一个进程里的计算,会导致其它的请求被阻塞。但是这个阻塞并不是IO阻塞。