高级IO—多路转接

时间:2024-04-25 22:13:49

                                        ????慕斯主页 修仙—别有洞天

                                       ♈️今日夜电波:Cupid - Twin Ver. (FIFTY FIFTY) - Sped Up Version

                                                           0:20━━━━━━️????──────── 2:25
                                                                ????   ◀️   ⏸   ▶️    ☰  

                                 ????关注????点赞????收藏您的每一次鼓励都是对我莫大的支持????


目录

什么是高级I/O?

理解五种高级I/O的模型

高级I/O之多路转接

多路转接之select

接口详解

fd_set

select使用实例

select的特点与缺点

特点:

缺点:

多路转接之poll

接口详解

poll的使用实例

poll的优点和缺点

优点:

缺点:

多路转接之epoll

主要函数

epoll_event结构

epoll的原理

使用流程

epoll工作方式

epoll的使用实例

epoll的优势


什么是高级I/O?

        在Linux中,高级I/O(Advanced I/O)指的是一组函数,这些函数提供了比标准I/O更高级别的输入输出操作。高级I/O具有更好的性能和更多的灵活性。在处理大量数据时,高级I/O由于以异步和非阻塞方式进行I/O操作,因此明显优于标准I/O。

        在Linux中,高级I/O涵盖了多种I/O模型,包括:

  1. 阻塞I/O:当内核将数据准备好之前,系统调用会一直等待。所有的套接字默认都是阻塞方式。
  2. 非阻塞I/O:如果内核还未将数据准备好,系统调用仍然会直接返回,并且返回错误码。非阻塞I/O往往需要程序员利用循环的方式反复尝试读写文件描述符,这个过程称为轮询。
  3. 信号驱动I/O:当内核将数据准备好的时候,使用信号通知应用程序进行I/O操作。
  4. I/O多路转接:这是一种同时等待多个文件描述符就绪状态的机制。通过多路转接,系统可以监控多个I/O操作,并在任何一个操作准备好时通知应用程序。
  5. 异步I/O:在数据拷贝给用户空间完成时,由内核通知应用程序处理。与信号驱动I/O不同,异步I/O是在数据拷贝完成后通知应用程序,而信号驱动I/O是通知应用程序何时可以开始拷贝数据。

        在Linux中,可以通过系统调用和相关的库函数来使用这些高级I/O功能。例如,对于异步I/O,Linux提供了如aio_read和aio_write等函数来执行异步的读写操作。

        总的来说,Linux中的高级I/O为开发者提供了丰富的工具和技术,可以根据具体的应用场景和需求选择最适合的I/O模型,以提高系统的性能和响应能力。

理解五种高级I/O的模型

        当我们需要再应用层读取或者写数据的时候通常会调用read或者write等等系统调用的I/O接口,他们的本质就是将数据从用户层写到OS或者从OS写到用户层。说到底他们本质就是拷贝函数,而I/O=等待+拷贝。等待指的是要进行拷贝,必须得先判断条件成立。而前面提到的高级I/O呢?实际上就是单位时间内,在I/O的过程中等待的比重越小,I/O的效率越高,而几乎所有提高I/O效率的策略,本质就是这个!!!接下来我们理解五种高级I/O模型:

        阻塞IO: 阻塞IO是最常见的IO模型. 在内核将数据准备好之前, 系统调用会一直等待. 所有的套接字, 默认都是阻塞方式.

        非阻塞IO:非阻塞IO往往需要程序员循环的方式反复尝试读写文件描述符, 这个过程称为轮询. 这对CPU来说是较大的浪费, 一般只有特定场景下才使用. 如果内核还未将数据准备好, 系统调用仍然会直接返回, 并且返回EWOULDBLOCK错误码.

        信号驱动IO: 内核将数据准备好的时候, 使用SIGIO信号通知应用程序进行IO操作

        IO多路转接: 虽然从流程图上看起来和阻塞IO类似. 实际上最核心在于IO多路转接能够同时等待多个文件描述符的就绪状态.


        异步IO: 由内核在数据拷贝完成时, 通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据).


高级I/O之多路转接

        在Linux中,多路转接IO通常指的是利用一种机制同时监视多个IO事件的能力,以便在任意一个IO事件准备就绪时立即执行相应的IO操作。这种机制可以提高IO操作的效率,特别是在需要同时处理多个IO事件时。在Linux中,实现多路转接IO的主要方式有三种:

  1. selectselect是最古老的一种多路转接IO机制,在POSIX标准中定义。它允许程序监视一组文件描述符(包括套接字、管道等),并在其中任何一个文件描述符准备就绪时通知程序。但select存在一些性能限制,特别是当需要监视大量文件描述符时,性能会下降。
  2. pollpollselect的一种改进,也用于监视一组文件描述符的状态。与select不同的是,poll使用一个pollfd结构体数组来传递文件描述符信息,避免了select中使用大量的位图。因此,poll通常比select具有更好的性能。
  3. epollepoll是Linux特有的一种高级IO机制,引入了事件驱动的模型。它使用一个单独的文件描述符来管理需要监视的IO事件集合,通过epoll_ctl函数来注册事件,然后通过epoll_wait函数等待事件的发生。epoll相比于selectpoll具有更好的性能和扩展性,尤其在需要处理大量并发连接的服务器应用中表现突出。

