nsqd.Main()主函数
func (n *NSQD) Main() {
var err error
ctx := &context{n}
n.tcpListener, err = net.Listen("tcp", n.getOpts().TCPAddress)
if err != nil {
n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().TCPAddress, err)
os.Exit(1)
}
n.httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
if err != nil {
n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
os.Exit(1)
}
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
n.httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
if err != nil {
n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)
os.Exit(1)
}
}
tcpServer := &tcpServer{ctx: ctx}
n.waitGroup.Wrap(func() {
protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
})
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)
})
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
httpsServer := newHTTPServer(ctx, true, true)
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)
})
}
n.waitGroup.Wrap(n.queueScanLoop)
n.waitGroup.Wrap(n.lookupLoop)
if n.getOpts().StatsdAddress != "" {
n.waitGroup.Wrap(n.statsdLoop)
}
}
这里多处用了n.waitGroup.Wrap(),其实是封装了sync.WaitGroup,类似与python的join()功能,用来等待goroutine执行完毕后再退出程序。
其中w.Add(1)会将计数器+1,w.Done()会让计数器-1,如果计数器归0时,程序不再阻塞,继续执行后面的步骤。
type WaitGroupWrapper struct {
sync.WaitGroup
}
func (w *WaitGroupWrapper) Wrap(cb func()) {
w.Add(1)
go func() {
cb()
w.Done()
}()
}
首先我们来看下TCP监听模型,protocol包封装了TCPServer的实现,TCPServer是个通用函数,只要传入listener、TCPHandler和logger变创建了一个TCP服务器。该TCP网络架构为每当accept一个客户端,交给传入的TCPHandler的handle函数处理一个连接。一个连接由一个goroutine来handle。
type TCPHandler interface {
Handle(net.Conn)
}
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
logf(lg.INFO, "TCP: listening on %s", listener.Addr())
for {
clientConn, err := listener.Accept()
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
logf(lg.WARN, "temporary Accept() failure - %s", err)
runtime.Gosched()
continue
}
// theres no direct way to detect this error because it is not exposed
if !strings.Contains(err.Error(), "use of closed network connection") {
logf(lg.ERROR, "listener.Accept() - %s", err)
}
break
}
go handler.Handle(clientConn)
}
logf(lg.INFO, "TCP: closing %s", listener.Addr())
}
这里可以看出interface的运用,只要实现Handle这个方法,传入的TCPHandler就可以调用Handle()函数处理客户端连接。保证了这个TCPServer的通用性。
接下来看下每个goroutine具体的处理逻辑
type tcpServer struct {
ctx *context
}
func (p *tcpServer) Handle(clientConn net.Conn) {
p.ctx.nsqd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr())
// The client should initialize itself by sending a 4 byte sequence indicating
// the version of the protocol that it intends to communicate, this will allow us
// to gracefully upgrade the protocol away from text/line oriented to whatever...
buf := make([]byte, 4)
_, err := io.ReadFull(clientConn, buf)
if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
return
}
protocolMagic := string(buf)
p.ctx.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic)
var prot protocol.Protocol
switch protocolMagic {
case " V2":
prot = &protocolV2{ctx: p.ctx}
default:
protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
clientConn.Close()
p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic)
return
}
err = prot.IOLoop(clientConn)
if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
return
}
}
Handle函数首先读取4个字节的协议版本号,根据版本号来创建对应的protocol结构,之后进入最主要的IOLoop()函数,IOLoop主要就是负责消息的接收和分发,下一篇继续解析。
网友评论