美文网首页
golang nsq源码分析&添加中文注释系列(二):Nsqd入

golang nsq源码分析&添加中文注释系列(二):Nsqd入

作者: 晓亮1988 | 来源:发表于2019-12-24 15:58 被阅读0次

    以往看网上的源码分析时,基本都是前面一段讲解,后面跟一大段代码,没有上下文分析,我就暗暗的想,如果一个函数或一段逻辑能有中文注释(俺小本毕业英语不太好)带有上下文分析,这样读源码岂不是会更快顺畅。。。不废话了,我们开始吧。

    上一篇大概讲解了基本介绍,我们也把Nsqd一步一步跑起来了(假设您已动手尝试过),本篇则从源码入口开始讲解

    前言

    • 针对特殊的包或者方法,会单独开一篇博客讲解,请注意代码里面的链接地址,建议手动尝试一下
    • 文章宗旨是学习大神的每一行代码,所以看起来会比较啰嗦,建议您一边看代码一遍读文章(效果更佳)

    NSQ整体流程

    NSQ由3个守护进程组成:

    • nsqd 是接收、保存和传送消息到客户端的守护进程
    • nsqlookupd 是管理的拓扑信息,维护着所有nsqd的状态,并提供了最终一致发现服务的守护进程
    • nsqadmin 是一个web ui的实时监控集群和执行各种管理任务
      image

    nsqd入口文件:nsq/apps/nsqd/main.go

    废话不多说,都在酒里了(代码里),直接看注释就能理解

    package main
    
    import (
        "flag"
        "fmt"
        "math/rand"
        "os"
        "path/filepath"
        "sync"
        "syscall"
        "time"
    
        // toml开源包
        "github.com/BurntSushi/toml"
        // go-options开源包
        "github.com/mreiferson/go-options"
        // 内部版本号
        "github.com/nsqio/nsq/internal/version"
        // 命令行控制包svc 服务控制
        "github.com/judwhite/go-svc/svc"
    
        // 内部包 日志中间件 log
        "github.com/nsqio/nsq/internal/lg"
    
        // nsqd真正工作的区域
        "github.com/nsqio/nsq/nsqd"
    )
    
    /*
    定义业务program结构体
    */
    type program struct {
    
        // once能确保实例化对象Do方法在多线程环境只运行一次,内部通过互斥锁实现
        once sync.Once
        nsqd *nsqd.NSQD
    }
    
    /*
    采用SVC包进行服务控制,主要是统一管理服务,对于信号控制不用每次都写在业务上,在ctrl+c时,能正常监听defer结束,方便获取很多日志,参数等
    */
    func main() {
        // 实例化
        prg := &program{}
        /*
        http://shuchimao.com/2019/12/20/golang服务控制实践-svc包转/
        
        // Implement this interface and pass it to the Run function to start your program.
    type Service interface {
        // Init is called before the program/service is started and after it's
        // determined if the program is running as a Windows Service.
        Init(Environment) error
    
        // Start is called after Init. This method must be non-blocking.
        Start() error
    
        // Stop is called in response to os.Interrupt, os.Kill, or when a
        // Windows Service is stopped.
        Stop() error
    }
        svc 第一个参数需要实现Service接口才可以正常运行,这也就是大伙看到的program 实现的init/start/stop三个函数
    
        使用svc启动相关程序
        */
        if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
            logFatal("%s", err)
        }
    }
    
    func (p *program) Init(env svc.Environment) error {
        // 检查是否是windows 服务。。。目测一般时候也用不到
        if env.IsWindowsService() {
            dir := filepath.Dir(os.Args[0])
            return os.Chdir(dir)
        }
        return nil
    }
    
    func (p *program) Start() error {
        /*
        实例化并初始一些配置和默认值
        /nsq/nsqd/options.go
        */
        opts := nsqd.NewOptions()
    
        /*
        封装了命令行的一些检查项,设置检查项的默认值
        使用apps目录:/nsq/apps/nsqd/options.go
        然后parse解析
        */
        flagSet := nsqdFlagSet(opts)
        flagSet.Parse(os.Args[1:])
    
        // 生成随机数 time.Now().UnixNano()  单位纳秒
        rand.Seed(time.Now().UTC().UnixNano())
    
        // 打印版本号,接收命令行参数version  默认值:false
        /*
        执行效果
        bj-m-server:nsqd yixia$ go run ./ --version=true
        nsqd v1.2.1-alpha (built w/go1.12.2)
        */
        if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
            // 打印版本号  %!V(string=nsqd v1.2.1-alpha (built w/go1.12.2))
            fmt.Println(version.String("nsqd"))
            os.Exit(0)
        }
    
        // 获取外部的配置文件,解析toml文件格式
        var cfg config
        /*
        bj-m-server:nsqd yixia$ go run ./ --config=config.toml
        */
        configFile := flagSet.Lookup("config").Value.String()
        // 如果不为空
        if configFile != "" {
            // 加载,读出的数据采用空_  抛弃,赋值给cfg
            _, err := toml.DecodeFile(configFile, &cfg)
            // 抛错
            if err != nil {
                logFatal("failed to load config file %s - %s", configFile, err)
            }
        }
        // 检查配置文件
        cfg.Validate()
    
        // 采用优先级从高到低依次进行解析,最终
        options.Resolve(opts, flagSet, cfg)
        /*
        传入用户自定义配置,实例化nsqd
    
        nsqd.new以后做了那些事情,大概捋一下,后续看的时候能加深印象
        1、检查命令行cli   opts.DataPath 、  opts.Logger没设置  设置默认值
        2、实例NSQD主对象
        3、监听tcp   net.Listen("tcp", opts.TCPAddress)
        4、监听http  net.Listen("tcp", opts.HTTPAddress)
        5、监听https tls.Listen("tcp", opts.HTTPSAddress, n.tlsConfig)
    
        综合以上了解,基本做的事情就是实例化主对象,并对cli 自定义的命令一顿操作。。。然后就这样了,return (*NSQD, error)
        */
        nsqd, err := nsqd.New(opts)
        if err != nil {
            logFatal("failed to instantiate nsqd - %s", err)
        }
        p.nsqd = nsqd
    
        /*
        加载历史数据,数据来源nsqd.dat -> 历史数据格式{"topics":[],"version":"1.2.1-alpha"}
        1、获取历史数据
        2、解析成对应的结构体 json.Unmarshal(data, &m)
        3、遍历 for _, t := range m.Topics , 解析每个topic -> channel
        4、启动N个topic.Start()(重点代码中有一个GetTopic,采用线程线程安全方式,重点学习)
        5、func (n *NSQD) LoadMetadata() error
        */
        err = p.nsqd.LoadMetadata()
        if err != nil {
            logFatal("failed to load metadata - %s", err)
        }
    
        /*
        持久化最新数据
        1、获取原始数据文件名 fileName := newMetadataFile(n.getOpts())
        2、遍历 nsqd.topicMap -> ndqd.channelMap  ,这是对topicMap和channelMap加了互斥锁
        3、将最新数据写入到临时文件中,明文明文件名为:nsqd.dat.333569681738193261.tmp
        4、func (n *NSQD) PersistMetadata() error
        */
        err = p.nsqd.PersistMetadata()
        if err != nil {
            logFatal("failed to persist metadata - %s", err)
        }
    
        /*
        开启协程进入nsqd.Main主函数
        Main方法里重点使用了封装的WaitGroup
        下列出现的n.waitGroup.Wrap均采用了封装groutine
        可以看我另一篇文章讲解了封装的流程和使用方法:http://shuchimao.com/2019/12/24/golang积累-waitgroup包装/
    
        方法:
        func (n *NSQD) Main() error {
    
        大体执行思路
        1、实例化context
        2、建立退出通道,保证退出函数只运行一次,创建了匿名函数exitFunc
        3、初始化并监听TCPServer
        3.1、exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
        3.2、TCPServer采用无限循环方式监听tcp client长连接,当有一个client连接,分配一个groutine进行处理
        for -> listener.Accept() -> groutine
        4、初始化HTTPServer
        4.1、使用httprouter进行路由设置,然后初始化各种接口
    
        5、初始化HttpsServer
        6、监控循环队列:n.waitGroup.Wrap(n.queueScanLoop)
        7、节点信息管理:n.waitGroup.Wrap(n.lookupLoop)
        8、统计信息:n.waitGroup.Wrap(n.statsdLoop)
        */
        go func() {
            err := p.nsqd.Main()
            if err != nil {
                p.Stop()
                os.Exit(1)
            }
        }()
    
        return nil
    }
    
    func (p *program) Stop() error {
        /*
            /*
        底层源码
        func (o *Once) Do(f func()) {
        if atomic.LoadUint32(&o.done) == 1 {
            return
        }
        // Slow-path.
        o.m.Lock()
        defer o.m.Unlock()
        if o.done == 0 {
            defer atomic.StoreUint32(&o.done, 1)
            f()
        }
    }
    
        可以看成这样的链式操作
        p.once.Do == program.once.Do
    
        确保在执行时只执行一次退出操作
        */
    
        p.once.Do(func() {
            p.nsqd.Exit()
        })
        return nil
    }
    
    func logFatal(f string, args ...interface{}) {
        lg.LogFatal("[nsqd] ", f, args...)
    }
    
    

    Nsqd流程图

    偷个懒,从网上摘录的流程图直接拿下来了大家先看看

    image

    小结

    nsqd代码逻辑清晰,利用Go协程高效并发处理分布式多节点nsqd的生产和消费,学习并发处理nsqd是最佳项目,每行代码都值得学习,坚持读每一行代码相信大伙一定会受益匪浅的,等我们把这项目2000多行代码都读差不多了,在回头看看成长,绝对比看几本书学的快,学以致用,多动手,多练习。

    下一章具体分析nsqd主程序。

    相关文章

      网友评论

          本文标题:golang nsq源码分析&添加中文注释系列(二):Nsqd入

          本文链接:https://www.haomeiwen.com/subject/mkrfoctx.html