背景
公司开发新系统,使用go语言。
模式为来一个客户给部署一套,用户在万个以下。
为方便部署,要求单可执行文件。
分模块之后耦合严重,单独部署消息队列服务没有必要,且会增加部署成本。
所以自己手写了一个生产者消费者的库,整合到程序中。
核心代码
核心结构体的定义
// 结构定义
type Broker struct {
event chan interface{} // 接收事件的管道
handlers []func(interface{}) // 处理事件的方法
handlersMu sync.RWMutex // 添加方法时的锁
Name string // 名称
wait *sync.WaitGroup // wait group 用于停止时等待所有事件处理完成
onceStart sync.Once // 确保只启动一次
onceStop sync.Once // 确保只关闭一次
}
启动代码 事件处理循环
func (b *Broker) Start() {
b.onceStart.Do(func() {
b.wait.Add(1)
go func() {
for {
event, ok := <-b.event
if ok {
// 事件分发
b.handlersMu.RLock()
for _, v := range b.handlers {
v(event) // 有recover
}
b.handlersMu.RUnlock()
} else {
// 通道已经关闭
b.wait.Done()
return
}
}
}()
})
}
创建并启动
// NewStartedBroker 创建broker,并开始
func NewStartedBroker(name string, chanBuf int) *Broker {
b := &Broker{
event: make(chan interface{}, chanBuf),
handlers: make([]func(interface{}), 0),
Name: name,
wait: &sync.WaitGroup{},
}
b.Start()
return b
}
注册处理方法
// Register 注册事件
func (b *Broker) Register(ctx context.Context, f func(interface{})) (err error) {
b.handlersMu.Lock()
defer b.handlersMu.Unlock()
b.handlers = append(b.handlers, func(o interface{}) {
defer func() {
if err := recover(); err != nil {
err = fmt.Errorf("panic on broker handler msg name:%v err:%v msg:%v", b.Name, err, o)
log.Println(err)
}
}()
f(o)
})
return nil
}
事件发送
// Send 注册事件
func (b *Broker) Send(ctx context.Context, o interface{}) (err error) {
defer func() {
if errs := recover(); errs != nil {
err = fmt.Errorf("消息处理异常 name:%v msg:%v err:%v", b.Name, o, errs)
log.Println(err)
}
}()
b.event <- o
return
}
关闭方法
// Stop 调用stop之前确保写入方都已经退出了,不然要panic
func (b *Broker) Stop() {
b.onceStop.Do(func() {
close(b.event)
b.wait.Wait()
})
}
删除所有处理方法
func (b *Broker) Clear() {
b.handlersMu.Lock()
defer b.handlersMu.Unlock()
b.handlers = b.handlers[0:0]
}
网友评论