美文网首页
NSQ 源码学习笔记(一)

NSQ 源码学习笔记(一)

作者: 莫Y兮 | 来源:发表于2018-04-06 00:08 被阅读735次

    首先我们来看一下Nsq的组织结构:

    • nsqd:接收,分发队列信息的守护进程,可以单独部署,也可以集群化运行
    • nsqlookupd:管理nsqd节点,服务发现
    • nsqadmin:nsq的可视化管理工具

    NSQ的拓补图

    @拓扑图 | center

    NSQ中Topic和channel的关系

    Topic会将消息发送到每个订阅者(channel)
    channel的读消费类似负载均衡,会均匀的投递到各个消费端

    @Topic和channel的关系 | center

    三个模块中nsqd模块最为重要,我们从这个模块开始学习它的源码

    入口函数

    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
    var cfg config
    configFile := flagSet.Lookup("config").Value.String()
    if configFile != "" {
        _, err := toml.DecodeFile(configFile, &cfg)
        if err != nil {
            log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())
        }
    }
    cfg.Validate()
    
    opts := nsqd.NewOptions()
    options.Resolve(opts, flagSet, cfg)
    nsqd := nsqd.New(opts)
    
    nsqd.LoadMetadata()
    err := nsqd.PersistMetadata()
    if err != nil {
        log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
    }
    nsqd.Main()
    <-signalChan
    nsqd.Exit()
    
    1. 首先用 signal.Notify 阻塞系统的 killctrl+c 信号,让进程可以处于deamon的状态运行
    2. 按优先级合并配置文件:命令行 > 配置文件 > 默认值
    3. nsqd.LoadMetadata 读取dat文件,加载topic和channel信息,并同步运行和停止的状态
    4. 将进程的运行状态(topic和channel信息)持久化到dat文件中
    5. 执行 nsqd.Main 直到捕捉退出信号

    nsqd.Main 的代码位于 nsqd/nsqd.go

    NSQD主函数(TCP监听)

    func (n *NSQD) Main() {
        var httpListener net.Listener
        var httpsListener net.Listener
    
        ctx := &context{n}
    
        tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress)
        if err != nil {
            n.logf("FATAL: listen (%s) failed - %s", n.getOpts().TCPAddress, err)
            os.Exit(1)
        }
        n.Lock()
        n.tcpListener = tcpListener
        n.Unlock()
        tcpServer := &tcpServer{ctx: ctx}
        n.waitGroup.Wrap(func() {
            protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger)
        })
        ...
    }
    

      NSQD首先启动了tcp监听模型,为了保证通用性,在 protocol 包中封装了TCPServer,需要传入 Listener, TCPHandler, Logger 对象。所有的TCP监听均可以用这个模式来创建监听,只要传入对应的 ListenerTCPHandler ,那么Listener在Accept到Connect的时候,将其交给对应TCPHandler.Handle(clientConn) 执行。

    TCPHandler 的Interface实现

    package protocol
    
    type TCPHandler interface {
        Handle(net.Conn)
    }
    
    func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) {
        l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr()))
    
        for {
            clientConn, err := listener.Accept()
            if err != nil {
                if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
                    l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err))
                    runtime.Gosched()
                    continue
                }
                // theres no direct way to detect this error because it is not exposed
                if !strings.Contains(err.Error(), "use of closed network connection") {
                    l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err))
                }
                break
            }
    
            // 启动Goroutine 去执行Handle函数
            go handler.Handle(clientConn)
        }
    
        l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr()))
    }
    

      这里体现了Go在实现Interface的便捷之处,不需要显示的声明实现了某个Interface,只需要完全的实现Interface中定义的方法,那么就会默认该类型实现了接口。在这里不同的Handler,只要实现了Handle(net.Conn),就可以被当做TCPHandler对象传入。在代码中的体现是:
      执行Handle函数时是启动一个Goroutine来执行的,这里其实是per connect per goroutine,由于Golang的特性,Goroutine在执行时的调度模式是epoll模式,可以很好的利用系统的多核资源。

    main函数中TCPServer的实现

    type tcpServer struct {
        ctx *context
    }
    
    func (p *tcpServer) Handle(clientConn net.Conn) {
        p.ctx.nsqd.logf("TCP: new client(%s)", clientConn.RemoteAddr())
    
        // 客户端应该初始化本身通过发送一个4字节序列表示协议的版本,
        // 这样将允许我们优雅地升级兼容协议
        buf := make([]byte, 4)
        _, err := io.ReadFull(clientConn, buf)
        if err != nil {
            p.ctx.nsqd.logf("ERROR: failed to read protocol version - %s", err)
            return
        }
        protocolMagic := string(buf)
    
        p.ctx.nsqd.logf("CLIENT(%s): desired protocol magic '%s'",
            clientConn.RemoteAddr(), protocolMagic)
    
        var prot protocol.Protocol
        switch protocolMagic {
        case "  V2":
            prot = &protocolV2{ctx: p.ctx} // V2版本的协议操作
        default:
            protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
            clientConn.Close()
            p.ctx.nsqd.logf("ERROR: client(%s) bad protocol magic '%s'",
                clientConn.RemoteAddr(), protocolMagic)
            return
        }
    
        err = prot.IOLoop(clientConn)
        if err != nil {
            p.ctx.nsqd.logf("ERROR: client(%s) - %s", clientConn.RemoteAddr(), err)
            return
        }
    }
    

      源码中标记了需要在通讯时预留4个字节的版本号信息,用来兼容协议的升级。如果未来有协议升级,只需要在protocolMagic中添加新的case分支就可以了。

    NSQD主函数(HTTP/HTTPS监听)

        if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
            httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
            if err != nil {
                n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)
                os.Exit(1)
            }
            n.Lock()
            n.httpsListener = httpsListener
            n.Unlock()
            httpsServer := newHTTPServer(ctx, true, true)
            n.waitGroup.Wrap(func() {
                http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.getOpts().Logger)
            })
        }
        httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
        if err != nil {
            n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
            os.Exit(1)
        }
        n.Lock()
        n.httpListener = httpListener
        n.Unlock()
        httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
        n.waitGroup.Wrap(func() {
            http_api.Serve(n.httpListener, httpServer, "HTTP", n.getOpts().Logger)
        })
    

      这里不论是http还是https的监听,httpsServerhttpServer作为Handler对象,均在内部声明了路由规则,不同的请求定义了不同的操作,最后通过http_api.Serve()绑定端口监听

    NSQD默认自启的操作

        n.waitGroup.Wrap(func() { n.queueScanLoop() }) // 循环消息分发
        n.waitGroup.Wrap(func() { n.idPump() }) // 生产唯一消息id的一个队列
        n.waitGroup.Wrap(func() { n.lookupLoop() }) // 如果nsqd有变化,同步nsqlookup
        if n.getOpts().StatsdAddress != "" {
            // 定时将nsqd的状态以短连接的方式发送至一个状态监护进程.包括了nsqd的应用资源信息,以及nsqd上topic的信息
            n.waitGroup.Wrap(func() { n.statsdLoop() })
        }
    

      启动监听后,除了通过监听启动的操作外,NSQD还有一些类似守护进程的操作会一直运行,包括:

    • 循环消息分发
    • 生产唯一消息ID
    • nsqlookup的状态同步
    • 状态监控

    相关文章

      网友评论

          本文标题:NSQ 源码学习笔记(一)

          本文链接:https://www.haomeiwen.com/subject/icljhftx.html