KisFlow-Golang Real-Time Stream Computing Case (2) - Merging Flows

Author: 刘丹冰Aceld | Published 2024-04-22 10:34

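    Golang Framework in Practice - KisFlow Stream Computing Framework Series

    Golang Framework in Practice - KisFlow Stream Computing Framework (1) - Overview
    Golang Framework in Practice - KisFlow Stream Computing Framework (2) - Project Setup / Basic Modules (Part 1)
    Golang Framework in Practice - KisFlow Stream Computing Framework (3) - Project Setup / Basic Modules (Part 2)
    Golang Framework in Practice - KisFlow Stream Computing Framework (4) - Data Stream
    Golang Framework in Practice - KisFlow Stream Computing Framework (5) - Function Scheduling
    Golang Framework in Practice - KisFlow Stream Computing Framework (6) - Connector
    Golang Framework in Practice - KisFlow Stream Computing Framework (7) - Configuration Import and Export
    Golang Framework in Practice - KisFlow Stream Computing Framework (8) - KisFlow Action
    Golang Framework in Practice - KisFlow Stream Computing Framework (9) - Cache/Params: Data Caching and Data Parameters
    Golang Framework in Practice - KisFlow Stream Computing Framework (10) - Multiple Flow Copies
    Golang Framework in Practice - KisFlow Stream Computing Framework (11) - Prometheus Metrics
    Golang Framework in Practice - KisFlow Stream Computing Framework (12) - Reflection-Based Adaptive Registration of FaaS Parameter Types

    Cases:
    KisFlow-Golang Stream Computing Case (1) - QuickStart
    KisFlow-Golang Stream Computing Case (2) - Merging Flows
    KisFlow-Golang Stream Computing Case (2) - Using KisFlow with Multiple Goroutines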


    Download kis-flow source

    $ go get github.com/aceld/kis-flow
    

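    KisFlow Developer Documentation

    Source code for this case:
    https://github.com/aceld/kis-flow-usage/tree/main/8-connector

    KisFlow can also use a Connector to join two Flows.
    The two Flows joined below serve to introduce the Connector interfaces and how to use them.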

    Data Flow Diagram

    Case Overview

    Assume a student record has four attributes:

    Student ID: stu_id
    Score 1: score_1
    Score 2: score_2
    Score 3: score_3
    

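    Flow1 (CalStuAvgScore-1-2, fetched in main.go under the name CalStuAvgScore12) computes the average (avg_score_1_2) of a student's score 1 (score_1) and score 2 (score_2).
    Flow2 (CalStuAvgScore-3, fetched in main.go under the name CalStuAvgScore3) combines score 3 (score_3) with avg_score_1_2 to obtain the average of scores 1, 2, and 3. The average of scores 1 and 2 is supplied by Flow1.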
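
Taking a plain average of avg_score_1_2 and score_3 would count score_3 twice as heavily as each of the other two scores, so Flow2 has to weight the Flow1 result by 2. A short derivation, where $s_1, s_2, s_3$ are shorthand introduced here for score_1, score_2, score_3 and $\bar{s}_{12}$ for avg_score_1_2:

```latex
\bar{s}_{12} = \frac{s_1 + s_2}{2}
\quad\Longrightarrow\quad
\frac{s_1 + s_2 + s_3}{3} = \frac{2\,\bar{s}_{12} + s_3}{3}
```

This is the same expression that AvgStuScore3 evaluates later in the article.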

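    Flow1

    Flow1 consists of 4 Functions: V (Function: VerifyStu) validates the StuId, C (Function: AvgStuScore12) computes the average of score 1 and score 2, S (Function: SaveScoreAvg12) stores avg_score_1_2 in Redis, and E (Function: PrintStuAvgScore) prints the average of score 1 and score 2.

    Flow2

    Flow2 consists of 4 Functions: V (Function: VerifyStu) validates the StuId, L (Function: LoadScoreAvg12) loads the current student's avg_score_1_2 that Flow1 has already computed, C (Function: AvgStuScore3) combines score 3 with that average to compute the average of all three scores, and E (Function: PrintStuAvgScore) prints the average of scores 1, 2, and 3.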

    conf/func/func-AvgStuScore-3.yml

    kistype: func
    fname: AvgStuScore3
    fmode: Calculate
    source:
        name: SourceStuScore
        must:
            - stu_id
    
    

    conf/func/func-LoadScoreAvg-1-2.yml

    kistype: func
    fname: LoadScoreAvg12
    fmode: Load
    source:
        name: SourceStuScore
        must:
            - stu_id
    option:
        cname: Score12Cache
    
    

    Basic Data Protocol

    stu_proto.go

    package main
    
    type StuScore1_2 struct {
        StuId  int `json:"stu_id"`
        Score1 int `json:"score_1"`
        Score2 int `json:"score_2"`
    }
    
    type StuScoreAvg struct {
        StuId    int     `json:"stu_id"`
        AvgScore float64 `json:"avg_score"`
    }
    
    type StuScore3 struct {
        StuId      int     `json:"stu_id"`
        AvgScore12 float64 `json:"avg_score_1_2"` // score_1, score_2 avg
        Score3     int     `json:"score_3"`
    }
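
The FaaS input types later in the article embed these protocol structs. As a minimal sketch of why the JSON keys stay flat in that case, the following decodes one of the sample rows with encoding/json. It assumes rows are decoded as JSON, which the json tags suggest; `rowIn` and `decodeRow` are hypothetical names used only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StuScore1_2 mirrors the protocol struct from stu_proto.go.
type StuScore1_2 struct {
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
}

// rowIn is a hypothetical wrapper that embeds the protocol struct,
// the same way the FaaS input types in this article do.
type rowIn struct {
	StuScore1_2
}

// decodeRow parses one JSON row into rowIn. Fields of the embedded
// struct are promoted, so the JSON object needs no nesting.
func decodeRow(raw string) (rowIn, error) {
	var r rowIn
	err := json.Unmarshal([]byte(raw), &r)
	return r, err
}

func main() {
	r, err := decodeRow(`{"stu_id":101, "score_1":100, "score_2":90}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(r.StuId, r.Score1, r.Score2) // 101 100 90
}
```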
    
    

    Connector Init

    The Connector defined in this project, Score12Cache, is a connection resource bound to Redis. It needs an initialization function that KisFlow calls at startup to establish the connection.

    conn_init.go

    package main
    
    import (
        "context"
        "fmt"
        "github.com/aceld/kis-flow/kis"
        "github.com/aceld/kis-flow/log"
        "github.com/go-redis/redis/v8"
    )
    
    // type ConnInit func(conn Connector) error
    
    func InitScore12Cache(connector kis.Connector) error {
        fmt.Println("===> Call Connector InitScore12Cache")
    
        // init Redis Conn Client
        rdb := redis.NewClient(&redis.Options{
            Addr:     connector.GetConfig().AddrString, // Redis-Server address
            Password: "",                               // password
            DB:       0,                                // select db
        })
    
        // Ping test
        pong, err := rdb.Ping(context.Background()).Result()
        if err != nil {
            log.Logger().ErrorF("Failed to connect to Redis: %v", err)
            return err
        }
        fmt.Println("Connected to Redis:", pong)
    
        // set rdb to connector
        connector.SetMetaData("rdb", rdb)
    
        return nil
    }
    
    

    Once the connection succeeds, the Redis client instance is stored in the connector's metadata under the key "rdb":

        // set rdb to connector
        connector.SetMetaData("rdb", rdb)
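
The Functions below read this value back with a single-value type assertion, which panics if the entry is missing or has another type. A minimal sketch of a more defensive read follows. `metaStore` and `fakeClient` are hypothetical stand-ins (not the kis.Connector or redis.Client types) so the example runs without KisFlow or Redis.

```go
package main

import (
	"errors"
	"fmt"
)

// metaStore is a hypothetical stand-in for the connector's metadata
// storage; it is not the kis.Connector implementation.
type metaStore struct {
	data map[string]interface{}
}

func (m *metaStore) SetMetaData(key string, value interface{}) {
	if m.data == nil {
		m.data = make(map[string]interface{})
	}
	m.data[key] = value
}

func (m *metaStore) GetMetaData(key string) interface{} {
	return m.data[key]
}

// fakeClient stands in for *redis.Client so the sketch needs no Redis.
type fakeClient struct{ addr string }

// clientFrom fetches the value stored under "rdb" using the two-value
// type assertion, so a missing or mistyped entry yields an error
// instead of a panic.
func clientFrom(m *metaStore) (*fakeClient, error) {
	c, ok := m.GetMetaData("rdb").(*fakeClient)
	if !ok || c == nil {
		return nil, errors.New(`metadata "rdb" is missing or has an unexpected type`)
	}
	return c, nil
}

func main() {
	store := &metaStore{}
	if _, err := clientFrom(store); err != nil {
		fmt.Println("before init:", err)
	}
	store.SetMetaData("rdb", &fakeClient{addr: "127.0.0.1:6379"})
	c, _ := clientFrom(store)
	fmt.Println("after init:", c.addr)
}
```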
    

    FaaS Implementation

    Function(V): VerifyStu

    faas_stu_verify.go

    package main
    
    import (
        "context"
        "fmt"
        "github.com/aceld/kis-flow/kis"
        "github.com/aceld/kis-flow/serialize"
    )
    
    type VerifyStuIn struct {
        serialize.DefaultSerialize
        StuId int `json:"stu_id"`
    }
    
    func VerifyStu(ctx context.Context, flow kis.Flow, rows []*VerifyStuIn) error {
        fmt.Printf("->Call Func VerifyStu\n")
    
        for _, stu := range rows {
            // filter out invalid data
            if stu.StuId < 0 || stu.StuId > 999 {
                // abort the current Flow; its remaining Functions will not run
                return flow.Next(kis.ActionAbort)
            }
        }
    
        return flow.Next(kis.ActionDataReuse)
    }
    
    

    VerifyStu() validates the data. If any record fails the check, the current Flow is aborted. Otherwise flow.Next(kis.ActionDataReuse) passes the same data on unchanged to the next layer.
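
Note that the loop aborts on the first invalid ID, so one bad record stops the whole batch rather than being dropped on its own. A minimal sketch of that decision in plain Go, without KisFlow; `validStuID` and `shouldAbort` are hypothetical helper names:

```go
package main

import "fmt"

// validStuID reports whether id lies in the range VerifyStu accepts (0..999).
func validStuID(id int) bool {
	return id >= 0 && id <= 999
}

// shouldAbort mirrors the loop in VerifyStu: one invalid ID in the
// batch is enough to abort the whole batch.
func shouldAbort(ids []int) bool {
	for _, id := range ids {
		if !validStuID(id) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(shouldAbort([]int{101, 102}))  // false
	fmt.Println(shouldAbort([]int{101, 1000})) // true
}
```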

    Function(C): AvgStuScore12

    faas_stu_score_avg_1_2.go

    package main
    
    import (
        "context"
        "fmt"
        "github.com/aceld/kis-flow/kis"
        "github.com/aceld/kis-flow/serialize"
    )
    
    type AvgStuScoreIn_1_2 struct {
        serialize.DefaultSerialize
        StuScore1_2
    }
    
    type AvgStuScoreOut_1_2 struct {
        serialize.DefaultSerialize
        StuScoreAvg
    }
    
    func AvgStuScore12(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn_1_2) error {
        fmt.Printf("->Call Func AvgStuScore12\n")
    
        for _, row := range rows {
    
            out := AvgStuScoreOut_1_2{
                StuScoreAvg: StuScoreAvg{
                    StuId:    row.StuId,
                    AvgScore: float64(row.Score1+row.Score2) / 2,
                },
            }
    
            // commit the result data
            _ = flow.CommitRow(out)
        }
    
        return flow.Next()
    }
    
    

    AvgStuScore12() computes the average of score_1 and score_2 and emits the result as avg_score.
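
The conversion order in `float64(row.Score1+row.Score2) / 2` matters: the sum is converted before dividing, so the fraction survives. A small sketch contrasting it with integer division (`avgFloat` and `avgTruncated` are illustrative names):

```go
package main

import "fmt"

// avgFloat converts the sum first, then divides in floating point,
// as AvgStuScore12 does.
func avgFloat(a, b int) float64 {
	return float64(a+b) / 2
}

// avgTruncated divides the integers first, which drops the fraction.
func avgTruncated(a, b int) float64 {
	return float64((a + b) / 2)
}

func main() {
	fmt.Println(avgFloat(100, 85))     // 92.5
	fmt.Println(avgTruncated(100, 85)) // 92
}
```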

    Function(S): SaveScoreAvg12

    faas_save_score_avg_1_2.go

    package main
    
    import (
        "context"
        "fmt"
        "github.com/aceld/kis-flow/kis"
        "github.com/aceld/kis-flow/serialize"
        "github.com/go-redis/redis/v8"
        "strconv"
    )
    
    type SaveStuScoreIn struct {
        serialize.DefaultSerialize
        StuScoreAvg
    }
    
    func BatchSetStuScores(ctx context.Context, conn kis.Connector, rows []*SaveStuScoreIn) error {
    
        var rdb *redis.Client
    
        // Get Redis Client
        rdb = conn.GetMetaData("rdb").(*redis.Client)
    
        // Set data to redis
        pipe := rdb.Pipeline()
    
        for _, score := range rows {
            // make key
            key := conn.GetConfig().Key + strconv.Itoa(score.StuId)
    
            pipe.HMSet(ctx, key, map[string]interface{}{
                "avg_score": score.AvgScore,
            })
        }
    
        _, err := pipe.Exec(ctx)
        if err != nil {
            return err
        }
    
        return nil
    }
    
    func SaveScoreAvg12(ctx context.Context, flow kis.Flow, rows []*SaveStuScoreIn) error {
        fmt.Printf("->Call Func SaveScoreScore12\n")
    
        conn, err := flow.GetConnector()
        if err != nil {
            fmt.Printf("SaveScoreScore12(): GetConnector err = %s\n", err.Error())
            return err
        }
    
        // Capture the error returned by BatchSetStuScores; the outer err is nil here.
        if err := BatchSetStuScores(ctx, conn, rows); err != nil {
            fmt.Printf("SaveScoreScore12(): BatchSetStuScores err = %s\n", err.Error())
            return err
        }
    
        return flow.Next(kis.ActionDataReuse)
    }
    
    

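    SaveScoreAvg12() writes the data to Redis through the bound Connector. Each key is the Key configured on the Connector followed by the student ID. Finally it passes the source data through unchanged to the next Function.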
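
The key layout can be sketched without Redis. Below, `memHash` is a hypothetical in-memory stand-in for Redis hashes, and the prefix "stu_score12_avg_" is an assumed value (the real one comes from the Connector configuration, which this article does not show).

```go
package main

import (
	"fmt"
	"strconv"
)

// memHash is an in-memory stand-in for Redis hashes: key -> field -> value.
type memHash map[string]map[string]string

// scoreKey builds the per-student key the same way BatchSetStuScores does:
// configured prefix + decimal student ID.
func scoreKey(prefix string, stuID int) string {
	return prefix + strconv.Itoa(stuID)
}

// saveAvg stores avg under the "avg_score" field of the student's hash.
func saveAvg(h memHash, prefix string, stuID int, avg float64) {
	key := scoreKey(prefix, stuID)
	if h[key] == nil {
		h[key] = make(map[string]string)
	}
	h[key]["avg_score"] = strconv.FormatFloat(avg, 'f', -1, 64)
}

func main() {
	h := memHash{}
	// Assumed prefix, for illustration only.
	const prefix = "stu_score12_avg_"
	saveAvg(h, prefix, 101, 95)
	saveAvg(h, prefix, 102, 90)
	fmt.Println(scoreKey(prefix, 101), h[scoreKey(prefix, 101)]["avg_score"])
	fmt.Println(scoreKey(prefix, 102), h[scoreKey(prefix, 102)]["avg_score"])
}
```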

    Function(E): PrintStuAvgScore

    faas_stu_score_avg_print.go

    package main
    
    import (
        "context"
        "fmt"
        "github.com/aceld/kis-flow/kis"
        "github.com/aceld/kis-flow/serialize"
    )
    
    type PrintStuAvgScoreIn struct {
        serialize.DefaultSerialize
        StuId    int     `json:"stu_id"`
        AvgScore float64 `json:"avg_score"`
    }
    
    func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
        fmt.Printf("->Call Func PrintStuAvgScore, in Flow[%s]\n", flow.GetName())
    
        for _, row := range rows {
            fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
        }
    
        return flow.Next()
    }
    
    

    PrintStuAvgScore() prints the current student's average score.

    Function(L): LoadScoreAvg12

    faas_load_score_avg_1_2.go

    package main
    
    import (
        "context"
        "fmt"
        "github.com/aceld/kis-flow/kis"
        "github.com/aceld/kis-flow/serialize"
        "github.com/go-redis/redis/v8"
        "strconv"
    )
    
    type LoadStuScoreIn struct {
        serialize.DefaultSerialize
        StuScore3
    }
    
    type LoadStuScoreOut struct {
        serialize.DefaultSerialize
        StuScore3
    }
    
    func GetStuScoresByStuId(ctx context.Context, conn kis.Connector, stuId int) (float64, error) {
    
        var rdb *redis.Client
    
        // Get Redis Client
        rdb = conn.GetMetaData("rdb").(*redis.Client)
    
        // make key
        key := conn.GetConfig().Key + strconv.Itoa(stuId)
    
        // get data from redis
        result, err := rdb.HGetAll(ctx, key).Result()
        if err != nil {
            return 0, err
        }
    
        // get value
        avgScoreStr, ok := result["avg_score"]
        if !ok {
            return 0, fmt.Errorf("avg_score not found for stuId: %d", stuId)
        }
    
        // parse to float64
        avgScore, err := strconv.ParseFloat(avgScoreStr, 64)
        if err != nil {
            return 0, err
        }
    
        return avgScore, nil
    }
    
    func LoadScoreAvg12(ctx context.Context, flow kis.Flow, rows []*LoadStuScoreIn) error {
        fmt.Printf("->Call Func LoadScoreAvg12\n")
    
        conn, err := flow.GetConnector()
        if err != nil {
            fmt.Printf("LoadScoreAvg12(): GetConnector err = %s\n", err.Error())
            return err
        }
    
        for _, row := range rows {
            stuScoreAvg1_2, err := GetStuScoresByStuId(ctx, conn, row.StuId)
            if err != nil {
                fmt.Printf("LoadScoreAvg12(): GetStuScoresByStuId err = %s\n", err.Error())
                return err
            }
    
            out := LoadStuScoreOut{
                StuScore3: StuScore3{
                    StuId:      row.StuId,
                    Score3:     row.Score3,
                    AvgScore12: stuScoreAvg1_2, // avg score of score1 and score2 (load from redis)
                },
            }
    
            // commit result
            _ = flow.CommitRow(out)
        }
    
        return flow.Next()
    }
    
    

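    LoadScoreAvg12() reads from Redis through the bound Connector, using the Key configured on the Connector, to fetch the average of score_1 and score_2. It then sends the upstream source data, together with the newly loaded average, to the next layer.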
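
The parsing half of GetStuScoresByStuId can be exercised on its own. HGetAll yields a map[string]string, and for a key that does not exist that map is empty, so the missing-field check is what catches a student whose average has not been saved yet. A minimal sketch, with `parseAvg` as a hypothetical helper name:

```go
package main

import (
	"fmt"
	"strconv"
)

// parseAvg extracts and parses "avg_score" from a field map such as the
// one HGetAll returns. An absent field is reported as an error.
func parseAvg(fields map[string]string, stuID int) (float64, error) {
	s, ok := fields["avg_score"]
	if !ok {
		return 0, fmt.Errorf("avg_score not found for stuId: %d", stuID)
	}
	return strconv.ParseFloat(s, 64)
}

func main() {
	avg, err := parseAvg(map[string]string{"avg_score": "95"}, 101)
	fmt.Println(avg, err)

	// No hash stored for this student: the field map is empty.
	_, err = parseAvg(map[string]string{}, 103)
	fmt.Println(err)
}
```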

    Function(C): AvgStuScore3

    faas_stu_score_avg_3.go

    package main
    
    import (
        "context"
        "fmt"
        "github.com/aceld/kis-flow/kis"
        "github.com/aceld/kis-flow/serialize"
    )
    
    type AvgStuScore3In struct {
        serialize.DefaultSerialize
        StuScore3
    }
    
    type AvgStuScore3Out struct {
        serialize.DefaultSerialize
        StuScoreAvg
    }
    
    func AvgStuScore3(ctx context.Context, flow kis.Flow, rows []*AvgStuScore3In) error {
        fmt.Printf("->Call Func AvgStuScore12\n")
    
        for _, row := range rows {
    
            out := AvgStuScore3Out{
                StuScoreAvg: StuScoreAvg{
                    StuId:    row.StuId,
                    AvgScore: (float64(row.Score3) + row.AvgScore12*2) / 3,
                },
            }
    
            // commit the result data
            _ = flow.CommitRow(out)
        }
    
        return flow.Next()
    }
    
    

    AvgStuScore3() takes score_3 and the loaded average of score_1 and score_2 and recomputes the average of all three scores, producing the final avg_score.
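
As a quick check of the formula against the sample rows submitted in main.go (student 101: 100, 90, 80; student 102: 100, 80, 70), here is a standalone sketch. `avg3` is an illustrative name for the expression used inside AvgStuScore3.

```go
package main

import "fmt"

// avg3 combines the stored two-score average with the third score.
// avg12 stands for two scores, so it is weighted by 2.
func avg3(avg12 float64, score3 int) float64 {
	return (float64(score3) + avg12*2) / 3
}

func main() {
	fmt.Println(avg3(95, 80)) // student 101: (80 + 190) / 3
	fmt.Println(avg3(90, 70)) // student 102: (70 + 180) / 3
}
```

The two values correspond to the Flow[CalStuAvgScore3] lines in the run output.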

    Register FaaS & CaaSInit/CaaS (Register Functions/Connectors)

    main.go

    func init() {
        // Register functions
        kis.Pool().FaaS("VerifyStu", VerifyStu)
        kis.Pool().FaaS("AvgStuScore12", AvgStuScore12)
        kis.Pool().FaaS("SaveScoreAvg12", SaveScoreAvg12)
        kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
        kis.Pool().FaaS("LoadScoreAvg12", LoadScoreAvg12)
        kis.Pool().FaaS("AvgStuScore3", AvgStuScore3)
    
        // Register connectors
        kis.Pool().CaaSInit("Score12Cache", InitScore12Cache)
    }
    
    

    Main Flow

    main.go

    package main
    
    import (
        "context"
        "github.com/aceld/kis-flow/file"
        "github.com/aceld/kis-flow/kis"
        "sync"
    )
    
    func RunFlowCalStuAvgScore12(ctx context.Context, flow kis.Flow) error {
    
        // Submit a string
        _ = flow.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90}`)
        _ = flow.CommitRow(`{"stu_id":102, "score_1":100, "score_2":80}`)
    
        // Run the flow
        if err := flow.Run(ctx); err != nil {
            return err
        }
    
        return nil
    }
    
    func RunFlowCalStuAvgScore3(ctx context.Context, flow kis.Flow) error {
    
        // Submit a string
        _ = flow.CommitRow(`{"stu_id":101, "score_3": 80}`)
        _ = flow.CommitRow(`{"stu_id":102, "score_3": 70}`)
    
        // Run the flow
        if err := flow.Run(ctx); err != nil {
            return err
        }
    
        return nil
    }
    
    func main() {
        ctx := context.Background()
    
        // Load Configuration from file
        if err := file.ConfigImportYaml("conf/"); err != nil {
            panic(err)
        }
    
        var wg sync.WaitGroup
        wg.Add(2)
    
        go func() {
            // run flow1
            defer wg.Done()
            // Get the flow
            flow1 := kis.Pool().GetFlow("CalStuAvgScore12")
            if flow1 == nil {
                panic("flow1 is nil")
            }
    
            if err := RunFlowCalStuAvgScore12(ctx, flow1); err != nil {
                panic(err)
            }
        }()
    
        go func() {
            defer wg.Done()
            // Get the flow
            flow2 := kis.Pool().GetFlow("CalStuAvgScore3")
            if flow2 == nil {
                panic("flow2 is nil")
            }
    
            // run flow2
            if err := RunFlowCalStuAvgScore3(ctx, flow2); err != nil {
                panic(err)
            }
        }()
    
        wg.Wait()
    
        return
    }
    

    Two goroutines run Flow1 and Flow2 respectively to compute the final average scores for students 101 and 102.
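
The two goroutines in main() are not coordinated, so whether Flow2's load sees the value Flow1 saves in the same run depends on timing (or on data left in Redis by an earlier run). If Flow2 must observe Flow1's writes, one possible approach is to signal completion over a channel. This is a sketch under that assumption, not part of the original example; `runOrdered`, `write`, and `read` are hypothetical stand-ins for running the two Flows.

```go
package main

import (
	"fmt"
	"sync"
)

// runOrdered starts two goroutines but makes the second wait until the
// first has finished, so the reader never runs ahead of the writer.
// write and read are hypothetical stand-ins for running Flow1 and Flow2.
func runOrdered(write, read func()) {
	var wg sync.WaitGroup
	flow1Done := make(chan struct{})

	wg.Add(2)
	go func() {
		defer wg.Done()
		defer close(flow1Done) // signal completion once write returns
		write()
	}()
	go func() {
		defer wg.Done()
		<-flow1Done // block until the first goroutine is done
		read()
	}()
	wg.Wait()
}

func main() {
	cache := map[int]float64{} // stands in for Redis
	var got float64
	var ok bool
	runOrdered(
		func() { cache[101] = 95 },
		func() { got, ok = cache[101] },
	)
	fmt.Println(got, ok) // 95 true
}
```

The trade-off is that the Flows no longer overlap. When they are strictly dependent, calling them one after the other in a single goroutine is simpler still.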

    Run Output

    ===> Call Connector InitScore12Cache
    Connected to Redis: PONG
    Add FlowRouter FlowName=CalStuAvgScore12
    ===> Call Connector InitScore12Cache
    Connected to Redis: PONG
    Add FlowRouter FlowName=CalStuAvgScore3
    ->Call Func VerifyStu
    ->Call Func VerifyStu
    ->Call Func AvgStuScore12
    ->Call Func LoadScoreAvg12
    ->Call Func SaveScoreScore12
    ->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore12]
    stuid: [101], avg score: [95]
    stuid: [102], avg score: [90]
    ->Call Func AvgStuScore12
    ->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore3]
    stuid: [101], avg score: [90]
    stuid: [102], avg score: [83.33333333333333]
    

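    The output shows that Flow[CalStuAvgScore3] ends up with the average of scores 1, 2, and 3. The second "->Call Func AvgStuScore12" line in the log is printed by AvgStuScore3(), whose log message reuses that name.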


    Author: 刘丹冰Aceld, GitHub: https://github.com/aceld
    KisFlow open-source project: https://github.com/aceld/kis-flow

