美文网首页golang学习篇章
nsq 的消息流转分析

nsq 的消息流转分析

作者: Best博客 | 来源:发表于2021-12-28 15:49 被阅读0次

    nsq

    带着一些问题,想看看nsq里面是怎么实现的

    nsqd 都做了哪些事情

    1. 将自己的host信息注册到nsqlookupd
    2. 对外提供http/https/tcp三个协议服务
    3. 封装了自己的通讯协议(http,tcp最后都是走协议交互的)
    4. message 持久化
    5. topic,channel管理
    6. 接收客户端生产消息-- 消息流转 -- 消息push到消费者客户端
    7. 支持分布式部署,但各个node相互独立并无通讯,所以未做数据分片
    8. 消息顺序无序 (想改成有序?)

    go-nsq 都做了哪些事情

    1. 通过topicName,channelName找nsqlookupd要到了所有nsqd,并全部建立tcp
    2. 告知nsqd消息每次push几条,超时时间,最大重试次数(MaxAttempts)等等(nsqd暴露能力,客户端连接时指定)
    3. 本地并发消费消息的work数量(每一条消息消费是串的,但是支持设置并发work数量)
    4. nsqlookupd告知消费的topic在新的nsqd出现了,会响应并建立新的tcp监听

    nsqlookupd 都做了哪些事情

    1. 负责接收处理nsqd的注册信息
    2. 负责接收处理nsqd的unregister消息
    3. 负责响应go-nsq 通过topic,channel 索取 nsqd的响应(doLookup)
    4. 负责推送新增或变更topic的nsqd信息到go-nsq
    5. 支持集群部署(每一个nsqlookupd都能单独对外提供所有服务,这根分布式不同,分布式是所有node加起来对外提供整体服务,也就是每个节点只是整体服务的一小部分)

    有关nsq无序改变成有序需要考虑的一些事

    1. go-nsqd消费的指定topic消息可能是来自于多个nsqd的,所以要优先保证topic只存在于一个nsqd中(业务对于该topic指定nsqd即可)
    2. go-nsqd 在连接nsqd的时候会设置每次push(nsq是push模式)过来的消息条数,当然这个MaxInFlight默认为1,你要注意这个值非1,然后本地work也大于1的时候,有序也无法保证了
    3. go-nsqd 会设置MaxAttempts,消息消费超时后的重复次数,服务端一超时就会从inflight移除,重新put走消息push流程,这也是个能影响到消息插队的因素。 保持简单我们可以通过设置业务单条msg消费的最长时间,nsq认为100%成功消费, 其他由业务方案保证(持久,增加work。。。)
    4. nsqd 持久化是将内存队列消息存入磁盘,但消息下发流程是内存chan,磁盘chan在一个select的io多路复用上的,所以最求有序就将消息流转设置成100%磁盘chan(非内存模式)模式吧

    服务管理

    前面说了nsq对外起了http,https,tcp三个服务,每个服务的start,Init,stop是通过github.com/judwhite/go-svc 这个包统一管理的

    package main
    
    import (
        "sync"
        "github.com/judwhite/go-svc"
    )
    
    //一个服务启动总不是分为几部
    //1.服务启动前的准备工作 Init()
    // 2. 服务运行的核心
    // 3. 服务被kill 的时候的退出工作 Stop()
    type program struct {
        wg   sync.WaitGroup
        quit chan struct{}
    }
    func main() {
        prg := &program{}
        if err := svc.Run(prg); err != nil {
            log.Fatal(err)
        }
    }
    
    func (p *program) Init(env svc.Environment) error {
        return nil
    }
    
    func (p *program) Start() error {
        return nil
    }
    
    func (p *program) Stop() error {
    //被kill,github.com/judwhite/go-svc帮你监听了系统信号,并触发这个stop
    //记得sync.Once避免多次收到的幂等性
        return nil
    }
    

    nsqd消息持久化

    //持久化的数据先写新文件,成功后mv修改文件名
        err := p.nsqd.LoadMetadata()
        if err != nil {
            logFatal("failed to load metadata - %s", err)
        }
        err = p.nsqd.PersistMetadata()
        if err != nil {
            logFatal("failed to persist metadata - %s", err)
        }
    

    消息流转

    1. 客户端publish消息到nsqd, 这里分析下http方式

    //路由定义,参数定里面要求带必要参数 ,topicname,message等

    1. router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
    
    1. 通过topicname获取topic对象
    reqParams, topic, err := s.getTopicFromQuery(req)
    
    1. 根据本次的topic以及消息信息,获取message对象
        msg := NewMessage(topic.GenerateID(), body)
    
    1. 使用topic对象提供的put方法将消息进行投递到内存chan或者磁盘chan
        err = topic.PutMessage(msg) // 投递 PutMessage的实现见下文
        if err != nil {
            return nil, http_api.Err{503, "EXITING"}
        }
    
    // 投递消息的实现
    func (t *Topic) PutMessage(m *Message) error {
        t.RLock()
        defer t.RUnlock()
        if atomic.LoadInt32(&t.exitFlag) == 1 {
            return errors.New("exiting")
        }
        err := t.put(m) // 投递消息到内存chan或者 磁盘中的实现
        if err != nil {
            return err
        }
        atomic.AddUint64(&t.messageCount, 1)//topic全局对象记录消息又加了一个
        atomic.AddUint64(&t.messageBytes, uint64(len(m.Body)))// 记录topic下的消息总字节数
        return nil
    }
    
    func (t *Topic) put(m *Message) error {
        select {
    //优先写入内存chan,这个内存队列会被NewTopic时候起的gotoutinue给for select 监控消费,
    //然后又入 topic的所有channel的内存chan
        case t.memoryMsgChan <- m:
        default:
            err := writeMessageToBackend(m, t.backend) //写入磁盘的方法实现
            t.nsqd.SetHealth(err)
            if err != nil {
                t.nsqd.logf(LOG_ERROR,
                    "TOPIC(%s) ERROR: failed to write message to backend - %s",
                    t.name, err)
                return err
            }
        }
        return nil
    }
    //写入磁盘
    //1.定义格式
    //2.写入 diskQueue 这个抽象,这里面有自己的ioloop进行异步,缓冲,批量刷盘
    func writeMessageToBackend(msg *Message, bq BackendQueue) error {
        buf := bufferPoolGet()
        defer bufferPoolPut(buf)
        _, err := msg.WriteTo(buf)
        if err != nil {
            return err
        }
        return bq.Put(buf.Bytes())
    }
    
    // 消息被持久化到磁盘的格式
    // message format:
    // [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
    // |       (int64)        ||    ||      (hex string encoded in ASCII)           || (binary)
    // |       8-byte         ||    ||                 16-byte                      || N-byte
    // ------------------------------------------------------------------------------------------...
    //   nanosecond timestamp    ^^                   message ID                       message body
    //                        (uint16)
    //                         2-byte
    //                        attempts(重试次数)
    func decodeMessage(b []byte) (*Message, error) {
        var msg Message
    
        if len(b) < minValidMsgLength {
            return nil, fmt.Errorf("invalid message buffer size (%d)", len(b))
        }
    
        msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
        msg.Attempts = binary.BigEndian.Uint16(b[8:10])
        copy(msg.ID[:], b[10:10+MsgIDLength])
        msg.Body = b[10+MsgIDLength:]
    
        return &msg, nil
    }
    
    
    
    1. 消息从topic的chan流转到topic关联的所有channel的队列中
    1. NewTopic生成topic对象的时候就起了goroutinue监控
    // Topic constructor
    func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic {
            ......
           ......  此处省略对象初始化的其他代码
          ......
        t.waitGroup.Wrap(t.messagePump) //这个 messagePump 是消息流转泵
    
        t.nsqd.Notify(t, !t.ephemeral)
    
        return t
    }
    
    // messagePump selects over the in-memory and backend queue and
    // writes messages to every channel for this topic
    func (t *Topic) messagePump() {
            ......
           ......  此处省略其他代码
          ......
    
        // main message loop
        for {
            select {
            case msg = <-memoryMsgChan:
            case buf = <-backendChan:
                msg, err = decodeMessage(buf)
                        ......
            case <-t.channelUpdateChan:
                    ......
                continue
            case <-t.pauseChan:
                    ......
                continue
            case <-t.exitChan:
                goto exit
            }
    
                    //往topic对应的所有channel的内存队列写
            for i, channel := range chans {
                    ......
                if chanMsg.deferred != 0 {
                                    //nsq 还支持延时消息队列,就是这里支持的
                    channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
                    continue
                }
                err := channel.PutMessage(chanMsg)
                if err != nil {
                    t.nsqd.logf(LOG_ERROR,
                        "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
                        t.name, msg.ID, channel.name, err)
                }
            }
        }
    
    exit:
        t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
    }
    
    1. 消息从channel推送到各个消费客户端
    1. nsqd通过Main方法进行服务启动的
    func (n *NSQD) Main() error {
            ......
    
        n.waitGroup.Wrap(func() {
            exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf)) //TCPServer 这个便是与客户端进行tcp连接的逻辑了,push数据很定也是这里负责最终交付流程了
        })
        if n.httpListener != nil {
            httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)
            n.waitGroup.Wrap(func() {
                exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
            })
        }
        if n.httpsListener != nil {
            httpsServer := newHTTPServer(n, true, true)
            n.waitGroup.Wrap(func() {
                exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
            })
        }
    
        n.waitGroup.Wrap(n.queueScanLoop)    //消息超时,重试等监控业务流程
        n.waitGroup.Wrap(n.lookupLoop)    //注册中心业务流程
        if n.getOpts().StatsdAddress != "" {
            n.waitGroup.Wrap(n.statsdLoop)
        }
    
        err := <-exitCh
        return err
    }
    
    
    2. 客户端连接管理
    func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
            ......
        for {
            clientConn, err := listener.Accept()//连接事件,拿到连接客户端
            if err != nil {
                            ......
                break
            }
                     ......
            go func() {
                handler.Handle(clientConn)//为每个客户端起一个goroutinue,进行协议对接
                wg.Done()
            }()
        }
            ......
        return nil
    }
    2. 处理客户端tcp发送过来的各种指令(其实就是前面说到的nsq自定义协议)
    func (p *tcpServer) Handle(conn net.Conn) {
      ......
        client := prot.NewClient(conn)
        p.conns.Store(conn.RemoteAddr(), client)
        err = prot.IOLoop(client)    //起for 进行不断的监听客户端发送的包,并解析成协议对应指令
        if err != nil {
            p.nsqd.logf(LOG_ERROR, "client(%s) - %s", conn.RemoteAddr(), err)
        }
    
        p.conns.Delete(conn.RemoteAddr())
        client.Close()
    }
    
    // 监听获取数据包,解析成cmd协议指令
    func (p *protocolV2) IOLoop(c protocol.Client) error {
    
        client := c.(*clientV2)
        go p.messagePump(client, messagePumpStartedChan)//消息泵,负责接收channel对应的队列消息,并推送给客户端
        <-messagePumpStartedChan
    
          //下面的for是用于鉴定客户端的数据包,然后将其解析成cmd指定,最后执行
        for {
            line = line[:len(line)-1]
            if len(line) > 0 && line[len(line)-1] == '\r' {
                line = line[:len(line)-1]
            }
            params := bytes.Split(line, separatorBytes)   //协议解析
            var response []byte
            response, err = p.Exec(client, params)  //命令执行
                   ......
    }
    
    3. channel的消息推送给客户端消费者
    func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
     ......
     ......
        for {
            if subChannel == nil || !client.IsReadyForMessages() {
                // the client is not ready to receive messages...
                memoryMsgChan = nil
                backendMsgChan = nil
                flusherChan = nil
                // force flush
                client.writeLock.Lock()
                err = client.Flush()
                client.writeLock.Unlock()
                if err != nil {
                    goto exit
                }
                flushed = true
            } else if flushed {
                // last iteration we flushed...
                // do not select on the flusher ticker channel
                memoryMsgChan = subChannel.memoryMsgChan
                backendMsgChan = subChannel.backend.ReadChan()
                flusherChan = nil
            } else {
                // we're buffered (if there isn't any more data we should flush)...
                // select on the flusher ticker channel, too
                memoryMsgChan = subChannel.memoryMsgChan
                backendMsgChan = subChannel.backend.ReadChan()
                flusherChan = outputBufferTicker.C
            }
    
            select {
            case <-flusherChan:
                           ......
            case <-client.ReadyStateChan:
            case subChannel = <-subEventChan:
             ......
            case identifyData := <-identifyEventChan:
                     ......
            case <-heartbeatChan:
                     ......
            case b := <-backendMsgChan://磁盘chan的消息消费
                if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                    continue
                }
    
                msg, err := decodeMessage(b)
                if err != nil {
                    p.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
                    continue
                }
                msg.Attempts++//重试次数累加,这个值会被发送客户端,客户端发现到达最大值就直接回复finish,达到丢弃效果,所以服务端只是傻瓜式,有消费者客户端决定消息丢弃不,不丢弃就继续。。。
    
                subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
                client.SendingMessage()
                err = p.SendMessage(client, msg)//把消息推送到客户端
                if err != nil {
                    goto exit
                }
                flushed = false
            case msg := <-memoryMsgChan://channel的内存队列消息
                if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                    continue
                }
                msg.Attempts++
    
                subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
                client.SendingMessage()
                err = p.SendMessage(client, msg)
                if err != nil {
                    goto exit
                }
                flushed = false
            case <-client.ExitChan:
                goto exit
            }
        }
    
    exit:
        p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client)
        heartbeatTicker.Stop()
        outputBufferTicker.Stop()
        if err != nil {
            p.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err)
        }
    }
    
    
    

    总结

    1.平时还是调用服务居多,如果自己写个tcp服务对外,那岂不是自己可以把它的协议封装,解析,沾包处理拿过来用么。

    1. nsq在处理消息流转,消息输入到输出看做一个io模型的话,那nsq还是做了很多事情的,比如持久化,相互独立的分布式,io多路复用的编程思想,重试机制
    2. nsq其实是多个子服务组装而成的,代码层级维护也可以细品品,如果实现功能的同时保持简单。。
    3. nsq的很多设置都可以通过启动命令行参数指定,这个cli到代码config这个过程还是挺简单灵活的

    参考文献:

    http://doc.yonyoucloud.com/doc/wiki/project/nsq-guide/quick_start.html
    nsq客户端
    消费流程

    相关文章

      网友评论

        本文标题:nsq 的消息流转分析

        本文链接:https://www.haomeiwen.com/subject/nimyqrtx.html