Socket编程实践(11) --epoll原理与封装

时间:2022-09-01 18:45:10

常用模型的特点

Linux 下设计并发网络程序,有典型的Apache模型(Process Per Connection,PPC), TPC(Thread Per Connection)模型,以及 select/polL模型和epoll模型。

1 、PPC/TPC 模型

这两种模型思想类似,就是让每一个到来的连接一边自己做事去,别再来烦我(详见本系列博客).只是 PPC 是为它开了一个进程,而 TPC 开了一个线程。可是别烦我是有代价的,它要时间和空间啊,连接多了之后,那么多的进程/线程切换,这开销就上来了;因此这类模型能接受的最大连接数都不会高,一般在几百个左右。

2 、select 模型

1) 最大并发数限制,因为一个进程所打开的 FD (文件描述符)是有限制的,由 FD_SETSIZE 设置,默认值是 1024,因此 Select 模型的最大并发数就被相应限制了。自己改改这个 FD_SETSIZE ?想法虽好,可是先看看下面吧 …

2) 效率问题, select 每次调用都会线性扫描全部的 FD 集合,这样效率就会呈现线性下降,把 FD_SETSIZE 改大的后果就是,大家都慢慢来,什么?都超时了??!!

3) 内核/用户空间内存拷贝问题,如何让内核把 FD 消息通知给用户空间呢?在这个问题上 select 采取了内存拷贝方法。

3、 poll 模型

基本上效率和 select 是相同的, select 缺点的 2 和 3 它都没有改掉。

Epoll 的提升

1. Epoll 没有最大并发连接的限制,上限是最大可以打开文件的数目,这个数字一般远大于 2048, 一般来说这个数目和系统内存关系很大 ,具体数目可以 cat /proc/sys/fs/file-max[599534] 察看。

2. 效率提升, Epoll最大的优点就在于它只管你“活跃”的连接 ,而跟连接总数无关,因此在实际的网络环境中, Epoll的效率就会远远高于 select 和 poll 。

3. 内存拷贝, Epoll 在这点上使用了“共享内存(详见本系列其他博客)”,这个内存拷贝也省略了。

epoll的使用

epoll的接口非常简单,一共就3/4个函数:

int epoll_create(int size);
int epoll_create1(int flags);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

1. 对于epoll_create1 的flag参数: 可以设置为0 或EPOLL_CLOEXEC,为0时函数表现与epoll_create一致, EPOLL_CLOEXEC标志与open 时的O_CLOEXEC 标志类似,即进程被替换时会关闭打开的文件描述符(需要注意的是,epoll_create与epoll_create1当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/<pid>/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽)。

2. 对于epoll_ctl, op参数表示动作,用三个宏来表示:

EPOLL_CTL_ADD

注册新的fd到epfd中

EPOLL_CTL_DEL

从epfd中删除一个fd

EPOLL_CTL_MOD

修改已经注册的fd的监听事件

3. 对于epoll_wait:

events:结构体指针, 一般是一个数组

maxevents:事件的最大个数, 或者说是数组的大小

timeout:超时时间, 含义与poll的timeout参数相同,设为-1表示永不超时;

4. epoll_event结构体

struct epoll_event
{
    uint32_t     events;      /* Epoll events */
    epoll_data_t data;        /* User data variable */
};
typedef union epoll_data
{
    void        *ptr;
    int          fd;
    uint32_t     u32;
    uint64_t     u64;
} epoll_data_t;

一般data 共同体我们设置其成员fd即可,也就是epoll_ctl 函数的第三个参数。

events集合

EPOLLIN

表示对应的文件描述符可以读(包括对端SOCKET正常关闭)

EPOLLOUT

表示对应的文件描述符可以写

EPOLLPRI

表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)

EPOLLERR

表示对应的文件描述符发生错误

EPOLLHUP

表示对应的文件描述符被挂断

EPOLLET

将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的

EPOLLONESHOT

只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

/**示例: epoll使用示例
 	注:client端与测试端与前同, 而且使用相同的测试端测试select/poll/epoll, 可以发现epoll的效率是非常高的**/

