美文网首页KisFlow-基于Golang的流式计算框架实战@IT·互联网
Golang框架实战-KisFlow流式计算框架(3)-项目构建

Golang框架实战-KisFlow流式计算框架(3)-项目构建

作者: 刘丹冰Aceld | 来源:发表于2024-02-22 18:10 被阅读0次

    连载中...
    Golang框架实战-KisFlow流式计算框架(1)-概述
    Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
    Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)


    首先我们要先定义KisFlow的核心结构体,KisFlow结构体,通过上述的设计理念,我们得知,KisFlow表示整个一条数据计算流的结构。其中每次数据在一个Flow上,依次执行挂载在当前Flow的Function。

    2.3.1 KisFunction家族

    KisFunction应该是一个链式调用,所以结构体的基本形态应该是一个链表,通过一次Function的调用结束后,默认可以调度到下一个Function的节点上。 在KisFlow中,一共有 saveload, calculate, extend, varify等多种行为的Funciton,所以这里我们采用上述五种function的模板类,方便今后在不同针对不同特征的function做更加灵活和功能隔离的拓展和改造。

    整体的KisFunction的类图设计如下:

    2.2.2 抽象层KisFunction定义

    kis-flow中创建一个新的目录function用来存放function的代码。
    首先抽象接口编写在kis/目录下。

    kis-flow/kis/function.go

    package kis
    
    import (
        "context"
        "kis-flow/config"
    )
    
    // Function 流式计算基础计算模块,KisFunction是一条流式计算的基本计算逻辑单元,
    //             任意个KisFunction可以组合成一个KisFlow
    type Function interface {
        // Call 执行流式计算逻辑
        Call(ctx context.Context, flow Flow) error
    
        // SetConfig 给当前Function实例配置策略
        SetConfig(s *config.KisFuncConfig) error
        // GetConfig 获取当前Function实例配置策略
        GetConfig() *config.KisFuncConfig
    
        // SetFlow 给当前Function实例设置所依赖的Flow实例
        SetFlow(f Flow) error
        // GetFlow 获取当前Functioin实力所依赖的Flow
        GetFlow() Flow
    
        // CreateId 给当前Funciton实力生成一个随机的实例KisID
        CreateId()
        // GetId 获取当前Function的FID
        GetId() string
        // GetPrevId 获取当前Function上一个Function节点FID
        GetPrevId() string
        // GetNextId 获取当前Function下一个Function节点FID
        GetNextId() string
    
        // Next 返回下一层计算流Function,如果当前层为最后一层,则返回nil
        Next() Function
        // Prev 返回上一层计算流Function,如果当前层为最后一层,则返回nil
        Prev() Function
        // SetN 设置下一层Function实例
        SetN(f Function)
        // SetP 设置上一层Function实例
        SetP(f Function)
    }
    

    2.2.3 KisId随机唯一实例ID

    上述提出了一个新的概念KisId。 KisID为Function的实例ID,用于KisFlow内部区分不同的实例对象。KisId 和 Function Config中的 Fid的区别在于,Fid用来形容一类Funcion策略的ID,而KisId则为在KisFlow中KisFunction已经实例化的 实例对象ID 这个ID是随机生成且唯一。

    创建kis-flow/id/目录,且创建kis_id.go 文件,实现有关kis_id生成的算法。

    kis-flow/id/kis_id.go

    package id
    
    import (
        "github.com/google/uuid"
        "kis-flow/common"
        "strings"
    )
    
    // KisID 获取一个中随机实例ID
    // 格式为  "prefix1-[prefix2-][prefix3-]ID"
    // 如:flow-1234567890
    // 如:func-1234567890
    // 如: conn-1234567890
    // 如: func-1-1234567890
    func KisID(prefix ...string) (kisId string) {
    
        idStr := strings.Replace(uuid.New().String(), "-", "", -1)
        kisId = formatKisID(idStr, prefix...)
    
        return
    }
    
    func formatKisID(idStr string, prefix ...string) string {
        var kisId string
    
        for _, fix := range prefix {
            kisId += fix
            kisId += common.KisIdJoinChar
        }
    
        kisId += idStr
    
        return kisId
    }
    

    kisId模块提供KisID()方法,这里面依赖了第三方分布式ID生成库github.com/google/uuid,生成的随机ID为一个字符串,且调用者可以提供多个前缀,通过-符号进行拼接,得到的随机字符串ID,如:func-1234567890

    针对KisId的前缀,提供了一些字符串的枚举,如下:

    kis-flow/common/const.go

    // KisIdType 用户生成KisId的字符串前缀
    const (
        KisIdTypeFlow       = "flow"
        KisIdTypeConnnector = "conn"
        KisIdTypeFunction   = "func"
        KisIdTypeGlobal     = "global"
        KisIdJoinChar       = "-"
    )
    

    2.2.4 BaseFunction基础父类

    按照设计,我们需要提供一个BaseFunction作为Function的一个子类,实现一些基础的功能接口。留下Call()让具体模式的KisFunctionX来重写实现,下面来进行对BaseFunction结构的定义。
    kis-flow/function/创建kis_base_funciton.go 文件。

    A. 结构定义

    kis-flow/function/kis_base_function.go

    package function
    
    import (
        "context"
        "errors"
        "kis-flow/common"
        "kis-flow/config"
        "kis-flow/id"
        "kis-flow/kis"
    )
    
    type BaseFunction struct {
        // Id , KisFunction的实例ID,用于KisFlow内部区分不同的实例对象
        Id     string
        Config *config.KisFuncConfig
    
        // flow
        Flow kis.Flow //上下文环境KisFlow
    
        // link
        N kis.Function //下一个流计算Function
        P kis.Function //上一个流计算Function
    }
    

    B. 方法实现

    kis-flow/function/kis_base_function.go

    // Call
    // BaseFunction 为空实现,目的为了让其他具体类型的KisFunction,如KisFunction_V 来继承BaseFuncion来重写此方法
    func (base *BaseFunction) Call(ctx context.Context, flow kis.Flow) error { return nil }
    
    func (base *BaseFunction) Next() kis.Function {
        return base.N
    }
    
    func (base *BaseFunction) Prev() kis.Function {
        return base.P
    }
    
    func (base *BaseFunction) SetN(f kis.Function) {
        base.N = f
    }
    
    func (base *BaseFunction) SetP(f kis.Function) {
        base.P = f
    }
    
    func (base *BaseFunction) SetConfig(s *config.KisFuncConfig) error {
        if s == nil {
            return errors.New("KisFuncConfig is nil")
        }
    
        base.Config = s
    
        return nil
    }
    
    func (base *BaseFunction) GetId() string {
        return base.Id
    }
    
    func (base *BaseFunction) GetPrevId() string {
        if base.P == nil {
            //Function为首结点
            return common.FunctionIdFirstVirtual
        }
        return base.P.GetId()
    }
    
    func (base *BaseFunction) GetNextId() string {
        if base.N == nil {
            //Function为尾结点
            return common.FunctionIdLastVirtual
        }
        return base.N.GetId()
    }
    
    func (base *BaseFunction) GetConfig() *config.KisFuncConfig {
        return base.Config
    }
    
    func (base *BaseFunction) SetFlow(f kis.Flow) error {
        if f == nil {
            return errors.New("KisFlow is nil")
        }
        base.Flow = f
        return nil
    }
    
    func (base *BaseFunction) GetFlow() kis.Flow {
        return base.Flow
    }
    
    func (base *BaseFunction) CreateId() {
        base.Id = id.KisID(common.KisIdTypeFunction)
    }
    
    

    这里注意 GetPrevId()GetNextId()两个方法实现,因为如果当前Functioin为双向链表中的第一个节点或者最后一个节点,那么他们的上一个或者下一个是没有节点的,那么ID也就不存在,为了在使用中不出现得不到ID的情况,我们提供了两个虚拟FID,做特殊情况的边界处理,定义在const.go中。

    kis-flow/common/const.go

    const (
        // FunctionIdFirstVirtual 为首结点Function上一层虚拟的Function ID
        FunctionIdFirstVirtual = "FunctionIdFirstVirtual"
        // FunctionIdLastVirtual 为尾结点Function下一层虚拟的Function ID
        FunctionIdLastVirtual = "FunctionIdLastVirtual"
    )
    

    2.2.5 KisFunction的V/S/L/C/E等模式类定义

    下面分别实现V/S/L/C/E 五种不同模式的KisFunction子类。这里分别用创建文件来实现。

    A. KisFunctionV

    kis-flow/function/kis_function_v.go

    package function
    
    import (
        "context"
        "fmt"
        "kis-flow/kis"
    )
    
    type KisFunctionV struct {
        BaseFunction
    }
    
    func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error {
        fmt.Printf("KisFunctionV, flow = %+v\n", flow)
    
        // TODO 调用具体的Function执行方法
    
        return nil
    }
    
    
    

    B. KisFunctionS

    kis-flow/function/kis_function_s.go

    package function
    
    import (
        "context"
        "fmt"
        "kis-flow/kis"
    )
    
    type KisFunctionS struct {
        BaseFunction
    }
    
    func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error {
        fmt.Printf("KisFunctionS, flow = %+v\n", flow)
    
        // TODO 调用具体的Function执行方法
    
        return nil
    }
    
    

    C. KisFunctionL

    kis-flow/function/kis_function_l.go

    package function
    
    import (
        "context"
        "fmt"
        "kis-flow/kis"
    )
    
    type KisFunctionL struct {
        BaseFunction
    }
    
    func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error {
        fmt.Printf("KisFunctionL, flow = %+v\n", flow)
    
        // TODO 调用具体的Function执行方法
    
        return nil
    }
    
    

    D. KisFunctionC

    kis-flow/function/kis_function_c.go

    package function
    
    import (
        "context"
        "fmt"
        "kis-flow/kis"
    )
    
    type KisFunctionC struct {
        BaseFunction
    }
    
    func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {
        fmt.Printf("KisFunction_C, flow = %+v\n", flow)
    
        // TODO 调用具体的Function执行方法
    
        return nil
    }
    

    E. KisFunctionE

    kis-flow/function/kis_function_e.go

    package function
    
    import (
        "context"
        "fmt"
        "kis-flow/kis"
    )
    
    type KisFunctionE struct {
        BaseFunction
    }
    
    func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {
        fmt.Printf("KisFunctionE, flow = %+v\n", flow)
    
        // TODO 调用具体的Function执行方法
    
        return nil
    }
    
    

    2.2.6 创建KisFunction实例

    下面提供一个创建具体模式Function的方法,这里采用简单工厂方法模式来实现创建对象。

    kis-flow/function/kis_base_function.go

    func (base *BaseFunction) CreateId() {
        base.Id = id.KisID(common.KisIdTypeFunction)
    }
    
    // NewKisFunction 创建一个NsFunction
    // flow: 当前所属的flow实例
    // s : 当前function的配置策略
    func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {
        var f kis.Function
    
        //工厂生产泛化对象
        switch common.KisMode(config.FMode) {
        case common.V:
            f = new(KisFunctionV)
            break
        case common.S:
            f = new(KisFunctionS)
        case common.L:
            f = new(KisFunctionL)
        case common.C:
            f = new(KisFunctionC)
        case common.E:
            f = new(KisFunctionE)
        default:
            //LOG ERROR
            return nil
        }
    
        // 生成随机实例唯一ID
        f.CreateId()
    
        //设置基础信息属性
        if err := f.SetConfig(config); err != nil {
            panic(err)
        }
    
        if err := f.SetFlow(flow); err != nil {
            panic(err)
        }
    
        return f
    }
    
    
    

    注意 NewKisFunction()方法返回的是一个抽象的interface Function

    还要注意,目前到这里没有实现Flow对象,但是KisFunciton的创建需要依赖传递一个Flow对象,我们现在可以暂时简单创建一个Flow对象的构造方法,之后在实现Flow章节再完善这部分的代码。
    kis-filw/kis/中创建flow.go文件。

    kis-flow/kis/flow.go

    package kis
    
    import (
        "context"
        "kis-flow/config"
    )
    
    type Flow interface {
       // TODO
    }
    
    

    kis-flow/flow/下创建kis_flow.go文件,实现如下:

    kis-flow/flow/kis_flow.go

    package flow
    
    import "kis-flow/config"
    
    // KisFlow 用于贯穿整条流式计算的上下文环境
    type KisFlow struct {
        Id   string
        Name string
        // TODO
    }
    
    // TODO for test
    // NewKisFlow 创建一个KisFlow.
    func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
        flow := new(KisFlow)
    
        // 基础信息
        flow.Id = id.KisID(common.KisIdTypeFlow)
        flow.Name = conf.FlowName
    
        return flow
    }
    

    2.2.7 单元测试KisFunction创建实例

    现在来对上述的KisFunction实例的创建做一个简单的单元测试,在kis-flow/test/创建kis_function_test.go文件。

    kis-flow/test/kis_function_test.go

    package test
    
    import (
        "context"
        "kis-flow/common"
        "kis-flow/config"
        "kis-flow/flow"
        "kis-flow/function"
        "testing"
    )
    
    func TestNewKisFunction(t *testing.T) {
        ctx := context.Background()
    
        // 1. 创建一个KisFunction配置实例
        source := config.KisSource{
            Name: "公众号抖音商城户订单数据",
            Must: []string{"order_id", "user_id"},
        }
    
        myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source, nil)
        if myFuncConfig1 == nil {
            panic("myFuncConfig1 is nil")
        }
    
        // 2. 创建一个 KisFlow 配置实例
        myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)
    
        // 3. 创建一个KisFlow对象
        flow1 := flow.NewKisFlow(myFlowConfig1)
    
        // 4. 创建一个KisFunction对象
        func1 := function.NewKisFunction(flow1, myFuncConfig1)
    
        if err := func1.Call(ctx, flow1); err != nil {
            t.Errorf("func1.Call() error = %v", err)
        }
    }
    
    

    流程很简单,分为四个小步骤:

    1. 创建一个KisFunction配置实例
    2. 创建一个 KisFlow 配置实例
    3. 创建一个KisFlow对象
    4. 创建一个KisFunction对象

    cd到kis-flow/test/目录下执行:

    go test -test.v -test.paniconexit0 -test.run TestNewKisFunction
    

    结果如下:

    === RUN   TestNewKisFunction
    KisFunctionC, flow = &{Id:flow-866de5abc8134fc9bb8e5248a3ce7137 Name:flowName1 Conf:0xc00014e780 Funcs:map[] FlowHead:<nil> FlowTail:<nil> flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:<nil> ThisFunctionId: PrevFunctionId: funcParams:map[] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}
    
    --- PASS: TestNewKisFunction (0.00s)
    PASS
    ok      kis-flow/test   1.005s
    
    

    我们已经调用到了具体的KisFunciton_C实例的Call()方法。

    2.5 【V0.1】 源代码

    https://github.com/aceld/kis-flow/releases/tag/v0.1


    作者:刘丹冰Aceld github: https://github.com/aceld
    KisFlow开源项目地址:https://github.com/aceld/kis-flow


    连载中...
    Golang框架实战-KisFlow流式计算框架(1)-概述
    Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
    Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)

    相关文章

      网友评论

        本文标题:Golang框架实战-KisFlow流式计算框架(3)-项目构建

        本文链接:https://www.haomeiwen.com/subject/fdlgadtx.html