美文网首页
OpenFalcon源码分析(HBS组件)

OpenFalcon源码分析(HBS组件)

作者: Xiao_Yang | 来源:发表于2018-09-17 08:47 被阅读0次

HBS (HeartBeat Server)版本

VERSION = "1.1.0"

HBS组件功能

连接后端MySQL数据库,获取数据库数据及缓存至内存(缓存配置)供agent和judge组件提供服务。

HBS组件逻辑图

HBS逻辑图

Main入口分析

func main() {
    #命令参数配置(配置文件/版本)
    cfg := flag.String("c", "cfg.json", "configuration file")
    version := flag.Bool("v", false, "show version")
    flag.Parse()
   
    #版本显示
    if *version {
        fmt.Println(g.VERSION)
        os.Exit(0)
    }

    #配置文件解析
    g.ParseConfig(*cfg)         //      【参考详细分析】

    #初始化
    db.Init()   //Mysql DB对象初始化      【参考详细分析】
    cache.Init() //从DB获取数据加载到内存   【参考详细分析】

    #清理无心跳Agent
    go cache.DeleteStaleAgents()  //    【参考详细分析】

    #后台线程,HTTP、RPC服务监听与处理
    go http.Start()               //    【参考详细分析】
    go rpc.Start()                //    【参考详细分析】
    
    #终止信号注册与接收Chan,完成程序退出清理工作
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigs
        fmt.Println()
        db.DB.Close()  //DB对象资源关闭
        os.Exit(0)
    }()

    select {}   //阻塞主进程
}

g.ParseConfig(*cfg) 配置文件解析与格式化

# 解析指定的配置文件位置,默认为同级目录下"cfg.json"
func ParseConfig(cfg string) {
    if cfg == "" {
        log.Fatalln("use -c to specify configuration file")
    }
    //判断文件是否存在
    if !file.IsExist(cfg) {
        log.Fatalln("config file:", cfg, "is not existent")
    }

    ConfigFile = cfg
   
    //配置文件转字符串
    configContent, err := file.ToTrimString(cfg)
    if err != nil {
        log.Fatalln("read config file:", cfg, "fail:", err)
    }
    
    //Json反序列化为全局变量结构体
    var c GlobalConfig
    err = json.Unmarshal([]byte(configContent), &c)
    if err != nil {
        log.Fatalln("parse config file:", cfg, "fail:", err)
    }

    configLock.Lock()
    defer configLock.Unlock()
    
    //附值私有全局变量config
    config = &c

    log.Println("read config file:", cfg, "successfully")
}

#公开方法获取全局配置,常用g.Config().XXX获取某项配置
func Config() *GlobalConfig {
    configLock.RLock()
    defer configLock.RUnlock()
    return config
}

#全局配置结构定义
type GlobalConfig struct {
    Debug     bool        `json:"debug"`
    Hosts     string      `json:"hosts"`
    Database  string      `json:"database"`
    MaxConns  int         `json:"maxConns"`
    MaxIdle   int         `json:"maxIdle"`
    Listen    string      `json:"listen"`
    Trustable []string    `json:"trustable"`
    Http      *HttpConfig `json:"http"`
}

db.Init() 初始化MySQL连接对象


# 公开全局变量DB对象
var DB *sql.DB

# 引用"github.com/go-sql-driver/mysql"Driver库
func Init() {
    var err error
    DB, err = sql.Open("mysql", g.Config().Database)
    if err != nil {
        log.Fatalln("open db fail:", err)
    }
    //最大打开连接数
    DB.SetMaxOpenConns(g.Config().MaxConns)
    //最大空闲连接数
    DB.SetMaxIdleConns(g.Config().MaxIdle)
    
    //DB健康检测
    err = DB.Ping()
    if err != nil {
        log.Fatalln("ping db fail:", err)
    }
}

cache.Init() 从DB获取数据加载到内存


