消息订阅流程
- 客户端连接nsqd.TcpServer->通过protocolV2.SUB()->通过参数Exec()处理->获取topic对象->开启topic.messagePump()协程->向当前client的SubEventChan发送(指定接收)channel进行订阅
protocolV2处理对象
protocolV2.SUB()发布消息
- 每当client连接进来时,都会开启Protocol的Handle协程,执行IOLoop()来找到或新建client对象
- 订阅消息时会执行SUB(client,params)执行订阅指令,并且为当前nsqd新建或绑定topic对象,同时开启topic.messagePump()协程
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
// 获取订阅的topic name
topicName := string(params[1])
if !protocol.IsValidTopicName(topicName) {
return nil, protocol.NewFatalClientErr(nil, "E_BAD_TOPIC",
fmt.Sprintf("SUB topic name %q is not valid", topicName))
}
// 获取订阅的channel name
channelName := string(params[2])
if !protocol.IsValidChannelName(channelName) {
return nil, protocol.NewFatalClientErr(nil, "E_BAD_CHANNEL",
fmt.Sprintf("SUB channel name %q is not valid", channelName))
}
var channel *Channel
for {
// 创建或绑定已有de topic
// 开启topic.messagePump()协程
topic := p.ctx.nsqd.GetTopic(topicName)
// 获取订阅的channel
channel = topic.GetChannel(channelName)
channel.AddClient(client.ID, client)
if (channel.ephemeral && channel.Exiting()) || (topic.ephemeral && topic.Exiting()) {
channel.RemoveClient(client.ID)
time.Sleep(1 * time.Millisecond)
continue
}
break
}
atomic.StoreInt32(&client.State, stateSubscribed)
client.Channel = channel
// 向当前client的SubEventChan发送channel
// 表示进行了订阅,并且使用channel接收消息
client.SubEventChan <- channel
return okBytes, nil
}
网友评论