多路转接之select

        多路转接中的select是Linux系统提供的一个用于监视多个文件描述符状态的机制。通过select系统调用,程序可以同时监视多个文件描述符(如套接字、管道等)的读、写和异常事件,并在这些事件发生时得到通知。下面将对select的接口进行详细的解释。

接口详解

select函数原型

#include <sys/select.h>

int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);

参数说明

  • nfds:这是被监视的文件描述符的总数,通常设置为最大文件描述符加1。这是因为文件描述符是从0开始计数的,所以nfds应该比所有文件描述符集合中的最大值大1。
fd_set

        fd_set的定义如下:

    fd_set是一个用于管理文件描述符(file descriptor)的数据结构,在POSIX标准中定义。它通常用于在多路复用(如selectpoll等函数)中对文件描述符进行操作。fd_set的主要目的是允许程序同时监视多个文件描述符的状态,如可读、可写或异常状态。

  fd_set的通常的结构如下:

typedef struct fd_set {
    unsigned long fds_bits[FD_SETSIZE/(8*sizeof(unsigned long))];
} fd_set;

        其中,fds_bits是一个位数组,用于表示一系列文件描述符的状态。每个位代表一个文件描述符,如果该位被置为1,则表示相应的文件描述符处于可读、可写或异常等状态,否则表示不处于该状态。实际上就是一个位图结构,从而表示多个文件描述符。

        在fd_set中,文件描述符的状态是通过一系列的宏来设置的。这些宏包括:

  • void FD_ZERO(fd_set *set): 将指定的fd_set变量清零,初始化为空集。这是在对文件描述符集合进行设置前必须进行的操作,因为系统分配内存空间后通常并不作清空处理。
  • void FD_SET(int fd, fd_set *set): 将给定的文件描述符fd添加到指定的fd_set变量中。
  • void FD_CLR(int fd, fd_set *set): 从指定的fd_set变量中移除给定的文件描述符fd
  • int FD_ISSET(int fd, fd_set *set): 检查给定的文件描述符fd是否在指定的fd_set变量中,并检测其状态是否变化。当检测到fd状态发生变化时返回真(非零值),否则返回假(零)。

    FD_SETSIZE是系统指定的最大文件描述符数量,默认值为1024。这意味着一个fd_set通常能够表示的文件描述符数量受其限制。但请注意,这个值可以通过修改FD_SETSIZE宏来调整fd_set结构所能管理的最大文件描述符数量。

        在实际应用中,使用fd_set通常涉及以下步骤:

  1. 使用FD_ZERO初始化一个fd_set变量。
  2. 使用FD_SET将需要监视的文件描述符添加到fd_set中。
  3. 调用如select之类的函数来等待文件描述符的状态变化。
  4. 使用FD_ISSET来检查哪些文件描述符的状态已经变化,并据此进行相应的处理。

        select中与fd_set紧密相关的参数:

  • readfds:指向一个文件描述符集合的指针,这个集合中的文件描述符都是程序想要监视其读事件的。
  • writefds:指向一个文件描述符集合的指针,这个集合中的文件描述符都是程序想要监视其写事件的。
  • exceptfds:指向一个文件描述符集合的指针,这个集合中的文件描述符都是程序想要监视其异常事件的。
  • timeout:这是一个指向timeval结构的指针,用于指定select的等待时间。如果timeout为NULL,则select将无限期地等待直到有文件描述符就绪。timeval结构定义了两个字段:tv_sec(秒)和tv_usec(微秒),用于指定等待的时间长度。需要注意的是:他是一个输入输出型参数,结构体组成如下:
struct timeval {  
    time_t tv_sec;      // 秒  
    suseconds_t tv_usec; // 微秒  
};
  • tv_sec:表示从Epoch(1970年1月1日 00:00:00 UTC)到当前时间的秒数。
  • tv_usec:表示当前秒数内的微秒数,即秒后面的零头。

        当timeout参数为NULL时,select调用将无限期地等待,直到至少有一个文件描述符在指定的集合中变为就绪状态。这意味着,如果没有任何文件描述符在调用期间变为就绪,select将一直阻塞,直到被信号中断或发生其他异常。

        当struct timeval中的变量都设置为0,则表示select为非阻塞等待,任何的描述符只要有一个不就绪select就会出错返回。

        如果timeout参数不为NULL,则它指定了select调用的最大等待时间。当这个时间过去后,无论是否有文件描述符变为就绪,select都会返回。如果在这个时间段内,有文件描述符变为就绪,select将返回这些文件描述符的数量。如果在定时的期间有一个文件描述符就绪了,那么就会让timeval中变量变为剩余的时间。例如:设置tv_sec为5,tv_usec为0,而3秒后有一个文件描述符就绪了,那么timeval中的tv_sec会变为2,tv_usec不变。而如果在超时时间内没有文件描述符就绪,select将返回0,表示超时。需要注意的是,每次调用select时,你都需要重新设置timeout参数,因为select不会保留上次调用时的超时设置。也就是说,每次调用select时,你都需要创建一个新的timeval结构,并设置你想要的超时时间。否则,就会一直按照struct timeval中的变量都设置为0的情况。

返回值

  • 成功时,select返回就绪文件描述符的数量。这个数量可能包括读、写和异常事件的文件描述符总数。
  • 如果在指定的超时时间内没有文件描述符就绪,select将返回0。
  • 如果调用失败,select将返回-1,并设置相应的错误码。

