[C/C++后端开发学习] 7 tcp服务器的epoll实现以及Reactor模型

时间:2024-10-25 07:23:03

tcp服务器的epoll实现以及Reactor模型

  • 1 IO多路复用
    • select
    • poll
    • epoll
  • 2 epoll详解
    • 2.1 基本使用方法
    • 2.2 LT水平触发和ET边沿触发
    • 2.3 实现服务器监听和接收数据
  • 3 Reactor模型
    • 3.1 增加回调函数实现Reactor模型
    • 3.2 实现服务器发送数据
    • 3.3 半包的数据怎么处理?
    • 3.4 单Reactor的优化思路
    • 3.5 多线程多Reactor处理
      • 处理方法:
    • 3.6 多进程多Reactor处理
      • 处理方法:
    • 3.7 Reactor参考案例
  • 其他杂项笔记

1 IO多路复用

IO多路复用简单地理解就是,一个线程(或进程)同时负责多个文件描述符fd的读写操作,它通过某种方式对这些fd进行监听,当内核发现指定的某个描述符就绪时,就通知线程进行读写。

select、poll和epoll是最常见的IO多路复用实现方式。基于此实现的服务器模型往往又被称为事件驱动模型。

select

select将需要监听的接口描述符标记到集合中(数组实现),包括读、写、异常3个集合,然后将这些集合传递给内核;内核轮询遍历集合中标记的描述符,并将就绪的描述符保留在这些集合中返回。应用程序从内核处拷贝回这些集合后,还需要再遍历一遍才能获知哪些描述符处于就绪的状态。

当描述符较大时,select()接口本身需要消耗大量时间去轮询各个句柄;其次,整个描述符集合在用户程序和内核之间来回拷贝也存在一定的开销;最后,应用程序还需要遍历一遍描述符集合才能获得各描述符的状态。可见,select 显然不是实现“事件驱动”的最好选择。

下面是select 接口的原型定义:

FD_ZERO(int fd, fd_set* fds)
FD_SET(int fd, fd_set* fds)
FD_ISSET(int fd, fd_set* fds)
FD_CLR(int fd, fd_set* fds)
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,struct timeval *timeout)
  • 1
  • 2
  • 3
  • 4
  • 5

其中 fd_set 类型按 bit 位标记句柄,比如设置了一个值为16的句柄(通过FD_SET() ),则fd_set 的第16位bit被设置为1,select调用将对其进行监视;select调用返回后,也需要检查fd_set 中的16号句柄(第16位bit)是否被标记为1(通过FD_ISSET() ),从而可判断对应的描述符是否可读写。

客户端的一个 connect() 操作,将在服务器端的监听socket激发一个“可读事件”;close()操作也是对相应连接的socket触发可读,只不过读操作返回的是0。

poll

poll的工作方式与select本质上差不多,只不过将内部管理描述符的数据结构由数组改为了链表,取消了最大可监控文件描述符数的限制,其他方面与select基本无异。

epoll

epoll 在内核中维护一个简易的文件系统,其中包含一个红黑树和一个就绪队列。每一个事件结构体都包含需要监听的描述符、监听的读写事件以及事件触发时的回调函数,这些事件会被挂到红黑树中,这样可以实现事件结构的快速查询,方便实现事件的修改和删除;就绪的描述符会被加入到等待队列中,因此应用程序不需要轮询,直接从队列中读走就绪的事件结构即可。

关于epoll的工作原理可以参考:Select、Poll、Epoll详解。一个简单的示意图如下图所示,其要点在于:所有添加到 epoll 的事件都会与网卡驱动程序建立回调关系,当对应事件发生时,会调用其回调⽅法(ep_poll_callback)并将事件的event结构体放到就绪队列rdllist双向链表中;之后再将就绪队列中的数据拷贝到用户空间,epoll_wait 调用返回。

在这里插入图片描述

与select相对地,1)epoll只需要将待监视的描述符信息拷贝一次给内核,不必每次调用前都设置一次;2)不必每次都遍历描述符集合,直接读取就绪的描述符事件结构即可;3)最大描述符数量没有限制。

不过,select是POSIX的,而不同的操作系统特供的 epoll 接口差异很大,因此跨平台能力就相对差一些,不过对于以Linux为主的服务器环境问题不大。

如果处理的连接数不是特别多的话,使用select/epoll 方案的服务器不一定就比使用多线程+阻塞IO 方案的性能更好,甚至可能延迟还更大。但对于连接数更多的情况下,select/epoll 等肯定具有优势。

