nsq一共提供了几种消费者客户端工具:nsq_to_file、nsq_to_http、nsq_to_nsq
nsq_to_file 消息写入文件
执行命令:nsq_to_file --topic=test --output-dir=/tmp --channel=chan --lookupd-http-address=127.0.0.1:4161
- nsq_to_file提供两种连接参数:nsqd的TCP地址或nsqlookupd的HTTP地址
- -lookupd-http-address value
lookupd HTTP address (may be given multiple times)
- -nsqd-tcp-address value
nsqd TCP address (may be given multiple times)
- -lookupd-http-address value
- 总览nsq_to_file这个消费者的运行流程:
// main (excerpt) shows the tail of nsq_to_file's startup: it builds a topic
// discoverer, creates consumers for the initial topics, and then runs the
// poller. Everything before the discoverer is elided by the "..." line.
func main() {
...
// Build the discoverer from the consumer config, the HUP/TERM signal channels
// and the HTTP connect/request timeouts.
discoverer := newTopicDiscoverer(cfg, hupChan, termChan, *httpConnectTimeout, *httpRequestTimeout)
// Query nsqlookupd, fetch the producer information and connect.
discoverer.updateTopics(topics, *topicPattern)
// Run the poller; the second argument is true only when no explicit topics
// were supplied.
// NOTE(review): invoked without `go` here, so it presumably blocks for the
// lifetime of the process — confirm against the full source.
discoverer.poller(lookupdHTTPAddrs, len(topics) == 0, *topicPattern)
}
// updateTopics (excerpt) creates a consumer file logger for every topic in
// topics. A topic whose logger cannot be created is logged and skipped.
// pattern is not used in the portion of the function shown here.
func (t *TopicDiscoverer) updateTopics(topics []string, pattern string) {
// Process each topic in turn.
for _, topic := range topics {
cfl, err := newConsumerFileLogger(topic, t.cfg)
if err != nil {
// Creation failed: report it and move on to the next topic.
log.Printf("ERROR: couldn't create logger for new topic %s: %s", topic, err)
continue
}
// NOTE(review): cfl is not used in this excerpt; the remainder of the loop
// body appears to have been elided.
}
}
// newConsumerFileLogger (excerpt) creates an nsq consumer for topic on the
// configured channel, registers a handler on it, and connects it to the
// configured nsqd TCP addresses and nsqlookupd HTTP addresses. The first error
// from any of these steps is returned with a nil logger.
// NOTE(review): f is not defined in this excerpt and the final successful
// return is elided.
func newConsumerFileLogger(topic string, cfg *nsq.Config) (*ConsumerFileLogger, error) {
c, err := nsq.NewConsumer(topic, *channel, cfg)
if err != nil {
return nil, err
}
// A handler must be registered before connecting.
// NOTE(review): presumably because the Connect* calls reject a consumer with
// no handlers — confirm against go-nsq.
c.AddHandler(f)
// Connect directly to any explicitly listed nsqd instances.
err = c.ConnectToNSQDs(nsqdTCPAddrs)
if err != nil {
return nil, err
}
// Register the nsqlookupd instances used for discovery.
err = c.ConnectToNSQLookupds(lookupdHTTPAddrs)
if err != nil {
return nil, err
}
}
ConnectToNSQLookupd与nsqlookupd交互
- 主线程执行ConnectToNSQLookupd
- ConnectToNSQLookupd将一个nsqlookupd地址添加到此使用者实例的列表中。
如果它是第一个被添加的,会先同步执行一次queryLookupd()来查询生产者
- 随后开启Consumer.lookupdLoop()协程,由它定时调用queryLookupd()轮询(poll)搜索生产者
// ConnectToNSQLookupd registers addr as an nsqlookupd address for this
// consumer.
//
// It returns an error when the consumer has been stopped, when no handlers are
// running, or when addr fails validation. Registering an address that is
// already known is a no-op. When addr is the first address registered,
// nsqlookupd is queried once synchronously and the lookupdLoop goroutine is
// started.
func (r *Consumer) ConnectToNSQLookupd(addr string) error {
	switch {
	case atomic.LoadInt32(&r.stopFlag) == 1:
		return errors.New("consumer stopped")
	case atomic.LoadInt32(&r.runningHandlers) == 0:
		return errors.New("no handlers")
	}

	if err := validatedLookupAddr(addr); err != nil {
		return err
	}

	atomic.StoreInt32(&r.connectedFlag, 1)

	// Record addr under the lock and report whether it is the very first
	// lookupd address known to this consumer.
	isFirst := func() bool {
		r.mtx.Lock()
		defer r.mtx.Unlock()

		for _, known := range r.lookupdHTTPAddrs {
			if known == addr {
				return false
			}
		}
		r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr)
		return len(r.lookupdHTTPAddrs) == 1
	}()

	// Only the first registration performs the initial query and starts the
	// polling goroutine.
	if isFirst {
		r.queryLookupd()
		r.wg.Add(1)
		go r.lookupdLoop()
	}
	return nil
}
获取生产者信息
- nsq_to_file 客户端会请求 nsqlookupd的http接口,获取nsqlookupd分配给消费者的nsqd节点生产者信息
- 再去连接生产者,并发送订阅消息指令
// queryLookupd asks nsqlookupd for the producers relevant to this consumer and
// connects to each of them.
//
// A failed request is retried against the next lookupd endpoint; after three
// failed attempts the function gives up and returns. The resulting nsqd
// addresses are passed through the behavior delegate's DiscoveryFilter when it
// implements one. Connection errors other than ErrAlreadyConnected are logged.
func (r *Consumer) queryLookupd() {
	const maxAttempts = 3

	var data *lookupResp
	for failures := 0; data == nil; {
		endpoint := r.nextLookupdEndpoint()
		r.log(LogLevelInfo, "querying nsqlookupd %s", endpoint)

		// Decode into a fresh value on every attempt so nothing from a failed
		// request carries over.
		resp := new(lookupResp)
		err := apiRequestNegotiateV1("GET", endpoint, nil, resp)
		if err == nil {
			data = resp
			break
		}

		r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err)
		failures++
		if failures >= maxAttempts {
			return
		}
		r.log(LogLevelInfo, "retrying with next nsqlookupd")
	}

	// Build the host:port list of advertised producers.
	var nsqdAddrs []string
	for _, p := range data.Producers {
		nsqdAddrs = append(nsqdAddrs, net.JoinHostPort(p.BroadcastAddress, strconv.Itoa(p.TCPPort)))
	}

	// Let the delegate narrow the list if it implements the filter.
	if filter, ok := r.behaviorDelegate.(DiscoveryFilter); ok {
		nsqdAddrs = filter.Filter(nsqdAddrs)
	}

	// Connect to every remaining producer.
	for _, addr := range nsqdAddrs {
		if err := r.ConnectToNSQD(addr); err != nil && err != ErrAlreadyConnected {
			r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
		}
	}
}
- 获取的nsqd,以及topic、channel信息
日志1.png
- querying完成后进行连接
日志2.png
Consumer.lookupdLoop()协程poll
- lookupdLoop()协程会定时去执行queryLookupd()以获取nsqlookupd分配的nsqd生产者信息
// lookupdLoop (excerpt) polls all known lookup servers every
// LookupdPollInterval. It re-runs queryLookupd on each ticker tick and whenever
// a recheck is requested, and leaves the loop when exitChan fires.
// NOTE(review): the creation of ticker and the exit label targeted by
// `goto exit` are not shown in this excerpt.
func (r *Consumer) lookupdLoop() {
for {
select {
// Periodic poll.
case <-ticker.C:
r.queryLookupd()
// On-demand re-query requested through lookupdRecheckChan.
case <-r.lookupdRecheckChan:
r.queryLookupd()
// Consumer is shutting down.
case <-r.exitChan:
goto exit
}
}
}
网友评论