美文网首页
NSQ 源码学习笔记(三)

NSQ 源码学习笔记(三)

作者: 莫Y兮 | 来源:发表于2018-04-06 00:06 被阅读147次

    上一篇的最后一段代码中,channel中的消息在发送至客户端时,也同步了一份消息发送到了inFight队列中

        subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
    

    这里其实一开始不是很明白,在上网查阅了资料后,了解到inFlight队列是NSQ用来实现消息至少投递一次的。知道了功能后,再来看就很明了了。

    func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
        now := time.Now()
        msg.clientID = clientID
        msg.deliveryTS = now
        msg.pri = now.Add(timeout).UnixNano()
        err := c.pushInFlightMessage(msg)
        if err != nil {
            return err
        }
        c.addToInFlightPQ(msg)
        return nil
    }
    

    上述代码中,首先初始化消息的过期时间timeout+now,通过将msg加入到InFlight队列中,InFlight其实是一个堆排序队列,优先级是按照超时时间来排序的,越靠近过期时间,将会越靠前。这里只是将消息存入队列,那么在哪里消费呢?我们在第一篇笔记中的末尾,Nsqd在完成监听部分的初始化后,有四个自启动的goroutine,第一个通过Wrap启动的n.queueScanLoop()就是用来执行消费的。

    func (n *NSQD) queueScanLoop() {
        //任务派发 队列
        workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
    
        //任务结果 队列
        responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
    
        // 用来优雅关闭
        closeCh := make(chan int)
        // 利用Ticket来定期开始任务和调整worker
        workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
        refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)
    
        channels := n.channels()
        // 调整worker
        n.resizePool(len(channels), workCh, responseCh, closeCh)
    
        for {
            select {
            case <-workTicker.C: // 开始一次任务的派发
                if len(channels) == 0 {
                    continue
                }
            case <-refreshTicker.C:  // 重新调整 worker 数量
                channels = n.channels()
                n.resizePool(len(channels), workCh, responseCh, closeCh)
                continue
            case <-n.exitChan:  // 退出
                goto exit
            }
            
            // num最大为nsqd的所有channel总数
            num := n.getOpts().QueueScanSelectionCount
            if num > len(channels) {
                num = len(channels)
            }
    
            loop:
            // 随机取出num个channel, 派发给 worker 进行 扫描
            for _, i := range util.UniqRands(num, len(channels)) {
                workCh <- channels[i]
            }
    
            // 接收 扫描结果, 统一 有多少 channel 是 "脏" 的
            numDirty := 0
            for i := 0; i < num; i++ {
                if <-responseCh {
                    numDirty++
                }
            }
    
            // 假如 "脏" 的 "比例" 大于阀值, 则不等待 workTicker
            // 马上进行下一轮 扫描
            if float64(numDirty) / float64(num) > n.getOpts().QueueScanDirtyPercent {
                goto loop
            }
        }
    
        exit:
        n.logf("QUEUESCAN: closing")
        close(closeCh)
        workTicker.Stop()
        refreshTicker.Stop()
    }
    
    // resizePool adjusts the size of the pool of queueScanWorker goroutines
    //
    //  1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
    //
    func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
        // 校验启动的worker数量,最大为nsqd的所有channel数 * 1/4,
        idealPoolSize := int(float64(num) * 0.25)
        if idealPoolSize < 1 {
            idealPoolSize = 1
        } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
            idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
        }
        for {
            // 当前启动的worker数等于设定的idealPoolSize,那么直接返回,
            // 如果大于了idealPoolSize,通过closeCh关闭一个worker
            // 如果未达到idealPoolSize,启动worker的goroutine
            if idealPoolSize == n.poolSize {
                break
            } else if idealPoolSize < n.poolSize {
                // contract
                closeCh <- 1
                n.poolSize--
            } else {
                // expand
                n.waitGroup.Wrap(func() {
                    n.queueScanWorker(workCh, responseCh, closeCh)
                })
                n.poolSize++
            }
        }
    }
    

    worker的具体实现是queueScanWorker

    // queueScanWorker receives work (in the form of a channel) from queueScanLoop
    // and processes the deferred and in-flight queues
    func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
        for {
            select {
            case c := <-workCh:
                now := time.Now().UnixNano()
                dirty := false
                // 实现消息至少被投递一次
                if c.processInFlightQueue(now) {
                    dirty = true
                }
                // 实现延迟消息队列
                if c.processDeferredQueue(now) {
                    dirty = true
                }
                // 如果有过期消息的存在,则dirty
                responseCh <- dirty
            case <-closeCh:
                return
            }
        }
    }
    
    func (c *Channel) processInFlightQueue(t int64) bool {
        c.exitMutex.RLock()
        defer c.exitMutex.RUnlock()
    
        if c.Exiting() {
            return false
        }
    
        dirty := false
        for {
            c.inFlightMutex.Lock()
            // 从队列中获取已经过期的消息
            msg, _ := c.inFlightPQ.PeekAndShift(t)
            c.inFlightMutex.Unlock()
            
            if msg == nil {
                goto exit
            }
            dirty = true
            // 如果获取到了符合条件的msg,按msg.ID将msg在infight队列中删除
            _, err := c.popInFlightMessage(msg.clientID, msg.ID)
            if err != nil {
                goto exit
            }
            atomic.AddUint64(&c.timeoutCount, 1)
            c.RLock()
            client, ok := c.clients[msg.clientID]
            c.RUnlock()
            if ok {
                client.TimedOutMessage()
            }
            // 消息在channel中发起重新投递
            c.doRequeue(msg)
        }
    
    exit:
        return dirty
    }
    
    // 延迟消息队列的实现
    func (c *Channel) processDeferredQueue(t int64) bool {
        c.exitMutex.RLock()
        defer c.exitMutex.RUnlock()
    
        if c.Exiting() {
            return false
        }
    
        dirty := false
        for {
            c.deferredMutex.Lock()
            item, _ := c.deferredPQ.PeekAndShift(t)
            c.deferredMutex.Unlock()
    
            if item == nil {
                goto exit
            }
            dirty = true
    
            msg := item.Value.(*Message)
            _, err := c.popDeferredMessage(msg.ID)
            if err != nil {
                goto exit
            }
            c.doRequeue(msg)
        }
    
    exit:
        return dirty
    }
    

      上面的两个函数processDeferredQueueprocessInFlightQueue的实现基本一致,那为什么相同的逻辑要实现两次呢。两个队列,DeferredQueue 用 head 包实现, InFlightQueue 自己又实现了一次heap, 其实跟 DeferredQueue 不是一样的么?

      之前两个就真是是一样的, 后来有一个提交,里面的注释是: this eliminates the use of container/heap and the associated cost of boxing and interface type assertions.

    https://github.com/nsqio/nsq/commit/74bfde101934700cb0cd980d01b6dfe2fe5a6a53

      意思就是说, 这些 队列里 存的是 Message 这个类型, 如果使用 heap, 需要存到 heap.Item 的 Value 里,而这个value 是一个 interface{} , 赋值 和 取值 都需要做类型推断 和 包装,那么作为 InFlightQueue 这个 “高负荷” 的队列, 减少这种 “类型推断和包装” , 有利于提高性能

    测试一下:

    type Item struct {
        d1 int
        d2 int
    }
    
    func BenchmarkT1(b *testing.B) {
        q := make([]*Item, 0)   // 不需要类型推断的 slice
        for i := 0; i < b.N; i++ {
            q = append(q, &Item{i, i})
        }
        for _, hero := range q {
            hero.d1++
        }
    }
    
    func BenchmarkT2(b *testing.B) {
        q := make([]interface{}, 0)
        for i := 0; i < b.N; i++ {
            q = append(q, &Item{i, i})
        }
        for _, hero := range q {
            hero.(*Item).d1++   // 需要做类型推断
        }
    }
    

    测试结果:

    BenchmarkT1-8           10000000               241 ns/op
    BenchmarkT2-8            5000000               332 ns/op
    

    相关文章

      网友评论

          本文标题:NSQ 源码学习笔记(三)

          本文链接:https://www.haomeiwen.com/subject/eqljhftx.html