Linux学习记录——사십사 高级IO(5)--- Epoll型服务器(2)(Reactor)

时间:2024-01-23 15:34:07

文章目录

  • 1、完善Epoll简单服务器
  • 2、打造统一的分开处理的体系
  • 3、epoll工作模式
  • 4、ET模式
  • 5、继续完善,处理写事件
  • 6、引入自定义协议,处理写事件


本篇基于上篇代码继续改进,很长。关于Reactor的说明在后一篇

1、完善Epoll简单服务器

上面的代码在处理读事件时,用的request数组是临时的,如果有数据没读完,那么下次再来到这里,就没有这些数据了。所以得让每一个fd都有自己的缓冲区。建立一个Connection类,然后有一个map结构,让这个类和每个fd建立映射。Start函数改一下,不管超时还是出错,就只处理数据,处理的部分交给HandlerEvent,改名成LoopOnce,也就是说,Start那里还是有循环,每次循环都去执行L函数,L函数用Wait提取一次,然后处理。

    void Start()
    {
        //1、将listensock添加到epoll中,要先有epoll模型
        bool r = epoller_.AddEvent(listensock_.Fd(), EPOLLIN);//只关心读事件
        assert(r);//可以做别的判断
        (void)r;
        struct epoll_event revs_[gnum];
        int timeout = 1000;
        while(true)
        {
            LoopOnce(timeout);
        }
    }

    void Accepter()
    {
        std::string clientip;
        uint16_t clientport;
        int sock = listensock_.Accept(&clientip, &clientport);
        if (sock < 0) return ;
        logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport);
        // 还不能recv,即使有了连接但也不知道有没有数据
        // 只有epoll知道具体情况,所以将sock添加到epoll中
        bool r = epoller_.AddEvent(sock, EPOLLIN);
        assert(r);
        (void)r;
    }

    void Recver(int fd)
    {
        char request[1024];
        ssize_t s = recv(fd, request, sizeof(request) - 1, 0);
        if (s > 0)
        {
            request[s - 1] = 0; // 对打印格式
            request[s - 2] = 0; // 做一下调整
            std::string response = func_(request);
            send(fd, response.c_str(), response.size(), 0);
        }
        else
        {
            if (s == 0)
                logMessage(Info, "client quit ...");
            else
                logMessage(Warning, "recv error, client quit...");
            close(fd);
            // 将文件描述符移除
            // 在处理异常的时候,fd必须合法才能被处理
            epoller_.DelEvent(fd);
        }
    }

    void LoopOnce(int timeout)
    {
        int n = epoller_.Wait(revs_, gnum, timeout);
        for(int i = 0; i < n; i++)
        {
            int fd = revs_[i].data.fd;
            uint32_t events = revs_[i].events;
            logMessage(Debug, "当前正在处理%d上的%s", fd, (events&EPOLLIN) ? "EPOLLIN" : "OTHER");
            if(events & EPOLLIN)//判断读事件就绪
            {
                if (fd == listensock_.Fd())
                {
                    // 1、新连接到来
                    Accepter();
                }
                else
                {
                    // 2、读事件
                    Recver(fd);
                }
            }
        }
    }
class Connection
{
public:
    Connection(int fd): fd_(fd)
    {}
     
    ~Connection()
    {}
public:
    int fd_;
    std::string inbuffer_;
    std::string outbuffer_;
};

std::unordered_map<int, Connection*> connections_;

把Start的初始化任务交给InitServer

    void InitServer()
    {
        listensock_.Socket();
        listensock_.Bind(port_);
        listensock_.Listen();
        epoller_.Create();
        logMessage(Debug, "init server success");
        //为listensock创建对应的connection对象
        Connection* conn = new Connection(listensock_.Fd());
        //将listensock和connection对象添加到connections_
        connections_.insert(std::pair<int, Connection*>(listensock_.Fd(), conn));
        //将listensock添加到epoll中
        bool r = epoller_.AddEvent(listensock_.Fd(), EPOLLIN);
        assert(r);
        (void)r;
    }

    void Start()
    {
        struct epoll_event revs_[gnum];
        int timeout = 1000;
        while(true)
        {
            LoopOnce(timeout);
        }
    }

同样地,Accepter有添加到epoll的fd也要映射上自己的Connection类,Recver那里就可以也改一下了

    void Accepter()
    {
        std::string clientip;
        uint16_t clientport;
        int sock = listensock_.Accept(&clientip, &clientport);
        if (sock < 0) return ;
        logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport);
        // 还不能recv,即使有了连接但也不知道有没有数据
        // 只有epoll知道具体情况,所以将sock添加到epoll中        
        Connection* conn = new Connection(sock);      
        connections_.insert(std::pair<int, Connection*>(sock, conn));
        bool r = epoller_.AddEvent(sock, EPOLLIN);
        assert(r);
        (void)r;
    }

    void Recver(int fd)
    {
        char request[1024];
        ssize_t s = recv(fd, request, sizeof(request) - 1, 0);
        if (s > 0)
        {
            request[s - 1] = 0; // 对打印格式
            request[s - 2] = 0; // 做一下调整
            connections_[fd]->inbuffer_ += request;
            std::string response = func_(request);
            send(fd, response.c_str(), response.size(), 0);
        }
        else
        {
            if (s == 0)
                logMessage(Info, "client quit ...");
            else
                logMessage(Warning, "recv error, client quit...");
            close(fd);
            // 将文件描述符移除
            // 在处理异常的时候,fd必须合法才能被处理
            epoller_.DelEvent(fd);
        }
    }

