美文网首页
falcon agent模块源码解析

falcon agent模块源码解析

作者: 蒋植玉 | 来源:发表于2019-08-27 22:45 被阅读0次

    导读

    falcon是小米开源的监控平台,广泛用于许多互联网公司。agent模块是metric采集服务,它就像搬运工,将各种metric输送到transfer服务。如果说这些是它的主要工作,那么插件安装,同步信任ip名单等便是它的副业。本着抓主要矛盾的原则,本文不包含副业解读。

    main函数

    main函数主要做一些初始化工作,并启动http服务。

     func main() {
          //入参cfg := flag.String("c","cfg.json","configuration file")
            version := flag.Bool("v", false,"show version")
            check := flag.Bool("check", false,"check collector")
            //入参解析
          flag.Parse()
          if *version {
              fmt.Println(g.VERSION)
             os.Exit(0)
    }
    //check collect功能是否正常
    if *check {
        funcs.CheckCollector()
        os.Exit(0)
    }
    //解析配置
    g.ParseConfig(*cfg)
    if g.Config().Debug {
    g.InitLog("debug")
    }else {
    g.InitLog("info")
    }
    
    //初始化工作目录
    g.InitRootDir()
    //初始化本地ip,通过向heartbeat服务发起一次连接获取本地ip
    g.InitLocalIp()
    //初始化rpc client,new一个结构体出来
    g.InitRpcClients()
    //初始化一系列Mapper,内含采集metric的各种函数
    funcs.BuildMappers()
    //初始化更新cpu 硬盘统计协程
    go cron.InitDataHistory()
    //启动同步agent状态定时器
    cron.ReportAgentStatus()
    //启动同步插件定时器,插件安装异步实现
    cron.SyncMinePlugins()
    //启动配置的buildin  metric采集项定时器
    cron.SyncBuiltinMetrics()
    //启动同步信任ip定时器
    cron.SyncTrustableIps()
    //启动采集metric定时器
    cron.Collect()
    //启动http服务
    go http.Start()
    //阻塞
    select {}
    }
    

    长连接

    为了回避创建连接销毁连接的开销,agent与transfer通过长连接通信。调用下面的call函数上报metric到transfer。

    func (this *SingleConnRpcClient) Call(method string, argsinterface{}, replyinterface{}) error {
        //上锁,协程安全
        this.Lock()
        defer this.Unlock()
        / /长连接生成函数
          err := this.serverConn()
          if err != nil {
            return err
    }
    timeout := time.Duration(10 * time.Second)
    done := make(chan error,1)
    go func() {
    //发起调用
    err := this.rpcClient.Call(method, args, reply)
    done <- err
    }()
    select {
    case <-time.After(timeout):
    //超时,连接可能已经不可用,需要关闭连接
    log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer)
    this.close()
    return errors.New(this.RpcServer +" rpc call timeout")
    case err := <-done:
    if err != nil {
    //如果出错也关闭连接,不管什么错误,有点粗暴
    this.close()
    return err
      }
    }
    return nil
    }
    

    生成连接。

    func (this *SingleConnRpcClient) serverConn() error {
    if this.rpcClient != nil {
    return nil
    }
    var err error
    var retry int =1
          for {
    //出错的时候,调用close函数会把rpcClient 置为nil,从而触发重建连接
    if this.rpcClient != nil {
         return nil
    }
    //发起连接
    this.rpcClient, err = net.JsonRpcClient("tcp", this.RpcServer, this.Timeout)
    if err != nil {
        log.Printf("dial %s fail: %v", this.RpcServer, err)
        if retry >3 {
            return err
    }
    //重试3次,每次休眠2的retry幂次方
    time.Sleep(time.Duration(math.Pow(2.0, float64(retry))) * time.Second)
    retry++
    continue
          }
    return err
    }
    }
    

    metric采集上传

    采集metric主要入口

    func Collect() {
    
    //开关检测
    
    if !g.Config().Transfer.Enabled {
          return  }
    //没有配置transfer地址
    if len(g.Config().Transfer.Addrs) ==0 {
    return  
     }
    
    //开始执行各个采集函数,funcs.Mappers是在main函数里初始化好的
    
    for _, v :=range funcs.Mappers {
    go collect(int64(v.Interval), v.Fs)
    }
    }
    

    具体调用采集函数的入口

    func collect(sec int64, fns []func() []*model.MetricValue) {
    t := time.NewTicker(time.Second * time.Duration(sec))
    defer t.Stop()
    for {
    <-t.C
    hostname, err := g.Hostname()
    if err != nil {
    continue
                  }
    mvs := []*model.MetricValue{}
    ignoreMetrics := g.Config().IgnoreMetrics
    for _, fn :=range fns {
    //调用采集函数
    items := fn()
    if items == nil {
    continue
     }
    if len(items) ==0 {
            continue
    }
    
    for _, mv :=range items {
      if b, ok := ignoreMetrics[mv.Metric]; ok && b {
          continue
     }else {
    mvs = append(mvs, mv)
    }
    }
    }
    now := time.Now().Unix()
    for j :=0; j < len(mvs); j++ {
    mvs[j].Step = sec
    mvs[j].Endpoint = hostname
    mvs[j].Timestamp = now
    }
    //通过长连接将metrics send到transfer
    g.SendToTransfer(mvs)
    }
    }
    

    以上是agent采集metric的主要流程。各个采集功能函数都在funcs.Mappers里面,如果需要crud采集功能,只需修改funcs.Mappers相关逻辑即可。用一个采集内存的功能函数做例,其他的异曲同工。

    //返回值连同其他采集结果调用SendToTransfer发送到transfer
    func MemMetrics() []*model.MetricValue {
    //获取内存信息
    m, err := nux.MemInfo()
    if err != nil {
    log.Println(err)
    return nil
    }
    memFree := m.MemFree + m.Buffers + m.Cached
    if m.MemAvailable >0 {
    memFree = m.MemAvailable
    }
    memUsed := m.MemTotal - memFree
    pmemFree :=0.0
          pmemUsed :=0.0
          if m.MemTotal !=0 {
    pmemFree = float64(memFree) *100.0 / float64(m.MemTotal)
    pmemUsed = float64(memUsed) *100.0 / float64(m.MemTotal)
    }
    pswapFree :=0.0
          pswapUsed :=0.0
          if m.SwapTotal !=0 {
    pswapFree = float64(m.SwapFree) *100.0 / float64(m.SwapTotal)
    pswapUsed = float64(m.SwapUsed) *100.0 / float64(m.SwapTotal)
    }
    return []*model.MetricValue{
    //总内存
                  GaugeValue("mem.memtotal", m.MemTotal),
    //used的内存
                  GaugeValue("mem.memused", memUsed),
    //free的内存
                  GaugeValue("mem.memfree", memFree),
    //swap总内存
                  GaugeValue("mem.swaptotal", m.SwapTotal),
    //swap used内存
                  GaugeValue("mem.swapused", m.SwapUsed),
    //swap free内存
                  GaugeValue("mem.swapfree", m.SwapFree),
    //内存free比例
                  GaugeValue("mem.memfree.percent", pmemFree),
    //内存使用比例
                  GaugeValue("mem.memused.percent", pmemUsed),
    //swap free比例
                  GaugeValue("mem.swapfree.percent", pswapFree),
    //swap used比例
                  GaugeValue("mem.swapused.percent", pswapUsed),
          }
    }
    

    SendToTransfer函数核心还是调用SendMetrics函数

    func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {
    rand.Seed(time.Now().UnixNano())
    //随机选取transfer节点,负载均衡
    for _, i :=range rand.Perm(len(Config().Transfer.Addrs)) {
    addr := Config().Transfer.Addrs[i]
    c := getTransferClient(addr)
    if c == nil {
    c = initTransferClient(addr)
    }
    
    //在这个函数里面会调用长连接那一节的call函数,来初始化或者复用连接
    
    if updateMetrics(c, metrics, resp) {
    break
                  }
    }
    }
    

    上面说的是agent自己采集的metric,用户也可以调用push接口主动上传。比如mysql,mongdb等这些外部的进程的采集。这是push接口的主要实现,

    func configPushRoutes() {
    http.HandleFunc("/v1/push",func(w http.ResponseWriter, req *http.Request) {
        if req.ContentLength ==0 {
            http.Error(w,"body is blank", http.StatusBadRequest)
            return
           }
    decoder := json.NewDecoder(req.Body)
    var metrics []*model.MetricValue
    err := decoder.Decode(&metrics)
    if err != nil {
              http.Error(w,"connot decode body", http.StatusBadRequest)
    return
             }
    
    //同样调用的SendToTransfer
    g.SendToTransfer(metrics)
    w.Write([]byte("success"))
    })
    }
    

    结束

    纸上得来终觉浅,绝知此事要躬行。

    相关文章

      网友评论

          本文标题:falcon agent模块源码解析

          本文链接:https://www.haomeiwen.com/subject/ruxuectx.html