工作原理

    select的工作原理是阻塞等待直到至少有一个文件描述符在指定的集合中变为就绪状态,或者直到超时。一旦有文件描述符就绪或者超时发生,select将返回,并通过修改传入的文件描述符集合来告知调用者哪些文件描述符已经就绪。

注意事项

  • 在使用select之前,程序必须使用宏(如FD_ZEROFD_SETFD_CLRFD_ISSET)来初始化和操作文件描述符集合。
  • select的一个潜在缺点是它只能监视有限数量的文件描述符(通常受限于系统定义的一个上限)。对于需要监视大量文件描述符的应用程序,可能需要使用其他机制,如pollepoll
  • select在文件描述符集合较大时可能效率较低,因为它需要遍历整个集合来检查哪些文件描述符已经就绪。

select使用实例

        通过简单的一个TCP服务器获取发来的信息:

Log.hpp

#pragma once

#include <iostream>
#include <time.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>

#define SIZE 1024

#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4

#define Screen 1
#define Onefile 2
#define Classfile 3

#define LogFile "log.txt"

class Log
{
public:
    Log()
    {
        printMethod = Screen;
        path = "./log/";
    }
    void Enable(int method)
    {
        printMethod = method;
    }
    std::string levelToString(int level)
    {
        switch (level)
        {
        case Info:
            return "Info";
        case Debug:
            return "Debug";
        case Warning:
            return "Warning";
        case Error:
            return "Error";
        case Fatal:
            return "Fatal";
        default:
            return "None";
        }
    }

    // void logmessage(int level, const char *format, ...)
    // {
    //     time_t t = time(nullptr);
    //     struct tm *ctime = localtime(&t);
    //     char leftbuffer[SIZE];
    //     snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
    //              ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
    //              ctime->tm_hour, ctime->tm_min, ctime->tm_sec);

    //     // va_list s;
    //     // va_start(s, format);
    //     char rightbuffer[SIZE];
    //     vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
    //     // va_end(s);

    //     // 格式:默认部分+自定义部分
    //     char logtxt[SIZE * 2];
    //     snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);

    //     // printf("%s", logtxt); // 暂时打印
    //     printLog(level, logtxt);
    // }
    void printLog(int level, const std::string &logtxt)
    {
        switch (printMethod)
        {
        case Screen:
            std::cout << logtxt << std::endl;
            break;
        case Onefile:
            printOneFile(LogFile, logtxt);
            break;
        case Classfile:
            printClassFile(level, logtxt);
            break;
        default:
            break;
        }
    }
    void printOneFile(const std::string &logname, const std::string &logtxt)
    {
        std::string _logname = path + logname;
        int fd = open(_logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666); // "log.txt"
        if (fd < 0)
            return;
        write(fd, logtxt.c_str(), logtxt.size());
        close(fd);
    }
    void printClassFile(int level, const std::string &logtxt)
    {
        std::string filename = LogFile;
        filename += ".";
        filename += levelToString(level); // "log.txt.Debug/Warning/Fatal"
        printOneFile(filename, logtxt);
    }

    ~Log()
    {
    }
    void operator()(int level, const char *format, ...)
    {
        time_t t = time(nullptr);
        struct tm *ctime = localtime(&t);
        char leftbuffer[SIZE];
        snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
                 ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
                 ctime->tm_hour, ctime->tm_min, ctime->tm_sec);

        va_list s;
        va_start(s, format);
        char rightbuffer[SIZE];
        vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
        va_end(s);

        // 格式:默认部分+自定义部分
        char logtxt[SIZE * 2];
        snprintf(logtxt, sizeof(logtxt), "%s %s", leftbuffer, rightbuffer);

        // printf("%s", logtxt); // 暂时打印
        printLog(level, logtxt);
    }

private:
    int printMethod;
    std::string path;
};

Log lg;

// int sum(int n, ...)
// {
//     va_list s; // char*
//     va_start(s, n);

//     int sum = 0;
//     while(n)
//     {
//         sum += va_arg(s, int); // printf("hello %d, hello %s, hello %c, hello %d,", 1, "hello", 'c', 123);
//         n--;
//     }

//     va_end(s); //s = NULL
//     return sum;
// }

Socket.hpp

#pragma once

#include <iostream>
#include <string>
#include <unistd.h>
#include <cstring>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "Log.hpp"

enum
{
    SocketErr = 2,
    BindErr,
    ListenErr,
};

// TODO
const int backlog = 10;

