muduo网络库学习:reactor模式 - flysong

时间:2024-02-22 13:02:00

muduo网络库学习:reactor模式

  Muduo 是基于 Reactor 模式的网络库,其核心是个事件循环 EventLoop,用于响应计时器和IO事件。Muduo采用基于对象(object based)而非面向对象(object oriented)的设计风格,其事件回调接口多以 boost::function + boost::bind 表达,用户在使用muduo的时候不需要继承其中的class。 

一. Reactor模式简介

  Reactor释义“反应堆”,是一种事件驱动机制。和普通函数调用的不同之处在于:应用程序不是主动的调用某个API完成处理,而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到Reactor上,如果相应的时间发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。

   Reactor,换个名词“non-blocking IO + IO multiplexing”,意思就显而易见了。Reactor模式用非阻塞IO+poll(epoll)函数来处理并发,程序的基本结构是一个事件循环,以事件驱动和事件回调的方式实现业务逻辑。

while(!done)
{
    int retval  = poll(fds,nfds,timeout)
    if(retval < 0)
        处理错误,回调用户的error handler
    else{
        处理到期的timers,回调用户的timer handler
        if(retval > 0){
            处理IO事件,毁掉用户的IO event handler
        }
    }
}

 

二. moduo库Reactor模式的实现

  muduo主要通过3个类来实现Reactor模式:EventLoop,Channel,Poller。

  Poller类在这里是poll函数的封装(在muduo源码里面是抽象基类,支持poll和epoll)。Channel,每一个Channel对象都对应了一个fd。EventLoop 事件循环(反应器 Reactor),每个线程只能有一个 EventLoop 实体,它负责IO和定时器事件的分派。它用eventfd(2)来异步唤醒,这有别于传统的用一对 pipe(2) 的办法。它用 TimerQueue 作为计时器管理,用 Poller 作为IO Multiplexing。

 

image

 

 

1. EventLoop

事件循环。moduo的线程模型为one loop per thread,即每个线程只能有一个EventLoop对象。EventLoop对象的生命周期通常和其所属的线程一样长。

  一个EventLoop对象对应一个Poller成员对象,boost::scoped_ptr<Poller> poller_;Poller是个抽象类,具体可以是EPollPoller(默认) 或者PollPoller
数据成员:

      const pid_t threadId_;保存当前EventLoop所属线程id

       boost::scoped_ptr poller_; 实现I/O复用

       boost::scoped_ptr timerQueue_;

       int wakeupFd_;

       boost::scoped_ptr wakeupChannel_; 用于处理wakeupFd_上的可读事件,将事件分发到handlRead() ChannelList activeChannels_; 有事件就绪的   Channel Channel* currentActiveChannel_;

       MutexLock mutex_; pendingFunctors_回暴露给其他线程,所以需要加锁 std::vectorpendingFunctors_;