//添加fd到epoll
void addFd(int epollfd, int fd, const uint32_t &events = EPOLLIN, bool et = false)
{
    struct epoll_event event;
    event.events = events;
    if (et)
        event.events |= EPOLLET;
    event.data.fd = fd;
    if( epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1 )
        err_exit("epoll_ctl_add error");
}
//从epoll删除fd
void delFd(int epollfd, int fd)
{
    struct epoll_event event;
    event.data.fd = fd;
    if( epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &event) == -1 )
        err_exit("epoll_ctl_del error");
}

int main()
{
    signal(SIGPIPE, sigHandlerForSigPipe);
    try
    {
        TCPServer server(8001);
        int listenfd = server.getfd();
        int epollfd = epoll_create1(EPOLL_CLOEXEC);
        if (epollfd == -1)
            err_exit("epoll_create1 error");
        // 将监听套接字注册到epoll
        addFd(epollfd, listenfd, EPOLLIN, true);

        // 用于保存epoll_wait返回事件数组
        std::vector<struct epoll_event> events(16);
        char buf[BUFSIZ];
        int count = 0;
        while (true)
        {
            // 等待epoll返回
            int nReady = epoll_wait(epollfd, &*events.begin(), (int)events.size(), -1);
            if (nReady == -1)
            {
                if (errno == EINTR)
                    continue;
                err_exit("epoll_wait error");
            }
            if ((size_t)nReady == events.size())
                events.resize(events.size()*2);

            for (int i = 0; i < nReady; ++i)
            {
                // 如果是监听套接字发送了可读事件
                if (events[i].data.fd == listenfd)
                {
                    int connectfd = accept(listenfd, NULL, NULL);
                    if (connectfd == -1)
                        err_exit("accept error");
                    cout << "accept success..." << endl;
                    cout << "count = " << ++count << endl;
                    setUnBlock(connectfd, true);
                    addFd(epollfd, connectfd, EPOLLIN, true);
                }
                // 如果是已连接套接字发生了可读事件
                else if (events[i].events & EPOLLIN)
                {
                    int connectfd = events[i].data.fd;
                    if (connectfd < 0)
                        continue;

                    memset(buf, 0, sizeof(buf));
                    int ret = readline(connectfd, buf, sizeof(buf)-1);
                    if (ret == -1)
                        err_exit("read-line error");
                    // 如果对端关闭
                    else if (ret == 0)
                    {
                        cerr << "client connect closed..." << endl;
                        // 将该套接字同epoll中移除
                        delFd(epollfd, connectfd);
                        close(connectfd);
                        continue;
                    }
                    cout << buf;
                    writen(connectfd, buf, strlen(buf));
                }
            }
        }
    }
    catch (const SocketException &e)
    {
        cerr << e.what() << endl;
        err_exit("TCPServer error");
    }
}

小结-epoll与select、poll的区别

1.相比于select与poll, epoll最大的好处在于它不会随着监听fd数目的增长而降低效率。

因为内核中select/poll的实现是采用轮询来处理的, 因此他们检测就绪实践的算法时间复杂度是O(N), 因此, 需要轮询的fd数目越多, 自然耗时越多, 他们的性能呈线性甚至指数的方式下降。

而epoll的实现是基于事件回调的,如果fd有期望的事件发生就通过回调函数将其加入epoll就绪队列中,也就是说它只关心“活跃”的fd,与fd数目无关 其算法时间复杂度为O(1)。

2. 内核空间与用户空间内存拷贝问题,如何让内核把 fd消息通知给用户空间呢?在这个问题上select/poll采取了内存拷贝方法。而epoll采用了内核和用户空间共享内存的方式。

3. epoll不仅会告诉应用程序有I/0 事件到来,还会告诉应用程序相关的信息,这些信息是应用程序填充的,因此根据这些信息应用程序就能直接定位到事件,而不必遍历整个fd集合。而select/poll模型,当有 I/O 事件到来时, select/poll通知应用程序有事件到达,而应用程序必须轮询所有的fd集合,测试每个fd是否有事件发生,并处理事件。

4. 当活动连接比较多的时候, epoll_wait的效率就未必比select/poll高了, 因为这时候对于epoll 来说一直在调用callback 函数, 回调函数被触发得过于频繁, 所以epoll_wait适用于连接数量多, 但活动连接少的情况;

ET/LT模式