class Sock
{
public:
    Sock()
    {
    }
    ~Sock()
    {
    }

public:
    void Socket()
    {
        sockfd_ = socket(AF_INET, SOCK_STREAM, 0);
        if (sockfd_ < 0)
        {
            lg(Fatal, "socker error, %s: %d", strerror(errno), errno);
            exit(SocketErr);
        }
        int opt = 1;
        setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
    }
    void Bind(uint16_t port)
    {
        struct sockaddr_in local;
        memset(&local, 0, sizeof(local));
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        local.sin_addr.s_addr = INADDR_ANY;

        if (bind(sockfd_, (struct sockaddr *)&local, sizeof(local)) < 0)
        {
            lg(Fatal, "bind error, %s: %d", strerror(errno), errno);
            exit(BindErr);
        }
    }
    void Listen()
    {
        if (listen(sockfd_, backlog) < 0)
        {
            lg(Fatal, "listen error, %s: %d", strerror(errno), errno);
            exit(ListenErr);
        }
    }
    int Accept(std::string *clientip, uint16_t *clientport)
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        int newfd = accept(sockfd_, (struct sockaddr*)&peer, &len);
        if(newfd < 0)
        {
            lg(Warning, "accept error, %s: %d", strerror(errno), errno);
            return -1;
        }
        char ipstr[64];
        inet_ntop(AF_INET, &peer.sin_addr, ipstr, sizeof(ipstr));
        *clientip = ipstr;
        *clientport = ntohs(peer.sin_port);

        return newfd;
    }
    bool Connect(const std::string &ip, const uint16_t &port)
    {
        struct sockaddr_in peer;
        memset(&peer, 0, sizeof(peer));
        peer.sin_family = AF_INET;
        peer.sin_port = htons(port);
        inet_pton(AF_INET, ip.c_str(), &(peer.sin_addr));

        int n = connect(sockfd_, (struct sockaddr*)&peer, sizeof(peer));
        if(n == -1) 
        {
            std::cerr << "connect to " << ip << ":" << port << " error" << std::endl;
            return false;
        }
        return true;
    }
    void Close()
    {
        close(sockfd_);
    }
    int Fd()
    {
        return sockfd_;
    }

private:
    int sockfd_;
};

SelectServer1.hpp

#pragma once

#include <iostream>
#include <sys/select.h>
#include <sys/time.h>
#include "Socket.hpp"

using namespace std;

static const uint16_t defaultport = 8888;
static const int fd_num_max = (sizeof(fd_set) * 8);
int defaultfd = -1;

class SelectServer
{
public:
    SelectServer(uint16_t port = defaultport) : _port(port)
    {
        for (int i = 0; i < fd_num_max; i++)
        {
            fd_array[i] = defaultfd;
            // std::cout << "fd_array[" << i << "]" << " : " << fd_array[i] << std::endl;
        }
    }
    bool Init()
    {
        _listensock.Socket();
        _listensock.Bind(_port);
        _listensock.Listen();

        return true;
    }
    void Accepter()
    {
        // 我们的连接事件就绪了
        std::string clientip;
        uint16_t clientport = 0;
        int sock = _listensock.Accept(&clientip, &clientport); // 会不会阻塞在这里?不会
        if (sock < 0) return;
        lg(Info, "accept success, %s: %d, sock fd: %d", clientip.c_str(), clientport, sock);

        // sock -> fd_array[]
        int pos = 1;
        for (; pos < fd_num_max; pos++) // 第二个循环
        {
            if (fd_array[pos] != defaultfd)
                continue;
            else
                break;
        }
        if (pos == fd_num_max)
        {
            lg(Warning, "server is full, close %d now!", sock);
            close(sock);
        }
        else
        {
            fd_array[pos] = sock;
            PrintFd();
            // TODO
        }
    }
    void Recver(int fd, int pos)
    {
        // demo
        char buffer[1024];
        ssize_t n = read(fd, buffer, sizeof(buffer) - 1); // bug?
        if (n > 0)
        {
            buffer[n] = 0;
            cout << "get a messge: " << buffer << endl;
        }
        else if (n == 0)
        {
            lg(Info, "client quit, me too, close fd is : %d", fd);
            close(fd);
            fd_array[pos] = defaultfd; // 这里本质是从select中移除
        }
        else
        {
            lg(Warning, "recv error: fd is : %d", fd);
            close(fd);
            fd_array[pos] = defaultfd; // 这里本质是从select中移除
        }
    }
    void Dispatcher(fd_set &rfds)
    {
        for (int i = 0; i < fd_num_max; i++) // 这是第三个循环
        {
            int fd = fd_array[i];
            if (fd == defaultfd)
                continue;

            if (FD_ISSET(fd, &rfds))
            {
                if (fd == _listensock.Fd())
                {
                    Accepter(); // 连接管理器
                }
                else // non listenfd
                {
                    Recver(fd, i);
                }
            }
        }
    }
    void Start()
    {
        int listensock = _listensock.Fd();
        fd_array[0] = listensock;
        for (;;)
        {
            fd_set rfds;
            FD_ZERO(&rfds);

            int maxfd = fd_array[0];
            for (int i = 0; i < fd_num_max; i++) // 第一次循环
            {
                if (fd_array[i] == defaultfd)
                    continue;
                FD_SET(fd_array[i], &rfds);
                if (maxfd < fd_array[i])
                {
                    maxfd = fd_array[i];
                    lg(Info, "max fd update, max fd is: %d", maxfd);
                }
            }

            // accept?不能直接accept!检测并获取listensock上面的事件,新连接到来,等价于读事件就绪

            // struct timeval timeout = {1, 0}; // 输入输出,可能要进行周期的重复设置
            struct timeval timeout = {0, 0}; // 输入输出,可能要进行周期的重复设置
            // 如果事件就绪,上层不处理,select会一直通知你!
            // select告诉你就绪了,接下来的一次读取,我们读取fd的时候,不会被阻塞
            // rfds: 输入输出型参数。 1111 1111 -> 0000 0000
            int n = select(maxfd + 1, &rfds, nullptr, nullptr, /*&timeout*/ nullptr);
            switch (n)
            {
            case 0:
                cout << "time out, timeout: " << timeout.tv_sec << "." << timeout.tv_usec << endl;
                break;
            case -1:
                cerr << "select error" << endl;
                break;
            default:
                // 有事件就绪了,TODO
                cout << "get a new link!!!!!" << endl;
                Dispatcher(rfds); // 就绪的事件和fd你怎么知道只有一个呢???
                break;
            }
        }
    }
    void PrintFd()
    {
        cout << "online fd list: ";
        for (int i = 0; i < fd_num_max; i++)
        {
            if (fd_array[i] == defaultfd)
                continue;
            cout << fd_array[i] << " ";
        }
        cout << endl;
    }
    ~SelectServer()
    {
        _listensock.Close();
    }

private:
    Sock _listensock;
    uint16_t _port;
    int fd_array[fd_num_max];   // 数组, 用户维护的!
    // int wfd_array[fd_num_max];
};

