美文网首页@IT·互联网KisFlow-基于Golang的流式计算框架实战
Golang框架实战-KisFlow流式计算框架(5)-Func

Golang框架实战-KisFlow流式计算框架(5)-Func

作者: 刘丹冰Aceld | 来源:发表于2024-02-27 20:20 被阅读0次

    连载中...
    Golang框架实战-KisFlow流式计算框架(1)-概述
    Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
    Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
    Golang框架实战-KisFlow流式计算框架(4)-数据流
    Golang框架实战-KisFlow流式计算框架(5)-Function调度

    4.1 Router

    现在,将KisFlow提供对外Function开放注册能力,首先我们要定义一些注册函数原型,和管理这些Function的Router映射关系类型。

    创建kis-flow/kis/router.go,定义原型如下:

    kis-flow/kis/router.go

    package kis
    
    import "context"
    
    // FaaS Function as a Service
    type FaaS func(context.Context, Flow) error
    
    // funcRouter
    // key: Function Name
    // value: Function 回调自定义业务
    type funcRouter map[string]FaaS
    
    // flowRouter
    // key: Flow Name
    // value: Flow
    type flowRouter map[string]Flow
    
    

    FaaS:是开发者给KisFlow注册的Function回调业务函数原型,需要传递两个参数,Context和Flow,Context主要承载业务的上线文环境,Flow主要承载KisFlow的上下文环境,我们可以通过Flow获取当前Function的配置信息,当前Function的数据信息,已经Flow上其他节点的Function相关信息等。

    funcRouter: 管理FunctionName和FaaS业务回调的映射关系Map,是一个私有类型,不对外提供引用。需要注意的是,funcRouter的key是FunctionName,因为FunctionId是生成的随机Id,开发者在注册路由的时候,并无法预判和可读,所以关联的业务回调是与FunctionName做的映射关系。

    flowRouter:管理FlowName和Flow实例的映射关系Map,是一个私有类型,不对外提供引用。flowRouter依然是FlowName的映射关系。

    4.2 KisPool

    KisFlow提供一个用来管理全部全局映射关系的类KisPool,KisPool包含Router,且提供对Router的管理能力。

    4.2.1 KisPool的定义

    创建 kis-flow/kis/pool.go 文件,来创建kis_pool模块。

    kis-flow/kis/pool.go

    package kis
    
    import (
        "context"
        "errors"
        "fmt"
        "kis-flow/log"
        "sync"
    )
    
    var _poolOnce sync.Once
    
    //  kisPool 用于管理全部的Function和Flow配置的池子
    type kisPool struct {
        fnRouter funcRouter   // 全部的Function管理路由
        fnLock   sync.RWMutex // fnRouter 锁
    
        flowRouter flowRouter   // 全部的flow对象
        flowLock   sync.RWMutex // flowRouter 锁
    }
    
    // 单例
    var _pool *kisPool
    
    // Pool 单例构造
    func Pool() *kisPool {
        _poolOnce.Do(func() {
            //创建kisPool对象
            _pool = new(kisPool)
    
            // fnRouter初始化
            _pool.fnRouter = make(funcRouter)
    
            // flowRouter初始化
            _pool.flowRouter = make(flowRouter)
        })
    
        return _pool
    }
    

    kis_pool采用单例模式,Pool()方法为获取当前的单例,有关fnRouterflowRouter在生命周期只会初始化一次,通过sync.Once来控制。

    4.2.2 注册及获取Flow

    KisPool可以提供添加和获取Flow信息的接口,如下:

    kis-flow/kis/pool.go

    func (pool *kisPool) AddFlow(name string, flow Flow) {
        pool.flowLock.Lock()
        defer pool.flowLock.Unlock()
    
        if _, ok := pool.flowRouter[name]; !ok {
            pool.flowRouter[name] = flow
        } else {
            errString := fmt.Sprintf("Pool AddFlow Repeat FlowName=%s\n", name)
            panic(errString)
        }
    
        log.Logger().InfoF("Add FlowRouter FlowName=%s\n", name)
    }
    
    func (pool *kisPool) GetFlow(name string) Flow {
        pool.flowLock.RLock()
        defer pool.flowLock.RUnlock()
    
        if flow, ok := pool.flowRouter[name]; ok {
            return flow
        } else {
            return nil
        }
    }
    

    AddFlow会根据相同的FlowName进行做重复校验,相同的Flow无法注册多次。

    4.2.3 注册及调度Function

    KisPool提供注册Funciton回调和调度Funciton方法, 如下。

    kis-flow/kis/pool.go

    // FaaS 注册 Function 计算业务逻辑, 通过Function Name 索引及注册
    func (pool *kisPool) FaaS(fnName string, f FaaS) {
        pool.fnLock.Lock()
        defer pool.fnLock.Unlock()
    
        if _, ok := pool.fnRouter[fnName]; !ok {
            pool.fnRouter[fnName] = f
        } else {
            errString := fmt.Sprintf("KisPoll FaaS Repeat FuncName=%s", fnName)
            panic(errString)
        }
    
        log.Logger().InfoF("Add KisPool FuncName=%s", fnName)
    }
    
    // CallFunction 调度 Function
    func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {
    
        if f, ok := pool.fnRouter[fnName]; ok {
            return f(ctx, flow)
        }
    
        log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)
    
        return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")
    }
    

    CallFunction()中,需要传递参数Flow,作为数据流调度的上下文环境。 开发者在自定义FaaS中可以通过Flow来获取一些Funciton信息,所以这里需要给Flow补充几个获取配置信息的接口,之后如果再需要,再继续补充,具体如下:

    kis-flow/kis/flow.go

    type Flow interface {
        // Run 调度Flow,依次调度Flow中的Function并且执行
        Run(ctx context.Context) error
        // Link 将Flow中的Function按照配置文件中的配置进行连接
        Link(fConf *config.KisFuncConfig, fParams config.FParam) error
        // CommitRow 提交Flow数据到即将执行的Function层
        CommitRow(row interface{}) error
        // Input 得到flow当前执行Function的输入源数据
        Input() common.KisRowArr
    
        // ++++++++++++++++++++++++++++++++++
        // GetName 得到Flow的名称
        GetName() string
        // GetThisFunction 得到当前正在执行的Function
        GetThisFunction() Function
        // GetThisFuncConf 得到当前正在执行的Function的配置
        GetThisFuncConf() *config.KisFuncConfig
    }
    

    kis-flow/flow/kis_flow.go

    func (flow *KisFlow) GetName() string {
        return flow.Name
    }
    
    func (flow *KisFlow) GetThisFunction() kis.Function {
        return flow.ThisFunction
    }
    
    func (flow *KisFlow) GetThisFuncConf() *config.KisFuncConfig {
        return flow.ThisFunction.GetConfig()
    }
    

    4.3 KisFunction引用KisPool调度

    现在,我们就可以在KisFunctionX中的Call()来通过Pool进行调度了,依次修改每个Function的Call()方法。

    kis-flow/function/kis_function_c.go

    package function
    
    import (
        "context"
        "kis-flow/kis"
        "kis-flow/log"
    )
    
    type KisFunctionC struct {
        BaseFunction
    }
    
    func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {
        log.Logger().InfoF("KisFunctionC, flow = %+v\n", flow)
    
        // 通过KisPool 路由到具体的执行计算Function中
        if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
            log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
            return err
        }
    
        return nil
    }
    

    kis-flow/function/kis_function_e.go

    package function
    
    import (
        "context"
        "kis-flow/kis"
        "kis-flow/log"
    )
    
    type KisFunctionE struct {
        BaseFunction
    }
    
    func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {
        log.Logger().InfoF("KisFunctionE, flow = %+v\n", flow)
    
        // 通过KisPool 路由到具体的执行计算Function中
        if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
            log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
            return err
        }
    
        return nil
    }
    
    

    kis-flow/function/kis_function_l.go

    package function
    
    import (
        "context"
        "kis-flow/kis"
        "kis-flow/log"
    )
    
    type KisFunctionL struct {
        BaseFunction
    }
    
    func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error {
        log.Logger().InfoF("KisFunctionL, flow = %+v\n", flow)
    
        // 通过KisPool 路由到具体的执行计算Function中
        if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
            log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
            return err
        }
    
        return nil
    }
    

    kis-flow/function/kis_function_s.go

    package function
    
    import (
        "context"
        "kis-flow/kis"
        "kis-flow/log"
    )
    
    type KisFunctionS struct {
        BaseFunction
    }
    
    func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error {
        log.Logger().InfoF("KisFunctionS, flow = %+v\n", flow)
    
        // 通过KisPool 路由到具体的执行计算Function中
        if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
            log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
            return err
        }
    
        return nil
    }
    
    

    kis-flow/function/kis_function_v.go

    package function
    
    import (
        "context"
        "kis-flow/kis"
        "kis-flow/log"
    )
    
    type KisFunctionV struct {
        BaseFunction
    }
    
    func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error {
        log.Logger().InfoF("KisFunctionV, flow = %+v\n", flow)
    
        // 通过KisPool 路由到具体的执行计算Function中
        if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
            log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
            return err
        }
    
        return nil
    }
    

    4.4 KisPool单元测试

    接下来我们来针对KisPool进行单元测试。

    4.4.1 自定义FaaS

    kis-flow/test/kis_pool_test.go

    package test
    
    import (
        "context"
        "fmt"
        "kis-flow/common"
        "kis-flow/config"
        "kis-flow/flow"
        "kis-flow/kis"
        "testing"
    )
    
    func funcName1Handler(ctx context.Context, flow kis.Flow) error {
        fmt.Println("---> Call funcName1Handler ----")
    
        for index, row := range flow.Input() {
            // 打印数据
            str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
            fmt.Println(str)
    
            // 计算结果数据
            resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
    
            // 提交结果数据
            _ = flow.CommitRow(resultStr)
        }
    
        return nil
    }
    
    func funcName2Handler(ctx context.Context, flow kis.Flow) error {
    
        for _, row := range flow.Input() {
            str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
            fmt.Println(str)
        }
    
        return nil
    }
    
    

    4.4.2 注册FaaS及启动Flow

    kis-flow/test/kis_pool_test.go

    func TestNewKisPool(t *testing.T) {
    
        ctx := context.Background()
    
        // 0. 注册Function
        kis.Pool().FaaS("funcName1", funcName1Handler)
        kis.Pool().FaaS("funcName2", funcName2Handler)
    
        // 1. 创建2个KisFunction配置实例
        source1 := config.KisSource{
            Name: "公众号抖音商城户订单数据",
            Must: []string{"order_id", "user_id"},
        }
    
        source2 := config.KisSource{
            Name: "用户订单错误率",
            Must: []string{"order_id", "user_id"},
        }
    
        myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil)
        if myFuncConfig1 == nil {
            panic("myFuncConfig1 is nil")
        }
    
        myFuncConfig2 := config.NewFuncConfig("funcName2", common.E, &source2, nil)
        if myFuncConfig2 == nil {
            panic("myFuncConfig2 is nil")
        }
    
        // 2. 创建一个 KisFlow 配置实例
        myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)
    
        // 3. 创建一个KisFlow对象
        flow1 := flow.NewKisFlow(myFlowConfig1)
    
        // 4. 拼接Functioin 到 Flow 上
        if err := flow1.Link(myFuncConfig1, nil); err != nil {
            panic(err)
        }
        if err := flow1.Link(myFuncConfig2, nil); err != nil {
            panic(err)
        }
    
        // 5. 提交原始数据
        _ = flow1.CommitRow("This is Data1 from Test")
        _ = flow1.CommitRow("This is Data2 from Test")
        _ = flow1.CommitRow("This is Data3 from Test")
    
        // 6. 执行flow1
        if err := flow1.Run(ctx); err != nil {
            panic(err)
        }
    }
    

    cd到kis-flow/test/下执行命令:

    go test -test.v -test.paniconexit0 -test.run TestNewKisPool
    

    结果如下:

    === RUN   TestNewKisPool
    Add KisPool FuncName=funcName1
    Add KisPool FuncName=funcName2
    context.Background
    ====> After CommitSrcData, flow_name = flowName1, flow_id = flow-1fdae2bfac684f1d8edf89d9000208c0
    All Level Data =
     map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
    
    KisFunctionC, flow = &{Id:flow-1fdae2bfac684f1d8edf89d9000208c0 Name:flowName1 Conf:0xc0000e27c0 Funcs:map[func-51527b72a4ee447fb0bd494bda9a84ad:0xc0000c0190 func-9cd2ab870b384794b312d2be10bb06fa:0xc0000c01e0] FlowHead:0xc0000c0190 FlowTail:0xc0000c01e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000c0190 ThisFunctionId:func-51527b72a4ee447fb0bd494bda9a84ad PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-51527b72a4ee447fb0bd494bda9a84ad:map[] func-9cd2ab870b384794b312d2be10bb06fa:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}
    
    ---> Call funcName1Handler ----
    In FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data1 from Test
    In FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data2 from Test
    In FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data3 from Test
    context.Background
     ====> After commitCurData, flow_name = flowName1, flow_id = flow-1fdae2bfac684f1d8edf89d9000208c0
    All Level Data =
     map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-51527b72a4ee447fb0bd494bda9a84ad:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
    
    KisFunctionE, flow = &{Id:flow-1fdae2bfac684f1d8edf89d9000208c0 Name:flowName1 Conf:0xc0000e27c0 Funcs:map[func-51527b72a4ee447fb0bd494bda9a84ad:0xc0000c0190 func-9cd2ab870b384794b312d2be10bb06fa:0xc0000c01e0] FlowHead:0xc0000c0190 FlowTail:0xc0000c01e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000c01e0 ThisFunctionId:func-9cd2ab870b384794b312d2be10bb06fa PrevFunctionId:func-51527b72a4ee447fb0bd494bda9a84ad funcParams:map[func-51527b72a4ee447fb0bd494bda9a84ad:map[] func-9cd2ab870b384794b312d2be10bb06fa:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-51527b72a4ee447fb0bd494bda9a84ad:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]}
    
    In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 0
    In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 1
    In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 2
    --- PASS: TestNewKisPool (0.00s)
    PASS
    ok      kis-flow/test   0.520s
    

    经过日志的详细校验,结果是符合我们预期的。

    好了,现在Function的业务能力已经开放给开发者了,接下来我们来继续完善KisFlow的能力。

    4.5 【V0.3】源代码

    https://github.com/aceld/kis-flow/releases/tag/v0.3


    作者:刘丹冰Aceld github: https://github.com/aceld
    KisFlow开源项目地址:https://github.com/aceld/kis-flow


    Golang框架实战-KisFlow流式计算框架(1)-概述
    Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
    Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
    Golang框架实战-KisFlow流式计算框架(4)-数据流
    Golang框架实战-KisFlow流式计算框架(5)-Function调度

    相关文章

      网友评论

        本文标题:Golang框架实战-KisFlow流式计算框架(5)-Func

        本文链接:https://www.haomeiwen.com/subject/kvlaadtx.html