1、EPOLLLT:完全靠Linux-kernel-epoll驱动,应用程序只需要处理从epoll_wait返回的fds, 这些fds我们认为它们处于就绪状态。此时epoll可以认为是更快速的poll。

2、EPOLLET:此模式下,系统仅仅通知应用程序哪些fds变成了就绪状态,一旦fd变成就绪状态,epoll将不再关注这个fd的任何状态信息(从epoll队列移除), 直到应用程序通过读写操作(非阻塞)触发EAGAIN状态,epoll认为这个fd又变为空闲状态,那么epoll又重新关注这个fd的状态变化(重新加入epoll队列)。 随着epoll_wait的返回,队列中的fds是在减少的,所以在大并发的系统中,EPOLLET更有优势,但是对程序员的要求也更高,因为有可能会出现数据读取不完整的问题,举例如下:

假设现在对方发送了2k的数据,而我们先读取了1k,然后这时调用了epoll_wait,如果是边沿触发ET,那么这个fd变成就绪状态就会从epoll 队列移除,则epoll_wait 会一直阻塞,忽略尚未读取的1k数据; 而如果是水平触发LT,那么epoll_wait 还会检测到可读事件而返回,我们可以继续读取剩下的1k 数据。

因此总结来说: LT模式可能触发的次数更多, 一旦触发的次数多, 也就意味着效率会下降; 但这样也不能就说LT模式就比ET模式效率更低, 因为ET的使用对编程人员提出了更高更精细的要求, 一旦编程人员水平达不到(比如本人), 那ET模式还不如LT模式;

Epoll-Class封装

在本部分我们实现一个较为好用实用的Epoll并发类, 由于实现代码与使用方式较简单, 因此就不在此赘述了, 下面我还使用了该类实现了一个基于Epoll的echo-server, 以演示该类的用法;

由于此处仅为Epoll类库的第一个版本, 因此错误之处必然会存在, 如果读者在阅读的过程中发现了该类库的BUG, 还望这篇博客的读者朋友不吝赐教; 而作者也会不断的更新该类库(主要更新代码我会发布到此处), 以处理新的业务需求;

Epoll类设计

class Epoll
{
public:
    Epoll(int flags = EPOLL_CLOEXEC, int noFile = 1024);
    ~Epoll();

    void addfd(int fd, uint32_t events = EPOLLIN, bool ETorNot = false);
    void modfd(int fd, uint32_t events = EPOLLIN, bool ETorNot = false);
    void delfd(int fd);
    int wait(int timeout = -1);
    int getEventOccurfd(int eventIndex) const;
    uint32_t getEvents(int eventIndex) const;

public:
    bool isValid()
    {
        if (m_epollfd == -1)
            return false;
        return true;
    }
    void close()
    {
        if (isValid())
        {
            :: close(m_epollfd);
            m_epollfd = -1;
        }
    }

private:
    std::vector<struct epoll_event> events;
    int m_epollfd;
    int fdNumber;
    int nReady;
private:
    struct epoll_event event;
};

Epoll类实现

/** epoll_create **/
Epoll::Epoll(int flags, int noFile) : fdNumber(0), nReady(0)
{
    struct rlimit rlim;
    rlim.rlim_cur = rlim.rlim_max = noFile;
    if ( ::setrlimit(RLIMIT_NOFILE, &rlim) == -1 )
        throw EpollException("setrlimit error");

    m_epollfd = ::epoll_create1(flags);
    if (m_epollfd == -1)
        throw EpollException("epoll_create1 error");
}
Epoll::~Epoll()
{
    this -> close();
}
/** epoll_ctl **/
void Epoll::addfd(int fd, uint32_t events, bool ETorNot)
{
    bzero(&event, sizeof(event));
    event.events = events;
    if (ETorNot)
        event.events |= EPOLLET;
    event.data.fd = fd;
    if( ::epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &event) == -1 )
        throw EpollException("epoll_ctl_add error");
    ++ fdNumber;
}
void Epoll::modfd(int fd, uint32_t events, bool ETorNot)
{
    bzero(&event, sizeof(event));
    event.events = events;
    if (ETorNot)
        event.events |= EPOLLET;
    event.data.fd = fd;
    if( ::epoll_ctl(m_epollfd, EPOLL_CTL_MOD, fd, &event) == -1 )
        throw EpollException("epoll_ctl_mod error");
}
void Epoll::delfd(int fd)
{
    bzero(&event, sizeof(event));
    event.data.fd = fd;
    if( ::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, fd, &event) == -1 )
        throw EpollException("epoll_ctl_del error");
    -- fdNumber;
}
/** epoll_wait **/
int Epoll::wait(int timeout)
{
    events.resize(fdNumber);
    while (true)
    {
        nReady = epoll_wait(m_epollfd, &*events.begin(), fdNumber, timeout);
        if (nReady == 0)
            throw EpollException("epoll_wait timeout");
        else if (nReady == -1)
        {
            if (errno == EINTR)
                continue;
            else  throw EpollException("epoll_wait error");
        }
        else
            return nReady;
    }
    return -1;
}

