美文网首页nsq
Nsq从入门到实践

Nsq从入门到实践

作者: bysir | 来源:发表于2018-07-27 22:26 被阅读0次

    当nsq跑起来之后, 我们可能会遇到以下问题

    • 分布式部署
    • 处理错误(何时requeue)
    • 如何使用golang lib
    • 消息生命周期如何, 如重试/断线重连逻辑.

    抱着不应该只停留在入门的态度, 笔者粗浅的研究了一下这几个问题, 希望也对有同样疑问的人有帮助.

    分布式部署

    注意:
    由于NSQ的分布式网络结构, 必须为每一个NSQD分配单独的地址(如IP或host)以保证消费者在lookup找到NSQD节点后能够正确的连接到对于的NSQD节点, 这也就意味着需要好好的规划NSQD的广播地址. (下文会说到在Rancher中如何配置NSQD的广播地址)

    NSQ推荐的部署方式是 让NSQD与TOPIC生产者一一对应 既一个生产者一个NSQD.

    如: user服务需要发送一个Topic为register的消息, 那么就需要为user服务创建一个NSQD, 广播地址设置这台服务器的ip, 例如10.0.0.1. 现在消费者端需要连接上Nsqlookupd就能得到生产者NSQD节点的地址即10.0.0.1.

    如果如果想要消除单点故障, 那么我们需要为user服务再添加一台服务器, 并且也在这台机器上添加一个Nsqd节点并连接上Nsqlookupd, 广播地址设置为这台服务器的ip, 如10.0.0.2, 现在消费者通过Nsqlookupd就能得到两个Nsqd节点的地址并连接了, 只要有一个服务器是"活"的, 那么整个系统就能正常使用.

    详细可参考官方文档的拓扑图: topology_patterns

    topology_patterns

    在Rancher中部署

    官方文档提供的分布式部署方式是多主机部署, 所以如何在Rancher中部署就只有自己实践了.

    由于篇幅较多, 所以另起一篇.

    NSQ Requeue And Backoff

    建议结合官方文档来看

    requeue(重试)

    当错误发生, 需要重试时就应该使用nsq的requeue功能.

    backoff(避退)

    backoff能降低消费者吞吐量以让消费者从错误中恢复.

    当消费者在backoff状态时, 这个消费者将不再处理任何消息, 直到backoff超时

    当触发backoff时控制台将打印:

    // 进入backoff状态, RDY设置为0代表准备接收0条消息(不接收消息) (协议详情看 https://nsq.io/clients/tcp_protocol_spec.html)
    WRN    1 [test/test] backing off for 1m4s (backoff level 6), setting all to RDY 0
    // 时间到了将设置RDY为1接收1条消息以测试状态, 官方将这个状态称为`tests the waters`
    WRN    1 [test/test] (DESKTOP-HELJ7V4:4150) backoff timeout expired, sending RDY 1
    

    当有多个消费者竞争时, 出错的消费者应当主动backoff不再处理消息(以让出更多的机会给其他消费者).
    如果只有一个消费者, 则消费者会等到backoff超时后才开始处理消息(空出时间让消费者恢复).

    避退是存在于整个消费者上的, 所以消费者每当一个消息处理失败了之后都会增加这个消费者的backoff level. 这会影响这个消费者的处理能力.

    到底需不需要用backoff, 就要看业务了:

    • 消息是用来更新数据库订单状态的, 这是一个不容易出错的逻辑, 如果需要requeue则需要backoff让出优先级, 让其他消费者来做, 尽量以挽救这个订单.
    • 消息是用来通知第三方(如支付宝支付成功的http回调)的, 一般requeue是发生在第三方端响应不满足预期的响应, 这不是我方消费者的错误, 应当不使用backoff, 避免阻塞消息消费.

    参考:

    golang lib

    nsq提供golang的client lib. 支持全部特性.

    本着不重复造轮子原则, 我也想尽大可能的使用nsq lib里的代码逻辑来实现需求, 但有些需求它实现不了, 我也只好自己写代码了.

    先看看它原有的几个逻辑

    消息自动重试:

    // Handler is the message processing interface for Consumer
    //
    // Implement this interface for handlers that return whether or not message
    // processing completed successfully.
    //
    // When the return value is nil Consumer will automatically handle FINishing.
    //
    // When the returned value is non-nil Consumer will automatically handle REQueing.
    type Handler interface {
        HandleMessage(message *Message) error
    }
    

    消息自动重试与判断失败:

    func (r *Consumer) handlerLoop(handler Handler) {
        r.log(LogLevelDebug, "starting Handler")
    
        for {
            message, ok := <-r.incomingMessages
            if !ok {
                goto exit
            }
    
            if r.shouldFailMessage(message, handler) {
                message.Finish()
                continue
            }
    
            err := handler.HandleMessage(message)
            if err != nil {
                r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
                if !message.IsAutoResponseDisabled() {
                    message.Requeue(-1)
                }
                continue
            }
    
            if !message.IsAutoResponseDisabled() {
                message.Finish()
            }
        }
    
    exit:
        r.log(LogLevelDebug, "stopping Handler")
        if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
            r.exit()
        }
    }
    

    判断失败:

    func (r *Consumer) shouldFailMessage(message *Message, handler interface{}) bool {
        // message passed the max number of attempts
        if r.config.MaxAttempts > 0 && message.Attempts > r.config.MaxAttempts {
            r.log(LogLevelWarning, "msg %s attempted %d times, giving up",
                message.ID, message.Attempts)
    
            logger, ok := handler.(FailedMessageLogger)
            if ok {
                logger.LogFailedMessage(message)
            }
    
            return true
        }
        return false
    }
    

    接下来就是优化他们了

    优化requeue

    可以看到当handler返回的error不为空时, nsq将自动requeue, 这种重试是很方便但是

    使用这个重试机制的坏处是:

    • 不能自定义requeue的等待时间(默认等待时间=config.DefaultRequeueDelay*Attempts)
    • 会在控制台打印一个ERR(不能自定义格式, 而且有一些err不应该打印到控制台), 这点可能有洁癖的开发者受不了.
    • 一些错误不应该重试, 如入参不合法, 再怎么重试也是徒劳. 这时候应该直接失败.

    所以我建议不要使用这个err机制, 而应当手动使用msg.Requeue(-1)或者msg.RequeueWithoutBackoff(-1) 来显示指定requeue.

    shouldFailMessage

    我们可以使用 FailedMessageLogger interface自定义当消息失败时的处理方式.

    但它的shouldFailMessage又有什么需求满足不了呢?

    • 在失败的时候拿到最后一次错误信息
    • shouldFailMessage只能判断处理重试次数过多的失败, 不能处理直接失败的消息.

    所以就只有如下自己实现啦:

    我们直接在Handler中判断Attempts来实现错误处理.

    但为了保证我们的消息不被shouldFailMessage处理, 需要配置MaxAttempts为0或者一个比较大的数.

    我就不把自己乱糟糟的代码拿出来了, 你们可以更简单的实现.

    nsqadmin

    nsqadmin 提供一个web页面来管理nsq的消息/Topic/Channel.

    Lookup

    loopup

    我们知道还没有生产者产生消息时(比如刚刚才部署), topic不存在, 这时如果有消费者连接上nsqlookup就会一直报错 topic not found, 为了避免这个报错, 就可以在Create Topic/Channel栏目中预先创建topic和channel.

    比如下图是添加了名为test的topic.

    可以看到提示说这个topic当前不活跃, 也就是只在nsqloopup新建了topic但是没有在任何nsqd里生产. 这个提示在nsq开始发送第一个消息后消失.

    如何保证消息被至少投递一次

    重试

    在Handler中返回一个错误就会触发重试, 重试的消息被存储在nsq的Deferred队列, 一定延时后消费者会再次收到此消息.

    断线恢复

    发送给消费者的消息总会被nsq先存储在InFlight队列, 消费者处理完消息需要给nsq发送FIN消息, 这时nsq才算完成了消息的投递.

    如果消费者没有发送FIN给nsq的话(如断线了)会出现什么情况? 在nsq后台有一个专门的协程处理InFlight队列, 当消息超过了一定时间还没有被FIN 则会重新加入队列发送给其他消费者.

    相关文章

      网友评论

        本文标题:Nsq从入门到实践

        本文链接:https://www.haomeiwen.com/subject/cpdgmftx.html