美文网首页Golang
nsq源码(11) nsqlookupd与消费者交互

nsq源码(11) nsqlookupd与消费者交互

作者: Linrundong | 来源:发表于2019-02-13 09:39 被阅读9次

    nsq一共提供了几种消费者客户端工具:nsq_to_file、nsq_to_http、nsq_to_nsq

    nsq_to_file 消息写入文件

    执行命令:nsq_to_file --topic=test --output-dir=/tmp --channel=chan --lookupd-http-address=127.0.0.1:4161

    • nsq_to_file提供连接tcp或http两种参数
      • -lookupd-http-address value
        lookupd HTTP address (may be given multiple times)
      • -nsqd-tcp-address value
        nsqd TCP address (may be given multiple times)
    • 总览nsq_to_file这个消费者的运行流程:
    func main() {
        ...
        discoverer := newTopicDiscoverer(cfg, hupChan, termChan, *httpConnectTimeout, *httpRequestTimeout)
        
        // 请求 nsqlookupd,获取生产者信息并连接
        discoverer.updateTopics(topics, *topicPattern)
        // 开启一个poll线程
        discoverer.poller(lookupdHTTPAddrs, len(topics) == 0, *topicPattern)
    }
    
    
    func (t *TopicDiscoverer) updateTopics(topics []string, pattern string) {
        // 遍历处理topics
        for _, topic := range topics {
            cfl, err := newConsumerFileLogger(topic, t.cfg)
            if err != nil {
                log.Printf("ERROR: couldn't create logger for new topic %s: %s", topic, err)
                continue
            }
        }
    }
    
    func newConsumerFileLogger(topic string, cfg *nsq.Config) (*ConsumerFileLogger, error) {
        c, err := nsq.NewConsumer(topic, *channel, cfg)
        if err != nil {
            return nil, err
        }
    
        c.AddHandler(f)
    
        err = c.ConnectToNSQDs(nsqdTCPAddrs)
        if err != nil {
            return nil, err
        }
    
        err = c.ConnectToNSQLookupds(lookupdHTTPAddrs)
        if err != nil {
            return nil, err
        }
    }
    

    ConnectToNSQLookupd与nsqlookupd交互

    • 主线程执行ConnectToNSQLookupd
    • ConnectToNSQLookupd将一个nsqlookupd地址添加到此使用者实例的列表中。
      如果它是第一个被添加的,它将启动一次Consumer.lookupdLoop()协程进行搜索生产者
    • 开启queryLookupd()线程来定时开启Consumer.lookupdLoop()协程poll搜索生产者
    func (r *Consumer) ConnectToNSQLookupd(addr string) error {
        if atomic.LoadInt32(&r.stopFlag) == 1 {
            return errors.New("consumer stopped")
        }
        if atomic.LoadInt32(&r.runningHandlers) == 0 {
            return errors.New("no handlers")
        }
    
        if err := validatedLookupAddr(addr); err != nil {
            return err
        }
    
        atomic.StoreInt32(&r.connectedFlag, 1)
    
        r.mtx.Lock()
        for _, x := range r.lookupdHTTPAddrs {
            if x == addr {
                r.mtx.Unlock()
                return nil
            }
        }
        r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr)
        numLookupd := len(r.lookupdHTTPAddrs)
        r.mtx.Unlock()
    
        // 第一次处理才开启lookupdLoop线程
        if numLookupd == 1 {
            r.queryLookupd()
            r.wg.Add(1)
            go r.lookupdLoop()
        }
    
        return nil
    }
    

    获取生产者信息

    • nsq_to_file 客户端会请求 nsqlookupd的http接口,获取nsqlookupd分配给消费者的nsqd节点生产者信息
    • 再去连接生产者,并发订阅消息指令
    func (r *Consumer) queryLookupd() {
        retries := 0
    
    retry:
        endpoint := r.nextLookupdEndpoint()
    
        r.log(LogLevelInfo, "querying nsqlookupd %s", endpoint)
    
        var data lookupResp
        // 请求nsqlookupd 获取分配给此消费者的nsqd节点信息
        err := apiRequestNegotiateV1("GET", endpoint, nil, &data)
        if err != nil {
            r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err)
            retries++
            if retries < 3 {
                r.log(LogLevelInfo, "retrying with next nsqlookupd")
                goto retry
            }
            return
        }
    
        var nsqdAddrs []string
        // 获取生产者地址
        for _, producer := range data.Producers {
            broadcastAddress := producer.BroadcastAddress
            port := producer.TCPPort
            joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port))
            nsqdAddrs = append(nsqdAddrs, joined)
        }
        // apply filter
        if discoveryFilter, ok := r.behaviorDelegate.(DiscoveryFilter); ok {
            nsqdAddrs = discoveryFilter.Filter(nsqdAddrs)
        }
        for _, addr := range nsqdAddrs {
            // 连接生产者,并发订阅消息指令
            err = r.ConnectToNSQD(addr)
            if err != nil && err != ErrAlreadyConnected {
                r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
                continue
            }
        }
    }
    
    • 获取的nsqd,以及topic,chanel信息

      日志1.png
    • qureying完成后进行连接

      日志2.png

    Consumer.lookupdLoop()协程poll

    • lookupdLoop()协程会定时去执行queryLookupd()以获取nsqlookupd分配的nsqd生产者信息
    // poll all known lookup servers every LookupdPollInterval
    func (r *Consumer) lookupdLoop() {
        for {
            select {
            case <-ticker.C:
                r.queryLookupd()
            case <-r.lookupdRecheckChan:
                r.queryLookupd()
            case <-r.exitChan:
                goto exit
            }
        }
    }
    

    相关文章

      网友评论

        本文标题:nsq源码(11) nsqlookupd与消费者交互

        本文链接:https://www.haomeiwen.com/subject/rwrreqtx.html