int Epoll::getEventOccurfd(int eventIndex) const
{
    if (eventIndex > nReady)
        throw EpollException("parameter(s) error");
    return events[eventIndex].data.fd;
}
uint32_t Epoll::getEvents(int eventIndex) const
{
    if (eventIndex > nReady)
        throw EpollException("parameter(s) error");
    return events[eventIndex].events;
}

使用Epoll的echoserver(测试)代码:

int main()
{
signal(SIGPIPE, SIG_IGN);
    /**
    将下面的这两个变量设置成为放在程序的开头,
    只是因为这样可以使得业务处理部分的代码显
    得简洁一些,在实际应用(C++)中,没必要也不
    推荐这样使用
    **/
    char buf[BUFSIZ];
    int clientCount = 0;
    try
    {
        TCPServer server(8001);
        int listenfd = server.getfd();
        Epoll epoll;
        // 将监听套接字注册到epoll
        epoll.addfd(server.getfd(), EPOLLIN, true);
        while (true)
        {
            int nReady = epoll.wait();
            for (int i = 0; i < nReady; ++i)
                // 如果是监听套接字发生了可读事件
                if (epoll.getEventOccurfd(i) == listenfd)
                {
                    int connectfd = accept(listenfd, NULL, NULL);
                    if (connectfd == -1)
                        err_exit("accept error");
                    cout << "accept success..." << endl;
                    cout << "clientCount = " << ++ clientCount << endl;
                    setUnBlock(connectfd, true);
                    epoll.addfd(connectfd, EPOLLIN, true);
                }
                else if (epoll.getEvents(i) & EPOLLIN)
                {
                    TCPClient *client = new TCPClient(epoll.getEventOccurfd(i));
                    memset(buf, 0, sizeof(buf));
                    if (client->read(buf, sizeof(buf)) == 0)
                    {
                        cerr << "client connect closed..." << endl;
                        // 将该套接字从epoll中移除
                        epoll.delfd(client->getfd());
                        delete client;
                        continue;
                    }
                    cout << buf;
                    client->write(buf);
                }
        }
    }
    catch (const SocketException &e)
    {
        cerr << e.what() << endl;
        err_exit("TCPServer error");
    }
    catch (const EpollException &e)
    {
        cerr << e.what() << endl;
        err_exit("Epoll error");
    }
}

完整源代码请参照:

http://download.csdn.net/detail/hanqing280441589/8492911

