美文网首页
OpenFalcon 源码分析(Nodata组件)

OpenFalcon 源码分析(Nodata组件)

作者: Xiao_Yang | 来源:发表于2018-09-20 09:15 被阅读0次

    Nodata版本

    VERSION = "0.0.11"

    Nodata组件功能

    nodata用于检测监控数据的上报异常。nodata和实时报警judge模块协同工作,过程为: 配置了nodata的采集项超时未上报数据,nodata生成一条默认的模拟数据;用户配置相应的报警策略,收到mock数据就产生报警。采集项上报异常检测,作为judge模块的一个必要补充,能够使judge的实时报警功能更加可靠、完善。【官方描述】

    Nodata组件逻辑图

    系统流图

    官方系统流图

    模块结构

    官方模块结构图

    main入口分析

    
    // main is the Nodata entry point: it handles the version flags, loads the
    // configuration, starts every module and then blocks forever.
    func main() {
        // Command-line flags.
        var (
            cfgPath    = flag.String("c", "cfg.json", "configuration file")
            showVer    = flag.Bool("v", false, "show version")
            showVerGit = flag.Bool("vg", false, "show version")
        )
        flag.Parse()

        // -v wins over -vg when both are given; either one exits immediately.
        switch {
        case *showVer:
            fmt.Println(g.VERSION)
            os.Exit(0)
        case *showVerGit:
            fmt.Println(g.VERSION, g.COMMIT)
            os.Exit(0)
        }

        // Global configuration, then the statistics counters.
        g.ParseConfig(*cfgPath)
        g.StartProc()

        // Nodata config cache, data-point collector and judge, in that order.
        config.Start()
        collector.Start()
        judge.Start()

        // HTTP API service.
        http.Start()

        // Park the main goroutine; the modules run in the background.
        select {}
    }
    
    
    

    config.Start() 从DB加载Nodata配置(dashboard配置nodata策略写入mysql)

    # 加载nodata配置主函数
    // Start boots the config module: when it is enabled in the global
    // configuration it initializes the DB connection and starts the periodic
    // nodata-config sync; otherwise it only logs a warning.
    func Start() {
        if enabled := g.Config().Config.Enabled; enabled {
            service.InitDB()    // open the DB connection
            StartNdConfigCron() // keep the in-memory nodata config refreshed
            log.Println("config.Start ok")
            return
        }
        log.Println("config.Start warning, not enabled")
    }
    
    ## 初始化DB连接
    // InitDB opens (and caches) the base DB connection. A failure is fatal:
    // log.Fatalln terminates the process, so the success log is only reached
    // when the connection is usable.
    func InitDB() {
        if _, err := GetDbConn(dbBaseConnName); err != nil {
            log.Fatalln("config.InitDB error", err)
        }
        log.Println("config.InitDB ok")
    }
    
    ### GetDbConn实现函数
    ### makeDbConn函数实现sql客户端连接dbconn
    ### 内存map dbConnMap[connName]保存dbconn
    // GetDbConn returns the cached *sql.DB registered under connName, creating
    // and caching it on first use. The connection is pinged before it is
    // returned; a connection that fails the ping is closed and evicted from the
    // cache. All access to dbConnMap happens under dbLock.
    func GetDbConn(connName string) (c *sql.DB, e error) {
        dbLock.Lock()
        defer dbLock.Unlock()

        conn := dbConnMap[connName]
        if conn == nil {
            fresh, err := makeDbConn() // open a new SQL client connection
            if err != nil {
                closeDbConn(fresh)
                return nil, err
            }
            conn = fresh
            dbConnMap[connName] = conn
        }

        // Health check; drop the cached entry when the connection is dead.
        if err := conn.Ping(); err != nil {
            closeDbConn(conn)
            delete(dbConnMap, connName)
            return nil, err
        }

        return conn, nil
    }
    
    // makeDbConn opens a MySQL handle using the configured DSN, applies the
    // configured idle-connection limit and pings it. When the ping fails the
    // handle is still returned together with the error so the caller can close it.
    func makeDbConn() (conn *sql.DB, err error) {
        if conn, err = sql.Open("mysql", g.Config().Config.Dsn); err != nil {
            return nil, err
        }

        maxIdle := int(g.Config().Config.MaxIdle)
        conn.SetMaxIdleConns(maxIdle)

        return conn, conn.Ping()
    }
    
    ##  周期任务同步nodata配置
    func StartNdConfigCron() {
        ndconfigCron.AddFuncCC(ndconfigCronSpec, func() {
            start := time.Now().Unix()
            cnt, _ := syncNdConfig()   // 同步nodata配置
            end := time.Now().Unix()
            if g.Config().Debug {
                log.Printf("config cron, cnt %d, time %ds, start %s\n", cnt, end-start, ttime.FormatTs(start))
            }
    
            // 统计
            g.ConfigCronCnt.Incr()
            g.ConfigLastTs.SetCnt(end - start)
            g.ConfigLastCnt.SetCnt(int64(cnt))
        }, 1)
        ndconfigCron.Start()          //start cron
    } 
    
    #### 获取nodata配置、重新格式化及缓存配置全局公开Map(NdConfigMap)
    func syncNdConfig() (cnt int, errt error) {
        // 获取nodata配置函数调用
        configs := service.GetMockCfgFromDB()
        // 重新格式化配置NodateConfig结构
        nm := nmap.NewSafeMap()
        for _, ndc := range configs {
            endpoint := ndc.Endpoint
            metric := ndc.Metric
            tags := ndc.Tags
            if endpoint == "" {
                log.Printf("bad config: %+v\n", ndc)
                continue
            }
            pk := cutils.PK(endpoint, metric, tags)
            nm.Put(pk, ndc)
        }
    
        // 缓存map
        SetNdConfigMap(nm)
    
        return nm.Size(), nil    //返回map长度
    }
    
    ##### 底层获取nodata配置实现函数
    // GetMockCfgFromDB loads every row of the mockcfg table and expands it into
    // one NodataConfig per endpoint, keyed by cutils.PK(endpoint, metric, tags).
    //
    // Failures are logged, never returned: a connection or query error yields an
    // empty map, and rows that fail to scan or validate are skipped. When two
    // rows produce the same key, isSpuerNodataCfg decides which one is kept.
    func GetMockCfgFromDB() map[string]*cmodel.NodataConfig {
        ret := make(map[string]*cmodel.NodataConfig)

        dbConn, err := GetDbConn("nodata.mockcfg")
        if err != nil {
            log.Println("db.get_conn error, mockcfg", err)
            return ret
        }

        q := fmt.Sprintf("SELECT id,name,obj,obj_type,metric,tags,dstype,step,mock FROM mockcfg")
        rows, err := dbConn.Query(q)
        if err != nil {
            log.Println("db.query error, mockcfg", err)
            return ret
        }
        defer rows.Close()

        for rows.Next() {
            t := MockCfg{}
            tags := ""
            err := rows.Scan(&t.Id, &t.Name, &t.Obj, &t.ObjType, &t.Metric, &tags, &t.Type, &t.Step, &t.Mock)
            if err != nil {
                log.Println("db.scan error, mockcfg", err)
                continue
            }
            // Convert the stored tag string into the Tags value used by the config.
            t.Tags = cutils.DictedTagstring(tags)

            // Skip rows that fail validation.
            err = checkMockCfg(&t)
            if err != nil {
                log.Println("check mockcfg, error:", err)
                continue
            }

            // Resolve the object (host list, group list or "other") to endpoints.
            endpoints := getEndpoint(t.ObjType, t.Obj)
            if len(endpoints) < 1 {
                continue
            }

            for _, ep := range endpoints {
                uuid := cutils.PK(ep, t.Metric, t.Tags)
                ncfg := cmodel.NewNodataConfig(t.Id, t.Name, t.ObjType, ep, t.Metric, t.Tags, t.Type, t.Step, t.Mock)

                val, found := ret[uuid]
                if !found {
                    ret[uuid] = ncfg
                    continue
                }

                // Key conflict: keep whichever config isSpuerNodataCfg prefers.
                if isSpuerNodataCfg(val, ncfg) {
                    // val takes precedence over ncfg, so ncfg is dropped
                    log.Printf("nodata.mockcfg conflict, %s, used %s, drop %s", uuid, val.Name, ncfg.Name)
                } else {
                    ret[uuid] = ncfg
                    log.Printf("nodata.mockcfg conflict, %s, used %s, drop %s", uuid, ncfg.Name, val.Name)
                }
            }
        }

        // rows.Next also returns false when iteration is aborted by an error;
        // without this check a truncated result set would pass as complete.
        if err := rows.Err(); err != nil {
            log.Println("db.rows error, mockcfg", err)
        }

        return ret
    }
    
    
    ###### 根据objType获取Hosts slice
    // getEndpoint resolves obj to a list of endpoints according to objType
    // ("host", "group" or "other"). Any other objType yields an empty,
    // non-nil slice.
    func getEndpoint(objType string, obj string) []string {
        var resolve func(string) []string

        switch objType {
        case "host":
            resolve = getEndpointFromHosts
        case "group":
            resolve = getEndpointFromGroups
        case "other":
            resolve = getEndpointFromOther
        }

        if resolve == nil {
            return []string{}
        }
        return resolve(obj)
    }
    // getEndpointFromHosts splits a newline-separated host list, trims the
    // surrounding whitespace of every line and returns the non-empty names in
    // their original order. The result is never nil.
    func getEndpointFromHosts(hosts string) []string {
        endpoints := make([]string, 0)

        for _, line := range strings.Split(hosts, "\n") {
            name := strings.TrimSpace(line)
            if name == "" {
                continue
            }
            endpoints = append(endpoints, name)
        }

        return endpoints
    }
    // getEndpointFromGroups expands a newline-separated list of group names
    // into the de-duplicated set of host names belonging to those groups.
    // Blank lines and empty host names are ignored; the order of the result is
    // unspecified because it comes from map iteration. The result is never nil.
    func getEndpointFromGroups(grps string) []string {
        // Collect hosts in a set so a host that is in several groups appears once.
        hosts := make(map[string]struct{})
        for _, grp := range strings.Split(grps, "\n") {
            ngrp := strings.TrimSpace(grp)
            if len(ngrp) < 1 {
                continue
            }

            // Look up by the trimmed name: the raw line may still carry
            // whitespace such as a trailing "\r" and would not match the group.
            for hostname := range GetHostsFromGroup(ngrp) {
                if hostname != "" {
                    hosts[hostname] = struct{}{}
                }
            }
        }

        // Flatten the set into a slice.
        ret := make([]string, 0, len(hosts))
        for hostname := range hosts {
            ret = append(ret, hostname)
        }

        return ret
    }
    // getEndpointFromOther resolves an "other"-type object list. It delegates
    // to getEndpointFromHosts, so both object types are parsed identically.
    func getEndpointFromOther(other string) []string {
        return getEndpointFromHosts(other)
    }
    
    

    collector.Start() 缓存Nodata配置主机的采集数据点

    
    # 运行收集nodata数据主函数
    // Start boots the collector module: when it is enabled in the global
    // configuration it starts the periodic collection job; otherwise it only
    // logs a warning.
    func Start() {
        if enabled := g.Config().Collector.Enabled; enabled {
            StartCollectorCron() // periodic collection of the latest data points
            log.Println("collector.Start ok")
            return
        }
        log.Println("collector.Start warning, not enabled")
    }
    ## 定时任务执行,收集函数调用与任务运行 
    func StartCollectorCron() {
        collectorCron.AddFuncCC("*/20 * * * * ?", func() {
            start := time.Now().Unix()
            cnt := collectDataOnce()   //收集函数调用
            end := time.Now().Unix()
            if g.Config().Debug {
                log.Printf("collect cron, cnt %d, time %ds, start %s\n", cnt, end-start, ttime.FormatTs(start))
            }
    
            //统计
            g.CollectorCronCnt.Incr()
            g.CollectorLastTs.SetCnt(end - start)
            g.CollectorLastCnt.SetCnt(int64(cnt))
            g.CollectorCnt.IncrBy(int64(cnt))
        }, 1)
        collectorCron.Start()
    }
    
    ### 收集功能实现函数
    // collectDataOnce fetches the latest data points for every configured
    // nodata key. Keys are processed in batches, each batch in its own
    // goroutine, with a semaphore bounding how many run at once. It returns the
    // sum of the sizes reported by fetchItemsAndStore.
    func collectDataOnce() int {
        keys := config.Keys()
        total := len(keys)

        cfg := g.Config().Collector

        // Concurrency limit; values outside 1..50 fall back to 10.
        workers := int(cfg.Concurrent)
        if workers < 1 || workers > 50 {
            workers = 10
        }
        sema := tsema.NewSemaphore(workers)

        // Batch size; values outside 100..1000 fall back to 200. A small batch
        // would mean many batches and therefore a large result channel.
        batch := int(cfg.Batch)
        if batch < 100 || batch > 1000 {
            batch = 200
        }

        // One result per batch; the buffer lets workers finish without waiting
        // for the reader below.
        batchCnt := (total + batch - 1) / batch
        results := make(chan int, batchCnt+1)

        for from := 0; from < total; from += batch {
            to := from + batch
            if to > total {
                to = total
            }
            chunk := keys[from:to]

            // Wait for a free slot, then process the chunk concurrently.
            sema.Acquire()
            go func(keys []string, keySize int) {
                defer sema.Release()
                size, err := fetchItemsAndStore(keys, keySize)
                if err != nil {
                    log.Printf("fetchItemAndStore fail, size:%v, error:%v", size, err)
                }
                if g.Config().Debug {
                    log.Printf("fetchItemAndStore keys:%v, key_size:%v, ret_size:%v", keys, keySize, size)
                }
                results <- size
            }(chunk, len(chunk))
        }

        // Gather exactly one result per batch.
        collected := 0
        for n := 0; n < batchCnt; n++ {
            collected += <-results
        }

        return collected
    }
    
    #### 获取数据实现函数
    // fetchItemsAndStore queries the last data point of every key in fetchKeys
    // that still has a nodata config and caches each usable point through
    // AddItem. It returns the number of response entries (including ones that
    // were skipped because they were nil or had no value), or the query error.
    func fetchItemsAndStore(fetchKeys []string, fetchSize int) (size int, errt error) {
        if fetchSize < 1 {
            return 0, nil
        }

        // Build the request arguments from the cached nodata configs.
        args := make([]*cmodel.GraphLastParam, 0, len(fetchKeys))
        for _, key := range fetchKeys {
            ndcfg, ok := config.GetNdConfig(key)
            if !ok {
                continue
            }
            counter := cutils.Counter(ndcfg.Metric, ndcfg.Tags)
            args = append(args, &cmodel.GraphLastParam{ndcfg.Endpoint, counter})
        }
        if len(args) < 1 {
            return 0, nil
        }

        // Ask the API for the most recent point of each endpoint/counter.
        resp, err := queryLastPoints(args)
        if err != nil {
            return 0, err
        }

        // All items of this batch share one fetch time; judge compares it
        // against its timeout.
        fetchedAt := time.Now().Unix()
        for _, last := range resp {
            if last == nil || last.Value == nil {
                continue
            }
            pk := cutils.PK2(last.Endpoint, last.Counter)
            item := NewDataItem(last.Value.Timestamp, float64(last.Value.Value), "OK", fetchedAt)
            AddItem(pk, item)
        }

        return len(resp), nil
    }
    
    ##### config.GetNdConfig 根据key(由endpoint/metric/tags组成的PK)返回NodataConfig配置
    // GetNdConfig looks up the nodata config cached under key while holding the
    // read lock. When the key is missing or maps to nil it returns an empty,
    // non-nil NodataConfig together with false.
    func GetNdConfig(key string) (*cmodel.NodataConfig, bool) {
        rwlock.RLock()
        defer rwlock.RUnlock()

        val, found := NdConfigMap.Get(key)
        if !found || val == nil {
            return &cmodel.NodataConfig{}, false
        }
        return val.(*cmodel.NodataConfig), true
    }
    
    ##### API组件POST请求实现函数
    // queryLastPoints POSTs param as JSON to the API component's
    // /api/v1/graph/lastpoint endpoint and decodes the JSON response into resp.
    // Connect and request timeouts come from the PlusApi configuration, in
    // milliseconds.
    func queryLastPoints(param []*cmodel.GraphLastParam) (resp []*cmodel.GraphLastResp, err error) {
        cfg := g.Config()
        uri := fmt.Sprintf("%s/api/v1/graph/lastpoint", cfg.PlusApi.Addr)

        // Build the authenticated request.
        var req *httplib.BeegoHttpRequest
        headers := map[string]string{"Content-type": "application/json"}
        req, err = requests.CurlPlus(uri, "POST", "nodata", cfg.PlusApi.Token,
            headers, map[string]string{})
        if err != nil {
            return nil, err
        }

        connTimeout := time.Duration(cfg.PlusApi.ConnectTimeout) * time.Millisecond
        reqTimeout := time.Duration(cfg.PlusApi.RequestTimeout) * time.Millisecond
        req.SetTimeout(connTimeout, reqTimeout)

        // Request body.
        payload, err := json.Marshal(param)
        if err != nil {
            return nil, err
        }
        req.Body(payload)

        // Execute the request and decode the JSON response.
        if err = req.ToJson(&resp); err != nil {
            return resp, err
        }

        return resp, nil
    }
    
    ##### 缓存数据点
    // AddItem pushes val to the front of the bounded list cached under key,
    // creating the list (limit 3, i.e. the three newest points per key) when
    // the key is seen for the first time.
    func AddItem(key string, val *DataItem) {
        if existing, ok := ItemMap.Get(key); ok {
            existing.(*tlist.SafeListLimited).PushFrontViolently(val)
            return
        }

        fresh := tlist.NewSafeListLimited(3)
        fresh.PushFrontViolently(val)
        ItemMap.Put(key, fresh)
    }
    
    
    // NewDataItem builds a DataItem from the data point timestamp and value,
    // the fetch status and the fetch time.
    func NewDataItem(ts int64, val float64, fstatus string, fts int64) *DataItem {
        item := new(DataItem)
        item.Ts, item.Value = ts, val
        item.FStatus, item.FTs = fstatus, fts
        return item
    }
    
    
    // GraphLastResp is one entry of the last-point query response, decoded
    // from JSON.
    type GraphLastResp struct {
            Endpoint    string        `json:"endpoint"` // endpoint the point belongs to
            Counter     string        `json:"counter"`  // counter the point belongs to
            Value       *RRDData      `json:"value"`    // latest point; may be nil, consumers check before use
    }
    
    

    judge.Start()

    # Judge运行入口函数
    // Start boots the judge module by starting its cron job, then logs success.
    func Start() {
        StartJudgeCron()  // register and start the periodic judge job
        log.Println("judge.Start ok")
    }
    
    # 定时任务Judge执行函数
    func StartJudgeCron() {
        judgeCron.AddFuncCC(judgeCronSpec, func() {
            start := time.Now().Unix()
            judge()    //执行judge函数调用
            end := time.Now().Unix()
            if g.Config().Debug {
                log.Printf("judge cron, time %ds, start %s\n", end-start, ttime.FormatTs(start))
            }
    
            // 统计
            g.JudgeCronCnt.Incr()
            g.JudgeLastTs.SetCnt(end - start)
    
            // 触发Mock列表数据发送
            sender.SendMockOnceAsync()  
        }, 1)
        judgeCron.Start()    //执行Cron
    }
    
    ## judge实现函数
    // judge walks every configured nodata key and decides whether the key is
    // currently reporting. For a key judged as "no data" it flips the status to
    // NODATA and queues a mock data point, at most once per step; otherwise it
    // marks the key OK. Keys without a config, without any collected item, or
    // whose latest fetch failed or is too old are left untouched.
    func judge() {
        now := time.Now().Unix()
        keys := config.Keys()
        for _, key := range keys {
            ndcfg, found := config.GetNdConfig(key) // nodata config cached under this key
            if !found { // no config for this key: nothing to judge
                continue
            }
            step := ndcfg.Step
            mock := ndcfg.Mock
    
            item, found := collector.GetFirstItem(key) // first item of the collector cache for this key (AddItem pushes new items to the front)
            if !found { // nothing collected yet: skip
                continue
            }
            
            // Oldest acceptable time. The original annotation gives the timeout
            // as 3*step (or 180); getTimeout itself is not shown here.
            lastTs := now - getTimeout(step) 
            if item.FStatus != "OK" || item.FTs < lastTs { // fetch failed or fetched too long ago: skip
                continue
            }
            // The latest value equals the configured mock value, so the real
            // metric is treated as still missing.
            if fCompare(mock, item.Value) == 0 { 
                // LastTs is the time of the last status update for this key;
                // only act once a full step has elapsed since then, so at most
                // one mock is generated per step.
                if LastTs(key)+step <= now {
                    TurnNodata(key, now)  // mark the key as NODATA
                    genMock(genTs(now, step), key, ndcfg) // queue a mock point for sending
                }
                continue
            }
              
            // The data point itself is older than the timeout window: the
            // metric has stopped reporting.
            if item.Ts < lastTs {
                if LastTs(key)+step <= now {
                    TurnNodata(key, now)
                    genMock(genTs(now, step), key, ndcfg)
                }
                continue
            }
    
            // A fresh, non-mock point: the key is healthy.
            TurnOk(key, now)
        }
    }
    
    ### 返回该key最近一次状态(NodataStatus)更新的timestamp,无记录时返回0
    // LastTs returns the Ts of the status record stored under key, or 0 when
    // the key has no status yet. The lookup runs under the status read lock.
    func LastTs(key string) int64 {
        statusLock.RLock()
        defer statusLock.RUnlock()

        if v, ok := StatusMap.Get(key); ok {
            return v.(*NodataStatus).Ts
        }
        return 0
    }
    
    // DataItem is one collected data point as cached by the collector.
    type DataItem struct {
        Ts      int64    // timestamp of the data point itself
        Value   float64  // value of the data point
        FStatus string  // fetch status: OK|ERR
        FTs     int64   // time at which the point was fetched and stored
    }
    
    // NodataStatus is the judge state kept per key.
    type NodataStatus struct {
        Key    string // key the status belongs to
        Status string // OK|NODATA
        Cnt    int    // incremented by TurnNodata on every NODATA update of an existing record
        Ts     int64  // time of the last status update
    }
    
    
    ### 设置为Nodata状态
    // TurnNodata marks key as NODATA at time ts. A missing status record is
    // created with Cnt 1; an existing one has its Cnt incremented and its Ts
    // replaced. The whole update runs under the status write lock.
    func TurnNodata(key string, ts int64) {
        statusLock.Lock()
        defer statusLock.Unlock()

        v, found := StatusMap.Get(key)
        if found {
            // Update the existing record in place.
            st := v.(*NodataStatus)
            st.Status = "NODATA"
            st.Cnt++
            st.Ts = ts
            return
        }

        // First NODATA for this key: create the record.
        StatusMap.Put(key, NewNodataStatus(key, "NODATA", 1, ts))
    }
    
    ### 添加Item至Mock列表
    // genMock queues a mock data point for key at time ts, taking endpoint,
    // metric, tags (as a sorted tag string), type, step and mock value from ndcfg.
    func genMock(ts int64, key string, ndcfg *cmodel.NodataConfig) {
        sender.AddMock(key, ndcfg.Endpoint, ndcfg.Metric, cutils.SortedTags(ndcfg.Tags), ts, ndcfg.Type, ndcfg.Step, ndcfg.Mock)
    }
    
    // AddMock builds a JsonMetaData item from the arguments and stores it in
    // MockMap under key, replacing any item already queued for that key.
    func AddMock(key string, endpoint string, metric string, tags string, ts int64, dstype string, step int64, value interface{}) {
        // NOTE(review): unkeyed literal — the argument order must match the field order of cmodel.JsonMetaData, which is not visible here.
        item := &cmodel.JsonMetaData{metric, endpoint, ts, step, value, dstype, tags}
        MockMap.Put(key, item)    // put into map
    }
    
    

    sender.SendMockOnceAsync() 发送模拟数据

    # 发送Mock数据入口函数
    // SendMockOnceAsync runs SendMockOnce in a new goroutine and returns
    // immediately; the send result is discarded.
    func SendMockOnceAsync() {
        go SendMockOnce()
    }
    
    ## 并发发送和统计
    // SendMockOnce sends the queued mock items once and updates the sender
    // statistics. It returns -1 when the semaphore cannot be acquired (another
    // send holds it), 0 when the sender is disabled, and otherwise the number
    // of items sent.
    func SendMockOnce() int {
        if acquired := sema.TryAcquire(); !acquired {
            return -1
        }
        defer sema.Release()

        // Sender disabled in the configuration.
        if !g.Config().Sender.Enabled {
            return 0
        }

        begin := time.Now().Unix()
        // The error is dropped on purpose: sendMock as shown always returns nil.
        sent, _ := sendMock()
        finish := time.Now().Unix()
        elapsed := finish - begin

        if g.Config().Debug {
            log.Printf("sender cron, cnt %d, time %ds, start %s", sent, elapsed, ttime.FormatTs(begin))
        }

        // Statistics: run count, last duration and running total.
        g.SenderCronCnt.Incr()
        g.SenderLastTs.SetCnt(elapsed)
        g.SenderCnt.IncrBy(int64(sent))

        return sent
    }
    
    ### 发送模拟数据
    // sendMock drains MockMap and posts its items to transfer in batches of the
    // configured size. It returns the number of items in the batches that were
    // sent successfully; batches that fail are skipped (the failure is logged by
    // sendItemsToTransfer) and the returned error is always nil.
    func sendMock() (cnt int, errt error) {
        // Fallback used when the configured batch size is not positive.
        const defaultSendBatch = 200

        cfg := g.Config().Sender
        batch := int(cfg.Batch)
        if batch < 1 {
            // A zero batch would never advance the loop below and a negative
            // one would produce an invalid slice bound.
            batch = defaultSendBatch
        }
        connTimeout := cfg.ConnectTimeout // connect timeout, milliseconds
        requTimeout := cfg.RequestTimeout // request timeout, milliseconds

        // Take the queued mocks and empty the queue.
        mocks := MockMap.Slice()
        MockMap.Clear()
        mockSize := len(mocks)
        i := 0
        for i < mockSize {
            leftLen := mockSize - i
            sendSize := batch
            if leftLen < sendSize {
                sendSize = leftLen
            }
            fetchMocks := mocks[i : i+sendSize] // next batch
            i += sendSize

            // Convert the batch to typed items, dropping nil entries.
            items := make([]*cmodel.JsonMetaData, 0, len(fetchMocks))
            for _, val := range fetchMocks {
                if val == nil {
                    continue
                }
                items = append(items, val.(*cmodel.JsonMetaData))
            }

            // POST the batch to transfer.
            cntonce, err := sendItemsToTransfer(items, len(items), "nodata.mock",
                time.Millisecond*time.Duration(connTimeout),
                time.Millisecond*time.Duration(requTimeout))
            if err == nil {
                if g.Config().Debug {
                    log.Println("send items:", items)
                }
                cnt += cntonce
            }
        }

        return cnt, nil
    }
    
    // sendItemsToTransfer POSTs items as JSON to the transfer component's
    // /api/push endpoint using the named HTTP client with the given connect and
    // request timeouts.
    //
    // It returns size on a 2xx response. It returns (0, nil) when size < 1, and
    // (0, err) when the body cannot be encoded, the request cannot be built or
    // sent, or the response status is not 2xx. Every failure is logged here.
    func sendItemsToTransfer(items []*cmodel.JsonMetaData, size int, httpcliname string,
        connT, reqT time.Duration) (cnt int, errt error) {
        if size < 1 {
            return 0, nil
        }

        cfg := g.Config()
        transUlr := fmt.Sprintf("http://%s/api/push", cfg.Sender.TransferAddr)
        hcli := thttpclient.GetHttpClient(httpcliname, connT, reqT)

        // Request body.
        itemsBody, err := json.Marshal(items)
        if err != nil {
            log.Println(transUlr+", format body error,", err)
            return 0, err
        }

        // Build the request; req is nil on error, so it must not be touched
        // before this check.
        req, err := http.NewRequest("POST", transUlr, bytes.NewBuffer(itemsBody))
        if err != nil {
            log.Println(transUlr+", build request error,", err)
            return 0, err
        }
        req.Header.Set("Content-Type", "application/json; charset=UTF-8")
        req.Header.Set("Connection", "close")

        postResp, err := hcli.Do(req)
        if err != nil {
            log.Println(transUlr+", post to dest error,", err)
            return 0, err
        }
        defer postResp.Body.Close()

        // Anything outside 2xx is a failure. Report the status line:
        // formatting the Body reader itself would not print the response text.
        if postResp.StatusCode/100 != 2 {
            log.Println(transUlr+", post to dest, bad response,", postResp.Status)
            return 0, fmt.Errorf("request failed, %s", postResp.Status)
        }

        return size, nil
    }
    

    http.Start() http API服务

    // Start launches the HTTP server in its own goroutine and returns
    // immediately.
    func Start() {
        go startHttpServer()
    }
    
    // configRoutes registers all HTTP routes: the common routes, the /proc
    // statistics routes and the /debug routes.
    func configRoutes() {
        configCommonRoutes()    // common API routes (see the Agent module)
        configProcHttpRoutes()   // proc/statistics API routes
        configDebugHttpRoutes()  // debug API routes
    }
    
    // startHttpServer registers the routes and serves HTTP on the configured
    // listen address. It returns without doing anything when the HTTP module is
    // disabled or no address is configured; once serving, a listen/serve
    // failure terminates the process through log.Fatalln.
    func startHttpServer() {
        if enabled := g.Config().Http.Enabled; !enabled {
            return
        }

        listen := g.Config().Http.Listen
        if len(listen) == 0 {
            return
        }

        // Routes are registered before the server starts accepting requests.
        configRoutes()

        srv := new(http.Server)
        srv.Addr = listen
        srv.MaxHeaderBytes = 1 << 30

        log.Println("http.startHttpServer ok, listening", listen)
        log.Fatalln(srv.ListenAndServe())
    }
    
    ## Proc统计API模块
    // configProcHttpRoutes registers the /proc and /statistics routes on the
    // default mux. In this excerpt every handler body except /proc/group/ is
    // left empty.
    func configProcHttpRoutes() {
        // counters
        http.HandleFunc("/proc/counters", func(w http.ResponseWriter, r *http.Request) {
        })
        http.HandleFunc("/statistics/all", func(w http.ResponseWriter, r *http.Request) {
        })
        // judge.status, /proc/status/$endpoint/$metric/$tags-pairs
        http.HandleFunc("/proc/status/", func(w http.ResponseWriter, r *http.Request) {
        })
        // collector.last.item, /proc/collect/$endpoint/$metric/$tags-pairs
        http.HandleFunc("/proc/collect/", func(w http.ResponseWriter, r *http.Request) {
        })
        // config.mockcfg
        http.HandleFunc("/proc/config", func(w http.ResponseWriter, r *http.Request) {
        })
        // config.mockcfg /proc/config/$endpoint/$metric/$tags-pairs
        http.HandleFunc("/proc/config/", func(w http.ResponseWriter, r *http.Request) {
        })
        // config.hostgroup, /group/$grpname
        http.HandleFunc("/proc/group/", func(w http.ResponseWriter, r *http.Request) {
            // Everything after the route prefix is the group name.
            urlParam := r.URL.Path[len("/proc/group/"):]
            RenderDataJson(w, service.GetHostsFromGroup(urlParam))
        })
    }
    
    ## Debug API
    // configDebugHttpRoutes registers the /debug routes for the collector,
    // config and sender modules on the default mux. The handler bodies are left
    // empty in this excerpt.
    func configDebugHttpRoutes() {
        http.HandleFunc("/debug/collector/collect", func(w http.ResponseWriter, r *http.Request) {
        })
        http.HandleFunc("/debug/config/sync", func(w http.ResponseWriter, r *http.Request) {
        })
        http.HandleFunc("/debug/sender/send", func(w http.ResponseWriter, r *http.Request) {
        })
    }
    
    

    思考与查证

    • nodata 的 config、collector、judge 各模块的执行周期分别是多少?
    • Judge主要判断Nodata主机的两点要素是?

    扩展学习

    相关文章

      网友评论

          本文标题:OpenFalcon 源码分析(Nodata组件)

          本文链接:https://www.haomeiwen.com/subject/uwednftx.html