mian.cc

#include "SelectServer1.hpp"
#include <iostream>
#include <memory>

int main()
{
    std::unique_ptr<SelectServer> svr(new SelectServer);
    svr->Init();
    svr->Start();

    return 0;
}

大致效果:

select的特点与缺点

特点:

        可监控的文件描述符个数取决与sizeof(fd_set)的值. 我这边服务器上sizeof(fd_set)= 512,每bit表示一个文件描述符,则我服务器上支持的最大文件描述符是512*8=4096.

        将fd加入select监控集的同时,还要再使用一个数据结构array保存放到select监控集中的fd,一是用于再select 返回后, array作为源数据和fd_set进行FD_ISSET判断。二是select返回后会把以前加入的但并无事件发生的fd清空,则每次开始select前都要重新从array取得fd逐一加入(FD_ZERO最先),扫描array的同时取得fd最大值maxfd,用于select的第一个参数。

        备注: fd_set的大小可以调整,可能涉及到重新编译内核.


缺点:

        每次调用select, 都需要手动设置fd集合, 从接口使用角度来说也非常不便.

        每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大

        同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大

        select支持的文件描述符数量太小

多路转接之poll

接口详解

    poll 是 Unix/Linux 系统编程中的一个 I/O 多路复用接口,它允许程序同时监视多个文件描述符(file descriptors)的状态变化,例如可读、可写或有异常。与 select 系统调用类似,poll 提供了非阻塞地等待多个文件描述符就绪的能力,但 poll 在某些情况下提供了更好的性能和扩展性。

函数原型

#include <poll.h>

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

参数

  • struct pollfd *fds:指向 pollfd 结构体数组的指针,每个 pollfd 结构体代表一个要监视的文件描述符。
  • nfds_t nfdsfds 数组中的元素数量。
  • int timeout:等待事件发生的超时时间(以毫秒为单位)。如果 timeout 为 -1,则 poll 会一直等待,直到有事件发生或发生错误。如果 timeout 为 0,则 poll 会立即返回,不会等待。

pollfd 结构体

struct pollfd {
    int fd;          // 文件描述符
    short events;    // 感兴趣的事件
    short revents;   // 实际发生的事件
};

  • fd:要监视的文件描述符。
  • events:位掩码,表示我们感兴趣的事件。可能的值包括:
    • POLLIN:数据可读。
    • POLLOUT:数据可写。
    • POLLPRI:有优先数据可读(例如,带外数据)。
    • POLLERR:发生错误。
    • POLLHUP:挂起(例如,TCP 连接关闭)。
    • POLLNVAL:文件描述符无效。
  • revents:位掩码,由 poll 返回,表示实际发生的事件。在调用 poll 之前,应将其设置为 0。

返回值

  • 如果成功,返回就绪的文件描述符数量。
  • 如果出错,返回 -1,并设置 errno 以指示错误。

错误处理

        如果 poll 返回 -1,可以通过检查 errno 来确定出错的原因。可能的错误包括:

  • EBADF:一个或多个提供的文件描述符无效。
  • EINTR:调用被信号中断。
  • EINVALnfds 参数大于系统限制,或者 fds 指针无效。
  • ENOMEM:内存不足,无法完成操作。
  • 其他错误。

poll的使用实例

        通过基于poll使用TCP协议简单获取发过来的信息。

PollServer.hpp

#pragma once
#include <iostream>
#include "Socket.hpp"
#include <poll.h>
#include <sys/time.h>

using namespace std;

static uint16_t defaultport = 8888;
static const int fd_num_max = 64;
int defaultfd = -1;

class PollServer
{
public:
    PollServer(uint16_t port = defaultport) : _port(port)
    {
        for (int i = 0; i < fd_num_max; i++)
        {
            _event_fds[i].fd = defaultfd;
            _event_fds[i].events = POLLIN;
            _event_fds[i].revents = 0;
        }
    }

    bool Init()
    {
        _listensock.Socket();
        _listensock.Bind(_port);
        _listensock.Listen();
        _event_fds[0].fd = _listensock.Fd();

        return true;
    }

