美文网首页
TimerQueue——基于timer fd的定时器机制

TimerQueue——基于timer fd的定时器机制

作者: 老杜振熙 | 来源:发表于2020-11-29 21:15 被阅读0次

注:本文为阅读muduo源码库和作者著作之后的网络库复现和笔记

在传统的IO多路复用系统中,定时操作通常是直接去设置poll()等函数的超时时间,系统超时之后去执行对应的定时回调。但现代Linux系统已经将时间也抽象为了一种事件,并提供了对应的文件描述符fd机制,其能够被poll()等多路复用函数进行监视,因此可以很简洁地融入到我们之前实现的 Reactor机制中。这也是更科学的做法。

底层API

  • int timerfd_create(int clockid, int flags)
    根据用户指定的标识符生成timer,并返回对应的fdclockid设置时钟的类型,flags则是设置fd的各个标志位,比如是否阻塞等等。

  • int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value)
    通过new_value去设定timer的超时时间(expiration time)以及是否重复(repeat via same iterval)。需要注意的是,new_value默认情况下指的是基于timer当前时刻的相对时间,如果需要设置绝对时间,则需要更改flags参数。

模块设计

定时设置,也就是用户指定一个时刻和一个回调函数,系统需要在该时刻去执行该回调函数。这个定义包含了两层含义:

  • 用户的定时设置是任意的。有可能先设置一个10秒的定时,接下来再设置一个5秒的定时。用户不一定会以expiration time的先后顺序去设置定时;
  • 系统顺序执行回调函数。系统需要将所有注册过的定时事件,按照expiration time的先后顺序,在用户设置的每一个expiration time去执行对应的回调函数;

基于上述的特点,我们需要的定时器模块的流程示意如图1所示。


图1. 定时器任务流程

首先解释一下图1所示的流程:

  • class Timer: 显然每一个expiration time都对应着一个callback function,因此将其封装为一个类class Timer,方便执行各自的回调函数。在图中即对应着五个不同颜色的小圆圈;
  • 接口:TimerQueue向用户提供Timer Insert接口,供用户插入各个Timer。在图中,我们先后插入了expiration time为10s, 18s, 20s, 15s, 10.01s的Timer;
  • class TimerQueue内部过程:①将得到的所有Timer进行排序;②每当有新的Timer expired之时,抽取出过期的Timer,在图中对应着10s和10.01s两个Timer;③依次执行这些expired Timer的回调函数;

接下来则是根据流程所示的特点去分析如何构建class TimerQueue:

首先是何时执行timerfd_create()。这个很简单,自然是在构造函数中执行,将得到的timer fd作为class TimerQueue的成员变量;

其次是何时执行timerfd_settime()。这个是需要重点思考的问题。最直接的解决方案自然是每插入一个Timer就执行一次timerfd_settime(),对应图1的例子,系统就会将10s, 10.01s, 15s, 18s, 20s这几个时刻都设置为过期时刻,随后在每一个过期时刻将对应的Timer取出,并执行回调函数。但这个方案有一个潜在的问题。timerfd_settime()系统调用,频繁的调用会导致系统开销增大。
改进的思路就是只对最早过期的Timer设置timerfd_settime()。具体而言分两种情况:

  • 新插入一个Timer:在插入Timer之前比较该Timer和TimerQueue中排名最靠前的Timer的expiration time,如果在其之前,则增设一个timerfd_settime()
  • 取出过期的Timer:在取出了过期的Timer之后,如果TimerQueue中还有剩余的Timer,则再对残余的Timer中最靠前的那一个Timer设置timerfd_settime()

随后是如何组织TimerQueue中的Timer。我们必须保证有序,而插入新Timer的位置又是随机的。综合考虑,使用std::set是最方便的。问题是如果存在expiration time相同的Timer怎么办。muduo源码给出的解决方法很巧妙,其将std::set的元素设置为std::pair<TimeStamp, Timer *>,这样一来就完美的解决了问题,即使两个Timer的过期时刻一致,在std::set中也还能用Timer的地址去排列各个Timer的先后顺序。

最后就是将class TimerQueue集成到EventLoop中,也就是作为class EventLoop的成员变量,这个很简单,不再赘述。

