网络编程基础【day10】:IO多路复用

时间:2022-11-25 03:13:29
这些名词比较绕口,理解涵义就好。一个epoll场景:一个酒吧服务员(一个线程),前面趴了一群醉汉,突然一个吼一声“倒酒”(事件),你小跑过去给他倒一杯,然后随他去吧,突然又一个要倒酒,你又过去倒上,就这样一个服务员服务好多人,有时没人喝酒,服务员处于空闲状态,可以干点别的玩玩手机。至于epoll与select,poll的区别在于后两者的场景中醉汉不说话,你要挨个问要不要酒,没时间玩手机了。io多路复用大概就是指这几个醉汉共用一个服务员。
 

前言

从零单排高性能问题,这次轮到异步通信了。这个领域入门有点难,需要了解UNIX五种IO模型和TCP协议,熟练使用三大异步通信框架:Netty、NodeJS、Tornado。目前所有标榜异步的通信框架用的都不是异步IO模型,而是IO多路复用中的epoll。因为Python提供了对Linux内核API的友好封装,所以我选择Python来学习IO多路复用。

IO多路复用

  1. select

    举一个EchoServer的例子,客户端发送任何内容,服务端会原模原样返回。

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    '''
    Created on Feb 16, 2016 @author: mountain
    '''
    import socket
    import select
    from Queue import Queue #AF_INET指定使用IPv4协议,如果要用更先进的IPv6,就指定为AF_INET6。
    #SOCK_STREAM指定使用面向流的TCP协议,如果要使用面向数据包的UCP协议,就指定SOCK_DGRAM。
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setblocking(False)
    #设置监听的ip和port
    server_address = ('localhost', 1234)
    server.bind(server_address)
    #设置backlog为5,client向server发起connect,server accept后建立长连接,
    #backlog指定排队等待server accept的连接数量,超过这个数量,server将拒绝连接。
    server.listen(5)
    #注册在socket上的读事件
    inputs = [server]
    #注册在socket上的写事件
    outputs = []
    #注册在socket上的异常事件
    exceptions = []
    #每个socket有一个发送消息的队列
    msg_queues = {}
    print "server is listening on %s:%s." % server_address
    while inputs:
    #第四个参数是timeout,可选,表示n秒内没有任何事件通知,就执行下面代码
    readable, writable, exceptional = select.select(inputs, outputs, exceptions)
    for sock in readable:
    #client向server发起connect也是读事件,server accept后产生socket加入读队列中
    if sock is server:
    conn, addr = sock.accept()
    conn.setblocking(False)
    inputs.append(conn)
    msg_queues[conn] = Queue()
    print "server accepts a conn."
    else:
    #读取client发过来的数据,最多读取1k byte。
    data = sock.recv(1024)
    #将收到的数据返回给client
    if data:
    msg_queues[sock].put(data)
    if sock not in outputs:
    #下次select的时候会触发写事件通知,写和读事件不太一样,前者是可写就会触发事件,并不一定要真的去写
    outputs.append(sock)
    else:
    #client传过来的消息为空,说明已断开连接
    print "server closes a conn."
    if sock in outputs:
    outputs.remove(sock)
    inputs.remove(sock)
    sock.close()
    del msg_queues[sock]
    for sock in writable:
    if not msg_queues[sock].empty():
    sock.send(msg_queues[sock].get_nowait())
    if msg_queues[sock].empty():
    outputs.remove(sock)
    for sock in exceptional:
    inputs.remove(sock)
    if sock in outputs:
    outputs.remove(sock)
    sock.close()
    del msg_queues[sock]
    [mountain@king ~/workspace/wire]$ telnet localhost 1234
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    1
    1

    select有3个缺点:

    1. 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大。
    2. 每次调用select后,都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大。
      这点从python的例子里看不出来,因为python select api更加友好,直接返回就绪的socket列表。事实上linux内核select api返回的是就绪socket数目:
      int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
    3. fd数量有限,默认1024。
  2. poll

    采用poll重新实现EchoServer,只要搞懂了select,poll也不难,只是api的参数不太一样而已。

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    '''
    Created on Feb 27, 2016 @author: mountain
    '''
    import select
    import socket
    import sys
    import Queue server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setblocking(False)
    server_address = ('localhost', 1234)
    server.bind(server_address)
    server.listen(5)
    print 'server is listening on %s port %s' % server_address
    msg_queues = {}
    timeout = 1000 * 60
    #POLLIN: There is data to read
    #POLLPRI: There is urgent data to read
    #POLLOUT: Ready for output
    #POLLERR: Error condition of some sort
    #POLLHUP: Hung up
    #POLLNVAL: Invalid request: descriptor not open
    READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR
    READ_WRITE = READ_ONLY | select.POLLOUT
    poller = select.poll()
    #注册需要监听的事件
    poller.register(server, READ_ONLY)
    #文件描述符和socket映射
    fd_to_socket = { server.fileno(): server}
    while True:
    events = poller.poll(timeout)
    for fd, flag in events:
    sock = fd_to_socket[fd]
    if flag & (select.POLLIN | select.POLLPRI):
    if sock is server:
    conn, client_address = sock.accept()
    conn.setblocking(False)
    fd_to_socket[conn.fileno()] = conn
    poller.register(conn, READ_ONLY)
    msg_queues[conn] = Queue.Queue()
    else:
    data = sock.recv(1024)
    if data:
    msg_queues[sock].put(data)
    poller.modify(sock, READ_WRITE)
    else:
    poller.unregister(sock)
    sock.close()
    del msg_queues[sock]
    elif flag & select.POLLHUP:
    poller.unregister(sock)
    sock.close()
    del msg_queues[sock]
    elif flag & select.POLLOUT:
    if not msg_queues[sock].empty():
    msg = msg_queues[sock].get_nowait()
    sock.send(msg)
    else:
    poller.modify(sock, READ_ONLY)
    elif flag & select.POLLERR:
    poller.unregister(sock)
    sock.close()
    del msg_queues[sock]

    poll解决了select的第三个缺点,fd数量不受限制,但是失去了select的跨平台特性,它的linux内核api是这样的:

    int poll (struct pollfd *fds, unsigned int nfds, int timeout);
    struct pollfd {
    int fd; /* file descriptor */
    short events; /* requested events to watch */
    short revents; /* returned events witnessed */
    };
  3. epoll

    用法与poll几乎一样。

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    '''
    Created on Feb 28, 2016 @author: mountain
    '''
    import select
    import socket
    import Queue server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setblocking(False)
    server_address = ('localhost', 1234)
    server.bind(server_address)
    server.listen(5)
    print 'server is listening on %s port %s' % server_address
    msg_queues = {}
    timeout = 60
    READ_ONLY = select.EPOLLIN | select.EPOLLPRI
    READ_WRITE = READ_ONLY | select.EPOLLOUT
    epoll = select.epoll()
    #注册需要监听的事件
    epoll.register(server, READ_ONLY)
    #文件描述符和socket映射
    fd_to_socket = { server.fileno(): server}
    while True:
    events = epoll.poll(timeout)
    for fd, flag in events:
    sock = fd_to_socket[fd]
    if flag & READ_ONLY:
    if sock is server:
    conn, client_address = sock.accept()
    conn.setblocking(False)
    fd_to_socket[conn.fileno()] = conn
    epoll.register(conn, READ_ONLY)
    msg_queues[conn] = Queue.Queue()
    else:
    data = sock.recv(1024)
    if data:
    msg_queues[sock].put(data)
    epoll.modify(sock, READ_WRITE)
    else:
    epoll.unregister(sock)
    sock.close()
    del msg_queues[sock]
    elif flag & select.EPOLLHUP:
    epoll.unregister(sock)
    sock.close()
    del msg_queues[sock]
    elif flag & select.EPOLLOUT:
    if not msg_queues[sock].empty():
    msg = msg_queues[sock].get_nowait()
    sock.send(msg)
    else:
    epoll.modify(sock, READ_ONLY)
    elif flag & select.EPOLLERR:
    epoll.unregister(sock)
    sock.close()
    del msg_queues[sock]

    epoll解决了select的三个缺点,是目前最好的IO多路复用解决方案。为了更好地理解epoll,我们来看一下linux内核api的用法。

    int epoll_create(int size)//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)//注册事件,每个fd只拷贝一次。
    int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)/*等待IO事件,事件发生时,
    内核调用回调函数,把就绪fd放入就绪链表中,并唤醒epoll_wait,epoll_wait只需要遍历就绪链表即可,
    而select和poll都是遍历所有fd,这效率高下立判。*/

