美文网首页
10 - 两个常见的架构

10 - 两个常见的架构

作者: 天命_风流 | 来源:发表于2020-07-08 17:39 被阅读0次

    pipe-filter

    image.png
    image.png
    • 下面的代码实现了一个功能:将一段字符串分割、转换为整数、再求和,这三个步骤分别由三个 filter 实现

    • filter.go

    // Package pipefilter defines the interfaces and structures for a pipe-filter style implementation.
    package pipefilter
    
    // Request is the input handed to a Filter. It is the empty interface, so any
    // value may be passed; the filters in this package type-assert the concrete
    // type they expect.
    type Request interface{}
    
    // Response is the output produced by a Filter. It is the empty interface, so
    // the caller must type-assert the concrete result.
    type Response interface{}
    
    // Filter is the data-processing component of the pipe-filter structure.
    // Process takes one Request and returns either a Response or an error.
    type Filter interface {
        Process(data Request) (Response, error)
    }
    
    • split_filter.go
    package pipefilter
    
    import (
        "errors"
        "strings"
    )
    
    // SplitFilterWrongFormatError is returned by SplitFilter.Process when the
    // input is not a string.
    var SplitFilterWrongFormatError = errors.New("input data should be string")
    
    // SplitFilter is a Filter that splits a string around a fixed delimiter.
    type SplitFilter struct {
        delimiter string // separator passed to strings.Split
    }
    
    // NewSplitFilter returns a SplitFilter that splits its input around delimiter.
    func NewSplitFilter(delimiter string) *SplitFilter {
        sf := new(SplitFilter)
        sf.delimiter = delimiter
        return sf
    }
    
    // Process splits the input string around the configured delimiter and
    // returns the pieces as a []string. If data is not a string it returns
    // SplitFilterWrongFormatError.
    func (sf *SplitFilter) Process(data Request) (Response, error) {
        // Only string input can be handled by this filter.
        if text, isString := data.(string); isString {
            return strings.Split(text, sf.delimiter), nil
        }
        return nil, SplitFilterWrongFormatError
    }
    
    • to_int_filter.go
    package pipefilter
    
    import (
        "errors"
        "strconv"
    )
    
    // ToIntFilterWrongFormatError is returned by ToIntFilter.Process when the
    // input is not a []string.
    var ToIntFilterWrongFormatError = errors.New("input data should be []string")
    
    // ToIntFilter is a Filter that converts a []string into a []int. It carries
    // no state.
    type ToIntFilter struct {
    }
    
    // NewToIntFilter returns a new ToIntFilter.
    func NewToIntFilter() *ToIntFilter {
        return new(ToIntFilter)
    }
    
    // Process converts every element of a []string to an int with strconv.Atoi
    // and returns the resulting []int. It returns ToIntFilterWrongFormatError
    // when data is not a []string, and the strconv error for the first element
    // that cannot be parsed.
    func (tif *ToIntFilter) Process(data Request) (Response, error) {
        var parts []string
        switch v := data.(type) {
        case []string:
            parts = v
        default:
            return nil, ToIntFilterWrongFormatError
        }

        nums := []int{}
        for i := range parts {
            n, convErr := strconv.Atoi(parts[i])
            if convErr != nil {
                return nil, convErr
            }
            nums = append(nums, n)
        }
        return nums, nil
    }
    
    • sum_filter.go
    package pipefilter
    
    import "errors"
    
    // SumFilterWrongFormatError is returned by SumFilter.Process when the input
    // is not a []int.
    var SumFilterWrongFormatError = errors.New("input data should be []int")
    
    // SumFilter is a Filter that adds up the elements of a []int. It carries no
    // state.
    type SumFilter struct {
    }
    
    // NewSumFilter returns a new SumFilter.
    func NewSumFilter() *SumFilter {
        return new(SumFilter)
    }
    
    // Process returns the sum of a []int as an int (0 for an empty slice). It
    // returns SumFilterWrongFormatError when data is not a []int.
    func (sf *SumFilter) Process(data Request) (Response, error) {
        nums, isInts := data.([]int)
        if isInts {
            total := 0
            for i := 0; i < len(nums); i++ {
                total += nums[i]
            }
            return total, nil
        }
        return nil, SumFilterWrongFormatError
    }
    
    • straight_pipeline.go
    package pipefilter
    
    // NewStraightPipeline creates a StraightPipeline with the given name whose
    // filters are applied in the order they are passed.
    func NewStraightPipeline(name string, filters ...Filter) *StraightPipeline {
        pipeline := new(StraightPipeline)
        pipeline.Name = name
        pipeline.Filters = &filters
        return pipeline
    }
    
    // StraightPipeline is composed of filters arranged in a straight line: the
    // output of each filter is the input of the next.
    type StraightPipeline struct {
        Name    string    // name given to the pipeline at construction
        Filters *[]Filter // filters applied in slice order by Process
    }
    
    // Process feeds data through every filter in order, passing each filter's
    // output to the next one, and returns the output of the last filter.
    //
    // If a filter fails, processing stops and that filter's response and error
    // are returned unchanged. A pipeline with no filters returns (nil, nil).
    func (f *StraightPipeline) Process(data Request) (Response, error) {
        // Filters is an exported pointer, so a pipeline built as a struct literal
        // may leave it nil; treat that like an empty pipeline instead of
        // panicking on the dereference below.
        if f == nil || f.Filters == nil {
            return nil, nil
        }
        var ret Response
        for _, filter := range *f.Filters {
            out, err := filter.Process(data)
            if err != nil {
                return out, err
            }
            ret = out
            data = out
        }
        return ret, nil
    }
    
    • straight_pipeline_test.go
    package pipefilter
    
    import (
        "reflect"
        "testing"
    )
    
    // TestStraightPipeline chains the split, to-int and sum filters and checks
    // that "1,2,3" is reduced to 6.
    func TestStraightPipeline(t *testing.T) {
        splitStage := NewSplitFilter(",")
        t.Log(reflect.TypeOf(splitStage))

        pipeline := NewStraightPipeline("p1", splitStage, NewToIntFilter(), NewSumFilter())
        got, err := pipeline.Process("1,2,3")
        switch {
        case err != nil:
            t.Fatal(err)
        case got != 6:
            t.Fatalf("The expected is 6, but the actual is %d", got)
        }
        t.Log(got)
    }
    
    • 三个 filter 都实现了 Process 方法,所以它们都满足 Filter 接口
    • StraightPipeline 也是一个 Filter,在测试中,它囊括了三个 Filter

    micro-kernel

    image.png
    image.png
    • 下面的代码实现了一个非常简单的微内核架构
    • agent.go
    package microkernel
    
    import (
        "context"
        "errors"
        "fmt"
        "strings"
        "sync"
    )
    
    // Agent lifecycle states, stored in Agent.state.
    const (
        Waiting = iota // initial state set by NewAgent, and the state after Stop
        Running        // set by Agent.Start
    )
    
    // WrongStateError is returned by Agent methods that are called while the
    // agent is not in the state the operation requires.
    var WrongStateError = errors.New("can not take the operation in the current state")
    
    // CollectorsError aggregates the errors reported by several collectors.
    type CollectorsError struct {
        CollectorErrors []error
    }

    // Error returns the messages of all contained errors joined by ";". With no
    // contained errors it returns the empty string.
    func (ce CollectorsError) Error() string {
        msgs := make([]string, len(ce.CollectorErrors))
        for i, e := range ce.CollectorErrors {
            msgs[i] = e.Error()
        }
        return strings.Join(msgs, ";")
    }
    
    // Event is the unit of data a collector hands to an EventReceiver.
    type Event struct {
        Source  string
        Content string
    }
    
    // EventReceiver is implemented by anything that can accept events.
    // Agent implements it.
    type EventReceiver interface {
        OnEvent(evt Event)
    }
    
    // Collector is the plug-in interface managed by an Agent. The agent calls
    // Init when the collector is registered, Start and Stop when the agent itself
    // starts and stops, and Destory when the agent is destroyed.
    type Collector interface {
        Init(evtReceiver EventReceiver) error  // initialize the resources to use; evtReceiver is the object events are passed back to
        Start(agtCtx context.Context) error  // when the agent shuts down, the collector is shut down through this context
        Stop() error
        Destory() error  // release resources
    }
    
    // Agent is the micro-kernel: it owns a set of named collectors, buffers the
    // events they report, and drives their lifecycle.
    type Agent struct {
        collectors map[string]Collector  // the registered collectors, keyed by name
        evtBuf     chan Event  // buffered channel that OnEvent writes to
        cancel     context.CancelFunc  // cancels ctx; called by Stop
        ctx        context.Context  // created by Start and passed to every collector
        state      int  // current state: Waiting or Running
    }
    
    // EventProcessGroutine is meant to be run as a goroutine. It takes events
    // out of evtBuf and prints them in batches of ten. It returns as soon as the
    // agent's context is cancelled; a partially filled batch is not printed.
    func (agt *Agent) EventProcessGroutine() {
        const batchSize = 10
        var batch [batchSize]Event
        filled := 0
        for {
            select {
            case <-agt.ctx.Done():
                return
            case evt := <-agt.evtBuf:
                batch[filled] = evt
                filled++
            }
            // Print only when the batch is full, then start over at slot 0.
            if filled == batchSize {
                fmt.Println(batch)
                filled = 0
            }
        }
    }
    
    // NewAgent returns an Agent in the Waiting state with no collectors and an
    // event buffer that holds up to sizeEvtBuf events.
    func NewAgent(sizeEvtBuf int) *Agent {
        return &Agent{
            collectors: make(map[string]Collector),
            evtBuf:     make(chan Event, sizeEvtBuf),
            state:      Waiting,
        }
    }
    
    // RegisterCollector stores collector under name and initializes it with the
    // agent as its event receiver. It returns WrongStateError unless the agent is
    // in the Waiting state; otherwise it returns the result of collector.Init.
    func (agt *Agent) RegisterCollector(name string, collector Collector) error {
        if agt.state == Waiting {
            agt.collectors[name] = collector
            return collector.Init(agt)
        }
        return WrongStateError
    }
    
    // startCollectors launches every registered collector in its own goroutine,
    // passing it the agent's context.
    //
    // It does not wait for the goroutines to finish, so the returned error only
    // contains failures that were already recorded when the launch loop ended;
    // it returns nil if none were.
    func (agt *Agent) startCollectors() error {
        var (
            errs  CollectorsError
            mutex sync.Mutex // guards errs
        )

        for name, collector := range agt.collectors {
            go func(name string, collector Collector, ctx context.Context) {
                // err is local to this goroutine; sharing one variable across
                // all goroutines would be a data race.
                err := collector.Start(ctx)
                if err == nil {
                    return
                }
                mutex.Lock()
                defer mutex.Unlock()
                errs.CollectorErrors = append(errs.CollectorErrors,
                    errors.New(name+":"+err.Error()))
            }(name, collector, agt.ctx)
        }

        // Read errs under the same lock the goroutines use when writing it.
        mutex.Lock()
        defer mutex.Unlock()
        if len(errs.CollectorErrors) == 0 {
            return nil
        }
        return errs
    }
    
    // stopCollectors calls Stop on every registered collector, one after the
    // other. Each failure is recorded as "<name>:<message>"; the failures are
    // returned together as a CollectorsError, or nil if every collector stopped.
    func (agt *Agent) stopCollectors() error {
        var failures []error
        for name, collector := range agt.collectors {
            stopErr := collector.Stop()
            if stopErr == nil {
                continue
            }
            failures = append(failures, errors.New(name+":"+stopErr.Error()))
        }
        if len(failures) > 0 {
            return CollectorsError{CollectorErrors: failures}
        }
        return nil
    }
    
    // destoryCollectors calls Destory on every registered collector, one after
    // the other. Each failure is recorded as "<name>:<message>"; the failures are
    // returned together as a CollectorsError, or nil if there were none.
    func (agt *Agent) destoryCollectors() error {
        var errs CollectorsError
        for name, collector := range agt.collectors {
            if err := collector.Destory(); err != nil {
                wrapped := errors.New(name + ":" + err.Error())
                errs.CollectorErrors = append(errs.CollectorErrors, wrapped)
            }
        }
        if len(errs.CollectorErrors) > 0 {
            return errs
        }
        return nil
    }
    
    // Start moves the agent from Waiting to Running: it creates a cancellable
    // context, launches the event-processing goroutine and starts all
    // collectors. It returns WrongStateError if the agent is not Waiting.
    func (agt *Agent) Start() error {
        if agt.state == Waiting {
            agt.state = Running
            ctx, cancel := context.WithCancel(context.Background())
            agt.ctx, agt.cancel = ctx, cancel
            go agt.EventProcessGroutine()
            return agt.startCollectors()
        }
        return WrongStateError
    }
    
    // Stop moves the agent from Running back to Waiting: it cancels the agent's
    // context and then stops all collectors. It returns WrongStateError if the
    // agent is not Running.
    func (agt *Agent) Stop() error {
        switch agt.state {
        case Running:
            agt.state = Waiting
            agt.cancel()
            return agt.stopCollectors()
        default:
            return WrongStateError
        }
    }
    
    // Destory releases every collector by calling its Destory method. It returns
    // WrongStateError unless the agent is in the Waiting state.
    func (agt *Agent) Destory() error {
        if agt.state == Waiting {
            return agt.destoryCollectors()
        }
        return WrongStateError
    }
    
    // OnEvent puts evt into the agent's event buffer. Because Agent has this
    // method, it satisfies the EventReceiver interface. The send blocks while the
    // buffer is full.
    func (agt *Agent) OnEvent(evt Event) {
        agt.evtBuf <- evt
    }
    
    • agent_test.go
    package microkernel
    
    import (
        "context"
        "errors"
        "fmt"
        "testing"
        "time"
    )
    
    // DemoCollector is a sample Collector that keeps reporting the same event.
    type DemoCollector struct {
        evtReceiver EventReceiver   // set by Init; receives the generated events
        agtCtx      context.Context // not assigned anywhere in the code shown here
        stopChan    chan struct{}   // Start sends on it after cancellation; Stop waits on it
        name        string          // used as Event.Source and in log output
        content     string          // used as Event.Content
    }
    
    // NewCollect returns a DemoCollector with the given name and event content
    // and an unbuffered stop channel.
    func NewCollect(name string, content string) *DemoCollector {
        c := new(DemoCollector)
        c.name = name
        c.content = content
        c.stopChan = make(chan struct{})
        return c
    }
    
    // Init remembers evtReceiver as the destination for generated events and
    // logs the initialization. It always returns nil.
    func (c *DemoCollector) Init(evtReceiver EventReceiver) error {
        c.evtReceiver = evtReceiver
        fmt.Println("initialize collector", c.name)
        return nil
    }
    
    // Start reports an Event{name, content} to the receiver roughly every 50ms
    // until agtCtx is cancelled. On cancellation it sends one signal on stopChan,
    // which Stop waits for, and returns nil.
    //
    // The call blocks for the lifetime of the collector, so run it in its own
    // goroutine.
    func (c *DemoCollector) Start(agtCtx context.Context) error {
        fmt.Println("start collector", c.name)
        for {
            select {
            case <-agtCtx.Done():
                c.stopChan <- struct{}{}
                // Return rather than break: a bare break only leaves the select,
                // so the loop would run again and block forever on a second send.
                return nil
            default:
                time.Sleep(time.Millisecond * 50)
                c.evtReceiver.OnEvent(Event{c.name, c.content})
            }
        }
    }
    
    // Stop waits up to one second for the stop signal that Start sends on
    // stopChan. It returns nil when the signal arrives and an error on timeout.
    func (c *DemoCollector) Stop() error {
        fmt.Println("stop collector", c.name)
        timeout := time.After(time.Second * 1)
        select {
        case <-timeout:
            return errors.New("failed to stop for timeout")
        case <-c.stopChan:
            return nil
        }
    }
    
    // Destory logs that the collector released its resources. It always returns
    // nil.
    func (c *DemoCollector) Destory() error {
        fmt.Println(c.name, "released resources.")
        return nil
    }
    
    // TestAgent walks an agent with two demo collectors through its lifecycle:
    // register, start, a second start attempt whose result is printed, one
    // second of running, stop, destroy.
    func TestAgent(t *testing.T) {
        agt := NewAgent(100)
        demos := []*DemoCollector{
            NewCollect("c1", "1"),
            NewCollect("c2", "2"),
        }
        for _, c := range demos {
            agt.RegisterCollector(c.name, c)
        }

        err := agt.Start()
        if err != nil {
            fmt.Printf("start error %v\n", err)
        }
        // The agent is already Running here, so this prints the second call's result.
        fmt.Println(agt.Start())

        time.Sleep(time.Second * 1)
        agt.Stop()
        agt.Destory()
    }
    
    • 每一个 collector 都要实现四个方法:Init、Start、Stop、Destory
    • agent 会通过调用每个 collector 的方法完成工作

    相关文章

      网友评论

          本文标题:10 - 两个常见的架构

          本文链接:https://www.haomeiwen.com/subject/ythccktx.html