目录
@
前言
最近小程序要用到定时器,找了一圈也没找到合适的,最后还是绕回来选择了muduo里面的TimerQueue,整理了下它的代码,独立了出来,因为实在懒得从头写一个- -!。
在原来的muduo中,TimerQueue是专为EventLoop提供定时功能的组件,我在笔记《muduo网络库学习笔记(三)TimerQueue定时器队列》中解读过muduo的这块代码。现在反过来,EventLoop作为TimerQueue的组件:TimerQueue启动后在后台开一个线程跑EventLoop,EventLoop里面进行阻塞的poll循环,只监听timerfd和eventfd,从而独立出一个单独的定时器队列。
优点
[Async] [thread-safe] [based on poll] [microseconds-level]
异步: 后台线程监视文件描述符的状态变化。
线程安全: 多线程安全,支持异步插入定时器。
基于poll: 非休眠机制实现。
级别: 微秒级别。
test
#include <chrono>
#include <iostream>
#include "Logger.hpp"
#include "TimerQueue.hpp"
// Demo timer callback: writes one debug log line, then prints the current
// system_clock time since its epoch as a whole number of microseconds.
void test()
{
    LOG_DEBUG << "[test] : test timerQue happended ";
    const auto sinceEpoch = std::chrono::system_clock::now().time_since_epoch();
    const auto microsSinceEpoch = sinceEpoch / std::chrono::microseconds(1);
    std::cout << "[test] : test timerQue happended at " << microsSinceEpoch << std::endl;
}
// Demo driver: starts the singleton timer queue, schedules three one-shot
// timers (two with delay 1.0, one with delay 3.0) and one repeating timer
// (interval 5.0), then blocks on stdin so the callbacks have time to run.
int main()
{
    //Logger::setLogLevel(Logger::TRACE);
    TimerQueue* timer_queue = TimerQueue::GetInstance();
    timer_queue->Start();
    timer_queue->runAfter(1.0, test);
    timer_queue->runAfter(1.0, test);
    timer_queue->runAfter(3.0, test);
    timer_queue->runEvery(5.0, test);
    // Wait for one character (or EOF) before exiting. std::cin.get() is used
    // instead of getchar() because this file includes <iostream> but not
    // <cstdio>, so getchar() was only available through transitive includes.
    std::cin.get();
    return 0;
}
./timer_queue_test
[test] : test timerQue happended at 1548293190811373
[test] : test timerQue happended at 1548293190811392
[test] : test timerQue happended at 1548293192811787
[test] : test timerQue happended at 1548293194811927
[test] : test timerQue happended at 1548293199812081
[test] : test timerQue happended at 1548293204812645
[test] : test timerQue happended at 1548293209813508
源代码
TimerQueue
: https://github.com/BethlyRoseDaisley/TimerQueue/tree/master/TimerQueue 欢迎收藏。
hpp
#ifndef _NET_TIMERQUEUE_HH
#define _NET_TIMERQUEUE_HH
#include <stdint.h>

#include <atomic>
#include <condition_variable>
#include <functional>
#include <set>
#include <thread>
#include <vector>

#include "TimeStamp.hpp"
/// Minimal atomic integer counter.
///
/// Backed by std::atomic<T> (sequentially consistent operations) instead of a
/// `volatile` member driven by compiler-specific __sync builtins, so it is
/// portable and get() can be const. Instances start at 0 and are non-copyable.
template <typename T>
class AtomicIntegerT
{
public:
    AtomicIntegerT()
        : m_value(0)
    {
    }

    AtomicIntegerT(const AtomicIntegerT&) = delete;
    AtomicIntegerT& operator=(const AtomicIntegerT&) = delete;

    /// @return the current value.
    T get() const
    {
        return m_value.load();
    }

    /// Atomically adds 1. @return the value after the increment.
    T incrementAndGet()
    {
        return addAndGet(1);
    }

    /// Atomically subtracts 1. @return the value after the decrement.
    T decrementAndGet()
    {
        return addAndGet(-1);
    }

private:
    /// Atomically adds x. @return the value held before the addition.
    T getAndAdd(T x)
    {
        return m_value.fetch_add(x);
    }

    /// Atomically adds x. @return the value held after the addition.
    T addAndGet(T x)
    {
        return getAndAdd(x) + x;
    }

