主题名(Subject Name)
主题名是大小写敏感的,必须是非空字符串,不能包含空格,使用“.”符号来分层,mqtt中使用“/”分层。
星号“*”匹配一层,跟mqtt中的“+”一样。
大于号“>”匹配后面所有层,跟mqtt中的“#”一样。
源码分析
代码都在sublist.go
。
先列出数据结构:
// A Sublist stores and efficiently retrieves subscriptions.
type Sublist struct {
sync.RWMutex
genid uint64
matches uint64
cacheHits uint64
inserts uint64
removes uint64
cache map[string]*SublistResult
root *level
count uint32
}
// A level represents a group of nodes and special pointers to
// wildcard nodes.
type level struct {
nodes map[string]*node
pwc, fwc *node //pwc代表'*'节点,fwc代表'>'节点
}
// A node contains subscriptions and a pointer to the next level.
type node struct {
next *level
psubs []*subscription //普通订阅者列表
qsubs [][]*subscription //queue订阅者列表
}
// New will create a default sublist
func NewSublist() *Sublist {
return &Sublist{root: newLevel(), cache: make(map[string]*SublistResult)}
}
// Create a new default level. We use FNV1A as the hash
// algorithm for the tokens, which should be short.
//FNV1A?历史遗留注释吧,这儿分明直接用了golang自带的map哈希。
func newLevel() *level {
return &level{nodes: make(map[string]*node)}
}
一开始使用NewSublist创建一个Sublist。Sublist保存了所有subject。
Sublist初始化时创建了root节点。
来看看怎么插入一个subject:
// Insert adds a subscription into the sublist
func (s *Sublist) Insert(sub *subscription) error {
// copy the subject since we hold this and this might be part of a large byte slice.
subject := string(sub.subject)
tsa := [32]string{}
tokens := tsa[:0]
start := 0
for i := 0; i < len(subject); i++ {
if subject[i] == btsep {
tokens = append(tokens, subject[start:i])
start = i + 1
}
}
tokens = append(tokens, subject[start:])
s.Lock()
sfwc := false
l := s.root
var n *node
for _, t := range tokens {
lt := len(t)
if lt == 0 || sfwc { //如果此层长度为0或者上一层已经是'>'了,表示Subject是非法的
s.Unlock()
return ErrInvalidSubject
}
if lt > 1 { //不是*和>,直接map定位
n = l.nodes[t]
} else {
switch t[0] {
case pwc:
n = l.pwc
case fwc:
n = l.fwc
sfwc = true //表示此层只能是最后一层
default: //不是*和>,直接map定位
n = l.nodes[t]
}
}
if n == nil { //node节点还没有则创建
n = newNode()
if lt > 1 {
l.nodes[t] = n
} else {
switch t[0] {
case pwc:
l.pwc = n
case fwc:
l.fwc = n
default:
l.nodes[t] = n
}
}
}
if n.next == nil {
n.next = newLevel()
}
l = n.next //下一层
}
//上面循环结束后此时n是最后一层的node节点
if sub.queue == nil { //不是queue,把sub加到psubs中。psubs切片存储了所有订阅此subject的client
n.psubs = append(n.psubs, sub)
} else {
// This is a queue subscription
if i := findQSliceForSub(sub, n.qsubs); i >= 0 {
n.qsubs[i] = append(n.qsubs[i], sub)
} else {
n.qsubs = append(n.qsubs, []*subscription{sub})
}
}
s.count++
s.inserts++
s.addToCache(subject, sub)
atomic.AddUint64(&s.genid, 1)
s.Unlock()
return nil
}
从Insert方法中可以理出整个数据结构:
Sublist第一个层节点是root,root是个level结构,level代表一层。level包含了一个nodes map,nodes存储了此层的所有node,pwc和fwc分别代表了*和> node 。
node的next指向了下一层level,node的psubs存储了普通subject订阅者client,qsubs存储的是queue类别的subject订阅者client。
整个list是个树结构,只不过每层的node节点使用map哈希存储。
先不管Cache干什么用的,先来看看查找匹配:
// matchLevel is used to recursively descend into the trie.
func matchLevel(l *level, toks []string, results *SublistResult) {
var pwc, n *node
for i, t := range toks {
if l == nil {
return
}
if l.fwc != nil { //全匹配,把下面的所有订阅者都加入到results中
addNodeToResults(l.fwc, results)
}
if pwc = l.pwc; pwc != nil { //层匹配,递归子层
matchLevel(pwc.next, toks[i+1:], results)
}
n = l.nodes[t] //查找节点
if n != nil { //找到继续下一层
l = n.next
} else {
l = nil
}
}
if n != nil { //找到节点,把订阅者加入到results中
addNodeToResults(n, results)
}
if pwc != nil { //最后一层*通配符的订阅者加入到results中
addNodeToResults(pwc, results)
}
}
从指定level比如root开始遍历匹配下面的每一层nodes,如果匹配则把订阅者加入到result中,注意通配符的处理。
每一层使用map快速定位node,使用切片存储此层所有订阅者。
因为使用了map,查询定位比mosquitto的遍历链表树快的多。何况qnatsd还做了Cache。
下面再来看看Cache:
因为查找一个subject的所有订阅者比较费时间,所以使用cache缓存一部分subject订阅者信息,每次查找先去cache中查找,找不到再去sublist中查找,如果找到就加入到cache中,新增一个subject时也要加入到cache中。
cache也有数量限制,当超过一定数量时删除最早的部分cache,防止cache过多。
网友评论