美文网首页KisFlow-基于Golang的流式计算框架实战@IT·互联网
Golang框架实战-KisFlow流式计算框架(11)-Pro

Golang框架实战-KisFlow流式计算框架(11)-Pro

作者: 刘丹冰Aceld | 来源:发表于2024-03-14 20:34 被阅读0次

    Golang框架实战-KisFlow流式计算框架专栏

    Golang框架实战-KisFlow流式计算框架(1)-概述
    Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
    Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
    Golang框架实战-KisFlow流式计算框架(4)-数据流
    Golang框架实战-KisFlow流式计算框架(5)-Function调度
    Golang框架实战-KisFlow流式计算框架(6)-Connector
    Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出
    Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action
    Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数
    Golang框架实战-KisFlow流式计算框架(10)-Flow多副本


    在介绍本章之前,我们先普及一下Prometheus Metrics的服务启动办法。 有关Prometheus是个什么东东,希望大家可以去额外补充下知识,我用一句大白话来解释就是,系统的监控指标。

    那么KisFlow作为流式计算框架,那么有关每个Function的调度时间、总体的数据量、算法速度等等指标可能也是项目中或者开发者所要关注的一些数据,那么这些数据,经过KisFlow,可以通过Prometheus的Metrics打点来进行记录。

    所以接下来我们可以通过全局的配置,开发者可以选择是否开启Prometheus的数据埋点能力。

    10.1 Prometheus Metrics服务

    10.1.1 prometheus client sdk

    首先在kis-flow/go.mod中,新增require:

    module kis-flow
    
    go 1.18
    
    require (
        github.com/google/uuid v1.5.0
        github.com/patrickmn/go-cache v2.1.0+incompatible
        github.com/prometheus/client_golang v1.14.0  //++++++++
        gopkg.in/yaml.v3 v3.0.1
    )
    

    这里我们采用prometheus的官方Golang客户端SDK。https://github.com/prometheus/client_golang 有关具体的介绍,参考官方的README文档:https://github.com/prometheus/client_golang/blob/main/README.md

    其次,我们先简单的写一个prometheus的服务,可以让外界得到KisFlow服务的相关指标数据等。 在kis-flow/下新建kis-flow/metrics/目录,作为KisFlow统计指标的代码部分。

    kis-flow/metrics/kis_metrics.go

    package metrics
    
    import (
        "github.com/prometheus/client_golang/prometheus/promhttp"
        "kis-flow/common"
        "kis-flow/log"
        "net/http"
    )
    
    
    // RunMetricsService 启动Prometheus监控服务
    func RunMetricsService(serverAddr string) error {
    
        // 注册Prometheus 监控路由路径
        http.Handle(common.METRICS_ROUTE, promhttp.Handler())
    
        // 启动HttpServer
        err := http.ListenAndServe(serverAddr, nil) //多个进程不可监听同一个端口
        if err != nil {
            log.Logger().ErrorF("RunMetricsService err = %s\n", err)
        }
    
        return err
    }
    

    其中METRICS_ROUTE作为监控服务的http路由路径,定义在kis-flow/common/const.go中:
    如下:

    kis-flow/common/const.go

    // ... ...
    
    // metrics
    const (
        METRICS_ROUTE string = "/metrics"
    )
    
    // ... ...
    
    

    接下来来简单说明下上述的代码,RunMetricsService() 是启动prometheus监控的http服务代码,为什么要启动这个服务,目的是,我们可以通过http的请求来获取kisflow目前进程的运行指标,那么都有哪些指标,现在我们还没有进行统计,prometheus会默认提供当前进程的go版本号、gc垃圾回收时间、内存分配等等基础统计指标信息。

    • serverAddr参数: 这个作为prometheus监控的地址,一般是本地地址加上一个端口号:如"0.0.0.0:20004"。
    http.Handle(common.METRICS_ROUTE, promhttp.Handler())
    

    这行代码则表示,"0.0.0.0:20004/metrics" 为获取指标入口。

    上述代码写完之后,别忘了拉去一下https://github.com/prometheus/client_golang 相关的依赖包。

    $ go mod tidy
    

    拉取之后,当前的go.mod 的依赖大致如下(会有版本号的区别):

    kis-flow/go.mod

    module kis-flow
    
    go 1.18
    
    require (
        github.com/google/uuid v1.5.0
        github.com/patrickmn/go-cache v2.1.0+incompatible
        github.com/prometheus/client_golang v1.14.0
        gopkg.in/yaml.v3 v3.0.1
    )
    
    require (
        github.com/beorn7/perks v1.0.1 // indirect
        github.com/cespare/xxhash/v2 v2.1.2 // indirect
        github.com/golang/protobuf v1.5.2 // indirect
        github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
        github.com/prometheus/client_model v0.3.0 // indirect
        github.com/prometheus/common v0.37.0 // indirect
        github.com/prometheus/procfs v0.8.0 // indirect
        golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
        google.golang.org/protobuf v1.28.1 // indirect
    )
    

    10.1.2 prometheus server 服务启动单元测试

    接下来来简单测试下服务是否可以启动。

    kis-flow/test/下创建prometheus_server_test.go文件:

    kis-flow/test/prometheus_server_test.go

    package test
    
    import (
        "kis-flow/metrics"
        "testing"
    )
    
    func TestPrometheusServer(t *testing.T) {
    
        err := metrics.RunMetricsService("0.0.0.0:20004")
        if err != nil {
            panic(err)
        }
    }
    

    这里的监控地址为"0.0.0.0:20004"。接下来来启动本单元测试用例,打开一个终端A,cdkis-flow/test/目录下:

     $ go test -test.v -test.paniconexit0 -test.run TestPrometheusServer
     === RUN   TestPrometheusServer
     
    

    然后打开另一个终端B,输入如下指令,模拟http客户端进行请求:

     $ curl http://0.0.0.0:20004/metrics
    
    

    之后我们在终端B得到监控指标的结果如下:

    # HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.
    # TYPE go_gc_duration_seconds summary
    go_gc_duration_seconds{quantile="0"} 0
    go_gc_duration_seconds{quantile="0.25"} 0
    go_gc_duration_seconds{quantile="0.5"} 0
    go_gc_duration_seconds{quantile="0.75"} 0
    go_gc_duration_seconds{quantile="1"} 0
    go_gc_duration_seconds_sum 0
    go_gc_duration_seconds_count 0
    # HELP go_goroutines Number of goroutines that currently exist.
    # TYPE go_goroutines gauge
    go_goroutines 8
    # HELP go_info Information about the Go environment.
    # TYPE go_info gauge
    go_info{version="go1.18.8"} 1
    # HELP go_memstats_alloc_bytes Number of bytes allocated and still in use.
    # TYPE go_memstats_alloc_bytes gauge
    go_memstats_alloc_bytes 3.2364e+06
    # HELP go_memstats_alloc_bytes_total Total number of bytes allocated, even if freed.
    # TYPE go_memstats_alloc_bytes_total counter
    go_memstats_alloc_bytes_total 3.2364e+06
    # HELP go_memstats_buck_hash_sys_bytes Number of bytes used by the profiling bucket hash table.
    # TYPE go_memstats_buck_hash_sys_bytes gauge
    go_memstats_buck_hash_sys_bytes 1.446507e+06
    # HELP go_memstats_frees_total Total number of frees.
    # TYPE go_memstats_frees_total counter
    go_memstats_frees_total 0
    # HELP go_memstats_gc_sys_bytes Number of bytes used for garbage collection system metadata.
    # TYPE go_memstats_gc_sys_bytes gauge
    go_memstats_gc_sys_bytes 3.561224e+06
    # HELP go_memstats_heap_alloc_bytes Number of heap bytes allocated and still in use.
    # TYPE go_memstats_heap_alloc_bytes gauge
    go_memstats_heap_alloc_bytes 3.2364e+06
    # HELP go_memstats_heap_idle_bytes Number of heap bytes waiting to be used.
    # TYPE go_memstats_heap_idle_bytes gauge
    go_memstats_heap_idle_bytes 4.636672e+06
    # HELP go_memstats_heap_inuse_bytes Number of heap bytes that are in use.
    # TYPE go_memstats_heap_inuse_bytes gauge
    go_memstats_heap_inuse_bytes 3.260416e+06
    # HELP go_memstats_heap_objects Number of allocated objects.
    # TYPE go_memstats_heap_objects gauge
    go_memstats_heap_objects 21294
    # HELP go_memstats_heap_released_bytes Number of heap bytes released to OS.
    # TYPE go_memstats_heap_released_bytes gauge
    go_memstats_heap_released_bytes 4.636672e+06
    # HELP go_memstats_heap_sys_bytes Number of heap bytes obtained from system.
    # TYPE go_memstats_heap_sys_bytes gauge
    go_memstats_heap_sys_bytes 7.897088e+06
    # HELP go_memstats_last_gc_time_seconds Number of seconds since 1970 of last garbage collection.
    # TYPE go_memstats_last_gc_time_seconds gauge
    go_memstats_last_gc_time_seconds 0
    # HELP go_memstats_lookups_total Total number of pointer lookups.
    # TYPE go_memstats_lookups_total counter
    go_memstats_lookups_total 0
    # HELP go_memstats_mallocs_total Total number of mallocs.
    # TYPE go_memstats_mallocs_total counter
    go_memstats_mallocs_total 21294
    # HELP go_memstats_mcache_inuse_bytes Number of bytes in use by mcache structures.
    # TYPE go_memstats_mcache_inuse_bytes gauge
    go_memstats_mcache_inuse_bytes 9600
    # HELP go_memstats_mcache_sys_bytes Number of bytes used for mcache structures obtained from system.
    # TYPE go_memstats_mcache_sys_bytes gauge
    go_memstats_mcache_sys_bytes 15600
    # HELP go_memstats_mspan_inuse_bytes Number of bytes in use by mspan structures.
    # TYPE go_memstats_mspan_inuse_bytes gauge
    go_memstats_mspan_inuse_bytes 46376
    # HELP go_memstats_mspan_sys_bytes Number of bytes used for mspan structures obtained from system.
    # TYPE go_memstats_mspan_sys_bytes gauge
    go_memstats_mspan_sys_bytes 48960
    # HELP go_memstats_next_gc_bytes Number of heap bytes when next garbage collection will take place.
    # TYPE go_memstats_next_gc_bytes gauge
    go_memstats_next_gc_bytes 4.194304e+06
    # HELP go_memstats_other_sys_bytes Number of bytes used for other system allocations.
    # TYPE go_memstats_other_sys_bytes gauge
    go_memstats_other_sys_bytes 1.171301e+06
    # HELP go_memstats_stack_inuse_bytes Number of bytes in use by the stack allocator.
    # TYPE go_memstats_stack_inuse_bytes gauge
    go_memstats_stack_inuse_bytes 491520
    # HELP go_memstats_stack_sys_bytes Number of bytes obtained from system for stack allocator.
    # TYPE go_memstats_stack_sys_bytes gauge
    go_memstats_stack_sys_bytes 491520
    # HELP go_memstats_sys_bytes Number of bytes obtained from system.
    # TYPE go_memstats_sys_bytes gauge
    go_memstats_sys_bytes 1.46322e+07
    # HELP go_threads Number of OS threads created.
    # TYPE go_threads gauge
    go_threads 7
    # HELP promhttp_metric_handler_requests_in_flight Current number of scrapes being served.
    # TYPE promhttp_metric_handler_requests_in_flight gauge
    promhttp_metric_handler_requests_in_flight 1
    # HELP promhttp_metric_handler_requests_total Total number of scrapes by HTTP status code.
    # TYPE promhttp_metric_handler_requests_total counter
    promhttp_metric_handler_requests_total{code="200"} 1
    promhttp_metric_handler_requests_total{code="500"} 0
    promhttp_metric_handler_requests_total{code="503"} 0
    

    我们已经给KisFlow提供了Function,Flow,Connector等的配置是通过kistype来区分的。
    接下来我们来实现kistype等于global全局配置,在这个配置里,我们来设置是否启动Prometheus和Metrics统计的开关。
    接下来给KisFlow加上全局配置文件的属性加载。

    10.2 KisFlow全局配置

    10.2.1 全局配置文件加载

    全局配置的yaml的格式如下:

    #kistype Global为kisflow的全局配置
    kistype: global
    #是否启动prometheus监控
    prometheus_enable: true
    #是否需要kisflow单独启动端口监听
    prometheus_listen: true
    #prometheus取点监听地址
    prometheus_serve: 0.0.0.0:20004
    

    10.2.2 结构体定义

    接下来我们根据上述的配置协议,来定义KisFlow的策略配置结构体,并且提供一些响应的初始化方法。
    我们在项目文档中创建kis_global_config.go文件,在这里我们将需要的Config定义实现。

    kis-flow/config/kis_global_config.go

    package config
    
    type KisGlobalConfig struct {
        //kistype Global为kisflow的全局配置
        KisType string `yaml:"kistype"`
        //是否启动prometheus监控
        EnableProm bool `yaml:"prometheus_enable"`
        //是否需要kisflow单独启动端口监听
        PrometheusListen bool `yaml:"prometheus_listen"`
        //prometheus取点监听地址
        PrometheusServe string `yaml:"prometheus_serve"`
    }
    
    // GlobalConfig 默认全局配置,全部均为关闭
    var GlobalConfig = new(KisGlobalConfig)
    

    这里提供了一个全局的GlobalConfig对象,并且是公有变量,方便其他模块共享全局配置。

    10.2.3 配置文件解析

    接下来,针对全局配置做做导入配置的解析,在kif-flow/flie/config_import.go中,添加如下函数:

    kis-flow/file/config_import.go

    // kisTypeGlobalConfigure 解析Global配置文件,yaml格式
    func kisTypeGlobalConfigure(confData []byte, fileName string, kisType interface{}) error {
        // 全局配置
        if ok := yaml.Unmarshal(confData, config.GlobalConfig); ok != nil {
            return errors.New(fmt.Sprintf("%s is wrong format kisType = %s", fileName, kisType))
        }
    
        // TODO 初始化Prometheus指标
    
    
        // TODO 启动Prometheus指标Metrics服务
    
        return nil
    }
    

    这里加载全局的yaml配置文件,加载之后,判断是否要启动初始化Prometheus指标监控,这个稍后我们再添加。

    那么kisTypeGlobalConfigure()在哪里被调度,和其他的配置文件一样,在加载扫描本地配置文件的时候,被调度即可,如下:

    kis-flow/file/config_import.go

    // parseConfigWalkYaml 全盘解析配置文件,yaml格式, 讲配置信息解析到allConfig中
    func parseConfigWalkYaml(loadPath string) (*allConfig, error) {
        // ... ...
    
        err := filepath.Walk(loadPath, func(filePath string, info os.FileInfo, err error) error {
            // ... ... 
    
            // 判断kisType是否存在
            if kisType, ok := confMap["kistype"]; !ok {
                return errors.New(fmt.Sprintf("yaml file %s has no file [kistype]!", filePath))
            } else {
                switch kisType {
                case common.KisIdTypeFlow:
                    return kisTypeFlowConfigure(all, confData, filePath, kisType)
    
                case common.KisIdTypeFunction:
                    return kisTypeFuncConfigure(all, confData, filePath, kisType)
    
                case common.KisIdTypeConnnector:
                    return kisTypeConnConfigure(all, confData, filePath, kisType)
    
                // +++++++++++++++++++++++++++++++++
                case common.KisIdTypeGlobal:
                    return kisTypeGlobalConfigure(confData, filePath, kisType)
                // +++++++++++++++++++++++++++++++++
    
                default:
                    return errors.New(fmt.Sprintf("%s set wrong kistype %s", filePath, kisType))
                }
            }
        })
    
        if err != nil {
            return nil, err
        }
    
        return all, nil
    }
    

    在这里,我们增加kistype的Case:KisIdTypeGlobal来调用kisTypeGlobalConfigure()

    接下来,我们来创建Mertrics统计模块,本节先统计一个最简单的指标,KisFlow当前处理过的数据总量(以处理的源数据数量为准)

    10.3 Metrics统计指标-DataTotal全量数据

    10.3.1 KisMertics

    首先创建一个KisMrtics模块,创建目录kis-flow/metrics/,并且创建文件kis_metrics.go文件:

    kis-flow/metrics/kis_metrics.go

    package metrics
    
    import (
        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promhttp"
        "kis-flow/common"
        "kis-flow/log"
        "net/http"
    )
    
    // kisMetrics kisFlow的Prometheus监控指标
    type kisMetrics struct {
        //数据数量总量
        DataTotal prometheus.Counter
    }
    
    var Metrics *kisMetrics
    
    // RunMetricsService 启动Prometheus监控服务
    func RunMetricsService(serverAddr string) error {
    
        // 注册Prometheus 监控路由路径
        http.Handle(common.METRICS_ROUTE, promhttp.Handler())
    
        // 启动HttpServer
        err := http.ListenAndServe(serverAddr, nil) //多个进程不可监听同一个端口
        if err != nil {
            log.Logger().ErrorF("RunMetricsService err = %s\n", err)
        }
    
        return err
    }
    
    func InitMetrics() {
        Metrics = new(kisMetrics)
    
        // DataTotal初始化Counter
        Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
            Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
            Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
        })
    
        // 注册Metrics
        prometheus.MustRegister(Metrics.DataTotal)
    }
    
    
    • kisMetrics struct: 为KisFlow的需要统计的指标,目前只有一个统计指标DataTotal,类型为prometheus.Counter(有关prometheus.Counter类型的说明请参考:有关prometheus.Counter指标的概念)
    • Metrics *kisMetrics:是KisFlow一个全局的指标统计对象,公有,方便其他模块获取。
    • RunMetricsService(serverAddr string): 为启动prometheus服务监听,在之前的章节我们已经对这部分做了单元测试。
    • InitMetrics(): 为初始化全局对象和一些指标的初始化,最后需要调用prometheus.MustRegister将指标注册到prometheus中,这是prometheus统计编程的必要基本过程。

    这里面有两个常量,分别表示指标显示名称和含义,我们定义在下面:

    kis-flow/common/const.go

    // metrics
    const (
        METRICS_ROUTE string = "/metrics"
    
        COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
        COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow全部Flow的数据总量"
    )
    
    

    10.3.2 DataTotal指标统计

    作为KisFlow的处理全量数据,我们只需要在commitSrcData ()方法中统计就可以了,commitSrcData() 提交当前Flow的数据源数据, 表示首次提交当前Flow的原始数据源 ,表示数据首次进入KisFlow中,下面我们添加代码如下:

    kis-flow/flow/kis_flow_data.go

    func (flow *KisFlow) commitSrcData(ctx context.Context) error {
    
        // 制作批量数据batch
        dataCnt := len(flow.buffer)
        batch := make(common.KisRowArr, 0, dataCnt)
    
        for _, row := range flow.buffer {
            batch = append(batch, row)
        }
    
        // 清空之前所有数据
        flow.clearData(flow.data)
    
        // 首次提交,记录flow原始数据
        // 因为首次提交,所以PrevFunctionId为FirstVirtual 因为没有上一层Function
        flow.data[common.FunctionIdFirstVirtual] = batch
    
        // 清空缓冲Buf
        flow.buffer = flow.buffer[0:0]
    
        // +++++++++++++++++++++++++++++++
        // 首次提交数据源数据,进行统计数据总量
        if config.GlobalConfig.EnableProm == true {
            // 统计数据总量 Metrics.DataTota 指标累计加1
            metrics.Metrics.DataTotal.Add(float64(dataCnt))
        }
        // ++++++++++++++++++++++++++++++
    
        log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
    
        return nil
    }
    

    先根据全局配置判断是否统计指标,如果为true则,通过下面代码来对metrics的全量数据进行统计:

            metrics.Metrics.DataTotal.Add(float64(dataCnt))
    

    dataCnt为累计增加的数量。

    10.3.3 Metrics启动

    在导入配置之后,我们需要启动metrics服务,调度如下:

    kis-flow/file/config_import.go

    // kisTypeGlobalConfigure 解析Global配置文件,yaml格式
    func kisTypeGlobalConfigure(confData []byte, fileName string, kisType interface{}) error {
        // 全局配置
        if ok := yaml.Unmarshal(confData, config.GlobalConfig); ok != nil {
            return errors.New(fmt.Sprintf("%s is wrong format kisType = %s", fileName, kisType))
        }
    
        // ++++++++++++++++++++
        // 启动Metrics服务
        metrics.RunMetrics()
    
        return nil
    }
    

    其中RunMetrics()是实现如下:

    kis-flow/metrics/kis_metrics.go

    // RunMetrics 启动Prometheus指标服务
    func RunMetrics() {
        // 初始化Prometheus指标
        InitMetrics()
    
        if config.GlobalConfig.EnableProm == true && config.GlobalConfig.PrometheusListen == true {
            // 启动Prometheus指标Metrics服务
            go RunMetricsService(config.GlobalConfig.PrometheusServe)
        }
    }
    
    

    这样,在导入全局配置后,看是否开启统计,如果统计,我们就会开一个协程来启动PrometheusServe,监听的ip和端口会在配置文件里进行配置。

    接下来我们先对DataTotal指标做一个单元测试,来进行验证。

    10.4 KisMetrics单元测试

    10.4.1 全局配置文件创建

    创建一个全局配置文件kis-flow.ymlkis-flow/test/load_conf/下,内容如下:

    kis-flow/test/load_conf/kis-flow.yml

    #kistype Global为kisflow的全局配置
    kistype: global
    #是否启动prometheus监控
    prometheus_enable: true
    #是否需要kisflow单独启动端口监听
    prometheus_listen: true
    #prometheus取点监听地址
    prometheus_serve: 0.0.0.0:20004
    

    10.4.2 新建单元测试

    接下来创建测试用例代码,在kis-flow/test/下,创建kis_metrics_test.go文件,如下:

    kis-flow/test/kis_metrics_test.go

    package test
    
    import (
        "context"
        "kis-flow/common"
        "kis-flow/file"
        "kis-flow/kis"
        "kis-flow/test/caas"
        "kis-flow/test/faas"
        "testing"
        "time"
    )
    
    func TestMetricsDataTotal(t *testing.T) {
        ctx := context.Background()
    
        // 0. 注册Function 回调业务
        kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
        kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
        kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
    
        // 0. 注册ConnectorInit 和 Connector 回调业务
        kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
        kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
    
        // 1. 加载配置文件并构建Flow
        if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
            panic(err)
        }
    
        // 2. 获取Flow
        flow1 := kis.Pool().GetFlow("flowName1")
        
        n := 0
    
        for n < 10 {
            // 3. 提交原始数据
            _ = flow1.CommitRow("This is Data1 from Test")
    
            // 4. 执行flow1
            if err := flow1.Run(ctx); err != nil {
                panic(err)
            }
    
            time.Sleep(1 * time.Second)
            n++
        }
        
        select {}
    }
    

    这个Case和我们一般启动KisFlow一样,只不过,这里面会出现一个for循环,每割1秒回启动一次流式计算,并且提交一条数据,一共循环10次,之后我们可以通过prometheus的监听服务来查看数据的总量。
    最后加select{}的目的是为了防止主协程退出,导致prometheus的监听子协程连同退出。

    执行单元测试,cd到kis-flow/test/下,执行:

    go test -test.v -test.paniconexit0 -test.run TestMetricsDataTotal
    

    会得到很多日志输出,我们等待10s,之后再开启一个终端,输入如下指令:

     $ curl http://0.0.0.0:20004/metrics 
    

    得到如下结果:

    # ... ...
    # HELP kisflow_data_total KisFlow全部Flow的数据总量
    # TYPE kisflow_data_total counter
    kisflow_data_total 10
    # ... ...
    
    

    其中我们会发现,kisflow_data_total指标已经出现,结果是10,说明我们的metrics指标已经可以统计了,那么接下来我们就可以再基于这个逻辑,新增一些其他KisFlow需要关心的比较复杂的指标。

    接下来我们来统计其他的一些关键指标,包括:Flow处理数据总量、Flow被调度次数、Function被调度次数、Flow执行时间、Function执行时间等。

    10.5 Metrics统计指标-其他统计指标

    10.5.1 指标:Flow处理数据总量

    (1)指标定义

    首先定义指标类型,如下:

    kis-flow/metrics/kis_metrics.go

    // kisMetrics kisFlow的Prometheus监控指标
    type kisMetrics struct {
        // 数据数量总量
        DataTotal prometheus.Counter
        // 各Flow处理数据总量
        FlowDataTotal *prometheus.GaugeVec
    }
    

    FlowDataTotal 采用 prometheus.GaugeVec类型。主要是为了区分是哪个Flow产生的数据。

    (2)指标初始化及注册

    kis-flow/metrics/kis_metrics.go

    func InitMetrics() {
        Metrics = new(kisMetrics)
    
        // DataTotal初始化Counter
        Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
            Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
            Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
        })
    
        // +++++++++++
        // FlowDataTotal初始化GaugeVec
        Metrics.FlowDataTotal = prometheus.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: common.GANGE_FLOW_DATA_TOTAL_NAME,
                Help: common.GANGE_FLOW_DATA_TOTAL_HELP,
            },
            // 标签名称
            []string{common.LABEL_FLOW_NAME},
        )
    
        // 注册Metrics
        prometheus.MustRegister(Metrics.DataTotal)
        prometheus.MustRegister(Metrics.FlowDataTotal) // +++
    }
    

    相关常量定义:

    kis-flow/common/const.go

    
    // metrics
    const (
        METRICS_ROUTE string = "/metrics"
    
        // ++++++++
        LABEL_FLOW_NAME     string = "flow_name"
        LABEL_FLOW_ID       string = "flow_id"
        LABEL_FUNCTION_NAME string = "func_name"
        LABEL_FUNCTION_MODE string = "func_mode"
    
        COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
        COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow全部Flow的数据总量"
    
        // +++++++ 
        GANGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total"
        GANGE_FLOW_DATA_TOTAL_HELP string = "KisFlow各个FlowID数据流的数据数量总量"
    )
    
    

    (3)统计指标埋点

    作为flow的数据总量,我们应该在数据每次提交源数据的时候进行统计即可。

    kis-flow/flow/kis_flow_data.go

    func (flow *KisFlow) commitSrcData(ctx context.Context) error {
    
        // 制作批量数据batch
        dataCnt := len(flow.buffer)
        batch := make(common.KisRowArr, 0, dataCnt)
    
        for _, row := range flow.buffer {
            batch = append(batch, row)
        }
    
        // 清空之前所有数据
        flow.clearData(flow.data)
    
        // 首次提交,记录flow原始数据
        // 因为首次提交,所以PrevFunctionId为FirstVirtual 因为没有上一层Function
        flow.data[common.FunctionIdFirstVirtual] = batch
    
        // 清空缓冲Buf
        flow.buffer = flow.buffer[0:0]
    
        // 首次提交数据源数据,进行统计数据总量
        if config.GlobalConfig.EnableProm == true {
            // 统计数据总量 Metrics.DataTota 指标累计加1
            metrics.Metrics.DataTotal.Add(float64(dataCnt))
    
            // ++++++++
            //统计当前Flow数量指标
            metrics.Metrics.FlowDataTotal.WithLabelValues(flow.Name).Add(float64(dataCnt))
        }
    
        log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
    
        return nil
    }
    

    所以埋点的位置这之前的统计从数据量埋点的位置一样,只不过在累加数据的时候,加上flow.Name标签。

    10.5.2 指标:Flow被调度次数

    (1)指标定义

    首先定义指标类型,如下:

    kis-flow/metrics/kis_metrics.go

    // kisMetrics kisFlow的Prometheus监控指标
    type kisMetrics struct {
        // 数据数量总量
        DataTotal prometheus.Counter
        // 各Flow处理数据总量
        FlowDataTotal *prometheus.GaugeVec
        // Flow被调度次数
        FlowScheduleCntsToTal *prometheus.GaugeVec //++++
    }
    

    FlowScheduleCntsToTal 采用 prometheus.GaugeVec类型。主要是为了区分是哪个Flow产生的数据。

    (2)指标初始化及注册

    kis-flow/metrics/kis_metrics.go

    func InitMetrics() {
        Metrics = new(kisMetrics)
    
        // DataTotal初始化Counter
        Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
            Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
            Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
        })
    
        // FlowDataTotal初始化GaugeVec
        Metrics.FlowDataTotal = prometheus.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: common.GANGE_FLOW_DATA_TOTAL_NAME,
                Help: common.GANGE_FLOW_DATA_TOTAL_HELP,
            },
            // 标签名称
            []string{common.LABEL_FLOW_NAME},
        )
    
        // +++++++++++++
        // FlowScheduleCntsToTal初始化GaugeVec
        Metrics.FlowScheduleCntsToTal = prometheus.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: common.GANGE_FLOW_SCHE_CNTS_NAME,
                Help: common.GANGE_FLOW_SCHE_CNTS_HELP,
            },
            //标签名称
            []string{common.LABEL_FLOW_NAME},
        )
    
        // 注册Metrics
        prometheus.MustRegister(Metrics.DataTotal)
        prometheus.MustRegister(Metrics.FlowDataTotal) 
        // +++++
        prometheus.MustRegister(Metrics.FlowScheduleCntsToTal)
    }
    

    相关常量定义:

    kis-flow/common/const.go

    
    // metrics
    const (
        METRICS_ROUTE string = "/metrics"
    
        LABEL_FLOW_NAME     string = "flow_name"
        LABEL_FLOW_ID       string = "flow_id"
        LABEL_FUNCTION_NAME string = "func_name"
        LABEL_FUNCTION_MODE string = "func_mode"
    
        COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
        COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow全部Flow的数据总量"
    
        GANGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total"
        GANGE_FLOW_DATA_TOTAL_HELP string = "KisFlow各个FlowID数据流的数据数量总量"
    
        // +++++++
        GANGE_FLOW_SCHE_CNTS_NAME string = "flow_schedule_cnts"
        GANGE_FLOW_SCHE_CNTS_HELP string = "KisFlow各个FlowID被调度的次数"
    )
    
    

    (3)统计指标埋点

    如果统计每个Flow的调度次数,我们应该在启动Flow的主入口flow.Run()进行统计,如下:

    kis-flow/flow/kis_flow.go

    // Run 启动KisFlow的流式计算, 从起始Function开始执行流
    func (flow *KisFlow) Run(ctx context.Context) error {
    
        var fn kis.Function
    
        fn = flow.FlowHead
        flow.abort = false
    
        if flow.Conf.Status == int(common.FlowDisable) {
            // flow被配置关闭
            return nil
        }
    
        // 因为此时还没有执行任何Function, 所以PrevFunctionId为FirstVirtual 因为没有上一层Function
        flow.PrevFunctionId = common.FunctionIdFirstVirtual
    
        // 提交数据流原始数据
        if err := flow.commitSrcData(ctx); err != nil {
            return err
        }
    
        // +++++++++++ Metrics
        if config.GlobalConfig.EnableProm == true {
            // 统计Flow的调度次数
            metrics.Metrics.FlowScheduleCntsToTal.WithLabelValues(flow.Name).Inc()
        }
        // ++++++++++++++++++++
    
        // 流式链式调用
        for fn != nil && flow.abort == false {
    
            // flow记录当前执行到的Function 标记
            fid := fn.GetId()
            flow.ThisFunction = fn
            flow.ThisFunctionId = fid
    
            // 得到当前Function要处理与的源数据
            if inputData, err := flow.getCurData(); err != nil {
                log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
                return err
            } else {
                flow.inPut = inputData
            }
    
            if err := fn.Call(ctx, flow); err != nil {
                // Error
                return err
            } else {
                // Success
                fn, err = flow.dealAction(ctx, fn)
                if err != nil {
                    return err
                }
    
            }
        }
    
        return nil
    }
    

    所以埋点的位置这之前的统计从数据量埋点的位置一样,只不过在累加数据的时候,加上flow.Name标签。

    10.5.3 指标:Function被调度次数

    (1)指标定义

    首先定义指标类型,如下:

    kis-flow/metrics/kis_metrics.go

    // kisMetrics kisFlow的Prometheus监控指标
    type kisMetrics struct {
        // 数据数量总量
        DataTotal prometheus.Counter
        // 各Flow处理数据总量
        FlowDataTotal *prometheus.GaugeVec
        // Flow被调度次数
        FlowScheduleCntsToTal *prometheus.GaugeVec 
        // Function被调度次数
        FuncScheduleCntsTotal *prometheus.GaugeVec //++++
    }
    

    FuncScheduleCntsTotal 采用 prometheus.GaugeVec类型。主要是为了区分是哪个Function产生的数据。

    (2)指标初始化及注册

    kis-flow/metrics/kis_metrics.go

    func InitMetrics() {
        Metrics = new(kisMetrics)
    
        // DataTotal初始化Counter
        Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
            Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
            Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
        })
    
        // FlowDataTotal初始化GaugeVec
        Metrics.FlowDataTotal = prometheus.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: common.GANGE_FLOW_DATA_TOTAL_NAME,
                Help: common.GANGE_FLOW_DATA_TOTAL_HELP,
            },
            // 标签名称
            []string{common.LABEL_FLOW_NAME},
        )
    
        // FlowScheduleCntsToTal初始化GaugeVec
        Metrics.FlowScheduleCntsToTal = prometheus.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: common.GANGE_FLOW_SCHE_CNTS_NAME,
                Help: common.GANGE_FLOW_SCHE_CNTS_HELP,
            },
            //标签名称
            []string{common.LABEL_FLOW_NAME},
        )
    
        // ++++++++++
        // FuncScheduleCntsTotal初始化GaugeVec
        Metrics.FuncScheduleCntsTotal = prometheus.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: common.GANGE_FUNC_SCHE_CNTS_NAME,
                Help: common.GANGE_FUNC_SCHE_CNTS_HELP,
            },
            //标签名称
            []string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE},
        )
    
        // 注册Metrics
        prometheus.MustRegister(Metrics.DataTotal)
        prometheus.MustRegister(Metrics.FlowDataTotal) 
        prometheus.MustRegister(Metrics.FlowScheduleCntsToTal)
        // +++++++
        prometheus.MustRegister(Metrics.FuncScheduleCntsTotal)
    }
    

    相关常量定义:

    kis-flow/common/const.go

    
    // metrics
    const (
        METRICS_ROUTE string = "/metrics"
    
        LABEL_FLOW_NAME     string = "flow_name"
        LABEL_FLOW_ID       string = "flow_id"
        LABEL_FUNCTION_NAME string = "func_name"
        LABEL_FUNCTION_MODE string = "func_mode"
    
        COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
        COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow全部Flow的数据总量"
    
        GANGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total"
        GANGE_FLOW_DATA_TOTAL_HELP string = "KisFlow各个FlowID数据流的数据数量总量"
    
        GANGE_FLOW_SCHE_CNTS_NAME string = "flow_schedule_cnts"
        GANGE_FLOW_SCHE_CNTS_HELP string = "KisFlow各个FlowID被调度的次数"
    
        // +++++++++ 
        GANGE_FUNC_SCHE_CNTS_NAME string = "func_schedule_cnts"
        GANGE_FUNC_SCHE_CNTS_HELP string = "KisFlow各个Function被调度的次数"
    )
    
    

    (3)统计指标埋点

    如果统计每个Function的调度次数,我们应该在启动Flow的主入口flow.Run()进行统计,如下:

    kis-flow/flow/kis_flow.go

    // Run 启动KisFlow的流式计算, 从起始Function开始执行流
    func (flow *KisFlow) Run(ctx context.Context) error {
    
        var fn kis.Function
    
        fn = flow.FlowHead
        flow.abort = false
    
        if flow.Conf.Status == int(common.FlowDisable) {
            // flow被配置关闭
            return nil
        }
    
        // 因为此时还没有执行任何Function, 所以PrevFunctionId为FirstVirtual 因为没有上一层Function
        flow.PrevFunctionId = common.FunctionIdFirstVirtual
    
        // 提交数据流原始数据
        if err := flow.commitSrcData(ctx); err != nil {
            return err
        }
    
        if config.GlobalConfig.EnableProm == true {
            // 统计Flow的调度次数
            metrics.Metrics.FlowScheduleCntsToTal.WithLabelValues(flow.Name).Inc()
        }
    
        // 流式链式调用
        for fn != nil && flow.abort == false {
    
            // flow记录当前执行到的Function 标记
            fid := fn.GetId()
            flow.ThisFunction = fn
            flow.ThisFunctionId = fid
    
            // ++++++++++++
            fName := fn.GetConfig().FName
            fMode := fn.GetConfig().FMode
    
            // +++++++++++++++++++++++++++
            if config.GlobalConfig.EnableProm == true {
                // 统计Function调度次数
                metrics.Metrics.FuncScheduleCntsTotal.WithLabelValues(fName, fMode).Inc()
            }
            // ++++++++++++++++++++++++++++
    
            // 得到当前Function要处理与的源数据
            if inputData, err := flow.getCurData(); err != nil {
                log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
                return err
            } else {
                flow.inPut = inputData
            }
    
            if err := fn.Call(ctx, flow); err != nil {
                // Error
                return err
            } else {
                // Success
                fn, err = flow.dealAction(ctx, fn)
                if err != nil {
                    return err
                }
            }
        }
    
        return nil
    }
    

    在埋点的位置为循环调度function的时候,每次在执行Funciton的Call()方法之前进行调度数据统计,并且按照fName和fMode进行分组。

    10.5.4 指标:Function执行时间

    (1)指标定义

    定义指标类型,如下:

    kis-flow/metrics/kis_metrics.go

    // kisMetrics kisFlow的Prometheus监控指标
    type kisMetrics struct {
        // 数据数量总量
        DataTotal prometheus.Counter
        // 各Flow处理数据总量
        FlowDataTotal *prometheus.GaugeVec
        // Flow被调度次数
        FlowScheduleCntsToTal *prometheus.GaugeVec 
        // Function被调度次数
        FuncScheduleCntsTotal *prometheus.GaugeVec 
        // Function执行时间
        FunctionDuration *prometheus.HistogramVec //++++
    }
    

    FunctionDuration 采用 prometheus.HistogramVec类型。这是一个不同区间值的分布统计,不同的时间区间会落到相应的Bucket中。

    (2)指标初始化及注册

    kis-flow/metrics/kis_metrics.go

    func InitMetrics() {
        Metrics = new(kisMetrics)
    
        // DataTotal初始化Counter
        Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
            Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
            Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
        })
    
        // FlowDataTotal初始化GaugeVec
        Metrics.FlowDataTotal = prometheus.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: common.GANGE_FLOW_DATA_TOTAL_NAME,
                Help: common.GANGE_FLOW_DATA_TOTAL_HELP,
            },
            // 标签名称
            []string{common.LABEL_FLOW_NAME},
        )
    
        // FlowScheduleCntsToTal初始化GaugeVec
        Metrics.FlowScheduleCntsToTal = prometheus.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: common.GANGE_FLOW_SCHE_CNTS_NAME,
                Help: common.GANGE_FLOW_SCHE_CNTS_HELP,
            },
            //标签名称
            []string{common.LABEL_FLOW_NAME},
        )
    
        // FuncScheduleCntsTotal初始化GaugeVec
        Metrics.FuncScheduleCntsTotal = prometheus.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: common.GANGE_FUNC_SCHE_CNTS_NAME,
                Help: common.GANGE_FUNC_SCHE_CNTS_HELP,
            },
            //标签名称
            []string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE},
        )
    
        // ++++++++++++++++++++++++++
        // FunctionDuration初始化HistogramVec
        Metrics.FunctionDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
            Name:    common.HISTOGRAM_FUNCTION_DURATION_NAME,
            Help:    common.HISTOGRAM_FUNCTION_DURATION_HELP,
            Buckets: []float64{0.005, 0.01, 0.03, 0.08, 0.1, 0.5, 1.0, 5.0, 10, 100, 1000, 5000, 30000}, //单位ms,最大半分钟
        },
            []string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE},
        )
    
        // 注册Metrics
        prometheus.MustRegister(Metrics.DataTotal)
        prometheus.MustRegister(Metrics.FlowDataTotal) 
        prometheus.MustRegister(Metrics.FlowScheduleCntsToTal)
        prometheus.MustRegister(Metrics.FuncScheduleCntsTotal)
        // +++++++
        prometheus.MustRegister(Metrics.FunctionDuration)
    }
    

    相关常量定义:

    kis-flow/common/const.go

    
    // metrics
    const (
        METRICS_ROUTE string = "/metrics"
    
        LABEL_FLOW_NAME     string = "flow_name"
        LABEL_FLOW_ID       string = "flow_id"
        LABEL_FUNCTION_NAME string = "func_name"
        LABEL_FUNCTION_MODE string = "func_mode"
    
        COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
        COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow全部Flow的数据总量"
    
        GANGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total"
        GANGE_FLOW_DATA_TOTAL_HELP string = "KisFlow各个FlowID数据流的数据数量总量"
    
        GANGE_FLOW_SCHE_CNTS_NAME string = "flow_schedule_cnts"
        GANGE_FLOW_SCHE_CNTS_HELP string = "KisFlow各个FlowID被调度的次数"
    
        GANGE_FUNC_SCHE_CNTS_NAME string = "func_schedule_cnts"
        GANGE_FUNC_SCHE_CNTS_HELP string = "KisFlow各个Function被调度的次数"
        // ++++++++
        HISTOGRAM_FUNCTION_DURATION_NAME string = "func_run_duration"
        HISTOGRAM_FUNCTION_DURATION_HELP string = "Function执行耗时"
    )
    
    

    (3)统计指标埋点

    如果统计每个Function的调度实行时长,我们应该在启动Flow的主入口flow.Run()进行统计,如下:

    kis-flow/flow/kis_flow.go

    // Run 启动KisFlow的流式计算, 从起始Function开始执行流
    func (flow *KisFlow) Run(ctx context.Context) error {
    
        var fn kis.Function
    
        fn = flow.FlowHead
        flow.abort = false
    
        if flow.Conf.Status == int(common.FlowDisable) {
            // flow被配置关闭
            return nil
        }
        
        // ++++++++++ Metrics +++++++++
        var funcStart time.Time
    
        // 因为此时还没有执行任何Function, 所以PrevFunctionId为FirstVirtual 因为没有上一层Function
        flow.PrevFunctionId = common.FunctionIdFirstVirtual
    
        // 提交数据流原始数据
        if err := flow.commitSrcData(ctx); err != nil {
            return err
        }
    
        if config.GlobalConfig.EnableProm == true {
            // 统计Flow的调度次数
            metrics.Metrics.FlowScheduleCntsToTal.WithLabelValues(flow.Name).Inc()
        }
    
        // 流式链式调用
        for fn != nil && flow.abort == false {
    
            // flow记录当前执行到的Function 标记
            fid := fn.GetId()
            flow.ThisFunction = fn
            flow.ThisFunctionId = fid
    
            fName := fn.GetConfig().FName
            fMode := fn.GetConfig().FMode
    
            if config.GlobalConfig.EnableProm == true {
                // 统计Function调度次数
                metrics.Metrics.FuncScheduleCntsTotal.WithLabelValues(fName, fMode).Inc()
                
                // +++++++++++++++
                // 统计Function 耗时 记录开始时间
                funcStart = time.Now()
            }
    
            // 得到当前Function要处理与的源数据
            if inputData, err := flow.getCurData(); err != nil {
                log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
                return err
            } else {
                flow.inPut = inputData
            }
    
            if err := fn.Call(ctx, flow); err != nil {
                // Error
                return err
            } else {
                // Success
                fn, err = flow.dealAction(ctx, fn)
                if err != nil {
                    return err
                }
    
                // +++++++++++++++
                // 统计Function 耗时
                if config.GlobalConfig.EnableProm == true {
                    // Function消耗时间
                    duration := time.Since(funcStart)
    
                    // 统计当前Function统计指标,做时间统计
                    metrics.Metrics.FunctionDuration.With(
                        prometheus.Labels{
                            common.LABEL_FUNCTION_NAME: fName,
                            common.LABEL_FUNCTION_MODE: fMode}).Observe(duration.Seconds() * 1000)
                }
                // +++++++++++++++
    
            }
        }
    
        return nil
    }
    

    在埋点的位置每次在执行Funciton的Call()方法之前进行起始时间记录,然后在执行Function之后,算出执行时间,左后进行统计,按照相对应的时间区间,放入到响应的HistogramVec中的bucket中。

    10.5.5 指标:Flow执行时间

    (1)指标定义

    定义指标类型,如下:

    kis-flow/metrics/kis_metrics.go

    // kisMetrics kisFlow的Prometheus监控指标
    type kisMetrics struct {
        // 数据数量总量
        DataTotal prometheus.Counter
        // 各Flow处理数据总量
        FlowDataTotal *prometheus.GaugeVec
        // Flow被调度次数
        FlowScheduleCntsToTal *prometheus.GaugeVec 
        // Function被调度次数
        FuncScheduleCntsTotal *prometheus.GaugeVec 
        // Function执行时间
        FunctionDuration *prometheus.HistogramVec
        // Flow执行时间
        FlowDuration *prometheus.HistogramVec // ++++
    }
    

    FlowDuration 采用 prometheus.HistogramVec类型。这是一个不同区间值的分布统计,不同的时间区间会落到相应的Bucket中。

    (2)指标初始化及注册

    kis-flow/metrics/kis_metrics.go

    func InitMetrics() {
        Metrics = new(kisMetrics)
    
        // DataTotal初始化Counter
        Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
            Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
            Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
        })
    
        // FlowDataTotal初始化GaugeVec
        Metrics.FlowDataTotal = prometheus.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: common.GANGE_FLOW_DATA_TOTAL_NAME,
                Help: common.GANGE_FLOW_DATA_TOTAL_HELP,
            },
            // 标签名称
            []string{common.LABEL_FLOW_NAME},
        )
    
        // FlowScheduleCntsToTal初始化GaugeVec
        Metrics.FlowScheduleCntsToTal = prometheus.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: common.GANGE_FLOW_SCHE_CNTS_NAME,
                Help: common.GANGE_FLOW_SCHE_CNTS_HELP,
            },
            //标签名称
            []string{common.LABEL_FLOW_NAME},
        )
    
        // FuncScheduleCntsTotal初始化GaugeVec
        Metrics.FuncScheduleCntsTotal = prometheus.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: common.GANGE_FUNC_SCHE_CNTS_NAME,
                Help: common.GANGE_FUNC_SCHE_CNTS_HELP,
            },
            //标签名称
            []string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE},
        )
    
        // FunctionDuration初始化HistogramVec
        Metrics.FunctionDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
            Name:    common.HISTOGRAM_FUNCTION_DURATION_NAME,
            Help:    common.HISTOGRAM_FUNCTION_DURATION_HELP,
            Buckets: []float64{0.005, 0.01, 0.03, 0.08, 0.1, 0.5, 1.0, 5.0, 10, 100, 1000, 5000, 30000}, //单位ms,最大半分钟
        },
            []string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE},
        )
        
    
        // +++++++++++++
        // FlowDuration初始化HistogramVec
        Metrics.FlowDuration = prometheus.NewHistogramVec(
            prometheus.HistogramOpts{
                Name:    common.HISTOGRAM_FLOW_DURATION_NAME,
                Help:    common.HISTOGRAM_FLOW_DURATION_HELP,
                Buckets: []float64{0.005, 0.01, 0.03, 0.08, 0.1, 0.5, 1.0, 5.0, 10, 100, 1000, 5000, 30000, 60000}, //单位ms,最大1分钟
            },
            []string{common.LABEL_FLOW_NAME},
        )
    
        // 注册Metrics
        prometheus.MustRegister(Metrics.DataTotal)
        prometheus.MustRegister(Metrics.FlowDataTotal) 
        prometheus.MustRegister(Metrics.FlowScheduleCntsToTal)
        prometheus.MustRegister(Metrics.FuncScheduleCntsTotal)
        prometheus.MustRegister(Metrics.FunctionDuration)
        prometheus.MustRegister(Metrics.FlowDuration) // +++++
    }
    

    相关常量定义:

    kis-flow/common/const.go

    
    // metrics
    const (
        METRICS_ROUTE string = "/metrics"
    
        LABEL_FLOW_NAME     string = "flow_name"
        LABEL_FLOW_ID       string = "flow_id"
        LABEL_FUNCTION_NAME string = "func_name"
        LABEL_FUNCTION_MODE string = "func_mode"
    
        COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
        COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow全部Flow的数据总量"
    
        GANGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total"
        GANGE_FLOW_DATA_TOTAL_HELP string = "KisFlow各个FlowID数据流的数据数量总量"
    
        GANGE_FLOW_SCHE_CNTS_NAME string = "flow_schedule_cnts"
        GANGE_FLOW_SCHE_CNTS_HELP string = "KisFlow各个FlowID被调度的次数"
    
        GANGE_FUNC_SCHE_CNTS_NAME string = "func_schedule_cnts"
        GANGE_FUNC_SCHE_CNTS_HELP string = "KisFlow各个Function被调度的次数"
        HISTOGRAM_FUNCTION_DURATION_NAME string = "func_run_duration"
        HISTOGRAM_FUNCTION_DURATION_HELP string = "Function执行耗时"
    
        // ++++++++
        HISTOGRAM_FLOW_DURATION_NAME string = "flow_run_duration"
        HISTOGRAM_FLOW_DURATION_HELP string = "Flow执行耗时"
    )
    
    

    (3)统计指标埋点

    如果统计每个Flow的调度实行时长,我们应该在启动Flow的主入口flow.Run()进行统计,如下:

    kis-flow/flow/kis_flow.go

    // Run 启动KisFlow的流式计算, 从起始Function开始执行流
    func (flow *KisFlow) Run(ctx context.Context) error {
    
        var fn kis.Function
    
        fn = flow.FlowHead
        flow.abort = false
    
        if flow.Conf.Status == int(common.FlowDisable) {
            // flow被配置关闭
            return nil
        }
        
        var funcStart time.Time
        // ++++++++++ Metrics +++++++++
        var flowStart time.Time
    
    
        // 因为此时还没有执行任何Function, 所以PrevFunctionId为FirstVirtual 因为没有上一层Function
        flow.PrevFunctionId = common.FunctionIdFirstVirtual
    
        // 提交数据流原始数据
        if err := flow.commitSrcData(ctx); err != nil {
            return err
        }
    
        if config.GlobalConfig.EnableProm == true {
            // 统计Flow的调度次数
            metrics.Metrics.FlowScheduleCntsToTal.WithLabelValues(flow.Name).Inc()
            
            // +++++++
            // 统计Flow的执行消耗时长
            flowStart = time.Now()
        }
    
        // 流式链式调用
        for fn != nil && flow.abort == false {
    
            // flow记录当前执行到的Function 标记
            fid := fn.GetId()
            flow.ThisFunction = fn
            flow.ThisFunctionId = fid
    
            fName := fn.GetConfig().FName
            fMode := fn.GetConfig().FMode
    
            if config.GlobalConfig.EnableProm == true {
                // 统计Function调度次数
                metrics.Metrics.FuncScheduleCntsTotal.WithLabelValues(fName, fMode).Inc()
                
                // 统计Function 耗时 记录开始时间
                funcStart = time.Now()
            }
    
            // 得到当前Function要处理与的源数据
            if inputData, err := flow.getCurData(); err != nil {
                log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
                return err
            } else {
                flow.inPut = inputData
            }
    
            if err := fn.Call(ctx, flow); err != nil {
                // Error
                return err
            } else {
                // Success
                fn, err = flow.dealAction(ctx, fn)
                if err != nil {
                    return err
                }
    
                // 统计Function 耗时
                if config.GlobalConfig.EnableProm == true {
                    // Function消耗时间
                    duration := time.Since(funcStart)
    
                    // 统计当前Function统计指标,做时间统计
                    metrics.Metrics.FunctionDuration.With(
                        prometheus.Labels{
                            common.LABEL_FUNCTION_NAME: fName,
                            common.LABEL_FUNCTION_MODE: fMode}).Observe(duration.Seconds() * 1000)
                }
            }
        }
    
        // +++++++++++++++++++++++++
        // Metrics
        if config.GlobalConfig.EnableProm == true {
            // 统计Flow执行耗时
            duration := time.Since(flowStart)
            metrics.Metrics.FlowDuration.WithLabelValues(flow.Name).Observe(duration.Seconds() * 1000)
        }
    
        return nil
    }
    

    在埋点的位置在flow进入Run()方法之后,进行起始时间记录,然后在Run()最后进行duration统计,统计办法与Function的统计时长类似。

    接下来我们先对DataTotal指标做一个单元测试,来进行验证。

    10.6 KieMetrics单元测试(其他Metrics指标)

    10.6.1 新建单元测试

    单元测试用例我们复用之前的TestMetricsDataTotal()方法即可,如下:

    kis-flow/test/kis_metrics_test.go

    package test
    
    import (
        "context"
        "kis-flow/common"
        "kis-flow/file"
        "kis-flow/kis"
        "kis-flow/test/caas"
        "kis-flow/test/faas"
        "testing"
        "time"
    )
    
    func TestMetricsDataTotal(t *testing.T) {
        ctx := context.Background()
    
        // 0. 注册Function 回调业务
        kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
        kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
        kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
    
        // 0. 注册ConnectorInit 和 Connector 回调业务
        kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
        kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
    
        // 1. 加载配置文件并构建Flow
        if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
            panic(err)
        }
    
        // 2. 获取Flow
        flow1 := kis.Pool().GetFlow("flowName1")
        
        n := 0
    
        for n < 10 {
            // 3. 提交原始数据
            _ = flow1.CommitRow("This is Data1 from Test")
    
            // 4. 执行flow1
            if err := flow1.Run(ctx); err != nil {
                panic(err)
            }
    
            time.Sleep(1 * time.Second)
            n++
        }
        
        select {}
    }
    

    执行单元测试,cd到kis-flow/test/下,执行:

    go test -test.v -test.paniconexit0 -test.run TestMetricsDataTotal
    

    会得到很多日志输出,我们等待10s,之后再开启一个终端,输入如下指令:

     $ curl http://0.0.0.0:20004/metrics 
    

    得到如下结果:

    # HELP flow_data_total KisFlow各个FlowID数据流的数据数量总量
    # TYPE flow_data_total gauge
    flow_data_total{flow_name="flowName1"} 10
    # HELP flow_run_duration Flow执行耗时
    # TYPE flow_run_duration histogram
    flow_run_duration_bucket{flow_name="flowName1",le="0.005"} 0
    flow_run_duration_bucket{flow_name="flowName1",le="0.01"} 0
    flow_run_duration_bucket{flow_name="flowName1",le="0.03"} 0
    flow_run_duration_bucket{flow_name="flowName1",le="0.08"} 0
    flow_run_duration_bucket{flow_name="flowName1",le="0.1"} 0
    flow_run_duration_bucket{flow_name="flowName1",le="0.5"} 0
    flow_run_duration_bucket{flow_name="flowName1",le="1"} 0
    flow_run_duration_bucket{flow_name="flowName1",le="5"} 9
    flow_run_duration_bucket{flow_name="flowName1",le="10"} 10
    flow_run_duration_bucket{flow_name="flowName1",le="100"} 10
    flow_run_duration_bucket{flow_name="flowName1",le="1000"} 10
    flow_run_duration_bucket{flow_name="flowName1",le="5000"} 10
    flow_run_duration_bucket{flow_name="flowName1",le="30000"} 10
    flow_run_duration_bucket{flow_name="flowName1",le="60000"} 10
    flow_run_duration_bucket{flow_name="flowName1",le="+Inf"} 10
    flow_run_duration_sum{flow_name="flowName1"} 29.135023
    flow_run_duration_count{flow_name="flowName1"} 10
    # HELP flow_schedule_cnts KisFlow各个FlowID被调度的次数
    # TYPE flow_schedule_cnts gauge
    flow_schedule_cnts{flow_name="flowName1"} 10
    # HELP func_run_duration Function执行耗时
    # TYPE func_run_duration histogram
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.005"} 0
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.01"} 0
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.03"} 0
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.08"} 0
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.1"} 0
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.5"} 0
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="1"} 0
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="5"} 9
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="10"} 10
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="100"} 10
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="1000"} 10
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="5000"} 10
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="30000"} 10
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="+Inf"} 10
    func_run_duration_sum{func_mode="Calculate",func_name="funcName3"} 20.925857
    func_run_duration_count{func_mode="Calculate",func_name="funcName3"} 10
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.005"} 0
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.01"} 0
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.03"} 0
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.08"} 0
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.1"} 0
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.5"} 0
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="1"} 1
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="5"} 10
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="10"} 10
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="100"} 10
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="1000"} 10
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="5000"} 10
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="30000"} 10
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="+Inf"} 10
    func_run_duration_sum{func_mode="Save",func_name="funcName2"} 27.026124
    func_run_duration_count{func_mode="Save",func_name="funcName2"} 10
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.005"} 0
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.01"} 0
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.03"} 0
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.08"} 0
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.1"} 0
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.5"} 5
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="1"} 10
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="5"} 10
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="10"} 10
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="100"} 10
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="1000"} 10
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="5000"} 10
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="30000"} 10
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="+Inf"} 10
    func_run_duration_sum{func_mode="Verify",func_name="funcName1"} 4.811095
    func_run_duration_count{func_mode="Verify",func_name="funcName1"} 10
    # HELP func_schedule_cnts KisFlow各个Function被调度的次数
    # TYPE func_schedule_cnts gauge
    func_schedule_cnts{func_mode="Calculate",func_name="funcName3"} 10
    func_schedule_cnts{func_mode="Save",func_name="funcName2"} 10
    func_schedule_cnts{func_mode="Verify",func_name="funcName1"} 10
    
    # HELP kisflow_data_total KisFlow全部Flow的数据总量
    # TYPE kisflow_data_total counter
    kisflow_data_total 10
    
    # ... ...
    
    
    
    

    其中我们会发现,我们之前的统计指标均已经出现了:

    • kisflow_data_total:为总数据量,目前是10条数据,因为我们一共Commit提交了10条源数据。
    • flow_data_total:为flow的数据总量,目前我们只启动了flowName1,该数据被通缉到了标签flowName1中。
    flow_data_total{flow_name="flowName1"} 10
    
    • flow_schedule_cnts:为flow的调度次数,因为我们一共执行了10次 flow.Run()方法,所以这个调度次数是10。
    # HELP flow_schedule_cnts KisFlow各个FlowID被调度的次数
    # TYPE flow_schedule_cnts gauge
    flow_schedule_cnts{flow_name="flowName1"} 10
    
    • func_schedule_cnts: 为各个Function的被调度次数,这里因为每个Flow会关联3个Function,所以每个Function的调度次数应该和Flow1的调度次数相同,都是10。
    # HELP func_schedule_cnts KisFlow各个Function被调度的次数
    # TYPE func_schedule_cnts gauge
    func_schedule_cnts{func_mode="Calculate",func_name="funcName3"} 10
    func_schedule_cnts{func_mode="Save",func_name="funcName2"} 10
    func_schedule_cnts{func_mode="Verify",func_name="funcName1"} 10
    
    • func_run_duration_bucket: 为Function的执行耗时分布统计。(有关HISTOGRAM 的统计方式比较复杂,这里就不赘述了,有兴趣的开发者可以去查阅一些相关资料。)
    # HELP func_run_duration Function执行耗时
    # TYPE func_run_duration histogram
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.005"} 0
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.01"} 0
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.03"} 0
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.08"} 0
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.1"} 0
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.5"} 0
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="1"} 0
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="5"} 9
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="10"} 10
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="100"} 10
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="1000"} 10
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="5000"} 10
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="30000"} 10
    func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="+Inf"} 10
    func_run_duration_sum{func_mode="Calculate",func_name="funcName3"} 20.925857
    func_run_duration_count{func_mode="Calculate",func_name="funcName3"} 10
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.005"} 0
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.01"} 0
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.03"} 0
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.08"} 0
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.1"} 0
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.5"} 0
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="1"} 1
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="5"} 10
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="10"} 10
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="100"} 10
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="1000"} 10
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="5000"} 10
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="30000"} 10
    func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="+Inf"} 10
    func_run_duration_sum{func_mode="Save",func_name="funcName2"} 27.026124
    func_run_duration_count{func_mode="Save",func_name="funcName2"} 10
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.005"} 0
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.01"} 0
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.03"} 0
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.08"} 0
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.1"} 0
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.5"} 5
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="1"} 10
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="5"} 10
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="10"} 10
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="100"} 10
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="1000"} 10
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="5000"} 10
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="30000"} 10
    func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="+Inf"} 10
    func_run_duration_sum{func_mode="Verify",func_name="funcName1"} 4.811095
    func_run_duration_count{func_mode="Verify",func_name="funcName1"} 10
    
    • flow_run_duration_bucket: 为为Flow的执行耗时分布统计。(有关HISTOGRAM 的统计方式比较复杂,这里就不赘述了,有兴趣的开发者可以去查阅一些相关资料。)
    # HELP flow_run_duration Flow执行耗时
    # TYPE flow_run_duration histogram
    flow_run_duration_bucket{flow_name="flowName1",le="0.005"} 0
    flow_run_duration_bucket{flow_name="flowName1",le="0.01"} 0
    flow_run_duration_bucket{flow_name="flowName1",le="0.03"} 0
    flow_run_duration_bucket{flow_name="flowName1",le="0.08"} 0
    flow_run_duration_bucket{flow_name="flowName1",le="0.1"} 0
    flow_run_duration_bucket{flow_name="flowName1",le="0.5"} 0
    flow_run_duration_bucket{flow_name="flowName1",le="1"} 0
    flow_run_duration_bucket{flow_name="flowName1",le="5"} 9
    flow_run_duration_bucket{flow_name="flowName1",le="10"} 10
    flow_run_duration_bucket{flow_name="flowName1",le="100"} 10
    flow_run_duration_bucket{flow_name="flowName1",le="1000"} 10
    flow_run_duration_bucket{flow_name="flowName1",le="5000"} 10
    flow_run_duration_bucket{flow_name="flowName1",le="30000"} 10
    flow_run_duration_bucket{flow_name="flowName1",le="60000"} 10
    flow_run_duration_bucket{flow_name="flowName1",le="+Inf"} 10
    flow_run_duration_sum{flow_name="flowName1"} 29.135023
    flow_run_duration_count{flow_name="flowName1"} 10
    

    10.7 有关KisFlow的Metrics的Grafana看板展示

    既然有了Prometheus的指标统计,我们可以给KisFlow的流式计算程序结合Grafana进行看板展示。
    由于各个开发者的项目的统计指标和看板要求不一定相同,这里本文就不提供具体的Grafana看板的配置文件了,下面提供一个KisFlow的项目看板,作为演示参考,如下:

    10.8 【V0.9】 源代码

    https://github.com/aceld/kis-flow/releases/tag/v0.9


    作者:刘丹冰Aceld github: https://github.com/aceld
    KisFlow开源项目地址:https://github.com/aceld/kis-flow

    Golang框架实战-KisFlow流式计算框架专栏

    Golang框架实战-KisFlow流式计算框架(1)-概述
    Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
    Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
    Golang框架实战-KisFlow流式计算框架(4)-数据流
    Golang框架实战-KisFlow流式计算框架(5)-Function调度
    Golang框架实战-KisFlow流式计算框架(6)-Connector
    Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出
    Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action
    Golang框架实战-KisFlow流式计算框架(10)-Flow多副本

    相关文章

      网友评论

        本文标题:Golang框架实战-KisFlow流式计算框架(11)-Pro

        本文链接:https://www.haomeiwen.com/subject/ehpwzdtx.html