    void Accepter()
    {
        std::string clientip;
        uint16_t clientport;

        int sock=_listensock.Accept(&clientip, &clientport);
        if(sock<0)
        {
            lg(Error, "accept error");
            return;
        }

        lg(Info, "get a new link! %s: %d, sock fd: %d", clientip.c_str(), clientport, sock);
        
        int pos=1;
        for(;pos<fd_num_max;pos++)
        {
            if(_event_fds[pos].fd==defaultfd)
            {
                break;
            }
        }

        if(pos==fd_num_max)
        {
            lg(Error, "too many links!,close %d now!",sock);
            close(sock);
        }else{
            _event_fds[pos].fd=sock;
            _event_fds[pos].events=POLLIN;
            _event_fds[pos].revents=0;
            PrintFd();
        }

    }

    void Recver(int fd, int pos)
    {
        char buf[1024];
        ssize_t n = read(fd, buf, sizeof(buf));
        if (n > 0)
        {
            buf[n]=0;
            cout << "recv data: " << buf << endl;
        }
        else if (n == 0)
        {
            lg(Info, "client quit,close fd : %d",fd);
            close(fd);
            _event_fds[pos].fd = defaultfd;
        }else
        {
            lg(Error, "recv error");
            close(fd);
            _event_fds[pos].fd = defaultfd;
        }
    }

    void Dispatcher()
    {
        for(int i=0;i<fd_num_max;i++)
        {
            int fd=_event_fds[i].fd;
            if(fd==defaultfd)
            {
                continue;
            }

            if(_event_fds[i].revents&POLLIN)
            {
                if(fd==_listensock.Fd())
                {
                    Accepter();
                }else{
                    Recver(fd,i);
                }
            }

        }
    }

    void Start()
    {
        int timeout = 3000;
        while (true)
        {
            int n = poll(_event_fds, fd_num_max, 0);
            switch (n)
            {
            case 0:
                //cout << "timeout" << endl;
                break;
            case -1:
                cout << "poll error" << endl;
                break;
            default:
                Dispatcher();
                break;
            }
        }
    }

    void PrintFd()
    {
        cout << "online fd list: ";
        for (int i = 0; i < fd_num_max; i++)
        {
            if (_event_fds[i].fd == defaultfd)
                continue;
            cout << _event_fds[i].fd << " ";
        }
        cout << endl;
    }

    ~PollServer()
    {
        _listensock.Close();
    }

private:
    Sock _listensock;
    uint16_t _port;
    struct pollfd _event_fds[fd_num_max];
};

大致效果:

poll的优点和缺点

优点:

        不同与select使用三个位图来表示三个fdset的方式, poll使用一个pollfd的指针实现.

        pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式. 接口使用比select更方便。
        poll并没有最大数量限制 (但是数量过大后性能也是会下降).当这个大小大到内存抗不下去了就不是poll的问题了,而是OS自身的问题。

缺点:

        poll中监听的文件描述符数目增多时和select函数一样, poll返回后,需要轮询pollfd来获取就绪的描述符.每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中.同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降 .
epoll是Linux内核为处理大批量文件描述符而作了改进的I/O事件处理接口。它不同于select和poll,在处理大量并发连接时,具有更高的效率和更好的性能。以下是epoll接口的详解:

多路转接之epoll

        epoll是Linux下的一种I/O多路复用机制,它相比传统的select和poll具有更高的效率和更好的扩展性。epoll是为处理大批量句柄而作了改进的poll,然而事实上真的是poll的改进吗?实际上的epoll在底层的结构以及使用方法同poll都是有着众多的不同的!!!
        以下是epoll接口的详细解释:

主要函数

  1. epoll_create(int size)
#include <sys/epoll.h>

int epoll_create(int size);
    • 功能:创建一个epoll对象,并返回该对象的文件描述符。
    • 参数size参数从Linux内核2.6.8版本起就被忽略,只要求size大于0即可。
    • 返回值:成功则返回epoll专用的文件描述符,失败返回-1。
    • 注意事项:创建好epoll句柄后,它会占用一个文件描述符。因此,在使用完epoll后,必须调用close()关闭,否则可能导致文件描述符被耗尽。
  1. epoll_ctl(int epfd, int op, int fd, struct epoll_event event)
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    • 功能:操作控制epoll对象,主要涉及epoll红黑树上节点的一些操作,如添加节点、删除节点、修改节点事件。
    • 参数
      • epfd:通过epoll_create创建的epoll对象句柄。
      • op:操作类型,如添加、删除或修改。可以选择如下中的任何一个:

EPOLL_CTL_ADD

        • 功能:将文件描述符 fd 添加到 epoll 实例(由 epfd 指定)上,并关联 event 中指定的事件。
        • 细节:如果 fd 已经存在于 epoll 实例中,调用此操作将会失败(除非使用了 EPOLLONESHOT 标志)。

EPOLL_CTL_MOD

        • 功能:修改已添加到 epoll 实例中的文件描述符 fd 的事件。
        • 细节:如果 fd 不存在于 epoll 实例中,调用此操作将会失败。event 参数用于指定新的事件集合。

