美文网首页
Nsq源码学习

Nsq源码学习

作者: 思维开阔 | 来源:发表于2019-03-03 16:08 被阅读0次

    memoryMsgChan = nil backendMsgChan = nil flusherChan = nil

    nsqd的启动过程从nsq/apps/nsqd文件中main函数启动,使用"github.com/judwhite/go-svc/svc"的svc包进行init, run, stop。program结构实现svc包中的service接口。

    该接口定义了三个生命周期方法

    Init(Environment)该方法在nsqd开始工作之前被执行,完成一些初始化工作

    Start()该方法对应nsqd启动过程,注释要求该方法非阻塞的,

    Stop()对应nsqd的停止过程

    启动的正式函数为nsqd.Main()启动和初始化。启动之后开启tcp和http服务,开启lookupLoop和queueScanLoop服务。

    Http服务有GET和POST请求,能够执行ping,info, pub, mpub, stats, topic/create, topic/delete, topic/pause, topic/unpause, channel/create,channel/delete, channel/empty, channel/pause ,channel/unpause/, config/opt还有一些debug请求。

    tcp提供FIN, RDY, REQU, PUB, MPUB,DPUB,NOP, TOUCH, SUB, CLS, AUTH

    以下为nsqd的启动流程,到数据处理之前。

    func (p *program) Start() error {

        //获取配置文件信息,默认的配置信息

      opts := nsqd.NewOptions()

        //以下解析用户输入的命令行参数,

        flagSet := nsqdFlagSet(opts)

      flagSet.Parse(os.Args[1:])

      rand.Seed(time.Now().UTC().UnixNano())

      if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {

          fmt.Println(version.String("nsqd"))

          os.Exit(0)

      }

      var cfg config

      //以下解析配置文件

      configFile := flagSet.Lookup("config").Value.String()

      if configFile != "" {

          //toml这个包没懂干嘛的,

          _, err := toml.DecodeFile(configFile, &cfg)

          if err != nil {

            log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())

          }

      }

      cfg.Validate()

    //通过options.Resolve方法将两种方式的参数合并到opts

      options.Resolve(opts, flagSet, cfg)

        //创建nsqd实例

        nsqd := nsqd.New(opts)

        //加载以前的元数据

      err := nsqd.LoadMetadata()

      if err != nil {

          log.Fatalf("ERROR: %s", err.Error())

      }

      err = nsqd.PersistMetadata()

      if err != nil {

          log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())

      }

      //nsqd启动的正式开始函数,nsq/nsqd/nsqd.go中

      nsqd.Main()

      p.nsqd = nsqd

      return nil

    }

    在program结构体实现的Start方法中,对用户输入的命令行参数进行解析,判断用户是否指定了自定义配置文件,有,对该文件进行加载与校验,nsqd参数设置通过两种方式实现,命令行和自定义配置文件。

    创建nsqd实例

    通过nsqio/nsq/nsqd/nsqd.go提供的工厂方法。主要完成了一下工作:

    func New(opts *Options) *NSQD {

    dataPath := opts.DataPath

        //数据持久化路径,用户未指定,将使用当前文件夹

        //作为数据持久化的路径

    if opts.DataPath == "" {

    cwd, _ := os.Getwd()

    dataPath = cwd

    }

    if opts.Logger == nil {

    opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)

    }

    //初始化一个NSQD实例,设置其中一些字段

    n := &NSQD{

    startTime:            time.Now(),

    topicMap:            make(map[string]*Topic),

    exitChan:            make(chan int),

    notifyChan:          make(chan interface{}),

    optsNotificationChan: make(chan struct{}, 1),

    dl:                  dirlock.New(dataPath),

    }

    //创建一个http客户端

    httpcli := http_api.NewClient(nil, opts.HTTPClientConnectTimeout, opts.HTTPClientRequestTimeout)

    n.ci = clusterinfo.New(n.logf, httpcli)

    //以下都是对配置一些字段进行校验

    n.swapOpts(opts)

    n.errValue.Store(errStore{})

    var err error

    opts.logLevel, err = lg.ParseLogLevel(opts.LogLevel, opts.Verbose)

    if err != nil {

    n.logf(LOG_FATAL, "%s", err)

    os.Exit(1)

    }

    err = n.dl.Lock()

    if err != nil {

    n.logf(LOG_FATAL, "--data-path=%s in use (possibly by another instance of nsqd)", dataPath)

    os.Exit(1)

    }

    if opts.MaxDeflateLevel < 1 || opts.MaxDeflateLevel > 9 {

    n.logf(LOG_FATAL, "--max-deflate-level must be [1,9]")

    os.Exit(1)

    }

    if opts.ID < 0 || opts.ID >= 1024 {

    n.logf(LOG_FATAL, "--node-id must be [0,1024)")

    os.Exit(1)

    }

    if opts.StatsdPrefix != "" {

    var port string

    _, port, err = net.SplitHostPort(opts.HTTPAddress)

    if err != nil {

    n.logf(LOG_FATAL, "failed to parse HTTP address (%s) - %s", opts.HTTPAddress, err)

    os.Exit(1)

    }

    statsdHostKey := statsd.HostKey(net.JoinHostPort(opts.BroadcastAddress, port))

    prefixWithHost := strings.Replace(opts.StatsdPrefix, "%s", statsdHostKey, -1)

    if prefixWithHost[len(prefixWithHost)-1] != '.' {

    prefixWithHost += "."

    }

    opts.StatsdPrefix = prefixWithHost

    }

    if opts.TLSClientAuthPolicy != "" && opts.TLSRequired == TLSNotRequired {

    opts.TLSRequired = TLSRequired

    }

    tlsConfig, err := buildTLSConfig(opts)

    if err != nil {

    n.logf(LOG_FATAL, "failed to build TLS config - %s", err)

    os.Exit(1)

    }

    if tlsConfig == nil && opts.TLSRequired != TLSNotRequired {

    n.logf(LOG_FATAL, "cannot require TLS client connections without TLS key and cert")

    os.Exit(1)

    }

    n.tlsConfig = tlsConfig

    for _, v := range opts.E2EProcessingLatencyPercentiles {

    if v <= 0 || v > 1 {

    n.logf(LOG_FATAL, "Invalid percentile: %v", v)

    os.Exit(1)

    }

    }

    n.logf(LOG_INFO, version.String("nsqd"))

    n.logf(LOG_INFO, "ID: %d", opts.ID)

    return n

    }

    加载之前元数据nsqd运行停止后会创建两个文件:nsqd.dat和nsqd.(id号).dat,若之后启动nsqd仍指定相同的dataPath,nsqd会尝试从两个文件中加载之前保存元数据,即topic和channel信息,如果两个都不存在,该方法在19行返回,认为是一次全新启动,如果两个文件存在但其中保存信息不一致,nsqd停止,并提示用户删除其中一个。

    func (n *NSQD) LoadMetadata() error {

    atomic.StoreInt32(&n.isLoading, 1)

    defer atomic.StoreInt32(&n.isLoading, 0)

    fn := newMetadataFile(n.getOpts())

    // old metadata filename with ID, maintained in parallel to enable roll-back

    fnID := oldMetadataFile(n.getOpts())

    data, err := readOrEmpty(fn)

    if err != nil {

    return err

    }

    dataID, errID := readOrEmpty(fnID)

    if errID != nil {

    return errID

    }

    if data == nil && dataID == nil {

    return nil // fresh start

    }

    if data != nil && dataID != nil {

    if bytes.Compare(data, dataID) != 0 {

    return fmt.Errorf("metadata in %s and %s do not match (delete one)", fn, fnID)

    }

    }

    if data == nil {

    // only old metadata file exists, use it

    fn = fnID

    data = dataID

    }

    var m meta

    err = json.Unmarshal(data, &m)

    if err != nil {

    return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)

    }

    for _, t := range m.Topics {

    if !protocol.IsValidTopicName(t.Name) {

    n.logf(LOG_WARN, "skipping creation of invalid topic %s", t.Name)

    continue

    }

    topic := n.GetTopic(t.Name)

    if t.Paused {

    topic.Pause()

    }

    for _, c := range t.Channels {

    if !protocol.IsValidChannelName(c.Name) {

    n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name)

    continue

    }

    channel := topic.GetChannel(c.Name)

    if c.Paused {

    channel.Pause()

    }

    }

    }

    return nil

    }

    启动nsqd提供的服务&waitGroupWrapper

    nsqd正式工作函数入口为nsqd.Main(),该Main函数位于,nsq/nsqd/nsqd.go中。完成了Tcp, Http, Https(如果需要的话)服务器启动,之前的program实现的Start方法非阻塞,NSQD的Main方法在Start中被调用,这就要求Main方法必须非阻塞的。但无论Tcp服务器还是Http服务器,都需要在一个死循环中对新连接进行业务处理,Main方法通过WaitGroupWrapper保证自身非阻塞。

    func (n *NSQD) Main() {

      var err error

      //建立上下文机制,退出时,等待子协程的结束

      ctx := &context{n}

      n.tcpListener, err = net.Listen("tcp", n.getOpts().TCPAddress)

      if err != nil {

          n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().TCPAddress, err)

          os.Exit(1)

      }

      n.httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)

      if err != nil {

          n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPAddress, err)

          os.Exit(1)

      }

      if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {

          n.httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)

          if err != nil {

            n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)

            os.Exit(1)

          }

      }

      tcpServer := &tcpServer{ctx: ctx}

      //使用waiGroup等待所有协程退出,为了避免代码冗余和美观,封装成一个类进行。

      n.waitGroup.Wrap(func() {

          //处理tcp协议信息,通过实现接口使用,接口的实现类nsq/nsqd/tcp.go中

          protocol.TCPServer(n.tcpListener, tcpServer, n.logf)

      })

      httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)

      n.waitGroup.Wrap(func() {

          http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)

      })

      if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {

          httpsServer := newHTTPServer(ctx, true, true)

          n.waitGroup.Wrap(func() {

            http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)

          })

      }

      n.waitGroup.Wrap(n.queueScanLoop)

      n.waitGroup.Wrap(n.lookupLoop)

      if n.getOpts().StatsdAddress != "" {

          n.waitGroup.Wrap(n.statsdLoop)

      }

    }

    以上,nsqd启动完毕。开始接收tcp和http请求处理请求。tcp处理实现通过实现Protocol接口类,每一个请求都会在IOLoop中进行处理,建立client类,处理请求返回response。

    处理tcp请求时,nsq/nsqd/tcp.go中的tcpServer结构体实现了nsq/internal/protocol/tcp_server.go中的TCPHandler接口处理tcp请求。

    在tcp.go中的Handler方法中,根据发送过来信息的前四个字节判断是否是V2,如果是使用protocolV2类处理信息。否则直接返回错误。

    var prot protocol.Protocol

    switch protocolMagic {

    case "  V2":

      prot = &protocolV2{ctx: p.ctx}

    default:

      protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))

      clientConn.Close()

      p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",

          clientConn.RemoteAddr(), protocolMagic)

      return

    }

    err = prot.IOLoop(clientConn)

    if err != nil {

      p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)

      return

    }

    信息处理转到nsq/nsqd/protocol_v2.go中的 IOLoop函数执行。

    func (p *protocolV2) IOLoop(conn net.Conn) error {

      var err error

      var line []byte

      var zeroTime time.Time

      clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)

      client := newClientV2(clientID, conn, p.ctx)

      p.ctx.nsqd.AddClient(client.ID, client)

      // synchronize the startup of messagePump in order

      // to guarantee that it gets a chance to initialize

      // goroutine local state derived from client attributes

      // and avoid a potential race with IDENTIFY (where a client

      // could have changed or disabled said attributes)

      //采用这种方式,是为了保证messagePump中的局部变量初始化成功,

      //messagePump对sub订阅进行处理

        messagePumpStartedChan := make(chan bool)

      go p.messagePump(client, messagePumpStartedChan)

      <-messagePumpStartedChan

      for {

          if client.HeartbeatInterval > 0 {

            client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))

          } else {

            client.SetReadDeadline(zeroTime)

          }

          // ReadSlice does not allocate new space for the data each request

          // ie. the returned slice is only valid until the next call to it

          //一次读取一行的数据,以\n结束。

          line, err = client.Reader.ReadSlice('\n')

          if err != nil {

            if err == io.EOF {

                err = nil

            } else {

                err = fmt.Errorf("failed to read command - %s", err)

            }

            break

          }

          // trim the '\n'

          line = line[:len(line)-1]

          // optionally trim the '\r'

          if len(line) > 0 && line[len(line)-1] == '\r' {

            line = line[:len(line)-1]

          }

          //按照分隔符空格符分开。通过查看consumer端代码,发过来的格式,命令名字+" " +(param长度 + param) + 加换行符。

          //param topic, channel以及发送信息

          params := bytes.Split(line, separatorBytes)

          p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)

          var response []byte

          //根据解析的信息,判断命令类型,转到对应的命令操作,进行处理和回复。

          response, err = p.Exec(client, params)

          if err != nil {

            ctx := ""

            if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {

                ctx = " - " + parentErr.Error()

            }

            p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)

            sendErr := p.Send(client, frameTypeError, []byte(err.Error()))

            if sendErr != nil {

                p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)

                break

            }

            // errors of type FatalClientErr should forceably close the connection

            if _, ok := err.(*protocol.FatalClientErr); ok {

                break

            }

            continue

          }

          if response != nil {

            err = p.Send(client, frameTypeResponse, response)

            if err != nil {

                err = fmt.Errorf("failed to send response - %s", err)

                break

            }

          }

      }

      p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client)

      conn.Close()

      close(client.ExitChan)

      if client.Channel != nil {

          client.Channel.RemoveClient(client.ID)

      }

      p.ctx.nsqd.RemoveClient(client.ID)

      return err

    }

    在IOLoop函数处理中,在死循环中不断读取client连接中传过来的信息,每一行一条信息,解析每条信息,p.Exec转到对应命令函数进行处理。在执行sub命中,牵扯到SubEventChan,会导致channel数据的分配发生重塑。

    p.messagePump处理channel中的信息。

    p.messagePump,(首先明白的一点tcp长连接不发送释放命令或者意外的情况下不断开)根据连接的client发送过来的请求类型处理消息。

    以sub命令为例。

    func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {

      var err error

      //memeoryMsgChan和backendMsgChan是订阅channel的。

      var memoryMsgChan chan *Message

      var backendMsgChan chan []byte

      var subChannel *Channel

      // NOTE: `flusherChan` is used to bound message latency for

      // the pathological case of a channel on a low volume topic

      // with >1 clients having >1 RDY counts

      var flusherChan <-chan time.Time

      var sampleRate int32

        //判断是否订阅新通道

      subEventChan := client.SubEventChan

      identifyEventChan := client.IdentifyEventChan

      outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)

      heartbeatTicker := time.NewTicker(client.HeartbeatInterval)

      heartbeatChan := heartbeatTicker.C

      msgTimeout := client.MsgTimeout

      // v2 opportunistically buffers data to clients to reduce write system calls

      // we force flush in two cases:

      //    1. when the client is not ready to receive messages

      //    2. we're buffered and the channel has nothing left to send us

      //      (ie. we would block in this loop anyway)

      //

      flushed := true

      // signal to the goroutine that started the messagePump

      // that we've started up

      close(startedChan)

      for {

          if subChannel == nil || !client.IsReadyForMessages() {

            // the client is not ready to receive messages...

            memoryMsgChan = nil

            backendMsgChan = nil

            flusherChan = nil

            // force flush

            client.writeLock.Lock()

            err = client.Flush()

            client.writeLock.Unlock()

            if err != nil {

                goto exit

            }

            flushed = true

          } else if flushed {

            // last iteration we flushed...

            // do not select on the flusher ticker channel

            memoryMsgChan = subChannel.memoryMsgChan

            backendMsgChan = subChannel.backend.ReadChan()

            flusherChan = nil

          } else {

            // we're buffered (if there isn't any more data we should flush)...

            // select on the flusher ticker channel, too

            memoryMsgChan = subChannel.memoryMsgChan

            backendMsgChan = subChannel.backend.ReadChan()

            flusherChan = outputBufferTicker.C

          }

          select {

          case <-flusherChan:

            // if this case wins, we're either starved

            // or we won the race between other channels...

            // in either case, force flush

            client.writeLock.Lock()

            err = client.Flush()

            client.writeLock.Unlock()

            if err != nil {

                goto exit

            }

            flushed = true

          case <-client.ReadyStateChan:

          case subChannel = <-subEventChan:

            // you can't SUB anymore

            subEventChan = nil

          case identifyData := <-identifyEventChan:

            // you can't IDENTIFY anymore

            identifyEventChan = nil

            outputBufferTicker.Stop()

            if identifyData.OutputBufferTimeout > 0 {

                outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)

            }

            heartbeatTicker.Stop()

            heartbeatChan = nil

            if identifyData.HeartbeatInterval > 0 {

                heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)

                heartbeatChan = heartbeatTicker.C

            }

            if identifyData.SampleRate > 0 {

                sampleRate = identifyData.SampleRate

            }

            msgTimeout = identifyData.MsgTimeout

          case <-heartbeatChan:

            err = p.Send(client, frameTypeResponse, heartbeatBytes)

            if err != nil {

                goto exit

            }

          case b := <-backendMsgChan:

            if sampleRate > 0 && rand.Int31n(100) > sampleRate {

                continue

            }

            msg, err := decodeMessage(b)

            if err != nil {

                p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)

                continue

            }

            msg.Attempts++

            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)

            client.SendingMessage()

            err = p.SendMessage(client, msg)

            if err != nil {

                goto exit

            }

            flushed = false

          case msg := <-memoryMsgChan:

            if sampleRate > 0 && rand.Int31n(100) > sampleRate {

                continue

            }

            msg.Attempts++

            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)

            client.SendingMessage()

            err = p.SendMessage(client, msg)

            if err != nil {

                goto exit

            }

            flushed = false

          case <-client.ExitChan:

            goto exit

          }

      }

    exit:

      p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client)

      heartbeatTicker.Stop()

      outputBufferTicker.Stop()

      if err != nil {

          p.ctx.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err)

      }

    }

    12行subEventChan := client.SubEventChan更新消息泵的意思正是将客户端订阅的channel发送到subEventChan中去。

    72-74行,在这里将从subEventChan中接收到的值保存到subChannel中。

    54-56行,当subChannel被赋值为客户端订阅的channel之后,使用subChannel内存消息队列和磁盘消息队列对memoryMsgChan和backedMsgChan进行赋值。

    101-132行,监听memoryMsgChan和backendMsgChan,当其上有消息传入时(向nsq发布消息,消息被写入到topic,进而被写入到topic。

    当客户端订阅的topic的channel有消息时,写入到topic下的channel的消息,通过p.SendMessage(client,msg)来将消息发送给客户端。

    客户端的tcp发布PUB流程

    首先每一个新连接的客户端都是通过IOLoop进行处理,启动(protocloV2)messagePump,在messagePump里因为是producer所以, memoryMsgChan = nil backendMsgChan = nil flusherChan = nil。

    在IOLoop中p.Exec函数的PUB将消息写到topic中,根据写入的topic中。

    客户端http的PUB看nsq的http。

    客户端SUB订阅

    IOLoop中进行处理,启动messagePump和Exec函数。在Exec函数中,更新订阅channel的信息,更新client的信息。返回ok。

    在messagePumn中当接收到channel有消息时就

    相关文章

      网友评论

          本文标题:Nsq源码学习

          本文链接:https://www.haomeiwen.com/subject/jemoeqtx.html