pipe-filter
image.pngimage.png
-
下面的代码实现了一个功能:将一段字符串分割、转换为整数、求和,这三个步骤分别由三个 filter 实现
-
filter.go
// Package pipefilter is to define the interfaces and the structures for pipe-filter style implementation
package pipefilter
// Request is the input of the filter.
// It is an empty interface, so any value can be passed in; a filter is
// expected to check the concrete type it can handle at run time.
type Request interface{}
// Response is the output of the filter.
// It is an empty interface, so a filter may return a value of any type.
type Response interface{}
// Filter is the data-processing component of the pipe-filter structure.
//
// Process receives one Request and returns the Response computed from it,
// or an error when the data cannot be processed.
type Filter interface {
Process(data Request) (Response, error)
}
- split_filter.go
package pipefilter
import (
"errors"
"strings"
)
// SplitFilterWrongFormatError is returned by SplitFilter.Process when the
// input is not a string.
var SplitFilterWrongFormatError = errors.New("input data should be string")

// SplitFilter is a filter that cuts a string into pieces on a fixed delimiter.
type SplitFilter struct {
	delimiter string
}

// NewSplitFilter returns a SplitFilter that splits its input on delimiter.
func NewSplitFilter(delimiter string) *SplitFilter {
	return &SplitFilter{delimiter: delimiter}
}

// Process splits data on the configured delimiter and returns the pieces as
// a []string. data must be a string; any other type yields
// SplitFilterWrongFormatError and a nil Response.
func (sf *SplitFilter) Process(data Request) (Response, error) {
	// Only strings can be split; reject everything else up front.
	if text, isString := data.(string); isString {
		return strings.Split(text, sf.delimiter), nil
	}
	return nil, SplitFilterWrongFormatError
}
- to_int_filter.go
package pipefilter
import (
"errors"
"strconv"
)
// ToIntFilterWrongFormatError is returned by ToIntFilter.Process when the
// input is not a []string.
var ToIntFilterWrongFormatError = errors.New("input data should be []string")

// ToIntFilter is a filter that converts a slice of decimal strings to ints.
type ToIntFilter struct{}

// NewToIntFilter returns a ready-to-use ToIntFilter.
func NewToIntFilter() *ToIntFilter {
	return new(ToIntFilter)
}

// Process converts every element of data with strconv.Atoi and returns the
// results as a []int in the same order. data must be a []string, otherwise
// ToIntFilterWrongFormatError is returned. The first element that fails to
// parse aborts the conversion and its strconv error is returned unchanged.
func (tif *ToIntFilter) Process(data Request) (Response, error) {
	fields, isStrings := data.([]string)
	if !isStrings {
		return nil, ToIntFilterWrongFormatError
	}

	nums := make([]int, len(fields))
	for i, field := range fields {
		n, err := strconv.Atoi(field)
		if err != nil {
			return nil, err
		}
		nums[i] = n
	}
	return nums, nil
}
- sum_filter.go
package pipefilter
import "errors"
// SumFilterWrongFormatError is returned by SumFilter.Process when the input
// is not a []int.
var SumFilterWrongFormatError = errors.New("input data should be []int")

// SumFilter is a filter that adds up a slice of ints.
type SumFilter struct{}

// NewSumFilter returns a ready-to-use SumFilter.
func NewSumFilter() *SumFilter {
	return new(SumFilter)
}

// Process returns the sum of all elements of data as an int (0 for an empty
// slice). data must be a []int, otherwise SumFilterWrongFormatError is
// returned together with a nil Response.
func (sf *SumFilter) Process(data Request) (Response, error) {
	nums, isInts := data.([]int)
	if !isInts {
		return nil, SumFilterWrongFormatError
	}

	total := 0
	for i := range nums {
		total += nums[i]
	}
	return total, nil
}
- straight_pipeline.go
package pipefilter
// NewStraightPipeline creates a StraightPipeline called name that runs the
// given filters in the order they are passed.
func NewStraightPipeline(name string, filters ...Filter) *StraightPipeline {
	return &StraightPipeline{
		Name:    name,
		Filters: &filters,
	}
}

// StraightPipeline is composed of filters chained in a straight line: the
// output of each filter is the input of the next one.
type StraightPipeline struct {
	Name    string
	Filters *[]Filter
}

