美文网首页
ZeroMQ源码解析之yqueue

ZeroMQ源码解析之yqueue

作者: 分享放大价值 | 来源:发表于2016-10-12 17:41 被阅读373次

    What is yqueue?

    yqueue是ZMQ底层实现的一个高效的队列数据结构。它的主要特点有两个:

    • 动态增长的时候,它能够尽可能少的进行内存分配和内存释放。
    • 允许生产者线程和消费者线程不加锁的进行生产和消费。

    注意:用户必须保证不在空的yqueue上进行出队(pop)操作,并且在未进行同步的情况下,生产者和消费者不应该同时访问同一个元素。

    yqueue API

    ZMQ实现的yqueue提供了如下几个接口:

    template <typename T, int N> class yqueue_t {
    public:
        // 获取队头元素的引用
        inline T &front();
        // 获取队尾元素的引用
        inline T &back();
        // 向队列插入元素
        inline void push();
        // 撤销最近一次得插入操作
        inline void unpush();
        // 将队头出列
        inline void pop();
    }
    

    ZMQ的队列操作和C++的queue“很不一样”,在使用该数据结构时要非常小心。

    • yqueue的push并不接收参数,而只是将队列扩容,真正要插入元素需要在调用push方法之后,调用yqueue.back() = val
    • yqueue的pop并不真正把数据删除,而只是将队头“指针”向后移动。并且该操作不返回队头的元素。想得到队头的元素需要在调用pop方法之前,调用T val = yqueue.front()
    • yqueue的unpush是独有的,与pop类似,在调用unpush之前,要先调用T garbage = yqueue.front(),同时,调用者还要保证以下两个条件:
      • 队列不空
      • 释放要撤销的元素占用的资源

    Implementation

    yqueue实质上是对如下结构体的封装:

    private:
        struct chunk_t {
            T values [N];
            chunk_t *prev;
            chunk_t *next;
        };
    
    • yqueue的内存动态分配都是以chunk_t为单位,其内部是包含N个元素的数组(ZMQ使用的N值是256,通过这样的设置,能够削弱约99.6%的由内存分配带来的影响)。

    yqueue所有队列的操作都是通过下面的几个指针来完成的:

    private:
        chunk_t *begin_chunk;
        int begin_pos;
        chunk_t *back_chunk;
        int back_pos;
        chunk_t *end_chunk;
        int end_pos;
    
    • chunk_t *指针标志了对应的chunk_t块,而int值表明了块内偏移。利用这两个参数可以定位到yqueue内部的任意元素。
    • 使用者应该保证三个指针分别被各自使用的线程独占,或互斥的占有。对应到API也就是说,pop、push和unpush操作应保证使用时被某线程独占;而生产者和消费者之间可以是两个无需同步的线程。

    yqueue的生产者和消费者不用加锁是通过一个原子指针实现的:

    private:
        template <typename T> class atomic_ptr_t {
        public:
            // 非线程安全,在初始化时用来为指定初值
            inline void set(T *ptr);
            // 原子“交换”操作,将值设置为val并返回之前的值
            inline T *xchg(T *val);
            // 原子的“对比 & 交换”操作,若旧值与cmp相等,边赋值为val;
            // 否则旧值不变。最后返回旧值。
            inline T *cas(T *cmp, T*val);
        private:
            volatile T *ptr;
        };
        atomic_ptr_t<chunk_t> spare_chunk;
    
    • atomic_ptr_t实质上是对各种平台的互斥操作进行了封装,保证了指针在任意时刻只能又一个线程持有,从而保证了基于该指针进行的所有操作的原子性。

    yqueue之所以能极大程度的减少内存分配,并不仅仅是选取恰当的N值,还得益于底层实现采用的“内存重用”技术:

    inline void push() {
        // 将back()依赖的指针定位到yqueue尾部,以便push()调用后执行back()操作
        back_chunk = end_chunk;
        back_pos = end_pos;
    
        // 当前的块仍有空间,不需要扩容
        if (++end_pos != N)
            return;
    
        // 获取原子指针的值,并将原子指针的值设为NULL
        // 如果有pop()操作造成的废弃的块,yqueue不会马上释放
        // 而是用原子指针保存,以便内存重用
        chunk_t *sc = spare_chunk.xchg(NULL);
        if (sc) {
            // 若此时有可重用的内存,则sc非空
            end_chunk->next = sc;
            sc->prev = end_chunk;
        } else {
            // sc == NULL表示没有可重用内存,此时需要malloc动态分配
            end_chunk->next = (chunk_t*)malloc(sizeof(chunk_t));
            end_chunk->next->prev = end_chunk;
        }
        // 将队尾移到扩容后的块上
        end_chunk = end_chunk->next;
        end_pos = 0;
    }
    
    inline void pop() {
        if (++begin_pos == N) {
            // 当前块已全部出队(废弃),用临时指针o指向
            chunk_t *o = begin_chunk;
            // 将队头指向下一块的开始,剔除旧块,并复位偏移量
            begin_chunk = begin_chunk->next;
            begin_chunk->prev = NULL;
            begin_pos = 0;
    
            // 根据“最近经常使用”内存分页算法,将O设为重用块效率最高
            // 也就是说,如果马上扩容时,不需要进行内存页面替换操作
            // geek,利用操作系统内存管理,追求性能到极致
            chunk_t *cs = spare_chunk.xchg(o);
            // 别忘了释放旧的、不用的块,这回是真的没用了
            free(cs);
        }
    }
    
    • 通过选取合适的N值,以及pop和push操作巧妙的利用原子指针进行内存重用,是的yqueue极大程度的减少了内存分配和销毁的开销。

    Tips

    • 高效往往是利用巧妙的数据结构,结合操作系统经过精心设计的结晶。

    相关文章

      网友评论

          本文标题:ZeroMQ源码解析之yqueue

          本文链接:https://www.haomeiwen.com/subject/kgviyttx.html