func Init() {
    log.Println("cache begin")

    log.Println("#1 GroupPlugins...")
    GroupPlugins.Init()   //从DB获取插件目录列表缓存内存【参考详细分析】

    log.Println("#2 GroupTemplates...")
    GroupTemplates.Init() //从DB获取组策略模板缓存内存【参考详细分析】

    log.Println("#3 HostGroupsMap...")
    HostGroupsMap.Init() //从DB获取主机与策略组对应表信息缓存内存 【参考详细分析】

    log.Println("#4 HostMap...")
    HostMap.Init()  //从DB获取主机名和主机ID对应表缓存内存【参考详细分析】

    log.Println("#5 TemplateCache...")
    TemplateCache.Init() //从DB获取与缓存内存所有策略模板列表 【参考详细分析】

    log.Println("#6 Strategies...")
    Strategies.Init(TemplateCache.GetMap()) //从DB获取所有Strategy列表与缓存 【参考详细分析】

    log.Println("#7 HostTemplateIds...")
    HostTemplateIds.Init() //从DB获取及缓存主机ID和模板列表对应表信息 【参考详细分析】

    log.Println("#8 ExpressionCache...")
    ExpressionCache.Init() //从DB获取与缓存Expression表达式信息【参考详细分析】

    log.Println("#9 MonitoredHosts...")
    MonitoredHosts.Init() //从DB获取所有不处于维护状态的主机列表缓存 【参考详细分析】

    log.Println("cache done")

    go LoopInit() //后台线程运行同步数据

}

# 每分钟周期性同步数据
func LoopInit() {
    for {
        time.Sleep(time.Minute)
        GroupPlugins.Init()
        GroupTemplates.Init()
        HostGroupsMap.Init()
        HostMap.Init()
        TemplateCache.Init()
        Strategies.Init(TemplateCache.GetMap())
        HostTemplateIds.Init()
        ExpressionCache.Init()
        MonitoredHosts.Init()
    }
}

  • GroupPlugins.Init() 从DB内获取插件目录列表加载到内存
#GroupPlugins 全局对象实例化,一个HostGroup可以绑定多个Plugin
#key: Groupid value:[]plugins 
var GroupPlugins = &SafeGroupPlugins{M: make(map[int][]string)}

#从DB获取插件目录列表加载到内存
func (this *SafeGroupPlugins) Init() {
    m, err := db.QueryPlugins()  //db查询插件目录条目
    if err != nil {
        return
    }

    this.Lock()
    defer this.Unlock()
    this.M = m
}

#db->plugin 查询MySQL plugin_dir表中内插件目录记录条目
func QueryPlugins() (map[int][]string, error) {
    m := make(map[int][]string)

    sql := "select grp_id, dir from plugin_dir" //执行SQL语句
    rows, err := DB.Query(sql)
    if err != nil {
        log.Println("ERROR:", err)
        return m, err
    }

    defer rows.Close()
    for rows.Next() {               //迭代所有条目,组ID和目录
        var (
            id  int
            dir string
        )

        err = rows.Scan(&id, &dir)
        if err != nil {
            log.Println("ERROR:", err)
            continue
        }

        if _, exists := m[id]; exists {
            m[id] = append(m[id], dir)
        } else {
            m[id] = []string{dir}
        }
    }

    return m, nil
}
  • GroupTemplates.Init() 从DB获取组策略模板加载到内存
#GroupTemplates 全局对象实例化,一个HostGroup对应多个Template
#key: gid value:[]tid
var GroupTemplates = &SafeGroupTemplates{M: make(map[int][]int)}

#从DB获取组策略模板加载到内存
func (this *SafeGroupTemplates) Init() {
    m, err := db.QueryGroupTemplates() //db查询组策略模板
    if err != nil {
        return
    }

    this.Lock()
    defer this.Unlock()
    this.M = m
}