一、IO多路复用定义

IO多路复用允许应用在多个文件描述符上阻塞,并在某一个可以读写时通知, 一般遵循下面的设计原则:、

  1. IO多路复用:任何文件描述符准备好IO时进行通知
  2. 在文件描述符就绪前进行睡眠。
  3. 唤醒:哪个准备好了
  4. 在不阻塞的情况下处理所有IO就绪的文件描述符
  5. 返回第一步

Linux下提供了三种IO多路复用方案,select、poll和epoll。

二、select IO 多路复用

看一下select 函数的定义:

int select (int n,
fd_set *readfds,
fd_set *writefds,
fd_set *exceptfds,
struct timeval *timeout);
FD_CLR(int fd, fd_set *set);
FD_ISSET(int fd, fd_set *set);
FD_SET(int fd, fd_set *set);
FD_ZERO(fd_set *set);

上面的定义中可以看到select主要检测三类文件描述符,分别等待不同的事件。

  • readfds

    确认是否有可读数据
  • writefds

    确认是否有可写数据
  • exceptefds

    确认是否有异常发生或者出现带外数据。

第一个参数n,等于所有集合中文件描述符的最大值加一。select()的调用者需要找到最大的文件描述符值加一作为第一个参数。

成功返回时,返回哪一个文件描述符,就说明该文件描述符准备好无阻塞IO。对于timeout,select操作可以设置一个超时时间,超时后即使没有文件描述符IO就绪也会返回。

