【python】-- IO多路复用(select、poll、epoll)介绍及实现

时间:2023-11-24 15:54:56

IO多路复用(select、poll、epoll)介绍及select、epoll的实现

IO多路复用中包括 select、pool、epoll,这些都属于同步,还不属于异步

一、IO多路复用介绍

1、select

select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。

  select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。

  select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。

  另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。

2、poll

poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。

  poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

  另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

3、epoll

直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

  epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。

  epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。

  另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

4、sellect、poll、epoll三者的区别

【python】--  IO多路复用(select、poll、epoll)介绍及实现

二、select IO多路复用

Python的select()方法直接调用操作系统的IO接口,它监控sockets,open files, and pipes(所有带fileno()方法的文件句柄)何时变成readable 和writeable, 或者通信错误,select()使得同时监控多个连接变的简单,并且这比写一个长循环来等待和监控多客户端连接要高效,因为select直接通过操作系统提供的C的网络接口进行操作,而不是通过Python的解释器。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select的一 个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样会造成效率的降低

1、select语法:

select(rlist, wlist, xlist, timeout=None)

select()方法接收并监控3个通信列表, 第一个rlist监控所有要进来的输入数据,第二个wlist是监控所有要发出去的输出数据,第三个监控异常错误数据,第四个设置指定等待时间,如果想立即返回,设为null即可,最后需要创建2个列表来包含输入和输出信息来传给select(),让select方法通过内核去监控,然后生成三个实例。

#建立两个列表,比如想让内核去检测50个连接,需要传给它一个列表,就是这个inputs(列表中里面存放的是需要被内核监控的链接),然后交给select,就相当于交给内核了
inputs = [server,] #输入列表,监控所有输入数据 outputs = [] #输入列表,监控所有输出数据 #把两个列表传给select方法通过内核去监控,生成三个实例
readable,writeable,exceptional = select.select(inputs,outputs,inputs) # 这里select方法的第三个参数同样传入input列表是因为,input列表中存放着所有的链接,比如之前放入的50被监控链接中有5个断了,出现了异常,就会输入到exceptional里面,但这5链接本身是放在inputs列表中

2、select服务端代码实例:

import select,socket,queue
server = socket.socket()
server.bind(("localhost",9000))
server.listen(1000)
server.setblocking(False) #设置为非阻塞
msg_dic = dict() #定义一个队列字典
inputs = [server,] #由于设置成非阻塞模式,accept和recive都不阻塞了,没有值就会报错,因此最开始需要最开始需要监控服务端本身,等待客户端连接
outputs = []
while True:
#exceptional表示如果inputs列表中出现异常,会输出到这个exceptional中
readable,writeable,exceptional = select.select(inputs,outputs,inputs)#如果没有任何客户端连接,就会阻塞在这里
for r in readable:# 没有个r代表一个socket链接
if r is server: #如果这个socket是server的话,就说明是是新客户端连接了
conn,addr = r.accept() #新连接进来了,接受这个连接,生成这个客户端实例
print("来了一个新连接",addr)
inputs.append(conn)#为了不阻塞整个程序,我们不会立刻在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新连接
#就会被交给select去监听
msg_dic[conn] = queue.Queue() #初始化一个队列,后面存要返回给这个客户端的数据
else: #如果不是server,就说明是之前建立的客户端来数据了
data = r.recv(1024)
print("收到数据:",data)
msg_dic[r].put(data)#收到的数据先放到queue里,一会返回给客户端
outputs.append(r)#为了不影响处理与其它客户端的连接 , 这里不立刻返回数据给客户端
# r.send(data)
# print("send done....")
for w in writeable: #要返回给客户端的链接列表
data_to_client = msg_dic[w].get()
w.send(data_to_client) #返回给客户端的源数据
outputs.remove(w) #确保下次循环的时候writeable,不返回这个已经处理完的这个连接了 for e in exceptional: #处理异常的连接
if e in outputs: #因为e不一定在outputs,所以先要判断
outputs.remove(e)
inputs.remove(e) #删除inputs中异常连接
del msg_dic[e] #删除此连接对应的队列

三、epoll IO多路复用

epoll的方式,这种效率更高,但是这种方式在Windows下不支持,在Linux是支持的,selectors模块就是默认使用就是epoll,但是如果在windows系统上使用selectors模块,就会找不到epoll,从而使用select。

1、selectors语法:

#定义一个对象
sel = selectors.DefaultSelector() #注册一个事件
sel.register(server,selectors.EVENT_READ,accept) #注册事件,只要来一个连接就调accept这个函数,就相当于之前select的用法,sel.register(server,selectors.EVENT_READ,accept) == inputs=[server,],readable,writeable,exceptional = select.select(inputs,outputs,inputs)意思是一样的。

2、selectors代码实例:

import selectors,socket

sel = selectors.DefaultSelector()

def accept(sock,mask):
"接收客户端信息实例"
conn,addr = sock.accept()
print("accepted",conn,'from',addr)
conn.setblocking(False)
sel.register(conn,selectors.EVENT_READ,read) #新连接注册read回调函数 def read(conn,mask):
"接收客户端的数据"
data = conn.recv(1024)
if data:
print("echoing",repr(data),'to',conn)
conn.send(data)
else:
print("closing",conn)
sel.unregister(conn)
conn.close() server = socket.socket()
server.bind(('localhost',9999))
server.listen(500)
server.setblocking(False)
sel.register(server,selectors.EVENT_READ,accept) #注册事件,只要来一个连接就调accept这个函数,
#sel.register(server,selectors.EVENT_READ,accept) == inputs=[server,] while True:
events = sel.select() #这个select,看起来是select,有可能调用的是epoll,看你操作系统是Windows的还是Linux的
#默认阻塞,有活动连接就返回活动连接列表
print("事件:",events)
for key,mask in events:
callback = key.data #相当于调accept了
callback(key.fileobj,mask) #key.fileobj=文件句柄

打印服务端:

 [(SelectorKey(fileobj=<socket.socket fd=436, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222)>, fd=436, events=1, data=<function accept at 0x0000022296063E18>), 1)]
accepted <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)> from ('127.0.0.1', 50281)
事件: [(SelectorKey(fileobj=<socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>, fd=508, events=1, data=<function read at 0x00000222980501E0>), 1)]
echoing b'adas' to <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>
事件: [(SelectorKey(fileobj=<socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>, fd=508, events=1, data=<function read at 0x00000222980501E0>), 1)]
echoing b'HA' to <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>
事件: [(SelectorKey(fileobj=<socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>, fd=508, events=1, data=<function read at 0x00000222980501E0>), 1)]
echoing b'asdHA' to <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>

这样就容易明白:callback = key.data #第一次调用的是accept,第二次调用的是read    callback(key.fileobj,mask)  #key.fileobj=文件句柄

客户端代码:

在Linux端,selectors模块才能是epoll

import socket,sys

messages = [ b'This is the message. ',
b'It will be sent ',
b'in parts.',
]
server_address = ('localhost', 9999) # 创建100个 TCP/IP socket实例
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(100)] # 连接服务端
print('connecting to %s port %s' % server_address)
for s in socks:
s.connect(server_address) for message in messages: # 发送消息至服务端
for s in socks:
print('%s: sending "%s"' % (s.getsockname(), message) )
s.send(message) # 从服务端接收消息
for s in socks:
data = s.recv(1024)
print( '%s: received "%s"' % (s.getsockname(), data) )
if not data:
print(sys.stderr, 'closing socket', s.getsockname() )