# db->template 从MySQL grp_tpl表中查询DB组策略模板
func QueryGroupTemplates() (map[int][]int, error) {
    m := make(map[int][]int)

    sql := "select grp_id, tpl_id from grp_tpl" //DB查询语句
    rows, err := DB.Query(sql)
    if err != nil {
        log.Println("ERROR:", err)
        return m, err
    }
   
    defer rows.Close()
    
    //迭代获取策略组ID和模板ID
    for rows.Next() {
        var gid, tid int
        err = rows.Scan(&gid, &tid)
        if err != nil {
            log.Println("ERROR:", err)
            continue
        }

        if _, exists := m[gid]; exists {
            m[gid] = append(m[gid], tid)
        } else {
            m[gid] = []int{tid}
        }
    }
    return m, nil
}
  • HostGroupsMap.Init() 主机与策略组对应表信息加载内存
#HostGroupsMap全局对象实例化,一个机器可能在多个group下,做一个map缓存hostid与groupid的对应关系
#key: hid value:[]gid
var HostGroupsMap = &SafeHostGroupsMap{M: make(map[int][]int)}

#从DB获取主机与组信息加载到内存
func (this *SafeHostGroupsMap) Init() {
    m, err := db.QueryHostGroups()  //db查询主机与组信息
    if err != nil {
        return
    }

    this.Lock()
    defer this.Unlock()
    this.M = m
}

#db->group 从MySQ Lgrp_host表中查询主机与组信息
func QueryHostGroups() (map[int][]int, error) {
    m := make(map[int][]int)

    sql := "select grp_id, host_id from grp_host" //DB查询语句
    rows, err := DB.Query(sql)
    if err != nil {
        log.Println("ERROR:", err)
        return m, err
    }

    defer rows.Close()
    
    //迭代获取主机ID、策略组ID信息列表
    for rows.Next() {
        var gid, hid int
        err = rows.Scan(&gid, &hid)
        if err != nil {
            log.Println("ERROR:", err)
            continue
        }

        if _, exists := m[hid]; exists {
            m[hid] = append(m[hid], gid)
        } else {
            m[hid] = []int{gid}
        }
    }

    return m, nil
}
  • HostMap.Init() 主机名和主机ID对应表加载到内存
#HostMap全局对象实例化
#每次心跳的时候agent把hostname汇报上来,经常要知道这个机器的hostid,把此信息缓存
#key: hostname value: hostid
var HostMap = &SafeHostMap{M: make(map[string]int)}

#从DB获取主机名与hid信息缓存内存
func (this *SafeHostMap) Init() {
    m, err := db.QueryHosts() //db查询主机名与hid信息
    if err != nil {
        return
    }

    this.Lock()
    defer this.Unlock()
    this.M = m
}

#db->host 从MySQ host表中查询主机名与hid条目
func QueryHosts() (map[string]int, error) {
    m := make(map[string]int)

    sql := "select id, hostname from host" //db查询语句
    rows, err := DB.Query(sql)
    if err != nil {
        log.Println("ERROR:", err)
        return m, err
    }

    defer rows.Close()
    //迭代获取主机名、hid信息
    for rows.Next() {
        var (
            id       int
            hostname string
        )

        err = rows.Scan(&id, &hostname)
        if err != nil {
            log.Println("ERROR:", err)
            continue
        }
        m[hostname] = id
    }
    return m, nil
}
  • TemplateCache.Init() 获取与缓存所有策略模板列表
# TemplateCache全局对象实例化,所有templates列表
# key: tid  value: template
var TemplateCache = &SafeTemplateCache{M: make(map[int]*model.Template)}

# 获取与缓存所有策略模板列表
func (this *SafeTemplateCache) Init() {
    ts, err := db.QueryTemplates()  //获取与缓存
    if err != nil {
        return
    }

    this.Lock()
    defer this.Unlock()
    this.M = ts
}

#db->template 从DB tpl表获取所有的策略模板信息并缓存内存
func QueryTemplates() (map[int]*model.Template, error) {

    templates := make(map[int]*model.Template)

    sql := "select id, tpl_name, parent_id, action_id, create_user from tpl"     // db查询语句
    rows, err := DB.Query(sql) 
    if err != nil {
        log.Println("ERROR:", err)
        return templates, err
    }

    defer rows.Close()
     
    //迭代查询与缓存策略模板列表
    for rows.Next() {
        t := model.Template{}
        err = rows.Scan(&t.Id, &t.Name, &t.ParentId, &t.ActionId, &t.Creator)
        if err != nil {
            log.Println("ERROR:", err)
            continue
        }
        templates[t.Id] = &t
    }

    return templates, nil
}

  • Strategies.Init(TemplateCache.GetMap()) 获取所有Strategy列表与缓存
