美文网首页
nsqd的TCP网络架构

nsqd的TCP网络架构

作者: fake_smile_boy | 来源:发表于2018-12-10 10:06 被阅读0次

    nsqd.Main()主函数

    func (n *NSQD) Main() {
    var err error
    ctx := &context{n}
    
    n.tcpListener, err = net.Listen("tcp", n.getOpts().TCPAddress)
    if err != nil {
        n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().TCPAddress, err)
        os.Exit(1)
    }
    n.httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
    if err != nil {
        n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
        os.Exit(1)
    }
    if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
        n.httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
        if err != nil {
            n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)
            os.Exit(1)
        }
    }
    
    tcpServer := &tcpServer{ctx: ctx}
    n.waitGroup.Wrap(func() {
        protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
    })
    httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
    n.waitGroup.Wrap(func() {
        http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)
    })
    if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
        httpsServer := newHTTPServer(ctx, true, true)
        n.waitGroup.Wrap(func() {
            http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)
        })
    }
    
    n.waitGroup.Wrap(n.queueScanLoop)
    n.waitGroup.Wrap(n.lookupLoop)
    if n.getOpts().StatsdAddress != "" {
        n.waitGroup.Wrap(n.statsdLoop)
    }
    }
    

    这里多处用了n.waitGroup.Wrap(),其实是封装了sync.WaitGroup,类似与python的join()功能,用来等待goroutine执行完毕后再退出程序。
    其中w.Add(1)会将计数器+1,w.Done()会让计数器-1,如果计数器归0时,程序不再阻塞,继续执行后面的步骤。

    type WaitGroupWrapper struct {
        sync.WaitGroup
    }
    
    func (w *WaitGroupWrapper) Wrap(cb func()) {
        w.Add(1)
        go func() {
            cb()
            w.Done()
        }()
    }
    

    首先我们来看下TCP监听模型,protocol包封装了TCPServer的实现,TCPServer是个通用函数,只要传入listener、TCPHandler和logger变创建了一个TCP服务器。该TCP网络架构为每当accept一个客户端,交给传入的TCPHandler的handle函数处理一个连接。一个连接由一个goroutine来handle。

    type TCPHandler interface {
        Handle(net.Conn)
    }
    
    func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
        logf(lg.INFO, "TCP: listening on %s", listener.Addr())
    
        for {
            clientConn, err := listener.Accept()
            if err != nil {
                if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
                    logf(lg.WARN, "temporary Accept() failure - %s", err)
                    runtime.Gosched()
                    continue
                }
                // theres no direct way to detect this error because it is not exposed
                if !strings.Contains(err.Error(), "use of closed network connection") {
                    logf(lg.ERROR, "listener.Accept() - %s", err)
                }
                break
            }
            go handler.Handle(clientConn)
        }
    
        logf(lg.INFO, "TCP: closing %s", listener.Addr())
    }
    

    这里可以看出interface的运用,只要实现Handle这个方法,传入的TCPHandler就可以调用Handle()函数处理客户端连接。保证了这个TCPServer的通用性。

    接下来看下每个goroutine具体的处理逻辑

    type tcpServer struct {
        ctx *context
    }
    
    func (p *tcpServer) Handle(clientConn net.Conn) {
        p.ctx.nsqd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr())
    
        // The client should initialize itself by sending a 4 byte sequence indicating
        // the version of the protocol that it intends to communicate, this will allow us
        // to gracefully upgrade the protocol away from text/line oriented to whatever...
        buf := make([]byte, 4)
        _, err := io.ReadFull(clientConn, buf)
        if err != nil {
            p.ctx.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
            return
        }
        protocolMagic := string(buf)
    
        p.ctx.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
            clientConn.RemoteAddr(), protocolMagic)
    
        var prot protocol.Protocol
        switch protocolMagic {
        case "  V2":
            prot = &protocolV2{ctx: p.ctx}
        default:
            protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
            clientConn.Close()
            p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
                clientConn.RemoteAddr(), protocolMagic)
            return
        }
    
        err = prot.IOLoop(clientConn)
        if err != nil {
            p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
            return
        }
    }
    

    Handle函数首先读取4个字节的协议版本号,根据版本号来创建对应的protocol结构,之后进入最主要的IOLoop()函数,IOLoop主要就是负责消息的接收和分发,下一篇继续解析。

    相关文章

      网友评论

          本文标题:nsqd的TCP网络架构

          本文链接:https://www.haomeiwen.com/subject/uaiatqtx.html