美文网首页
golang 基于channel的生产者消费者实现

golang 基于channel的生产者消费者实现

作者: 樹上一隻貓 | 来源:发表于2019-11-30 15:47 被阅读0次

    背景


    公司开发新系统,使用go语言。

    模式为来一个客户给部署一套,用户在万个以下。

    为方便部署,要求单可执行文件。

    分模块之后耦合严重,单独部署消息队列服务没有必要,且会增加部署成本。

    所以自己手写了一个生产者消费者的库,整合到程序中。

    核心代码


    核心结构体的定义
    // 结构定义
    type Broker struct {
        event      chan interface{} // 接收事件的管道
        handlers   []func(interface{})  // 处理事件的方法
        handlersMu sync.RWMutex // 添加方法时的锁
        Name       string // 名称
        wait       *sync.WaitGroup // wait group 用于停止时等待所有事件处理完成
        onceStart  sync.Once // 确保只启动一次
        onceStop   sync.Once // 确保只关闭一次
    }
    
    启动代码 事件处理循环
    func (b *Broker) Start() {
        b.onceStart.Do(func() {
            b.wait.Add(1)
            go func() {
                for {
                    event, ok := <-b.event
                    if ok {
                        // 事件分发
                        b.handlersMu.RLock()
                        for _, v := range b.handlers {
                            v(event) // 有recover
                        }
                        b.handlersMu.RUnlock()
                    } else {
                        // 通道已经关闭
                        b.wait.Done()
                        return
                    }
                }
            }()
        })
    }
    
    创建并启动
    // NewStartedBroker 创建broker,并开始
    func NewStartedBroker(name string, chanBuf int) *Broker {
        b := &Broker{
            event:    make(chan interface{}, chanBuf),
            handlers: make([]func(interface{}), 0),
            Name:     name,
            wait:     &sync.WaitGroup{},
        }
        b.Start()
        return b
    }
    
    注册处理方法
    // Register 注册事件
    func (b *Broker) Register(ctx context.Context, f func(interface{})) (err error) {
        b.handlersMu.Lock()
        defer b.handlersMu.Unlock()
        b.handlers = append(b.handlers, func(o interface{}) {
            defer func() {
                if err := recover(); err != nil {
                    err = fmt.Errorf("panic on broker handler msg name:%v err:%v msg:%v", b.Name, err, o)
                    log.Println(err)
                }
            }()
            f(o)
        })
        return nil
    }
    
    事件发送
    // Send 注册事件
    func (b *Broker) Send(ctx context.Context, o interface{}) (err error) {
        defer func() {
            if errs := recover(); errs != nil {
                err = fmt.Errorf("消息处理异常 name:%v msg:%v err:%v", b.Name, o, errs)
                log.Println(err)
            }
        }()
        b.event <- o
        return
    }
    
    关闭方法
    // Stop 调用stop之前确保写入方都已经退出了,不然要panic
    func (b *Broker) Stop() {
        b.onceStop.Do(func() {
            close(b.event)
            b.wait.Wait()
        })
    }
    
    删除所有处理方法
    func (b *Broker) Clear() {
        b.handlersMu.Lock()
        defer b.handlersMu.Unlock()
        b.handlers = b.handlers[0:0]
    }
    

    链接

    项目代码 https://github.com/krilie/go-smq

    相关文章

      网友评论

          本文标题:golang 基于channel的生产者消费者实现

          本文链接:https://www.haomeiwen.com/subject/dssowctx.html