#Strategies全局对象实例化
#Strategy和Expression在功能上有类似的地方,都是对于某些主机的某些采集指标进
#行判断,符合条件就执行相应的动作。不过,Strategy需要依附于模(Teamplate)
#存在,即模板是一个Strategy组,包含多个Strategy,模板可以与主机进行映射,也
#可以和主机组进行映射。另外,模板和模板之前也可以有继承关系。比如,模板T1包含
#判断CPU异常Strategy,模板T2包含判断网络异常Strategy,模板T3集成了T1和
#T2,这样T3就包含了T1和T2的所有异常判断Strategy。 而Expression则存在去全
#局的范围,也不需要划分为组的形式

var Strategies = &SafeStrategies{M: make(map[int]*model.Strategy)}

func (this *SafeStrategies) Init(tpls map[int]*model.Template) {
    m, err := db.QueryStrategies(tpls)  //获取与缓存
    if err != nil {
        return
    }

    this.Lock()
    defer this.Unlock()
    this.M = m
}

#获取所有的Strategy列表
func QueryStrategies(tpls map[int]*model.Template) (map[int]*model.Strategy, error) {
    ret := make(map[int]*model.Strategy)

    if tpls == nil || len(tpls) == 0 {
        return ret, fmt.Errorf("illegal argument")
    }

    now := time.Now().Format("15:04")
    sql := fmt.Sprintf(
        "select %s from strategy as s where (s.run_begin='' and s.run_end='') "+
            "or (s.run_begin <= '%s' and s.run_end >= '%s')"+
            "or (s.run_begin > s.run_end and !(s.run_begin > '%s' and s.run_end < '%s'))",
        "s.id, s.metric, s.tags, s.func, s.op, s.right_value, s.max_step, s.priority, s.note, s.tpl_id",
        now,
        now,
        now,
        now,
    )             //db查询语句

    rows, err := DB.Query(sql)
    if err != nil {
        log.Println("ERROR:", err)
        return ret, err
    }

    defer rows.Close()
    //迭代Strategy条目
    for rows.Next() {
        s := model.Strategy{}
        var tags string
        var tid int
        err = rows.Scan(&s.Id, &s.Metric, &tags, &s.Func, &s.Operator, &s.RightValue, &s.MaxStep, &s.Priority, &s.Note, &tid)
        if err != nil {
            log.Println("ERROR:", err)
            continue
        }

        tt := make(map[string]string)

        if tags != "" {
            arr := strings.Split(tags, ",")
            for _, tag := range arr {
                kv := strings.SplitN(tag, "=", 2)
                if len(kv) != 2 {
                    continue
                }
                tt[kv[0]] = kv[1]
            }
        }
        s.Tags = tt
        s.Tpl = tpls[tid]
        if s.Tpl == nil {
            log.Printf("WARN: tpl is nil. strategy id=%d, tpl id=%d", s.Id, tid)
            // 如果Strategy没有对应的Tpl,那就没有action,就没法报警,无需往后传递了
            continue
        }
        ret[s.Id] = &s
    }
    return ret, nil
}


type Strategy struct {
    Id         int               `json:"id"`
    Metric     string            `json:"metric"`
    Tags       map[string]string `json:"tags"`
    Func       string            `json:"func"`       // e.g. max(#3) all(#3)
    Operator   string            `json:"operator"`   // e.g. < !=
    RightValue float64           `json:"rightValue"` // critical value
    MaxStep    int               `json:"maxStep"`
    Priority   int               `json:"priority"`
    Note       string            `json:"note"`
    Tpl        *Template         `json:"tpl"`
}

  • HostTemplateIds.Init() 查询及缓存主机ID和模板列表对应表信息
