美文网首页Golang
nsq源码(9) nsqlookupd与nsqd交互

nsq源码(9) nsqlookupd与nsqd交互

作者: Linrundong | 来源:发表于2019-01-22 17:47 被阅读18次

nsqlookupd与nsq交互

nsqd带参数启动

  • 除了接收pub发布的topic,还可以通过硬盘备份的文件恢复创建topic
  • 在nsqd启动时除了开启tcp,http与queueScanLoop超时检测线程,还有开启一个lookupLoop线程去注册nsqlookupd
func (p *program) Start() error {
    opts := nsqd.NewOptions()

    // 读取参数设置Options对象
    flagSet := nsqdFlagSet(opts)
    flagSet.Parse(os.Args[1:])

    options.Resolve(opts, flagSet, cfg)
    // 使用Options对象创建nsqd对象
    nsqd := nsqd.New(opts)

    nsqd.Main()
    ...
}

func (n *NSQD) Main() {
    // 恢复创建备份文件中的topic
    err := nsqd.LoadMetadata()
    if err != nil {
        log.Fatalf("ERROR: %s", err.Error())
    }

    // 超时消息检索和处理任务
    n.waitGroup.Wrap(n.queueScanLoop)

    // 根据参数选择注册中心nsqlookupd
    n.waitGroup.Wrap(n.lookupLoop)
    if n.getOpts().StatsdAddress != "" {
        n.waitGroup.Wrap(n.statsdLoop)
    }
}

NSQD.LoadMetadata() 备份文件创建topic

  • 在创建topic时除了开启messagePump线程接收memoryMsgChan队列
  • 还会nsqd.Notify(t)通过topic.notifyChan队列通知lookupLoop线程去注册该topic
func (n *NSQD) LoadMetadata() error {
    // 读取nsqd.dat硬盘备份文件
    fn := newMetadataFile(n.getOpts())

    // 根据备份文件创建topic
    for _, t := range m.Topics {
        if !protocol.IsValidTopicName(t.Name) {
            n.logf(LOG_WARN, "skipping creation of invalid topic %s", t.Name)
            continue
        }
        topic := n.GetTopic(t.Name)
        if t.Paused {
            topic.Pause()
        }
        for _, c := range t.Channels {
            if !protocol.IsValidChannelName(c.Name) {
                n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name)
                continue
            }
            channel := topic.GetChannel(c.Name)
            if c.Paused {
                channel.Pause()
            }
        }
        topic.Start()
    }
    return nil
}

func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
    t.waitGroup.Wrap(t.messagePump)

    // 退出或者通知nsqdlookup进行注册操作
    // 通过topic.notifyChan队列通知lookupLoop线程去注册该topic
    t.ctx.nsqd.Notify(t)

    return t
}

NSQD.lookupLoop 接收通知进行topic/channel注册

  • 在nsqd启动时开启的lookupLoop线程会循环处理notifyChan队列发来的消息
// 为当前nsqd绑定nsqlookupd
func (n *NSQD) lookupLoop() {
    for {
        if connect {
            for _, host := range n.getOpts().NSQLookupdTCPAddresses {
                if in(host, lookupAddrs) {
                    continue
                }
                n.logf(LOG_INFO, "LOOKUP(%s): adding peer", host)
                // 读取参数
                // LOOKUP connecting to 127.0.0.1:4160
                // LOOKUPD(127.0.0.1:4160): peer info {TCPPort:4160 HTTPPort:4161 Version:1.1.0 BroadcastAddress:sz-linrundong}
                lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf,
                    connectCallback(n, hostname))
                // 进行连接nsqlooupd操作
                lookupPeer.Command(nil) // start the connection
                lookupPeers = append(lookupPeers, lookupPeer)
                lookupAddrs = append(lookupAddrs, host)
            }
            n.lookupPeers.Store(lookupPeers)
            connect = false
        }

        select {
        // 通过NSQD.notifyChan队列获取nsqlookupd发来的消息
        case val := <-n.notifyChan:
            var cmd *nsq.Command
            var branch string

            // 断言解析队列消息
            switch val.(type) {
            case *Channel:
                // notify all nsqlookupds that a new channel exists, or that it's removed
                branch = "channel"
                channel := val.(*Channel)
                if channel.Exiting() == true {
                    cmd = nsq.UnRegister(channel.topicName, channel.name)
                } else {
                    //拼装注册命令
                    cmd = nsq.Register(channel.topicName, channel.name)
                }
            case *Topic:
                // notify all nsqlookupds that a new topic exists, or that it's removed
                branch = "topic"
                topic := val.(*Topic)
                if topic.Exiting() == true {
                    cmd = nsq.UnRegister(topic.name, "")
                } else {
                    cmd = nsq.Register(topic.name, "")
                }
            }

            // 向每个lookupd发送请求命令cmd
            for _, lookupPeer := range lookupPeers {
                n.logf(LOG_INFO, "LOOKUPD(%s): %s %s", lookupPeer, branch, cmd)
                _, err := lookupPeer.Command(cmd)
                if err != nil {
                    n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)
                }
            }
        case <-n.exitChan:
            goto exit
        }
    }

exit:
    n.logf(LOG_INFO, "LOOKUP: closing")
}

向nsqlookupd发送注册请求

  • 发送选项以及序列化channel,获取响应
func (lp *lookupPeer) Command(cmd *nsq.Command) ([]byte, error) {
    initialState := lp.state
    if lp.state != stateConnected {
        err := lp.Connect()
        if err != nil {
            return nil, err
        }
        lp.state = stateConnected
        _, err = lp.Write(nsq.MagicV1)
        if err != nil {
            lp.Close()
            return nil, err
        }
        if initialState == stateDisconnected {
            lp.connectCallback(lp)
        }
        if lp.state != stateConnected {
            return nil, fmt.Errorf("lookupPeer connectCallback() failed")
        }
    }
    if cmd == nil {
        return nil, nil
    }
    _, err := cmd.WriteTo(lp)
    if err != nil {
        lp.Close()
        return nil, err
    }
    resp, err := readResponseBounded(lp, lp.maxBodySize)
    if err != nil {
        lp.Close()
        return nil, err
    }
    return resp, nil
}

相关文章

网友评论

    本文标题:nsq源码(9) nsqlookupd与nsqd交互

    本文链接:https://www.haomeiwen.com/subject/qfjrjqtx.html