美文网首页KisFlow-基于Golang的流式计算框架实战@IT·互联网
Golang框架实战-KisFlow流式计算框架(2)-项目构建

Golang框架实战-KisFlow流式计算框架(2)-项目构建

作者: 刘丹冰Aceld | 来源:发表于2024-02-22 16:07 被阅读0次

    连载中...
    Golang框架实战-KisFlow流式计算框架(1)-概述
    Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
    Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)


    2. V0.1-项目构建及基础模块定义

    首先我们创建我们的项目,项目的主文件目录就叫KisFlow,且在Github上创建对应的仓库: https://github.com/aceld/kis-flow 然后将项目代码clone到本地。

    2.0 项目构建

    (这里如果你是按照本教程开发,需要在自己的仓库重新创建一个新项目,并且clone到本地开发)

    2.0.1 创建项目目录

    接下来,我们先将项目中的必要的文件目录创建好,项目的目录结构如下:

     kis-flow /
    .
    ├── LICENSE
    ├── README.md
    ├── common/
    ├── example/
    ├── function/
    ├── conn/
    ├── config/
    ├── flow/
    └── kis/
    

    这里我们创建三个文件夹, common/为 存放我们一些公用的基础常量和一些枚举参数,还有一些工具类的方法。 flow/为存放KisFlow的核心代码。 function/为存放KisFunction的核心代码。 conn/为存放KisConnector的核心代码。 config/ 存放flow、functioin、connector等策略配置信息模块。 example/为我们针对KisFlow的一些测试案例和test单元测试案例等,能够及时验证我们的项目效果。 kis/来存放所有模块的抽象层。

    2.0.1 创建go.mod

    cd 到 kis-flow的项目根目录,执行如下指令:

    我们会得到go.mod文件,这个是作为当前项目的包管理文件,如下:

    module kis-flow
    go 1.18
    

    首先因为在之后会有很多调试日志要打印,我们先把日志模块集成了,日志模块KisFlow提供一个默认的标准输出Logger对象,再对我开放一个SetLogger() 方法来进行重新设置开发者自己的Logger模块。

    2.1 KisLogger

    2.1.1 Logger抽象接口

    将Logger的定义在kis-flow/log/目录下,创建kis_log.go文件:

    kis-flow/log/kis_log.go

    package log
    
    import "context"
    
    type KisLogger interface {
        // InfoFX 有上下文的Info级别日志接口, format字符串格式
        InfoFX(ctx context.Context, str string, v ...interface{})
        // ErrorFX 有上下文的Error级别日志接口, format字符串格式
        ErrorFX(ctx context.Context, str string, v ...interface{})
        // DebugFX 有上下文的Debug级别日志接口, format字符串格式
        DebugFX(ctx context.Context, str string, v ...interface{})
    
        // InfoF 无上下文的Info级别日志接口, format字符串格式
        InfoF(str string, v ...interface{})
        // ErrorF 无上下文的Error级别日志接口, format字符串格式
        ErrorF(str string, v ...interface{})
        // DebugF 无上下文的Debug级别日志接口, format字符串格式
        DebugF(str string, v ...interface{})
    }
    
    // kisLog 默认的KisLog 对象
    var kisLog KisLogger
    
    // SetLogger 设置KisLog对象, 可以是用户自定义的Logger对象
    func SetLogger(newlog KisLogger) {
        kisLog = newlog
    }
    
    // Logger 获取到kisLog对象
    func Logger() KisLogger {
        return kisLog
    }
    
    

    KisLogger提供了三个级别的日志,分别是Info、Error、Debug。且也分别提供了具备context参数与不具备context参数的两套日志接口。
    提供一个全局对象kisLog,默认的KisLog 对象。以及方法SetLogger()Logger()供开发可以设置自己的Logger对象以及获取到Logger对象。

    2.1.2 默认的日志对象KisDefaultLogger

    如果开发没有自定义的日志对象定义,那么KisFlow会提供一个默认的日志对象kisDefaultLogger,这个类实现了KisLogger的全部接口,且都是默认打印到标准输出的形式来打印日志,定义在kis-flow/log/目录下,创建kis_default_log.go文件。

    kis-flow/log/kis_default_log.go

    package log
    
    import (
        "context"
        "fmt"
    )
    
    // kisDefaultLog 默认提供的日志对象
    type kisDefaultLog struct{}
    
    func (log *kisDefaultLog) InfoF(str string, v ...interface{}) {
        fmt.Printf(str, v...)
    }
    
    func (log *kisDefaultLog) ErrorF(str string, v ...interface{}) {
        fmt.Printf(str, v...)
    }
    
    func (log *kisDefaultLog) DebugF(str string, v ...interface{}) {
        fmt.Printf(str, v...)
    }
    
    func (log *kisDefaultLog) InfoFX(ctx context.Context, str string, v ...interface{}) {
        fmt.Println(ctx)
        fmt.Printf(str, v...)
    }
    
    func (log *kisDefaultLog) ErrorFX(ctx context.Context, str string, v ...interface{}) {
        fmt.Println(ctx)
        fmt.Printf(str, v...)
    }
    
    func (log *kisDefaultLog) DebugFX(ctx context.Context, str string, v ...interface{}) {
        fmt.Println(ctx)
        fmt.Printf(str, v...)
    }
    
    func init() {
        // 如果没有设置Logger, 则启动时使用默认的kisDefaultLog对象
        if Logger() == nil {
            SetLogger(&kisDefaultLog{})
        }
    }
    
    

    这里在init()初始化方法中,会判断目前是否已经有设置全局的Logger对象,如果没有,KisFlow会默认选择kisDefaultLog 作为全局Logger日志对象。

    2.1.3 单元测试KisLogger

    现在,我们先不针对KisLogger做过多的方法开发,我们优先将现有的程序跑起来,做一个单元测试来测试创建一个KisLogger

    kis-flow/test/kis_log_test.go

    package test
    
    import (
        "context"
        "kis-flow/log"
        "testing"
    )
    
    func TestKisLogger(t *testing.T) {
        ctx := context.Background()
    
        log.Logger().InfoFX(ctx, "TestKisLogger InfoFX")
        log.Logger().ErrorFX(ctx, "TestKisLogger ErrorFX")
        log.Logger().DebugFX(ctx, "TestKisLogger DebugFX")
    
        log.Logger().InfoF("TestKisLogger InfoF")
        log.Logger().ErrorF("TestKisLogger ErrorF")
        log.Logger().DebugF("TestKisLogger DebugF")
    }
    
    

    我们cdkis-flow/test/目录下执行单元测试指令:

    go test -test.v -test.paniconexit0 -test.run TestKisLogger
    

    得到结果如下:

    === RUN   TestKisLogger
    context.Background
    TestKisLogger InfoFX
    context.Background
    TestKisLogger ErrorFX
    context.Background
    TestKisLogger DebugFX
    TestKisLogger InfoF
    TestKisLogger ErrorF
    TestKisLogger DebugF
    --- PASS: TestKisLogger (0.00s)
    PASS
    ok      kis-flow/test   0.509s
    

    2.2 KisConfig

    在KisFlow中,我们定义了三种核心模块,分别是KisFunction, KisFlow, KisConnector ,所以KisConfig也分别需要针对这三个模块进行定义,我们将全部有关KisConfig的代码都放在kis-flow/config/目录下。

    ➜  kis-flow git:(master) ✗ tree
    .
    ├── LICENSE
    ├── README.md
    ├── common/
    │   └── 
    ├── example/
    │   └── 
    ├── config/
    │   ├──
    ├── test/
    └── go.mod
    

    2.2.1 KisFuncConfig 定义

    KisFuncConfig在设计文档中的yaml文件形式如下:

    kistype: func
    fname: 测试KisFunction_S1
    fmode: Save
    source:
     name: 被校验的测试数据源1-用户订单维度
     must:
     - userid
     - orderid
    
    option:
     cname: 测试KisConnector_1
     retry_times: 3
     retry_duration: 500
     default_params:
     default1: default1_param
     default2: default2_param
    

    参数说明:



    接下来我们根据上述的配置协议,来定义KisFunction的策略配置结构体,并且提供一些响应的初始化方法。 我们在项目文档中创建kis_func_config.go文件,在这里我们将需要的Config定义实现。

    A. 结构体定义

    kis-flow/config/kis_func_config.go

    package config
    
    import (
        "kis-flow/common"
        "kis-flow/log"
    )
    
    // FParam 在当前Flow中Function定制固定配置参数类型
    type FParam map[string]string
    
    // KisSource 表示当前Function的业务源
    type KisSource struct {
        Name string   `yaml:"name"` //本层Function的数据源描述
        Must []string `yaml:"must"` //source必传字段
    }
    
    // KisFuncOption 可选配置
    type KisFuncOption struct {
        CName        string `yaml:"cname"`           //连接器Connector名称
        RetryTimes   int    `yaml:"retry_times"`     //选填,Function调度重试(不包括正常调度)最大次数
        RetryDuriton int    `yaml:"return_duration"` //选填,Function调度每次重试最大时间间隔(单位:ms)
        Params       FParam `yaml:"default_params"`  //选填,在当前Flow中Function定制固定配置参数
    }
    
    // KisFuncConfig 一个KisFunction策略配置
    type KisFuncConfig struct {
        KisType string        `yaml:"kistype"`
        FName   string        `yaml:"fname"`
        FMode   string        `yaml:"fmode"`
        Source  KisSource     `yaml:"source"`
        Option  KisFuncOption `yaml:"option"`
    }
    

    这里KisFuncConfig是相关结构体,其中 FParamKisSourceKisFuncOption均为一些相关的参数类型。

    B. 相关方法定义

    下面我们先简单的提供创建KisFuncConfig的构造方法。

    kis-flow/config/kis_func_config.go

    // NewFuncConfig 创建一个Function策略配置对象, 用于描述一个KisFunction信息
    func NewFuncConfig(funcName string, mode common.KisMode, source *KisSource, option *KisFuncOption) *KisFuncConfig {
         config := new(KisFuncConfig)
         config.FName = funcName
    
         if source == nil {
             log.Logger().ErrorF("funcName NewConfig Error, source is nil, funcName = %s\n", funcName)
             return nil
         }
    
         config.Source = *source
         config.FMode = string(mode)
    
         //FunctionS 和 L 需要必传KisConnector参数,原因是S和L需要通过Connector进行建立流式关系
         if mode == common.S || mode == common.L {
                 if option == nil {
                       log.Logger().ErrorF("Funcion S/L need option->Cid\n")
                       return nil
                 } else if option.CName == "" {
                       log.Logger().ErrorF("Funcion S/L need option->Cid\n")
                       return nil
                 }
           }
    
          if option != nil {
               config.Option = *option
          }
    
         return config
    }
    

    上述代码中提到了common.Scommon.L两个枚举类型,这是我们针对KisFunction提供的五种类型的枚举值,我们可以将他们定义在 kis-flow/common/const.go文件中。

    kis-flow/common/const.go

    package common
    
    type KisMode string
    
    const (
        // V 为校验特征的KisFunction, 
        // 主要进行数据的过滤,验证,字段梳理,幂等等前置数据处理
        V KisMode = "Verify"
    
        // S 为存储特征的KisFunction, 
        // S会通过NsConnector进行将数据进行存储,数据的临时声明周期为NsWindow
        S KisMode = "Save"
    
        // L 为加载特征的KisFunction,
        // L会通过KisConnector进行数据加载,通过该Function可以从逻辑上与对应的S Function进行并流
        L KisMode = "Load"
    
        // C 为计算特征的KisFunction, 
        // C会通过KisFlow中的数据计算,生成新的字段,将数据流传递给下游S进行存储,或者自己也已直接通过KisConnector进行存储
        C KisMode = "Calculate"
    
        // E 为扩展特征的KisFunction,
        // 作为流式计算的自定义特征Function,如,Notify 调度器触发任务的消息发送,删除一些数据,重置状态等。
        E KisMode = "Expand"
    )
    

    如果fmodeSave或者Load说明这个function有查询库或者存储数据的行为,那么这个Function就需要关联一个KisConnector,那么CName就需要传递进来。

    C. 创建KisFuncConfig单元测试

    现在,我们先不针对KisFuncConfig做过多的方法开发,我们优先将现有的程序跑起来,做一个单元测试来测试创建一个KisFuncConfig

    kis-flow/test/kis_config_test.go

    func TestNewFuncConfig(t *testing.T) {
        source := config.KisSource{
            Name: "公众号抖音商城户订单数据",
            Must: []string{"order_id", "user_id"},
        }
    
        option := config.KisFuncOption{
            CName:        "connectorName1",
            RetryTimes:   3,
            RetryDuriton: 300,
    
            Params: config.FParam{
                "param1": "value1",
                "param2": "value2",
            },
        }
    
        myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)
    
        log.Logger().InfoF("funcName1: %+v\n", myFunc1)
    }
    

    我们cdkis-flow/test/目录下执行单元测试指令:

    go test -test.v -test.paniconexit0 -test.run TestNewFuncConfig
    

    得到结果如下:

    === RUN   TestNewFuncConfig
    funcName1: &{KisType: FName:funcName1 FMode:Save Source:{Name:公众号抖音商城户订单数据 Must:[order_id user_id]} Option:{CName:connectorName1 RetryTimes:3 RetryDuriton:300 Params:map[param1:value1 param2:value2]}}
    
    --- PASS: TestNewFuncConfig (0.00s)
    PASS
    ok      kis-flow/test   0.545s
    

    好了,现在最简单的KisFuncConfig的策略创建基本完成了。

    2.2.2 KisFlowConfig 定义

    KisFlowConfig在设计文档中的yaml文件形式如下:

    kistype: flow
    status: 1
    flow_name: MyFlow1
    flows:
      - fname: 测试PrintInput
        params:
          args1: value1
          args2: value2
      - fname: 测试KisFunction_S1
      - fname: 测试PrintInput
        params:
          args1: value11
          args2: value22
          default2: newDefault
      - fname: 测试PrintInput
      - fname: 测试KisFunction_S1
        params:
          my_user_param1: ffffffxxxxxx
      - fname: 测试PrintInput
    

    参数说明:

    A. 结构体定义

    接下来我们根据上述的配置协议,来定义KisFlow的策略配置结构体,并且提供一些响应的初始化方法。 我们在项目文档中创建kis_flow_config.go文件,在这里我们将需要的Config定义实现。

    kis-flow/config/kis_flow_config.go

    package config
    
    import "kis-flow/common"
    
    // KisFlowFunctionParam 一个Flow配置中Function的Id及携带固定配置参数
    type KisFlowFunctionParam struct {
        FuncName string `yaml:"fname"`  //必须
        Params   FParam `yaml:"params"` //选填,在当前Flow中Function定制固定配置参数
    }
    
    // KisFlowConfig 用户贯穿整条流式计算上下文环境的对象
    type KisFlowConfig struct {
        KisType  string                 `yaml:"kistype"`
        Status   int                    `yaml:"status"`
        FlowName string                 `yaml:"flow_name"`
        Flows    []KisFlowFunctionParam `yaml:"flows"`
    }
    

    这里提供了一个新的参数类型 KisFlowFunctionParam ,这个表示配置KisFlow的时候,在调度的时候,flow默认传递当前被调度Function的自定义默认参数,如果不需要可以不添加此参数。

    B. 相关方法定义

    提供一个新建KisFlowConfig的构造方法。

    kis-flow/config/kis_flow_config.go

    // NewFlowConfig 创建一个Flow策略配置对象, 用于描述一个KisFlow信息
    func NewFlowConfig(flowName string, enable common.KisOnOff) *KisFlowConfig {
        config := new(KisFlowConfig)
        config.FlowName = flowName
        config.Flows = make([]KisFlowFunctionParam, 0)
    
        config.Status = int(enable)
    
        return config
    }
    
    // AppendFunctionConfig 添加一个Function Config 到当前Flow中
    func (fConfig *KisFlowConfig) AppendFunctionConfig(params KisFlowFunctionParam) {
        fConfig.Flows = append(fConfig.Flows, params)
    }
    

    有关flow携带的Function配置,这里我们采用通过AppendFunctionConfig动态的去添加,目的是为了,今后可能有关kisflow的配置会从数据库/动态远程配置等中提取,那么就需要动态的将配置组合进来。

    C. KisFlowConfig单元测试

    同样,我们简单些一个单元测试来测试KisFlowConfig的创建。

    kis-flow/test/kis_config_test.go

    
    func TestNewFlowConfig(t *testing.T) {
    
        flowFuncParams1 := config.KisFlowFunctionParam{
            FuncName: "funcName1",
            Params: config.FParam{
                "flowSetFunParam1": "value1",
                "flowSetFunParam2": "value2",
            },
        }
    
        flowFuncParams2 := config.KisFlowFunctionParam{
            FuncName: "funcName2",
            Params: config.FParam{
                "default": "value1",
            },
        }
    
        myFlow1 := config.NewFlowConfig("flowName1", common.FlowEnable)
        myFlow1.AppendFunctionConfig(flowFuncParams1)
        myFlow1.AppendFunctionConfig(flowFuncParams2)
    
        log.Logger().InfoF("myFlow1: %+v\n", myFlow1)
    }
    

    我们cdkis-flow/test/目录下执行单元测试指令:

    $ go test -test.v -test.paniconexit0 -test.run TestNewFlowConfig
    

    得到结果如下:

    === RUN   TestNewFlowConfig
    myFlow1: &{KisType: Status:1 FlowName:flowName1 Flows:[{FuncName:funcName1 Params:map[flowSetFunParam1:value1 flowSetFunParam2:value2]} {FuncName:funcName2 Params:map[default:value1]}]}
    
    --- PASS: TestNewFlowConfig (0.00s)
    PASS
    ok      kis-flow/test   0.251s
    

    2.2.3 KisConnConfig

    KisConnConfig在设计文档中的yaml文件形式如下:

    kistype: conn
    cname: 测试KisConnector_1
    addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
    type: redis
    key: userid_orderid_option
    params:
      args1: value1
      args2: value2
    load: null
    save:
      - 测试KisFunction_S1
    

    A. 结构体定义

    接下来我们根据上述的配置协议,来定义KisConnector的策略配置结构体,并且提供一些响应的初始化方法。 我们在项目文档中创建kis_conn_config.go文件,在这里我们将需要的Config定义实现。

    kis-flow/config/kis_conn_config.go

    package config
    
    import (
        "errors"
        "fmt"
        "kis-flow/common"
    )
    
    // KisConnConfig KisConnector 策略配置
    type KisConnConfig struct {
        //配置类型
        KisType string `yaml:"kistype"`
        //唯一描述标识
        CName string `yaml:"cname"`
        //基础存储媒介地址
        AddrString string `yaml:"addrs"`
        //存储媒介引擎类型"Mysql" "Redis" "Kafka"等
        Type common.KisConnType `yaml:"type"`
        //一次存储的标识:如Redis为Key名称、Mysql为Table名称,Kafka为Topic名称等
        Key string `yaml:"key"`
        //配置信息中的自定义参数
        Params map[string]string `yaml:"params"`
        //存储读取所绑定的NsFuncionID
        Load []string `yaml:"load"`
        Save []string `yaml:"save"`
    }
    

    B. 相关方法定义

    kis-flow/config/kis_conn_config.go

    // NewConnConfig 创建一个KisConnector策略配置对象, 用于描述一个KisConnector信息
    func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig {
        strategy := new(KisConnConfig)
        strategy.CName = cName
        strategy.AddrString = addr
    
        strategy.Type = t
        strategy.Key = key
        strategy.Params = param
    
        return strategy
    }
    
    // WithFunc Connector与Function进行关系绑定
    func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {
    
        switch common.KisMode(fConfig.FMode) {
        case common.S:
            cConfig.Save = append(cConfig.Save, fConfig.FName)
        case common.L:
            cConfig.Load = append(cConfig.Load, fConfig.FName)
        default:
            return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.FMode))
        }
    
        return nil
    }
    

    这里也是通过提供WithFunc方法来动态的添加Conn和Function的关联关系 ###

    C. KisConnConfig 单元测试 同样,我们简单些一个单元测试来测试KisConnConfig的创建。

    kis-flow/test/kis_config_test.go

    func TestNewConnConfig(t *testing.T) {
    
        source := config.KisSource{
            Name: "公众号抖音商城户订单数据",
            Must: []string{"order_id", "user_id"},
        }
    
        option := config.KisFuncOption{
            CName:        "connectorName1",
            RetryTimes:   3,
            RetryDuriton: 300,
    
            Params: config.FParam{
                "param1": "value1",
                "param2": "value2",
            },
        }
    
        myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)
    
        connParams := config.FParam{
            "param1": "value1",
            "param2": "value2",
        }
    
        myConnector1 := config.NewConnConfig("connectorName1", "0.0.0.0:9987,0.0.0.0:9997", common.REDIS, "key", connParams)
    
        if err := myConnector1.WithFunc(myFunc1); err != nil {
            log.Logger().ErrorF("WithFunc err: %s\n", err.Error())
        }
    
        log.Logger().InfoF("myConnector1: %+v\n", myConnector1)
    }
    

    我们cdkis-fow/test/目录下执行单元测试指令:

    $ go test -test.v -test.paniconexit0 -test.run TestNewConnConfig
    

    得到结果如下:

    === RUN   TestNewConnConfig
    myConnector1: &{KisType: CName:connectorName1 AddrString:0.0.0.0:9987,0.0.0.0:9997 Type:redis Key:key Params:map[param1:value1 param2:value2] Load:[] Save:[funcName1]}
    
    --- PASS: TestNewConnConfig (0.00s)
    PASS
    ok      kis-flow/test   0.481s
    

    作者:刘丹冰Aceld github: https://github.com/aceld
    KisFlow开源项目地址:https://github.com/aceld/kis-flow


    连载中...
    Golang框架实战-KisFlow流式计算框架(1)-概述
    Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
    Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)

    相关文章

      网友评论

        本文标题:Golang框架实战-KisFlow流式计算框架(2)-项目构建

        本文链接:https://www.haomeiwen.com/subject/grnwadtx.html