// Process pushes data through every filter in order and returns the output
// of the last one. The first filter that fails stops the pipeline; its
// response and error are returned as they are. A pipeline without filters
// (including a zero-value StraightPipeline) returns nil, nil.
func (f *StraightPipeline) Process(data Request) (Response, error) {
	// A StraightPipeline that was not built by NewStraightPipeline has a nil
	// Filters pointer; treat it like an empty pipeline instead of panicking
	// on the dereference below.
	if f.Filters == nil {
		return nil, nil
	}

	var ret Response
	var err error
	for _, filter := range *f.Filters {
		ret, err = filter.Process(data)
		if err != nil {
			return ret, err
		}
		// The output of this stage feeds the next one.
		data = ret
	}
	return ret, err
}
- straight_pipeline_test.go
package pipefilter
import (
"reflect"
"testing"
)
// TestStraightPipeline chains split -> to-int -> sum and checks that the
// string "1,2,3" is reduced to 6.
func TestStraightPipeline(t *testing.T) {
	splitter := NewSplitFilter(",")
	toInt := NewToIntFilter()
	adder := NewSumFilter()
	t.Log(reflect.TypeOf(splitter))

	pipeline := NewStraightPipeline("p1", splitter, toInt, adder)
	got, err := pipeline.Process("1,2,3")
	switch {
	case err != nil:
		t.Fatal(err)
	case got != 6:
		t.Fatalf("The expected is 6, but the actual is %d", got)
	}
	t.Log(got)
}
- 三个 filter 都实现了 Process 方法,所以它们都实现了 Filter 接口
- StraightPipeline 也是一个 Filter,在测试中,它囊括了三个 Filter
micro-kernel
image.pngimage.png
- 下面的代码实现了一个非常简单的微内核架构
- agent.go
package microkernel
import (
"context"
"errors"
"fmt"
"strings"
"sync"
)
// Lifecycle states of an Agent.
const (
// Waiting means the agent is not running: it is the state set by NewAgent
// and the state the agent returns to after Stop.
Waiting = iota
// Running means the agent has been started.
Running
)
// WrongStateError is returned when an operation is requested while the agent
// is in a state that does not allow it.
var WrongStateError = errors.New("can not take the operation in the current state")
// CollectorsError bundles the errors reported by several collectors into a
// single error value.
type CollectorsError struct {
	CollectorErrors []error
}

