美文网首页
nsq-protocolV2中的消息流转

nsq-protocolV2中的消息流转

作者: nil_ddea | 来源:发表于2019-08-21 15:48 被阅读0次

nsq的nsqd组件中一条消息的流转过程

protocolV2

tcp链接建立后,connect交protocolV2的IOLoop方法处理。

client

protocolV2将connection包装成一个client,生成一个clientID,注册到NSQD中。

clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
client := newClientV2(clientID, conn, p.ctx)
p.ctx.nsqd.AddClient(client.ID, client)

断开连接时移除这个client。

// 接收到SUB命令时 client会订阅一个channel 断开链接后移除这个client
if client.Channel != nil { 
        client.Channel.RemoveClient(client.ID)
}
p.ctx.nsqd.RemoveClient(client.ID)

client结构中主要的字段:

net.Conn 对应的tcp链接
Reader *bufio.Reader 对tcp链接读缓冲 16k
Writer *bufio.Writer 对tcp链接写缓冲 16k
Channel *Channel 订阅的channel
SubEventChan chan *Channel 在IOLoop中通知messagePump订阅
ClientID string 唯一标示

IOLoop

IOLoop主要包括:

  • 主循环,从client中读取一行解析出对应命令并执行。
  • 一个goroutine执行protocolV2的messagePump,处理订阅消息。

IOLoop的主循环

读client的connect的Reader包装,
这里用到了bufio.Reader的ReadSlice方法,利用了bufio.Reader的缓冲,避免重复申请内存。
读出的一行按separatorBytes分割成多个参数,执行Exec。

line, err = client.Reader.ReadSlice('\n')
// trim the '\n'
line = line[:len(line)-1]
// optionally trim the '\r'
if len(line) > 0 && line[len(line)-1] == '\r' {
    line = line[:len(line)-1]
}
params := bytes.Split(line, separatorBytes)
p.Exec(client, params)

Exec方法依靠params[0]决定具体调用的方法。

if bytes.Equal(params[0], []byte("IDENTIFY")) {
        return p.IDENTIFY(client, params)
    }
    err := enforceTLSPolicy(client, p, params[0])
    if err != nil {
        return nil, err
    }
    switch {
    case bytes.Equal(params[0], []byte("FIN")):
        return p.FIN(client, params)
    case bytes.Equal(params[0], []byte("RDY")):
        return p.RDY(client, params)
    case bytes.Equal(params[0], []byte("REQ")):
        return p.REQ(client, params)
    case bytes.Equal(params[0], []byte("PUB")):
        return p.PUB(client, params) // write to memoryChan or backenQueue
    case bytes.Equal(params[0], []byte("MPUB")):
        return p.MPUB(client, params)
    case bytes.Equal(params[0], []byte("DPUB")):
        return p.DPUB(client, params)
    case bytes.Equal(params[0], []byte("NOP")):
        return p.NOP(client, params)
    case bytes.Equal(params[0], []byte("TOUCH")):
        return p.TOUCH(client, params)
    case bytes.Equal(params[0], []byte("SUB")):
        return p.SUB(client, params)
    case bytes.Equal(params[0], []byte("CLS")):
        return p.CLS(client, params)
    case bytes.Equal(params[0], []byte("AUTH")):
        return p.AUTH(client, params)
    }

主要分析PUBSUB


PUB主要代码

topicName := string(params[1])
topic := p.ctx.nsqd.GetTopic(topicName) // 向下追可以看到如果是一个不存在的topic 将自动创建
msg := NewMessage(topic.GenerateID(), messageBody)
err = topic.PutMessage(msg)

PUB实现了将消息发布到一个topic的逻辑。
向下追topic.PutMessage的代码,在这里消息被写入memoryMsgChan或backend。

select {
    case t.memoryMsgChan <- m: // 写入memoryMsgChan
    default: // 写入backend
        b := bufferPoolGet()
        err := writeMessageToBackend(b, m, t.backend)
        bufferPoolPut(b)
        t.ctx.nsqd.SetHealth(err)
        if err != nil {
            t.ctx.nsqd.logf(LOG_ERROR,
                "TOPIC(%s) ERROR: failed to write message to backend - %s",
                t.name, err)
            return err
        }
    }

在nsq中,通过channel消费topic中的内容,在topic得到发布的消息时,会将topic中的消息复制到他的每一个channel中,这个过程在topic.messagePump中实现。

//  主要代码
for {
        select {
        case msg = <-memoryMsgChan:
        case buf = <-backendChan:
            msg, err = decodeMessage(buf)
        }
        for i, channel := range chans {
            chanMsg := msg
            channel.PutMessage(chanMsg)
        }
    }

流程简单概括为:
--> 读connect一行拆分多个参数
--> 执行第一个参数对应的方法(PUB)
--> 根据第二个参数获取topic,没有的话创建
--> 将消息的内容放到对应topic的memoryMsgChan或backendChan
--> topic的messagePump取出消息,将消息复制到每一个channel中
一个完整的PUB方法结束。


SUB代码

topicName := string(params[1])
channelName := string(params[2])
topic := p.ctx.nsqd.GetTopic(topicName) // 没有则创建
channel = topic.GetChannel(channelName) // 同上
channel.AddClient(client.ID, client)
client.Channel = channel
client.SubEventChan <- channel

SUB方法从两个参数中得到订阅的topic和channel,并将client注册到改channel中。
同时将channel传入client.SubEventChan,将在protocolV2的messagePump中进一步处理。


messagePump

这里有点复杂,没全看懂,写一下基本流程。
messagePump用来处理SUB之后的逻辑,从方才的SUB方法中看到,订阅一个频道后向client.SubEventChan中添加了这个频道。这个频道会在在messagePump取出进一步操作。

// 主要代码
for{
if subChannel == nil || !client.IsReadyForMessages() {
        } else if flushed {
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
        } else {
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = outputBufferTicker.C
        }
select {
        case subChannel = <-subEventChan: 
            subEventChan = nil
        case b := <-backendMsgChan:
            msg, err := decodeMessage(b)
            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()
            p.SendMessage(client, msg)
        case msg := <-memoryMsgChan:
            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()
            p.SendMessage(client, msg)
        }
}

在SubEventChan中取得频道之后,循环memoryMsgChan和backendMsgChan指向这个频道。
在循环中select获取这个频道的subEventChan或memoryMsgChan。
上文中PUB方法最后将消息放入了channel中,和放入topic中一样channel有memoryMsgChan和backendMsgChan,在这里被订阅后开始消费。


只记录了最基本的SUB、PUB消息流转,还有大量细节。

相关文章

网友评论

      本文标题:nsq-protocolV2中的消息流转

      本文链接:https://www.haomeiwen.com/subject/jwwbsctx.html