libevent大概框架已经了解,现在我们通过libevent封装的epoll了解一下libevent的I/O模型。
epollop结构体
1 struct epollop { 2 struct evepoll *fds; //文件描述符事件数组,每个文件描述符对应的事件,这里将读写事件分开了。记录所有需要监听的事件
3 int nfds; //文件描述符数量 4 struct epoll_event *events; //epoll的事件结构数组
5 int nevents; //可以缓存监听的events的数量 6 int epfd; //epoll_create返回的epoll句柄 7 };
我们仍然找一个demo,从demo的调用方式来分析libevent的epoll工作方式,参考libevent在异步socket中的使用
第一个例子,虽然有些缺憾,但胜在短小,方便理解,后面可以在去看后面2个例子。由于libevent的signal是嵌入到I/O模型中的,和前面一样,这里跳过signal部分,后面再说。
1 /* Port to listen on. */ 2 #define SERVER_PORT 5555 3 4 /** 5 * 这个结构是指定的客户端数据,这个例子中只指定了读事件对象。 6 */ 7 struct client 8 { 9 struct event ev_read; 10 }; 11 12 /** 13 * 将一个socket设置成非阻塞模式 14 */ 15 int 16 setnonblock(int fd) 17 { 18 int flags; 19 20 flags = fcntl(fd, F_GETFL); 21 if (flags < 0) 22 return flags; 23 flags |= O_NONBLOCK; 24 if (fcntl(fd, F_SETFL, flags) < 0) 25 return -1; 26 27 return 0; 28 } 29 30 /** 31 * 这个函数当客户端的socket可读时由libevent调用 32 */ 33 void 34 on_read(int fd, short ev, void *arg) 35 { 36 struct client *client = (struct client *)arg; 37 u_char buf[8196]; 38 int len, wlen; 39 40 len = read(fd, buf, sizeof(buf)); 41 if (len == 0) 42 { 43 /* 客户端断开连接,在这里移除读事件并且释放客户数据结构 */ 44 printf("Client disconnected.\n"); 45 close(fd); 46 event_del(&client->ev_read); 47 free(client); 48 return; 49 } 50 else if (len < 0) 51 { 52 /* 出现了其它的错误,在这里关闭socket,移除事件并且释放客户数据结构 */ 53 printf("Socket failure, disconnecting client: %s", 54 strerror(errno)); 55 close(fd); 56 event_del(&client->ev_read); 57 free(client); 58 return; 59 } 60 61 62 63 /* 为了简便,我们直接将数据写回到客户端。通常我们不能在非阻塞的应用程序中这么做, 64 * 我们应该将数据放到队列中,等待可写事件的时候再写回客户端。 */ 65 wlen = write(fd, buf, len); 66 if (wlen < len) 67 { 68 /* 我们没有把所有数据都写回。如果我们有适当的队列/缓存机制, 69 * 我们能够在再次的可写事件中再次写入剩余的数据。因为这是一个简单的例子, 70 * 我们仅仅舍去了没有写入的数据。 */ 71 printf("Short write, not all data echoed back to client.\n"); 72 } 73 } 74 75 /** 76 * 当有一个连接请求准备被接受时,这个函数将被libevent调用。 */ 77 void 78 on_accept(int fd, short ev, void *arg) 79 { 80 int client_fd; 81 struct sockaddr_in client_addr; 82 socklen_t client_len = sizeof(client_addr); 83 struct client *client; 84 85 /* 接受新的连接 */ 86 client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len); 87 if (client_fd == -1) 88 { 89 warn("accept failed"); 90 return; 91 } 92 93 /* 设置客户端socket为非阻塞模式。 */ 94 if (setnonblock(client_fd) < 0) 95 warn("failed to set client socket non-blocking"); 96 97 /* 我们接受了一个新的客户,分配一个新的客户数据结构对象来保存这个客户的状态。 */ 98 client = calloc(1, sizeof(*client)); 99 if (client == NULL) 100 err(1, "malloc failed"); 101 102 /* 设置读事件,libevent将在客户端socket可读时调用on_read函数。 103 * 我们也会我们也会不断的响应读事件,所以我们不用在每次读取时再次添加读事件。 */ 104 event_set(&client->ev_read, client_fd, EV_READ|EV_PERSIST, on_read, 105 client); 106 107 /* 设置的事件并没有激活,使用添加事件让其激活。 */ 108 event_add(&client->ev_read, NULL); 109 110 printf("Accepted connection from %s\n", 111 inet_ntoa(client_addr.sin_addr)); 112 } 113 114 int 115 main(int argc, char **argv) 116 { 117 int listen_fd; 118 struct sockaddr_in listen_addr; 119 int reuseaddr_on = 1; 120 121 /* 接受连接请求的事件对象。 */ 122 struct event ev_accept; 123 124 /* 初始化 libevent. */ 125 event_init(); 126 127 /* 创建我们监听的socket。 */ 128 listen_fd = socket(AF_INET, SOCK_STREAM, 0); 129 if (listen_fd < 0) 130 err(1, "listen failed"); 131 if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, 132 sizeof(reuseaddr_on)) == -1) 133 err(1, "setsockopt failed"); 134 memset(&listen_addr, 0, sizeof(listen_addr)); 135 listen_addr.sin_family = AF_INET; 136 listen_addr.sin_addr.s_addr = INADDR_ANY; 137 listen_addr.sin_port = htons(SERVER_PORT); 138 if (bind(listen_fd, (struct sockaddr *)&listen_addr, 139 sizeof(listen_addr)) < 0) 140 err(1, "bind failed"); 141 if (listen(listen_fd, 5) < 0) 142 err(1, "listen failed"); 143 144 /* 设置socket为非阻塞模式,使用libevent编程这是必不可少的。 */ 145 if (setnonblock(listen_fd) < 0) 146 err(1, "failed to set server socket to non-blocking"); 147 148 /* 我们现在有了一个监听的socket,我们创建一个读事件,当有客户连接时,接收通知。 */ 149 event_set(&ev_accept, listen_fd, EV_READ|EV_PERSIST, on_accept, NULL); //用epoll_wait的返回信息也可以完成监听连接请求,但是libevent封装的epoll并不理会连接请求,因此响应连接要我们自己处理。
libevent的epoll_wait只处理异常,超时,及关心的事件发生。其中关心的事件只处理读事件,写事件,EPOLLERR(对应的文件描述符发生错误),
EPOLLHUP(对应的文件描述符被挂断)。
这里是将监听连接注册到epoll中的读事件并持久化,当收到信息后调用on_accept接受连接,并为新连接上来的socket注册读事件。 150 event_add(&ev_accept, NULL); 151 152 /* 开始 libevent 的事件循环。 */ 153 event_dispatch(); 154 155 return 0; 156 }
1.前一篇提到过,在event_base_new(void)过程中通过调用base->evsel->init(base)我们进行了I/O模型的初始化,在epoll中就是调用的epoll_init(struct event_base *base)
1 epoll_init(struct event_base *base) 2 { 3 int epfd; //epoll_create返回的epoll句柄,这个句柄会占用一个fd值。 4 struct epollop *epollop; //指向本篇开始提到的结构体的指针 5 6 /* Disable epollueue when this environment variable is set */ 7 if (evutil_getenv("EVENT_NOEPOLL")) 8 return (NULL); 9 10 /* Initalize the kernel queue */ 11 if ((epfd = epoll_create(32000)) == -1) { 12 if (errno != ENOSYS) 13 event_warn("epoll_create"); 14 return (NULL); 15 } 16 17 FD_CLOSEONEXEC(epfd); //从定义来看是说在子进程中不能使用这个句柄,也就是子进程不能用这个epoll实例
18 19 if (!(epollop = calloc(1, sizeof(struct epollop)))) 20 return (NULL); 21 22 epollop->epfd = epfd; 23 24 /* Initalize fields */ 25 epollop->events = malloc(INITIAL_NEVENTS * sizeof(struct epoll_event)); 26 if (epollop->events == NULL) { 27 free(epollop); 28 return (NULL); 29 } 30 epollop->nevents = INITIAL_NEVENTS; 31 32 epollop->fds = calloc(INITIAL_NFILES, sizeof(struct evepoll)); 33 if (epollop->fds == NULL) { 34 free(epollop->events); 35 free(epollop); 36 return (NULL); 37 } 38 epollop->nfds = INITIAL_NFILES; 39 40 evsignal_init(base); //信号下一篇在说,跳过信号相关部分 41 42 return (epollop); 43 }
2.event_add(struct event *ev, const struct timeval *tv)中,只要事件是读|写|信号,并且不是已注册或已激活事件,都将调用对应的add注册成对应I/O模型的事件。这里包含信号前面已经提到过,是因为信号嵌入到了I/O模型中,这一篇不讨论信号。
在epoll模型中就是epoll_add(void *arg, struct event *ev)
1 static int 2 epoll_add(void *arg, struct event *ev) 3 { 4 struct epollop *epollop = arg; 5 struct epoll_event epev = {0, {0}}; 6 struct evepoll *evep; 7 int fd, op, events; 8 9 if (ev->ev_events & EV_SIGNAL) //如果事件类型是信号,注册信号事件 10 return (evsignal_add(ev)); 11 12 fd = ev->ev_fd; 13 if (fd >= epollop->nfds) { //如果这个要注册事件的文件描述符值超过已分配的文件描述符数组长度,调用realloc扩充epollop->fds的分配内存×2,直到这个值小于数组长度。将扩充部分置为0 14 /* Extent the file descriptor array as necessary */ 15 if (epoll_recalc(ev->ev_base, epollop, fd) == -1) 16 return (-1); 17 } 18 evep = &epollop->fds[fd]; 19 op = EPOLL_CTL_ADD; 20 events = 0; 21 if (evep->evread != NULL) { //该文件描述符已经存在读事件,修改时保留读事件 22 events |= EPOLLIN; 23 op = EPOLL_CTL_MOD; 24 } 25 if (evep->evwrite != NULL) { //该文件描述符已经存在写事件,修改时保留写事件 26 events |= EPOLLOUT; 27 op = EPOLL_CTL_MOD; 28 } 29 30 if (ev->ev_events & EV_READ) //注册事件如果需要注册读事件,为该文件描述符添加读事件 31 events |= EPOLLIN; 32 if (ev->ev_events & EV_WRITE) //注册事件如果需要注册写事件,为该文件描述符添加写事件 33 events |= EPOLLOUT; 34 35 epev.data.fd = fd; //监听的文件描述符 36 epev.events = events; //需要监听改文件描述符发生的事件 37 if (epoll_ctl(epollop->epfd, op, ev->ev_fd, &epev) == -1) //将该文件描述符需要监听的事件注册到epoll 38 return (-1); 39 40 /* Update events responsible */ 41 if (ev->ev_events & EV_READ) 42 evep->evread = ev; //表明该文件描述符有读事件 43 if (ev->ev_events & EV_WRITE) 44 evep->evwrite = ev; //表明该文件描述符有写事件 45 46 return (0); 47 }
3.事件注册完成后,event_base_loop()中监听这些事件event_dispatch(struct event_base *base, void *arg, struct timeval *tv),
对于epoll模型,就是epoll_dispatch(struct event_base *base, void *arg, struct timeval *tv)
1 static int 2 epoll_dispatch(struct event_base *base, void *arg, struct timeval *tv) 3 { 4 struct epollop *epollop = arg; 5 struct epoll_event *events = epollop->events; 6 struct evepoll *evep; 7 int i, res, timeout = -1; 8 9 if (tv != NULL) 10 timeout = tv->tv_sec * 1000 + (tv->tv_usec + 999) / 1000; //超时时间,单位ms,向上取整 11 12 if (timeout > MAX_EPOLL_TIMEOUT_MSEC) { //超时时间超过linux内核可以等待的最长时间,取最长时间 13 /* Linux kernels can wait forever if the timeout is too big; 14 * see comment on MAX_EPOLL_TIMEOUT_MSEC. */ 15 timeout = MAX_EPOLL_TIMEOUT_MSEC; 16 } 17 18 res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout); 19 20 if (res == -1) { //返回-1,说明epoll_wait产生异常 21 if (errno != EINTR) { //在超时或监听到事件发生前收到的不是中断信号 22 event_warn("epoll_wait"); 23 return (-1); 24 } 25 26 evsignal_process(base); //信号处理,此处不说明 27 return (0); 28 } else if (base->sig.evsignal_caught) { //监听的某种信号产生并处理,此处不说明 29 evsignal_process(base); 30 } 31 32 event_debug(("%s: epoll_wait reports %d", __func__, res)); 33 34 for (i = 0; i < res; i++) { 35 int what = events[i].events; 36 struct event *evread = NULL, *evwrite = NULL; 37 int fd = events[i].data.fd; //发生该事件的文件描述符 38 39 if (fd < 0 || fd >= epollop->nfds) 40 continue; 41 evep = &epollop->fds[fd]; //发生该事件的文件描述符的事件类型(2个事件类型读、写) 42 43 if (what & (EPOLLHUP|EPOLLERR)) { //该文件描述符被挂断或发生错误(读事件应该是用来注销该文件描述符上的事件,这里不明白激活写事件是要干嘛) 44 evread = evep->evread; //如果有读事件,记录下来 45 evwrite = evep->evwrite; //如果有写事件,记录下来 46 } else { 47 if (what & EPOLLIN) { //该文件描述符可以读(包括对端SOCKET正常关闭) 48 evread = evep->evread; //记录这个读事件 49 } 50 51 if (what & EPOLLOUT) { //文件描述符可以写 52 evwrite = evep->evwrite; //记录这个写事件 53 } 54 } 55 56 if (!(evread||evwrite)) //如果没有读或写事件发生,跳过该文件描述符的事件 57 continue; 58 59 if (evread != NULL) //如果前面记录了读事件,将该事件插入到激活链表,且调用事件回调次数设为1 60 event_active(evread, EV_READ, 1); 61 if (evwrite != NULL) //如果前面记录了写事件,将该事件插入到激活链表,且调用事件回调次数设为1 62 event_active(evwrite, EV_WRITE, 1); 63 } 64 65 if (res == epollop->nevents && epollop->nevents < MAX_NEVENTS) { //监听到的事件数量达到了分配的缓存上限,但没有达到MAX_NEVENTS,说明我们的缓存数组不够用了,扩充缓存监听事件的数组 66 /* We used all of the event space this time. We should 67 be ready for more events next time. */ 68 int new_nevents = epollop->nevents * 2; 69 struct epoll_event *new_events; 70 71 new_events = realloc(epollop->events, 72 new_nevents * sizeof(struct epoll_event)); 73 if (new_events) { 74 epollop->events = new_events; 75 epollop->nevents = new_nevents; 76 } 77 } 78 79 return (0); 80 }
epoll到这里差不多结束了,处理已激活事件是在event框架的event_dispatch中,不属于I/O模型的部分,epoll模型还剩下注销事件,注销epoll这个模型实例。
epoll的监听事件一般都是持久化的(如果不是持久化的,参考上一篇最后),需要注销监听事件一般在客户端断开连接(读事件读到0个字节),而注销epoll模型实例,释放epoll的内存,意味着我们没有I/O模型和信号管理可用了,还只剩下计时器事件可用,功能基本也就结束了。
所以这里最后看看epoll注销事件epoll_del是怎么做的,注销epoll模型实例epoll_dealloc就省略了。
1 static int 2 epoll_del(void *arg, struct event *ev) 3 { 4 struct epollop *epollop = arg; 5 struct epoll_event epev = {0, {0}}; 6 struct evepoll *evep; 7 int fd, events, op; 8 int needwritedelete = 1, needreaddelete = 1; 9 10 if (ev->ev_events & EV_SIGNAL) //如果该事件是信号,恢复该信号的到注册之前的默认处理。 11 return (evsignal_del(ev)); 12 13 fd = ev->ev_fd; 14 if (fd >= epollop->nfds) //该文件描述符不在事件队列中,忽略 15 return (0); 16 evep = &epollop->fds[fd]; 17 18 op = EPOLL_CTL_DEL; 19 events = 0; 20 21 if (ev->ev_events & EV_READ) 22 events |= EPOLLIN; 23 if (ev->ev_events & EV_WRITE) 24 events |= EPOLLOUT; 25 26 if ((events & (EPOLLIN|EPOLLOUT)) != (EPOLLIN|EPOLLOUT)) { //如果该文件描述符上的事件不是同时有读和写事件 27 if ((events & EPOLLIN) && evep->evwrite != NULL) { //这是什么情况?理解不能 28 needwritedelete = 0; 29 events = EPOLLOUT; 30 op = EPOLL_CTL_MOD; 31 } else if ((events & EPOLLOUT) && evep->evread != NULL) { 32 needreaddelete = 0; 33 events = EPOLLIN; 34 op = EPOLL_CTL_MOD; 35 } 36 } 37 38 epev.events = events; 39 epev.data.fd = fd; 40 41 if (needreaddelete) 42 evep->evread = NULL; //将该文件描述符上的读事件置为NULL 43 if (needwritedelete) 44 evep->evwrite = NULL; //将该文件描述符上的写事件置为NULL 45 46 if (epoll_ctl(epollop->epfd, op, fd, &epev) == -1) 47 return (-1); 48 49 return (0); 50 }