2 epoll详解

2.1 基本使用方法

  • 创建一个epoll实例,返回该实例的描述符epfd:
int epoll_create(int size);		// size参数只有0和1的区别(Since Linux 2.6.8, the argument is ignored, but must be  greater  than  zero)
  • 1
  • 设置epoll事件结构(注意了哈,epoll_data 是个**union**!)。
typedef union epoll_data
{
  void *ptr;
  int fd;
  uint32_t u32;
  uint64_t u64;
} epoll_data_t;

struct epoll_event
{
  uint32_t events;	/* Epoll events */
  epoll_data_t data;	/* User data variable */
} __EPOLL_PACKED;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

其中,events 事件类型变量常用 EPOLLINEPOLLOUTEPOLLET,可以通过‘“或”运算符|进行组合。data 下的 fd 和 ptr 根据需要设置其一,一般使用 ptr 指向自定义的结构体,其中可以是包含了fd 和回调函数等等需要的参数,一个实例如下:

struct sockitem
{
	int sockfd;
	int (*callback)(int fd, int events, void *arg);
};
  • 1
  • 2
  • 3
  • 4
  • 5

事件类型EPOLLERREPOLLHUP的意义在于,当连接发生错误或关闭时,可以通知处理函数从socket读出 errno。EPOLLHUP出现在TCP四次挥手接收到最后一个ACK后,此时读写通道都已关闭。

EPOLLRDHUB的意义则在于检测到对端调用shutdown(SHUT_WR)发送⼀个 FIN 包关闭了写通道,此时对于本地端就是读通道关闭了,但写通道没有关闭(本地没有调用close),连接处于半关闭状态。

  • 添加事件到epoll实例中:
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  • 1
epfd : 就是epoll实例的描述符
op : EPOLL_CTL_ADD(增加事件)、EPOLL_CTL_MOD(修改事件)、EPOLL_CTL_DEL(删除事件)
event : 自然就是对应的事件结构体了
  • 1
  • 2
  • 3
  • 获取就绪的触发事件
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
  • 1

此处,events是一个输入/输出参数,传入的应该要是一个struct epoll_event数组;maxevents一般是数组大小;timeout是以毫秒为单位的超时事件,传入-1则永久阻塞,传入0则会立即返回;Linux的man中说的很清楚:

The call will block until either :
* a file descriptor delivers an event;
* the call is interrupted by a signal handler; or
* the timeout expires.

正常返回时,返回值为就绪的事件数目n,events数组中的前n个元素则存放了就绪的事件结构体。

2.2 LT水平触发和ET边沿触发

LT水平触发是 epoll 的默认触发方式,与select一样,读时只要 fd 对应的缓冲区中有数据,就一直触发通知应用程序 fd 可读,写时只要缓冲区中有空间就通知应用 fd 可写。而ET边沿触发是epoll特有的,读时只有当缓冲区中新加入新的数据时,才触发一次,之后如果数据没有全部读走,即使缓冲区中还有数据,也不会再触发,直至缓冲区又送入了新数据才又触发一次。

显然,在

要使用边沿触发的话,在epoll_event结构的变量events中设置EPOLLET即可。

一般的使用方法为:

  • ET + 循环读出全部数据
  • LT + 一次读一块数据,不管多少

使用ET时一次触发往往需要通过循环去将所有数据读出,否则有可能导致死等,即:客户端发送数据后等待响应,但一次触发后服务端没有读出全部数据,不会响应客户端,客户端也不再发数据引起触发,于是二者就僵持住了。

但ET的优势在于:因为在ET模式下只有当数据到达网卡时才会触发一次,而LT模式下,内核需要不断轮询I/O状态表,涉及的epoll_wait系统调用的次数也更多,所以针对大数据量的场景ET模式的效率要比LT模式高很多。

⽔平触发的时候, io函数既可以是阻塞的也可以是⾮阻塞的。
边沿触发的时候, io函数只能是⾮阻塞的。因为ET触发一次读事件后,应该要循环将所有数据读完毕,此时如果采用阻塞IO去读,那么最后一次读必然会被阻塞住。

