(C++)利用TCP协议服务器(epoll),实现reactor(二)--------------------附直接实现代码

时间:2024-10-25 07:24:16
  • #include <>
  • #include <netinet/>
  • #include <>
  • #include <>
  • #include <>
  • #include <sys/>
  • #include <sys/>
  • #include <>
  • #include <sys/>
  • #include <sys/>
  • #include <>
  • #define MAXLNE 4096
  • #define POLL_SIZE 1024
  • #define BUFFER_LENGTH 1024
  • #define MAX_EPOLL_EVENT 1024
  • #define NOSET_CB 0 //控制对fd的操作
  • #define READ_CB 1
  • #define WRITE_CB 2
  • #define ACCEPT_CB 3
  • typedef int NCALLBACK(int fd, int event, void *arg);
  • struct nitem { // fd按照结构体存储
  • int fd; //哪一个fd
  • int status;//fd的状态
  • int events;//需要如何操作这个fd(删除,移动,或者添加进epoll里),这里一定要置0,不然fd放不进来
  • void *arg;//**这个服务器没用到这个,不清楚他的作用,还需要继续学习
  • #if 0
  • NCALLBACK *callback;//一个回调函数
  • #else
  • NCALLBACK *readcb; // epollin
  • NCALLBACK *writecb; // epollout
  • NCALLBACK *acceptcb; // epollin
  • #endif
  • unsigned char sbuffer[BUFFER_LENGTH]; //为什么用uint呢?因为传输的ascii全都大于0,为char时候有负的
  • int slength;
  • unsigned char rbuffer[BUFFER_LENGTH];//对应的数据的缓冲区
  • int rlength;//通过rcv函数接收数据长度
  • };
  • struct itemblock {//这里大量fd的结构体,通过链表的方式串联在一起
  • struct itemblock *next;
  • struct nitem *items;//fd的结构体嵌套构造的
  • };
  • struct reactor {
  • int epfd;//epoll_create创建的话柄
  • struct itemblock *head;
  • };
  • //一堆声明,不用担心定义的先后
  • int init_reactor(struct reactor *r);
  • int read_callback(int fd, int event, void *arg);
  • int write_callback(int fd, int event, void *arg);
  • int accept_callback(int fd, int event, void *arg);
  • struct reactor *instance = NULL;
  • struct reactor *getInstance(void) { //singleton
  • if (instance == NULL) {
  • instance = malloc(sizeof(struct reactor));
  • if (instance == NULL) return NULL;
  • memset(instance, 0, sizeof(struct reactor));
  • if (0 > init_reactor(instance)) {
  • free(instance);
  • return NULL;
  • }
  • }
  • return instance;
  • }
  • int nreactor_set_event(int fd, NCALLBACK cb, int event, void *arg) {
  • struct reactor *r = getInstance();
  • struct epoll_event ev = {0};//全部置为空闲
  • if (event == READ_CB) {
  • r->head->items[fd].fd = fd;//以fd的序号存储fd,方便查找
  • r->head->items[fd].readcb = cb;//接下来的回调函数类型
  • r->head->items[fd].arg = arg;//这里没啥用
  • = EPOLLIN;//表示这个端口可读
  • } else if (event == WRITE_CB) {
  • r->head->items[fd].fd = fd;
  • r->head->items[fd].writecb = cb;
  • r->head->items[fd].arg = arg;
  • = EPOLLOUT;//表示这个端口可写
  • } else if (event == ACCEPT_CB) {
  • r->head->items[fd].fd = fd;
  • r->head->items[fd].acceptcb = cb;
  • r->head->items[fd].arg = arg;
  • = EPOLLIN;
  • }
  • = &r->head->items[fd];
  • //以下是将新的fd放进我们的nitem里,如果是就fd则把需要对他的event给他
  • if (r->head->items[fd].events == NOSET_CB) {
  • if (epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
  • printf("epoll_ctl EPOLL_CTL_ADD failed, %d\n", errno);
  • return -1;
  • }
  • r->head->items[fd].events = event;
  • } else if (r->head->items[fd].events != event) {
  • if (epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev) < 0) {
  • printf("epoll_ctl EPOLL_CTL_MOD failed\n");
  • return -1;
  • }
  • r->head->items[fd].events = event;
  • }
  • return 0;
  • }
  • int nreactor_del_event(int fd, NCALLBACK cb, int event, void *arg) {
  • struct reactor *r = getInstance();
  • struct epoll_event ev = {0};
  • = arg;
  • epoll_ctl(r->epfd, EPOLL_CTL_DEL, fd, &ev);
  • r->head->items[fd].events = 0;
  • return 0;
  • }
  • int write_callback(int fd, int event, void *arg) {
  • struct reactor *R = getInstance();
  • unsigned char *sbuffer = R->head->items[fd].sbuffer;
  • int length = R->head->items[fd].slength;
  • int ret = send(fd, sbuffer, length, 0);
  • if (ret < length) {
  • nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
  • } else {
  • nreactor_set_event(fd, read_callback, READ_CB, NULL);
  • }
  • return 0;
  • }
  • // 5k qps
  • int read_callback(int fd, int event, void *arg) {
  • struct reactor *R = getInstance();
  • unsigned char *buffer = R->head->items[fd].rbuffer;
  • #if 0 //ET
  • int idx = 0, ret = 0;
  • while (idx < BUFFER_LENGTH) {//读取
  • ret = recv(fd, buffer+idx, BUFFER_LENGTH-idx, 0);
  • if (ret == -1) {
  • break;
  • } else if (ret > 0) {
  • idx += ret;
  • } else {// == 0
  • break;
  • }
  • }
  • if (idx == BUFFER_LENGTH && ret != -1) { //判断还需要读不读,
  • nreactor_set_event(fd, read_callback, READ_CB, NULL);
  • } else if (ret == 0) {
  • nreactor_set_event
  • //close(fd);
  • } else {
  • nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
  • }
  • #else //LT
  • int ret = recv(fd, buffer, BUFFER_LENGTH, 0);
  • if (ret == 0) { // fin,客户机发生fin请求关闭,服务端也关
  • nreactor_del_event(fd, NULL, 0, NULL);
  • close(fd);
  • } else if (ret > 0) {
  • unsigned char *sbuffer = R->head->items[fd].sbuffer; //把对应fd的接受缓存区拿出来发送数据给我们的客户端,方便调试
  • memcpy(sbuffer, buffer, ret);
  • R->head->items[fd].slength = ret;
  • printf("readcb: %s\n", sbuffer);
  • nreactor_set_event(fd, write_callback, WRITE_CB, NULL);//把我们的客户端发过来的发回去,方便调试
  • }
  • #endif
  • }
  • // web server
  • // ET / LT
  • int accept_callback(int fd, int event, void *arg) {
  • int connfd;
  • struct sockaddr_in client;
  • socklen_t len = sizeof(client);
  • if ((connfd = accept(fd, (struct sockaddr *)&client, &len)) == -1) {
  • printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
  • return 0;
  • }//说明了监听之后,才会发生三次握手,并且套接字长时间存在
  • nreactor_set_event(connfd, read_callback, READ_CB, NULL);
  • }
  • int init_server(int port) {
  • int listenfd;
  • struct sockaddr_in servaddr;
  • char buff[MAXLNE];
  • if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
  • printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
  • return 0;
  • }
  • memset(&servaddr, 0, sizeof(servaddr));
  • servaddr.sin_family = AF_INET;
  • servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
  • servaddr.sin_port = htons(port);
  • if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
  • printf("bind socket error: %s(errno: %d)\n", strerror(errno), errno);
  • return 0;
  • }
  • if (listen(listenfd, 10) == -1) {
  • printf("listen socket error: %s(errno: %d)\n", strerror(errno), errno);
  • return 0;
  • }
  • return listenfd;
  • }
  • int init_reactor(struct reactor *r) {
  • if (r == NULL) return -1;//表示fd全部断开,shutdown
  • int epfd = epoll_create(1); //创建每一个的话柄,每一个话柄不一样,故这位我们之后链表查找有很大帮助(一个listen进去的话柄相同)
  • r->epfd = epfd;
  • // fd --> item
  • r->head = (struct itemblock*)malloc(sizeof(struct itemblock));//在对应epfd放进去之后,把链表的位置拿到
  • if (r->head == NULL) {
  • close(epfd);
  • return -2;
  • }
  • memset(r->head, 0, sizeof(struct itemblock));//分配了内存之后置0,防止脏数据
  • r->head->items = malloc(MAX_EPOLL_EVENT * sizeof(struct nitem)); //分配了多少个装fd的房间
  • if (r->head->items == NULL) {
  • free(r->head);
  • close(epfd);
  • return -2;
  • }
  • memset(r->head->items, 0, (MAX_EPOLL_EVENT * sizeof(struct nitem)));//把装fd的房间置0
  • r->head->next = NULL;
  • return 0;
  • }
  • // accept --> EPOLL
  • int reactor_loop(int listenfd) {
  • struct reactor *R = getInstance();//这里这里一定要知道reactor是如何分配的,这里采用的结构体存储
  • struct epoll_event events[POLL_SIZE] = {0};
  • while (1) {
  • int nready = epoll_wait(R->epfd, events, POLL_SIZE, 5);//等5ms,记录下有多少个fd需要处理
  • if (nready == -1) {
  • continue;
  • }
  • int i = 0;
  • for (i = 0;i < nready;i ++) {
  • struct nitem *item = (struct nitem *)events[i].;//这里是epoll为我们找到的需要处理的fd
  • int connfd = item->fd;
  • if (connfd == listenfd) {
  • item->acceptcb(listenfd, 0, NULL);//调用回调
  • } else {
  • if (events[i].events & EPOLLIN) {
  • item->readcb(connfd, 0, NULL);
  • }
  • if (events[i].events & EPOLLOUT) {
  • item->writecb(connfd, 0, NULL);
  • }
  • }
  • }
  • }
  • return 0;
  • }
  • int main(int argc, char **argv)
  • {
  • int connfd, n;
  • int listenfd = init_server(9999);//初始化服务端(socket->bind->listen),封装明确了功能,方便我们多次使用
  • nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL);//先初始化最大的结构体链表reactor,把我们的监听fd放进轮询结构里
  • //nreactor_set_event(listenfd, accept_callback, read_callback, write_callback); 这里给出了回溯函数的另一种常用形式
  • reactor_loop(listenfd);//进入轮询状态,1.控制连接加入断开 2.控制连接的send和recv
  • return 0;
  • }