EPOLL_CTL_DEL

        • 功能:从 epoll 实例中删除文件描述符 fd。
        • 细节:如果 fd 不存在于 epoll 实例中,调用此操作将会失败。
      • fd:要监视的文件描述符。
      • event:指向epoll_event结构的指针,用于指定要监视的事件类型。
    • 返回值:成功返回0,失败返回-1。
  1. epoll_wait(int epfd, struct epoll_event events, int maxevents, int timeout)
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
    • 功能:等待事件的发生。
    • 参数
      • epfd:epoll文件描述符。
      • events:指向epoll_event数组的指针,用于存储触发的事件。
      • maxevents:告诉内核这个events数组的大小,即最多能返回多少个事件。
      • timeout:等待的超时时间,如果为-1,则表示一直等待。
    • 返回值:返回触发事件的文件描述符数量,如果超时则返回0,出错则返回-1。

epoll_event结构

        以下是 struct epoll_event 的详细解释:

struct epoll_event {
    __uint32_t events;      /* Epoll events */
    epoll_data_t data;      /* User data variable */
};
  1. events
    • 这是一个 32 位的整数,用于表示事件的类型。它可以是一个或多个以下的标志(使用 OR 操作符 | 组合):
      • EPOLLIN:表示对应的文件描述符可以读(例如,套接字上有数据可读)。
      • EPOLLOUT:表示对应的文件描述符可以写。
      • EPOLLPRI:表示对应的文件描述符有紧急数据可读。
      • EPOLLERR:表示对应的文件描述符发生错误。
      • EPOLLHUP:表示对应的文件描述符被挂起。
      • EPOLLET:设置此标志后,epoll 将使用边缘触发(Edge Triggered)模式,而不是默认的水平触发(Level Triggered)模式。
      • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个 socket 的话,需要再次把这个 socket 加入到 EPOLL 队列里。
  1. data
    • 这是一个联合体(union),用于存储用户定义的数据。你可以用它来关联文件描述符和任何你需要的自定义数据。这样,当 epoll 返回事件时,你可以很容易地找到与该事件相关的上下文信息。
typedef union epoll_data {
    void        *ptr;
    int          fd;
    __uint32_t   u32;
    __uint64_t   u64;
} epoll_data_t;
    • ptr:你可以存储一个指向任何类型的指针。
    • fd:通常用于存储文件描述符。
    • u32u64:用于存储 32 位和 64 位的无符号整数。

epoll的原理

        当某一进程调用epoll_create方法时, Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关。大致的结构如下:

struct eventpoll{
....
/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
struct rb_root rbr;
/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
struct list_head rdlist;
....
};

        其中成员struct rb_root rbr(也就是红黑树节点)我们可这样理解其结构:

struct rb_root rbr{
    int fd;
    uint32_t event;  
};

        内部包含着文件描述符fd和事件event。

        操作系统中会为我们维护一颗红黑树以及一个就绪队列。其中epoll_ctl的本质则是对红黑树中的节点进行操作,即:增、删、改。而上面我们提到的红黑树结构体即可以属于红黑树也可以属于就绪队列。一旦红黑树中的一个节点的某个文件描述符的某个事件就绪了,那么该事件就可以被添加到就绪队列中。其中就绪队列中每一个节点也是包含int fd;和uint32_t event;这两个成员的。而我们的epoll_wait则会以O(1)的时间复杂度去检测事件有没就绪(实际上就是判是否为空)。操作系统中还会为提供回调函数用于将发生的事件添加到就绪队列中,避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中, epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响。

        其中struct list_head rdlist成员(也就是就绪队列中的节点!!!)我们可这样理解其结构:

struct list_head {  
    struct list_head *next;  
    struct list_head *prev;  
};

        当有事件就绪了就可以通过该结构接入就绪队列!通过回调函数中的一些操作,如:强转类型然后找到eventpoll这个结构体中的某个成员来判断是否就绪,回调函数可以通过某个成员来判断是否处于就绪队列中!

        在epoll中,对于每一个事件,都会建立一个epitem结构体. 如下:

struct epitem{
struct rb_node rbn;//红黑树节点
struct list_head rdllink;//双向链表节点
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所属的eventpoll对象
struct epoll_event event; //期待发生的事件类型
}

        当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可.如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户. 这个操作的时间复杂度是O(1)。

总结一下, epoll的使用过程就是三部曲:

        调用epoll_create创建一个epoll句柄;

        调用epoll_ctl, 将要监控的文件描述符进行注册;

        调用epoll_wait, 等待文件描述符就绪

        大致的图示如下:

使用流程

  1. 创建epoll实例:使用epoll_create函数创建一个epoll实例,并获取其文件描述符。
  2. 注册文件描述符:使用epoll_ctl函数将需要监视的文件描述符添加到epoll实例中,并指定要监视的事件类型。
  3. 等待事件:使用epoll_wait函数等待事件的发生。该函数会阻塞程序执行,直到有事件发生或超时。
  4. 处理事件:根据epoll_wait返回的触发事件的文件描述符和事件类型,应用程序可以执行相应的操作。
  5. 更新或删除事件:在处理完一个事件后,可以使用epoll_ctl函数更新或删除epoll实例中的文件描述符的监视状态。

epoll工作方式

        epoll主要有两种工作模式:边缘触发(Edge-Triggered)和水平触发(Level-Triggered)。

  1. 边缘触发模式(EPOLLET):
  • 在此模式下,epoll仅在文件描述符状态发生变化时通知应用程序。具体来说,当文件描述符从未就绪状态变为就绪状态时,epoll会触发一次事件通知,并且仅通知一次,即使文件描述符仍然处于就绪状态。这种模式要求应用程序完全处理每个事件,确保不会遗漏任何事件。
  1. 水平触发模式(EPOLLIN):
  • 在此模式下,epoll会持续通知应用程序文件描述符处于就绪状态,直到应用程序读取了所有可用的数据或写入了所有数据。如果应用程序没有完全处理就绪状态,下次epoll_wait函数调用时仍然会通知该文件描述符处于就绪状态。这种模式需要应用程序自行控制事件的处理。