// Error returns the messages of all bundled errors joined with ";".
// It returns the empty string when there are no bundled errors.
func (ce CollectorsError) Error() string {
	messages := make([]string, 0, len(ce.CollectorErrors))
	for i := range ce.CollectorErrors {
		messages = append(messages, ce.CollectorErrors[i].Error())
	}
	return strings.Join(messages, ";")
}
// Event is a message that a collector reports to an EventReceiver.
type Event struct {
Source string // where the event comes from
Content string // payload of the event
}
// EventReceiver is the callback side of the collector contract: a collector
// hands every Event it produces to OnEvent.
type EventReceiver interface {
OnEvent(evt Event)
}
// Collector is the plug-in contract of the micro-kernel: the agent drives
// every registered collector through these four lifecycle methods.
type Collector interface {
Init(evtReceiver EventReceiver) error // set up the resources the collector uses; evtReceiver is the object events are passed back to
Start(agtCtx context.Context) error // agtCtx is the agent's context: when the agent shuts down, the collector is shut down through it
Stop() error // stop the collector
Destory() error // release resources
}
// Agent is the kernel: it owns a set of collectors, drives their lifecycle
// and receives the events they produce.
type Agent struct {
collectors map[string]Collector // registered collectors, keyed by the name given to RegisterCollector
evtBuf chan Event // buffered channel filled by OnEvent and drained by EventProcessGroutine
cancel context.CancelFunc // cancels ctx; called by Stop
ctx context.Context // created by Start; passed to the collectors and watched by EventProcessGroutine
state int // current state, Waiting or Running
}
// EventProcessGroutine is meant to run as a goroutine. It takes events out of
// evtBuf in batches of ten and prints each full batch. It returns as soon as
// the agent's context is cancelled; a batch that is only partly filled at
// that moment is not printed.
func (agt *Agent) EventProcessGroutine() {
	var batch [10]Event
	for {
		for slot := range batch {
			select {
			case batch[slot] = <-agt.evtBuf:
			case <-agt.ctx.Done():
				return
			}
		}
		fmt.Println(batch)
	}
}
// NewAgent returns an Agent in the Waiting state with no collectors and an
// event buffer that can hold sizeEvtBuf events.
func NewAgent(sizeEvtBuf int) *Agent {
	return &Agent{
		state:      Waiting,
		evtBuf:     make(chan Event, sizeEvtBuf),
		collectors: make(map[string]Collector),
	}
}
// RegisterCollector initialises collector with the agent as its event
// receiver and, if that succeeds, registers it under name. A collector
// already registered under the same name is replaced.
//
// It returns WrongStateError when the agent is not in the Waiting state, and
// the error of collector.Init when initialisation fails; in both cases the
// set of registered collectors is left untouched.
func (agt *Agent) RegisterCollector(name string, collector Collector) error {
	if agt.state != Waiting {
		return WrongStateError
	}
	// Initialise before registering, so that a collector whose Init failed is
	// never left in the map to be started later.
	if err := collector.Init(agt); err != nil {
		return err
	}
	agt.collectors[name] = collector
	return nil
}
// startCollectors launches every registered collector in its own goroutine,
// passing it the agent's context.
//
// The goroutines are not waited for: Collector.Start presumably blocks for
// the collector's whole lifetime (the demo collector loops until the context
// is cancelled — confirm for other implementations). As a consequence only
// the failures that have already been recorded when the launch loop ends are
// returned, bundled in a CollectorsError; nil is returned when none has been
// recorded yet.
func (agt *Agent) startCollectors() error {
	var (
		errs  CollectorsError
		mutex sync.Mutex // guards errs.CollectorErrors
	)
	for name, collector := range agt.collectors {
		go func(name string, collector Collector, ctx context.Context) {
			// err is local to this goroutine; sharing one variable between
			// all collectors would be a data race and could attribute one
			// collector's error to another.
			err := collector.Start(ctx)
			if err == nil {
				return
			}
			// Lock only after Start has returned, and only then defer the
			// Unlock, so a panicking Start never unlocks an unlocked mutex.
			mutex.Lock()
			defer mutex.Unlock()
			errs.CollectorErrors = append(errs.CollectorErrors,
				errors.New(name+":"+err.Error()))
		}(name, collector, agt.ctx)
	}

	mutex.Lock()
	defer mutex.Unlock()
	if len(errs.CollectorErrors) == 0 {
		return nil
	}
	// Return a copy so that goroutines finishing later cannot touch the
	// slice the caller holds.
	return CollectorsError{
		CollectorErrors: append([]error(nil), errs.CollectorErrors...),
	}
}
// stopCollectors calls Stop on every registered collector, one after the
// other. Failures do not interrupt the loop: each one is recorded as
// "<name>:<message>" and all of them are returned together as a
// CollectorsError. It returns nil when every collector stopped cleanly.
func (agt *Agent) stopCollectors() error {
	var failures []error
	for name, collector := range agt.collectors {
		err := collector.Stop()
		if err == nil {
			continue
		}
		failures = append(failures, errors.New(name+":"+err.Error()))
	}
	if len(failures) > 0 {
		return CollectorsError{CollectorErrors: failures}
	}
	return nil
}
// destoryCollectors calls Destory on every registered collector, one after
// the other. Failures do not interrupt the loop: each one is recorded as
// "<name>:<message>" and all of them are returned together as a
// CollectorsError. It returns nil when every collector released its
// resources without error.
func (agt *Agent) destoryCollectors() error {
	var failures []error
	for name, collector := range agt.collectors {
		err := collector.Destory()
		if err == nil {
			continue
		}
		failures = append(failures, errors.New(name+":"+err.Error()))
	}
	if len(failures) > 0 {
		return CollectorsError{CollectorErrors: failures}
	}
	return nil
}
// Start moves the agent from Waiting to Running: it creates a fresh
// cancellable context, launches the event-processing goroutine and starts
// all registered collectors. It returns WrongStateError when the agent is
// not in the Waiting state, otherwise whatever startCollectors returns.
func (agt *Agent) Start() error {
	if agt.state != Waiting {
		return WrongStateError
	}

	ctx, cancel := context.WithCancel(context.Background())
	agt.state = Running
	agt.ctx = ctx
	agt.cancel = cancel

	go agt.EventProcessGroutine()
	return agt.startCollectors()
}
// Stop moves the agent from Running back to Waiting: it cancels the agent's
// context and then stops all registered collectors. It returns
// WrongStateError when the agent is not running, otherwise whatever
// stopCollectors returns.
func (agt *Agent) Stop() error {
	if agt.state == Running {
		agt.state = Waiting
		agt.cancel()
		return agt.stopCollectors()
	}
	return WrongStateError
}
// Destory releases all registered collectors. It is only allowed while the
// agent is in the Waiting state; otherwise WrongStateError is returned. On
// the allowed path it returns whatever destoryCollectors returns.
func (agt *Agent) Destory() error {
	if agt.state == Waiting {
		return agt.destoryCollectors()
	}
	return WrongStateError
}
// OnEvent puts evt into the agent's event buffer. Because Agent has this
// method it satisfies the EventReceiver interface.
// The send blocks while the buffer is full.
func (agt *Agent) OnEvent(evt Event) {
agt.evtBuf <- evt
}
- agent_test.go
package microkernel
import (
"context"
"errors"
"fmt"
"testing"
"time"
)
// DemoCollector is a sample collector used by the test: it keeps reporting
// the same event until the agent's context is cancelled.
type DemoCollector struct {
evtReceiver EventReceiver // receiver handed over in Init; events are reported to it
agtCtx context.Context // NOTE(review): not assigned by any method in this listing
stopChan chan struct{} // Start signals on it after cancellation; Stop waits on it
name string // used as the Source of the produced events and in log output
content string // used as the Content of the produced events
}
// NewCollect returns a DemoCollector called name that reports content as the
// payload of its events. Its stop channel is unbuffered.
func NewCollect(name string, content string) *DemoCollector {
	c := new(DemoCollector)
	c.name, c.content = name, content
	c.stopChan = make(chan struct{})
	return c
}
// Init remembers evtReceiver as the destination of the collector's events
// and logs the initialisation. It always returns nil.
func (c *DemoCollector) Init(evtReceiver EventReceiver) error {
	c.evtReceiver = evtReceiver
	fmt.Println("initialize collector", c.name)
	return nil
}
// Start runs the collector loop: roughly every 50 ms it reports one event
// carrying the collector's name and content to the receiver set by Init.
// When agtCtx is cancelled it signals Stop through stopChan and returns nil.
//
// The send on stopChan is unbuffered, so Start blocks there until Stop
// receives the signal.
func (c *DemoCollector) Start(agtCtx context.Context) error {
	fmt.Println("start collector", c.name)
	for {
		select {
		case <-agtCtx.Done():
			c.stopChan <- struct{}{}
			// Return instead of break: a bare break would only leave the
			// select, and the loop would then block forever on a second
			// send to stopChan.
			return nil
		default:
			time.Sleep(time.Millisecond * 50)
			c.evtReceiver.OnEvent(Event{c.name, c.content})
		}
	}
}
// Stop waits for the collector loop to acknowledge its shutdown on stopChan.
// It returns nil once the signal arrives, or an error if nothing arrives
// within one second.
func (c *DemoCollector) Stop() error {
	fmt.Println("stop collector", c.name)

	timeout := time.After(time.Second * 1)
	select {
	case <-timeout:
		return errors.New("failed to stop for timeout")
	case <-c.stopChan:
		return nil
	}
}
// Destory only logs that the collector's resources were released; the demo
// collector has nothing to free. It always returns nil.
func (c *DemoCollector) Destory() error {
fmt.Println(c.name, "released resources.")
return nil
}
// TestAgent runs an agent with two demo collectors for one second and checks
// every lifecycle step: registration, start, rejection of a second start,
// stop and release.
func TestAgent(t *testing.T) {
	agt := NewAgent(100)
	c1 := NewCollect("c1", "1")
	c2 := NewCollect("c2", "2")
	if err := agt.RegisterCollector("c1", c1); err != nil {
		t.Fatalf("register c1: %v", err)
	}
	if err := agt.RegisterCollector("c2", c2); err != nil {
		t.Fatalf("register c2: %v", err)
	}

	if err := agt.Start(); err != nil {
		t.Fatalf("start error %v", err)
	}
	// Starting an agent that is already running must be rejected.
	if err := agt.Start(); !errors.Is(err, WrongStateError) {
		t.Errorf("second Start: got %v, want %v", err, WrongStateError)
	}

	// Give the collectors time to produce some events.
	time.Sleep(time.Second * 1)

	if err := agt.Stop(); err != nil {
		t.Errorf("stop error %v", err)
	}
	if err := agt.Destory(); err != nil {
		t.Errorf("destory error %v", err)
	}
}
- 每一个 collector 都要实现四个功能:init、start、stop、destory
- agent 会通过调用每个 collector 的方法完成工作
网友评论