主要功能函数:

    loop(),在该函数中会循环执行以下过程:调用Poller::poll(),通过此调用获得一个vector<channel*>activeChannels_的就绪事件集合,再遍历该容器,执行每个Channel的Channel::handleEvent()完成相应就绪事件回调,最后执行pendingFunctors_排队的函数。上述一次循环就是一次Reactor模式完成。

     runInLoop(boost::function<void()>),实现用户指定任务回调,若是EventLoop隶属的线程调用EventLoop::runInLoop()则EventLoop马上执行;若是其它线程调用则执行EventLoop::queueInLoop(boost::function<void()>将任务添加到队列中(线程转移)。EventLoop如何获得有任务这一事实呢?通过eventfd可以实现线程间通信,具体做法是:其它线程向EventLoop::vector<boost::function<void()> >添加任务T,然后通过EventLoop::wakeup()向eventfd写一个int,eventfd的回调函数EventLoop::handleRead()读取这个int,从而相当于EventLoop被唤醒,此时loop中遍历队列执行堆积的任务。这里采用Channel管理eventfd,Poller侦听eventfd体现了eventfd可以统一事件源的优势。

    queueInLoop(Functor& cb),将cb放入队列,并在必要时唤醒IO线程。有两种情况需要唤醒IO线程,1 调用queueInLoop()的线程不是IO线程,2 调用queueInLoop()的线程是IO线程,而此时正在调用pengding functor。

2. Channel

  事件分发器。每个Channel只属于一个EventLoop,每个Channel只负责一个文件描述符fd的IO事件分发,但其不拥有fd。
数据成员:

    int fd_文件描述符,

     int events_ 文件描述符注册事件,

     int revents_文件描述符的就绪事件,由Poller::poll设置

     readCallback_,writeCallback...各种事件回调,会在拥有该Channel类的构造函数中被注册,例如TcpConnction会在构造函数中TcpConnection::handlRead()注册给Channel::readCallback

主要功能函数:

     setCallback()系列函数接受Channel所属的类注册相应的事件回调函数

     enableReading(),update(), 当一个fd想要注册可读事件时,首先通过Channel::enableReading()-->Channel::update(this)->EventLoop::updateChannel(Channel)->Poller::updateChannel(Channel*)调用链向poll系统调用的侦听事件表注册或者修改注册事件。

      handleEvent(), Channel作为是事件分发器其核心结构是Channel::handleEvent(),该函数调用Channel::handleEventWithGuard(),在其内根据Channel::revents的值分发调用相应的事件回调。

3. Poller

  Poller是IO multiplexing的封装,封装了poll和epoll。Poller是EventLoop的间接成员,只供拥有该Poller的EventLoop在IO线程调用。生命期与EventLoop相等。
  对于PollPoller来说,一个fd对应一个struct pollfd(pollfd.fd),一个fd 对应一个channel;这个fd 可以是socket, eventfd, timerfd, signalfd。
  对于EPollPoller 来说,一个channel* 对应一个fd, 一个channel* 对应一个struct epoll_event

数据成员:

    vector pollfds_事件结构体数组用于poll的第一个参数;

    map<int,channel*> channels_用于文件描述符fd到Channel的映射便于快速查找到相应的Channel

主要功能函数:

    updateChannel(Channel*) 用于将传入的Channel关心的事件注册给Poller。

    poll(int timeoutMs,vector<channel*> activeChannels)其调用poll侦听事件集合,将就绪事件所属的Channel调用fillActiveChannels()加入到activeChannels_中。

  一个线程最多只能有一个EventLoop对象,这种线程被称为IO线程。一个EventLoop对象对应多个Channel对象,但只有wakeupChannel_生存期由EventLoop控制,  timerfdChannel_生存期由TimeQueue管理。
(boost::scoped_ptr<Channel> wakeupChannel_; // 纳入poller_来管理     int wakeupFd_;   // eventfd函数创建 )

 

其他类

  EventLoopThread: 启动一个线程执行一个EventLoop,其语义和"one loop per thread“相吻合。注意这里用到了互斥量和条件变量,这是因为线程A创建一个EventLoopThread对象后一个运行EventLoop的线程已经开始创建了,可以通过EventLoopThread::startLoop()获取这个EventLoop对象,但是若EventLoop线程还没有创建好,则会出错。所以在创建EventLoop完成后会执行condititon.notify()通知线程A,线程A调用EventLoopThread::startLoop()时调用condition.wai()等待,从而保证获取一个创建完成的EventLoop.毕竟线程A创建的EventLoop线程,A可能还会调用EventLoop执行一些任务回调呢。

三. 定时函数选择 和 muduo 定时器

1. Linux时间函数(P241)

(1)、Linux 的计时函数,用于获得当前时间:

  • time(2) / time_t (秒)
  • ftime(3) / struct timeb (毫秒)
  • gettimeofday(2) / struct timeval (微秒)
  • clock_gettime(2) / struct timespec (纳秒)
  • gmtime / localtime / timegm / mktime / strftime / struct tm (这些与当前时间无关)

(2)、定时函数,用于让程序等待一段时间或安排计划任务:

 

  • sleep(3)
  • alarm(2)
  • usleep(3)
  • nanosleep(2)
  • clock_nanosleep(2)
  • getitimer(2) / setitimer(2)
  • timer_create(2) / timer_settime(2) / timer_gettime(2) / timer_delete(2)
  • timerfd_create(2) / timerfd_gettime(2) / timerfd_settime(2)

  取舍如下:

• (计时)只使用gettimeofday 来获取当前时间。
• (定时)只使用timerfd_* 系列函数来处理定时。

 

  timerfd_create 把时间变成了一个文件描述符,该“文件”在定时器超时的那一刻变得可读,这样就能很方便地融入到 select/poll 框架中,用统一的方式来处理 IO 事件和超时事件,这也正是 Reactor 模式的长处。

传统的Reactor 利用select/poll/epoll 的timeout 来实现定时功能,但poll 和epoll 的定时精度只有毫秒,远低于timerfd_settime 的定时精度。

  取舍原因详见陈硕书籍p241页。

2. 定时器

  muduo的定时器由三个类实现,TimerId、Timer、TimerQueue,用户只能看到第一个类,其它两个都是内部实现细节. 

  TimerQueue的公有接口很简单,只有两个函数addTimer和cancel, TimerQueue 数据结构的选择,能快速根据当前时间找到已到期的定时器,也要高效的添加和删除Timer,因而可以用二叉搜索树,用map或者set.

 

  

a. EventLoop类对象的初始化:在EventLoop的构造函数中对Poller和TimerQueue类型的成员进行初始化

b. TimerQueue的构造函数:将timefd和Channel关联起来,并绑定TimerQueue::handleRead作为描述符可读时的回调函数,而在TimerQueue::handleRead中又会去调用超时的Timer的回调。

for (std::vector<Entry>::iterator it = expired.begin();
      it != expired.end(); ++it)
    {
        it->second->run();
    }

c. EventLoop中的函数runAt、runAfter、runEvery  :runAfter和runEvery都是通过设置不同的参数去调用TimeQueue::addTimer。

 

开始
1.TimerQueue构造函数创建timerfd,设置超时回调,该回调将用来构造Timer

2.通过runAt、runAfter、runEvery添加Timer和设置Timer的回调

3.loop.loop(),此时定时器已经启动,当发生超时事件时,timerfd可读,poll调用返回。

4.通过activeChannl返回活跃事件(这里的活跃事件即为timerfd可读事件)

5.在Channl::handleRead()回调中将执行在TimerQueue构造函数中绑定的TimerQueue::handleRead()。

6.TimerQueue::handleRead()又将获取所有超时的Timer并依次执行Timer中的回调。

7.执行完一次超时回调后,这个时候需要重新设置定时器,把当前未过期的最早时间作为定时器最新时间(Timestamp::reset())。
结束

 

 1 #include "EventLoop.h"
 2 #include <boost/bind.hpp>
 3 #include <stdio.h>
 4 #include "../logging/Logging.h"
 5 
 6 void dummyOutput(const char* msg, int len)
 7 {
 8     if (g_file)
 9     {
10         fwrite(msg, 1, len, g_file);
11         fwrite(msg, 1, len, stdout);
12     }
13 }
14 
15 int cnt = 0;
16 muduo::EventLoop* g_loop;
17 
18 void printTid()
19 {
20   printf("pid = %d, tid = %d\n", getpid(), muduo::CurrentThread::tid());
21   printf("now %s\n", muduo::Timestamp::now().toString().c_str());
22 }
23 
24 void print(const char* msg)
25 {
26   printf("msg %s %s\n", muduo::Timestamp::now().toString().c_str(), msg);
27   if (++cnt == 20)
28   {
29     g_loop->quit();
30   }
31 }
32 
33 int main()
34 {
35   FILE* g_file = ::fopen("../../../muduo_log", "ae");
36   muduo::Logger::setOutput(dummyOutput);
37   printTid();
38   muduo::EventLoop loop;
39   g_loop = &loop;
40 
41   print("main");
42   loop.runAfter(1, boost::bind(print, "once1"));
43   loop.runAfter(1.5, boost::bind(print, "once1.5"));
44   loop.runAfter(2.5, boost::bind(print, "once2.5"));
45   loop.runAfter(3.5, boost::bind(print, "once3.5"));
46   loop.runEvery(2, boost::bind(print, "every2"));
47   loop.runEvery(3, boost::bind(print, "every3"));
48 
49   loop.loop();
50   print("main loop exits");
51   sleep(1);
52 }
TimerQueue_unittest

  截取部分输出信息

pid = 12451, tid = 12451
now 1556463938.637758
39.643503Z 12451 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
39.673813Z 12451 TRACE EventLoop EventLoop created 0x7FFF9413A4E0 in thread 12451 - EventLoop.cc:62
39.673818Z 12451 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
msg 1556463939.673822 main
39.673848Z 12451 TRACE loop EventLoop 0x7FFF9413A4E0 start looping - EventLoop.cc:94
40.678646Z 12451 TRACE poll 1 events happended - EPollPoller.cc:65
40.678787Z 12451 TRACE readTimerfd TimerQueue::handleRead() 1 at 1556463940.678777 - TimerQueue.cc:62
msg 1556463940.678797 once1
41.181196Z 12451 TRACE poll 1 events happended - EPollPoller.cc:65
41.181257Z 12451 TRACE readTimerfd TimerQueue::handleRead() 1 at 1556463941.181253 - TimerQueue.cc:62
msg 1556463941.181266 once1.5
41.681123Z 12451 TRACE poll 1 events happended - EPollPoller.cc:65
41.681232Z 12451 TRACE readTimerfd TimerQueue::handleRead() 1 at 1556463941.681227 - TimerQueue.cc:62
msg 1556463941.681244 every2
42.183455Z 12451 TRACE poll 1 events happended - EPollPoller.cc:65
42.183510Z 12451 TRACE readTimerfd TimerQueue::handleRead() 1 at 1556463942.183506 - TimerQueue.cc:62
msg 1556463942.183519 once2.5
42.678720Z 12451 TRACE poll 1 events happended - EPollPoller.cc:65
42.678791Z 12451 TRACE readTimerfd TimerQueue::handleRead() 1 at 1556463942.678785 - TimerQueue.cc:62
msg 1556463942.678804 every3
43.180903Z 12451 TRACE poll 1 events happended - EPollPoller.cc:65
43.180974Z 12451 TRACE readTimerfd TimerQueue::handleRead() 1 at 1556463943.180970 - TimerQueue.cc:62
msg 1556463943.180982 once3.5
43.690973Z 12451 TRACE poll 1 events happended - EPollPoller.cc:65

  输出日志中0,1,2文件描述符被标准输入输出占据,epollfd_ = 3(epoll_create1 创建), timerfd_ = 4, wakeupFd_ = 5。

  每次定时时间到,timerfd_ 就会可读,执行定时器回调函数。4.5s的定时不会超时,因为还没到时间的时候已经被取消了; 间隔3s的定时只超时3次,因为9s后被取消了;间隔2s的超时执行20次后g_loop->quit(),loop.loop()循环中判断条件后退出事件循环。

 

lower_bound(x); 返回第一个>=x 的元素的iterator位置;upper_bound(); 返回第一个>x的元素的iterator位置。

RVO优化:在linux g++ 会优化,VC++ 在release 模式下会优化,即函数返回对象时不会调用拷贝函数。