select的缺点:

1.每次调用select,都需要把fd集合从用户态拷贝到内核态 
2。同时需要遍历所有fd 
3。支持的文件描述符默认只有1024

三、poll IO 多路复用

poll()系统调用也是一个IO多路复用解决方案,解决了 一些select的不足,下面给出poll的定义:

#include <sys/poll.h>
int poll (struct pollfd *fds, unsigned int nfds,
int timeout);

与上面的select()使用三个文件描述符集合不同,poll()使用了一个简单的nfds个pollfd结构体构成的数组,fds指向该数组,结构体定义如下:

#include <sys/poll.h>
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events to watch */
short revents; /* returned events witnessed */
};

每个pollfd指定了唯一一个文件描述符,每个结构体中的events字段是要坚实的文件描述符事件的一组位掩码。revents字段是发生在该文件描述符上的事件的位掩码,内核在返回时设置这个字段,所有events字段请求的时间都可能在revents字段中返回。下面是合法的事件:

POLLIN       没有数据可读
POLLRDNORM 有正常数据可读。
POLLRDBAND 有优先数据可读。
POLLPRI 有高优先级数据可读。
POLLOUT 写操作不会阻塞。
POLLWRNORM 写正常数据不会阻塞。
POLLBAND 写优先数据不会阻塞。
POLLMSG 有一个sigpoll消息可用。

除此之外还可能返回几个异常信息:

POLLER       文件描述符有错误。
POLLHUP 文件描述符上有挂起事件。
POLLNVAL 给出的文件描述符非法。

监视一个文件描述符是否可以读写,可以设置events为POLLIN | POLLOUT,返回时将在revents中检查是否有相应标志。如果设置了POLLIN,或者POLLOUT则代表可以操作相关事件。

timeout参数指定在任何IO就绪前需要等待时间的长度,负值表示永远等待,一个零值表示调用立即返回,列出所有为准备好的IO,不等待任何时间。

四、poll()与select()的区别

poll()系统调用优于select():

  • poll()不需要使用者计算最大的文件描述符值加一和传递该参数。
  • poll()在应对较大值的文件描述符时效率更好,如果用select()监视值为900的文件描述符–内核需要检查每个集合中的每个bit位,知道第九百个。
  • select()的文件描述符集合是静态大小,但是poll()可以创建合适大小的数组,只需要传递结构体数组即可。
  • select()文件描述符集合会在返回时重新创建,每个调用都必须要重新初始化它们。poll()系统调用分离了events字段和revents字段,无需改变就能重用。
  • 相对于poll(),select()移植性更好
  • select()提供了更好的超时方案。

五、epoll IO 多路复用

上面的两种方式中,每次调用都需要所有被监听的文件描述符,内核必须遍历所有的文件描述符,当文件描述符变得很大,这里的遍历就会成为瓶颈。

epoll将监听注册从实际监听中分离出来,完成了真正的事件等待。

1、先创建一个新的epoll实例:

#include <sys/epoll.h>
int efpd = epoll_create (int size)

size是告诉内核大概需要监听的文件描述符数目。

2、控制epoll

epoll_ctl()可以向指定的epoll上下文中加入或删除文件描述符。

#include <sys/epoll.h>
int epoll_ctl (int epfd, int op, int fd, struct
epoll_event *event);

头文件

六、IO实现的内核内幕

主要涉及三个内核子系统:

  1. 虚拟文件系统(VFS)
  2. 页缓存
  3. 页回写

虚拟文件系统

虚拟文件系统是linux内核的文件操作的抽象机制,允许内核在无需了解文件系统类型的情况下,使用文件系统函数和操作文件系统数据。

VFS实现这种抽象的方法是使用一种通用文件模型,它是所有linux文件系统的基础,通用文件模型提供了linux内核文件系统必须遵循的框架,框架提供了了hooks支持读写、建立链接、同步等其他功能。

当然这种方法规定了一些共性,比如必须要有inode,super block(超级块)和目录条目等。

页缓存

页缓存是一种在内存中保存最近在磁盘文件系统*问过的数据的方式。页缓存是内核寻找文件系统数据的第一目的地。只有缓存找不到时内核才会调用存储子系统从磁盘读数据。

linux中页缓存大小是动态的,随着IO操作将越来越多的数据带入内存,页缓存会随之增大,消耗更多的内存,如果页缓存确实消耗掉了所有空闲内存,页缓存会释放最少使用页。

页回写

内核使用缓冲区来延迟写操作,当一个进程发起写请求,数据被拷贝到缓冲区,这时将缓冲区标记为“脏”数据,如果对同一个数据块有新的写请求,缓冲区就更新为新数据,把“脏”缓冲区写入磁盘。有两个条件会触发这种回写:

  1. 当空闲内存小于设定的阀值,会将缓冲区回写。
  2. 当一个脏的缓冲区寿命超过阀值也会回写防止数据不确定。

回写由一些pdflush内核线程操作,当上述两种情况发生,线程被唤醒开始刷新脏缓冲区。