美文网首页
nsq源码剖析-nsqd(1)

nsq源码剖析-nsqd(1)

作者: kekemuyu | 来源:发表于2018-04-06 20:25 被阅读0次

1. 开始

开篇罗嗦了一大堆,终于开始进入正题了。golang的优秀源码很多,比如杀手级应用docker,google的Kubernetes等,但都太大了,这种能不能读?能。但不适合小白读,等你成长成老鸟再读吧。现在开始nsq源码的阅读,nsq是一个分布式消息队列,源码短小精悍,其实我阅读它还有个人的一个原因就是想从中提取出他处理tcp连接的框架。看看最后能不能实现这个愿望。

源码获取:

nsq官网:http://nsq.io/,github地址:https://github.com/nsqio/nsq

2. nsq功能概述

在阅读源码前大概了解一下它的功能对理解源码很有帮助,在官网的文档中有快速开始,可以快速了解一下他的功能。nsq有3个守护程序组成:nsqd,nsqlookup,nsqadmin。

nsqd负责从客户端接收,排队和传送消息。

nsqlookupd负责监控信息。客户端通过nsqlookupd发现nsqd的消费者的一个topic,nsqd节点把信息广播到nsqlookupd。

nsqadmin是一个web ui管理平台各种任务和实时展示聚合信息。

3. nsqd

废话不多说直接找到nsqd的main函数,源代码路径 /nsqio/nsq/apps/nsqd/nsqgd.go,它的外部依赖是比较少的,编译前请自行下载。先看一下他的main函数:

 func main() {    
    prg := &program{}
    if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
        log.Fatal(err)
    }
 }

nsqd为了优雅的关闭退出,使用了svc包来管理程序的运行。先不用管svc,svc.run之后会执行Init和Start函数。

func (p *program) Init(env svc.Environment) error {
    if env.IsWindowsService() {
        dir := filepath.Dir(os.Args[0])
        return os.Chdir(dir)
    }
    return nil
}

func (p *program) Start() error {
    opts := nsqd.NewOptions()

    flagSet := nsqdFlagSet(opts)
    flagSet.Parse(os.Args[1:])

    rand.Seed(time.Now().UTC().UnixNano())
    //not get it
    if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
        fmt.Println(version.String("nsqd"))
        os.Exit(0)
    }

    var cfg config
    configFile := flagSet.Lookup("config").Value.String()
    if configFile != "" {
        _, err := toml.DecodeFile(configFile, &cfg)
        if err != nil {
            log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())
        }
    }
    cfg.Validate()

    options.Resolve(opts, flagSet, cfg)
    nsqd := nsqd.New(opts)

    err := nsqd.LoadMetadata()
    if err != nil {
        log.Fatalf("ERROR: %s", err.Error())
    }
    //not get it
    err = nsqd.PersistMetadata()
    if err != nil {
        log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
    }
    nsqd.Main()

    p.nsqd = nsqd
    return nil
}

Init函数没什么好说的,Start中有几个作用:初始化nsqd的参数,nsqd := nsqd.New(opts)根据参数创建nsqd实例,nsqd.LoadMetadata()从本地加载上次的运行数据,err = nsqd.PersistMetadata()不知道做什么先不用管,nsqd.Main()主程序运行。这个Start函数主要是调用了nsqd.Main()函数,其他都可先忽略。接着看最重要的nsqd.Main()。

4.nsqd.Main()

func (n *NSQD) Main() {
    var httpListener net.Listener
    var httpsListener net.Listener
    //like relative with NSQD
    ctx := &context{n}

    tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress)
    if err != nil {
        n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().TCPAddress, err)
        os.Exit(1)
    }
    n.Lock()
    n.tcpListener = tcpListener
    n.Unlock()
    //tcpServer not get it
    tcpServer := &tcpServer{ctx: ctx}
    n.waitGroup.Wrap(func() {
        protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
    })

    if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
        httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
        if err != nil {
            n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)
            os.Exit(1)
        }
        n.Lock()
        n.httpsListener = httpsListener
        n.Unlock()
        httpsServer := newHTTPServer(ctx, true, true)
        n.waitGroup.Wrap(func() {
            http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)
        })
    }
    httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
    if err != nil {
        n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
        os.Exit(1)
    }
    n.Lock()
    n.httpListener = httpListener
    n.Unlock()
    httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
    n.waitGroup.Wrap(func() {
        http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)
    })

    n.waitGroup.Wrap(func() { n.queueScanLoop() })
    n.waitGroup.Wrap(func() { n.lookupLoop() })
    if n.getOpts().StatsdAddress != "" {
        n.waitGroup.Wrap(func() { n.statsdLoop() })
    }
}