epoll的使用实例

        通过基于epoll使用TCP协议简单获取发过来的信息,并且对其进行简单的处理后发回。

nocopy.hpp

        这个类的作用主要是禁止拷贝如下对拷贝构造以及赋值运算符都禁用了。

#pragma once

class nocopy{
public:
    nocopy() = default;
    nocopy(const nocopy& cp)=delete;
    const nocopy& operator=(const nocopy& cp)=delete;
};

Epoller.hpp

        对epol进行了简单的封装。

#pragma once
#include "Log.hpp"
#include "nocopy.hpp"
#include <cerrno>
#include <cstring>
#include <sys/epoll.h>

class Epoller : public nocopy
{

    static const int size = 128;

public:
    Epoller()
    {
        _epfd = epoll_create(size);
        if (_epfd == -1)
        {
            lg(Error, "epoll_create1 error");
        }
        else
        {
            lg(Info, "epoll_create1 success! _epfd:%d", _epfd);
        }
    }

    int EpollerWait(struct epoll_event *revents, int num, int timeout = 3000) // struct epoll_event revents[]?
    {
        int n = epoll_wait(_epfd, revents, num, timeout);
        return n;
    }

    int EpollerUpdate(int oper, int sock, uint32_t event)
    {
        int n = 0;
        if (oper == EPOLL_CTL_DEL)
        {
            n = epoll_ctl(_epfd, oper, sock, nullptr);
            if (n == -1)
            {
                lg(Error, "epoll_ctl del error");
            }
            else
            {
                lg(Info, "epoll_ctl del success! sock:%d", sock);
            }
        }
        else
        {
            struct epoll_event ev;
            ev.events = event;
            ev.data.fd = sock;
            n = epoll_ctl(_epfd, oper, sock, &ev);
            if (n == -1)
            {
                lg(Error, "epoll_ctl add error");
            }
            else
            {
                lg(Info, "epoll_ctl add success! sock:%d", sock);
            }
            return n;
        }
    }

    ~Epoller()
    {
        if (_epfd > 0)
            close(_epfd);
    }

private:
    int _epfd;
    int _timeout{3000};
};

EpollServer.hpp

        真正实现的EpollServer服务器。

#pragma once
#include "Socket.hpp"
#include "Log.hpp"
#include "Epoller.hpp"
#include "nocopy.hpp"
#include <memory>

uint32_t EVENT_IN = (EPOLLIN);
uint32_t EVENT_OUT = (EPOLLOUT);
static const uint16_t defaultport = 8888;

class EpollServer : public nocopy
{

    static const int num = 64;

public:
    EpollServer(uint16_t port = defaultport)
        : _listsocket_ptr(new Sock()),
          _epoller_ptr(new Epoller()),
          _port(port)
    {
    }

    void Init()
    {
        _listsocket_ptr->Socket();
        _listsocket_ptr->Bind(_port);
        _listsocket_ptr->Listen();

        lg(Info, "create listen socket success: %d\n", _listsocket_ptr->Fd());
    }

    void Accepter()
    {
        std::string clientip;
        uint16_t clientport;
        int connfd = _listsocket_ptr->Accept(&clientip, &clientport);
        if (connfd < 0)
        {
            lg(Error, "accept error");
            return;
        }
        _epoller_ptr->EpollerUpdate(EPOLL_CTL_ADD, connfd, EVENT_IN);
        lg(Info, "accept success: %d get a new link! client info@ %s:%d", connfd, clientip.c_str(), clientport);
    }

    void Recver(int fd)
    {
        char buffer[1024];
        ssize_t n = read(fd, buffer, sizeof(buffer) - 1); // bug?
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << "get a messge: " << buffer << std::endl;
            // write
            std::string echo_str = "server echo $ ";
            echo_str += buffer;
            write(fd, echo_str.c_str(), echo_str.size());
        }else if (n == 0)
        {
            lg(Info, "client quit, me too, close fd is : %d", fd);
            //细节3
            _epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, fd, 0);
            close(fd);
        }
        else
        {
            lg(Warning, "recv error: fd is : %d", fd);
            _epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, fd, 0);
            close(fd);
        }
    }

    void Dispatcher(struct epoll_event revs[], int num)
    {
        for (int i = 0; i < num; i++)
        {
            int fd = revs[i].data.fd;
            uint32_t events = revs[i].events;
            if (events & EVENT_IN)
            {
                if (fd == _listsocket_ptr->Fd())
                {
                    // 有新连接
                    Accepter();
                }
                else
                {
                    // 有数据可读
                    Recver(fd);
                }
            }
        }
    }

    void Start()
    {
        _epoller_ptr->EpollerUpdate(EPOLL_CTL_ADD, _listsocket_ptr->Fd(), EVENT_IN);
        struct epoll_event revs[num];
        while(true)
        {
            int n = _epoller_ptr->EpollerWait(revs, nu

相关文章