美文网首页
gnatsd的Subject数据结构及优于mosquitto的原

gnatsd的Subject数据结构及优于mosquitto的原

作者: 云上听风 | 来源:发表于2018-04-30 12:33 被阅读0次

    主题名(Subject Name)


    主题名是大小写敏感的,必须是非空字符串,不能包含空格,使用“.”符号来分层,mqtt中使用“/”分层。
    星号“*”匹配一层,跟mqtt中的“+”一样。
    大于号“>”匹配后面所有层,跟mqtt中的“#”一样。

    源码分析


    代码都在sublist.go
    先列出数据结构:

    // A Sublist stores and efficiently retrieves subscriptions.
    type Sublist struct {
        sync.RWMutex
        genid     uint64
        matches   uint64
        cacheHits uint64
        inserts   uint64
        removes   uint64
        cache     map[string]*SublistResult
        root      *level
        count     uint32
    }
    
    // A level represents a group of nodes and special pointers to
    // wildcard nodes.
    type level struct {
        nodes    map[string]*node
        pwc, fwc *node //pwc代表'*'节点,fwc代表'>'节点
    }
    
    // A node contains subscriptions and a pointer to the next level.
    type node struct {
        next  *level
        psubs []*subscription //普通订阅者列表
        qsubs [][]*subscription //queue订阅者列表
    }
    
    // New will create a default sublist
    func NewSublist() *Sublist {
        return &Sublist{root: newLevel(), cache: make(map[string]*SublistResult)}
    }
    
    // Create a new default level. We use FNV1A as the hash
    // algorithm for the tokens, which should be short.
    //FNV1A?历史遗留注释吧,这儿分明直接用了golang自带的map哈希。
    func newLevel() *level {
        return &level{nodes: make(map[string]*node)}
    }
    

    一开始使用NewSublist创建一个Sublist。Sublist保存了所有subject。
    Sublist初始化时创建了root节点。


    来看看怎么插入一个subject:

    
    // Insert adds a subscription into the sublist
    func (s *Sublist) Insert(sub *subscription) error {
        // copy the subject since we hold this and this might be part of a large byte slice.
        subject := string(sub.subject)
        tsa := [32]string{}
        tokens := tsa[:0]
        start := 0
        for i := 0; i < len(subject); i++ {
            if subject[i] == btsep {
                tokens = append(tokens, subject[start:i])
                start = i + 1
            }
        }
        tokens = append(tokens, subject[start:])
    
        s.Lock()
    
        sfwc := false
        l := s.root
        var n *node
    
        for _, t := range tokens {
            lt := len(t)
            if lt == 0 || sfwc { //如果此层长度为0或者上一层已经是'>'了,表示Subject是非法的
                s.Unlock()
                return ErrInvalidSubject
            }
    
            if lt > 1 { //不是*和>,直接map定位
                n = l.nodes[t]
            } else {
                switch t[0] {
                case pwc:
                    n = l.pwc
                case fwc:
                    n = l.fwc
                    sfwc = true //表示此层只能是最后一层
                default: //不是*和>,直接map定位
                    n = l.nodes[t]
                }
            }
            if n == nil { //node节点还没有则创建
                n = newNode()
                if lt > 1 {
                    l.nodes[t] = n
                } else {
                    switch t[0] {
                    case pwc:
                        l.pwc = n
                    case fwc:
                        l.fwc = n
                    default:
                        l.nodes[t] = n
                    }
                }
            }
            if n.next == nil {
                n.next = newLevel()
            }
            l = n.next //下一层
        }
        //上面循环结束后此时n是最后一层的node节点
        if sub.queue == nil { //不是queue,把sub加到psubs中。psubs切片存储了所有订阅此subject的client
            n.psubs = append(n.psubs, sub)
        } else {
            // This is a queue subscription
            if i := findQSliceForSub(sub, n.qsubs); i >= 0 {
                n.qsubs[i] = append(n.qsubs[i], sub)
            } else {
                n.qsubs = append(n.qsubs, []*subscription{sub})
            }
        }
    
        s.count++
        s.inserts++
    
        s.addToCache(subject, sub)
        atomic.AddUint64(&s.genid, 1)
    
        s.Unlock()
        return nil
    }
    

    从Insert方法中可以理出整个数据结构:
    Sublist第一个层节点是root,root是个level结构,level代表一层。level包含了一个nodes map,nodes存储了此层的所有node,pwc和fwc分别代表了*和> node 。
    node的next指向了下一层level,node的psubs存储了普通subject订阅者client,qsubs存储的是queue类别的subject订阅者client。

    整个list是个树结构,只不过每层的node节点使用map哈希存储。

    先不管Cache干什么用的,先来看看查找匹配:


    // matchLevel is used to recursively descend into the trie.
    func matchLevel(l *level, toks []string, results *SublistResult) {
        var pwc, n *node
        for i, t := range toks {
            if l == nil {
                return
            }
            if l.fwc != nil { //全匹配,把下面的所有订阅者都加入到results中
                addNodeToResults(l.fwc, results)
            }
            if pwc = l.pwc; pwc != nil { //层匹配,递归子层
                matchLevel(pwc.next, toks[i+1:], results)
            }
            n = l.nodes[t] //查找节点
            if n != nil {  //找到继续下一层
                l = n.next
            } else {
                l = nil
            }
        }
        if n != nil { //找到节点,把订阅者加入到results中
            addNodeToResults(n, results)
        }
        if pwc != nil { //最后一层*通配符的订阅者加入到results中
            addNodeToResults(pwc, results)
        }
    }
    

    从指定level比如root开始遍历匹配下面的每一层nodes,如果匹配则把订阅者加入到result中,注意通配符的处理。

    每一层使用map快速定位node,使用切片存储此层所有订阅者。
    因为使用了map,查询定位比mosquitto的遍历链表树快的多。何况qnatsd还做了Cache。

    下面再来看看Cache:


    因为查找一个subject的所有订阅者比较费时间,所以使用cache缓存一部分subject订阅者信息,每次查找先去cache中查找,找不到再去sublist中查找,如果找到就加入到cache中,新增一个subject时也要加入到cache中。
    cache也有数量限制,当超过一定数量时删除最早的部分cache,防止cache过多。

    相关文章

      网友评论

          本文标题:gnatsd的Subject数据结构及优于mosquitto的原

          本文链接:https://www.haomeiwen.com/subject/qkjzlftx.html