Nsqd 的启动借助了srv的包,srv定义的service接口包括以下方法
type Service interface {
// Init is called before the program/service is started and after it's
// determined if the program is running as a Windows Service.
Init(Environment) error
// Start is called after Init. This method must be non-blocking.
Start() error
// Stop is called in response to os.Interrupt, os.Kill, or when a
// Windows Service is stopped.
Stop() error
}
srv 在启动的时候依次调用这几个函数实现;并注册了信号,实现优雅的关闭服务
func Run(service Service, sig ...os.Signal) error {
env := environment{}
if err := service.Init(env); err != nil {
return err
}
if err := service.Start(); err != nil {
return err
}
if len(sig) == 0 {
sig = []os.Signal{syscall.SIGINT, syscall.SIGTERM}
}
signalChan := make(chan os.Signal, 1)
signalNotify(signalChan, sig...)
<-signalChan
return service.Stop()
}
-
program的启动首先加载命令行参数和配置的参数合并
-
创建nsqd,创建过程中对配置的dataPath上锁,直至系统推出
同样配置下启动另外一个nsqd将获取锁失败
截屏2020-02-2315.20.53.png -
调用nsqd.Main,启动httpserver和tcpserver
系统退出
- 先关闭tcp和http的监听套接字
- 存储元数据
- 关闭topic
- 等待其他的协程结束
- 解锁文件锁
func (n *NSQD) Exit() {
if n.tcpListener != nil {
n.tcpListener.Close()
}
if n.tcpServer != nil {
n.tcpServer.CloseAll()
}
if n.httpListener != nil {
n.httpListener.Close()
}
if n.httpsListener != nil {
n.httpsListener.Close()
}
n.Lock()
// 存储元数据,再次启动将获取元数据
err := n.PersistMetadata()
if err != nil {
n.logf(LOG_ERROR, "failed to persist metadata - %s", err)
}
n.logf(LOG_INFO, "NSQ: closing topics")
// 关闭所有的topic,不再推送数据到客户端
for _, topic := range n.topicMap {
topic.Close()
}
n.Unlock()
n.logf(LOG_INFO, "NSQ: stopping subsystems")
close(n.exitChan)
n.waitGroup.Wait()
n.dl.Unlock()
n.logf(LOG_INFO, "NSQ: bye")
}
网友评论