main中主要有5个函数如下:


微信图片_20180406183619.jpg

5.protocol.TCPServer

路径:nsqio\nsq\internal\protocol\tcp_server.go
看路径就知道这是内部公共函数,代码如下:

package protocol

import (
    "net"
    "runtime"
    "strings"

    "github.com/nsqio/nsq/internal/lg"
)

type TCPHandler interface {
    Handle(net.Conn)
}

func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
    logf(lg.INFO, "TCP: listening on %s", listener.Addr())

    for {
        clientConn, err := listener.Accept()
        if err != nil {
            if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
                logf(lg.WARN, "temporary Accept() failure - %s", err)
                runtime.Gosched()
                continue
            }
            // theres no direct way to detect this error because it is not exposed
            if !strings.Contains(err.Error(), "use of closed network connection") {
                logf(lg.ERROR, "listener.Accept() - %s", err)
            }
            break
        }
        go handler.Handle(clientConn)
    }

    logf(lg.INFO, "TCP: closing %s", listener.Addr())
}

TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc)看到这个函数,我们就要知道main函数给它传得是那三个参数是什么,看main中的代码:

  ctx := &context{n}

    tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress)
    if err != nil {
        n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().TCPAddress, err)
        os.Exit(1)
    }
    n.Lock()
    n.tcpListener = tcpListener
    n.Unlock()
    //tcpServer not get it
    tcpServer := &tcpServer{ctx: ctx}
    n.waitGroup.Wrap(func() {
        protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
    })

由代码可知n.tcpListener是连接监听器,tcpServer是tcpServer类型的变量,n.logf是日志相关参数可先不用管。这里有个参数要注意一下就是第二个参数 TCPHandler是个接口,传入tcpServer等于实现了这个接口,对go接口不熟悉的同学可能在这里就晕了。这个函数中有几个结构需要注意一下,ctx := &context{n} 和 tcpServer := &tcpServer{ctx: ctx},这里为什么要这样的结构封装,为什么用接口TCPHandler作为参数?这些疑问暂且记下。

TCPServer函数主要流程如下:
每接收到一个连接,就创建一个这个连接的handle协程,那么现在的关键是看这个协程是怎么样的,我们知道handler是一个接口类型,我们要找到Handle函数就要找参数tcpServer对应的类型,type tcpServer struct是在tcp.go中定义的,那么在tcp.go中就可以找到Handle函数的原型。下面就分析Handle函数。

微信图片_20180406183619.jpg

6.(p *tcpServer) Handle(clientConn net.Conn)

路径:nsqio\nsq\nsqd\tcp.go

type tcpServer struct {
    ctx *context
}

func (p *tcpServer) Handle(clientConn net.Conn) {
    p.ctx.nsqd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr())

    // The client should initialize itself by sending a 4 byte sequence indicating
    // the version of the protocol that it intends to communicate, this will allow us
    // to gracefully upgrade the protocol away from text/line oriented to whatever...
    buf := make([]byte, 4)
    _, err := io.ReadFull(clientConn, buf)
    if err != nil {
        p.ctx.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
        return
    }
    protocolMagic := string(buf)

    p.ctx.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
        clientConn.RemoteAddr(), protocolMagic)

    var prot protocol.Protocol
    switch protocolMagic {
    case "  V2":
        prot = &protocolV2{ctx: p.ctx}
    default:
        protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
        clientConn.Close()
        p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
            clientConn.RemoteAddr(), protocolMagic)
        return
    }

    err = prot.IOLoop(clientConn)
    if err != nil {
        p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
        return
    }
}

微信图片_20180406183619.jpg

客户端一旦连接上,就要发送4个字节" V2"否则服务端就断开连接。收到协议protocolMagic后执行prot.IOLoop(clientConn),从名字就能看出这是处理协议的地方。 prot 是个接口,通过prot = &protocolV2{ctx: p.ctx}可以实例化接口,再次自问为什么用接口?

7.(p *protocolV2) IOLoop(conn net.Conn)