所有就绪的fd,不只包含我们关心的fd,都要有Connection类。Accepter那里,得到连接后,获取套接字,不直接读取,因为不知道是否有数据,就交给epoll,不过获取套接字后,每个套接字都需要正确读取自己的报文,所以Connection有了两个buffer。

所有就绪的fd,不仅要有Connection类,还要被epoll管理。但这样的代码并不高效,删除的时候要从epoll里删,还要从connections_里删,且代码也不够简洁。

封装并修改一下形式

class Connection
{
public:
    Connection(const int& fd, const std::string& clientip, const uint16_t& clientport)
    : fd_(fd), clientip_(clientip), clientport_(clientport)
    {}
     
    ~Connection()
    {}
public:
    int fd_;
    std::string inbuffer_;
    std::string outbuffer_;
    std::string clientip_;
    uint16_t clientport_;
};

//...

    void InitServer()
    {
        listensock_.Socket();
        listensock_.Bind(port_);
        listensock_.Listen();
        epoller_.Create();
        //为listensock创建对应的connection对象
        //将listensock和connection对象添加到connections_       
        //将listensock添加到epoll中
        AddConnection(listensock_.Fd(), EPOLLIN);
        logMessage(Debug, "init server success");
    }

    void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport)
    {
        //1、构建connection对象,交给connections_管理
        Connection* conn = new Connection(fd, ip, port);
        connections_.insert(std::pair<int, Connection*>(fd, conn));
        //2、fd和events写到内核中
        bool r = epoller_.AddEvent(fd, events);
        assert(r);
        (void)r;
        logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);
    }

    void Accepter()
    {
        std::string clientip;
        uint16_t clientport;
        int sock = listensock_.Accept(&clientip, &clientport);
        if (sock < 0) return ;
        logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport);
        // 还不能recv,即使有了连接但也不知道有没有数据
        // 只有epoll知道具体情况,所以将sock添加到epoll中        
        AddConnection(sock, EPOLLIN, clientip, clientport);
    }

    void Recver(int fd)
    {
        char request[1024];
        ssize_t s = recv(fd, request, sizeof(request) - 1, 0);
        if (s > 0)
        {
            request[s - 1] = 0; // 对打印格式
            request[s - 2] = 0; // 做一下调整
            connections_[fd]->inbuffer_ += request;
            std::string response = func_(request);
            send(fd, response.c_str(), response.size(), 0);
        }
        else
        {
            if (s == 0)
                logMessage(Info, "client quit ...");
            else
                logMessage(Warning, "recv error
            // 在处理异常的时候,fd必须合法才能被处理
            epoller_.DelEvent(fd);
        }
    }

2、打造统一的分开处理的体系

现有的Accepter、Recver都是处理写事件的,LoopOnce那里可以加个读事件的判断,但相关的处理函数要怎么写?为了简便,这里再引入回调函数。

const static int gport = 8888;
class Connection;

using func_t = std::function<std::string (std::string)>;
using callback_t = std::function<void(Connection*)>;

class Connection
{
public:
    Connection(const int& fd, const std::string& clientip, const uint16_t& clientport)
    : fd_(fd), clientip_(clientip), clientport_(clientport)
    {}
     
    void Register(callback_t recver, callback_t sender, callback_t excepter)
    {
        recver_ = recver;
        sender_ = sender;
        excepter_ = excepter;
    }

    ~Connection()
    {}
public:
    //IO信息
    int fd_;
    std::string inbuffer_;
    std::string outbuffer_;
    //IO处理
    callback_t recver_;
    callback_t sender_; 
    callback_t excepter_;
    //用户信息
    std::string clientip_;
    uint16_t clientport_;
};

Register为注册方法,也就是要使用的方法。在AddConnection函数中,要判断一下,是我们关心的和不是我们关心的,都调用注册方法,但传的参数不一样。

    void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport)
    {
        //2、构建connection对象,交给connections_管理
        Connection* conn = new Connection(fd, ip, port);
        if(fd == listensock_.Fd())
        {
            conn->Register();
        }
        else
        {
            conn->Register();
        }
        connections_.insert(std::pair<int, Connection*>(fd, conn));
        //3、fd和events写到内核中
        bool r = epoller_.AddEvent(fd, events);
        assert(r);
        (void)r;
        logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);
    }

Accepter那里,里面有AddConnection函数。当LoopOnce调用Accepter时,这个函数也要用回调函数,这样就是一个类的成员函数要调用另一个类的回调函数。

    void Accepter(Connection* conn)
    {
        (void) conn;//先闲置不用
        std::string clientip;
        uint16_t clientport;
        int sock = listensock_.Accept(&clientip, &clientport);
        if (sock < 0) return ;
        logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport);
        // 还不能recv,即使有了连接但也不知道有没有数据
        // 只有epoll知道具体情况,所以将sock添加到epoll中        
        AddConnection(sock, EPOLLIN, clientip, clientport);
    }

AddConnection中,Regsiter三个参数都是callback_t类型的,我们可以这样写

        if(fd == listensock_.Fd())
        {
            conn->Register(std::bind(&EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
        }

这样设置,当我们关心的套接字上有事件就绪时,读方法就绑定Accepter。是其它套接字的话

        else
        {
            conn->Register(std::bind(&EpollServer::Recver, this, std::placeholders::_1),
                           std::bind(&EpollServer::Sender, this, std::placeholders::_1),
                           std::bind(&EpollServer::Excepter, this, std::placeholders::_1));
        }
    void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport)
    {
        //2、构建connection对象,交给connections_管理
        Connection* conn = new Connection(fd, ip, port);
        if(fd == listensock_.Fd())
        {
            conn->Register(std::bind(&EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
        }
        else
        {
            conn->Register(std::bind(