具体实现中,使用ET还是LT取决于:1)首先看服务是否需要界定数据包并进行数据解析,2)其次看每次读的数据量是否比较小。

  • 如果需要界定数据包,则适合使用LT,每次读一部分数据就先做业务逻辑相关的解析,然后再触发读下一部分数据,再做其他处理。比如redis和memcached,主要职责是实现业务逻辑,比较关心的是是否读到了完整的数据包,如果读到了足够的数据包就先去做业务处理,剩下的下一次触发再来读;
  • 如果每次读取的数据量比较小,适合用LT,因为在使用LT模式读数据时每次触发往往只会调用一次read,如果一次读的比较少而没有把缓冲区中的数据全读完也没关系,反正下次还会触发;这往往也是在需要界定数据包的情况下才会出现;
  • 如果二者都不是,则适合使用ET。比如Nginx作反向代理,因为不需要处理业务逻辑,数据量又比较大,一有数据就尽量把读缓冲区中的所有数据一股脑地读出来并转发出去。

2.3 实现服务器监听和接收数据

#include <>
#include <>
#include <>
#include <>
#include <>
#include <>
#include <sys/>
#include <sys/>
#include <sys/>
#include <netinet/>
#include <arpa/>

int main(int argc, char* argv[])
{
    int port = atoi(argv[1]);   // 传入一个参数作为端口号,此处对参数简单处理

    /* 开启监听 */
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
		return -1;

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(struct sockaddr_in));

    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = htons(port);

    if(bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0)
        return -2;

    if(listen(sockfd, 5) < 0)
        return -3;
    /* 开启监听完成 */

    /* go epoll */
    int epfd = epoll_create(1);

    struct epoll_event ev;
    struct epoll_event events[512]; // wait时存储就绪的事件

    memset(&ev, 0, sizeof(struct epoll_event));
    ev.events = EPOLLIN;    // 默认LT
    ev.data.fd = sockfd;

    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);    // 添加到事件到epoll

    while(1)
    {
        int nready = epoll_wait(epfd, events, 512, -1); // 阻塞wait
        if(nready < 0)
        {
            printf("epoll_wait error.\n");
            break;
        }

        int i;
        for(i = 0; i < nready; i++)	// 遍历处理就绪的事件
        {
            if(events[i].events & EPOLLIN)  // 客户端connect是可读事件 EPOLLIN; 客户端关闭连接,也是 EPOLLIN,只是读时返回0
            {
                if(events[i].data.fd == sockfd) // 如果是监听套接字的事件(对应需要调用accept)
                {
                    struct sockaddr_in client;
                    memset(&client, 0, sizeof(struct sockaddr_in));
                    socklen_t caddr_len = sizeof(struct sockaddr_in);

                    int clientfd = accept(sockfd, (struct sockaddr*)&client, &caddr_len);
                    if(clientfd < 0)
                    {
                        printf("# accept error\n");
                        continue;
                    }

                    char str[INET_ADDRSTRLEN] = {0};
                    printf("recv from %s at port %d\n", inet_ntop(AF_INET, &client.sin_addr, str, sizeof(str)),
                        ntohs(client.sin_port));

                    memset(&ev, 0, sizeof(struct epoll_event));
                    ev.events = EPOLLIN;
                    ev.data.fd = clientfd;
                    epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);  // 把客户端socket增加到epoll中监听
                }
                else // 如果是非监听套接字的其他已经连接的客户端的事件
                {
                    int clientfd = events[i].data.fd;
                    char buffer[1024] = {0};
                    int ret = recv(clientfd, buffer, 1024, 0);
                    if(ret <= 0)
                    {
                        if(ret < 0)
                        {
                            if(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
                            {   // 被打断直接返回的情况
                                continue;
                            }
                        }
                        else
                        {   // 返回0说明对端关闭连接
                            printf("# client disconnected...\n");
                        }
                        
                        /* 将当前客户端socket从epoll中删除 */
                        close(clientfd);
                        ev.events = EPOLLIN;
                        ev.data.fd = clientfd;
                        epoll_ctl(epfd, EPOLL_CTL_DEL, clientfd, &ev);  
                    }
                    else
                    {
                        printf("# recv data from fd:%d : %s , len = %d\n", clientfd, buffer, ret);
                    }
                }
            }

            if(events[i].events & EPOLLOUT)
            {
                // TODO
            }

        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123

3 Reactor模型

Reactor 的原义为“反应堆”,是一种事件驱动机制。其特殊之处在于,应有程序并非直接调用某个API进行IO处理,而是将IO处理对应的事件处理函数注册到 Reactor 上,当事件触发时,由 Reactor 去调用事件处理函数进行IO处理。这些事件处理函数就是所谓的“回调函数”。

Reactor 模型有三个重要的组件:
- 多路复用器:由操作系统提供,在 linux 上一般是 select, poll, epoll 等系统调用。
- 事件分发器:将多路复用器中返回的就绪事件分到对应的处理函数中。
- 事件处理器:负责处理特定事件的处理函数。

Reactor 模型通常是单线程的,其设计目标是单线程使用一颗 CPU 的全部资源,这样做的附带好处在于:不必考虑共享资源的互斥访问。

同时它也具有一定的可扩展性,对于多核的机器,可以增加 Reactor 实例个数,使每个核上运行一个反应堆。当然这些Reactor 实例处理的请求也应是互不相关的,这适用于一些需要为简单的访问提供并发服务的场景。例如 Nginx 这样的 http 静态服务器。

几乎所有的Linux服务器低层都是采用基于epoll的Reactor模型实现的。

用一句话总结Reactor:由最基础的对IO进行管理的思路转换为对事件进行管理的思路。

此外,一般来说 Reactor 模型还会要求回调处理函数的IO操作采用非阻塞IO,这是因为:Reactor 模型要求各个IO事件之间不能互相影响,也就是说某个事件的IO操作无论如何也不允许出现永久阻塞而导致其他事件无法响应的情况。而采用阻塞地方式去读时,如果某个事件的数据量比较大而需要连续读多次,则会存在引起读操作阻塞的风险。类似地,有些实现可能希望一次事件触发后能够连续调用多次accept,那么显然也存在相同的问题。因此,在使用阻塞或非阻塞都无所谓的情况下优先使用非阻塞IO。

3.1 增加回调函数实现Reactor模型

struct sockitem
{
	int sockfd;
	int (*callback)(int fd, int events, void *arg);
    int epfd;
};

#define MAX_EVENTS_NUM 512

struct reactor
{
    int epfd;
    struct epoll_event events[MAX_EVENTS_NUM];
};

/* 写IO回调函数 */
int send_cb(int fd, int events, void *arg)
{
	/* TODO */
    return 0;
}

/* 读IO回调函数 */
int recv_cb(int fd, int events, void *arg)
{
    struct sockitem *si = arg;
    struct epoll_event ev;

    int clientfd = si->sockfd;
    char buffer[1024] = {0};
    int ret = recv(clientfd, buffer, 1024, 0);
    if(ret <= 0)
    {
        if(ret < 0)
        {
            if(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
            {   // 被打断直接返回的情况
                return ret;
            }
        }
        else
        {
            printf("# client disconnected...\n");
        }
        
        /* 将当前客户端socket从epoll中删除 */
        close(clientfd);
        ev.events = EPOLLIN;
        ev.data.ptr = si;
        epoll_ctl(si->epfd, EPOLL_CTL_DEL, clientfd, &ev);  
        free(si);
    }
    else
    {
        printf("# recv data from fd:%d : %s , len = %d\n", clientfd, buffer, ret);
    }

    return ret;
}

/* accept也属于读IO操作的回调 */
int accept_cb(int fd, int events, void *arg)
{
    struct sockitem *si = arg;
    struct epoll_event ev;

    struct sockaddr_in client;
    memset(&client, 0, sizeof(struct sockaddr_in));
    socklen_t caddr_len = sizeof(struct sockaddr_in);

    int clientfd = accept(si->sockfd, (struct sockaddr*)&client, &caddr_len);
    if(clientfd < 0)
    {
        printf("# accept error\n");
        return clientfd;
    }

    char str[INET_ADDRSTRLEN] = {0};
    printf("recv from %s at port %d\n", inet_ntop(AF_INET, &client.sin_addr, str, sizeof(str)),
        ntohs(client.sin_port));

    struct sockitem *client_si = (struct sockitem*)malloc(sizeof(struct sockitem));
    client_si->sockfd = clientfd;
    client_si->callback = recv_cb;  // accept完的下一步就是接收客户端数据
    client_si->epfd = si->epfd;

    memset(&ev, 0, sizeof(struct epoll_event));
    ev.events = EPOLLIN;
    ev.data.ptr = client_si;
    epoll_ctl(si->epfd, EPOLL_CTL_ADD, clientfd, &ev);  // 把客户端socket增加到epoll中监听

    return clientfd;
}

int main(int argc, char* argv[])
{
    int port = atoi(argv[1]);

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
		return -1;

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(struct sockaddr_in));

    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = htons(port);

    if(bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0)
        return -2;

    if(listen(sockfd, 5) < 0)
        return -3;

    /* go epoll */
    struct reactor ra;
    ra.epfd = epoll_create(1);

    struct sockitem *si = (struct sockitem*)malloc(sizeof(struct sockitem));    // 自定义数据,用于传递给回调函数
    si->sockfd = sockfd;
    si->callback = accept_cb;
    si->epfd = ra.epfd; // sockitem 中增加一个epfd成员以便回调函数中使用

    struct epoll_event ev;
    memset(&ev, 0, sizeof(struct epoll_event));
    ev.events = EPOLLIN;    // 默认LT
    ev.data.ptr = si;

    epoll_ctl(ra.epfd, EPOLL_CTL_ADD, sockfd, &ev);    // 添加到事件到epoll

    while(1)
    {
        int nready = epoll_wait(ra.epfd, ra.events, MAX_EVENTS_NUM, -1);
        if(nready < 0)
        {
            printf("epoll_wait error.\n");
            break;
        }

        int i;
        for(i = 0; i < nready; i++)
        {
            si = ra.events[i].data.ptr;
            if(ra.events[i].events & (EPOLLIN | EPOLLOUT))
            {
                if(si->callback != NULL)
                    si->callback(si->sockfd, ra.events[i].events, si);  // 调用回调函数
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152

上面的代码中,定义了一个sockitem结构体来作为event结构内的自定义数据,以便于在回调时区分不同的事件并方便在回调函数中使用事件的参数如socket 描述符等。此时还没有实现 send 操作的回调。

3.2 实现服务器发送数据

接收到客户端数据后,最简单的响应方式就是在接收回调函数recv_cb的最后调用send。但这么处理存在问题:
发送大量数据时,socket的发送缓冲区有可能是满的,此时send会返回-1。那我们可以增加循环去发送吗?不能,否则线程可能长时间停留在此处理该send过程,其他IO请求都没法处理了。

正确的处理方式:recv完之后,将socket在epoll中的可触发事件设为可写,在写数据的回调中send完数据后,再将其设为可读。如此循环运行下去。

修改读和写的回调函数如下:

int recv_cb(int fd, int events, void *arg);

/* 写IO回调函数 */
int send_cb(int fd, int events, void *arg)
{
    struct sockitem *si = arg;
    struct epoll_event ev;

    int clientfd = si->sockfd;
    
    /* 写回的数据此处先简单处理 */
    char* hello = "Hello!\n";
    int ret = send(clientfd, hello, strlen(hello), 0);

    si->callback = recv_cb; // 切回读的回调
    ev.events = EPOLLIN;
    ev.data.ptr = si;
    epoll_ctl(si->epfd, EPOLL_CTL_MOD, si->sockfd, &ev);

    return ret;
}

/* 读IO回调函数 */
int recv_cb(int fd, int events, void *arg)
{
    struct sockitem *si = arg;
    struct epoll_event ev;

    int clientfd = si->sockfd;
    char buffer[1024] = {0};
    int ret = recv(clientfd, buffer, 1024, 0);
    if(ret <= 0)
    {
        if(ret < 0)
        {
            if(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
            {   // 被打断直接返回的情况
                return ret;
            }
        }
        else
        {
            printf("# client disconnected...\n");
        }
        
        /* 将当前客户端socket从epoll中删除 */
        close(clientfd);
        ev.events = EPOLLIN;
        ev.data.ptr = si;
        epoll_ctl(si->epfd, EPOLL_CTL_DEL, clientfd, &ev);  
        free(si);
    }
    else
    {
        printf("# recv data from fd:%d : %s , len = %d\n", clientfd, buffer, ret);
        /* 此处进行数据处理,比如解析接收到的HTTP报文等等 */
        /*** ..... ***/

        si->callback = send_cb;     // 回调函数要切换成写回调
        struct epoll_event ev;
        ev.events = EPOLLOUT | EPOLLET; // 写的时候最好还是用ET
        ev.data.ptr = si;
        epoll_ctl(si->epfd, EPOLL_CTL_MOD, si->sockfd, &ev);
    }

    return ret;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68

3.3 半包的数据怎么处理?

TCP半包问题:数据量很大,一次回调读取或写入时没有处理完,希望下次回调时继续处理。

将各个socket的接收缓冲和发送缓冲都封装到各自的 sockitem 中,这样就能各自读写各的了。

#define MAX_BUFFER_SIZE 1024
struct sockitem
{
	int sockfd;
	int (*callback)(int fd, int events, void *arg);
    int epfd;

    char recvbuffer[MAX_BUFFER_SIZE]; // 接收缓冲
	char sendbuffer[MAX_BUFFER_SIZE]; // 发送缓冲
    int recvlength; // 接收缓冲区中的数据长度
    int sendlength; // 发送缓冲区中的数据长度
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

3.4 单Reactor的优化思路

1)Reactor位于主线程,读、写事件以及数据包解析与封装等比较耗时的操作交给线程池中的worker线程处理,主线程处理业务逻辑;
2)Reactor位于主线程,仅负责读事件,读完后的数据交给线程池中的worker线程处理,并由worker线程完成写操作;如果写过程出现失败或没有写完,则将剩下的待写数据缓存,然后注册写事件,由主线程完成写。

3.5 多线程多Reactor处理

问题:当大量客户端同时发起 connect 时,有多少个客户端接入,epoll_wait 就要返回多少次(因为一次只能accept一个客户端),这将导致接入过程影响到其他IO请求的处理。

处理方法:

一个线程专门用于处理 accept,将返回的socket描述符注册到epoll实例中;另一个线程处理同一个epoll实例中的读写触发事件。

缺点: 数据共享就避免不了要通过加锁来处理线程安全问题。如果每一个连接都处理相同的业务逻辑,连接之间存在大量的临界资源处理(数据交互),则这些加锁操作将严重影响多线程的处理性能。

如果每个连接在处理业务逻辑时互相之间没有很强的相关性,各干各的就行,则多线程多Reactor的模式性能会比单Reactor好很多;但前提是处理临界资源时只需要很轻的锁操作,或最好不需要锁操作。

3.6 多进程多Reactor处理

前提: 解决好客户端关联数据(session等)的存储和共享问题,因为多进程间并不直接共享内存资源,使用共享内存的方式会很麻烦。同时,需要解决accept时的进群问题,典型案例是Nginx。

多进程的优势: 多个进程之间的用户态地址空间是相互独立的,当业务逻辑代码或第三方组件导致其中一个进程奔溃后几乎不会对其他进程产生影响,因此稳定性、可靠性更高。

处理方法:

多进程如何监听同一个端口?—— 执行完 listen 之后再fork。

3.7 Reactor参考案例

  • 单线程Reactor——libevent、redis

  • 多线程Reactor——memcached(one eventloop per thread)

  • 多进程Reactor——nginx

  • 单Reactor + 任务队列 + 线程池——skynet
    优点:将业务逻辑和网络处理过程分离,再通过线程池去“消费”队列中的任务,这可以避免复杂的业务逻辑影响网络I/O。

  • 多Reactor + 消息队列 + 线程池——适用于网络密集型+业务密集型的场景

其他杂项笔记

udp能否实现并发服务器?
udp无法保证数据达到的顺序。所以不能简单地采用在数据包上增加一层协议头的方式来区分客户端。
正确的做法:模拟tcp三次握手,握手后为每个客户端分配一个fd

socket是什么?
socket由两部分组成,1是fd,2是五元组–sip,dip,sport,dport,传输层协议(tcp/udp)

关于sigio

  • sigio允许为描述符安装一个信号处理函数,操作该描述符的进程发起读写操作后可以继续运行而不阻塞;等到数据准备好后,内核会给进程发送一个SIGIO信号,进程通过异步回调的方式进入到信号处理函数去处理数据。

  • sigio 很难看到在实际中应用。

  • udp 实现并发服务器可以使用sigio来接收(异步回调函数中调用recvfrom),但tcp不行,因为tcp接收流数据时会产生大量sigio,包括accept、recv都会产生sigio,但信号处理函数却没有传入 fd 因而无法区分。

信号是如何发送的?

  1. 进程的信号集合保存在task_struct
    task_struct -> sighand_struct ()-> sigaction–>action[64] , SIGIO是29

  2. 调用signal时信号如何绑定到进程
    系统调用 SYSCALL_DEFINE2(signal, …) -> do_sigaction() -> 设置该进程的sighand->action[sig-1]

  3. 信号如何发送
    kill 系统调用发送信号 -> signalfd_notify() -> 激活等待队列(条件等待,等待sigio位满足) --> 回调

服务端出现大量close_wait是什么情况?
客户端关闭连接后,服务端没有及时关闭socket

采用io多路复用的情况下,多个线程需要同时写一个fd时,如何避免数据混乱?
某个线程在写之前,先将这个fd从epoll(或其他io多路复用工具)中删除,避免其他线程调用wait时触发,然后写完之后再把它加回去。这样确保只有一个线程占用这个fd。