    std::atomic<T> m_value;
};
typedef AtomicIntegerT<int32_t> AtomicInt32;
typedef AtomicIntegerT<int64_t> AtomicInt64;
/// One scheduled callback together with its expiration time, its repeat
/// interval and a creation sequence number. Non-copyable.
class Timer
{
public:
    using TimerCallBack_t = std::function<void()>;

    /// @param cb       callback invoked by run().
    /// @param when     first expiration time.
    /// @param interval repeat period handed to TimeStamp::addTime() by
    ///                 restart(); a value > 0.0 marks the timer as repeating.
    ///                 (Unit is whatever TimeStamp::addTime expects —
    ///                 presumably seconds; confirm in TimeStamp.hpp.)
    Timer(const TimerCallBack_t& cb, TimeStamp when, double interval)
        : m_callBack(cb),
          m_expiration(when),
          m_interval(interval),
          m_repeat(interval > 0.0),
          m_sequence(s_numCreated.incrementAndGet())
    {
    }

    Timer(const Timer&) = delete;
    Timer& operator=(const Timer&) = delete;

    /// Invokes the stored callback.
    void run() const { m_callBack(); }

    TimeStamp expiration() const { return m_expiration; }
    bool repeat() const { return m_repeat; }
    int64_t sequence() const { return m_sequence; }

    /// Recomputes the expiration relative to `now` (defined out of line).
    void restart(TimeStamp now);

    /// @return the current value of the shared creation counter.
    static int64_t numCreated() { return s_numCreated.get(); }

private:
    const TimerCallBack_t m_callBack;
    TimeStamp m_expiration;
    const double m_interval;
    const bool m_repeat;
    const int64_t m_sequence;   // value of s_numCreated right after this timer was counted

    static AtomicInt64 s_numCreated;   // incremented once per constructed Timer
};
///
/// An opaque identifier, for canceling Timer.
///
/// Holds the Timer pointer and the timer's sequence number; only TimerQueue
/// (a friend) can read them. Copying is allowed and uses the compiler-generated
/// copy constructor, assignment operator and destructor.
///
class TimerId
{
public:
    /// Builds an identifier that refers to no timer (null pointer, sequence 0).
    TimerId()
        : m_timer(nullptr), m_sequence(0)
    {
    }

    /// Builds an identifier for `timer` with sequence number `seq`.
    TimerId(Timer* timer, int64_t seq)
        : m_timer(timer), m_sequence(seq)
    {
    }

    friend class TimerQueue;

private:
    Timer* m_timer;
    int64_t m_sequence;
};
class Channel;
class EventLoop;
class TimerQueue
{
private:
TimerQueue();
public:
~TimerQueue();
static TimerQueue* GetInstance()
{
static TimerQueue instance;
return &instance;
}
typedef std::function<void()> TimerCallBack_t;
// Schedules the callback to be run at given time,
void Start();
TimerId runAt(const TimeStamp& time, const TimerCallBack_t& cb);
TimerId runAfter(double delay, const TimerCallBack_t& cb);
TimerId runEvery(double interval, const TimerCallBack_t& cb);
void cancel(TimerId timerId);
private:
typedef std::pair<TimeStamp, Timer*> Entry;
typedef std::set<Entry> TimerList;
typedef std::pair<Timer*, int64_t> ActiveTimer;
typedef std::set<ActiveTimer> ActiveTimerSet;
TimerId addTimer(const TimerCallBack_t& cb, TimeStamp when, double interval = 0.0);
void addTimerInLoop(Timer* timer);
void cancelInLoop(TimerId timerId);
//called when timerfd alarms
void handleRead();
//move out all expired timers and return they.
std::vector<Entry> getExpired(TimeStamp now);
bool insert(Timer* timer);
void reset(const std::vector<Entry>& expired, TimeStamp now);
std::thread m_thread;
const int m_timerfd;
EventLoop* p_loop;
Channel* p_timerfdChannel;
//Timer List sorted by expiration
TimerList m_timers;
ActiveTimerSet m_activeTimers;
bool m_callingExpiredTimers; /*atomic*/
ActiveTimerSet m_cancelingTimers;
std::condition_variable m_wait_loop_init;
};
#endif
cpp
#include <stdint.h>
#include <assert.h>
#include <sys/timerfd.h>
#include <unistd.h>
#include <mutex>
#include "Logger.hpp"
#include "Channel.hpp"
#include "EventLoop.hpp"
#include "TimerQueue.hpp"
/// Thin helpers around the Linux timerfd API used by TimerQueue.
namespace TimerFd
{

/// Creates a non-blocking, close-on-exec timerfd on CLOCK_MONOTONIC.
/// A failure is reported through LOG_SYSFATAL; the descriptor returned by
/// timerfd_create() is handed back unchanged either way.
int createTimerfd()
{
    const int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
                                         TFD_NONBLOCK | TFD_CLOEXEC);
    LOG_TRACE << "createTimerfd() fd : " << timerfd;
    if (timerfd < 0)
    {
        LOG_SYSFATAL << "Failed in timerfd_create";
    }
    return timerfd;
}

