美文网首页@IT·互联网KisFlow-基于Golang的流式计算框架实战
Golang框架实战-KisFlow流式计算框架(8)-KisF

Golang框架实战-KisFlow流式计算框架(8)-KisF

作者: 刘丹冰Aceld | 来源:发表于2024-03-04 13:54 被阅读0次

    Golang框架实战-KisFlow流式计算框架专栏

    Golang框架实战-KisFlow流式计算框架(1)-概述
    Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
    Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
    Golang框架实战-KisFlow流式计算框架(4)-数据流
    Golang框架实战-KisFlow流式计算框架(5)-Function调度
    Golang框架实战-KisFlow流式计算框架(6)-Connector
    Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出


    7.1 Action Abort(终止流程)

    KisFlow Action 是指在执行Function的时候,同时可以控制Flow的调度逻辑,KisFlow提供一些Action动作让开发者做选择,本节先介绍最简单的Action动作,Abort(终止当前Flow)。

    我们最终的Abort的使用形式如下:

    func AbortFuncHandler(ctx context.Context, flow kis.Flow) error {
        fmt.Println("---> Call AbortFuncHandler ----")
    
        for _, row := range flow.Input() {
            str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
            fmt.Println(str)
        }
    
        return flow.Next(kis.ActionAbort)  // 终止Flow
    }
    

    AbortFuncHandler()是一个Function 的业务回调方法,是由开发者自定义的,在执行完当前Funciton之后,正常的情况是继续执行下一个Funciton,但是如果传递flow.Next(kis.ActionAbort) 作为当前Funciton的返回值,那么则不会执行到下一个Funciton,而是直接终止当前Flow的调度计算流。

    下面我们先来实现KisFlow的 Abort Action动作模式。

    7.1.1 Abort接口定义

    首先,先对Flow的Abort()接口做定义。

    kis-flow/kis/flow.go

    type Flow interface {
        // Run 调度Flow,依次调度Flow中的Function并且执行
        Run(ctx context.Context) error
        // Link 将Flow中的Function按照配置文件中的配置进行连接
        Link(fConf *config.KisFuncConfig, fParams config.FParam) error
        // CommitRow 提交Flow数据到即将执行的Function层
        CommitRow(row interface{}) error
        // Input 得到flow当前执行Function的输入源数据
        Input() common.KisRowArr
        // GetName 得到Flow的名称
        GetName() string
        // GetThisFunction 得到当前正在执行的Function
        GetThisFunction() Function
        // GetThisFuncConf 得到当前正在执行的Function的配置
        GetThisFuncConf() *config.KisFuncConfig
        // GetConnector 得到当前正在执行的Function的Connector
        GetConnector() (Connector, error)
        // GetConnConf 得到当前正在执行的Function的Connector的配置
        GetConnConf() (*config.KisConnConfig, error)
        // GetConfig 得到当前Flow的配置
        GetConfig() *config.KisFlowConfig
        // GetFuncConfigByName 得到当前Flow的配置
        GetFuncConfigByName(funcName string) *config.KisFuncConfig
    
        //  --- KisFlow Action ---
        // Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
        Next(acts ...ActionFunc) error
    }
    

    这里面提供一个接口Next(acts ...ActionFunc) error,其中参数是一个可变参数,类型为ActionFunc,这个是我们给KisFlow定义的Action相关的方法。有关Action的定义模块如下:

    7.1.2 Action模块定义

    Action是用来在Flow执行过程中,通过Function来控制Flow执行特殊动作的行为配置模块,包括上面的Abort行为,Abort也属于其中一个Action。Action的模块定义如下,在kis-flow/kis/下创建action.go文件,实现:

    kis-flow/kis/action.go

    package kis
    
    // Action KisFlow执行流程Actions
    type Action struct {
        // Abort 终止Flow的执行
        Abort bool
    }
    
    // ActionFunc KisFlow Functional Option 类型
    type ActionFunc func(ops *Action)
    
    // LoadActions 加载Actions,依次执行ActionFunc操作函数
    func LoadActions(acts []ActionFunc) Action {
        action := Action{}
    
        if acts == nil {
            return action
        }
    
        for _, act := range acts {
            act(&action)
        }
    
        return action
    }
    
    // ActionAbort 终止Flow的执行
    func ActionAbort(action *Action) {
        action.Abort = true
    }
    
    

    首先,现在Action只有Abort一个行为,我们用bool类型来表示Abort是否为终止,true则为需要终止flow的调用。
    其次,type ActionFunc func(ops *Action)这个函数原型为一个函数类型,函数的形参是传递进来一个Action{} 指针,而 func ActionAbort(action *Action)则是它的一个具体的函数,ActionAbort()的方法的目的就是将Action的Abort成员设置为true。

    最后看func LoadActions(acts []ActionFunc) Action方法。这个形参是一个ActionFunc函数数组,LoadActions()则是创建一个新的Action{} ,然后依次执行[]ActionFunc的函数来改变Aciton{}的成员,最终将新的Action{}返回上层。

    7.1.3 Next方法实现

    接下来,我们需要给KisFlow模块实现这个接口,首先需要给KisFlow添加一个Action{}成员,表示每次执行完Function之后所携带的动作。

    kis-flow/flow/kis_flow.go

    // KisFlow 用于贯穿整条流式计算的上下文环境
    type KisFlow struct {
        // 基础信息
        Id   string                // Flow的分布式实例ID(用于KisFlow内部区分不同实例)
        Name string                // Flow的可读名称
        Conf *config.KisFlowConfig // Flow配置策略
    
        // Function列表
        Funcs          map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionName
        FlowHead       kis.Function            // 当前Flow所拥有的Function列表表头
        FlowTail       kis.Function            // 当前Flow所拥有的Function列表表尾
        flock          sync.RWMutex            // 管理链表插入读写的锁
        ThisFunction   kis.Function            // Flow当前正在执行的KisFunction对象
        ThisFunctionId string                  // 当前执行到的Function ID
        PrevFunctionId string                  // 当前执行到的Function 上一层FunctionID
    
        // Function列表参数
        funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam
        fplock     sync.RWMutex             // 管理funcParams的读写锁
    
        // 数据
        buffer common.KisRowArr  // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch
        data   common.KisDataMap // 流式计算各个层级的数据源
        inPut  common.KisRowArr  // 当前Function的计算输入数据
    
        // +++++++++++++++++++++
        
        // KisFlow Action
        action kis.Action        // 当前Flow所携带的Action动作
    }
    
    

    然后实现KisFlow的Next()接口,如下:

    kis-flow/flow/kis_flow.go

    // Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
    func (flow *KisFlow) Next(acts ...kis.ActionFunc) error {
    
        // 加载Function FaaS 传递的 Action动作
        flow.action = kis.LoadActions(acts)
    
        return nil
    }
    

    每次开发者在执行Function的自定义业务回调中,最后会调用flow.Next()来传递Action,所以Next(acts ...kis.ActionFunc) error就是讲传递的Action属性加载进来并且在flow.action保存。

    7.1.4 Abort控制Flow流程

    现在有个Abort来控制Flow流,那么我们需要给KisFlow添加一个成员来表示这个状态

    kis-flow/flow/kis_flow.go

    // KisFlow 用于贯穿整条流式计算的上下文环境
    type KisFlow struct {
        // 基础信息
        Id   string                // Flow的分布式实例ID(用于KisFlow内部区分不同实例)
        Name string                // Flow的可读名称
        Conf *config.KisFlowConfig // Flow配置策略
    
        // Function列表
        Funcs          map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionName
        FlowHead       kis.Function            // 当前Flow所拥有的Function列表表头
        FlowTail       kis.Function            // 当前Flow所拥有的Function列表表尾
        flock          sync.RWMutex            // 管理链表插入读写的锁
        ThisFunction   kis.Function            // Flow当前正在执行的KisFunction对象
        ThisFunctionId string                  // 当前执行到的Function ID
        PrevFunctionId string                  // 当前执行到的Function 上一层FunctionID
    
        // Function列表参数
        funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam
        fplock     sync.RWMutex             // 管理funcParams的读写锁
    
        // 数据
        buffer common.KisRowArr  // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch
        data   common.KisDataMap // 流式计算各个层级的数据源
        inPut  common.KisRowArr  // 当前Function的计算输入数据
        action kis.Action        // 当前Flow所携带的Action动作
    
        // +++++++++
        abort  bool              // 是否中断Flow
    }
    

    在每次执行到flow.Run()方法时,需要重置abort变量,并且在循环调度的时候加上对flow.abort的判断。

    kis-flow/flow/kis_flow.go

    // Run 启动KisFlow的流式计算, 从起始Function开始执行流
    func (flow *KisFlow) Run(ctx context.Context) error {
    
        // +++++++++
        // 重置 abort
        flow.abort = false  //  每次进入调度,要重置abort状态
    
        // ... ...
    
        // ... ...
    
        //流式链式调用
        for fn != nil && flow.abort != true { // ++++ 如果设置abort则不进入下次循环调度
    
            // ... ...
            // ... ...
            
            if err := fn.Call(ctx, flow); err != nil {
                //Error
                return err
            } else {
                //Success
    
                // ... ...
    
                fn = fn.Next()
            }
        }
    
        return nil
    
    

    这样在每次Call()调度到Funciton的自定方法时,如果return flow.Next(ActionAbort)就会对flow的Action状态进行改变,从而就控制了flow的流程终止。最后就是将Action的Abort状态传递给KisFlow的Abort状态。

    既然有了Abort状态,那么我们可以通过给Flow执行过程中添加一个设定,如果当前的Function没有提交本层的结果数据,也就是flow.buffer为空,那么将不会进入下一层,在本层直接结束退出Flow的Run()调用。

    kis-flow/flow/kis_flow_data.go

    //commitCurData 提交Flow当前执行Function的结果数据
    func (flow *KisFlow) commitCurData(ctx context.Context) error {
    
        // 判断本层计算是否有结果数据,如果没有则退出本次Flow Run循环
        if len(flow.buffer) == 0 {
            // ++++++++++++
            flow.abort = true
            return nil
        }
    
        // ... ...
        // ... ...
    
        return nil
    
    

    7.1.5 Action捕获及处理

    接下来来实现一个专门处理Action动作的方法,定义在kis-flow/flow/kis_flow_action.go文件中,如下:

    kis-flow/flow/kis_flow_action.go

    package flow
    
    import (
        "context"
        "errors"
        "fmt"
        "kis-flow/kis"
    )
    
    // dealAction  处理Action,决定接下来Flow的流程走向
    func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {
    
        if err := flow.commitCurData(ctx); err != nil {
            return nil, err
        }
    
        // 更新上一层 FuncitonId 游标
        flow.PrevFunctionId = flow.ThisFunctionId
        fn = fn.Next()
    
        // Abort Action 强制终止
        if flow.action.Abort {
            flow.abort = true
        }
    
        // 清空Action
        flow.action = kis.Action{}
    
        return fn, nil
    }
    

    然后稍微改进下KisFlow的Run() 流程,将dealAction()方法嵌入进去。

    kis-flow/flow/kis_flow.go

    // Run 启动KisFlow的流式计算, 从起始Function开始执行流
    func (flow *KisFlow) Run(ctx context.Context) error {
    
        var fn kis.Function
    
        fn = flow.FlowHead
        flow.abort = false
    
        if flow.Conf.Status == int(common.FlowDisable) {
            //flow被配置关闭
            return nil
        }
    
        // 因为此时还没有执行任何Function, 所以PrevFunctionId为FirstVirtual 因为没有上一层Function
        flow.PrevFunctionId = common.FunctionIdFirstVirtual
    
        // 提交数据流原始数据
        if err := flow.commitSrcData(ctx); err != nil {
            return err
        }
    
        //流式链式调用
        for fn != nil && flow.abort == false {
    
            // flow记录当前执行到的Function 标记
            fid := fn.GetId()
            flow.ThisFunction = fn
            flow.ThisFunctionId = fid
    
            // 得到当前Function要处理与的源数据
            if inputData, err := flow.getCurData(); err != nil {
                log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
                return err
            } else {
                flow.inPut = inputData
            }
    
            if err := fn.Call(ctx, flow); err != nil {
                //Error
                return err
            } else {
                //Success
                
                // +++++++++++++++++++++++++++++++
                fn, err = flow.dealAction(ctx, fn)
                if err != nil {
                    return err
                }
                // +++++++++++++++++++++++++++++++
            }
        }
    
        return nil
    }
    

    7.1.6 Action Abort单元测试

    首先我们新建一个Function业务,配置文件如下:

    kis-flow/test/load_conf/func/func-AbortFunc.yml

    kistype: func
    fname: abortFunc
    fmode: Calculate
    source:
      name: 用户订单错误率
      must:
        - order_id
        - user_id
    

    当前的Funciton的名称为abortFunc,然后实现其FaaS函数,如下:

    kis-flow/test/faas/faas_abort.go

    package faas
    
    import (
        "context"
        "fmt"
        "kis-flow/kis"
    )
    
    // type FaaS func(context.Context, Flow) error
    
    func AbortFuncHandler(ctx context.Context, flow kis.Flow) error {
        fmt.Println("---> Call AbortFuncHandler ----")
    
        for _, row := range flow.Input() {
            str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
            fmt.Println(str)
        }
    
        return flow.Next(kis.ActionAbort)
    }
    
    

    这个Function就会最终调用flow.Next(kis.ActionAbort)来终止Flow,接下来我们新建一个Flow,将上面的Function作为中间的Function,看检测是否会终止之后的Function被执行。
    新建的flow的配置如下:

    kis-flow/test/load_conf/flow/flow-FlowName2.yml

    kistype: flow
    status: 1
    flow_name: flowName2
    flows:
      - fname: funcName1
      - fname: abortFunc
      - fname: funcName3
    
    

    当前Flow的名称为flowName2,当前的Flow有三个Function,其中funcNam1 和 funcName2我们之前都已经定义好了,abortFunc是我们新建的,并且在中间。如果abort功能满足,则funcName3将不会被调度。

    接下来实现单元测试用例。

    kis-flow/test/kis_action_test.go

    package test
    
    import (
        "context"
        "kis-flow/common"
        "kis-flow/file"
        "kis-flow/kis"
        "kis-flow/test/caas"
        "kis-flow/test/faas"
        "testing"
    )
    
    func TestActionAbort(t *testing.T) {
        ctx := context.Background()
    
        // 0. 注册Function 回调业务
        kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
        kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // 添加abortFunc 业务
        kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
    
        // 0. 注册ConnectorInit 和 Connector 回调业务
        kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
        kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
    
        // 1. 加载配置文件并构建Flow
        if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {
            panic(err)
        }
    
        // 2. 获取Flow
        flow1 := kis.Pool().GetFlow("flowName2")
    
        // 3. 提交原始数据
        _ = flow1.CommitRow("This is Data1 from Test")
        _ = flow1.CommitRow("This is Data2 from Test")
        _ = flow1.CommitRow("This is Data3 from Test")
    
        // 4. 执行flow1
        if err := flow1.Run(ctx); err != nil {
            panic(err)
        }
    }
    

    其中下面的代码是初始化注册的代码,大家也可以写在其他文件中,这样就不需要每次都携带这部分代码了。

        // 0. 注册Function 回调业务
        kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
        kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // 添加abortFunc 业务
        kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
    
        // 0. 注册ConnectorInit 和 Connector 回调业务
        kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
        kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
    

    cd 到kis-flow/test/目录下执行如下指令:

    go test -test.v -test.paniconexit0 -test.run  TestActionAbort
    

    结果如下:

    === RUN   TestActionAbort
    Add KisPool FuncName=funcName1
    Add KisPool FuncName=abortFunc
    Add KisPool FuncName=funcName3
    Add KisPool CaaSInit CName=ConnName1
    Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
    ===> Call Connector InitDemo1
    &{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
    Add FlowRouter FlowName=flowName1
    
    Add FlowRouter FlowName=flowName2
    
    context.Background
    ====> After CommitSrcData, flow_name = flowName2, flow_id = flow-b6b90eb4b7d7457fbf85b3299b625513
    All Level Data =
     map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
    
    KisFunctionV, flow = &{Id:flow-b6b90eb4b7d7457fbf85b3299b625513 Name:flowName2 Conf:0xc000092cc0 Funcs:map[abortFunc:0xc000094d20 funcName1:0xc000094cc0 funcName3:0xc000094d80] FlowHead:0xc000094cc0 FlowTail:0xc000094d80 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000094cc0 ThisFunctionId:func-c435cf9f8e3346a1851f8c76375fce0f PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-7f5af1521fd64d08839d5bdd26de5254:map[] func-c435cf9f8e3346a1851f8c76375fce0f:map[] func-f0b80593fe2e4018a878f155b9c543b4:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] jumpFunc:NoJump abort:false nextOpt:<nil>}
    
    ---> Call funcName1Handler ----
    In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data1 from Test
    In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data2 from Test
    In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data3 from Test
    context.Background
     ====> After commitCurData, flow_name = flowName2, flow_id = flow-b6b90eb4b7d7457fbf85b3299b625513
    All Level Data =
     map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c435cf9f8e3346a1851f8c76375fce0f:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
    
    KisFunctionC, flow = &{Id:flow-b6b90eb4b7d7457fbf85b3299b625513 Name:flowName2 Conf:0xc000092cc0 Funcs:map[abortFunc:0xc000094d20 funcName1:0xc000094cc0 funcName3:0xc000094d80] FlowHead:0xc000094cc0 FlowTail:0xc000094d80 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000094d20 ThisFunctionId:func-7f5af1521fd64d08839d5bdd26de5254 PrevFunctionId:func-c435cf9f8e3346a1851f8c76375fce0f funcParams:map[func-7f5af1521fd64d08839d5bdd26de5254:map[] func-c435cf9f8e3346a1851f8c76375fce0f:map[] func-f0b80593fe2e4018a878f155b9c543b4:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c435cf9f8e3346a1851f8c76375fce0f:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] jumpFunc:NoJump abort:false nextOpt:<nil>}
    
    ---> Call AbortFuncHandler ----
    In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 0
    In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 1
    In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 2
    --- PASS: TestActionAbort (0.00s)
    PASS
    ok      kis-flow/test   0.487s
    
    

    通过结果可以看到,在执行完 AbortFuncHandler 后,没有继续执行,而是退出了Flow的Run()方法。

    7.2 Action DataReuse(复用上层数据)

    Action DataReuse 为服用上层数据,含义为,当前的执行Function提交到下一层的结果将不被使用,而是直接将当前Function的上一层结果数据,复用到下一层,作为下一层Funciton的数据源。

    下面来实现Action DataReuse功能。

    7.2.1 DataReuse Action添加

    在Action中添加DataReuse成员,是一个bool类型。

    kis-flow/kis/action.go

    // Action KisFlow执行流程Actions
    type Action struct {
        // +++++++++++++
        // DataReuse 是否复用上层Function数据
        DataReuse bool
    
        // Abort 终止Flow的执行
        Abort bool
    }
    
    
    // ActionDataReuse Next复用上层Function数据Option
    func ActionDataReuse(act *Action) {
        act.DataReuse = true
    }
    
    

    然后提供一个ActionFunc,命名为:ActionDataReuse,实现中为改变DataReuse状态为true。

    7.2.2 复用上层数据到下层

    这里需要再实现一个提交数据的方法,为如何提交复用数据,具体逻辑如下:

    kis-flow/flow/kis_flow_data.go

    // commitReuseData
    func (flow *KisFlow) commitReuseData(ctx context.Context) error {
    
        // 判断上层是否有结果数据, 如果没有则退出本次Flow Run循环
        if len(flow.data[flow.PrevFunctionId]) == 0 {
            flow.abort = true
            return nil
        }
    
        // 本层结果数据等于上层结果数据(复用上层结果数据到本层)
        flow.data[flow.ThisFunctionId] = flow.data[flow.PrevFunctionId]
    
        // 清空缓冲Buf (如果是ReuseData选项,那么提交的全部数据,都将不会携带到下一层)
        flow.buffer = flow.buffer[0:0]
    
        log.Logger().DebugFX(ctx, " ====> After commitReuseData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
    
        return nil
    }
    

    逻辑很简单,与commitCurData()不同的是,commitCurData()为将flow.buffer的数据提交到flow.data[flow.ThisFunctionId]中,而commitReuseData()为将上一层的结果数据提交到flow.data[flow.ThisFunctionId]中。

    7.2.3 处理DataReuse Action动作

    然后在dealAction()方法中添加对Action DataReuse的动作捕获,如下:

    kis-flow/flow/kis_flow_action.go

    // dealAction  处理Action,决定接下来Flow的流程走向
    func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {
    
        // ++++++++++++++++
        // DataReuse Action
        if flow.action.DataReuse {
            if err := flow.commitReuseData(ctx); err != nil {
                return nil, err
            }
        } else {
            if err := flow.commitCurData(ctx); err != nil {
                return nil, err
            }
        }
        
    
        // 更新上一层 FuncitonId 游标
        flow.PrevFunctionId = flow.ThisFunctionId
        fn = fn.Next()
    
        // Abort Action 强制终止
        if flow.action.Abort {
            flow.abort = true
        }
    
        // 清空Action
        flow.action = kis.Action{}
    
        return fn, nil
    }
    

    7.2.4 单元测试

    下面来针对DataReuse做单元测试,首先创建一个名字为dataReuseFunc 的Funciton,先创建配置文件。

    kis-flow/test/load_conf/func/func-dataReuseFunc.yml

    kistype: func
    fname: dataReuseFunc
    fmode: Calculate
    source:
      name: 用户订单错误率
      must:
        - order_id
        - user_id
    

    同时新建一个Flow流,名称为flowName3,配置如下:

    kis-flow/test/load_conf/flow/func-FlowName3.yml

    kistype: flow
    status: 1
    flow_name: flowName3
    flows:
      - fname: funcName1
      - fname: dataReuseFunc
      - fname: funcName3
    
    

    针对dataReuseFunc的Function的逻辑业务,如下:

    kis-flow/test/faas/faas_data_reuse.go

    package faas
    
    import (
        "context"
        "fmt"
        "kis-flow/kis"
    )
    
    // type FaaS func(context.Context, Flow) error
    
    func DataReuseFuncHandler(ctx context.Context, flow kis.Flow) error {
        fmt.Println("---> Call DataReuseFuncHandler ----")
    
        for index, row := range flow.Input() {
            str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
            fmt.Println(str)
    
            // 计算结果数据
            resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
    
            // 提交结果数据
            _ = flow.CommitRow(resultStr)
        }
    
        return flow.Next(kis.ActionDataReuse)
    }
    
    

    最后实现测试用例,如下:

    kis-flow/test/kis_action_test.go

    func TestActionDataReuse(t *testing.T) {
        ctx := context.Background()
    
        // 0. 注册Function 回调业务
        kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
        kis.Pool().FaaS("dataReuseFunc", faas.DataReuseFuncHandler) // 添加dataReuesFunc 业务
        kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
    
        // 0. 注册ConnectorInit 和 Connector 回调业务
        kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
        kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
    
        // 1. 加载配置文件并构建Flow
        if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
            panic(err)
        }
    
        // 2. 获取Flow
        flow1 := kis.Pool().GetFlow("flowName3")
    
        // 3. 提交原始数据
        _ = flow1.CommitRow("This is Data1 from Test")
        _ = flow1.CommitRow("This is Data2 from Test")
        _ = flow1.CommitRow("This is Data3 from Test")
    
        // 4. 执行flow1
        if err := flow1.Run(ctx); err != nil {
            panic(err)
        }
    }
    

    cd 到 kis-flow/test/下执行:

    go test -test.v -test.paniconexit0 -test.run  TestActionDataReuse
    

    结果是:

    === RUN   TestActionDataReuse
    Add KisPool FuncName=funcName1
    Add KisPool FuncName=dataReuseFunc
    Add KisPool FuncName=funcName3
    Add KisPool CaaSInit CName=ConnName1
    Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
    ===> Call Connector InitDemo1
    &{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
    Add FlowRouter FlowName=flowName5
    ===> Call Connector InitDemo1
    &{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
    Add FlowRouter FlowName=flowName1
    Add FlowRouter FlowName=flowName2
    Add FlowRouter FlowName=flowName3
    Add FlowRouter FlowName=flowName4
    context.Background
    ====> After CommitSrcData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
    All Level Data =
     map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
    
    KisFunctionV, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000955c0 ThisFunctionId:func-7886178381634f05b302841141382e59 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
    
    ---> Call funcName1Handler ----
    In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data1 from Test
    In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data2 from Test
    In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data3 from Test
    context.Background
     ====> After commitCurData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
    All Level Data =
     map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
    
    KisFunctionC, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000095620 ThisFunctionId:func-ef567879d0dd45b287ed709e549e9d32 PrevFunctionId:func-7886178381634f05b302841141382e59 funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
    
    ---> Call DataReuseFuncHandler ----
    In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 0
    In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 1
    In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 2
    context.Background
     ====> After commitReuseData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
    All Level Data =
     map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-ef567879d0dd45b287ed709e549e9d32:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
    
    KisFunctionC, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000095680 ThisFunctionId:func-cfe66e39aba54ff989d6764cc4edda20 PrevFunctionId:func-ef567879d0dd45b287ed709e549e9d32 funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-ef567879d0dd45b287ed709e549e9d32:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
    
    ---> Call funcName3Handler ----
    In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 0
    In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 1
    In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 2
    --- PASS: TestActionDataReuse (0.02s)
    PASS
    ok      kis-flow/test   0.523s
    
    

    通过结果可以看出,在最后的funcName3Handler中得到的数据是funcName1传递下来的数据,中间的ReuseFunction将上层的数据复用到了下一层,变成了FuncName3的数据源。

    7.3 Action ForceEntryNext(强制进入下一层)

    7.3.1 ForceEntryNext Action属性

    目前的KisFlow为,如果当前的Function没有commit数据(本层的结果数据),那么当前的Function结束后,将不会继续调度下一层Function。 但是有的Flow的流式计算可能需要继续向下执行,哪怕没有数据,所以这里可以通过ForceEntryNext这个动作来触发。
    首先我们在Action中新增一个ForceEntryNext 属性。

    kis-flow/kis/action.go

    // Action KisFlow执行流程Actions
    type Action struct {
        // DataReuse 是否复用上层Function数据
        DataReuse bool
    
        // 默认Next()为如果本层Function计算结果为0条数据,之后Function将不会继续执行
        // ForceEntryNext 为忽略上述默认规则,没有数据强制进入下一层Function
        ForceEntryNext bool
    
        // Abort 终止Flow的执行
        Abort bool
    }
    
    // ActionForceEntryNext 强制进入下一层
    func ActionForceEntryNext(act *Action) {
        act.ForceEntryNext = true
    }
    
    

    且提供配置函数ActionForceEntryNext()来修改这个属性状态。

    7.3.2 捕获Action

    在捕获Action的dealAction()方法中,加上对这个状态的判断,如果被设置,则需要将flow.Abort状态改成false,flow将继续执行下一层。

    kis-flow/flow/kis_flow_action.go

    // dealAction  处理Action,决定接下来Flow的流程走向
    func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {
    
        // DataReuse Action
        if flow.action.DataReuse {
            if err := flow.commitReuseData(ctx); err != nil {
                return nil, err
            }
        } else {
            if err := flow.commitCurData(ctx); err != nil {
                return nil, err
            }
        }
    
        // ++++++++++++++++++++++++++++
        // ForceEntryNext Action
        if flow.action.ForceEntryNext {
            if err := flow.commitVoidData(ctx); err != nil {
                return nil, err
            }
            flow.abort = false
        }
    
        // 更新上一层 FuncitonId 游标
        flow.PrevFunctionId = flow.ThisFunctionId
        fn = fn.Next()
    
        // Abort Action 强制终止
        if flow.action.Abort {
            flow.abort = true
        }
    
        // 清空Action
        flow.action = kis.Action{}
    
        return fn, nil
    }
    

    这里有一个细节,我们需要调用一个方法commitVoidData(),即提交空数据,原因是,如果不提交空数据,那么flow.buffer依然为空,那么不会执行数据的提交动作,那么会导致flow.data[flow.ThisFunctionId]这条不存在,也就是key不存在,那么再执行到flow.getCurData()会出现找不到key的异常而panic。所以这里需要提交一个空的数据到flow.data[flow.ThisFunctionId]中。
    具体的commitVoidData()实现如下:

    kis-flow/flow/kis_flow_data.go

    func (flow *KisFlow) commitVoidData(ctx context.Context) error {
        if len(flow.buffer) != 0 {
            return nil
        }
    
        // 制作空数据
        batch := make(common.KisRowArr, 0)
    
        // 将本层计算的缓冲数据提交到本层结果数据中
        flow.data[flow.ThisFunctionId] = batch
    
        log.Logger().DebugFX(ctx, " ====> After commitVoidData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
    
        return nil
    }
    

    7.3.3 单元测试,不设置ForceEntryNext

    首先,创建一个noResultFunc的Function配置,且实现相关的回调业务函数。

    kis-flow/test/load_conf/func/func-NoResultFunc.yml

    kistype: func
    fname: noResultFunc
    fmode: Calculate
    source:
      name: 用户订单错误率
      must:
        - order_id
        - user_id
    

    kis-flow/test/faas/faas_no_result.go

    package faas
    
    import (
        "context"
        "fmt"
        "kis-flow/kis"
    )
    
    // type FaaS func(context.Context, Flow) error
    
    func NoResultFuncHandler(ctx context.Context, flow kis.Flow) error {
        fmt.Println("---> Call NoResultFuncHandler ----")
    
        for _, row := range flow.Input() {
            str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
            fmt.Println(str)
        }
    
        return flow.Next()
    }
    
    

    这里面在Function的最后,只调用flow.Next() 不传递任何Action动作。
    然后新建一个FlowName4,配置如下:

    kis-flow/test/load_conf/flow-FlowName4.yml

    kistype: flow
    status: 1
    flow_name: flowName4
    flows:
      - fname: funcName1
      - fname: noResultFunc
      - fname: funcName3
    
    

    最后我们编写单元测试用例代码,将noResultFunc放在中间的部分。

    kis-flow/test/kis_action_test.go

    func TestActionForceEntry(t *testing.T) {
        ctx := context.Background()
    
        // 0. 注册Function 回调业务
        kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
        kis.Pool().FaaS("noResultFunc", faas.NoResultFuncHandler) // 添加noResultFunc 业务
        kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
    
        // 0. 注册ConnectorInit 和 Connector 回调业务
        kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
        kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
    
        // 1. 加载配置文件并构建Flow
        if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
            panic(err)
        }
    
        // 2. 获取Flow
        flow1 := kis.Pool().GetFlow("flowName4")
    
        // 3. 提交原始数据
        _ = flow1.CommitRow("This is Data1 from Test")
        _ = flow1.CommitRow("This is Data2 from Test")
        _ = flow1.CommitRow("This is Data3 from Test")
    
        // 4. 执行flow1
        if err := flow1.Run(ctx); err != nil {
            panic(err)
        }
    }
    

    cd到kis-flow/test/ 下执行:

    go test -test.v -test.paniconexit0 -test.run  TestActionForceEntry
    

    结果如下:

    === RUN   TestActionForceEntry
    Add KisPool FuncName=funcName1
    Add KisPool FuncName=noResultFunc
    Add KisPool FuncName=funcName3
    Add KisPool CaaSInit CName=ConnName1
    Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
    ===> Call Connector InitDemo1
    &{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
    Add FlowRouter FlowName=flowName1
    Add FlowRouter FlowName=flowName2
    Add FlowRouter FlowName=flowName3
    Add FlowRouter FlowName=flowName4
    ===> Call Connector InitDemo1
    &{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
    Add FlowRouter FlowName=flowName5
    context.Background
    ====> After CommitSrcData, flow_name = flowName4, flow_id = flow-a496d02c79204e9a803fb5e1307523c9
    All Level Data =
     map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
    
    KisFunctionV, flow = &{Id:flow-a496d02c79204e9a803fb5e1307523c9 Name:flowName4 Conf:0xc000152e40 Funcs:map[funcName1:0xc00011d560 funcName3:0xc00011d620 noResultFunc:0xc00011d5c0] FlowHead:0xc00011d560 FlowTail:0xc00011d620 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00011d560 ThisFunctionId:func-4d113d6a8e744d30a906db310f2d7818 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-47cb6f9ae464484aa779c18284035705:map[] func-4d113d6a8e744d30a906db310f2d7818:map[] func-70011c7ccecf46be91c6993d143639bb:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
    
    ---> Call funcName1Handler ----
    In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data1 from Test
    In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data2 from Test
    In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data3 from Test
    context.Background
     ====> After commitCurData, flow_name = flowName4, flow_id = flow-a496d02c79204e9a803fb5e1307523c9
    All Level Data =
     map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-4d113d6a8e744d30a906db310f2d7818:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
    
    KisFunctionC, flow = &{Id:flow-a496d02c79204e9a803fb5e1307523c9 Name:flowName4 Conf:0xc000152e40 Funcs:map[funcName1:0xc00011d560 funcName3:0xc00011d620 noResultFunc:0xc00011d5c0] FlowHead:0xc00011d560 FlowTail:0xc00011d620 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00011d5c0 ThisFunctionId:func-47cb6f9ae464484aa779c18284035705 PrevFunctionId:func-4d113d6a8e744d30a906db310f2d7818 funcParams:map[func-47cb6f9ae464484aa779c18284035705:map[] func-4d113d6a8e744d30a906db310f2d7818:map[] func-70011c7ccecf46be91c6993d143639bb:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-4d113d6a8e744d30a906db310f2d7818:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
    
    ---> Call NoResultFuncHandler ----
    In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 0
    In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 1
    In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 2
    --- PASS: TestActionForceEntry (0.02s)
    PASS
    ok      kis-flow/test   0.958s
    
    

    因为noResultFunc不会生成任何的结果数据,所以下一层Function将不会被执行,最后只执行到

    ---> Call NoResultFuncHandler ----
    

    7.3.4 单元测试,设置ForceEntryNext

    下面我们将Action为ForceEntryNext加上,在NoResultFuncHandler() 中,加上flow.Next(kis.ActionForceEntryNext),如下:

    kis-flow/test/faas/faas_no_result.go

    package faas
    
    import (
        "context"
        "fmt"
        "kis-flow/kis"
    )
    
    // type FaaS func(context.Context, Flow) error
    
    func NoResultFuncHandler(ctx context.Context, flow kis.Flow) error {
        fmt.Println("---> Call NoResultFuncHandler ----")
    
        for _, row := range flow.Input() {
            str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
            fmt.Println(str)
        }
    
        return flow.Next(kis.ActionForceEntryNext)
    }
    
    

    cd到kis-flow/test/ 下执行:

    go test -test.v -test.paniconexit0 -test.run  TestActionForceEntry
    

    结果如下:

    === RUN   TestActionForceEntry
    Add KisPool FuncName=funcName1
    Add KisPool FuncName=noResultFunc
    Add KisPool FuncName=funcName3
    Add KisPool CaaSInit CName=ConnName1
    Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
    ===> Call Connector InitDemo1
    &{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
    Add FlowRouter FlowName=flowName5
    ===> Call Connector InitDemo1
    &{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
    Add FlowRouter FlowName=flowName1
    Add FlowRouter FlowName=flowName2
    Add FlowRouter FlowName=flowName3
    Add FlowRouter FlowName=flowName4
    context.Background
    ====> After CommitSrcData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
    All Level Data =
     map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
    
    KisFunctionV, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000136e0 ThisFunctionId:func-ecddaee7d7d447a9852d07088732f509 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
    
    ---> Call funcName1Handler ----
    In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data1 from Test
    In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data2 from Test
    In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data3 from Test
    context.Background
     ====> After commitCurData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
    All Level Data =
     map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
    
    KisFunctionC, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013740 ThisFunctionId:func-c9817c7993894919b8463dea1757544e PrevFunctionId:func-ecddaee7d7d447a9852d07088732f509 funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
    
    ---> Call NoResultFuncHandler ----
    In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 0
    In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 1
    In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 2
    context.Background
     ====> After commitVoidData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
    All Level Data =
     map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c9817c7993894919b8463dea1757544e:[] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
    
    KisFunctionC, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000137a0 ThisFunctionId:func-5729600ae6ea4d6f879eb5832c638e1a PrevFunctionId:func-c9817c7993894919b8463dea1757544e funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c9817c7993894919b8463dea1757544e:[] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
    
    ---> Call funcName3Handler ----
    --- PASS: TestActionForceEntry (0.01s)
    PASS
    ok      kis-flow/test   0.348s
    
    

    会发现,Function第三层funcName3Handler 被执行到,但是没有任何的数据。

    7.4 Action JumpFunc(流程跳转)

    接下来,来实现JumpFunc Action,JumpFunc是可以在当前Flow中任意跳转到指定的FuncName继续执行(前提是跳转的FuncName当当前Flow中存在)

    注意:JumpFunc容易出现无限循环流,所以在业务的设计要慎用。

    7.4.1 Action添加JumpFunc

    首先在Action添加一个JumpFunc属性,注意,JunpFunc不是一个bool状态,而是一个string字符串,表示具体要跳转的FunctionName名称。

    kis-flow/kis/action.go

    // Action KisFlow执行流程Actions
    type Action struct {
        // DataReuse 是否复用上层Function数据
        DataReuse bool
    
        // 默认Next()为如果本层Function计算结果为0条数据,之后Function将不会继续执行
        // ForceEntryNext 为忽略上述默认规则,没有数据强制进入下一层Function
        ForceEntryNext bool
    
        // ++++++++++
        // JumpFunc 跳转到指定Function继续执行
        JumpFunc string
    
        // Abort 终止Flow的执行
        Abort bool
    }
    
    
    // ActionJumpFunc 会返回一个ActionFunc函数,并且会将funcName赋值给Action.JumpFunc
    // (注意:容易出现Flow循环调用,导致死循环)
    func ActionJumpFunc(funcName string) ActionFunc {
        return func(act *Action) {
            act.JumpFunc = funcName
        }
    }
    

    然后提供一个修改JumpFunc的配置方法ActionJumpFunc(),注意这个方法和之前的方法写法有一些不同,主要是返回一个匿名函数并且执行,目的则是修改Action中的JumpFunc属性。

    7.4.2 捕获Action

    接下来,我们来捕获JumpFunc的Action动作,判断JumpFunc是否为空字符串即可。

    kis-flow/flow/kis_flow_action.go

    // dealAction  处理Action,决定接下来Flow的流程走向
    func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {
    
        // DataReuse Action
        if flow.action.DataReuse {
            if err := flow.commitReuseData(ctx); err != nil {
                return nil, err
            }
        } else {
            if err := flow.commitCurData(ctx); err != nil {
                return nil, err
            }
        }
    
        // ForceEntryNext Action
        if flow.action.ForceEntryNext {
            if err := flow.commitVoidData(ctx); err != nil {
                return nil, err
            }
            flow.abort = false
        }
    
        // ++++++++++++++++++++++++++++++++
        // JumpFunc Action
        if flow.action.JumpFunc != "" {
            if _, ok := flow.Funcs[flow.action.JumpFunc]; !ok {
                //当前JumpFunc不在flow中
                return nil, errors.New(fmt.Sprintf("Flow Jump -> %s is not in Flow", flow.action.JumpFunc))
            }
    
            jumpFunction := flow.Funcs[flow.action.JumpFunc]
            // 更新上层Function
            flow.PrevFunctionId = jumpFunction.GetPrevId()
            fn = jumpFunction
    
            // 如果设置跳跃,强制跳跃
            flow.abort = false
        // ++++++++++++++++++++++++++++++++
    
        } else {
    
            // 更新上一层 FuncitonId 游标
            flow.PrevFunctionId = flow.ThisFunctionId
            fn = fn.Next()
        }
    
        // Abort Action 强制终止
        if flow.action.Abort {
            flow.abort = true
        }
    
        // 清空Action
        flow.action = kis.Action{}
    
        return fn, nil
    }
    
    

    如果设置JumpFunc,则需要修改下次执行的fn指针,否则则正常寻址fn.Next()

    7.4.3 单元测试

    接下来来定义一个跳转Action的Function,配置,如下:

    kis-flow/test/load_conf/func/func-jumpFunc.yml

    kistype: func
    fname: jumpFunc
    fmode: Calculate
    source:
      name: 用户订单错误率
      must:
        - order_id
        - user_id
    

    并且实现相关的Funciton业务逻辑,如下:

    kis-flow/test/faas/faas_jump.go

    package faas
    
    import (
        "context"
        "fmt"
        "kis-flow/kis"
    )
    
    // type FaaS func(context.Context, Flow) error
    
    func JumpFuncHandler(ctx context.Context, flow kis.Flow) error {
        fmt.Println("---> Call JumpFuncHandler ----")
    
        for _, row := range flow.Input() {
            str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
            fmt.Println(str)
        }
    
        return flow.Next(kis.ActionJumpFunc("funcName1"))
    }
    
    

    这里,最后通过flow.Next(kis.ActionJumpFunc("funcName1"))来指定跳转到funcName1的Function。

    新建一个Flow,为FlowName5,配置如下:

    kis-flow/test/load_conf/flow/flow-FlowName5.yml

    kistype: flow
    status: 1
    flow_name: flowName5
    flows:
      - fname: funcName1
      - fname: funcName2
      - fname: jumpFunc
    
    

    之后,来实现单元测试用例代码,如下:

    kis-flow/test/kis_action_test.go

    func TestActionJumpFunc(t *testing.T) {
        ctx := context.Background()
    
        // 0. 注册Function 回调业务
        kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
        kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
        kis.Pool().FaaS("jumpFunc", faas.JumpFuncHandler) // 添加jumpFunc 业务
    
        // 0. 注册ConnectorInit 和 Connector 回调业务
        kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
        kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
    
        // 1. 加载配置文件并构建Flow
        if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
            panic(err)
        }
    
        // 2. 获取Flow
        flow1 := kis.Pool().GetFlow("flowName5")
    
        // 3. 提交原始数据
        _ = flow1.CommitRow("This is Data1 from Test")
        _ = flow1.CommitRow("This is Data2 from Test")
        _ = flow1.CommitRow("This is Data3 from Test")
    
        // 4. 执行flow1
        if err := flow1.Run(ctx); err != nil {
            panic(err)
        }
    }
    
    

    cd到kis-flow/test/执行:

    go test -test.v -test.paniconexit0 -test.run  TestActionJumpFunc
    

    结果如下:

    ... 
    ...
    
    ---> Call funcName1Handler ----
    In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data1 from Test
    In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data2 from Test
    In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data3 from Test
    context.Background
     ====> After commitCurData, flow_name = flowName5, flow_id = flow-5da80af989dc49648a001762fa08b866
    All Level Data =
     map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
    
    KisFunctionS, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013680 ThisFunctionId:func-5800567c4cd842b6b377c2b0c0fd81c2 PrevFunctionId:func-f6ca8010d66744429bf6069c9897a928 funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
    
    ---> Call funcName2Handler ----
    In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 0
    ===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
    ===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
    In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 1
    ===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
    ===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
    In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 2
    ===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
    ===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
    context.Background
     ====> After commitCurData, flow_name = flowName5, flow_id = flow-5da80af989dc49648a001762fa08b866
    All Level Data =
     map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
    
    KisFunctionC, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000136e0 ThisFunctionId:func-4faf8f019f4a4a48b84ef27abfad53d1 PrevFunctionId:func-5800567c4cd842b6b377c2b0c0fd81c2 funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
    
    ---> Call JumpFuncHandler ----
    In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 0
    In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 1
    In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 2
    KisFunctionV, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013620 ThisFunctionId:func-f6ca8010d66744429bf6069c9897a928 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
    
    ---> Call funcName1Handler ----
    
     ... 
     ...
    
    

    发现我们会无限循环的调度Flow,这样说明我们的JumpFunc Action已经生效。

    7.5【V0.6】源代码

    https://github.com/aceld/kis-flow/releases/tag/v0.6


    作者:刘丹冰Aceld github: https://github.com/aceld
    KisFlow开源项目地址:https://github.com/aceld/kis-flow

    Golang框架实战-KisFlow流式计算框架专栏

    Golang框架实战-KisFlow流式计算框架(1)-概述
    Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
    Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
    Golang框架实战-KisFlow流式计算框架(4)-数据流
    Golang框架实战-KisFlow流式计算框架(5)-Function调度
    Golang框架实战-KisFlow流式计算框架(6)-Connector
    Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出

    相关文章

      网友评论

        本文标题:Golang框架实战-KisFlow流式计算框架(8)-KisF

        本文链接:https://www.haomeiwen.com/subject/ctjaadtx.html