muduo网络库学习:reactor模式
Muduo 是基于 Reactor 模式的网络库,其核心是个事件循环 EventLoop,用于响应计时器和IO事件。Muduo采用基于对象(object based)而非面向对象(object oriented)的设计风格,其事件回调接口多以 boost::function + boost::bind 表达,用户在使用muduo的时候不需要继承其中的class。
一. Reactor模式简介
Reactor释义“反应堆”,是一种事件驱动机制。和普通函数调用的不同之处在于:应用程序不是主动的调用某个API完成处理,而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到Reactor上,如果相应的时间发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。
Reactor,换个名词“non-blocking IO + IO multiplexing”,意思就显而易见了。Reactor模式用非阻塞IO+poll(epoll)函数来处理并发,程序的基本结构是一个事件循环,以事件驱动和事件回调的方式实现业务逻辑。
while(!done) { int retval = poll(fds,nfds,timeout) if(retval < 0) 处理错误,回调用户的error handler else{ 处理到期的timers,回调用户的timer handler if(retval > 0){ 处理IO事件,毁掉用户的IO event handler } } }
二. moduo库Reactor模式的实现
muduo主要通过3个类来实现Reactor模式:EventLoop,Channel,Poller。
Poller类在这里是poll函数的封装(在muduo源码里面是抽象基类,支持poll和epoll)。Channel,每一个Channel对象都对应了一个fd。EventLoop 事件循环(反应器 Reactor),每个线程只能有一个 EventLoop 实体,它负责IO和定时器事件的分派。它用eventfd(2)来异步唤醒,这有别于传统的用一对 pipe(2) 的办法。它用 TimerQueue 作为计时器管理,用 Poller 作为IO Multiplexing。
1. EventLoop
事件循环。moduo的线程模型为one loop per thread,即每个线程只能有一个EventLoop对象。EventLoop对象的生命周期通常和其所属的线程一样长。
数据成员:
const pid_t threadId_;保存当前EventLoop所属线程id
boost::scoped_ptr poller_; 实现I/O复用
boost::scoped_ptr timerQueue_;
int wakeupFd_;
boost::scoped_ptr wakeupChannel_; 用于处理wakeupFd_上的可读事件,将事件分发到handlRead() ChannelList activeChannels_; 有事件就绪的 Channel Channel* currentActiveChannel_;
MutexLock mutex_; pendingFunctors_回暴露给其他线程,所以需要加锁 std::vectorpendingFunctors_;
主要功能函数:
loop(),在该函数中会循环执行以下过程:调用Poller::poll(),通过此调用获得一个vector<channel*>activeChannels_的就绪事件集合,再遍历该容器,执行每个Channel的Channel::handleEvent()完成相应就绪事件回调,最后执行pendingFunctors_排队的函数。上述一次循环就是一次Reactor模式完成。
runInLoop(boost::function<void()>),实现用户指定任务回调,若是EventLoop隶属的线程调用EventLoop::runInLoop()则EventLoop马上执行;若是其它线程调用则执行EventLoop::queueInLoop(boost::function<void()>将任务添加到队列中(线程转移)。EventLoop如何获得有任务这一事实呢?通过eventfd可以实现线程间通信,具体做法是:其它线程向EventLoop::vector<boost::function<void()> >添加任务T,然后通过EventLoop::wakeup()向eventfd写一个int,eventfd的回调函数EventLoop::handleRead()读取这个int,从而相当于EventLoop被唤醒,此时loop中遍历队列执行堆积的任务。这里采用Channel管理eventfd,Poller侦听eventfd体现了eventfd可以统一事件源的优势。
queueInLoop(Functor& cb),将cb放入队列,并在必要时唤醒IO线程。有两种情况需要唤醒IO线程,1 调用queueInLoop()的线程不是IO线程,2 调用queueInLoop()的线程是IO线程,而此时正在调用pengding functor。
2. Channel
事件分发器。每个Channel只属于一个EventLoop,每个Channel只负责一个文件描述符fd的IO事件分发,但其不拥有fd。
数据成员:
int fd_文件描述符,
int events_ 文件描述符注册事件,
int revents_文件描述符的就绪事件,由Poller::poll设置
readCallback_,writeCallback...各种事件回调,会在拥有该Channel类的构造函数中被注册,例如TcpConnction会在构造函数中TcpConnection::handlRead()注册给Channel::readCallback
主要功能函数:
setCallback()系列函数,接受Channel所属的类注册相应的事件回调函数
enableReading(),update(), 当一个fd想要注册可读事件时,首先通过Channel::enableReading()-->Channel::update(this)->EventLoop::updateChannel(Channel)->Poller::updateChannel(Channel*)调用链向poll系统调用的侦听事件表注册或者修改注册事件。
handleEvent(), Channel作为是事件分发器其核心结构是Channel::handleEvent(),该函数调用Channel::handleEventWithGuard(),在其内根据Channel::revents的值分发调用相应的事件回调。
3. Poller
Poller是IO multiplexing的封装,封装了poll和epoll。Poller是EventLoop的间接成员,只供拥有该Poller的EventLoop在IO线程调用。生命期与EventLoop相等。
对于PollPoller来说,一个fd对应一个struct pollfd(pollfd.fd),一个fd 对应一个channel;这个fd 可以是socket, eventfd, timerfd, signalfd。
对于EPollPoller 来说,一个channel* 对应一个fd, 一个channel* 对应一个struct epoll_event
数据成员:
vector pollfds_事件结构体数组用于poll的第一个参数;
map<int,channel*> channels_用于文件描述符fd到Channel的映射便于快速查找到相应的Channel
主要功能函数:
updateChannel(Channel*) 用于将传入的Channel关心的事件注册给Poller。
poll(int timeoutMs,vector<channel*> activeChannels)其调用poll侦听事件集合,将就绪事件所属的Channel调用fillActiveChannels()加入到activeChannels_中。
一个线程最多只能有一个EventLoop对象,这种线程被称为IO线程。一个EventLoop对象对应多个Channel对象,但只有wakeupChannel_生存期由EventLoop控制, timerfdChannel_生存期由TimeQueue管理。
(boost::scoped_ptr<Channel> wakeupChannel_; // 纳入poller_来管理 int wakeupFd_; // eventfd函数创建 )
其他类
EventLoopThread: 启动一个线程执行一个EventLoop,其语义和"one loop per thread“相吻合。注意这里用到了互斥量和条件变量,这是因为线程A创建一个EventLoopThread对象后一个运行EventLoop的线程已经开始创建了,可以通过EventLoopThread::startLoop()获取这个EventLoop对象,但是若EventLoop线程还没有创建好,则会出错。所以在创建EventLoop完成后会执行condititon.notify()通知线程A,线程A调用EventLoopThread::startLoop()时调用condition.wai()等待,从而保证获取一个创建完成的EventLoop.毕竟线程A创建的EventLoop线程,A可能还会调用EventLoop执行一些任务回调呢。
三. 定时函数选择 和 muduo 定时器
1. Linux时间函数(P241)
(1)、Linux 的计时函数,用于获得当前时间:
- time(2) / time_t (秒)
- ftime(3) / struct timeb (毫秒)
- gettimeofday(2) / struct timeval (微秒)
- clock_gettime(2) / struct timespec (纳秒)
- gmtime / localtime / timegm / mktime / strftime / struct tm (这些与当前时间无关)
(2)、定时函数,用于让程序等待一段时间或安排计划任务:
- sleep(3)
- alarm(2)
- usleep(3)
- nanosleep(2)
- clock_nanosleep(2)
- getitimer(2) / setitimer(2)
- timer_create(2) / timer_settime(2) / timer_gettime(2) / timer_delete(2)
- timerfd_create(2) / timerfd_gettime(2) / timerfd_settime(2)
取舍如下:
• (计时)只使用gettimeofday 来获取当前时间。
• (定时)只使用timerfd_* 系列函数来处理定时。
timerfd_create 把时间变成了一个文件描述符,该“文件”在定时器超时的那一刻变得可读,这样就能很方便地融入到 select/poll 框架中,用统一的方式来处理 IO 事件和超时事件,这也正是 Reactor 模式的长处。
传统的Reactor 利用select/poll/epoll 的timeout 来实现定时功能,但poll 和epoll 的定时精度只有毫秒,远低于timerfd_settime 的定时精度。
取舍原因详见陈硕书籍p241页。
2. 定时器
muduo的定时器由三个类实现,TimerId、Timer、TimerQueue,用户只能看到第一个类,其它两个都是内部实现细节.
TimerQueue的公有接口很简单,只有两个函数addTimer和cancel, TimerQueue 数据结构的选择,能快速根据当前时间找到已到期的定时器,也要高效的添加和删除Timer,因而可以用二叉搜索树,用map或者set.
a. EventLoop类对象的初始化:在EventLoop的构造函数中对Poller和TimerQueue类型的成员进行初始化
b. TimerQueue的构造函数:将timefd和Channel关联起来,并绑定TimerQueue::handleRead作为描述符可读时的回调函数,而在TimerQueue::handleRead中又会去调用超时的Timer的回调。
for (std::vector<Entry>::iterator it = expired.begin(); it != expired.end(); ++it) { it->second->run(); }
c. EventLoop中的函数runAt、runAfter、runEvery :runAfter和runEvery都是通过设置不同的参数去调用TimeQueue::addTimer。
开始
1.TimerQueue构造函数创建timerfd,设置超时回调,该回调将用来构造Timer
2.通过runAt、runAfter、runEvery添加Timer和设置Timer的回调
3.loop.loop(),此时定时器已经启动,当发生超时事件时,timerfd可读,poll调用返回。
4.通过activeChannl返回活跃事件(这里的活跃事件即为timerfd可读事件)
5.在Channl::handleRead()回调中将执行在TimerQueue构造函数中绑定的TimerQueue::handleRead()。
6.TimerQueue::handleRead()又将获取所有超时的Timer并依次执行Timer中的回调。
7.执行完一次超时回调后,这个时候需要重新设置定时器,把当前未过期的最早时间作为定时器最新时间(Timestamp::reset())。
结束
1 #include "EventLoop.h" 2 #include <boost/bind.hpp> 3 #include <stdio.h> 4 #include "../logging/Logging.h" 5 6 void dummyOutput(const char* msg, int len) 7 { 8 if (g_file) 9 { 10 fwrite(msg, 1, len, g_file); 11 fwrite(msg, 1, len, stdout); 12 } 13 } 14 15 int cnt = 0; 16 muduo::EventLoop* g_loop; 17 18 void printTid() 19 { 20 printf("pid = %d, tid = %d\n", getpid(), muduo::CurrentThread::tid()); 21 printf("now %s\n", muduo::Timestamp::now().toString().c_str()); 22 } 23 24 void print(const char* msg) 25 { 26 printf("msg %s %s\n", muduo::Timestamp::now().toString().c_str(), msg); 27 if (++cnt == 20) 28 { 29 g_loop->quit(); 30 } 31 } 32 33 int main() 34 { 35 FILE* g_file = ::fopen("../../../muduo_log", "ae"); 36 muduo::Logger::setOutput(dummyOutput); 37 printTid(); 38 muduo::EventLoop loop; 39 g_loop = &loop; 40 41 print("main"); 42 loop.runAfter(1, boost::bind(print, "once1")); 43 loop.runAfter(1.5, boost::bind(print, "once1.5")); 44 loop.runAfter(2.5, boost::bind(print, "once2.5")); 45 loop.runAfter(3.5, boost::bind(print, "once3.5")); 46 loop.runEvery(2, boost::bind(print, "every2")); 47 loop.runEvery(3, boost::bind(print, "every3")); 48 49 loop.loop(); 50 print("main loop exits"); 51 sleep(1); 52 }
截取部分输出信息
pid = 12451, tid = 12451
now 1556463938.637758
39.643503Z 12451 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
39.673813Z 12451 TRACE EventLoop EventLoop created 0x7FFF9413A4E0 in thread 12451 - EventLoop.cc:62
39.673818Z 12451 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
msg 1556463939.673822 main
39.673848Z 12451 TRACE loop EventLoop 0x7FFF9413A4E0 start looping - EventLoop.cc:94
40.678646Z 12451 TRACE poll 1 events happended - EPollPoller.cc:65
40.678787Z 12451 TRACE readTimerfd TimerQueue::handleRead() 1 at 1556463940.678777 - TimerQueue.cc:62
msg 1556463940.678797 once1
41.181196Z 12451 TRACE poll 1 events happended - EPollPoller.cc:65
41.181257Z 12451 TRACE readTimerfd TimerQueue::handleRead() 1 at 1556463941.181253 - TimerQueue.cc:62
msg 1556463941.181266 once1.5
41.681123Z 12451 TRACE poll 1 events happended - EPollPoller.cc:65
41.681232Z 12451 TRACE readTimerfd TimerQueue::handleRead() 1 at 1556463941.681227 - TimerQueue.cc:62
msg 1556463941.681244 every2
42.183455Z 12451 TRACE poll 1 events happended - EPollPoller.cc:65
42.183510Z 12451 TRACE readTimerfd TimerQueue::handleRead() 1 at 1556463942.183506 - TimerQueue.cc:62
msg 1556463942.183519 once2.5
42.678720Z 12451 TRACE poll 1 events happended - EPollPoller.cc:65
42.678791Z 12451 TRACE readTimerfd TimerQueue::handleRead() 1 at 1556463942.678785 - TimerQueue.cc:62
msg 1556463942.678804 every3
43.180903Z 12451 TRACE poll 1 events happended - EPollPoller.cc:65
43.180974Z 12451 TRACE readTimerfd TimerQueue::handleRead() 1 at 1556463943.180970 - TimerQueue.cc:62
msg 1556463943.180982 once3.5
43.690973Z 12451 TRACE poll 1 events happended - EPollPoller.cc:65
输出日志中0,1,2文件描述符被标准输入输出占据,epollfd_ = 3(epoll_create1 创建), timerfd_ = 4, wakeupFd_ = 5。
每次定时时间到,timerfd_ 就会可读,执行定时器回调函数。4.5s的定时不会超时,因为还没到时间的时候已经被取消了; 间隔3s的定时只超时3次,因为9s后被取消了;间隔2s的超时执行20次后g_loop->quit(),loop.loop()循环中判断条件后退出事件循环。
lower_bound(x); 返回第一个>=x 的元素的iterator位置;upper_bound(); 返回第一个>x的元素的iterator位置。
RVO优化:在linux g++ 会优化,VC++ 在release 模式下会优化,即函数返回对象时不会调用拷贝函数。