/// Converts the absolute time `when` into a delay measured from
/// TimeStamp::now(). Delays below 100 microseconds (including negative ones,
/// i.e. times already in the past) are raised to 100 microseconds.
struct timespec howMuchTimeFromNow(TimeStamp when)
{
    int64_t microseconds = when.microSecondsSinceEpoch()
                           - TimeStamp::now().microSecondsSinceEpoch();
    if (microseconds < 100)
    {
        microseconds = 100;
    }
    struct timespec ts;
    ts.tv_sec = static_cast<time_t>(
        microseconds / TimeStamp::kMicroSecondsPerSecond);
    ts.tv_nsec = static_cast<long>(
        (microseconds % TimeStamp::kMicroSecondsPerSecond) * 1000);
    return ts;
}

/// Reads the 8-byte expiration counter from `timerfd` and traces it together
/// with `now`. A read that does not return exactly 8 bytes is logged as an
/// error.
void readTimerfd(int timerfd, TimeStamp now)
{
    // Zero-initialised so the trace below never prints an indeterminate value
    // when read() fails and leaves the buffer untouched.
    uint64_t howmany = 0;
    const ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
    LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
    if (n != sizeof howmany)
    {
        LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
    }
}

/// Arms `timerfd` to fire once at `expiration`: it_interval stays zero and
/// flags == 0 makes it_value a delay relative to the current time.
void resetTimerfd(int timerfd, TimeStamp expiration)
{
    // wake up loop by timerfd_settime()
    LOG_TRACE << "resetTimerfd()";
    // Value-initialisation zeroes both structs without relying on bzero(),
    // whose header (<strings.h>) this file does not include.
    struct itimerspec newValue = {};
    struct itimerspec oldValue = {};
    newValue.it_value = howMuchTimeFromNow(expiration);
    const int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
    if (ret)
    {
        LOG_SYSERR << "timerfd_settime()";
    }
}

} // namespace TimerFd
// Lets the rest of this file call the TimerFd helpers without qualification.
using namespace TimerFd;
// Definition of the static creation counter declared in class Timer; the
// AtomicIntegerT constructor starts it at 0.
AtomicInt64 Timer::s_numCreated;
/// Updates the expiration after the timer has fired: a non-repeating timer is
/// given TimeStamp::invalid(), a repeating one expires m_interval after `now`.
void Timer::restart(TimeStamp now)
{
    if (!m_repeat)
    {
        m_expiration = TimeStamp::invalid();
        return;
    }
    m_expiration = TimeStamp::addTime(now, m_interval);
}
/// Creates the timerfd. The event loop and its channel are created later by
/// the worker thread in Start(); until then both pointers are null instead of
/// holding indeterminate values.
TimerQueue::TimerQueue()
    : m_timerfd(createTimerfd()),
      p_loop(nullptr),
      p_timerfdChannel(nullptr),
      m_callingExpiredTimers(false)
{
}
/// Stops the worker thread (if one was started), releases the loop, the
/// channel and the timerfd, and deletes every timer still queued.
TimerQueue::~TimerQueue()
{
    // Start() only returns after the worker thread has created the loop and
    // the channel, so a joinable thread implies both pointers are valid. When
    // Start() was never called there is nothing to stop or delete, and
    // joining a non-joinable std::thread would throw.
    if (m_thread.joinable())
    {
        // NOTE(review): these Channel calls run on the destroying thread, not
        // the loop thread — confirm Channel/EventLoop permit that.
        p_timerfdChannel->disableAll();
        p_timerfdChannel->remove();
        p_loop->quit();
        m_thread.join();
        delete p_loop;
        delete p_timerfdChannel;
    }
    ::close(m_timerfd);
    for (TimerList::iterator it = m_timers.begin();
         it != m_timers.end(); ++it)
    {
        delete it->second;
    }
}
void TimerQueue::Start()
{
bool b_inited = false;;
m_thread = std::thread([this, &b_inited]()mutable {
this->p_loop = new EventLoop();
this->p_timerfdChannel = new Channel(this->p_loop, this->m_timerfd);
this->p_timerfdChannel->setReadCallBack(std::bind(&TimerQueue::handleRead, this));
this->p_timerfdChannel->enableReading();
b_inited = true;
this->m_wait_loop_init.notify_all();
this->p_loop->loop();
});
std::mutex mutex;
std::unique_lock<std::mutex> lock(mutex);
while(!b_inited){
m_wait_loop_init.wait(lock);
}
}
/// Removes every timer whose expiration is not later than `now` from both
/// m_timers and m_activeTimers and returns the removed entries.
std::vector<TimerQueue::Entry> TimerQueue::getExpired(TimeStamp now)
{
    // Sentinel pairing `now` with the largest pointer value, so lower_bound()
    // lands on the first entry stamped later than `now`. The cast operand is
    // parenthesised explicitly instead of relying on how the UINTPTR_MAX
    // macro happens to expand.
    Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
    TimerList::iterator firstPending = m_timers.lower_bound(sentry);
    assert(firstPending == m_timers.end() || now < firstPending->first);

    // Range construction replaces std::copy + back_inserter, whose headers
    // (<algorithm>, <iterator>) this file does not include.
    std::vector<Entry> expired(m_timers.begin(), firstPending);
    m_timers.erase(m_timers.begin(), firstPending);

    for (std::vector<Entry>::iterator entry = expired.begin();
         entry != expired.end(); ++entry)
    {
        ActiveTimer timer(entry->second, entry->second->sequence());
        size_t n = m_activeTimers.erase(timer);
        assert(n == 1); (void)n;
    }
    assert(m_timers.size() == m_activeTimers.size());
    return expired;
}
/// Allocates a Timer and hands it to the loop via runInLoop() for insertion.
/// @param cb       callback to run on expiration.
/// @param when     first expiration time.
/// @param interval repeat period; values <= 0.0 yield a one-shot timer.
/// @return an identifier that can be passed to cancel().
TimerId TimerQueue::addTimer(const TimerCallBack_t& cb, TimeStamp when, double interval)
{
    Timer* timer = new Timer(cb, when, interval);
    // Read the sequence number before the timer is handed over: once it is
    // queued, the loop thread may run and delete a one-shot timer before this
    // function gets to touch it again.
    const int64_t sequence = timer->sequence();
    p_loop->runInLoop(std::bind(&TimerQueue::addTimerInLoop, this, timer));
    return TimerId(timer, sequence);
}
/// Loop-thread half of addTimer(): inserts `timer` and re-arms the timerfd
/// when the new timer became the earliest one in the queue.
void TimerQueue::addTimerInLoop(Timer* timer)
{
    p_loop->assertInLoopThread();
    if (!insert(timer))
    {
        return;   // earliest expiration unchanged, timerfd already set correctly
    }
    resetTimerfd(m_timerfd, timer->expiration());
}
/// Asks the loop to cancel the timer identified by `timerId`; the actual work
/// is done by cancelInLoop().
void TimerQueue::cancel(TimerId timerId)
{
    p_loop->runInLoop([this, timerId]() { cancelInLoop(timerId); });
}
/// Loop-thread half of cancel().
///
/// A timer that is still queued is removed from both sets and deleted. If it
/// is not queued but expired callbacks are currently being run, the id is
/// recorded in m_cancelingTimers so reset() does not re-insert it.
void TimerQueue::cancelInLoop(TimerId timerId)
{
    p_loop->assertInLoopThread();
    assert(m_timers.size() == m_activeTimers.size());
    ActiveTimer timer(timerId.m_timer, timerId.m_sequence);
    ActiveTimerSet::iterator it = m_activeTimers.find(timer);
    if (it != m_activeTimers.end())
    {
        size_t n = m_timers.erase(Entry(it->first->expiration(), it->first));
        assert(n == 1); (void)n;
        delete it->first;
        // The entry must leave m_activeTimers as well; otherwise the set keeps
        // a dangling pointer and the size invariant asserted below is broken.
        m_activeTimers.erase(it);
    }
    else if (m_callingExpiredTimers)
    {
        m_cancelingTimers.insert(timer);
    }
    assert(m_timers.size() == m_activeTimers.size());
}
/// Adds `timer` to m_timers and m_activeTimers.
/// @return true when the queue was empty or `timer` expires before the
///         previously earliest entry.
bool TimerQueue::insert(Timer* timer)
{
    p_loop->assertInLoopThread();
    assert(m_timers.size() == m_activeTimers.size());

    TimeStamp when = timer->expiration();
    // Must be decided before the insertion below changes begin().
    TimerList::iterator earliest = m_timers.begin();
    const bool earliestChanged =
        (earliest == m_timers.end()) || (when < earliest->first);

    std::pair<TimerList::iterator, bool> byTime =
        m_timers.insert(Entry(when, timer));
    assert(byTime.second); (void)byTime;

    std::pair<ActiveTimerSet::iterator, bool> byIdentity =
        m_activeTimers.insert(ActiveTimer(timer, timer->sequence()));
    assert(byIdentity.second); (void)byIdentity;

    LOG_TRACE << "TimerQueue::insert() " << "m_timers.size() : "
              << m_timers.size() << " m_activeTimers.size() : " << m_activeTimers.size();
    assert(m_timers.size() == m_activeTimers.size());
    return earliestChanged;
}
void TimerQueue::handleRead()
{
p_loop->assertInLoopThread();
TimeStamp now(TimeStamp::now());
readTimerfd(m_timerfd, now);
std::vector<Entry> expired = getExpired(now);
LOG_TRACE << "Expired Timer size " << expired.size() << " ";
m_callingExpiredTimers = true;
m_cancelingTimers.clear();
for(std::vector<Entry>::iterator it = expired.begin();
it != expired.end(); ++it )
{
it->second->run();
}
m_callingExpiredTimers = false;
reset(expired, now);
}
void TimerQueue::reset(const std::vector<Entry>& expired, TimeStamp now)
{
TimeStamp nextExpire;
for(std::vector<Entry>::const_iterator it = expired.begin();
it != expired.end(); ++it)
{
ActiveTimer timer(it->second, it->second->sequence());
if(it->second->repeat()
&& m_cancelingTimers.find(timer) == m_cancelingTimers.end())
{//如果是周期定时器则重新设定时间插入. 否则delete.
it->second->restart(now);
insert(it->second);
}
else
{// FIXME move to a free list no delete please
delete it->second;
}
}
if (!m_timers.empty())
{
nextExpire = m_timers.begin()->second->expiration();
}
if (nextExpire.valid())
{
resetTimerfd(m_timerfd, nextExpire);
}
}
/// Schedules `cb` to run once at `time` (interval 0.0, i.e. no repetition).
TimerId TimerQueue::runAt(const TimeStamp& time, const TimerCallBack_t& cb)
{
    const double noRepeat = 0.0;
    return addTimer(cb, time, noRepeat);
}
/// Schedules `cb` to run once at TimeStamp::now() advanced by `delay`.
TimerId TimerQueue::runAfter(double delay, const TimerCallBack_t& cb)
{
    TimeStamp current = TimeStamp::now();
    TimeStamp due(TimeStamp::addTime(current, delay));
    return runAt(due, cb);
}
/// Schedules `cb` to run first at TimeStamp::now() advanced by `interval`,
/// passing `interval` on to addTimer() as the repeat period.
TimerId TimerQueue::runEvery(double interval, const TimerCallBack_t& cb)
{
    TimeStamp current = TimeStamp::now();
    TimeStamp firstDue(TimeStamp::addTime(current, interval));
    return addTimer(cb, firstDue, interval);
}