memoryMsgChan = nil backendMsgChan = nil flusherChan = nil
nsqd 的启动从 nsq/apps/nsqd 目录下的 main 函数开始，借助 "github.com/judwhite/go-svc/svc" 包管理 Init、Start、Stop 三个生命周期阶段。program 结构体实现了 svc 包中的 Service 接口。
该接口定义了三个生命周期方法
Init(Environment)该方法在nsqd开始工作之前被执行,完成一些初始化工作
Start()：对应 nsqd 的启动过程，注释要求该方法必须是非阻塞的。
Stop()对应nsqd的停止过程
真正的启动工作由 nsqd.Main() 完成：它启动 TCP 和 HTTP 服务，并启动 lookupLoop 和 queueScanLoop 协程（配置了 statsd 地址时还会启动 statsdLoop）。
HTTP 服务支持 GET 和 POST 请求，提供 ping、info、pub、mpub、stats、topic/create、topic/delete、topic/pause、topic/unpause、channel/create、channel/delete、channel/empty、channel/pause、channel/unpause、config/opt 以及一些 debug 接口。
TCP 协议支持的命令有 IDENTIFY、FIN、RDY、REQ、PUB、MPUB、DPUB、NOP、TOUCH、SUB、CLS、AUTH。
以下为nsqd的启动流程,到数据处理之前。
// Start implements the go-svc Service lifecycle hook for nsqd. It builds the
// options from built-in defaults, command-line flags and an optional config
// file, creates the NSQD instance, restores and re-persists its metadata, and
// finally calls Main, which starts the servers in background goroutines.
// Startup failures terminate the process via log.Fatalf / os.Exit.
func (p *program) Start() error {
	// Start from the built-in default options.
	opts := nsqd.NewOptions()
	// Parse the user's command-line arguments into the flag set.
	// NOTE(review): the error returned by Parse is ignored; presumably
	// nsqdFlagSet is created with flag.ExitOnError — confirm.
	flagSet := nsqdFlagSet(opts)
	flagSet.Parse(os.Args[1:])
	rand.Seed(time.Now().UTC().UnixNano())
	// --version prints the version string and exits immediately.
	if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
		fmt.Println(version.String("nsqd"))
		os.Exit(0)
	}
	var cfg config
	// Load the optional config file named by --config.
	configFile := flagSet.Lookup("config").Value.String()
	if configFile != "" {
		// Decode the TOML-formatted config file into cfg; a decode error is fatal.
		_, err := toml.DecodeFile(configFile, &cfg)
		if err != nil {
			log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())
		}
	}
	cfg.Validate()
	// Merge the command-line flags and the config-file values into opts.
	options.Resolve(opts, flagSet, cfg)
	// Create the NSQD instance. This local variable shadows the nsqd package
	// name for the rest of the function.
	nsqd := nsqd.New(opts)
	// Restore topics/channels saved by a previous run in the same data path;
	// an error (e.g. mismatched metadata files) is fatal.
	err := nsqd.LoadMetadata()
	if err != nil {
		log.Fatalf("ERROR: %s", err.Error())
	}
	// Write the (possibly just-loaded) metadata back out.
	err = nsqd.PersistMetadata()
	if err != nil {
		log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
	}
	// Start the TCP/HTTP(S) servers and background loops (nsq/nsqd/nsqd.go).
	// Main does not block, which keeps Start non-blocking as go-svc requires.
	nsqd.Main()
	p.nsqd = nsqd
	return nil
}
在 program 实现的 Start 方法中，先解析用户输入的命令行参数，再判断用户是否通过 --config 指定了自定义配置文件；如果指定了，就加载并校验该文件。nsqd 的参数可以通过两种方式设置：命令行参数和配置文件，二者最终由 options.Resolve 合并到 opts 中。
创建nsqd实例
通过 nsqio/nsq/nsqd/nsqd.go 提供的工厂方法 New 创建，主要完成了以下工作：
// New creates an NSQD instance from opts. It fills in defaults for the data
// path and logger, locks the data directory, and validates option values.
// Any invalid setting is logged at LOG_FATAL and terminates the process with
// os.Exit(1). New also mutates opts in place (Logger, logLevel, StatsdPrefix,
// TLSRequired).
func New(opts *Options) *NSQD {
	dataPath := opts.DataPath
	// Directory used for data persistence. If the user did not specify one,
	// fall back to the current working directory.
	// NOTE(review): the os.Getwd error is discarded, which would leave
	// dataPath empty if Getwd fails.
	if opts.DataPath == "" {
		cwd, _ := os.Getwd()
		dataPath = cwd
	}
	// Default logger: stderr, with date, time and microseconds.
	if opts.Logger == nil {
		opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
	}
	// Allocate the instance and initialize its bookkeeping fields.
	n := &NSQD{
		startTime:            time.Now(),
		topicMap:             make(map[string]*Topic),
		exitChan:             make(chan int),
		notifyChan:           make(chan interface{}),
		optsNotificationChan: make(chan struct{}, 1),
		dl:                   dirlock.New(dataPath),
	}
	// HTTP client with the configured connect/request timeouts, used by the
	// cluster-info helper.
	httpcli := http_api.NewClient(nil, opts.HTTPClientConnectTimeout, opts.HTTPClientRequestTimeout)
	n.ci = clusterinfo.New(n.logf, httpcli)
	// Install opts on the instance (presumably read back later via getOpts),
	// then validate and normalize individual option fields below.
	n.swapOpts(opts)
	n.errValue.Store(errStore{})
	var err error
	opts.logLevel, err = lg.ParseLogLevel(opts.LogLevel, opts.Verbose)
	if err != nil {
		n.logf(LOG_FATAL, "%s", err)
		os.Exit(1)
	}
	// Lock the data directory so that another nsqd cannot use the same path.
	err = n.dl.Lock()
	if err != nil {
		n.logf(LOG_FATAL, "--data-path=%s in use (possibly by another instance of nsqd)", dataPath)
		os.Exit(1)
	}
	if opts.MaxDeflateLevel < 1 || opts.MaxDeflateLevel > 9 {
		n.logf(LOG_FATAL, "--max-deflate-level must be [1,9]")
		os.Exit(1)
	}
	if opts.ID < 0 || opts.ID >= 1024 {
		n.logf(LOG_FATAL, "--node-id must be [0,1024)")
		os.Exit(1)
	}
	// Statsd: substitute every "%s" in the prefix with this node's host key
	// (built from the broadcast address and HTTP port) and make sure the
	// prefix ends with ".".
	if opts.StatsdPrefix != "" {
		var port string
		_, port, err = net.SplitHostPort(opts.HTTPAddress)
		if err != nil {
			n.logf(LOG_FATAL, "failed to parse HTTP address (%s) - %s", opts.HTTPAddress, err)
			os.Exit(1)
		}
		statsdHostKey := statsd.HostKey(net.JoinHostPort(opts.BroadcastAddress, port))
		prefixWithHost := strings.Replace(opts.StatsdPrefix, "%s", statsdHostKey, -1)
		if prefixWithHost[len(prefixWithHost)-1] != '.' {
			prefixWithHost += "."
		}
		opts.StatsdPrefix = prefixWithHost
	}
	// Configuring a client-auth policy implies that TLS is required.
	if opts.TLSClientAuthPolicy != "" && opts.TLSRequired == TLSNotRequired {
		opts.TLSRequired = TLSRequired
	}
	tlsConfig, err := buildTLSConfig(opts)
	if err != nil {
		n.logf(LOG_FATAL, "failed to build TLS config - %s", err)
		os.Exit(1)
	}
	// Requiring TLS without a usable TLS config is a fatal misconfiguration.
	if tlsConfig == nil && opts.TLSRequired != TLSNotRequired {
		n.logf(LOG_FATAL, "cannot require TLS client connections without TLS key and cert")
		os.Exit(1)
	}
	n.tlsConfig = tlsConfig
	// Every end-to-end processing latency percentile must lie in (0, 1].
	for _, v := range opts.E2EProcessingLatencyPercentiles {
		if v <= 0 || v > 1 {
			n.logf(LOG_FATAL, "Invalid percentile: %v", v)
			os.Exit(1)
		}
	}
	n.logf(LOG_INFO, version.String("nsqd"))
	n.logf(LOG_INFO, "ID: %d", opts.ID)
	return n
}
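加载之前的元数据：nsqd 会把元数据保存在 dataPath 下的两个文件中：nsqd.dat 和 nsqd.(id号).dat。若之后启动 nsqd 时仍指定相同的 dataPath，nsqd 会尝试从这两个文件中加载之前保存的元数据，即 topic 和 channel 信息（包括它们是否处于暂停状态）。如果两个文件都不存在，该方法直接返回 nil（即代码中 `return nil // fresh start` 处），视为一次全新启动；如果两个文件都存在但内容不一致，LoadMetadata 返回错误，Start 中的 log.Fatalf 使 nsqd 退出，并提示用户删除其中一个。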
// LoadMetadata restores the topics and channels recorded by a previous run.
//
// It reads both the current metadata file and the legacy ID-suffixed file
// (readOrEmpty's nil result is treated as "file absent"):
//   - neither present: fresh start, returns nil;
//   - both present with different contents: returns an error asking the
//     operator to delete one of them;
//   - only the legacy file present: its contents are used.
//
// For each recorded topic/channel with a valid name, it obtains the object via
// GetTopic/GetChannel (presumably get-or-create) and pauses it again if it was
// recorded as paused. Invalid names are logged at LOG_WARN and skipped.
// n.isLoading is 1 for the duration of the call.
func (n *NSQD) LoadMetadata() error {
	atomic.StoreInt32(&n.isLoading, 1)
	defer atomic.StoreInt32(&n.isLoading, 0)

	fn := newMetadataFile(n.getOpts())
	// old metadata filename with ID, maintained in parallel to enable roll-back
	fnID := oldMetadataFile(n.getOpts())

	data, err := readOrEmpty(fn)
	if err != nil {
		return err
	}
	dataID, errID := readOrEmpty(fnID)
	if errID != nil {
		return errID
	}

	if data == nil && dataID == nil {
		return nil // fresh start
	}
	// bytes.Equal is the idiomatic equality test; it is equivalent to
	// bytes.Compare(...) == 0 but states the intent directly.
	if data != nil && dataID != nil && !bytes.Equal(data, dataID) {
		return fmt.Errorf("metadata in %s and %s do not match (delete one)", fn, fnID)
	}
	if data == nil {
		// only old metadata file exists, use it
		fn = fnID
		data = dataID
	}

	var m meta
	err = json.Unmarshal(data, &m)
	if err != nil {
		return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)
	}

	for _, t := range m.Topics {
		if !protocol.IsValidTopicName(t.Name) {
			n.logf(LOG_WARN, "skipping creation of invalid topic %s", t.Name)
			continue
		}
		topic := n.GetTopic(t.Name)
		if t.Paused {
			topic.Pause()
		}
		for _, c := range t.Channels {
			if !protocol.IsValidChannelName(c.Name) {
				n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name)
				continue
			}
			channel := topic.GetChannel(c.Name)
			if c.Paused {
				channel.Pause()
			}
		}
	}
	return nil
}
启动nsqd提供的服务&waitGroupWrapper
nsqd 真正的工作入口是 nsqd.Main()，位于 nsq/nsqd/nsqd.go 中。它负责启动 TCP、HTTP 以及（如果需要的话）HTTPS 服务器。前面 program 实现的 Start 方法要求非阻塞，而 NSQD 的 Main 方法是在 Start 中被调用的，因此 Main 也必须是非阻塞的。但无论 TCP 服务器还是 HTTP 服务器，都需要在一个死循环中接受新连接并进行业务处理，所以 Main 通过 WaitGroupWrapper 把这些循环放到各自的协程中运行，从而保证自身非阻塞。
// Main opens the TCP and HTTP listeners, plus an HTTPS listener when a TLS
// config exists and an HTTPS address is set. It then starts each server and
// the background loops via n.waitGroup.Wrap and returns without waiting for
// them, which keeps program.Start non-blocking. A listen failure is logged at
// LOG_FATAL and exits the process with os.Exit(1).
func (n *NSQD) Main() {
	var err error
	// ctx bundles the NSQD instance for the TCP and HTTP handlers. It is the
	// project's own context struct, not the standard library context.Context.
	ctx := &context{n}
	n.tcpListener, err = net.Listen("tcp", n.getOpts().TCPAddress)
	if err != nil {
		n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().TCPAddress, err)
		os.Exit(1)
	}
	n.httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
	if err != nil {
		n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
		os.Exit(1)
	}
	if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
		n.httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
		if err != nil {
			n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)
			os.Exit(1)
		}
	}
	tcpServer := &tcpServer{ctx: ctx}
	// n.waitGroup.Wrap presumably runs each function in its own goroutine,
	// tracked so that shutdown can wait for all of them (see WaitGroupWrapper).
	n.waitGroup.Wrap(func() {
		// Accept TCP connections and hand each one to tcpServer, which
		// implements the protocol.TCPHandler interface (nsq/nsqd/tcp.go).
		protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
	})
	// Plain HTTP server; the third argument is whether TLS is required.
	httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
	n.waitGroup.Wrap(func() {
		http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)
	})
	if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
		httpsServer := newHTTPServer(ctx, true, true)
		n.waitGroup.Wrap(func() {
			http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)
		})
	}
	// Background loops: queueScanLoop and lookupLoop always run; statsdLoop
	// only when a statsd address is configured.
	n.waitGroup.Wrap(n.queueScanLoop)
	n.waitGroup.Wrap(n.lookupLoop)
	if n.getOpts().StatsdAddress != "" {
		n.waitGroup.Wrap(n.statsdLoop)
	}
}
至此，nsqd 启动完毕，开始接收并处理 TCP 和 HTTP 请求。TCP 请求由实现了 Protocol 接口的类型处理：每个连接都在 IOLoop 中处理，IOLoop 为其创建 client 对象，解析请求并返回 response。
处理tcp请求时,nsq/nsqd/tcp.go中的tcpServer结构体实现了nsq/internal/protocol/tcp_server.go中的TCPHandler接口处理tcp请求。
在 tcp.go 的 Handle 方法中，根据客户端发送的前四个字节（protocol magic）判断协议版本：如果是 "  V2"（两个空格加 V2），就使用 protocolV2 处理后续信息；否则返回 E_BAD_PROTOCOL 错误并关闭连接。
// Select the protocol implementation from the 4-byte protocol magic the
// client sent first. V2 clients send two spaces followed by "V2"; the
// previous literal " V2" was only 3 bytes long and could never match a
// 4-byte magic, so every V2 client was rejected with E_BAD_PROTOCOL.
var prot protocol.Protocol
switch protocolMagic {
case "  V2":
	prot = &protocolV2{ctx: p.ctx}
default:
	// Unknown protocol: report it to the client, close the connection and log.
	protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
	clientConn.Close()
	p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
		clientConn.RemoteAddr(), protocolMagic)
	return
}
// Serve the connection until it ends; a non-nil error is logged.
err = prot.IOLoop(clientConn)
if err != nil {
	p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
	return
}
信息处理转到nsq/nsqd/protocol_v2.go中的 IOLoop函数执行。
// IOLoop serves one V2 protocol connection until the client disconnects or a
// fatal error occurs. It registers a clientV2 with nsqd, starts the client's
// messagePump goroutine, then reads newline-terminated commands, executes each
// via p.Exec and writes back any response. On exit it closes the connection,
// closes client.ExitChan (which stops messagePump), and removes the client
// from its subscribed channel (if any) and from nsqd.
// A clean EOF returns nil; other read or send failures are returned.
func (p *protocolV2) IOLoop(conn net.Conn) error {
	var err error
	var line []byte
	var zeroTime time.Time

	clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
	client := newClientV2(clientID, conn, p.ctx)
	p.ctx.nsqd.AddClient(client.ID, client)

	// synchronize the startup of messagePump in order
	// to guarantee that it gets a chance to initialize
	// goroutine local state derived from client attributes
	// and avoid a potential race with IDENTIFY (where a client
	// could have changed or disabled said attributes)
	messagePumpStartedChan := make(chan bool)
	go p.messagePump(client, messagePumpStartedChan)
	<-messagePumpStartedChan

	for {
		// With a heartbeat configured, a client that stays silent for two
		// heartbeat intervals hits the read deadline; otherwise reads never
		// time out.
		if client.HeartbeatInterval > 0 {
			client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
		} else {
			client.SetReadDeadline(zeroTime)
		}

		// ReadSlice does not allocate new space for the data each request
		// ie. the returned slice is only valid until the next call to it
		// Each command is one line terminated by '\n'.
		line, err = client.Reader.ReadSlice('\n')
		if err != nil {
			if err == io.EOF {
				err = nil
			} else {
				err = fmt.Errorf("failed to read command - %s", err)
			}
			break
		}

		// trim the '\n'
		line = line[:len(line)-1]
		// optionally trim the '\r'
		if len(line) > 0 && line[len(line)-1] == '\r' {
			line = line[:len(line)-1]
		}
		// Split the command name from its parameters (e.g. topic, channel).
		params := bytes.Split(line, separatorBytes)

		p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)

		// Dispatch to the handler for this command and collect its reply.
		var response []byte
		response, err = p.Exec(client, params)
		if err != nil {
			// Append the underlying (parent) error to the log line when there
			// is one. The checked assertion means an error that does not
			// implement protocol.ChildErr is still reported instead of
			// panicking the connection goroutine.
			ctx := ""
			if childErr, ok := err.(protocol.ChildErr); ok {
				if parentErr := childErr.Parent(); parentErr != nil {
					ctx = " - " + parentErr.Error()
				}
			}
			p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)

			sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
			if sendErr != nil {
				p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
				break
			}

			// errors of type FatalClientErr should forceably close the connection
			if _, ok := err.(*protocol.FatalClientErr); ok {
				break
			}
			continue
		}

		if response != nil {
			err = p.Send(client, frameTypeResponse, response)
			if err != nil {
				err = fmt.Errorf("failed to send response - %s", err)
				break
			}
		}
	}

	p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client)
	conn.Close()
	close(client.ExitChan)
	if client.Channel != nil {
		client.Channel.RemoveClient(client.ID)
	}

	p.ctx.nsqd.RemoveClient(client.ID)
	return err
}
IOLoop 在一个死循环中不断读取 client 连接发来的数据，每一行是一条命令；解析每条命令后，由 p.Exec 转到对应的命令函数处理。执行 SUB 命令时会涉及 SubEventChan：订阅的 channel 通过它被交给 messagePump，messagePump 随后开始从该 channel 读取消息并分发给这个客户端。
p.messagePump处理channel中的信息。
p.messagePump 根据所连接 client 的状态和事件来处理消息（首先要明白一点：TCP 长连接在客户端不发送关闭命令、也没有发生意外的情况下不会断开）。
以sub命令为例。
// messagePump delivers messages from the client's subscribed channel to the
// client and handles per-client events. IOLoop runs it in its own goroutine;
// it closes startedChan once its local state has been initialized.
//
// Each iteration of the loop:
//   - selects on the subscribed channel's in-memory queue and backend read
//     channel only while a channel is subscribed and the client is ready for
//     messages (nil channels are never chosen by select);
//   - accepts the SUB (subEventChan) and IDENTIFY (identifyEventChan) events,
//     each at most once;
//   - sends heartbeats and flushes buffered output;
//   - exits when client.ExitChan is closed or a flush/send fails, logging any
//     such failure.
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
	var err error
	// memoryMsgChan and backendMsgChan point at the subscribed channel's
	// in-memory queue and backend read channel while the client can receive
	// messages; they are nil otherwise so the select below ignores them.
	var memoryMsgChan chan *Message
	var backendMsgChan chan []byte
	var subChannel *Channel
	// NOTE: `flusherChan` is used to bound message latency for
	// the pathological case of a channel on a low volume topic
	// with >1 clients having >1 RDY counts
	var flusherChan <-chan time.Time
	var sampleRate int32

	// subEventChan delivers the Channel chosen by the client's SUB command.
	// It is set to nil after the first receive, so SUB is handled only once.
	subEventChan := client.SubEventChan
	identifyEventChan := client.IdentifyEventChan
	outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
	heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
	heartbeatChan := heartbeatTicker.C
	msgTimeout := client.MsgTimeout

	// v2 opportunistically buffers data to clients to reduce write system calls
	// we force flush in two cases:
	//    1. when the client is not ready to receive messages
	//    2. we're buffered and the channel has nothing left to send us
	//       (ie. we would block in this loop anyway)
	//
	flushed := true

	// signal to the goroutine that started the messagePump
	// that we've started up
	close(startedChan)

	for {
		if subChannel == nil || !client.IsReadyForMessages() {
			// the client is not ready to receive messages...
			memoryMsgChan = nil
			backendMsgChan = nil
			flusherChan = nil
			// force flush
			client.writeLock.Lock()
			err = client.Flush()
			client.writeLock.Unlock()
			if err != nil {
				goto exit
			}
			flushed = true
		} else if flushed {
			// last iteration we flushed...
			// do not select on the flusher ticker channel
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = nil
		} else {
			// we're buffered (if there isn't any more data we should flush)...
			// select on the flusher ticker channel, too
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = outputBufferTicker.C
		}

		select {
		case <-flusherChan:
			// if this case wins, we're either starved
			// or we won the race between other channels...
			// in either case, force flush
			client.writeLock.Lock()
			err = client.Flush()
			client.writeLock.Unlock()
			if err != nil {
				goto exit
			}
			flushed = true
		case <-client.ReadyStateChan:
			// Readiness changed; the next iteration re-evaluates which
			// message sources to select on.
		case subChannel = <-subEventChan:
			// you can't SUB anymore
			subEventChan = nil
		case identifyData := <-identifyEventChan:
			// you can't IDENTIFY anymore
			identifyEventChan = nil

			outputBufferTicker.Stop()
			if identifyData.OutputBufferTimeout > 0 {
				outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)
			}

			heartbeatTicker.Stop()
			heartbeatChan = nil
			if identifyData.HeartbeatInterval > 0 {
				heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)
				heartbeatChan = heartbeatTicker.C
			}

			if identifyData.SampleRate > 0 {
				sampleRate = identifyData.SampleRate
			}

			msgTimeout = identifyData.MsgTimeout
		case <-heartbeatChan:
			err = p.Send(client, frameTypeResponse, heartbeatBytes)
			if err != nil {
				goto exit
			}
		case b := <-backendMsgChan:
			// With sampling enabled, skip the message when a random draw in
			// [0,100) exceeds sampleRate.
			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
				continue
			}

			// Use a distinct name for the decode error. Declaring `err` with
			// := here would shadow the function-level err, so a SendMessage
			// failure below would reach `exit` with err == nil and never be
			// logged.
			msg, decodeErr := decodeMessage(b)
			if decodeErr != nil {
				p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", decodeErr)
				continue
			}
			msg.Attempts++

			// Record the message as in flight for this client, then send it.
			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
			if err != nil {
				goto exit
			}
			flushed = false
		case msg := <-memoryMsgChan:
			// Same sampling rule as for backend messages.
			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
				continue
			}
			msg.Attempts++

			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
			if err != nil {
				goto exit
			}
			flushed = false
		case <-client.ExitChan:
			goto exit
		}
	}

exit:
	p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client)
	heartbeatTicker.Stop()
	outputBufferTicker.Stop()
	if err != nil {
		p.ctx.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err)
	}
}
`subEventChan := client.SubEventChan` 这一行：所谓“更新消息泵”，正是把客户端订阅的 channel 发送到 SubEventChan 中去。
在 `case subChannel = <-subEventChan:` 分支中，把从 subEventChan 中接收到的值保存到 subChannel 中（随后将 subEventChan 置为 nil，因此 SUB 只会被处理一次）。
当 subChannel 被赋值为客户端订阅的 channel 且客户端已准备好接收消息后，循环开头的 else-if/else 分支会用 subChannel 的内存消息队列（subChannel.memoryMsgChan）和磁盘消息队列（subChannel.backend.ReadChan()）为 memoryMsgChan 和 backendMsgChan 赋值。
select 中的 `case b := <-backendMsgChan` 和 `case msg := <-memoryMsgChan` 分支监听这两个队列，当其上有消息传入时进行处理（向 nsq 发布的消息先被写入 topic，进而被写入该 topic 下的 channel）。
当客户端订阅的topic的channel有消息时,写入到topic下的channel的消息,通过p.SendMessage(client,msg)来将消息发送给客户端。
客户端的tcp发布PUB流程
首先，每一个新连接的客户端都由 IOLoop 处理，IOLoop 会启动 (protocolV2).messagePump。由于 producer 不会执行 SUB，subChannel 始终为 nil，所以在 messagePump 中 memoryMsgChan = nil、backendMsgChan = nil、flusherChan = nil，messagePump 不会向它推送消息。
在 IOLoop 中，p.Exec 把 PUB 命令分派给对应的处理函数，由它把消息写入指定的 topic 中。
客户端http的PUB看nsq的http。
客户端SUB订阅
IOLoop中进行处理,启动messagePump和Exec函数。在Exec函数中,更新订阅channel的信息,更新client的信息。返回ok。
在 messagePump 中，当订阅的 channel 有消息到达时，就调用 p.SendMessage(client, msg) 将消息发送给客户端。
网友评论