美文网首页GolangGolang 入门资料+笔记Golang语言社区
golang-nsq系列(三)--nsqlookupd源码解析

golang-nsq系列(三)--nsqlookupd源码解析

作者: 热爱coding的稻草 | 来源:发表于2019-11-23 20:15 被阅读0次

    上一篇 介绍了 nsqd 的代码逻辑与流程图,本篇来解析 nsq 中另一大模块 nsqlookupd,其负责维护 nsqd 节点的拓扑结构信息,实现了去中心化的服务注册与发现。

    1. nsqlookupd 执行入口

    nsq/apps/nsqlookupd/main.go 可以找到执行入口文件,如下:

    main.png

    2. nsqlookupd 执行主逻辑

    主要流程与上一篇讲的 nsqd 执行逻辑相似,区别是运行的具体任务不同。

    2.1 通过第三方 svc 包进行优雅的后台进程管理,svc.Run() -> svc.Init() -> svc.Start(),启动 nsqlookupd 实例;

    func main() {
      prg := &program{}
      if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
        logFatal("%s", err)
      }
    }
    
    func (p *program) Init(env svc.Environment) error {
      if env.IsWindowsService() {
        dir := filepath.Dir(os.Args[0])
        return os.Chdir(dir)
      }
      return nil
    }
    
    func (p *program) Start() error {
      opts := nsqlookupd.NewOptions()
    
      flagSet := nsqlookupdFlagSet(opts)
      ...
    }
    

    2.2 初始化配置参数(优先级:flagSet-命令行参数 > cfg-配置文件 > opts-默认值),开启协程,进入 nsqlookupd.Main() 主函数;

    options.Resolve(opts, flagSet, cfg)
      nsqlookupd, err := nsqlookupd.New(opts)
      if err != nil {
        logFatal("failed to instantiate nsqlookupd", err)
      }
      p.nsqlookupd = nsqlookupd
    
      go func() {
        err := p.nsqlookupd.Main()
        if err != nil {
          p.Stop()
          os.Exit(1)
        }
      }()
    

    2.3 开启 goroutine 执行 tcpServer, httpServer,分别监听 nsqd, nsqadmin 的客户端请求;

    func (l *NSQLookupd) Main() error {
      ctx := &Context{l}
    
      exitCh := make(chan error)
      var once sync.Once
      exitFunc := func(err error) {
        once.Do(func() {
          if err != nil {
            l.logf(LOG_FATAL, "%s", err)
          }
          exitCh <- err
        })
      }
    
      tcpServer := &tcpServer{ctx: ctx}
      l.waitGroup.Wrap(func() {
        exitFunc(protocol.TCPServer(l.tcpListener, tcpServer, l.logf))
      })
      httpServer := newHTTPServer(ctx)
      l.waitGroup.Wrap(func() {
        exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.logf))
      })
    
      err := <-exitCh
      return err
    }
    

    2.4 TCPServer 循环监听客户端请求,建立长连接进行通信,并开启 handler 处理每一个客户端 conn

    func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
      logf(lg.INFO, "TCP: listening on %s", listener.Addr())
    
      for {
        clientConn, err := listener.Accept()
        if err != nil {
          if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
            logf(lg.WARN, "temporary Accept() failure - %s", err)
            runtime.Gosched()
            continue
          }
          // theres no direct way to detect this error because it is not exposed
          if !strings.Contains(err.Error(), "use of closed network connection") {
            return fmt.Errorf("listener.Accept() error - %s", err)
          }
          break
        }
        go handler.Handle(clientConn)
      }
    
      logf(lg.INFO, "TCP: closing %s", listener.Addr())
    
      return nil
    }
    

    2.5 httpServer 通过 http_api.Decorate 装饰器实现对各 http 路由进行 handler 装饰,如加 log 日志、V1 协议版本号的统一格式输出等;

    func newHTTPServer(ctx *Context) *httpServer {
      log := http_api.Log(ctx.nsqlookupd.logf)
    
      router := httprouter.New()
      router.HandleMethodNotAllowed = true
      router.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.logf)
      router.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.logf)
      router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlookupd.logf)
      s := &httpServer{
        ctx:    ctx,
        router: router,
      }
    
      router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
      router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))
    
      // v1 negotiate
      router.Handle("GET", "/debug", http_api.Decorate(s.doDebug, log, http_api.V1))
      router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_api.V1))
      router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_api.V1))
      router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, http_api.V1))
    }
    

    2.6 tcp 解析 V1 协议,走内部协议封装的 prot.IOLoop(conn) 进行循环处理客户端命令,直到客户端命令全部解析处理完毕才关闭连接;

    var prot protocol.Protocol
      switch protocolMagic {
      case "  V1":
        prot = &LookupProtocolV1{ctx: p.ctx}
      default:
        protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL"))
        clientConn.Close()
        p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
          clientConn.RemoteAddr(), protocolMagic)
        return
      }
    
      err = prot.IOLoop(clientConn)
    

    2.7 通过内部协议进行 p.Exec(执行命令)、p.SendResponse(返回结果),保证每个 nsqd 节点都能正确的进行服务注册(register)与注销(unregister),并进行心跳检测(ping)节点的可用性,确保客户端取到的 nsqd 节点列表都是最新可用的。

    for {
        line, err = reader.ReadString('\n')
        if err != nil {
          break
        }
    
        line = strings.TrimSpace(line)
        params := strings.Split(line, " ")
    
        var response []byte
        response, err = p.Exec(client, reader, params)
        if err != nil {
          ctx := ""
          if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
            ctx = " - " + parentErr.Error()
          }
          _, sendErr := protocol.SendResponse(client, []byte(err.Error()))
          if sendErr != nil {
            p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
            break
          }
          continue
        }
    
        if response != nil {
          _, err = protocol.SendResponse(client, response)
          if err != nil {
            break
          }
        }
      }
    
      conn.Close()
    

    3. nsqlookupd 流程图小结

    上述流程小结示意图如下:

    nsqlookupd.png

    【小结】通过源码阅读与解析,可以看出 nsqlookupd 的作用就是管理 nsqd 节点的认证、注册、注销、心跳检测,动态维护分布式集群中最新可用的 nsqd 节点列表供客户端取用;

    源码中使用了很多 RWMutex 读写锁、interface 协议公共接口、goroutine/channel 协程间并发通信,从而保证了高可用、高吞吐量的应用能力。

    相关文章

      网友评论

        本文标题:golang-nsq系列(三)--nsqlookupd源码解析

        本文链接:https://www.haomeiwen.com/subject/ybxkwctx.html