# HostTemplateIds全局对象实例化
# 一个机器ID对应了多个模板ID
# key: hid value:[]tid
var HostTemplateIds = &SafeHostTemplateIds{M: make(map[int][]int)}
# 
func (this *SafeHostTemplateIds) Init() {
    m, err := db.QueryHostTemplateIds() //
    if err != nil {
        return
    }

    this.Lock()
    defer this.Unlock()
    this.M = m
}

#db->template 查询及缓存主机ID和模板对应表信息
func QueryHostTemplateIds() (map[int][]int, error) {
    ret := make(map[int][]int)
    rows, err := DB.Query("select a.tpl_id, b.host_id from grp_tpl as a inner join grp_host as b on a.grp_id=b.grp_id") //db查询语句
    if err != nil {
        log.Println("ERROR:", err)
        return ret, err
    }

    defer rows.Close()
    //迭代查询及缓存主机ID和模板对应表信息
    for rows.Next() {
        var tid, hid int

        err = rows.Scan(&tid, &hid)
        if err != nil {
            log.Println("ERROR:", err)
            continue
        }

        if _, ok := ret[hid]; ok {
            ret[hid] = append(ret[hid], tid)
        } else {
            ret[hid] = []int{tid}
        }
    }

    return ret, nil
}
  • ExpressionCache.Init() 获取与缓存Expression表达式信息
# ExpressionCache全局对象实例化,缓存所有正常的Expression
# 
var ExpressionCache = &SafeExpressionCache{}

# 查询与缓存所有正常的Expression
func (this *SafeExpressionCache) Init() {
    es, err := db.QueryExpressions()  //db查询与缓存
    if err != nil { 
        return
    }

    this.Lock()
    defer this.Unlock()
    this.L = es
}

# db->express 从MySQL expression表查询所有正常的expression信息并缓存内存
func QueryExpressions() (ret []*model.Expression, err error) {
    sql := "select id, expression, func, op, right_value, max_step, priority, note, action_id from expression where action_id>0 and pause=0"
    rows, err := DB.Query(sql)
    if err != nil {
        log.Println("ERROR:", err)
        return ret, err
    }

    defer rows.Close()
    for rows.Next() {
        e := model.Expression{}
        var exp string
        err = rows.Scan(
            &e.Id,           
            &exp,            //expression->进一步解析出Metric,Tags
            &e.Func,         
            &e.Operator,     
            &e.RightValue,   
            &e.MaxStep,     
            &e.Priority,     
            &e.Note,       
            &e.ActionId,   
        )

        if err != nil {
            log.Println("WARN:", err)
            continue
        }
        //解析expression内的Metric和Tags值
        e.Metric, e.Tags, err = parseExpression(exp)
        if err != nil {
            log.Println("ERROR:", err)
            continue
        }

        ret = append(ret, &e)
    }

    return ret, nil
}

#解析Expression
func parseExpression(exp string) (metric string, tags map[string]string, err error) {
    //获取字串"()"内字符
    left := strings.Index(exp, "(")
    right := strings.Index(exp, ")")
    tagStrs := strings.TrimSpace(exp[left+1 : right])
    
    //将tags以空格分隔截为Slice
    arr := strings.Fields(tagStrs)
    if len(arr) < 2 {
        err = fmt.Errorf("tag not enough. exp: %s", exp)
        return
    }

    tags = make(map[string]string)
    //迭代slice,将每个Item再以"="为分隔成[2]Slice再解析为Map
        for _, item := range arr {
        kv := strings.Split(item, "=")
        if len(kv) != 2 {
            err = fmt.Errorf("parse %s fail", exp)
            return
        }
        tags[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1])
    }
   
    //获取Key:"metric"的值
    metric, exists := tags["metric"]
    if !exists {
        err = fmt.Errorf("no metric give of %s", exp)
        return
    }

    delete(tags, "metric")
    return
}

