一个基于C++11的定时器队列(timerfd,poll实现)

时间:2024-01-23 21:28:04

目录

@


前言

最近小程序要用到定时器,找了一圈也没找到合适的,最后还是绕回来选择了muduo里面的TimerQueue,整理了下它的代码,独立了出来,因为实在懒得从头写一个- -!。

原来的muduo中TimerQueue是专为EventLoop提供定时功能的组件,我在笔记
muduo网络库学习笔记(三)TimerQueue定时器队列
中解读过muduo这块代码,现在反过来,EventLoop做为TimerQueue的组件,TimerQueue启动后在后面开一个线程跑EventLoop,EventLoop里面进行阻塞的poll循环,只监听timerFd,和EventFd,从而独立出来一个单独的定时器队列。。

优点

[Async] [thread-safe] [based on poll] [microseconds-level]

异步 :后台线程监视文件描述符动态。
线程安全 : 多线程安全的 支持异步插入定时器。
基于poll : 非休眠机制实现。
级别 : 微妙级别。

test

#include <chrono>
#include <iostream>
#include "Logger.hpp"
#include "TimerQueue.hpp"

void test()
{

  LOG_DEBUG << "[test] : test timerQue happended ";

  std::cout << "[test] : test timerQue happended at " << std::chrono::system_clock::now().time_since_epoch() / std::chrono::microseconds(1) << std::endl;

}


int main()
{

  //Logger::setLogLevel(Logger::TRACE);

  TimerQueue* timer_queue = TimerQueue::GetInstance();
  timer_queue->Start();
  timer_queue->runAfter(1.0, test);
  timer_queue->runAfter(1.0, test);
  timer_queue->runAfter(3.0, test);

  timer_queue->runEvery(5.0, test);
  getchar();
  return 0;
}

./timer_queue_test
[test] : test timerQue happended at 1548293190811373
[test] : test timerQue happended at 1548293190811392
[test] : test timerQue happended at 1548293192811787
[test] : test timerQue happended at 1548293194811927
[test] : test timerQue happended at 1548293199812081
[test] : test timerQue happended at 1548293204812645
[test] : test timerQue happended at 1548293209813508

源代码

TimerQueue : https://github.com/BethlyRoseDaisley/TimerQueue/tree/master/TimerQueue 欢迎收藏。

hpp

#ifndef _NET_TIMERQUEUE_HH
#define _NET_TIMERQUEUE_HH
#include "TimeStamp.hpp"

#include <stdint.h>
#include <set>
#include <vector>
#include <condition_variable>
#include <functional>
#include <thread>

template <typename T>
class AtomicIntegerT
{
public:
  AtomicIntegerT()
  :m_value(0)
  {

  }

  T get()
  {
    return __sync_val_compare_and_swap(&m_value, 0, 0);
  }

  T incrementAndGet()
  {
    return addAndGet(1);
  }

  T decrementAndGet()
  {
    return addAndGet(-1);
  }

private:
  AtomicIntegerT& operator=(const AtomicIntegerT&);
  AtomicIntegerT(const AtomicIntegerT&);

  T getAndAdd(T x)
  {
    return __sync_fetch_and_add(&m_value, x);
  }

  T addAndGet(T x)
  {
    return getAndAdd(x) + x;
  }

  volatile T m_value; 

};

typedef AtomicIntegerT<int32_t> AtomicInt32;
typedef AtomicIntegerT<int64_t> AtomicInt64;

class Timer{
public:
  typedef std::function<void()> TimerCallBack_t;

  Timer(const TimerCallBack_t& cb, TimeStamp when, double interval)
  :m_callBack(cb),
  m_expiration(when),
  m_interval(interval),
  m_repeat(interval > 0.0),
  m_sequence(s_numCreated.incrementAndGet())
{

}

  void run() const
  {
    m_callBack();
  }

  TimeStamp expiration() const { return m_expiration; }
  bool repeat() const { return m_repeat; }
  int64_t sequence() const { return m_sequence; }
  void restart(TimeStamp now);

  static int64_t numCreated(){ return s_numCreated.get(); }

private:
  Timer& operator=(const Timer&);
  Timer(const Timer&);

  const TimerCallBack_t m_callBack;
  TimeStamp m_expiration;
  const double m_interval;
  const bool m_repeat;
  const int64_t m_sequence;

  static AtomicInt64 s_numCreated;

};

///
/// An opaque identifier, for canceling Timer.
///
class TimerId
{
 public:
  TimerId()
    : m_timer(NULL),
      m_sequence(0)
  {
  }

  TimerId(Timer* timer, int64_t seq)
    : m_timer(timer),
      m_sequence(seq)
  {
  }

  // default copy-ctor, dtor and assignment are okay

  friend class TimerQueue;

 private:
  //TimerId& operator=(const TimerId&);
  //TimerId(const TimerId&);

  Timer* m_timer;
  int64_t m_sequence;
};

class Channel;
class EventLoop;

class TimerQueue
{
private:
  TimerQueue();
public:
  ~TimerQueue();

  static TimerQueue* GetInstance()
  {
    static TimerQueue instance;
    return &instance;
  }

