nsq的nsqd组件中一条消息的流转过程
protocolV2
tcp链接建立后,connect交protocolV2的IOLoop方法处理。
client
protocolV2将connection包装成一个client,生成一个clientID,注册到NSQD中。
clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
client := newClientV2(clientID, conn, p.ctx)
p.ctx.nsqd.AddClient(client.ID, client)
断开连接时移除这个client。
// 接收到SUB命令时 client会订阅一个channel 断开链接后移除这个client
if client.Channel != nil {
client.Channel.RemoveClient(client.ID)
}
p.ctx.nsqd.RemoveClient(client.ID)
client结构中主要的字段:
net.Conn 对应的tcp链接
Reader *bufio.Reader 对tcp链接读缓冲 16k
Writer *bufio.Writer 对tcp链接写缓冲 16k
Channel *Channel 订阅的channel
SubEventChan chan *Channel 在IOLoop中通知messagePump订阅
ClientID string 唯一标示
IOLoop
IOLoop主要包括:
- 主循环,从client中读取一行解析出对应命令并执行。
- 一个goroutine执行protocolV2的messagePump,处理订阅消息。
IOLoop的主循环
读client的connect的Reader包装,
这里用到了bufio.Reader的ReadSlice方法,利用了bufio.Reader的缓冲,避免重复申请内存。
读出的一行按separatorBytes
分割成多个参数,执行Exec。
line, err = client.Reader.ReadSlice('\n')
// trim the '\n'
line = line[:len(line)-1]
// optionally trim the '\r'
if len(line) > 0 && line[len(line)-1] == '\r' {
line = line[:len(line)-1]
}
params := bytes.Split(line, separatorBytes)
p.Exec(client, params)
Exec方法依靠params[0]决定具体调用的方法。
if bytes.Equal(params[0], []byte("IDENTIFY")) {
return p.IDENTIFY(client, params)
}
err := enforceTLSPolicy(client, p, params[0])
if err != nil {
return nil, err
}
switch {
case bytes.Equal(params[0], []byte("FIN")):
return p.FIN(client, params)
case bytes.Equal(params[0], []byte("RDY")):
return p.RDY(client, params)
case bytes.Equal(params[0], []byte("REQ")):
return p.REQ(client, params)
case bytes.Equal(params[0], []byte("PUB")):
return p.PUB(client, params) // write to memoryChan or backenQueue
case bytes.Equal(params[0], []byte("MPUB")):
return p.MPUB(client, params)
case bytes.Equal(params[0], []byte("DPUB")):
return p.DPUB(client, params)
case bytes.Equal(params[0], []byte("NOP")):
return p.NOP(client, params)
case bytes.Equal(params[0], []byte("TOUCH")):
return p.TOUCH(client, params)
case bytes.Equal(params[0], []byte("SUB")):
return p.SUB(client, params)
case bytes.Equal(params[0], []byte("CLS")):
return p.CLS(client, params)
case bytes.Equal(params[0], []byte("AUTH")):
return p.AUTH(client, params)
}
主要分析PUB
和SUB
。
PUB主要代码
topicName := string(params[1])
topic := p.ctx.nsqd.GetTopic(topicName) // 向下追可以看到如果是一个不存在的topic 将自动创建
msg := NewMessage(topic.GenerateID(), messageBody)
err = topic.PutMessage(msg)
PUB实现了将消息发布到一个topic的逻辑。
向下追topic.PutMessage的代码,在这里消息被写入memoryMsgChan或backend。
select {
case t.memoryMsgChan <- m: // 写入memoryMsgChan
default: // 写入backend
b := bufferPoolGet()
err := writeMessageToBackend(b, m, t.backend)
bufferPoolPut(b)
t.ctx.nsqd.SetHealth(err)
if err != nil {
t.ctx.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to write message to backend - %s",
t.name, err)
return err
}
}
在nsq中,通过channel消费topic中的内容,在topic得到发布的消息时,会将topic中的消息复制到他的每一个channel中,这个过程在topic.messagePump中实现。
// 主要代码
for {
select {
case msg = <-memoryMsgChan:
case buf = <-backendChan:
msg, err = decodeMessage(buf)
}
for i, channel := range chans {
chanMsg := msg
channel.PutMessage(chanMsg)
}
}
流程简单概括为:
--> 读connect一行拆分多个参数
--> 执行第一个参数对应的方法(PUB)
--> 根据第二个参数获取topic,没有的话创建
--> 将消息的内容放到对应topic的memoryMsgChan或backendChan
--> topic的messagePump取出消息,将消息复制到每一个channel中
一个完整的PUB方法结束。
SUB代码
topicName := string(params[1])
channelName := string(params[2])
topic := p.ctx.nsqd.GetTopic(topicName) // 没有则创建
channel = topic.GetChannel(channelName) // 同上
channel.AddClient(client.ID, client)
client.Channel = channel
client.SubEventChan <- channel
SUB方法从两个参数中得到订阅的topic和channel,并将client注册到改channel中。
同时将channel传入client.SubEventChan,将在protocolV2的messagePump中进一步处理。
messagePump
这里有点复杂,没全看懂,写一下基本流程。
messagePump用来处理SUB之后的逻辑,从方才的SUB方法中看到,订阅一个频道后向client.SubEventChan中添加了这个频道。这个频道会在在messagePump取出进一步操作。
// 主要代码
for{
if subChannel == nil || !client.IsReadyForMessages() {
} else if flushed {
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
} else {
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = outputBufferTicker.C
}
select {
case subChannel = <-subEventChan:
subEventChan = nil
case b := <-backendMsgChan:
msg, err := decodeMessage(b)
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
p.SendMessage(client, msg)
case msg := <-memoryMsgChan:
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
p.SendMessage(client, msg)
}
}
在SubEventChan中取得频道之后,循环memoryMsgChan和backendMsgChan指向这个频道。
在循环中select获取这个频道的subEventChan或memoryMsgChan。
上文中PUB方法最后将消息放入了channel中,和放入topic中一样channel有memoryMsgChan和backendMsgChan,在这里被订阅后开始消费。
只记录了最基本的SUB、PUB消息流转,还有大量细节。
网友评论