type Expression struct {
    Id         int               `json:"id"`
    Metric     string            `json:"metric"`
    Tags       map[string]string `json:"tags"`
    Func       string            `json:"func"`       // e.g. max(#3) all(#3)
    Operator   string            `json:"operator"`   // e.g. < !=
    RightValue float64           `json:"rightValue"` // critical value
    MaxStep    int               `json:"maxStep"`
    Priority   int               `json:"priority"`
    Note       string            `json:"note"`
    ActionId   int               `json:"actionId"`
}


  • MonitoredHosts.Init() 所有不处于维护状态的主机列表缓存
#MonitoredHosts全局对象实例化,所有不处于维护状态的主机列表
#key: hid  value: Host
var MonitoredHosts = &SafeMonitoredHosts{M: make(map[int]*model.Host)}

#所有不处于维护状态的主机列表缓存
func (this *SafeMonitoredHosts) Init() {
    m, err := db.QueryMonitoredHosts() //查询所有不处于维护状态的主机列表
    if err != nil {
        return
    }

    this.Lock()
    defer this.Unlock()
    this.M = m
}

#db->host 从MySQL host表获取所有不处于维护状态的主机列表加载
func QueryMonitoredHosts() (map[int]*model.Host, error) {
    hosts := make(map[int]*model.Host)
    now := time.Now().Unix()
    sql := fmt.Sprintf("select id, hostname from host where maintain_begin > %d or maintain_end < %d", now, now) //db查询语句
    rows, err := DB.Query(sql)
    if err != nil {
        log.Println("ERROR:", err)
        return hosts, err
    }

    defer rows.Close()
    //迭代获取hid,hostname
    for rows.Next() {
        t := model.Host{}
        err = rows.Scan(&t.Id, &t.Name)
        if err != nil {
            log.Println("WARN:", err)
            continue
        }
        hosts[t.Id] = &t
    }

    return hosts, nil
}

cache.DeleteStaleAgents() 清理无心跳Agent

func deleteStaleAgents() {
    // 一天都没有心跳的Agent,从内存中干掉
    before := time.Now().Unix() - 3600*24
    keys := Agents.Keys()
    count := len(keys)
    if count == 0 {
        return
    }

    for i := 0; i < count; i++ {
        curr, _ := Agents.Get(keys[i])
        if curr.LastUpdate < before {
            // 从Agent列表Map M中删除
            Agents.Delete(curr.ReportRequest.Hostname)
        }
    }
}

http.Start() HTTP API接口服务运行与监听处理

# 初始化路由
func init() {
    configCommonRoutes() // 公共API接口路由
    configProcRoutes()   //
}

# 运行Http Server
func Start() {
    if !g.Config().Http.Enabled {
        return
    }

    addr := g.Config().Http.Listen
    if addr == "" {
        return
    }
    s := &http.Server{
        Addr:           addr,
        MaxHeaderBytes: 1 << 30,  //限制最大头字节(2^30)
    }
    log.Println("http listening", addr)
    log.Fatalln(s.ListenAndServe())
}

# 公共API接口路由/health、/version、/workdir、/config/reload
func configCommonRoutes() {
    //健康检测
    http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.Write([]byte("ok"))
    })
     //版本
    http.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
        w.Write([]byte(g.VERSION))
    })
    //工作目录
    http.HandleFunc("/workdir", func(w http.ResponseWriter, r *http.Request) {
        RenderDataJson(w, file.SelfDir())
    })
    //重载配置
    http.HandleFunc("/config/reload", func(w http.ResponseWriter, r *http.Request) {
        if strings.HasPrefix(r.RemoteAddr, "127.0.0.1") {
            g.ParseConfig(g.ConfigFile)
            RenderDataJson(w, g.Config())
        } else {
            w.Write([]byte("no privilege"))
        }
    })
}

