参照Ceph的定时器的设计。
设计原理:
- 定时器线程,处理定时任务
- 利用context_map = std::multimap<std::chrono::system_clock::time_point,Context>,利用运行时间点作为key排序
- 定时器线程wait_until(lock,now()-context_map.begin())
- 添加定时任务时,利用std::condition_variable,实现线程同步
头文件:
class SafeTimer: public UPThread{
private:
std::condition_variable cond;
std::mutex mtx;
using uptime_point = std::chrono::system_clock::time_point;
/*
* 定时任务集合,key为运行时间点,value为定时任务
*/
using schedule_map = std::multimap<std::chrono::system_clock::time_point,UPContext*>;
schedule_map schedule;
/*
* 根据UPContext,快速在shedule_map中检索
*/
using context_map = std::map<UPContext*,schedule_map::iterator>;
context_map ctx_map;
/*
* True: 成功取消的UPContext,保证不会被运行;
* False:成功取消的UPContext,也有可能会被运行;
* 实现细节:True:加锁状态下运行,False:运行时会先释放锁
*/
bool safe_callbacks;
bool stop;
/*
* 线程入口函数
*/
void* entry() override;
/*
* 定时任务轮询线程
*/
void time_thread();
void dump(const char * caller = 0) const;
public:
SafeTimer(const SafeTimer&)=delete;
SafeTimer& operator=(const SafeTimer&)=delete;
SafeTimer(bool safe_callbacks=true);
virtual ~SafeTimer();
void init();
void shutdown();
bool empty(){
std::lock_guard<std::mutex> lock{mtx};
return ctx_map.empty();
}
/*
* 添加定时任务:
* 1. 相对时间之后运行
* 2. 指定的时间点运行
*/
UPContext* add_event_after(int seconds,UPContext*);
UPContext* add_event_at(uptime_point,UPContext*);
/*
* 取消定时任务
*/
bool cancel_event(UPContext*);
void cancel_all_event();
};
定时器的定义:
#include "Timer.hpp"
SafeTimer::SafeTimer(bool safe_callbacks)
:safe_callbacks(safe_callbacks),
stop(false)
{}
SafeTimer::~SafeTimer(){
assert(stop == true);
}
void* SafeTimer::entry(){
time_thread();
return nullptr;
}
void SafeTimer::time_thread(){
std::unique_lock<std::mutex> lock{mtx};
while(!stop){
auto now = std::chrono::system_clock::now();
if (schedule.empty()){
cond.wait(lock);
}else{
cond.wait_until(lock,schedule.begin()->first);
}
while(!schedule.empty()){
auto p = schedule.begin();
if (p->first > now)
break;
auto* ctx = p->second;
ctx_map.erase(ctx);
schedule.erase(p);
if (!safe_callbacks){
lock.unlock();
ctx->complete(0);
lock.lock();
}else{
ctx->complete(0);
}
if (!safe_callbacks && !stop){
break;
}
}
}
}
void SafeTimer::init(){
create("SafeTimer");
}
void SafeTimer::shutdown(){
{
cancel_all_event();
std::lock_guard<std::mutex> lock(mtx);
stop = true;
cond.notify_all();
}
join();
}
bool SafeTimer::cancel_event(UPContext* ctx){
std::lock_guard<std::mutex> lock(mtx);
auto iter = ctx_map.find(ctx);
if (iter == ctx_map.end()){
return false;
}
delete iter->first;
schedule.erase(iter->second);
ctx_map.erase(iter);
cond.notify_all();
return true;
}
void SafeTimer::cancel_all_event(){
std::lock_guard<std::mutex> lock(mtx);
while(!ctx_map.empty()){
auto ctx = ctx_map.begin();
delete ctx->first;
schedule.erase(ctx->second);
ctx_map.erase(ctx);
}
cond.notify_all();
}
void SafeTimer::dump(const char* caller) const {
if (!caller){
caller = "";
}
dout(10)<<"Dump "<<caller<<dendl;
for (schedule_map::const_iterator iter = schedule.begin();iter!=schedule.end();++iter){
dout(10)<<std::fixed<<std::chrono::duration<double>(iter->first.time_since_epoch()).count()<<"s->"<<iter->second<<dendl;
}
}
UPContext* SafeTimer::add_event_after(int sec,UPContext* ctx){
std::chrono::seconds t(sec);
auto tp = std::chrono::system_clock::now() + t;
return add_event_at(tp,ctx);
}
UPContext* SafeTimer::add_event_at(uptime_point when,UPContext* ctx){
if (stop){
delete ctx;
return nullptr;
}
schedule_map::iterator iter = schedule.insert(std::pair<uptime_point,UPContext*>(when,ctx));
std::pair<context_map::iterator, bool> citer = ctx_map.insert(std::pair<UPContext*,schedule_map::iterator>(ctx,iter));
if (citer.second){
return ctx;
}
/*
* 插在容器头部,则需立即唤醒信号处理线程
*/
if (iter == schedule.begin()){
cond.notify_all();
}
return ctx;
}
接口使用
std::shared_ptr<SafeTimer> timer = std::make_shared<SafeTimer>();
timer->init();
timer->add_event_after(5,new TestContext("Test1"));
timer->add_event_after(6,new TestContext("Test2"));
timer->add_event_after(10,new TestContext("Test3"));
sleep(14);
timer->shutdown();
网友评论