实现过程中的一些知识点总结

  1. CLOCK_REALTIMECLOCK_MONOTONIC
    在使用timer_create()中需要使用这些标志符去设定clockid(当然还有一些其他标识符可供选择)。这两个标识符的区别是:CLOCK_REALTIME基于实际的系统时间,可设定,非单调;CLOCK_MONOTONIC基于的是一个不确定的时间点,并单调流逝,其不可设定,绝对单调。为什么说CLOCK_REALTIME是非单调的呢?原因很简单,因为系统时间本身就是可以被用户任何修改的。具体详见博客[1]

代码实战

/* TimerQueue.h */
#ifndef TIMERQUEUE_H
#define TIMERQUEUE_H

#include <muduo/base/Timestamp.h>
#include <set>
#include <vector>
#include <memory>

#include "Channel.h"

// forward declaration
class EventLoop;
class Timer;
class TimerId;

class TimerQueue
{
  using Entry = std::pair<muduo::Timestamp, Timer *>;
  using TimerList = std::set<Entry>;
  using TimerCallback = std::function<void()>;
private:
  EventLoop *loop_;
  const int timerfd_;
  TimerList timers_;
  Channel timerfdChannel_;

  std::vector<Entry> getExpired(muduo::Timestamp now);
  bool insert(Timer *);
  void resetTimerfd(muduo::Timestamp when);
  

public:
  TimerQueue(EventLoop *);
  ~TimerQueue();

  TimerId addTimer(const TimerCallback cb, muduo::Timestamp time, double interval);
  void addTimerInLoop(Timer *timer);
  void handleRead();
  void reset(std::vector<Entry> &expired, muduo::Timestamp now);

};

#endif /* TIMERQUEUE_H */

/* TimerQueue.cc */
#include <sys/timerfd.h>
#include <unistd.h>
#include "TimerQueue.h"
#include "Channel.h"
#include "Timer.h"
#include "EventLoop.h"
#include "TimerId.h"

int creatTimerFd(){
  return ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
}

struct timespec getTimeDiffFromNow(muduo::Timestamp when){
  auto microSeconds = when.microSecondsSinceEpoch() 
                      - muduo::Timestamp::now().microSecondsSinceEpoch();
  if(microSeconds < 100)
    microSeconds = 100; // set the minimal diff as 100 micro seconds
  struct timespec ret;
  // seconds
  ret.tv_sec = static_cast<time_t>(microSeconds / muduo::Timestamp::kMicroSecondsPerSecond);
  // nano seconds
  ret.tv_nsec = static_cast<__SYSCALL_SLONG_TYPE>( (microSeconds%muduo::Timestamp::kMicroSecondsPerSecond) * 1000 );
  return ret;
}

std::vector<TimerQueue::Entry> TimerQueue::getExpired(muduo::Timestamp now){
  Entry dummy{now, reinterpret_cast<Timer *>(UINTPTR_MAX)};
  auto end_ = timers_.lower_bound(dummy);
  assert(end_==timers_.end() || now<end_->first);
  auto ret = std::vector<Entry>(timers_.begin(), end_);
  timers_.erase(timers_.begin(), end_);
  return ret;
}

TimerId TimerQueue::addTimer(const TimerCallback cb,
                          muduo::Timestamp time,
                          double interval){
  Timer *timer = new Timer(cb, time, interval);
  loop_->runInLoop(std::bind(&TimerQueue::addTimerInLoop, this, timer));
  return TimerId(timer, timer->sequence());
}

void TimerQueue::addTimerInLoop(Timer *timer){
  loop_->assertInLoopThread();
  auto isEarliest = insert(timer);
  if(isEarliest){
    resetTimerfd(timer->expiration());
    //printf("addtimer done\n");
  }
}

TimerQueue::TimerQueue(EventLoop *loop):
  loop_(loop),
  timerfd_(creatTimerFd()),
  timers_(),
  timerfdChannel_(loop, timerfd_)
{
  timerfdChannel_.setReadCallback(std::bind(&TimerQueue::handleRead, this));
  timerfdChannel_.enableRead(); // update timerfd Channel to Poller
  //printf("::TimerQueue done\n");
}

TimerQueue::~TimerQueue(){
  ::close(timerfd_);
  for(auto itr = timers_.begin(); itr != timers_.end(); ++itr){
    delete itr->second;
  }
}

void TimerQueue::handleRead(){
  loop_->assertInLoopThread();
  muduo::Timestamp now(muduo::Timestamp::now());
  auto expiredList = getExpired(now);
  for(auto &expired: expiredList){
    expired.second->run();
  }
  //printf("runned expired timer\n");
  reset(expiredList, now);
  //printf("reset done\n");
}

