美文网首页Golang
nsq源码(4) nsqd 消息订阅

nsq源码(4) nsqd 消息订阅

作者: Linrundong | 来源:发表于2019-01-21 17:55 被阅读1次

消息订阅流程

  • 客户端连接nsqd.TcpServer->通过protocolV2.SUB()->通过参数Exec()处理->获取topic对象->开启topic.messagePump()协程->向当前client的SubEventChan发送(指定接收)channel进行订阅

protocolV2处理对象

protocolV2.SUB()发布消息
  • 每当client连接进来时,都会开启Protocol的Handle协程,执行IOLoop()来找到或新建client对象
  • 订阅消息时会执行SUB(client,params)执行订阅指令,并且为当前nsqd新建或绑定topic对象,同时开启topic.messagePump()协程
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
    // 获取订阅的topic name
    topicName := string(params[1])
    if !protocol.IsValidTopicName(topicName) {
        return nil, protocol.NewFatalClientErr(nil, "E_BAD_TOPIC",
            fmt.Sprintf("SUB topic name %q is not valid", topicName))
    }

    // 获取订阅的channel name
    channelName := string(params[2])
    if !protocol.IsValidChannelName(channelName) {
        return nil, protocol.NewFatalClientErr(nil, "E_BAD_CHANNEL",
            fmt.Sprintf("SUB channel name %q is not valid", channelName))
    }

    var channel *Channel
    for {
        // 创建或绑定已有de topic
        // 开启topic.messagePump()协程
        topic := p.ctx.nsqd.GetTopic(topicName)
        // 获取订阅的channel
        channel = topic.GetChannel(channelName)
        channel.AddClient(client.ID, client)

        if (channel.ephemeral && channel.Exiting()) || (topic.ephemeral && topic.Exiting()) {
            channel.RemoveClient(client.ID)
            time.Sleep(1 * time.Millisecond)
            continue
        }
        break
    }
    atomic.StoreInt32(&client.State, stateSubscribed)
    client.Channel = channel
    
    // 向当前client的SubEventChan发送channel
    // 表示进行了订阅,并且使用channel接收消息
    client.SubEventChan <- channel

    return okBytes, nil
}

相关文章

网友评论

    本文标题:nsq源码(4) nsqd 消息订阅

    本文链接:https://www.haomeiwen.com/subject/opqhjqtx.html