# 配置缓存信息API接口路由/expressions、/agents、/hosts、/strategies
# /templates、/plugins/
func configProcRoutes() {
    //expression缓存信息获取
    http.HandleFunc("/expressions", func(w http.ResponseWriter, r *http.Request) {
        RenderDataJson(w, cache.ExpressionCache.Get())
    })
    //agents主机名列表缓存信息获取
    http.HandleFunc("/agents", func(w http.ResponseWriter, r *http.Request) {
        RenderDataJson(w, cache.Agents.Keys())
    })
    //主机名与hid对应表缓存信息获取
    http.HandleFunc("/hosts", func(w http.ResponseWriter, r *http.Request) {
        data := make(map[string]*model.Host, len(cache.MonitoredHosts.Get()))
        for k, v := range cache.MonitoredHosts.Get() {
            data[fmt.Sprint(k)] = v
        }
        RenderDataJson(w, data)
    })
    //所有strategie缓存信息获取
    http.HandleFunc("/strategies", func(w http.ResponseWriter, r *http.Request) {
        data := make(map[string]*model.Strategy, len(cache.Strategies.GetMap()))
        for k, v := range cache.Strategies.GetMap() {
            data[fmt.Sprint(k)] = v
        }
        RenderDataJson(w, data)
    })
 
   //所有template缓存信息获取
    http.HandleFunc("/templates", func(w http.ResponseWriter, r *http.Request) {
        data := make(map[string]*model.Template, len(cache.TemplateCache.GetMap()))
        for k, v := range cache.TemplateCache.GetMap() {
            data[fmt.Sprint(k)] = v
        }
        RenderDataJson(w, data)
    })
   
    //"/plugins/XXXXX" 根据hostname获取关联的插件
    http.HandleFunc("/plugins/", func(w http.ResponseWriter, r *http.Request) {
        hostname := r.URL.Path[len("/plugins/"):]
        RenderDataJson(w, cache.GetPlugins(hostname))
    })

}

rpc.Start() 启动RPC服务及监听处理

# RPC服务
type Hbs int
type Agent int

func Start() {
    addr := g.Config().Listen

    server := rpc.NewServer()
     
    //注册Agent/Hbs模块
    server.Register(new(Agent))
    server.Register(new(Hbs))

    l, e := net.Listen("tcp", addr)
    if e != nil {
        log.Fatalln("listen error:", e)
    } else {
        log.Println("listening", addr)
    }

    for {
        conn, err := l.Accept()
        if err != nil {
            log.Println("listener accept fail:", err)
            time.Sleep(time.Duration(100) * time.Millisecond)
            continue
        }
        go server.ServeCodec(jsonrpc.NewServerCodec(conn))
    }
}

//Hbs.GetExpressions 返回全局缓存的expression表达式
func (t *Hbs) GetExpressions(req model.NullRpcRequest, reply *model.ExpressionResponse) error {}

//Hbs.GetStrategies 返回缓存的主机与策略映射列表
func (t *Hbs) GetStrategies(req model.NullRpcRequest, reply *model.StrategiesResponse) error {}

//Agent.MinePlugins 返回指定主机名对应的插件列表
func (t *Agent) MinePlugins(args model.AgentHeartbeatRequest, reply *model.AgentPluginsResponse) error {}

//Agent.ReportStatus 报告agent状态 "1 or nil"
func (t *Agent) ReportStatus(args *model.AgentReportRequest, reply *model.SimpleRpcResponse) error {}

//Agent.TrustableIps 返回可信IP列表
func (t *Agent) TrustableIps(args *model.NullRpcRequest, ips *string) error {}

//Agent.BuiltinMetrics agent按照server端的配置,按需采集的metric,比如net.port.listen port=22 或者 proc.num name=zabbix_agentd
func (t *Agent) BuiltinMetrics(args *model.AgentHeartbeatRequest, reply *model.BuiltinMetricResponse) error {}

思考与查证

  • 查看源码,找出多长时间从DB刷新一次配置项缓存至内存?
  • 如修改了cfg.json配置文件,除重启服务外还有其它方法重新加载配置吗?
  • 详细分析RPC "Hbs.GetStrategies"内CalcInheritStrategies方法。

相关文章

网友评论

      本文标题:OpenFalcon源码分析(HBS组件)

      本文链接:https://www.haomeiwen.com/subject/ywbhnftx.html