nsq为生产环境的消息中间,遭遇消息重复消费,思索客户端与服务端是怎么通过 REQ 与 FIN 通讯的(客户端回复:REQ 重新入队,FIN 成功消费)。后面发现这个重复消费的问题还得结合nsq服务端启动配置的参数结合分析才行。
https://github.com/nsqio/nsq
1.分析nsq客户端时怎么接收nsqd服务端的消息的
//以下是客户端包源代码文档demo
type myMessageHandler struct {}
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
if len(m.Body) == 0 {
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
return nil
}
err := processMessage(m.Body)
// Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
return err
}
func main() {
// Instantiate a consumer that will subscribe to the provided channel.
config := nsq.NewConfig() //得到配置consumer的配置对象
consumer, err := nsq.NewConsumer("topic", "channel", config) //得到consumer对象
if err != nil {
log.Fatal(err)
}
// Set the Handler for messages received by this Consumer. Can be called multiple times.
// See also AddConcurrentHandlers.
consumer.AddHandler(&myMessageHandler{}) //设置接收tcp传送过来的
//nsqd的消息,此方法里面 handlerLoop 一个for循环就 直接对接一个 incomingMessages 的channel了,达到解耦的目的. 并且会根据 err是否为nil去触发不同的事件,达到往channel写进不同的值(就是不同值的cmd结构体,req与fin)
// Use nsqlookupd to discover nsqd instances.
// See also ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds.
err = consumer.ConnectToNSQLookupd("localhost:4161")
//连接客户端与服务端nsq,前面都是配置属性和准备好方法,这里才是通信的起点,golang的channel秒到了解耦真是方便,你看还没连接服务器,就可以把消费服务器的消费者方法先执行起来,这个连接方法里面除连接外,就是tcp的readLoop与writeLoop 套路了,跟之前了解到的纯websocket的套路大同小异
if err != nil {
log.Fatal(err)
}
// Gracefully stop the consumer.
consumer.Stop()
}
//1.以上是一个nsq消费者 初始化config对象(可通过自定义配置文件去控制)
//2.根据config对象去初始化 一个 consumer对象,这才是重点,
//2.1: 根据config初始化的对象达到了大部分consumer属性值的控制(也就是行为控制),但是我们还需要自己自定义一个结构体(该结构体得具备HandleMessage方法),说白了就是闭包去初始化consumer需要灵活配置的方法。
//2.2: 这个 HandleMessage 方法是用来处理nsqd推送过来的消息的,这个方法返回err为 nil 则表示消费成功,err不为nil则表示消费不成功,那么该消息会重新入nsqd队列。
// 2.3: 在2.2中说的都是结果,以下就是来分析返回的 err为nil与非nil 时候 nsq客户端分别给服务端nsqd 返回的 (REQ与FIN)是怎么来的
整理流程:
nsq.NewConfig() 初始化配置对象
nsq.NewConsumer 初始化消费者对象
nsqConsumer.AddConcurrentHandlers 初始化消费者消费用来处理nsqd推送过来消息的处理器handle
nsqConsumer.ConnectToNSQD(mqHost)
1.conn := NewConn(addr, &r.config, &consumerConnDelegate{r}) 初始化一个conn对象
- resp, err := conn.Connect() 这里面建立tcp与nsqd的连接,
2.1. go c.readLoop() 接收nsqd推送过来的信息,并 r.incomingMessages <- msg 往这个channel里面推送,这个channel就是上面的 handle 的channel消费的channel了,如此nsqd到消费者这条路就通了
2.2. go c.writeLoop() 监控 case resp := <-c.msgResponseChan: 也就是 c.msgResponseChan的channel,并根据resp不同触发不同的回复事件,resp的不同由我们上面定义handle的返回值确定,最终就是回复nsqd的是REQ还是FIN
网友评论