  typedef std::function<void()> TimerCallBack_t;

  // Schedules the callback to be run at given time,
  void Start();

  TimerId runAt(const TimeStamp& time, const TimerCallBack_t& cb);
  TimerId runAfter(double delay, const TimerCallBack_t& cb);
  TimerId runEvery(double interval, const TimerCallBack_t& cb);

  void cancel(TimerId timerId);

private:
  typedef std::pair<TimeStamp, Timer*> Entry;
  typedef std::set<Entry> TimerList;
  typedef std::pair<Timer*, int64_t> ActiveTimer;
  typedef std::set<ActiveTimer> ActiveTimerSet;

  TimerId addTimer(const TimerCallBack_t& cb, TimeStamp when, double interval = 0.0);
  void addTimerInLoop(Timer* timer);
  void cancelInLoop(TimerId timerId);
  //called when timerfd alarms
  void handleRead();
  //move out all expired timers and return they.
  std::vector<Entry> getExpired(TimeStamp now);
  bool insert(Timer* timer);
  void reset(const std::vector<Entry>& expired, TimeStamp now);

  std::thread m_thread;
  const int m_timerfd;
  EventLoop* p_loop;
  Channel* p_timerfdChannel;

  //Timer List sorted by expiration
  TimerList m_timers;
  ActiveTimerSet m_activeTimers;

  bool m_callingExpiredTimers; /*atomic*/
  ActiveTimerSet m_cancelingTimers;

  std::condition_variable m_wait_loop_init;
};

#endif

cpp

#include <stdint.h>
#include <assert.h>
#include <sys/timerfd.h>
#include <unistd.h>
#include <mutex>

#include "Logger.hpp"
#include "Channel.hpp"
#include "EventLoop.hpp"
#include "TimerQueue.hpp"

namespace TimerFd
{

int createTimerfd()
{
  int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
                                 TFD_NONBLOCK | TFD_CLOEXEC);
  LOG_TRACE << "createTimerfd() fd : " << timerfd;
  if (timerfd < 0)
  {
    LOG_SYSFATAL << "Failed in timerfd_create";
  }
  return timerfd;
}

struct timespec howMuchTimeFromNow(TimeStamp when)
{
  int64_t microseconds = when.microSecondsSinceEpoch()
                         - TimeStamp::now().microSecondsSinceEpoch();
  if (microseconds < 100)
  {
    microseconds = 100;
  }
  struct timespec ts;
  ts.tv_sec = static_cast<time_t>(
      microseconds / TimeStamp::kMicroSecondsPerSecond);
  ts.tv_nsec = static_cast<long>(
      (microseconds % TimeStamp::kMicroSecondsPerSecond) * 1000);
  return ts;
}

void readTimerfd(int timerfd, TimeStamp now)
{
  uint64_t howmany;
  ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
  LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
  if (n != sizeof howmany)
  {
    LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
  }
}

void resetTimerfd(int timerfd, TimeStamp expiration)
{
  // wake up loop by timerfd_settime()
  LOG_TRACE << "resetTimerfd()";
  struct itimerspec newValue;
  struct itimerspec oldValue;
  bzero(&newValue, sizeof newValue);
  bzero(&oldValue, sizeof oldValue);
  newValue.it_value = howMuchTimeFromNow(expiration);
  int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
  if (ret)
  {
    LOG_SYSERR << "timerfd_settime()";
  }
}

};

using namespace TimerFd;

AtomicInt64  Timer::s_numCreated;

void Timer::restart(TimeStamp now)
{
  if(m_repeat)
  {
    m_expiration = TimeStamp::addTime(now, m_interval);
  }
  else
  {
    m_expiration = TimeStamp::invalid();
  }

}

TimerQueue::TimerQueue()
  :m_timerfd(createTimerfd()),
   m_callingExpiredTimers(false)
{

}

TimerQueue::~TimerQueue()
{
  p_timerfdChannel->disableAll();
  p_timerfdChannel->remove();
  p_loop->quit();
  m_thread.join();
  delete p_loop;
  delete p_timerfdChannel;
  ::close(m_timerfd);

  for (TimerList::iterator it = m_timers.begin();
      it != m_timers.end(); ++it)
  {
    delete it->second;
  }
}

void TimerQueue::Start()
{

  bool b_inited = false;;

  m_thread = std::thread([this, &b_inited]()mutable {
    this->p_loop = new EventLoop();
    this->p_timerfdChannel = new Channel(this->p_loop, this->m_timerfd);
    this->p_timerfdChannel->setReadCallBack(std::bind(&TimerQueue::handleRead, this));
    this->p_timerfdChannel->enableReading();
    b_inited = true;
    this->m_wait_loop_init.notify_all();
    this->p_loop->loop();
  });

  std::mutex mutex;
  std::unique_lock<std::mutex> lock(mutex);
  while(!b_inited){
    m_wait_loop_init.wait(lock);
  }
}