void TimerQueue::reset(std::vector<Entry> &expired, muduo::Timestamp now){
  muduo::Timestamp when;
  for(auto &e: expired){
    if(e.second->repeat()){
      e.second->restart(now); // addTimer(now, interval);
      insert(e.second);
    }
  }
  if(!timers_.empty()){
    when = timers_.begin()->first;
  } else {
    when = muduo::Timestamp::invalid();
  }
  if(when.valid()){
    resetTimerfd(when); // set the timer only when timers_ is not empty
    // and the next alarm time is the first element of timers_
  }
}

bool TimerQueue::insert(Timer * timer){
  bool isEarliset = false;
  auto itr = timers_.begin();
  if(itr==timers_.end() || timer->expiration()<itr->first){
    isEarliset = true;
  }
  timers_.insert({timer->expiration(), timer});
  return isEarliset;
}

void TimerQueue::resetTimerfd(muduo::Timestamp when){
  itimerspec old_timerspec, new_timerspec;
  bzero(&old_timerspec, sizeof old_timerspec);
  bzero(&new_timerspec, sizeof new_timerspec);
  new_timerspec.it_value = getTimeDiffFromNow(when);
  //printf("timer diff from now is %ld\n", new_timerspec.it_value.tv_sec);
  ::timerfd_settime(timerfd_, 0, &new_timerspec, &old_timerspec);;
}

/* Timer.h */
#ifndef TIMER_H
#define TIMER_H

#include <functional>
#include <muduo/base/Timestamp.h>
#include <muduo/base/Atomic.h>

class Timer
{
  using TimerCallback = std::function<void()>;
private:
  TimerCallback callback_;
  muduo::Timestamp expiration_;
  double interval_;
  bool repeat_;
  int64_t sequence_;
  static muduo::AtomicInt64 sequenceNumGenerator_;

public:
  Timer(const TimerCallback &cb, muduo::Timestamp when, double itv);
  ~Timer();
  
  muduo::Timestamp expiration() const {
    return expiration_;
  }
  double interval() {
    return interval_;
  }
  int64_t sequence() const {
    return sequence_;
  }
  bool repeat() const {
    return repeat_;
  }

  void run() {
    //printf("run callback\n");
    callback_();
    //printf("callback runned\n");
  }

  void restart(muduo::Timestamp when);
};

#endif /* TIMER_H */

/* Timer.cc */
#include "Timer.h"

muduo::AtomicInt64 Timer::sequenceNumGenerator_;

Timer::Timer(const TimerCallback &cb, 
             muduo::Timestamp when,
             double itv) : 
  callback_(cb), expiration_(when), interval_(itv), repeat_(itv>0), 
  sequence_(sequenceNumGenerator_.incrementAndGet())
{

}

Timer::~Timer(){

}

void Timer::restart(muduo::Timestamp when){
  if(repeat_){
    expiration_ = muduo::addTime(when, interval_);
  } else {
    expiration_ = muduo::Timestamp::invalid();
  }
}

/* TimerId.h */
#ifndef TIMERID_H
#define TIMERID_H

#include <cstdint>

class Timer;

class TimerId
{
private:
  Timer *timer_;
  int64_t sequence_;

public:
  TimerId():
    timer_(nullptr),
    sequence_(0){

  }
  TimerId(Timer *t, int64_t sequ):
    timer_(t), sequence_(sequ) {

  }
};

#endif /* TIMERID_H */

/* main.cc */
#include "EventLoop.h"
#include <muduo/base/Thread.h>
#include "TimerId.h"

EventLoop *g_loop;

void timeOutFunc(){
  printf("Oooops, time out!!!\n");
  g_loop->quit();
}

void threadFunc(){
  g_loop->runAfter(15, &timeOutFunc);
}

int main(int argc, char *argv[])
{
  EventLoop loop;
  g_loop = &loop;
  muduo::Thread thread_(&threadFunc);
  thread_.start();
  loop.loop();
  return 0;
}

结果演示

在TimerQueue中注册一个Timer

Reference


  1. CLOCK_MONOTONICCLOCK_REALTIME的区别(https://blog.csdn.net/tangchenchan/article/details/47989473)

相关文章

网友评论

      本文标题:TimerQueue——基于timer fd的定时器机制

      本文链接:https://www.haomeiwen.com/subject/vjgeiktx.html