TinyWebserver的复现与改进(6):定时器处理非活动连接

时间:2025-03-27 13:45:04
#include <arpa/> #include <> #include <> #include <> #include <> #include <> #include <> #include <sys/> #include "" #include "" #include "http_conn.h" #include <> #include <> #include "lst_timer.h" #define MAX_FD 65536 // 最大的文件描述符 #define MAX_EVENT_NUMBER 10000 // 监听的最大事件数 #define TIMESLOT 5 //设置定时器相关参数 static int pipefd[2]; static sort_timer_lst timer_lst; static int epollfd = 0; // 信号的中断处理函数 void timer_sig_handler(int sig) { int save_errno = errno; int msg = sig; send(pipefd[1], (char*)&msg, 1, 0); errno = save_errno; } // 定时器回调函数, 从epoll上删除sockfd void cb_func(client_data *user_data) { printf("close fd : %d\n", user_data->sockfd); epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0); assert(&user_data); close(user_data->sockfd); } void timer_handler() { timer_lst.tick(); alarm(TIMESLOT); } /* 函数指针的声明: 类型说明符 (*函数名) (参数) void(handler)(int) 声明了一个名为 handler 的函数指针,它指向一个接受一个 int 参数并返回 void 的函数 */ void addsig(int sig, void(handler)(int), bool restart = false) { // sigaction的输入参数 struct sigaction sa; // 指定sa内存区域的前n个字节都设置为某个特定的值('\0'),用于对新分配的内存进行初始化 memset(&sa, '\0', sizeof(sa)); // 写入函数指针,指向的函数就是信号捕捉到之后的处理函数 sa.sa_handler = handler; if(restart) sa.sa_flags |= SA_RESTART; // 设置临时阻塞信号集 sigfillset(&sa.sa_mask); assert(sigaction(sig, &sa, NULL) != -1); } // 信号的初始化 void timer_sig_init() { int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, pipefd); assert(ret != -1); setnonblocking(pipefd[1]); addfd(epollfd, pipefd[0], false); addsig(SIGALRM, timer_sig_handler); addsig(SIGTERM, timer_sig_handler); } int main(int argc, char* argv[]) { if(argc <= 1) { // 要求输入格式为 ./ 10000 其中10000是端口号 printf("usage: %s port_number\n", basename(argv[0])); return 1; } // 端口号 string -> int int port = atoi(argv[1]); // 如果向一个没有读端的管道写数据,不用终止进程 addsig(SIGPIPE, SIG_IGN); // SIG_IGN: 忽略信号,这里指的是忽略信号 · SIGPIPE // 定义一个线程池指针 threadpool<http_conn>* pool = NULL; try { // 开辟一个线程池 pool = new threadpool<http_conn>; }catch(...) { // 若异常则退出 return 1; } // 开辟一块连续的http_conn数组,保存所有正在连接的客户端信息 http_conn* users = new http_conn[MAX_FD]; client_data *users_timer = new client_data[MAX_FD]; // 设置监听 int listenfd = socket(AF_INET, SOCK_STREAM, 0); int ret = 0; struct sockaddr_in address; address.sin_addr.s_addr = INADDR_ANY; address.sin_family = AF_INET; address.sin_port = htons(port); // 设置端口复用 int reuse = 1; setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); // 绑定 ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); if(ret == -1) { perror("bind"); exit(-1); } // 开始监听 ret = listen(listenfd, 5); if(ret == -1) { perror("listen"); exit(-1); } // 将listend添加到epoll模型中 epoll_event events[MAX_EVENT_NUMBER]; epollfd = epoll_create(5); addfd(epollfd, listenfd, false); http_conn::m_epollfd = epollfd; timer_sig_init(); bool timeout = false; bool stop_server = false; alarm(TIMESLOT); while(!stop_server) { // epoll轮询,等待有数据发送 int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER,-1); if((number < 0) && (errno != EINTR)) { printf("epoll failture\n"); break; } for(int i = 0; i < number; i++) { int sockfd = events[i].data.fd; // 有新的客户端连接 if(sockfd == listenfd) { struct sockaddr_in client_address; socklen_t client_addresslen = sizeof(client_address); int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addresslen); if(connfd < 0) { printf("errno is %d\n", errno); continue; } if(http_conn::m_user_count >= MAX_FD) { close(connfd); continue; } users[connfd].init(connfd, client_address); //初始化client_data数据 //创建定时器,设置回调函数和超时时间,绑定用户数据,将定时器添加到链表中 users_timer[connfd].address = client_address; users_timer[connfd].sockfd = connfd; util_timer *timer = new util_timer; timer->user_data = &users_timer[connfd]; timer->cb_func = cb_func; time_t cur = time(NULL); timer->expire = cur + 3*TIMESLOT; users_timer[connfd].timer = timer; timer_lst.add_timer(timer); } else if((sockfd == pipefd[0])&&(events[i].events & EPOLLIN)) { int sig; char signals[1024]; ret = recv(pipefd[0], signals, sizeof(signals), 0); if (ret == -1) { continue; } else if (ret == 0) { continue; } else { for (int i = 0; i < ret; ++i) { switch (signals[i]) { case SIGALRM: { timeout = true; break; } case SIGTERM: { stop_server = true; } } } } } // 若对方异常端开或错误 else if(events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) { users[sockfd].close_conn(); } // 有读事件发生(可读) else if(events[i].events & EPOLLIN) { util_timer *timer = users_timer[sockfd].timer; // 有读事件发生 if(users[sockfd].read()) { // 读的到数据 pool->append(users+sockfd); //若有数据传输,则将定时器往后延迟3个单位 //并对新的定时器在链表上的位置进行调整 if (timer) { time_t cur = time(NULL); timer->expire = cur + 3 * TIMESLOT; timer_lst.adjust_timer(timer); } } else { printf("Read Fail!\n"); // 读不到数据 timer->cb_func(&users_timer[sockfd]); if (timer) { timer_lst.del_timer(timer); } // users[sockfd].close_conn(); } } // 有写事件发生(可写) else if(events[i].events & EPOLLOUT) { util_timer *timer = users_timer[sockfd].timer; if(users[sockfd].write()) { if (timer) { time_t cur = time(NULL); timer->expire = cur + 3 * TIMESLOT; timer_lst.adjust_timer(timer); } } else { printf("Write Fail!\n"); timer->cb_func(&users_timer[sockfd]); if (timer) { timer_lst.del_timer(timer); } // users[sockfd].close_conn(); } } } if (timeout) { timer_handler(); timeout = false; } } close(epollfd); close(listenfd); close(pipefd[1]); close(pipefd[0]); delete [] users; delete[] users_timer; delete pool; return 0; }