Socket编程实践(11) --epoll原理与封装的更多相关文章

  1. C&num; socket编程实践

    C# socket编程实践——支持广播的简单socket服务器   在上篇博客简单理解socket写完之后我就希望写出一个websocket的服务器了,但是一路困难重重,还是从基础开始吧,先搞定C# ...

  2. Socket编程实践&lpar;6&rpar; --TCP服务端注意事项

    僵尸进程处理 1)通过忽略SIGCHLD信号,避免僵尸进程 在server端代码中添加 signal(SIGCHLD, SIG_IGN); 2)通过wait/waitpid方法,解决僵尸进程 sign ...

  3. Socket编程实践&lpar;6&rpar; --TCPNotes服务器

    僵尸进程过程 1)通过忽略SIGCHLD信号,避免僵尸进程 在server端代码中加入 signal(SIGCHLD, SIG_IGN); 2)通过wait/waitpid方法.解决僵尸进程 sign ...

  4. &lbrack;&period;net 面向对象编程基础&rsqb; &lpar;11&rpar; 面向对象三大特性——封装

    [.net 面向对象编程基础] (11) 面向对象三大特性——封装 我们的课题是面向对象编程,前面主要介绍了面向对象的基础知识,而从这里开始才是面向对象的核心部分,即 面向对象的三大特性:封装.继承. ...

  5. Socket编程实践&lpar;10&rpar; --select的限制与poll的使用

    select的限制 用select实现的并发服务器,能达到的并发数一般受两方面限制: 1)一个进程能打开的最大文件描述符限制.这可以通过调整内核参数.可以通过ulimit -n(number)来调整或 ...

  6. C&num; socket编程实践——支持广播的简单socket服务器

    在上篇博客简单理解socket写完之后我就希望写出一个websocket的服务器了,但是一路困难重重,还是从基础开始吧,先搞定C# socket编程基本知识,写一个支持广播的简单server/clie ...

  7. Socket编程实践&lpar;2&rpar; Socket API 与 简单例程

    在本篇文章中,先介绍一下Socket编程的一些API,然后利用这些API实现一个客户端-服务器模型的一个简单通信例程.该例子中,服务器接收到客户端的信息后,将信息重新发送给客户端. socket()函 ...

  8. Socket编程实践&lpar;1&rpar; 基本概念

    1. 什么是socket socket可以看成是用户进程与内核网络协议栈的编程接口.TCP/IP协议的底层部分已经被内核实现了,而应用层是用户需要实现的,这部分程序工作在用户空间.用户空间的程序需要通 ...

  9. Socket编程实践&lpar;7&rpar; --Socket-Class封装&lpar;改进版v2&rpar;

    本篇博客定义一套用于TCP通信比较实用/好用Socket类库(运用C++封装的思想,将socket API尽量封装的好用与实用), 从开发出Socket库的第一个版本以来, 作者不知道做了多少改进,  ...

随机推荐

  1. Visaul Studio 常用快捷键的动画演示

    从本篇文章开始,我将会陆续介绍提高 VS 开发效率的文章,欢迎大家补充~ 在进行代码开发的时候,我们往往会频繁的使用键盘.鼠标进行协作,但是切换使用两种工具会影响到我们的开发速度,如果所有的操作都可以 ...

  2. iOS学习17之OC内存管理

    1.内存管理的方式 1> iOS应用程序出现Crash(闪退),90%的原因是因为内存问题. 2> 内存问题 野指针异常:访问没有所有权的内存,如果想要安全的访问,必须确保空间还在 内存泄 ...

  3. 淮安团购网美团联盟网赚版 v5&period;7

    淮安团购网,主要是利用美团联盟的hao123版API大家可以注册http://union.meituan.com获取api 核心采用dede5.7所以在安装上没有大的问题,安装好后后台恢复备份就可以了 ...

  4. Eclipse关闭项目

    Eclipse 关闭项目 为什么要关闭项目? Eclipse 工作空间包含了多个项目.一个项目可以是关闭或开启状态. 项目打开过多影响有: 消耗内存 占用编译时间:在删除项目.class 文件(Cle ...

  5. UEP-保存

    uep的保存操作分为ajaxgrid和ajaxform两种方式 1.ajaxgrid public void storeInfoSave(){ try { //两个dataWrap 一个dataWra ...

  6. Java学习笔记40(缓冲流)

    缓冲流: 在读写文件的各种流中,最令人烦恼的就是效率问题, 而缓冲流的目的就是提高读写效率 字节输出缓冲流: package demo; import java.io.BufferedOutputSt ...

  7. 2017 ACM&sol;ICPC Asia Regional Qingdao Online解题报告(部分)

    HDU 6206 Apple 题意: 给出四个点的坐标(每个点的坐标值小于等于1,000,000,000,000),问最后一个点是否在前三个点组成的三角形的外接圆内,是输出Accept,否输出Reje ...

  8. vue中使用promise&period;all发送多个请求

    1.创建两个promise,在promise中使用axios 2.调用Promise.all([p1,p2]).then(res=>{}).catch(err=>{})方法 代码如下: & ...

  9. Java:多线程,CyclicBarrier同步器

    1. 背景 CyclicBarrier类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point).在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此 ...

  10. Centos查看系统位数方法

    方法一:file /sbin/init 方法二:file /bin/ls 我的显示是32位