路径:nsqio\nsq\nsqd\protocol_v2.go

type protocolV2 struct {
    ctx *context
}

func (p *protocolV2) IOLoop(conn net.Conn) error {
    var err error
    var line []byte
    var zeroTime time.Time

    clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
    client := newClientV2(clientID, conn, p.ctx)

    // synchronize the startup of messagePump in order
    // to guarantee that it gets a chance to initialize
    // goroutine local state derived from client attributes
    // and avoid a potential race with IDENTIFY (where a client
    // could have changed or disabled said attributes)
    messagePumpStartedChan := make(chan bool)
    //not get it
    go p.messagePump(client, messagePumpStartedChan)
    <-messagePumpStartedChan

    for {
        //not get it
        if client.HeartbeatInterval > 0 {
            client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
        } else {
            client.SetReadDeadline(zeroTime)
        }

        // ReadSlice does not allocate new space for the data each request
        // ie. the returned slice is only valid until the next call to it
        line, err = client.Reader.ReadSlice('\n')
        if err != nil {
            if err == io.EOF {
                err = nil
            } else {
                err = fmt.Errorf("failed to read command - %s", err)
            }
            break
        }

        // trim the '\n'
        line = line[:len(line)-1]
        // optionally trim the '\r'
        if len(line) > 0 && line[len(line)-1] == '\r' {
            line = line[:len(line)-1]
        }
        params := bytes.Split(line, separatorBytes)

        p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)

        var response []byte
        response, err = p.Exec(client, params)
        if err != nil {
            ctx := ""
            if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
                ctx = " - " + parentErr.Error()
            }
            p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)

            sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
            if sendErr != nil {
                p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
                break
            }

            // errors of type FatalClientErr should forceably close the connection
            if _, ok := err.(*protocol.FatalClientErr); ok {
                break
            }
            continue
        }

        if response != nil {
            err = p.Send(client, frameTypeResponse, response)
            if err != nil {
                err = fmt.Errorf("failed to send response - %s", err)
                break
            }
        }
    }

    p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client)
    conn.Close()
    close(client.ExitChan)
    if client.Channel != nil {
        client.Channel.RemoveClient(client.ID)
    }

    return err
}
微信图片_20180406183619.jpg

8.(p *protocolV2) Exec(client *clientV2, params [][]byte)

路径:nsqio\nsq\nsqd\protocol_v2.go

func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
    if bytes.Equal(params[0], []byte("IDENTIFY")) {
        return p.IDENTIFY(client, params)
    }
    err := enforceTLSPolicy(client, p, params[0])
    if err != nil {
        return nil, err
    }
    switch {
    case bytes.Equal(params[0], []byte("FIN")):
        return p.FIN(client, params)
    case bytes.Equal(params[0], []byte("RDY")):
        return p.RDY(client, params)
    case bytes.Equal(params[0], []byte("REQ")):
        return p.REQ(client, params)
    case bytes.Equal(params[0], []byte("PUB")):
        return p.PUB(client, params)
    case bytes.Equal(params[0], []byte("MPUB")):
        return p.MPUB(client, params)
    case bytes.Equal(params[0], []byte("DPUB")):
        return p.DPUB(client, params)
    case bytes.Equal(params[0], []byte("NOP")):
        return p.NOP(client, params)
    case bytes.Equal(params[0], []byte("TOUCH")):
        return p.TOUCH(client, params)
    case bytes.Equal(params[0], []byte("SUB")):
        return p.SUB(client, params)
    case bytes.Equal(params[0], []byte("CLS")):
        return p.CLS(client, params)
    case bytes.Equal(params[0], []byte("AUTH")):
        return p.AUTH(client, params)
    }
    return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}

以上就是协议里定义的各种功能函数,到此我们粗略的过了一遍tcpserver服务。

9.总结

把上面的流程串一下,主要是理清了主要流程,一些分支还需要再次阅读源码。一些结构体字段的意义和作用也没有介绍,还有为什么handle和ioloop流程用接口实现,这里先不做解释,我们先了解一下nsqd中tcpserver服务的主要脉络,而且我们要不断自问什么要这样设计,如果是我,我会如何做呢?

微信图片_20180406183619.jpg

相关文章

网友评论

      本文标题:nsq源码剖析-nsqd(1)

      本文链接:https://www.haomeiwen.com/subject/iyeshftx.html