定时器

作者: 大海无垠_af22 | 来源:发表于2023-02-25 22:19 被阅读0次

    基于set实现

    set 是有序容器,因此按到期时间排序即可得到一个有序的任务列表,方便地实现添加、删除和查询功能。

    // Intrusive timer hook: a timed object embeds one of these as a member so the
    // timer containers can schedule the object without allocating per-timer state.
    struct TimerItem {
        // List node; pop() of the timer containers appends it to the caller's result list.
        list_head link;
        // Expiry time, written by the owning timer's insert(). Not initialized here.
        int64_t target_time;
    };
    
    // Timer queue backed by std::set, ordered by (target_time, object address).
    //
    // T embeds a TimerItem that is reached through the member pointer `timer_item`
    // (defaults to T::timer_item). The queue stores raw T* and never owns the objects.
    //
    // Precondition for insert(): `t` must not currently be stored in this queue.
    // insert() rewrites target_time, which is part of the ordering key, so doing
    // that to an element already in the set would break the set's ordering.
    template<class T, TimerItem T::*timer_item = &T::timer_item>
    class TimerBySet {
        using Unit = int64_t;
        inline static void set_timeout(T* t, Unit u) { (t->*timer_item).target_time = u;}
        inline static Unit get_timeout(const T* t) { return (t->*timer_item).target_time;}
        inline static list_head* get_link(T* t) {
            return const_cast<list_head*>(get_link(static_cast<const T*>(t)));
        }
        inline static const list_head* get_link(const T* t) {
            return &((t->*timer_item).link);
        }
        // Strict weak ordering: earlier expiry first, ties broken by address so that
        // distinct objects with the same expiry can coexist in the set.
        struct Compare {
            bool operator()(const T* c, const T* r) const {
                const Unit cus = get_timeout(c), rus = get_timeout(r);
                if (cus != rus) return cus < rus;
                // The built-in '<' gives an unspecified result for pointers to unrelated
                // objects; std::less is guaranteed to be a strict total order.
                return std::less<const T*>()(c, r);
            }
        };
    public:
        // No-op; present so all timer implementations share the same interface.
        void init(Unit) {}
        bool empty() const {
            return items_.empty();
        }
        size_t size() const {
            return items_.size();
        }
        // Earliest expiry among stored timers, or 0 when the queue is empty.
        Unit min_time() const {
            if (items_.empty()) return 0;
            return get_timeout(*items_.begin());
        }
        // Schedules `t` to expire at `time_us` (see the class precondition).
        void insert(Unit time_us, T* t) {
            set_timeout(t, time_us);
            items_.insert(t);
        }
        // Removes `t` if it is stored; lookup uses t's current target_time.
        void remove(T* t) {
            items_.erase(t);
        }
        // Clears `result_list`, then moves every timer with target_time <= now into
        // it in expiry order. Popped timers get target_time reset to 0.
        void pop(Unit now, list_head* result_list) {
            result_list->clear();
            while (!items_.empty() && get_timeout(*items_.begin()) <= now) {
                T* t = *items_.begin();
                // Erase before touching target_time: it is part of the ordering key.
                items_.erase(items_.begin());
                set_timeout(t, 0);
                result_list->push_back(get_link(t));
            }
        }
    
    private:
        std::set<T*, Compare> items_;
    };
    

    基于堆

    小根堆可以很方便得到最小值,插入和删除也是O(log(N))复杂度。

    // TimerItem extended with the element's current position inside TimerByHeap's
    // array, which lets TimerByHeap::remove() locate the element without searching.
    struct TimerHeapItem: public TimerItem {
        // Defaults to the largest size_t value, the same "not in a heap" marker that
        // TimerByHeap::remove() writes, so removing a never-inserted item is a no-op
        // instead of acting on index 0 or an indeterminate value.
        size_t index = static_cast<size_t>(-1);
    };
    
    // Timer queue backed by a binary min-heap stored in a std::deque<T*>.
    //
    // T embeds a TimerHeapItem reached through the member pointer `timer_item`.
    // Each stored element records its own heap position in TimerHeapItem::index,
    // which is kept up to date on every swap so remove() can find it directly.
    // The queue stores raw T* and never owns the objects.
    template<class T, TimerHeapItem T::*timer_item = &T::timer_item>
    class TimerByHeap {
        using Unit = int64_t;
        inline static void set_timeout(T* t, Unit u) { (t->*timer_item).target_time = u;}
        inline static Unit get_timeout(const T* t) { return (t->*timer_item).target_time;}
        inline static void set_index(T* t, size_t index) { (t->*timer_item).index = index;}
        inline static size_t get_index(const T* t) { return (t->*timer_item).index;}
        inline static list_head* get_link(T* t) {
            return const_cast<list_head*>(get_link(static_cast<const T*>(t)));
        }
        inline static const list_head* get_link(const T* t) {
            return &((t->*timer_item).link);
        }
    public:
        // No-op; present so all timer implementations share the same interface.
        void init(Unit) {}
        bool empty() const { return heap_.empty();}
        size_t size() const { return heap_.size();}
        // Earliest expiry among stored timers, or 0 when the heap is empty.
        Unit min_time() const {
            if (empty()) return 0;
            return get_timeout(heap_[0]);
        }
        // Schedules `t` to expire at `tp` (must be > 0). `t` must not already be stored.
        void insert(Unit tp, T* t) {
            assert(tp > 0);
            set_timeout(t, tp);
            set_index(t, size());
            heap_.push_back(t);
            _adjust(size() - 1);
        }
        // Removes `t` if it is stored in this heap; otherwise does nothing.
        void remove(T* t) {
            const size_t index = get_index(t);
            // Besides the range check, require that the slot really holds `t`: a stale
            // index (item never inserted, or already removed) must not evict another timer.
            if (unlikely(index >= size() || heap_[index] != t)) {
                return;
            }
            // Mark as "not in heap" so a repeated remove() is rejected above.
            set_index(t, std::numeric_limits<size_t>::max());
            const size_t last = size() - 1;
            if (unlikely(index == last)) {
                heap_.pop_back();
                return;
            }
            // Fill the hole with the last element, then restore the heap order around it.
            T* moved = heap_[last];
            heap_[index] = moved;
            set_index(moved, index);
            heap_.pop_back();
            _adjust(index);
        }
        // Clears `result_list`, then moves every timer with target_time <= now into
        // it in expiry order.
        void pop(Unit now, list_head* result_list) {
            result_list->clear();
            while (!empty() && get_timeout(heap_[0]) <= now) {
                T* t = heap_[0];
                remove(t);
                result_list->push_back(get_link(t));
            }
        }
    
    private:
        // True when the element at position a expires strictly before the one at b.
        bool _less_than(size_t a, size_t b) const {
            return get_timeout(heap_[a]) < get_timeout(heap_[b]);
        }
        // Swaps two heap positions and keeps the stored indices in sync.
        void _swap(size_t a, size_t b) {
            set_index(heap_[a], b);
            set_index(heap_[b], a);
            std::swap(heap_[a], heap_[b]);
        }
        // Restores the heap property for the element at `index`, which may be out of
        // order relative to its parent or to its children.
        void _adjust(size_t index) {
            assert(index < heap_.size());
            // adjust up
            for (size_t pos = index; pos > 0; ) {
                size_t father = (pos - 1) >> 1;
                if (!_less_than(pos, father)) {
                    break;
                }
                _swap(pos, father);
                pos = father;
            }
            // adjust down
            // If the element moved up, `index` now holds a former ancestor, which is
            // already <= everything below it, so this loop exits immediately.
            for (;index < size();) {
                size_t left = 2 * index + 1, right = left + 1;
                if (left >= size()) break;
                // Descend towards the smaller child so the parent ends up <= both children.
                if (right < size() && _less_than(right, index) && _less_than(right, left)) {
                    _swap(index, right);
                    index = right;
                    continue;
                }
                if (_less_than(left, index)) {
                    _swap(index, left);
                    index = left;
                    continue;
                }
                break;
            }
        }
        std::deque<T*> heap_;
    };
    

    基于时间轮

    时间轮本质上是利用时间的整数特性实现的特殊的哈希表。
    这里采用int64表示时间,每6位二进制对应一个时间轮,总计11个轮,每个轮有2^6=64个分片。插入时将目标时间与now从高位到低位逐轮比较,第一个不同的轮就是需要插入的轮,再取该轮对应的6位作为分片下标,插入对应的分片中。删除时也能从时间反推出插入的位置;由于采用双向链表结构,插入和删除都是常量时间复杂度。pop和查询min_time()相对麻烦,耗时更多;考虑到插入和删除是调用频率最高的操作,这个耗时在可接受范围内。为了加快pop和查询,还采用位图记录每个分片是否为空,通过位图的高效查找跳过空的分片。
    位图的代码如下。

    // Number of leading (most-significant) zero bits in `num`.
    // Returns 64 for num == 0: __builtin_clzll is undefined for a zero argument,
    // so that case is answered explicitly instead of being passed to the builtin.
    inline size_t high_zero_bit_num(uint64_t num) {
        if (num == 0) {
            return 8 * sizeof(num);
        }
        return static_cast<size_t>(__builtin_clzll(num));
    }
    // Number of trailing (least-significant) zero bits in `num`.
    // Returns 64 for num == 0: __builtin_ctzll is undefined for a zero argument,
    // so that case is answered explicitly instead of being passed to the builtin.
    inline size_t low_zero_bit_num(uint64_t num) {
        if (num == 0) {
            return 8 * sizeof(num);
        }
        return static_cast<size_t>(__builtin_ctzll(num));
    }
    
    // Fixed-size bitmap of BitNum bits packed into 64-bit words.
    // All single-bit operations ignore out-of-range positions (get_bit returns false).
    template<size_t BitNum>
    class BitMap {
    public:
        BitMap() {
            clear();
        }
        // Resets every bit to 0.
        void clear() {
            bitmap_.fill(0);
        }
        // Sets bit `nth` to 0; no-op when nth >= BitNum.
        inline void clear_bit(size_t nth) {
            if (unlikely(nth >= BitNum)) return;
            size_t idx = nth >> kBitmapShift;
            size_t bit = nth & kUnitMask;
            bitmap_[idx] &= ~(uint64_t(1) << bit);
        }
        // Sets bit `nth` to 1; no-op when nth >= BitNum.
        inline void set_bit(size_t nth) {
            if (unlikely(nth >= BitNum)) return;
            size_t idx = nth >> kBitmapShift;
            size_t bit = nth & kUnitMask;
            bitmap_[idx] |= uint64_t(1) << bit;
        }
        size_t size() const { return BitNum;}
        // Returns bit `nth`; false when nth >= BitNum.
        inline bool get_bit(size_t nth) const {
            if (unlikely(nth >= BitNum)) return false;
            size_t idx = nth >> kBitmapShift;
            size_t bit = nth & kUnitMask;
            return (bitmap_[idx] >> bit) & 1;
        }
        // Searches the half-open range [start, end) for the lowest set bit.
        // `end` is clamped to BitNum. Returns the bit position when one is found,
        // otherwise the (clamped) `end`. The result is always within [start, end].
        inline size_t find_first_set_bit(size_t start = 0, size_t end = BitNum) const {
            if (unlikely(end > BitNum)) {
                end = BitNum;
            }
            if (unlikely(start >= end)) {
                return end;
            }
            // Word that contains the last candidate bit (end - 1); always < kUnitNum.
            const size_t idx_last = (end - 1) >> kBitmapShift;
            size_t idx = start >> kBitmapShift;
            // Drop the bits below `start` in the first word so they cannot match.
            uint64_t unit = bitmap_[idx] & (~uint64_t(0) << (start & kUnitMask));
            while (unit == 0) {
                if (idx == idx_last) {
                    return end;
                }
                unit = bitmap_[++idx];
            }
            // `unit` is non-zero here, so the trailing-zero count is well defined.
            const size_t nth = (idx << kBitmapShift) + low_zero_bit_num(unit);
            // A hit in the last word may still lie at or beyond `end`; report "not found".
            return nth < end ? nth : end;
        }
    
    private:
        static constexpr size_t kBitmapShift = 6;
        static constexpr size_t kBitmapUnit = size_t(1) << kBitmapShift;
        static constexpr size_t kUnitMask = kBitmapUnit - 1;
        static constexpr size_t kUnitNum =
            (BitNum + kBitmapUnit - 1) >> kBitmapShift;
        std::array<uint64_t, kUnitNum> bitmap_;
    };
    

    时间轮代码如下。

    // Intrusive timer hook embedded in every object scheduled on a TimeWheel.
    struct TimerItem {
        // List node; TimeWheel links it into a wheel slot, into its already-due list,
        // or into the result list returned by pop().
        list_head link;
        // Expiry time, written by TimeWheel::insert() and reset to 0 by remove().
        // Not initialized here.
        int64_t target_time;
    };
    
    // Hierarchical time wheel keyed by the bits of a 64-bit timestamp.
    //
    // The timestamp is split into kWheels groups of kSlotBits bits; wheel 0 covers
    // the most significant group and wheel kWheels-1 the least significant one.
    // A timer is stored in the wheel of the most significant group in which its
    // expiry differs from now_, in the slot selected by that group of the expiry.
    // One bitmap bit per slot records whether the slot's list is non-empty.
    //
    // T embeds a TimerItem reached through the member pointer `timer_item`.
    // The wheel stores intrusive list nodes and never owns the objects.
    template<class T, TimerItem T::*timer_item = &T::timer_item>
    class TimeWheel {
    private:
        // timer = [wheel1, wheel2, ...]
        // wheel = [slot1, slot2, ... slot_64]
        using Unit = int64_t;
        static constexpr size_t kUnitBits = 8 * sizeof(Unit);
        static constexpr size_t kSlotBits = 6;
        static constexpr size_t kSlotNum = 1 << kSlotBits;
        static constexpr size_t kWheels = (kUnitBits + kSlotBits - 1)/kSlotBits;
        static constexpr size_t kSlotAll = kWheels * kSlotNum;
        Unit now_{};                              // current time of the wheel
        size_t size_{};                           // number of stored timers (slots + timeouts_)
        BitMap<kSlotAll> slot_bitmap_;            // bit i set <=> items_[i] is non-empty
        list_head timeouts_{};                    // timers inserted with expiry <= now_
        std::array<list_head, kSlotAll> items_;   // slot lists, kSlotNum per wheel
    
        inline static void set_timeout(T* t, Unit u) { (t->*timer_item).target_time = u;}
        inline static Unit get_timeout(const T* t) { return (t->*timer_item).target_time;}
        inline static list_head* get_link(T* t) {
            return const_cast<list_head*>(get_link(static_cast<const T*>(t)));
        }
        inline static const list_head* get_link(const T* t) {
            return &((t->*timer_item).link);
        }
        // Maps a list node back to the T that embeds it (nullptr maps to nullptr).
        inline static T* from_link(list_head* h) {
            return const_cast<T*>(from_link(static_cast<const list_head*>(h)));
        }
        inline static const T* from_link(const list_head* h) {
            // Byte offset of the link inside T, obtained from a null T*.
            // NOTE(review): forming a member address from a null pointer is not
            // sanctioned by the standard; kept unchanged because it is what callers rely on.
            static const size_t off =
                reinterpret_cast<size_t>(get_link(static_cast<const T*>(nullptr)));
            if (h == nullptr) return nullptr;
            const char* ptr = reinterpret_cast<const char*>(h) - off;
            const T* t = reinterpret_cast<const T*>(ptr);
            return t;
        }
        // Slot index (0..kSlotNum-1) of time `u` inside `wheel`.
        inline size_t slot_in_wheel(Unit u, size_t wheel) const {
            // assert(wheel < kWheels);
            return (u >> (kSlotBits * (kWheels - 1 - wheel))) & (kSlotNum - 1);
        }
        // Wheel that holds time `u`: the one containing the highest bit in which
        // `u` differs from now_. Requires u != now_.
        inline size_t wheel_at(Unit u) const {
            size_t zero_bit_n = high_zero_bit_num(now_ ^ u);
            assert(zero_bit_n < kUnitBits);
            // kWheels * kSlotBits exceeds kUnitBits, so shift the bit position by the
            // number of padding bits before dividing into groups.
            return (zero_bit_n + kWheels * kSlotBits - kUnitBits) / kSlotBits;
        }
        // Index into items_ / slot_bitmap_ for time `u`, relative to the current now_.
        inline size_t slot_in_global(Unit u) const {
            size_t wheel = wheel_at(u);
            size_t index = (wheel << kSlotBits) | slot_in_wheel(u, wheel);
            return index;
        }
        // Takes the timer owning node `h` out of the wheel and appends it to result_list.
        inline void _remove(list_head* h, list_head* result_list) {
            remove(from_link(h));
            result_list->push_back(h);
        }
        // Links `node` into the slot for expiry `tp` (tp > now_) and marks the slot used.
        inline void _insert_to_slots(Unit tp, list_head* node) {
            auto global_slot = slot_in_global(tp);
            list_head& h = items_[global_slot];
            bool empty = h.empty();
            h.push_back(node);
            if (empty) {
                slot_bitmap_.set_bit(global_slot);
            }
        }
        // Moves every timer on list `head` to result_list.
        inline void _pop_to_list(list_head* result_list, list_head* head) {
            // _remove() unlinks h, so head->next advances on its own each round.
            for (list_head* h=head->next; h != head; h=head->next) {
                _remove(h, result_list);
            }
        }
        // Pops all timers of `wheel` in slots [slot of now_, end_slot).
        inline void _pop_wheel(size_t wheel, size_t end_slot, list_head* result_list) {
            size_t global_base_slot = wheel << kSlotBits;
            size_t start_slot = slot_in_wheel(now_, wheel);
            size_t start_global = global_base_slot + start_slot;
            size_t end_global = global_base_slot + end_slot;
            for(size_t i = start_global; i < end_global; ++i) {
                i = slot_bitmap_.find_first_set_bit(i, end_global);
                // Treat any index at or past the end as "nothing found": slots from
                // end_global onwards hold timers that are not due yet.
                if (i >= end_global) {
                    break;
                }
                list_head* head = &items_[i];
                _pop_to_list(result_list, head);
                slot_bitmap_.clear_bit(i);
            }
        }
        // Looks for the first used slot of `wheel` after the slot of now_.
        // On success stores the lower bound of that slot's time range in *tp.
        inline bool _find_min_time(size_t wheel, Unit* tp) const {
            size_t slot_base = wheel << kSlotBits;
            size_t slot_end = (wheel+1) << kSlotBits;
            size_t slot_start = slot_in_wheel(now_, wheel) + 1 + slot_base;
            size_t global_slot = slot_bitmap_.find_first_set_bit(slot_start, slot_end);
            if (global_slot >= slot_end) {
                return false;
            }
            // Bits covered by this wheel together with all finer wheels.
            const size_t low_bits = kSlotBits * (kWheels - wheel);
            // For wheel 0 low_bits exceeds the width of Unit; shifting by that much is
            // undefined, and the intended result is simply "clear every bit of now_".
            Unit base = 0;
            if (low_bits < kUnitBits) {
                base = now_ & ~((Unit(1) << low_bits) - 1);
            }
            *tp = base;
            *tp += (global_slot - slot_base) << (low_bits - kSlotBits);
            return true;
        }
    
    public:
        TimeWheel() = default;
        // Debug helper: asserts that every bitmap bit agrees with its slot list.
        void debug_check_bitmap() const {
            for(size_t i=0;i<kSlotAll;++i){
                if (items_[i].empty()){
                    log_assert(!slot_bitmap_.get_bit(i))<<"bit not 0, i="<<i;
                } else {
                    log_assert(slot_bitmap_.get_bit(i))<<"bit not 1, i="<<i;
                }
            }
        }
        // Sets the current time (>= 0) and clears the slot bitmap.
        // It does not unlink stored timers or reset size(), so it is only
        // meaningful on an empty wheel.
        void init(Unit now) {
            assert(now >= 0);
            now_ = now;
            slot_bitmap_.clear();
        }
        bool empty() const { return 0 == size_;}
        size_t size() const { return size_;}
        // return mt, all timer after mt
        // if some timer timeouts or no timer, return now
        Unit min_time() const {
            if (empty() || !timeouts_.empty()) return now_;
            Unit tp = 0;
            // Finer wheels hold nearer expiries, so scan from the last wheel backwards
            // and stop at the first wheel that has a used slot.
            for (size_t w=kWheels; w > 0 && !_find_min_time(w-1, &tp);--w);
            return tp;
        }
        // Schedules `t` to expire at `tp` (must be > 0). `t` must be unlinked.
        // Timers that are already due (tp <= now_) are parked on timeouts_.
        void insert(Unit tp, T* t) {
            if (unlikely(t == nullptr)) {
                return;
            }
            auto node = get_link(t);
            assert(node->empty());
            assert(tp > 0);
            set_timeout(t, tp);
            ++size_;
            if (tp <= now_) {
                timeouts_.push_back(node);
                return;
            }
            _insert_to_slots(tp, node);
        }
        // Unlinks `t` and resets its expiry to 0. No-op for nullptr, for an
        // unlinked node, or for a negative expiry.
        void remove(T* t) {
            if (unlikely(t == nullptr)) {
                return;
            }
            list_head* th = get_link(t);
            auto tp = get_timeout(t);
            if (th->empty() || tp < 0) return;
            set_timeout(t, 0);
            th->remove();
            assert(size_ > 0);
            --size_;
            // Only timers stored in a slot (expiry after now_) own a bitmap bit.
            if (tp > now_) {
                size_t global_slot = slot_in_global(tp);
                if (items_[global_slot].empty()) {
                    slot_bitmap_.clear_bit(global_slot);
                }
            }
        }
        // Advances the wheel to `now` and moves every timer with expiry <= now to
        // result_list. Does nothing beyond resetting result_list when now < now_.
        void pop(Unit now, list_head* result_list) {
            result_list->remove();
            if (now_ > now) {
                return;
            }
            // Timers that were already due at insert time are due on every pop.
            // Draining them only when now == now_ would leave them stuck for as long
            // as the clock keeps advancing between calls.
            _pop_to_list(result_list, &timeouts_);
            if (now == now_) {
                return;
            }
            auto start_wheel = wheel_at(now);
            /*
            now_: A1 B1 C1 D1
            now : A1 B2 C2 D2
            */
            // Wheels finer than start_wheel: everything from the slot of now_ onwards
            // lies before the next start_wheel boundary, hence before `now`.
            for (size_t wheel = kWheels - 1; wheel > start_wheel; --wheel) {
                size_t end_slot = kSlotNum;
                _pop_wheel(wheel, end_slot, result_list);
            }
            // start_wheel itself: slots strictly before the slot that contains `now`.
            size_t end_slot = slot_in_wheel(now, start_wheel);
            _pop_wheel(start_wheel, end_slot, result_list);
            now_ = now;
            // The slot containing `now` mixes due and not-yet-due timers: pop the due
            // ones and cascade the rest into finer wheels relative to the new now_.
            size_t target_slot = start_wheel * kSlotNum + end_slot;
            list_head* slot_head = &items_[target_slot];
            while (slot_head->next != slot_head) {
                list_head* node=slot_head->next;
                T* t = from_link(node);
                auto tp = get_timeout(t);
                if (tp <= now) {
                    _remove(node, result_list);
                    continue;
                }
                // Unlink explicitly so re-insertion does not depend on push_back()
                // detaching a node that is still linked into this slot.
                node->remove();
                _insert_to_slots(tp, node);
            }
            slot_bitmap_.clear_bit(target_slot);
        }
    
        DISALLOW_COPY(TimeWheel);
    };
    

    测试代码

    // Benchmark payload: nothing but the intrusive timer hook plus two read-only
    // accessors the benchmark uses to check what pop() returned.
    struct TimeoutPerfItem {
        // TimerItem timer_item;
        TimerHeapItem timer_item;

        // Expiry time currently stored in the hook.
        int64_t get_timeout_us() const {
            const int64_t deadline_us = timer_item.target_time;
            return deadline_us;
        }

        // Address of the hook's list node.
        const list_head* get_timeout_link() const {
            const list_head* hook = &timer_item.link;
            return hook;
        }
    };
    
    // Timer implementation exercised by the benchmark; enable exactly one alias.
    // NOTE(review): the non-heap variants presumably also need the commented-out
    // `TimerItem timer_item;` member in TimeoutPerfItem — confirm before switching.
    // using TestTimer = TimerBySet<TimeoutPerfItem>;
    using TestTimer = TimerByHeap<TimeoutPerfItem>;
    // using TestTimer = wt::TimeWheel<TimeoutPerfItem>;
    
    // Benchmarks TestTimer with `num` timers and logs the average cost per
    // operation for insert, remove, pop and min_time.
    //
    // Phases:
    //   1. insert `num` timers spaced 97 time units apart, starting at base_us;
    //   2. remove the first half (indices [0, rm_num));
    //   3. pop the second half one timer per call, checking each pop returns
    //      exactly the expected item;
    //   4. re-insert the second half and repeat phase 3 with a min_time() call
    //      before every pop; the min_time cost is the time of this phase minus
    //      the pop time measured in phase 3.
    //
    // The per-operation averages divide by rm_num and (num - rm_num), so `num`
    // must be at least 2.
    static void test_timer_benchmark(size_t num) {
        size_t rm_num = num/2;
        int64_t base_us = wt::steady_us();
        TestTimer tm1{};
        std::vector<TimeoutPerfItem> items;
        items.resize(num);
        // Memory is sampled after the items exist, so (m2 - m1) below covers only
        // what the timer itself allocates during insertion.
        auto m1 = wt::System::process_memory_kb();
        auto t0 = wt::steady_us();
        tm1.init(base_us);
        // Phase 1: insert.
        for (size_t i = 0; i < num; i++) {
            tm1.insert(base_us + i * 97, &items[i]);
        }
        auto t1 = wt::steady_us();
        auto m2 = wt::System::process_memory_kb();
        log_info()<<"timer benchmark: num="<<num;
        // sample output: insert num:1000000, mem_kb:0
        log_info()<<"insert num:"<<num<<",items_mem_kb:"<<m1<<", mem_kb:"<<(m2-m1);
        // sample output: insert cost_ms:8, avg_ns:8
        log_assert(tm1.size() == num)<<"insert, size="<<tm1.size();
        log_info()<<"insert cost_ms:"<<(t1-t0)/1000
            <<", avg_ns:"<<1000*(t1-t0)/num;
        // Phase 2: remove the first half.
        auto t2 = wt::steady_us();
        for (size_t i = 0; i < rm_num;++i) {
            tm1.remove(&items[i]);
        }
        auto t3 = wt::steady_us();
        auto dt_rm = t3 - t2;
        log_assert(tm1.size() == num - rm_num)<<"remove, size="<<tm1.size();
        // sample output: remove cost_ms:2, avg_ns:4
        log_info()<<"remove cost_ms:"<<dt_rm/1000
            <<", avg_ns:"<<1000*dt_rm/rm_num;
        // Phase 3: pop the remaining timers one at a time, in expiry order.
        auto t4 = wt::steady_us();
        for (size_t i = rm_num; i < num;++i) {
            list_head result;
            // Popping at exactly item i's expiry must yield item i and nothing else.
            tm1.pop(items[i].get_timeout_us(), &result);
            log_assert(result.size() == 1);
            log_assert(result.next == items[i].get_timeout_link());
        }
        auto t5 = wt::steady_us();
        auto dt_pop = t5 - t4;
        log_assert(tm1.size() == 0)<<"pop, size="<<tm1.size();
        // sample output: pop cost_ms:40, avg_ns:81
        log_info()<<"pop cost_ms:"<<dt_pop/1000
            <<", avg_ns:"<<1000*dt_pop/(num - rm_num);
    
        // Phase 4: re-insert the second half, then interleave min_time() with pop().
        tm1.init(base_us);
        for (size_t i = rm_num; i < num; i++) {
            tm1.insert(base_us + i * 97, &items[i]);
        }
        // tm1.debug_check_bitmap();
        auto t6 = wt::steady_us();
        for (size_t i = rm_num; i < num;++i) {
            auto min_time = tm1.min_time();
            // min_time() may be a lower bound rather than the exact next expiry, so
            // only require it to fall after item i-1's stored time and not after item i's.
            log_assert(min_time > items[i-1].get_timeout_us()
                    && min_time <= items[i].get_timeout_us()) << "i=" <<i
                <<",min_time="<<min_time<<",expect="<<items[i].get_timeout_us();
            list_head result;
            tm1.pop(items[i].get_timeout_us(), &result);
            log_assert(result.size() == 1);
            log_assert(result.next == items[i].get_timeout_link());
        }
        auto dt7 = wt::steady_us() - t6;
        // Subtract the pop-only time from phase 3; std::max keeps the difference
        // from going negative when this run happened to be faster.
        auto dt_mintime = std::max(dt7, dt_pop) - dt_pop;
        // sample output: min_time cost_ms:11, avg_ns:22
        log_info()<<"min_time cost_ms:"<<dt_mintime/1000
            <<", avg_ns:"<<1000*dt_mintime/(num - rm_num);
    }
    
    // Declares the boolean switch `enable_test_timer` that test_timer() checks before running.
    // NOTE(review): DefineBoolArg is a project macro not shown here — presumably it
    // registers a command-line flag with the given help text; confirm.
    DefineBoolArg(enable_test_timer, "run test_timer()");
    // Runs the timer benchmark once per configured scale, but only when the
    // enable_test_timer switch is on. Each scale is multiplied by 10000 to get
    // the number of timers.
    static void test_timer() {
        if (enable_test_timer) {
            for (size_t scale: {10, 100, 1000, 2000}) {
                const size_t timer_count = 10000 * scale;
                test_timer_benchmark(timer_count);
            }
        }
    }
    

    性能对比结果

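    | 实现类型 | num | insert | remove | pop | min_time | 内存/KB |
    | --- | --- | --- | --- | --- | --- | --- |
    | 基于Set | 1万 | 147 | 74 | 50 | 0 | 4804 |
    | 基于Set | 100万 | 186 | 103 | 40 | 0 | 42240 |
    | 基于Set | 1000万 | 239 | 124 | 31 | 0 | 421872 |
    | 基于Set | 2000万 | 285 | 132 | 35 | 2 | 468596 |
    | 基于堆 | 1万 | 27 | 301 | 338 | 0 | 1108 |
    | 基于堆 | 100万 | 15 | 299 | 283 | 8 | 7392 |
    | 基于堆 | 1000万 | 14 | 398 | 422 | 0 | 73656 |
    | 基于堆 | 2000万 | 15 | 458 | 462 | 0 | 147704 |
    | 基于时间轮 | 1万 | 4 | 4 | 53 | 7 | 0 |
    | 基于时间轮 | 100万 | 4 | 4 | 52 | 3 | 0 |
    | 基于时间轮 | 1000万 | 4 | 4 | 51 | 18 | 0 |
    | 基于时间轮 | 2000万 | 5 | 4 | 65 | 7 | 0 |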

    结论:基于set的实现插入耗时巨大,应该是申请内存和节点遍历造成的;删除耗时次之,可见节点太多后树太高,遍历耗时;pop只删除起始节点,耗时相对小一点;min_time()查询很快。set的内存开销很高,1千万定时器内存达到422MB。
    结论:基于堆的实现使用deque存储,deque由大块内存组织而成,插入又固定在尾部,因此耗时很小;测试中pop和删除总是作用在头部节点,删除后将尾部节点置换到头部再重新调整,需要调整堆的所有层,因此耗时巨大;min_time()几乎没有耗时,符合预期。1千万定时器内存消耗74MB,相对set小了很多。
    结论:基于时间轮的实现插入和删除耗时仅5ns左右,而且保持在常量时间,符合预期;pop耗时50-60ns,在可接受范围内;min_time()查询时间在7ns左右,可以当做常量时间。内存上没有额外开销,只需要item本身的空间即可。综上,时间轮在性能上完胜前两个实现。

    相关文章

      网友评论

          本文标题:定时器

          本文链接:https://www.haomeiwen.com/subject/cbcqldtx.html