温馨提示:觉得有帮助的话, 给我点赞哦~
1. GMQ订阅发布
package gmq
import (
"errors"
)
// Payload 负载
type Payload struct {
Topic string
Value interface{}
}
// Handler 处理业务
type Handler func(value interface{})
// GMQ 消息中间件
type GMQ struct {
p chan Payload
q chan bool
h map[string][]Handler
r bool
}
// Publish 发布
func (g *GMQ) Publish(topic string, data interface{}) error {
if !g.r {
return errors.New("GMQ is not running yet")
}
g.p <- Payload{topic, data}
return nil
}
// Subscribe 订阅
func (g *GMQ) Subscribe(topic string, handler Handler) {
if nil == g.h {
g.h = make(map[string][]Handler)
}
if nil == g.h[topic] {
g.h[topic] = []Handler{handler}
} else {
g.h[topic] = append(g.h[topic], handler)
}
}
// handle 处理业务
func (g *GMQ) handle(value interface{}, handlers []Handler) {
for _, handler := range handlers {
handler(value)
}
}
// Run 运行,需要在单独程运行
func (g *GMQ) Run() {
if g.r {
return
}
// is running
g.r = true
for {
select {
case v := <-g.p:
if nil != g.h[v.Topic] {
go g.handle(v.Value, g.h[v.Topic])
}
case v := <-g.q:
if v {
close(g.p)
close(g.q)
break
}
}
}
}
// Close 关闭
func (g *GMQ) Close() {
g.q <- true
}
// NewGMQ 新建GMQ
func NewGMQ() *GMQ {
return &GMQ{
p: make(chan Payload),
q: make(chan bool),
h: make(map[string][]Handler),
}
}
2. 使用演示
消息订阅(Sub)
mq := gmq.NewGMQ()
// 保存到全局
global.MQ = mq
// 订阅订单主题
mq.Subscribe("order", func(value interface{}) {
// todo ...
// 断言 value.(OrderInfo)
})
// 订短信主题
mq.Subscribe("sms", func(value interface{}) {
// todo ...
})
mq.Run()
消息发布(Pub)
type OrderInfo struct {
// todo ...
}
orderInfo := OrderInfo {
}
// 发布订单消息
global.MQ.Publish("order", orderInfo)
温馨提示:觉得有帮助的话, 给我点赞哦~
网友评论