美文网首页
go 编程的架构模式 (microkernel)

go 编程的架构模式 (microkernel)

作者: OOM_Killer | 来源:发表于2019-08-13 23:39 被阅读0次

    前面已经介绍了一下 pipe-filter 架构模式的编程思路(pipe-filter 架构模式),
    第二种就是 microkernel 微内核模式,这种模式其实非常好理解,就是相当于原生nginx添加模块。使得nginx变得强大。

    microkernel

    microkernel

    可以将 微核心架构理解成一个 核心要添加新功能就是加插件。其特点为 易扩展,错误隔离,保持架构的一致性。
    完整的代码 https://github.com/geektime-geekbang/go_learning/tree/master/code/ch41/microkernel

    下面介绍一下microkernel 的结构。
    Agent
    agent 相当于一个注册中心,所有要Agent去做的事情都注册到Agent里面来,注册进Agent的事情叫做 Collector 。每个Collector有一个名字。用map 存储了。

    type Agent struct {
        collectors map[string]Collector  // 注册进 Agent的collector
        evtBuf     chan Event            //  collector 回传给 Agent 的事件
        cancel     context.CancelFunc // 任务取消的方法
        ctx        context.Context    // 任务取消的上下文
        state      int                // Agent 的运行状态
    }
    

    Collector
    Collector 是一个收集器,是需要注册进上面的Agent的。每个Collector需要实现 Init,Start,Stop,Destroy 方法,到时候由Agent统一进行Init,Start等操作,这里在Init中提到了 EventReceiver,所有的Collector在初始化的时候传入一个事件接收源,其实说白了就是agent,下面再具体分析。

    type Collector interface {
        Init(evtReceiver EventReceiver) error    // Collector 将收集到的数据回传到 Agent (任何实现EventReceiver的对象)
        Start(agtCtx context.Context) error      // 启动所有的Collector (参数为agent中的取消上下文)
        Stop() error                              //   停止
        Destroy() error                          //   摧毁
    }
    

    Event
    Agent 实现了 OnEvent 方法,所以Agent 可以作为上面Init 方法的参数,作为事件的接收者。

    type Event struct {
        Source  string   // 事件源
        Content string   // 事件内容
    }
    
    type EventReceiver interface {
        OnEvent(evt Event)  // 实现OnEvent 既可以作为 EventReciver来接收事件 如下面的 Agent
    }
    
    func (agt *Agent) OnEvent(evt Event) {
        agt.evtBuf <- evt  // Agent 可以来接收事件
    }
    

    开始起一个微内核

    整个 微内核的架构就是这样了,刚才提到了,Agent会统一对注册进去的Collector进行初始化(Init),启动(Start),停止(Stop)的操作。那如何注册呢,还需要一个注册的函数。
    注册

    func (agt *Agent) RegisterCollector(name string, collector Collector) error {
        if agt.state != Waiting {
            return WrongStateError
        }
        agt.collectors[name] = collector   // agent map注册
        return collector.Init(agt)  // 注册完立即进行Init 操作。且事件接收者为Agent
    }
    

    启动
    启动时Agent 会遍历所有的 Collector,将所有的Collector 全部拉起来。

    func (agt *Agent) Start() error {
        if agt.state != Waiting {     // 状态不对,直接报错
            return WrongStateError
        }
        agt.state = Running    // 启动了,更改状态
        agt.ctx, agt.cancel = context.WithCancel(context.Background())  // 来一个取消的上下文和取消函数
        go agt.EventProcessGroutine()    // 收集事件 (具体业务了)
        return agt.startCollectors()    //  启动所有的Collector
    }
    
    // 模拟的收集具体的业务事件。这里的事件是由各个 collector 上报的
    func (agt *Agent) EventProcessGroutine() {
        var evtSeg [10]Event
        for {
            for i := 0; i < 10; i++ {
                select {
                case evtSeg[i] = <-agt.evtBuf:   // 将 collector 收集的事件放到 evtBuf 中
                case <-agt.ctx.Done():           // 执行上下文完成,结束 
                    return
                }
            }
            fmt.Println(evtSeg)   // 当 collector 收集的事件满 10 个,打印一次。
        }
    }
    
    // Agent 来拉起所有的 Collectors
    func (agt *Agent) startCollectors() error {
        var err error
        var errs CollectorsError
        var mutex sync.Mutex
        for name, collector := range agt.collectors {
            go func(name string, collector Collector, ctx context.Context) {
                defer func() {
                    mutex.Unlock()
                }()
                err = collector.Start(ctx)
                mutex.Lock()
                if err != nil {
                    errs.CollectorErrors = append(errs.CollectorErrors,
                        errors.New(name+":"+err.Error()))
                }
            }(name, collector, agt.ctx)
        }
            if len(errs.CollectorErrors) == 0 {     // 这里需要判断有没有错误,确定没有错误,返回nil。否则其实返回的也不是nil
            return nil
        }
        return errs
    }
    

    同理 停止(Stop) 和 摧毁 (Destroy)

    来一个模拟的 DemoCollect

    type DemoCollector struct {      // 示例 Collector
        evtReceiver microkernel.EventReceiver   // 事件发给这里
        stopChan    chan struct{}    // 用来停止该Collector
        name        string    // Collector 名字
        content     string    // Collector 的要做的内容(假设,这个根据业务场景,都不一定是string)
    }
    
    func NewCollect(name string, content string) *DemoCollector {   // 创建一个 Collect
        return &DemoCollector{
            stopChan: make(chan struct{}),
            name:     name,
            content:  content,
        }
    }
    
    func (c *DemoCollector) Init(evtReceiver microkernel.EventReceiver) error {   // 初始化一个这个 Collect
        fmt.Println("initialize collector", c.name)
        c.evtReceiver = evtReceiver    // Agent 作为数据的上报源
        return nil
    }
    
    func (c *DemoCollector) Start(agtCtx context.Context) error {   // 拉起一个 Collect
        fmt.Println("start collector", c.name)
    
        for {    // 死循环
            select {
            case <-agtCtx.Done():      // 收到 Done 了
                c.stopChan <- struct{}{}  // 停掉该 Collect (Stop 方法那里会等 stopChan 这个信号)
                break
            default:
                time.Sleep(time.Millisecond * 50)
                c.evtReceiver.OnEvent(microkernel.Event{c.name, c.content}) // 向 Agent 上报事件
            }
        }
    
    }
    
    func (c *DemoCollector) Stop() error {
        fmt.Println("stop collector", c.name)
        select {
        case <-c.stopChan:   // 收到停止信号了,停掉
            return nil  // 一般停完再做点啥,在这做些善后吧
        case <-time.After(time.Second * 1):
            return errors.New("failed to stop for timeout")
        }
    }
    
    func (c *DemoCollector) Destroy() error {
        fmt.Println(c.name, "released resources.")
        return nil
    }
    
    func main() {
        var err error = nil
        agt := microkernel.NewAgent(100)
        c1 := NewCollect("c1", "1a")
        c2 := NewCollect("c2", "2b")
        if err = agt.RegisterCollector("c1", c1);err != nil {
            goto ERR
        }
    
        if err = agt.RegisterCollector("c2", c2);err != nil {
            goto ERR
        }
    
        if err = agt.Start();err != nil {
            goto ERR
        }
    
        time.Sleep(time.Second * 2)
        if err = agt.Stop();err != nil {
            goto ERR
        }
    
        return
    
    ERR:
        fmt.Println("An Error Occur :",err)
    }
    

    注意
    停止的时候是 agt.Stop() 方法里有调用 agt.cancel() , 即发送了 agtCtx.Done()。
    Collector 实现的 Stop 做一些 停止 Collector 时该做的事情

    func (agt *Agent) Stop() error {
        if agt.state != Running {
            return WrongStateError
        }
        agt.state = Waiting
        agt.cancel()
        return agt.stopCollectors()
    }
    

    相关文章

      网友评论

          本文标题:go 编程的架构模式 (microkernel)

          本文链接:https://www.haomeiwen.com/subject/prszdctx.html