美文网首页
Go实现消息队列

Go实现消息队列

作者: 小鱼宠ZZ | 来源:发表于2021-08-24 10:56 被阅读0次

    温馨提示:觉得有帮助的话, 给我点赞哦~

    1. GMQ订阅发布
    package gmq
    
    import (
        "errors"
    )
    
    // Payload 负载
    type Payload struct {
        Topic string
        Value interface{}
    }
    
    // Handler 处理业务
    type Handler func(value interface{})
    
    // GMQ 消息中间件
    type GMQ struct {
        p chan Payload
        q chan bool
        h map[string][]Handler
        r bool
    }
    
    // Publish 发布
    func (g *GMQ) Publish(topic string, data interface{}) error {
        if !g.r {
            return errors.New("GMQ is not running yet")
        }
    
        g.p <- Payload{topic, data}
    
        return nil
    }
    
    // Subscribe 订阅
    func (g *GMQ) Subscribe(topic string, handler Handler) {
        if nil == g.h {
            g.h = make(map[string][]Handler)
        }
    
        if nil == g.h[topic] {
            g.h[topic] = []Handler{handler}
        } else {
            g.h[topic] = append(g.h[topic], handler)
        }
    }
    
    // handle 处理业务
    func (g *GMQ) handle(value interface{}, handlers []Handler) {
        for _, handler := range handlers {
            handler(value)
        }
    }
    
    // Run 运行,需要在单独程运行
    func (g *GMQ) Run() {
        if g.r {
            return
        }
    
        // is running
        g.r = true
    
        for {
            select {
            case v := <-g.p:
                if nil != g.h[v.Topic] {
                    go g.handle(v.Value, g.h[v.Topic])
                }
            case v := <-g.q:
                if v {
                    close(g.p)
                    close(g.q)
                    break
                }
            }
        }
    }
    
    // Close 关闭
    func (g *GMQ) Close()  {
        g.q <- true
    }
    
    // NewGMQ 新建GMQ
    func NewGMQ() *GMQ {
        return &GMQ{
            p: make(chan Payload),
            q: make(chan bool),
            h: make(map[string][]Handler),
        }
    }
    
    

    2. 使用演示

    消息订阅(Sub)

    mq := gmq.NewGMQ()
    
    // 保存到全局
    global.MQ = mq
    
    // 订阅订单主题
    mq.Subscribe("order", func(value interface{}) {
        // todo ... 
            // 断言 value.(OrderInfo)
    })
    
    // 订短信主题
    mq.Subscribe("sms", func(value interface{}) {
        // todo ...
    })
    
    mq.Run()
    

    消息发布(Pub)

    type OrderInfo struct {
        // todo ...
    }
    
    orderInfo := OrderInfo {
    
    }
    
    // 发布订单消息
    global.MQ.Publish("order", orderInfo)
    

    温馨提示:觉得有帮助的话, 给我点赞哦~

    相关文章

      网友评论

          本文标题:Go实现消息队列

          本文链接:https://www.haomeiwen.com/subject/uigkiltx.html