使用情景:
- 客户端程序要同时处理多个socket。
- 客户端程序要同时处理用户输入和网络连接。
- TCP服务器要同时处理监听socket和连接socket,这是使用最多的场合。
- 服务器要同时处理TCP请求和UDP请求。
- 服务器要同时监听多个端口或者处理多种服务。
- select
- poll
- epoll
select系统调用
作用: 在一段指定时间内,监听用户感兴趣的文件描述符的可读、可写和异常等事件。 select API 原型:
#include <sys/select.h>int select ( int nfds, fd_set* readfds, fde_set* writefds, fd_set* exceptfds, struct timeval* timeout );函数说明:
- nfds: 指定被监听的文件描述符的总数,通常为所有文件描述符中的最大值+1
- readfds、writefds 、exceptfds: 可读、可写和异常等事件对应的文件描述符集合。
- fd_set结构:仅包含一个整型数组,该数组的每个元素的每一位标记了一个文件描述符。fd_set能容纳的文件描述符数量由FD_SETSIZE指定,这就限制了select能同时处理的文件描述符的总量。
fd_set相关的位操作:
#include <sys/select.h>FD_ZERO( fd_set *fdset );FD_SET( int fd, fd_set *fdset );FD_CLR( int fd, fd_set *fdset );int FD_ISSET( int fd, fd_set *fdset );
- timeout:设置select的超时时间。这是timeval结构指针,用来告诉内核select等待多久。不过我们不能完全信任select调用返回后的timeout值,比如调用失败时,timeout值是不确定的。timeval结构体如下:
struct timeval{ long tv_sec; \\秒数 long tv_usec; \\微秒}如果给timeout变量的tv_sec成员和tv_usec成员都传递0,则select将立即返回。如果给timeout传递NULL,则select将一直阻塞,直到某个文件描述符就绪。
返回状态:
- select成功时返回就绪(可读、可写和异常)文件描述符的总数。
- 如果在超时时间内没有任何文件描述符就绪,select将返回0。
- select失败时返回-1并设置errno。
- 如果select 等待期间,程序接收到信号,则select立即返回-1,并设置errno为EINTR。
文件描述符就绪条件
可读:
- socket内核接收缓冲区中的字节数大于或等于其低水位标记SO_RCVLOWAT。此时我们可以无阻塞地对该socket,并且读操作返回的字节数大于0。
- socket通信的对方关闭连接,此时读操作返回0。
- 监听socket上有新的连接请求。
- socekt上有未处理的错误,此时我们可以使用getsockopt来读取和清除该错误。
- socket内核发送缓冲区中的可用字节数大于或等于其低水位标记SO_SNDLOWAT。此时我们可以无阻塞地写该socket,并且写操作返回的字节数大于0。
- socket的写操作被关闭。对写操作被关闭的socket执行写操作将出发一个SIGPIPE信号。
- socket使用非阻塞connect连接成功或者失败之后。
- socket上有未处理的错误,此时我们可以使用getsockopt来读取和清除该错误。
- socket上接收到带外数据。
处理带外数据socket上接收到普通数据和带外数据都将使select返回,但socket处于不同的就绪状态:前者处于可读状态,后者处于异常状态。
#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <assert.h>#include <stdio.h>#include <unistd.h>#include <errno.h>#include <string.h>#include <fcntl.h>#include <stdlib.h>int main( int argc, char* argv[] ){ if( argc <= 2 ) { printf( "usage: %s ip_address port_number\n", basename( argv[0] ) ); return 1; } const char* ip = argv[1]; int port = atoi( argv[2] ); printf( "ip is %s and port is %d\n", ip, port ); int ret = 0; struct sockaddr_in address; bzero( &address, sizeof( address ) ); address.sin_family = AF_INET; inet_pton( AF_INET, ip, &address.sin_addr ); address.sin_port = htons( port ); int listenfd = socket( PF_INET, SOCK_STREAM, 0 ); assert( listenfd >= 0 ); ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) ); assert( ret != -1 ); ret = listen( listenfd, 5 ); assert( ret != -1 ); struct sockaddr_in client_address; socklen_t client_addrlength = sizeof( client_address ); int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength ); if ( connfd < 0 ) { printf( "errno is: %d\n", errno ); close( listenfd ); } char remote_addr[INET_ADDRSTRLEN]; printf( "connected with ip: %s and port: %d\n", inet_ntop( AF_INET, &client_address.sin_addr, remote_addr, INET_ADDRSTRLEN ), ntohs( client_address.sin_port ) ); char buf[1024]; fd_set read_fds; fd_set exception_fds; FD_ZERO( &read_fds ); FD_ZERO( &exception_fds ); int nReuseAddr = 1; setsockopt( connfd, SOL_SOCKET, SO_OOBINLINE, &nReuseAddr, sizeof( nReuseAddr ) ); while( 1 ) { memset( buf, '\0', sizeof( buf ) ); /* 每次调用select前都要重新在readfds和exception_fds中设置文件描述符connfd,因为事件发生之后,文件描述符集合将被内核修改 */ FD_SET( connfd, &read_fds ); FD_SET( connfd, &exception_fds ); ret = select( connfd + 1, &read_fds, NULL, &exception_fds, NULL ); printf( "select one\n" ); if ( ret < 0 ) { printf( "selection failure\n" ); break; } /* 对于可读事件,采用普通的recv函数读取数据 */ if ( FD_ISSET( connfd, &read_fds ) ) { ret = recv( connfd, buf, sizeof( buf )-1, 0 ); if( ret <= 0 ) { break; } printf( "get %d bytes of normal data: %s\n", ret, buf ); } /* 对于异常事件,采用带MSG_OOB标志的recv函数读取带外数据 */ else if( FD_ISSET( connfd, &exception_fds ) ) { ret = recv( connfd, buf, sizeof( buf )-1, MSG_OOB ); if( ret <= 0 ) { break; } printf( "get %d bytes of oob data: %s\n", ret, buf ); } } close( connfd ); close( listenfd ); return 0;}
poll系统调用
作用:和select类型,也是在指定时间内轮询一定数量的文件描述符,以测试其中是否有就绪者。原型:
#include <poll.h>int poll ( struct pollfd* fds, nfds_t nfds, int timeout );
函数说明:
- fds:一个pollfd结构类型的数组,指定我们所感兴趣的文件描述符上发生的可读,可写和异常事件。
struct pollfd{ int fd; /* 文件描述符 */ short events; /* 注册的事件 */ short revents; /* 实际发生的事件,由内核填充 */}其中,fd成员指定文件描述符;events 成员告诉poll监听fd上的哪些事件,它是一系列事件的按位或;revents成员则由内核修改,以通知应用程序fd上实际发生了哪些事件。 poll 支持的事件类型如下:
- nfds :指定被监听事件集合fds的大小。其类型nfds_t 的定义如下:
typedef unsigned long int nfds_t;
- timeout :指定poll的超时值,单位是毫秒。当timeout 为-1时,poll调用将永远阻塞,直到某个事件发生;当timeout为0时,poll调用将立即返回。
- 一个系统所能打开的文件描述符的最大数也是有限制的,跟内存有关,可以通过 /proc/sys/fs/file-max 调整。
- 一个进程所能打开的文件描述符最大值,可以通过调整内核参数、ulimit -n命令、setrlimit函数。
epoll系列系统调用
特点: 一组函数完成任务 epoll把用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无须像select和poll那样每次调用都要重复传入文件描述符集或事件集。 需要一个额外的文件描述来唯一标识内核中的这个事件表。
文件描述符的创建:
#include <sys/epoll.h>int epoll_create ( int size );该函数返回的文件描述符将用作其他所有epoll系统调用的第一个参数,以指定要访问的内核事件表。
操作内核事件表:
#include <sys/epoll.h>int epoll_ctl ( int epfd, int op, int fd, struct epoll_event *event );
函数说明:fd:要操作的文件描述符op:指定操作类型操作类型: EPOLL_CTL_ADD:往事件表中注册fd上的事件 EPOLL_CTL_MOD:修改fd上的注册事件 EPOLL_CTL_DEL:删除fd上的注册事件event:指定事件,它是epoll_event结构指针类型epoll_event定义:
struct epoll_event{ __unit32_t events; /* epoll事件 */ epoll_data_t data; /* 用户数据 */};
结构体说明:events:描述事件类型,和poll支持的事件类型基本相同(两个额外的事件:EPOLLET和EPOLLONESHOT,高效运作的关键)data成员:存储用户数据
typedef union epoll_data{ void* ptr; /* 指定与fd相关的用户数据 */ int fd; /* 指定事件所从属的目标文件描述符 */ uint32_t u32; uint64_t u64;} epoll_data_t;
epoll_wait函数主要接口作用:在一段时间内,等待一组文件描述符上的事件原型:
#include <sys/epoll.h>int epoll_wait ( int epfd, struct epoll_event* events, int maxevents, int timeout );
函数说明:返回:成功时返回就绪的文件描述符的个数,失败时返回-1并设置errnotimeout:与poll相同maxevents:指定最多监听多少个事件events:检测到事件,将所有就绪的事件从内核事件表中复制到它的第二个参数events指向的数组中。与poll的区别(见下面的demo)
poll和epoll在使用上的差别:
/* 索引poll返回的就绪文件描述符 *//* 方式:遍历,检查标志位 */int ret = poll ( fds, MAX_EVENT_NUMBER, -1 );for ( int i = 0; i < MAX_EVENT_NUMBER; ++i ){ if ( fds[i].revents & POLLIN ) { int sockfd = fds[i].fd; /* 处理sockfd */ }}/* 索引epoll返回的就绪文件描述符 *//* 方式:遍历 */int ret = epoll_wait ( epollfd, events, MAX_EVENT_NUMBER, -1 );for ( int i = 0; i < ret; i++ ){ int sockfd = events[i].data.fd; /* sockfd必然就绪,直接处理 */}
LT和ET模式对文件操作符的操作模式:
- LT:电平触发,默认的工作模式。当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。这样,当应用程序下一次调用epoll_wait时,epoll_wait还会再次向应用程序通告此事件,直到该事件被处理。
- ET:边沿触发,高效工作模式。文件描述符注册为EPOLLET事件,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的epoll_wait调用将不再向应用程序通知这一事件。
注意:ET 模式要求socket为非阻塞模式,如果是阻塞的,那么读或写操作将会因为没有后续事件而一直处于阻塞状态(饥渴状态)。LT模式可以是阻塞或者非阻塞。
#include <stdio.h>#include <stdlib.h>#include <unistd.h>#include <string.h>#include <errno.h>#include <fcntl.h>#include <assert.h>#include <sys/socket.h>#include <sys/types.h>#include <sys/epoll.h>#include <netinet/in.h>#include <arpa/inet.h>#define MAX_EVENT_NUMBER 1024#define BUFFER_SIZE 10int setnonblocking(int fd);void addfd(int epollfd, int fd, bool enable_et);void lt(epoll_event *events, int number, int epollfd, int listenfd);void et(epoll_event *events, int number, int epollfd, int listenfd);/* * 用telnet到这个服务端程序上,并一次传输超过10字节(BUFFER_SIZE的大小)的数据, * 然后比较LT和ET的异同,会发现ET比LT下事件被触发的次数少很多。*/int main(int argc, char **argv){if (argc != 3) {fprintf(stderr, "Usage: %s ip port\n", basename(argv[0]));return 1;}const char *ip = argv[1];int port = atoi(argv[2]);int ret = 0;struct sockaddr_in address;bzero(&address, sizeof(address));address.sin_family = AF_INET;address.sin_port = htons(port);inet_pton(AF_INET, ip, &address.sin_addr);int sockfd = socket(PF_INET, SOCK_STREAM, 0);assert(sockfd >= 0);int reuse = 1;setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));ret = bind(sockfd, (struct sockaddr*)&address, sizeof(address));assert(ret != -1);ret = listen(sockfd, 5);assert(ret != -1);epoll_event events[MAX_EVENT_NUMBER];int epollfd = epoll_create(5);assert(epollfd != -1);addfd(epollfd, sockfd, true);while (1) {int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);if (ret < 0) {fprintf(stderr, "epoll failed: %s\n", strerror(errno));break;}//lt(events, ret, epollfd, sockfd);//LT模式et(events, ret, epollfd, sockfd);//ET模式}close(epollfd);close(sockfd);return 0;}//设置非阻塞文件描述符int setnonblocking(int fd){int old_option = fcntl(fd, F_GETFL);int new_option = old_option | O_NONBLOCK;fcntl(fd, F_SETFL, new_option);return old_option;}//将描述符fd的EPOLLIN注册到epollfd提示的epoll内核事件中,参数enable_et指定是否启用ET模式void addfd(int epollfd, int fd, bool enable_et){epoll_event event;event.data.fd = fd;event.events = EPOLLIN;if (enable_et) {event.events |= EPOLLET;}epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);setnonblocking(fd);}//LT模式void lt(epoll_event *events, int number, int epollfd, int listenfd){char buf[BUFFER_SIZE];for (int i = 0; i < number; i++) {int sockfd = events[i].data.fd;if (sockfd == listenfd) {struct sockaddr_in client_address;socklen_t client_addrlength = sizeof(client_address);int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);addfd(epollfd, connfd, false);}else if (events[i].events & EPOLLIN) {//只要socket读缓存中还有未读出的数据,就会被触发printf("event trigger once\n");memset(buf, '\0', BUFFER_SIZE);int ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);if (ret <= 0) {close(sockfd);continue;}printf("get %d bytes of content: %s\n", ret, buf);}else {printf("something else happened\n");}}}//ET模式void et(epoll_event *events, int number, int epollfd, int listenfd){char buf[BUFFER_SIZE];for (int i = 0; i < number; i++) {int sockfd = events[i].data.fd;if (sockfd == listenfd) {struct sockaddr_in client_address;socklen_t client_addrlength = sizeof(client_address);int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);addfd(epollfd, connfd, true);}else if (events[i].events & EPOLLIN) {printf("event trigger once\n");int ret = 0;//因为ET模式不会重复触发,所以我们要循环读取所有数据while (1) {memset(buf, '\0', BUFFER_SIZE);ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);if (ret < 0) {//对于非阻塞I/O,下面的条件成立时表示数据已全部读取完毕if (errno == EAGAIN || errno == EWOULDBLOCK) {printf("read later!\n");break;}close(sockfd);break;}else if (ret == 0) {close(sockfd);}else {printf("get %d bytes of content: %s\n", ret, buf);}}}else {printf("something else happened\n");}}}
EPOLLONESHOT事件
使用场合: 一个线程在读取完某个socket上的数据后开始处理这些数据,而数据的处理过程中该socket又有新数据可读,此时另外一个线程被唤醒来读取这些新的数据。于是,就出现了两个线程同时操作一个socket的局面。可以使用epoll的EPOLLONESHOT事件实现一个socket连接在任一时刻都被一个线程处理。
作用: 对于注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的一个可读、可写或异常事件,且只能触发一次,除非我们使用epoll_ctl函数重置该文件描述符上注册 的EPOLLONESHOT事件。
使用: 注册了EPOLLONESHOT事件的socket一旦被某个线程处理完毕,该线程就应该立即重置这个socket上的EPOLLONESHOT事件,以确保这个socket下一次可读时,其EPOLLIN事件能被触发,进而让其他工作线程有机会继续处理这个socket。
效果: 尽管一个socket在不同事件可能被不同的线程处理,但同一时刻肯定只有一个线程在为它服务,这就保证了连接的完整性,从而避免了很多可能的竞态条件。
#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <assert.h>#include <stdio.h>#include <unistd.h>#include <errno.h>#include <string.h>#include <fcntl.h>#include <stdlib.h>#include <sys/epoll.h>#include <pthread.h>#define MAX_EVENT_NUMBER 1024#define BUFFER_SIZE 1024struct fds{ int epollfd; int sockfd;};int setnonblocking( int fd ){ int old_option = fcntl( fd, F_GETFL ); int new_option = old_option | O_NONBLOCK; fcntl( fd, F_SETFL, new_option ); return old_option;}/*将fd上的EPOLLIN和EPOLLET事件注册到epollfd指示的epoll内核事件表中,参数oneshot指定是否注册fd上的EPOLLONESHOT事件*/void addfd( int epollfd, int fd, bool oneshot ){ epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; if( oneshot ) { event.events |= EPOLLONESHOT; } epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event ); setnonblocking( fd );}/*重置fd上的事件。这样操作之后,尽管fd上的EPOLLONESHOT事件被注册,但是操作系统仍然会触发fd上的EPOLLIN事件,且只触发一次 */void reset_oneshot( int epollfd, int fd ){ epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET | EPOLLONESHOT; epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event );}/*工作线程*/void* worker( void* arg ){ int sockfd = ( (fds*)arg )->sockfd; int epollfd = ( (fds*)arg )->epollfd; printf( "start new thread to receive data on fd: %d\n", sockfd ); char buf[ BUFFER_SIZE ]; memset( buf, '\0', BUFFER_SIZE );/*循环读取sockfd上的数据,直到遇到EAGAIN错误*/ while( 1 ) { int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 ); if( ret == 0 ) { close( sockfd ); printf( "foreiner closed the connection\n" ); break; } else if( ret < 0 ) { if( errno == EAGAIN ) { reset_oneshot( epollfd, sockfd ); printf( "read later\n" ); break; } } else { printf( "get content: %s\n", buf );/*休眠5S,模拟数据处理过程 */ sleep( 5 ); } } printf( "end thread receiving data on fd: %d\n", sockfd );}int main( int argc, char* argv[] ){ if( argc <= 2 ) { printf( "usage: %s ip_address port_number\n", basename( argv[0] ) ); return 1; } const char* ip = argv[1]; int port = atoi( argv[2] ); int ret = 0; struct sockaddr_in address; bzero( &address, sizeof( address ) ); address.sin_family = AF_INET; inet_pton( AF_INET, ip, &address.sin_addr ); address.sin_port = htons( port ); int listenfd = socket( PF_INET, SOCK_STREAM, 0 ); assert( listenfd >= 0 ); ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) ); assert( ret != -1 ); ret = listen( listenfd, 5 ); assert( ret != -1 ); epoll_event events[ MAX_EVENT_NUMBER ]; int epollfd = epoll_create( 5 ); assert( epollfd != -1 );/*注意,监听socket listenfd上是不能注册RPOLLONESHOT事件的,否则应用程序只能处理一个客户连接!因为后续的客户连接请求将不再触发listenfd上的EPOLLIN事件*/ addfd( epollfd, listenfd, false ); while( 1 ) { int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 ); if ( ret < 0 ) { printf( "epoll failure\n" ); break; } for ( int i = 0; i < ret; i++ ) { int sockfd = events[i].data.fd; if ( sockfd == listenfd ) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof( client_address ); int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );/*对每个非监听文件描述符都注册EPOLLONESHOT事件*/ addfd( epollfd, connfd, true ); } else if ( events[i].events & EPOLLIN ) { pthread_t thread; fds fds_for_new_worker; fds_for_new_worker.epollfd = epollfd; fds_for_new_worker.sockfd = sockfd;/*新启动一个工作线程为sockfd服务*/ pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker ); } else { printf( "something else happened \n" ); } } } close( listenfd ); close( epollfd ); return 0;}
小结:三组I/O复用函数的比较
系统调用 | select | poll | epoll |
事件集合 | 用户通过3个参数分别传入感兴趣的可读、可写及异常等事件,内核通过对这些参数的在线修改来反馈其中的就绪事件。这使得用户每次调用select都要重置这3个参数 | 统一处理所有事件类型,因此只需要一个事件集参数。用户通过pollfd.events传入感兴趣的事件,内核通过修改pollfd.revents反馈其中就绪的事件 | 内核通过一个事件表直接管理用户感兴趣的所有事件。因此每次调用epoll_wait时,无需反复传入用户感兴趣的事件。epoll_wait系统调用的参数events仅用来反馈就绪的事件 |
应用程序索引就绪文件描述符的时间复杂度 | O(n) | O(n) | O(1) |
最大支持文件描述符数 | 一般有最大值限制(FD_SETSIZE 为1024,修改后需重新编译内核) | 65535(一个进程所能打开的最大文件描述符数量,ulimit -n或者setrlimit函数) | 65535(系统能打开的最大文件描述符数量,/proc/sys/fs/file-max) |
工作模式 | LT | LT | 支持ET高效模式 |
内核实现和工作效率 | 采用轮询方式检测就绪事件时间复杂度:O(n) | 采用轮询方式检测就绪事件时间复杂度:O(n) | 采用回调方式检测就绪事件事件复杂度:O(1) |
需要说明的是: epoll的效率未必一定比select和poll高。当活动连接比较多的时候,epoll_wait的效率未必比select和poll高,因为此时回调函数被触发的过于频繁。所以,epoll_wait适用于连接数量多,但活动连接较少的情况。
参考资料:《Linux高性能服务器编程》from : http://blog.csdn.net/zs634134578/article/details/19929449