美文网首页Lua点滴程序员
skynet源码分析(2)--消息队列mq

skynet源码分析(2)--消息队列mq

作者: 天一阁图书管理员 | 来源:发表于2017-08-18 16:28 被阅读223次

    作者:shihuaping0918@163.com,转载请注明作者

    消息队列是skynet的核心功能之一,它的功能说白了就是入队出队,先进先出,这个数据结构都有讲过。源码实现在skynet_mq.h和skynet_mq.c中。下面来看一下这两个文件。

    #ifndef SKYNET_MESSAGE_QUEUE_H
    #define SKYNET_MESSAGE_QUEUE_H
    
    #include <stdlib.h>
    #include <stdint.h>
    //消息结构体
    struct skynet_message {
        uint32_t source; //从哪里来
        int session; //参考云风的博客,这个session是个标识
        void * data; //消息体
        size_t sz;//长度
    };
    
    // type is encoding in skynet_message.sz high 8bit
    #define MESSAGE_TYPE_MASK (SIZE_MAX >> 8)
    #define MESSAGE_TYPE_SHIFT ((sizeof(size_t)-1) * 8)
    
    struct message_queue;
    //全局消息入队
    void skynet_globalmq_push(struct message_queue * queue);
    //全局消息出队
    struct message_queue * skynet_globalmq_pop(void);
    //创建一个消息队列
    struct message_queue * skynet_mq_create(uint32_t handle);
    void skynet_mq_mark_release(struct message_queue *q);
    //消息移除
    typedef void (*message_drop)(struct skynet_message *, void *);
    //队列释放
    void skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud);
    //消息处理者handler
    uint32_t skynet_mq_handle(struct message_queue *);
    
    // 0 for success
    //消息出队
    int skynet_mq_pop(struct message_queue *q, struct skynet_message *message);
    //消息入队
    void skynet_mq_push(struct message_queue *q, struct skynet_message *message);
    
    // return the length of message queue, for debug
    int skynet_mq_length(struct message_queue *q);
    int skynet_mq_overload(struct message_queue *q);
    
    void skynet_mq_init();
    
    #endif
    
    

    上面有价值的东西实际就是消息的结构体,其它都是声明而已。

    //默认队列长度为64
    #define DEFAULT_QUEUE_SIZE 64
    //最大长度为max(16bit)+1=65536
    #define MAX_GLOBAL_MQ 0x10000
    
    // 0 means mq is not in global mq.
    // 1 means mq is in global mq , or the message is dispatching.
    
    #define MQ_IN_GLOBAL 1
    #define MQ_OVERLOAD 1024
    
    struct message_queue {
        struct spinlock lock;
        uint32_t handle;  //目标handler
        int cap; //容量
        int head; //头位置
        int tail; //末尾位置
        int release; //释放标记
        int in_global; //是否在全局队列中
        int overload; //最大负载
        int overload_threshold; //最大负载阀值
        struct skynet_message *queue; //循环数组
        struct message_queue *next; //下一个队列,链表
    };
    //全局消息队列,链表
    struct global_queue {
        struct message_queue *head; //头
        struct message_queue *tail; //尾
        struct spinlock lock;
    };
    
    static struct global_queue *Q = NULL;
    
    void 
    skynet_globalmq_push(struct message_queue * queue) {
        struct global_queue *q= Q;
    
        SPIN_LOCK(q)
        assert(queue->next == NULL);
        if(q->tail) { //链表不为空
            q->tail->next = queue;
            q->tail = queue;
        } else { //链表为空
            q->head = q->tail = queue;
        }
        SPIN_UNLOCK(q)
    }
    
    //取链表中第一个消息队列
    struct message_queue * 
    skynet_globalmq_pop() {
        struct global_queue *q = Q;
    
        SPIN_LOCK(q)
        struct message_queue *mq = q->head;
        if(mq) {
             //注意这里,队列取出来后,就从链表中删除了
            q->head = mq->next;
            if(q->head == NULL) {
                assert(mq == q->tail);
                q->tail = NULL;
            }
            mq->next = NULL;
        }
        SPIN_UNLOCK(q)
    
        return mq;
    }
    //创建一个消息队列
    struct message_queue * 
    skynet_mq_create(uint32_t handle) {
        struct message_queue *q = skynet_malloc(sizeof(*q));
        q->handle = handle;
        q->cap = DEFAULT_QUEUE_SIZE;
        q->head = 0;//刚开始头为0
        q->tail = 0;//刚开始尾也为0
        SPIN_INIT(q)
        // When the queue is create (always between service create and service init) ,
        // set in_global flag to avoid push it to global queue .
        // If the service init success, skynet_context_new will call skynet_mq_push to push it to global queue.
        q->in_global = MQ_IN_GLOBAL;
        q->release = 0;
        q->overload = 0;
        q->overload_threshold = MQ_OVERLOAD;
        ///这里分配的是数组,是数组,是数组
        q->queue = skynet_malloc(sizeof(struct skynet_message) * q->cap);
        q->next = NULL;
    
        return q;
    }
    //释放队列,回收内存
    static void 
    _release(struct message_queue *q) {
        assert(q->next == NULL);
        SPIN_DESTROY(q)
        skynet_free(q->queue);
        skynet_free(q);
    }
    //返回队列的handler
    uint32_t 
    skynet_mq_handle(struct message_queue *q) {
        return q->handle;
    }
    //获取队列长度,注意数组被循环使用的情况
    int
    skynet_mq_length(struct message_queue *q) {
        int head, tail,cap;
    
        SPIN_LOCK(q)
        head = q->head;
        tail = q->tail;
        cap = q->cap;
        SPIN_UNLOCK(q)
        //当还没有循环使用数组的时候
        if (head <= tail) {
            return tail - head;
        }
        //当数组已经被循环使用的时候
        return tail + cap - head;
    }
    //获取负载情况
    int
    skynet_mq_overload(struct message_queue *q) {
        if (q->overload) {
            int overload = q->overload;
            q->overload = 0; //这里清零是为了避免持续产生报警,在skynet-server.c中
            return overload;
        } 
        return 0;
    }
    
    //消息队列出队,从数组中出队
    int
    skynet_mq_pop(struct message_queue *q, struct skynet_message *message) {
        int ret = 1;
        SPIN_LOCK(q)
        //说明队列不是空的
        if (q->head != q->tail) {
            *message = q->queue[q->head++];  //注意head++,数据不移动,移动的是游标
            ret = 0;
            int head = q->head;
            int tail = q->tail;
            int cap = q->cap;
        //因为是循环数组,超出边界后要重头开始,所以设为0
            if (head >= cap) {
                q->head = head = 0;
            }
            //如果数组被循环使用了,那么tail < head
            int length = tail - head;
            if (length < 0) {
                length += cap;
            }
            //长度要超过阀值了,扩容一倍,和c++的vector一样的策略
            while (length > q->overload_threshold) {
                q->overload = length;
                q->overload_threshold *= 2;
            }
        } else { //队列是空的
            // reset overload_threshold when queue is empty
            q->overload_threshold = MQ_OVERLOAD;
        }
    
        if (ret) {
            q->in_global = 0;
        }
        
        SPIN_UNLOCK(q)
    
        return ret;
    }
    
    //为了方便和上面的函数对比,我把skynet_mq_push提上来了
    void 
    skynet_mq_push(struct message_queue *q, struct skynet_message *message) {
        assert(message);
        SPIN_LOCK(q)
    //入队
        q->queue[q->tail] = *message;
    //因为是循环数组,越界了要重头开始
        if (++ q->tail >= q->cap) {
            q->tail = 0;
        }
    //如果首尾重叠了,要扩展
        if (q->head == q->tail) {
            expand_queue(q);
        }
    //重新放回全局队列中
        if (q->in_global == 0) {
            q->in_global = MQ_IN_GLOBAL;
            skynet_globalmq_push(q);
        }
        
        SPIN_UNLOCK(q)
    }
    //扩展循环数组
    static void
    expand_queue(struct message_queue *q) {
    //新建一个数组
        struct skynet_message *new_queue = skynet_malloc(sizeof(struct skynet_message) * q->cap * 2);
        int i;
        for (i=0;i<q->cap;i++) { //老数据拷过来
            new_queue[i] = q->queue[(q->head + i) % q->cap];
        }
        q->head = 0; //重设head
        q->tail = q->cap; //重设tail
        q->cap *= 2;
        
        skynet_free(q->queue); //释放老数组
        q->queue = new_queue;
    }
    
    //初始化全局队列
    void 
    skynet_mq_init() {
        struct global_queue *q = skynet_malloc(sizeof(*q));
        memset(q,0,sizeof(*q));
        SPIN_INIT(q);
        Q=q;
    }
    //服务释放标记
    void 
    skynet_mq_mark_release(struct message_queue *q) {
        SPIN_LOCK(q)
        assert(q->release == 0);
        q->release = 1;
        if (q->in_global != MQ_IN_GLOBAL) {
            skynet_globalmq_push(q);
        }
        SPIN_UNLOCK(q)
    }
    //释放服务,清空循环数组
    static void
    _drop_queue(struct message_queue *q, message_drop drop_func, void *ud) {
        struct skynet_message msg;
        while(!skynet_mq_pop(q, &msg)) {
            drop_func(&msg, ud);
        }
        _release(q); //回收内存
    }
    //释放服务相关的队列
    void 
    skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud) {
        SPIN_LOCK(q)
        
        if (q->release) {
            SPIN_UNLOCK(q)
            _drop_queue(q, drop_func, ud);
        } else {
            skynet_globalmq_push(q);
            SPIN_UNLOCK(q)
        }
    }
    
    

    代码到此分析结束,可以看出来,skynet的消息队列实际上是有两种,一种是全局消息队列,一种是服务消息队列。每个服务都有自己的消息队列,每个服务消息队列中都有服务的handle标识。这个涉及到消息的派发,这里就不展开了。每个服务消息队列被全局消息队列引用。

    全局消息队列用的是经典的链表来实现的,而服务的消息队列用的是比较不直观,可能对有些人来说理解起来特别困难的循环数组来实现的。而且数组空间不够的时候,会动态扩展,容量扩展为当前容量的2倍。

    消息队列的出队入队函数名都比较简单而且明了,push/pop。这个名字可能会带来一定的误解,如果改成enqueue/dequeue的话,就更符合它的实际功能。

    相关文章

      网友评论

      本文标题:skynet源码分析(2)--消息队列mq

      本文链接:https://www.haomeiwen.com/subject/gcgorxtx.html