IO多路复用
IO多路复用是指:通过一种机制,可以监视多个描述符,一旦某个系统描述符就绪(一般是读就绪或者写就绪)能够通知程序进行相应的读写操作
实例化例子就是在SocketServer模块中,客户端和服务端建立好连接,此时服务端通过监听conn这条链路,一旦客户端发送了数据,conn链路状态就发生变化,服务端就知道有数据要接收...
Linux系统中同时存在select、pull、epoll三种IO多路复用机制
windows中只有select机制
1)select
select本质上是通过设置或者检查存放fd标志位的数据结构来进行下一步处理。这样所带来的缺点是:
1 单个进程可监视的fd数量被限制
2 需要维护一个用来存放大量fd的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大
3 对socket进行扫描时是线性扫描
2)pull
poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态,如果设备就绪则在设备等待队列中加入一项并继续遍历,如果遍历完所有fd后没有发现就绪设备,则挂起当前进程,直到设备就绪或者主动超时,被唤醒后它又要再次遍历fd。这个过程经历了多次无谓的遍历。
它没有最大连接数的限制,原因是它是基于链表来存储的,但是同样有一个缺点:大量的fd的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义。
poll还有一个特点是“水平触发”,如果报告了fd后,没有被处理,那么下次poll时会再次报告该fd。
3)epoll
epoll支持水平触发和边缘触发,最大的特点在于边缘触发,它只告诉进程哪些fd刚刚变为就需态,并且只会通知一次。
在前面说到的复制问题上,epoll使用mmap减少复制开销。
还有一个特点是,epoll使用“事件”的就绪通知方式,通过epoll_ctl注册fd,一旦该fd就绪,内核就会采用类似callback的回调机制来激活该fd,epoll_wait便可以收到通知
直接来看看sockserver利用select多路复用机制实现的伪并发实例
客户端
import socket
# 创建一个socket实例
sk = socket.socket()
sk.connect(('127.0.0.1', 9999))
# 连接成功后打印服务端发送的消息
recv_bytes = sk.recv(1024)
recv_str = str(recv_bytes, encoding='utf-8')
print(recv_str)
# 开始循环交互发送数据
while True:
inp = input(">>>:")
sk.sendall(bytes(inp, encoding='utf-8'))
ret = sk.recv(1024)
print(ret)
sk.close()
服务端
import socketimport select # 创建一个socket对象并绑定IP端口sk = socket.socket()sk.bind(('127.0.0.1', 9999,))sk.listen(5) while True: # 开始监听sk(服务端)对象 如果sk发生变化 表示有客户端来连接 此时rlist里面的值为[sk,] rlist, wlist, elist, = select.select([sk, ], [], [], 1) print(rlist) # 遍历rlist 如果有客户端来连接 就会被加入到rlist列表 for r in rlist: # 新客户端来连接 conn, address = r.accept() conn.sendall(bytes('hello', encoding='utf-8'))
开启select监听后,我们启动服务端看看效果
[] [] # 空列表[][<socket.socket fd=244, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999)>] # 发生改变的sk状态[][] # 空列表[]# 启动服务后,一旦有客户端连接 sk状态就会改变 sk就会被添加进rlist列表中被打印出来
客户端效果
hello>>>:# 服务端接受了连接请求 并发送了一个hello消息
上面我们只是应用了select中的一个参数rlist,详细的参数信息应该是
句柄列表11, 句柄列表22, 句柄列表33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 超时时间) 参数: 可接受四个参数(前三个必须)返回值:三个列表 select方法用来监视文件句柄,如果句柄发生变化,则获取该句柄。1、当 参数1 序列中的句柄发生可读时(accetp和read),则获取发生变化的句柄并添加到 返回值1 序列中2、当 参数2 序列中含有句柄时,则将该序列中所有的句柄添加到 返回值2 序列中3、当 参数3 序列中的句柄发生错误时,则将该发生错误的句柄添加到 返回值3 序列中4、当 超时时间 未设置,则select会一直阻塞,直到监听的句柄发生变化 当 超时时间 = 1时,那么如果监听的句柄均无任何变化,则select会阻塞 1 秒,之后返回三个空列表 如果监听的句柄有变化,则直接执行
所以我们可以继续完善上面的代码 模拟实现一个完整的socketserver并发程序
import socketimport select # 创建一个socket对象并绑定IP端口sk = socket.socket()sk.bind(('127.0.0.1', 9999,))sk.listen(5) # 创建一个新客户端列表 有消息状态改变的客户端列表和一个存储具体消息的字典inputs = [sk, ]outputs = []message = {} while True: # 开始监听sk(服务端)对象 如果sk发生变化 表示有客户端来连接 此时rlist里面的值为[sk,] # 监听conn对象 如果conn发生变化 表示客户端有消息过来了 此时rlist的值为[客户端, ] rlist, wlist, elist, = select.select([sk, ], [], [], 1) print(len(inputs), len(rlist), len(wlist)) # 遍历rlist 查看发生状态改变的链接 for r in rlist: if r == sk: # 新客户端来连接 conn, address = r.accept() conn.sendall(bytes('hello', encoding='utf-8')) # conn是什么? 其实是socket对象 连接成功后添加进客户端列表rlist inputs.append(conn) # 以该客户端为键添加字典元素 message[conn] = [] else: # 有人给我发了消息 try: ret = r.recv(1024) # r.send(ret) if not ret: raise Exception('断开连接') else: # 如果是已经建立连接的客户端发来消息 添加进消息客户端列表wlist outputs.append(r) # 具体消息写入字典 message[r].append(ret) except Exception as e: # 如果客户端异常断开 清理列表 inputs.remove(r) del message[r] # 读写分离 专门负责发消息 for w in wlist: # 读取字典的最后消息 msg = message[w].pop() resp = msg + bytes('response', encoding='utf-8') w.sendall(resp) # 消息读取完成将该客户端从wlist删除 outputs.remove(w)
SocketServer模块
SocketServer是内部使用IO多路复用以及多线程和多进程,从而实现并发处理多个客户端请求的Socket服务端
每个客户端请求连接到服务器时,SockServer服务端会在服务器上创建一个线程或者进程专门来负责处理当前客户端的所有请求
ThreaddingTCPServer
ThreaddingTCPServer实现的Socket服务器内部会为每个client创建一个“线程”,该线程用来和客户端进行交互
使用ThreaddingTCPServer:
1)创建一个自定义类 去继承socketserver.BaseRequestHandler的类
2)自定义类中必须有一个名称为handle的方法
3)在handle中发送和收取客户端消息使用普通字段self.request
服务端
import socketserverimport subprocess # 自定义类 继承socketserver.BaseRequestHandlerclass Myserver(socketserver.BaseRequestHandler): # handle方法 def handle(self): self.request.sendall(bytes('欢迎致电10086 请输入1-9 0转人工服务...', encoding='utf-8')) while True: data = self.request.recv(1024) if len(data) == 0: break self.request.send(cmd_res) if __name__ == '__main__': server = socketserver.ThreadingTCPServer(('127.0.0.1', 8009), Myserver) server.serve_forever()
客户端
import socket ip_port = ('127.0.0.1',8009)sk = socket.socket()sk.connect(ip_port)sk.settimeout(5) while True: data = sk.recv(1024) print 'receive:',data inp = raw_input('please input:') sk.sendall(inp) if inp == 'exit': break sk.close()
ThreadingTCPServer源码剖析
ThreadingTCPServer中类关系图
内部调用流程:
1)执行 BaseServer.__init__ 方法,将自定义的继承自SocketServer.BaseRequestHandler 的类 MyRequestHandle赋值给 self.RequestHandlerClass
2)执行 TCPServer.__init__ 方法,创建服务端Socket对象并绑定 IP 和 端口
3)执行 BaseServer.server_forever 方法,While 循环一直监听是否有客户端请求到达 ...
当客户端连接到达服务器
4)执行 ThreadingMixIn.process_request 方法,创建一个 “线程” 用来处理请求
5)执行 ThreadingMixIn.process_request_thread 方法
6)执行 BaseServer.finish_request 方法
7)执行 self.RequestHandlerClass() 即:执行 自定义 MyRequestHandler 的构造方法(自动调用基BaseRequestHandler的构造方法,在该构造方法中又会调用 MyRequestHandler的handle方法)
精简源码
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sk.bind(('127.0.0.1',8009))sk.listen(5) while True: r, w, e = select.select([sk,],[],[],1) print 'looping' if sk in r: print 'get request' request, client_address = sk.accept() t = threading.Thread(target=process, args=(request, client_address)) t.daemon = False t.start() sk.close()
本文出自 “改变从每一天开始” 博客,请务必保留此出处http://lilongzi.blog.51cto.com/5519072/1880174