美文网首页golang学习篇章
nsq客户端源码分析

nsq客户端源码分析

作者: Best博客 | 来源:发表于2020-05-17 00:44 被阅读0次

    nsq为生产环境的消息中间,遭遇消息重复消费,思索客户端与服务端是怎么通过 REQ 与 FIN 通讯的(客户端回复:REQ 重新入队,FIN 成功消费)。后面发现这个重复消费的问题还得结合nsq服务端启动配置的参数结合分析才行。
    https://github.com/nsqio/nsq

    1.分析nsq客户端时怎么接收nsqd服务端的消息的

    //以下是客户端包源代码文档demo
    
    type myMessageHandler struct {}
    
    func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
        if len(m.Body) == 0 {
            // Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
            return nil
        }
        err := processMessage(m.Body)
        // Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
        return err
    }
    
    func main() {
        // Instantiate a consumer that will subscribe to the provided channel.
        config := nsq.NewConfig()  //得到配置consumer的配置对象
        consumer, err := nsq.NewConsumer("topic", "channel", config)   //得到consumer对象
        if err != nil {
            log.Fatal(err)
        }
    
        // Set the Handler for messages received by this Consumer. Can be called multiple times.
        // See also AddConcurrentHandlers.
        consumer.AddHandler(&myMessageHandler{})  //设置接收tcp传送过来的
    //nsqd的消息,此方法里面 handlerLoop 一个for循环就 直接对接一个 incomingMessages 的channel了,达到解耦的目的. 并且会根据 err是否为nil去触发不同的事件,达到往channel写进不同的值(就是不同值的cmd结构体,req与fin)
    
        // Use nsqlookupd to discover nsqd instances.
        // See also ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds.
        err = consumer.ConnectToNSQLookupd("localhost:4161")
    //连接客户端与服务端nsq,前面都是配置属性和准备好方法,这里才是通信的起点,golang的channel秒到了解耦真是方便,你看还没连接服务器,就可以把消费服务器的消费者方法先执行起来,这个连接方法里面除连接外,就是tcp的readLoop与writeLoop 套路了,跟之前了解到的纯websocket的套路大同小异
        if err != nil {
            log.Fatal(err)
        }
    
        // Gracefully stop the consumer.
        consumer.Stop()
    }
    
    //1.以上是一个nsq消费者 初始化config对象(可通过自定义配置文件去控制)
    //2.根据config对象去初始化 一个  consumer对象,这才是重点,
    //2.1: 根据config初始化的对象达到了大部分consumer属性值的控制(也就是行为控制),但是我们还需要自己自定义一个结构体(该结构体得具备HandleMessage方法),说白了就是闭包去初始化consumer需要灵活配置的方法。
    //2.2: 这个 HandleMessage 方法是用来处理nsqd推送过来的消息的,这个方法返回err为 nil 则表示消费成功,err不为nil则表示消费不成功,那么该消息会重新入nsqd队列。
    // 2.3:  在2.2中说的都是结果,以下就是来分析返回的 err为nil与非nil 时候  nsq客户端分别给服务端nsqd 返回的 (REQ与FIN)是怎么来的
    
    

    整理流程:

    nsq.NewConfig() 初始化配置对象

    nsq.NewConsumer 初始化消费者对象

    nsqConsumer.AddConcurrentHandlers 初始化消费者消费用来处理nsqd推送过来消息的处理器handle

    nsqConsumer.ConnectToNSQD(mqHost)
    1.conn := NewConn(addr, &r.config, &consumerConnDelegate{r}) 初始化一个conn对象

    1. resp, err := conn.Connect() 这里面建立tcp与nsqd的连接,
      2.1. go c.readLoop() 接收nsqd推送过来的信息,并 r.incomingMessages <- msg 往这个channel里面推送,这个channel就是上面的 handle 的channel消费的channel了,如此nsqd到消费者这条路就通了
      2.2. go c.writeLoop() 监控 case resp := <-c.msgResponseChan: 也就是 c.msgResponseChan的channel,并根据resp不同触发不同的回复事件,resp的不同由我们上面定义handle的返回值确定,最终就是回复nsqd的是REQ还是FIN

    相关文章

      网友评论

        本文标题:nsq客户端源码分析

        本文链接:https://www.haomeiwen.com/subject/objrohtx.html