美文网首页程序员@IT·互联网KisFlow-基于Golang的流式计算框架实战
KisFlow-Golang流式实时计算案例(一)快速开始Qui

KisFlow-Golang流式实时计算案例(一)快速开始Qui

作者: 刘丹冰Aceld | 来源:发表于2024-04-21 15:24 被阅读0次

    Golang框架实战-KisFlow流式计算框架专栏

    Golang框架实战-KisFlow流式计算框架(1)-概述
    Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
    Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
    Golang框架实战-KisFlow流式计算框架(4)-数据流
    Golang框架实战-KisFlow流式计算框架(5)-Function调度
    Golang框架实战-KisFlow流式计算框架(6)-Connector
    Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出
    Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action
    Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数
    Golang框架实战-KisFlow流式计算框架(10)-Flow多副本
    Golang框架实战-KisFlow流式计算框架(11)-Prometheus Metrics统计
    Golang框架实战-KisFlow流式计算框架(12)-基于反射自适应注册FaaS形参类型

    案例:
    KisFlow-Golang流式计算案例(一)快速开始QuickStart
    KisFlow-Golang流式计算案例(二)-Flow并流操作
    KisFlow-Golang流式计算案例(二)-KisFlow在多协程中的应用


    DownLoad kis-flow source

    $go get github.com/aceld/kis-flow
    

    《KisFlow开发者文档》

    1. KisFlow快速开始(使用配置文件)

    案例源代码: kis-flow-usage/2-quick_start_with_config at main · aceld/kis-flow-usage

    首先我们创建一个项目,项目的文件路径如下:

    项目目录

    ├── Makefile
    ├── conf
    │   ├── flow-CalStuAvgScore.yml
    │   ├── func-AvgStuScore.yml
    │   └── func-PrintStuAvgScore.yml
    ├── faas_stu_score_avg.go
    ├── faas_stu_score_avg_print.go
    └── main.go
    

    Flow

    定义当前的Flow,当前的Flow名称为:"CalStuAvgScore",这是一个计算学生平均分值的数据流。


    定义两个Function,Function1为:Calculate,是计算学生平均分的逻辑,Function2为Expand 为打印最终结果。

    Config

    有关Flow和Function的配置文件如下。

    (1) Flow Config

    conf/flow-CalStuAvgScore.yml

    kistype: flow
    status: 1
    flow_name: CalStuAvgScore
    flows:
     - fname: AvgStuScore
     - fname: PrintStuAvgScore
    

    (2) Function1 Config

    conf/func-AvgStuScore.yml

    kistype: func
    fname: AvgStuScore
    fmode: Calculate
    source:
     name: 学生学分
     must:
     - stu_id
    

    (3) Function2(Slink) Config

    conf/func-PrintStuAvgScore.yml

    kistype: func
    fname: PrintStuAvgScore
    fmode: Expand
    source:
     name: 学生学分
     must:
     - stu_id
    

    Main

    接下来是主逻辑,主要分成三步骤:

    • 加载配置文件,获取Flow实例;
    • 提交数据;
    • 运行Flow。

    main.go

    package main
    
    import (
        "context"
        "fmt"
    
        "github.com/aceld/kis-flow/file"
        "github.com/aceld/kis-flow/kis"
    )
    
    func main() {
        ctx := context.Background()
    
        // Load Configuration from file
        if err := file.ConfigImportYaml("conf/"); err != nil {
            panic(err)
    
        }
    
        // Get the flow
        flow1 := kis.Pool().GetFlow("CalStuAvgScore")
        if flow1 == nil {
            panic("flow1 is nil")
    
        }
    
        // Submit a string
        _ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
        // Submit a string
        _ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)
    
        // Run the flow
        if err := flow1.Run(ctx); err != nil {
            fmt.Println("err: ", err)
    
        }
    
        return
    }
    

    Function1

    第一个计算流程的实现逻辑如下, AvgStuScoreIn 为输入数据类型,当前有三个学分,AvgStuScoreOut为输出数据类型,为平均分值。

    faas_stu_score_avg.go

    package main
    import (
        "context"
        "github.com/aceld/kis-flow/kis"
        "github.com/aceld/kis-flow/serialize"
    
    )
    type AvgStuScoreIn struct {
        serialize.DefaultSerialize
        StuId  int `json:"stu_id"`
        Score1 int `json:"score_1"`
        Score2 int `json:"score_2"`
        Score3 int `json:"score_3"`
    
    }
    type AvgStuScoreOut struct {
        serialize.DefaultSerialize
        StuId    int     `json:"stu_id"`
        AvgScore float64 `json:"avg_score"`
    
    // AvgStuScore(FaaS) 计算学生平均分
    func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
    
        for _, row := range rows {
            out := AvgStuScoreOut{
                StuId:    row.StuId,
                AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
            }
            // 提交结果数据
            _ = flow.CommitRow(out)
    
    
        }
        return nil
    }
    

    Function2

    打印的计算逻辑为直接打印数据即可,如下。

    faas_stu_score_avg_print.go

    package main
    
    import (
        "context"
        "fmt"
        "github.com/aceld/kis-flow/kis"
        "github.com/aceld/kis-flow/serialize"
    
    )
    
    type PrintStuAvgScoreIn struct {
        serialize.DefaultSerialize
        StuId    int     `json:"stu_id"`
        AvgScore float64 `json:"avg_score"`
    }
    
    type PrintStuAvgScoreOut struct {
        serialize.DefaultSerialize
    }
    
    func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
        for _, row := range rows {
            fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
    
        }
        return nil
    }
    

    OutPut

    最后运行程序,得到结果如下:

    Add KisPool FuncName=AvgStuScore
    Add KisPool FuncName=PrintStuAvgScore
    Add FlowRouter FlowName=CalStuAvgScore
    stuid: [101], avg score: [90]
    stuid: [102], avg score: [76.66666666666667]</pre>
    

    2. KisFlow快速开始(使用原生接口,动态配置)

    案例源代码: kis-flow-usage/1-quick_start at main · aceld/kis-flow-usage

    项目目录

    ├── faas_stu_score_avg.go
    ├── faas_stu_score_avg_print.go
    └── main.go
    

    Flow

    Main

    main.go

    package main
    
    import (
        "context"
        "fmt"
        "github.com/aceld/kis-flow/common"
        "github.com/aceld/kis-flow/config"
        "github.com/aceld/kis-flow/flow"
        "github.com/aceld/kis-flow/kis"
    )
    
    func main() {
        ctx := context.Background()
    
        // Create a new flow configuration
        myFlowConfig1 := config.NewFlowConfig("CalStuAvgScore", common.FlowEnable)
    
        // Create new function configuration
        avgStuScoreConfig := config.NewFuncConfig("AvgStuScore", common.C, nil, nil)
        printStuScoreConfig := config.NewFuncConfig("PrintStuAvgScore", common.E, nil, nil)
    
        // Create a new flow
        flow1 := flow.NewKisFlow(myFlowConfig1)
    
        // Link functions to the flow
        _ = flow1.Link(avgStuScoreConfig, nil)
        _ = flow1.Link(printStuScoreConfig, nil)
    
        // Submit a string
        _ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
        // Submit a string
        _ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)
    
        // Run the flow
        if err := flow1.Run(ctx); err != nil {
            fmt.Println("err: ", err)
        }
    
        return
    }
    
    func init() {
        // Register functions
        kis.Pool().FaaS("AvgStuScore", AvgStuScore)
        kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
    }
    

    Function1

    faas_stu_score_avg.go

    package main
    
    import (
        "context"
        "github.com/aceld/kis-flow/kis"
        "github.com/aceld/kis-flow/serialize"
    )
    
    type AvgStuScoreIn struct {
        serialize.DefaultSerialize
        StuId  int `json:"stu_id"`
        Score1 int `json:"score_1"`
        Score2 int `json:"score_2"`
        Score3 int `json:"score_3"`
    }
    
    type AvgStuScoreOut struct {
        serialize.DefaultSerialize
        StuId    int     `json:"stu_id"`
        AvgScore float64 `json:"avg_score"`
    }
    
    // AvgStuScore(FaaS) 计算学生平均分
    func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
        for _, row := range rows {
    
            out := AvgStuScoreOut{
                StuId:    row.StuId,
                AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
            }
    
            // 提交结果数据
            _ = flow.CommitRow(out)
        }
    
        return nil
    }
    

    Function2

    faas_stu_score_avg_print.go

    package main
    
    import (
        "context"
        "fmt"
        "github.com/aceld/kis-flow/kis"
        "github.com/aceld/kis-flow/serialize"
    )
    
    type PrintStuAvgScoreIn struct {
        serialize.DefaultSerialize
        StuId    int     `json:"stu_id"`
        AvgScore float64 `json:"avg_score"`
    }
    
    type PrintStuAvgScoreOut struct {
        serialize.DefaultSerialize
    }
    
    func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
    
        for _, row := range rows {
            fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
        }
    
        return nil
    }
    

    OutPut

    Add KisPool FuncName=AvgStuScore
    Add KisPool FuncName=PrintStuAvgScore
    funcName NewConfig source is nil, funcName = AvgStuScore, use default unNamed Source.
    funcName NewConfig source is nil, funcName = PrintStuAvgScore, use default unNamed Source.
    stuid: [101], avg score: [90]
    stuid: [102], avg score: [76.66666666666667]
    

    作者:刘丹冰Aceld github: https://github.com/aceld
    KisFlow开源项目地址:https://github.com/aceld/kis-flow

    Golang框架实战-KisFlow流式计算框架专栏

    Golang框架实战-KisFlow流式计算框架(1)-概述
    Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
    Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
    Golang框架实战-KisFlow流式计算框架(4)-数据流
    Golang框架实战-KisFlow流式计算框架(5)-Function调度
    Golang框架实战-KisFlow流式计算框架(6)-Connector
    Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出
    Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action
    Golang框架实战-KisFlow流式计算框架(10)-Flow多副本
    Golang框架实战-KisFlow流式计算框架(11)-Prometheus Metrics统计
    Golang框架实战-KisFlow流式计算框架(12)-基于反射自适应注册FaaS形参类型
    Golang框架实战-KisFlow流式计算框架(12)-基于反射自适应注册FaaS形参类型

    案例:
    KisFlow-Golang流式计算案例(一)快速开始QuickStart
    KisFlow-Golang流式计算案例(二)-Flow并流操作
    KisFlow-Golang流式计算案例(二)-KisFlow在多协程中的应用

    相关文章

      网友评论

        本文标题:KisFlow-Golang流式实时计算案例(一)快速开始Qui

        本文链接:https://www.haomeiwen.com/subject/wpfvxjtx.html