美文网首页nsq
Go消息中间件Nsq系列(八)------topic(主题发布)

Go消息中间件Nsq系列(八)------topic(主题发布)

作者: Yangwenliu | 来源:发表于2019-08-28 14:43 被阅读0次

    上一篇: Go消息中间件Nsq系列(七)------go-diskqueue 文件队列实现

    1. Topic/Channel 回顾

    前文有说过 Topic/Channel是发布/订阅模型的一种实现。Topic对应发布, Channel对应于订阅。

    1. 订阅: SUB 消费者订阅某个Topic-Channel的消息
    2. 发布: PUB, 消息提供者往某个NSQD PUB一条消息,在由topic投递给其下面的所有channel
    func PUB(client *clientV2, params [][]byte) ([]byte, error)
    // ... 省略
    topic := p.ctx.nsqd.GetTopic(topicName)
    msg := NewMessage(topic.GenerateID(), messageBody)
    err = topic.PutMessage(msg)
    

    其他请往回看.

    2. 思考一下

    1. 要投递就要对应的topic, 那么topic是什么时候获取,如何创建的呢?
    2. 它是怎么处理投递的消息的呢?
    3. 如何暂停/取消暂停, 关闭和删除的呢?
    4. 与topic相关联的channel 查询,创建,删除呢?
    5. 如何统计投递延迟呢?

    3. Topic的创建 GetTopic(topicName string) *Topic

    3.1 topic的结构体定义如下:

    type Topic struct {
        // 64bit atomic vars need to be first for proper alignment on 32bit platforms
        messageCount uint64 // 消息总数
        messageBytes uint64 // 消息字节数
    
        sync.RWMutex
    
        name              string // 主题名称
        channelMap        map[string]*Channel // topic关联的map
        backend           BackendQueue // BackendQueue 后台队列实现
        memoryMsgChan     chan *Message // 内存消息通道 默认10000缓冲区
        startChan         chan int // topic启动通道
        exitChan          chan int // 退出通道
        channelUpdateChan chan int // channel更新通道
        waitGroup         util.WaitGroupWrapper // 多协程任务封装
        exitFlag          int32 // 退出标志
        idFactory         *guidFactory // guid 生成器
    
        ephemeral      bool // 临时的topic
        deleteCallback func(*Topic) // 删除回调
        deleter        sync.Once // 删除仅一次
    
        paused    int32  // 暂停标志位
        pauseChan chan int // 暂停通道
    
        ctx *context // 透传 nsqd 上下文
    }
    

    3.2Topic的获取情况与过程:

    1. nsqd启动时,LoadMetadata() , 获取并启动Topic进行消息投递处理
    2. 通过暴露的api接口去查询的时候 getTopicFromQuery(req *http.Request)
    3. 客户端进行PUB DPUB MPUB SUB 根据对应的Topic去处理

    3.3获取过程:

    GetTopic 是线程安全的操作, 返回一个Topic指针对象
    1.如果该topic已存在 则返回已存在,
    2.否则新建一个topic,并保存起来
    如果不是正在加载的情况, 则根据该topic去lookupd发现获取对应的channels(非临时的)
    并通过chan(异步通知)启动消息投递处理协程

    func (n *NSQD)  GetTopic(topicName string) *Topic {
        // ..其他省略 新建Topic
        t = NewTopic(topicName, &context{n}, deleteCallback)
        return t
    }
    

    3.4Topic的创建:

    是否创建临时的Topic,所使用的backedQueue不一样, 临时的只是一个模拟的队列, 非临时的就是需要持久化的
    然后启动一个协程messagePump来进行消息投递处理

    // Topic constructor
    func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
        t := &Topic{
            name:              topicName,
            channelMap:        make(map[string]*Channel),
            memoryMsgChan:     make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),
            startChan:         make(chan int, 1),
            exitChan:          make(chan int),
            channelUpdateChan: make(chan int),
            ctx:               ctx,
            paused:            0,
            pauseChan:         make(chan int),
            deleteCallback:    deleteCallback,
            idFactory:         NewGUIDFactory(ctx.nsqd.getOpts().ID),
        }
    
        // 如果是临时的topic
        if strings.HasSuffix(topicName, "#ephemeral") {
            t.ephemeral = true
            t.backend = newDummyBackendQueue()
        } else {
            // 日志
            dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
                opts := ctx.nsqd.getOpts()
                lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...)
            }
            // 初始化 磁盘队列, 用于内存消息缓冲超过阈值(默认配置10000, 详细看上一篇)
            t.backend = diskqueue.New(
                topicName,
                ctx.nsqd.getOpts().DataPath,
                ctx.nsqd.getOpts().MaxBytesPerFile,
                int32(minValidMsgLength),
                int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
                ctx.nsqd.getOpts().SyncEvery,
                ctx.nsqd.getOpts().SyncTimeout,
                dqLogf,
            )
        }
    
        // topic核心处理
        t.waitGroup.Wrap(t.messagePump)
    
        // 通知持久化
        t.ctx.nsqd.Notify(t)
    
        return t
    }
    

    4.Topic处理消息投递

    4.1外部调用Topic投递消息:

    1. 单条消息投递(a message)
      如果该topic已关闭,则无法进行消息投递
      进行消息投递, 先走case也就是内存,缓冲区已满 在走default(磁盘队列)
      添加统计信息 (消息计数器, 消息主体大小)
    // PutMessage writes a Message to the queue
    func (t *Topic) PutMessage(m *Message) error {
        t.RLock()
        defer t.RUnlock()
        if atomic.LoadInt32(&t.exitFlag) == 1 {
            return errors.New("exiting")
        }
        err := t.put(m)
        if err != nil {
            return err
        }
        atomic.AddUint64(&t.messageCount, 1)
        atomic.AddUint64(&t.messageBytes, uint64(len(m.Body)))
        return nil
    }
    

    4.2. 批量消息投递(multiple message)

    如果该topic已关闭,则无法进行消息投递
    遍历投递, 统计..
    如果用可选参数 PutMessage(msgs ...Message) 单或者多都行

    // PutMessages writes multiple Messages to the queue
    func (t *Topic) PutMessages(msgs []*Message) error {
        t.RLock()
        defer t.RUnlock()
        if atomic.LoadInt32(&t.exitFlag) == 1 {
            return errors.New("exiting")
        }
        messageTotalBytes := 0
    
        for i, m := range msgs {
            err := t.put(m)
            if err != nil {
                atomic.AddUint64(&t.messageCount, uint64(i))
                atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))
                return err
            }
            messageTotalBytes += len(m.Body)
        }
    
        atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))
        atomic.AddUint64(&t.messageCount, uint64(len(msgs)))
        return nil
    }
    

    4.3以下内部调用的put 函数

    select 先走case, 也就是内存缓冲区
    如果case没有执行,看看有没有default, 接着执行default
    也就是说 消息先投递到内存, 内存缓冲区满了之后会将消息写入到磁盘队列
    这里使用了sync.Pool 减少GC
    同时也会看每次写入磁盘是否有错误, 设置其健康状态保存已暴露给api接口/ping使用

    func (t *Topic) put(m *Message) error {
        select {
        case t.memoryMsgChan <- m:
        default:
            b := bufferPoolGet()
            err := writeMessageToBackend(b, m, t.backend)
            bufferPoolPut(b)
            t.ctx.nsqd.SetHealth(err)
            if err != nil {
                t.ctx.nsqd.logf(LOG_ERROR,
                    "TOPIC(%s) ERROR: failed to write message to backend - %s",
                    t.name, err)
                return err
            }
        }
        return nil
    }
    

    4.4 Topic的启动

    messagePump topic的核心协程处理 阻塞等待
    通过发送startChan 信号 异步通知 启动

    func (t *Topic) Start() {
        select {
        case t.startChan <- 1: // 写入chan
        default:
        }
    }
    

    4.5 messagePump() 处理消息往channel投递, 以及更新channel

    messagePump() 整个topic处理的核心所在:
    这里使用了一个技巧, 使用channel阻塞等待, 在异步唤醒的操作继续走接下流程.
    从内存或磁盘队列中读取消息, 遍历该topic下所有的channel,复制消息进行投递,如果是延时消息则投递到延时队列, 否则就正常投递
    删除或新建的Channel的时候, 走channelUpdateChan case,去更新当前chans列表
    暂停, 和退出
    至于把memoryMsgChan / backendChan = nil, 这样子 就会跳过select 选取

    // messagePump selects over the in-memory and backend queue and
    // writes messages to every channel for this topic
    func (t *Topic) messagePump() {
        var msg *Message
        var buf []byte
        var err error
        var chans []*Channel
        var memoryMsgChan chan *Message
        var backendChan chan []byte
    
        // do not pass messages before Start(), but avoid blocking Pause() or GetChannel()
        // 这里使用了一个技巧, 使用channel阻塞等待, 在异步唤醒的操作
        for {
            select {
            case <-t.channelUpdateChan:
                continue
            case <-t.pauseChan:
                continue
            case <-t.exitChan:
                goto exit
            case <-t.startChan:
            }
            break
        }
        t.RLock()
        //  读锁  把所有的channel遍历合并
        for _, c := range t.channelMap {
            chans = append(chans, c)
        }
        t.RUnlock()
        // topic没有暂停(pause) 并且 有可进行投递的channel
        if len(chans) > 0 && !t.IsPaused() {
            memoryMsgChan = t.memoryMsgChan
            backendChan = t.backend.ReadChan()
        }
    
        // main message loop
        for {
            select {
            // 从内存或者磁盘 获取消息 并序列化成Message
            case msg = <-memoryMsgChan:
            case buf = <-backendChan:
                msg, err = decodeMessage(buf)
                if err != nil {
                    t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
                    continue
                }
            case <-t.channelUpdateChan:
                // 更新channel
                chans = chans[:0]
                t.RLock()
                for _, c := range t.channelMap {
                    chans = append(chans, c)
                }
                t.RUnlock()
                if len(chans) == 0 || t.IsPaused() {
                    memoryMsgChan = nil
                    backendChan = nil
                } else {
                    memoryMsgChan = t.memoryMsgChan
                    backendChan = t.backend.ReadChan()
                }
                continue
            case <-t.pauseChan:
                // 暂停
                if len(chans) == 0 || t.IsPaused() {
                    memoryMsgChan = nil
                    backendChan = nil
                } else {
                    memoryMsgChan = t.memoryMsgChan
                    backendChan = t.backend.ReadChan()
                }
                continue
            case <-t.exitChan:
                // 退出
                goto exit
            }
            // 遍历所有的channel, 复制消息进行投递
            // 如果是延时消息则投递到延时队列, 否则就正常投递
            for i, channel := range chans {
                chanMsg := msg
                // copy the message because each channel
                // needs a unique instance but...
                // fastpath to avoid copy if its the first channel
                // (the topic already created the first copy)
                if i > 0 {
                    chanMsg = NewMessage(msg.ID, msg.Body)
                    chanMsg.Timestamp = msg.Timestamp
                    chanMsg.deferred = msg.deferred
                }
                if chanMsg.deferred != 0 {
                    channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
                    continue
                }
                err := channel.PutMessage(chanMsg)
                if err != nil {
                    t.ctx.nsqd.logf(LOG_ERROR,
                        "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
                        t.name, msg.ID, channel.name, err)
                }
            }
        }
    
    exit:
        t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
    }
    

    5. Topic的暂停/恢复 , 删除/关闭

    5.1 暂停与恢复

    通过设置标志位&t.paused 使用chan 异步通信控制

    // 暂停  标志位 paused = 1
    func (t *Topic) Pause() error {
        return t.doPause(true)
    }
    // resume 恢复 标志位 paused = 0
    func (t *Topic) UnPause() error {
        return t.doPause(false)
    }
    
    // 原子操作 设置标志位值, 给messagePump发送异步通知
    func (t *Topic) doPause(pause bool) error {
        if pause {
            atomic.StoreInt32(&t.paused, 1)
        } else {
            atomic.StoreInt32(&t.paused, 0)
        }
    
        select {
        case t.pauseChan <- 1:
        case <-t.exitChan:
        }
    
        return nil
    }
    // 判断是否暂停 状态
    func (t *Topic) IsPaused() bool {
        return atomic.LoadInt32(&t.paused) == 1
    }
    
    

    5.2 删除与关闭

    如果是删除 则删除当前关联的channel, 关闭所有客户端连接, 并清空磁盘消息,关闭磁盘读写
    关闭仅仅是关闭当前连接,把未读的消息flush到磁盘

    // Delete empties the topic and all its channels and closes
    func (t *Topic) Delete() error {
        return t.exit(true)
    }
    
    // Close persists all outstanding topic data and closes all its channels
    func (t *Topic) Close() error {
        return t.exit(false)
    }
    // 1. cas锁
    // 2. 关闭 exitChan
    // 3. 阻塞等待messagePump() 结束
    // 4. 如果标记为删除 删除记录并关闭所有客户端连接, 并清空磁盘文件,关闭磁盘队列
    //        -- 否则 关闭关闭所有客户端连接而已, 把剩余的消息写到磁盘
    func (t *Topic) exit(deleted bool) error {
        if !atomic.CompareAndSwapInt32(&t.exitFlag, 0, 1) {
            return errors.New("exiting")
        }
    
        if deleted {
            t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting", t.name)
    
            // since we are explicitly deleting a topic (not just at system exit time)
            // de-register this from the lookupd
            t.ctx.nsqd.Notify(t)
        } else {
            t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing", t.name)
        }
    
        close(t.exitChan)
    
        // synchronize the close of messagePump()
        t.waitGroup.Wait()
    
        if deleted {
            t.Lock()
            for _, channel := range t.channelMap {
                delete(t.channelMap, channel.name)
                channel.Delete()
            }
            t.Unlock()
    
            // empty the queue (deletes the backend files, too)
            t.Empty()
            return t.backend.Delete()
        }
    
        // close all the channels
        for _, channel := range t.channelMap {
            err := channel.Close()
            if err != nil {
                // we need to continue regardless of error to close all the channels
                t.ctx.nsqd.logf(LOG_ERROR, "channel(%s) close - %s", channel.name, err)
            }
        }
    
        // write anything leftover to disk
        t.flush()
        return t.backend.Close()
    }
    // 磁盘队列的 empty
    func (t *Topic) Empty() error {
        for {
            select {
            case <-t.memoryMsgChan:
            default:
                goto finish
            }
        }
    
    finish:
        return t.backend.Empty()
    }
    
    func (t *Topic) flush() error {
        var msgBuf bytes.Buffer
    
        if len(t.memoryMsgChan) > 0 {
            t.ctx.nsqd.logf(LOG_INFO,
                "TOPIC(%s): flushing %d memory messages to backend",
                t.name, len(t.memoryMsgChan))
        }
    
        for {
            select {
            case msg := <-t.memoryMsgChan: // 写入到磁盘
                err := writeMessageToBackend(&msgBuf, msg, t.backend)
                if err != nil {
                    t.ctx.nsqd.logf(LOG_ERROR,
                        "ERROR: failed to write message to backend - %s", err)
                }
            default:
                goto finish
            }
        }
    
    finish:
        return nil
    }
    

    6. 与Topic关联的Channel的获取,创建,删除

    GetChannel()获取Channel的逻辑是在channelMap里面去查找,如果找不到的话就新建返回, 如果是新建的话, 要去发送通知去更新chans
    同时对外提供了接口进行查询GetExistingChannel(), 删除DeleteExistingChannel()

    
    // GetChannel performs a thread safe operation
    // to return a pointer to a Channel object (potentially new)
    // for the given Topic
    // 获取或者创建topic对应的channel, 他是线程安全,
    // 如果是新建,则发送通知至messagePump进行更新
    func (t *Topic) GetChannel(channelName string) *Channel {
        t.Lock()
        channel, isNew := t.getOrCreateChannel(channelName)
        t.Unlock()
    
        if isNew {
            // update messagePump state
            select {
            case t.channelUpdateChan <- 1:
            case <-t.exitChan:
            }
        }
    
        return channel
    }
    
    // this expects the caller to handle locking
    func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) {
        channel, ok := t.channelMap[channelName]
        if !ok {
            // 在与topic关联的channel中找不到则新建
            deleteCallback := func(c *Channel) {
                t.DeleteExistingChannel(c.name)
            }
            channel = NewChannel(t.name, channelName, t.ctx, deleteCallback)
            t.channelMap[channelName] = channel
            t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): new channel(%s)", t.name, channel.name)
            return channel, true
        }
        return channel, false
    }
    // 对外暴露接口
    func (t *Topic) GetExistingChannel(channelName string) (*Channel, error) {
        t.RLock()
        defer t.RUnlock()
        channel, ok := t.channelMap[channelName]
        if !ok {
            return nil, errors.New("channel does not exist")
        }
        return channel, nil
    }
    
    // DeleteExistingChannel removes a channel from the topic only if it exists
    // 对外暴露接口 删除已存在的channel
    // 如果已存在, 从map中删除, 并发送更新通知至channelUpdateChan
    // 如果已经channel已经删除完毕, 并且是临时topic 则删除该topic
    func (t *Topic) DeleteExistingChannel(channelName string) error {
        t.Lock()
        channel, ok := t.channelMap[channelName]
        if !ok {
            t.Unlock()
            return errors.New("channel does not exist")
        }
        delete(t.channelMap, channelName)
        // not defered so that we can continue while the channel async closes
        numChannels := len(t.channelMap)
        t.Unlock()
    
        t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting channel %s", t.name, channel.name)
    
        // delete empties the channel before closing
        // (so that we dont leave any messages around)
        channel.Delete()
    
        // update messagePump state
        select {
        case t.channelUpdateChan <- 1:
        case <-t.exitChan:
        }
    
        if numChannels == 0 && t.ephemeral == true {
            // 仅执行一次 删除当前topic
            go t.deleter.Do(func() { t.deleteCallback(t) })
        }
    
        return nil
    }
    

    7. 消息投递的延时统计

    TopicStats结构体定义, 使用了该库
    https://github.com/bmizerany/perks 使用了该库

    // topic统计信息结构体定义
    type TopicStats struct {
        // 主题名称
        TopicName    string         `json:"topic_name"`
        // 该主题下所有channel的统计信息  1:n
        Channels     []ChannelStats `json:"channels"`
        // 所有消息未读量
        Depth        int64          `json:"depth"`
        // 磁盘队列未读消息
        BackendDepth int64          `json:"backend_depth"`
        // 消息总数
        MessageCount uint64         `json:"message_count"`
        // 消息字节
        MessageBytes uint64         `json:"message_bytes"`
        // 是否暂停
        Paused       bool           `json:"paused"`
    
        // 四分位数(Quartile),即统计学中,把所有数值由小到大排列并分成四等份,处于三个分割点位置的得分就是四分位数。
        E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"`
    }
    // https://github.com/bmizerany/perks 使用了该库 进行channel投递延迟统计
    // 具体我不知道咋算... 
    func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile {
        var latencyStream *quantile.Quantile
        t.RLock()
        realChannels := make([]*Channel, 0, len(t.channelMap))
        for _, c := range t.channelMap {
            realChannels = append(realChannels, c)
        }
        t.RUnlock()
        for _, c := range realChannels {
            if c.e2eProcessingLatencyStream == nil {
                continue
            }
            if latencyStream == nil {
                latencyStream = quantile.New(
                    t.ctx.nsqd.getOpts().E2EProcessingLatencyWindowTime,
                    t.ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles)
            }
            latencyStream.Merge(c.e2eProcessingLatencyStream)
        }
        return latencyStream
    }
    

    8. 其他的一些方法

    Depth() , GenerateID(), Exiting()

    // 消息未读数量= 内存未读+磁盘未读
    func (t *Topic) Depth() int64 {
        return int64(len(t.memoryMsgChan)) + t.backend.Depth()
    }
    // 生成message id
    func (t *Topic) GenerateID() MessageID {
    retry:
        id, err := t.idFactory.NewGUID()
        if err != nil {
            time.Sleep(time.Millisecond)
            goto retry
        }
        return id.Hex()
    }
    // 返回是否关闭/删除状态
    func (t *Topic) Exiting() bool {
        return atomic.LoadInt32(&t.exitFlag) == 1
    }
    

    相关文章

      网友评论

        本文标题:Go消息中间件Nsq系列(八)------topic(主题发布)

        本文链接:https://www.haomeiwen.com/subject/mlaeectx.html