美文网首页
定时器的设计与实现

定时器的设计与实现

作者: 圣地亚哥_SVIP | 来源:发表于2020-05-09 16:58 被阅读0次

参照Ceph的定时器的设计。
设计原理

  • 定时器线程,处理定时任务
  • 利用context_map = std::multimap<std::chrono::system_clock::time_point,Context>,利用运行时间点作为key排序
  • 定时器线程wait_until(lock,now()-context_map.begin())
  • 添加定时任务时,利用std::condition_variable,实现线程同步

头文件:

class SafeTimer: public UPThread{
private:
    std::condition_variable cond;
    std::mutex mtx;

    using uptime_point = std::chrono::system_clock::time_point;
    
    /*
    * 定时任务集合,key为运行时间点,value为定时任务
    */
    using schedule_map = std::multimap<std::chrono::system_clock::time_point,UPContext*>;
    schedule_map schedule;
    /*
    * 根据UPContext,快速在shedule_map中检索
    */
    using context_map = std::map<UPContext*,schedule_map::iterator>;
    context_map ctx_map;
    
    /*
    * True: 成功取消的UPContext,保证不会被运行;
    * False:成功取消的UPContext,也有可能会被运行;
    * 实现细节:True:加锁状态下运行,False:运行时会先释放锁
    */
    bool safe_callbacks;
    
    bool stop;

    /*
    * 线程入口函数
    */
    void* entry() override;
    
    /*
    * 定时任务轮询线程
    */
    void time_thread();

    void dump(const char * caller = 0) const;

public:
    SafeTimer(const SafeTimer&)=delete;
    SafeTimer& operator=(const SafeTimer&)=delete;
    
    SafeTimer(bool safe_callbacks=true);
    virtual ~SafeTimer();
    
    void init();
    void shutdown();

    bool empty(){
        std::lock_guard<std::mutex> lock{mtx};
        return ctx_map.empty();
    }
    
    
    /*
    * 添加定时任务:
    * 1. 相对时间之后运行
    * 2. 指定的时间点运行
    */
    UPContext* add_event_after(int seconds,UPContext*);
    UPContext* add_event_at(uptime_point,UPContext*);
    
    /*
    * 取消定时任务
    */
    bool cancel_event(UPContext*);
    void cancel_all_event();
};

定时器的定义:


#include "Timer.hpp"

SafeTimer::SafeTimer(bool safe_callbacks)
    :safe_callbacks(safe_callbacks),
    stop(false)
{}

SafeTimer::~SafeTimer(){
    assert(stop == true);
}

void* SafeTimer::entry(){
    time_thread();
    return nullptr;
}

void SafeTimer::time_thread(){
    std::unique_lock<std::mutex> lock{mtx};
    while(!stop){
        auto now = std::chrono::system_clock::now();
        if (schedule.empty()){
            cond.wait(lock);
        }else{
            cond.wait_until(lock,schedule.begin()->first);
        }
        while(!schedule.empty()){
            auto p = schedule.begin();
            if (p->first > now)
                break;
            auto* ctx = p->second;
            ctx_map.erase(ctx);
            schedule.erase(p);
            if (!safe_callbacks){
                lock.unlock();
                ctx->complete(0);
                lock.lock();
            }else{
                ctx->complete(0);
            }
            if (!safe_callbacks && !stop){
                break;
            }
        }
    }
}

void SafeTimer::init(){
    create("SafeTimer");
}

void SafeTimer::shutdown(){
    {
        cancel_all_event();
        std::lock_guard<std::mutex> lock(mtx);
        stop = true;
        cond.notify_all();
    }
    join();
}

bool SafeTimer::cancel_event(UPContext* ctx){
    std::lock_guard<std::mutex> lock(mtx);
    auto iter = ctx_map.find(ctx);
    if (iter == ctx_map.end()){
        return false;
    }
    delete iter->first;
    schedule.erase(iter->second);
    ctx_map.erase(iter);
    cond.notify_all();
    return true;
}

void SafeTimer::cancel_all_event(){
    std::lock_guard<std::mutex> lock(mtx);
    while(!ctx_map.empty()){
        auto ctx = ctx_map.begin();
        delete ctx->first;
        schedule.erase(ctx->second);
        ctx_map.erase(ctx);
    }
    cond.notify_all();
}

void SafeTimer::dump(const char* caller) const {
    if (!caller){
        caller = "";
    }
    dout(10)<<"Dump "<<caller<<dendl;
    for (schedule_map::const_iterator iter = schedule.begin();iter!=schedule.end();++iter){
        dout(10)<<std::fixed<<std::chrono::duration<double>(iter->first.time_since_epoch()).count()<<"s->"<<iter->second<<dendl;
    }
}

UPContext* SafeTimer::add_event_after(int sec,UPContext* ctx){
    std::chrono::seconds t(sec);
    auto tp = std::chrono::system_clock::now() + t;
    return add_event_at(tp,ctx);
}

UPContext* SafeTimer::add_event_at(uptime_point when,UPContext* ctx){
    if (stop){
        delete ctx;
        return nullptr;
    }
    
    schedule_map::iterator iter = schedule.insert(std::pair<uptime_point,UPContext*>(when,ctx));
    std::pair<context_map::iterator, bool> citer = ctx_map.insert(std::pair<UPContext*,schedule_map::iterator>(ctx,iter));
    
    if (citer.second){
        return ctx;
    }
    /*
    * 插在容器头部,则需立即唤醒信号处理线程
    */
    if (iter == schedule.begin()){
        cond.notify_all();
    }
    
    return ctx;
}

接口使用

std::shared_ptr<SafeTimer> timer = std::make_shared<SafeTimer>();
timer->init();

timer->add_event_after(5,new TestContext("Test1"));

timer->add_event_after(6,new TestContext("Test2"));

timer->add_event_after(10,new TestContext("Test3"));

sleep(14);

timer->shutdown();

相关文章

网友评论

      本文标题:定时器的设计与实现

      本文链接:https://www.haomeiwen.com/subject/ylaknhtx.html