std::vector<TimerQueue::Entry> TimerQueue::getExpired(TimeStamp now)
{
  std::vector<Entry> expired;
  Entry sentry = std::make_pair(now, reinterpret_cast<Timer*>UINTPTR_MAX);
  TimerList::iterator it = m_timers.lower_bound(sentry);
  assert(it == m_timers.end() || now < it->first);
  std::copy(m_timers.begin(), it, back_inserter(expired));
  m_timers.erase(m_timers.begin(), it);

  for(std::vector<Entry>::iterator it = expired.begin();
      it != expired.end(); ++it)
  {
    ActiveTimer timer(it->second, it->second->sequence());
    size_t n = m_activeTimers.erase(timer);
    assert(n == 1); (void)n;
  }

  assert(m_timers.size() == m_activeTimers.size());

  return expired;
}


TimerId TimerQueue::addTimer(const TimerCallBack_t& cb, TimeStamp when, double interval)
{
  Timer* timer = new Timer(cb, when, interval);
  p_loop->runInLoop(std::bind(&TimerQueue::addTimerInLoop, this, timer));
  return TimerId(timer, timer->sequence());
}

void TimerQueue::addTimerInLoop(Timer* timer)
{
  p_loop->assertInLoopThread();
  bool earliestChanged = insert(timer);

  if (earliestChanged)
  {
    resetTimerfd(m_timerfd, timer->expiration());
  }
}

void TimerQueue::cancel(TimerId timerId)
{
  p_loop->runInLoop(std::bind(&TimerQueue::cancelInLoop, this, timerId));
}

void TimerQueue::cancelInLoop(TimerId timerId)
{
  p_loop->assertInLoopThread();
  assert(m_timers.size() ==  m_activeTimers.size());
  ActiveTimer timer(timerId.m_timer, timerId.m_sequence);
  ActiveTimerSet::iterator it = m_activeTimers.find(timer);
  if(it != m_activeTimers.end())
  {
    size_t n = m_timers.erase(Entry(it->first->expiration(), it->first));
    assert(n == 1);
    delete it->first;
  }
  else if (m_callingExpiredTimers)
  {
    m_cancelingTimers.insert(timer);
  }
  assert(m_timers.size() == m_activeTimers.size());
}

bool TimerQueue::insert(Timer* timer)
{
  p_loop->assertInLoopThread();
  assert(m_timers.size() == m_activeTimers.size());
  bool earliestChanged = false;
  TimeStamp when = timer->expiration();
  TimerList::iterator it = m_timers.begin();
  if (it == m_timers.end() || when < it->first)
  {
    earliestChanged = true;
  }
  {
    std::pair<TimerList::iterator, bool> result
      = m_timers.insert(Entry(when, timer));
    assert(result.second); (void)result;
  }
  {
    std::pair<ActiveTimerSet::iterator, bool> result
      = m_activeTimers.insert(ActiveTimer(timer, timer->sequence()));
    assert(result.second); (void)result;
  }

  LOG_TRACE << "TimerQueue::insert() " << "m_timers.size() : "
  << m_timers.size() << " m_activeTimers.size() : " << m_activeTimers.size();

  assert(m_timers.size() == m_activeTimers.size());
  return earliestChanged;
}


void TimerQueue::handleRead()
{
  p_loop->assertInLoopThread();
  TimeStamp now(TimeStamp::now());
  readTimerfd(m_timerfd, now);

  std::vector<Entry> expired = getExpired(now);

  LOG_TRACE << "Expired Timer size " << expired.size() << "  ";

  m_callingExpiredTimers = true;
  m_cancelingTimers.clear();

  for(std::vector<Entry>::iterator it = expired.begin();
      it != expired.end(); ++it )
  {
    it->second->run();
  }

  m_callingExpiredTimers = false;

  reset(expired, now);
}


void TimerQueue::reset(const std::vector<Entry>& expired, TimeStamp now)
{
  TimeStamp nextExpire;

  for(std::vector<Entry>::const_iterator it = expired.begin();
      it != expired.end(); ++it)
  {
    ActiveTimer timer(it->second, it->second->sequence());
    if(it->second->repeat()
      && m_cancelingTimers.find(timer) == m_cancelingTimers.end())
    {//如果是周期定时器则重新设定时间插入. 否则delete.
      it->second->restart(now);
      insert(it->second);
    }
    else
    {// FIXME move to a free list no delete please
      delete it->second;
    }
  }

  if (!m_timers.empty())
  {
    nextExpire = m_timers.begin()->second->expiration();
  }

  if (nextExpire.valid())
  {
    resetTimerfd(m_timerfd, nextExpire);
  }
}


TimerId TimerQueue::runAt(const TimeStamp& time, const TimerCallBack_t& cb)
{
  return addTimer(cb, time, 0.0);
}

TimerId TimerQueue::runAfter(double delay, const TimerCallBack_t& cb)
{
  TimeStamp time(TimeStamp::addTime(TimeStamp::now(), delay));
  return runAt(time, cb);
}

TimerId TimerQueue::runEvery(double interval, const TimerCallBack_t& cb)
{
  TimeStamp time(TimeStamp::addTime(TimeStamp::now(), interval));
  return addTimer(cb, time, interval);
}